- Add terminator function.

This commit is contained in:
w.pomp
2020-09-04 18:03:30 +02:00
parent da70cf7a2f
commit 6c4f2d5132
3 changed files with 23 additions and 11 deletions

View File

@@ -105,7 +105,7 @@ class Pickler(dill.Pickler):
except:
self.save_reduce(obj=obj, *failed_rv)
def dumps(obj, protocol=None, byref=None, fmode=None, recurse=None, **kwds):#, strictio=None):
def dumps(obj, protocol=None, byref=None, fmode=None, recurse=True, **kwds):
"""pickle an object to a string"""
protocol = dill.settings['protocol'] if protocol is None else int(protocol)
_kwds = kwds.copy()
@@ -238,7 +238,7 @@ class parpool(object):
""" Parallel processing with addition of iterations at any time and request of that result any time after that.
The target function and its argument can be changed at any time.
"""
def __init__(self, fun=None, args=None, kwargs=None, nP=0.33, bar=None, qbar=None, debug=False):
def __init__(self, fun=None, args=None, kwargs=None, nP=0.33, bar=None, qbar=None, terminator=None, debug=False):
""" fun, args, kwargs: target function and its arguments and keyword arguments.
nP: number of workers, or fraction of cpu cores to use, default: 0.33.
bar, qbar: instances of tqdm and tqdmm to use for monitoring buffer and progress. """
@@ -259,7 +259,7 @@ class parpool(object):
self.E = ctx.Event()
self.Qi = ctx.Queue(3*self.nP)
self.Qo = ctx.Queue(3*self.nP)
self.P = ctx.Pool(self.nP, self._worker(self.Qi, self.Qo, self.E, self.debug))
self.P = ctx.Pool(self.nP, self._worker(self.Qi, self.Qo, self.E, terminator, self.debug))
self.is_alive = True
self.res = {}
self.handle = 0
@@ -400,13 +400,15 @@ class parpool(object):
class _worker(object):
""" Manages executing the target function which will be executed in different processes. """
def __init__(self, Qi, Qo, E, debug=False, cachesize=48):
def __init__(self, Qi, Qo, E, terminator, debug=False, cachesize=48):
self.cache = []
self.Qi = Qi
self.Qo = Qo
self.E = E
self.terminator = dumps(terminator, recurse=True)
self.debug = debug
self.cachesize = cachesize
# print(self.terminator)
def __call__(self):
while not self.E.is_set():
@@ -424,6 +426,9 @@ class parpool(object):
self.Qo.put((False, i, format_exc()))
else:
self.Qo.put((True, i, (n, Fun, Args, Kwargs)))
terminator = dill.loads(self.terminator)
if not terminator is None:
terminator()
def get_from_cache(self, h, ser):
if len(self.cache):
@@ -436,7 +441,8 @@ class parpool(object):
self.cache.pop(0)
return obj
def pmap(fun, iterable=None, args=None, kwargs=None, length=None, desc=None, bar=True, qbar=False, nP=0.33, serial=4, debug=False):
def pmap(fun, iterable=None, args=None, kwargs=None, length=None, desc=None, bar=True, qbar=False, terminator=None,
nP=0.33, serial=4, debug=False):
""" map a function fun to each iteration in iterable
best use: iterable is a generator and length is given to this function
@@ -448,6 +454,7 @@ def pmap(fun, iterable=None, args=None, kwargs=None, length=None, desc=None, bar
desc: string with description of the progress bar
bar: bool enable progress bar
pbar: bool enable buffer indicator bar
terminator: function which is executed in each worker after all the work is done
nP: number of workers, default: number of cpu's/3
serial: switch to serial if number of tasks less than serial, default: 4
debug: if an error occurs in an iteration, return the erorr instead of retrying in the main process
@@ -463,7 +470,7 @@ def pmap(fun, iterable=None, args=None, kwargs=None, length=None, desc=None, bar
else: #parallel case
with tqdmm(total=0, desc='Task buffer', disable=not qbar, leave=False) as qbar,\
tqdm(total=length, desc=desc, disable=not bar) as bar:
with parpool(fun, args, kwargs, nP, bar, qbar, debug) as p:
with parpool(fun, args, kwargs, nP, bar, qbar, terminator, debug) as p:
length = 0
for i, j in enumerate(iterable): #add work to the queue
p[i] = j