diff --git a/parfor/__init__.py b/parfor/__init__.py index 718ff78..d0c4e20 100644 --- a/parfor/__init__.py +++ b/parfor/__init__.py @@ -2,6 +2,7 @@ from __future__ import annotations import logging import os +from collections import OrderedDict from contextlib import ExitStack, redirect_stderr, redirect_stdout from functools import wraps from importlib.metadata import version @@ -231,6 +232,28 @@ def get_worker(n_processes) -> RemoteFunction: return ray.remote(worker) # type: ignore +class DequeDict(OrderedDict): + def __init__(self, maxlen: int = None, *args: Any, **kwargs: Any) -> None: + self.maxlen = maxlen + super().__init__(*args, **kwargs) + + def __setitem__(self, *args: Any, **kwargs: Any) -> None: + super().__setitem__(*args, **kwargs) + self.truncate() + + def truncate(self) -> None: + if self.maxlen is not None: + while len(self) > self.maxlen: + self.popitem(False) + + def update(self, *args: Any, **kwargs: Any) -> None: + super().update(*args, **kwargs) # type: ignore + self.truncate() + + +cache = DequeDict(128) + + class Task: def __init__( self, @@ -263,9 +286,20 @@ class Task: @staticmethod def put(item: Any) -> tuple[bool, Any]: try: - return False, ray.put(item) - except Exception: # noqa - return True, ray.put(dumps(item, recurse=True)) + h = hash(item) + if not h in cache: + try: + cache[h] = False, ray.put(item) + except Exception: # noqa + cache[h] = True, ray.put(dumps(item, recurse=True)) + else: + cache.move_to_end(h) + return cache[h] + except TypeError: + try: + return False, ray.put(item) + except Exception: # noqa + return True, ray.put(dumps(item, recurse=True)) @property def fun(self) -> Callable[[Any, ...], Any]: @@ -344,7 +378,7 @@ class ParPool: return self def __exit__(self, *args: Any, **kwargs: Any) -> None: - pass + self.close() def __call__(self, n: Any, handle: Hashable = None, barlength: int = 1) -> None: self.add_task( @@ -354,7 +388,7 @@ class ParPool: ) def close(self) -> None: - pass + cache.clear() def add_task( self, diff --git a/pyproject.toml b/pyproject.toml index 0c29524..91629c3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,11 +1,11 @@ [project] name = "parfor" -version = "2026.2.1" +version = "2026.2.2" description = "A package to mimic the use of parfor as done in Matlab." authors = [ { name = "Wim Pomp-Pervova", email = "wimpomp@gmail.com" } ] -license = { text = "MIT"} +license = { text = "MIT" } readme = "README.md" keywords = ["parfor", "concurrency", "multiprocessing", "parallel"] requires-python = ">=3.10"