- skip KeyboardInterrupts in workers
- silence some warnings
This commit is contained in:
@@ -29,7 +29,7 @@ class SharedMemory(UserDict):
|
|||||||
if item_id not in self: # values will not be changed
|
if item_id not in self: # values will not be changed
|
||||||
try:
|
try:
|
||||||
self.data[item_id] = False, value
|
self.data[item_id] = False, value
|
||||||
except Exception: # only use our pickler when necessary
|
except Exception: # only use our pickler when necessary # noqa
|
||||||
self.data[item_id] = True, dumps(value, recurse=True)
|
self.data[item_id] = True, dumps(value, recurse=True)
|
||||||
if item_id in self.references:
|
if item_id in self.references:
|
||||||
self.references[item_id] += 1
|
self.references[item_id] += 1
|
||||||
@@ -215,10 +215,10 @@ class Task:
|
|||||||
|
|
||||||
def __call__(self, shared_memory: SharedMemory):
|
def __call__(self, shared_memory: SharedMemory):
|
||||||
if not self.done:
|
if not self.done:
|
||||||
fun = shared_memory[self.fun] or (lambda *args, **kwargs: None)
|
fun = shared_memory[self.fun] or (lambda *args, **kwargs: None) # noqa
|
||||||
args = [shared_memory[arg] for arg in self.args]
|
args = [shared_memory[arg] for arg in self.args]
|
||||||
kwargs = dict([shared_memory[kwarg] for kwarg in self.kwargs])
|
kwargs = dict([shared_memory[kwarg] for kwarg in self.kwargs])
|
||||||
self.result = fun(*args, **kwargs)
|
self.result = fun(*args, **kwargs) # noqa
|
||||||
self.done = True
|
self.done = True
|
||||||
return self
|
return self
|
||||||
|
|
||||||
@@ -257,6 +257,7 @@ class ParPool:
|
|||||||
self.args = args
|
self.args = args
|
||||||
self.kwargs = kwargs
|
self.kwargs = kwargs
|
||||||
self.is_started = False
|
self.is_started = False
|
||||||
|
self.last_task = None
|
||||||
|
|
||||||
def __getstate__(self):
|
def __getstate__(self):
|
||||||
raise RuntimeError(f'Cannot pickle {self.__class__.__name__} object.')
|
raise RuntimeError(f'Cannot pickle {self.__class__.__name__} object.')
|
||||||
@@ -355,7 +356,7 @@ class ParPool:
|
|||||||
|
|
||||||
class PoolSingleton:
|
class PoolSingleton:
|
||||||
def __new__(cls, *args, **kwargs):
|
def __new__(cls, *args, **kwargs):
|
||||||
if not hasattr(cls, 'instance') or cls.instance is None or not cls.instance.is_alive:
|
if not hasattr(cls, 'instance') or cls.instance is None or not cls.instance.is_alive: # noqa
|
||||||
new = super().__new__(cls)
|
new = super().__new__(cls)
|
||||||
new.n_processes = cpu_count
|
new.n_processes = cpu_count
|
||||||
new.instance = new
|
new.instance = new
|
||||||
@@ -373,15 +374,18 @@ class PoolSingleton:
|
|||||||
new.handle = 0
|
new.handle = 0
|
||||||
new.pools = {}
|
new.pools = {}
|
||||||
cls.instance = new
|
cls.instance = new
|
||||||
return cls.instance
|
return cls.instance # noqa
|
||||||
|
|
||||||
def __init__(self, parpool=None):
|
def __init__(self, parpool=None): # noqa
|
||||||
if parpool is not None:
|
if parpool is not None:
|
||||||
self.pools[parpool.id] = parpool
|
self.pools[parpool.id] = parpool
|
||||||
|
|
||||||
def __getstate__(self):
|
def __getstate__(self):
|
||||||
raise RuntimeError(f'Cannot pickle {self.__class__.__name__} object.')
|
raise RuntimeError(f'Cannot pickle {self.__class__.__name__} object.')
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
self.close()
|
||||||
|
|
||||||
def remove_pool(self, pool_id):
|
def remove_pool(self, pool_id):
|
||||||
self.shared_memory.remove_pool(pool_id)
|
self.shared_memory.remove_pool(pool_id)
|
||||||
if pool_id in self.pools:
|
if pool_id in self.pools:
|
||||||
@@ -404,7 +408,7 @@ class PoolSingleton:
|
|||||||
elif pool_id in self.pools:
|
elif pool_id in self.pools:
|
||||||
getattr(self.pools[pool_id], code)(*args)
|
getattr(self.pools[pool_id], code)(*args)
|
||||||
return True
|
return True
|
||||||
except multiprocessing.queues.Empty:
|
except multiprocessing.queues.Empty: # noqa
|
||||||
for pool in self.pools.values():
|
for pool in self.pools.values():
|
||||||
for handle, task in pool.tasks.items(): # retry a task if the process doing it was killed
|
for handle, task in pool.tasks.items(): # retry a task if the process doing it was killed
|
||||||
if task.pid is not None \
|
if task.pid is not None \
|
||||||
@@ -435,33 +439,32 @@ class PoolSingleton:
|
|||||||
self.__class__.instance = None
|
self.__class__.instance = None
|
||||||
|
|
||||||
def empty_queue(queue):
|
def empty_queue(queue):
|
||||||
if not queue._closed:
|
if not queue._closed: # noqa
|
||||||
while not queue.empty():
|
while not queue.empty():
|
||||||
try:
|
try:
|
||||||
queue.get(True, 0.02)
|
queue.get(True, 0.02)
|
||||||
except multiprocessing.queues.Empty:
|
except multiprocessing.queues.Empty: # noqa
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def close_queue(queue):
|
def close_queue(queue):
|
||||||
empty_queue(queue)
|
empty_queue(queue)
|
||||||
if not queue._closed:
|
if not queue._closed: # noqa
|
||||||
queue.close()
|
queue.close()
|
||||||
queue.join_thread()
|
queue.join_thread()
|
||||||
|
|
||||||
if self.is_alive:
|
if self.is_alive:
|
||||||
self.is_alive = False
|
self.is_alive = False # noqa
|
||||||
self.event.set()
|
self.event.set()
|
||||||
self.pool.close()
|
self.pool.close()
|
||||||
while self.n_workers.value:
|
while self.n_workers.value: # noqa
|
||||||
empty_queue(self.queue_in)
|
empty_queue(self.queue_in) # noqa
|
||||||
empty_queue(self.queue_out)
|
empty_queue(self.queue_out) # noqa
|
||||||
empty_queue(self.queue_in)
|
empty_queue(self.queue_in) # noqa
|
||||||
empty_queue(self.queue_out)
|
empty_queue(self.queue_out) # noqa
|
||||||
self.pool.join()
|
self.pool.join()
|
||||||
close_queue(self.queue_in)
|
close_queue(self.queue_in) # noqa
|
||||||
close_queue(self.queue_out)
|
close_queue(self.queue_out) # noqa
|
||||||
self.handle = 0
|
self.handle = 0 # noqa
|
||||||
self.tasks = {}
|
|
||||||
|
|
||||||
|
|
||||||
class Worker:
|
class Worker:
|
||||||
@@ -478,7 +481,7 @@ class Worker:
|
|||||||
try:
|
try:
|
||||||
self.queue_out.put(args, timeout=0.1)
|
self.queue_out.put(args, timeout=0.1)
|
||||||
break
|
break
|
||||||
except multiprocessing.queues.Full:
|
except multiprocessing.queues.Full: # noqa
|
||||||
continue
|
continue
|
||||||
|
|
||||||
def __call__(self):
|
def __call__(self):
|
||||||
@@ -489,14 +492,15 @@ class Worker:
|
|||||||
try:
|
try:
|
||||||
self.add_to_queue('started', task.pool_id, task.handle, pid)
|
self.add_to_queue('started', task.pool_id, task.handle, pid)
|
||||||
self.add_to_queue('done', task.pool_id, task(self.shared_memory))
|
self.add_to_queue('done', task.pool_id, task(self.shared_memory))
|
||||||
except Exception:
|
except Exception: # noqa
|
||||||
self.add_to_queue('task_error', task.pool_id, task.handle, format_exc())
|
self.add_to_queue('task_error', task.pool_id, task.handle, format_exc())
|
||||||
except multiprocessing.queues.Empty:
|
self.shared_memory.garbage_collect()
|
||||||
|
except (multiprocessing.queues.Empty, KeyboardInterrupt): # noqa
|
||||||
pass
|
pass
|
||||||
except Exception:
|
except Exception: # noqa
|
||||||
self.add_to_queue('error', None, format_exc())
|
self.add_to_queue('error', None, format_exc())
|
||||||
self.event.set()
|
self.event.set()
|
||||||
self.shared_memory.garbage_collect()
|
self.shared_memory.garbage_collect()
|
||||||
for child in multiprocessing.active_children():
|
for child in multiprocessing.active_children():
|
||||||
child.kill()
|
child.kill()
|
||||||
with self.n_workers.get_lock():
|
with self.n_workers.get_lock():
|
||||||
@@ -586,8 +590,8 @@ def pmap(fun, iterable=None, args=None, kwargs=None, total=None, desc=None, bar=
|
|||||||
iterable = Chunks(iterable, ratio=5, length=total)
|
iterable = Chunks(iterable, ratio=5, length=total)
|
||||||
|
|
||||||
@wraps(fun)
|
@wraps(fun)
|
||||||
def chunk_fun(iterator, *args, **kwargs):
|
def chunk_fun(iterator, *args, **kwargs): # noqa
|
||||||
return [fun(i, *args, **kwargs) for i in iterator]
|
return [fun(i, *args, **kwargs) for i in iterator] # noqa
|
||||||
|
|
||||||
args = args or ()
|
args = args or ()
|
||||||
kwargs = kwargs or {}
|
kwargs = kwargs or {}
|
||||||
@@ -602,13 +606,13 @@ def pmap(fun, iterable=None, args=None, kwargs=None, total=None, desc=None, bar=
|
|||||||
if callable(bar):
|
if callable(bar):
|
||||||
return sum([chunk_fun(c, *args, **kwargs) for c in ExternalBar(iterable, bar)], [])
|
return sum([chunk_fun(c, *args, **kwargs) for c in ExternalBar(iterable, bar)], [])
|
||||||
else:
|
else:
|
||||||
return sum([chunk_fun(c, *args, **kwargs) for c in tqdm(iterable, **bar_kwargs)], [])
|
return sum([chunk_fun(c, *args, **kwargs) for c in tqdm(iterable, **bar_kwargs)], []) # noqa
|
||||||
else: # parallel case
|
else: # parallel case
|
||||||
with ExitStack() as stack:
|
with ExitStack() as stack:
|
||||||
if callable(bar):
|
if callable(bar):
|
||||||
bar = stack.enter_context(ExternalBar(callback=bar))
|
bar = stack.enter_context(ExternalBar(callback=bar)) # noqa
|
||||||
elif bar is True:
|
elif bar is True:
|
||||||
bar = stack.enter_context(tqdm(**bar_kwargs))
|
bar = stack.enter_context(tqdm(**bar_kwargs)) # noqa
|
||||||
with ParPool(chunk_fun, args, kwargs, bar) as p:
|
with ParPool(chunk_fun, args, kwargs, bar) as p:
|
||||||
for i, (j, l) in enumerate(zip(iterable, iterable.lengths)): # add work to the queue
|
for i, (j, l) in enumerate(zip(iterable, iterable.lengths)): # add work to the queue
|
||||||
p(j, handle=i, barlength=iterable.lengths[i])
|
p(j, handle=i, barlength=iterable.lengths[i])
|
||||||
|
|||||||
@@ -77,14 +77,14 @@ class Pickler(dill.Pickler):
|
|||||||
else:
|
else:
|
||||||
raise PicklingError("Can't pickle %r object: %r" %
|
raise PicklingError("Can't pickle %r object: %r" %
|
||||||
(t.__name__, obj))
|
(t.__name__, obj))
|
||||||
except Exception:
|
except Exception: # noqa
|
||||||
rv = CouldNotBePickled.reduce(obj)
|
rv = CouldNotBePickled.reduce(obj)
|
||||||
|
|
||||||
# Check for string returned by reduce(), meaning "save as global"
|
# Check for string returned by reduce(), meaning "save as global"
|
||||||
if isinstance(rv, str):
|
if isinstance(rv, str):
|
||||||
try:
|
try:
|
||||||
self.save_global(obj, rv)
|
self.save_global(obj, rv)
|
||||||
except Exception:
|
except Exception: # noqa
|
||||||
self.save_global(obj, CouldNotBePickled.reduce(obj))
|
self.save_global(obj, CouldNotBePickled.reduce(obj))
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -101,7 +101,7 @@ class Pickler(dill.Pickler):
|
|||||||
# Save the reduce() output and finally memoize the object
|
# Save the reduce() output and finally memoize the object
|
||||||
try:
|
try:
|
||||||
self.save_reduce(obj=obj, *rv)
|
self.save_reduce(obj=obj, *rv)
|
||||||
except Exception:
|
except Exception: # noqa
|
||||||
self.save_reduce(obj=obj, *CouldNotBePickled.reduce(obj))
|
self.save_reduce(obj=obj, *CouldNotBePickled.reduce(obj))
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[tool.poetry]
|
[tool.poetry]
|
||||||
name = "parfor"
|
name = "parfor"
|
||||||
version = "2023.10.1"
|
version = "2023.11.0"
|
||||||
description = "A package to mimic the use of parfor as done in Matlab."
|
description = "A package to mimic the use of parfor as done in Matlab."
|
||||||
authors = ["Wim Pomp <wimpomp@gmail.com>"]
|
authors = ["Wim Pomp <wimpomp@gmail.com>"]
|
||||||
license = "GPLv3"
|
license = "GPLv3"
|
||||||
|
|||||||
@@ -46,7 +46,7 @@ def test_chunks(iterator):
|
|||||||
|
|
||||||
|
|
||||||
def test_parpool():
|
def test_parpool():
|
||||||
def fun(i, j, k):
|
def fun(i, j, k): # noqa
|
||||||
return i * j * k
|
return i * j * k
|
||||||
|
|
||||||
with ParPool(fun, (3,), {'k': 2}) as pool:
|
with ParPool(fun, (3,), {'k': 2}) as pool:
|
||||||
|
|||||||
Reference in New Issue
Block a user