aboutsummaryrefslogtreecommitdiff
path: root/plip/basic/parallel.py
diff options
context:
space:
mode:
Diffstat (limited to 'plip/basic/parallel.py')
-rw-r--r--plip/basic/parallel.py52
1 files changed, 52 insertions, 0 deletions
diff --git a/plip/basic/parallel.py b/plip/basic/parallel.py
new file mode 100644
index 0000000..1f71943
--- /dev/null
+++ b/plip/basic/parallel.py
@@ -0,0 +1,52 @@
+import itertools
+import multiprocessing
+from builtins import zip
+from functools import partial
+
+from numpy import asarray
+
+
class SubProcessError(Exception):
    """Error raised when a worker sub-process fails.

    In addition to the usual exception message, it carries the exit code
    that the failing process should terminate with.
    """

    def __init__(self, e, exitcode=1):
        # Exit status to report for the failed sub-process (1 = generic error).
        self.exitcode = exitcode
        # Py2/3-compatible super call, matching the file's `builtins` compat style.
        super(SubProcessError, self).__init__(e)
+
+
def universal_worker(input_pair):
    """Unpack a (function, argument, kwargs-dict) triplet and call the function.

    This wrapper exists because ``multiprocessing.Pool.map`` can only pass a
    single picklable object to each worker; bundling the callable, its one
    positional argument, and its keyword arguments into one tuple works
    around that limitation.

    :param input_pair: triplet of (function, single argument, dict of keyword
        arguments)
    :return: whatever ``function(arg, **kwargs)`` returns
    """
    function, arg, kwargs = input_pair
    return function(arg, **kwargs)
+
+
def pool_args(function, sequence, kwargs):
    """Pair each element of *sequence* with the shared callable and kwargs.

    Given a sequence of n elements, produce an iterator of n triplets of the
    form ``(function, element, kwargs)``, suitable for ``universal_worker``.
    """
    const_function = itertools.repeat(function)
    const_kwargs = itertools.repeat(kwargs)
    return zip(const_function, sequence, const_kwargs)
+
+
def parallel_fn(f):
    """Return a parallel version of the given function *f*.

    The function f must have exactly one positional argument and may have an
    arbitrary number of keyword arguments. The returned callable takes a
    sequence of inputs, maps f over it in a process pool, drops ``None``
    results, and returns the rest as a numpy array.
    """

    def simple_parallel(func, sequence, **args):
        """Map *func* over *sequence* in a process pool.

        The special keyword argument ``processes`` selects the pool size
        (default: number of available cores); all remaining keyword arguments
        are forwarded to func for every element.
        """
        # Remove the pool-size option so it is not forwarded to func.
        processes = args.pop('processes', multiprocessing.cpu_count())

        # The context manager guarantees worker cleanup even if mapping raises.
        with multiprocessing.Pool(processes) as pool:
            # pool.map blocks until all results are in, so the async
            # map/close/join/get dance is unnecessary.
            results = pool.map(universal_worker, pool_args(func, sequence, args))

        # Drop None results (workers signalling "nothing to report") and
        # pack the remainder into an array.
        cleaned = asarray([res for res in results if res is not None])
        return cleaned

    return partial(simple_parallel, f)