韦德国际1946手机版:模块实例详解,Python多进度

作者: 韦德国际1946手机版  发布:2019-05-28

正文实例讲述了Python三多进程 multiprocessing 模块。分享给大家供大家参谋,具体如下:

multiprocessing模块

Python多进度multiprocessing用法实例分析,

正文实例讲述了Python多进程multiprocessing用法。分享给大家供我们参谋,具体如下:

mutilprocess简介

韦德国际1946手机版:模块实例详解,Python多进度multiprocessing用法实例深入分析。像线程同样管理进度,那么些是mutilprocess的中坚,他与threading非凡形似,对多核CPU的利用率会比threading好的多。

轻易易行的创导进程:

import multiprocessing
def worker(num):
  """thread worker function"""
  print 'Worker:', num
  return
if __name__ == '__main__':
  jobs = []
  for i in range(5):
    p = multiprocessing.Process(target=worker, args=(i,))
    jobs.append(p)
    p.start()

明显当前的进程,就是给进程命名,方便标记区分,追踪

import multiprocessing
import time
def worker():
  name = multiprocessing.current_process().name
  print name, 'Starting'
  time.sleep(2)
  print name, 'Exiting'
def my_service():
  name = multiprocessing.current_process().name
  print name, 'Starting'
  time.sleep(3)
  print name, 'Exiting'
if __name__ == '__main__':
  service = multiprocessing.Process(name='my_service',
                   target=my_service)
  worker_1 = multiprocessing.Process(name='worker 1',
                    target=worker)
  worker_2 = multiprocessing.Process(target=worker) # default name
  worker_1.start()
  worker_2.start()
  service.start()

护理进程便是不阻拦主程序退出,本身干本人的 mutilprocess.setDaemon(True)就这句等待守护进度退出,要丰裕join,join能够流传浮点数值,等待n久就区别了

关照进度:

import multiprocessing
import time
import sys
def daemon():
  name = multiprocessing.current_process().name
  print 'Starting:', name
  time.sleep(2)
  print 'Exiting :', name
def non_daemon():
  name = multiprocessing.current_process().name
  print 'Starting:', name
  print 'Exiting :', name
if __name__ == '__main__':
  d = multiprocessing.Process(name='daemon',
                target=daemon)
  d.daemon = True
  n = multiprocessing.Process(name='non-daemon',
                target=non_daemon)
  n.daemon = False
  d.start()
  n.start()
  d.join(1)
  print 'd.is_alive()', d.is_alive()
  n.join()

最佳利用 poison pill,强制的运用terminate()注意 terminate之后要join,使其得以创新情况

截止进度:

import multiprocessing
import time
def slow_worker():
  print 'Starting worker'
  time.sleep(0.1)
  print 'Finished worker'
if __name__ == '__main__':
  p = multiprocessing.Process(target=slow_worker)
  print 'BEFORE:', p, p.is_alive()
  p.start()
  print 'DURING:', p, p.is_alive()
  p.terminate()
  print 'TERMINATED:', p, p.is_alive()
  p.join()
  print 'JOINED:', p, p.is_alive()

壹. == 0 未变动任何错误 
2. 0 进度有二个荒谬,并以该错误码退出
叁. < 0 进程由叁个-1 * exitcode能量信号甘休

进程的退出状态:

import multiprocessing
import sys
import time
def exit_error():
  sys.exit(1)
def exit_ok():
  return
def return_value():
  return 1
def raises():
  raise RuntimeError('There was an error!')
def terminated():
  time.sleep(3)
if __name__ == '__main__':
  jobs = []
  for f in [exit_error, exit_ok, return_value, raises, terminated]:
    print 'Starting process for', f.func_name
    j = multiprocessing.Process(target=f, name=f.func_name)
    jobs.append(j)
    j.start()
  jobs[-1].terminate()
  for j in jobs:
    j.join()
    print 's.exitcode = %s' % (j.name, j.exitcode)

福利的调弄整理,能够用logging

日志:

