博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Python 学习笔记 - 线程(线程锁,信标,事件和条件)
阅读量:6881 次
发布时间:2019-06-27

本文共 6864 字,大约阅读时间需要 22 分钟。

前面学习了线程基本的概念和创建线程的两种方法,现在看看多线程如何处理竞争条件(racing condition)的问题,当多个线程同时执行的时候,怎么进行控制。

比如说,下面的例子中 我使用了第二种创建的方式,自定义一个类,继承Thread类,然后自定义run()来执行我的方法。在这个run方法里面,每次都对全局变量加1

在主线程里面,他调用一个自己定义的函数,在这个函数里面创建了5000个线程;每个线程都加入一个列表,然后对每个对象都使用join,这是确保主线程等着直到所有子线程完成。最后输出结果

import timeimport threadingsome_var = 0class IncrementThread(threading.Thread):    def run(self):        #we want to read a global variable        #and then increment it        global some_var        read_value = some_var        print ("some_var in %s is %d" % (self.name, read_value))        some_var = read_value + 1        print ("some_var in %s after increment is %d" % (self.name, some_var))def use_increment_thread():    threads = []    start=time.time()    for i in range(5000):        t = IncrementThread()        threads.append(t)        t.start()    for t in threads:        t.join()    print("Total time %s"%(time.time()-start))    print ("After 5000 modifications, some_var should have become 5000")    print ("After 5000 modifications, some_var is %d" % (some_var,))use_increment_thread()------------------Total time 1.7780036926269531After 5000 modifications, some_var should have become 5000After 5000 modifications, some_var is 4987

可以看见结果并不是5000,这是为啥呢? 如果查看过程,会发现有些线程刚刚获取了一个值,还未来得及处理,执行的权力就转交给了另外一个线程,这样就导致计数错误。为了确保每一个线程都成功的执行了他应该执行的代码,我们可以加一把锁。

some_var in Thread-1524 is 1523some_var in Thread-1524 after increment is 1524some_var in Thread-1525 is 1524some_var in Thread-1526 is 1524some_var in Thread-1526 after increment is 1525some_var in Thread-1527 is 1525

下面是修订过的代码,通过使用Lock()函数,我们在执行代码前acquire(),之后release(),在当前线程完成这段代码之前,其他的线程不可以执行相同的操作。

some_var = 0lock=threading.Lock()class IncrementThread(threading.Thread):    def run(self):        #we want to read a global variable        #and then increment it        global some_var        lock.acquire()        read_value = some_var        print ("some_var in %s is %d" % (self.name, read_value))        some_var = read_value + 1        print ("some_var in %s after increment is %d" % (self.name, some_var))        lock.release()def use_increment_thread():    threads = []    start=time.time()    for i in range(5000):        t = IncrementThread()        threads.append(t)        t.start()    for t in threads:        t.join()    print("Total time %s"%(time.time()-start))    print ("After 5000 modifications, some_var should have become 5000")    print ("After 5000 modifications, some_var is %d" % (some_var,))use_increment_thread()---------------Total time 1.6369926929473877After 5000 modifications, some_var should have become 5000After 5000 modifications, some_var is 5000

线程锁,除了上面的Lock()之外,还有一些常用的,比如

Rlock(),允许多重嵌套锁,而Lock()只能锁一次;

还有一个常见的是BoundedSemaphore(信标),可以指定一次锁几个

例如,我可以指定一次放行5个,30个线程分6次出来

-*- coding:utf-8 -*-import threadingimport timeNUM = 10def func(i,l):    global NUM    # 上锁    l.acquire() # 30,5 25m5,20    NUM -= 1    time.sleep(2)    print(NUM,i)    # 开锁    l.release()# lock = threading.Lock()# lock = threading.RLock()lock = threading.BoundedSemaphore(5)for i in range(30):    t = threading.Thread(target=func,args=(i,lock,))    t.start()

还有一种放行的方式叫做Event(),他是统一的放行或者堵塞。

工作方式是通过一个flag的值,set()设置为True,clear()设置为False。如果flag为False,wait()则会堵塞。初始化的时候flag默认为False,即堵塞

#!/usr/bin/env python# -*- coding:utf-8 -*-# Author:Alex Liimport threadingdef func(i,e):    print(i)    e.wait() # 检测是什么等,如果是红灯,停;绿灯,行    print(i+100)event = threading.Event() #初始化,flag设置为False(红灯)for i in range(10):    t = threading.Thread(target=func, args=(i,event,))    t.start()#========# event.clear() # 设置成红灯,可以不写,因为初始化已经实现了inp = input('>>>')if inp == "1":    event.set() # 设置成绿灯-----------------0123456789>>>1100104103105107109102106101108

