Pyglet is a Python module used for visuals and sound. It has no dependencies on other modules. See pyglet.org (http://pyglet.org) for the official information.
To install pyglet from the command line:

Python 2: pip install pyglet
Python 3: pip3 install pyglet
import pyglet

win = pyglet.window.Window()

@win.event
def on_draw():
    # OpenGL goes here. Use OpenGL as normal.
    pass

pyglet.app.run()
from pyglet.gl import *

@win.event
def on_draw():
    glBegin(GL_POINTS)
    glVertex2f(x, y)  # x is the desired distance from the left side of the window,
                      # y the desired distance from the bottom
    # make as many vertices as you want
    glEnd()
Windows environment
import winsound
winsound.PlaySound("path_to_wav_file.wav", winsound.SND_FILENAME)
wave
Supports mono/stereo
Doesn't support compression/decompression
import wave
with wave.open("path_to_wav_file.wav", "rb") as wav_file: # Open WAV file in read-only mode.
# Get basic information.
n_channels = wav_file.getnchannels() # Number of channels. (1=Mono, 2=Stereo).
sample_width = wav_file.getsampwidth() # Sample width in bytes.
framerate = wav_file.getframerate() # Frame rate.
n_frames = wav_file.getnframes() # Number of frames.
comp_type = wav_file.getcomptype() # Compression type (only supports "NONE").
comp_name = wav_file.getcompname() # Compression name.
from subprocess import check_call

ok = check_call(['ffmpeg', '-i', 'input.mp3', 'output.wav'])
if ok == 0:  # check_call returns 0 on success and raises CalledProcessError on failure
    with open('output.wav', 'rb') as f:
        wav_file = f.read()
Note: "What are the differences and similarities between ffmpeg, libav, and avconv?"
http://superuser.com/questions/507386/why-would-i-choose-libav-over-ffmpeg-or-is-there-even-a-difference
import pyaudio
import wave
import time
import sys
if len(sys.argv) < 2:
print("Plays a wave file.\n\nUsage: %s filename.wav" % sys.argv[0])
sys.exit(-1)
wf = wave.open(sys.argv[1], 'rb')
In callback mode, PyAudio will call a specified callback function (2) whenever it needs new audio data (to play) and/or when there is new (recorded) audio data available. Note that PyAudio calls the callback function in a separate thread. The function has the signature callback(<input_data>, <frame_count>, <time_info>, <status_flag>) and must return a tuple containing frame_count frames of audio data and a flag signalling whether there are more frames to play or record.

Start processing the audio stream using pyaudio.Stream.start_stream() (4), which will call the callback function repeatedly until that function returns pyaudio.paComplete.

To keep the stream active, the main thread must not terminate, e.g., by sleeping (5).
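The snippet above stops short of opening the stream. A minimal sketch completing it along the numbered steps just described (the callback simply feeds frames from the WAV file and signals pyaudio.paContinue):

p = pyaudio.PyAudio()  # (1) instantiate PyAudio

# (2) define the callback: deliver the next frame_count frames from the file
def callback(in_data, frame_count, time_info, status):
    data = wf.readframes(frame_count)
    return (data, pyaudio.paContinue)

# (3) open a stream using the WAV file's parameters, in callback mode
stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                channels=wf.getnchannels(),
                rate=wf.getframerate(),
                output=True,
                stream_callback=callback)

stream.start_stream()  # (4) start processing

while stream.is_active():  # (5) keep the main thread alive while audio plays
    time.sleep(0.1)

stream.stop_stream()
stream.close()
wf.close()
p.terminate()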
import pyaudio
import wave
import sys
CHUNK = 1024
if len(sys.argv) < 2:
print("Plays a wave file.\n\nUsage: %s filename.wav" % sys.argv[0])
sys.exit(-1)
wf = wave.open(sys.argv[1], 'rb')
# read data
data = wf.readframes(CHUNK)
To use PyAudio, first instantiate PyAudio using pyaudio.PyAudio() (1), which sets up the portaudio system.
To record or play audio, open a stream on the desired device with the desired audio parameters using
pyaudio.PyAudio.open() (2). This sets up a pyaudio.Stream to play or record audio.
Play audio by writing audio data to the stream using pyaudio.Stream.write(), or read audio data from the stream
using pyaudio.Stream.read(). (3)
Note that in “blocking mode”, each pyaudio.Stream.write() or pyaudio.Stream.read() blocks until all the given/requested frames have been played/recorded. Alternatively, to generate audio data on the fly or to immediately process recorded audio data, use the “callback mode” (see the example on callback mode above).
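A minimal sketch completing the blocking-mode example along these steps:

p = pyaudio.PyAudio()  # (1) instantiate PyAudio

# (2) open a stream matching the WAV file's parameters
stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                channels=wf.getnchannels(),
                rate=wf.getframerate(),
                output=True)

# (3) play the file chunk by chunk; each write() blocks until the chunk has been played
while len(data) > 0:
    stream.write(data)
    data = wf.readframes(CHUNK)

stream.stop_stream()
stream.close()
p.terminate()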
import shelve
s = shelve.open('test_shelf.db')
try:
s['key1'] = { 'int': 10, 'float':9.5, 'string':'Sample data' }
finally:
s.close()
To access the data again, open the shelf and use it like a dictionary:
import shelve
s = shelve.open('test_shelf.db')
try:
existing = s['key1']
finally:
s.close()
print(existing)
$ python shelve_create.py
$ python shelve_existing.py
The dbm module does not support multiple applications writing to the same database at the same time. If you
know your client will not be modifying the shelf, you can tell shelve to open the database read-only.
import shelve
s = shelve.open('test_shelf.db', flag='r')
try:
existing = s['key1']
finally:
s.close()
print(existing)
If your program tries to modify the database while it is opened read-only, an access error exception is generated.
The exception type depends on the database module selected by anydbm when the database was created.
import shelve

database = shelve.open('filename.suffix')  # open the shelf file
obj = Object()                             # any picklable object
database['key'] = obj                      # store the object under a key
database.close()                           # close the shelf
import shelve

s = shelve.open('test_shelf.db')
try:
    print(s['key1'])
    s['key1']['new_value'] = 'this was not here before'
finally:
    s.close()

s = shelve.open('test_shelf.db', writeback=True)
try:
    print(s['key1'])
finally:
    s.close()

In this example, the dictionary at 'key1' is not stored again, so when the shelf is re-opened, the changes have not been preserved.
$ python shelve_create.py
$ python shelve_withoutwriteback.py
To automatically catch changes to volatile objects stored in the shelf, open the shelf with writeback enabled. The
writeback flag causes the shelf to remember all of the objects retrieved from the database using an in-memory
cache. Each cache object is also written back to the database when the shelf is closed.
import shelve

s = shelve.open('test_shelf.db', writeback=True)
try:
    print(s['key1'])
    s['key1']['new_value'] = 'this was not here before'
    print(s['key1'])
finally:
    s.close()

s = shelve.open('test_shelf.db', writeback=True)
try:
    print(s['key1'])
finally:
    s.close()
Although it reduces the chance of programmer error, and can make object persistence more transparent, using
writeback mode may not be desirable in every situation. The cache consumes extra memory while the shelf is open,
and pausing to write every cached object back to the database when it is closed can take extra time. Since there is
no way to tell if the cached objects have been modified, they are all written back. If your application reads data
more than it writes, writeback will add more overhead than you might want.
$ python shelve_create.py
$ python shelve_writeback.py
The temperature sensor has three pins:

1. Vcc
2. Gnd
3. Data (one-wire protocol)

1. Vcc should be connected to any of the 5v or 3.3v pins of the Raspberry Pi (PIN: 01, 02, 04, 17).
2. Gnd should be connected to any of the Gnd pins of the Raspberry Pi (PIN: 06, 09, 14, 20, 25).

nano /boot/config.txt

6. Now add this line, dtoverlay=w1-gpio, to the end of the file.
11. Now you will find a virtual directory for your temperature sensor, with a name starting with 28-********.
13. There is a file named w1-slave; this file contains the temperature and other information such as the CRC: cat w1-slave.
import glob
import time

RATE = 30
sensor_dirs = glob.glob("/sys/bus/w1/devices/28*")

if len(sensor_dirs) != 0:
    while True:
        time.sleep(RATE)
        for directory in sensor_dirs:
            temperature_file = open(directory + "/w1_slave")
            # Read the file
            text = temperature_file.read()
            temperature_file.close()
            # Split the text on newlines (\n) and select the second line
            second_line = text.split("\n")[1]
            # Split the line into words and select the 10th word
            temperature_data = second_line.split(" ")[9]
            # Read after ignoring the first two characters ("t=")
            temperature = float(temperature_data[2:])
            # Normalise the temperature by dividing by 1000
            temperature = temperature / 1000
            print('Address : ' + str(directory.split('/')[-1]) + ', Temperature : ' + str(temperature))
The Python module above will print the temperature against the sensor address indefinitely. The RATE parameter is defined to adjust the frequency of temperature queries to the sensor.
Chapter 185: kivy - Cross-platform Python Framework for NUI Development
NUI : A natural user interface (NUI) is a system for human-computer interaction that the user operates through
intuitive actions related to natural, everyday human behavior.
Kivy is a Python library for development of multi-touch enabled media rich applications which can be installed on
different devices. Multi-touch refers to the ability of a touch-sensing surface (usually a touch screen or a trackpad)
to detect or sense input from two or more points of contact simultaneously.
from kivy.app import App
from kivy.uix.label import Label

class Test(App):
    def build(self):
        return Label(text='Hello world')

if __name__ == '__main__':
    Test().run()
Explanation

from kivy.app import App

The above statement imports the parent class App. It is present in your installation directory your_installation_directory/kivy/app.py.

from kivy.uix.label import Label

The above statement imports the ux element Label. All the ux elements are present in your installation directory your_installation_directory/kivy/uix/.

class Test(App):

The above statement creates your app; the class name will be your app name. The class inherits from the parent App class.

def build(self):

The above statement overrides the build method of the App class, which returns the widget to be shown when you start the app.

return Label(text='Hello world')

The above statement is the body of the build method. It returns the Label with its text Hello world.

if __name__ == '__main__':

The above statement is the entry point from where the Python interpreter starts executing your app.

Test().run()

The above statement initialises your Test class by creating an instance of it, and invokes the App class function run().
We assume that a customer can have n orders, an order can have m items, and items can be ordered multiple times.
orders_df = pd.DataFrame()
orders_df['customer_id'] = [1,1,1,1,1,2,2,3,3,3,3,3]
orders_df['order_id'] = [1,1,1,2,2,3,3,4,5,6,6,6]
orders_df['item'] = ['apples', 'chocolate', 'chocolate', 'coffee', 'coffee', 'apples',
'bananas', 'coffee', 'milkshake', 'chocolate', 'strawberry', 'strawberry']
Now, we will use the pandas transform function to count the number of orders per customer:

# First, we define the function that will be applied per customer_id
count_number_of_orders = lambda x: len(x.unique())

# And now, we can transform each group using the logic defined above
orders_df['number_of_orders_per_client'] = (   # Put the results into a new column called 'number_of_orders_per_client'
    orders_df                                  # Take the original dataframe
    .groupby(['customer_id'])['order_id']      # Create a separate group for each customer_id & select the order_id
    .transform(count_number_of_orders))        # Apply the function to each group separately
In the previous example, we had one result per client. However, functions returning different values for rows within the same group can also be applied, as sketched below.

# Let's try to see if the items were ordered more than once in each order
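One possible implementation of that check (a sketch; the column name item_is_duplicated is an assumption, not from the original text). Here transform returns one value per row, flagging items that appear more than once within the same order:

orders_df['item_is_duplicated'] = (
    orders_df
    .groupby(['customer_id', 'order_id'])['item']    # One group per order
    .transform(lambda x: x.duplicated(keep=False)))  # True for every occurrence of a repeated item

print(orders_df)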
In Python, the expression 2 in [2, 3] evaluates to True, but in JavaScript to false. This is because in Python in checks if a value is contained in a list, so 2 is in [2, 3] as its first element. In JavaScript in is used with objects and checks if an object contains the property with the name expressed by the value. So JavaScript considers [2, 3] as an object, or a key-value map, like this:

{'0': 2, '1': 3}

and checks if it has a property or a key '2' in it. The integer 2 is silently converted to the string '2'.
namespace python_csharp
{
class Program
{
static void Main(string[] args)
{
// full path to .py file
string pyScriptPath = "...../sum.py";
// convert input arguments to JSON string
BsonDocument argsBson = BsonDocument.Parse("{ 'x' : '1', 'y' : '2' }");
try
{
// write input arguments to .txt file
using (StreamWriter sw = new StreamWriter(argsFile))
>>> c_int * 16
<class '__main__.c_long_Array_16'>
This is not an actual array, but it's pretty darn close! We created a class that denotes an array of 16 ints. All that's left is to initialize it:

>>> arr = (c_int * 16)(*range(16))

Now arr is an actual array that contains the numbers from 0 to 15.
>>> arr[5]
5
>>> arr[5] = 20
>>> arr[5]
20
And just like any other ctypes object, it also has a size and a location:
>>> sizeof(arr)
64 # sizeof(c_int) * 16
>>> hex(addressof(arr))
'0xc00010ff'
Now, that function takes two arguments and returns a result of the same type. For the sake of the example, let's assume that type is an int.

As we did in the array example, we can define an object that denotes that prototype:

>>> prototype = CFUNCTYPE(c_int, c_int, c_int)

That prototype denotes a function that returns a c_int (the first argument), and accepts two c_int arguments (the other arguments).

Function prototypes have one more use: they can wrap a ctypes function (like libc.ntohl) and verify that the correct arguments are used when invoking the function.
>>> ntohl(0x6C)
1811939328
>>> hex(_)
'0x6c000000'
The first possible error is failing to load the library. In that case an OSError is usually raised.

This is either because the file doesn't exist (or can't be found by the OS):

>>> cdll.LoadLibrary("foobar.so")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  ...
OSError: foobar.so: cannot open shared object file: No such file or directory
The second reason is that the file is found, but is not of the correct format.
>>> cdll.LoadLibrary("libc.so")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/python3.5/ctypes/__init__.py", line 425, in LoadLibrary
return self._dlltype(name)
File "/usr/lib/python3.5/ctypes/__init__.py", line 347, in __init__
self._handle = _dlopen(self._name, mode)
OSError: /usr/lib/i386-linux-gnu/libc.so: invalid ELF header
In this case, the file is a script file and not a .so file. This might also happen when trying to open a .dll file on a
Linux machine or a 64bit file on a 32bit python interpreter. As you can see, in this case the error is a bit more vague,
and requires some digging around.
Assuming we successfully loaded the .so file, we then need to access our function like we've done on the first
example.
>>> libc.foo
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/python3.5/ctypes/__init__.py", line 360, in __getattr__
func = self.__getitem__(name)
File "/usr/lib/python3.5/ctypes/__init__.py", line 365, in __getitem__
func = self._FuncPtr((name_or_ordinal, self))
AttributeError: /lib/i386-linux-gnu/libc.so.6: undefined symbol: foo
>>> obj = c_int(12)
>>> obj.value
12
>>> obj.value = 13
>>> obj
c_long(13)
>>> sizeof(obj)
4
>>> hex(addressof(obj))
'0xdeadbeef'
For more details about the function, read the man page. I urge you to read it before going on.
Notice that x and y are POINTER(c_int), so we need to dereference them and take their values in order to actually compare the values stored in the memory.
ptr is the returned void pointer. If key wasn't found in arr, the value would be None, but in this case we got a valid
value.
Also, we can see that ptr points to the correct value inside arr:
Chapter 190: Writing extensions
Section 190.1: Hello World with C Extension
The following C source file (which we will call hello.c for demonstration purposes) produces an extension module
named hello that contains a single function greet():
#include <Python.h>
#include <stdio.h>

#if PY_MAJOR_VERSION >= 3
#define IS_PY3K
#endif

static PyObject* greet(PyObject* self, PyObject* args)
{
    printf("Hello World\n");
    Py_RETURN_NONE;
}

static PyMethodDef HelloMethods[] = {
    {"greet", greet, METH_NOARGS, "Greet the world!"},
    {NULL, NULL, 0, NULL}
};

#ifdef IS_PY3K
static struct PyModuleDef hellomodule = {
    PyModuleDef_HEAD_INIT, "hello", NULL, -1, HelloMethods
};

PyMODINIT_FUNC PyInit_hello(void)
{
    return PyModule_Create(&hellomodule);
}
#else
PyMODINIT_FUNC inithello(void)
{
    (void) Py_InitModule("hello", HelloMethods);
}
#endif
To compile the file with the gcc compiler, run the following command in your favourite terminal:
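A typical invocation might look like the following; the include path depends on your Python version and platform, so treat it as a placeholder:

gcc -shared -fPIC -I/usr/include/python3.5 hello.c -o hello.so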
To execute the greet() function that we wrote earlier, create a file in the same directory, and call it hello.py
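Assuming the compiled hello.so sits in the same directory, hello.py could be as simple as:

import hello      # import the compiled extension module

hello.greet()     # prints "Hello World"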
#include <string>
#include <boost/python/module.hpp>
#include <boost/python/list.hpp>
#include <boost/python/class.hpp>
#include <boost/python/def.hpp>

// hello class that can return a list of count hello world strings.
class hello_class
{
public:
    hello_class(std::string message) : _message(message) {}

    // Return the message count times in a python list.
    boost::python::list as_list(int count)
    {
        boost::python::list res;
        for (int i = 0; i < count; ++i) res.append(_message);
        return res;
    }

private:
    std::string _message;
};
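The registration glue that exposes the class and a get_hello() function is not shown in the text; a minimal sketch consistent with the usage example further down (the names Hello, as_list and get_hello come from that example):

// Free function matching hello.get_hello() in the usage example.
std::string get_hello()
{
    return "Hello world!";
}

// Expose the function and the class (as 'Hello') to Python.
BOOST_PYTHON_MODULE(hello)
{
    boost::python::def("get_hello", get_hello);
    boost::python::class_<hello_class>("Hello", boost::python::init<std::string>())
        .def("as_list", &hello_class::as_list);
}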
To compile this into a python module you will need the python headers and the boost libraries. This example was made on Ubuntu 12.04 using python 3.4 and gcc. Boost is supported on many platforms; in the case of Ubuntu the needed packages were installed using:
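One plausible invocation (the exact package names are an assumption and vary by release):

sudo apt-get install gcc libboost-python-dev python3-dev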
Compiling the source file into a .so-file that can later be imported as a module provided it is on the python path:
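A plausible compile command; the include path and the boost_python library suffix differ across distributions, so treat these flags as assumptions:

g++ -shared -fPIC hello.cpp -o hello.so -I/usr/include/python3.4 -lboost_python-py34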
import hello
print(hello.get_hello())
h = hello.Hello("World hello!")
print(h.as_list(3))
Hello world!
['World hello!', 'World hello!', 'World hello!']
You can convert the file to an integer file descriptor using PyObject_AsFileDescriptor function:
PyObject *fobj;
int fd = PyObject_AsFileDescriptor(fobj);
if (fd < 0){
return NULL;
}
To convert an integer file descriptor back into a python object, use PyFile_FromFd.
If you completed all the above, you should now be able to use the PLY module. You can test it out by opening a
python interpreter and typing import ply.lex.
Note: Do not use pip to install PLY, it will install a broken distribution on your machine.
import ply.lex as lex

tokens = (
'PLUS',
'MINUS',
'TIMES',
'DIV',
'LPAREN',
'RPAREN',
'NUMBER',
)
t_PLUS = r'\+'
t_MINUS = r'-'
t_TIMES = r'\*'
t_DIV = r'/'
t_LPAREN = r'\('
t_RPAREN = r'\)'
def t_NUMBER( t ) :
r'[0-9]+'
t.value = int( t.value )
return t
def t_newline( t ):
r'\n+'
t.lexer.lineno += len( t.value )
def t_error( t ):
    print("Invalid Token:", t.value[0])
    t.lexer.skip( 1 )

lexer = lex.lex()
import ply.yacc as yacc

precedence = (
    ( 'left', 'PLUS', 'MINUS' ),
    ( 'left', 'TIMES', 'DIV' ),
    ( 'nonassoc', 'UMINUS' )
)
def p_add( p ) :
'expr : expr PLUS expr'
p[0] = p[1] + p[3]
def p_sub( p ) :
'expr : expr MINUS expr'
p[0] = p[1] - p[3]
def p_expr2uminus( p ) :
'expr : MINUS expr %prec UMINUS'
p[0] = - p[2]
def p_mult_div( p ) :
'''expr : expr TIMES expr
| expr DIV expr'''
if p[2] == '*' :
p[0] = p[1] * p[3]
else :
if p[3] == 0 :
print("Can't divide by 0")
raise ZeroDivisionError('integer division by 0')
p[0] = p[1] / p[3]
def p_expr2NUM( p ) :
'expr : NUMBER'
p[0] = p[1]
def p_parens( p ) :
'expr : LPAREN expr RPAREN'
p[0] = p[2]
def p_error( p ):
print("Syntax error in input!")
parser = yacc.yacc()
Output:
-8
This section provides a simple example of how to tokenize user input, and then break it down line by line.

# Give the lexer some input first; 'data' is the string to tokenize
lexer.input(data)

# Tokenize
while True:
    tok = lexer.token()
    if not tok:
        break  # No more input
    print(tok)
Breakdown
2. All lexers must provide a list called tokens that defines all of the possible token names that can be produced
by the lexer. This list is always required.
tokens = [
'NUMBER',
'PLUS',
'MINUS',
'TIMES',
'DIVIDE',
'LPAREN',
'RPAREN',
]
tokens could also be a tuple of strings (rather than a list), where each string denotes a token as before.
3. The regex rule for each string may be defined either as a string or as a function. In either case, the variable
name should be prefixed by t_ to denote it is a rule for matching tokens.
For simple tokens, the regular expression can be specified as strings: t_PLUS = r'\+'
If some kind of action needs to be performed, a token rule can be specified as a function.
def t_NUMBER(t):
r'\d+'
t.value = int(t.value)
return t
Note, the rule is specified as a doc string within the function. The function accepts one argument which is an instance of LexToken, performs some action, and then returns the argument.
If you want to use an external string as the regex rule for the function instead of specifying a doc
string, consider the following example:
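A sketch using PLY's @TOKEN decorator (the identifier pattern here is an illustrative assumption):

from ply.lex import TOKEN

identifier = r'[a-zA-Z_][a-zA-Z0-9_]*'   # external string holding the regex

@TOKEN(identifier)
def t_ID(t):
    return t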
An instance of LexToken object (let's call this object t) has the following attributes:
1. t.type which is the token type (as a string) (eg: 'NUMBER', 'PLUS', etc). By default, t.type is set
to the name following the t_ prefix.
2. t.value which is the lexeme (the actual text matched)
3. t.lineno which is the current line number (this is not automatically updated, as the lexer knows
nothing of line numbers). Update lineno using a function called t_newline.
def t_newline(t):
r'\n+'
t.lexer.lineno += len(t.value)
def t_COMMENT(t):
r'\#.*'
pass
# No return value. Token discarded
t_ignore_COMMENT = r'\#.*'
This is of course invalid if you're carrying out some action when you see a comment; in that case, use a function to define the regex rule.

If you haven't defined a token for some characters but still want to ignore them, use t_ignore = "<characters to ignore>" (these prefixes are necessary):
t_ignore_COMMENT = r'\#.*'
t_ignore = ' \t' # ignores spaces and tabs
When building the master regex, lex will add the regexes specified in the file as follows:
1. Tokens defined by functions are added in the same order as they appear in the file.
2. Tokens defined by strings are added in decreasing order of the string length of the string
defining the regex for that token.
If you are matching == and = in the same file, take advantage of these rules.
Literals are tokens that are returned as they are. Both t.type and t.value will be set to the character itself. Define a list of literals as such:

literals = [ '+', '-', '*', '/' ]

or,

literals = "+-*/"
It is possible to write token functions that perform additional actions when literals are matched.
However, you'll need to set the token type appropriately. For example:
def t_lbrace(t):
    r'\{'
    t.type = '{'  # Set token type to the expected literal (ABSOLUTE MUST if this is a literal)
    return t
4. Final preparations: build the lexer using lexer = lex.lex().

You can also put everything inside a class and use an instance of the class to define the lexer, as sketched below. Eg:
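A minimal sketch of such a class (the token set is an illustrative assumption), mirroring PLY's object-oriented style:

import ply.lex as lex

class MyLexer(object):
    # List of token names (illustrative subset)
    tokens = ('NUMBER', 'PLUS')

    # Regular expression rules for simple tokens
    t_PLUS = r'\+'
    t_ignore = ' \t'

    def t_NUMBER(self, t):
        r'\d+'
        t.value = int(t.value)
        return t

    def t_error(self, t):
        print("Illegal character '%s'" % t.value[0])
        t.lexer.skip(1)

    def build(self, **kwargs):
        # module=self tells lex to take the rules from this instance
        self.lexer = lex.lex(module=self, **kwargs)

    def test(self, data):
        self.lexer.input(data)
        for tok in self.lexer:
            print(tok)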
m = MyLexer()
m.build() # Build the lexer
m.test("3 + 4") #
To get the tokens, use lexer.token() which returns tokens matched. You can iterate over lexer in a loop as
in:
for i in lexer:
print(i)
# Yacc example

import ply.yacc as yacc

# Get the token map from the lexer (assuming the lexer above lives in calclex.py)
from calclex import tokens
def p_expression_minus(p):
'expression : expression MINUS term'
p[0] = p[1] - p[3]
def p_expression_term(p):
'expression : term'
p[0] = p[1]
def p_term_times(p):
'term : term TIMES factor'
p[0] = p[1] * p[3]
def p_term_div(p):
'term : term DIVIDE factor'
p[0] = p[1] / p[3]
def p_term_factor(p):
'term : factor'
p[0] = p[1]
def p_factor_num(p):
'factor : NUMBER'
p[0] = p[1]
def p_factor_expr(p):
'factor : LPAREN expression RPAREN'
p[0] = p[2]
def p_error(p):
    print("Syntax error in input!")

# Build the parser
parser = yacc.yacc()

while True:
    try:
        s = raw_input('calc > ')  # use input() in Python 3
except EOFError:
break
if not s: continue
result = parser.parse(s)
print(result)
Breakdown
Each grammar rule is defined by a function where the docstring to that function contains the appropriate
context-free grammar specification. The statements that make up the function body implement the semantic
actions of the rule. Each function accepts a single argument p that is a sequence containing the values of
each grammar symbol in the corresponding rule. The values of p[i] are mapped to grammar symbols as
shown here:
def p_expression_plus(p):
'expression : expression PLUS term'
# ^ ^ ^ ^
# p[0] p[1] p[2] p[3]
For tokens, the "value" of the corresponding p[i] is the same as the p.value attribute assigned in the lexer
module. So, PLUS will have the value +.
For non-terminals, the value is determined by whatever is placed in p[0]. If nothing is placed, the value is
None. Also, p[-1] is not the same as p[3], since p is not a simple list (p[-1] can specify embedded actions
(not discussed here)).
Note that the function can have any name, as long as it is preceded by p_.
The p_error(p) rule is defined to catch syntax errors (same as yyerror in yacc/bison).
Multiple grammar rules can be combined into a single function, which is a good idea if productions have a
similar structure.
def p_binary_operators(p):
'''expression : expression PLUS term
| expression MINUS term
term : term TIMES factor
| term DIVIDE factor'''
if p[2] == '+':
p[0] = p[1] + p[3]
elif p[2] == '-':
p[0] = p[1] - p[3]
elif p[2] == '*':
p[0] = p[1] * p[3]
elif p[2] == '/':
p[0] = p[1] / p[3]
When literal characters are used as tokens instead of named tokens, the same function becomes:

def p_binary_operators(p):
'''expression : expression '+' term
| expression '-' term
term : term '*' factor
| term '/' factor'''
if p[2] == '+':
p[0] = p[1] + p[3]
elif p[2] == '-':
p[0] = p[1] - p[3]
elif p[2] == '*':
p[0] = p[1] * p[3]
elif p[2] == '/':
p[0] = p[1] / p[3]
To explicitly set the start symbol, use start = 'foo', where foo is some non-terminal.
Setting precedence and associativity can be done using the precedence variable.
precedence = (
    ('nonassoc', 'LESSTHAN', 'GREATERTHAN'),  # Nonassociative operators
    ('left', 'PLUS', 'MINUS'),
    ('left', 'TIMES', 'DIVIDE'),
    ('right', 'UMINUS'),                      # Unary minus operator
)
Tokens are ordered from lowest to highest precedence. nonassoc means that those tokens do not associate.
This means that something like a < b < c is illegal whereas a < b is still legal.
parser.out is a debugging file that is created when the yacc program is executed for the first time. Whenever
a shift/reduce conflict occurs, the parser always shifts.
import unittest
class SomeTest(unittest.TestCase):
def setUp(self):
super(SomeTest, self).setUp()
self.mock_data = [1,2,3,4,5]
def test(self):
self.assertEqual(len(self.mock_data), 5)
def tearDown(self):
super(SomeTest, self).tearDown()
self.mock_data = []
if __name__ == '__main__':
unittest.main()
Note that in Python 2.7+, there is also the addCleanup method that registers functions to be called after the test is
run. In contrast to tearDown which only gets called if setUp succeeds, functions registered via addCleanup will be
called even in the event of an unhandled exception in setUp. As a concrete example, this method can frequently be
seen removing various mocks that were registered while the test was running:
import unittest
import mock

import some_module

class SomeOtherTest(unittest.TestCase):
    def setUp(self):
        super(SomeOtherTest, self).setUp()

        # Patch out some_module's method for the duration of the test
        # ('method_to_mock' is an illustrative name).
        my_patch = mock.patch.object(some_module, 'method_to_mock')
        my_patch.start()

        # When the test finishes running, put the original method back.
        self.addCleanup(my_patch.stop)
Another benefit of registering cleanups this way is that it allows the programmer to put the cleanup code next to
the setup code and it protects you in the event that a subclasser forgets to call super in tearDown.
class MyTestCase(unittest.TestCase):
def test_using_context_manager(self):
with self.assertRaises(ZeroDivisionError):
x = division_function(1, 0)
This will run the code inside of the context manager and, if it succeeds, it will fail the test because the exception was
not raised. If the code raises an exception of the correct type, the test will continue.
You can also get the content of the raised exception if you want to execute additional assertions against it.
class MyTestCase(unittest.TestCase):
    def test_using_context_manager(self):
        with self.assertRaises(ZeroDivisionError) as ex:
            x = division_function(1, 0)

        # The raised exception is available as ex.exception
        self.assertEqual(ex.exception.args[0], 'division by zero')  # message text in Python 3
class MyTestCase(unittest.TestCase):
def test_passing_function(self):
self.assertRaises(ZeroDivisionError, division_function, 1, 0)
The exception to check for must be the first parameter, and a callable function must be passed as the second
parameter. Any other parameters specified will be passed directly to the function that is being called, allowing you
to specify the parameters that trigger the exception.
class WrongInputException(Exception):
pass
This exception is raised when wrong input is given, in the following context where we always expect a number as
text input.
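The convert2number function itself is not shown; a minimal sketch consistent with the description and the tests below:

def convert2number(random_input):
    """Convert text input to an integer, raising WrongInputException otherwise."""
    try:
        return int(random_input)
    except ValueError:
        raise WrongInputException("Expected an integer, got %r" % (random_input,))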
To check whether an exception has been raised, we use assertRaises to check for that exception. assertRaises
can be used in two ways:
1. Using the regular function call. The first argument takes the exception type, second a callable (usually a
function) and the rest of arguments are passed to this callable.
2. Using a with clause, giving only the exception type to the function. This has the advantage that more code can be executed, but it should be used with care, since multiple functions can use the same exception, which can be problematic. An example: with self.assertRaises(WrongInputException): convert2number("not a number")
import unittest
class ExceptionTestCase(unittest.TestCase):
def test_wrong_input_string(self):
self.assertRaises(WrongInputException, convert2number, "not a number")
def test_correct_input(self):
try:
result = convert2number("56")
self.assertIsInstance(result, int)
except WrongInputException:
self.fail()
There may also be a need to check for an exception which should not have been thrown. However, a test automatically fails when an exception is thrown, so this may not be necessary at all. Just to show the options, the second test method shows how one can check that an exception is not thrown: basically, catch the exception and then fail the test using the fail method.
Perhaps the simplest assertion is assertTrue, which can be used like this:
import unittest
class SimplisticTest(unittest.TestCase):
def test_basic(self):
self.assertTrue(1 + 1 == 2)
This will run fine, but replacing the line above with
self.assertTrue(1 + 1 == 3)
will fail.
Consider instead the more specific assertion:

self.assertEqual(1 + 1, 3)

The failing assertTrue version reports:

======================================================================
FAIL: test_basic (__main__.SimplisticTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  ...
    self.assertTrue(1 + 1 == 3)
AssertionError: False is not true

while the assertEqual version reports:

======================================================================
FAIL: test_basic (__main__.SimplisticTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  ...
    self.assertEqual(1 + 1, 3)
AssertionError: 2 != 3

which is more informative (it actually evaluated the result of the left hand side).
You can find the list of assertions in the standard documentation. In general, it is a good idea to choose the assertion that fits the condition most specifically. Thus, as shown above, for asserting that 1 + 1 == 2 it is better to use assertEqual than assertTrue. Similarly, for asserting that a is None, it is better to use assertIsNone than assertEqual.
Note also that the assertions have negative forms. Thus assertEqual has its negative counterpart assertNotEqual,
and assertIsNone has its negative counterpart assertIsNotNone. Once again, using the negative counterparts
when appropriate, will lead to clearer error messages.
from subprocess import Popen, PIPE

def docker_exec_something(something_file_string):
    fl = Popen(["docker", "exec", "-i", "something_cont", "something"],
               stdin=PIPE, stdout=PIPE, stderr=PIPE)
    fl.stdin.write(something_file_string)
    fl.stdin.close()
    err = fl.stderr.read()
    fl.stderr.close()
    if err:
        print(err)
        exit()
    result = fl.stdout.read()
    print(result)
import os
from tempfile import NamedTemporaryFile
import pytest
from subprocess import Popen, PIPE
class MockBytes():
'''Used to collect bytes
'''
all_read = []
all_write = []
all_close = []
def get_all_mock_bytes(self):
return self.all_read, self.all_write, self.all_close
@pytest.fixture
def all_popens(monkeypatch):
    '''This fixture overrides / mocks the builtin Popen
    and replaces stdin, stdout, stderr with a MockBytes object
    '''
    all_popens = []

    class MockPopen(object):
        def __init__(self, args, stdout=None, stdin=None, stderr=None):
            all_popens.append(self)
            self.args = args
            self.byte_collection = MockBytes()
            self.stdin = self.byte_collection
            self.stdout = self.byte_collection
            self.stderr = self.byte_collection

    monkeypatch.setattr(helpers, 'Popen', MockPopen)  # 'helpers' is the module under test that calls Popen
    return all_popens
Example tests, must start with the prefix test_ in the test_docker.py file:
def test_docker_install():
p = Popen(['which', 'docker'], stdout=PIPE, stderr=PIPE)
result = p.stdout.read()
assert 'bin/docker' in result
def test_copy_file_to_docker(all_popens):
result = copy_file_to_docker('asdf', 'asdf')
collected_popen = all_popens.pop()
mock_read, mock_write, mock_close = collected_popen.byte_collection.get_all_mock_bytes()
assert mock_read
assert result.args == ['docker', 'cp', 'asdf', 'something_cont:asdf']
def test_docker_exec_something(all_popens):
docker_exec_something(something_file_string)
collected_popen = all_popens.pop()
mock_read, mock_write, mock_close = collected_popen.byte_collection.get_all_mock_bytes()
assert len(mock_read) == 3
something_template_stdin = mock_write[0][1][0]
these = [os.environ['USER'], os.environ['password_prod'], 'table_name_here', 'test_vdm',
'col_a', 'col_b', '/tmp/test.tsv']
assert all([x in something_template_stdin for x in these])
We can test multiples_of alone by mocking out multiply. The example below uses the Python standard library unittest, but this could be used with other testing frameworks as well, like pytest or nose:
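The code under test is not shown in the text; a minimal sketch consistent with the test below (the module name custom_math and the function multiples_of come from the text; the bodies are assumptions):

import unittest
from unittest import mock

import custom_math                    # provides multiply(a, b)
custom_math.multiply = mock.Mock()    # replace the real multiply with a mock

def multiples_of(integer, num_multiples=0):
    """Return the first num_multiples multiples of integer, via custom_math.multiply."""
    multiples = []
    for x in range(1, num_multiples + 1):
        multiples.append(custom_math.multiply(integer, x))
    return multiples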
class TestCustomMath(unittest.TestCase):
def test_multiples_of(self):
multiples = multiples_of(3, num_multiples=1)
custom_math.multiply.assert_called_with(3, 1)
# projectroot/module/code.py
def add(a, b):
return a + b
We create a test file in projectroot/tests/test_code.py. The file must begin with test_ to be recognized as a
testing file.
# projectroot/tests/test_code.py
from module import code
def test_add():
assert code.add(1, 2) == 3
tests/test_code.py .
# projectroot/module/stuff.py
class Stuff(object):
def prep(self):
self.foo = 1
self.bar = 2
# projectroot/tests/test_stuff.py
import pytest
from module import stuff
def test_foo_updates():
my_stuff = stuff.Stuff()
my_stuff.prep()
assert 1 == my_stuff.foo
my_stuff.foo = 30000
assert my_stuff.foo == 30000
def test_bar_updates():
my_stuff = stuff.Stuff()
my_stuff.prep()
assert 2 == my_stuff.bar
my_stuff.bar = 42
assert 42 == my_stuff.bar
These are pretty simple examples, but if our Stuff object needed a lot more setup, it would get unwieldy. We see
that there is some duplicated code between our test cases, so let's refactor that into a separate function first.
# projectroot/tests/test_stuff.py
import pytest
from module import stuff
def get_prepped_stuff():
my_stuff = stuff.Stuff()
my_stuff.prep()
return my_stuff
def test_foo_updates():
my_stuff = get_prepped_stuff()
assert 1 == my_stuff.foo
my_stuff.foo = 30000
assert my_stuff.foo == 30000
def test_bar_updates():
my_stuff = get_prepped_stuff()
assert 2 == my_stuff.bar
my_stuff.bar = 42
assert 42 == my_stuff.bar
This looks better but we still have the my_stuff = get_prepped_stuff() call cluttering up our test functions.
First we change get_prepped_stuff to a fixture called prepped_stuff. You want to name your fixtures with nouns
rather than verbs because of how the fixtures will end up being used in the test functions themselves later. The
@pytest.fixture indicates that this specific function should be handled as a fixture rather than a regular function.
@pytest.fixture
def prepped_stuff():
my_stuff = stuff.Stuff()
my_stuff.prep()
return my_stuff
Now we should update the test functions so that they use the fixture. This is done by adding a parameter to their
definition that exactly matches the fixture name. When py.test executes, it will run the fixture before running the
test, then pass the return value of the fixture into the test function through that parameter. (Note that fixtures
don't need to return a value; they can do other setup things instead, like calling an external resource, arranging
things on the filesystem, putting values in a database, whatever the tests need for setup)
def test_foo_updates(prepped_stuff):
my_stuff = prepped_stuff
assert 1 == my_stuff.foo
my_stuff.foo = 30000
assert my_stuff.foo == 30000
def test_bar_updates(prepped_stuff):
my_stuff = prepped_stuff
assert 2 == my_stuff.bar
my_stuff.bar = 42
assert 42 == my_stuff.bar
Now you can see why we named it with a noun. But the my_stuff = prepped_stuff line is pretty much useless, so let's just use prepped_stuff directly instead.
def test_foo_updates(prepped_stuff):
assert 1 == prepped_stuff.foo
prepped_stuff.foo = 30000
assert prepped_stuff.foo == 30000
def test_bar_updates(prepped_stuff):
assert 2 == prepped_stuff.bar
prepped_stuff.bar = 42
assert 42 == prepped_stuff.bar
Now we're using fixtures! We can go further by changing the scope of the fixture (so it only runs once per test
module or test suite execution session instead of once per test function), building fixtures that use other fixtures,
parametrizing the fixture (so that the fixture and all tests using that fixture are run multiple times, once for each
parameter given to the fixture), fixtures that read values from the module that calls them... as mentioned earlier,
fixtures have a lot more power and flexibility than a normal setup function.
Let's say our code has grown and our Stuff object now needs special clean up.
# projectroot/module/stuff.py
class Stuff(object):
    # ... prep() as before ...

    def finish(self):
        self.foo = 0
        self.bar = 0
We could add some code to call the clean up at the bottom of every test function, but fixtures provide a better way
to do this. If you add a function to the fixture and register it as a finalizer, the code in the finalizer function will get
called after the test using the fixture is done. If the scope of the fixture is larger than a single function (like module
or session), the finalizer will be executed after all the tests in scope are completed, so after the module is done
running or at the end of the entire test running session.
@pytest.fixture
def prepped_stuff(request): # we need to pass in the request to use finalizers
my_stuff = stuff.Stuff()
my_stuff.prep()
def fin(): # finalizer function
# do all the cleanup here
my_stuff.finish()
request.addfinalizer(fin) # register fin() as a finalizer
# you can do more setup here if you really want to
return my_stuff
Using the finalizer function inside a function can be a bit hard to understand at first glance, especially when you
have more complicated fixtures. You can instead use a yield fixture to do the same thing with a more human
readable execution flow. The only real difference is that instead of using return we use a yield at the part of the
fixture where the setup is done and control should go to a test function, then add all the cleanup code after the
yield. We also decorate it as a yield_fixture so that py.test knows how to handle it.
@pytest.yield_fixture
def prepped_stuff(): # it doesn't need request now!
# do setup
my_stuff = stuff.Stuff()
my_stuff.prep()
# setup is done, pass control to the test functions
yield my_stuff
# do cleanup
my_stuff.finish()
For more information, see the official py.test fixture documentation and the official yield fixture documentation
# projectroot/tests/test_code.py
from module import code
def test_add__failing():
assert code.add(10, 11) == 33
$ py.test
=================================================== test session starts ===================================================
platform darwin -- Python 2.7.10, pytest-2.9.2, py-1.4.31, pluggy-0.3.1
rootdir: /projectroot, inifile:
collected 1 items
tests/test_code.py F
========================================================= FAILURES =========================================================
____________________________________________________ test_add__failing _____________________________________________________
def test_add__failing():
> assert code.add(10, 11) == 33
E assert 21 == 33
E + where 21 = <function add at 0x105d4d6e0>(10, 11)
E + where <function add at 0x105d4d6e0> = code.add
tests/test_code.py:5: AssertionError
================================================= 1 failed in 0.01 seconds =================================================
It breaks down your entire script and, for each method in your script, tells you how many times it was called and how much time was spent in it. To sort the returned list of profiled methods by the time taken in each method:
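A plausible invocation with the standard cProfile module from the command line ('time' is one of cProfile's documented sort keys):

$ python -m cProfile -s time your_program.py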
import requests

@profile
def slow_func():
    s = requests.session()
    html = s.get("https://en.wikipedia.org/").text
    sum([pow(ord(x), 3.1) for x in list(html)])

for i in range(50):
    slow_func()
Page request is almost always slower than any calculation based on the information on the page.
Chapter 195: Python speed of program
Section 195.1: Deque operations
A deque is a double-ended queue.
class Deque:
    def __init__(self):
        self.items = []

    def isEmpty(self):
        return self.items == []

    def addFront(self, item):
        self.items.append(item)

    def addRear(self, item):
        self.items.insert(0, item)

    def removeFront(self):
        return self.items.pop()

    def removeRear(self):
        return self.items.pop(0)

    def size(self):
        return len(self.items)
The main operations on a collections.deque and their time complexities:

Append : O(1)
Appendleft : O(1)
Copy : O(n)
Extend : O(k)
Extendleft : O(k)
Pop : O(1)
Popleft : O(1)
Remove : O(n)
Rotate : O(k)
Remember the 80/20 rule: in many fields you can get 80% of the result with 20% of the effort (also called the Pareto principle).

Always run "before" and "after" benchmarks: how else will you know that your optimizations actually made a difference? If your optimized code turns out to be only slightly faster or smaller than the original version, undo your changes and go back to the original, clear code.

Use the right algorithms and data structures: don't use an O(n^2) bubble sort algorithm to sort a thousand elements when there's an O(n log n) quicksort available. Similarly, don't store a thousand items in an array that requires an O(n) search when you could use an O(log n) binary tree, or an O(1) Python hash table.
The following three asymptotic notations are most commonly used to represent the time complexity of algorithms.

1. Θ Notation: The theta notation bounds a function from above and below, so it defines exact asymptotic behavior. A simple way to get the Theta notation of an expression is to drop low-order terms and ignore leading constants. For example, consider the expression 3n^3 + 6n^2 + 6000 = Θ(n^3). Dropping lower-order terms is always fine because there will always be an n0 after which Θ(n^3) has higher values than Θ(n^2), irrespective of the constants involved. For a given function g(n), Θ(g(n)) denotes the following set of functions: Θ(g(n)) = {f(n): there exist positive constants c1, c2 and n0 such that 0 <= c1*g(n) <= f(n) <= c2*g(n) for all n >= n0}. The definition means that if f(n) is theta of g(n), then the value f(n) is always between c1*g(n) and c2*g(n) for large values of n (n >= n0). The definition also requires that f(n) must be non-negative for values of n greater than n0.
2. Big O Notation: The Big O notation defines an upper bound of an algorithm; it bounds a function only from above. For example, consider the case of Insertion Sort: it takes linear time in the best case and quadratic time in the worst case. We can safely say that the time complexity of Insertion Sort is O(n^2); note that O(n^2) also covers linear time. If we used Θ notation to represent the time complexity of Insertion Sort, we would have to use two statements for the best and worst cases:

1. The worst-case time complexity of Insertion Sort is Θ(n^2).
2. The best-case time complexity of Insertion Sort is Θ(n).

The Big O notation is useful when we only have an upper bound on the time complexity of an algorithm, and many times we can easily find an upper bound simply by looking at the algorithm. O(g(n)) = {f(n): there exist positive constants c and n0 such that 0 <= f(n) <= c*g(n) for all n >= n0}.
3. Ω Notation: Just as Big O notation provides an asymptotic upper bound on a function, Ω notation provides an asymptotic lower bound. Ω notation can be useful when we have a lower bound on the time complexity of an algorithm. As discussed earlier, the best-case performance of an algorithm is generally not useful, so the Omega notation is the least used of the three. For a given function g(n), Ω(g(n)) denotes the set of functions: Ω(g(n)) = {f(n): there exist positive constants c and n0 such that 0 <= c*g(n) <= f(n) for all n >= n0}. Consider the same Insertion Sort example: its time complexity can be written as Ω(n), but that is not very useful information, as we are generally interested in the worst case and sometimes in the average case.
The notation used when describing the speed of your Python program is called Big-O notation. Let's say you have a
function:
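A minimal example of such a function (the name and the linear-scan implementation are assumptions):

def search_list(to_find, the_list):
    """Return True if to_find is an element of the_list (linear scan)."""
    for item in the_list:
        if item == to_find:
            return True
    return False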
This is a simple function to check if an item is in a list. To describe the complexity of this function, you will say O(n).
This means "Order of n" as the O function is known as the Order function.
O(k) - generally k is the value of the parameter or the number of elements in the parameter

List operations:

Append : O(1)
Copy : O(n)
Insert : O(n)
Iteration : O(n)
Extend : O(k)
Multiply : O(nk)
x in s : O(n)

Set operations:

x in s : O(1)
Difference s - t : O(len(s))
s.symmetric_difference_update(t) : O(len(t))
"We should forget about small efficiencies, say about 97% of the time: premature optimization is the root
of all evil. Yet we should not pass up our opportunities in that critical 3%"
To profile your code you have several tools: cProfile (or the slower profile) from the standard library, line_profiler, and timeit. Each of them serves a different purpose.

cProfile is a deterministic profiler: function call, function return, and exception events are monitored, and precise timings are made for the intervals between these events (up to 0.001s). The library documentation (https://docs.python.org/2/library/profile.html) provides a simple use case:
import cProfile
def f(x):
return "42!"
cProfile.run('f(12)')
This will create outputs looking like the table below, where you can quickly see where your program spends most of
its time and identify the functions to optimize.
kernprof will create an instance of LineProfiler and insert it into the __builtins__ namespace with the name
profile. It has been written to be used as a decorator, so in your script, you decorate the functions you want to
profile with @profile.
@profile
def slow_function(a, b, c):
...
The default behavior of kernprof is to put the results into a binary file script_to_profile.py.lprof. You can tell kernprof to immediately view the formatted results at the terminal with the -v/--view option. Otherwise, you can view the results later like so:
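The viewer module ships with line_profiler:

$ python -m line_profiler script_to_profile.py.lprof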
Finally, timeit provides a simple way to test one-liners or small expressions both from the command line and the Python shell. This module will answer questions such as: is it faster to do a list comprehension or to use the built-in list() when transforming a set into a list? Look for the setup keyword or the -s option to add setup code. For example, from a terminal:
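A sketch of that exact comparison (the set size is an arbitrary choice):

$ python -m timeit -s "s = set(range(1000))" "list(s)"
$ python -m timeit -s "s = set(range(1000))" "[x for x in s]"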
import hashlib
import os
salt = os.urandom(16)
hash = hashlib.pbkdf2_hmac('sha256', b'password', salt, 100000)
PBKDF2 can work with any digest algorithm; the above example uses SHA256, which is usually recommended. The random salt should be stored along with the hashed password; you will need it again in order to compare an entered password to the stored hash. It is essential that each password is hashed with a different salt. As to the number of rounds, it is recommended to set it as high as possible for your application.
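A sketch of the comparison step (hmac.compare_digest avoids leaking timing information):

import hashlib
import hmac

def verify_password(entered_password, salt, stored_hash, rounds=100000):
    # Re-derive the hash with the stored salt and compare in constant time.
    candidate = hashlib.pbkdf2_hmac('sha256', entered_password, salt, rounds)
    return hmac.compare_digest(candidate, stored_hash)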
If you want the result in hexadecimal, you can use the binascii module:
import binascii
hexhash = binascii.hexlify(hash)
Note: While PBKDF2 isn't bad, bcrypt and especially scrypt are considered stronger against brute-force attacks.
Neither is part of the Python standard library at the moment.
import hashlib
h = hashlib.new('sha256')
h.update(b'Nobody expects the Spanish Inquisition.')
h.digest()
# ==> b'.\xdf\xda\xdaVR[\x12\x90\xff\x16\xfb\x17D\xcf\xb4\x82\xdd)\x14\xff\xbc\xb6Iy\x0c\x0eX\x9eF-='
Note that you can call update an arbitrary number of times before calling digest which is useful to hash a large file
chunk by chunk. You can also get the digest in hexadecimal format by using hexdigest:
h.hexdigest()
# ==> '2edfdada56525b1290ff16fb1744cfb482dd2914ffbcb649790c0e589e462d3d'
You can check which algorithms are available on your platform and interpreter using hashlib.algorithms_available; the returned set varies, so make sure you check that your algorithm is available.
There are also some algorithms that are guaranteed to be available on all platforms and interpreters, which are
available using hashlib.algorithms_guaranteed:
hashlib.algorithms_guaranteed
# ==> {'sha256', 'sha384', 'sha1', 'sha224', 'md5', 'sha512'}
import hashlib

hasher = hashlib.new('sha256')
with open('myfile', 'rb') as f:   # open in binary mode; update() needs bytes
    contents = f.read()
hasher.update(contents)
print(hasher.hexdigest())
import hashlib
SIZE = 65536
hasher = hashlib.new('sha256')
with open('myfile', 'rb') as f:   # binary mode, so update() receives bytes
buffer = f.read(SIZE)
while len(buffer) > 0:
hasher.update(buffer)
buffer = f.read(SIZE)
print(hasher.hexdigest())
import errno

from Crypto.PublicKey import RSA
try:
with open('privkey.pem', 'r') as f:
key = RSA.importKey(f.read())
except IOError as e:
if e.errno != errno.ENOENT:
raise
# No private key, generate a new one. This can take a few seconds.
key = RSA.generate(4096)
with open('privkey.pem', 'wb') as f:
f.write(key.exportKey('PEM'))
with open('pubkey.pem', 'wb') as f:
f.write(key.publickey().exportKey('PEM'))
from Crypto.Hash import SHA256
from Crypto.Signature import PKCS1_v1_5

hasher = SHA256.new(message)
signer = PKCS1_v1_5.new(key)
signature = signer.sign(hasher)
Verifying the signature works similarly but uses the public key rather than the private key:
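A sketch of the verification side, using the same pycrypto PKCS1_v1_5 API:

hasher = SHA256.new(message)
verifier = PKCS1_v1_5.new(key.publickey())
if verifier.verify(hasher, signature):
    print('The signature is valid.')
else:
    print('The signature is NOT valid.')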
Note: The above examples use PKCS#1 v1.5 signing algorithm which is very common. pycrypto also implements the
newer PKCS#1 PSS algorithm, replacing PKCS1_v1_5 by PKCS1_PSS in the examples should work if you want to use
that one. Currently there seems to be little reason to use it however.
The recipient can decrypt the message then if they have the right private key:
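A sketch of the decryption side, assuming the message was encrypted with pycrypto's PKCS1_OAEP and encrypted holds the ciphertext:

from Crypto.Cipher import PKCS1_OAEP

cipher = PKCS1_OAEP.new(key)           # key is the recipient's private RSA key
decrypted = cipher.decrypt(encrypted)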
import hashlib
import math
import os
The AES algorithm takes three parameters: encryption key, initialization vector (IV) and the actual message to be
encrypted. If you have a randomly generated AES key then you can use that one directly and merely generate a
random initialization vector. A passphrase doesn't have the right size however, nor would it be recommendable to
use it directly given that it isn't truly random and thus has comparably little entropy. Instead, we use the built-in
implementation of the PBKDF2 algorithm to generate a 128 bit initialization vector and 256 bit encryption key from
the password.
Note the random salt which is important to have a different initialization vector and key for each message
encrypted. This ensures in particular that two equal messages won't result in identical encrypted text, but it also
prevents attackers from reusing work spent guessing one passphrase on messages encrypted with another
passphrase. This salt has to be stored along with the encrypted message in order to derive the same initialization
vector and key for decrypting.
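The encryption side is not shown above; a minimal sketch consistent with the decryption code below (the constants and the 100000 rounds mirror it; password and message are placeholders):

from Crypto.Cipher import AES

SALT_SIZE = 16   # bytes of random salt
IV_SIZE = 16     # 128-bit initialization vector
KEY_SIZE = 32    # 256-bit key

password = b'correct horse battery staple'   # placeholder passphrase
message = b'attack at dawn'                  # placeholder cleartext

salt = os.urandom(SALT_SIZE)
derived = hashlib.pbkdf2_hmac('sha256', password, salt, 100000,
                              dklen=IV_SIZE + KEY_SIZE)
iv = derived[0:IV_SIZE]
key = derived[IV_SIZE:]

encrypted = salt + AES.new(key, AES.MODE_CFB, iv).encrypt(message)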
salt = encrypted[0:SALT_SIZE]
derived = hashlib.pbkdf2_hmac('sha256', password, salt, 100000,
dklen=IV_SIZE + KEY_SIZE)
iv = derived[0:IV_SIZE]
key = derived[IV_SIZE:]
cleartext = AES.new(key, AES.MODE_CFB, iv).decrypt(encrypted[SALT_SIZE:])
try:
    res = get_result()
    res = res[0]
    log('got result: %r' % res)
except:
    if not res:
        res = ''
    print('got exception')
1. The except with no exception type (line 5) will catch even healthy exceptions, including KeyboardInterrupt.
That will prevent the program from exiting in some cases.
2. The except block does not reraise the error, meaning that we won't be able to tell if the exception came from
within get_result or because res was an empty list.
3. Worst of all, if we were worried about result being empty, we've caused something much worse. If
get_result fails, res will stay completely unset, and the reference to res in the except block, will raise
NameError, completely masking the original error.
Always think about the type of exception you're trying to handle. Give the exceptions page a read and get a feel for
what basic exceptions exist.
import traceback

try:
    res = get_result()
except Exception:
    log_exception(traceback.format_exc())
    raise
try:
    res = res[0]
except IndexError:
    res = ''
log('got result: %r' % res)
We catch more specific exceptions, reraising where necessary. A few more lines, but infinitely more correct.
For example, take a function which looks like this: it returns an integer if the input value can produce one, else
None:
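A sketch of such a function (the body is an assumption):

import time

def intensive_f(value):      # int -> Optional[int]
    time.sleep(1)            # stand-in for an expensive computation
    if value < 0:            # hypothetical failure condition
        return None
    return value * 2         # hypothetical integer result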
x = 5
if intensive_f(x) is not None:
print(intensive_f(x) / 2)
else:
print(x, "could not be processed")
print(x)
Whilst this will work, it has the problem of calling intensive_f twice, which doubles the time the code takes to run. A better solution would be to store the return value of the function beforehand:

result = intensive_f(x)
if result is not None:
    print(result / 2)
else:
    print(x, "could not be processed")
However, a clearer and possibly more pythonic way is to use exceptions, for example:
x = 5
try:
print(intensive_f(x) / 2)
except TypeError:  # the exception raised if None / 2 is attempted
print(x, "could not be processed")
Here no temporary variable is needed. It may often be preferable to use an assert statement, and to catch the AssertionError instead.
Dictionary keys
A common example of where this may be found is accessing dictionary keys. For example compare:
bird_speeds = get_very_long_dictionary()

if "european swallow" in bird_speeds:
    speed = bird_speeds["european swallow"]
else:
    speed = input("What is the air-speed velocity of an unladen swallow?")

print(speed)
with:
bird_speeds = get_very_long_dictionary()
try:
speed = bird_speeds["european swallow"]
except KeyError:
speed = input("What is the air-speed velocity of an unladen swallow?")
print(speed)
The first example has to look through the dictionary twice, and as this is a long dictionary, it may take a long time to
do so each time. The second only requires one search through the dictionary, and thus saves a lot of processor
time.
An alternative to this is to use dict.get(key, default); however, many circumstances may require more complex operations to be done in the case that the key is not present.