- add allow_output option

- update readme
This commit is contained in:
Wim Pomp
2026-01-08 10:42:50 +01:00
parent 6341561380
commit 43a0cf68b5
3 changed files with 47 additions and 47 deletions

View File

@@ -4,36 +4,18 @@
Used to parallelize for-loops using parfor in Matlab? This package allows you to do the same in python. Used to parallelize for-loops using parfor in Matlab? This package allows you to do the same in python.
Take any normal serial but parallelizable for-loop and execute it in parallel using easy syntax. Take any normal serial but parallelizable for-loop and execute it in parallel using easy syntax.
Don't worry about the technical details of using the multiprocessing module, race conditions, queues, Don't worry about the technical details of using the multiprocessing module, race conditions, queues,
parfor handles all that. parfor handles all that. Now powered by [ray](https://pypi.org/project/ray/).
Tested on linux, Windows and OSX with python 3.10 and 3.12. Tested on linux, Windows and OSX with python 3.10 and 3.12.
## Why is parfor better than just using multiprocessing? ## Why is parfor better than just using multiprocessing?
- Easy to use - Easy to use
- Using dill instead of pickle: a lot more objects can be used when parallelizing
- Progress bars are built-in - Progress bars are built-in
- Automatically use multithreading instead of multiprocessing when the GIL is disabled - Retry the task in the main process upon failure for easy debugging
## How it works ## How it works
This depends on whether the GIL is currently disabled or not. Disabling the GIL in Python is currently an experimental [Ray](https://pypi.org/project/ray/) does all the heavy lifting. Parfor now is just a wrapper around ray, adding
feature in Python3.13, and not the standard. some ergonomics.
### Python with GIL enabled
The work you want parfor to do is divided over a number of processes. These processes are started by parfor and put
together in a pool. This pool is reused when you want parfor to do more work, or shut down when no new work arrives
within 10 minutes.
A handle to each bit of work is put in a queue from which the workers take work. The objects needed to do the work are
stored in a memory manager in serialized form (using dill) and the manager hands out an object to a worker when the
worker is requesting it. The manager deletes objects automatically when they're not needed anymore.
When the work is done the result is sent back for collection in the main process.
### Python with GIL disabled
The work you want parfor to do is given to a new thread. These threads are started by parfor and put together in a pool.
The threads and pool are not reused and closed automatically when done.
When the work is done a message is sent to the main thread to update the status of the pool.
## Installation ## Installation
`pip install parfor` `pip install parfor`
@@ -43,13 +25,7 @@ Parfor decorates a functions and returns the result of that function evaluated i
an iterator. an iterator.
## Requires ## Requires
tqdm, dill numpy, ray, tqdm
## Limitations
If you're using Python with the GIL enabaled, then objects passed to the pool need to be dillable (dill needs to
serialize them). Generators and SwigPyObjects are examples of objects that cannot be used. They can be used however, for
the iterator argument when using parfor, but its iterations need to be dillable. You might be able to make objects
dillable anyhow using `dill.register` or with `__reduce__`, `__getstate__`, etc.
## Arguments ## Arguments
To functions `parfor.parfor`, `parfor.pmap` and `parfor.gmap`. To functions `parfor.parfor`, `parfor.pmap` and `parfor.gmap`.
@@ -66,11 +42,11 @@ To functions `parfor.parfor`, `parfor.pmap` and `parfor.gmap`.
bar: bool enable progress bar, bar: bool enable progress bar,
or a callback function taking the number of passed iterations as an argument or a callback function taking the number of passed iterations as an argument
serial: execute in series instead of parallel if True, None (default): let pmap decide serial: execute in series instead of parallel if True, None (default): let pmap decide
length: deprecated alias for total
n_processes: number of processes to use, n_processes: number of processes to use,
the parallel pool will be restarted if the current pool does not have the right number of processes the parallel pool will be restarted if the current pool does not have the right number of processes
yield_ordered: return the result in the same order as the iterable yield_ordered: return the result in the same order as the iterable
yield_index: return the index of the result too yield_index: return the index of the result too
allow_output: allow output from subprocesses
**bar_kwargs: keyword arguments for tqdm.tqdm **bar_kwargs: keyword arguments for tqdm.tqdm
### Return ### Return
@@ -185,3 +161,6 @@ Split a long iterator in bite-sized chunks to parallelize
More low-level accessibility to parallel execution. Submit tasks and request the result at any time, More low-level accessibility to parallel execution. Submit tasks and request the result at any time,
(although to avoid breaking causality, submit first, then request), use different functions and function (although to avoid breaking causality, submit first, then request), use different functions and function
arguments for different tasks. arguments for different tasks.
## `SharedArray`
A numpy arrow that can be shared among processes.

View File

@@ -4,6 +4,7 @@ import logging
import os import os
import warnings import warnings
from contextlib import ExitStack, redirect_stdout, redirect_stderr from contextlib import ExitStack, redirect_stdout, redirect_stderr
from io import StringIO
from functools import wraps from functools import wraps
from importlib.metadata import version from importlib.metadata import version
from multiprocessing.shared_memory import SharedMemory from multiprocessing.shared_memory import SharedMemory
@@ -57,7 +58,7 @@ class SharedArray(np.ndarray):
shm = SharedMemory(create=True, size=int(np.prod(shape) * np.dtype(dtype).itemsize)) # type: ignore shm = SharedMemory(create=True, size=int(np.prod(shape) * np.dtype(dtype).itemsize)) # type: ignore
new = super().__new__(cls, shape, dtype, shm.buf, offset, strides, order) new = super().__new__(cls, shape, dtype, shm.buf, offset, strides, order)
new.shm = shm new.shm = shm
return new return new # type: ignore
def __reduce__( def __reduce__(
self, self,
@@ -193,17 +194,23 @@ class ExternalBar(Iterable):
@ray.remote @ray.remote
def worker(task): def worker(task):
try: try:
with ( with ExitStack() as stack: # noqa
warnings.catch_warnings(), if task.allow_output:
redirect_stdout(open(os.devnull, "w")), out = StringIO()
redirect_stderr(open(os.devnull, "w")), err = StringIO()
): stack.enter_context(redirect_stdout(out))
warnings.simplefilter("ignore", category=FutureWarning) stack.enter_context(redirect_stderr(err))
else:
stack.enter_context(redirect_stdout(open(os.devnull, "w")))
stack.enter_context(redirect_stderr(open(os.devnull, "w")))
try: try:
task() task()
task.status = ("done",) task.status = ("done",)
except Exception: # noqa except Exception: # noqa
task.status = "task_error", format_exc() task.status = "task_error", format_exc()
if task.allow_output:
task.out = out.getvalue()
task.err = err.getvalue()
except KeyboardInterrupt: # noqa except KeyboardInterrupt: # noqa
pass pass
@@ -217,6 +224,7 @@ class Task:
fun: Callable[[Any, ...], Any], fun: Callable[[Any, ...], Any],
args: tuple[Any, ...] = (), args: tuple[Any, ...] = (),
kwargs: dict[str, Any] = None, kwargs: dict[str, Any] = None,
allow_output: bool = False,
) -> None: ) -> None:
self.handle = handle self.handle = handle
self.fun = fun self.fun = fun
@@ -225,8 +233,11 @@ class Task:
self.name = fun.__name__ if hasattr(fun, "__name__") else None self.name = fun.__name__ if hasattr(fun, "__name__") else None
self.done = False self.done = False
self.result = None self.result = None
self.out = None
self.err = None
self.future = None self.future = None
self.status = "starting" self.status = "starting"
self.allow_output = allow_output
@property @property
def fun(self) -> Callable[[Any, ...], Any]: def fun(self) -> Callable[[Any, ...], Any]:
@@ -285,11 +296,13 @@ class ParPool:
kwargs: dict[str, Any] = None, kwargs: dict[str, Any] = None,
n_processes: int = None, n_processes: int = None,
bar: Bar = None, bar: Bar = None,
allow_output: bool = False,
): ):
self.handle = 0 self.handle = 0
self.tasks = {} self.tasks = {}
self.bar = bar self.bar = bar
self.bar_lengths = {} self.bar_lengths = {}
self.allow_output = allow_output
self.fun = fun self.fun = fun
self.args = args self.args = args
self.kwargs = kwargs self.kwargs = kwargs
@@ -318,6 +331,7 @@ class ParPool:
kwargs: dict[str, Any] = None, kwargs: dict[str, Any] = None,
handle: Hashable = None, handle: Hashable = None,
barlength: int = 1, barlength: int = 1,
allow_output: bool = False,
) -> Optional[int]: ) -> Optional[int]:
if handle is None: if handle is None:
new_handle = self.handle new_handle = self.handle
@@ -331,6 +345,7 @@ class ParPool:
fun or self.fun, fun or self.fun,
args or self.args, args or self.args,
kwargs or self.kwargs, kwargs or self.kwargs,
allow_output or self.allow_output,
) )
task.future = worker.remote(task) task.future = worker.remote(task)
self.tasks[new_handle] = task self.tasks[new_handle] = task
@@ -359,6 +374,10 @@ class ParPool:
def finalize_task(self, task: Task) -> Any: def finalize_task(self, task: Task) -> Any:
code, *args = task.status code, *args = task.status
if task.out:
print(task.out, end="")
if task.err:
print(task.err, end="")
getattr(self, code)(task, *args) getattr(self, code)(task, *args)
self.tasks.pop(task.handle) self.tasks.pop(task.handle)
return task.result return task.result
@@ -366,6 +385,7 @@ class ParPool:
def get_newest(self) -> Optional[Any]: def get_newest(self) -> Optional[Any]:
"""Request the newest handle and result and delete its record. Wait if result not yet available.""" """Request the newest handle and result and delete its record. Wait if result not yet available."""
while True: while True:
if self.tasks:
for handle, task in self.tasks.items(): for handle, task in self.tasks.items():
if handle in self.bar_lengths: if handle in self.bar_lengths:
try: try:
@@ -424,6 +444,7 @@ def gmap(
n_processes: int = None, n_processes: int = None,
yield_ordered: bool = True, yield_ordered: bool = True,
yield_index: bool = False, yield_index: bool = False,
allow_output: bool = False,
**bar_kwargs: Any, **bar_kwargs: Any,
) -> Generator[Any, None, None]: ) -> Generator[Any, None, None]:
"""map a function fun to each iteration in iterable """map a function fun to each iteration in iterable
@@ -442,11 +463,11 @@ def gmap(
bar: bool enable progress bar, bar: bool enable progress bar,
or a callback function taking the number of passed iterations as an argument or a callback function taking the number of passed iterations as an argument
serial: execute in series instead of parallel if True, None (default): let pmap decide serial: execute in series instead of parallel if True, None (default): let pmap decide
length: deprecated alias for total
n_processes: number of processes to use, n_processes: number of processes to use,
the parallel pool will be restarted if the current pool does not have the right number of processes the parallel pool will be restarted if the current pool does not have the right number of processes
yield_ordered: return the result in the same order as the iterable yield_ordered: return the result in the same order as the iterable
yield_index: return the index of the result too yield_index: return the index of the result too
allow_output: allow output from subprocesses
**bar_kwargs: keywords arguments for tqdm.tqdm **bar_kwargs: keywords arguments for tqdm.tqdm
output: output:
@@ -550,7 +571,7 @@ def gmap(
bar = stack.enter_context(ExternalBar(callback=bar)) # noqa bar = stack.enter_context(ExternalBar(callback=bar)) # noqa
else: else:
bar = stack.enter_context(tqdm(**bar_kwargs)) bar = stack.enter_context(tqdm(**bar_kwargs))
with ParPool(chunk_fun, args, kwargs, n_processes, bar) as p: # type: ignore with ParPool(chunk_fun, args, kwargs, n_processes, bar, allow_output) as p: # type: ignore
for i, (j, l) in enumerate(zip(iterable, iterable.lengths)): # add work to the queue for i, (j, l) in enumerate(zip(iterable, iterable.lengths)): # add work to the queue
p(j, handle=i, barlength=l) p(j, handle=i, barlength=l)
if bar.total is None or bar.total < i + 1: if bar.total is None or bar.total < i + 1:

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "parfor" name = "parfor"
version = "2026.1.0" version = "2026.1.1"
description = "A package to mimic the use of parfor as done in Matlab." description = "A package to mimic the use of parfor as done in Matlab."
authors = [ authors = [
{ name = "Wim Pomp-Pervova", email = "wimpomp@gmail.com" } { name = "Wim Pomp-Pervova", email = "wimpomp@gmail.com" }