From bc2ae2ea25173cdcee4d5d91c74b99e574a345df Mon Sep 17 00:00:00 2001 From: Wim Pomp Date: Sat, 29 May 2021 11:09:54 +0200 Subject: [PATCH] - Make progress bar update with chunksize when using chunks. - Catch exceptions, but not keyboard interrupts. --- parfor/__init__.py | 36 +++++++++++++++++++++++------------- setup.py | 2 +- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/parfor/__init__.py b/parfor/__init__.py index 0e90c97..c52ad2a 100644 --- a/parfor/__init__.py +++ b/parfor/__init__.py @@ -78,14 +78,14 @@ class Pickler(dill.Pickler): else: raise PicklingError("Can't pickle %r object: %r" % (t.__name__, obj)) - except: + except Exception: rv = failed_rv # Check for string returned by reduce(), meaning "save as global" if isinstance(rv, str): try: self.save_global(obj, rv) - except: + except Exception: self.save_global(obj, failed_rv) return @@ -102,7 +102,7 @@ class Pickler(dill.Pickler): # Save the reduce() output and finally memoize the object try: self.save_reduce(obj=obj, *rv) - except: + except Exception: self.save_reduce(obj=obj, *failed_rv) @@ -143,15 +143,16 @@ class chunks(): self.args = args self.A = len(args) == 1 self.N = N - self.n = max(1, min(N, n)) + self.len = max(1, min(N, n)) + self.lengths = [((i + 1) * self.N // self.len) - (i * self.N // self.len) for i in range(self.len)] def __iter__(self): - for i in range(self.n): - p, q = (i * self.N // self.n), ((i + 1) * self.N // self.n) + for i in range(self.len): + p, q = (i * self.N // self.len), ((i + 1) * self.N // self.len) yield self.args[0][p:q] if self.A else [a[p:q] for a in self.args] def __len__(self): - return self.n + return self.len class tqdmm(tqdm): @@ -299,6 +300,7 @@ class parpool(object): self.handle = 0 self.handles = [] self.bar = bar + self.barlengths = {} self.qbar = qbar if self.qbar is not None: self.qbar.total = 3*self.nP @@ -352,12 +354,12 @@ class parpool(object): raise Exception('Function \'{}\' cannot be executed by parfor, amend or execute in serial.' .format(fun.__name__)) if self.bar is not None: - self.bar.update() + self.bar.update(self.barlengths.pop(i)) self._qbar_update() except multiprocessing.queues.Empty: pass - def __call__(self, n, fun=None, args=None, kwargs=None, handle=None): + def __call__(self, n, fun=None, args=None, kwargs=None, handle=None, barlength=1): """ Add new iteration, using optional manually defined handle.""" if self.is_alive and not self.E.is_set(): n = dumps(n, recurse=True) @@ -374,18 +376,20 @@ class parpool(object): self.handle += 1 self.handles.append(handle) self.Qi.put((handle, n, self.fun, self.args, self.kwargs)) + self.barlengths[handle] = barlength self._qbar_update() return handle elif handle not in self: self.handles.append(handle) self.Qi.put((handle, n, self.fun, self.args, self.kwargs)) + self.barlengths[handle] = barlength self._qbar_update() def _qbar_update(self): if self.qbar is not None: try: self.qbar.n = self.Qi.qsize() - except: + except Exception: pass def __setitem__(self, handle, n): @@ -488,7 +492,7 @@ class parpool(object): self.add_to_q((False, i, dumps(fun(dill.loads(n), *args, **kwargs), recurse=True))) except multiprocessing.queues.Empty: continue - except: + except Exception: self.add_to_q((True, i, (n, Fun[1], Args[1], Kwargs[1], dumps(format_exc(), recurse=True)))) self.E.set() terminator = dill.loads(self.terminator) @@ -531,17 +535,23 @@ def pmap(fun, iterable=None, args=None, kwargs=None, length=None, desc=None, bar kwargs = kwargs or {} try: length = len(iterable) - except: + except Exception: pass if length and length < serial: # serial case return [fun(c, *args, **kwargs) for c in tqdm(iterable, total=length, desc=desc, disable=not bar)] else: # parallel case + chunk = isinstance(iterable, chunks) + if chunk: + length = iterable.N with tqdmm(total=0, desc='Task buffer', disable=not qbar, leave=False) as qbar,\ tqdm(total=length, desc=desc, disable=not bar) as bar: with parpool(fun, args, kwargs, rP, nP, bar, qbar, terminator) as p: length = 0 for i, j in enumerate(iterable): # add work to the queue - p[i] = j + if chunk: + p(j, handle=i, barlength=iterable.lengths[i]) + else: + p[i] = j if bar.total is None or bar.total < i+1: bar.total = i+1 length += 1 diff --git a/setup.py b/setup.py index fbefeaf..5e939c1 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ with open("README.md", "r") as fh: setuptools.setup( name="parfor", - version="2021.5.2", + version="2021.5.3", author="Wim Pomp", author_email="wimpomp@gmail.com", description="A package to mimic the use of parfor as done in Matlab.",