diff --git a/README.md b/README.md index 0eb699e..b556f9a 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ Parfor decorates a functions and returns the result of that function evaluated i an iterator. ## Requires -tqdm, dill, psutil +tqdm, dill ## Limitations Objects passed to the pool need to be dillable (dill needs to serialize them). Generators and SwigPyObjects are examples @@ -27,11 +27,6 @@ of objects that cannot be used. They can be used however, for the iterator argum iterations need to be dillable. You might be able to make objects dillable anyhow using `dill.register` or with `__reduce__`, `__getstate__`, etc. -The function evaluated in parallel needs to terminate. If parfor hangs after seeming to complete the task, it probably -is because the individual processes cannot terminate. Importing javabridge (used in python-bioformats) and starting the -java virtual machine can cause it to hang since the processes only terminate after the java vm has quit. In this case, -pass terminator=javabridge.kill_vm to parfor. - On OSX the buffer bar does not work due to limitations of the OS. ## Arguments @@ -47,7 +42,6 @@ On OSX the buffer bar does not work due to limitations of the OS. bar: bool enable progress bar, or a callback function taking the number of passed iterations as an argument pbar: bool enable buffer indicator bar, or a callback function taking the queue size as an argument - terminator: function which is executed in each worker after all the work is done rP: ratio workers to cpu cores, default: 1 nP: number of workers, default, None, overrides rP if not None serial: execute in series instead of parallel if True, None (default): let pmap decide diff --git a/parfor/__init__.py b/parfor/__init__.py index d4c9bb1..8185dc4 100644 --- a/parfor/__init__.py +++ b/parfor/__init__.py @@ -2,7 +2,6 @@ import multiprocessing from os import getpid from tqdm.auto import tqdm from traceback import format_exc -from psutil import Process from collections import OrderedDict from warnings import warn from functools import wraps @@ -35,12 +34,15 @@ class Chunks: # s, r and n are deprecated if s is not None: warn('parfor: use of \'s\' is deprecated, use \'size\' instead', DeprecationWarning, stacklevel=2) + warn('parfor: use of \'s\' is deprecated, use \'size\' instead', DeprecationWarning, stacklevel=3) size = s if n is not None: warn('parfor: use of \'n\' is deprecated, use \'number\' instead', DeprecationWarning, stacklevel=2) + warn('parfor: use of \'n\' is deprecated, use \'number\' instead', DeprecationWarning, stacklevel=3) number = n if r is not None: warn('parfor: use of \'r\' is deprecated, use \'ratio\' instead', DeprecationWarning, stacklevel=2) + warn('parfor: use of \'r\' is deprecated, use \'ratio\' instead', DeprecationWarning, stacklevel=3) ratio = r if length is None: try: @@ -241,12 +243,23 @@ class Task: return 'Task {}'.format(self.handle) +class Context(multiprocessing.context.SpawnContext): + """ Provide a context where child processes never are daemonic. """ + class Process(multiprocessing.context.SpawnProcess): + @property + def daemon(self): + return False + + @daemon.setter + def daemon(self, value): + pass + + class Parpool: """ Parallel processing with addition of iterations at any time and request of that result any time after that. The target function and its argument can be changed at any time. """ - def __init__(self, fun=None, args=None, kwargs=None, rP=None, nP=None, bar=None, qbar=None, terminator=None, - qsize=None): + def __init__(self, fun=None, args=None, kwargs=None, rP=None, nP=None, bar=None, qbar=None, qsize=None): """ fun, args, kwargs: target function and its arguments and keyword arguments rP: ratio workers to cpu cores, default: 1 nP: number of workers, default, None, overrides rP if not None @@ -259,16 +272,13 @@ class Parpool: self.nP = int(nP) self.nP = max(self.nP, 2) self.task = Task(fun, args, kwargs) - if hasattr(multiprocessing, 'get_context'): - ctx = multiprocessing.get_context('spawn') - else: - ctx = multiprocessing self.is_started = False + ctx = Context() self.n_tasks = ctx.Value('i', self.nP) self.event = ctx.Event() self.queue_in = ctx.Queue(qsize or 3 * self.nP) self.queue_out = ctx.Queue(qsize or 12 * self.nP) - self.pool = ctx.Pool(self.nP, self._Worker(self.queue_in, self.queue_out, self.n_tasks, self.event, terminator)) + self.pool = ctx.Pool(self.nP, self._Worker(self.queue_in, self.queue_out, self.n_tasks, self.event)) self.is_alive = True self.handle = 0 self.tasks = {} @@ -292,7 +302,7 @@ class Parpool: return True except multiprocessing.queues.Empty: for handle, task in self.tasks.items(): # retry a task if the process doing it was killed - if task.pid is not None and task.pid not in [child.pid for child in Process().children()]: + if task.pid is not None and task.pid not in [child.pid for child in multiprocessing.active_children()]: self.queue_in.put(task) warn('Task {} was restarted because process {} was probably killed.'.format(task.handle, task.pid)) return False @@ -435,13 +445,12 @@ class Parpool: class _Worker(object): """ Manages executing the target function which will be executed in different processes. """ - def __init__(self, queue_in, queue_out, n_tasks, event, terminator, cachesize=48): + def __init__(self, queue_in, queue_out, n_tasks, event, cachesize=48): self.cache = DequeDict(cachesize) self.queue_in = queue_in self.queue_out = queue_out self.n_tasks = n_tasks self.event = event - self.terminator = dumps(terminator, recurse=True) def add_to_queue(self, *args): while not self.event.is_set(): @@ -468,10 +477,8 @@ class Parpool: except Exception: self.add_to_queue('error', format_exc()) self.event.set() - terminator = loads(self.terminator) - kill_vm() - if terminator is not None: - terminator() + for child in multiprocessing.active_children(): + child.kill() with self.n_tasks.get_lock(): self.n_tasks.value -= 1 @@ -494,7 +501,6 @@ def pmap(fun, iterable=None, args=None, kwargs=None, total=None, desc=None, bar= bar: bool enable progress bar, or a callback function taking the number of passed iterations as an argument pbar: bool enable buffer indicator bar, or a callback function taking the queue size as an argument - terminator: function which is executed in each worker after all the work is done rP: ratio workers to cpu cores, default: 1 nP: number of workers, default, None, overrides rP if not None serial: execute in series instead of parallel if True, None (default): let pmap decide @@ -551,6 +557,12 @@ def pmap(fun, iterable=None, args=None, kwargs=None, total=None, desc=None, bar= if total is None and length is not None: total = length warn('parfor: use of \'length\' is deprecated, use \'total\' instead', DeprecationWarning, stacklevel=2) + warn('parfor: use of \'length\' is deprecated, use \'total\' instead', DeprecationWarning, stacklevel=3) + if terminator is not None: + warn('parfor: use of \'terminator\' is deprecated, workers are terminated automatically', + DeprecationWarning, stacklevel=2) + warn('parfor: use of \'terminator\' is deprecated, workers are terminated automatically', + DeprecationWarning, stacklevel=3) is_chunked = isinstance(iterable, Chunks) if is_chunked: chunk_fun = fun @@ -578,7 +590,7 @@ def pmap(fun, iterable=None, args=None, kwargs=None, total=None, desc=None, bar= with ExternalBar(callback=qbar) if callable(qbar) \ else TqdmMeter(total=0, desc='Task buffer', disable=not qbar, leave=False) as qbar, \ ExternalBar(callback=bar) if callable(bar) else tqdm(**bar_kwargs) as bar: - with Parpool(chunk_fun, args, kwargs, rP, nP, bar, qbar, terminator, qsize) as p: + with Parpool(chunk_fun, args, kwargs, rP, nP, bar, qbar, qsize) as p: for i, (j, l) in enumerate(zip(iterable, iterable.lengths)): # add work to the queue p(j, handle=i, barlength=iterable.lengths[i]) if bar.total is None or bar.total < i+1: @@ -603,6 +615,8 @@ def deprecated(cls, name): def wrapper(*args, **kwargs): warn(f'parfor: use of \'{name}\' is deprecated, use \'{cls.__name__}\' instead', category=DeprecationWarning, stacklevel=2) + warn(f'parfor: use of \'{name}\' is deprecated, use \'{cls.__name__}\' instead', + category=DeprecationWarning, stacklevel=3) return cls(*args, **kwargs) return wrapper diff --git a/pyproject.toml b/pyproject.toml index 8e2a602..a84350b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "parfor" -version = "2023.8.1" +version = "2023.8.2" description = "A package to mimic the use of parfor as done in Matlab." authors = ["Wim Pomp "] license = "GPLv3" @@ -12,7 +12,6 @@ repository = "https://github.com/wimpomp/parfor" python = "^3.5" tqdm = ">=4.50.0" dill = ">=0.3.0" -psutil = "*" pytest = { version = "*", optional = true } [tool.poetry.extras]