diff --git a/parfor/__init__.py b/parfor/__init__.py
index 5de2abe..e4ee1cb 100644
--- a/parfor/__init__.py
+++ b/parfor/__init__.py
@@ -29,7 +29,7 @@ class SharedMemory(UserDict):
         if item_id not in self:  # values will not be changed
             try:
                 self.data[item_id] = False, value
-            except Exception:  # only use our pickler when necessary
+            except Exception:  # only use our pickler when necessary  # noqa
                 self.data[item_id] = True, dumps(value, recurse=True)
         if item_id in self.references:
             self.references[item_id] += 1
@@ -215,10 +215,10 @@ class Task:
 
     def __call__(self, shared_memory: SharedMemory):
         if not self.done:
-            fun = shared_memory[self.fun] or (lambda *args, **kwargs: None)
+            fun = shared_memory[self.fun] or (lambda *args, **kwargs: None)  # noqa
             args = [shared_memory[arg] for arg in self.args]
             kwargs = dict([shared_memory[kwarg] for kwarg in self.kwargs])
-            self.result = fun(*args, **kwargs)
+            self.result = fun(*args, **kwargs)  # noqa
             self.done = True
         return self
 
@@ -257,6 +257,7 @@ class ParPool:
         self.args = args
         self.kwargs = kwargs
         self.is_started = False
+        self.last_task = None
 
     def __getstate__(self):
         raise RuntimeError(f'Cannot pickle {self.__class__.__name__} object.')
@@ -355,7 +356,7 @@ class ParPool:
 
 class PoolSingleton:
     def __new__(cls, *args, **kwargs):
-        if not hasattr(cls, 'instance') or cls.instance is None or not cls.instance.is_alive:
+        if not hasattr(cls, 'instance') or cls.instance is None or not cls.instance.is_alive:  # noqa
             new = super().__new__(cls)
             new.n_processes = cpu_count
             new.instance = new
@@ -373,15 +374,18 @@ class PoolSingleton:
             new.handle = 0
             new.pools = {}
             cls.instance = new
-        return cls.instance
+        return cls.instance  # noqa
 
-    def __init__(self, parpool=None):
+    def __init__(self, parpool=None):  # noqa
         if parpool is not None:
             self.pools[parpool.id] = parpool
 
     def __getstate__(self):
         raise RuntimeError(f'Cannot pickle {self.__class__.__name__} object.')
 
+    def __del__(self):
+        self.close()
+
     def remove_pool(self, pool_id):
         self.shared_memory.remove_pool(pool_id)
         if pool_id in self.pools:
@@ -404,7 +408,7 @@ class PoolSingleton:
             elif pool_id in self.pools:
                 getattr(self.pools[pool_id], code)(*args)
             return True
-        except multiprocessing.queues.Empty:
+        except multiprocessing.queues.Empty:  # noqa
             for pool in self.pools.values():
                 for handle, task in pool.tasks.items():  # retry a task if the process doing it was killed
                     if task.pid is not None \
@@ -435,33 +439,32 @@ class PoolSingleton:
         self.__class__.instance = None
 
         def empty_queue(queue):
-            if not queue._closed:
+            if not queue._closed:  # noqa
                 while not queue.empty():
                     try:
                         queue.get(True, 0.02)
-                    except multiprocessing.queues.Empty:
+                    except multiprocessing.queues.Empty:  # noqa
                         pass
 
         def close_queue(queue):
             empty_queue(queue)
-            if not queue._closed:
+            if not queue._closed:  # noqa
                 queue.close()
                 queue.join_thread()
 
         if self.is_alive:
-            self.is_alive = False
+            self.is_alive = False  # noqa
             self.event.set()
             self.pool.close()
-            while self.n_workers.value:
-                empty_queue(self.queue_in)
-                empty_queue(self.queue_out)
-            empty_queue(self.queue_in)
-            empty_queue(self.queue_out)
+            while self.n_workers.value:  # noqa
+                empty_queue(self.queue_in)  # noqa
+                empty_queue(self.queue_out)  # noqa
+            empty_queue(self.queue_in)  # noqa
+            empty_queue(self.queue_out)  # noqa
             self.pool.join()
-            close_queue(self.queue_in)
-            close_queue(self.queue_out)
-            self.handle = 0
-            self.tasks = {}
+            close_queue(self.queue_in)  # noqa
+            close_queue(self.queue_out)  # noqa
+            self.handle = 0  # noqa
 
 
 class Worker:
@@ -478,7 +481,7 @@ class Worker:
             try:
                 self.queue_out.put(args, timeout=0.1)
                 break
-            except multiprocessing.queues.Full:
+            except multiprocessing.queues.Full:  # noqa
                 continue
 
     def __call__(self):
@@ -489,14 +492,15 @@ class Worker:
                     try:
                         self.add_to_queue('started', task.pool_id, task.handle, pid)
                         self.add_to_queue('done', task.pool_id, task(self.shared_memory))
-                    except Exception:
+                    except Exception:  # noqa
                         self.add_to_queue('task_error', task.pool_id, task.handle, format_exc())
-                except multiprocessing.queues.Empty:
+                    self.shared_memory.garbage_collect()
+                except (multiprocessing.queues.Empty, KeyboardInterrupt):  # noqa
                     pass
-        except Exception:
+        except Exception:  # noqa
             self.add_to_queue('error', None, format_exc())
             self.event.set()
-            self.shared_memory.garbage_collect()
+        self.shared_memory.garbage_collect()
         for child in multiprocessing.active_children():
             child.kill()
         with self.n_workers.get_lock():
@@ -586,8 +590,8 @@ def pmap(fun, iterable=None, args=None, kwargs=None, total=None, desc=None, bar=
     iterable = Chunks(iterable, ratio=5, length=total)
 
     @wraps(fun)
-    def chunk_fun(iterator, *args, **kwargs):
-        return [fun(i, *args, **kwargs) for i in iterator]
+    def chunk_fun(iterator, *args, **kwargs):  # noqa
+        return [fun(i, *args, **kwargs) for i in iterator]  # noqa
 
     args = args or ()
     kwargs = kwargs or {}
@@ -602,13 +606,13 @@ def pmap(fun, iterable=None, args=None, kwargs=None, total=None, desc=None, bar=
         if callable(bar):
             return sum([chunk_fun(c, *args, **kwargs) for c in ExternalBar(iterable, bar)], [])
         else:
-            return sum([chunk_fun(c, *args, **kwargs) for c in tqdm(iterable, **bar_kwargs)], [])
+            return sum([chunk_fun(c, *args, **kwargs) for c in tqdm(iterable, **bar_kwargs)], [])  # noqa
     else:  # parallel case
         with ExitStack() as stack:
             if callable(bar):
-                bar = stack.enter_context(ExternalBar(callback=bar))
+                bar = stack.enter_context(ExternalBar(callback=bar))  # noqa
             elif bar is True:
-                bar = stack.enter_context(tqdm(**bar_kwargs))
+                bar = stack.enter_context(tqdm(**bar_kwargs))  # noqa
             with ParPool(chunk_fun, args, kwargs, bar) as p:
                 for i, (j, l) in enumerate(zip(iterable, iterable.lengths)):  # add work to the queue
                     p(j, handle=i, barlength=iterable.lengths[i])
diff --git a/parfor/pickler.py b/parfor/pickler.py
index 692e05a..6142e25 100644
--- a/parfor/pickler.py
+++ b/parfor/pickler.py
@@ -77,14 +77,14 @@ class Pickler(dill.Pickler):
                 else:
                     raise PicklingError("Can't pickle %r object: %r" %
                                         (t.__name__, obj))
-        except Exception:
+        except Exception:  # noqa
            rv = CouldNotBePickled.reduce(obj)
 
         # Check for string returned by reduce(), meaning "save as global"
         if isinstance(rv, str):
             try:
                 self.save_global(obj, rv)
-            except Exception:
+            except Exception:  # noqa
                 self.save_global(obj, CouldNotBePickled.reduce(obj))
             return
 
@@ -101,7 +101,7 @@ class Pickler(dill.Pickler):
         # Save the reduce() output and finally memoize the object
         try:
             self.save_reduce(obj=obj, *rv)
-        except Exception:
+        except Exception:  # noqa
             self.save_reduce(obj=obj, *CouldNotBePickled.reduce(obj))
 
 
diff --git a/pyproject.toml b/pyproject.toml
index 7821594..0b6b3f5 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "parfor"
-version = "2023.10.1"
+version = "2023.11.0"
 description = "A package to mimic the use of parfor as done in Matlab."
 authors = ["Wim Pomp "]
 license = "GPLv3"
diff --git a/tests/test_parfor.py b/tests/test_parfor.py
index ef6561a..4f648a1 100644
--- a/tests/test_parfor.py
+++ b/tests/test_parfor.py
@@ -46,7 +46,7 @@ def test_chunks(iterator):
 
 
 def test_parpool():
-    def fun(i, j, k):
+    def fun(i, j, k):  # noqa
         return i * j * k
 
     with ParPool(fun, (3,), {'k': 2}) as pool:
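For reference, a minimal usage sketch of the pmap entry point touched by this patch, modelled on the fun(i, j, k) pattern in tests/test_parfor.py above; the exact argument handling of the real parfor API may differ, so treat the call below as an assumption rather than documentation.

from parfor import pmap

def fun(i, j, k=1):
    # per-item work; extra positional args and keyword kwargs are forwarded by pmap
    return i * j * k

# apply fun to every element of range(10) in parallel,
# passing j=3 positionally and k=2 as a keyword argument (assumed call pattern)
results = pmap(fun, range(10), args=(3,), kwargs={'k': 2})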