- make ParPoolSingleton.close a class method.
This commit is contained in:
@@ -448,45 +448,48 @@ class PoolSingleton:
|
|||||||
pool.tasks.pop(handle)
|
pool.tasks.pop(handle)
|
||||||
return handle, result
|
return handle, result
|
||||||
|
|
||||||
def close(self):
|
@classmethod
|
||||||
self.__class__.instance = None
|
def close(cls):
|
||||||
|
if hasattr(cls, 'instance') and cls.instance is not None:
|
||||||
|
instance = cls.instance
|
||||||
|
cls.instance = None
|
||||||
|
|
||||||
def empty_queue(queue):
|
def empty_queue(queue):
|
||||||
try:
|
try:
|
||||||
|
if not queue._closed: # noqa
|
||||||
|
while not queue.empty():
|
||||||
|
try:
|
||||||
|
queue.get(True, 0.02)
|
||||||
|
except multiprocessing.queues.Empty: # noqa
|
||||||
|
pass
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def close_queue(queue):
|
||||||
|
empty_queue(queue)
|
||||||
if not queue._closed: # noqa
|
if not queue._closed: # noqa
|
||||||
while not queue.empty():
|
queue.close()
|
||||||
try:
|
queue.join_thread()
|
||||||
queue.get(True, 0.02)
|
|
||||||
except multiprocessing.queues.Empty: # noqa
|
|
||||||
pass
|
|
||||||
except OSError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def close_queue(queue):
|
if instance.is_alive:
|
||||||
empty_queue(queue)
|
instance.is_alive = False # noqa
|
||||||
if not queue._closed: # noqa
|
instance.event.set()
|
||||||
queue.close()
|
instance.pool.close()
|
||||||
queue.join_thread()
|
t = time()
|
||||||
|
while instance.n_workers.value: # noqa
|
||||||
if self.is_alive:
|
empty_queue(instance.queue_in) # noqa
|
||||||
self.is_alive = False # noqa
|
empty_queue(instance.queue_out) # noqa
|
||||||
self.event.set()
|
if time() - t > 10:
|
||||||
self.pool.close()
|
warn(f'Parfor: Closing pool timed out, {instance.n_workers.value} processes still alive.') # noqa
|
||||||
t = time()
|
instance.pool.terminate()
|
||||||
while self.n_workers.value: # noqa
|
break
|
||||||
empty_queue(self.queue_in) # noqa
|
empty_queue(instance.queue_in) # noqa
|
||||||
empty_queue(self.queue_out) # noqa
|
empty_queue(instance.queue_out) # noqa
|
||||||
if time() - t > 10:
|
instance.pool.join()
|
||||||
warn(f'Parfor: Closing pool timed out, {self.n_workers.value} processes still alive.') # noqa
|
close_queue(instance.queue_in) # noqa
|
||||||
self.pool.terminate()
|
close_queue(instance.queue_out) # noqa
|
||||||
break
|
instance.manager.shutdown()
|
||||||
empty_queue(self.queue_in) # noqa
|
instance.handle = 0 # noqa
|
||||||
empty_queue(self.queue_out) # noqa
|
|
||||||
self.pool.join()
|
|
||||||
close_queue(self.queue_in) # noqa
|
|
||||||
close_queue(self.queue_out) # noqa
|
|
||||||
self.manager.shutdown()
|
|
||||||
self.handle = 0 # noqa
|
|
||||||
|
|
||||||
|
|
||||||
class Worker:
|
class Worker:
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[tool.poetry]
|
[tool.poetry]
|
||||||
name = "parfor"
|
name = "parfor"
|
||||||
version = "2024.2.0"
|
version = "2024.3.0"
|
||||||
description = "A package to mimic the use of parfor as done in Matlab."
|
description = "A package to mimic the use of parfor as done in Matlab."
|
||||||
authors = ["Wim Pomp <wimpomp@gmail.com>"]
|
authors = ["Wim Pomp <wimpomp@gmail.com>"]
|
||||||
license = "GPLv3"
|
license = "GPLv3"
|
||||||
|
|||||||
Reference in New Issue
Block a user