- differentiate use of nP into nP and rP

- default to using one process per cpu core
- update requirements with versions
This commit is contained in:
Wim Pomp
2021-01-04 12:25:19 +01:00
parent b979a0c72a
commit 0721e2125c
2 changed files with 20 additions and 16 deletions

View File

@@ -151,7 +151,7 @@ class tqdmm(tqdm):
super(tqdmm, self).__exit__(exc_type, exc_value, traceback) super(tqdmm, self).__exit__(exc_type, exc_value, traceback)
def parfor(*args, **kwargs): def parfor(*args, **kwargs):
""" @parfor(iterator=None, args=(), kwargs={}, length=None, desc=None, bar=True, qbar=True, np1/3 serial=4, debug=False): """ @parfor(iterator=None, args=(), kwargs={}, length=None, desc=None, bar=True, qbar=True, rP=1/3, serial=4, debug=False):
decorator to parallize for-loops decorator to parallize for-loops
required arguments: required arguments:
@@ -165,7 +165,8 @@ def parfor(*args, **kwargs):
desc: string with description of the progress bar desc: string with description of the progress bar
bar: bool enable progress bar bar: bool enable progress bar
pbar: bool enable buffer indicator bar pbar: bool enable buffer indicator bar
nP: number of workers, default: number of cpu's/3 rP: ratio workers to cpu cores, default: 1
nP: number of workers, default: None, overrides rP if not None
serial: switch to serial if number of tasks less than serial, default: 4 serial: switch to serial if number of tasks less than serial, default: 4
debug: if an error occurs in an iteration, return the erorr instead of retrying in the main process debug: if an error occurs in an iteration, return the erorr instead of retrying in the main process
@@ -238,14 +239,16 @@ class parpool(object):
""" Parallel processing with addition of iterations at any time and request of that result any time after that. """ Parallel processing with addition of iterations at any time and request of that result any time after that.
The target function and its argument can be changed at any time. The target function and its argument can be changed at any time.
""" """
def __init__(self, fun=None, args=None, kwargs=None, nP=0.33, bar=None, qbar=None, terminator=None, debug=False): def __init__(self, fun=None, args=None, kwargs=None, rP=None, nP=None, bar=None, qbar=None, terminator=None,
""" fun, args, kwargs: target function and its arguments and keyword arguments. debug=False):
nP: number of workers, or fraction of cpu cores to use, default: 0.33. """ fun, args, kwargs: target function and its arguments and keyword arguments
bar, qbar: instances of tqdm and tqdmm to use for monitoring buffer and progress. """ rP: ratio workers to cpu cores, default: 1
if nP is None: nP: number of workers, default, None, overrides rP if not None
self.nP = int(multiprocessing.cpu_count()/3) bar, qbar: instances of tqdm and tqdmm to use for monitoring buffer and progress """
elif nP<1: if rP is None and nP is None:
self.nP = int(nP*multiprocessing.cpu_count()) self.nP = int(multiprocessing.cpu_count())
elif nP is None:
self.nP = int(round(rP * multiprocessing.cpu_count()))
else: else:
self.nP = int(nP) self.nP = int(nP)
self.fun = fun or (lambda x: x) self.fun = fun or (lambda x: x)
@@ -267,7 +270,7 @@ class parpool(object):
self.bar = bar self.bar = bar
self.qbar = qbar self.qbar = qbar
if not self.qbar is None: if not self.qbar is None:
self.qbar.total = 3*nP self.qbar.total = 3*self.nP
@property @property
def fun(self): def fun(self):
@@ -445,7 +448,7 @@ class parpool(object):
return obj return obj
def pmap(fun, iterable=None, args=None, kwargs=None, length=None, desc=None, bar=True, qbar=False, terminator=None, def pmap(fun, iterable=None, args=None, kwargs=None, length=None, desc=None, bar=True, qbar=False, terminator=None,
nP=0.33, serial=4, debug=False): rP=1, nP=None, serial=4, debug=False):
""" map a function fun to each iteration in iterable """ map a function fun to each iteration in iterable
best use: iterable is a generator and length is given to this function best use: iterable is a generator and length is given to this function
@@ -458,7 +461,8 @@ def pmap(fun, iterable=None, args=None, kwargs=None, length=None, desc=None, bar
bar: bool enable progress bar bar: bool enable progress bar
pbar: bool enable buffer indicator bar pbar: bool enable buffer indicator bar
terminator: function which is executed in each worker after all the work is done terminator: function which is executed in each worker after all the work is done
nP: number of workers, default: number of cpu's/3 rP: ratio workers to cpu cores, default: 1
nP: number of workers, default, None, overrides rP if not None
serial: switch to serial if number of tasks less than serial, default: 4 serial: switch to serial if number of tasks less than serial, default: 4
debug: if an error occurs in an iteration, return the erorr instead of retrying in the main process debug: if an error occurs in an iteration, return the erorr instead of retrying in the main process
""" """
@@ -473,7 +477,7 @@ def pmap(fun, iterable=None, args=None, kwargs=None, length=None, desc=None, bar
else: #parallel case else: #parallel case
with tqdmm(total=0, desc='Task buffer', disable=not qbar, leave=False) as qbar,\ with tqdmm(total=0, desc='Task buffer', disable=not qbar, leave=False) as qbar,\
tqdm(total=length, desc=desc, disable=not bar) as bar: tqdm(total=length, desc=desc, disable=not bar) as bar:
with parpool(fun, args, kwargs, nP, bar, qbar, terminator, debug) as p: with parpool(fun, args, kwargs, rP, nP, bar, qbar, terminator, debug) as p:
length = 0 length = 0
for i, j in enumerate(iterable): #add work to the queue for i, j in enumerate(iterable): #add work to the queue
p[i] = j p[i] = j

View File

@@ -5,7 +5,7 @@ with open("README.md", "r") as fh:
setuptools.setup( setuptools.setup(
name="parfor", name="parfor",
version="2020.12.28", version="2021.1.4",
author="Wim Pomp", author="Wim Pomp",
author_email="wimpomp@gmail.com", author_email="wimpomp@gmail.com",
description="A package to mimic the use of parfor as done in Matlab.", description="A package to mimic the use of parfor as done in Matlab.",
@@ -20,5 +20,5 @@ setuptools.setup(
"Operating System :: OS Independent", "Operating System :: OS Independent",
], ],
python_requires='>=2.7', python_requires='>=2.7',
install_requires=['tqdm', 'dill'], install_requires=['tqdm>=4.50.0', 'dill>=0.3.0'],
) )