- bug fix in killed process detection

This commit is contained in:
Wim Pomp
2022-03-22 15:36:07 +01:00
parent fcf451cb23
commit 865ec70d97
2 changed files with 26 additions and 17 deletions

View File

@@ -7,6 +7,7 @@ from pickle import PicklingError, dispatch_table
from psutil import Process
from collections import OrderedDict
from io import BytesIO
from warnings import warn
try:
from javabridge import kill_vm
@@ -423,6 +424,7 @@ class parpool(object):
ctx = multiprocessing.get_context('spawn')
else:
ctx = multiprocessing
self.is_started = False
self.n_tasks = ctx.Value('i', self.nP)
self.event = ctx.Event()
self.queue_in = ctx.Queue(qsize or 3 * self.nP)
@@ -453,6 +455,7 @@ class parpool(object):
for handle, task in self.tasks.items(): # retry a task if the process doing it was killed
if task.pid is not None and task.pid not in [child.pid for child in Process().children()]:
self.queue_in.put(task)
warn('Task {} was restarted because process {} was probably killed.'.format(task.handle, task.pid))
return False
def error(self, error):
@@ -460,23 +463,27 @@ class parpool(object):
raise Exception('Error occured in worker: {}'.format(error))
def task_error(self, handle, error):
task = self.tasks[handle]
print('Error from process working on iteration {}:\n'.format(handle))
print(error)
self.close()
print('Retrying in main thread...')
fun = task.fun.__name__
task()
raise Exception('Function \'{}\' cannot be executed by parfor, amend or execute in serial.'.format(fun))
if handle in self:
task = self.tasks[handle]
print('Error from process working on iteration {}:\n'.format(handle))
print(error)
self.close()
print('Retrying in main thread...')
fun = task.fun.__name__
task()
raise Exception('Function \'{}\' cannot be executed by parfor, amend or execute in serial.'.format(fun))
def done(self, task):
self.tasks[task.handle] = task
if self.bar is not None:
self.bar.update(self.bar_lengths.pop(task.handle))
self._qbar_update()
if task.handle in self: # if not, the task was restarted erroneously
self.tasks[task.handle] = task
if self.bar is not None:
self.bar.update(self.bar_lengths.pop(task.handle))
self._qbar_update()
def started(self, handle, pid):
self.tasks[handle].pid = pid
self.is_started = True
if handle in self: # if not, the task was restarted erroneously
self.tasks[handle].pid = pid
def __call__(self, n, fun=None, args=None, kwargs=None, handle=None, barlength=1):
""" Add new iteration, using optional manually defined handle."""
@@ -514,11 +521,13 @@ class parpool(object):
if handle not in self:
raise ValueError('No handle: {}'.format(handle))
while not self.tasks[handle].done:
if not self._get_from_queue() and not self.tasks[handle].done and not self.working:
if not self._get_from_queue() and not self.tasks[handle].done and self.is_started and not self.working:
for _ in range(10): # wait some time while processing possible new messages
self._get_from_queue()
if not self._get_from_queue() and not self.tasks[handle].done and not self.working:
if not self._get_from_queue() and not self.tasks[handle].done and self.is_started and not self.working:
# retry a task if the process was killed while retrieving the task
self.queue_in.put(self.tasks[handle])
warn('Task {} was restarted because the process retrieving it was probably killed.'.format(handle))
result = self.tasks[handle].result
self.tasks.pop(handle)
return result
@@ -607,8 +616,8 @@ class parpool(object):
try:
task = self.queue_in.get(True, 0.02)
try:
task.set_from_cache(self.cache)
self.add_to_queue('started', task.handle, pid)
task.set_from_cache(self.cache)
self.add_to_queue('done', task())
except Exception:
self.add_to_queue('task_error', task.handle, format_exc())