This tutorial describes how to convert (almost) any method to run with multiprocessing.
Although setting up multiprocessing can seem intimidating, it is actually amazingly easy to do in most cases. We are going to set up an example where we first run without multiprocessing and then we convert the method to run with multiprocessing.
The key to easy multiprocessing is setting up your function in a certain way. We are going to separate the function that does something from the list of inputs to that function (which are going to change for the N jobs that are run).
First, let's set up some imports:
from __future__ import print_function  # so we can use print function easily
import sys  # import sys
from libtbx import group_args  # group_args function we will use to return items
We have imported the print_function here so that this method will work in both Python 2 and Python 3.
Here is our function:
def run_something(value):  # simple function
  return value * 2  # just returns 2 * the input value
And here is our list of inputs to the function that we would like to run in parallel:
iteration_list = [5,7,9] # list of anything
Let's first set up this calculation running one by one through the values in iteration_list. It is a good idea to start this way even if you know from the start that you want to run with multiprocessing, because once you have set it up in the way we have it below it is very easy to convert.
def run_one_by_one(iteration_list):  #
  result_list = []  # initialize result list
  for i in range(len(iteration_list)):  # iterate over input values
    result = run_something(iteration_list[i])  # get result
    result_list.append(result)  # save result
  return result_list  # return the list of results
We can run run_one_by_one and get a list of results:
result_list = run_one_by_one(iteration_list)  # get results one by one
print(result_list)  # prints list of values [10, 14, 18]
Now let's convert the run_one_by_one method to run with multiprocessing.
def run_in_parallel(iteration_list, nproc = 4):  #
  from libtbx.easy_mp import simple_parallel  # import the simple_parallel code
  result_list = simple_parallel(  # run in parallel
    iteration_list = iteration_list,  # our list of values to run with
    function = run_something,  # the method we are running
    nproc = nproc)  # how many processors
  return result_list  # return the list of results
Now we can run run_in_parallel and get a list of results:
result_list = run_in_parallel(iteration_list, nproc = 4)  # run in parallel
print(result_list)  # prints list of values [10, 14, 18]
Wow, that was easy. All we did was replace the explicit iteration over our iteration_list of values, and the calls to run_something, with a single call to simple_parallel taking these as arguments and specifying how many processors to use.
Bottom line on this: set up your calculation as an iteration over some list of values where the results are appended to a list and the list of results is returned. Then you can use simple_parallel without changing anything else at all.
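If you want to see why this pattern converts so easily, note that it is exactly the shape expected by Python's built-in multiprocessing module as well. Here is a minimal Python 3 sketch using only the standard library (this is not simple_parallel, just an illustration that the pattern is general). Note the if __name__ == '__main__': guard: on platforms that spawn worker processes rather than forking (Windows, and macOS on newer Pythons), module-level code is re-executed in each worker, so any script that uses multiprocessing, presumably including a script built around simple_parallel, should guard its entry point this way:

from multiprocessing import Pool

def run_something(value):  # same simple function as above
  return value * 2

if __name__ == '__main__':  # guard so spawned workers do not re-run this block
  with Pool(processes = 4) as pool:  # four worker processes
    result_list = pool.map(run_something, [5, 7, 9])  # one call per input value
  print(result_list)  # [10, 14, 18]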
You can set up multiprocessing in a more complicated situation pretty easily as well. Suppose we want to run a method that is going to do something with a really big input (like a map) that is going to be the same for every job, and some inputs that are going to be different for each job (like a different set of coordinates each time).
In this example the key features are the big object (which we do not want to copy and we do not want to write to disk and we may not be able to pickle) and some list of sets of inputs, one set of inputs for each job.
This is going to look a lot like the easy example, but it will have some new features.
Here is the function we want to run. We are going to call our fixed big object big_object. (It can have any name, and there can be any number of such inputs, or none of them). We are going to call this function with an info object (a group_args object) that is going to be different for each job and that contains values of info.value and info.index. (Once again, this info object can contain anything you want.) In this case, we are going to multiply info.value by two and add the value of big_object at index info.index:
def run_advanced(info,
    big_object = None,  #
    log = sys.stdout):  # we can specify the log in this method if we want
  output_value = info.value * 2 + big_object[info.index]  # our result
  print("Value: %s Index: %s Output value: %s" %(
    info.value, info.index, output_value), file = log)
  return group_args(  #
    group_args_type = 'Result from one job',  #
    input_info = info,  #
    output_value = output_value,)  #
In this example function we have added two new features:

1. We have added log = sys.stdout to your function and have print statements in your function that print to this log. The simple_parallel procedure will handle all the text that is printed.

2. The return value is a group_args object and not a simple value. The use of group_args for a return value allows returning multiple items in a simple way and allows saving and printing output text.
Note that we are returning the input info object as an element of the result group_args object. We don't have to do that, but it shows that you can return something that will identify which output came from which inputs.
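If group_args is new to you: it is just a lightweight container from libtbx that stores whatever keyword arguments you give it as attributes, which is what makes this bookkeeping of inputs and outputs so convenient. A tiny illustration (the field names here are arbitrary):

from libtbx import group_args

g = group_args(group_args_type = 'example', value = 7, index = 2)  # fields by name
print(g.value, g.index)  # -> 7 2 (each keyword becomes an attribute)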
Here is how we are going to set up to run this without multiprocessing. This time we are going to make our iteration_list be a list of the group_args objects that specify what inputs are to be used in each job, and set our big_object to be a little array:
from libtbx import group_args
iteration_list = []  # initialize
for i in [5,7,9]:  # our values to vary for each job
  iteration_list.append(  # a list of info objects
    group_args(  # info object (group_args)
      group_args_type = 'value of info for one job',  # title
      value = i,  # value of value
      index = 2))  # value of index
big_object = [0,1,2,3]  # just some supposedly big object
... and we can run it like this:
def advanced_run_as_is(iteration_list,  #
    big_object = None,  #
    log = sys.stdout):  # run in usual way
  result_list = []  # initialize
  for i in range(len(iteration_list)):  # iterate through jobs
    result = run_advanced(iteration_list[i],
      big_object = big_object, log = log)  # run job
    result_list.append(result)  #
  return result_list  # return list of results
We can run this and print out each result as a group_args object. Notice how convenient it is that each result records both the inputs and the output. The use of the log keyword allows us to capture the log output if we want to.
result_list = advanced_run_as_is(  #
  iteration_list, big_object = big_object, log = sys.stdout)  #
for result in result_list:  # run through results
  print("\nOne result:\n%s" %str(result))  # print this result (it is a group_args object)
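Since each result is itself a group_args object, you can also pick out individual fields by name instead of printing the whole object; the attribute names below match what run_advanced returned:

result = result_list[0]  # one result (a group_args object)
print(result.output_value)  # the computed value for this job
print(result.input_info.value, result.input_info.index)  # the inputs that produced it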
Now we can run the same job in parallel with just tiny changes:
def advanced_run_in_parallel(iteration_list,  #
    big_object = None, nproc = 4, log = sys.stdout):  # run in parallel
  from libtbx.easy_mp import simple_parallel  #
  result_list = simple_parallel(  #
    iteration_list = iteration_list,  # list of varying inputs
    big_object = big_object,  # any number of keyword arguments allowed
    function = run_advanced,  # function to run
    nproc = nproc,  # number of processors
    verbose = False,  # non-verbose output
    log = log)
  return result_list
With verbose = False, the output from the first job run (actually the first batch of jobs, if more than one job is in each batch, as will be the case if there are many more jobs than processors used) is sent to the log stream (here that is sys.stdout) and all the other output is ignored. If we had instead set verbose = True, then all the output would be printed (hard to read but useful for debugging).
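A natural choice for nproc is the number of cores available on your machine. Python's standard library can report this (this is plain Python, not part of simple_parallel):

import multiprocessing
nproc = multiprocessing.cpu_count()  # number of CPU cores available here
result_list = advanced_run_in_parallel(iteration_list,
  big_object = big_object, nproc = nproc, log = sys.stdout)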
We can now run and print out each result as a group_args object:
result_list = advanced_run_in_parallel(  #
  iteration_list,  #
  big_object = big_object,  #
  log = sys.stdout)  #
for result in result_list:  # run through results
  print("\nOne result:\n%s" %str(result))  # print this result (it is a group_args object)
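Finally, because run_advanced writes to whatever file-like object is passed as log, you can capture the printed output in a string instead of sending it to the screen. A small sketch using the standard library (io.StringIO is plain Python, not part of libtbx):

from io import StringIO  # Python 3; in Python 2 use: from StringIO import StringIO

log = StringIO()  # a file-like object that collects everything written to it
result_list = advanced_run_in_parallel(
  iteration_list, big_object = big_object, log = log)
log_text = log.getvalue()  # everything that was printed, as one string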