- Better error handling

This commit is contained in:
Wim Pomp
2021-03-05 22:27:00 +01:00
parent 61f1429543
commit 6cce9e9b8e
2 changed files with 15 additions and 6 deletions

View File

@@ -311,7 +311,9 @@ class parpool(object):
""" Get an item from the queue and store it. """
try:
r = self.Qo.get(True, 0.02)
if r[0]:
if r[0] is None:
self.res[r[1]] = dill.loads(r[2])
elif r[0] is False:
pfmt = warnings.formatwarning
warnings.formatwarning = lambda message, *args: '{}\n'.format(message)
warnings.warn(
@@ -321,7 +323,14 @@ class parpool(object):
warnings.formatwarning = pfmt
fun, args, kwargs = [dill.loads(f[1]) for f in r[2][1:]]
r = (False, r[1], fun(dill.loads(r[2][0]), *args, **kwargs))
self.res[r[1]] = dill.loads(r[2])
self.res[r[1]] = r[2]
else:
err = dill.loads(r[2])
pfmt = warnings.formatwarning
warnings.formatwarning = lambda message, *args: '{}\n'.format(message)
warnings.warn('Warning, error occurred in iteration {}:\n{}'.format(r[1], err))
warnings.formatwarning = pfmt
self.res[r[1]] = err
if not self.bar is None:
self.bar.update()
self._qbar_update()
@@ -426,14 +435,14 @@ class parpool(object):
fun = self.get_from_cache(*Fun)
args = self.get_from_cache(*Args)
kwargs = self.get_from_cache(*Kwargs)
self.Qo.put((False, i, dumps(fun(dill.loads(n), *args, **kwargs), recurse=True)))
self.Qo.put((None, i, dumps(fun(dill.loads(n), *args, **kwargs), recurse=True)))
except multiprocessing.queues.Empty:
continue
except:
if self.debug:
self.Qo.put((False, i, format_exc()))
self.Qo.put((True, i, dumps(format_exc(), recurse=True)))
else:
self.Qo.put((True, i, (n, Fun, Args, Kwargs)))
self.Qo.put((False, i, (n, Fun, Args, Kwargs)))
terminator = dill.loads(self.terminator)
if not terminator is None:
terminator()