From 0b0c7a798f873902384d47abd4d62dfe16cf4458 Mon Sep 17 00:00:00 2001 From: Wim Pomp Date: Sat, 18 Nov 2023 13:38:01 +0100 Subject: [PATCH] - Shutdown workers after 10 minutes of inactivity. Restart pool when needed again. --- parfor/__init__.py | 14 +++++++++++--- pyproject.toml | 2 +- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/parfor/__init__.py b/parfor/__init__.py index bdb1dc8..f917aa5 100644 --- a/parfor/__init__.py +++ b/parfor/__init__.py @@ -3,6 +3,7 @@ from collections import UserDict from contextlib import ExitStack from functools import wraps from os import getpid +from time import time from traceback import format_exc from warnings import warn @@ -18,7 +19,7 @@ class SharedMemory(UserDict): super().__init__() self.data = manager.dict() # item_id: dilled representation of object self.references = manager.dict() # item_id: counter - self.references_lock = manager.RLock() + self.references_lock = manager.Lock() self.cache = {} # item_id: object self.trash_can = {} self.pool_ids = {} # item_id: {(pool_id, task_handle), ...} @@ -362,7 +363,12 @@ class ParPool: class PoolSingleton: + """ There can be only one pool at a time, but the pool can be restarted by calling close() and then constructing a + new pool. The pool will close itself after 10 minutes of idle time. """ def __new__(cls, *args, **kwargs): + if hasattr(cls, 'instance') and cls.instance is not None: # noqa restart if any workers have shut down + if cls.instance.n_workers.value < cls.instance.n_processes: + cls.instance.close() if not hasattr(cls, 'instance') or cls.instance is None or not cls.instance.is_alive: # noqa new = super().__new__(cls) new.n_processes = cpu_count @@ -493,7 +499,8 @@ class Worker: def __call__(self): pid = getpid() - while not self.event.is_set(): + last_active_time = time() + while not self.event.is_set() and time() - last_active_time < 600: try: task = self.queue_in.get(True, 0.02) try: @@ -502,6 +509,7 @@ class Worker: except Exception: # noqa self.add_to_queue('task_error', task.pool_id, task.handle, format_exc()) self.shared_memory.garbage_collect() + last_active_time = time() except (multiprocessing.queues.Empty, KeyboardInterrupt): # noqa pass except Exception: # noqa @@ -510,7 +518,7 @@ class Worker: self.shared_memory.garbage_collect() for child in multiprocessing.active_children(): child.kill() - with self.n_workers.get_lock(): + with self.n_workers: self.n_workers.value -= 1 diff --git a/pyproject.toml b/pyproject.toml index 86fa1e1..3e73831 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "parfor" -version = "2023.11.2" +version = "2023.11.3" description = "A package to mimic the use of parfor as done in Matlab." authors = ["Wim Pomp "] license = "GPLv3"