
Python Multiprocessing Programming: a Summary

Published: 2019-08-17 11:09

    Preface: multiprocessing
    Threads in Python are not truly parallel: to make full use of a multi-core CPU, most Python programs need multiple processes. Python provides the very convenient multiprocessing package: you only need to define a function, and Python takes care of everything else, making it easy to move from a single process to concurrent execution. multiprocessing supports child processes, communication and data sharing, and different forms of synchronization, and provides components such as Process, Queue, Pipe, and Lock.


     

      1. Process
      2. Lock
      3. Semaphore
      4. Event
      5. Queue
      6. Pipe
      7. Pool

    1. Process

    The class for creating a process: Process([group [, target [, name [, args [, kwargs]]]]]). target is the callable to run; args is the tuple of positional arguments for the callable; kwargs is the dict of keyword arguments; name is an alias for the process; group is effectively unused.
    Methods: is_alive(), join([timeout]), run(), start(), terminate(). A process is launched with start().

    Attributes: authkey, daemon (must be set before start()), exitcode (None while the process is running; -N means the process was terminated by signal N), name, pid. A daemon process is terminated automatically when its parent process ends, and it cannot create child processes of its own; the flag must be set before start().

     

    Example 1.1: define a function and run it as a single process

    import multiprocessing
    import time
    
    def worker(interval):
        n = 5
        while n > 0:
            print("The time is {0}".format(time.ctime()))
            time.sleep(interval)
            n -= 1
    
    if __name__ == "__main__":
        p = multiprocessing.Process(target = worker, args = (3,))
        p.start()
        print("p.pid:", p.pid)
        print("p.name:", p.name)
        print("p.is_alive:", p.is_alive())
    

    Result

    p.pid: 8736
    p.name: Process-1
    p.is_alive: True
    The time is Tue Apr 21 20:55:12 2015
    The time is Tue Apr 21 20:55:15 2015
    The time is Tue Apr 21 20:55:18 2015
    The time is Tue Apr 21 20:55:21 2015
    The time is Tue Apr 21 20:55:24 2015
    

     

    Example 1.2: define functions and run them as several processes

    import multiprocessing
    import time
    
    def worker_1(interval):
        print("worker_1")
        time.sleep(interval)
        print("end worker_1")

    def worker_2(interval):
        print("worker_2")
        time.sleep(interval)
        print("end worker_2")

    def worker_3(interval):
        print("worker_3")
        time.sleep(interval)
        print("end worker_3")
    
    if __name__ == "__main__":
        p1 = multiprocessing.Process(target = worker_1, args = (2,))
        p2 = multiprocessing.Process(target = worker_2, args = (3,))
        p3 = multiprocessing.Process(target = worker_3, args = (4,))
    
        p1.start()
        p2.start()
        p3.start()
    
        print("The number of CPU is:" + str(multiprocessing.cpu_count()))
        for p in multiprocessing.active_children():
            print("child   p.name:" + p.name + "\tp.id" + str(p.pid))
        print("END!!!!!!!!!!!!!!!!!")
    

    Result

    The number of CPU is:4
    child   p.name:Process-3    p.id7992
    child   p.name:Process-2    p.id4204
    child   p.name:Process-1    p.id6380
    END!!!!!!!!!!!!!!!!!
    worker_1
    worker_3
    worker_2
    end worker_1
    end worker_2
    end worker_3
    

     

    Example 1.3: define the process as a class

    import multiprocessing
    import time
    
    class ClockProcess(multiprocessing.Process):
        def __init__(self, interval):
            multiprocessing.Process.__init__(self)
            self.interval = interval
    
        def run(self):
            n = 5
            while n > 0:
                print("the time is {0}".format(time.ctime()))
                time.sleep(self.interval)
                n -= 1
    
    if __name__ == '__main__':
        p = ClockProcess(3)
        p.start()      
    

    Note: when process p calls start(), run() is invoked automatically.

    Result

    the time is Tue Apr 21 20:31:30 2015
    the time is Tue Apr 21 20:31:33 2015
    the time is Tue Apr 21 20:31:36 2015
    the time is Tue Apr 21 20:31:39 2015
    the time is Tue Apr 21 20:31:42 2015
    

     

    Example 1.4: comparing behavior with and without the daemon attribute

    #1.4-1 without the daemon attribute

    import multiprocessing
    import time
    
    def worker(interval):
        print("work start:{0}".format(time.ctime()))
        time.sleep(interval)
        print("work end:{0}".format(time.ctime()))
    
    if __name__ == "__main__":
        p = multiprocessing.Process(target = worker, args = (3,))
        p.start()
        print("end!")
    

    Result

    end!
    work start:Tue Apr 21 21:29:10 2015
    work end:Tue Apr 21 21:29:13 2015
    

    #1.4-2 with the daemon attribute

    import multiprocessing
    import time
    
    def worker(interval):
        print("work start:{0}".format(time.ctime()))
        time.sleep(interval)
        print("work end:{0}".format(time.ctime()))
    
    if __name__ == "__main__":
        p = multiprocessing.Process(target = worker, args = (3,))
        p.daemon = True
        p.start()
        print("end!")
    

    Result

    end!
    

    Note: because the child process has the daemon attribute set, it is terminated as soon as the main process ends.

    #1.4-3 letting a daemon process finish by using join()

    import multiprocessing
    import time
    
    def worker(interval):
        print("work start:{0}".format(time.ctime()))
        time.sleep(interval)
        print("work end:{0}".format(time.ctime()))
    
    if __name__ == "__main__":
        p = multiprocessing.Process(target = worker, args = (3,))
        p.daemon = True
        p.start()
        p.join()
        print("end!")
    

    Result

    work start:Tue Apr 21 22:16:32 2015
    work end:Tue Apr 21 22:16:35 2015
    end!
    

     


    2. Lock

    When multiple processes need to access a shared resource, Lock can be used to avoid conflicting accesses.

    import multiprocessing
    import sys
    
    def worker_with(lock, f):
        with lock:
            fs = open(f, 'a+')
            n = 10
            while n > 1:
                fs.write("Lockd acquired via with\n")
                n -= 1
            fs.close()

    def worker_no_with(lock, f):
        lock.acquire()
        try:
            fs = open(f, 'a+')
            n = 10
            while n > 1:
                fs.write("Lock acquired directly\n")
                n -= 1
            fs.close()
        finally:
            lock.release()
    
    if __name__ == "__main__":
        lock = multiprocessing.Lock()
        f = "file.txt"
        w = multiprocessing.Process(target = worker_with, args=(lock, f))
        nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))
        w.start()
        nw.start()
        print("end")
    

    Result (contents of the output file)

    Lockd acquired via with
    Lockd acquired via with
    Lockd acquired via with
    Lockd acquired via with
    Lockd acquired via with
    Lockd acquired via with
    Lockd acquired via with
    Lockd acquired via with
    Lockd acquired via with
    Lock acquired directly
    Lock acquired directly
    Lock acquired directly
    Lock acquired directly
    Lock acquired directly
    Lock acquired directly
    Lock acquired directly
    Lock acquired directly
    Lock acquired directly
    

     

     


    3. Semaphore

    Semaphore controls the number of concurrent accesses to a shared resource, for example the maximum number of connections in a connection pool.

    import multiprocessing
    import time
    
    def worker(s, i):
        s.acquire()
        print(multiprocessing.current_process().name + "acquire")
        time.sleep(i)
        print(multiprocessing.current_process().name + "release\n")
        s.release()
    
    if __name__ == "__main__":
        s = multiprocessing.Semaphore(2)
        for i in range(5):
            p = multiprocessing.Process(target = worker, args=(s, i*2))
            p.start()
    

    Result

    Process-1acquire
    Process-1release
    
    Process-2acquire
    Process-3acquire
    Process-2release
    
    Process-5acquire
    Process-3release
    
    Process-4acquire
    Process-5release
    
    Process-4release
    

     


    4. Event

    Event is used to implement synchronization and communication between processes.

    import multiprocessing
    import time
    
    def wait_for_event(e):
        print("wait_for_event: starting")
        e.wait()
        print("wairt_for_event: e.is_set()->" + str(e.is_set()))

    def wait_for_event_timeout(e, t):
        print("wait_for_event_timeout:starting")
        e.wait(t)
        print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))
    
    if __name__ == "__main__":
        e = multiprocessing.Event()
        w1 = multiprocessing.Process(name = "block",
                target = wait_for_event,
                args = (e,))
    
        w2 = multiprocessing.Process(name = "non-block",
                target = wait_for_event_timeout,
                args = (e, 2))
        w1.start()
        w2.start()
    
        time.sleep(3)
    
        e.set()
        print("main: event is set")
    

    Result

    wait_for_event: starting
    wait_for_event_timeout:starting
    wait_for_event_timeout:e.is_set->False
    main: event is set
    wairt_for_event: e.is_set()->True
    

     


    5. Queue

    Queue is a process-safe queue that can be used to pass data between processes. The put method inserts an item into the queue and takes two optional arguments: block and timeout. If block is True (the default) and timeout is a positive number, the call blocks for at most timeout seconds while waiting for free space in the queue; if it times out, a Queue.Full exception is raised. If block is False and the queue is full, Queue.Full is raised immediately.

     

    The get method reads and removes one item from the queue. It likewise takes the two optional arguments block and timeout. If block is True (the default) and timeout is a positive number, and no item could be retrieved within the wait time, a Queue.Empty exception is raised. If block is False there are two cases: if an item is available it is returned immediately; otherwise, if the queue is empty, Queue.Empty is raised immediately. A short Queue example:

    import multiprocessing
    
    def writer_proc(q):      
        try:         
            q.put(1, block = False) 
        except:         
            pass   
    
    def reader_proc(q):      
        try:         
            print(q.get(block = False))
        except:         
            pass
    
    if __name__ == "__main__":
        q = multiprocessing.Queue()
        writer = multiprocessing.Process(target=writer_proc, args=(q,))  
        writer.start()   
    
        reader = multiprocessing.Process(target=reader_proc, args=(q,))  
        reader.start()  
    
        reader.join()  
        writer.join()
    

    Result

    1
    

     


    6. Pipe

    The Pipe method returns a pair (conn1, conn2) representing the two ends of a pipe. Pipe takes a duplex parameter: if duplex is True (the default), the pipe works in full-duplex mode, i.e. both conn1 and conn2 can send and receive. If duplex is False, conn1 can only receive messages and conn2 can only send them.

     

    The send and recv methods send and receive messages, respectively. For example, in full-duplex mode you can call conn1.send to send a message and conn1.recv to receive one. If there is no message to receive, recv blocks until one arrives. If the other end of the pipe has been closed, recv raises EOFError.

    import multiprocessing
    import time
    
    def proc1(pipe):
        while True:
            for i in range(10000):
                print("send: %s" % (i))
                pipe.send(i)
                time.sleep(1)

    def proc2(pipe):
        while True:
            print("proc2 rev:", pipe.recv())
            time.sleep(1)

    def proc3(pipe):
        while True:
            print("PROC3 rev:", pipe.recv())
            time.sleep(1)
    
    if __name__ == "__main__":
        pipe = multiprocessing.Pipe()
        p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
        p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
        #p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))
    
        p1.start()
        p2.start()
        #p3.start()
    
        p1.join()
        p2.join()
        #p3.join()
    

    Result

    (Shown as a screenshot in the original: alternating "send: i" and "proc2 rev: i" lines, one pair per second.)

     


    7. Pool

    When using Python for system administration, especially when operating on many files and directories at once or controlling several remote hosts in parallel, parallel execution can save a great deal of time. When the number of targets is small, you can use Process from multiprocessing directly to spawn a handful of processes dynamically; a dozen or so is fine, but with hundreds or thousands of targets, limiting the number of processes by hand becomes far too tedious. This is where a process pool comes in.
    Pool provides a specified number of worker processes for the user to call on. When a new request is submitted to the pool and the pool is not yet full, a new process is created to execute the request; if the pool has already reached its configured maximum, the request waits until a process in the pool finishes, and a new process is then created to handle it.

     

    Example 7.1: using a process pool (non-blocking)

    import multiprocessing
    import time

    def func(msg):
        print("msg:", msg)
        time.sleep(3)
        print("end")

    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes = 3)
        for i in range(4):
            msg = "hello %d" % (i)
            pool.apply_async(func, (msg, ))   # keeps at most `processes` tasks running; when one finishes, the next queued task starts

        print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
        pool.close()
        pool.join()   # call close() before join(), otherwise an error is raised; after close() no new task enters the pool, and join() waits for all children to finish
        print("Sub-process(es) done.")
    

    Output of one run

    mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0
    
    msg: hello 1
    msg: hello 2
    end
    msg: hello 3
    end
    end
    end
    Sub-process(es) done.
    

    Function notes:

    • apply_async(func[, args[, kwds[, callback]]]) is non-blocking, while apply(func[, args[, kwds]]) is blocking (to see the difference, compare the results of Example 7.1 and Example 7.2).
    • close()    closes the pool so that it no longer accepts new tasks.
    • terminate()    ends the worker processes immediately, without finishing outstanding tasks.
    • join()    blocks the main process until the child processes exit; it must be called after close() or terminate().

    Explanation: a process pool is created with 3 processes. range(4) produces the four values [0, 1, 2, 3], and four tasks are submitted to the pool. Because the pool has 3 processes, tasks 0, 1, and 2 go straight to worker processes, and task 3 only runs once one of them has finished, which is why "msg: hello 3" appears after an "end". Since apply_async is non-blocking, the main process continues on its own without waiting for the workers, so it prints the "Mark~ Mark~ Mark~~~~~~" line right after the for loop, and then waits at pool.join() for each process to finish.

     

    Example 7.2: using a process pool (blocking)

    import multiprocessing
    import time

    def func(msg):
        print("msg:", msg)
        time.sleep(3)
        print("end")

    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes = 3)
        for i in range(4):
            msg = "hello %d" % (i)
            pool.apply(func, (msg, ))   # apply blocks until this task finishes before submitting the next

        print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
        pool.close()
        pool.join()   # call close() before join(), otherwise an error is raised; after close() no new task enters the pool, and join() waits for all children to finish
        print("Sub-process(es) done.")
    

    Output of one run

    msg: hello 0
    end
    msg: hello 1
    end
    msg: hello 2
    end
    msg: hello 3
    end
    Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
    Sub-process(es) done.
    

      

    Example 7.3: using a process pool and collecting results

    import multiprocessing
    import time
    
    def func(msg):
        print("msg:", msg)
        time.sleep(3)
        print("end")
        return "done" + msg

    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=4)
        result = []
        for i in range(3):
            msg = "hello %d" % (i)
            result.append(pool.apply_async(func, (msg, )))
        pool.close()
        pool.join()
        for res in result:
            print(":::", res.get())
        print("Sub-process(es) done.")
    

    Output of one run

    msg: hello 0
    msg: hello 1
    msg: hello 2
    end
    end
    end
    ::: donehello 0
    ::: donehello 1
    ::: donehello 2
    Sub-process(es) done.
    

     

    Example 7.4: submitting several different functions to one pool

    import multiprocessing
    import os, time, random

    def Lee():
        print("\nRun task Lee-%s" % (os.getpid()))   # os.getpid() returns the current process id
        start = time.time()
        time.sleep(random.random() * 10)   # random.random() yields a float in [0, 1)
        end = time.time()
        print('Task Lee, runs %0.2f seconds.' % (end - start))

    def Marlon():
        print("\nRun task Marlon-%s" % (os.getpid()))
        start = time.time()
        time.sleep(random.random() * 40)
        end = time.time()
        print('Task Marlon runs %0.2f seconds.' % (end - start))

    def Allen():
        print("\nRun task Allen-%s" % (os.getpid()))
        start = time.time()
        time.sleep(random.random() * 30)
        end = time.time()
        print('Task Allen runs %0.2f seconds.' % (end - start))

    def Frank():
        print("\nRun task Frank-%s" % (os.getpid()))
        start = time.time()
        time.sleep(random.random() * 20)
        end = time.time()
        print('Task Frank runs %0.2f seconds.' % (end - start))

    if __name__ == '__main__':
        function_list = [Lee, Marlon, Allen, Frank]
        print("parent process %s" % (os.getpid()))

        pool = multiprocessing.Pool(4)
        for func in function_list:
            pool.apply_async(func)   # when a worker finishes, it picks up the next submitted function

        print('Waiting for all subprocesses done...')
        pool.close()
        pool.join()   # close() must be called before join(); after close() no new task joins the pool, and join() waits for all children
        print('All subprocesses done.')
    

    Output of one run

    parent process 7704
    
    Waiting for all subprocesses done...
    Run task Lee-6948
    
    Run task Marlon-2896
    
    Run task Allen-7304
    
    Run task Frank-3052
    Task Lee, runs 1.59 seconds.
    Task Marlon runs 8.48 seconds.
    Task Frank runs 15.68 seconds.
    Task Allen runs 18.08 seconds.
    All subprocesses done.
    

    4. Event

    Event用来完结进程间共同通讯。


import multiprocessing
import time

def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()
    print("wait_for_event: e.is_set()->" + str(e.is_set()))

def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout: starting")
    e.wait(t)
    print("wait_for_event_timeout: e.is_set->" + str(e.is_set()))

if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name="block",
            target=wait_for_event,
            args=(e,))

    w2 = multiprocessing.Process(name="non-block",
            target=wait_for_event_timeout,
            args=(e, 2))
    w1.start()
    w2.start()

    time.sleep(3)

    e.set()
    print("main: event is set")

Result:

wait_for_event: starting
wait_for_event_timeout: starting
wait_for_event_timeout: e.is_set->False
main: event is set
wait_for_event: e.is_set()->True

     


    3. Semaphore

Semaphore controls how many processes may access a shared resource at the same time, for example the maximum number of connections in a pool.


import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + "acquire")
    time.sleep(i)
    print(multiprocessing.current_process().name + "release\n")
    s.release()

if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)  # allow at most two processes in the critical section at a time
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(s, i*2))
        p.start()

Result:
    Process-1acquire
    Process-1release
     
    Process-2acquire
    Process-3acquire
    Process-2release
     
    Process-5acquire
    Process-3release
     
    Process-4acquire
    Process-5release
     
    Process-4release

     


    5. Queue

Queue is a process-safe queue that can be used to pass data between processes. The put method inserts an item into the queue and takes two optional arguments, block and timeout. If block is True (the default) and timeout is a positive number, the call blocks for at most timeout seconds waiting for a free slot; if it times out, Queue.Full is raised. If block is False and the queue is full, Queue.Full is raised immediately.

     

The get method reads and removes one element from the queue. It likewise takes the optional arguments block and timeout. If block is True (the default) and timeout is a positive number, and no element arrives within the timeout, Queue.Empty is raised. If block is False there are two cases: if the queue has a value available it is returned immediately; otherwise Queue.Empty is raised immediately. A short example using Queue:


    import multiprocessing
    
    def writer_proc(q):      
        try:         
            q.put(1, block = False) 
        except:         
            pass   
    
    def reader_proc(q):      
        try:         
            print(q.get(block = False) )
        except:         
            pass
    
    if __name__ == "__main__":
        q = multiprocessing.Queue()
        writer = multiprocessing.Process(target=writer_proc, args=(q,))  
        writer.start()   
    
        reader = multiprocessing.Process(target=reader_proc, args=(q,))  
        reader.start()  
    
        reader.join()  
        writer.join()
    

Result:

1
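The put/get failure modes described above can also be exercised in a single process, since a full or empty queue raises the same exceptions either way. A minimal sketch (the one-slot maxsize and the probe_queue helper are illustrative choices, not from the original):

```python
import multiprocessing
import queue  # queue.Full / queue.Empty are the exceptions multiprocessing.Queue raises

def probe_queue():
    q = multiprocessing.Queue(maxsize=1)  # capacity of one slot, so a second put must fail
    q.put("first")
    outcomes = []
    try:
        q.put("second", block=True, timeout=0.1)  # waits 0.1 s for free space, then gives up
    except queue.Full:
        outcomes.append("Full")
    q.get()  # drain the queue again
    try:
        q.get(block=False)  # no element available -> fails immediately
    except queue.Empty:
        outcomes.append("Empty")
    return outcomes

if __name__ == "__main__":
    print(probe_queue())  # ['Full', 'Empty']
```

Here put waits 0.1 s before raising queue.Full, while get with block=False fails immediately, matching the two behaviors described above.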

     



    6. Pipe

The Pipe method returns a pair (conn1, conn2) representing the two ends of a pipe. Pipe takes a duplex argument: if duplex is True (the default) the pipe is full-duplex, meaning both conn1 and conn2 can send and receive; if duplex is False, conn1 can only receive messages and conn2 can only send them.

     

send and recv are the methods for sending and receiving messages. In full-duplex mode, for example, you can call conn1.send to send a message and conn1.recv to receive one. If there is no message to receive, recv blocks; if the other end of the pipe has been closed, recv raises EOFError.


import multiprocessing
import time

def proc1(pipe):
    while True:
        for i in range(10000):
            print("send: %s" % (i))
            pipe.send(i)
            time.sleep(1)

def proc2(pipe):
    while True:
        print("proc2 rev:", pipe.recv())
        time.sleep(1)

def proc3(pipe):
    while True:
        print("PROC3 rev:", pipe.recv())
        time.sleep(1)

if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    #p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))

    p1.start()
    p2.start()
    #p3.start()

    p1.join()
    p2.join()
    #p3.join()

Result: the two processes alternate, printing "send: 0", "proc2 rev: 0", "send: 1", "proc2 rev: 1", and so on (the original post shows a screenshot of this output).
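As a complement to the full-duplex example above, here is a minimal sketch of a one-way pipe created with duplex=False; the first connection returned is the receive-only end and the second is the send-only end (the sender and collect helpers are illustrative, not from the original):

```python
import multiprocessing

def sender(conn):
    for i in range(3):
        conn.send(i)
    conn.close()  # closing the send end lets recv() raise EOFError once the pipe is drained

def collect():
    recv_end, send_end = multiprocessing.Pipe(duplex=False)  # recv_end can only receive, send_end only send
    p = multiprocessing.Process(target=sender, args=(send_end,))
    p.start()
    send_end.close()  # the parent drops its copy of the send end so EOF can be detected
    received = []
    try:
        while True:
            received.append(recv_end.recv())
    except EOFError:  # raised when the pipe is closed and empty
        pass
    p.join()
    return received

if __name__ == "__main__":
    print(collect())  # [0, 1, 2]
```

Closing the parent's copy of the send end is what lets recv() raise EOFError after the child finishes; otherwise the loop would block forever.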

     



    7. Pool

When using Python for system administration, especially when operating on many files and directories at once or controlling many remote hosts in parallel, parallel execution saves a great deal of time. When the number of objects to handle is small, you can spawn processes directly with multiprocessing.Process; a dozen or so is manageable, but with hundreds or thousands of objects, limiting the number of processes by hand becomes too tedious. This is where a process pool comes in.
Pool provides a fixed number of worker processes for the caller. When a new request is submitted and the pool is not yet full, a new process is created to execute it; if the pool has already reached its configured maximum, the request waits until a process in the pool finishes, and only then is it executed.

     

Example 7.1: using a process pool (non-blocking)


#coding: utf-8
import multiprocessing
import time

def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = "hello %d" % (i)
        pool.apply_async(func, (msg, ))   # keep at most `processes` tasks running; when one finishes, the next queued task starts

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()   # call close() before join(), otherwise an error is raised; after close() no new tasks can be submitted, and join() waits for all children to finish
    print("Sub-process(es) done.")

Result of one run:
    mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0
     
    msg: hello 1
    msg: hello 2
    end
    msg: hello 3
    end
    end
    end
    Sub-process(es) done.

Function notes:

• apply_async(func[, args[, kwds[, callback]]]) is non-blocking, while apply(func[, args[, kwds]]) is blocking (compare Example 7.1 with Example 7.2 to see the difference).
• close()    Closes the pool so that it accepts no new tasks.
• terminate()    Terminates the worker processes immediately, without finishing pending tasks.
• join()    Blocks the main process until the children exit; join must be called after close or terminate.

Explanation: a pool is created with 3 worker processes. range(4) yields four task ids [0, 1, 2, 3], and all four are submitted to the pool. Because the pool size is 3, tasks 0, 1 and 2 start immediately, and task 3 only gets a worker once one of them finishes, which is why "msg: hello 3" appears after an "end". Since apply_async is non-blocking, the main process does not wait for the children: it finishes the for loop and prints the "Mark~ Mark~ Mark~~~~~~" line right away (interleaved with a child's output in the run above), then waits at pool.join() for every worker to finish.
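The callback parameter of apply_async, listed above but not used in the examples, lets the pool hand each result to a function in the parent process as soon as it is ready. A minimal sketch (the square task and run_pool helper are illustrative, not from the original):

```python
import multiprocessing

def square(x):
    return x * x

def run_pool():
    results = []  # collect results via the callback instead of keeping AsyncResult objects
    pool = multiprocessing.Pool(processes=2)
    for i in range(5):
        pool.apply_async(square, (i,), callback=results.append)  # callback runs in the parent as each task finishes
    pool.close()
    pool.join()  # after join() every callback has fired
    return sorted(results)  # completion order may vary, so sort

if __name__ == "__main__":
    print(run_pool())  # [0, 1, 4, 9, 16]
```

sorted() is needed because tasks may finish in any order; the callback itself always runs in the parent process.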

     

Example 7.2: using a process pool (blocking)


#coding: utf-8
import multiprocessing
import time

def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = "hello %d" % (i)
        pool.apply(func, (msg, ))   # apply blocks until the task finishes, so the tasks run one after another

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()   # call close() before join(); after close() no new tasks can be submitted, and join() waits for all children to finish
    print("Sub-process(es) done.")

Result of one run:
    msg: hello 0
    end
    msg: hello 1
    end
    msg: hello 2
    end
    msg: hello 3
    end
    Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
    Sub-process(es) done.

      

Example 7.3: using a process pool and collecting results


import multiprocessing
import time

def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")
    return "done" + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in range(3):
        msg = "hello %d" % (i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print(":::", res.get())
    print("Sub-process(es) done.")

Result of one run:
    msg: hello 0
    msg: hello 1
    msg: hello 2
    end
    end
    end
    ::: donehello 0
    ::: donehello 1
    ::: donehello 2
    Sub-process(es) done.

     

import multiprocessing
import time

def worker_1(interval):
    print("worker_1")
    time.sleep(interval)
    print("end worker_1")

def worker_2(interval):
    print("worker_2")
    time.sleep(interval)
    print("end worker_2")

def worker_3(interval):
    print("worker_3")
    time.sleep(interval)
    print("end worker_3")

if __name__ == "__main__":
    p1 = multiprocessing.Process(target=worker_1, args=(2,))
    p2 = multiprocessing.Process(target=worker_2, args=(3,))
    p3 = multiprocessing.Process(target=worker_3, args=(4,))

    p1.start()
    p2.start()
    p3.start()
    # multiprocessing.cpu_count() returns the number of CPU cores, useful for deciding how many children to spawn.
    # multiprocessing.active_children() returns all live child processes, daemon and non-daemon alike.
    # p.name and p.pid are the process name and process id.
    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child   p.name:" + p.name + "\tp.id" + str(p.pid))
    print("END!!!!!!!!!!!!!!!!!")




Defining the process as a class:

import multiprocessing
import time

class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):
        n = 5
        while n > 0:
            print("the time is {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1

if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()
    

Comparison of runs with and without the daemon flag:
1. Without daemon

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))

if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.start()
    print("end!")

# Output:
'''
end!
work start:Wed Jun 28 00:07:57 2017
work end:Wed Jun 28 00:08:00 2017
'''
    

2. With daemon

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))

if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True
    p.start()
    print("end!")

# Output:
'''
end!
'''
    

PS: because the child has the daemon attribute set, it is terminated as soon as the main process exits.
3. Letting a daemon child finish its work

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))

if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True
    p.start()
    p.join()
    print("end!")

# Output:
'''
work start:Tue Apr 21 22:16:32 2015
work end:Tue Apr 21 22:16:35 2015
end!
'''
    

2、Lock
When multiple processes need to access a shared resource, Lock can be used to avoid conflicting accesses.

import multiprocessing
import sys

def worker_with(lock, f):
    with lock:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lock acquired via with\n")
            n -= 1
        fs.close()

def worker_no_with(lock, f):
    lock.acquire()
    try:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lock acquired directly\n")
            n -= 1
        fs.close()
    finally:
        lock.release()

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    f = "file.txt"
    w = multiprocessing.Process(target=worker_with, args=(lock, f))
    nw = multiprocessing.Process(target=worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    print("end")
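The file example above serializes writes; the same locking pattern also protects shared memory. A minimal sketch using multiprocessing.Value (which the original post does not cover; the add_many and run_counter helpers are illustrative) shows the lock preventing lost updates to a shared counter:

```python
import multiprocessing

def add_many(counter, lock, n):
    for _ in range(n):
        with lock:  # without the lock, the read-modify-write of counter.value can interleave and lose updates
            counter.value += 1

def run_counter():
    counter = multiprocessing.Value('i', 0)  # shared integer, starts at 0
    lock = multiprocessing.Lock()
    workers = [multiprocessing.Process(target=add_many, args=(counter, lock, 1000))
               for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return counter.value

if __name__ == "__main__":
    print(run_counter())  # 4000
```

Without the `with lock:` line, the four processes' increments could interleave and the final count would usually come out below 4000.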
    

    3、Semaphore
    Semaphore用来支配成对分享财富的拜谒数量,比方池的最大连接数。

    3、Semaphore
    Semaphore用来支配成对共享财富的访谈数量,比如池的最第Billy斯接数。

     1 import multiprocessing
     2 import time
     3 
     4 def worker(s, i):
     5     s.acquire()
     6     print(multiprocessing.current_process().name   "acquire")
     7     time.sleep(i)
     8     print(multiprocessing.current_process().name   "releasen")
     9     s.release()
    10 
    11 if __name__ == "__main__":
    12     s = multiprocessing.Semaphore(2)   # 限制最多有两个进程同时执行
    13     for i in range(5):
    14         p = multiprocessing.Process(target = worker, args=(s, i*2))
    15         p.start()
    
     1 import multiprocessing 2 import time 3  4 def worker: 5     s.acquire() 6     print(multiprocessing.current_process().name   "acquire") 7     time.sleep 8     print(multiprocessing.current_process().name   "releasen") 9     s.release()10 11 if __name__ == "__main__":12     s = multiprocessing.Semaphore   # 限制最多有两个进程同时执行13     for i in range(5):14         p = multiprocessing.Process(target = worker, args=(s, i*2))15         p.start()
    

    运行结果:

    运作结果:

     1 Process-4acquire
     2 Process-2acquire
     3 Process-2release
     4 
     5 Process-1acquire
     6 Process-1release
     7 
     8 Process-3acquire
     9 Process-4release
    10 
    11 Process-5acquire
    12 Process-3release
    13 
    14 Process-5release
    
     1 Process-4acquire 2 Process-2acquire 3 Process-2release 4  5 Process-1acquire 6 Process-1release 7  8 Process-3acquire 9 Process-4release10 11 Process-5acquire12 Process-3release13 14 Process-5release
    

    4、Event
    伊夫nt完成进度间协同通讯

    4、Event
    Event落到实处进度间共同通讯

     1 import multiprocessing
     2 import time
     3 
     4 def wait_for_event(e):
     5     print("wait_for_event: starting")
     6     e.wait()
     7     print("wairt_for_event: e.is_set()->"   str(e.is_set()))
     8 
     9 def wait_for_event_timeout(e, t):
    10     print("wait_for_event_timeout:starting")
    11     e.wait(t)
    12     print("wait_for_event_timeout:e.is_set->"   str(e.is_set()))
    13 
    14 if __name__ == "__main__":
    15     e = multiprocessing.Event()
    16     w1 = multiprocessing.Process(name = "block",
    17             target = wait_for_event,
    18             args = (e,))
    19 
    20     w2 = multiprocessing.Process(name = "non-block",
    21             target = wait_for_event_timeout,
    22             args = (e, 2))
    23     w1.start()
    24     w2.start()
    25 
    26     time.sleep(3)
    27 
    28     e.set()
    29     print("main: event is set")
    30 
    31 # 运行结果
    32 '''
    33 
    34 wait_for_event: starting
    35 
    36 wait_for_event_timeout:starting
    37 
    38 wait_for_event_timeout:e.is_set->False
    39 
    40 main: event is set
    41 
    42 wairt_for_event: e.is_set()->True
    43 
    44 '''
    
     1 import multiprocessing 2 import time 3  4 def wait_for_event: 5     print("wait_for_event: starting") 6     e.wait() 7     print("wairt_for_event: e.is_set()->"   str(e.is_set 8  9 def wait_for_event_timeout:10     print("wait_for_event_timeout:starting")11     e.wait12     print("wait_for_event_timeout:e.is_set->"   str(e.is_set13 14 if __name__ == "__main__":15     e = multiprocessing.Event()16     w1 = multiprocessing.Process(name = "block",17             target = wait_for_event,18             args = 19 20     w2 = multiprocessing.Process(name = "non-block",21             target = wait_for_event_timeout,22             args = (e, 2))23     w1.start()24     w2.start()25 26     time.sleep(3)27 28     e.set()29     print("main: event is set")30 31 # 运行结果32 '''33 34 wait_for_event: starting35 36 wait_for_event_timeout:starting37 38 wait_for_event_timeout:e.is_set->False39 40 main: event is set41 42 wairt_for_event: e.is_set()->True43 44 '''
    

    5、Queue

    5、Queue

    Queue是多进程安全的行列,能够行使Queue落成多进度之间的数额传递。put方法用以插入数据到行列中,put方法还会有三个可选参数:blocked和timeout。假诺blocked为True(暗中认可值),并且timeout为正值,该方法会阻塞timeout钦命的年华,直到该队列有盈余的半空中。尽管超时,会抛出Queue.Full非常。若是blocked为False,但该Queue已满,会立刻抛出Queue.Full非常。

    Queue是多进度安全的行列,可以选拔Queue实现多进度之间的数量传递。put方法用以插入数据到行列中,put方法还会有五个可选参数:blocked和timeout。假使blocked为True,并且timeout为正在,该方法会阻塞timeout钦赐的时日,直到该队列有剩余的上空。要是超时,会抛出Queue.Full卓殊。要是blocked为False,但该Queue已满,会及时抛出Queue.Full格外。get方法能够从队列读取而且删除二个成分。一样,get方法有四个可选参数:blocked和timeout。假设blocked为True,况且timeout为正在,那么在等候时间内尚未取到任何因素,会抛出Queue.Empty分外。如若blocked为False,有两种状态存在,假设Queue有三个值可用,则随即回到该值,不然,如若队列为空,则立即抛出Queue.Empty极度。

     


    The get method reads and removes one item from the queue. It likewise takes two optional arguments, block and timeout: if block is True (the default) and timeout is positive, and no item arrives within the waiting time, the Queue.Empty exception is raised; if block is False, a value is returned immediately when one is available, otherwise Queue.Empty is raised immediately because the queue is empty.


    import multiprocessing
    import queue   # in Python 3, multiprocessing.Queue raises queue.Full / queue.Empty

    def writer_proc(q):
        try:
            q.put(1, block=False)
        except queue.Full:
            pass

    def reader_proc(q):
        try:
            print(q.get(block=False))
        except queue.Empty:
            pass   # the non-blocking reader may run before the writer has put anything

    if __name__ == "__main__":
        q = multiprocessing.Queue()
        writer = multiprocessing.Process(target=writer_proc, args=(q,))
        writer.start()

        reader = multiprocessing.Process(target=reader_proc, args=(q,))
        reader.start()

        reader.join()
        writer.join()

    # Run result (typical)
    # 1
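The block/timeout behaviour described above can be seen directly in a short sketch (illustrative, not from the original article; note that in Python 3 the parameter is named block and the exceptions live in the standard queue module):

```python
import multiprocessing
import queue  # in Python 3, multiprocessing.Queue raises queue.Full / queue.Empty

if __name__ == "__main__":
    q = multiprocessing.Queue(maxsize=1)  # a one-slot queue, so a second put must wait
    q.put("first")
    try:
        # blocks for up to 0.5 s waiting for a free slot, then raises queue.Full
        q.put("second", block=True, timeout=0.5)
    except queue.Full:
        print("queue is full")
    print(q.get())  # -> first
    try:
        # the queue is now empty: raises queue.Empty after the timeout
        q.get(block=True, timeout=0.5)
    except queue.Empty:
        print("queue is empty")
```

With block=False both calls would raise immediately instead of waiting out the timeout.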
    


    6. Pipe

    A Pipe can be one-way (half-duplex) or two-way (full-duplex). multiprocessing.Pipe(duplex=False) creates a one-way pipe. One process feeds objects into one end of the pipe, and the process at the other end receives them; a one-way pipe accepts input at only one end, while a two-way pipe accepts input at both ends.

    Pipe() returns a pair (conn1, conn2) representing the two ends of the pipe. Pipe takes a duplex parameter: if duplex is True (the default), the pipe is full-duplex, i.e. both conn1 and conn2 can send and receive; if duplex is False, conn1 can only receive and conn2 can only send.

    

     


    The send and recv methods send and receive messages, respectively. In full-duplex mode, for example, you can call conn1.send to send a message and conn1.recv to receive one. If no message is available, recv blocks until one arrives. If the pipe has been closed, recv raises EOFError.



    
    # proc1 sends messages; proc2 and proc3 take turns receiving them
    import multiprocessing
    import time

    def proc1(pipe):
        while True:
            for i in range(100):
                print("send: %s" % (i))
                pipe.send(i)
                time.sleep(1)

    def proc2(pipe):
        while True:
            print("proc2 rev:", pipe.recv())
            time.sleep(1)

    def proc3(pipe):
        while True:
            print("proc3 rev:", pipe.recv())
            time.sleep(1)

    if __name__ == "__main__":
        pipe = multiprocessing.Pipe()
        p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
        p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
        p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))

        p1.start()
        p2.start()
        p3.start()

        p1.join()
        p2.join()
        p3.join()

    # Run result (one typical interleaving; which of proc2/proc3 receives each value is not guaranteed)
    '''
    send: 0
    proc2 rev: 0
    send: 1
    proc3 rev: 1
    send: 2
    proc2 rev: 2
    send: 3
    proc3 rev: 3
    send: 4
    proc2 rev: 4
    send: 5
    proc3 rev: 5
    send: 6
    proc2 rev: 6
    send: 7
    proc3 rev: 7
    send: 8
    proc2 rev: 8
    send: 9
    proc3 rev: 9
    send: 10
    proc2 rev: 10
    ......
    '''
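The one-way mode and the EOFError behaviour can also be sketched in a few lines (illustrative, not from the original article; with duplex=False the first connection returned is the receive-only end):

```python
import multiprocessing

def child(conn):
    conn.send("hello from child")
    conn.close()  # closing the last writable end lets recv() raise EOFError

if __name__ == "__main__":
    # duplex=False: recv_conn can only receive, send_conn can only send
    recv_conn, send_conn = multiprocessing.Pipe(duplex=False)
    p = multiprocessing.Process(target=child, args=(send_conn,))
    p.start()
    send_conn.close()  # the parent must close its own copy of the send end too
    print(recv_conn.recv())  # -> hello from child
    try:
        recv_conn.recv()  # no senders left: raises EOFError
    except EOFError:
        print("pipe closed")
    p.join()
```

If the parent forgot to close its copy of send_conn, the second recv would block forever instead of raising EOFError, because a writable end would still be open.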
    


    7. Pool
    When using Python for system administration, especially when operating on many files and directories at once or remotely controlling many hosts, running the operations in parallel can save a great deal of time. When the number of objects is small, multiprocessing.Process can be used directly to spawn a handful of processes; a dozen or so is still manageable. But with hundreds or thousands of objects, limiting the number of processes by hand becomes far too tedious, and that is where a process pool comes in.
    Pool provides a fixed number of worker processes for the caller to use. When a new request is submitted and the pool is not yet full, a new process is created to execute it; once the pool has reached its configured maximum, the request waits until a worker becomes free and is then executed.

    

    Using a process pool (non-blocking)


    import multiprocessing
    import time

    def func(msg):
        print("msg:", msg)
        time.sleep(3)
        print("end")

    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=3)   # at most 3 worker processes in the pool
        for i in range(10):
            msg = "hello %d" % (i)
            pool.apply_async(func, (msg,))   # non-blocking: up to `processes` tasks run at once; a new task starts as soon as a worker is free

        print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
        pool.close()
        pool.join()   # call close() before join(), otherwise join() raises an error; after close() no new tasks can be submitted, and join() waits for all workers to finish
        print("Sub-process(es) done.")
    
    • apply_async(func[, args[, kwds[, callback]]]) is non-blocking; apply(func[, args[, kwds]]) is blocking (compare the results of Example 1 and Example 2 to see the difference).
    • close(): closes the pool so that it accepts no new tasks.
    • terminate(): stops the worker processes immediately, without finishing outstanding tasks.
    • join(): the main process blocks, waiting for the child processes to exit; join must be called after close() or terminate().

    Run result:

    Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
    msg: hello 0
    msg: hello 1
    msg: hello 2
    end
    msg: hello 3
    end
    msg: hello 4
    end
    msg: hello 5
    end
    msg: hello 6
    end
    msg: hello 7
    end
    msg: hello 8
    end
    msg: hello 9
    end
    end
    end
    Sub-process(es) done.

    Execution notes: the program creates a pool with 3 worker processes. range(10) produces ten task objects [0, 1, 2, ..., 9], all of which are submitted to the pool. Because the pool holds 3 processes, tasks 0, 1 and 2 start immediately, and a worker only becomes free for task 3 once one of them finishes, which is why "msg: hello 3" appears after an "end". Since apply_async is non-blocking, the main process does not wait for the workers: the for loop finishes and "Mark~ Mark~ Mark~~~" is printed straight away, after which the main program waits at pool.join() for every worker to complete.
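The callback argument listed in the apply_async signature above lets the main process collect results as workers finish, without polling. A minimal sketch (illustrative; the square helper is hypothetical, not from the article):

```python
import multiprocessing

def square(x):
    return x * x

if __name__ == "__main__":
    results = []
    pool = multiprocessing.Pool(processes=2)
    for i in range(5):
        # the callback runs in the main process as each worker's result arrives
        pool.apply_async(square, (i,), callback=results.append)
    pool.close()
    pool.join()
    print(sorted(results))  # completion order varies, so sort: [0, 1, 4, 9, 16]
```

Because workers may finish in any order, the collected list is sorted before printing; pool.join() returns only after all callbacks have been delivered.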
    


     

    Using a process pool (blocking)

    import multiprocessing
    import time

    def func(msg):
        print("msg:", msg)
        time.sleep(3)
        print("end")

    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=3)   # at most 3 worker processes in the pool
        for i in range(10):
            msg = "hello %d" % (i)
            pool.apply(func, (msg,))   # blocking: each call waits until the task has finished before the next one is submitted

        print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
        pool.close()
        pool.join()   # call close() before join(), otherwise join() raises an error
        print("Sub-process(es) done.")

    # Run result
    '''
    msg: hello 0
    end
    msg: hello 1
    end
    msg: hello 2
    end
    msg: hello 3
    end
    msg: hello 4
    end
    msg: hello 5
    end
    msg: hello 6
    end
    msg: hello 7
    end
    msg: hello 8
    end
    msg: hello 9
    end
    Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
    Sub-process(es) done.
    '''
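When every task applies the same function to items of a sequence, Pool.map is a more compact alternative to the loop above: like apply it blocks until all work is done, but it distributes the whole iterable across the workers at once and returns the results in input order (a short sketch, not from the original article):

```python
import multiprocessing

def double(x):
    return x * 2

if __name__ == "__main__":
    # the with-statement closes and terminates the pool automatically
    with multiprocessing.Pool(processes=3) as pool:
        print(pool.map(double, range(5)))  # -> [0, 2, 4, 6, 8]
```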
    

    Submitting several different functions to one pool

    import multiprocessing
    import os, time, random


    def Lee():
        print("\nRun task Lee-%s" % (os.getpid()))  # os.getpid() returns the current process ID
        start = time.time()
        time.sleep(random.random() * 10)  # random.random() returns a float in [0, 1)
        end = time.time()
        print('Task Lee, runs %0.2f seconds.' % (end - start))


    def Marlon():
        print("\nRun task Marlon-%s" % (os.getpid()))
        start = time.time()
        time.sleep(random.random() * 40)
        end = time.time()
        print('Task Marlon runs %0.2f seconds.' % (end - start))


    def Allen():
        print("\nRun task Allen-%s" % (os.getpid()))
        start = time.time()
        time.sleep(random.random() * 30)
        end = time.time()
        print('Task Allen runs %0.2f seconds.' % (end - start))


    def Frank():
        print("\nRun task Frank-%s" % (os.getpid()))
        start = time.time()
        time.sleep(random.random() * 20)
        end = time.time()
        print('Task Frank runs %0.2f seconds.' % (end - start))


    if __name__ == '__main__':
        function_list = [Lee, Marlon, Allen, Frank]
        print("parent process %s" % (os.getpid()))

        pool = multiprocessing.Pool(4)
        for func in function_list:
            pool.apply_async(func)  # non-blocking: each function is handed to a free worker in the pool

        print('Waiting for all subprocesses done...')
        pool.close()
        pool.join()  # close() must be called before join(); after close() no new tasks can be submitted, and join() waits for every child process to finish
        print('All subprocesses done.')

    # Run result (process IDs and timings vary from run to run)
    '''
    parent process 3256
    Waiting for all subprocesses done...

    Run task Lee-2196

    Run task Marlon-4580

    Run task Allen-5920

    Run task Frank-6384
    Task Allen runs 2.15 seconds.
    Task Lee, runs 9.99 seconds.
    Task Frank runs 14.14 seconds.
    Task Marlon runs 32.74 seconds.
    All subprocesses done.
    '''
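apply_async returns an AsyncResult object, so besides installing a callback the results can also be fetched afterwards with .get(), which blocks until the corresponding value is ready (a sketch; the cube helper is hypothetical, not from the article):

```python
import multiprocessing

def cube(x):
    return x ** 3

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)
    async_results = [pool.apply_async(cube, (i,)) for i in range(4)]
    pool.close()
    pool.join()
    # .get() re-raises any exception the worker raised, otherwise returns the value
    print([r.get() for r in async_results])  # -> [0, 1, 8, 27]
```

Keeping the AsyncResult objects in a list preserves submission order, so the output is ordered even though the workers may finish in any order.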
    

     
