diff --git a/.gitignore b/.gitignore index 54a7f77..44ac3bc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ /build/ /dist/ /parfor.egg-info/ -.idea +/.idea/ +/.pytest_cache/ +._* \ No newline at end of file diff --git a/parfor/__init__.py b/parfor/__init__.py index 0fe72f0..d4c9bb1 100644 --- a/parfor/__init__.py +++ b/parfor/__init__.py @@ -5,6 +5,7 @@ from traceback import format_exc from psutil import Process from collections import OrderedDict from warnings import warn +from functools import wraps from .pickler import Pickler, dumps, loads @@ -23,26 +24,29 @@ class Chunks: chunks(list0, list1, ..., size=s) chunks(list0, list1, ..., number=n) chunks(list0, list1, ..., ratio=r) - size: size of chunks, might change to optimize devision between chunks + size: size of chunks, might change to optimize division between chunks number: number of chunks, coerced to 1 <= n <= len(list0) ratio: number of chunks / number of cpus, coerced to 1 <= n <= len(list0) both size and number or ratio are given: use number or ratio, unless the chunk size would be bigger than size both ratio and number are given: use ratio """ - def __init__(self, *args, size=None, number=None, ratio=None, length=None, s=None, n=None, r=None): + def __init__(self, *iterators, size=None, number=None, ratio=None, length=None, s=None, n=None, r=None): # s, r and n are deprecated if s is not None: + warn('parfor: use of \'s\' is deprecated, use \'size\' instead', DeprecationWarning, stacklevel=2) size = s if n is not None: + warn('parfor: use of \'n\' is deprecated, use \'number\' instead', DeprecationWarning, stacklevel=2) number = n if r is not None: + warn('parfor: use of \'r\' is deprecated, use \'ratio\' instead', DeprecationWarning, stacklevel=2) ratio = r if length is None: try: - length = min(*[len(a) for a in args]) if len(args) > 1 else len(args[0]) + length = min(*[len(iterator) for iterator in iterators]) if len(iterators) > 1 else len(iterators[0]) except TypeError: - raise TypeError('Cannot determine the length of the argument so the length must be provided as an' + raise TypeError('Cannot determine the length of the iterator(s), so the length must be provided as an' ' argument.') if size is not None and (number is not None or ratio is not None): if number is None: @@ -53,7 +57,7 @@ class Chunks: number = round(length / size) elif ratio is not None: # number of chunks number = int(cpu_count * ratio) - self.args = args + self.iterators = [iter(arg) for arg in iterators] self.number_of_items = length self.length = max(1, min(length, number)) self.lengths = [((i + 1) * self.number_of_items // self.length) - (i * self.number_of_items // self.length) @@ -62,17 +66,10 @@ class Chunks: def __iter__(self): for i in range(self.length): p, q = (i * self.number_of_items // self.length), ((i + 1) * self.number_of_items // self.length) - if len(self.args) == 1: - yield self._yielder(self.args[0], p, q) + if len(self.iterators) == 1: + yield [next(self.iterators[0]) for _ in range(q - p)] else: - yield [self._yielder(arg, p, q) for arg in self.args] - - @staticmethod - def _yielder(arg, p, q): - try: - return arg[p:q] - except TypeError: - return [next(arg) for _ in range(q-p)] + yield [[next(iterator) for _ in range(q-p)] for iterator in self.iterators] def __len__(self): return self.length @@ -148,92 +145,6 @@ class TqdmMeter(tqdm): super().__exit__(exc_type, exc_value, traceback) -def parfor(*args, **kwargs): - """ @parfor(iterator=None, args=(), kwargs={}, length=None, desc=None, bar=True, qbar=True, rP=1/3, serial=4): - decorator to parallize for-loops - - required arguments: - fun: function taking arguments: iteration from iterable, other arguments defined in args & kwargs - iterable: iterable from which an item is given to fun as a first argument - - optional arguments: - args: tuple with other unnamed arguments to fun - kwargs: dict with other named arguments to fun - length: give the length of the iterator in cases where len(iterator) results in an error - desc: string with description of the progress bar - bar: bool enable progress bar, or a function taking the number of passed iterations as an argument - pbar: bool enable buffer indicator bar, or a function taking the queue size as an argument - rP: ratio workers to cpu cores, default: 1 - nP: number of workers, default: None, overrides rP if not None - number of workers will always be at least 2 - serial: execute in series instead of parallel if True, None (default): let parfor decide - - output: list with results from applying the decorated function to each iteration of the iterator - specified as the first argument to the function - - examples: - << from time import sleep - - << - @parfor(range(10), (3,)) - def fun(i, a): - sleep(1) - return a*i**2 - fun - - >> [0, 3, 12, 27, 48, 75, 108, 147, 192, 243] - - << - @parfor(range(10), (3,), bar=False) - def fun(i, a): - sleep(1) - return a*i**2 - fun - - >> [0, 3, 12, 27, 48, 75, 108, 147, 192, 243] - - << - def fun(i, a): - sleep(1) - return a*i**2 - pmap(fun, range(10), (3,)) - - >> [0, 3, 12, 27, 48, 75, 108, 147, 192, 243] - - equivalent to using the deco module: - << - @concurrent - def fun(i, a): - time.sleep(1) - return a*i**2 - - @synchronized - def run(iterator, a): - res = [] - for i in iterator: - res.append(fun(i, a)) - return res - - run(range(10), 3) - - >> [0, 3, 12, 27, 48, 75, 108, 147, 192, 243] - - all equivalent to the serial for-loop: - << - a = 3 - fun = [] - for i in range(10): - sleep(1) - fun.append(a*i**2) - fun - - >> [0, 3, 12, 27, 48, 75, 108, 147, 192, 243] - """ - def decfun(fun): - return pmap(fun, *args, **kwargs) - return decfun - - class Hasher: def __init__(self, obj, hsh=None): if hsh is not None: @@ -388,7 +299,7 @@ class Parpool: def error(self, error): self.close() - raise Exception('Error occured in worker: {}'.format(error)) + raise Exception('Error occurred in worker: {}'.format(error)) def task_error(self, handle, error): if handle in self: @@ -565,46 +476,108 @@ class Parpool: self.n_tasks.value -= 1 -def pmap(fun, iterable=None, args=None, kwargs=None, length=None, desc=None, bar=True, qbar=False, terminator=None, - rP=1, nP=None, serial=None, qsize=None): +def pmap(fun, iterable=None, args=None, kwargs=None, total=None, desc=None, bar=True, qbar=False, terminator=None, + rP=1, nP=None, serial=None, qsize=None, length=None, **bar_kwargs): """ map a function fun to each iteration in iterable - best use: iterable is a generator and length is given to this function + use as a function: pmap + use as a decorator: parfor + best use: iterable is a generator and length is given to this function as 'total' - fun: function taking arguments: iteration from iterable, other arguments defined in args & kwargs - iterable: iterable from which an item is given to fun as a first argument - args: tuple with other unnamed arguments to fun - kwargs: dict with other named arguments to fun - length: give the length of the iterator in cases where len(iterator) results in an error - desc: string with description of the progress bar - bar: bool enable progress bar, or a callback function taking the number of passed iterations as an argument - pbar: bool enable buffer indicator bar, or a callback function taking the queue size as an argument - terminator: function which is executed in each worker after all the work is done - rP: ratio workers to cpu cores, default: 1 - nP: number of workers, default, None, overrides rP if not None - serial: execute in series instead of parallel if True, None (default): let pmap decide + required: + fun: function taking arguments: iteration from iterable, other arguments defined in args & kwargs + iterable: iterable or iterator from which an item is given to fun as a first argument + optional: + args: tuple with other unnamed arguments to fun + kwargs: dict with other named arguments to fun + total: give the length of the iterator in cases where len(iterator) results in an error + desc: string with description of the progress bar + bar: bool enable progress bar, + or a callback function taking the number of passed iterations as an argument + pbar: bool enable buffer indicator bar, or a callback function taking the queue size as an argument + terminator: function which is executed in each worker after all the work is done + rP: ratio workers to cpu cores, default: 1 + nP: number of workers, default, None, overrides rP if not None + serial: execute in series instead of parallel if True, None (default): let pmap decide + qsize: maximum size of the task queue + length: deprecated alias for total + **bar_kwargs: keywords arguments for tqdm.tqdm + + output: + list with results from applying the function \'fun\' to each iteration of the iterable / iterator + + examples: + << from time import sleep + << + @parfor(range(10), (3,)) + def fun(i, a): + sleep(1) + return a * i ** 2 + fun + >> [0, 3, 12, 27, 48, 75, 108, 147, 192, 243] + + << + def fun(i, a): + sleep(1) + return a * i ** 2 + pmap(fun, range(10), (3,)) + >> [0, 3, 12, 27, 48, 75, 108, 147, 192, 243] + + equivalent to using the deco module: + << + @concurrent + def fun(i, a): + time.sleep(1) + return a * i ** 2 + + @synchronized + def run(iterator, a): + res = [] + for i in iterator: + res.append(fun(i, a)) + return res + run(range(10), 3) + >> [0, 3, 12, 27, 48, 75, 108, 147, 192, 243] + + all equivalent to the serial for-loop: + << + a = 3 + fun = [] + for i in range(10): + sleep(1) + fun.append(a * i ** 2) + fun + >> [0, 3, 12, 27, 48, 75, 108, 147, 192, 243] """ + if total is None and length is not None: + total = length + warn('parfor: use of \'length\' is deprecated, use \'total\' instead', DeprecationWarning, stacklevel=2) is_chunked = isinstance(iterable, Chunks) if is_chunked: chunk_fun = fun else: - iterable = Chunks(iterable, ratio=5, length=length) + iterable = Chunks(iterable, ratio=5, length=total) + def chunk_fun(iterator, *args, **kwargs): return [fun(i, *args, **kwargs) for i in iterator] args = args or () kwargs = kwargs or {} - length = sum(iterable.lengths) + if 'total' not in bar_kwargs: + bar_kwargs['total'] = sum(iterable.lengths) + if 'desc' not in bar_kwargs: + bar_kwargs['desc'] = desc + if 'disable' not in bar_kwargs: + bar_kwargs['disable'] = not bar if serial is True or (serial is None and len(iterable) < min(cpu_count, 4)): # serial case if callable(bar): return sum([chunk_fun(c, *args, **kwargs) for c in ExternalBar(iterable, bar)], []) else: - return sum([chunk_fun(c, *args, **kwargs) - for c in tqdm(iterable, total=len(iterable), desc=desc, disable=not bar)], []) - else: # parallel case + return sum([chunk_fun(c, *args, **kwargs) for c in tqdm(iterable, **bar_kwargs)], []) + else: # parallel case with ExternalBar(callback=qbar) if callable(qbar) \ else TqdmMeter(total=0, desc='Task buffer', disable=not qbar, leave=False) as qbar, \ - ExternalBar(callback=bar) if callable(bar) else tqdm(total=length, desc=desc, disable=not bar) as bar: + ExternalBar(callback=bar) if callable(bar) else tqdm(**bar_kwargs) as bar: with Parpool(chunk_fun, args, kwargs, rP, nP, bar, qbar, terminator, qsize) as p: for i, (j, l) in enumerate(zip(iterable, iterable.lengths)): # add work to the queue p(j, handle=i, barlength=iterable.lengths[i]) @@ -616,7 +589,25 @@ def pmap(fun, iterable=None, args=None, kwargs=None, length=None, desc=None, bar return sum([p[i] for i in range(len(iterable))], []) # collect the results +@wraps(pmap) +def parfor(*args, **kwargs): + def decfun(fun): + return pmap(fun, *args, **kwargs) + return decfun + + +def deprecated(cls, name): + """ This is a decorator which can be used to mark functions and classes as deprecated. It will result in a warning + being emitted when the function or class is used.""" + @wraps(cls) + def wrapper(*args, **kwargs): + warn(f'parfor: use of \'{name}\' is deprecated, use \'{cls.__name__}\' instead', + category=DeprecationWarning, stacklevel=2) + return cls(*args, **kwargs) + return wrapper + + # backwards compatibility -parpool = Parpool -tqdmm = TqdmMeter -chunks = Chunks +parpool = deprecated(Parpool, 'parpool') +tqdmm = deprecated(TqdmMeter, 'tqdmm') +chunks = deprecated(Chunks, 'chunks') diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..89e6822 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,23 @@ +[tool.poetry] +name = "parfor" +version = "2023.8.0" +description = "A package to mimic the use of parfor as done in Matlab." +authors = ["Wim Pomp "] +license = "GPLv3" +readme = "README.md" +keywords = ["parfor", "concurrency", "multiprocessing", "parallel"] +repository = "https://gitlab.rhpc.nki.nl/LenstraLab/LiveCellAnalysis" + +[tool.poetry.dependencies] +python = "^3.5" +tqdm = ">=4.50.0" +dill = ">=0.3.0" +psutil = "*" +pytest = { version = "*", optional = true } + +[tool.poetry.extras] +test = ["pytest"] + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" diff --git a/setup.py b/setup.py deleted file mode 100644 index d4c8fa7..0000000 --- a/setup.py +++ /dev/null @@ -1,23 +0,0 @@ -import setuptools - -with open("README.md", "r") as fh: - long_description = fh.read() - -setuptools.setup( - name="parfor", - version="2022.6.0", - author="Wim Pomp", - author_email="wimpomp@gmail.com", - description="A package to mimic the use of parfor as done in Matlab.", - long_description=long_description, - long_description_content_type="text/markdown", - url="https://github.com/wimpomp/parfor", - packages=setuptools.find_packages(), - classifiers=[ - "Programming Language :: Python :: 3", - "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", - "Operating System :: OS Independent", - ], - python_requires='>=3.5', - install_requires=['tqdm>=4.50.0', 'dill>=0.3.0', 'psutil'], -) diff --git a/tests/test_parfor.py b/tests/test_parfor.py new file mode 100644 index 0000000..2c400e9 --- /dev/null +++ b/tests/test_parfor.py @@ -0,0 +1,70 @@ +import pytest +from parfor import Chunks, parfor, Parpool, pmap + + +class SequenceIterator: + def __init__(self, sequence): + self._sequence = sequence + self._index = 0 + + def __iter__(self): + return self + + def __next__(self): + if self._index < len(self._sequence): + item = self._sequence[self._index] + self._index += 1 + return item + else: + raise StopIteration + + def __len__(self): + return len(self._sequence) + + +class Iterable: + def __init__(self, sequence): + self.sequence = sequence + + def __iter__(self): + return SequenceIterator(self.sequence) + + +def iterators(): + yield range(10), None + yield list(range(10)), None + yield (i for i in range(10)), 10 + yield SequenceIterator(range(10)), None + yield Iterable(range(10)), 10 + + +@pytest.mark.parametrize('iterator', iterators()) +def test_chunks(iterator): + chunks = Chunks(iterator[0], size=2, length=iterator[1]) + assert list(chunks) == [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]] + + +def test_parpool(): + def fun(i, j, k): + return i * j * k + + with Parpool(fun, (3,), {'k': 2}) as pool: + for i in range(10): + pool[i] = i + + assert [pool[i] for i in range(10)] == [0, 6, 12, 18, 24, 30, 36, 42, 48, 54] + + +def test_parfor(): + @parfor(range(10), (3,), {'k': 2}) + def fun(i, j, k): + return i * j * k + + assert fun == [0, 6, 12, 18, 24, 30, 36, 42, 48, 54] + + +def test_pmap(): + def fun(i, j, k): + return i * j * k + + assert pmap(fun, range(10), (3,), {'k': 2}) == [0, 6, 12, 18, 24, 30, 36, 42, 48, 54]