멀티 프로세싱의 공유 메모리 개체
큰 메모리 numpy 배열을 가지고 있다고 가정 func
하면이 거대한 배열을 입력으로 사용 하는 함수 가 있습니다 (다른 매개 변수와 함께). func
다른 매개 변수를 사용하여 병렬로 실행할 수 있습니다. 예를 들면 다음과 같습니다.
def func(arr, param):
# do stuff to arr, param
# build array arr
pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]
다중 처리 라이브러리를 사용하면 거대한 배열이 여러 번 다른 프로세스로 복사됩니다.
다른 프로세스가 동일한 배열을 공유 할 수있는 방법이 있습니까? 이 배열 객체는 읽기 전용이며 수정되지 않습니다.
arr이 배열이 아니라 임의의 파이썬 객체 인 경우 더 복잡한 것은 무엇입니까?
[편집 됨]
나는 대답을 읽었지만 여전히 약간 혼란 스럽다. fork ()는 COW (Copy-On-Write)이므로 Python 다중 처리 라이브러리에서 새 프로세스를 생성 할 때 추가 비용을 발생시키지 않아야합니다. 그러나 다음 코드는 엄청난 오버 헤드가 있음을 나타냅니다.
from multiprocessing import Pool, Manager
import numpy as np;
import time
def f(arr):
return len(arr)
t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;
pool = Pool(processes = 6)
t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;
출력 (그리고 배열의 크기가 증가함에 따라 비용이 증가하므로 메모리 복사와 관련된 오버 헤드가 여전히 있다고 생각합니다) :
construct array = 0.0178790092468
multiprocessing overhead = 0.252444982529
배열을 복사하지 않으면 왜 그렇게 큰 오버 헤드가 발생합니까? 그리고 공유 메모리는 어떤 부분을 저장합니까?
쓰기시 복사 fork()
시맨틱 (공통 유닉스와 같은) 을 사용하는 운영 체제를 사용하는 경우 데이터 구조를 변경하지 않는 한 추가 메모리를 사용하지 않고 모든 하위 프로세스에서 사용할 수 있습니다. 특별한 작업을 수행하지 않아도됩니다 (객체를 변경하지 않는 것을 제외하고).
가장 효율적인 것은 당신이 당신의 문제를 위해 할 수있는가 (이용하여 효율적인 배열 구조로 배열을 포장하는 것 numpy
또는 array
), 공유 메모리에, 그것을 포장 그 장소 multiprocessing.Array
, 그리고 기능에 그것을 전달합니다. 이 답변은 그 방법을 보여줍니다 .
쓰기 가능한 공유 객체 를 원하는 경우 동기화 또는 잠금으로 래핑해야합니다. 이 작업을 수행하는 두 가지 방법을multiprocessing
제공합니다 . 하나는 공유 메모리 (단순 값, 배열 또는 ctype에 적합) 또는 프록시를 사용합니다. 여기서 하나의 프로세스는 메모리를 보유하고 관리자는 다른 프로세스 (네트워크를 통해서도)에서 액세스를 중재합니다.Manager
이 Manager
접근 방식은 임의의 Python 객체와 함께 사용할 수 있지만, 객체를 직렬화 / 직렬화하고 프로세스간에 전송해야하기 때문에 공유 메모리를 사용하는 것보다 속도가 느립니다.
파이썬 에는 다양한 병렬 처리 라이브러리와 접근 방식이 있습니다 . multiprocessing
우수하고 반올림 된 라이브러리이지만 특별한 요구가있는 경우 다른 접근법 중 하나가 더 좋을 수 있습니다.
나는 같은 문제에 부딪 히고 약간의 공유 메모리 유틸리티 클래스를 작성하여 해결했습니다.
multiprocessing.RawArray
(lockfree)를 사용 하고 있으며 어레이에 대한 액세스가 전혀 동기화되어 있지 않습니다 (lockfree). 자신의 발을 쏘지 않도록주의하십시오.
솔루션을 사용하면 쿼드 코어 i7에서 속도가 약 3 배 증가합니다.
코드는 다음과 같습니다. 사용하고 개선하고 버그를 다시보고하십시오.
'''
Created on 14.05.2013
@author: martin
'''
import multiprocessing
import ctypes
import numpy as np
class SharedNumpyMemManagerError(Exception):
pass
'''
Singleton Pattern
'''
class SharedNumpyMemManager:
_initSize = 1024
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(SharedNumpyMemManager, cls).__new__(
cls, *args, **kwargs)
return cls._instance
def __init__(self):
self.lock = multiprocessing.Lock()
self.cur = 0
self.cnt = 0
self.shared_arrays = [None] * SharedNumpyMemManager._initSize
def __createArray(self, dimensions, ctype=ctypes.c_double):
self.lock.acquire()
# double size if necessary
if (self.cnt >= len(self.shared_arrays)):
self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)
# next handle
self.__getNextFreeHdl()
# create array in shared memory segment
shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))
# convert to numpy array vie ctypeslib
self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)
# do a reshape for correct dimensions
# Returns a masked array containing the same data, but with a new shape.
# The result is a view on the original array
self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)
# update cnt
self.cnt += 1
self.lock.release()
# return handle to the shared memory numpy array
return self.cur
def __getNextFreeHdl(self):
orgCur = self.cur
while self.shared_arrays[self.cur] is not None:
self.cur = (self.cur + 1) % len(self.shared_arrays)
if orgCur == self.cur:
raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')
def __freeArray(self, hdl):
self.lock.acquire()
# set reference to None
if self.shared_arrays[hdl] is not None: # consider multiple calls to free
self.shared_arrays[hdl] = None
self.cnt -= 1
self.lock.release()
def __getArray(self, i):
return self.shared_arrays[i]
@staticmethod
def getInstance():
if not SharedNumpyMemManager._instance:
SharedNumpyMemManager._instance = SharedNumpyMemManager()
return SharedNumpyMemManager._instance
@staticmethod
def createArray(*args, **kwargs):
return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)
@staticmethod
def getArray(*args, **kwargs):
return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)
@staticmethod
def freeArray(*args, **kwargs):
return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)
# Init Singleton on module load
SharedNumpyMemManager.getInstance()
if __name__ == '__main__':
import timeit
N_PROC = 8
INNER_LOOP = 10000
N = 1000
def propagate(t):
i, shm_hdl, evidence = t
a = SharedNumpyMemManager.getArray(shm_hdl)
for j in range(INNER_LOOP):
a[i] = i
class Parallel_Dummy_PF:
def __init__(self, N):
self.N = N
self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)
self.pool = multiprocessing.Pool(processes=N_PROC)
def update_par(self, evidence):
self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))
def update_seq(self, evidence):
for i in range(self.N):
propagate((i, self.arrayHdl, evidence))
def getArray(self):
return SharedNumpyMemManager.getArray(self.arrayHdl)
def parallelExec():
pf = Parallel_Dummy_PF(N)
print(pf.getArray())
pf.update_par(5)
print(pf.getArray())
def sequentialExec():
pf = Parallel_Dummy_PF(N)
print(pf.getArray())
pf.update_seq(5)
print(pf.getArray())
t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")
print("Sequential: ", t1.timeit(number=1))
print("Parallel: ", t2.timeit(number=1))
이것은 병렬 및 분산 Python 용 라이브러리 인 Ray의 사용 사례입니다 . 후드 아래에서 Apache Arrow 데이터 레이아웃 (제로 복사 형식)을 사용하여 오브젝트를 직렬화 하고이를 공유 메모리 오브젝트 저장소에 저장 하므로 사본을 작성하지 않고 여러 프로세스에서 액세스 할 수 있습니다.
코드는 다음과 같습니다.
import numpy as np
import ray
ray.init()
@ray.remote
def func(array, param):
# Do stuff.
return 1
array = np.ones(10**6)
# Store the array in the shared memory object store once
# so it is not copied multiple times.
array_id = ray.put(array)
result_ids = [func.remote(array_id, i) for i in range(4)]
output = ray.get(result_ids)
호출하지 않으면 ray.put
배열은 여전히 공유 메모리에 저장되지만 호출마다 한 번씩 수행됩니다 func
. 원하는 것은 아닙니다.
Note that this will work not only for arrays but also for objects that contain arrays, e.g., dictionaries mapping ints to arrays as below.
You can compare the performance of serialization in Ray versus pickle by running the following in IPython.
import numpy as np
import pickle
import ray
ray.init()
x = {i: np.ones(10**7) for i in range(20)}
# Time Ray.
%time x_id = ray.put(x) # 2.4s
%time new_x = ray.get(x_id) # 0.00073s
# Time pickle.
%time serialized = pickle.dumps(x) # 2.6s
%time deserialized = pickle.loads(serialized) # 1.9s
Serialization with Ray is only slightly faster than pickle, but deserialization is 1000x faster because of the use of shared memory (this number will of course depend on the object).
See the Ray documentation. You can read more about fast serialization using Ray and Arrow. Note I'm one of the Ray developers.
Like Robert Nishihara mentioned, Apache Arrow makes this easy, specifically with the Plasma in-memory object store, which is what Ray is built on.
I made brain-plasma
specifically for this reason - fast loading and reloading of big objects in a Flask app. It's a shared-memory object namespace for Apache Arrow-serializable objects, including pickle
'd bytestrings generated by pickle.dumps(...).
The key difference with Apache Ray and Plasma is that it keeps track of object IDs for you. Any processes or threads or programs that are running on locally can share the variables' values by calling the name from any brain
object.
$ pip install brain-plasma
$ plasma_store -m 10000000 -s /tmp/plasma
from brain_plasma import Brain
brain = Brain(path='/tmp/plasma/)
brain['a'] = [1]*10000
brain['a']
# >>> [1,1,1,1,...]
참고URL : https://stackoverflow.com/questions/10721915/shared-memory-objects-in-multiprocessing
'Programing' 카테고리의 다른 글
C ++에서 함수의 실행 시간 측정 (0) | 2020.08.07 |
---|---|
cout은 동기화 / 스레드 안전합니까? (0) | 2020.08.06 |
멀티 모듈 메이븐 프로젝트에서 모듈 간 src / test 클래스 공유 (0) | 2020.08.06 |
JavaScript를위한 간단한 (비보안) 해시 함수? (0) | 2020.08.06 |
HtmlString과 MvcHtmlString (0) | 2020.08.06 |