This tutorial describes how to convert (almost) any method to run with multiprocessing
Although setting up multiprocessing can seem intimidating, it is actually amazingly easy to do in most cases. We are going to set up an example where we first run without multiprocessing and then we convert the method to run with multiprocessing.
The key to easy multiprocessing is setting up your function in a certain way. We are going to separate the function that does something from the list of inputs to that function (which are going to change for the N jobs that are run).
First, let's set up some imports:
from __future__ import print_function # so we can use the print function easily
import sys # for sys.stdout, used as a default log below
from libtbx import group_args # group_args objects we will use to return results
We have imported print_function here so that this method will work in both Python 2 and Python 3.
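As a quick illustration, the file argument to print (which we use later to direct output to a log) is only available in Python 2 with this import in place:
print("doubled value:", 10, file = sys.stdout) # works in Python 2 (with the import above) and in Python 3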
Here is our function:
def run_something(value): # simple function
  return value * 2 # just returns 2 * the input value
And here is our list of inputs to the function that we would like to run in parallel:
iteration_list = [5,7,9] # list of anything
Let's first set up this calculation running one by one through the values in iteration_list. It is a good idea to start this way even if you know from the start that you want to run with multiprocessing, because once you have set it up in the way we have it below it is very easy to convert.
def run_one_by_one(iteration_list): # run the jobs one at a time
  result_list = [] # initialize result list
  for i in range(len(iteration_list)): # iterate over input values
    result = run_something(iteration_list[i]) # get result
    result_list.append(result) # save result
  return result_list # return the list of results
We can run run_one_by_one and get a list of results:
result_list = run_one_by_one(iteration_list) # get results one by one
print(result_list) # prints list of values [10, 14, 18]
Now let's convert the run_one_by_one method to run with multiprocessing.
def run_in_parallel(iteration_list, nproc = 4): # run the jobs in parallel
  from libtbx.easy_mp import simple_parallel # import the simple_parallel code
  result_list = simple_parallel( # run in parallel
    iteration_list = iteration_list, # our list of values to run with
    function = run_something, # the method we are running
    nproc = nproc) # how many processors
  return result_list # return the list of results
Now we can run run_in_parallel and get a list of results:
result_list = run_in_parallel(iteration_list, nproc = 4) # run in parallel
print(result_list) # prints list of values [10, 14, 18]
Wow, that was easy. All we did was replace the explicit iteration over our iteration_list of values and the calls to run_something with a single call to simple_parallel, passing these as arguments and specifying how many processors to use.
Bottom line on this: set up your calculation as an iteration over some list of values where each result is appended to a list and the list of results is returned. Then you can use simple_parallel without changing anything else at all.
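In other words, the general pattern looks something like this (a minimal sketch; my_function and my_values are placeholder names for your own function and inputs, not part of the example above):
def my_function(value): # any function that takes one item from the list
  return value # ... and returns some result

my_values = [1, 2, 3] # any list of inputs, one entry per job

from libtbx.easy_mp import simple_parallel # as above
result_list = simple_parallel(
  iteration_list = my_values, # the list of inputs
  function = my_function, # the function to run on each input
  nproc = 4) # number of processors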
You can set up multiprocessing in a more complicated situation pretty easily as well. Suppose we want to run a method that is going to do something with a really big input (like a map) that is going to be the same for every job, and some inputs that are going to be different for each job (like a different set of coordinates each time).
In this example the key features are the big object (which we do not want to copy and we do not want to write to disk and we may not be able to pickle) and some list of sets of inputs, one set of inputs for each job.
This is going to look a lot like the easy example, but it will have some new features.
Here is the function we want to run. We are going to call our fixed big object big_object. (It can have any name, and there can be any number of such inputs, or none of them.) We are going to call this function with an info object (a group_args object) that is going to be different for each job and that contains values of info.value and info.index. (Once again, this info object can contain anything you want.) In this case, we are going to multiply info.value by two and add the value of big_object at index info.index:
def run_advanced(info, big_object = None,
    log = sys.stdout): # we can specify the log in this method if we want
  output_value = info.value * 2 + big_object[info.index] # our result
  print("Value: %s Index: %s Output value: %s" %(
    info.value, info.index, output_value), file = log)
  return group_args(
    group_args_type = 'Result from one job',
    input_info = info,
    output_value = output_value,)
In this example function we have added two new features. First, if you add log = sys.stdout to your function and have print statements in your function that print to log, then the simple_parallel procedure will handle all the text that is printed. Second, the function returns a group_args object and not a simple value. The use of group_args for a return value allows returning multiple items in a simple way and allows saving and printing output text.
Note that we are returning the input info object as an element of the result group_args object. We don't have to do that, but it shows that you can return something that will identify which output came from which inputs.
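For example, once we have a list of results (as we will shortly), a loop like this sketch could report which inputs produced each output, using the input_info and output_value fields defined above:
for result in result_list: # each result is a group_args object from run_advanced
  print("Input value %s with index %s gave output %s" %(
    result.input_info.value, result.input_info.index, result.output_value))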
Here is how we are going to set up to run this without multiprocessing. This time we are going to make our iteration_list be a list of the info group_args objects that specify what inputs are to be used in each job, and set our big_object to be a little array:
iteration_list = [] # initialize
from libtbx import group_args
for i in [5,7,9]: # our values to vary for each job
  iteration_list.append( # build a list of info objects
    group_args( # info object (group_args)
      group_args_type = 'value of info for one job', # title
      value = i, # the value for this job
      index = 2) # the index for this job
    )
big_object = [0,1,2,3] # just some supposedly big object
... and we can run it like this:
def advanced_run_as_is(iteration_list,
    big_object = None,
    log = sys.stdout): # run in the usual way
  result_list = [] # initialize
  for i in range(len(iteration_list)): # iterate through jobs
    result = run_advanced(iteration_list[i],
      big_object = big_object,
      log = log) # run job
    result_list.append(result) # save result
  return result_list # return list of results
We can run this and print out each result as a group_args object. Notice how convenient it is that each result records both the inputs and the output. The use of the log keyword allows us to capture the log output if we want to.
result_list = advanced_run_as_is(
  iteration_list, big_object = big_object,
  log = sys.stdout) # run the jobs one at a time
for result in result_list: # run through results
  print("\nOne result:\n%s" %str(result)) # print this result (it is a group_args object)
Now we can run the same job in parallel with just tiny changes:
def advanced_run_in_parallel(iteration_list,
    big_object = None, nproc = 4, log = sys.stdout): # run in parallel
  from libtbx.easy_mp import simple_parallel
  result_list = simple_parallel(
    iteration_list = iteration_list, # list of varying inputs
    big_object = big_object, # any number of keyword arguments allowed
    function = run_advanced, # function to run
    nproc = nproc, # number of processors
    verbose = False, # non-verbose output
    log = log,
    )
  return result_list
With verbose = False the output from the first job run (actually the first batch of jobs, if more than one job is in each batch, as will be the case if there are many more jobs than processors used) is sent to the log stream (here that is sys.stdout) and all the other output is ignored. If we had instead set verbose = True then all the output would be printed (hard to read, but useful for debugging).
We can now run and print out each result as a group_args object:
result_list = advanced_run_in_parallel(
  iteration_list,
  big_object = big_object,
  log = sys.stdout) # run in parallel
for result in result_list: # run through results
  print("\nOne result:\n%s" %str(result)) # print this result (it is a group_args object)