How to run any method with multiprocessing

This tutorial describes how to convert (almost) any method to run with multiprocessing.

Setting up a very simple method with multiprocessing

Although setting up multiprocessing can seem intimidating, it is actually amazingly easy to do in most cases. We are going to set up an example where we first run without multiprocessing and then we convert the method to run with multiprocessing.

The key to easy multiprocessing is setting up your function in a certain way. We are going to separate the function that does something from the list of inputs to that function (which are going to change for the N jobs that are run).

First, let's set up some imports:

from __future__ import print_function  # so we can use print function easily
import sys        # provides sys.stdout, our default log stream
from libtbx import group_args #  group_args, which we will use to return items

We have imported print_function here so that this code will work in both Python 2 and Python 3.

Here is our function:

def run_something(value):  # simple function
  return value * 2    # just returns 2 * the input value

And here is our list of inputs to the function that we would like to run in parallel:

iteration_list = [5,7,9]  # list of anything

Let's first set up this calculation running one by one through the values in iteration_list. It is a good idea to start this way even if you know from the start that you want to run with multiprocessing, because once you have set it up in the way we have it below it is very easy to convert.

def run_one_by_one(iteration_list): #
  result_list = []                             # initialize result list
  for i in range(len(iteration_list)):         # iterate over input values
    result = run_something(iteration_list[i])  # get result
    result_list.append(result)  # save result
  return result_list   # return the list of results

We can run run_one_by_one and get a list of results:

result_list = run_one_by_one(iteration_list)  # get results one by one
print(result_list)  # prints list of values  [10, 14, 18]

Now let's convert the run_one_by_one method to run with multiprocessing.

def run_in_parallel(iteration_list, nproc = 4): #
  from libtbx.easy_mp import simple_parallel  # import the simple_parallel code
  result_list = simple_parallel(      # run in parallel
    iteration_list = iteration_list,  # our list of values to run with
    function = run_something,         # the method we are running
    nproc = nproc )                   # how many processors
  return result_list   # return the list of results

Now we can run run_in_parallel and get a list of results:

result_list = run_in_parallel(iteration_list, nproc = 4)  # run in parallel
print(result_list)  # prints list of values  [10, 14, 18]

Wow that was easy. All we did was replace the explicit iteration over our iteration_list of values and calls to run_something with a call to simple_parallel with these as arguments and specifying how many processors to use.

Bottom line on this: set up your calculation as an iteration over some list of values where the results are appended to a list and the list of results is returned. Then you can use simple_parallel without changing anything else at all.
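
The same loop-to-pool conversion can be tried without libtbx. The following is a minimal sketch that uses the standard library's concurrent.futures as a stand-in for simple_parallel. It is not how simple_parallel is implemented, the helper names run_one_by_one_sketch and run_with_pool_sketch are hypothetical, and a thread pool is used only so that the sketch runs anywhere. It shows that the loop version and the pooled version return the same list in the same order:

```python
from concurrent.futures import ThreadPoolExecutor


def run_something(value):
  # Same toy function as in the tutorial: doubles its input
  return value * 2


def run_one_by_one_sketch(iteration_list):
  # Plain loop: call the function once per input and collect the results
  return [run_something(value) for value in iteration_list]


def run_with_pool_sketch(iteration_list, nproc = 4):
  # Stand-in for simple_parallel (assumption: NOT the libtbx implementation).
  # executor.map yields results in the same order as the inputs.
  with ThreadPoolExecutor(max_workers = nproc) as executor:
    return list(executor.map(run_something, iteration_list))


iteration_list = [5, 7, 9]
serial_results = run_one_by_one_sketch(iteration_list)
pooled_results = run_with_pool_sketch(iteration_list, nproc = 2)
print(serial_results == pooled_results)  # True
```

Comparing the serial and pooled results like this is a cheap check to run before switching a real calculation over.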

Advanced multiprocessing made easy

You can set up multiprocessing in a more complicated situation pretty easily as well. Suppose we want to run a method that is going to do something with a really big input (like a map) that is going to be the same for every job, and some inputs that are going to be different for each job (like a different set of coordinates each time).

In this example the key features are the big object (which we do not want to copy and we do not want to write to disk and we may not be able to pickle) and some list of sets of inputs, one set of inputs for each job.

This is going to look a lot like the easy example, but it will have some new features.

Here is the function we want to run. We are going to call our fixed big object big_object. (It can have any name, and there can be any number of such inputs, or none at all.) We are going to call this function with an info object (a group_args object) that is going to be different for each job and that contains values of info.value and info.index. (Once again, this info object can contain anything you want.) In this case, we are going to multiply info.value by two and add the value of big_object at index info.index:

def run_advanced(info, big_object = None,  #
     log = sys.stdout):   #  we can specify the log in this method if we want
  output_value = info.value * 2 + big_object[info.index]   # our result
  print("Value: %s Index: %s Output value: %s" %(info.value, info.index, output_value), file = log)
  return group_args( #
    group_args_type = 'Result from one job',  #
    input_info = info,   #
    output_value = output_value,)   #

In this example function we have added two new features:

  • A log stream. If you add the keyword log = sys.stdout to your function and have print statements in your function that print to log then the simple_parallel procedure will handle all the text that is printed.

  • We are returning a group_args object and not a simple value. The use of group_args for a return value allows returning multiple items in a simple way and allows saving and printing output text.

Note that we are returning the input info object as an element of the result group_args object. We don't have to do that, but it shows that you can return something that will identify which output came from which inputs.

Here is how we are going to set up to run this without multiprocessing. This time we are going to make our iteration_list be a list of the info group_args objects that specify what inputs are to be used in each job, and set our big_object to be a little array:

iteration_list = []   # initialize
from libtbx import group_args
for i in [5,7,9]:  # our values to vary for each job
  iteration_list.append(   # a list of info objects
    group_args(   # info object (group_args)
      group_args_type = 'value of info for one job',   # title
      value = i,   # value of value
      index = 2)   # value of index
    )   #

big_object = [0,1,2,3]  # just some supposedly big object

... and we can run it like this:

def advanced_run_as_is(iteration_list,  #
     big_object = None,  #
     log = sys.stdout): # run in usual way
  result_list = []   # initialize
  for i in range(len(iteration_list)):     #  iterate through jobs
    result = run_advanced(iteration_list[i],
      big_object = big_object,
      log = log)  # run job
    result_list.append(result)   #
  return result_list    # return list of results

We can run this and print out each result as a group_args object. Notice how convenient it is that each result records both the inputs and the output.

The use of the log keyword allows us to capture the log output if we want to.

result_list = advanced_run_as_is( #
   iteration_list, big_object = big_object,
   log = sys.stdout) #
for result in result_list:  # run through results
  print("\nOne result:\n%s" %str(result))  # print this result (it is a group_args object)
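
The log keyword mentioned above makes it possible to collect the printed text instead of sending it to the terminal. Here is a minimal Python 3 sketch, assuming an in-memory io.StringIO stream as the log. It uses types.SimpleNamespace as a stand-in for group_args so that it runs without libtbx, and run_advanced_sketch is a hypothetical simplified copy of run_advanced:

```python
import io
from types import SimpleNamespace  # stand-in for group_args (assumption)


def run_advanced_sketch(info, big_object = None, log = None):
  # Same shape as run_advanced: all printing goes to the log stream
  output_value = info.value * 2 + big_object[info.index]
  print("Value: %s Index: %s Output value: %s" % (
    info.value, info.index, output_value), file = log)
  return output_value


captured_log = io.StringIO()  # in-memory text stream used as the log
info = SimpleNamespace(value = 5, index = 2)
output_value = run_advanced_sketch(info, big_object = [0, 1, 2, 3],
  log = captured_log)
log_text = captured_log.getvalue()  # everything the function printed
print(log_text, end = "")  # Value: 5 Index: 2 Output value: 12
```

The captured text can then be saved, searched, or printed later, which is not possible if the function prints directly to the terminal.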

Now we can run the same job in parallel with just tiny changes:

def advanced_run_in_parallel(iteration_list,  #
      big_object = None, nproc = 4, log = sys.stdout): # run in parallel
  from libtbx.easy_mp import simple_parallel  #
  result_list = simple_parallel(  #
    iteration_list = iteration_list, # list of varying inputs
    big_object = big_object, # any number of keyword arguments allowed
    function = run_advanced,  # function to run
    nproc = nproc,   # number of processors
    verbose = False,   # non-verbose output
    log = log,)   # log stream for the output
  return result_list

With verbose = False, the output from the first job run is sent to the log stream (here that is sys.stdout) and all the other output is ignored. (Strictly, it is the output from the first batch of jobs, because each batch contains more than one job when there are many more jobs than processors.) If we had instead set verbose = True, then all the output would be printed (hard to read but useful for debugging).
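
To see why a batch holds more than one job when jobs outnumber processors, consider the sketch below. This tutorial does not describe how simple_parallel forms its batches, so split_into_batches is a hypothetical helper showing one plausible scheme (contiguous batches whose sizes differ by at most one), not the libtbx behavior:

```python
def split_into_batches(job_list, nproc):
  # Hypothetical helper: divide jobs into at most nproc contiguous batches
  # whose sizes differ by at most one.
  n_batches = max(1, min(nproc, len(job_list)))
  base_size, extra = divmod(len(job_list), n_batches)
  batches = []
  start = 0
  for i in range(n_batches):
    size = base_size + (1 if i < extra else 0)  # first batches take the extras
    batches.append(job_list[start:start + size])
    start += size
  return batches


jobs = list(range(10))  # ten jobs
batches = split_into_batches(jobs, nproc = 3)
print(batches)  # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
```

Under this scheme, ten jobs on three processors put four jobs in the first batch, so with verbose = False the log would show the output of all four.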

We can now run and print out each result as a group_args object:

result_list = advanced_run_in_parallel(  #
    iteration_list, #
    big_object = big_object,  #
    log = sys.stdout) #
for result in result_list:  # run through results
  print("\nOne result:\n%s" %str(result))  # print this result (it is a group_args object)
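
Because each result carries its input_info, the results can be matched back to their inputs after the run. Here is a minimal Python 3 sketch, again using types.SimpleNamespace as a stand-in for group_args so that it runs without libtbx. The names run_advanced_sketch and output_by_value are hypothetical:

```python
from types import SimpleNamespace  # stand-in for group_args (assumption)


def run_advanced_sketch(info, big_object = None):
  # Same arithmetic as run_advanced in the tutorial
  output_value = info.value * 2 + big_object[info.index]
  return SimpleNamespace(input_info = info, output_value = output_value)


big_object = [0, 1, 2, 3]
iteration_list = [SimpleNamespace(value = v, index = 2) for v in [5, 7, 9]]
result_list = [run_advanced_sketch(info, big_object = big_object)
               for info in iteration_list]

# Build a lookup from input value to output value using the returned inputs
output_by_value = {}
for result in result_list:
  output_by_value[result.input_info.value] = result.output_value

print(output_by_value)  # {5: 12, 7: 16, 9: 20}
```

Keying on the returned inputs means the lookup does not depend on the position of a result in the list.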