diff --git a/parfor/__init__.py b/parfor/__init__.py index 46add5e..fc7792b 100644 --- a/parfor/__init__.py +++ b/parfor/__init__.py @@ -10,7 +10,7 @@ from warnings import warn from tqdm.auto import tqdm from . import gil, nogil -from .common import Bar, cpu_count +from .common import Bar, SharedArray, cpu_count if hasattr(sys, '_is_gil_enabled') and not sys._is_gil_enabled(): # noqa from .nogil import ParPool, PoolSingleton, Task, Worker diff --git a/parfor/common.py b/parfor/common.py index b2a043f..b067eb2 100644 --- a/parfor/common.py +++ b/parfor/common.py @@ -1,10 +1,58 @@ from __future__ import annotations import os -from typing import Protocol +from multiprocessing.shared_memory import SharedMemory +from typing import Any, Callable, Protocol, Sequence + +import numpy as np +from numpy.typing import ArrayLike, DTypeLike cpu_count = int(os.cpu_count()) class Bar(Protocol): def update(self, n: int = 1) -> None: ... + + +class SharedArray(np.ndarray): + """ Numpy array whose memory can be shared between processes, so that memory use is reduced and changes in one + process are reflected in all other processes. Changes are not atomic, so protect changes with a lock to prevent + race conditions! + """ + + def __new__(cls, shape: int | Sequence[int], dtype: DTypeLike = float, shm: str | SharedMemory = None, + offset: int = 0, strides: tuple[int, int] = None, order: str = None) -> SharedArray: + if isinstance(shm, str): + shm = SharedMemory(shm) + elif shm is None: + shm = SharedMemory(create=True, size=np.prod(shape) * np.dtype(dtype).itemsize) + new = super().__new__(cls, shape, dtype, shm.buf, offset, strides, order) + new.shm = shm + return new + + def __reduce__(self) -> tuple[Callable[[int | Sequence[int], DTypeLike, str], SharedArray], + tuple[int | tuple[int, ...], np.dtype, str]]: + return self.__class__, (self.shape, self.dtype, self.shm.name) + + def __enter__(self) -> SharedArray: + return self + + def __exit__(self, *args: Any, **kwargs: Any) -> None: + if hasattr(self, 'shm'): + self.shm.close() + self.shm.unlink() + + def __del__(self) -> None: + if hasattr(self, 'shm'): + self.shm.close() + + def __array_finalize__(self, obj: np.ndarray | None) -> None: + if isinstance(obj, np.ndarray) and not isinstance(obj, SharedArray): + raise TypeError('view casting to SharedArray is not implemented because right now we need to make a copy') + + @classmethod + def from_array(cls, array: ArrayLike) -> SharedArray: + """ copy existing array into a SharedArray """ + new = cls(array.shape, array.dtype) + new[:] = array[:] + return new diff --git a/parfor/gil.py b/parfor/gil.py index e30a57d..d672391 100644 --- a/parfor/gil.py +++ b/parfor/gil.py @@ -319,7 +319,10 @@ class PoolSingleton: if pool_id in self.pools: self.pools.pop(pool_id) if len(self.pools) == 0: - self.time_out = asyncio.get_event_loop().call_later(600, self.close) # noqa + try: + self.time_out = asyncio.get_running_loop().call_later(600, self.close) # noqa + except RuntimeError: + self.time_out = asyncio.new_event_loop().call_later(600, self.close) # noqa def error(self, error: Exception) -> NoReturn: self.close() diff --git a/pyproject.toml b/pyproject.toml index 2f3c93d..5d1be80 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "parfor" -version = "2024.12.1" +version = "2025.1.0" description = "A package to mimic the use of parfor as done in Matlab." authors = ["Wim Pomp "] license = "GPLv3" @@ -15,7 +15,7 @@ dill = ">=0.3.0" pytest = { version = "*", optional = true } [tool.poetry.extras] -test = ["pytest"] +test = ["pytest", "numpy"] [tool.isort] line_length = 119 diff --git a/tests/test_parfor.py b/tests/test_parfor.py index 269a275..a0e9708 100644 --- a/tests/test_parfor.py +++ b/tests/test_parfor.py @@ -6,9 +6,10 @@ from os import getpid from time import sleep from typing import Any, Iterator, Optional, Sequence +import numpy as np import pytest -from parfor import Chunks, ParPool, parfor, pmap +from parfor import Chunks, ParPool, SharedArray, parfor, pmap try: if sys._is_gil_enabled(): # noqa @@ -135,3 +136,14 @@ def test_n_processes(n_processes) -> None: return getpid() assert len(set(fun)) == n_processes + + +def test_shared_array() -> None: + def fun(i, a): + a[i] = i + + with SharedArray(100, int) as arr: + pmap(fun, range(len(arr)), (arr,)) + b = np.array(arr) + + assert np.all(b == np.arange(len(arr)))