Python multiprocessing: a summary of multiprocess programming in Python


Preface. multiprocessing

Threading in Python is not truly parallel; if you want to make full use of multi-core CPUs, in most cases you need multiple processes. Python provides the very convenient multiprocessing package: you only need to define a function and Python takes care of everything else. With this package it is easy to go from a single process to concurrent execution. multiprocessing supports subprocesses, communication and data sharing between them, and different forms of synchronization, and it provides components such as Process, Queue, Pipe and Lock.

Table of contents

    1. Process
    2. Lock
    3. Semaphore
    4. Event
    5. Queue
    6. Pipe
    7. Pool

1. Process

The class for creating a process: Process([group [, target [, name [, args [, kwargs]]]]]). Here target is the callable object to run, args is the tuple of positional arguments for the callable, kwargs is the dictionary of keyword arguments, name is an alias for the process, and group is essentially unused.

Methods: is_alive(), join([timeout]), run(), start(), terminate(). A process is started by calling start().

Attributes: authkey, daemon (must be set before start()), exitcode (None while the process is running; –N means the process was terminated by signal N), name, pid. A daemon process is terminated automatically when its parent terminates and cannot create child processes of its own; the daemon flag must be set before start().
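
The examples below exercise start(), join() and daemon, but not terminate() or exitcode. As a supplementary sketch (not from the original article; the helper name loop_forever is made up for illustration), this is roughly how the two behave, assuming a POSIX system where a child killed by a signal reports a negative exit code:

import multiprocessing
import time

def loop_forever():
    while True:
        time.sleep(1)

if __name__ == "__main__":
    p = multiprocessing.Process(target=loop_forever)
    p.start()
    print("exitcode while running:", p.exitcode)    # None: the child is still alive
    p.terminate()                                   # ask the OS to kill the child (SIGTERM on POSIX)
    p.join()                                        # join after terminate() to reap the child
    print("exitcode after terminate:", p.exitcode)  # typically -15 on POSIX, i.e. -SIGTERM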

 

Example 1.1: create a function and use it as a single process

import multiprocessing
import time

def worker(interval):
    n = 5
    while n > 0:
        print("The time is {0}".format(time.ctime()))
        time.sleep(interval)
        n -= 1

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()
    print "p.pid:", p.pid
    print "p.name:", p.name
    print "p.is_alive:", p.is_alive()

Result

p.pid: 8736
p.name: Process-1
p.is_alive: True
The time is Tue Apr 21 20:55:12 2015
The time is Tue Apr 21 20:55:15 2015
The time is Tue Apr 21 20:55:18 2015
The time is Tue Apr 21 20:55:21 2015
The time is Tue Apr 21 20:55:24 2015

 

Example 1.2: create functions and use them as multiple processes

import multiprocessing
import time

def worker_1(interval):
    print("worker_1")
    time.sleep(interval)
    print("end worker_1")

def worker_2(interval):
    print("worker_2")
    time.sleep(interval)
    print("end worker_2")

def worker_3(interval):
    print("worker_3")
    time.sleep(interval)
    print("end worker_3")

if __name__ == "__main__":
    p1 = multiprocessing.Process(target = worker_1, args = (2,))
    p2 = multiprocessing.Process(target = worker_2, args = (3,))
    p3 = multiprocessing.Process(target = worker_3, args = (4,))

    p1.start()
    p2.start()
    p3.start()

    # multiprocessing.cpu_count() returns the number of CPU cores, which can be used to decide how many child processes to create.
    # multiprocessing.active_children() returns all current child processes, daemon and non-daemon alike.
    # p.name and p.pid are the process name and the process id.
    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child   p.name:" + p.name + "\tp.id" + str(p.pid))
    print("END!!!!!!!!!!!!!!!!!")

Result

The number of CPU is:4
child   p.name:Process-3 p.id7992
child   p.name:Process-2 p.id4204
child   p.name:Process-1 p.id6380
END!!!!!!!!!!!!!!!!!
worker_1
worker_3
worker_2
end worker_1
end worker_2
end worker_3

 

Example 1.3: define the process as a class

import multiprocessing
import time

class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):
        n = 5
        while n > 0:
            print("the time is {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1

if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()      

Note: when process p calls start(), run() is invoked automatically.

Result

the time is Tue Apr 21 20:31:30 2015
the time is Tue Apr 21 20:31:33 2015
the time is Tue Apr 21 20:31:36 2015
the time is Tue Apr 21 20:31:39 2015
the time is Tue Apr 21 20:31:42 2015

 

Example 1.4: comparing daemon behaviour

#1.4-1 Without the daemon attribute

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()
    print("end!")

Result

end!
work start:Tue Apr 21 21:29:10 2015
work end:Tue Apr 21 21:29:13 2015

#1.4-2 With the daemon attribute

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.daemon = True
    p.start()
    print("end!")

Result

end!

Note: because the child process has the daemon attribute set, it is terminated as soon as the main process ends.

#1.4-3 Letting a daemon child finish its work before exiting

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.daemon = True
    p.start()
    p.join()
    print("end!")

Result

work start:Tue Apr 21 22:16:32 2015
work end:Tue Apr 21 22:16:35 2015
end!

 

2. Lock

When multiple processes need to access a shared resource, a Lock can be used to avoid conflicting accesses.

import multiprocessing
import sys

def worker_with(lock, f):
    with lock:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lock acquired via with\n")
            n -= 1
        fs.close()

def worker_no_with(lock, f):
    lock.acquire()
    try:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lock acquired directly\n")
            n -= 1
        fs.close()
    finally:
        lock.release()

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    f = "file.txt"
    w = multiprocessing.Process(target = worker_with, args=(lock, f))
    nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    print("end")

Result (contents of the output file)

Lock acquired via with
Lock acquired via with
Lock acquired via with
Lock acquired via with
Lock acquired via with
Lock acquired via with
Lock acquired via with
Lock acquired via with
Lock acquired via with
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
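
Besides serializing writes to a file, a Lock is often used to protect shared state. The following supplementary sketch (not part of the original article; add_100 is an illustrative name) guards a multiprocessing.Value counter so that concurrent increments are not lost; counter.value += 1 is a read-modify-write and is not atomic by itself:

import multiprocessing

def add_100(counter, lock):
    for _ in range(100):
        with lock:                    # acquire()/release() around the read-modify-write
            counter.value += 1

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    counter = multiprocessing.Value('i', 0)   # an integer in shared memory
    workers = [multiprocessing.Process(target=add_100, args=(counter, lock)) for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print("final value:", counter.value)      # 400 with the lock; without it, updates can be lost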

 

 

3. Semaphore

Semaphore is used to control the number of concurrent accesses to a shared resource, for example the maximum number of connections to a pool.

import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + "acquire")
    time.sleep(i)
    print(multiprocessing.current_process().name + "release\n")
    s.release()

if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)   # allow at most two processes into the critical section at the same time
    for i in range(5):
        p = multiprocessing.Process(target = worker, args=(s, i*2))
        p.start()

Result

Process-1acquire
Process-1release

Process-2acquire
Process-3acquire
Process-2release

Process-5acquire
Process-3release

Process-4acquire
Process-5release

Process-4release
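
Like Lock, a Semaphore can also be used with the with statement, which is a tidier way of writing acquire()/release(). A supplementary sketch (not from the original article; use_connection is an illustrative name) that caps the number of simulated connections at two:

import multiprocessing
import time

def use_connection(sema, i):
    with sema:                           # acquire on entry, release on exit
        print("connection %d open" % i)
        time.sleep(1)
        print("connection %d closed" % i)

if __name__ == "__main__":
    sema = multiprocessing.Semaphore(2)  # at most two "connections" at a time
    procs = [multiprocessing.Process(target=use_connection, args=(sema, i)) for i in range(5)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()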

 

4. Event

伊夫nt用来完结进程间一块通讯。

import multiprocessing
import time

def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()
    print("wait_for_event: e.is_set()->" + str(e.is_set()))

def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout:starting")
    e.wait(t)
    print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))

if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name = "block",
            target = wait_for_event,
            args = (e,))

    w2 = multiprocessing.Process(name = "non-block",
            target = wait_for_event_timeout,
            args = (e, 2))
    w1.start()
    w2.start()

    time.sleep(3)

    e.set()
    print("main: event is set")

Result

wait_for_event: starting
wait_for_event_timeout:starting
wait_for_event_timeout:e.is_set->False
main: event is set
wait_for_event: e.is_set()->True
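
An Event can also be reset with clear() so that later wait() calls block again. A small supplementary sketch (not part of the original article; the worker function here is only for illustration):

import multiprocessing
import time

def worker(e):
    e.wait()                                          # blocks until the main process calls set()
    print("first wait done, is_set:", e.is_set())     # True
    e.clear()                                         # reset the flag
    print("after clear, is_set:", e.is_set())         # False
    e.wait(2)                                         # nobody sets it again, so this times out after 2 seconds
    print("second wait done, is_set:", e.is_set())    # still False

if __name__ == "__main__":
    e = multiprocessing.Event()
    p = multiprocessing.Process(target=worker, args=(e,))
    p.start()
    time.sleep(1)
    e.set()
    p.join()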

 

5. Queue

Queue is a multiprocess-safe queue that can be used to pass data between processes. The put method inserts data into the queue; it has two optional parameters, block and timeout. If block is True (the default) and timeout is a positive value, the method blocks for at most the time given by timeout, waiting for the queue to have free space; if it times out, a Queue.Full exception is raised. If block is False and the Queue is already full, a Queue.Full exception is raised immediately.

 

The get method reads one element from the queue and removes it. get likewise has two optional parameters, block and timeout. If block is True (the default) and timeout is a positive value, and no element is retrieved within the waiting time, a Queue.Empty exception is raised. If block is False there are two cases: if a value is available in the Queue it is returned immediately; otherwise, if the queue is empty, a Queue.Empty exception is raised immediately. A short example using Queue:

import multiprocessing

def writer_proc(q):      
    try:         
        q.put(1, block = False) 
    except:         
        pass   

def reader_proc(q):      
    try:         
        print(q.get(block = False))
    except:         
        pass

if __name__ == "__main__":
    q = multiprocessing.Queue()
    writer = multiprocessing.Process(target=writer_proc, args=(q,))  
    writer.start()   

    reader = multiprocessing.Process(target=reader_proc, args=(q,))  
    reader.start()  

    reader.join()  
    writer.join()

Result

1
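
The example above only uses block=False. The following supplementary sketch (not from the original article) shows the blocking/timeout behaviour described above; note that in Python 3 the Queue.Full and Queue.Empty exceptions live in the standard queue module:

import multiprocessing
import queue   # queue.Full / queue.Empty are raised by multiprocessing.Queue

if __name__ == "__main__":
    q = multiprocessing.Queue(maxsize=1)
    q.put("first")                               # fits, the queue now holds one item
    try:
        q.put("second", block=True, timeout=1)   # blocks for up to 1 second, then raises
    except queue.Full:
        print("queue is full")
    print(q.get())                               # "first"
    try:
        q.get(block=True, timeout=1)             # the queue is now empty, raises after 1 second
    except queue.Empty:
        print("queue is empty")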

 

6. Pipe

The Pipe method returns a pair (conn1, conn2) representing the two ends of a pipe. Pipe takes a duplex parameter: if duplex is True (the default), the pipe is full-duplex, i.e. both conn1 and conn2 can send and receive; if duplex is False, conn1 can only receive messages and conn2 can only send messages. In other words a Pipe can be one-way (half-duplex) or two-way: multiprocessing.Pipe(duplex=False) creates a one-way pipe, where one process feeds objects in at one end and the process at the other end receives them.

 

send and recv are the methods for sending and receiving messages. For example, in full-duplex mode you can call conn1.send to send a message and conn1.recv to receive one. If there is no message to receive, recv blocks; if the pipe has been closed, recv raises EOFError.

import multiprocessing
import time

def proc1(pipe):
    while True:
        for i in range(10000):
            print("send: %s" %(i))
            pipe.send(i)
            time.sleep(1)

def proc2(pipe):
    while True:
        print("proc2 rev:", pipe.recv())
        time.sleep(1)

def proc3(pipe):
    while True:
        print("PROC3 rev:", pipe.recv())
        time.sleep(1)

if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    #p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))

    p1.start()
    p2.start()
    #p3.start()

    p1.join()
    p2.join()
    #p3.join()

Result: the original output screenshot is omitted; proc1 prints "send: 0", "send: 1", ... and proc2 prints "proc2 rev: 0", "proc2 rev: 1", ..., one message per second.
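
The example above uses the default two-way pipe. As a supplementary sketch (not part of the original article; sender is an illustrative name), here is the same idea with a one-way pipe created by passing duplex=False, where only one end can send and only the other can receive:

import multiprocessing

def sender(conn):
    for i in range(3):
        conn.send(i)            # the send-only end of a one-way pipe
    conn.close()

if __name__ == "__main__":
    recv_conn, send_conn = multiprocessing.Pipe(duplex=False)   # (receive end, send end)
    p = multiprocessing.Process(target=sender, args=(send_conn,))
    p.start()
    send_conn.close()           # close the parent's copy of the send end
    try:
        while True:
            print("received:", recv_conn.recv())
    except EOFError:            # raised once every send end has been closed
        print("pipe closed")
    p.join()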

 

7. Pool

When using Python for system administration, especially when operating on many file directories at the same time or remotely controlling many hosts, parallel operation can save a lot of time. When the number of objects to operate on is small, you can use Process from multiprocessing directly to spawn a handful of processes; a dozen or so is fine. But with hundreds or thousands of objects, manually limiting the number of processes becomes far too tedious, and this is where a process pool comes in.
Pool provides a specified number of processes for the user to call. When a new request is submitted to the pool and the pool is not yet full, a new process is created to execute the request; if the number of processes in the pool has already reached the specified maximum, the request waits until a process in the pool finishes before it is executed.

 

Example 7.1: using a process pool (non-blocking)

#coding: utf-8
import multiprocessing
import time

def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in range(4):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))   # keeps the number of running tasks at `processes`; when one finishes, the next queued task is started

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()   # call close() before join(), otherwise an error is raised; after close() no new tasks join the pool, and join() waits for all child processes to finish
    print("Sub-process(es) done.")

Result of one run

mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0

msg: hello 1
msg: hello 2
end
msg: hello 3
end
end
end
Sub-process(es) done.

Explanation of the functions:

  • apply_async(func[, args[, kwds[, callback]]]) is non-blocking, while apply(func[, args[, kwds]]) is blocking (compare the results of Example 7.1 and Example 7.2 to see the difference); the callback parameter is sketched after the explanation below.
  • close()    Closes the pool so that it no longer accepts new tasks.
  • terminate()    Ends the worker processes immediately, without finishing outstanding tasks.
  • join()    The main process blocks until the child processes exit; join must be called after close or terminate.

Explanation: a process pool pool is created with 3 worker processes. range(4) produces four objects [0, 1, 2, 3], all of which are submitted to the pool. Because the pool is limited to 3 processes, 0, 1 and 2 are dispatched to workers immediately, and object 3 is only handled once one of them has finished, which is why "msg: hello 3" appears after an "end". Because apply_async is non-blocking, the main process carries on with its own work without waiting for the children, so right after the for loop it prints "Mark~ Mark~ Mark~~~~~~" (interleaved with a child's "msg:" line in the sample output above), and then waits at pool.join() for all the processes to finish.
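
The callback parameter of apply_async is never used in the examples of this article. As a supplementary sketch (not from the original article; square is an illustrative function), it lets the main process collect each result as soon as it is ready, without calling get() on every AsyncResult:

#coding: utf-8
import multiprocessing
import time

def square(x):
    time.sleep(1)
    return x * x

if __name__ == "__main__":
    results = []
    pool = multiprocessing.Pool(processes=3)
    for i in range(5):
        pool.apply_async(square, (i,), callback=results.append)   # the callback runs in the main process
    pool.close()
    pool.join()
    print(sorted(results))   # [0, 1, 4, 9, 16]; completion order may vary, hence the sort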

 

Example 7.2: using a process pool (blocking)

#coding: utf-8
import multiprocessing
import time

def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in range(4):
        msg = "hello %d" %(i)
        pool.apply(func, (msg, ))   # apply blocks until this task has finished, so the tasks run one after another

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()   # call close() before join(), otherwise an error is raised; after close() no new tasks join the pool, and join() waits for all child processes to finish
    print("Sub-process(es) done.")

Result of one run

msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.

  

Example 7.3: using a process pool and collecting the results

import multiprocessing
import time

def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")
    return "done" + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in range(3):
        msg = "hello %d" %(i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print(":::", res.get())
    print("Sub-process(es) done.")

Result of one run

msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
::: donehello 0
::: donehello 1
::: donehello 2
Sub-process(es) done.
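
When every task runs the same function, collecting AsyncResult objects as in Example 7.3 can be replaced by pool.map, which blocks until all tasks are finished and returns the results in input order. A supplementary sketch (not part of the original article), assuming Python 3 where Pool can be used as a context manager:

import multiprocessing
import time

def func(msg):
    time.sleep(1)
    return "done" + msg

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:                  # the pool is cleaned up on exiting the block
        results = pool.map(func, ["hello %d" % i for i in range(3)])
    print(results)   # ['donehello 0', 'donehello 1', 'donehello 2']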

 

Example 7.4: submitting several different functions to a process pool

#coding: utf-8
import multiprocessing
import os, time, random

def Lee():
    print("\nRun task Lee-%s" %(os.getpid())) # os.getpid() returns the current process ID
    start = time.time()
    time.sleep(random.random() * 10) # random.random() returns a random float in [0, 1)
    end = time.time()
    print('Task Lee, runs %0.2f seconds.' %(end - start))

def Marlon():
    print("\nRun task Marlon-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 40)
    end = time.time()
    print('Task Marlon runs %0.2f seconds.' %(end - start))

def Allen():
    print("\nRun task Allen-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print('Task Allen runs %0.2f seconds.' %(end - start))

def Frank():
    print("\nRun task Frank-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print('Task Frank runs %0.2f seconds.' %(end - start))

if __name__=='__main__':
    function_list = [Lee, Marlon, Allen, Frank]
    print("parent process %s" %(os.getpid()))

    pool = multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)     # submit each function to the pool; a free worker picks up the next task

    print('Waiting for all subprocesses done...')
    pool.close()
    pool.join()    # close() must be called before join(); after close() no new tasks join the pool, and join() waits for all child processes to finish
    print('All subprocesses done.')

Result of one run

parent process 7704

Waiting for all subprocesses done...
Run task Lee-6948

Run task Marlon-2896

Run task Allen-7304

Run task Frank-3052
Task Lee, runs 1.59 seconds.
Task Marlon runs 8.48 seconds.
Task Frank runs 15.68 seconds.
Task Allen runs 18.08 seconds.
All subprocesses done.

4. Event

伊夫nt用来贯彻进程间协同通讯。

澳门新萄京官方网站 32

import multiprocessing
import time

def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()
    print("wairt_for_event: e.is_set()->"   str(e.is_set()))

def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout:starting")
    e.wait(t)
    print("wait_for_event_timeout:e.is_set->"   str(e.is_set()))

if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name = "block",
            target = wait_for_event,
            args = (e,))

    w2 = multiprocessing.Process(name = "non-block",
            target = wait_for_event_timeout,
            args = (e, 2))
    w1.start()
    w2.start()

    time.sleep(3)

    e.set()
    print("main: event is set")

澳门新萄京官方网站 33

结果

1
2
3
4
5
wait_for_event: starting
wait_for_event_timeout:starting
wait_for_event_timeout:e.is_set->False
main: event is set
wairt_for_event: e.is_set()->True

 

单进程:

3. Semaphore

Semaphore用来支配对共享能源的拜谒数量,比方池的最亚松森接数。

澳门新萄京官方网站 34

import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name   "acquire");
    time.sleep(i)
    print(multiprocessing.current_process().name   "releasen");
    s.release()

if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target = worker, args=(s, i*2))
        p.start()

澳门新萄京官方网站 35

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Process-1acquire
Process-1release
 
Process-2acquire
Process-3acquire
Process-2release
 
Process-5acquire
Process-3release
 
Process-4acquire
Process-5release
 
Process-4release

 

回去顶上部分

单进程:

5. Queue

Queue是多进度安全的行列,能够选择Queue完成多进程之间的多少传递。put方法用以插入数据到行列中,put方法还有七个可选参数:blocked和timeout。假诺blocked为True(暗中同意值),并且timeout为正值,该方法会阻塞timeout钦赐的年华,直到该队列有盈余的半空中。倘若超时,会抛出Queue.Full分外。借使blocked为False,但该Queue已满,会即时抛出Queue.Full至极。

 

get方法能够从队列读取况兼删除二个因素。一样,get方法有五个可选参数:blocked和timeout。倘若blocked为True(暗中同意值),况且timeout为正在,那么在守候时间内未有取到任何因素,会抛出Queue.Empty十分。假若blocked为False,有两种状态存在,假诺Queue有二个值可用,则随即回到该值,否则,假若队列为空,则立即抛出Queue.Empty十分。Queue的一段示例代码:

澳门新萄京官方网站 36

import multiprocessing

def writer_proc(q):      
    try:         
        q.put(1, block = False) 
    except:         
        pass   

def reader_proc(q):      
    try:         
        print(q.get(block = False) )
    except:         
        pass

if __name__ == "__main__":
    q = multiprocessing.Queue()
    writer = multiprocessing.Process(target=writer_proc, args=(q,))  
    writer.start()   

    reader = multiprocessing.Process(target=reader_proc, args=(q,))  
    reader.start()  

    reader.join()  
    writer.join()

澳门新萄京官方网站 37

结果

1
1

 

 1 import multiprocessing
 2 import time
 3 def worker(interval):
 4     n=5
 5     while n > 0:
 6         print("The time is {0}".format(time.ctime()))
 7         time.sleep(interval)
 8         n -=1
 9 
10 if __name__ == "__main__":
11     p = multiprocessing.Process(target=worker,args=(3,))
12     p.start()
13     print("p.pid:",p.pid)
14     print("p.name:",p.name)
15     print("p.is_alive:",p.is_alive())

4. Event

伊芙nt用来完成进程间一块通讯。

澳门新萄京官方网站 38

import multiprocessing
import time

def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()
    print("wairt_for_event: e.is_set()->"   str(e.is_set()))

def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout:starting")
    e.wait(t)
    print("wait_for_event_timeout:e.is_set->"   str(e.is_set()))

if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name = "block",
            target = wait_for_event,
            args = (e,))

    w2 = multiprocessing.Process(name = "non-block",
            target = wait_for_event_timeout,
            args = (e, 2))
    w1.start()
    w2.start()

    time.sleep(3)

    e.set()
    print("main: event is set")

澳门新萄京官方网站 39

结果

1
2
3
4
5
wait_for_event: starting
wait_for_event_timeout:starting
wait_for_event_timeout:e.is_set->False
main: event is set
wairt_for_event: e.is_set()->True

 

回来顶上部分

 1 import multiprocessing 2 import time 3 def worker: 4     n=5 5     while n > 0: 6         print("The time is {0}".format(time.ctime 7         time.sleep 8         n -=1 9 10 if __name__ == "__main__":11     p = multiprocessing.Process(target=worker,args=(3,))12     p.start()13     print("p.pid:",p.pid)14     print("p.name:",p.name)15     print("p.is_alive:",p.is_alive

6. Pipe

Pipe方法再次回到(conn1, conn2)代表贰个管道的八个端。Pipe方法有duplex参数,假诺duplex参数为True(私下认可值),那么这么些管道是全双工方式,也正是说conn1和conn2均可收发。duplex为False,conn1只担当接受消息,conn2只承担发送新闻。

 

send和recv方法分别是出殡和埋葬和承受消息的法子。举个例子,在全双工方式下,能够调用conn1.send发送消息,conn1.recv接收新闻。若无新闻可收取,recv方法会一贯不通。要是管道已经被关闭,那么recv方法会抛出EOFError。

澳门新萄京官方网站 40

import multiprocessing
import time

def proc1(pipe):
    while True:
        for i in xrange(10000):
            print("send: %s" %(i))
            pipe.send(i)
            time.sleep(1)

def proc2(pipe):
    while True:
        print("proc2 rev:", pipe.recv())
        time.sleep(1)

def proc3(pipe):
    while True:
        print("PROC3 rev:", pipe.recv())
        time.sleep(1)

if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    #p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))

    p1.start()
    p2.start()
    #p3.start()

    p1.join()
    p2.join()
    #p3.join()

澳门新萄京官方网站 41

结果

澳门新萄京官方网站 42

 

多进程:

5. Queue

Queue是多进程安全的行列,能够利用Queue完结多进度之间的数码传递。put方法用以插入数据到行列中,put方法还大概有四个可选参数:blocked和timeout。假设blocked为True(私下认可值),何况timeout为正值,该方法会阻塞timeout钦点的年华,直到该队列有结余的半空中。要是超时,会抛出Queue.Full十分。假诺blocked为False,但该Queue已满,会立即抛出Queue.Full分外。

 

get方法能够从队列读取何况删除二个要素。一样,get方法有多个可选参数:blocked和timeout。要是blocked为True(暗中同意值),而且timeout为正值,那么在守候时间内未有取到任何因素,会抛出Queue.Empty万分。若是blocked为False,有三种状态存在,假如Queue有叁个值可用,则随即赶回该值,不然,若是队列为空,则立即抛出Queue.Empty非凡。Queue的一段示例代码:

澳门新萄京官方网站 43

import multiprocessing

def writer_proc(q):      
    try:         
        q.put(1, block = False) 
    except:         
        pass   

def reader_proc(q):      
    try:         
        print q.get(block = False) 
    except:         
        pass

if __name__ == "__main__":
    q = multiprocessing.Queue()
    writer = multiprocessing.Process(target=writer_proc, args=(q,))  
    writer.start()   

    reader = multiprocessing.Process(target=reader_proc, args=(q,))  
    reader.start()  

    reader.join()  
    writer.join()

澳门新萄京官方网站 44

结果

1
1

 

回到最上部

多进程:

7. Pool

在使用Python进行系统管理的时候,特别是还要操作四个文件目录,或然远程序调整制多台主机,并行操作能够节省一大波的时日。当被操作对象数目相当的小时,能够间接采取multiprocessing中的Process动态成生多个进度,拾九个幸好,但借使是相当的多个,上千个对象,手动的去限制进程数量却又太过繁琐,此时得以发挥进程池的机能。
Pool能够提供钦点数量的长河,供客户调用,当有新的须要提交到pool中时,如若池还未曾满,那么就能成立三个新的经过用来施行该央浼;但一旦池中的进程数一度高达规定最大值,那么该央求就能等待,直到池中有进程甘休,才会创立新的进程来它。

 

例7.1:使用进度池(非阻塞)

澳门新萄京官方网站 45

#coding: utf-8
import multiprocessing
import time

def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print("Sub-process(es) done.")

澳门新萄京官方网站 46

三回施行结果

1
2
3
4
5
6
7
8
9
10
mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0
 
msg: hello 1
msg: hello 2
end
msg: hello 3
end
end
end
Sub-process(es) done.

函数解释:

  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(精通差距,看例1例2结出差距)
  • close()    关闭pool,使其不在接受新的任务。
  • terminate()    甘休职业进程,不在管理未产生的义务。
  • join()    主进度阻塞,等待子进程的脱离, join方法要在close或terminate之后采纳。

举行表达:成立二个进程池pool,并设定进度的数目为3,xrange(4)会挨个发出七个目的[0, 1, 2, 4],四个目的被提交到pool中,因pool内定进度数为3,所以0、1、2会直接送到进程中试行,当当中二个施行到位后才空出一个进程处理对象3,所以会见世出口“msg: hello 3”出现在"end"后。因为为非阻塞,主函数会本身施行自个的,不搭理进度的进行,所以运营完for循环后向来出口“mMsg: hark~ Mark~ Mark~~~~~~”,主程序在pool.join()处等待种种进度的了断。

 

例7.2:使用进度池(阻塞)

澳门新萄京官方网站 47

#coding: utf-8
import multiprocessing
import time

def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print("Sub-process(es) done.")

澳门新萄京官方网站 48

二回施行的结果

1
2
3
4
5
6
7
8
9
10
msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.

  

例7.3:使用进度池,并关心结果

澳门新萄京官方网站 49

import multiprocessing
import time

def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")
    return "done"   msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in xrange(3):
        msg = "hello %d" %(i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print(":::", res.get())
    print(Sub-process(es) done.")

澳门新萄京官方网站 50

二次推行结果

1
2
3
4
5
6
7
8
9
10
msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
::: donehello 0
::: donehello 1
::: donehello 2
Sub-process(es) done.

 

例7.4:使用八个进程池

澳门新萄京官方网站 51

#coding: utf-8
import multiprocessing
import os, time, random

def Lee():
    print("nRun task Lee-%s" %(os.getpid())) #os.getpid()获取当前的进程的ID
    start = time.time()
    time.sleep(random.random() * 10) #random.random()随机生成0-1之间的小数
    end = time.time()
    print('Task Lee, runs %0.2f seconds.' %(end - start))

def Marlon():
    print("nRun task Marlon-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 40)
    end=time.time()
    print('Task Marlon runs %0.2f seconds.' %(end - start))

def Allen():
    print("nRun task Allen-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print('Task Allen runs %0.2f seconds.' %(end - start))

def Frank():
    print("nRun task Frank-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print('Task Frank runs %0.2f seconds.' %(end - start))

if __name__=='__main__':
    function_list=  [Lee, Marlon, Allen, Frank] 
    print("parent process %s" %(os.getpid()))

    pool=multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)     #Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中

    print('Waiting for all subprocesses done...')
    pool.close()
    pool.join()    #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
    print('All subprocesses done.')

澳门新萄京官方网站 52

贰次推行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
parent process 7704
 
Waiting for all subprocesses done...
Run task Lee-6948
 
Run task Marlon-2896
 
Run task Allen-7304
 
Run task Frank-3052
Task Lee, runs 1.59 seconds.
Task Marlon runs 8.48 seconds.
Task Frank runs 15.68 seconds.
Task Allen runs 18.08 seconds.
All subprocesses done.
 1 import multiprocessing
 2 import time
 3 
 4 def worker_1(interval):
 5     print ("worker_1")
 6     time.sleep(interval)
 7     print ("end worker_1")
 8 
 9 def worker_2(interval):
10     print ("worker_2")
11     time.sleep(interval)
12     print ("end worker_2")
13 
14 def worker_3(interval):
15     print ("worker_3")
16     time.sleep(interval)
17     print ("end worker_3")
18 
19 if __name__ == "__main__":
20     p1 = multiprocessing.Process(target = worker_1, args = (2,))
21     p2 = multiprocessing.Process(target = worker_2, args = (3,))
22     p3 = multiprocessing.Process(target = worker_3, args = (4,))
23 
24     p1.start()
25     p2.start()
26     p3.start()
27     # 用来获得当前的CPU的核数,可以用来设置接下来子进程的个数。
28     # 用来获得当前所有的子进程,包括daemon和非daemon子进程。
29     # p.name,p.pid分别表示进程的名字,进程id。 
30     print("The number of CPU is:"   str(multiprocessing.cpu_count()))
31     for p in multiprocessing.active_children():
32         print("child   p.name:"   p.name   "tp.id"   str(p.pid))
33     print ("END!!!!!!!!!!!!!!!!!")

6. Pipe

Pipe方法重回(conn1, conn2)代表叁个管道的四个端。Pipe方法有duplex参数,假若duplex参数为True(默许值),那么这几个管道是全双工格局,也等于说conn1和conn2均可收发。duplex为False,conn1只担负接受新闻,conn2只承担发送新闻。

 

send和recv方法分别是发送和收受音信的主意。举个例子,在全双工方式下,能够调用conn1.send出殡和埋葬音讯,conn1.recv接收消息。如果未有新闻可吸收接纳,recv方法会一向不通。如果管道已经被关门,那么recv方法会抛出EOFError。

澳门新萄京官方网站 53

import multiprocessing
import time

def proc1(pipe):
    while True:
        for i in xrange(10000):
            print "send: %s" %(i)
            pipe.send(i)
            time.sleep(1)

def proc2(pipe):
    while True:
        print "proc2 rev:", pipe.recv()
        time.sleep(1)

def proc3(pipe):
    while True:
        print "PROC3 rev:", pipe.recv()
        time.sleep(1)

if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    #p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))

    p1.start()
    p2.start()
    #p3.start()

    p1.join()
    p2.join()
    #p3.join()

澳门新萄京官方网站 54

结果

澳门新萄京官方网站 55

 

回到最上部

 1 import multiprocessing 2 import time 3  4 def worker_1: 5     print ("worker_1") 6     time.sleep 7     print ("end worker_1") 8  9 def worker_2:10     print ("worker_2")11     time.sleep12     print ("end worker_2")13 14 def worker_3:15     print ("worker_3")16     time.sleep17     print ("end worker_3")18 19 if __name__ == "__main__":20     p1 = multiprocessing.Process(target = worker_1, args = (2,))21     p2 = multiprocessing.Process(target = worker_2, args = (3,))22     p3 = multiprocessing.Process(target = worker_3, args = (4,))23 24     p1.start()25     p2.start()26     p3.start()27     # 用来获得当前的CPU的核数,可以用来设置接下来子进程的个数。28     # 用来获得当前所有的子进程,包括daemon和非daemon子进程。29     # p.name,p.pid分别表示进程的名字,进程id。 30     print("The number of CPU is:"   str(multiprocessing.cpu_count31     for p in multiprocessing.active_children():32         print("child   p.name:"   p.name   "tp.id"   str33     print ("END!!!!!!!!!!!!!!!!!")

将经过定义为类:

7. Pool

在选取Python实行系统管理的时候,极其是同不常间操作多少个文件目录,恐怕远程调整多台主机,并行操作能够省去多量的岁月。当被操作对象数目相当小时,能够直接利用multiprocessing中的Process动态成生八个经过,十多少个好在,但倘如若过四个,上千个对象,手动的去界定进度数量却又太过繁琐,此时能够发布进度池的效用。
Pool能够提供钦赐数量的进度,供客商调用,当有新的恳求提交到pool中时,若是池还从未满,那么就能够创设四个新的历程用来实行该央浼;但万一池中的进度数一度达成规定最大值,那么该央浼就能够等待,直到池中有进度甘休,才会创建新的进度来它。

 

例7.1:使用过程池

澳门新萄京官方网站 56

#coding: utf-8
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print "Sub-process(es) done."

澳门新萄京官方网站 57

一遍实践结果

1
2
3
4
5
6
7
8
9
10
mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0
 
msg: hello 1
msg: hello 2
end
msg: hello 3
end
end
end
Sub-process(es) done.

函数解释:

  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(精通不一样,看例1例2结果不一样)
  • close()    关闭pool,使其不在接受新的职分。
  • terminate()    停止职业历程,不在管理未成功的职分。
  • join()    主进程阻塞,等待子进度的退出, join方法要在close或terminate之后采纳。

施行表达:创造贰个历程池pool,并设定进度的数码为3,xrange(4)会相继发出八个目标[0, 1, 2, 4],三个指标被交付到pool中,因pool内定进度数为3,所以0、1、2会直接送到进程中试行,当当中八个推行到位后才空出一个经过管理对象3,所以会见世出口“msg: hello 3”出现在"end"后。因为为非阻塞,主函数会自身实践自个的,不搭理进度的实施,所以运维完for循环后一向出口“mMsg: hark~ Mark~ Mark~~~~~~”,主程序在pool.join()处等待各类进度的收尾。

 

例7.2:使用进程池(阻塞)

澳门新萄京官方网站 58

#coding: utf-8
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print "Sub-process(es) done."

澳门新萄京官方网站 59

一次实践的结果

1
2
3
4
5
6
7
8
9
10
msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.

  

例7.3:使用进度池,并关注结果

澳门新萄京官方网站 60

import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"
    return "done"   msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in xrange(3):
        msg = "hello %d" %(i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print ":::", res.get()
    print "Sub-process(es) done."

澳门新萄京官方网站 61

三遍施行结果

1
2
3
4
5
6
7
8
9
10
msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
::: donehello 0
::: donehello 1
::: donehello 2
Sub-process(es) done.

 

例7.4:使用八个进度池

澳门新萄京官方网站 62

#coding: utf-8
import multiprocessing
import os, time, random

def Lee():
    print "nRun task Lee-%s" %(os.getpid()) #os.getpid()获取当前的进程的ID
    start = time.time()
    time.sleep(random.random() * 10) #random.random()随机生成0-1之间的小数
    end = time.time()
    print 'Task Lee, runs %0.2f seconds.' %(end - start)

def Marlon():
    print "nRun task Marlon-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 40)
    end=time.time()
    print 'Task Marlon runs %0.2f seconds.' %(end - start)

def Allen():
    print "nRun task Allen-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print 'Task Allen runs %0.2f seconds.' %(end - start)

def Frank():
    print "nRun task Frank-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print 'Task Frank runs %0.2f seconds.' %(end - start)

if __name__=='__main__':
    function_list=  [Lee, Marlon, Allen, Frank] 
    print "parent process %s" %(os.getpid())

    pool=multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)     #Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中

    print 'Waiting for all subprocesses done...'
    pool.close()
    pool.join()    #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
    print 'All subprocesses done.'

澳门新萄京官方网站 63

三回施行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
parent process 7704
 
Waiting for all subprocesses done...
Run task Lee-6948
 
Run task Marlon-2896
 
Run task Allen-7304
 
Run task Frank-3052
Task Lee, runs 1.59 seconds.
Task Marlon runs 8.48 seconds.
Task Frank runs 15.68 seconds.
Task Allen runs 18.08 seconds.
All subprocesses done.

将经过定义为类:

 1 import multiprocessing
 2 import time
 3 
 4 class ClockProcess(multiprocessing.Process):
 5     def __init__(self, interval):
 6         multiprocessing.Process.__init__(self)
 7         self.interval = interval
 8 
 9     def run(self):
10         n = 5
11         while n > 0:
12             print("the time is {0}".format(time.ctime()))
13             time.sleep(self.interval)
14             n -= 1
15 
16 if __name__ == '__main__':
17     p = ClockProcess(3)
18     p.start()
 1 import multiprocessing 2 import time 3  4 class ClockProcess(multiprocessing.Process): 5     def __init__(self, interval): 6         multiprocessing.Process.__init__ 7         self.interval = interval 8  9     def run:10         n = 511         while n > 0:12             print("the time is {0}".format(time.ctime13             time.sleep(self.interval)14             n -= 115 16 if __name__ == '__main__':17     p = ClockProcess(3)18     p.start()

daemon程序相比结果:
1.不加daemon

daemon程序比较结果:
1.不加daemon

 1 import multiprocessing
 2 import time
 3 
 4 def worker(interval):
 5     print("work start:{0}".format(time.ctime()));
 6     time.sleep(interval)
 7     print("work end:{0}".format(time.ctime()));
 8 
 9 if __name__ == "__main__":
10     p = multiprocessing.Process(target = worker, args = (3,))
11     p.start()
12     print ("end!")
13 
14 #程序运行结果
15 '''
16 end!
17 work start:Wed Jun 28 00:07:57 2017
18 work end:Wed Jun 28 00:08:00 2017
19 '''
 1 import multiprocessing 2 import time 3  4 def worker: 5     print("work start:{0}".format(time.ctime; 6     time.sleep 7     print("work end:{0}".format(time.ctime; 8  9 if __name__ == "__main__":10     p = multiprocessing.Process(target = worker, args = (3,))11     p.start()12     print ("end!")13 14 #程序运行结果15 '''16 end!17 work start:Wed Jun 28 00:07:57 201718 work end:Wed Jun 28 00:08:00 201719 '''

2.加daemon

2.加daemon

 1 import multiprocessing
 2 import time
 3 
 4 def worker(interval):
 5     print("work start:{0}".format(time.ctime()));
 6     time.sleep(interval)
 7     print("work end:{0}".format(time.ctime()));
 8 
 9 if __name__ == "__main__":
10     p = multiprocessing.Process(target = worker, args = (3,))
11     p.daemon = True
12     p.start()
13     print ("end!")
14 
15 #程序运行结果
16 '''
17 end!
18 
19 '''
 1 import multiprocessing 2 import time 3  4 def worker: 5     print("work start:{0}".format(time.ctime; 6     time.sleep 7     print("work end:{0}".format(time.ctime; 8  9 if __name__ == "__main__":10     p = multiprocessing.Process(target = worker, args = (3,))11     p.daemon = True12     p.start()13     print ("end!")14 15 #程序运行结果16 '''17 end!18 19 '''

PS:因子进度设置了daemon属性,主进度甘休,它们就趁早甘休了。
3.安装daemon试行完截止的措施

PS:因子进度设置了daemon属性,主进程停止,它们就趁早停止了。
3.装置daemon试行完甘休的办法

 1 import multiprocessing
 2 import time
 3 
 4 def worker(interval):
 5     print("work start:{0}".format(time.ctime()));
 6     time.sleep(interval)
 7     print("work end:{0}".format(time.ctime()));
 8 
 9 if __name__ == "__main__":
10     p = multiprocessing.Process(target = worker, args = (3,))
11     p.daemon = True
12     p.start()
13     p.join()
14     print "end!"
15 
16 # 结果
17 '''
18 work start:Tue Apr 21 22:16:32 2015
19 work end:Tue Apr 21 22:16:35 2015
20 end!
21 '''
 1 import multiprocessing 2 import time 3  4 def worker: 5     print("work start:{0}".format(time.ctime; 6     time.sleep 7     print("work end:{0}".format(time.ctime; 8  9 if __name__ == "__main__":10     p = multiprocessing.Process(target = worker, args = (3,))11     p.daemon = True12     p.start()13     p.join()14     print "end!"15 16 # 结果17 '''18 work start:Tue Apr 21 22:16:32 201519 work end:Tue Apr 21 22:16:35 201520 end!21 '''

2、Lock
当多少个进度需求访问共享财富的时候,Lock能够用来防止访谈的争论。

2、Lock
当四个进度必要访谈共享能源的时候,Lock能够用来幸免访谈的争论。

 1 import multiprocessing
 2 import sys
 3 
 4 def worker_with(lock, f):
 5     with lock:
 6         fs = open(f, 'a ')
 7         n = 10
 8         while n > 1:
 9             fs.write("Lockd acquired via withn")
10             n -= 1
11         fs.close()
12         
13 def worker_no_with(lock, f):
14     lock.acquire()
15     try:
16         fs = open(f, 'a ')
17         n = 10
18         while n > 1:
19             fs.write("Lock acquired directlyn")
20             n -= 1
21         fs.close()
22     finally:
23         lock.release()
24     
25 if __name__ == "__main__":
26     lock = multiprocessing.Lock()
27     f = "file.txt"
28     w = multiprocessing.Process(target = worker_with, args=(lock, f))
29     nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))
30     w.start()
31     nw.start()
32     print ("end")
 1 import multiprocessing 2 import sys 3  4 def worker_with: 5     with lock: 6         fs = open(f, 'a ') 7         n = 10 8         while n > 1: 9             fs.write("Lockd acquired via withn")10             n -= 111         fs.close()12         13 def worker_no_with:14     lock.acquire()15     try:16         fs = open(f, 'a ')17         n = 1018         while n > 1:19             fs.write("Lock acquired directlyn")20             n -= 121         fs.close()22     finally:23         lock.release()24     25 if __name__ == "__main__":26     lock = multiprocessing.Lock()27     f = "file.txt"28     w = multiprocessing.Process(target = worker_with, args=29     nw = multiprocessing.Process(target = worker_no_with, args=30     w.start()31     nw.start()32     print ("end")

3、Semaphore
塞马phore用来支配成对分享财富的会见数量,比方池的最特古西加尔巴接数。

3、Semaphore
Semaphore用来支配对分享财富的访谈数量,比方池的最瓜达拉哈拉接数。

 1 import multiprocessing
 2 import time
 3 
 4 def worker(s, i):
 5     s.acquire()
 6     print(multiprocessing.current_process().name   "acquire")
 7     time.sleep(i)
 8     print(multiprocessing.current_process().name   "releasen")
 9     s.release()
10 
11 if __name__ == "__main__":
12     s = multiprocessing.Semaphore(2)   # 限制最多有两个进程同时执行
13     for i in range(5):
14         p = multiprocessing.Process(target = worker, args=(s, i*2))
15         p.start()
 1 import multiprocessing 2 import time 3  4 def worker: 5     s.acquire() 6     print(multiprocessing.current_process().name   "acquire") 7     time.sleep 8     print(multiprocessing.current_process().name   "releasen") 9     s.release()10 11 if __name__ == "__main__":12     s = multiprocessing.Semaphore   # 限制最多有两个进程同时执行13     for i in range(5):14         p = multiprocessing.Process(target = worker, args=(s, i*2))15         p.start()

运行结果:

运作结果:

 1 Process-4acquire
 2 Process-2acquire
 3 Process-2release
 4 
 5 Process-1acquire
 6 Process-1release
 7 
 8 Process-3acquire
 9 Process-4release
10 
11 Process-5acquire
12 Process-3release
13 
14 Process-5release
 1 Process-4acquire 2 Process-2acquire 3 Process-2release 4  5 Process-1acquire 6 Process-1release 7  8 Process-3acquire 9 Process-4release10 11 Process-5acquire12 Process-3release13 14 Process-5release

4、Event
Event贯彻进程间协同通讯

4、Event
伊夫nt完成进度间共同通讯

 1 import multiprocessing
 2 import time
 3 
 4 def wait_for_event(e):
 5     print("wait_for_event: starting")
 6     e.wait()
 7     print("wairt_for_event: e.is_set()->"   str(e.is_set()))
 8 
 9 def wait_for_event_timeout(e, t):
10     print("wait_for_event_timeout:starting")
11     e.wait(t)
12     print("wait_for_event_timeout:e.is_set->"   str(e.is_set()))
13 
14 if __name__ == "__main__":
15     e = multiprocessing.Event()
16     w1 = multiprocessing.Process(name = "block",
17             target = wait_for_event,
18             args = (e,))
19 
20     w2 = multiprocessing.Process(name = "non-block",
21             target = wait_for_event_timeout,
22             args = (e, 2))
23     w1.start()
24     w2.start()
25 
26     time.sleep(3)
27 
28     e.set()
29     print("main: event is set")
30 
31 # 运行结果
32 '''
33 
34 wait_for_event: starting
35 
36 wait_for_event_timeout:starting
37 
38 wait_for_event_timeout:e.is_set->False
39 
40 main: event is set
41 
42 wairt_for_event: e.is_set()->True
43 
44 '''
 1 import multiprocessing 2 import time 3  4 def wait_for_event: 5     print("wait_for_event: starting") 6     e.wait() 7     print("wairt_for_event: e.is_set()->"   str(e.is_set 8  9 def wait_for_event_timeout:10     print("wait_for_event_timeout:starting")11     e.wait12     print("wait_for_event_timeout:e.is_set->"   str(e.is_set13 14 if __name__ == "__main__":15     e = multiprocessing.Event()16     w1 = multiprocessing.Process(name = "block",17             target = wait_for_event,18             args = 19 20     w2 = multiprocessing.Process(name = "non-block",21             target = wait_for_event_timeout,22             args = (e, 2))23     w1.start()24     w2.start()25 26     time.sleep(3)27 28     e.set()29     print("main: event is set")30 31 # 运行结果32 '''33 34 wait_for_event: starting35 36 wait_for_event_timeout:starting37 38 wait_for_event_timeout:e.is_set->False39 40 main: event is set41 42 wairt_for_event: e.is_set()->True43 44 '''

5、Queue

5、Queue

Queue是多进度安全的行列,能够选取Queue达成多进度之间的多少传递。put方法用以插入数据到行列中,put方法还应该有三个可选参数:blocked和timeout。假若blocked为True(默许值),并且timeout为正在,该方法会阻塞timeout钦定的日子,直到该队列有盈余的空中。假使超时,会抛出Queue.Full万分。倘若blocked为False,但该Queue已满,会及时抛出Queue.Full十分。

Queue是多进度安全的行列,能够应用Queue完成多进度之间的数目传递。put方法用以插入数据到行列中,put方法还或然有七个可选参数:blocked和timeout。如若blocked为True,何况timeout为正在,该方法会阻塞timeout钦命的时辰,直到该队列有多余的上空。即使超时,会抛出Queue.Full至极。假如blocked为False,但该Queue已满,会立马抛出Queue.Full格外。get方法能够从队列读取並且删除四个因素。一样,get方法有八个可选参数:blocked和timeout。要是blocked为True,何况timeout为正在,那么在伺机时间内并未有取到任何因素,会抛出Queue.Empty十分。要是blocked为False,有三种情状存在,即使Queue有一个值可用,则即时赶回该值,不然,若是队列为空,则立即抛出Queue.Empty卓殊。

 

 1 import multiprocessing 2 def writer_proc: 3     try: 4         q.put(1, block = False) 5     except: 6         pass 7  8 def reader_proc: 9     try:10         print (q.get(block = False))11     except:12         pass13 14 if __name__ == "__main__":15     q = multiprocessing.Queue()16     writer = multiprocessing.Process(target=writer_proc, args=17     writer.start()18 19     reader = multiprocessing.Process(target=reader_proc, args=20     reader.start()21 22     reader.join()23     writer.join()24 25 # 运行结果26 # 1

get方法能够从队列读取何况删除一个要素。同样,get方法有几个可选参数:blocked和timeout。假若blocked为True(私下认可值),何况timeout为正在,那么在等待时间内未有取到任何因素,会抛出Queue.Empty至极。借使blocked为False,有三种情况存在,借使Queue有叁个值可用,则立即赶回该值,不然,要是队列为空,则立刻抛出Queue.Empty万分。

6、Pipe

import multiprocessing

def writer_proc(q):
    try:
        q.put(1, block = False)
    except:
        pass

def reader_proc(q):
    try:
        print (q.get(block = False))
    except:
        pass

if __name__ == "__main__":
    q = multiprocessing.Queue()
    writer = multiprocessing.Process(target=writer_proc, args=(q,))
    writer.start()

    reader = multiprocessing.Process(target=reader_proc, args=(q,))
    reader.start()

    reader.join()
    writer.join()

# Run result
# 1
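
A minimal sketch of the timeout behaviour described above (assuming Python 3, where the Full and Empty exceptions raised by multiprocessing.Queue live in the standard queue module; the queue size and messages are made up for illustration):

import multiprocessing
import queue   # provides queue.Full and queue.Empty in Python 3

if __name__ == "__main__":
    q = multiprocessing.Queue(maxsize = 1)       # a queue with room for a single item

    q.put("first")                               # succeeds; the queue is now full
    try:
        q.put("second", block = True, timeout = 1)   # waits up to 1 second for free space
    except queue.Full:
        print("put timed out: the queue is full")

    print(q.get())                               # removes "first"; the queue is now empty
    try:
        q.get(block = True, timeout = 1)             # waits up to 1 second for an item
    except queue.Empty:
        print("get timed out: the queue is empty")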

6. Pipe

A Pipe can be half-duplex (one-way) or duplex (two-way). Calling multiprocessing.Pipe(duplex=False) creates a one-way pipe (the default is two-way). One process writes objects into one end of the pipe and another process receives them at the other end; a one-way pipe only allows sending from one end, while a two-way pipe allows sending from both ends.

The Pipe method returns a pair (conn1, conn2) representing the two ends of the pipe. Pipe takes a duplex parameter: if duplex is True (the default), the pipe is full-duplex, meaning both conn1 and conn2 can send and receive; if duplex is False, conn1 can only receive messages and conn2 can only send them.

The send and recv methods send and receive messages, respectively. For example, in full-duplex mode you can call conn1.send to send a message and conn1.recv to receive one. If there is no message to receive, recv blocks; if the pipe has been closed, recv raises EOFError.

# proc1 sends messages; proc2 and proc3 take turns receiving them
import multiprocessing
import time

def proc1(pipe):
    while True:
        for i in range(100):
            print ("send: %s" %(i))
            pipe.send(i)
            time.sleep(1)

def proc2(pipe):
    while True:
        print ("proc2 rev:", pipe.recv())
        time.sleep(1)

def proc3(pipe):
    while True:
        print ("proc3 rev:", pipe.recv())
        time.sleep(1)

if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))

    p1.start()
    p2.start()
    p3.start()

    p1.join()
    p2.join()
    p3.join()

# Run result
'''
send: 0
proc2 rev: 0
send: 1
proc3 rev: 1
send: 2
proc2 rev: 2
send: 3
proc3 rev: 3
send: 4
proc2 rev: 4
send: 5
proc3 rev: 5
send: 6
proc2 rev: 6
send: 7
proc3 rev: 7
send: 8
proc2 rev: 8
send: 9
proc3 rev: 9
send: 10
proc2 rev: 10
......
'''
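
The example above uses the default two-way pipe. For the one-way case, here is a minimal sketch (the helper name child_sender and the messages are made up for illustration): with duplex=False the first connection can only receive and the second can only send, and closing the sending ends makes recv raise EOFError in the reader.

import multiprocessing

def child_sender(send_end):
    for i in range(3):
        send_end.send("message %d" % i)
    send_end.close()              # close the write end so the reader sees end-of-file

if __name__ == "__main__":
    recv_end, send_end = multiprocessing.Pipe(duplex = False)
    p = multiprocessing.Process(target = child_sender, args = (send_end,))
    p.start()
    send_end.close()              # the parent keeps only the read end
    while True:
        try:
            print("parent received:", recv_end.recv())
        except EOFError:          # raised once the pipe is closed and drained
            break
    p.join()

Closing the parent's copy of the write end matters here: otherwise the reader would never see EOFError and the loop would block forever.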


7. Pool

When using Python for system administration, especially when operating on many file directories at once or remotely controlling many hosts, parallel execution can save a great deal of time. When the number of objects to operate on is small, you can simply use multiprocessing.Process to spawn a few processes; a dozen or so is still manageable. But with hundreds or thousands of objects, manually limiting the number of processes becomes far too tedious, and that is where a process pool comes in.

Pool provides a fixed number of processes for the user to call on. When a new request is submitted to the pool and the pool is not yet full, a new process is created to execute the request; if the pool has already reached its specified maximum, the request waits until a process in the pool finishes, and only then is a new process created to run it.


Using a process pool (non-blocking)

import multiprocessing
import time

def func(msg):
    print ("msg:", msg)
    time.sleep(3)
    print ("end")

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)   # at most 3 worker processes in the pool
    for i in range(10):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))   # keeps at most `processes` tasks running; when one finishes, the next queued task is dispatched

    print ("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()   # call close() before join(), otherwise an error is raised; after close() no new tasks can be submitted, and join() waits for all child processes to finish
    print ("Sub-process(es) done.")

Function explanation:

  • apply_async(func[, args[, kwds[, callback]]]) is non-blocking, while apply(func[, args[, kwds]]) is blocking (to see the difference, compare the output of this non-blocking example with the blocking example below).
  • close() closes the pool so that it accepts no new tasks.
  • terminate() terminates the worker processes immediately, without finishing outstanding tasks.
  • join() makes the main process block until the child processes have exited; join must be called after close or terminate.

Execution notes: a pool is created with 3 worker processes. range(10) produces ten items [0, 1, ..., 9], all of which are submitted to the pool. Since the pool allows at most 3 processes, tasks 0, 1 and 2 are dispatched immediately, and task 3 is only handled once one of them finishes, which is why "msg: hello 3" appears after an "end". Because apply_async is non-blocking, the main process does not wait for the workers: after the for loop it immediately prints "Mark~ Mark~ Mark~~~~~~", and then waits at pool.join() for all the worker processes to finish.

Run result:

Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
msg: hello 0
msg: hello 1
msg: hello 2
end
msg: hello 3
end
msg: hello 4
end
msg: hello 5
end
msg: hello 6
end
msg: hello 7
end
msg: hello 8
end
msg: hello 9
end
end
end
Sub-process(es) done.
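
As listed above, apply_async also accepts an optional callback, and it returns an object whose get() method yields the task's return value. A minimal sketch (the function names square and on_done are made up for illustration):

import multiprocessing

def square(x):
    return x * x

def on_done(result):
    print("callback got:", result)        # the callback runs in the main process as each task finishes

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    results = [pool.apply_async(square, (i,), callback = on_done) for i in range(5)]
    pool.close()
    pool.join()
    print("collected:", [r.get() for r in results])   # get() returns each task's result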


Using a process pool (blocking)

import multiprocessing
import time

def func(msg):
    print ("msg:", msg)
    time.sleep(3)
    print ("end")

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)   # at most 3 worker processes in the pool
    for i in range(10):
        msg = "hello %d" %(i)
        pool.apply(func, (msg, ))   # apply is blocking: each call waits until its task has finished before the next one is submitted

    print ("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()   # call close() before join(), otherwise an error is raised; after close() no new tasks can be submitted, and join() waits for all child processes to finish
    print ("Sub-process(es) done.")

# Run result
'''
msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
msg: hello 4
end
msg: hello 5
end
msg: hello 6
end
msg: hello 7
end
msg: hello 8
end
msg: hello 9
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.
'''
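
Because apply blocks until each task has finished, it also hands back the function's return value directly, so the submitted calls run strictly one after another. A minimal sketch (the function name double is made up for illustration):

import multiprocessing

def double(x):
    return x * 2

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    results = [pool.apply(double, (i,)) for i in range(5)]   # each call blocks until its task returns
    pool.close()
    pool.join()
    print(results)   # [0, 2, 4, 6, 8]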

Running multiple functions in a process pool

import multiprocessing
import os, time, random


def Lee():
    print("\nRun task Lee-%s" % (os.getpid()))  # os.getpid() returns the ID of the current process
    start = time.time()
    time.sleep(random.random() * 10)  # random.random() returns a random float between 0 and 1
    end = time.time()
    print( 'Task Lee, runs %0.2f seconds.' % (end - start))


def Marlon():
    print("\nRun task Marlon-%s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 40)
    end = time.time()
    print('Task Marlon runs %0.2f seconds.' % (end - start))


def Allen():
    print("\nRun task Allen-%s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print('Task Allen runs %0.2f seconds.' % (end - start))


def Frank():
    print( "\nRun task Frank-%s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print( 'Task Frank runs %0.2f seconds.' % (end - start))


if __name__ == '__main__':
    function_list = [Lee, Marlon, Allen, Frank]
    print("parent process %s" % (os.getpid()))

    pool = multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)  # submit each function to the pool; a free worker picks up the next task as soon as it finishes one

    print('Waiting for all subprocesses done...')
    pool.close()
    pool.join()  # close() must be called before join(), otherwise an error is raised; after close() no new tasks can be submitted, and join() waits for all child processes to finish
    print( 'All subprocesses done.')

# Run result
'''
parent process 3256
Waiting for all subprocesses done...

Run task Lee-2196

Run task Marlon-4580

Run task Allen-5920

Run task Frank-6384
Task Allen runs 2.15 seconds.
Task Lee, runs 9.99 seconds.
Task Frank runs 14.14 seconds.
Task Marlon runs 32.74 seconds.
All subprocesses done.

'''

 
