Python多进程教程

Python2.6版本中新添了multiprocessing模块。它最初由Jesse Noller和Richard Oudkerk定义在PEP 371中。就像你能通过threading模块衍生线程一样,multiprocessing 模块允许你衍生进程。这里用到的思想:因为你现在能衍生进程,所以你能够避免使用全局解释器锁(GIL),并且充分利用机器的多个处理器。

为卓尼等地区用户提供了全套网页设计制作服务,及卓尼网站建设行业解决方案。主营业务为成都做网站、成都网站设计、卓尼网站设计,以传统方式定制建设网站,并提供域名空间备案等一条龙服务,秉承以专业、用心的态度为用户提供真诚的服务。我们深信只要达到每一位用户的要求,就会得到认可,从而选择与我们长期合作。这样,我们也可以走得更远!

多进程包也包含一些根本不在threading 模块中的API。比如:有一个灵活的Pool类能让你在多个输入下并行化地执行函数。我们将在后面的小节讲解Pool类。我们将以multiprocessing模块的Process类开始讲解。

开始学习multiprocessing模块

Process这个类和threading模块中的Thread类很像。让我们创建一系列调用相同函数的进程,并且看看它是如何工作的。

 
 
 
 
  1. import os
  2. from multiprocessing import Process
  3. def doubler(number):
  4.     """
  5.     A doubling function that can be used by a process
  6.     """
  7.     result = number * 2
  8.     proc = os.getpid()
  9.     print('{0} doubled to {1} by process id: {2}'.format(
  10.         number, result, proc))
  11. if __name__ == '__main__':
  12.     numbers = [5, 10, 15, 20, 25]
  13.     procs = []
  14.     for index, number in enumerate(numbers):
  15.         proc = Process(target=doubler, args=(number,))
  16.         procs.append(proc)
  17.         proc.start()
  18.     for proc in procs:
  19.         proc.join()

对于上面的例子,我们导入Process类、创建一个叫doubler的函数。在函数中,我们将传入的数字乘上2。我们也用Python的os模块来获取当前进程的ID(pid)。这个ID将告诉我们哪个进程正在调用doubler函数。然后,在下面的代码块中,我们实例化了一系列的Process类并且启动它们。***一个循环只是调用每个进程的join()方法,该方法告诉Python等待进程直到它结束。如果你需要结束一个进程,你可以调用它的terminate()方法。

当你运行上面的代码,你应该看到和下面类似的输出结果:

 
 
 
 
  1. 5 doubled to 10 by process id: 10468
  2. 10 doubled to 20 by process id: 10469
  3. 15 doubled to 30 by process id: 10470
  4. 20 doubled to 40 by process id: 10471
  5. 25 doubled to 50 by process id: 10472

有时候,你***给你的进程取一个易于理解的名字 。幸运的是,Process类确实允许你访问同样的进程。让我们来看看如下例子:

 
 
 
 
  1. import os
  2. from multiprocessing import Process, current_process
  3. def doubler(number):
  4.     """
  5.     A doubling function that can be used by a process
  6.     """
  7.     result = number * 2
  8.     proc_name = current_process().name
  9.     print('{0} doubled to {1} by: {2}'.format(
  10.         number, result, proc_name))
  11. if __name__ == '__main__':
  12.     numbers = [5, 10, 15, 20, 25]
  13.     procs = []
  14.     proc = Process(target=doubler, args=(5,))
  15.     for index, number in enumerate(numbers):
  16.         proc = Process(target=doubler, args=(number,))
  17.         procs.append(proc)
  18.         proc.start()
  19.     proc = Process(target=doubler, name='Test', args=(2,))
  20.     proc.start()
  21.     procs.append(proc)
  22.     for proc in procs:
  23.         proc.join()

这一次,我们多导入了current_process。current_process基本上和threading模块的current_thread是类似的东西。我们用它来获取正在调用我们的函数的线程的名字。你将注意到我们没有给前面的5个进程设置名字。然后我们将第6个进程的名字设置为“Test”。

让我们看看我们将得到什么样的输出结果:

 
 
 
 
  1. 5 doubled to 10 by: Process-2
  2. 10 doubled to 20 by: Process-3
  3. 15 doubled to 30 by: Process-4
  4. 20 doubled to 40 by: Process-5
  5. 25 doubled to 50 by: Process-6
  6. 2 doubled to 4 by: Test

输出结果说明:默认情况下,multiprocessing模块给每个进程分配了一个编号,而该编号被用来组成进程的名字的一部分。当然,如果我们给定了名字的话,并不会有编号被添加到名字中。

multiprocessing模块支持锁,它和threading模块做的方式一样。你需要做的只是导入Lock,获取它,做一些事,释放它。

 
 
 
 
  1. from multiprocessing import Process, Lock
  2. def printer(item, lock):
  3.     """
  4.     Prints out the item that was passed in
  5.     """
  6.     lock.acquire()
  7.     try:
  8.         print(item)
  9.     finally:
  10.         lock.release()
  11. if __name__ == '__main__':
  12.     lock = Lock()
  13.     items = ['tango', 'foxtrot', 10]
  14.     for item in items:
  15.         p = Process(target=printer, args=(item, lock))
  16.         p.start()

我们在这里创建了一个简单的用于打印函数,你输入什么,它就输出什么。为了避免线程之间互相阻塞,我们使用Lock对象。代码循环列表中的三个项并为它们各自都创建一个进程。每一个进程都将调用我们的函数,并且每次遍历到的那一项作为参数传入函数。因为我们现在使用了锁,所以队列中下一个进程将一直阻塞,直到之前的进程释放锁。

日志

