diff --git a/parfor/__init__.py b/parfor/__init__.py index f303087..9f753cc 100644 --- a/parfor/__init__.py +++ b/parfor/__init__.py @@ -7,6 +7,7 @@ from pickle import PicklingError, dispatch_table from psutil import Process from collections import OrderedDict from io import BytesIO +from warnings import warn try: from javabridge import kill_vm @@ -423,6 +424,7 @@ class parpool(object): ctx = multiprocessing.get_context('spawn') else: ctx = multiprocessing + self.is_started = False self.n_tasks = ctx.Value('i', self.nP) self.event = ctx.Event() self.queue_in = ctx.Queue(qsize or 3 * self.nP) @@ -453,6 +455,7 @@ class parpool(object): for handle, task in self.tasks.items(): # retry a task if the process doing it was killed if task.pid is not None and task.pid not in [child.pid for child in Process().children()]: self.queue_in.put(task) + warn('Task {} was restarted because process {} was probably killed.'.format(task.handle, task.pid)) return False def error(self, error): @@ -460,23 +463,27 @@ class parpool(object): raise Exception('Error occured in worker: {}'.format(error)) def task_error(self, handle, error): - task = self.tasks[handle] - print('Error from process working on iteration {}:\n'.format(handle)) - print(error) - self.close() - print('Retrying in main thread...') - fun = task.fun.__name__ - task() - raise Exception('Function \'{}\' cannot be executed by parfor, amend or execute in serial.'.format(fun)) + if handle in self: + task = self.tasks[handle] + print('Error from process working on iteration {}:\n'.format(handle)) + print(error) + self.close() + print('Retrying in main thread...') + fun = task.fun.__name__ + task() + raise Exception('Function \'{}\' cannot be executed by parfor, amend or execute in serial.'.format(fun)) def done(self, task): - self.tasks[task.handle] = task - if self.bar is not None: - self.bar.update(self.bar_lengths.pop(task.handle)) - self._qbar_update() + if task.handle in self: # if not, the task was restarted erroneously + self.tasks[task.handle] = task + if self.bar is not None: + self.bar.update(self.bar_lengths.pop(task.handle)) + self._qbar_update() def started(self, handle, pid): - self.tasks[handle].pid = pid + self.is_started = True + if handle in self: # if not, the task was restarted erroneously + self.tasks[handle].pid = pid def __call__(self, n, fun=None, args=None, kwargs=None, handle=None, barlength=1): """ Add new iteration, using optional manually defined handle.""" @@ -514,11 +521,13 @@ class parpool(object): if handle not in self: raise ValueError('No handle: {}'.format(handle)) while not self.tasks[handle].done: - if not self._get_from_queue() and not self.tasks[handle].done and not self.working: + if not self._get_from_queue() and not self.tasks[handle].done and self.is_started and not self.working: for _ in range(10): # wait some time while processing possible new messages self._get_from_queue() - if not self._get_from_queue() and not self.tasks[handle].done and not self.working: + if not self._get_from_queue() and not self.tasks[handle].done and self.is_started and not self.working: + # retry a task if the process was killed while retrieving the task self.queue_in.put(self.tasks[handle]) + warn('Task {} was restarted because the process retrieving it was probably killed.'.format(handle)) result = self.tasks[handle].result self.tasks.pop(handle) return result @@ -607,8 +616,8 @@ class parpool(object): try: task = self.queue_in.get(True, 0.02) try: - task.set_from_cache(self.cache) self.add_to_queue('started', task.handle, pid) + task.set_from_cache(self.cache) self.add_to_queue('done', task()) except Exception: self.add_to_queue('task_error', task.handle, format_exc()) diff --git a/setup.py b/setup.py index f918075..e244632 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ with open("README.md", "r") as fh: setuptools.setup( name="parfor", - version="2022.3.0", + version="2022.3.1", author="Wim Pomp", author_email="wimpomp@gmail.com", description="A package to mimic the use of parfor as done in Matlab.",