import multiprocessing
import logging
import sys
def worker():
  print 'Doing some work'
  sys.stdout.flush()
if __name__ == '__main__':
  multiprocessing.log_to_stderr()
  logger = multiprocessing.get_logger()
  logger.setLevel(logging.INFO)
  p = multiprocessing.Process(target=worker)
  p.start()
  p.join()

动用class来创建进度,定制子类

派生进度:

import multiprocessing
class Worker(multiprocessing.Process):
  def run(self):
    print 'In %s' % self.name
    return
if __name__ == '__main__':
  jobs = []
  for i in range(5):
    p = Worker()
    jobs.append(p)
    p.start()
  for j in jobs:
    j.join()

python进度间传递音信:

import multiprocessing
class MyFancyClass(object):
  def __init__(self, name):
    self.name = name
  def do_something(self):
    proc_name = multiprocessing.current_process().name
    print 'Doing something fancy in %s for %s!' % 
      (proc_name, self.name)
def worker(q):
  obj = q.get()
  obj.do_something()
if __name__ == '__main__':
  queue = multiprocessing.Queue()
  p = multiprocessing.Process(target=worker, args=(queue,))
  p.start()
  queue.put(MyFancyClass('Fancy Dan'))
  # Wait for the worker to finish
  queue.close()
  queue.join_thread()
  p.join()
import multiprocessing
import time
class Consumer(multiprocessing.Process):
  def __init__(self, task_queue, result_queue):
    multiprocessing.Process.__init__(self)
    self.task_queue = task_queue
    self.result_queue = result_queue
  def run(self):
    proc_name = self.name
    while True:
      next_task = self.task_queue.get()
      if next_task is None:
        # Poison pill means shutdown
        print '%s: Exiting' % proc_name
        self.task_queue.task_done()
        break
      print '%s: %s' % (proc_name, next_task)
      answer = next_task()
      self.task_queue.task_done()
      self.result_queue.put(answer)
    return
class Task(object):
  def __init__(self, a, b):
    self.a = a
    self.b = b
  def __call__(self):
    time.sleep(0.1) # pretend to take some time to do the work
    return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
  def __str__(self):
    return '%s * %s' % (self.a, self.b)
if __name__ == '__main__':
  # Establish communication queues
  tasks = multiprocessing.JoinableQueue()
  results = multiprocessing.Queue()
  # Start consumers
  num_consumers = multiprocessing.cpu_count() * 2
  print 'Creating %d consumers' % num_consumers
  consumers = [ Consumer(tasks, results)
         for i in xrange(num_consumers) ]
  for w in consumers:
    w.start()
  # Enqueue jobs
  num_jobs = 10
  for i in xrange(num_jobs):
    tasks.put(Task(i, i))
  # Add a poison pill for each consumer
  for i in xrange(num_consumers):
    tasks.put(None)
  # Wait for all of the tasks to finish
  tasks.join()
  # Start printing results
  while num_jobs:
    result = results.get()
    print 'Result:', result
    num_jobs -= 1

Event提供1种简单的点子,能够在经过间传递状态音信。事件能够切换设置和未安装情形。通过应用三个可选的超时值,时间对象的用户能够等待其情景未有安装变为设置。

经过间非能量信号传递:

import multiprocessing
import time
def wait_for_event(e):
  """Wait for the event to be set before doing anything"""
  print 'wait_for_event: starting'
  e.wait()
  print 'wait_for_event: e.is_set()->', e.is_set()
def wait_for_event_timeout(e, t):
  """Wait t seconds and then timeout"""
  print 'wait_for_event_timeout: starting'
  e.wait(t)
  print 'wait_for_event_timeout: e.is_set()->', e.is_set()
if __name__ == '__main__':
  e = multiprocessing.Event()
  w1 = multiprocessing.Process(name='block', 
                 target=wait_for_event,
                 args=(e,))
  w1.start()
  w2 = multiprocessing.Process(name='nonblock', 
                 target=wait_for_event_timeout, 
                 args=(e, 2))
  w2.start()
  print 'main: waiting before calling Event.set()'
  time.sleep(3)
  e.set()
  print 'main: event is set'

