- catch and ignore errors when emptying queue
- timeout on closing pool - close manager on closing pool
This commit is contained in:
@@ -452,12 +452,15 @@ class PoolSingleton:
|
||||
self.__class__.instance = None
|
||||
|
||||
def empty_queue(queue):
|
||||
if not queue._closed: # noqa
|
||||
while not queue.empty():
|
||||
try:
|
||||
queue.get(True, 0.02)
|
||||
except multiprocessing.queues.Empty: # noqa
|
||||
pass
|
||||
try:
|
||||
if not queue._closed: # noqa
|
||||
while not queue.empty():
|
||||
try:
|
||||
queue.get(True, 0.02)
|
||||
except multiprocessing.queues.Empty: # noqa
|
||||
pass
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def close_queue(queue):
|
||||
empty_queue(queue)
|
||||
@@ -469,14 +472,20 @@ class PoolSingleton:
|
||||
self.is_alive = False # noqa
|
||||
self.event.set()
|
||||
self.pool.close()
|
||||
t = time()
|
||||
while self.n_workers.value: # noqa
|
||||
empty_queue(self.queue_in) # noqa
|
||||
empty_queue(self.queue_out) # noqa
|
||||
if time() - t > 10:
|
||||
warn(f'Parfor: Closing pool timed out, {self.n_workers.value} processes still alive.') # noqa
|
||||
self.pool.terminate()
|
||||
break
|
||||
empty_queue(self.queue_in) # noqa
|
||||
empty_queue(self.queue_out) # noqa
|
||||
self.pool.join()
|
||||
close_queue(self.queue_in) # noqa
|
||||
close_queue(self.queue_out) # noqa
|
||||
self.manager.shutdown()
|
||||
self.handle = 0 # noqa
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user