diff options
Diffstat (limited to 'plip/basic/parallel.py')
-rw-r--r-- | plip/basic/parallel.py | 52 |
1 files changed, 0 insertions, 52 deletions
diff --git a/plip/basic/parallel.py b/plip/basic/parallel.py deleted file mode 100644 index 1f71943..0000000 --- a/plip/basic/parallel.py +++ /dev/null @@ -1,52 +0,0 @@ -import itertools -import multiprocessing -from builtins import zip -from functools import partial - -from numpy import asarray - - -class SubProcessError(Exception): - def __init__(self, e, exitcode=1): - self.exitcode = exitcode - super(SubProcessError, self).__init__(e) - - pass - - -def universal_worker(input_pair): - """This is a wrapper function expecting a tiplet of function, single - argument, dict of keyword arguments. The provided function is called - with the appropriate arguments.""" - function, arg, kwargs = input_pair - return function(arg, **kwargs) - - -def pool_args(function, sequence, kwargs): - """Return a single iterator of n elements of lists of length 3, given a sequence of len n.""" - return zip(itertools.repeat(function), sequence, itertools.repeat(kwargs)) - - -def parallel_fn(f): - """Simple wrapper function, returning a parallel version of the given function f. - The function f must have one argument and may have an arbitray number of - keyword arguments. """ - - def simple_parallel(func, sequence, **args): - """ f takes an element of sequence as input and the keyword args in **args""" - if 'processes' in args: - processes = args.get('processes') - del args['processes'] - else: - processes = multiprocessing.cpu_count() - - pool = multiprocessing.Pool(processes) # depends on available cores - - result = pool.map_async(universal_worker, pool_args(func, sequence, args)) - pool.close() - pool.join() - cleaned = [x for x in result.get() if x is not None] # getting results - cleaned = asarray(cleaned) - return cleaned - - return partial(simple_parallel, f) |