为进程创建日志与为线程创建日志有一些不同。它们存在不同是因为Python的logging包不使用共享锁的进程,因此有可能以来自不同进程的信息作为结束的标志。让我们试着给前面的例子添加基本的日志。代码如下:

 
 
 
 
  1. import logging
  2. import multiprocessing
  3. from multiprocessing import Process, Lock
  4. def printer(item, lock):
  5.     """
  6.     Prints out the item that was passed in
  7.     """
  8.     lock.acquire()
  9.     try:
  10.         print(item)
  11.     finally:
  12.         lock.release()
  13. if __name__ == '__main__':
  14.     lock = Lock()
  15.     items = ['tango', 'foxtrot', 10]
  16.     multiprocessing.log_to_stderr()
  17.     logger = multiprocessing.get_logger()
  18.     logger.setLevel(logging.INFO)
  19.     for item in items:
  20.         p = Process(target=printer, args=(item, lock))
  21.         p.start()

最简单的添加日志的方法通过推送它到stderr实现。我们能通过调用thelog_to_stderr() 函数来实现该方法。然后我们调用get_logger 函数获得一个logger实例,并将它的日志等级设为INFO。之后的代码是相同的。需要提示下这里我并没有调用join()方法。取而代之的:当它退出,父线程将自动调用join()方法。

当你这么做了,你应该得到类似下面的输出:

 
 
 
 
  1. [INFO/Process-1] child process calling self.run()
  2. tango
  3. [INFO/Process-1] process shutting down
  4. [INFO/Process-1] process exiting with exitcode 0
  5. [INFO/Process-2] child process calling self.run()
  6. [INFO/MainProcess] process shutting down
  7. foxtrot
  8. [INFO/Process-2] process shutting down
  9. [INFO/Process-3] child process calling self.run()
  10. [INFO/Process-2] process exiting with exitcode 0
  11. 10
  12. [INFO/MainProcess] calling join() for process Process-3
  13. [INFO/Process-3] process shutting down
  14. [INFO/Process-3] process exiting with exitcode 0
  15. [INFO/MainProcess] calling join() for process Process-2

现在如果你想要保存日志到硬盘中,那么这件事就显得有些棘手。你能在Python的logging Cookbook阅读一些有关那类话题。

Pool类

Pool类被用来代表一个工作进程池。它有让你将任务转移到工作进程的方法。让我们看下面一个非常简单的例子。

 
 
 
 
  1. from multiprocessing import Pool
  2. def doubler(number):
  3.     return number * 2
  4. if __name__ == '__main__':
  5.     numbers = [5, 10, 20]
  6.     pool = Pool(processes=3)
  7.     print(pool.map(doubler, numbers))

基本上执行上述代码之后,一个Pool的实例被创建,并且该实例创建了3个工作进程。然后我们使用map 方法将一个函数和一个可迭代对象映射到每个进程。***我们打印出这个例子的结果:[10, 20, 40]。

你也能通过apply_async方法获得池中进程的运行结果:

 
 
 
 
  1. from multiprocessing import Pool
  2. def doubler(number):
  3.     return number * 2
  4. if __name__ == '__main__':
  5.     pool = Pool(processes=3)
  6.     result = pool.apply_async(doubler, (25,))
  7.     print(result.get(timeout=1))

我们上面做的事实际上就是请求进程的运行结果。那就是get函数的用途。它尝试去获取我们的结果。你能够注意到我们设置了timeout,这是为了预防我们调用的函数发生异常的情况。毕竟我们不想要它被***期地阻塞。

进程通信

当遇到进程间通信的情况,multiprocessing 模块提供了两个主要的方法:Queues 和 Pipes。Queue 实现上既是线程安全的也是进程安全的。让我们看一个相当简单的并且基于 Queue的例子。代码来自于我的文章(threading articles)。

 
 
 
 
  1. from multiprocessing import Process, Queue
  2. sentinel = -1
  3. def creator(data, q):
  4.     """
  5.     Creates data to be consumed and waits for the consumer
  6.     to finish processing
  7.     """
  8.     print('Creating data and putting it on the queue')
  9.     for item in data:
  10.         q.put(item)
  11. def my_consumer(q):
  12.     """
  13.     Consumes some data and works on it
  14.     In this case, all it does is double the input
  15.     """
  16.     while True:
  17.         data = q.get()
  18.         print('data found to be processed: {}'.format(data))
  19.         processed = data * 2
  20.         print(processed)
  21.         if data is sentinel:
  22.             break
  23. if __name__ == '__main__':
  24.     q = Queue()
  25.     data = [5, 10, 13, -1]
  26.     process_one = Process(target=creator, args=(data, q))
  27.     process_two = Process(target=my_consumer, args=(q,))
  28.     process_one.start()
  29.     process_two.start()
  30.     q.close()
  31.     q.join_thread()
  32.     process_one.join()
  33.     process_two.join()

在这里我们只需要导入Queue和Process。Queue用来创建数据和添加数据到队列中,Process用来消耗数据并执行它。通过使用Queue的put()和get()方法,我们就能添加数据到Queue、从Queue获取数据。代码的***一块只是创建了Queue 对象以及两个Process对象,并且运行它们。你能注意到我们在进程对象上调用join()方法,而不是在Queue本身上调用。

总结

我们这里有大量的资料。你已经学习如何使用multiprocessing模块指定不变的函数、使用Queues在进程间通信、给进程命名等很多事。在Python文档中也有很多本文没有接触到的知识点,因此也务必深入了解下文档。与此同时,你现在知道如何用Python利用你电脑所有的处理能力了!

相关阅读

  • 有关multiprocessing模块的Python文档(multiprocessing module)
  • Python模块周刊:multiprocessing
  • Python的并发–Porting a Queue to multiprocessing

分享标题:Python多进程教程
URL分享:http://www.hantingmc.com/qtweb/news33/280833.html

网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联