From 6c4f2d51325dadaf821bc0b602b82d01a1c79202 Mon Sep 17 00:00:00 2001 From: "w.pomp" Date: Fri, 4 Sep 2020 18:03:30 +0200 Subject: [PATCH] - Add terminator function. --- README.md | 13 +++++++++---- parfor/__init__.py | 19 +++++++++++++------ setup.py | 2 +- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index dea6513..a650a40 100644 --- a/README.md +++ b/README.md @@ -2,13 +2,17 @@ Used to parallelize for-loops using parfor in Matlab? This package allows you to do the same in python. Take any normal serial but parallelizable for-loop and execute it in parallel using easy syntax. Don't worry about the technical details of using the multiprocessing module, race conditions, queues, -parfor handles all that. Also, parfor uses dill instead of pickle, so that a lot more objects can be used -when parallelizing. +parfor handles all that. Tested on linux on python 2.7 and 3.8 +## Why is parfor better than just using multiprocessing? +- Easy to use +- Using dill instead of pickle: a lot more objects can be used when parallelizing +- Progress bars are built-in + ## Usage -Parfor decorates a functions and returns the result of that function evaluated for each iteration of +Parfor decorates a functions and returns the result of that function evaluated in parallel for each iteration of an iterator. ## Requires @@ -21,7 +25,8 @@ iterations need to be dillable. You might be able to make objects dillable anyho The function evaluated in parallel needs to terminate. If parfor hangs after seeming to complete the task, it probably is because the individual processes cannot terminate. Importing javabridge (used in python-bioformats) and starting the -java virtual machine can cause it to hang since the processes only terminate after the java vm has quit. +java virtual machine can cause it to hang since the processes only terminate after the java vm has quit. In this case, +pass terminator=javabridge.kill_vm to parfor. ## Arguments ### Required: diff --git a/parfor/__init__.py b/parfor/__init__.py index 1adeeae..c793539 100644 --- a/parfor/__init__.py +++ b/parfor/__init__.py @@ -105,7 +105,7 @@ class Pickler(dill.Pickler): except: self.save_reduce(obj=obj, *failed_rv) -def dumps(obj, protocol=None, byref=None, fmode=None, recurse=None, **kwds):#, strictio=None): +def dumps(obj, protocol=None, byref=None, fmode=None, recurse=True, **kwds): """pickle an object to a string""" protocol = dill.settings['protocol'] if protocol is None else int(protocol) _kwds = kwds.copy() @@ -238,7 +238,7 @@ class parpool(object): """ Parallel processing with addition of iterations at any time and request of that result any time after that. The target function and its argument can be changed at any time. """ - def __init__(self, fun=None, args=None, kwargs=None, nP=0.33, bar=None, qbar=None, debug=False): + def __init__(self, fun=None, args=None, kwargs=None, nP=0.33, bar=None, qbar=None, terminator=None, debug=False): """ fun, args, kwargs: target function and its arguments and keyword arguments. nP: number of workers, or fraction of cpu cores to use, default: 0.33. bar, qbar: instances of tqdm and tqdmm to use for monitoring buffer and progress. """ @@ -259,7 +259,7 @@ class parpool(object): self.E = ctx.Event() self.Qi = ctx.Queue(3*self.nP) self.Qo = ctx.Queue(3*self.nP) - self.P = ctx.Pool(self.nP, self._worker(self.Qi, self.Qo, self.E, self.debug)) + self.P = ctx.Pool(self.nP, self._worker(self.Qi, self.Qo, self.E, terminator, self.debug)) self.is_alive = True self.res = {} self.handle = 0 @@ -400,13 +400,15 @@ class parpool(object): class _worker(object): """ Manages executing the target function which will be executed in different processes. """ - def __init__(self, Qi, Qo, E, debug=False, cachesize=48): + def __init__(self, Qi, Qo, E, terminator, debug=False, cachesize=48): self.cache = [] self.Qi = Qi self.Qo = Qo self.E = E + self.terminator = dumps(terminator, recurse=True) self.debug = debug self.cachesize = cachesize + # print(self.terminator) def __call__(self): while not self.E.is_set(): @@ -424,6 +426,9 @@ class parpool(object): self.Qo.put((False, i, format_exc())) else: self.Qo.put((True, i, (n, Fun, Args, Kwargs))) + terminator = dill.loads(self.terminator) + if not terminator is None: + terminator() def get_from_cache(self, h, ser): if len(self.cache): @@ -436,7 +441,8 @@ class parpool(object): self.cache.pop(0) return obj -def pmap(fun, iterable=None, args=None, kwargs=None, length=None, desc=None, bar=True, qbar=False, nP=0.33, serial=4, debug=False): +def pmap(fun, iterable=None, args=None, kwargs=None, length=None, desc=None, bar=True, qbar=False, terminator=None, + nP=0.33, serial=4, debug=False): """ map a function fun to each iteration in iterable best use: iterable is a generator and length is given to this function @@ -448,6 +454,7 @@ def pmap(fun, iterable=None, args=None, kwargs=None, length=None, desc=None, bar desc: string with description of the progress bar bar: bool enable progress bar pbar: bool enable buffer indicator bar + terminator: function which is executed in each worker after all the work is done nP: number of workers, default: number of cpu's/3 serial: switch to serial if number of tasks less than serial, default: 4 debug: if an error occurs in an iteration, return the erorr instead of retrying in the main process @@ -463,7 +470,7 @@ def pmap(fun, iterable=None, args=None, kwargs=None, length=None, desc=None, bar else: #parallel case with tqdmm(total=0, desc='Task buffer', disable=not qbar, leave=False) as qbar,\ tqdm(total=length, desc=desc, disable=not bar) as bar: - with parpool(fun, args, kwargs, nP, bar, qbar, debug) as p: + with parpool(fun, args, kwargs, nP, bar, qbar, terminator, debug) as p: length = 0 for i, j in enumerate(iterable): #add work to the queue p[i] = j diff --git a/setup.py b/setup.py index 648cccf..eb4b936 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ with open("README.md", "r") as fh: setuptools.setup( name="parfor", - version="2020.09.2", + version="2020.09.3", author="Wim Pomp", author_email="wimpomp@gmail.com", description="A package to mimic the use of parfor as done in Matlab.",