Using the Multiprocessing Library in Python 3

Python has a nifty multiprocessing library which comes with a lot of helpful abstractions. However, as with concurrent programming in most languages, there are lots of footguns.

Here are some of the gotchas I ran into:

  • Logging does not work as you’d expect. Global state associated with your logger (handlers, levels, and so on) will be wiped out, although if you’ve already defined a logger variable it will continue to reference the same logger as in the parent process. The easiest solution for logging seems to be to set up a new file-based logger in the child process. If you can’t do this, you’ll need to implement some sort of message-queue logging, which sounds terrible.
  • Relatedly, be careful about reusing any database connections, file handles, etc. from the parent in a forked process. This can cause strange, hard-to-debug errors.
  • When you pass variables to a forked process, they are ‘pickled’: the Python data structure is serialized and then deserialized on the ‘other end’ (i.e. in the forked process). I was trying to decorate a function and pickle it, and ran into weird issues. Only functions defined at the top level of a module can be pickled.
  • If you are using the macOS libraries via Python, you cannot reference them in both a parent and a child process. The solution here is to run all functions which hit the macOS libraries in a subprocess. I was not able to get the decorator in this linked post working. Here’s a working example using a modified version of the source below.
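
The pickling gotcha can be reproduced without starting any process. This is a minimal sketch, and the function names in it are made up for illustration. pickle stores a function as a reference to its module and qualified name, so a function that cannot be looked up by name at import time (a lambda, or a function defined inside another function) fails to serialize.

```python
import pickle


def top_level():
    return "hello"


def make_nested():
    # Defined inside another function, so it has no importable module-level name.
    def nested():
        return "hello"
    return nested


def can_pickle(obj):
    """Return True if pickle.dumps accepts obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


if __name__ == "__main__":
    print(can_pickle(top_level))
    print(can_pickle(make_nested()))
    print(can_pickle(lambda: "hello"))
```

Run as a script, only the first call should report success. A decorator that returns an inner wrapper function can hit the same failure, because the wrapper is itself a nested function.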

I struggled to find full working examples of using the multiprocessing library online (here’s the best I found). I’ve included an example that uses multiprocessing to create a forked process, execute a function in it, and return the results inline. The example does the following:

  • Send a signal from the parent process to the child process to start executing using multiprocessing.Condition. I was not able to get this working without first notify()ing the parent process.
  • Kill the child process after 10 minutes. This works around memory leaks I was running into with the AppleScript I was trying to execute.
  • Configure logging in the forked process.
  • Return the result synchronously to the caller using a shared queue implemented with multiprocessing.Queue.
import multiprocessing
import time
import logging

forked_condition = None
forked_result_queue = None
forked_process = None
forked_time = None

logger = logging.getLogger(__name__)

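def _wrapped_function(condition, result_queue, function_reference):
    # this is run in a forked process, which wipes all logging configuration
    # you'll need to reconfigure your logging instance in the forked process
    # basicConfig attaches a stderr handler; without a handler, debug records are dropped
    logging.basicConfig(level=logging.DEBUG)
    logger.setLevel(logging.DEBUG)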

    first_run = True

    while True:
        with condition:
            # notify parent process that we are ready to wait for notifications
            # an alternative here that I did not attempt is waiting for `is_alive()` https://stackoverflow.com/questions/57929895/python-multiprocessing-process-start-wait-for-process-to-be-started
            if first_run:
                condition.notify()
                first_run = False

            condition.wait()

        try:
            logger.debug("running operation in fork")
            result_queue.put(function_reference())
        except Exception:
            logger.exception("error running function in fork")
            # put a placeholder on the queue so the parent's `get()` does not block forever
            result_queue.put(None)

def _run_in_forked_process(function_reference):
    global forked_condition, forked_result_queue, forked_process, forked_time

    # terminate the process after 10 minutes
    if forked_time and time.time() - forked_time > 60 * 10:
        assert forked_process
        logger.debug("killing forked process, 10 minutes have passed")
        forked_process.kill()
        # reap the killed process so it does not linger as a zombie
        forked_process.join()
        forked_process = None

    if not forked_process:
        forked_condition = multiprocessing.Condition()

        forked_result_queue = multiprocessing.Queue()
        forked_process = multiprocessing.Process(
            target=_wrapped_function,
            args=(forked_condition, forked_result_queue, function_reference)
        )

        # wait until the fork is ready. A `notify()` sent while nobody is in `wait()`
        # is lost, so `wait()` needs to be called before `notify()`. Holding the
        # condition while starting the child means it cannot send its readiness
        # `notify()` until our `wait()` has released the lock.
        with forked_condition:
            forked_process.start()
            forked_time = time.time()

            logger.debug("waiting for child process to indicate readiness")
            forked_condition.wait()

    # if forked_process is defined, forked_condition always should be as well
    assert forked_condition and forked_result_queue

    # signal to the process to run the function again and put the result on the queue
    with forked_condition:
        forked_condition.notify()

    logger.debug("waiting for result of child process")

    return forked_result_queue.get(block=True)

def _exampleFunction():
    # do something strange, like running AppleScript
    return "hello"

def exampleFunction():
    return _run_in_forked_process(_exampleFunction)

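# you can use the wrapped function like a normal python function
# the `__main__` guard stops a spawned child, which re-imports this file, from running this block again
if __name__ == "__main__":
    print(exampleFunction())

    # this doesn't make sense to use in a single-use script, but if you do, you'll need to terminate the forked process
    forked_process.kill()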
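
The message-queue logging mentioned in the gotchas may be less painful than it sounds, because the standard library ships logging.handlers.QueueHandler and logging.handlers.QueueListener. The following is a minimal sketch, separate from the example above. The names `ListHandler`, `_child` and `run_with_queue_logging`, and the optional `ctx` parameter, are made up for illustration. The child sends its log records over a multiprocessing queue, and a listener thread in the parent hands them to ordinary handlers.

```python
import logging
import logging.handlers
import multiprocessing


class ListHandler(logging.Handler):
    """Collects records in memory; stands in for a file or stream handler."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


def _child(log_queue):
    # Runs in the child: send every log record to the shared queue.
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(logging.handlers.QueueHandler(log_queue))
    logging.getLogger("child").debug("hello from the child process")


def run_with_queue_logging(ctx=None):
    # ctx is an optional multiprocessing context (an assumption of this sketch);
    # when omitted, the platform's default start method is used.
    if ctx is None:
        ctx = multiprocessing.get_context()
    log_queue = ctx.Queue()
    collector = ListHandler()

    process = ctx.Process(target=_child, args=(log_queue,))
    process.start()

    # The listener thread in the parent forwards queued records to `collector`.
    listener = logging.handlers.QueueListener(log_queue, collector)
    listener.start()

    process.join()
    listener.stop()  # handles records already on the queue, then stops the thread
    return [record.getMessage() for record in collector.records]
```

Calling `run_with_queue_logging()` from under an `if __name__ == "__main__":` guard should return the child's message as seen by the parent. In real use, `ListHandler` would be replaced with a `logging.FileHandler` or whatever handlers the parent already has configured.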

Note that the target environment here was macOS. This may not work perfectly on Linux or Windows; there seem to be additional footguns on Windows in particular.
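
Much of the platform difference comes from the start method multiprocessing uses to create child processes. With fork the child begins as a copy of the parent. With spawn a fresh interpreter is started and the main module is re-imported, which is why logging configuration disappears and arguments have to be picklable. As far as I know, spawn is the default on Windows and, since Python 3.8, on macOS, while Linux has traditionally defaulted to fork. Here is a small sketch, with hypothetical helper names, for checking the default and pinning a method explicitly:

```python
import multiprocessing


def describe_start_methods():
    """Return the default start method and all methods this platform supports."""
    return (
        multiprocessing.get_start_method(),
        multiprocessing.get_all_start_methods(),
    )


def spawn_context():
    # A context pins the start method for the objects created from it,
    # without changing the global default.
    return multiprocessing.get_context("spawn")


if __name__ == "__main__":
    print(describe_start_methods())
```

Creating the process, queue and condition from one context (`ctx.Process`, `ctx.Queue`, `ctx.Condition`) keeps them on the same start method, which should make behavior easier to compare across platforms.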