Python多进度,一般的景色是Queue来传递。

Queue:

from multiprocessing import Process, Queue
def f(q):
  q.put([42, None, 'hello'])
if __name__ == '__main__':
  q = Queue()
  p = Process(target=f, args=(q,))
  p.start()
  print q.get()  # prints "[42, None, 'hello']"
  p.join()

二10四线程优先队列Queue:

import Queue
import threading
import time
exitFlag = 0
class myThread (threading.Thread):
  def __init__(self, threadID, name, q):
    threading.Thread.__init__(self)
    self.threadID = threadID
    self.name = name
    self.q = q
  def run(self):
    print "Starting "   self.name
    process_data(self.name, self.q)
    print "Exiting "   self.name
def process_data(threadName, q):
  while not exitFlag:
    queueLock.acquire()
    if not workQueue.empty():
      data = q.get()
      queueLock.release()
      print "%s processing %s" % (threadName, data)
    else:
      queueLock.release()
    time.sleep(1)
threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Queue.Queue(10)
threads = []
threadID = 1
# Create new threads
for tName in threadList:
  thread = myThread(threadID, tName, workQueue)
  thread.start()
  threads.append(thread)
  threadID  = 1
# Fill the queue
queueLock.acquire()
for word in nameList:
  workQueue.put(word)
queueLock.release()
# Wait for queue to empty
while not workQueue.empty():
  pass
# Notify threads it's time to exit
exitFlag = 1
# Wait for all threads to complete
for t in threads:
  t.join()
print "Exiting Main Thread"

多进度使用Queue通讯的事例

import time
from multiprocessing import Process,Queue
MSG_QUEUE = Queue(5)
def startA(msgQueue):
  while True:
    if msgQueue.empty() > 0:
      print ('queue is empty %d' % (msgQueue.qsize()))
    else:
      msg = msgQueue.get()
      print( 'get msg %s' % (msg,))
    time.sleep(1)
def startB(msgQueue):
  while True:
    msgQueue.put('hello world')
    print( 'put hello world queue size is %d' % (msgQueue.qsize(),))
    time.sleep(3)
if __name__ == '__main__':
  processA = Process(target=startA,args=(MSG_QUEUE,))
  processB = Process(target=startB,args=(MSG_QUEUE,))
  processA.start()
  print( 'processA start..')

主进度定义了三个Queue类型的变量,并作为Process的args参数字传送给子进度processA和processB,多少个经过一个向队列中写多少,多少个读数据。

越多关于Python相关内容感兴趣的读者可查阅本站专项论题:《Python进度与线程操作技艺总结》、《Python Socket编制程序手艺总括》、《Python数据结构与算法教程》、《Python函数使用本领计算》、《Python字符串操作技艺汇总》、《Python入门与升级优异教程》及《Python文件与目录操作技术汇总》

期望本文所述对我们Python程序设计具有协理。

本文实例讲述了Python多进度multiprocessing用法。分享给我们供大家参照他事他说加以考察,具体如下: mutilprocess简单介绍 像...

import multiprocessing
import time

# 具体的处理函数,负责处理单个任务
def func(msg):
  # for i in range(3):
    print (msg)
    time.sleep(1)
    return "done "   msg
if __name__ == "__main__":
    # 进程池,创建多个进程,并行执行
    pool = multiprocessing.Pool(processes=4)
    # 把运行的结果添加到一个列表里,关注每个进程的执行结果
    result = []
    # 生产msg,并加入进程池
    for i in range(10):
        msg = "hello %d" %(i)
        # apply_async 它是非阻塞且支持结果返回进行回调
        result.append(pool.apply_async(func, (msg, )))
        # 关闭进程池,使其不在接受新的任务
    pool.close()
    # 主进程阻塞等待子进程的退出,join方法必须在close或terminate之后使用。
    pool.join()
    # 查看执行结果
    for res in result:
        print (res.get())
    print ("Sub-process(es) done.")

