aboutsummaryrefslogtreecommitdiff
path: root/plip/basic/parallel.py
diff options
context:
space:
mode:
Diffstat (limited to 'plip/basic/parallel.py')
-rw-r--r--plip/basic/parallel.py52
1 files changed, 0 insertions, 52 deletions
diff --git a/plip/basic/parallel.py b/plip/basic/parallel.py
deleted file mode 100644
index 1f71943..0000000
--- a/plip/basic/parallel.py
+++ /dev/null
@@ -1,52 +0,0 @@
-import itertools
-import multiprocessing
-from builtins import zip
-from functools import partial
-
-from numpy import asarray
-
-
-class SubProcessError(Exception):
- def __init__(self, e, exitcode=1):
- self.exitcode = exitcode
- super(SubProcessError, self).__init__(e)
-
- pass
-
-
-def universal_worker(input_pair):
- """This is a wrapper function expecting a tiplet of function, single
- argument, dict of keyword arguments. The provided function is called
- with the appropriate arguments."""
- function, arg, kwargs = input_pair
- return function(arg, **kwargs)
-
-
-def pool_args(function, sequence, kwargs):
- """Return a single iterator of n elements of lists of length 3, given a sequence of len n."""
- return zip(itertools.repeat(function), sequence, itertools.repeat(kwargs))
-
-
-def parallel_fn(f):
- """Simple wrapper function, returning a parallel version of the given function f.
- The function f must have one argument and may have an arbitray number of
- keyword arguments. """
-
- def simple_parallel(func, sequence, **args):
- """ f takes an element of sequence as input and the keyword args in **args"""
- if 'processes' in args:
- processes = args.get('processes')
- del args['processes']
- else:
- processes = multiprocessing.cpu_count()
-
- pool = multiprocessing.Pool(processes) # depends on available cores
-
- result = pool.map_async(universal_worker, pool_args(func, sequence, args))
- pool.close()
- pool.join()
- cleaned = [x for x in result.get() if x is not None] # getting results
- cleaned = asarray(cleaned)
- return cleaned
-
- return partial(simple_parallel, f)