- introduce n_processes to change the number of processes in the pool

This commit is contained in:
Wim Pomp
2024-05-24 16:57:35 +02:00
parent ac4d599646
commit 9783c1d1f2
5 changed files with 62 additions and 34 deletions

View File

@@ -11,9 +11,9 @@ jobs:
os: [ubuntu-20.04, windows-2019, macOS-11]
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install

View File

@@ -13,6 +13,17 @@ Tested on linux, Windows and OSX with python 3.10.
- Using dill instead of pickle: a lot more objects can be used when parallelizing
- Progress bars are built-in
## How it works
The work you want parfor to do is divided over a number of processes. These processes are started by parfor and put
together in a pool. This pool is reused when you want parfor to do more work, or shut down when no new work arrives
within 10 minutes.
A handle to each bit of work is put in a queue from which the workers take work. The objects needed to do the work are
stored in a memory manager in serialized form (using dill) and the manager hands out an object to a worker when the
worker requests it. The manager deletes objects automatically when they're no longer needed.
When the work is done the result is sent back for collection in the main process.
## Installation
`pip install parfor`
@@ -41,13 +52,11 @@ iterations need to be dillable. You might be able to make objects dillable anyho
desc: string with description of the progress bar
bar: bool enable progress bar,
or a callback function taking the number of passed iterations as an argument
pbar: bool enable buffer indicator bar, or a callback function taking the queue size as an argument
rP: ratio workers to cpu cores, default: 1
nP: number of workers, default, None, overrides rP if not None
serial: execute in series instead of parallel if True, None (default): let pmap decide
qsize: maximum size of the task queue
length: deprecated alias for total
**bar_kwargs: keywords arguments for tqdm.tqdm
n_processes: number of processes to use,
the parallel pool will be restarted if the current pool does not have the right number of processes
**bar_kwargs: keyword arguments for tqdm.tqdm
### Return
list with results from applying the function 'fun' to each iteration of the iterable / iterator
@@ -149,7 +158,7 @@ Since generators don't have a predefined length, give parfor the length (total)
# Extra's
## `pmap`
The function parfor decorates, use it like `map`.
The function parfor decorates, it's used similarly to `map`.
## `Chunks`
Split a long iterator in bite-sized chunks to parallelize

View File

