본문 바로가기
Study/Python

Python multiprocessing.Pool 멀티프로세싱 2

by 개발새-발 2021. 6. 7.
반응형

Python에선 multiprocessing.Pool을 이용하여 멀티프로세싱을 할 수 있다. Process를 활용할 때는 우리가 직접 Process를 만들어서 그 Process위에서 작업을 돌렸다면, Pool은 지정된 개수만큼 프로세스를 미리 만들어 놓고, 그 프로세스들 위에서 작업을 돌리는 방식이다.

 

Pool 사용하기

from multiprocessing import Pool

if __name__ == '__main__':
    p = Pool(4)
    # do something here with Pool
    # blabla
    # blablabla
    p.close() # or p.terminate()
    p.join()

생성

처음 Pool을 생성할 때에 사용될 프로세스 수를 지정할 수 있다. 만약, 주어지지 않는다면 os.cpu_count() 값으로 지정된다.

 

사용 후 처리 close(), terminate()

생성한 Pool을 다 사용하였으면 적절한 위치에 close()join()을 호출해주는 것이 좋다. close()는 더 이상 Pool에 추가 작업이 들어가지 않는다는 것을 알려주며, 지금 수행 중인 작업이 모두 끝나면 Pool의 프로세스들을 종료한다. close() 대신terminate()를 사용하면, 현재 진행 중인 작업이 있더라도 즉시 Pool의 프로세스들을 종료한다. join()Pool의 모든 프로세스들의 종료가 완료되기를 기다린다.

 

with문과의 사용

with문과 Pool을 함께 사용할 수 있다. __enter__ Pool의 생성이, __exit__terminate()가 호출된다.

from multiprocessing import Pool

if __name__ == '__main__':
    with Pool(4) as p:
        # do something here with Pool
        # blabla
        # blablabla

 

apply()

Pool에게 작업 하나를 시킨다. 그리고 작업이 끝날 때까지 기다렸다가 결과를 받는다. 아래는 실행시간이 대략 1초 정도 되는 func을 1~5의 값을 인자로 주어 apply()를 사용하여 총 5번 실행하는 예제이다.

from multiprocessing import Pool
import multiprocessing as mp
import time
import os

def func(num):
    c_proc = mp.current_process()
    print("Running on Process",c_proc.name,"PID",c_proc.pid)
    time.sleep(1)
    print("Ended",num,"Process",c_proc.name)
    return num
        

if __name__ == '__main__':
    p = Pool(4)
    start = time.time()

    ret = p.apply(func,(1,))
    print(ret)
    ret = p.apply(func,(2,))
    print(ret)
    ret = p.apply(func,(3,))
    print(ret)
    ret = p.apply(func,(4,))
    print(ret)
    ret = p.apply(func,(5,))
    print(ret)

    delta_t = time.time()-start
    print("Time :",delta_t)
    
    p.close()
    p.join()
Running on Process SpawnPoolWorker-3 PID 18264
Ended 1 Process SpawnPoolWorker-3
1
Running on Process SpawnPoolWorker-1 PID 9620
Ended 2 Process SpawnPoolWorker-1
2
Running on Process SpawnPoolWorker-2 PID 20172
Ended 3 Process SpawnPoolWorker-2
3
Running on Process SpawnPoolWorker-4 PID 11964
Ended 4 Process SpawnPoolWorker-4
4
Running on Process SpawnPoolWorker-3 PID 18264
Ended 5 Process SpawnPoolWorker-3
5
Time : 5.0882182121276855

apply의 경우 작업 하나를 주고 그 작업이 끝날 때까지 다른 작업을 할 수 없다. 그래서 사실 위 예제의 func()Pool을 쓰지 않고 5번 사용한 것과 실행시간의 차이가 거의 없다.

 

apply_async()

Pool에게 작업 하나를 시키고, AsyncResult를 반환받는다. 반환받은 AsyncResult에서 get()을 호출하면 작업의 반환 값을 얻을 수 있다.

from multiprocessing import Pool
import multiprocessing as mp
import time
import os

def func(num):
    c_proc = mp.current_process()
    print("Running on Process",c_proc.name,"PID",c_proc.pid)
    time.sleep(1)
    print("Ended",num,"Process",c_proc.name)
    return num
        
if __name__ == '__main__':
    p = Pool(4)
    start = time.time()

    ret1 = p.apply_async(func,(1,))
    ret2 = p.apply_async(func,(2,))
    ret3 = p.apply_async(func,(3,))
    ret4 = p.apply_async(func,(4,))
    ret5 = p.apply_async(func,(5,))
    print(ret1.get(),ret2.get(),ret3.get(),ret4.get(),ret5.get())

    delta_t = time.time()-start
    print("Time :",delta_t)
    
    p.close()
    p.join()
