- bugfix: some iterables caused an error

- pytest tests
- deprecation warnings
- use poetry for installs
This commit is contained in:
Wim Pomp
2023-08-08 15:05:31 +02:00
parent 7e0c0cc45d
commit 0635c62d42
5 changed files with 213 additions and 150 deletions

4
.gitignore vendored
View File

@@ -1,4 +1,6 @@
/build/
/dist/
/parfor.egg-info/
.idea
/.idea/
/.pytest_cache/
._*

View File

@@ -5,6 +5,7 @@ from traceback import format_exc
from psutil import Process
from collections import OrderedDict
from warnings import warn
from functools import wraps
from .pickler import Pickler, dumps, loads
@@ -23,26 +24,29 @@ class Chunks:
chunks(list0, list1, ..., size=s)
chunks(list0, list1, ..., number=n)
chunks(list0, list1, ..., ratio=r)
size: size of chunks, might change to optimize devision between chunks
size: size of chunks, might change to optimize division between chunks
number: number of chunks, coerced to 1 <= n <= len(list0)
ratio: number of chunks / number of cpus, coerced to 1 <= n <= len(list0)
both size and number or ratio are given: use number or ratio, unless the chunk size would be bigger than size
both ratio and number are given: use ratio
"""
def __init__(self, *args, size=None, number=None, ratio=None, length=None, s=None, n=None, r=None):
def __init__(self, *iterators, size=None, number=None, ratio=None, length=None, s=None, n=None, r=None):
# s, r and n are deprecated
if s is not None:
warn('parfor: use of \'s\' is deprecated, use \'size\' instead', DeprecationWarning, stacklevel=2)
size = s
if n is not None:
warn('parfor: use of \'n\' is deprecated, use \'number\' instead', DeprecationWarning, stacklevel=2)
number = n
if r is not None:
warn('parfor: use of \'r\' is deprecated, use \'ratio\' instead', DeprecationWarning, stacklevel=2)
ratio = r
if length is None:
try:
length = min(*[len(a) for a in args]) if len(args) > 1 else len(args[0])
length = min(*[len(iterator) for iterator in iterators]) if len(iterators) > 1 else len(iterators[0])
except TypeError:
raise TypeError('Cannot determine the length of the argument so the length must be provided as an'
raise TypeError('Cannot determine the length of the iterator(s), so the length must be provided as an'
' argument.')
if size is not None and (number is not None or ratio is not None):
if number is None:
@@ -53,7 +57,7 @@ class Chunks:
number = round(length / size)
elif ratio is not None: # number of chunks
number = int(cpu_count * ratio)
self.args = args
self.iterators = [iter(arg) for arg in iterators]
self.number_of_items = length
self.length = max(1, min(length, number))
self.lengths = [((i + 1) * self.number_of_items // self.length) - (i * self.number_of_items // self.length)
@@ -62,17 +66,10 @@ class Chunks:
def __iter__(self):
for i in range(self.length):
p, q = (i * self.number_of_items // self.length), ((i + 1) * self.number_of_items // self.length)
if len(self.args) == 1:
yield self._yielder(self.args[0], p, q)
if len(self.iterators) == 1:
yield [next(self.iterators[0]) for _ in range(q - p)]
else:
yield [self._yielder(arg, p, q) for arg in self.args]
@staticmethod
def _yielder(arg, p, q):
try:
return arg[p:q]
except TypeError:
return [next(arg) for _ in range(q-p)]
yield [[next(iterator) for _ in range(q-p)] for iterator in self.iterators]
def __len__(self):
return self.length
@@ -148,92 +145,6 @@ class TqdmMeter(tqdm):
super().__exit__(exc_type, exc_value, traceback)
def parfor(*args, **kwargs):
""" @parfor(iterator=None, args=(), kwargs={}, length=None, desc=None, bar=True, qbar=True, rP=1/3, serial=4):
decorator to parallelize for-loops
required arguments:
fun: function taking arguments: iteration from iterable, other arguments defined in args & kwargs
iterable: iterable from which an item is given to fun as a first argument
optional arguments:
args: tuple with other unnamed arguments to fun
kwargs: dict with other named arguments to fun
length: give the length of the iterator in cases where len(iterator) results in an error
desc: string with description of the progress bar
bar: bool enable progress bar, or a function taking the number of passed iterations as an argument
pbar: bool enable buffer indicator bar, or a function taking the queue size as an argument
rP: ratio workers to cpu cores, default: 1
nP: number of workers, default: None, overrides rP if not None
number of workers will always be at least 2
serial: execute in series instead of parallel if True, None (default): let parfor decide
output: list with results from applying the decorated function to each iteration of the iterator
specified as the first argument to the function
examples:
<< from time import sleep
<<
@parfor(range(10), (3,))
def fun(i, a):
sleep(1)
return a*i**2
fun
>> [0, 3, 12, 27, 48, 75, 108, 147, 192, 243]
<<
@parfor(range(10), (3,), bar=False)
def fun(i, a):
sleep(1)
return a*i**2
fun
>> [0, 3, 12, 27, 48, 75, 108, 147, 192, 243]
<<
def fun(i, a):
sleep(1)
return a*i**2
pmap(fun, range(10), (3,))
>> [0, 3, 12, 27, 48, 75, 108, 147, 192, 243]
equivalent to using the deco module:
<<
@concurrent
def fun(i, a):
time.sleep(1)
return a*i**2
@synchronized
def run(iterator, a):
res = []
for i in iterator:
res.append(fun(i, a))
return res
run(range(10), 3)
>> [0, 3, 12, 27, 48, 75, 108, 147, 192, 243]
all equivalent to the serial for-loop:
<<
a = 3
fun = []
for i in range(10):
sleep(1)
fun.append(a*i**2)
fun
>> [0, 3, 12, 27, 48, 75, 108, 147, 192, 243]
"""
def decfun(fun):
return pmap(fun, *args, **kwargs)
return decfun
class Hasher:
def __init__(self, obj, hsh=None):
if hsh is not None:
@@ -388,7 +299,7 @@ class Parpool:
def error(self, error):
self.close()
raise Exception('Error occured in worker: {}'.format(error))
raise Exception('Error occurred in worker: {}'.format(error))
def task_error(self, handle, error):
if handle in self:
@@ -565,46 +476,108 @@ class Parpool:
self.n_tasks.value -= 1
def pmap(fun, iterable=None, args=None, kwargs=None, length=None, desc=None, bar=True, qbar=False, terminator=None,
rP=1, nP=None, serial=None, qsize=None):
def pmap(fun, iterable=None, args=None, kwargs=None, total=None, desc=None, bar=True, qbar=False, terminator=None,
rP=1, nP=None, serial=None, qsize=None, length=None, **bar_kwargs):
""" map a function fun to each iteration in iterable
best use: iterable is a generator and length is given to this function
use as a function: pmap
use as a decorator: parfor
best use: iterable is a generator and length is given to this function as 'total'
required:
fun: function taking arguments: iteration from iterable, other arguments defined in args & kwargs
iterable: iterable from which an item is given to fun as a first argument
iterable: iterable or iterator from which an item is given to fun as a first argument
optional:
args: tuple with other unnamed arguments to fun
kwargs: dict with other named arguments to fun
length: give the length of the iterator in cases where len(iterator) results in an error
total: give the length of the iterator in cases where len(iterator) results in an error
desc: string with description of the progress bar
bar: bool enable progress bar, or a callback function taking the number of passed iterations as an argument
bar: bool enable progress bar,
or a callback function taking the number of passed iterations as an argument
qbar: bool enable buffer indicator bar, or a callback function taking the queue size as an argument
terminator: function which is executed in each worker after all the work is done
rP: ratio workers to cpu cores, default: 1
nP: number of workers, default: None, overrides rP if not None
serial: execute in series instead of parallel if True, None (default): let pmap decide
qsize: maximum size of the task queue
length: deprecated alias for total
**bar_kwargs: keyword arguments for tqdm.tqdm
output:
list with results from applying the function \'fun\' to each iteration of the iterable / iterator
examples:
<< from time import sleep
<<
@parfor(range(10), (3,))
def fun(i, a):
sleep(1)
return a * i ** 2
fun
>> [0, 3, 12, 27, 48, 75, 108, 147, 192, 243]
<<
def fun(i, a):
sleep(1)
return a * i ** 2
pmap(fun, range(10), (3,))
>> [0, 3, 12, 27, 48, 75, 108, 147, 192, 243]
equivalent to using the deco module:
<<
@concurrent
def fun(i, a):
time.sleep(1)
return a * i ** 2
@synchronized
def run(iterator, a):
res = []
for i in iterator:
res.append(fun(i, a))
return res
run(range(10), 3)
>> [0, 3, 12, 27, 48, 75, 108, 147, 192, 243]
all equivalent to the serial for-loop:
<<
a = 3
fun = []
for i in range(10):
sleep(1)
fun.append(a * i ** 2)
fun
>> [0, 3, 12, 27, 48, 75, 108, 147, 192, 243]
"""
if total is None and length is not None:
total = length
warn('parfor: use of \'length\' is deprecated, use \'total\' instead', DeprecationWarning, stacklevel=2)
is_chunked = isinstance(iterable, Chunks)
if is_chunked:
chunk_fun = fun
else:
iterable = Chunks(iterable, ratio=5, length=length)
iterable = Chunks(iterable, ratio=5, length=total)
def chunk_fun(iterator, *args, **kwargs):
return [fun(i, *args, **kwargs) for i in iterator]
args = args or ()
kwargs = kwargs or {}
length = sum(iterable.lengths)
if 'total' not in bar_kwargs:
bar_kwargs['total'] = sum(iterable.lengths)
if 'desc' not in bar_kwargs:
bar_kwargs['desc'] = desc
if 'disable' not in bar_kwargs:
bar_kwargs['disable'] = not bar
if serial is True or (serial is None and len(iterable) < min(cpu_count, 4)): # serial case
if callable(bar):
return sum([chunk_fun(c, *args, **kwargs) for c in ExternalBar(iterable, bar)], [])
else:
return sum([chunk_fun(c, *args, **kwargs)
for c in tqdm(iterable, total=len(iterable), desc=desc, disable=not bar)], [])
return sum([chunk_fun(c, *args, **kwargs) for c in tqdm(iterable, **bar_kwargs)], [])
else: # parallel case
with ExternalBar(callback=qbar) if callable(qbar) \
else TqdmMeter(total=0, desc='Task buffer', disable=not qbar, leave=False) as qbar, \
ExternalBar(callback=bar) if callable(bar) else tqdm(total=length, desc=desc, disable=not bar) as bar:
ExternalBar(callback=bar) if callable(bar) else tqdm(**bar_kwargs) as bar:
with Parpool(chunk_fun, args, kwargs, rP, nP, bar, qbar, terminator, qsize) as p:
for i, (j, l) in enumerate(zip(iterable, iterable.lengths)): # add work to the queue
p(j, handle=i, barlength=iterable.lengths[i])
@@ -616,7 +589,25 @@ def pmap(fun, iterable=None, args=None, kwargs=None, length=None, desc=None, bar
return sum([p[i] for i in range(len(iterable))], []) # collect the results
# Decorator flavour of pmap. No docstring is written here on purpose: wraps(pmap) copies the
# documentation (and name) of pmap onto this function, so the pmap docstring describes both.
@wraps(pmap)
def parfor(*args, **kwargs):
    # args and kwargs are everything pmap accepts after its first argument (the function)
    def decorator(fun):
        # the decorated name is bound to the list of results returned by pmap, not to a function
        return pmap(fun, *args, **kwargs)

    return decorator
def deprecated(cls, name):
    """ Mark a function or class as deprecated under an old alias.

        cls:  the function or class that should be used from now on
        name: the deprecated alias, only used to build the warning message
        returns: a function which emits a DeprecationWarning and then forwards all arguments to cls;
            note that the returned object is a plain function, even when cls is a class
    """
    @wraps(cls)
    def wrapper(*args, **kwargs):
        # str.format rather than an f-string: consistent with the other messages in this module and
        # still valid syntax on the oldest Python version declared as supported (3.5)
        warn('parfor: use of \'{}\' is deprecated, use \'{}\' instead'.format(name, cls.__name__),
             category=DeprecationWarning, stacklevel=2)  # stacklevel=2: attribute the warning to the caller
        return cls(*args, **kwargs)

    return wrapper
# backwards compatibility
parpool = Parpool
tqdmm = TqdmMeter
chunks = Chunks
parpool = deprecated(Parpool, 'parpool')
tqdmm = deprecated(TqdmMeter, 'tqdmm')
chunks = deprecated(Chunks, 'chunks')

23
pyproject.toml Normal file
View File

@@ -0,0 +1,23 @@
[tool.poetry]
name = "parfor"
version = "2023.8.0"
description = "A package to mimic the use of parfor as done in Matlab."
authors = ["Wim Pomp <wimpomp@gmail.com>"]
# NOTE(review): poetry recommends SPDX identifiers here (e.g. "GPL-3.0-only" or "GPL-3.0-or-later");
# confirm which of the two applies before changing it
license = "GPLv3"
readme = "README.md"
keywords = ["parfor", "concurrency", "multiprocessing", "parallel"]
# home of this package; same URL as was published through the 'url' field of the former setup.py
repository = "https://github.com/wimpomp/parfor"

[tool.poetry.dependencies]
python = "^3.5"
tqdm = ">=4.50.0"
dill = ">=0.3.0"
psutil = "*"
# only needed to run the test suite, install with: pip install parfor[test]
pytest = { version = "*", optional = true }

[tool.poetry.extras]
test = ["pytest"]

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

View File

@@ -1,23 +0,0 @@
import setuptools

# README.md doubles as the long description shown on the package index; read it explicitly as UTF-8
# so that building the package does not depend on the locale of the build machine
with open("README.md", "r", encoding="utf-8") as fh:
    long_description = fh.read()

setuptools.setup(
    name="parfor",
    version="2022.6.0",
    author="Wim Pomp",
    author_email="wimpomp@gmail.com",
    description="A package to mimic the use of parfor as done in Matlab.",
    long_description=long_description,
    long_description_content_type="text/markdown",
    url="https://github.com/wimpomp/parfor",
    packages=setuptools.find_packages(),
    classifiers=[
        "Programming Language :: Python :: 3",
        "License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
        "Operating System :: OS Independent",
    ],
    python_requires='>=3.5',
    install_requires=['tqdm>=4.50.0', 'dill>=0.3.0', 'psutil'],
)

70
tests/test_parfor.py Normal file
View File

@@ -0,0 +1,70 @@
import pytest
from parfor import Chunks, parfor, Parpool, pmap
class SequenceIterator:
    """ Iterator over an indexable sequence which, unlike most iterators, also supports len(). """

    def __init__(self, sequence):
        self._sequence = sequence
        self._index = 0  # position of the item which the next call to __next__ returns

    def __iter__(self):
        return self

    def __next__(self):
        # guard clause: nothing left once the index has run past the end of the sequence
        if self._index >= len(self._sequence):
            raise StopIteration
        item = self._sequence[self._index]
        self._index += 1
        return item

    def __len__(self):
        # length of the complete underlying sequence, independent of how many items were consumed
        return len(self._sequence)
class Iterable:
    """ Iterable without __len__ and without indexing: each iter() call returns a new SequenceIterator. """

    def __init__(self, sequence):
        self.sequence = sequence

    def __iter__(self):
        # a fresh iterator per call, so the object can be iterated more than once
        return SequenceIterator(self.sequence)
def iterators():
    """ Generate (iterable, length) test cases; length is None where len(iterable) works
        and 10 where the length has to be passed explicitly.
    """
    yield from (
        (range(10), None),
        (list(range(10)), None),
        ((i for i in range(10)), 10),
        (SequenceIterator(range(10)), None),
        (Iterable(range(10)), 10),
    )
@pytest.mark.parametrize('iterator', iterators())
def test_chunks(iterator):
    """ Ten items in chunks of size 2 give five consecutive pairs for every kind of iterable. """
    iterable, length = iterator
    expected = [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
    assert list(Chunks(iterable, size=2, length=length)) == expected
def test_parpool():
    """ Submit ten tasks to a Parpool by handle and read the results back by the same handles. """
    def fun(i, j, k):
        return i * j * k

    handles = range(10)
    with Parpool(fun, (3,), {'k': 2}) as pool:
        for handle in handles:
            pool[handle] = handle  # the handle is also used as the first argument of fun

        results = [pool[handle] for handle in handles]
        assert results == [0, 6, 12, 18, 24, 30, 36, 42, 48, 54]
def test_parfor():
    """ The parfor decorator replaces the decorated function by the list of its results. """
    @parfor(range(10), (3,), {'k': 2})
    def products(i, j, k):
        return i * j * k

    assert products == [0, 6, 12, 18, 24, 30, 36, 42, 48, 54]
def test_pmap():
    """ pmap applies the function to every item, passing on the extra positional and keyword arguments. """
    def multiply(i, j, k):
        return i * j * k

    results = pmap(multiply, range(10), (3,), {'k': 2})
    assert results == [0, 6, 12, 18, 24, 30, 36, 42, 48, 54]