From f3302f6dbadfa8a8986bff5758da61ce77a49909 Mon Sep 17 00:00:00 2001 From: Wim Pomp Date: Fri, 8 Sep 2023 17:19:35 +0200 Subject: [PATCH] - Use a singleton pool to prevent lengthy restarts of the pool, this also means that arguments for pool size have gone. - Removed qbar and TqdmMeter. - Wrap chunked function for better error messages. --- .github/workflows/pytest.yml | 22 ++ README.md | 11 +- parfor/__init__.py | 447 +++++++++++++++++------------------ parfor/pickler.py | 4 +- pyproject.toml | 4 +- tests/test_parfor.py | 4 +- 6 files changed, 246 insertions(+), 246 deletions(-) create mode 100644 .github/workflows/pytest.yml diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml new file mode 100644 index 0000000..1311849 --- /dev/null +++ b/.github/workflows/pytest.yml @@ -0,0 +1,22 @@ +name: PyTest + +on: [push, pull_request] + +jobs: + pytest: + runs-on: ${{ matrix.os }} + strategy: + matrix: + python-version: ["3.10"] + os: [ubuntu-20.04, windows-2019, macOS-11] + + steps: + - uses: actions/checkout@v3 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Install + run: pip install .[test] + - name: Test with pytest + run: pytest \ No newline at end of file diff --git a/README.md b/README.md index b556f9a..c229d4e 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,12 @@ +[![pytest](https://github.com/wimpomp/parfor/actions/workflows/pytest.yml/badge.svg)](https://github.com/wimpomp/parfor/actions/workflows/pytest.yml) + # Parfor Used to parallelize for-loops using parfor in Matlab? This package allows you to do the same in python. Take any normal serial but parallelizable for-loop and execute it in parallel using easy syntax. Don't worry about the technical details of using the multiprocessing module, race conditions, queues, parfor handles all that. -Tested on linux on python 3.8 and 3.10 and on Windows and OSX on python 3.8. +Tested on linux, Windows and OSX with python 3.10. ## Why is parfor better than just using multiprocessing? - Easy to use @@ -27,8 +29,6 @@ of objects that cannot be used. They can be used however, for the iterator argum iterations need to be dillable. You might be able to make objects dillable anyhow using `dill.register` or with `__reduce__`, `__getstate__`, etc. -On OSX the buffer bar does not work due to limitations of the OS. - ## Arguments ### Required: fun: function taking arguments: iteration from iterable, other arguments defined in args & kwargs @@ -154,10 +154,7 @@ The function parfor decorates, use it like `map`. ## `Chunks` Split a long iterator in bite-sized chunks to parallelize -## `Parpool` +## `ParPool` More low-level accessibility to parallel execution. Submit tasks and request the result at any time, (although necessarily submit first, then request a specific task), use different functions and function arguments for different tasks. - -## `TqdmMeter` -Meter bar, inherited from tqdm, used for displaying buffers. 
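For reference, a minimal usage sketch of the renamed `ParPool` class described in the README section above. This is not part of the patch; it mirrors the updated `test_parpool` in `tests/test_parfor.py` at the end of this diff, with a `__main__` guard added because the workers are started from a spawn context.

```python
# Illustrative sketch only, not part of the patch: using the renamed ParPool
# directly, mirroring the updated test_parpool in tests/test_parfor.py.
from parfor import ParPool

def fun(i, j, k):
    return i * j * k

if __name__ == '__main__':
    with ParPool(fun, (3,), {'k': 2}) as pool:
        for i in range(10):
            pool[i] = i  # submit iteration i under handle i
        for i in range(10):
            # indexing blocks until the result for handle i is available
            assert pool[i] == fun(i, 3, k=2)
```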
diff --git a/parfor/__init__.py b/parfor/__init__.py index 8185dc4..5b56657 100644 --- a/parfor/__init__.py +++ b/parfor/__init__.py @@ -1,19 +1,15 @@ import multiprocessing -from os import getpid -from tqdm.auto import tqdm -from traceback import format_exc from collections import OrderedDict -from warnings import warn +from contextlib import ExitStack from functools import wraps +from os import getpid +from traceback import format_exc +from warnings import warn + +from tqdm.auto import tqdm + from .pickler import Pickler, dumps, loads - -try: - from javabridge import kill_vm -except ImportError: - kill_vm = lambda: None - - cpu_count = int(multiprocessing.cpu_count()) @@ -110,43 +106,6 @@ class ExternalBar: self.callback(n) -class TqdmMeter(tqdm): - """ Overload tqdm to make a special version of tqdm functioning as a meter. """ - - def __init__(self, iterable=None, desc=None, total=None, *args, **kwargs): - self._n = 0 - self._total = total - self.disable = False - if 'bar_format' not in kwargs and len(args) < 16: - kwargs['bar_format'] = '{desc}{bar}{n}/{total}' - super().__init__(iterable, desc, total, *args, **kwargs) - - @property - def n(self): - return self._n - - @n.setter - def n(self, value): - if not value == self.n: - self._n = int(value) - self.refresh() - - @property - def total(self): - return self._total - - @total.setter - def total(self, value): - self._total = value - if hasattr(self, 'container'): - self.container.children[1].max = value - - def __exit__(self, exc_type=None, exc_value=None, traceback=None): - if not self.leave: - self.n = self.total - super().__exit__(exc_type, exc_value, traceback) - - class Hasher: def __init__(self, obj, hsh=None): if hsh is not None: @@ -207,7 +166,8 @@ class Task: args = HashDescriptor() kwargs = HashDescriptor() - def __init__(self, fun=None, args=None, kwargs=None, handle=None, n=None, done=False, result=None): + def __init__(self, pool_id, fun=None, args=None, kwargs=None, handle=None, n=None, done=False, result=None): + self.pool_id = pool_id self.fun = fun or (lambda *args, **kwargs: None) self.args = args or () self.kwargs = kwargs or {} @@ -219,10 +179,11 @@ class Task: def __reduce__(self): if self.done: - return self.__class__, (None, None, None, self.handle, None, self.done, dumps(self.result, recurse=True)) + return self.__class__, (self.pool_id, None, None, None, self.handle, None, self.done, + dumps(self.result, recurse=True)) else: - return self.__class__, (self._fun, self._args, self._kwargs, self.handle, dumps(self.n, recurse=True), - self.done) + return self.__class__, (self.pool_id, self._fun, self._args, self._kwargs, self.handle, + dumps(self.n, recurse=True), self.done) def set_from_cache(self, cache=None): self.n = loads(self.n) @@ -238,9 +199,9 @@ class Task: def __repr__(self): if self.done: - return 'Task {}, result: {}'.format(self.handle, self.result) + return f'Task {self.handle}, result: {self.result}' else: - return 'Task {}'.format(self.handle) + return f'Task {self.handle}' class Context(multiprocessing.context.SpawnContext): @@ -255,38 +216,23 @@ class Context(multiprocessing.context.SpawnContext): pass -class Parpool: +class ParPool: """ Parallel processing with addition of iterations at any time and request of that result any time after that. The target function and its argument can be changed at any time. 
""" - def __init__(self, fun=None, args=None, kwargs=None, rP=None, nP=None, bar=None, qbar=None, qsize=None): - """ fun, args, kwargs: target function and its arguments and keyword arguments - rP: ratio workers to cpu cores, default: 1 - nP: number of workers, default, None, overrides rP if not None - bar, qbar: instances of tqdm and tqdmm to use for monitoring buffer and progress """ - if rP is None and nP is None: - self.nP = cpu_count - elif nP is None: - self.nP = int(round(rP * cpu_count)) - else: - self.nP = int(nP) - self.nP = max(self.nP, 2) - self.task = Task(fun, args, kwargs) - self.is_started = False - ctx = Context() - self.n_tasks = ctx.Value('i', self.nP) - self.event = ctx.Event() - self.queue_in = ctx.Queue(qsize or 3 * self.nP) - self.queue_out = ctx.Queue(qsize or 12 * self.nP) - self.pool = ctx.Pool(self.nP, self._Worker(self.queue_in, self.queue_out, self.n_tasks, self.event)) - self.is_alive = True + def __init__(self, fun=None, args=None, kwargs=None, bar=None): + self.id = id(self) self.handle = 0 self.tasks = {} + self.last_task = Task(self.id, fun, args, kwargs) self.bar = bar self.bar_lengths = {} - self.qbar = qbar - if self.qbar is not None: - self.qbar.total = 3 * self.nP + self.spool = PoolSingleton(self) + self.manager = self.spool.manager + self.is_started = False + + def __getstate__(self): + raise RuntimeError(f'Cannot pickle {self.__class__.__name__} object.') def __enter__(self, *args, **kwargs): return self @@ -294,27 +240,66 @@ class Parpool: def __exit__(self, *args, **kwargs): self.close() - def _get_from_queue(self): - """ Get an item from the queue and store it, return True if more messages are waiting. """ - try: - code, *args = self.queue_out.get(True, 0.02) - getattr(self, code)(*args) - return True - except multiprocessing.queues.Empty: - for handle, task in self.tasks.items(): # retry a task if the process doing it was killed - if task.pid is not None and task.pid not in [child.pid for child in multiprocessing.active_children()]: - self.queue_in.put(task) - warn('Task {} was restarted because process {} was probably killed.'.format(task.handle, task.pid)) - return False + def close(self): + if self.id in self.spool.pools: + self.spool.pools.pop(self.id) - def error(self, error): - self.close() - raise Exception('Error occurred in worker: {}'.format(error)) + def __call__(self, n, fun=None, args=None, kwargs=None, handle=None, barlength=1): + if self.id not in self.spool.pools: + raise ValueError(f'this pool is not registered (anymore) with the pool singleton') + if handle is None: + new_handle = self.handle + self.handle += 1 + else: + new_handle = handle + if new_handle in self: + raise ValueError(f'handle {new_handle} already present') + self.last_task = Task(self.id, fun or self.last_task.fun, args or self.last_task.args, + kwargs or self.last_task.kwargs, new_handle, n) + self.tasks[new_handle] = self.last_task + self.bar_lengths[new_handle] = barlength + self.spool.add_task(self.last_task) + if handle is None: + return new_handle + + def __setitem__(self, handle, n): + """ Add new iteration. """ + self(n, handle=handle) + + def __getitem__(self, handle): + """ Request result and delete its record. Wait if result not yet available. 
""" + if handle not in self: + raise ValueError(f'No handle: {handle} in pool') + while not self.tasks[handle].done: + if not self.spool.get_from_queue() and not self.tasks[handle].done and self.is_started \ + and not self.working: + for _ in range(10): # wait some time while processing possible new messages + self.spool.get_from_queue() + if not self.spool.get_from_queue() and not self.tasks[handle].done and self.is_started \ + and not self.working: + # retry a task if the process was killed while retrieving the task + self.spool.add_task(self.tasks[handle]) + warn(f'Task {handle} was restarted because the process retrieving it was probably killed.') + result = self.tasks[handle].result + self.tasks.pop(handle) + return result + + def __contains__(self, handle): + return handle in self.tasks + + def __delitem__(self, handle): + self.tasks.pop(handle) + + def get_newest(self): + return self.spool.get_newest_for_pool(self) + + def process_queue(self): + self.spool.process_queue() def task_error(self, handle, error): if handle in self: task = self.tasks[handle] - print('Error from process working on iteration {}:\n'.format(handle)) + print(f'Error from process working on iteration {handle}:\n') print(error) self.close() print('Retrying in main thread...') @@ -325,166 +310,163 @@ class Parpool: def done(self, task): if task.handle in self: # if not, the task was restarted erroneously self.tasks[task.handle] = task - if self.bar is not None: + if hasattr(self.bar, 'update'): self.bar.update(self.bar_lengths.pop(task.handle)) - self._qbar_update() def started(self, handle, pid): self.is_started = True if handle in self: # if not, the task was restarted erroneously self.tasks[handle].pid = pid - def __call__(self, n, fun=None, args=None, kwargs=None, handle=None, barlength=1): - """ Add new iteration, using optional manually defined handle.""" - if self.is_alive and not self.event.is_set(): - self.task = Task(fun or self.task.fun, args or self.task.args, kwargs or self.task.kwargs, handle, n) - while self.queue_in.full(): - self._get_from_queue() - if handle is None: - handle = self.handle - self.handle += 1 - self.tasks[handle] = self.task - self.queue_in.put(self.task) - self.bar_lengths[handle] = barlength - self._qbar_update() - return handle - elif handle not in self: - self.tasks[handle] = self.task - self.queue_in.put(self.task) - self.bar_lengths[handle] = barlength - self._qbar_update() - - def _qbar_update(self): - if self.qbar is not None: - try: - self.qbar.n = self.queue_in.qsize() - except Exception: - pass - - def __setitem__(self, handle, n): - """ Add new iteration. """ - self(n, handle=handle) - - def __getitem__(self, handle): - """ Request result and delete its record. Wait if result not yet available. 
""" - if handle not in self: - raise ValueError('No handle: {}'.format(handle)) - while not self.tasks[handle].done: - if not self._get_from_queue() and not self.tasks[handle].done and self.is_started and not self.working: - for _ in range(10): # wait some time while processing possible new messages - self._get_from_queue() - if not self._get_from_queue() and not self.tasks[handle].done and self.is_started and not self.working: - # retry a task if the process was killed while retrieving the task - self.queue_in.put(self.tasks[handle]) - warn('Task {} was restarted because the process retrieving it was probably killed.'.format(handle)) - result = self.tasks[handle].result - self.tasks.pop(handle) - return result - @property def working(self): return not all([task.pid is None for task in self.tasks.values()]) - def get_newest(self): + +class PoolSingleton: + def __new__(cls, *args, **kwargs): + if not hasattr(cls, 'instance') or cls.instance is None or not cls.instance.is_alive: + new = super().__new__(cls) + new.n_processes = cpu_count + new.instance = new + new.is_started = False + ctx = Context() + new.n_workers = ctx.Value('i', new.n_processes) + new.event = ctx.Event() + new.queue_in = ctx.Queue(3 * new.n_processes) + new.queue_out = ctx.Queue(new.n_processes) + new.pool = ctx.Pool(new.n_processes, Worker(new.queue_in, new.queue_out, new.n_workers, new.event)) + new.is_alive = True + new.handle = 0 + new.pools = {} + new.manager = ctx.Manager() + cls.instance = new + return cls.instance + + def __init__(self, parpool=None): + if parpool is not None: + self.pools[parpool.id] = parpool + + def __getstate__(self): + raise RuntimeError(f'Cannot pickle {self.__class__.__name__} object.') + + def error(self, error): + self.close() + raise Exception(f'Error occurred in worker: {error}') + + def process_queue(self): + while self.get_from_queue(): + pass + + def get_from_queue(self): + """ Get an item from the queue and store it, return True if more messages are waiting. """ + try: + code, pool_id, *args = self.queue_out.get(True, 0.02) + if pool_id is None: + getattr(self, code)(*args) + elif pool_id in self.pools: + getattr(self.pools[pool_id], code)(*args) + return True + except multiprocessing.queues.Empty: + for pool in self.pools.values(): + for handle, task in pool.tasks.items(): # retry a task if the process doing it was killed + if task.pid is not None \ + and task.pid not in [child.pid for child in multiprocessing.active_children()]: + self.queue_in.put(task) + warn(f'Task {task.handle} was restarted because process {task.pid} was probably killed.') + return False + + def add_task(self, task): + """ Add new iteration, using optional manually defined handle.""" + if self.is_alive and not self.event.is_set(): + while self.queue_in.full(): + self.get_from_queue() + self.queue_in.put(task) + + def get_newest_for_pool(self, pool): """ Request the newest key and result and delete its record. Wait if result not yet available. 
""" - while len(self.tasks): - self._get_from_queue() - for task in self.tasks.values(): + while len(pool.tasks): + self.get_from_queue() + for task in pool.tasks.values(): if task.done: handle, result = task.handle, task.result - self.tasks.pop(handle) + pool.tasks.pop(handle) return handle, result - def __delitem__(self, handle): - self.tasks.pop(handle) - - def __contains__(self, handle): - return handle in self.tasks - - def __repr__(self): - if self.is_alive: - return '{} with {} workers.'.format(self.__class__, self.nP) - else: - return 'Closed {}'.format(self.__class__) - def close(self): + self.__class__.instance = None + + def empty_queue(queue): + if not queue._closed: + while not queue.empty(): + try: + queue.get(True, 0.02) + except multiprocessing.queues.Empty: + pass + + def close_queue(queue): + empty_queue(queue) + if not queue._closed: + queue.close() + queue.join_thread() + if self.is_alive: self.is_alive = False self.event.set() self.pool.close() - while self.n_tasks.value: - self._empty_queue(self.queue_in) - self._empty_queue(self.queue_out) - self._empty_queue(self.queue_in) - self._empty_queue(self.queue_out) + while self.n_workers.value: + empty_queue(self.queue_in) + empty_queue(self.queue_out) + empty_queue(self.queue_in) + empty_queue(self.queue_out) self.pool.join() - self._close_queue(self.queue_in) - self._close_queue(self.queue_out) + close_queue(self.queue_in) + close_queue(self.queue_out) self.handle = 0 self.tasks = {} - @staticmethod - def _empty_queue(queue): - if not queue._closed: - while not queue.empty(): - try: - queue.get(True, 0.02) - except multiprocessing.queues.Empty: - pass - @staticmethod - def _close_queue(queue): - if not queue._closed: - while not queue.empty(): - try: - queue.get(True, 0.02) - except multiprocessing.queues.Empty: - pass - queue.close() - queue.join_thread() +class Worker: + """ Manages executing the target function which will be executed in different processes. """ + def __init__(self, queue_in, queue_out, n_workers, event, cachesize=48): + self.cache = DequeDict(cachesize) + self.queue_in = queue_in + self.queue_out = queue_out + self.n_workers = n_workers + self.event = event - class _Worker(object): - """ Manages executing the target function which will be executed in different processes. 
""" - def __init__(self, queue_in, queue_out, n_tasks, event, cachesize=48): - self.cache = DequeDict(cachesize) - self.queue_in = queue_in - self.queue_out = queue_out - self.n_tasks = n_tasks - self.event = event + def add_to_queue(self, *args): + while not self.event.is_set(): + try: + self.queue_out.put(args, timeout=0.1) + break + except multiprocessing.queues.Full: + continue - def add_to_queue(self, *args): - while not self.event.is_set(): + def __call__(self): + pid = getpid() + while not self.event.is_set(): + try: + task = self.queue_in.get(True, 0.02) try: - self.queue_out.put(args, timeout=0.1) - break - except multiprocessing.queues.Full: - continue - - def __call__(self): - pid = getpid() - while not self.event.is_set(): - try: - task = self.queue_in.get(True, 0.02) - try: - self.add_to_queue('started', task.handle, pid) - task.set_from_cache(self.cache) - self.add_to_queue('done', task()) - except Exception: - self.add_to_queue('task_error', task.handle, format_exc()) - self.event.set() - except multiprocessing.queues.Empty: - continue + self.add_to_queue('started', task.pool_id, task.handle, pid) + task.set_from_cache(self.cache) + self.add_to_queue('done', task.pool_id, task()) except Exception: - self.add_to_queue('error', format_exc()) - self.event.set() - for child in multiprocessing.active_children(): - child.kill() - with self.n_tasks.get_lock(): - self.n_tasks.value -= 1 + self.add_to_queue('task_error', task.pool_id, task.handle, format_exc()) + except multiprocessing.queues.Empty: + continue + except Exception: + self.add_to_queue('error', None, format_exc()) + self.event.set() + for child in multiprocessing.active_children(): + child.kill() + with self.n_workers.get_lock(): + self.n_workers.value -= 1 -def pmap(fun, iterable=None, args=None, kwargs=None, total=None, desc=None, bar=True, qbar=False, terminator=None, - rP=1, nP=None, serial=None, qsize=None, length=None, **bar_kwargs): +def pmap(fun, iterable=None, args=None, kwargs=None, total=None, desc=None, bar=True, terminator=None, + serial=None, length=None, **bar_kwargs): """ map a function fun to each iteration in iterable use as a function: pmap use as a decorator: parfor @@ -500,11 +482,7 @@ def pmap(fun, iterable=None, args=None, kwargs=None, total=None, desc=None, bar= desc: string with description of the progress bar bar: bool enable progress bar, or a callback function taking the number of passed iterations as an argument - pbar: bool enable buffer indicator bar, or a callback function taking the queue size as an argument - rP: ratio workers to cpu cores, default: 1 - nP: number of workers, default, None, overrides rP if not None serial: execute in series instead of parallel if True, None (default): let pmap decide - qsize: maximum size of the task queue length: deprecated alias for total **bar_kwargs: keywords arguments for tqdm.tqdm @@ -569,6 +547,7 @@ def pmap(fun, iterable=None, args=None, kwargs=None, total=None, desc=None, bar= else: iterable = Chunks(iterable, ratio=5, length=total) + @wraps(fun) def chunk_fun(iterator, *args, **kwargs): return [fun(i, *args, **kwargs) for i in iterator] @@ -587,10 +566,12 @@ def pmap(fun, iterable=None, args=None, kwargs=None, total=None, desc=None, bar= else: return sum([chunk_fun(c, *args, **kwargs) for c in tqdm(iterable, **bar_kwargs)], []) else: # parallel case - with ExternalBar(callback=qbar) if callable(qbar) \ - else TqdmMeter(total=0, desc='Task buffer', disable=not qbar, leave=False) as qbar, \ - ExternalBar(callback=bar) if callable(bar) else 
tqdm(**bar_kwargs) as bar: - with Parpool(chunk_fun, args, kwargs, rP, nP, bar, qbar, qsize) as p: + with ExitStack() as stack: + if callable(bar): + bar = stack.enter_context(ExternalBar(callback=bar)) + elif bar is True: + bar = stack.enter_context(tqdm(**bar_kwargs)) + with ParPool(chunk_fun, args, kwargs, bar) as p: for i, (j, l) in enumerate(zip(iterable, iterable.lengths)): # add work to the queue p(j, handle=i, barlength=iterable.lengths[i]) if bar.total is None or bar.total < i+1: @@ -622,6 +603,6 @@ def deprecated(cls, name): # backwards compatibility -parpool = deprecated(Parpool, 'parpool') -tqdmm = deprecated(TqdmMeter, 'tqdmm') +parpool = deprecated(ParPool, 'parpool') +Parpool = deprecated(ParPool, 'Parpool') chunks = deprecated(Chunks, 'chunks') diff --git a/parfor/pickler.py b/parfor/pickler.py index e80fdb4..aa4041d 100644 --- a/parfor/pickler.py +++ b/parfor/pickler.py @@ -1,7 +1,7 @@ -import dill -from pickle import PicklingError, dispatch_table from io import BytesIO +from pickle import PicklingError, dispatch_table +import dill failed_rv = (lambda *args, **kwargs: None, ()) loads = dill.loads diff --git a/pyproject.toml b/pyproject.toml index a84350b..bfa8373 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "parfor" -version = "2023.8.2" +version = "2023.8.3" description = "A package to mimic the use of parfor as done in Matlab." authors = ["Wim Pomp "] license = "GPLv3" @@ -9,7 +9,7 @@ keywords = ["parfor", "concurrency", "multiprocessing", "parallel"] repository = "https://github.com/wimpomp/parfor" [tool.poetry.dependencies] -python = "^3.5" +python = "^3.6" tqdm = ">=4.50.0" dill = ">=0.3.0" pytest = { version = "*", optional = true } diff --git a/tests/test_parfor.py b/tests/test_parfor.py index 2c400e9..5f806cd 100644 --- a/tests/test_parfor.py +++ b/tests/test_parfor.py @@ -1,5 +1,5 @@ import pytest -from parfor import Chunks, parfor, Parpool, pmap +from parfor import Chunks, ParPool, parfor, pmap class SequenceIterator: @@ -48,7 +48,7 @@ def test_parpool(): def fun(i, j, k): return i * j * k - with Parpool(fun, (3,), {'k': 2}) as pool: + with ParPool(fun, (3,), {'k': 2}) as pool: for i in range(10): pool[i] = i
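To illustrate the commit message, a minimal sketch of calling the new `pmap` signature twice. It assumes only what the diff above shows (the removed `rP`, `nP`, `qbar` and `qsize` arguments and the shared `PoolSingleton`); `work`, `offset` and the loop bodies are made up for the example.

```python
# Illustrative sketch only, not part of the patch: pmap after this change.
# The rP, nP, qbar and qsize arguments are gone; the pool size is now fixed
# to the cpu count by PoolSingleton.
from parfor import pmap

def work(i, offset):
    return i + offset

if __name__ == '__main__':
    results = pmap(work, range(100), args=(10,), desc='first loop')

    # When the loops run in parallel, the second call reuses the already running
    # PoolSingleton workers instead of spawning a fresh pool, avoiding the
    # lengthy restart the commit message refers to.
    more = pmap(work, range(100), args=(20,), desc='second loop')
```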