From 32c5560306085ee4320d7b143c02c66e3a185808 Mon Sep 17 00:00:00 2001
From: Wim Pomp
Date: Tue, 3 May 2022 15:55:11 +0200
Subject: [PATCH] - rename some things to conform to PEP 8
 - pmap and parfor now automatically divide the task into chunks: less pickling is needed this way
 - pickle code in a new file: prevent some unpicklable things from getting pickled
 - Chunks now supports generators when the length keyword argument is supplied
 - TqdmMeter can have a description
---
 README.md          |   2 +-
 parfor/__init__.py | 285 +++++++++++++++++----------------------------
 parfor/pickler.py  | 103 ++++++++++++++++
 setup.py           |   4 +-
 4 files changed, 211 insertions(+), 183 deletions(-)
 create mode 100644 parfor/pickler.py

diff --git a/README.md b/README.md
index 6255434..e067ae0 100644
--- a/README.md
+++ b/README.md
@@ -162,5 +162,5 @@ More low-level accessibility to parallel execution. Submit tasks and request the
 (although necessarily submit first, then request a specific task), use different functions
 and function arguments for different tasks.
 
-## Tqdmm
+## TqdmMeter
 Meter bar, inherited from tqdm, used for displaying buffers.
diff --git a/parfor/__init__.py b/parfor/__init__.py
index 9f753cc..ec1fd45 100644
--- a/parfor/__init__.py
+++ b/parfor/__init__.py
@@ -1,161 +1,81 @@
 import multiprocessing
-import dill
 from os import getpid
 from tqdm.auto import tqdm
 from traceback import format_exc
-from pickle import PicklingError, dispatch_table
 from psutil import Process
 from collections import OrderedDict
-from io import BytesIO
 from warnings import warn
+from .pickler import Pickler, dumps, loads
+
 try:
     from javabridge import kill_vm
 except ImportError:
     kill_vm = lambda: None
-failed_rv = (lambda *args, **kwargs: None, ())
+
 cpu_count = int(multiprocessing.cpu_count())
 
 
-class Pickler(dill.Pickler):
-    """ Overload dill to ignore unpickleble parts of objects.
-        You probably didn't want to use these parts anyhow.
-        However, if you did, you'll have to find some way to make them pickleble.
-    """
-    def save(self, obj, save_persistent_id=True):
-        """ Copied from pickle and amended. 
""" - self.framer.commit_frame() - - # Check for persistent id (defined by a subclass) - pid = self.persistent_id(obj) - if pid is not None and save_persistent_id: - self.save_pers(pid) - return - - # Check the memo - x = self.memo.get(id(obj)) - if x is not None: - self.write(self.get(x[0])) - return - - rv = NotImplemented - reduce = getattr(self, "reducer_override", None) - if reduce is not None: - rv = reduce(obj) - - if rv is NotImplemented: - # Check the type dispatch table - t = type(obj) - f = self.dispatch.get(t) - if f is not None: - f(self, obj) # Call unbound method with explicit self - return - - # Check private dispatch table if any, or else - # copyreg.dispatch_table - reduce = getattr(self, 'dispatch_table', dispatch_table).get(t) - if reduce is not None: - rv = reduce(obj) - else: - # Check for a class with a custom metaclass; treat as regular - # class - if issubclass(t, type): - self.save_global(obj) - return - - # Check for a __reduce_ex__ method, fall back to __reduce__ - reduce = getattr(obj, "__reduce_ex__", None) - try: - if reduce is not None: - rv = reduce(self.proto) - else: - reduce = getattr(obj, "__reduce__", None) - if reduce is not None: - rv = reduce() - else: - raise PicklingError("Can't pickle %r object: %r" % - (t.__name__, obj)) - except Exception: - rv = failed_rv - - # Check for string returned by reduce(), meaning "save as global" - if isinstance(rv, str): - try: - self.save_global(obj, rv) - except Exception: - self.save_global(obj, failed_rv) - return - - # Assert that reduce() returned a tuple - if not isinstance(rv, tuple): - raise PicklingError("%s must return string or tuple" % reduce) - - # Assert that it returned an appropriately sized tuple - l = len(rv) - if not (2 <= l <= 6): - raise PicklingError("Tuple returned by %s must have " - "two to six elements" % reduce) - - # Save the reduce() output and finally memoize the object - try: - self.save_reduce(obj=obj, *rv) - except Exception: - self.save_reduce(obj=obj, *failed_rv) - - -def dumps(obj, protocol=None, byref=None, fmode=None, recurse=True, **kwds): - """pickle an object to a string""" - protocol = dill.settings['protocol'] if protocol is None else int(protocol) - _kwds = kwds.copy() - _kwds.update(dict(byref=byref, fmode=fmode, recurse=recurse)) - file = BytesIO() - Pickler(file, protocol, **_kwds).dump(obj) - return file.getvalue() - - -class chunks(object): +class Chunks: """ Yield successive chunks from lists. - Usage: chunks(s, list0, list1, ...) - chunks(list0, list1, ..., s=s) - chunks(list0, list1, ..., n=n) - chunks(list0, list1, ..., r=r) - s: size of chunks, might change to optimize devision between chunks - n: number of chunks, coerced to 1 <= n <= len(list0) - r: number of chunks / number of cpus, coerced to 1 <= n <= len(list0) - both s and n or r are given: use n or r, unless the chunk size would be bigger than s - both r and n are given: use n + Usage: chunks(list0, list1, ...) 
+               chunks(list0, list1, ..., size=s)
+               chunks(list0, list1, ..., number=n)
+               chunks(list0, list1, ..., ratio=r)
+        size: size of chunks, might change to optimize division between chunks
+        number: number of chunks, coerced to 1 <= n <= len(list0)
+        ratio: number of chunks / number of cpus, coerced to 1 <= n <= len(list0)
+        both size and number or ratio are given: use number or ratio, unless the chunk size would be bigger than size
+        both ratio and number are given: use ratio
     """
-    def __init__(self, *args, **kwargs):
-        if 's' in kwargs and ('n' in kwargs or 'r' in kwargs):
-            number_of_items = min(*[len(a) for a in args]) if len(args) > 1 else len(args[0])
-            n = kwargs['n'] if 'n' in kwargs else int(cpu_count * kwargs['r'])
-            n = n if number_of_items < kwargs['s'] * n else round(number_of_items / kwargs['s'])
-        elif 's' in kwargs:  # size of chunks
-            number_of_items = min(*[len(a) for a in args]) if len(args) > 1 else len(args[0])
-            n = round(number_of_items / kwargs['s'])
-        elif 'n' in kwargs or 'r' in kwargs:  # number of chunks
-            number_of_items = min(*[len(a) for a in args]) if len(args) > 1 else len(args[0])
-            n = kwargs['n'] if 'n' in kwargs else int(cpu_count * kwargs['r'])
-        else:  # size of chunks in 1st argument
-            s, *args = args
-            number_of_items = min(*[len(a) for a in args]) if len(args) > 1 else len(args[0])
-            n = round(number_of_items / s)
+    def __init__(self, *args, size=None, number=None, ratio=None, length=None, s=None, n=None, r=None):
+        # s, r and n are deprecated
+        if s is not None:
+            size = s
+        if n is not None:
+            number = n
+        if r is not None:
+            ratio = r
+        if length is None:
+            try:
+                length = min(*[len(a) for a in args]) if len(args) > 1 else len(args[0])
+            except TypeError:
+                raise TypeError('Cannot determine the length of the iterable, so the length keyword argument must'
+                                ' be provided.')
+        if size is not None and (number is not None or ratio is not None):
+            if number is None:
+                number = int(cpu_count * ratio)
+            if length >= size * number:
+                number = round(length / size)
+        elif size is not None:  # size of chunks
+            number = round(length / size)
+        elif ratio is not None:  # number of chunks
+            number = int(cpu_count * ratio)
         self.args = args
-        self.number_of_arguments = len(args) == 1
-        self.number_of_items = number_of_items
-        self.len = max(1, min(number_of_items, n))
-        self.lengths = [((i + 1) * self.number_of_items // self.len) - (i * self.number_of_items // self.len)
-                        for i in range(self.len)]
+        self.number_of_items = length
+        self.length = max(1, min(length, number))
+        self.lengths = [((i + 1) * self.number_of_items // self.length) - (i * self.number_of_items // self.length)
+                        for i in range(self.length)]
 
     def __iter__(self):
-        for i in range(self.len):
-            p, q = (i * self.number_of_items // self.len), ((i + 1) * self.number_of_items // self.len)
-            yield self.args[0][p:q] if self.number_of_arguments else [a[p:q] for a in self.args]
+        for i in range(self.length):
+            p, q = (i * self.number_of_items // self.length), ((i + 1) * self.number_of_items // self.length)
+            if len(self.args) == 1:
+                yield self._yielder(self.args[0], p, q)
+            else:
+                yield [self._yielder(arg, p, q) for arg in self.args]
+
+    @staticmethod
+    def _yielder(arg, p, q):
+        try:
+            return arg[p:q]
+        except TypeError:
+            return [next(arg) for _ in range(q - p)]
 
     def __len__(self):
-        return self.len
+        return self.length
 
 
 class ExternalBar:
@@ -191,18 +111,15 @@ class ExternalBar:
         self.callback(n)
 
 
-External_bar = ExternalBar
-
-
-class tqdmm(tqdm):
+class TqdmMeter(tqdm):
     """ Overload tqdm to make a special version of tqdm functioning as a meter.
     """
     def __init__(self, *args, **kwargs):
         self._n = 0
         self.disable = False
         if 'bar_format' not in kwargs and len(args) < 16:
-            kwargs['bar_format'] = '{n}/{total}'
-        super(tqdmm, self).__init__(*args, **kwargs)
+            kwargs['bar_format'] = '{desc}{bar}{n}/{total}'
+        super().__init__(*args, **kwargs)
 
     @property
     def n(self):
@@ -217,7 +134,7 @@
     def __exit__(self, exc_type=None, exc_value=None, traceback=None):
         if not self.leave:
             self.n = self.total
-        super(tqdmm, self).__exit__(exc_type, exc_value, traceback)
+        super().__exit__(exc_type, exc_value, traceback)
 
 
 def parfor(*args, **kwargs):
@@ -238,7 +155,7 @@
     rP:     ratio workers to cpu cores, default: 1
     nP:     number of workers, default: None, overrides rP if not None
             number of workers will always be at least 2
-    serial: switch to serial if number of tasks less than serial, default: 4
+    serial: execute in series instead of in parallel if True; None (default) lets parfor decide
 
     output: list with results from applying the decorated function to each iteration of the iterator
             specified as the first argument to the function
@@ -306,7 +223,7 @@
     return decfun
 
 
-class Hasher(object):
+class Hasher:
     def __init__(self, obj, hsh=None):
         if hsh is not None:
             self.obj, self.str, self.hash = None, obj, hsh
@@ -322,14 +239,14 @@
     def set_from_cache(self, cache=None):
         if cache is None:
-            self.obj = dill.loads(self.str)
+            self.obj = loads(self.str)
         elif self.hash in cache:
             self.obj = cache[self.hash]
         else:
-            self.obj = cache[self.hash] = dill.loads(self.str)
+            self.obj = cache[self.hash] = loads(self.str)
 
 
-class HashDescriptor(object):
+class HashDescriptor:
     def __set_name__(self, owner, name):
         self.owner, self.name = owner, '_' + name
@@ -343,25 +260,25 @@
         return getattr(instance, self.name).obj
 
 
-class deque_dict(OrderedDict):
+class DequeDict(OrderedDict):
     def __init__(self, maxlen=None, *args, **kwargs):
         self.maxlen = maxlen
-        super(deque_dict, self).__init__(*args, **kwargs)
+        super().__init__(*args, **kwargs)
 
     def __truncate__(self):
         while len(self) > self.maxlen:
             self.popitem(False)
 
     def __setitem__(self, *args, **kwargs):
-        super(deque_dict, self).__setitem__(*args, **kwargs)
+        super().__setitem__(*args, **kwargs)
         self.__truncate__()
 
     def update(self, *args, **kwargs):
-        super(deque_dict, self).update(*args, **kwargs)
+        super().update(*args, **kwargs)
         self.__truncate__()
 
 
-class Task(object):
+class Task:
     fun = HashDescriptor()
     args = HashDescriptor()
     kwargs = HashDescriptor()
@@ -373,7 +290,7 @@
         self.handle = handle
         self.n = n
         self.done = done
-        self.result = dill.loads(result) if self.done else None
+        self.result = loads(result) if self.done else None
         self.pid = None
 
     def __reduce__(self):
@@ -384,7 +301,7 @@
                 self.done)
 
     def set_from_cache(self, cache=None):
-        self.n = dill.loads(self.n)
+        self.n = loads(self.n)
         self._fun.set_from_cache(cache)
         self._args.set_from_cache(cache)
         self._kwargs.set_from_cache(cache)
@@ -402,7 +319,7 @@
         return 'Task {}'.format(self.handle)
 
 
-class parpool(object):
+class Parpool:
     """ Parallel processing with addition of iterations at any time and request of that result any time after that.
         The target function and its argument can be changed at any time.
     """
@@ -429,7 +346,7 @@
         self.event = ctx.Event()
         self.queue_in = ctx.Queue(qsize or 3 * self.nP)
         self.queue_out = ctx.Queue(qsize or 12 * self.nP)
-        self.pool = ctx.Pool(self.nP, self._worker(self.queue_in, self.queue_out, self.n_tasks, self.event, terminator))
+        self.pool = ctx.Pool(self.nP, self._Worker(self.queue_in, self.queue_out, self.n_tasks, self.event, terminator))
         self.is_alive = True
         self.handle = 0
         self.tasks = {}
@@ -592,10 +509,10 @@
             queue.close()
             queue.join_thread()
 
-    class _worker(object):
+    class _Worker(object):
         """ Manages executing the target function which will be executed in different processes. """
         def __init__(self, queue_in, queue_out, n_tasks, event, terminator, cachesize=48):
-            self.cache = deque_dict(cachesize)
+            self.cache = DequeDict(cachesize)
            self.queue_in = queue_in
            self.queue_out = queue_out
            self.n_tasks = n_tasks
@@ -627,7 +544,7 @@
                 except Exception:
                     self.add_to_queue('error', format_exc())
                     self.event.set()
-            terminator = dill.loads(self.terminator)
+            terminator = loads(self.terminator)
             kill_vm()
             if terminator is not None:
                 terminator()
@@ -636,7 +553,7 @@
 
 
 def pmap(fun, iterable=None, args=None, kwargs=None, length=None, desc=None, bar=True, qbar=False, terminator=None,
-         rP=1, nP=None, serial=4, qsize=None):
+         rP=1, nP=None, serial=None, qsize=None):
     """ map a function fun to each iteration in iterable
         best use: iterable is a generator and length is given to this function
@@ -651,34 +568,42 @@
     terminator: function which is executed in each worker after all the work is done
     rP:     ratio workers to cpu cores, default: 1
     nP:     number of workers, default, None, overrides rP if not None
-    serial: switch to serial if number of tasks less than serial, default: 4
+    serial: execute in series instead of in parallel if True; None (default) lets pmap decide
     """
+    is_chunked = isinstance(iterable, Chunks)
+    if is_chunked:
+        chunk_fun = fun
+    else:
+        iterable = Chunks(iterable, ratio=5, length=length)
+
+        def chunk_fun(iterator, *args, **kwargs):
+            return [fun(i, *args, **kwargs) for i in iterator]
+
     args = args or ()
     kwargs = kwargs or {}
-    try:
-        length = len(iterable)
-    except Exception:
-        pass
-    if length and length < serial:  # serial case
+
+    length = sum(iterable.lengths)
+    if serial is True or (serial is None and len(iterable) < min(cpu_count, 4)):  # serial case
         if callable(bar):
-            return [fun(c, *args, **kwargs) for c in ExternalBar(iterable, bar)]
+            return sum([chunk_fun(c, *args, **kwargs) for c in ExternalBar(iterable, bar)], [])
         else:
-            return [fun(c, *args, **kwargs) for c in tqdm(iterable, total=length, desc=desc, disable=not bar)]
+            return sum([chunk_fun(c, *args, **kwargs)
+                        for c in tqdm(iterable, total=len(iterable), desc=desc, disable=not bar)], [])
     else:  # parallel case
-        chunk = isinstance(iterable, chunks)
-        if chunk:
-            length = iterable.number_of_items
         with ExternalBar(callback=qbar) if callable(qbar) \
-                else tqdmm(total=0, desc='Task buffer', disable=not qbar, leave=False) as qbar, \
+                else TqdmMeter(total=0, desc='Task buffer', disable=not qbar, leave=False) as qbar, \
                 ExternalBar(callback=bar) if callable(bar) else tqdm(total=length, desc=desc, disable=not bar) as bar:
-            with parpool(fun, args, kwargs, rP, nP, bar, qbar, terminator, qsize) as p:
-                length = 0
-                for i, j in enumerate(iterable):  # add work to the queue
-                    if chunk:
-                        p(j, handle=i, barlength=iterable.lengths[i])
-                    else:
-                        p[i] = j
+            with Parpool(chunk_fun, args, kwargs, rP, nP, bar, qbar, terminator, qsize) as p:
+                for i, (j, l) in enumerate(zip(iterable, iterable.lengths)):  # add work to the queue
+                    p(j, handle=i, barlength=l)
                     if bar.total is None or bar.total < i+1:
                         bar.total = i+1
-                    length += 1
-            return [p[i] for i in range(length)]  # collect the results
+            if is_chunked:
+                return [p[i] for i in range(len(iterable))]
+            else:
+                return sum([p[i] for i in range(len(iterable))], [])  # collect the results
+
+
+# backwards compatibility
+parpool = Parpool
+tqdmm = TqdmMeter
+chunks = Chunks
diff --git a/parfor/pickler.py b/parfor/pickler.py
new file mode 100644
index 0000000..e80fdb4
--- /dev/null
+++ b/parfor/pickler.py
@@ -0,0 +1,103 @@
+import dill
+from pickle import PicklingError, dispatch_table
+from io import BytesIO
+
+
+failed_rv = (lambda *args, **kwargs: None, ())
+loads = dill.loads
+
+
+class Pickler(dill.Pickler):
+    """ Overload dill to ignore unpicklable parts of objects.
+        You probably didn't want to use these parts anyhow.
+        However, if you did, you'll have to find some way to make them picklable.
+    """
+    def save(self, obj, save_persistent_id=True):
+        """ Copied from pickle and amended. """
+        self.framer.commit_frame()
+
+        # Check for persistent id (defined by a subclass)
+        pid = self.persistent_id(obj)
+        if pid is not None and save_persistent_id:
+            self.save_pers(pid)
+            return
+
+        # Check the memo
+        x = self.memo.get(id(obj))
+        if x is not None:
+            self.write(self.get(x[0]))
+            return
+
+        rv = NotImplemented
+        reduce = getattr(self, "reducer_override", None)
+        if reduce is not None:
+            rv = reduce(obj)
+
+        if rv is NotImplemented:
+            # Check the type dispatch table
+            t = type(obj)
+            f = self.dispatch.get(t)
+            if f is not None:
+                f(self, obj)  # Call unbound method with explicit self
+                return
+
+            # Check private dispatch table if any, or else
+            # copyreg.dispatch_table
+            reduce = getattr(self, 'dispatch_table', dispatch_table).get(t)
+            if reduce is not None:
+                rv = reduce(obj)
+            else:
+                # Check for a class with a custom metaclass; treat as regular
+                # class
+                if issubclass(t, type):
+                    self.save_global(obj)
+                    return
+
+                # Check for a __reduce_ex__ method, fall back to __reduce__
+                reduce = getattr(obj, "__reduce_ex__", None)
+                try:
+                    if reduce is not None:
+                        rv = reduce(self.proto)
+                    else:
+                        reduce = getattr(obj, "__reduce__", None)
+                        if reduce is not None:
+                            rv = reduce()
+                        else:
+                            raise PicklingError("Can't pickle %r object: %r" %
+                                                (t.__name__, obj))
+                except Exception:
+                    rv = failed_rv
+
+        # Check for string returned by reduce(), meaning "save as global"
+        if isinstance(rv, str):
+            try:
+                self.save_global(obj, rv)
+            except Exception:
+                self.save_global(obj, failed_rv)
+            return
+
+        # Assert that reduce() returned a tuple
+        if not isinstance(rv, tuple):
+            raise PicklingError("%s must return string or tuple" % reduce)
+
+        # Assert that it returned an appropriately sized tuple
+        l = len(rv)
+        if not (2 <= l <= 6):
+            raise PicklingError("Tuple returned by %s must have "
+                                "two to six elements" % reduce)
+
+        # Save the reduce() output and finally memoize the object
+        try:
+            self.save_reduce(obj=obj, *rv)
+        except Exception:
+            self.save_reduce(obj=obj, *failed_rv)
+
+
+def dumps(obj, protocol=None, byref=None, fmode=None, recurse=True, **kwds):
+    """pickle an object to bytes"""
+    protocol = dill.settings['protocol'] if protocol is None else int(protocol)
+    _kwds = kwds.copy()
+    _kwds.update(dict(byref=byref, fmode=fmode, recurse=recurse))
+    file = BytesIO()
+    Pickler(file, 
protocol, **_kwds).dump(obj) + return file.getvalue() diff --git a/setup.py b/setup.py index e244632..14f26a4 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ with open("README.md", "r") as fh: setuptools.setup( name="parfor", - version="2022.3.1", + version="2022.5.0", author="Wim Pomp", author_email="wimpomp@gmail.com", description="A package to mimic the use of parfor as done in Matlab.", @@ -20,4 +20,4 @@ setuptools.setup( ], python_requires='>=3.5', install_requires=['tqdm>=4.50.0', 'dill>=0.3.0', 'psutil'], -) \ No newline at end of file +)
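
---

Usage sketches (illustrative, not part of the patch). First, the Chunks division rules from the docstring above; the lists and numbers are made up, and chunk sizes may shift because the division is balanced:

```python
from parfor import Chunks

# size may be adjusted so the chunks come out balanced: 10 items at size=3 -> 3 chunks
for chunk in Chunks(list(range(10)), size=3):
    print(chunk)  # [0, 1, 2], [3, 4, 5], [6, 7, 8, 9]

# new in this patch: a generator works too, provided length is given
squares = (i ** 2 for i in range(10))
print(list(Chunks(squares, number=2, length=10)))  # [[0, 1, 4, 9, 16], [25, 36, 49, 64, 81]]
```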
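The reworked pmap wraps any non-Chunks iterable in Chunks(iterable, ratio=5, length=length), so each worker unpickles the function once per chunk instead of once per iteration. A minimal sketch of the intended call; the function and generator here are illustrative:

```python
from parfor import pmap

def fun(i, a):
    # applied to each iteration; extra arguments are passed via args/kwargs
    return a * i ** 2

if __name__ == '__main__':  # pmap spawns worker processes
    # length is required for a generator, since pmap cannot call len() on it
    print(pmap(fun, (i for i in range(100)), args=(3,), length=100))
```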
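The parfor decorator applies the decorated function to every iteration of the iterator given as its first argument and binds the list of results to the function's name; sketched here with the new serial=True forcing in-series execution (arguments are illustrative):

```python
from parfor import parfor

@parfor(range(10), (3,), serial=True)  # serial=True now forces execution in series
def fun(i, a):
    return a * i ** 2

print(fun)  # the decorated name holds the list of results, one per iteration
```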
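Parpool's submit-first-then-request workflow from the README section renamed above, sketched under two assumptions: that the constructor parameters after the function keep usable defaults, and that the pool(task, handle=...) / pool[handle] interface used by pmap in this patch is public:

```python
from parfor import Parpool

def square(x):
    return x ** 2

if __name__ == '__main__':
    with Parpool(square) as pool:
        for i in range(10):
            pool(i, handle=i)                # submit first...
        print([pool[i] for i in range(10)])  # ...then request results by handle
```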
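TqdmMeter is a meter rather than a counter: n is a settable property showing a momentary value such as a buffer fill level, and with the new '{desc}{bar}{n}/{total}' bar_format its description is displayed too. A sketch, assuming the n setter refreshes the display; the values are illustrative:

```python
from time import sleep
from parfor import TqdmMeter

with TqdmMeter(total=24, desc='Task buffer') as meter:
    for fill in (3, 17, 9, 0):
        meter.n = fill  # set the momentary value directly instead of calling update()
        sleep(0.5)
```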
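Finally, what the relocated Pickler buys: an attribute that refuses to pickle no longer aborts the transfer, it is replaced by a stub that unpickles to None. A sketch with a deliberately unpicklable class; both classes are illustrative:

```python
from parfor.pickler import dumps, loads

class Stubborn:
    """ Stands in for anything that cannot be pickled. """
    def __reduce__(self):
        raise TypeError('not picklable')

class Payload:
    def __init__(self):
        self.value = 42           # picklable: survives the round trip
        self.handle = Stubborn()  # unpicklable: silently replaced

payload = loads(dumps(Payload()))
print(payload.value)   # 42
print(payload.handle)  # None: the unpicklable part was dropped instead of raising
```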