您的位置:新葡亰496net > 奥门新萄京娱乐场 > 并发编程,Python并发之多进程的方法实例代码

并发编程,Python并发之多进程的方法实例代码

发布时间:2019-09-15 20:06编辑:奥门新萄京娱乐场浏览(67)

    一,进程的理论基础

      一个应用程序,归根结底是一堆代码,是静态的,而进程才是执行中的程序,在一个程序运行的时候会有多个进程并发执行。

      进程和线程的区别:

    •     进程是系统资源分配的基本单位。
    •     一个进程内可以包含多个线程,属于一对多的关系,进程内的资源,被其内的线程共享
    •     线程是进程运行的最小单位,如果说进程是完成一个功能,那么其线程就是完成这个功能的基本单位
    •     进程间资源不共享,多进程切换资源开销,难度大,同一进程内的线程资源共享,多线程切换资源开销,难度小

      进程与线程的共同点:

        都是为了提高程序运行效率,都有执行的优先权

     

    一,进程的理论基础

    线程

    python线程

    线程的定义

    线程是操作系统能够进行运算调度的最小单位。它被包含在进程中。是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流。一个进程中可以并发多个线程,每条线程并行执行不同的任务
    多个线程的执行会通过线程的调度去抢占CPU的资源

    二,Python的多进程( multiprocessing模块)

    创建一个进程(和创建线程类似)

    方法一:创建Process对象,通过对象调用start()方法启动进程

    from multiprocessing import Process
    
    def foo(name):
        print('hello,%s'%name)
    
    if __name__ == '__main__':
        p1=Process(target=foo,args=('world',))
        p2 = Process(target=foo, args=('China',))
        p1.start()
        p2.start()
        print('=====主进程=====')
    
        # == == =主进程 == == =
        # hello, world
        # hello, China
        #主进程和子进程并发执行  
    

     注意:Process对象只能在在 if __name__ == '__main__':下创建,不然会报错。

    方法二:自定义一个类继承Process类,并重写run()方法,将执行代码放在其内**

    from multiprocessing import Process
    
    class MyProcess(Process):
        def __init__(self,name):
            super().__init__()
            self.name = name
        def run(self):
            print('hello,%s'%self.name)
    
    if __name__ == '__main__':
        myprocess1 = MyProcess('world')
        myprocess2 = MyProcess('world')
        myprocess1.start()
        myprocess2.start()
    

     

     

     Process内置方法

    实例方法:
    p.start():启动进程,并调用该子进程中的p.run() 
    
    p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
    
    p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
    
    p.is_alive():如果p仍然运行,返回True
    
    p.join([timeout]):主线程等待p终止。timeout是可选的超时时间
    

     

    Process属性

    p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
    
    p.name:进程的名称
    
    p.pid:进程的pid
    
    p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可) 
    

    一个应用程序,归根结底是一堆代码,是静态的,而进程才是执行中的程序,在一个程序运行的时候会有多个进程并发执行。

    线程调用的两种形式

    1 . 直接调用

    import threading
    import time
    
    
    def say_hi(words):
        print("我要说的话: %s" % words)
    
    
    thread_1 = threading.Thread(target=say_hi, args=("我是线程1",),name= "T1")
    thread_2 = threading.Thread(target=say_hi, args=("我是线程2",),name= "T2")
    
    thread_1.start()
    thread_2.start()
    
    print(thread_1.getName())
    print(thread_2.getName())
    
    1. 继承式调用
    import threading
    import time
    
    
    class MyThread(threading.Thread):
        def __init__(self,num):
            threading.Thread.__init__(self)
            self.num = num
    
        def run(self):#定义每个线程要运行的函数
    
            print("running on number:%s" %self.num)
    
            time.sleep(3)
    
    if __name__ == '__main__':
    
        t1 = MyThread(1)
        t2 = MyThread(2)
        t1.start()
        t2.start()
    
        print("ending......")
    

    线程的创建

    多线程类似于同时执行多个不同程序,多线程有以下优点:

    • 使用线程可以把占据长时间的程序中的任务放到后台去处理
    • 程序的运行速度可能加快
    • 在一些IO密集型操作中,线程就比较有用。可以释放上些内存占用

    python3中使用Threading模块提供线程相关的操作。

    #!/usr/bin/env python3
    
    import threading
    import time
    
    
    class Mythread(threading.Thread):
        def __init__(self, func, arg):
            threading.Thread.__init__(self)
            self.func = func
            self.arg = arg
    
        def run(self):
            self.func(self.arg)
    
    
    def f1(x):
        time.sleep(1)
        print(x)
    
    if __name__ == '__main__':
        for i in range(10):
            t = Mythread(f1, i)
            t.start()
    

    上面的代码创建了十个线程,然后提交给CPU,让CPU根据指定算法去调度执行。

    • getName 获取线程名
    • setNmae 设置线程名
    • setDaemon 设置线程为前台还是后台,如果线程是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程执行完成后才退出程序。如果线台为后台,则主线程执行完毕后无论后台线程是否执行完毕,程序均会退出
    • join 逐个执行每个线程,待执先完毕后主线程才继续往下执行

    进程的定义

    程序执行的实例称为进程
    每个进程提供执行程序所需的资源。进程具有虚拟地址空间,可执行代码,系统对象的打开句柄,安全上下文,唯一进程标识符,环境变量,优先级类别,最小和最大工作集。每个进程都使用单线程启动,通常称为主线程,但可以从其任何线程创建其它线程

    进程和线程的比较
    进程和线程之间的比较是没有意义的,因为进程是一个程序的执行实例,而进程是由线程进行执行的,但线程和进程毕竟还是两种机制

    • 进程可以创建子进程,而每个子进程又可以开多个线程
    • 线程之间可以共享数据,而线程之间不可以共享数据,线程之间可以进行通信,而进程之间进行通信就会比较麻烦
    • 开辟进程要比开辟线程的开销大很多

    守护进程

    类似于守护线程,只不过守护线程是对象的一个方法,而守护进程封装成对象的属性。

    from multiprocessing import Process
    import time
    class MyProcess(Process):
        def __init__(self,name):
            super().__init__()
            self.name = name
        def run(self):
            time.sleep(3)
            print('hello,%s'%self.name)
    
    if __name__ == '__main__':
        myprocess1=MyProcess('world')
        myprocess1.daemon = True
        myprocess1.start()
        print('结束')
    
    #不会输出‘hello world’,因为设置为守护进程,主进程不会等待
    

     

    也可以使用join方法,使主进程等待

    新葡亰496net 1新葡亰496net 2

    from multiprocessing import Process
    import time
    class MyProcess(Process):
        def __init__(self,name):
            super().__init__()
            self.name = name
        def run(self):
            time.sleep(3)
            print('hello,%s'%self.name)
    
    if __name__ == '__main__':
        myprocess1=MyProcess('world')
        myprocess1.daemon = True
        myprocess1.start()
        myprocess1.join()  #程序阻塞
        print('结束')
    

    join()

     

    进程和线程的区别:

    Thread实例的方法

    join():在子线程完成运行之前,这个子线程的父线程将一直被阻塞。
    
    setDaemon(True):
    
             将线程声明为守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。
    
             当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成
    
             想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是 只要主线程
    
             完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法啦
    # run():  线程被cpu调度后自动执行线程对象的run方法
    # start():启动线程活动。
    # isAlive(): 返回线程是否活动的。
    # getName(): 返回线程名。
    # setName(): 设置线程名。
    
    threading模块提供的一些方法:
    # threading.currentThread(): 返回当前的线程变量。
    # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
    # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
    

    线程锁

    Python中创建线程

    Python中创建线程有多种模式

    进程同步和锁

      进程虽然不像线程共享资源,但是这并不意味着进程间不需要加锁,比如不同进程会共享同一个终端(屏幕),或者操作同一个文件,数据库,那么数据安全还是很有必要的,因此我们可以加锁,

    from multiprocessing import Process,Lock
    import time
    def a_print(l): #需要传入对象,因为信息不共享
        l.acquire()
        print('我要打印信息')
        time.sleep(1)
        print('我打印完了')
        l.release()
    
    if __name__ == '__main__':
        l = Lock()
        for i in range(20):
            p = Process(target=a_print,args=(l,))
            p.start()
    

     

    • 进程是系统资源分配的基本单位。
    • 一个进程内可以包含多个线程,属于一对多的关系,进程内的资源,被其内的线程共享
    • 线程是进程运行的最小单位,如果说进程是完成一个功能,那么其线程就是完成这个功能的基本单位
    • 进程间资源不共享,多进程切换资源开销,难度大,同一进程内的线程资源共享,多线程切换资源开销,难度小

    同步锁

    R=threading.Lock()
    
    ####
    def sub():
        global num
        R.acquire()
        temp=num-1
        time.sleep(0.1)
        num=temp
        R.release()
    
    互斥锁

    由于线程之间是随机调度的,当多个线程同时修改同一条数据时可能会导致数据混乱,因此,出线了线程锁,这样同上时刻只允许一个线程操作。

    #!/usr/bin/env python3
    
    import threading
    import time
    
    li = [1, 2, 3]
    
    
    def fun(x, y):
        x.append(y,)
        time.sleep(1)
        print(x)
    
    if __name__ == '__main__':
        for i in range(4, 10):
            t = threading.Thread(target=fun, args=(li, i,))
            t.start()
    

    以上代码中,先定义了一个列表li,并发产生了6个线程,每个线程往列表中添加一个数字并打印列表li,理论它的输出应该是这样的:

    [1, 2, 3, 4]
    [1, 2, 3, 4, 6]
    [1, 2, 3, 4, 6, 5]
    [1, 2, 3, 4, 6, 5, 8]
    [1, 2, 3, 4, 6, 5, 8, 7]
    [1, 2, 3, 4, 6, 5, 8, 7, 9]
    

    实际上它的输出确是这样的:

    [1, 2, 3, 4, 5, 6, 7, 8, 9]
    [1, 2, 3, 4, 5, 6, 7, 8, 9]
    [1, 2, 3, 4, 5, 6, 7, 8, 9]
    [1, 2, 3, 4, 5, 6, 7, 8, 9]
    [1, 2, 3, 4, 5, 6, 7, 8, 9]
    [1, 2, 3, 4, 5, 6, 7, 8, 9]
    

    出现这种情况的原因就是同一时刻多个线程操作同一个数据造成的。下面我们给程序加上线程锁:

    #!/usr/bin/env python3
    
    import threading
    import time
    
    li = [1, 2, 3]
    
    lock = threading.Lock()     #创建锁
    def fun(x, y):
        lock.acquire()          #锁定
        x.append(y,)
        time.sleep(1)
        print(x)    
        lock.release()          #释放
    
    if __name__ == '__main__':
        for i in range(4, 10):
            t = threading.Thread(target=fun, args=(li, i,))
            t.start()
    

    输出结果如下:

    [1, 2, 3, 4]
    [1, 2, 3, 4, 5]
    [1, 2, 3, 4, 5, 6]
    [1, 2, 3, 4, 5, 6, 7]
    [1, 2, 3, 4, 5, 6, 7, 8]
    [1, 2, 3, 4, 5, 6, 7, 8, 9]
    

    threading 模块

    信号量(Semaphore)

    能够并发执行的进程数,超出的进程阻塞,直到有进程运行完成。

      Semaphore管理一个内置的计数器,
      每当调用acquire()时内置计数器-1;
      调用release() 时内置计数器 1;
      计数器不能小于0;当计数器为0时,acquire()将阻塞进程直到其他进程调用release()。

    from multiprocessing import Process,Queue,Semaphore
    import time,random
    
    def seat(s,n):
        s.acquire()
        print('学生%d坐下了'%n)
        time.sleep(random.randint(1,2))
        s.release()
    
    if __name__ == '__main__':
        s = Semaphore(5)
        for i in range(20):
            p = Process(target=seat,args=(s,i))
            p.start()
    
        print('-----主进程-------')
    

     

    注意:其实信号量和锁类似,只是限制进程运行某个代码块的数量(锁为1个),并不是能限制并发的进程,如上述代码,一次性还是创建了20个进程

     

    进程与线程的共同点:

    递归锁

    递归锁,其中维护一个变量,当这个变量为正值时,不允许其他进程进入

    lockA=threading.Lock()
    lockB=threading.Lock()
    
    信号量(Semaphore)

    互斥锁同时只允许一个线程更改数据,而信号量同时可允许一定数量的线程更改数据。
    互斥锁用于线程的互斥,信号量用于线程的同步,这是互斥锁与信号量最根本的差别。
      Semaphore管理了一个内置的计数器,每当调用acquire()时内置计数器 -1,调用 release()时内置计数器 1,当计数器为 0 时,acquire()将阻塞线程直到其他线程调用 release().

    直接调用threading模块 创建线程

    Python中创建线程可以使用threading模块

    • threading.Thread(target=func,args = params,) 创建线程 target指定执行的函数 target指定参数元组形式
    '''
    python thread
    '''
    import threading
    
    import time
    
    beggin = time.time()
    
    
    def foo(n):
        print('foo%s' % n)
        time.sleep(1)
    
    
    def bar(n):
        print('bar %s' % n)
    
    
    end = time.time()
    cast_time = end - beggin
    print(float(cast_time))
    # 创建线程
    t1 = threading.Thread(target=foo, args=('thread1',))
    t2 = threading.Thread(target=bar, args=('thread2',))
    t1.start()
    t2.start()
    

    事件(Event)

    新葡亰496net 3新葡亰496net 4

    from multiprocessing import Process,Event
    import time, random
    def eating(event):
        event.wait()
        print('去吃饭的路上...')
    
    def makeing(event):
        print('做饭中')
        time.sleep(random.randint(1,2))
        print('做好了,快来...')
        event.set()
    
    if __name__ == '__main__':
        event=Event()
        t1 = Process(target=eating,args=(event,))
        t2 = Process(target=makeing,args=(event,))
        t1.start()
        t2.start()
        # 做饭中
        # 做好了,快来...
        # 去吃饭的路上...
    

    和线程事件几乎一致

     

     

    都是为了提高程序运行效率,都有执行的优先权

    Event

    set -> wait -> clear(由不同的调用)

    An event is a simple synchronization object;the event represents an internal flag,
    
    and threads can wait for the flag to be set, or set or clear the flag themselves.
    
    
    event = threading.Event()
    
    # a client thread can wait for the flag to be set
    event.wait()
    
    # a server thread can set or reset it
    event.set()
    event.clear()
    
    
    If the flag is set, the wait method doesn’t do anything.
    If the flag is cleared, wait will block until it becomes set again.
    Any number of threads may wait for the same event.
    
    
    
    import threading,time
    class Boss(threading.Thread):
        def run(self):
            print("BOSS:今晚大家都要加班到22:00。")
            print(event.isSet())
            event.set()
            time.sleep(5)
            print("BOSS:<22:00>可以下班了。")
            print(event.isSet())
            event.set()
    class Worker(threading.Thread):
        def run(self):
            event.wait()
            print("Worker:哎……命苦啊!")
            time.sleep(1)
            event.clear()
            event.wait()
            print("Worker:OhYeah!")
    if __name__=="__main__":
        event=threading.Event()
        threads=[]
        for i in range(5):
            threads.append(Worker())
        threads.append(Boss())
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    
    实例:(每次只有5个线程可以同时执行)
    #!/usr/bin/env python3
    
    import threading
    import time
    
    num = 0
    
    def f1(i):
        global num
        sem.acquire()
        num  = 1
        time.sleep(1)
        print(num,i)
        sem.release()
    
    if __name__ == '__main__':
        sem = threading.BoundedSemaphore(5)
        for i in range(20):
            t = threading.Thread(target=f1,args=(i,))
            t.start()
    

    通过继承threading模块调用线程

    import threading
    import time
    
    
    class MyThread(threading.Thread):
        def __init__(self,num):
            threading.Thread.__init__(self)
            self.num = num
    
        def run(self):#定义每个线程要运行的函数
    
            print("running on number:%s" %self.num)
    
            time.sleep(3)
    
    if __name__ == '__main__':
    
        t1 = MyThread(1)
        t2 = MyThread(2)
        t1.start()
        t2.start()
    
    • 创建类继承threading.Thread
    • 重写类的run方法

    进程队列(Queue)

    进程队列是进程通讯的方式之一。使用multiprocessing 下的Queue

    from multiprocessing import Process,Queue
    import time
    def func1(queue):
        while True:
            info=queue.get()
            if info == None:
                return 
            print(info)
    
    def func2(queue):
        for i in range(10):
            time.sleep(1)
            queue.put('is %d'%i)
        queue.put(None) #结束的标志
    if __name__ == '__main__':
        q = Queue()
        p1 = Process(target=func1,args=(q,))
        p2 = Process(target=func2, args=(q,))
    
        p1.start()
        p2.start()
    

     

    Queue类的方法,源码如下:

    新葡亰496net 5新葡亰496net 6

    class Queue(object):
        def __init__(self, maxsize=-1):  #可以传参设置队列最大容量
            self._maxsize = maxsize
    
        def qsize(self): #返回当前时刻队列中的个数
            return 0
    
        def empty(self): #是否为空
            return False
    
        def full(self):    是否满了
            return False
    
        def put(self, obj, block=True, timeout=None): #放值,blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常
            pass
    
        def put_nowait(self, obj):  #=put(False)
            pass
    
        def get(self, block=True, timeout=None): 获取值,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
            pass
    
        def get_nowait(self): # = get(False)
            pass
    
        def close(self): #将队列关闭
            pass
    
        def join_thread(self): #略,几乎不用
            pass
    
        def cancel_join_thread(self):
            pass
    

    进程队列源码注释

     

    二,Python的多进程( multiprocessing模块)

    信号量

    信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时 1。 计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)
    BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。

    import threading,time
    class myThread(threading.Thread):
        def run(self):
            if semaphore.acquire():
                print(self.name)
                time.sleep(5)
                semaphore.release()
    if __name__=="__main__":
        semaphore=threading.Semaphore(5)
        thrs=[]
        for i in range(100):
            thrs.append(myThread())
        for t in thrs:
            t.start()
    
    事件(event)

    event是由线程设置的信号标志,如果信号标志为真,则其他线程等待直到信号结束。

    • set() 设置信号,使用Event的set() 方法可以设置对象内部的信号标置为真
    • clear() 清除信号,使用Event的 clear() 方法可以清除Event对象内部的信号标志
    • wait() 等待,Event对象wait的方法只有在内部信号为真的时候才会很快的执行并完成返回。当Event对象的内部信号标志位假时,则wait方法一直等待到其为真时才返回。

    实例:

    #!/usr/bin/env python3
    
    import threading
    import time
    
    def f(i):
    
        print('start')
        event.wait()
        print(i)
        print('end')
    
    event = threading.Event()
    
    if __name__ == '__main__':
        for i in range(5):
            t = threading.Thread(target=f,args=(i,))
            t.start()
    
    event.clear()
    inp = input('input your choose:')
    if inp:
        event.set()
    

    Python 多线程中的GIL

    Python的GIL并不是Python的特性,它是在实现Python解析器也就是基于C语言的解析器 CPython时所引入的一个概念。Python可以用不同的编译器来编译成可执行代码。例如C语言中的GCC等。也就是说只有在CPython中才会出现GIL的情况
    GIL又称为全局解释器锁(Global Interpreter Lock)
    现代的CPU已经是多核CPU,为了更有效的利用多核处理器的性能,就出现了多线程的编程方式。而在解决多线程之间数据完整性和状态同步的最简单的方法就是加锁。GIL就是给Python解释器加了一把大锁。我们知道Python是由解释器执行的,由于GIL的存在 只能有一个线程被解释器执行,这样就使得Python在多线程执行上的效率变低。由于历史遗留问题,发现大量库代码开发者已经重度依赖GIL而非常难以去除了。也就是说在多核CPU上,并行执行的Python多线程,甚至不如串行执行的Python程序,这就是GIL存在的问题

    进程池

      进程的消耗是很大的,因此我们不能无节制的开启新进程,因此我们可以通过维护一个进程池来控制进程的数量。这就不同于信号量,进程池可以从源头控制进程数量。在Python中可以通过如下方法使用

    同步调用

    from multiprocessing import Pool
    import time, random, os
    def func(n):
        pid = os.getpid()
        print('进程%s正在处理第%d个任务'%(pid,n),'时间%s'%time.strftime('%H-%M-%S'))
        time.sleep(2)
        res = '处理%s'%random.choice(['成功','失败'])
        return res
    
    if __name__ == '__main__':
        p = Pool(4) #创建4个进程,
        li = []
        for i in range(10):
            res = p.apply(func,args=(i,)) 交给进程池处理,处理完成才返回值,会阻塞,即使池内还有空余进程,相当于顺序执行
            li.append(res)
    
    
        for i in li:
            print(i)
    
    #进程1916正在处理第0个任务 时间21-02-53
    #进程1240正在处理第1个任务 时间21-02-55
    #进程3484正在处理第2个任务 时间21-02-57
    #进程7512正在处理第3个任务 时间21-02-59
    #进程1916正在处理第4个任务 时间21-03-01
    #进程1240正在处理第5个任务 时间21-03-03
    #进程3484正在处理第6个任务 时间21-03-05
    #进程7512正在处理第7个任务 时间21-03-07
    #进程1916正在处理第8个任务 时间21-03-09
    #进程1240正在处理第9个任务 时间21-03-11
    

     

    从结果可以发现两点:

    1. 不是并发处理
    2. 一直都只有四个进程,串行执行

     

    因此进程池提供了异步处理的方式

    from multiprocessing import Pool
    import time, random, os
    def func(n):
        pid = os.getpid()
        print('进程%s正在处理第%d个任务'%(pid,n),'时间%s'%time.strftime('%H-%M-%S'))
        time.sleep(2)
        res = '处理%s'%random.choice(['成功','失败'])
        return res
    
    if __name__ == '__main__':
        p = Pool(4)
        li = []
        for i in range(10):
            res = p.apply_async(func,args=(i,)) 结果不会立刻返回,遇到阻塞,开启下一个进程,在这,相当于几乎同时出现四个打印结果(一个线程处理一个任务,处理完下个任务才能进来)
            li.append(res)
    
        p.close() #join之前需要关闭进程池
        p.join()  #因为异步,所以需要等待池内进程工作结束再继续
        for i in li:
            print(i.get()) #i是一个对象,通过get方法获取返回值,而同步则没有该方法
    

     关于回调函数

    新葡亰496net 7新葡亰496net 8

    from multiprocessing import Pool
    import time, random, os
    def func(n):
        pid = os.getpid()
        print('进程%s正在处理第%d个任务'%(pid,n),'时间%s'%time.strftime('%H-%M-%S'))
        time.sleep(2)
        res = '处理%s'%random.choice(['成功','失败'])
        return res
    
    def foo(info):
        print(info) #传入值为进程执行结果
    
    if __name__ == '__main__':
        p = Pool(4)
        li = []
        for i in range(10):
            res = p.apply_async(func,args=(i,),callback = foo) callback()回调函数会在进程执行完之后调用(主进程调用) 
            li.append(res)
    
        p.close() 
        p.join()  
        for i in li:
            print(i.get()) 
    

    有回调函数

     

    创建一个进程(和创建线程类似)

    多线程利器:队列

    Python进程

    Python GIL的出现场景

    在Python中如果任务是IO密集型的,可以使用多线程。而且Python的多线程非常善于处理这种问题
    而如果Python中如果任务是计算密集型的,就需要处理一下GIL

    方法一:创建Process对象,通过对象调用start()方法启动进程

    队列的使用方法

    创建一个“队列”对象
    import Queue
    q = Queue.Queue(maxsize = 10)
    Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。
    
    将一个值放入队列中
    q.put(10)
    调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为
    1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。
    
    将一个值从队列中取出
    q.get()
    调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,
    get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。
    
    Python Queue模块有三种队列及构造函数:
    1、Python Queue模块的FIFO队列先进先出。   class queue.Queue(maxsize)
    2、LIFO类似于堆,即先进后出。               class queue.LifoQueue(maxsize)
    3、还有一种是优先级队列级别越低越先出来。        class queue.PriorityQueue(maxsize)
    
    此包中的常用方法(q = Queue.Queue()):
    q.qsize() 返回队列的大小
    q.empty() 如果队列为空,返回True,反之False
    q.full() 如果队列满了,返回True,反之False
    q.full 与 maxsize 大小对应
    q.get([block[, timeout]]) 获取队列,timeout等待时间
    q.get_nowait() 相当q.get(False)
    非阻塞 q.put(item) 写入队列,timeout等待时间
    q.put_nowait(item) 相当q.put(item, False)
    q.task_done() 队列中的数据发生变化后,发出信号
    q.join() 收到信号后,停止阻塞
    
    简单的创建进程

    python的进程一般用 multiprocessing模块,他与threading模块很像,对多核CPU的利用率会比 threading好的多。
    实例:(简单的创建进程)

    #!/usr/bin/env python3
    
    import multiprocessing
    import time
    def func(i):
        time.sleep(2)
        print(i)
    
    if __name__ == '__main__':
        for i in range(10):
            p = multiprocessing.Process(target=func,args=(i,))
            p.start()
    

    进程这间默认是无法共享数据的:

    #!/usr/bin/env python3
    
    import multiprocessing
    import time
    li = []
    def func(i):
        li.append(i)
        time.sleep(2)
        print(li)
    
    if __name__ == '__main__':
        for i in range(10):
            p = multiprocessing.Process(target=func,args=(i,),name='mywork')
    
            p.start()
    

    输出如下:

    [0]
    [3]
    [1]
    [9]
    [7]
    [6]
    [8]
    [5]
    [2]
    [4]
    

    join 和daemon

    join

    • 在子线程完成运行之前,这个子线程的父线程将一直被阻塞。在一个程序中我们执行一个主线程,这个主线程又创建一个子线程,主线程和子线程就互相执行,当子线程在主线程中调用join方法时,主线程会等待子线程执行完后再结束
    '''in main thread'''
    t.join() 主线程会等待线程t执行完成后再继续执行
    

    daemon

    • setDaemon(true)
      将线程声明为守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是 只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法啦
    • 并发编程,Python并发之多进程的方法实例代码。currentThread() 获取当前执行的线程
    from multiprocessing import Process
    def foo(name):
     print('hello,%s'%name)
    if __name__ == '__main__':
     p1=Process(target=foo,args=('world',))
     p2 = Process(target=foo, args=('China',))
     p1.start()
     p2.start()
     print('=====主进程=====')
     # == == =主进程 == == =
     # hello, world
     # hello, China
     #主进程和子进程并发执行 
    

    生产者消费者模式

    import time,random
    import queue,threading
    
    q = queue.Queue()
    
    def Producer(name):
      count = 0
      while count <10:
        print("making........")
        time.sleep(random.randrange(3))
        q.put(count)
        print('Producer %s has produced %s baozi..' %(name, count))
        count  =1
        #q.task_done()
        #q.join()
        print("ok......")
    def Consumer(name):
      count = 0
      while count <10:
        time.sleep(random.randrange(4))
        if not q.empty():
            data = q.get()
            #q.task_done()
            #q.join()
            print(data)
            print('33[32;1mConsumer %s has eat %s baozi...33[0m' %(name, data))
        else:
            print("-----no baozi anymore----")
        count  =1
    
    p1 = threading.Thread(target=Producer, args=('A',))
    c1 = threading.Thread(target=Consumer, args=('B',))
    # c2 = threading.Thread(target=Consumer, args=('C',))
    # c3 = threading.Thread(target=Consumer, args=('D',))
    p1.start()
    c1.start()
    # c2.start()
    # c3.start()
    
    python数据共享的方法

    Python中进程间共享数据,除了基本的queue,pipe和value array外,还提供了更高层次的封装。使用multiprocessing.Manager可以简单地使用这些高级接口.Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。

    • Array 数组

      #!/usr/bin/env python3

      from multiprocessing import Array,Process

      import time

      arry = Array('i',[11,22,33,44,55,66,77,88,99])

      def func(i,arry):

      arry[i]  = 100
      print(list(arry))
      
    if __name__ == '__main__':
        for i in range(8):
            p = Process(target=func,args=(i,arry,))
            p.start()
    
    • manage.dict() 字典

      #!/usr/bin/env python3

      import time from multiprocessing import Process,Manager

      def func(d,i):

      d[i] = i
      time.sleep(0)
      print(d)
      

      if name == 'main':

      d = Manager().dict()
      for i in range(2):
          p = Process(target=func,args=(d,i,))
          p.start()
      p.join()
      
    • queue

      #!/usr/bin/env python3

      from multiprocessing import Process,Queue import time def func(q,i):

      time.sleep(2)
      print(i,q.get())
      

      if name == 'main':

      q = Queue()
      q.put('h1')
      q.put('h2')
      q.put('h3')
      for i in range(3):
          p = Process(target=func,args=(q,i,))
          p.start()
      

    线程中的锁

    先看一个线程共享数据的问题

    '''
    线程安全问题
    '''
    # 定义一个共享变量
    import threading
    
    import time
    
    num = 100
    
    
    def sub():
        # 操作类变量
        global num
        tmp = num
        time.sleep(0.1)
        num = tmp - 1
    
    
    if __name__ == '__main__':
        thread_list = []
        for i in range(100):
            t1 = threading.Thread(target=sub)
            t1.start()
            thread_list.append(t1)
        for i in range(100):
            t2 = thread_list[i]
            t2.join()
    
    print('final num'   str(num))
    >>> 
    final num99
    

    注意:Process对象只能在在 if __name__ == '__main__':下创建,不然会报错。

    多进程模块

    由于GIL的存在,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。
    multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

    进程锁实例:
    #!/usr/bin/env python3
    
    from multiprocessing import Process,Array,Lock
    
    def func(arry,lock,i):
        lock.acquire()
        arry[i] = i
        for item in arry:
            print(item)
        lock.release()
    
    if __name__ == '__main__':
        lock = Lock()
        arry = Array('i',[11,22,33,44])
        for i in range(4):
            p = Process(target=func,args=(arry,lock,i,))
            p.start()
        p.join()
    

    分析

    上面的程序中,我们想要的是开启100个线程,每个线程将共享数据减去1,但是我们发现 输出的结果是99,这种情况是因为多线程在cpu中执行时是抢占式的,程序在开始执行时,开启了100个线程去执行,当程序执行到time.sleep(0.1)时,由于发生了线程的阻塞,所以cpu进行了切换,此时,程序的共享变量num是100,中间变量tmp也是100 在线程阻塞过后,将共享变量num的值减1,值变为99 此时其它的线程获得cpu的执行机会,而当前线程中的共享变量num的值还是100所以执行减1操作后,又将中间值赋值给共享变量num所以num的值一直为99

    • 线程的执行情况
    ![](https://upload-images.jianshu.io/upload_images/6052465-461749d8c9eb7ea5.png)
    
    多线程抢占.png
    

    方法二:自定义一个类继承Process类,并重写run()方法,将执行代码放在其内

    多进程模块的使用

    # 实例调用
    from multiprocessing import Process
    import time
    def f(name):
        time.sleep(1)
        print('hello', name,time.ctime())
    
    if __name__ == '__main__':
        p_list=[]
        for i in range(3):
            p = Process(target=f, args=('alvin',))
            p_list.append(p)
            p.start()
        for i in p_list:
            p.join()
        print('end')
    
    # 继承类
    from multiprocessing import Process
    import time
    
    class MyProcess(Process):
        def __init__(self):
            super(MyProcess, self).__init__()
            #self.name = name
    
        def run(self):
            time.sleep(1)
            print ('hello', self.name,time.ctime())
    
    
    if __name__ == '__main__':
        p_list=[]
        for i in range(3):
            p = MyProcess()
            p.start()
            p_list.append(p)
    
        for p in p_list:
            p.join()
    
        print('end')
    
    # 例子
    from multiprocessing import Process
    import os
    import time
    def info(title):
    
        print("title:",title)
        print('parent process:', os.getppid())
        print('process id:', os.getpid())
    
    def f(name):
        info('function f')
        print('hello', name)
    
    if __name__ == '__main__':
        info('main process line')
        time.sleep(1)
        print("------------------")
        p = Process(target=info, args=('yuan',))
        p.start()
        p.join()
    

    进程池与线程池

    在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

    Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

    Python 同步锁

    操作锁的方法在threading 模块中 Lock()

    • threading.Lock() 会获得一把锁
    • Python 中使用acquire() 获得锁
    r = threading.Lock()
    # 加锁
    r.acquire()
    
    • Python中使用release()释放锁
    r.release()
    

    加锁后代码

    '''
    线程安全问题
    '''
    # 定义一个共享变量
    import threading
    import time
    num = 100
    r = threading.Lock()
    def sub():
        # 操作类变量
        global num
        r.acquire()
        tmp = num
        time.sleep(0.1)
        num = tmp - 1
        r.release()
    if __name__ == '__main__':
        thread_list = []
        for i in range(100):
            t1 = threading.Thread(target=sub)
            t1.start()
            thread_list.append(t1)
        for i in range(100):
            t2 = thread_list[i]
            t2.join()
    print('final num'   str(num))
    
    from multiprocessing import Process
    class MyProcess(Process):
     def __init__(self,name):
      super().__init__()
      self.name = name
     def run(self):
      print('hello,%s'%self.name)
    if __name__ == '__main__':
     myprocess1 = MyProcess('world')
     myprocess2 = MyProcess('world')
     myprocess1.start()
     myprocess2.start()
    

    Process 类

    Process([group [, target [, name [, args [, kwargs]]]]])
    
      group: 线程组,目前还没有实现,库引用中提示必须是None; 
      target: 要执行的方法; 
      name: 进程名; 
      args/kwargs: 要传入方法的参数。
    
    实例方法:
    
      is_alive():返回进程是否在运行。
    
      join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
    
      start():进程准备就绪,等待CPU调度
    
      run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
    
      terminate():不管任务是否完成,立即停止工作进程
    
    属性:
    
      daemon:和线程的setDeamon功能一样
    
      name:进程名字。
    
      pid:进程号。
    
    进程池:
    #!/usr/bin/env python3
    
    from multiprocessing import Process,Pool
    import time
    
    def func(i):
        time.sleep(2)
        print(i)
    
    
    
    if __name__ == '__main__':
        pool = Pool(5)
        for i in range(30):
            p = pool.apply_async(func,args=(i,))
        pool.close()
        pool.join()
    

    线程中的死锁和递归锁

    在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方释放对方的资源,就会造成死锁,因为系统判断这部分资源正在使用,所以这两个线程在无外力作用下将一直等待下去
    看个栗子:

    '''
    线程死锁
    '''
    
    import threading, time
    
    
    class myThread(threading.Thread):
        def doA(self):
            lockA.acquire()
            print(self.name, "gotlockA", time.ctime())
            time.sleep(3)
            lockB.acquire()
            print(self.name, "gotlockB", time.ctime())
            lockB.release()
            lockA.release()
    
        def doB(self):
            lockB.acquire()
            print(self.name, "gotlockB", time.ctime())
            time.sleep(2)
            lockA.acquire()
            print(self.name, "gotlockA", time.ctime())
            lockA.release()
            lockB.release()
    
        def run(self):
            self.doA()
            self.doB()
    
    
    if __name__ == "__main__":
    
        lockA = threading.Lock()
        lockB = threading.Lock()
    
        threads = []
        for i in range(5):
            threads.append(myThread())
        for t in threads:
            t.start()
        for t in threads:
            t.join()  # 等待线程结束,后面再讲。
    

    在以上程序中,多个线程互相持有对方的锁并且等待对方释放,这就形成了死锁

    Process内置方法

    并发编程,Python并发之多进程的方法实例代码。线程池:
    #!/usr/bin/env python3
    
    from multiprocessing import pool,Process
    import time
    
    def func(i):
        time.sleep(2)
        print(i)
    
    
    
    if __name__ == '__main__':
        pool = pool.ThreadPool(5)
        for i in range(30):
            pool.apply_async(func,args=(i,))
        pool.close()
        pool.join()
    

    解决死锁的方式

    • threading.RLock() 可重入锁
      为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。可重入锁的内部维持了一个计数器和锁对象。

    实例方法:

    信号量

    信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数器,每当调用acquire()时-1,调用release()时 1
    计数器不能小于0当计数器为0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。
    BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数器的值是否超过了计数器的初始值。如果超过了将抛出一个异常

    p.start():启动进程,并调用该子进程中的p.run()

    创建信号量

    • threading.BoundedSemaphore(num) 指定信号量为num
    import threading
    
    import time
    
    
    class Mythread(threading.Thread):
        def run(self):
            # 判断是否加锁
            if semaphore.acquire():
                print(self.name)
                time.sleep(1)
                # 释放锁
                semaphore.release()
    
    
    if __name__ == '__main__':
        # 创建带有信号量的锁
        semaphore = threading.BoundedSemaphore(5)
        # 存放线程的序列
        thrs = []
        for i in range(100):
            thrs.append(Mythread())
        for t in thrs:
            t.start()
    

    p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法 

    条件变量同步

    有一类线程需要满足条件之后才能够继续执行,Python提供了threading.Condition 对象用于条件变量线程的支持,它除了能提供RLock()或Lock()的方法外,还提供了 wait()、notify()、notifyAll()方法。
    条件变量也是线程中的一把锁,但是条件变量可以实现线程间的通信,类似于Java中的唤醒和等待

    p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁

    创建条件变量锁

    • lock_con = threading.Condition(Lock/Rlock) 锁是可选选项,不传入锁对象自动创建一个RLock()
    • wait() 条件不满足时调用,线程会释放锁并进入等待阻塞
    • notify() 条件创造后调用,通知等待池激活一个线程
    • notifyAll() 条件创造后调用,通知等待池激活所有线程
      看个栗子
    '''
    线程条件变量
    '''
    import threading
    from random import randint
    
    import time
    
    
    class Producer(threading.Thread):
        def run(self):
            global L
            while True:
                val = randint(0, 100)
                print('生产者', self.name, ':Append'   str(val), L)
                if lock_con.acquire():
                    L.append(val)
                    lock_con.notify()
                    lock_con.release()
                time.sleep(3)
    
    
    class Consumer(threading.Thread):
        def run(self):
            global L
            while True:
                lock_con.acquire()
                if len(L) == 0:
                    lock_con.wait()
                print('消费者',self.name,"Delete" str(L[0]),L)
                del  L[0]
                lock_con.release()
                time.sleep(0.25)
    
    
    if __name__ == '__main__':
        L = []
        # 创建条件变量锁
        lock_con = threading.Condition()
        # 线程存放列表
        threads = []
        for i in range(5):
            threads.append(Producer())
        threads.append(Consumer())
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    

    p.is_alive():如果p仍然运行,返回True

    同步条件event

    条件同步和条件变量同步差不多意思,只是少了锁功能,因为条件同步设计于不访问共享资源的条件环境。event=threading.Event():条件环境对象,初始值 为False;

    • event.isSet():返回event的状态值;

    • event.wait():如果 event.isSet()==False将阻塞线程;

    • event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

    • event.clear():恢复event的状态值为False。
      举个栗子:

    '''
    同步条件event
    '''
    import threading
    
    import time
    
    
    class Boss(threading.Thread):
        def run(self):
            print('BOSS: 今晚加班')
            # 改变事件
            event.isSet() or event.set()
            time.sleep(5)
            print('BOSS:加班结束')
            event.isSet() or event.set()
    
    
    class Worker(threading.Thread):
        def run(self):
            event.wait()
            print('WORKER:OH NO')
            time.sleep(0.25)
            # 改变同步事件标志
            event.clear()
            event.wait()
            print('WORKER:OH YEAD!')
    
    if __name__ == '__main__':
        # 获取同步事件
        event = threading.Event()
        threads = []
        for i in range(5):
            threads.append(Worker())
        threads.append(Boss())
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    

    p.join([timeout]):主线程等待p终止。timeout是可选的超时时间
    Process属性

    线程利器队列 queue

    队列是一种数据结构,队列分为先进先出(FIFO) 和 先进后出(FILO)
    Python Queue模块有三种队列及构造函数:
    1、Python Queue模块的FIFO队列先进先出。 class queue.Queue(maxsize)
    2、LIFO类似于堆,即先进后出。 class queue.LifoQueue(maxsize)
    3、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize)
    队列能够保证数据安全,是因为队列的内部维护着一把锁。每个去队列中取数据的都会保证数据的安全。而列表虽然具有同样的功能,但是列表不是数据安全的

    p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置

    创建一个队列

    Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。

    p.name:进程的名称

    向队列中插入数据

    • q.put(item,block)
      调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。

    p.pid:进程的pid

    从队列中取出数据

    • q.get()
      调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。

    p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)

    API

    • q.qsize() 返回队列的大小
    • q.empty() 如果队列为空,返回True,反之False
    • 新葡亰496net,q.full() 如果队列满了,返回True,反之False
    • q.full 与 maxsize 大小对应
    • q.get([block[, timeout]]) 获取队列,timeout等待时间
    • q.get_nowait() 相当q.get(False)
      非阻塞 q.put(item) 写入队列,timeout等待时间
    • q.put_nowait(item) 相当q.put(item, False)
    • q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
    • q.join() 实际上意味着等到队列为空,再执行别的操作

    守护进程

    类似于守护线程,只不过守护线程是对象的一个方法,而守护进程封装成对象的属性。

    from multiprocessing import Process
    import time
    class MyProcess(Process):
     def __init__(self,name):
      super().__init__()
      self.name = name
     def run(self):
      time.sleep(3)
      print('hello,%s'%self.name)
    if __name__ == '__main__':
     myprocess1=MyProcess('world')
     myprocess1.daemon = True
     myprocess1.start()
     print('结束')
    #不会输出‘hello world',因为设置为守护进程,主进程不会等待
    

    也可以使用join方法,使主进程等待

    from multiprocessing import Process
    import time
    class MyProcess(Process):
     def __init__(self,name):
      super().__init__()
      self.name = name
     def run(self):
      time.sleep(3)
      print('hello,%s'%self.name)
    if __name__ == '__main__':
     myprocess1=MyProcess('world')
     myprocess1.daemon = True
     myprocess1.start()
     myprocess1.join() #程序阻塞
     print('结束')
    join()
    

    进程同步和锁

    进程虽然不像线程共享资源,但是这并不意味着进程间不 需要加锁,比如不同进程会共享同一个终端 ( 屏幕),或者操作同一个文件,数据库,那么数据安全还是很有必要的,因此我们可以加锁,

    from multiprocessing import Process,Lock
    import time
    def a_print(l): #需要传入对象,因为信息不共享
     l.acquire()
     print('我要打印信息')
     time.sleep(1)
     print('我打印完了')
     l.release()
    if __name__ == '__main__':
     l = Lock()
     for i in range(20):
      p = Process(target=a_print,args=(l,))
      p.start()
    

    信号量(Semaphore)

    能够并发执行的进程数,超出的进程阻塞,直到有进程运行完成。

    Semaphore管理一个内置的计数器,

    每当调用acquire()时内置计数器-1;

    调用release() 时内置计数器 1;

    计数器不能小于0;当计数器为0时,acquire()将阻塞进程直到其他进程调用release()。

    from multiprocessing import Process,Queue,Semaphore
    import time,random
    def seat(s,n):
     s.acquire()
     print('学生%d坐下了'%n)
     time.sleep(random.randint(1,2))
     s.release()
    if __name__ == '__main__':
     s = Semaphore(5)
     for i in range(20):
      p = Process(target=seat,args=(s,i))
      p.start()
     print('-----主进程-------')
    

    注意:其实信号量和锁类似,只是限制进程运行某个代码块的数量(锁为1个),并不是能限制并发的进程,如上述代码,一次性还是创建了20个进程

    事件(Event)

    from multiprocessing import Process,Event
    import time, random
    def eating(event):
     event.wait()
     print('去吃饭的路上...')
    def makeing(event):
     print('做饭中')
     time.sleep(random.randint(1,2))
     print('做好了,快来...')
     event.set()
    if __name__ == '__main__':
     event=Event()
     t1 = Process(target=eating,args=(event,))
     t2 = Process(target=makeing,args=(event,))
     t1.start()
     t2.start()
     # 做饭中
     # 做好了,快来...
     # 去吃饭的路上...
    

    和线程事件几乎一致

    进程队列(Queue)

    进程队列是进程通讯的方式之一。使用multiprocessing 下的Queue

    from multiprocessing import Process,Queue
    import time
    def func1(queue):
     while True:
      info=queue.get()
      if info == None:
       return 
      print(info)
    def func2(queue):
     for i in range(10):
      time.sleep(1)
      queue.put('is %d'%i)
     queue.put(None) #结束的标志
    if __name__ == '__main__':
     q = Queue()
     p1 = Process(target=func1,args=(q,))
     p2 = Process(target=func2, args=(q,))
     p1.start()
     p2.start()
    Queue类的方法,源码如下:
    class Queue(object):
     def __init__(self, maxsize=-1): #可以传参设置队列最大容量
      self._maxsize = maxsize
     def qsize(self): #返回当前时刻队列中的个数
      return 0
     def empty(self): #是否为空
      return False
     def full(self): 是否满了
      return False
     def put(self, obj, block=True, timeout=None): #放值,blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常
      pass
     def put_nowait(self, obj): #=put(False)
      pass
     def get(self, block=True, timeout=None): 获取值,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
      pass
     def get_nowait(self): # = get(False)
      pass
     def close(self): #将队列关闭
      pass
     def join_thread(self): #略,几乎不用
      pass
     def cancel_join_thread(self):
      pass
    

    进程队列源码注释

    进程池

    进程的消耗是很大的,因此我们不能无节制的开启新进程,因此我们可以 通过维护一个进程池来控制进程的数量 。这就不同于信号量,进程池可以从源头控制进程数量。在Python中可以通过如下方法使用

    同步调用

    from multiprocessing import Pool
    import time, random, os
    def func(n):
     pid = os.getpid()
     print('进程%s正在处理第%d个任务'%(pid,n),'时间%s'%time.strftime('%H-%M-%S'))
     time.sleep(2)
     res = '处理%s'%random.choice(['成功','失败'])
     return res
    if __name__ == '__main__':
     p = Pool(4) #创建4个进程,
     li = []
     for i in range(10):
      res = p.apply(func,args=(i,)) 交给进程池处理,处理完成才返回值,会阻塞,即使池内还有空余进程,相当于顺序执行
      li.append(res)
     for i in li:
      print(i)
    

    #进程1916正在处理第0个任务 时间21-02-53
    #进程1240正在处理第1个任务 时间21-02-55
    #进程3484正在处理第2个任务 时间21-02-57
    #进程7512正在处理第3个任务 时间21-02-59
    #进程1916正在处理第4个任务 时间21-03-01
    #进程1240正在处理第5个任务 时间21-03-03
    #进程3484正在处理第6个任务 时间21-03-05
    #进程7512正在处理第7个任务 时间21-03-07
    #进程1916正在处理第8个任务 时间21-03-09
    #进程1240正在处理第9个任务 时间21-03-11

    从结果可以发现两点:

    1. 不是并发处理
    2. 一直都只有四个进程,串行执行

    因此进程池提供了 异步处理 的方式

    from multiprocessing import Pool
    import time, random, os
    def func(n):
     pid = os.getpid()
     print('进程%s正在处理第%d个任务'%(pid,n),'时间%s'%time.strftime('%H-%M-%S'))
     time.sleep(2)
     res = '处理%s'%random.choice(['成功','失败'])
     return res
    
    if __name__ == '__main__':
     p = Pool(4)
     li = []
     for i in range(10):
      res = p.apply_async(func,args=(i,)) 结果不会立刻返回,遇到阻塞,开启下一个进程,在这,相当于几乎同时出现四个打印结果(一个线程处理一个任务,处理完下个任务才能进来)
      li.append(res)
    
     p.close() #join之前需要关闭进程池
     p.join() #因为异步,所以需要等待池内进程工作结束再继续
     for i in li:
      print(i.get()) #i是一个对象,通过get方法获取返回值,而同步则没有该方法
    

    关于回调函数

    from multiprocessing import Pool
    import time, random, os
    def func(n):
     pid = os.getpid()
     print('进程%s正在处理第%d个任务'%(pid,n),'时间%s'%time.strftime('%H-%M-%S'))
     time.sleep(2)
     res = '处理%s'%random.choice(['成功','失败'])
     return res
    
    def foo(info):
     print(info) #传入值为进程执行结果
    
    if __name__ == '__main__':
     p = Pool(4)
     li = []
     for i in range(10):
      res = p.apply_async(func,args=(i,),callback = foo) callback()回调函数会在进程执行完之后调用(主进程调用) 
      li.append(res)
    
     p.close() 
     p.join() 
     for i in li:
      print(i.get()) 
    

    有回调函数

    总结

    以上所述是小编给大家介绍的Python并发之多进程的方法实例代码,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对脚本之家网站的支持!

    您可能感兴趣的文章:

    • Python多进程并发(multiprocessing)用法实例详解
    • Python控制多进程与多线程并发数总结
    • python并发编程之多进程、多线程、异步和协程详解
    • Python多进程并发与多线程并发编程实例总结
    • Python 多进程并发操作中进程池Pool的实例
    • 理论讲解python多进程并发编程
    • Python多进程原理与用法分析
    • Python3多进程 multiprocessing 模块实例详解
    • Python多进程库multiprocessing中进程池Pool类的使用详解
    • Python多进程与服务器并发原理及用法实例分析

    本文由新葡亰496net发布于奥门新萄京娱乐场,转载请注明出处:并发编程,Python并发之多进程的方法实例代码

    关键词:

上一篇:没有了

下一篇:没有了