diff --git a/README.md b/README.md index b65dd43..dea6513 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,8 @@ Used to parallelize for-loops using parfor in Matlab? This package allows you to do the same in python. Take any normal serial but parallelizable for-loop and execute it in parallel using easy syntax. Don't worry about the technical details of using the multiprocessing module, race conditions, queues, -parfor handles all that. +parfor handles all that. Also, parfor uses dill instead of pickle, so that a lot more objects can be used +when parallelizing. Tested on linux on python 2.7 and 3.8 @@ -14,14 +15,20 @@ an iterator. tqdm, dill ## Limitations -Some objects cannot be passed and or used in child processes. Such objects include objects relying on -java-bridge. Examples include reader objects from the python-bioformats package. +Objects passed to the pool need to be dillable (dill needs to serialize them). Generators and SwigPyObjects are examples +of objects that cannot be used. They can be used however, for the iterator argument when using parfor, but its +iterations need to be dillable. You might be able to make objects dillable anyhow using dill.register. -### Required arguments: +The function evaluated in parallel needs to terminate. If parfor hangs after seeming to complete the task, it probably +is because the individual processes cannot terminate. Importing javabridge (used in python-bioformats) and starting the +java virtual machine can cause it to hang since the processes only terminate after the java vm has quit. + +## Arguments +### Required: fun: function taking arguments: iteration from iterable, other arguments defined in args & kwargs iterable: iterable from which an item is given to fun as a first argument -### Optional arguments: +### Optional: args: tuple with other unnamed arguments to fun kwargs: dict with other named arguments to fun length: give the length of the iterator in cases where len(iterator) results in an error @@ -32,7 +39,7 @@ java-bridge. Examples include reader objects from the python-bioformats package. serial: switch to serial if number of tasks less than serial, default: 4 debug: if an error occurs in an iteration, return the erorr instead of retrying in the main process -### Output +### Return list with results from applying the decorated function to each iteration of the iterator specified as the first argument to the function diff --git a/parfor/__init__.py b/parfor/__init__.py index d333075..1adeeae 100644 --- a/parfor/__init__.py +++ b/parfor/__init__.py @@ -1,9 +1,118 @@ from __future__ import print_function +import sys import multiprocessing import warnings +import dill from tqdm.auto import tqdm -from dill import dumps, loads from traceback import format_exc +from pickle import PicklingError, dispatch_table + +PY3 = (sys.hexversion >= 0x3000000) + +try: + from cStringIO import StringIO +except ImportError: + if PY3: + from io import BytesIO as StringIO + else: + from StringIO import StringIO + +failed_rv = (lambda *args, **kwargs: None, ()) + +class Pickler(dill.Pickler): + """ Overload dill to ignore unpickleble parts of objects. + You probably didn't want to use these parts anyhow. + However, if you did, you'll have to find some way to make them pickleble. + """ + def save(self, obj, save_persistent_id=True): + """ Copied from pickle and amended. """ + if PY3: + self.framer.commit_frame() + + # Check for persistent id (defined by a subclass) + pid = self.persistent_id(obj) + if pid is not None and save_persistent_id: + self.save_pers(pid) + return + + # Check the memo + x = self.memo.get(id(obj)) + if x is not None: + self.write(self.get(x[0])) + return + + rv = NotImplemented + reduce = getattr(self, "reducer_override", None) + if reduce is not None: + rv = reduce(obj) + + if rv is NotImplemented: + # Check the type dispatch table + t = type(obj) + f = self.dispatch.get(t) + if f is not None: + f(self, obj) # Call unbound method with explicit self + return + + # Check private dispatch table if any, or else + # copyreg.dispatch_table + reduce = getattr(self, 'dispatch_table', dispatch_table).get(t) + if reduce is not None: + rv = reduce(obj) + else: + # Check for a class with a custom metaclass; treat as regular + # class + if issubclass(t, type): + self.save_global(obj) + return + + # Check for a __reduce_ex__ method, fall back to __reduce__ + reduce = getattr(obj, "__reduce_ex__", None) + try: + if reduce is not None: + rv = reduce(self.proto) + else: + reduce = getattr(obj, "__reduce__", None) + if reduce is not None: + rv = reduce() + else: + raise PicklingError("Can't pickle %r object: %r" % + (t.__name__, obj)) + except: + rv = failed_rv + + # Check for string returned by reduce(), meaning "save as global" + if isinstance(rv, str): + try: + self.save_global(obj, rv) + except: + self.save_global(obj, failed_rv) + return + + # Assert that reduce() returned a tuple + if not isinstance(rv, tuple): + raise PicklingError("%s must return string or tuple" % reduce) + + # Assert that it returned an appropriately sized tuple + l = len(rv) + if not (2 <= l <= 6): + raise PicklingError("Tuple returned by %s must have " + "two to six elements" % reduce) + + # Save the reduce() output and finally memoize the object + try: + self.save_reduce(obj=obj, *rv) + except: + self.save_reduce(obj=obj, *failed_rv) + +def dumps(obj, protocol=None, byref=None, fmode=None, recurse=None, **kwds):#, strictio=None): + """pickle an object to a string""" + protocol = dill.settings['protocol'] if protocol is None else int(protocol) + _kwds = kwds.copy() + _kwds.update(dict(byref=byref, fmode=fmode, recurse=recurse)) + file = StringIO() + Pickler(file, protocol, **_kwds).dump(obj) + return file.getvalue() def chunks(n, *args): """ Yield successive n-sized chunks from lists. """ @@ -205,9 +314,9 @@ class parpool(object): 'debuggable error. If it doesn\'t, it\'s an error specific to parallel execution.' .format(r[1])) warnings.formatwarning = pfmt - fun, args, kwargs = [loads(f[1]) for f in r[2][1:]] - r = (False, r[1], fun(r[2][0], *args, **kwargs)) - self.res[r[1]] = r[2] + fun, args, kwargs = [dill.loads(f[1]) for f in r[2][1:]] + r = (False, r[1], fun(dill.loads(r[2][0]), *args, **kwargs)) + self.res[r[1]] = dill.loads(r[2]) if not self.bar is None: self.bar.update() self._qbar_update() @@ -216,6 +325,7 @@ class parpool(object): def __call__(self, n, fun=None, args=None, kwargs=None, handle=None): """ Add new iteration, using optional manually defined handle.""" + n = dumps(n, recurse=True) if not fun is None: self.fun = fun if not args is None: @@ -306,7 +416,7 @@ class parpool(object): fun = self.get_from_cache(*Fun) args = self.get_from_cache(*Args) kwargs = self.get_from_cache(*Kwargs) - self.Qo.put((False, i, fun(n, *args, **kwargs))) + self.Qo.put((False, i, dumps(fun(dill.loads(n), *args, **kwargs), recurse=True))) except multiprocessing.queues.Empty: continue except: @@ -320,7 +430,7 @@ class parpool(object): hs, objs = zip(*self.cache) if h in hs: return objs[hs.index(h)] - obj = loads(ser) + obj = dill.loads(ser) self.cache.append((h, obj)) while len(self.cache) > self.cachesize: self.cache.pop(0) diff --git a/setup.py b/setup.py index b3fd721..648cccf 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ with open("README.md", "r") as fh: setuptools.setup( name="parfor", - version="2020.08.3", + version="2020.09.2", author="Wim Pomp", author_email="wimpomp@gmail.com", description="A package to mimic the use of parfor as done in Matlab.",