- rename some things to conform to PEP 8
- pmap and parfor now automatically divide the task into chunks: less pickling is needed this way
- pickle code moved into a new file: prevents some unpicklable things from getting pickled
- Chunks now supports generators when the length keyword argument is supplied
- TqdmMeter can have a description
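For illustration, a minimal sketch of the new Chunks behaviour described above, assuming the class is exported at package level as the diffs below suggest; the chunk boundaries shown follow from the division logic in the diff.

    from parfor import Chunks  # assumed package-level export

    # A generator has no len(), so the new length keyword must be supplied.
    squares = (i ** 2 for i in range(10))
    for chunk in Chunks(squares, size=3, length=10):
        print(chunk)
    # -> [0, 1, 4], [9, 16, 25], [36, 49, 64, 81]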
Wim Pomp
2022-05-03 15:55:11 +02:00
parent 865ec70d97
commit 32c5560306
4 changed files with 211 additions and 183 deletions


@@ -162,5 +162,5 @@ More low-level accessibility to parallel execution. Submit tasks and request the
 (although necessarily submit first, then request a specific task), use different functions and function
 arguments for different tasks.
-## Tqdmm
+## TqdmMeter
 Meter bar, inherited from tqdm, used for displaying buffers.


@@ -1,161 +1,81 @@
 import multiprocessing
-import dill
 from os import getpid
 from tqdm.auto import tqdm
 from traceback import format_exc
-from pickle import PicklingError, dispatch_table
 from psutil import Process
 from collections import OrderedDict
-from io import BytesIO
 from warnings import warn
+from .pickler import Pickler, dumps, loads

 try:
     from javabridge import kill_vm
 except ImportError:
     kill_vm = lambda: None

-failed_rv = (lambda *args, **kwargs: None, ())
 cpu_count = int(multiprocessing.cpu_count())


-class Pickler(dill.Pickler):
-    """ Overload dill to ignore unpicklable parts of objects.
-        You probably didn't want to use these parts anyhow.
-        However, if you did, you'll have to find some way to make them picklable.
-    """
-    def save(self, obj, save_persistent_id=True):
-        """ Copied from pickle and amended. """
-        self.framer.commit_frame()
-        # Check for persistent id (defined by a subclass)
-        pid = self.persistent_id(obj)
-        if pid is not None and save_persistent_id:
-            self.save_pers(pid)
-            return
-        # Check the memo
-        x = self.memo.get(id(obj))
-        if x is not None:
-            self.write(self.get(x[0]))
-            return
-        rv = NotImplemented
-        reduce = getattr(self, "reducer_override", None)
-        if reduce is not None:
-            rv = reduce(obj)
-        if rv is NotImplemented:
-            # Check the type dispatch table
-            t = type(obj)
-            f = self.dispatch.get(t)
-            if f is not None:
-                f(self, obj)  # Call unbound method with explicit self
-                return
-            # Check private dispatch table if any, or else
-            # copyreg.dispatch_table
-            reduce = getattr(self, 'dispatch_table', dispatch_table).get(t)
-            if reduce is not None:
-                rv = reduce(obj)
-            else:
-                # Check for a class with a custom metaclass; treat as regular
-                # class
-                if issubclass(t, type):
-                    self.save_global(obj)
-                    return
-                # Check for a __reduce_ex__ method, fall back to __reduce__
-                reduce = getattr(obj, "__reduce_ex__", None)
-                try:
-                    if reduce is not None:
-                        rv = reduce(self.proto)
-                    else:
-                        reduce = getattr(obj, "__reduce__", None)
-                        if reduce is not None:
-                            rv = reduce()
-                        else:
-                            raise PicklingError("Can't pickle %r object: %r" %
-                                                (t.__name__, obj))
-                except Exception:
-                    rv = failed_rv
-        # Check for string returned by reduce(), meaning "save as global"
-        if isinstance(rv, str):
-            try:
-                self.save_global(obj, rv)
-            except Exception:
-                self.save_global(obj, failed_rv)
-            return
-        # Assert that reduce() returned a tuple
-        if not isinstance(rv, tuple):
-            raise PicklingError("%s must return string or tuple" % reduce)
-        # Assert that it returned an appropriately sized tuple
-        l = len(rv)
-        if not (2 <= l <= 6):
-            raise PicklingError("Tuple returned by %s must have "
-                                "two to six elements" % reduce)
-        # Save the reduce() output and finally memoize the object
-        try:
-            self.save_reduce(obj=obj, *rv)
-        except Exception:
-            self.save_reduce(obj=obj, *failed_rv)


-def dumps(obj, protocol=None, byref=None, fmode=None, recurse=True, **kwds):
-    """pickle an object to a string"""
-    protocol = dill.settings['protocol'] if protocol is None else int(protocol)
-    _kwds = kwds.copy()
-    _kwds.update(dict(byref=byref, fmode=fmode, recurse=recurse))
-    file = BytesIO()
-    Pickler(file, protocol, **_kwds).dump(obj)
-    return file.getvalue()


-class chunks(object):
+class Chunks:
     """ Yield successive chunks from lists.
-        Usage: chunks(s, list0, list1, ...)
-               chunks(list0, list1, ..., s=s)
-               chunks(list0, list1, ..., n=n)
-               chunks(list0, list1, ..., r=r)
-        s: size of chunks, might change to optimize division between chunks
-        n: number of chunks, coerced to 1 <= n <= len(list0)
-        r: number of chunks / number of cpus, coerced to 1 <= n <= len(list0)
-        both s and n or r are given: use n or r, unless the chunk size would be bigger than s
-        both r and n are given: use n
+        Usage: chunks(list0, list1, ...)
+               chunks(list0, list1, ..., size=s)
+               chunks(list0, list1, ..., number=n)
+               chunks(list0, list1, ..., ratio=r)
+        size: size of chunks, might change to optimize division between chunks
+        number: number of chunks, coerced to 1 <= n <= len(list0)
+        ratio: number of chunks / number of cpus, coerced to 1 <= n <= len(list0)
+        both size and number or ratio are given: use number or ratio, unless the chunk size would be bigger than size
+        both ratio and number are given: use ratio
     """
-    def __init__(self, *args, **kwargs):
-        if 's' in kwargs and ('n' in kwargs or 'r' in kwargs):
-            number_of_items = min(*[len(a) for a in args]) if len(args) > 1 else len(args[0])
-            n = kwargs['n'] if 'n' in kwargs else int(cpu_count * kwargs['r'])
-            n = n if number_of_items < kwargs['s'] * n else round(number_of_items / kwargs['s'])
-        elif 's' in kwargs:  # size of chunks
-            number_of_items = min(*[len(a) for a in args]) if len(args) > 1 else len(args[0])
-            n = round(number_of_items / kwargs['s'])
-        elif 'n' in kwargs or 'r' in kwargs:  # number of chunks
-            number_of_items = min(*[len(a) for a in args]) if len(args) > 1 else len(args[0])
-            n = kwargs['n'] if 'n' in kwargs else int(cpu_count * kwargs['r'])
-        else:  # size of chunks in 1st argument
-            s, *args = args
-            number_of_items = min(*[len(a) for a in args]) if len(args) > 1 else len(args[0])
-            n = round(number_of_items / s)
+    def __init__(self, *args, size=None, number=None, ratio=None, length=None, s=None, n=None, r=None):
+        # s, r and n are deprecated
+        if s is not None:
+            size = s
+        if n is not None:
+            number = n
+        if r is not None:
+            ratio = r
+        if length is None:
+            try:
+                length = min(*[len(a) for a in args]) if len(args) > 1 else len(args[0])
+            except TypeError:
+                raise TypeError('Cannot determine the length of the argument so the length must be provided as an'
+                                ' argument.')
+        if size is not None and (number is not None or ratio is not None):
+            if number is None:
+                number = int(cpu_count * ratio)
+            if length >= size * number:
+                number = round(length / size)
+        elif size is not None:  # size of chunks
+            number = round(length / size)
+        elif ratio is not None:  # number of chunks
+            number = int(cpu_count * ratio)
         self.args = args
-        self.number_of_arguments = len(args) == 1
-        self.number_of_items = number_of_items
-        self.len = max(1, min(number_of_items, n))
-        self.lengths = [((i + 1) * self.number_of_items // self.len) - (i * self.number_of_items // self.len)
-                        for i in range(self.len)]
+        self.number_of_items = length
+        self.length = max(1, min(length, number))
+        self.lengths = [((i + 1) * self.number_of_items // self.length) - (i * self.number_of_items // self.length)
+                        for i in range(self.length)]

     def __iter__(self):
-        for i in range(self.len):
-            p, q = (i * self.number_of_items // self.len), ((i + 1) * self.number_of_items // self.len)
-            yield self.args[0][p:q] if self.number_of_arguments else [a[p:q] for a in self.args]
+        for i in range(self.length):
+            p, q = (i * self.number_of_items // self.length), ((i + 1) * self.number_of_items // self.length)
+            if len(self.args) == 1:
+                yield self._yielder(self.args[0], p, q)
+            else:
+                yield [self._yielder(arg, p, q) for arg in self.args]
+
+    @staticmethod
+    def _yielder(arg, p, q):
+        try:
+            return arg[p:q]
+        except TypeError:
+            return [next(arg) for _ in range(q - p)]

     def __len__(self):
-        return self.len
+        return self.length


 class ExternalBar:
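To make the replacement concrete, a small sketch of how the new Chunks divides its arguments, assuming the package-level export; the printed values follow from the integer division in the class above.

    from parfor import Chunks  # assumed export

    a = list(range(10))
    b = list('abcdefghij')

    # One argument: each chunk is a slice of that argument.
    print(list(Chunks(a, number=3)))
    # -> [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]

    # Several arguments: each chunk is a list of parallel slices.
    print(list(Chunks(a, b, number=2))[0])
    # -> [[0, 1, 2, 3, 4], ['a', 'b', 'c', 'd', 'e']]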
@@ -191,18 +111,15 @@ class ExternalBar:
         self.callback(n)


-External_bar = ExternalBar


-class tqdmm(tqdm):
+class TqdmMeter(tqdm):
     """ Overload tqdm to make a special version of tqdm functioning as a meter. """
     def __init__(self, *args, **kwargs):
         self._n = 0
         self.disable = False
         if 'bar_format' not in kwargs and len(args) < 16:
-            kwargs['bar_format'] = '{n}/{total}'
-        super(tqdmm, self).__init__(*args, **kwargs)
+            kwargs['bar_format'] = '{desc}{bar}{n}/{total}'
+        super().__init__(*args, **kwargs)

     @property
     def n(self):
@@ -217,7 +134,7 @@ class tqdmm(tqdm):
     def __exit__(self, exc_type=None, exc_value=None, traceback=None):
         if not self.leave:
             self.n = self.total
-        super(tqdmm, self).__exit__(exc_type, exc_value, traceback)
+        super().__exit__(exc_type, exc_value, traceback)


 def parfor(*args, **kwargs):
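A short sketch of the renamed meter, assuming the n property has a setter (the meter is designed to be set directly rather than incremented); with the new bar_format a description now shows up in front of the count.

    from parfor import TqdmMeter  # assumed export

    with TqdmMeter(total=100, desc='Task buffer') as meter:
        meter.n = 42   # rendered via '{desc}{bar}{n}/{total}'
        meter.n = 17   # unlike a progress bar, a meter may also decrease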
@@ -238,7 +155,7 @@ def parfor(*args, **kwargs):
         rP: ratio workers to cpu cores, default: 1
         nP: number of workers, default: None, overrides rP if not None
             number of workers will always be at least 2
-        serial: switch to serial if number of tasks less than serial, default: 4
+        serial: execute in series instead of parallel if True, None (default): let parfor decide
         output: list with results from applying the decorated function to each iteration of the iterator
                 specified as the first argument to the function
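For context, the decorator pattern this docstring belongs to, sketched under the assumption that the readme's usage still holds; serial=True is the new explicit switch, and the decorated name is bound to the list of results.

    from parfor import parfor

    @parfor(range(10), (3,), serial=True)  # iterate range(10), pass a=3, force serial
    def fun(i, a):
        return a * i ** 2

    print(fun)  # -> [0, 3, 12, 27, 48, 75, 108, 147, 192, 243]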
@@ -306,7 +223,7 @@ def parfor(*args, **kwargs):
     return decfun


-class Hasher(object):
+class Hasher:
     def __init__(self, obj, hsh=None):
         if hsh is not None:
             self.obj, self.str, self.hash = None, obj, hsh
@@ -322,14 +239,14 @@ class Hasher(object):
     def set_from_cache(self, cache=None):
         if cache is None:
-            self.obj = dill.loads(self.str)
+            self.obj = loads(self.str)
         elif self.hash in cache:
             self.obj = cache[self.hash]
         else:
-            self.obj = cache[self.hash] = dill.loads(self.str)
+            self.obj = cache[self.hash] = loads(self.str)


-class HashDescriptor(object):
+class HashDescriptor:
     def __set_name__(self, owner, name):
         self.owner, self.name = owner, '_' + name
@@ -343,25 +260,25 @@ class HashDescriptor(object):
         return getattr(instance, self.name).obj


-class deque_dict(OrderedDict):
+class DequeDict(OrderedDict):
     def __init__(self, maxlen=None, *args, **kwargs):
         self.maxlen = maxlen
-        super(deque_dict, self).__init__(*args, **kwargs)
+        super().__init__(*args, **kwargs)

     def __truncate__(self):
         while len(self) > self.maxlen:
             self.popitem(False)

     def __setitem__(self, *args, **kwargs):
-        super(deque_dict, self).__setitem__(*args, **kwargs)
+        super().__setitem__(*args, **kwargs)
         self.__truncate__()

     def update(self, *args, **kwargs):
-        super(deque_dict, self).update(*args, **kwargs)
+        super().update(*args, **kwargs)
         self.__truncate__()


-class Task(object):
+class Task:
     fun = HashDescriptor()
     args = HashDescriptor()
     kwargs = HashDescriptor()
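The renamed DequeDict is easiest to understand from a small demonstration; this standalone sketch mirrors the class above and shows the FIFO eviction it performs when used as the workers' bounded cache.

    from collections import OrderedDict

    class DequeDict(OrderedDict):
        """ An OrderedDict that evicts its oldest items beyond maxlen. """
        def __init__(self, maxlen=None, *args, **kwargs):
            self.maxlen = maxlen
            super().__init__(*args, **kwargs)

        def __truncate__(self):
            while len(self) > self.maxlen:
                self.popitem(False)  # False: pop from the front, FIFO

        def __setitem__(self, *args, **kwargs):
            super().__setitem__(*args, **kwargs)
            self.__truncate__()

    d = DequeDict(2)
    d['a'] = 1
    d['b'] = 2
    d['c'] = 3  # 'a' is evicted
    print(list(d.items()))  # -> [('b', 2), ('c', 3)]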
@@ -373,7 +290,7 @@ class Task(object):
         self.handle = handle
         self.n = n
         self.done = done
-        self.result = dill.loads(result) if self.done else None
+        self.result = loads(result) if self.done else None
         self.pid = None

     def __reduce__(self):
@@ -384,7 +301,7 @@ class Task(object):
                 self.done)

     def set_from_cache(self, cache=None):
-        self.n = dill.loads(self.n)
+        self.n = loads(self.n)
         self._fun.set_from_cache(cache)
         self._args.set_from_cache(cache)
         self._kwargs.set_from_cache(cache)
@@ -402,7 +319,7 @@ class Task(object):
         return 'Task {}'.format(self.handle)


-class parpool(object):
+class Parpool:
     """ Parallel processing with addition of iterations at any time and request of that result any time after that.
         The target function and its argument can be changed at any time.
     """
@@ -429,7 +346,7 @@ class parpool(object):
         self.event = ctx.Event()
         self.queue_in = ctx.Queue(qsize or 3 * self.nP)
         self.queue_out = ctx.Queue(qsize or 12 * self.nP)
-        self.pool = ctx.Pool(self.nP, self._worker(self.queue_in, self.queue_out, self.n_tasks, self.event, terminator))
+        self.pool = ctx.Pool(self.nP, self._Worker(self.queue_in, self.queue_out, self.n_tasks, self.event, terminator))
         self.is_alive = True
         self.handle = 0
         self.tasks = {}
@@ -592,10 +509,10 @@ class parpool(object):
             queue.close()
             queue.join_thread()

-    class _worker(object):
+    class _Worker(object):
         """ Manages executing the target function which will be executed in different processes. """
         def __init__(self, queue_in, queue_out, n_tasks, event, terminator, cachesize=48):
-            self.cache = deque_dict(cachesize)
+            self.cache = DequeDict(cachesize)
             self.queue_in = queue_in
             self.queue_out = queue_out
             self.n_tasks = n_tasks
@@ -627,7 +544,7 @@ class parpool(object):
                 except Exception:
                     self.add_to_queue('error', format_exc())
                     self.event.set()
-            terminator = dill.loads(self.terminator)
+            terminator = loads(self.terminator)
             kill_vm()
             if terminator is not None:
                 terminator()
@@ -636,7 +553,7 @@ class parpool(object):
 def pmap(fun, iterable=None, args=None, kwargs=None, length=None, desc=None, bar=True, qbar=False, terminator=None,
-         rP=1, nP=None, serial=4, qsize=None):
+         rP=1, nP=None, serial=None, qsize=None):
     """ map a function fun to each iteration in iterable
         best use: iterable is a generator and length is given to this function
@@ -651,34 +568,42 @@ def pmap(fun, iterable=None, args=None, kwargs=None, length=None, desc=None, bar
         terminator: function which is executed in each worker after all the work is done
         rP: ratio workers to cpu cores, default: 1
         nP: number of workers, default, None, overrides rP if not None
-        serial: switch to serial if number of tasks less than serial, default: 4
+        serial: execute in series instead of parallel if True, None (default): let pmap decide
     """
+    is_chunked = isinstance(iterable, Chunks)
+    if is_chunked:
+        chunk_fun = fun
+    else:
+        iterable = Chunks(iterable, ratio=5, length=length)
+
+        def chunk_fun(iterator, *args, **kwargs):
+            return [fun(i, *args, **kwargs) for i in iterator]
+
     args = args or ()
     kwargs = kwargs or {}
-    try:
-        length = len(iterable)
-    except Exception:
-        pass
-    if length and length < serial:  # serial case
+    length = sum(iterable.lengths)
+    if serial is True or (serial is None and len(iterable) < min(cpu_count, 4)):  # serial case
         if callable(bar):
-            return [fun(c, *args, **kwargs) for c in ExternalBar(iterable, bar)]
+            return sum([chunk_fun(c, *args, **kwargs) for c in ExternalBar(iterable, bar)], [])
         else:
-            return [fun(c, *args, **kwargs) for c in tqdm(iterable, total=length, desc=desc, disable=not bar)]
+            return sum([chunk_fun(c, *args, **kwargs)
+                        for c in tqdm(iterable, total=len(iterable), desc=desc, disable=not bar)], [])
     else:  # parallel case
-        chunk = isinstance(iterable, chunks)
-        if chunk:
-            length = iterable.number_of_items
         with ExternalBar(callback=qbar) if callable(qbar) \
-                else tqdmm(total=0, desc='Task buffer', disable=not qbar, leave=False) as qbar, \
+                else TqdmMeter(total=0, desc='Task buffer', disable=not qbar, leave=False) as qbar, \
                 ExternalBar(callback=bar) if callable(bar) else tqdm(total=length, desc=desc, disable=not bar) as bar:
-            with parpool(fun, args, kwargs, rP, nP, bar, qbar, terminator, qsize) as p:
-                length = 0
-                for i, j in enumerate(iterable):  # add work to the queue
-                    if chunk:
-                        p(j, handle=i, barlength=iterable.lengths[i])
-                    else:
-                        p[i] = j
+            with Parpool(chunk_fun, args, kwargs, rP, nP, bar, qbar, terminator, qsize) as p:
+                for i, (j, l) in enumerate(zip(iterable, iterable.lengths)):  # add work to the queue
+                    p(j, handle=i, barlength=iterable.lengths[i])
                     if bar.total is None or bar.total < i+1:
                         bar.total = i+1
-                    length += 1
-                return [p[i] for i in range(length)]  # collect the results
+                if is_chunked:
+                    return [p[i] for i in range(len(iterable))]
+                else:
+                    return sum([p[i] for i in range(len(iterable))], [])  # collect the results
+
+
+# backwards compatibility
+parpool = Parpool
+tqdmm = TqdmMeter
+chunks = Chunks
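A sketch of calling the reworked pmap, assuming a top-level function (it must be picklable); the iterable is wrapped in Chunks(ratio=5) internally, so each worker pickles one chunk instead of one item, and the per-chunk result lists are flattened before they are returned.

    from parfor import pmap

    def fun(x, offset=0):
        return x ** 2 + offset

    if __name__ == '__main__':
        # a generator is fine as long as length is given
        gen = (i for i in range(100))
        res = pmap(fun, gen, kwargs={'offset': 1}, length=100, bar=False)
        print(res[:5])  # -> [1, 2, 5, 10, 17]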

parfor/pickler.py (new file, 103 lines)

@@ -0,0 +1,103 @@
import dill
from pickle import PicklingError, dispatch_table
from io import BytesIO

failed_rv = (lambda *args, **kwargs: None, ())
loads = dill.loads


class Pickler(dill.Pickler):
    """ Overload dill to ignore unpicklable parts of objects.
        You probably didn't want to use these parts anyhow.
        However, if you did, you'll have to find some way to make them picklable.
    """
    def save(self, obj, save_persistent_id=True):
        """ Copied from pickle and amended. """
        self.framer.commit_frame()

        # Check for persistent id (defined by a subclass)
        pid = self.persistent_id(obj)
        if pid is not None and save_persistent_id:
            self.save_pers(pid)
            return

        # Check the memo
        x = self.memo.get(id(obj))
        if x is not None:
            self.write(self.get(x[0]))
            return

        rv = NotImplemented
        reduce = getattr(self, "reducer_override", None)
        if reduce is not None:
            rv = reduce(obj)

        if rv is NotImplemented:
            # Check the type dispatch table
            t = type(obj)
            f = self.dispatch.get(t)
            if f is not None:
                f(self, obj)  # Call unbound method with explicit self
                return

            # Check private dispatch table if any, or else
            # copyreg.dispatch_table
            reduce = getattr(self, 'dispatch_table', dispatch_table).get(t)
            if reduce is not None:
                rv = reduce(obj)
            else:
                # Check for a class with a custom metaclass; treat as regular
                # class
                if issubclass(t, type):
                    self.save_global(obj)
                    return

                # Check for a __reduce_ex__ method, fall back to __reduce__
                reduce = getattr(obj, "__reduce_ex__", None)
                try:
                    if reduce is not None:
                        rv = reduce(self.proto)
                    else:
                        reduce = getattr(obj, "__reduce__", None)
                        if reduce is not None:
                            rv = reduce()
                        else:
                            raise PicklingError("Can't pickle %r object: %r" %
                                                (t.__name__, obj))
                except Exception:
                    rv = failed_rv

        # Check for string returned by reduce(), meaning "save as global"
        if isinstance(rv, str):
            try:
                self.save_global(obj, rv)
            except Exception:
                self.save_global(obj, failed_rv)
            return

        # Assert that reduce() returned a tuple
        if not isinstance(rv, tuple):
            raise PicklingError("%s must return string or tuple" % reduce)

        # Assert that it returned an appropriately sized tuple
        l = len(rv)
        if not (2 <= l <= 6):
            raise PicklingError("Tuple returned by %s must have "
                                "two to six elements" % reduce)

        # Save the reduce() output and finally memoize the object
        try:
            self.save_reduce(obj=obj, *rv)
        except Exception:
            self.save_reduce(obj=obj, *failed_rv)


def dumps(obj, protocol=None, byref=None, fmode=None, recurse=True, **kwds):
    """pickle an object to a string"""
    protocol = dill.settings['protocol'] if protocol is None else int(protocol)
    _kwds = kwds.copy()
    _kwds.update(dict(byref=byref, fmode=fmode, recurse=recurse))
    file = BytesIO()
    Pickler(file, protocol, **_kwds).dump(obj)
    return file.getvalue()
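What the new module buys you, as a hedged sketch: attributes that refuse to pickle are replaced by the failed_rv stub above, which unpickles to None, so the rest of the object survives the round trip. This assumes dill has no reducer of its own for the offending type; a multiprocessing lock is used here as a typical example.

    import multiprocessing
    from parfor.pickler import dumps, loads

    class Job:
        def __init__(self):
            self.data = [1, 2, 3]
            self.lock = multiprocessing.Lock()  # refuses to be pickled

    job = loads(dumps(Job()))
    print(job.data)  # -> [1, 2, 3]
    print(job.lock)  # -> None: the unpicklable part was dropped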


@@ -5,7 +5,7 @@ with open("README.md", "r") as fh:
 setuptools.setup(
     name="parfor",
-    version="2022.3.1",
+    version="2022.5.0",
     author="Wim Pomp",
     author_email="wimpomp@gmail.com",
     description="A package to mimic the use of parfor as done in Matlab.",