최근에 여러 가상 머신에서 작업을 실행해야 하는 프로젝트를 발견했습니다. 다른 사람들의 이전 프로젝트 코드를 참조하고 멀티 프로세스를 사용하여 처리했기 때문에 온라인에서 Python의 멀티 프로세스에 대해 확인했습니다.
1. 먼저 Queue(queue 객체)에 대해 이야기해보겠습니다
Queue는 이전에 공부할 때 직접 Import하고 인용할 수 있는 Python의 표준 라이브러리입니다. , "먼저 먹고, 먼저 잡아라"와 "먼저 먹고, 먼저 토하라"는 유명한 말이 실제로 여기서 언급된 대기열이라고 들었습니다. 대기열을 구성할 때 용량을 정의할 수 있습니다. 과식하지 마세요. 너무 많이 먹으면 , 작성 시 오류가 발생합니다. 숫자는 무한
import Queue
q =을 의미합니다. Queue.Queue(10)
큐에 넣기 Value(put)
q.put('yang')
q.put(4)
q.put(['yan','xing'])
큐에서 값 가져오기 ()
기본 대기열은 선입선출입니다
>>> q.get()
'yang'
>>> ; q.get()
4
>>> q .get()
['yan', 'xing']
큐가 비어 있는 경우 , get을 사용하여 검색하면 차단되므로 일반적으로 대기열을 검색할 때 사용됩니다.
get_nowait() 메서드, 이 메서드는 빈 대기열에서 값을 가져올 때 빈 예외를 발생시킵니다.
따라서 더 일반적인 방법은 먼저 대기열이 비어 있는지 확인하는 것입니다. 비어 있으면 값을 가져옵니다
큐에서 일반적으로 사용되는 방법
Queue.qsize()는 대기열의 크기를 반환합니다
Queue.empty() 대기열이 비어 있으면 True를 반환하고, 그렇지 않으면 False를 반환합니다
Queue.full() 대기열이 가득 차면 True를 반환합니다. , 그렇지 않으면 False
Queue.get([block[, timeout]]) 대기열 가져오기, 시간 초과 대기 시간
Queue.get_nowait () Queue.get(False)과 동일
Non-blocking Queue.put (item) 큐에 쓰기, 타임아웃 대기 시간
Queue.put_nowait(item) Queue.put(item, False)와 동일
2. 다중 처리에서 하위 프로세스 개념 사용
from multiprocessing import Process
Process를 통해 하위 프로세스를 구성할 수 있습니다
p = Process(target =fun,args=(args))
그런 다음 p.start()를 사용하여 하위 프로세스를 시작합니다
그런 다음 p.join() 메서드를 사용하여 상위 프로세스를 실행하기 전에 하위 프로세스의 실행을 완료합니다
from multiprocessing import Process import os # 子进程要执行的代码 def run_proc(name): print 'Run child process %s (%s)...' % (name, os.getpid()) if __name__=='__main__': print 'Parent process %s.' % os.getpid() p = Process(target=run_proc, args=('test',)) print 'Process will start.' p.start() p.join() print 'Process end.'
3. 필요한 경우 다중 처리에서 pool
을 사용합니다. 하위 프로세스가 여러 개인 경우 고려할 수 있습니다. 프로세스 풀을 사용하여
from multiprocessing import Pool
from multiprocessing import Pool import os, time def long_time_task(name): print 'Run task %s (%s)...' % (name, os.getpid()) start = time.time() time.sleep(3) end = time.time() print 'Task %s runs %0.2f seconds.' % (name, (end - start)) if __name__=='__main__': print 'Parent process %s.' % os.getpid() p = Pool() for i in range(5): p.apply_async(long_time_task, args=(i,)) print 'Waiting for all subprocesses done...' p.close() p.join() print 'All subprocesses done.'
풀을 관리하는 방법은 다음과 같습니다. Process와는 다릅니다.
p.apply_async(func,args=(args))를 통해 구현됩니다. 풀에서 동시에 실행할 수 있는 작업은 컴퓨터의 CPU 수에 따라 다릅니다. 이제 컴퓨터에는 4개의 CPU가 있습니다. 그러면 하위 프로세스 task0, task1, task2 및 task3이 동시에 시작될 수 있습니다. 이전 프로세스가 종료된 후 Task4가 시작됩니다.
위 프로그램을 실행한 후의 결과는 실제로 위 그림의 1, 2, 3에 따라 별도로 수행됩니다. 1이 먼저 인쇄되고, 2가 3초 후에 인쇄되고, 3이 3초 후에 인쇄됩니다.
코드의 p. close()는 프로세스 풀을 닫고 더 이상 프로세스를 추가하지 않습니다. Pool 객체에서 Join() 메서드를 호출하면 모든 하위 프로세스가 실행을 완료할 때까지 기다립니다. 먼저 close()를 호출하고 close()를 호출해야 합니다. 그 후에는 새 프로세스를 계속 추가할 수 없습니다.
그때 인스턴스 풀의 프로세스 수를 정의할 수도 있습니다
위 코드에서 p=Pool(5)이면 모든 하위 프로세스를 동시에 처리할 수 있습니다.
3. 여러 하위 프로세스 간 통신
여러 하위 프로세스 간 통신에는 첫 번째 단계에서 언급한 대기열을 사용해야 합니다. 다음 요구 사항, 하위 프로세스는 큐에 데이터를 쓰고 다른 프로세스는 큐에서 데이터를 가져옵니다.
#coding:gbk from multiprocessing import Process, Queue import os, time, random # 写数据进程执行的代码: def write(q): for value in ['A', 'B', 'C']: print 'Put %s to queue...' % value q.put(value) time.sleep(random.random()) # 读数据进程执行的代码: def read(q): while True: if not q.empty(): value = q.get(True) print 'Get %s from queue.' % value time.sleep(random.random()) else: break if __name__=='__main__': # 父进程创建Queue,并传给各个子进程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 启动子进程pw,写入: pw.start() # 等待pw结束: pw.join() # 启动子进程pr,读取: pr.start() pr.join() # pr进程里是死循环,无法等待其结束,只能强行终止: print print '所有数据都写入并且读完'
4. 위 내용에 대해 코드에 관한 몇 가지 흥미로운 문제
if __name__=='__main__': # 父进程创建Queue,并传给各个子进程: q = Queue() p = Pool() pw = p.apply_async(write,args=(q,)) pr = p.apply_async(read,args=(q,)) p.close() p.join() print print '所有数据都写入并且读完'
주 함수를 위의 샘플과 같이 작성하면 내가 무엇을 대기열을 가져오는 것이 프로세스 풀의 각 하위 프로세스에 매개변수로 전달되지만
RuntimeError: 대기열 개체는 상속을 통해 프로세스 간에만 공유되어야 합니다
<🎜 오류가 발생합니다. > 확인 결과, 객체는 상위 프로세스와 하위 프로세스 간에 통신할 수 없습니다. 프로세스 풀에서 큐를 사용하려면 다중 프로세스의 Manager 클래스를 사용해야 합니다
if __name__=='__main__': manager = multiprocessing.Manager() # 父进程创建Queue,并传给各个子进程: q = manager.Queue() p = Pool() pw = p.apply_async(write,args=(q,)) time.sleep(0.5) pr = p.apply_async(read,args=(q,)) p.close() p.join() print print '所有数据都写入并且读完'
关于锁的应用,在不同程序间如果有同时对同一个队列操作的时候,为了避免错误,可以在某个函数操作队列的时候给它加把锁,这样在同一个时间内则只能有一个子进程对队列进行操作,锁也要在manager对象中的锁
#coding:gbk from multiprocessing import Process,Queue,Pool import multiprocessing import os, time, random # 写数据进程执行的代码: def write(q,lock): lock.acquire() #加上锁 for value in ['A', 'B', 'C']: print 'Put %s to queue...' % value q.put(value) lock.release() #释放锁 # 读数据进程执行的代码: def read(q): while True: if not q.empty(): value = q.get(False) print 'Get %s from queue.' % value time.sleep(random.random()) else: break if __name__=='__main__': manager = multiprocessing.Manager() # 父进程创建Queue,并传给各个子进程: q = manager.Queue() lock = manager.Lock() #初始化一把锁 p = Pool() pw = p.apply_async(write,args=(q,lock)) pr = p.apply_async(read,args=(q,)) p.close() p.join() print print '所有数据都写入并且读完'
更多Python의 대기열 및 다중 프로세스相关文章请关注PHP中文网!