- Make progress bar update with chunksize when using chunks.

- Catch exceptions, but not keyboard interrupts.
This commit is contained in:
Wim Pomp
2021-05-29 11:09:54 +02:00
parent 9caeb06c2b
commit bc2ae2ea25
2 changed files with 24 additions and 14 deletions

View File

@@ -78,14 +78,14 @@ class Pickler(dill.Pickler):
else: else:
raise PicklingError("Can't pickle %r object: %r" % raise PicklingError("Can't pickle %r object: %r" %
(t.__name__, obj)) (t.__name__, obj))
except: except Exception:
rv = failed_rv rv = failed_rv
# Check for string returned by reduce(), meaning "save as global" # Check for string returned by reduce(), meaning "save as global"
if isinstance(rv, str): if isinstance(rv, str):
try: try:
self.save_global(obj, rv) self.save_global(obj, rv)
except: except Exception:
self.save_global(obj, failed_rv) self.save_global(obj, failed_rv)
return return
@@ -102,7 +102,7 @@ class Pickler(dill.Pickler):
# Save the reduce() output and finally memoize the object # Save the reduce() output and finally memoize the object
try: try:
self.save_reduce(obj=obj, *rv) self.save_reduce(obj=obj, *rv)
except: except Exception:
self.save_reduce(obj=obj, *failed_rv) self.save_reduce(obj=obj, *failed_rv)
@@ -143,15 +143,16 @@ class chunks():
self.args = args self.args = args
self.A = len(args) == 1 self.A = len(args) == 1
self.N = N self.N = N
self.n = max(1, min(N, n)) self.len = max(1, min(N, n))
self.lengths = [((i + 1) * self.N // self.len) - (i * self.N // self.len) for i in range(self.len)]
def __iter__(self): def __iter__(self):
for i in range(self.n): for i in range(self.len):
p, q = (i * self.N // self.n), ((i + 1) * self.N // self.n) p, q = (i * self.N // self.len), ((i + 1) * self.N // self.len)
yield self.args[0][p:q] if self.A else [a[p:q] for a in self.args] yield self.args[0][p:q] if self.A else [a[p:q] for a in self.args]
def __len__(self): def __len__(self):
return self.n return self.len
class tqdmm(tqdm): class tqdmm(tqdm):
@@ -299,6 +300,7 @@ class parpool(object):
self.handle = 0 self.handle = 0
self.handles = [] self.handles = []
self.bar = bar self.bar = bar
self.barlengths = {}
self.qbar = qbar self.qbar = qbar
if self.qbar is not None: if self.qbar is not None:
self.qbar.total = 3*self.nP self.qbar.total = 3*self.nP
@@ -352,12 +354,12 @@ class parpool(object):
raise Exception('Function \'{}\' cannot be executed by parfor, amend or execute in serial.' raise Exception('Function \'{}\' cannot be executed by parfor, amend or execute in serial.'
.format(fun.__name__)) .format(fun.__name__))
if self.bar is not None: if self.bar is not None:
self.bar.update() self.bar.update(self.barlengths.pop(i))
self._qbar_update() self._qbar_update()
except multiprocessing.queues.Empty: except multiprocessing.queues.Empty:
pass pass
def __call__(self, n, fun=None, args=None, kwargs=None, handle=None): def __call__(self, n, fun=None, args=None, kwargs=None, handle=None, barlength=1):
""" Add new iteration, using optional manually defined handle.""" """ Add new iteration, using optional manually defined handle."""
if self.is_alive and not self.E.is_set(): if self.is_alive and not self.E.is_set():
n = dumps(n, recurse=True) n = dumps(n, recurse=True)
@@ -374,18 +376,20 @@ class parpool(object):
self.handle += 1 self.handle += 1
self.handles.append(handle) self.handles.append(handle)
self.Qi.put((handle, n, self.fun, self.args, self.kwargs)) self.Qi.put((handle, n, self.fun, self.args, self.kwargs))
self.barlengths[handle] = barlength
self._qbar_update() self._qbar_update()
return handle return handle
elif handle not in self: elif handle not in self:
self.handles.append(handle) self.handles.append(handle)
self.Qi.put((handle, n, self.fun, self.args, self.kwargs)) self.Qi.put((handle, n, self.fun, self.args, self.kwargs))
self.barlengths[handle] = barlength
self._qbar_update() self._qbar_update()
def _qbar_update(self): def _qbar_update(self):
if self.qbar is not None: if self.qbar is not None:
try: try:
self.qbar.n = self.Qi.qsize() self.qbar.n = self.Qi.qsize()
except: except Exception:
pass pass
def __setitem__(self, handle, n): def __setitem__(self, handle, n):
@@ -488,7 +492,7 @@ class parpool(object):
self.add_to_q((False, i, dumps(fun(dill.loads(n), *args, **kwargs), recurse=True))) self.add_to_q((False, i, dumps(fun(dill.loads(n), *args, **kwargs), recurse=True)))
except multiprocessing.queues.Empty: except multiprocessing.queues.Empty:
continue continue
except: except Exception:
self.add_to_q((True, i, (n, Fun[1], Args[1], Kwargs[1], dumps(format_exc(), recurse=True)))) self.add_to_q((True, i, (n, Fun[1], Args[1], Kwargs[1], dumps(format_exc(), recurse=True))))
self.E.set() self.E.set()
terminator = dill.loads(self.terminator) terminator = dill.loads(self.terminator)
@@ -531,16 +535,22 @@ def pmap(fun, iterable=None, args=None, kwargs=None, length=None, desc=None, bar
kwargs = kwargs or {} kwargs = kwargs or {}
try: try:
length = len(iterable) length = len(iterable)
except: except Exception:
pass pass
if length and length < serial: # serial case if length and length < serial: # serial case
return [fun(c, *args, **kwargs) for c in tqdm(iterable, total=length, desc=desc, disable=not bar)] return [fun(c, *args, **kwargs) for c in tqdm(iterable, total=length, desc=desc, disable=not bar)]
else: # parallel case else: # parallel case
chunk = isinstance(iterable, chunks)
if chunk:
length = iterable.N
with tqdmm(total=0, desc='Task buffer', disable=not qbar, leave=False) as qbar,\ with tqdmm(total=0, desc='Task buffer', disable=not qbar, leave=False) as qbar,\
tqdm(total=length, desc=desc, disable=not bar) as bar: tqdm(total=length, desc=desc, disable=not bar) as bar:
with parpool(fun, args, kwargs, rP, nP, bar, qbar, terminator) as p: with parpool(fun, args, kwargs, rP, nP, bar, qbar, terminator) as p:
length = 0 length = 0
for i, j in enumerate(iterable): # add work to the queue for i, j in enumerate(iterable): # add work to the queue
if chunk:
p(j, handle=i, barlength=iterable.lengths[i])
else:
p[i] = j p[i] = j
if bar.total is None or bar.total < i+1: if bar.total is None or bar.total < i+1:
bar.total = i+1 bar.total = i+1

View File

@@ -5,7 +5,7 @@ with open("README.md", "r") as fh:
setuptools.setup( setuptools.setup(
name="parfor", name="parfor",
version="2021.5.2", version="2021.5.3",
author="Wim Pomp", author="Wim Pomp",
author_email="wimpomp@gmail.com", author_email="wimpomp@gmail.com",
description="A package to mimic the use of parfor as done in Matlab.", description="A package to mimic the use of parfor as done in Matlab.",