diff --git a/README.md b/README.md index d679271..07a4149 100644 --- a/README.md +++ b/README.md @@ -4,36 +4,18 @@ Used to parallelize for-loops using parfor in Matlab? This package allows you to do the same in python. Take any normal serial but parallelizable for-loop and execute it in parallel using easy syntax. Don't worry about the technical details of using the multiprocessing module, race conditions, queues, -parfor handles all that. +parfor handles all that. Now powered by [ray](https://pypi.org/project/ray/). Tested on linux, Windows and OSX with python 3.10 and 3.12. ## Why is parfor better than just using multiprocessing? - Easy to use -- Using dill instead of pickle: a lot more objects can be used when parallelizing - Progress bars are built-in -- Automatically use multithreading instead of multiprocessing when the GIL is disabled +- Retry the task in the main process upon failure for easy debugging ## How it works -This depends on whether the GIL is currently disabled or not. Disabling the GIL in Python is currently an experimental -feature in Python3.13, and not the standard. - -### Python with GIL enabled -The work you want parfor to do is divided over a number of processes. These processes are started by parfor and put -together in a pool. This pool is reused when you want parfor to do more work, or shut down when no new work arrives -within 10 minutes. - -A handle to each bit of work is put in a queue from which the workers take work. The objects needed to do the work are -stored in a memory manager in serialized form (using dill) and the manager hands out an object to a worker when the -worker is requesting it. The manager deletes objects automatically when they're not needed anymore. - -When the work is done the result is sent back for collection in the main process. - -### Python with GIL disabled -The work you want parfor to do is given to a new thread. These threads are started by parfor and put together in a pool. -The threads and pool are not reused and closed automatically when done. - -When the work is done a message is sent to the main thread to update the status of the pool. +[Ray](https://pypi.org/project/ray/) does all the heavy lifting. Parfor now is just a wrapper around ray, adding +some ergonomics. ## Installation `pip install parfor` @@ -43,13 +25,7 @@ Parfor decorates a functions and returns the result of that function evaluated i an iterator. ## Requires -tqdm, dill - -## Limitations -If you're using Python with the GIL enabaled, then objects passed to the pool need to be dillable (dill needs to -serialize them). Generators and SwigPyObjects are examples of objects that cannot be used. They can be used however, for -the iterator argument when using parfor, but its iterations need to be dillable. You might be able to make objects -dillable anyhow using `dill.register` or with `__reduce__`, `__getstate__`, etc. +numpy, ray, tqdm ## Arguments To functions `parfor.parfor`, `parfor.pmap` and `parfor.gmap`. @@ -66,11 +42,11 @@ To functions `parfor.parfor`, `parfor.pmap` and `parfor.gmap`. bar: bool enable progress bar, or a callback function taking the number of passed iterations as an argument serial: execute in series instead of parallel if True, None (default): let pmap decide - length: deprecated alias for total n_processes: number of processes to use, the parallel pool will be restarted if the current pool does not have the right number of processes yield_ordered: return the result in the same order as the iterable yield_index: return the index of the result too + allow_output: allow output from subprocesses **bar_kwargs: keyword arguments for tqdm.tqdm ### Return @@ -185,3 +161,6 @@ Split a long iterator in bite-sized chunks to parallelize More low-level accessibility to parallel execution. Submit tasks and request the result at any time, (although to avoid breaking causality, submit first, then request), use different functions and function arguments for different tasks. + +## `SharedArray` +A numpy arrow that can be shared among processes. diff --git a/parfor/__init__.py b/parfor/__init__.py index 5e64ddd..bca2ddb 100644 --- a/parfor/__init__.py +++ b/parfor/__init__.py @@ -4,6 +4,7 @@ import logging import os import warnings from contextlib import ExitStack, redirect_stdout, redirect_stderr +from io import StringIO from functools import wraps from importlib.metadata import version from multiprocessing.shared_memory import SharedMemory @@ -57,7 +58,7 @@ class SharedArray(np.ndarray): shm = SharedMemory(create=True, size=int(np.prod(shape) * np.dtype(dtype).itemsize)) # type: ignore new = super().__new__(cls, shape, dtype, shm.buf, offset, strides, order) new.shm = shm - return new + return new # type: ignore def __reduce__( self, @@ -193,17 +194,23 @@ class ExternalBar(Iterable): @ray.remote def worker(task): try: - with ( - warnings.catch_warnings(), - redirect_stdout(open(os.devnull, "w")), - redirect_stderr(open(os.devnull, "w")), - ): - warnings.simplefilter("ignore", category=FutureWarning) + with ExitStack() as stack: # noqa + if task.allow_output: + out = StringIO() + err = StringIO() + stack.enter_context(redirect_stdout(out)) + stack.enter_context(redirect_stderr(err)) + else: + stack.enter_context(redirect_stdout(open(os.devnull, "w"))) + stack.enter_context(redirect_stderr(open(os.devnull, "w"))) try: task() task.status = ("done",) except Exception: # noqa task.status = "task_error", format_exc() + if task.allow_output: + task.out = out.getvalue() + task.err = err.getvalue() except KeyboardInterrupt: # noqa pass @@ -217,6 +224,7 @@ class Task: fun: Callable[[Any, ...], Any], args: tuple[Any, ...] = (), kwargs: dict[str, Any] = None, + allow_output: bool = False, ) -> None: self.handle = handle self.fun = fun @@ -225,8 +233,11 @@ class Task: self.name = fun.__name__ if hasattr(fun, "__name__") else None self.done = False self.result = None + self.out = None + self.err = None self.future = None self.status = "starting" + self.allow_output = allow_output @property def fun(self) -> Callable[[Any, ...], Any]: @@ -285,11 +296,13 @@ class ParPool: kwargs: dict[str, Any] = None, n_processes: int = None, bar: Bar = None, + allow_output: bool = False, ): self.handle = 0 self.tasks = {} self.bar = bar self.bar_lengths = {} + self.allow_output = allow_output self.fun = fun self.args = args self.kwargs = kwargs @@ -318,6 +331,7 @@ class ParPool: kwargs: dict[str, Any] = None, handle: Hashable = None, barlength: int = 1, + allow_output: bool = False, ) -> Optional[int]: if handle is None: new_handle = self.handle @@ -331,6 +345,7 @@ class ParPool: fun or self.fun, args or self.args, kwargs or self.kwargs, + allow_output or self.allow_output, ) task.future = worker.remote(task) self.tasks[new_handle] = task @@ -359,6 +374,10 @@ class ParPool: def finalize_task(self, task: Task) -> Any: code, *args = task.status + if task.out: + print(task.out, end="") + if task.err: + print(task.err, end="") getattr(self, code)(task, *args) self.tasks.pop(task.handle) return task.result @@ -366,13 +385,14 @@ class ParPool: def get_newest(self) -> Optional[Any]: """Request the newest handle and result and delete its record. Wait if result not yet available.""" while True: - for handle, task in self.tasks.items(): - if handle in self.bar_lengths: - try: - task = ray.get(task.future, timeout=0.01) - return task.handle, self.finalize_task(task) - except ray.exceptions.GetTimeoutError: - pass + if self.tasks: + for handle, task in self.tasks.items(): + if handle in self.bar_lengths: + try: + task = ray.get(task.future, timeout=0.01) + return task.handle, self.finalize_task(task) + except ray.exceptions.GetTimeoutError: + pass def task_error(self, task: Task, error: Exception) -> None: if task.handle in self: @@ -424,6 +444,7 @@ def gmap( n_processes: int = None, yield_ordered: bool = True, yield_index: bool = False, + allow_output: bool = False, **bar_kwargs: Any, ) -> Generator[Any, None, None]: """map a function fun to each iteration in iterable @@ -442,11 +463,11 @@ def gmap( bar: bool enable progress bar, or a callback function taking the number of passed iterations as an argument serial: execute in series instead of parallel if True, None (default): let pmap decide - length: deprecated alias for total n_processes: number of processes to use, the parallel pool will be restarted if the current pool does not have the right number of processes yield_ordered: return the result in the same order as the iterable yield_index: return the index of the result too + allow_output: allow output from subprocesses **bar_kwargs: keywords arguments for tqdm.tqdm output: @@ -550,7 +571,7 @@ def gmap( bar = stack.enter_context(ExternalBar(callback=bar)) # noqa else: bar = stack.enter_context(tqdm(**bar_kwargs)) - with ParPool(chunk_fun, args, kwargs, n_processes, bar) as p: # type: ignore + with ParPool(chunk_fun, args, kwargs, n_processes, bar, allow_output) as p: # type: ignore for i, (j, l) in enumerate(zip(iterable, iterable.lengths)): # add work to the queue p(j, handle=i, barlength=l) if bar.total is None or bar.total < i + 1: diff --git a/pyproject.toml b/pyproject.toml index b0585f1..2fabce6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "parfor" -version = "2026.1.0" +version = "2026.1.1" description = "A package to mimic the use of parfor as done in Matlab." authors = [ { name = "Wim Pomp-Pervova", email = "wimpomp@gmail.com" }