@@ -256,13 +256,13 @@ class ParPool:
The target function and its argument can be changed at any time.
"""
def __init__(self, fun: Callable[[Any, ...], Any] = None,
args: tuple[Any] = None, kwargs: dict[str, Any] = None, bar: Bar = None):
args: tuple[Any] = None, kwargs: dict[str, Any] = None, n_processes: int = None, bar: Bar = None):
self.id = id(self)
self.handle = 0
self.tasks = {}
self.bar = bar
self.bar_lengths = {}
self.spool = PoolSingleton(self)
self.spool = PoolSingleton(n_processes, self)
self.manager = self.spool.manager
self.fun = fun
self.args = args
@@ -372,13 +372,14 @@ class PoolSingleton:
instance = None
def __new__(cls, *args: Any, **kwargs: Any) -> PoolSingleton:
if cls.instance is not None: # restart if any workers have shut down
if cls.instance.n_workers.value < cls.instance.n_processes:
def __new__(cls, n_processes: int = None, *args: Any, **kwargs: Any) -> PoolSingleton:
# restart if any workers have shut down or if we want to have a different number of processes
if cls.instance is not None:
if cls.instance.n_workers.value < cls.instance.n_processes or cls.instance.n_processes != n_processes:
cls.instance.close()
if cls.instance is None or not cls.instance.is_alive:
new = super().__new__(cls)
new.n_processes = cpu_count
new.n_processes = n_processes or cpu_count
new.instance = new
new.is_started = False
ctx = Context()
@@ -396,7 +397,7 @@ class PoolSingleton:
cls.instance = new
return cls.instance
def __init__(self, parpool: Parpool = None) -> None: # noqa
def __init__(self, n_processes: int = None, parpool: Parpool = None) -> None: # noqa
if parpool is not None:
self.pools[parpool.id] = parpool
@@ -457,7 +458,7 @@ class PoolSingleton:
@classmethod
def close(cls) -> None:
if hasattr(cls, 'instance') and cls.instance is not None:
if cls.instance is not None:
instance = cls.instance
cls.instance = None
@@ -549,7 +550,7 @@ class Worker:
def pmap(fun: Callable[[Iteration, Any, ...], Result], iterable: Iterable[Iteration] = None,
args: tuple[Any, ...] = None, kwargs: dict[str, Any] = None, total: int = None, desc: str = None,
bar: Bar | bool = True, terminator: Callable[[], None] = None, serial: bool = None, length: int = None,
**bar_kwargs: Any) -> list[Result]:
n_processes: int = None, **bar_kwargs: Any) -> list[Result]:
""" map a function fun to each iteration in iterable
use as a function: pmap
use as a decorator: parfor
@@ -567,6 +568,8 @@ def pmap(fun: Callable[[Iteration, Any, ...], Result], iterable: Iterable[Iterat
or a callback function taking the number of passed iterations as an argument
serial: execute in series instead of parallel if True, None (default): let pmap decide
length: deprecated alias for total
n_processes: number of processes to use,
the parallel pool will be restarted if the current pool does not have the right number of processes
**bar_kwargs: keyword arguments for tqdm.tqdm
output:
@@ -654,7 +657,7 @@ def pmap(fun: Callable[[Iteration, Any, ...], Result], iterable: Iterable[Iterat
bar = stack.enter_context(ExternalBar(callback=bar))
else:
bar = stack.enter_context(tqdm(**bar_kwargs))
with ParPool(chunk_fun, args, kwargs, bar) as p:
with ParPool(chunk_fun, args, kwargs, n_processes, bar) as p:
for i, (j, l) in enumerate(zip(iterable, iterable.lengths)): # add work to the queue
p(j, handle=i, barlength=iterable.lengths[i])
if bar.total is None or bar.total < i+1:

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "parfor"
version = "2024.4.0"
version = "2024.5.0"
description = "A package to mimic the use of parfor as done in Matlab."
authors = ["Wim Pomp <wimpomp@gmail.com>"]
license = "GPLv3"

View File

@@ -1,4 +1,9 @@
from __future__ import annotations
from dataclasses import dataclass
from os import getpid
from time import sleep
from typing import Any, Iterator, Optional, Sequence
import pytest
@@ -6,14 +11,14 @@ from parfor import Chunks, ParPool, parfor, pmap
class SequenceIterator:
def __init__(self, sequence):
def __init__(self, sequence: Sequence) -> None:
self._sequence = sequence
self._index = 0
def __iter__(self):
def __iter__(self) -> SequenceIterator:
return self
def __next__(self):
def __next__(self) -> Any:
if self._index < len(self._sequence):
item = self._sequence[self._index]
self._index += 1
@@ -21,19 +26,19 @@ class SequenceIterator:
else:
raise StopIteration
def __len__(self):
def __len__(self) -> int:
return len(self._sequence)
class Iterable:
def __init__(self, sequence):
def __init__(self, sequence: Sequence) -> None:
self.sequence = sequence
def __iter__(self):
def __iter__(self) -> SequenceIterator:
return SequenceIterator(self.sequence)
def iterators():
def iterators() -> tuple[Iterator, Optional[int]]:
yield range(10), None
yield list(range(10)), None
yield (i for i in range(10)), 10
@@ -42,23 +47,23 @@ def iterators():
@pytest.mark.parametrize('iterator', iterators())
def test_chunks(iterator):
def test_chunks(iterator: tuple[Iterator, Optional[int]]) -> None:
chunks = Chunks(iterator[0], size=2, length=iterator[1])
assert list(chunks) == [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
def test_parpool():
def fun(i, j, k): # noqa
def test_parpool() -> None:
def fun(i, j, k) -> int: # noqa
return i * j * k
with ParPool(fun, (3,), {'k': 2}) as pool:
with ParPool(fun, (3,), {'k': 2}) as pool: # noqa
for i in range(10):
pool[i] = i
assert [pool[i] for i in range(10)] == [0, 6, 12, 18, 24, 30, 36, 42, 48, 54]
def test_parfor():
def test_parfor() -> None:
@parfor(range(10), (3,), {'k': 2})
def fun(i, j, k):
return i * j * k
@@ -66,14 +71,14 @@ def test_parfor():
assert fun == [0, 6, 12, 18, 24, 30, 36, 42, 48, 54]
def test_pmap():
def test_pmap() -> None:
def fun(i, j, k):
return i * j * k
assert pmap(fun, range(10), (3,), {'k': 2}) == [0, 6, 12, 18, 24, 30, 36, 42, 48, 54]
def test_id_reuse():
def test_id_reuse() -> None:
def fun(i):
return i[0].a
@@ -87,5 +92,16 @@ def test_id_reuse():
yield t
del t
a = pmap(fun, Chunks(gen(1000), size=1, length=1000), total=1000)
a = pmap(fun, Chunks(gen(1000), size=1, length=1000), total=1000) # noqa
assert all([i == j for i, j in enumerate(a)])
@pytest.mark.parametrize('n_processes', (2, 4, 6))
def test_n_processes(n_processes) -> None:
@parfor(range(12), n_processes=n_processes)
def fun(i): # noqa
sleep(0.25)
return getpid()
assert len(set(fun)) == n_processes