本文共 6864 字,大约阅读时间需要 22 分钟。
前面学习了线程的基本概念和创建线程的两种方法,现在来看看多线程如何处理竞争条件(race condition)的问题:当多个线程同时执行的时候,应该怎么进行控制。
比如说,下面的例子中我使用了第二种创建方式:自定义一个类,继承Thread类,然后重写run()方法来执行自己的逻辑。在这个run方法里面,每次都对全局变量加1。
import time
import threading

# Shared counter that every thread reads and then writes back without any locking.
some_var = 0


class IncrementThread(threading.Thread):
    """Thread whose run() reads the global counter and stores the value plus one."""

    def run(self):
        #we want to read a global variable
        #and then increment it
        global some_var
        read_value = some_var
        print ("some_var in %s is %d" % (self.name, read_value))
        # The read above and the write below are separate steps with a print in
        # between; nothing stops another thread from running in that gap, in
        # which case one of the two increments is lost.
        some_var = read_value + 1
        print ("some_var in %s after increment is %d" % (self.name, some_var))


def use_increment_thread():
    """Start 5000 IncrementThread instances, join them all, then print the elapsed time and the final counter."""
    threads = []
    start=time.time()
    for i in range(5000):
        t = IncrementThread()
        threads.append(t)
        t.start()
    # Wait for every worker before reading the counter.
    for t in threads:
        t.join()
    print("Total time %s"%(time.time()-start))
    print ("After 5000 modifications, some_var should have become 5000")
    print ("After 5000 modifications, some_var is %d" % (some_var,))


use_increment_thread()

# Sample output from one run:
# ------------------
# Total time 1.7780036926269531
# After 5000 modifications, some_var should have become 5000
# After 5000 modifications, some_var is 4987
可以看见结果并不是5000,这是为什么呢?如果查看执行过程,会发现有些线程刚刚读取了一个值,还未来得及写回,执行的权力就转交给了另外一个线程,这样就导致计数错误。为了确保每一个线程都能完整地执行完它应该执行的代码,我们可以加一把锁。
some_var in Thread-1524 is 1523
some_var in Thread-1524 after increment is 1524
some_var in Thread-1525 is 1524
some_var in Thread-1526 is 1524
some_var in Thread-1526 after increment is 1525
some_var in Thread-1527 is 1525
# Shared counter; every access from a worker thread happens while holding `lock`.
some_var = 0
lock = threading.Lock()


class IncrementThread(threading.Thread):
    """Thread that increments the global counter by one while holding `lock`."""

    def run(self):
        """Read, report and increment `some_var` as one critical section."""
        # we want to read a global variable
        # and then increment it
        global some_var
        # `with` releases the lock even if one of the statements below raises;
        # a bare acquire()/release() pair would leave it held and block every
        # other thread forever.
        with lock:
            read_value = some_var
            print("some_var in %s is %d" % (self.name, read_value))
            some_var = read_value + 1
            print("some_var in %s after increment is %d" % (self.name, some_var))


def use_increment_thread(count=5000):
    """Run `count` IncrementThread workers and print timing and the final counter.

    Args:
        count: Number of threads to start; defaults to 5000, the value used in
            the walkthrough.
    """
    threads = []
    start = time.time()
    for _ in range(count):
        t = IncrementThread()
        threads.append(t)
        t.start()
    # Wait for every worker before reading the counter.
    for t in threads:
        t.join()
    print("Total time %s" % (time.time() - start))
    print("After %d modifications, some_var should have become %d" % (count, count))
    print("After %d modifications, some_var is %d" % (count, some_var))


if __name__ == "__main__":
    use_increment_thread()

# Sample output from one run:
# ---------------
# Total time 1.6369926929473877
# After 5000 modifications, some_var should have become 5000
# After 5000 modifications, some_var is 5000
# -*- coding:utf-8 -*-
import threading
import time

NUM = 10


def func(i, l, delay=2):
    """Decrement the global NUM while holding `l`, pause, then print NUM and `i`.

    Args:
        i: Identifier of the calling worker; only printed.
        l: Lock-like object supporting the context-manager protocol
            (Lock, RLock, Semaphore, BoundedSemaphore).
        delay: Seconds to sleep while holding `l`; defaults to 2 as in the
            walkthrough.
    """
    global NUM
    # Acquire on entry and release on exit, even if the body raises.
    with l:
        NUM -= 1
        time.sleep(delay)
        print(NUM, i)


# Alternatives to try in place of the semaphore:
# lock = threading.Lock()
# lock = threading.RLock()
# A semaphore of 5 lets at most five threads hold it at the same time, so the
# 30 workers below pass through in groups rather than one by one.
lock = threading.BoundedSemaphore(5)

if __name__ == "__main__":
    for i in range(30):
        t = threading.Thread(target=func, args=(i, lock))
        t.start()
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:Alex Li
import threading