多进程 Multiprocessing 模块

multiprocessing包是Python中的多进度管理包。它与 threading.Thread类似,能够选择multiprocessing.Process对象来成立3个进程。该进度能够允许放在Python程序内部编写的函数中。该Process对象与Thread对象的用法一样,具备is_alive()、join([timeout])、run()、start()、terminate()等情势。属性有:authkey、daemon(要透过start()设置)、exitcode(进度在运转时为None、如若为–N,表示被实信号N甘休)、name、pid。其余multiprocessing包中也可能有Lock/伊夫nt/Semaphore/Condition类,用来一同进度,其用法也与threading包中的同名类同样。multiprocessing的十分大壹部份与threading使用同1套API,只然而换来了多进度的境地。

  

multiprocessing 模块官方表达文书档案

本条模块表示像线程同样管理进程,那个是multiprocessing的主干,它与threading很一般,对多核CPU的利用率会比threading好的多。

Process 类

看一下Process类的构造方法:

Process 类用来叙述1个历程对象。创设子进程的时候,只供给传入一个施行函数和函数的参数就可以成功 Process 示例的创始。

__init__(self, group=None, target=None, name=None, args=(), kwargs={})

star() 方法运维进程,
join() 方法落成进度间的一道,等待全体进程退出。
close() 用来阻拦多余的经过涌入进度池 Pool 形成进度阻塞。

参数表明:

multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

group:进度所属组。基本不用
target:表示调用对象。
args:表示调用对象的地方参数元组。
name:别名
kwargs:表示调用对象的字典。

target 是函数名字,供给调用的函数
args 函数需求的参数,以 tuple 的款型传播

创造进度的简要实例:

示例:

#coding=utf-8
import multiprocessing

def do(n) :
 #获取当前线程的名字
 name = multiprocessing.current_process().name
 print name,'starting'
 print "worker ", n
 return

if __name__ == '__main__' :
 numList = []
 for i in xrange(5) :
  p = multiprocessing.Process(target=do, args=(i,))
  numList.append(p)
  p.start()
  p.join()
  print "Process end."
import multiprocessing
import os
def run_proc(name):
  print('Child process {0} {1} Running '.format(name, os.getpid()))
if __name__ == '__main__':
  print('Parent process {0} is Running'.format(os.getpid()))
  for i in range(5):
    p = multiprocessing.Process(target=run_proc, args=(str(i),))
    print('process start')
    p.start()
  p.join()
  print('Process close')

进行结果:

结果:

Process-1 starting
worker 0
Process end.
Process-2 starting
worker 1
Process end.
Process-3 starting
worker 2
Process end.
Process-4 starting
worker 3
Process end.
Process-5 starting
worker 4
Process end.

Parent process 809 is Running
process start
process start
process start
process start
process start
Child process 0 810 Running
Child process 1 811 Running
Child process 2 812 Running
Child process 3 813 Running
Child process 4 814 Running
Process close

创设子进度时,只须要传入八个实行函数和函数的参数,创设1个Process实例,并用其start()方法运转,那样创造进度比fork()还要简单。

Pool

join()方法表示等待子进程甘休未来再持续往下运作,常常用于进程间的共同。

Pool 能够提供钦点数量的长河供用户接纳,暗许是 CPU 核数。当有新的请求提交到 Poll 的时候,假设池子未有满,会成立二个历程来实践,不然就能够让该请求等待。

注意:

- Pool 对象调用 join 方法会等待全部的子进度实践实现

在Windows上要想行使过程模块,就必须把有关进程的代码写在当前.py文件的if __name__ == ‘__main__' :语句的上面,本领健康使用Windows下的进程模块。Unix/Linux下则无需。

本文由韦德国际1946发布于韦德国际1946手机版,转载请注明出处:韦德国际1946手机版:模块实例详解,Python多进度

关键词: python 1946bv1946