Running on Process SpawnPoolWorker-1 PID 11492
Running on Process SpawnPoolWorker-2 PID 8736
Running on Process SpawnPoolWorker-4 PID 16228
Running on Process SpawnPoolWorker-3 PID 14132
Ended 2 Process SpawnPoolWorker-2
Ended 1 Process SpawnPoolWorker-1
Running on Process SpawnPoolWorker-2 PID 8736
Ended 4 Process SpawnPoolWorker-3
Ended 3 Process SpawnPoolWorker-4
Ended 5 Process SpawnPoolWorker-2
1 2 3 4 5
Time : 2.086998462677002

프로세스 4개를 사용하기로 설정이 되어있다. 그래서 처음 4개의 작업을 4개의 프로세스로 동시에 거의 1초 만에 처리하고, 이후 나머지 작업을 처리하는 데에 1초가 소요되어 총 소요시간은 2초가 됨을 볼 수 있다. 여기서 중요한 것은 AsyncResultget() 호출 시점이다. get()을 호출하여 결과물을 받기 위해선, 그 작업이 끝날 때까지 기다려야 한다. 그런데 만약 위에서 p.apply_async(~~) 부분을 아래와 같이 작성하였다고 해보자.

    ret1 = p.apply_async(func,(1,)).get()
    ret2 = p.apply_async(func,(2,)).get()
    ret3 = p.apply_async(func,(3,)).get()
    ret4 = p.apply_async(func,(4,)).get()
    ret5 = p.apply_async(func,(5,)).get()

이 경우에 두번째 줄은 첫 번째 줄의 작업이 끝나기 전까지 호출되지 않는다. 결국 이 경우도 apply()를 사용한 것과 마찬가지로 멀티프로세싱의 시간적 이점을 누릴 수 없게 된다.

 

apply는 그 작업이 완료되지 않으면 메인 프로세스에서 다음 줄의 코드를 실행하지 않는다. 반면 apply_asyncapply_async을 사용한 줄에서 작업이 다 끝나지 않아도 메인 프로세스의 다음 줄을 실행할 수 있다. 단, 반환받은 AsyncResultget()을 호출한다면 그 작업이 끝나기 전까지는 메인 프로세스에서도 다음 줄로 넘어갈 수가 없다. 이 둘의 차이는 map(), map_async()starmap(), starmap_async()에서도 마찬가지이다.

 

참고로, apply()apply_async().get()으로 구현되어 있다.

 

map() , map_async()

iterable에 대해 동일한 함수를 멀티프로세싱을 이용하여 처리하고자 할 때 사용한다. 단, 사용하고자 하는 함수는 단일 인자를 받아야 한다.

from multiprocessing import Pool
import multiprocessing as mp
import time
import os

def func(num):
    c_proc = mp.current_process()
    print("Running on Process",c_proc.name,"PID",c_proc.pid)
    time.sleep(1)
    print("Ended",num,"Process",c_proc.name)
    return num
        

if __name__ == '__main__':
    p = Pool(4)
    start = time.time()
    
    ret = p.map(func,[1,2,3,4,5])
    print(ret)

    delta_t = time.time() - start
    print("Time :",delta_t)
    
    p.close()
    p.join()
Running on Process SpawnPoolWorker-1 PID 6600
Running on Process SpawnPoolWorker-3 PID 20064
Running on Process SpawnPoolWorker-4 PID 17432
Running on Process SpawnPoolWorker-2 PID 11312
Ended 1 Process SpawnPoolWorker-1
Ended 3 Process SpawnPoolWorker-4
Ended 2 Process SpawnPoolWorker-3
Ended 4 Process SpawnPoolWorker-2
Running on Process SpawnPoolWorker-1 PID 6600
Ended 5 Process SpawnPoolWorker-1
[1, 2, 3, 4, 5]
Time : 2.0998990535736084

map_async()apply_async()와 동일하게 AsyncResult를 반환받는다. map은 작업이 끝나기 이전에 메인 프로세스의 다음 줄의 코드들을 실행할 수 없지만, map_async()AsyncResultget()을 호출하기 이전까지는 작업이 완전히 끝나지 않아도 메인프로세스의 다음 코드들을 실행할 수 있다.

from multiprocessing import Pool
import multiprocessing as mp
import time
import os

def func(num):
    c_proc = mp.current_process()
    print("Running on Process",c_proc.name,"PID",c_proc.pid)
    time.sleep(1)
    print("Ended",num,"Process",c_proc.name)
    return num
        

if __name__ == '__main__':
    p = Pool(4)
    start = time.time()
    
    ret = p.map_async(func,[1,2,3,4,5])
    print("is 'ret' ready? :",ret.ready())
    print(ret.get())

    delta_t = time.time() - start
    print("Time :",delta_t)
    
    p.close()
    p.join()
