From 9783c1d1f2c729b64bf6155429ac8ddf70a6c9ac Mon Sep 17 00:00:00 2001 From: Wim Pomp Date: Fri, 24 May 2024 16:57:35 +0200 Subject: [PATCH] - introduce n_processes to change the number of processes in the pool --- .github/workflows/pytest.yml | 4 ++-- README.md | 21 +++++++++++----- parfor/__init__.py | 23 ++++++++++-------- pyproject.toml | 2 +- tests/test_parfor.py | 46 ++++++++++++++++++++++++------------ 5 files changed, 62 insertions(+), 34 deletions(-) diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 1311849..7301e2d 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -11,9 +11,9 @@ jobs: os: [ubuntu-20.04, windows-2019, macOS-11] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - name: Install diff --git a/README.md b/README.md index c229d4e..c291e17 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,17 @@ Tested on linux, Windows and OSX with python 3.10. - Using dill instead of pickle: a lot more objects can be used when parallelizing - Progress bars are built-in +## How it works +The work you want parfor to do is divided over a number of processes. These processes are started by parfor and put +together in a pool. This pool is reused when you want parfor to do more work, or shut down when no new work arrives +within 10 minutes. + +A handle to each bit of work is put in a queue from which the workers take work. The objects needed to do the work are +stored in a memory manager in serialized form (using dill) and the manager hands out an object to a worker when the +worker is requesting it. The manager deletes objects automatically when they're not needed anymore. + +When the work is done the result is sent back for collection in the main process. + ## Installation `pip install parfor` @@ -41,13 +52,11 @@ iterations need to be dillable. You might be able to make objects dillable anyho desc: string with description of the progress bar bar: bool enable progress bar, or a callback function taking the number of passed iterations as an argument - pbar: bool enable buffer indicator bar, or a callback function taking the queue size as an argument - rP: ratio workers to cpu cores, default: 1 - nP: number of workers, default, None, overrides rP if not None serial: execute in series instead of parallel if True, None (default): let pmap decide - qsize: maximum size of the task queue length: deprecated alias for total - **bar_kwargs: keywords arguments for tqdm.tqdm + n_processes: number of processes to use, + the parallel pool will be restarted if the current pool does not have the right number of processes + **bar_kwargs: keyword arguments for tqdm.tqdm ### Return list with results from applying the function 'fun' to each iteration of the iterable / iterator @@ -149,7 +158,7 @@ Since generators don't have a predefined length, give parfor the length (total) # Extra's ## `pmap` -The function parfor decorates, use it like `map`. +The function parfor decorates, it's used similarly to `map`. ## `Chunks` Split a long iterator in bite-sized chunks to parallelize diff --git a/parfor/__init__.py b/parfor/__init__.py index 5255ba4..3a5a059 100644 --- a/parfor/__init__.py +++ b/parfor/__init__.py @@ -256,13 +256,13 @@ class ParPool: The target function and its argument can be changed at any time. """ def __init__(self, fun: Callable[[Any, ...], Any] = None, - args: tuple[Any] = None, kwargs: dict[str, Any] = None, bar: Bar = None): + args: tuple[Any] = None, kwargs: dict[str, Any] = None, n_processes: int = None, bar: Bar = None): self.id = id(self) self.handle = 0 self.tasks = {} self.bar = bar self.bar_lengths = {} - self.spool = PoolSingleton(self) + self.spool = PoolSingleton(n_processes, self) self.manager = self.spool.manager self.fun = fun self.args = args @@ -372,13 +372,14 @@ class PoolSingleton: instance = None - def __new__(cls, *args: Any, **kwargs: Any) -> PoolSingleton: - if cls.instance is not None: # restart if any workers have shut down - if cls.instance.n_workers.value < cls.instance.n_processes: + def __new__(cls, n_processes: int = None, *args: Any, **kwargs: Any) -> PoolSingleton: + # restart if any workers have shut down or if we want to have a different number of processes + if cls.instance is not None: + if cls.instance.n_workers.value < cls.instance.n_processes or cls.instance.n_processes != n_processes: cls.instance.close() if cls.instance is None or not cls.instance.is_alive: new = super().__new__(cls) - new.n_processes = cpu_count + new.n_processes = n_processes or cpu_count new.instance = new new.is_started = False ctx = Context() @@ -396,7 +397,7 @@ class PoolSingleton: cls.instance = new return cls.instance - def __init__(self, parpool: Parpool = None) -> None: # noqa + def __init__(self, n_processes: int = None, parpool: Parpool = None) -> None: # noqa if parpool is not None: self.pools[parpool.id] = parpool @@ -457,7 +458,7 @@ class PoolSingleton: @classmethod def close(cls) -> None: - if hasattr(cls, 'instance') and cls.instance is not None: + if cls.instance is not None: instance = cls.instance cls.instance = None @@ -549,7 +550,7 @@ class Worker: def pmap(fun: Callable[[Iteration, Any, ...], Result], iterable: Iterable[Iteration] = None, args: tuple[Any, ...] = None, kwargs: dict[str, Any] = None, total: int = None, desc: str = None, bar: Bar | bool = True, terminator: Callable[[], None] = None, serial: bool = None, length: int = None, - **bar_kwargs: Any) -> list[Result]: + n_processes: int = None, **bar_kwargs: Any) -> list[Result]: """ map a function fun to each iteration in iterable use as a function: pmap use as a decorator: parfor @@ -567,6 +568,8 @@ def pmap(fun: Callable[[Iteration, Any, ...], Result], iterable: Iterable[Iterat or a callback function taking the number of passed iterations as an argument serial: execute in series instead of parallel if True, None (default): let pmap decide length: deprecated alias for total + n_processes: number of processes to use, + the parallel pool will be restarted if the current pool does not have the right number of processes **bar_kwargs: keywords arguments for tqdm.tqdm output: @@ -654,7 +657,7 @@ def pmap(fun: Callable[[Iteration, Any, ...], Result], iterable: Iterable[Iterat bar = stack.enter_context(ExternalBar(callback=bar)) else: bar = stack.enter_context(tqdm(**bar_kwargs)) - with ParPool(chunk_fun, args, kwargs, bar) as p: + with ParPool(chunk_fun, args, kwargs, n_processes, bar) as p: for i, (j, l) in enumerate(zip(iterable, iterable.lengths)): # add work to the queue p(j, handle=i, barlength=iterable.lengths[i]) if bar.total is None or bar.total < i+1: diff --git a/pyproject.toml b/pyproject.toml index b01f7ed..bb40e8e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "parfor" -version = "2024.4.0" +version = "2024.5.0" description = "A package to mimic the use of parfor as done in Matlab." authors = ["Wim Pomp "] license = "GPLv3" diff --git a/tests/test_parfor.py b/tests/test_parfor.py index 91cb90e..e78b32f 100644 --- a/tests/test_parfor.py +++ b/tests/test_parfor.py @@ -1,4 +1,9 @@ +from __future__ import annotations + from dataclasses import dataclass +from os import getpid +from time import sleep +from typing import Any, Iterator, Optional, Sequence import pytest @@ -6,14 +11,14 @@ from parfor import Chunks, ParPool, parfor, pmap class SequenceIterator: - def __init__(self, sequence): + def __init__(self, sequence: Sequence) -> None: self._sequence = sequence self._index = 0 - def __iter__(self): + def __iter__(self) -> SequenceIterator: return self - def __next__(self): + def __next__(self) -> Any: if self._index < len(self._sequence): item = self._sequence[self._index] self._index += 1 @@ -21,19 +26,19 @@ class SequenceIterator: else: raise StopIteration - def __len__(self): + def __len__(self) -> int: return len(self._sequence) class Iterable: - def __init__(self, sequence): + def __init__(self, sequence: Sequence) -> None: self.sequence = sequence - def __iter__(self): + def __iter__(self) -> SequenceIterator: return SequenceIterator(self.sequence) -def iterators(): +def iterators() -> tuple[Iterator, Optional[int]]: yield range(10), None yield list(range(10)), None yield (i for i in range(10)), 10 @@ -42,23 +47,23 @@ def iterators(): @pytest.mark.parametrize('iterator', iterators()) -def test_chunks(iterator): +def test_chunks(iterator: tuple[Iterator, Optional[int]]) -> None: chunks = Chunks(iterator[0], size=2, length=iterator[1]) assert list(chunks) == [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]] -def test_parpool(): - def fun(i, j, k): # noqa +def test_parpool() -> None: + def fun(i, j, k) -> int: # noqa return i * j * k - with ParPool(fun, (3,), {'k': 2}) as pool: + with ParPool(fun, (3,), {'k': 2}) as pool: # noqa for i in range(10): pool[i] = i assert [pool[i] for i in range(10)] == [0, 6, 12, 18, 24, 30, 36, 42, 48, 54] -def test_parfor(): +def test_parfor() -> None: @parfor(range(10), (3,), {'k': 2}) def fun(i, j, k): return i * j * k @@ -66,14 +71,14 @@ def test_parfor(): assert fun == [0, 6, 12, 18, 24, 30, 36, 42, 48, 54] -def test_pmap(): +def test_pmap() -> None: def fun(i, j, k): return i * j * k assert pmap(fun, range(10), (3,), {'k': 2}) == [0, 6, 12, 18, 24, 30, 36, 42, 48, 54] -def test_id_reuse(): +def test_id_reuse() -> None: def fun(i): return i[0].a @@ -87,5 +92,16 @@ def test_id_reuse(): yield t del t - a = pmap(fun, Chunks(gen(1000), size=1, length=1000), total=1000) + a = pmap(fun, Chunks(gen(1000), size=1, length=1000), total=1000) # noqa assert all([i == j for i, j in enumerate(a)]) + + +@pytest.mark.parametrize('n_processes', (2, 4, 6)) +def test_n_processes(n_processes) -> None: + + @parfor(range(12), n_processes=n_processes) + def fun(i): # noqa + sleep(0.25) + return getpid() + + assert len(set(fun)) == n_processes