- add SharedArray to share numpy arrays

- fix async call_later when no event loop is available
This commit is contained in:
Wim Pomp
2025-01-06 18:01:55 +01:00
parent eb92ce006d
commit 326a06f9da
5 changed files with 68 additions and 5 deletions

View File

@@ -10,7 +10,7 @@ from warnings import warn
from tqdm.auto import tqdm from tqdm.auto import tqdm
from . import gil, nogil from . import gil, nogil
from .common import Bar, cpu_count from .common import Bar, SharedArray, cpu_count
if hasattr(sys, '_is_gil_enabled') and not sys._is_gil_enabled(): # noqa if hasattr(sys, '_is_gil_enabled') and not sys._is_gil_enabled(): # noqa
from .nogil import ParPool, PoolSingleton, Task, Worker from .nogil import ParPool, PoolSingleton, Task, Worker

View File

@@ -1,10 +1,58 @@
from __future__ import annotations from __future__ import annotations
import os import os
from typing import Protocol from multiprocessing.shared_memory import SharedMemory
from typing import Any, Callable, Protocol, Sequence
import numpy as np
from numpy.typing import ArrayLike, DTypeLike
cpu_count = int(os.cpu_count()) cpu_count = int(os.cpu_count())
class Bar(Protocol): class Bar(Protocol):
def update(self, n: int = 1) -> None: ... def update(self, n: int = 1) -> None: ...
class SharedArray(np.ndarray):
""" Numpy array whose memory can be shared between processes, so that memory use is reduced and changes in one
process are reflected in all other processes. Changes are not atomic, so protect changes with a lock to prevent
race conditions!
"""
def __new__(cls, shape: int | Sequence[int], dtype: DTypeLike = float, shm: str | SharedMemory = None,
offset: int = 0, strides: tuple[int, int] = None, order: str = None) -> SharedArray:
if isinstance(shm, str):
shm = SharedMemory(shm)
elif shm is None:
shm = SharedMemory(create=True, size=np.prod(shape) * np.dtype(dtype).itemsize)
new = super().__new__(cls, shape, dtype, shm.buf, offset, strides, order)
new.shm = shm
return new
def __reduce__(self) -> tuple[Callable[[int | Sequence[int], DTypeLike, str], SharedArray],
tuple[int | tuple[int, ...], np.dtype, str]]:
return self.__class__, (self.shape, self.dtype, self.shm.name)
def __enter__(self) -> SharedArray:
return self
def __exit__(self, *args: Any, **kwargs: Any) -> None:
if hasattr(self, 'shm'):
self.shm.close()
self.shm.unlink()
def __del__(self) -> None:
if hasattr(self, 'shm'):
self.shm.close()
def __array_finalize__(self, obj: np.ndarray | None) -> None:
if isinstance(obj, np.ndarray) and not isinstance(obj, SharedArray):
raise TypeError('view casting to SharedArray is not implemented because right now we need to make a copy')
@classmethod
def from_array(cls, array: ArrayLike) -> SharedArray:
""" copy existing array into a SharedArray """
new = cls(array.shape, array.dtype)
new[:] = array[:]
return new

View File

@@ -319,7 +319,10 @@ class PoolSingleton:
if pool_id in self.pools: if pool_id in self.pools:
self.pools.pop(pool_id) self.pools.pop(pool_id)
if len(self.pools) == 0: if len(self.pools) == 0:
self.time_out = asyncio.get_event_loop().call_later(600, self.close) # noqa try:
self.time_out = asyncio.get_running_loop().call_later(600, self.close) # noqa
except RuntimeError:
self.time_out = asyncio.new_event_loop().call_later(600, self.close) # noqa
def error(self, error: Exception) -> NoReturn: def error(self, error: Exception) -> NoReturn:
self.close() self.close()

View File

@@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "parfor" name = "parfor"
version = "2024.12.1" version = "2025.1.0"
description = "A package to mimic the use of parfor as done in Matlab." description = "A package to mimic the use of parfor as done in Matlab."
authors = ["Wim Pomp <wimpomp@gmail.com>"] authors = ["Wim Pomp <wimpomp@gmail.com>"]
license = "GPLv3" license = "GPLv3"

View File

@@ -6,9 +6,10 @@ from os import getpid
from time import sleep from time import sleep
from typing import Any, Iterator, Optional, Sequence from typing import Any, Iterator, Optional, Sequence
import numpy as np
import pytest import pytest
from parfor import Chunks, ParPool, parfor, pmap from parfor import Chunks, ParPool, SharedArray, parfor, pmap
try: try:
if sys._is_gil_enabled(): # noqa if sys._is_gil_enabled(): # noqa
@@ -135,3 +136,14 @@ def test_n_processes(n_processes) -> None:
return getpid() return getpid()
assert len(set(fun)) == n_processes assert len(set(fun)) == n_processes
def test_shared_array() -> None:
def fun(i, a):
a[i] = i
with SharedArray(100, int) as arr:
pmap(fun, range(len(arr)), (arr,))
b = np.array(arr)
assert np.all(b == np.arange(len(arr)))