From 4d803162441e6bec9d7780102fc51663b33cc4cc Mon Sep 17 00:00:00 2001 From: Wim Pomp Date: Thu, 5 Sep 2024 18:37:47 +0200 Subject: [PATCH] - add gmap: function like pmap, but returning a generator instead of a list - add arguments for returning results out/in order and returning result indices --- .github/workflows/pytest.yml | 4 +-- README.md | 9 ++++-- parfor/__init__.py | 59 +++++++++++++++++++++++++++++------- pyproject.toml | 2 +- 4 files changed, 58 insertions(+), 16 deletions(-) diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 7301e2d..c61decd 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -7,8 +7,8 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - python-version: ["3.10"] - os: [ubuntu-20.04, windows-2019, macOS-11] + python-version: ["3.10", "3.12"] + os: [ubuntu-latest, windows-latest, macOS-latest] steps: - uses: actions/checkout@v4 diff --git a/README.md b/README.md index c291e17..8d10976 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ Take any normal serial but parallelizable for-loop and execute it in parallel us Don't worry about the technical details of using the multiprocessing module, race conditions, queues, parfor handles all that. -Tested on linux, Windows and OSX with python 3.10. +Tested on linux, Windows and OSX with python 3.10 and 3.12. ## Why is parfor better than just using multiprocessing? - Easy to use @@ -56,6 +56,8 @@ iterations need to be dillable. 
You might be able to make objects dillable anyho length: deprecated alias for total n_processes: number of processes to use, the parallel pool will be restarted if the current pool does not have the right number of processes + yield_ordered: return the result in the same order as the iterable + yield_index: return the index of the result too **bar_kwargs: keyword arguments for tqdm.tqdm ### Return @@ -158,7 +160,10 @@ Since generators don't have a predefined length, give parfor the length (total) # Extra's ## `pmap` -The function parfor decorates, it's used similarly to `map`. +The function parfor decorates; it is used similarly to `map` and returns a list with the results. + +## `gmap` +Same as `pmap`, but returns a generator, so each result can be used as soon as it is available. ## `Chunks` Split a long iterator in bite-sized chunks to parallelize diff --git a/parfor/__init__.py b/parfor/__init__.py index 1d2109e..5bfc0f7 100644 --- a/parfor/__init__.py +++ b/parfor/__init__.py @@ -8,7 +8,7 @@ from importlib.metadata import version from os import devnull, getpid from time import time from traceback import format_exc -from typing import Any, Callable, Hashable, Iterable, Iterator, NoReturn, Optional, Protocol, Sized, TypeVar +from typing import Any, Callable, Generator, Hashable, Iterable, Iterator, NoReturn, Optional, Protocol, Sized, TypeVar from warnings import warn from tqdm.auto import tqdm @@ -155,7 +155,7 @@ class Chunks(Iterable): if len(self.iterators) == 1: yield [next(self.iterators[0]) for _ in range(q - p)] else: - yield [[next(iterator) for _ in range(q-p)] for iterator in self.iterators] + yield [[next(iterator) for _ in range(q - p)] for iterator in self.iterators] def __len__(self) -> int: return self.length @@ -551,10 +551,11 @@ class Worker: self.n_workers.value -= 1 -def pmap(fun: Callable[[Iteration, Any, ...], Result], iterable: Iterable[Iteration] = None, +def gmap(fun: Callable[[Iteration, Any, ...], Result], iterable: Iterable[Iteration] = 
None, args: tuple[Any, ...] = None, kwargs: dict[str, Any] = None, total: int = None, desc: str = None, bar: Bar | bool = True, terminator: Callable[[], None] = None, serial: bool = None, length: int = None, - n_processes: int = None, **bar_kwargs: Any) -> list[Result]: + n_processes: int = None, yield_ordered: bool = True, yield_index: bool = False, + **bar_kwargs: Any) -> Generator[Result, None, None]: """ map a function fun to each iteration in iterable use as a function: pmap use as a decorator: parfor @@ -574,10 +575,13 @@ def pmap(fun: Callable[[Iteration, Any, ...], Result], iterable: Iterable[Iterat length: deprecated alias for total n_processes: number of processes to use, the parallel pool will be restarted if the current pool does not have the right number of processes + yield_ordered: return the result in the same order as the iterable + yield_index: return the index of the result too **bar_kwargs: keywords arguments for tqdm.tqdm output: - list with results from applying the function \'fun\' to each iteration of the iterable / iterator + list (pmap) or generator (gmap) with results from applying the function \'fun\' to each iteration + of the iterable / iterator examples: << from time import sleep @@ -663,16 +667,49 @@ def pmap(fun: Callable[[Iteration, Any, ...], Result], iterable: Iterable[Iterat bar = stack.enter_context(tqdm(**bar_kwargs)) with ParPool(chunk_fun, args, kwargs, n_processes, bar) as p: for i, (j, l) in enumerate(zip(iterable, iterable.lengths)): # add work to the queue - p(j, handle=i, barlength=iterable.lengths[i]) - if bar.total is None or bar.total < i+1: - bar.total = i+1 + p(j, handle=i, barlength=l) + if bar.total is None or bar.total < i + 1: + bar.total = i + 1 + if is_chunked: - return [p[i] for i in range(len(iterable))] + if yield_ordered: + if yield_index: + for i in range(len(iterable)): + yield i, p[i] + else: + for i in range(len(iterable)): + yield p[i] + else: + if yield_index: + for _ in range(len(iterable)): + 
yield p.get_newest() + else: + for _ in range(len(iterable)): + yield p.get_newest()[1] else: - return sum([p[i] for i in range(len(iterable))], []) # collect the results + if yield_ordered: + if yield_index: + for i in range(len(iterable)): + yield i, p[i][0] + else: + for i in range(len(iterable)): + yield p[i][0] + else: + if yield_index: + for _ in range(len(iterable)): + i, n = p.get_newest() + yield i, n[0] + else: + for _ in range(len(iterable)): + yield p.get_newest()[1][0] -@wraps(pmap) +@wraps(gmap) +def pmap(*args, **kwargs) -> list[Result]: + return list(gmap(*args, **kwargs)) # type: ignore + + +@wraps(gmap) def parfor(*args: Any, **kwargs: Any) -> Callable[[Callable[[Iteration, Any, ...], Result]], list[Result]]: def decfun(fun: Callable[[Iteration, Any, ...], Result]) -> list[Result]: return pmap(fun, *args, **kwargs) diff --git a/pyproject.toml b/pyproject.toml index f1eb76d..b19a67d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "parfor" -version = "2024.7.1" +version = "2024.9.0" description = "A package to mimic the use of parfor as done in Matlab." authors = ["Wim Pomp "] license = "GPLv3"