is 'ret' ready? : False
Running on Process SpawnPoolWorker-1 PID 13200
Running on Process SpawnPoolWorker-2 PID 2276
Running on Process SpawnPoolWorker-3 PID 13828
Running on Process SpawnPoolWorker-4 PID 7652
Ended 3 Process SpawnPoolWorker-3
Ended 4 Process SpawnPoolWorker-4
Ended 2 Process SpawnPoolWorker-2
Ended 1 Process SpawnPoolWorker-1
Running on Process SpawnPoolWorker-3 PID 13828
Ended 5 Process SpawnPoolWorker-3
[1, 2, 3, 4, 5]
Time : 2.1028099060058594

 

starmap() , starmap_async()

인자를 두 개 이상 받을 수 있다는 점을 제외하면 map(), map_async()와 같다.

from multiprocessing import Pool
import multiprocessing as mp
import time
import os

def mul(x,y):
    c_proc = mp.current_process()
    print("Running on Process",c_proc.name,"PID",c_proc.pid)
    time.sleep(1)
    print("Ended",x,"*",y,"Process",c_proc.name)
    return x*y
        

if __name__ == '__main__':
    p = Pool(4)
    start = time.time()
    
    ret = p.starmap(mul,[[1,2],[2,3],[3,4],[4,5],[5,6]])
    print(ret)

    delta_t = time.time() - start
    print("Time :",delta_t)
    
    p.close()
    p.join()
Running on Process SpawnPoolWorker-2 PID 3460
Running on Process SpawnPoolWorker-3 PID 19636
Running on Process SpawnPoolWorker-1 PID 8232
Running on Process SpawnPoolWorker-4 PID 6876
Ended 3 * 4 Process SpawnPoolWorker-1
Ended 4 * 5 Process SpawnPoolWorker-4
Ended 1 * 2 Process SpawnPoolWorker-2
Ended 2 * 3 Process SpawnPoolWorker-3
Running on Process SpawnPoolWorker-1 PID 8232
Ended 5 * 6 Process SpawnPoolWorker-1
[2, 6, 12, 20, 30]
Time : 2.096247673034668

starmap_async는 위의 코드에서 starmapstarmap_async로 바꾸어주고, map_async에서 처리한 것과 같이 AsyncResult를 받아 원하는 위치에서 get()을 호출해주면 된다. map_async와 많은 부분이 유사하기 때문에 예제 코드는 작성하지 않았다.

 

imap(), imap_unordered()

map의 결과물은 list인 반면, imap의 결과물은 iterator이다. 기본 chunkzise는 1인데, 1 대신 적절하게 큰 값을 써주면 훨씬 빨리 처리할 수 있다. 결과물의 길이가 길어서 list로 나타내었을 때 메모리에 부담이 가는 경우 imap을 사용해주면 좋다 imap_unordered()는 순서가 보장되지 않는다.

from multiprocessing import Pool
import multiprocessing as mp
import time
import os

def func(num):
    c_proc = mp.current_process()
    print("Running on Process",c_proc.name,"PID",c_proc.pid,"/ num",num)
    time.sleep(1)
    print("Ended",num,"Process",c_proc.name)
    return num
        

if __name__ == '__main__':
    p = Pool(4)
    start = time.time()
    
    for ret in p.imap(func,[1,2,3,4,5]):
        print("Got value",ret,"Time :",time.time()-start)

    delta_t = time.time() - start
    print("Total Time :",delta_t)
    
    p.close()
    p.join()
Running on Process SpawnPoolWorker-1 PID 568 / num 1
Running on Process SpawnPoolWorker-3 PID 5548 / num 2
Running on Process SpawnPoolWorker-2 PID 3720 / num 3
Running on Process SpawnPoolWorker-4 PID 12276 / num 4
Ended 3 Process SpawnPoolWorker-2
Ended 1 Process SpawnPoolWorker-1
Ended 2 Process SpawnPoolWorker-3
Ended 4 Process SpawnPoolWorker-4
Running on Process SpawnPoolWorker-2 PID 3720 / num 5
Got value 1 Time : 1.0782506465911865
Got value 2 Time : 1.0782506465911865
Got value 3 Time : 1.0782506465911865
Got value 4 Time : 1.0782506465911865
Ended 5 Process SpawnPoolWorker-2
Got value 5 Time : 2.0864195823669434
Total Time : 2.08742618560791

 

apply_async, map_async, starmap_async에서 callback

위 세 개의 함수에 callback을 달아줄 수 있다. callback의 인자는 위 함수들의 결과물을 받는다. 작업이 완료되는 순간 실행된다.

from multiprocessing import Pool
import multiprocessing as mp
import time
import os

def callback_func(result):
    print("callback_func got result :",result)

def square(x):
    return x*x

if __name__ == '__main__':
    with Pool(4) as p:
        result = p.map_async(square,range(11),callback=callback_func)
        result.wait()
callback_func got result : [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

 

반응형

댓글