最后我们来看看condition(条件),我们可以灵活的设置一次放行1个或者多个线程。这些线程都hang住,直到收到notify(通知)才放行

import threadingclass t1(threading.Thread):    def __init__(self,i,con):        self.i=i        self.con=con        super(t1,self).__init__()    def run(self):        print(self.i)        self.con.acquire()        self.con.wait()        print(self.i+100)        self.con.release()c=threading.Condition()def test(con):    for i in range(10):        t=t1(i,con)        t.start()    while True:        inp=input('>>>')        if inp=='q':            break        con.acquire()        con.notify(int(inp))        con.release()test(c)--------------0123456789>>>2>>>1001013>>>1021031044>>>105107108106

可以看见上面的代码里面,在wait()和notify()的前后都上了锁,这个锁是初始化的时候自动创建的。如果我们把他去掉,他会直接抛出异常

Traceback (most recent call last):  File "C:/Users/yli/Documents/Tencent Files/38144205/FileRecv/FileRecv/day11/s6.py", line 56, in 
    test(c)  File "C:/Users/yli/Documents/Tencent Files/38144205/FileRecv/FileRecv/day11/s6.py", line 53, in test    con.notify(int(inp))  File "C:\Program Files\Python3\lib\threading.py", line 343, in notify    raise RuntimeError("cannot notify on un-acquired lock")RuntimeError: cannot notify on un-acquired lock

看看源码,他的确是强调只能对上锁的线程进行操作

    def notify(self, n=1):        """Wake up one or more threads waiting on this condition, if any.        If the calling thread has not acquired the lock when this method is        called, a RuntimeError is raised.        This method wakes up at most n of the threads waiting for the condition        variable; it is a no-op if no threads are waiting.        """        if not self._is_owned():            raise RuntimeError("cannot notify on un-acquired lock")        all_waiters = self._waiters        waiters_to_notify = _deque(_islice(all_waiters, n))

conditon还有一种写法是wait_for,他后面参数需要传入一个函数的名字,然后他会内部调用这个函数,如果返回值为真,那么就继续,否则就等着

import threadingdef condition():    ret = False    r = input('>>>')    if r == 'true':        ret = True    else:        ret = False    return retdef func(i,con):    print(i)    con.acquire()    con.wait_for(condition)    print(i+100)    con.release()c = threading.Condition()for i in range(10):    t = threading.Thread(target=func, args=(i,c,))    t.start()---------------"C:\Program Files\Python3\python.exe" "C:/Users/yli/Documents/Tencent Files/38144205/FileRecv/FileRecv/day11/s7.py"0>>>123456789true100>>>true101>>>ksdf>>>true103>>>1>>>1>>>>>>

当我们学完conditon之后,如果回头看前面event()的源码,会发现他本质就是调用的condition,当他放行的时候,他直接放行了所有的线程;因此Event的效果是要么全部停,要么全部开通

class Event:    """Class implementing event objects.    Events manage a flag that can be set to true with the set() method and reset    to false with the clear() method. The wait() method blocks until the flag is    true.  The flag is initially false.    """    # After Tim Peters' event class (without is_posted())    def __init__(self):        self._cond = Condition(Lock())        self._flag = False
   def set(self):        """Set the internal flag to true.        All threads waiting for it to become true are awakened. Threads        that call wait() once the flag is true will not block at all.        """        with self._cond:            self._flag = True            self._cond.notify_all()

转载地址:http://rxjbl.baihongyu.com/

你可能感兴趣的文章
音视频性能指标介绍
查看>>
实战ISA2004+三层交换机实现多VLAN互通(20130327修正部分错误)
查看>>
AS3.0中的显示编程(二)-- DisplayObject类
查看>>
SOA也是一种设计模式
查看>>
当飞鸽传书无法找到别人时… …
查看>>
GPT分区体系
查看>>
搭建nginx服务器nginx-1.6.2.tar.gz
查看>>
如何备份\恢复MDaemon的BES数据库
查看>>
GoldenGate for mysql to mysql:单向同步
查看>>
网路游侠:免费网络和主机漏洞评估程序Nessus 4.2.0安装试用
查看>>
MySQL外键陷阱
查看>>
Struts Spring Hibernate 整合:SSH2
查看>>
网络设备主备配置系列2:netscreen防火墙双机主备
查看>>
ORACLE RAC 之I/O分离--hangcheck-timer模块配置
查看>>
有用的runas命令,尤其对域用户环境
查看>>
Oracle 备份与恢复学习笔记(11)
查看>>
Windows Home Server 是什么?
查看>>
VC++动态链接库(DLL)编程深入浅出(一)
查看>>
内存技术术语不完全手册
查看>>
20个Linux命令及Linux终端的趣事
查看>>