多线程
python提供了thread、threading和Queue模块来支持多线程编程
thread只支持基本的功能,不推荐使用,threading是更加全面,更高级别的模块,
使用Queue可以创建一个队列数据结构,用于在多线程之间进行共享
threading
模块
对象 | 描述 |
---|---|
Thread | 表示一个执行线程的对象 |
Lock | 锁原语对象 |
RLock | 可重入锁对象,使单一线程可以(在此)获得已持有的锁(递归锁) |
Condition | 条件变量对象 |
Event | 条件变量的通用版本,等待的条件满足后唤醒所有等待线程 |
Semaphone | 为线程间共享的有限资源提供了一个“计数器”,没有可用资源时阻塞 |
BoundeSemaphone | 与Semaphon类似,不过不允许超过初始值 |
Timer | 定时器,运行前会等待一段时间 |
Barrier | 创建一个“障碍”,必须达到指定数量的线程后才可以继续(PY3.2引入) |
属性和方法
属性 | 描述 |
---|---|
name | 线程名 |
ident | 线程标识符 |
daemon | bool值,表示这个线程是否是守护线程 |
__init__(group=None, tatget=None, name=None, args=(),kwargs ={}, verbose=None, daemon=None) | 实例化一个线程对象,需要有一个可调用的 target,以及其参数 args或 kwargs。还可以传递 name 或 group 参数,不过后者还未实现。此外, verbose 标 志 也 是 可 接 受 的。 而 daemon 的 值 将 会 设定thread.daemon 属性/标志 |
start() | 开启线程,自动调用run()方法执行线程代码 |
run() | 定义线程功能的方法,用来实现线程的功能与业务逻辑,一般在子类中重写 |
join(timeout = None) | 等待线程结束后或者超时返回 |
is_alive() | bool值,表示这个线程是否还存活 |
active_count() | 当前活动的Thread对象个数 |
current_thread() | 返回当前的 Thread对象 |
enumerate() | 返回当前活动的Thread对象列表 |
设置守护线程daemon属性:
1.daemon=True时主线程结束时不对该子线程进行检查而直接退出,同时所有为True的子线程无论是否运行完成都将随主线程一起结束。
2.daemon=False时主线程结束时会检查该子线程是否运行完成,若该子线程未完成,则主线程会等该子线程运行完成后再退出
多线程时,由于daemon的默认值为False,主线程执行完自己的任务以后会检查子线程是否运行完成,若子线程未完成,则主线程会等该子线程运行完成后再退出。如果需要修改daemon的值,则必须再调用start()方法启动线程之前修改
创建线程
使用Thread类创建线程的方法:
1.创建一个Thead实例, 传给他一个函数
2. 继承Thread类并在子类中重写__init__()和run()方法
3. 创建Thread实例, 传给他一个可调用的类实例 (一般不用)
例一:
#!/usr/bin/python3
from threading import Thread
def func(name):
print(name)
t = Thread(target = func, args = ("hello world",)) #args的传参须为列表
t.start()
t.join()
互斥锁
Threading.Lock
方法:
- acquire()
获得锁。该方法等待锁被解锁,将其设置为locked并返回True。 - release() 释放锁。当锁被锁定时,将其重置为解锁并返回。如果锁未锁定,则会引发RuntimeError。
- locked() 如果锁被锁定,返回True
例二+互斥锁:
from threading import Thread, Lock
exitFlag = 0
lock = Lock()
class myThread(Thread):
def __init__(self, ID, name, counter):
super().__init__() #调用父类__init__
self.id = ID
self.name = name # 设置name属性
self.counter = counter
# self.daemon = True / False 设置是否为守护线程
def run(self):
print("开始线程:" + self.name)
lock.acquire() #加锁
print_time(self.name, self.counter, 3)
lock.release() #释放锁
print("Done")
def print_time(name, delay, counter):
while counter:
if exitFlag:
print("shutdone")
name.exit()
time.sleep(delay)
print("%s : %s " % (name, time.ctime(time.time())))
counter -= 1
thread_1 = myThread(3, "lzj", 3)
thread_2 = myThread(6, "wrh", 2)
thread_1.start()
thread_2.start()
thread_1.join()
thread_2.join()
条件变量
此段来自链接
threading.Condition
方法:
-
acquire() — 线程锁,注意线程条件变量Condition中的所有相关函数使用必须在acquire() /release() 内部操作;
-
release() — 释放锁,注意线程条件变量Condition中的所有相关函数使用必须在acquire() /release() 内部操作;
-
wait(timeout) — 线程挂起(阻塞状态),直到收到一个notify通知或者超时才会被唤醒继续运行(超时参数默认不设置,可选填,类型是浮点数,单位是秒)。wait()必须在已获得Lock前提下才能调用,否则会触发RuntimeError;
-
notify(n=1) — 通知其他线程,那些挂起的线程接到这个通知之后会开始运行,缺省参数,默认是通知一个正等待通知的线程,最多则唤醒n个等待的线程。notify()必须在已获得Lock前提下才能调用,否则会触发RuntimeError,notify()不会主动释放Lock;
-
notifyAll() — 如果wait状态线程比较多,notifyAll的作用就是通知所有线程
#! /usr/bin/python3
# 导入线程模块
import threading
import time
# 创建条件变量condition
con = threading.Condition()
meat_num = 0
def thread_consumers():
# 条件变量condition 线程上锁
con.acquire()
# 全局变量声明关键字 global
global meat_num
meat_num = 0
# 等待肉片下锅煮熟
con.wait()
while True:
print("我来一块肉片...")
meat_num -= 1
print("剩余肉片数量:%d"%meat_num)
time.sleep(0.5)
if meat_num == 0:
# 肉片吃光了,通知老板添加肉片
print("老板,再来一份老肉片...")
con.notify()
# 肉片吃光了,等待肉片
con.wait()
# 条件变量condition 线程释放锁
con.release()
def thread_producer():
# 条件变量condition 线程上锁
con.acquire()
# 全局变量声明关键字 global
global meat_num
# 肉片熟了,可以开始吃了
meat_num = 10
print("肉片熟了,可以开始吃了...")
con.notify()
while True:
# 阻塞函数,等待肉片吃完的通知
con.wait()
meat_num = 10
# 添加肉片完成,可以继续开吃
print("添加肉片成功!当前肉片数量:%d"%meat_num)
time.sleep(1)
con.notify()
con.release()
if __name__ == "__main__":
# 创建并初始化线程
t1 = threading.Thread(target=thread_producer)
t2 = threading.Thread(target=thread_consumers)
# 启动线程 -- 注意线程启动顺序,启动顺序很重要
t2.start()
t1.start()
# 阻塞主线程,等待子线程结束
t1.join()
t2.join()
print("程序结束!")
Queue
来自菜鸟
Queue
模块是一个提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列 PriorityQueue
这些队列都实现了锁原语,能够在多线程中直接使用,可以使用队列来实现线程间的同步。
- Queue 模块中的常用方法:
- Queue.qsize() 返回队列的大小
- Queue.empty() 如果队列为空,返回True,反之False
- Queue.full() 如果队列满了,返回True,反之False
- Queue.full 与 maxsize 大小对应
- Queue.get([block[, timeout]]) 获取队列,timeout等待时间
- Queue.get_nowait() 相当Queue.get(False)
- Queue.put(item) 写入队列,timeout等待时间
- Queue.put_nowait(item) 相当Queue.put(item, False)
- Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
- Queue.join() 实际上意味着等到队列为空,再执行别的操作
#!/usr/bin/python3
import queue #引入queue模块
import threading
import time
exitFlag = 0
class myThread (threading.Thread):
def __init__(self, threadID, name, q):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.q = q
def run(self):
print ("开启线程:" + self.name)
process_data(self.name, self.q)
print ("退出线程:" + self.name)
def process_data(threadName, q):
while not exitFlag:
queueLock.acquire() #互斥锁加锁,取得任务的
if not workQueue.empty():
data = q.get()
queueLock.release() #解锁
print ("%s processing %s" % (threadName, data))
else:
queueLock.release()
time.sleep(1)
threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock() #创建互斥锁和队列对象
workQueue = queue.Queue(10)
threads = []
threadID = 1
# 创建新线程
for tName in threadList:
thread = myThread(threadID, tName, workQueue)
thread.start()
threads.append(thread)
threadID += 1
# 加锁,填充队列
queueLock.acquire()
for word in nameList:
workQueue.put(word)
queueLock.release()
# 等待队列清空
while not workQueue.empty():
pass
# 通知线程是时候退出
exitFlag = 1
# 等待所有线程完成
for t in threads:
t.join()
print ("退出主线程")
添加、取出元素要加锁