def func(i, e):
    """Print `i`, block until event `e` is set, then print `i + 100`."""
    print(i)
    # wait() checks the event's flag: while it is False ("red light") the
    # thread stops here; once it is True ("green light") the thread goes on.
    e.wait()
    print(i + 100)


# A fresh Event starts with its flag set to False (red light).
event = threading.Event()
for i in range(10):
    threading.Thread(target=func, args=(i, event)).start()

# ========
# event.clear() would switch back to red; it can be omitted here because the
# freshly created event is already in that state.
inp = input('>>>')
if inp == "1":
    # Switch to green: every waiting thread is released.
    event.set()

# Sample output from one run:
# -----------------
# 0
# 1
# 2
# 3
# 4
# 5
# 6
# 7
# 8
# 9
# >>>1
# 100
# 104
# 103
# 105
# 107
# 109
# 102
# 106
# 101
# 108
import threading


class t1(threading.Thread):
    """Worker that prints its index, waits on a condition, then prints index + 100."""

    def __init__(self, i, con):
        """Store the worker index `i` and the shared condition `con`."""
        super().__init__()
        self.i = i
        self.con = con

    def run(self):
        """Print `i`, wait until notified on `con`, then print `i + 100`."""
        print(self.i)
        # wait() must be called with the condition's lock held; `with` also
        # guarantees the lock is released if anything below raises.
        with self.con:
            self.con.wait()
            print(self.i + 100)


c = threading.Condition()


def test(con):
    """Start ten workers, then wake as many as the user asks for until 'q' is entered.

    Each line read from stdin is converted with int() and passed to
    con.notify(), which wakes at most that many waiting workers. Non-numeric
    input other than 'q' raises ValueError, as before.
    """
    for i in range(10):
        t = t1(i, con)
        t.start()
    while True:
        inp = input('>>>')
        if inp == 'q':
            break
        # notify() requires the lock; with the `with` form a ValueError from
        # int(inp) no longer leaves the lock held.
        with con:
            con.notify(int(inp))


if __name__ == "__main__":
    test(c)

# Sample output from one run:
# --------------
# 0
# 1
# 2
# 3
# 4
# 5
# 6
# 7
# 8
# 9
# >>>2
# >>>100
# 101
# 3
# >>>102
# 103
# 104
# 4
# >>>105
# 107
# 108
# 106
Traceback (most recent call last):
  File "C:/Users/yli/Documents/Tencent Files/38144205/FileRecv/FileRecv/day11/s6.py", line 56, in <module>
    test(c)
  File "C:/Users/yli/Documents/Tencent Files/38144205/FileRecv/FileRecv/day11/s6.py", line 53, in test
    con.notify(int(inp))
  File "C:\Program Files\Python3\lib\threading.py", line 343, in notify
    raise RuntimeError("cannot notify on un-acquired lock")
RuntimeError: cannot notify on un-acquired lock
def notify(self, n=1): """Wake up one or more threads waiting on this condition, if any. If the calling thread has not acquired the lock when this method is called, a RuntimeError is raised. This method wakes up at most n of the threads waiting for the condition variable; it is a no-op if no threads are waiting. """ if not self._is_owned(): raise RuntimeError("cannot notify on un-acquired lock") all_waiters = self._waiters waiters_to_notify = _deque(_islice(all_waiters, n))
import threading


def condition():
    """Prompt on stdin and return True only when the reply is exactly 'true'."""
    return input('>>>') == 'true'


def func(i, con):
    """Print `i`, wait on `con` until condition() is true, then print `i + 100`.

    Args:
        i: Identifier of the calling worker; only printed.
        con: threading.Condition shared by all workers.
    """
    print(i)
    # wait_for() must be called with the lock held. Using `with` releases the
    # lock even if the predicate raises, so the other workers are not left
    # blocked on acquire.
    with con:
        # The predicate is evaluated immediately; if it is false the thread
        # waits to be notified before asking again.
        con.wait_for(condition)
        print(i + 100)


c = threading.Condition()

if __name__ == "__main__":
    for i in range(10):
        t = threading.Thread(target=func, args=(i, c))
        t.start()

# Sample output from one run:
# ---------------
# "C:\Program Files\Python3\python.exe" "C:/Users/yli/Documents/Tencent Files/38144205/FileRecv/FileRecv/day11/s7.py"
# 0
# >>>1
# 2
# 3
# 4
# 5
# 6
# 7
# 8
# 9
# true
# 100
# >>>true
# 101
# >>>ksdf
# >>>true
# 103
# >>>1
# >>>1
# >>>
# >>>
class Event: """Class implementing event objects. Events manage a flag that can be set to true with the set() method and reset to false with the clear() method. The wait() method blocks until the flag is true. The flag is initially false. """ # After Tim Peters' event class (without is_posted()) def __init__(self): self._cond = Condition(Lock()) self._flag = False
def set(self): """Set the internal flag to true. All threads waiting for it to become true are awakened. Threads that call wait() once the flag is true will not block at all. """ with self._cond: self._flag = True self._cond.notify_all()