• jimmy Windows 7    4月10日 10:10

    微博分享

    标签: Python

    Table of Contents

    1.1. linux/unix/win 启动方式对比

    1.1.1. spawn win的默认方法支持unix/win

    1.1.2. fork unix的默认方法仅支持unix

    1.1.3. forkserver 当平台支持unix的管道文件时该方法可用

    1.1.5. 选择启动的方法

    1.4.1. 创建Pool([processes[initalizer[initargs[maxtasksperchild[context]]]])

    1.4.2. AsyncResult apply async map async与starmap async的返回对象

    1.5.1. 使用ArrayValue作为存储空间来保存需要共享的资源

    1.5.3. 使用服务进程server process

    1 多进程

    1.1 linux/unix/win 启动方式对比

    在不同平台上系统使用的多进程机制是不一样的所以在python实现中有三种不同的开启多进程的方式

    1.1.1 spawn win的默认方法支持unix/win

    父进程开启一个新的python解释程序 子进程只获得足够运行run()方法的资源 父进程的文件描述符和句柄不被继承 此方式的速度在三种方式中最慢

    run() 该方法是target的参数参数是一个可调用对象即可

    经测试当run调用的是function时资源仍无法保留只有调用一个可调用对象且初始化方法( init )除self外有其他参数才行

    def hello():

        pass

    class Hello():

        def __init__(selfname):

    print(name)

    hello.__init__()  # 不需要传参所以资源无法保留

    hello_cla = Hello('name')

    hello_cla.__init__('name')  # 有传参资源才能保留

    实例:

    from multiprocessing import Process Queue

    import multiprocessing as mp

    import os

    def hello_procs(name):

        print("这是{}{}进程".format(nameos.getpid()))

        print("父进程ID%s"%os.getppid())

        try:

    print(sourceA)

    print("调用成功因为未使用spawn方式")

        except Exception as e:

    print("调用失败因为该资源不是必须资源")

    print(e)

    class Sou():

        def __init__(selfsoua):

    self.soua = soua

        def show_soua(self):

    print(self.soua)

        def __call__(self):

    self.show_soua()

    if __name__ == '__main__':

        sourceA = "AA"

        sourceB = "BB"

        # 使用forkserver在资源继承方面与spawn表现一致

        mp.set_start_method('spawn')

        print("当前进程的进程id是%s"%os.getpid())

        #p = Process(target=hello_procs args=('第一个进程'))

        a = Sou(sourceA)

        p = Process(target=a)

        """

        上面的是不带参数的传递方式如需要给可调用函数传递参数那么需要修改为__call__(self参数)

        再接下来就是修改Process参数为(target=aargs=('参数'))

        """

        print('开始进入子进程')

        p.start()

        p.join()

        print("进程结束")

    1.1.2 fork unix的默认方法仅支持unix

    使用unix的fork()[os.fork()]来创建一个当前解释器的子进程 子进程获得父进程全部的资源 此方式的安全问题不好控制

    1.1.3 forkserver 当平台支持unix的管道文件时该方法可用

    在使用这种方式时开启多进程会开启一个额外的服务进程 当需要一个子进程时父进程去请求服务进程并得到一个子进程 由于服务进程是单线程的所以该方式是线程安全的

    1.1.4 启动子进程

    创建进程对象 表示在单独进程中运行的活动 

    Process(group=Nonetarget=Nonename=Noneargs=()kwargs={}*daemon=None) group一直为None即可只是为了和threading.Thread兼容 target 是run()要调用的对象需要是可执行的 name 无实意名字 args 可调用对象的位置参数 kwargs 可调用对象的关键字参数 daemon True/False/None 与过程继承有关

    run() 表示进程活动的方法

    start() 启动进程的活动

    is alive () 检测子进程是否存活只能检测子进程

    join([timeout]) 阻塞调用该方法的进程 

    None 阻塞直到进程完毕 正数 阻塞timeout秒

    daemon 标识该进程是否为守护进程True是False不是None从上层继承 

    主进程不是守护进程所以只要不明确指定为True那么创建的所有进程都不是守护进程

    from multiprocessing import Process as ps

    import sys

    def hello(*a=None):

        if a == None:

    pass

        else:

    print(a)

    def not_sh():

        global pishpfshex

        print("非守护进程%s"%ex)

        pish = ps(target=is_sh args=(1) daemon=True)

        pfsh = ps(target=is_sh args=('非守护进程')) #daemon为None从上层继承属性

        pish.start()

        pfsh.start()

        for i in range(1): #当该值为1时可以看到守护进程在打印出消息前就退出了

    print("第二个主进程")

        ex="有变化"

    def is_sh(jc_type):

        if jc_type == 1:

    print("守护进程")

    for i in range(1000):

       print("守护进程")

        else:

           print()

           print("不是守护进程")

           for i in range(100):

      print("非守护进程")

    if __name__ == '__main__':

        p = ps(target=hello name='hello' args=() kwargs={'a':'A'})

        print("进程是否存活%s"%p.is_alive())

        p.start()

        print("守护进程daemon? %s"%p.daemon)

        print("进程是否存活%s"%p.is_alive())

        # 创建一个非守护进程

        pish pfsh =NoneNone

        ex = "原始"

        pnsh = ps(target=not_sh)

        pnsh.start()

        pnsh.join()

        print(ex)

    1.1.5 选择启动的方法

    import multiprocessing

    multiprocessing.set_start_method('spawn') #传入方式的名字

    # 该方法在程序中至多使用一次

    1.1.6 其他内容

    使用global可以将数据发送到子进程但是子进程对数据的修改不会反馈到父进程

    1.2 进程通信

    1.2.1 队列 Queue

    Queue([maxsize]) 创建并可以设置最大值

    qsize() 返回队列大致大小(不准确是因为并发) 在MAC上回引发异常NotImplementedError

    empty() 是否为空不准确

    full() 队列是否已满不准确

    put(obj[block[timeout]])

    put nowait (obj) 相当于put(objFalse)

    get nowite ()

    close() 没有返回值关闭队列后台线程将数据一次性刷新到管道当关闭后仍去操作会抛出异常OSError: handle is closed

    join thread () 只能在close()后调用他阻塞直到后台线程退出确保数据刷新到管道

    cancel join thread () 立即关闭队列不等待后台线程将数据刷新到管道

    1.2.2 TODO 管道 Pipes

    1.2.3 实例

    """进程间通信"""

    from multiprocessing import Process Queue Pipe

    import os random

    def write(m):

        print('进程:%s'%os.getpid())

        m.put('数据A')

        """

        put(obj[block[timeout]])

        将值放入队列

        当block为True(默认值)且timeout为None(默认值)时不会抛出异常会一直等到可以入队时将值入队

        当timeout为正值时等待timeout秒超时则抛出queue.Full异常

        当block为False时一旦无法入队立即抛出异常

        """

    def read(m):    

        print('进程:%s'%os.getpid())

        try:

    print(m.qsize())

        except Exception as e:

    print("在MAC上会引发异常")

        finally:

    value = m.get()

        """

        get([block[timeout]])

        获取一个值后删除

        当block为True(默认值)且timeout为None(默认值)那么只有当队列中有内容时获取值

        timeout为正数时当队列中无值时阻塞timeout秒而后仍无值则抛出queue.Empty异常

        block为False时一旦无值立即抛出异常

        """

        print(value)

    if __name__ == '__main__':

        q = Queue()

        pw = Process(target=write name='写进程' args=(q))

        print('开始写入数据 %s'%pw.name end=' :  ')

        pw.start()

        pr = Process(target=read name='读进程' args=(q))

        print('开始读取数据 %s'%pr.name end=' :  ')

        pr.start()

    # TODO Pipe 通过管道传递消息

    1.3 进程同步

    1.3.1 锁Lock

    一旦进程线程获得了锁那么随后的任何进程线程在获取锁时将阻塞

    acquire(block=True timeout=None) 

    当block为True方法调用将阻塞直到解锁 当block为True时timeout为正数那么最多只能被阻塞timeout秒当timeout为负数阻塞时长为0当为None一直阻塞

    release() 释放锁 

    一把锁可以被任意对象释放未必是上锁的对象来解锁

    1.3.2 实例

    from multiprocessing import Process Lock

    def show_lock(l):

        #l.release() 在try_get_lock中上的锁可以在这里解开

        l.acquire(True-1)

        # 超时时长为负数即使被锁定也会执行

        print("函数正常执行")

    def try_get_lock(l):

        l.acquire()

        print("获得了锁")

    #    l.release()

    if __name__ == '__main__':

        l = Lock()

        #l.acquire(True)

        pg = Process(target=try_get_lockargs=(l))

        pg.start()

        ps = Process(target=show_lockargs=(l))

        ps.start()

    1.4 进程池 Pool

    1.4.1 创建Pool([processes[initalizer[initargs[maxtasksperchild[context]]]])

    processes 进程的数量 initializer 如果不为None则在每个工作进程启动时调用initializer(*initargs) maxtasksperchild context 工作进程的上下文

    该类实现了上下文管理

    apply(func[args[kwds]]) 

    使用argskwds调用func直到结果完成

    apply async (func[args[kwds[callback[error callback]]]] ) 

    返回一个结果对象 返回的对象是 AsyncResult 当指定callback(一个接受单参数的可调用对象)时完成时会调用callback调用失败则调用error callback 回调应该立即完成否则线程将会阻塞

    map(funciterable[chunksize]) 

    与内置函数map()相同它阻塞直到map完成

    map async (funciterable[chunksize[callback[error callback]]] ) 

    返回结果的map()

    imap(funciterable[chunkszie]) 

    惰性map() chunkszie参数与map()方法的参数相同

    starmap(funciterable[chunksize]) iterable必须为可迭代对象 

    需要注意'abc'也是可迭代对象一旦加上()('abc')更不行 func('abc') 会给func传入三个参数而不是一个整体 正确做法 传入(('abc'))同理传入其他可迭代内容也可以这样做

    starmap async (funciterable[chuunksize[callback[error back]]] ) 

    将iterable拆分后调用func并返回一个结果对象

    close() 

    一旦任务完成退出进程

    terminate() 

    立即停止进程并退出

    join() 

    等待进程结束在此之前必须调用close或terminate

    1.4.2 AsyncResult apply async map async与starmap async的返回对象

    get([timeout]) 返回结果并要求在timeout秒内到达当timeout不为None时N秒内未到达抛出异常TimeoutError

    wait([timeout]) 等待结果或直到N秒超时

    ready() 判断返回是否就绪

    successful() 返回调用是否完成且无异常

    1.4.3 实例

    from multiprocessing import Pool TimeoutError Process

    import time

    import os

    def proc_pool(name):

        print("asd")

        for i in range(5):

    print(str(i)+' : %s')

        #return "返回的结果值""有两个会怎样?"  不要返回一个以上的值会导致map调用产生歧义(使用map(func[12])时会返回[返回值1返回值2]而不是[(返回值1返回值2)(返回值1返回值2)]) 当需要返回两个值要显式的返回一个元组

        #return ("返回的结果值""第二个值")

        return "返回值"

    def proc_err(name):

        raise Exception

    def proc_mm(name):

        print('该函数被调用了%s%s'%(nametype(name)))

        return name

    if __name__ == '__main__':

        print("开始启动线程池")

        p = Pool(4)

        for i in range(5):

    p.apply_async(proc_pool args=('cc'))

        #p.map(proc_pool['cc''dd'])

        def callback(name):

    print("回调函数%s")

        def err_callback(err):

    try:

       print("yc")

    except Exception as e:

       print('发生异常')

    finally:

       print("ww")

        mapr = p.map_async(proc_err 'ee' 3 callback err_callback)

        #mapr.get() 获得可调用对象的返回值

        #print("返回的结果值%s"%mapr.get())

        #mmap = p.starmap(proc_mm[('abcd')('a')])

        mmap = p.starmap(proc_mm((('abc'))))

        mmaps = p.starmap_async(proc_mm((('abc'))))

        list(mmap)

        print(mmaps.get())

        p.close()

        p.join()

    1.5 资源共享

    1.5.1 使用ArrayValue作为存储空间来保存需要共享的资源

    Value(typecode or type *args lock=True) Array(typecode or type size or initializer * lock=True)

    ArrayValue的共通之处 

    在创建存储空间时在lock的参数选取上默认情况是自己创建一个资源锁 但是也可以选择使用一个已经存在的锁当lock被传入一个已存在的锁时受该锁影响 当设置为False时资源不被锁保护导致线程不安全

    tyoecode or type 都是array模块使用的类型代码 array 表示基本类型的数组有:字符整数浮点数

    区别 

    Array存储一个队列Value存储一个值 Array的size of initializer就是保存的数组 同时该数组的长度也是Array的长度

    1.5.2 实例

    """

    进程共享内容

    使用ValueArray使内容共享

    """

    from multiprocessing import Process Value Array Lock

    def f(n a):

        n.value = 3.1415927

        for i in range(len(a)):

    a[i] = -a[i]

    def fun(l strr):

        # 获得锁当被锁住等待最多3秒继续执行

        l.acquire(True3)

        try:

    print(num.value)

    print(strr.value)

    print(chr(strr.value))

        except Exception as ex:

    print(ex)

        finally:

    print('完成')

    if __name__ == '__main__':

        l = Lock()

        num = Value('d' 0.0)

        arr = Array('i' range(10))

        lisi = [123]

        arrs = Array('i'lisi)

        # 因为python中没有char类型所以在这里只能转换为数字最后在转回来

        strr = Value('b'ord('c'))

        p = Process(target=f args=(num arrs))

        pl = Process(target=fun args=(lstrr))

        # 上锁

        l.acquire()

        p.start()

        p.join()

        pl.start()

        pl.join()

        print(num.value)

        print(arrs[:])

    1.5.3 使用服务进程server process

    使用Manager()会返回一个管理对象 

    该管理对象支持的类型更广泛有: list dict Namespace Lock RLock Semaphore BoundedSemaphore Condition Event Barrier Queue Value Array

    该类实现了上下文管理

    Manager的两个子类Manager()返回的就是SyncManager 

    baseManager([adress[authkey]]) 

    adress是管理器进程侦听新链接的地址None为随机选一个 authkey是认证密匙None为使用current process ().authkey否则使用authkey必须为字符串 current process () 返回当前Process对象 authkey 进程的认证密钥(字节字符串) 当初始化multiprocessing时,使用os.urandom()为主进程分配一个随机字符串 当创建Process对象时,它将继承其父进程的认证密钥 但可以通过将authkey设置为另一个字节字符串来更改。

    start([initializer[initargs]]) 

    启动子过程以启动管理器

    get server () 

    返回Server对象他表示在manger控制下的实际服务器

    connect() 

    本地管理器对象链接到远程管理器进程

    shutdown() 

    停止manager进程仅当启动使用start()时可用

    register(typeid[ callable[ proxytype[ exposed[ method to typeid[ create method]]]]] ) 

    向Manager注册类型或可调用的类方法

    typeid 用于标识特定类型的共享对象的类型标识符必须是字符串 callable 用于typeid类型的可调用选项 proxytype 是baseProxy的子类用于创建typeid的共享对象代理None自动创建 exposed 用于指定代理类型所使用的方法 method to typeid 返回代理类型的公开方法 create method 确定是否使用typeid创建方法默认为True

    SyncManager 

    baseManager 主要用来创建自定义的Manager

    Queue([maxsize]) 创建queue.Queue对象返回其代理 

    在进程通信中展示了部分Queue队列的使用方法

    Array(typecodesequence) 创建一个数组并返回其代理 

    在进程共享中展示了部分Array的用法

    Value(typecodevalue) 创建一个值并返回其代理 

    在进程共享中展示了部分Value的用法

    dict([dict]) 创建一个dict并返回其代理

    list([list]) 创建一个list并返回其代理

    Lock() 创建一个threading.Lock对象并返回其代理

    实例 

    from multiprocessing import Process Manager

    def f(d l q a v lo):

        d[1] = '1'

        d['2'] = 2

        d[0.25] = None

        q.put(100)

        lo.acquire(True3)

        for i in range(len(a)):

    a[i]=1

        vvalue = 100

        l.reverse()

    if __name__ == '__main__':

        with Manager() as manager:

    d = manager.dict()

    l = manager.list(range(10))

    q = manager.Queue(10)

    a = manager.Array('i'[123])

    v = manager.Value('i'3)

    lo = manager.Lock()

    lo.acquire(True)

    p = Process(target=f args=(d l q a v lo))

    p.start()

    p.join()

    print(d)

    print(l)

    print("********")

    print(q.get())

    print(a[:])

    print(v.value)

    上一篇:PHP命名空间(Namespace)的使用详解

    下一篇:Python创建单例模式的三种方式

  • 亲,使用会员登录 QQ帐号登录 后,在线交流才会学到更多知识哦~