python线程池并发_编译python源码

python线程池并发_编译python源码文章目录线程介绍代码实现线程threading模块threading.Thread1.创建线程2.线程实现TCP服务端的并发3.线程join方法4.线程之间共享5.线程对象属性和方法守护线程GIL全局解释器锁线程介绍

1. 线程介绍

1. 在前面了解了进程的概念,简单来说进程就是在内存中申请了一块内存空间,其实还有一个线程的概念,
线程包含在进程之中,是代码真正的执行者。也就是说进程其实是一个资源单位,而线程是执行单位。
2. 线程是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以存在多个
线程,每条线程并行执行不同的任务。可以想象成进程是一个车间,线程就是车间里的流水线。
为什么还要划分线程?

因为开设线程的消耗远远小于进程

  	开进程的流程:
    	1.申请内存空间
      	2.拷贝代码
   	
   	而开线程,无需申请内存空间,无需拷贝代码
   	

在这里插入图片描述

线程还可以分为两类:用户级线程(User-Level Thread)和内核级线程(Kernel-Level Thread),后者又
称为内核支持的线程或轻量级进程。在多线程操作系统中,各个系统的实现方式并不相同,在有的系统中实现
了用户级线程,有的系统中实现了内核级线程。

用户级线程:内核的切换由用户态程序自己控制内核切换,不需要内核干涉,少了进出内核态的消耗,但不能很
好的利用多核Cpu

内核级线程:切换由内核控制,当线程进行切换的时候,由用户态转化为内核态。切换完毕要从内核态返回用户
态;可以很好的利用smp,即利用多核cpu。类似于 windows线程。

在这里插入图片描述

2. 代码实现线程

线程的操作代码和进程类似,需要使用的是 threading 模块该模块相当于 multiprocessing 的作用,

threading 模块

threading.Thread

操作线程需要使用的是 threading 模块下的 Thread ,使用的方法和 Process 类似。

1. 创建线程
创建线程的方式也是有俩种,通过指定函数或者继承类的方式。
和创建进程不同的是,创建线程代码不需要在 __main__ 方法下。因为新线程不需要复制代码。

在这里插入图片描述
在这里插入图片描述

指定函数方式

代码示例一

		from threading import Thread
		
		def run(username): 
		    print(f'{ 
     username} is running')
		
		if __name__ == '__main__':  # 不需要在 __main__ 下写,可以保持习惯写下。
		    t = Thread(target=run, args=('XWenXiang', ))
		    t.start()
			print('主线程')

输出结果
		XWenXiang is running
		主线程


1. 同样是指定函数传入参数的形式。生成线程对象后调用 start()方法启动。
2. 但是此时发现主线程中打印的话在最后面执行了,其实是因为创建线程的开销极小,几乎是一瞬间就可以
创建,也就是执行的速度很快。

代码示例二

		from threading import Thread
		import time
		
		def run(username):
		    time.sleep(3)
		    print(f'{ 
     username} is running')
		
		if __name__ == '__main__':  # 不需要在 __main__ 下写,可以保持习惯写下。
		    t = Thread(target=run, args=('XWenXiang',))
		    t.start()
		    print('主线程')


1. 我们可以使用 time 模块增加子线程的运行时间,这样主线程的代码会被优先执行。

''' 虽然主线程的代码先执行完了,但是并不会完全结束,因为主线程结束也就标志着整个进程的结束,要确 保子线程运行过程中所需的各项资源 '''

继承类方法

代码示例

		from threading import Thread
		
		class MyThread(Thread):
		    def __init__(self, name):  
		        super().__init__()
		        self.name = name
		    def run(self):
		        print(f'{ 
     self.name} is running')
		
		if __name__ == '__main__':  # 可以不写。
		    t = MyThread('XWenXiang')
		    t.start()

输出结果
		XWenXiang is running

1. 同样的继承 Thread类,并定义 run() 方法,该方法也会自动被 target 指定执行。并通过__init__
来传参。

在这里插入图片描述

2. 线程实现TCP服务端的并发

我们可以用多线程简单模拟一下并发操作

服务端代码

		from threading import Thread
		import socket
		
		# 如果是进程的方式这些代码要放在 __main__ 下
		s = socket.socket()  
		s.bind(('127.0.0.1', 8888))
		s.listen(5)
		
		def run(sock):
		    while True:
		        res = sock.recv(1024)
		        print(res.decode('utf8'))
		        sock.send('我是服务端'.encode('utf8'))
		
		while True:
		    sock, addr = s.accept()
		    t = Thread(target=run, args=(sock,))
		    t.start()


客户端代码

		import socket
		
		c = socket.socket()
		c.connect(('127.0.0.1', 8888))
		
		while True:
		    info = input('>>> ').strip()
		    c.send(info.encode('utf8'))
		    res = c.recv(1024)
		    print(res.decode('utf8'))


3. 线程 join 方法

前面只不过是因为子线程比较快才会先执行,其实主线程和子线程是异步的,可以通过 time 模块的time.sleep 方法来更好的看到。那么将异步改成同步也是使用 join 方法,它可以让主线程等待子线程结束。

代码示例

		from threading import Thread
		import time
		
		def run(username):
		    time.sleep(3)
		    print(f'{ 
     username} is running')
		
		if __name__ == '__main__':  # 不需要在 __main__ 下写,可以保持习惯写下。
		    t = Thread(target=run, args=('XWenXiang',))
		    t.start()
		    t.join()
		    print('主线程')

输出结果
		XWenXiang is running
		主线程


1. 此时主线程会等待子线程结束后在执行其他代码,所以子线程的代码先打印出来。

在这里插入图片描述

4. 线程之间共享

进程与进程之间默认隔离,而线程与线程之间数据是共享的,因为创建了新线程也是在同一个进程里面。

代码示例

		from threading import Thread
		
		username = 'XWenXiang'
		def run():
		    global username
		    username = 'XXX'
		    print(f'{ 
     username} is running')
		
		t = Thread(target=run)
		t.start()
		t.join()
		print('主线程')
		print(username)


输出结果
		XXX is running
		主线程
		XXX


1. 此时的变量已经被修改了,证明了进程之间数据是共享的。

5. 线程对象属性和方法

验证线程是否处于一个进程

代码示例

		from threading import Thread
		import os
		
		def run():
		    print(os.getpid())  # 获取子线程所在的进程号
		
		t = Thread(target=run)
		t.start()
		t.join()
		print(os.getpid())  # 获取主进程所在的进程号


1. 通过 os 模块分别在子线程和主线程中获取进程号,发现他们是一样的,验证了他们确实是在同一个进程

统计进程下活跃的线程数
使用的是模块 threading 里的方法 active_count,需要导入

代码示例
		
		from threading import Thread, active_count
		import time
		
		def run(username):
		    time.sleep(2)
		    print(f'{ 
     username} is running')
		
		t = Thread(target=run, args=('XWenXiang',))
		t.start()
		print(active_count())
		print('主线程')


输出结果
		2
		主线程
		XWenXiang is running


1. 由于线程速度比较快,使用time延迟几秒。
2. 导入方法执行,此时统计的数量是 2 个,因为主线程也被包含在内了。

获取线程的名字
获取线程的名字可以使用方法 current_thread() ,或者用类的方式取出 self.name

代码示例(current_thread()方法)

		from threading import Thread, current_thread
		import time
		
		def run():
		    time.sleep(1)
		    print(current_thread().name)
		
		t = Thread(target=run)
		t.start()
		print('主线程', current_thread().name)

输出结果
		主线程 MainThread
		Thread-1 (run)
		

1. 主线程和子线程的名称是不一样的
代码示例(self.name)

		from threading import Thread
		
		class MyThread(Thread):
		    def run(self):
		        print(self.name)
		
		if __name__ == '__main__':  # 可以不写。
		    t = MyThread()
		    t.start()

输出结果
		Thread-1


判断线程是否存活

使用的方法是 is_alive()

代码示例

		from threading import Thread
		import time
		
		def run(name):
		    print(f'{ 
     name} is running')
		    time.sleep(2)
		    print(f'{ 
     name} is over')
		
		t = Thread(target=run, args=('XWenXiang',))
		t.start()
		print(t.is_alive())  # 判断线程是否存活
		print('主线程')

输出结果
		XWenXiang is running
		True
		主线程
		XWenXiang is over

在这里插入图片描述

6. 守护线程

和守护进程相似,守护线程就是会随着主线程的结束而结束

代码示例

		from threading import Thread
		import time
		
		def run(name):
		    print(f'{ 
     name} is running')
		    time.sleep(3)
		    print(f'{ 
     name} is over')		
		
		t = Thread(target=run, args=('XWenXiang',))
		t.daemon = True
		t.start()
		print('主线程')


输出结果
		XWenXiang is running
		主线程


1. 将子线程设成守护线程后,不管子线程里的代码需要运行多少时间,都会随着主进程的结束而结束,且此
时主线程结束会将子线程进行回收。所以示例中子线程只执行了一部分。

1. 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束

2. 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。

也就是说,主线程在有非守护线程存在的情况下,设置守护线程作用不大

代码示例

		from threading import Thread
		import time
		
		def run(name):
		    print(f'{ 
     name} is running')
		    time.sleep(1)
		    print(f'{ 
     name} is over')
		def run1(name):
		    print(f'{ 
     name} is running')
		    time.sleep(2)
		    print(f'{ 
     name} is over')
		
		t = Thread(target=run, args=('XWenXiang',))
		t1 = Thread(target=run1, args=('XXX',))
		t.daemon = True
		t.start()
		t1.start()
		print('主线程')


输出结果
		XWenXiang is running
		XXX is running
		主线程
		XWenXiang is over
		XXX is over


1. 此时除了守护线程外还有一个线程,当主线程代码执行完不会马上结束,会等待子线程结束并回收。和不
设置守护线程效果一样。

在这里插入图片描述

3. GIL全局解释器锁

GIL全局解释器锁在官方文档中是这么描述的:

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple
native threads from executing Python bytecodes at once. This lock is necessary mainly 
because CPython’s memory management is not thread-safe. (However, since the GIL exists,
other features have grown to depend on the guarantees that it enforces.)
理解:
		首先要知道 python 解释器的类别有很多,例如 Cpython、Jpython、Ppython		
		GIL只存在于CPython解释器中,GIL是一把互斥锁,用于阻止同一个进程下的多个线程同时执行,
	也就是说同一时间只执行一个线程,不能利用多核优势。这是因为CPython解释器中的垃圾回收机制不是
	线程安全的,垃圾回收机制代码在进程中也是以线程存在。
		我们可以反推一下,如果可以多个线程同时进行,有可能会出现在变量名都还没有赋值上就被垃圾
	回收机制给清除了,也就是说会产生垃圾回收机制与正常线程之间数据错乱。所以我们需要一把锁进行
	限制。
		GIL是一把全局解释器锁,因为执行代码都需要用到解释器,加上锁之后线程就需要通过抢锁来得到
	解释器资源,等到运行结束后放锁让下一个线程抢锁。所以一次只能运行一个线程。

在这里插入图片描述


   所有的解释型语言都无法做到同一个进程下多个线程利用多核优势
   如果你想让你的应用更好地利用多核心计算机的计算资源,
   推荐你使用 multiprocessing 或 concurrent.futures.ProcessPoolExecutor。 
   但是,如果你想要同时运行多个 I/O 密集型任务,则多线程仍然是一个合适的模型(使用多道技术)。
   

4. 死锁现象

# 锁就算掌握了如何抢 如何放 也会产生死锁现象
代码示例
		
		from threading import Thread, Lock
		import time
		
		mutexA = Lock()
		mutexB = Lock()
		
		class MyThread(Thread):
		    def run(self):
		        self.f1()
		        self.f2()
		        
		    def f1(self):
		        mutexA.acquire()
		        print(f'{ 
     self.name}抢到了A锁')
		        mutexB.acquire()
		        print(f'{ 
     self.name}抢到了B锁')
		        mutexB.release()
		        mutexA.release()
		
		    def f2(self):
		        mutexB.acquire()
		        print(f'{ 
     self.name}抢到了B锁')
		        time.sleep(2)
		        mutexA.acquire()
		        print(f'{ 
     self.name}抢到了A锁')
		        mutexA.release()
		        mutexB.release()
		
		for i in range(20):
		    t = MyThread()
		    t.start()
		
		
		"""锁不能轻易使用并且以后我们也不会在自己去处理锁都是用别人封装的工具"""
		

5. 信号量

如果说互斥锁是一个房间上了锁只能一个人使用,那么信号量就是同时有多个房间上锁同时容纳多个人。在并发编程中信号量意思是多把互斥锁,使用信号量需要导入 threading 模块下的 Semaphore

代码示例

		from threading import Thread, Semaphore
		import time
		import random
		
		sp = Semaphore(5)  # 创建一个有五个带锁的房间
		
		def task(name):
		    sp.acquire()  # 抢锁
		    print('%s正在吃饭' % name)
		    time.sleep(random.randint(1, 2))
		    sp.release()  # 放锁
		
		for i in range(1, 21):
		    t = Thread(target=task, args=('老八%s号' % i,))
		    t.start()


1. 此时相当于可以允许多个线程抢锁,刚开始会执行5个打印语句。

6. event事件

前面说守护线程会随着主线程结束而结束,受主线程影响,其实线程之间也可以互相影响。也就是子线程的运行可以由其他子线程决定。需要使用 threading 模块下的 Event,这是一个实现事件对象的类。事件对象管理一个内部标识。

使用的主要方法:

'set()':  将内部标识设置为 true 。所有正在等待这个事件的线程将被唤醒。当标识为 true 时,调用
	wait() 方法的线程不会被被阻塞。
	
'wait()': 阻塞线程直到内部变量为 true 。如果调用时内部标识为 true,将立即返回。否则将阻塞线程

代码示例

		from threading import Thread, Event
		import time
		
		event = Event()
		
		def light():
		    print('红灯亮起')
		    time.sleep(3)
		    print('绿灯亮起')
		    event.set()
		
		def car(name):
		    print('%s 正在等红灯' % name)
		    event.wait()
		    print('%s 开始启动' % name)
		
		t = Thread(target=light)
		t.start()
		for i in range(10):
		    t = Thread(target=car, args=('汽车%s' % i,))
		    t.start()


1. 首先创建一个用于指示的对象 exent ,在定义 light 和 car 函数。
2. 在 car 函数中添加 event.wait() 让程序阻塞,在 light 函数中调用 event.set() ,这个的作用
是将阻塞的程序唤起。
3. 首先创建指定 light 函数的子线程并运行,在示例中相当于创建了一个红绿灯。
4. 在使用 for 循环创建多个 car 子线程并启动,此时由于被阻塞就像多个车子等待红灯,等到子进程中的
event.set() 执行就结束阻塞。

在这里插入图片描述

7. 进程池与线程池

在前面使用线程实现服务端并发中,来一个客户端就创建一个线程,但是客户端到了一定程度电脑就会承担不住所以不可能无限制的创建线程。我们可以使用池的方式,池的概念是定义一个池子,在里面放上固定数量的进程或线程,这样可以保证计算机硬件安全的情况下提升程序的运行效率。

进程池:
		提前创建好固定数量的进程,后续反复使用这些进程
线程池:
		提前创建好固定数量的线程,后续反复使用这些线程
		
注意点:
		如果任务超出了池子里面的最大进程或线程数 则原地等待
		进程池和线程池其实降低了程序的运行效率 但是保证了硬件的安全


'实现进程池需要使用 concurrent.futures 模块下的 ProcessPoolExecutor '
'实现线程池需要使用 concurrent.futures 模块下的 ThreadPoolExecutor '

线程池

代码示例一(基础使用)

		from concurrent.futures import ThreadPoolExecutor
		from threading import current_thread  # 调用模块查看线程名
		import time
		
		pool = ThreadPoolExecutor(5) # 不指定数字默认为 cpu_count(CPU数量) + 4 
		'''上面的代码执行之后就会立刻创建五个等待工作的线程'''
		def run(x):
		    time.sleep(1)
		    print(x)
		    print(current_thread().name)  # 打印线程号
		
		for i in range(15):
		    pool.submit(run, i)  # 提交指定任务以及参数给线程池,


1. 在示例中创建了包含5个线程的线程池,并向线程池提交多个任务。
2. 运行的结果我们可以发现打印出来的线程号只有5个,并且是乱序的,说明只有5个线程在工作,且任务的
提交方式是异步的,而不是同步等待其运行结束
获取任务返回信息,在示例一中不能获取到任务的返回信息,可以使用方法result()
代码示例二(result()方法同步)

		from concurrent.futures import ThreadPoolExecutor
		from threading import current_thread
		import time
		
		pool = ThreadPoolExecutor(5)
		def run(x):
		    time.sleep(1)
		    print(x)
		    return current_thread().name
		
		for i in range(15):
		    print(pool.submit(run, i).result())


1.  在提交任务的时候使用方法result(),提交的方式就会变成同步,且打印的线程号是同一个。
2.  且任务函数的返回值就是方法result()的值。
虽然使用方法result()可以获取任务的返回值,但是却改变了提交的形式,我们应该让异步自动提醒。也就是
'异步回调机制',使用的方法是 add_done_callback()

代码示例三(异步回调)

		from concurrent.futures import ThreadPoolExecutor
		from threading import current_thread
		import time
		
		pool = ThreadPoolExecutor(5)
		def run(x):
		    time.sleep(1)
		    print(x)
		    return current_thread().name
		
		def func(*args, **kwargs):
		    print(args[0].result(), kwargs)
		
		for i in range(15):
		    pool.submit(run, i).add_done_callback(func)


1. add_done_callback()方法是在任务有结果的时候执行括号里的函数例如示例中的func函数,在func函
数中的args[0].result()就是任务函数run的返回值。

进程池

进程池和线程池类似

代码示例一(基础使用)

		from concurrent.futures import ProcessPoolExecutor
		import os
		import time
		pool = ProcessPoolExecutor(5)  # 立刻创建5个进程,不指定默认为CPU的数量
		
		def run(x):
		    time.sleep(1)
		    print(f'({ 
     x})')
		    print(f'(进程号:{ 
     os.getpid()})')  # 获取进程号
		
		
		if __name__ == '__main__':
		    for i in range(7):
		        pool.submit(run, i)


1. 也是先创建指定进程,在使用方法submit传任务以及参数。打印进程号发现也只有指定个数的进程以异步
的形式在工作
代码示例(异步回调)

		from concurrent.futures import ProcessPoolExecutor
		from threading import current_thread
		import os
		import time
		
		pool = ProcessPoolExecutor(5)
		def run(x):
		    time.sleep(1)
		    print(f'({ 
     x})')
		    # return current_thread().name
		    return f'进程号:{ 
     os.getpid()}'
		
		def func(*args, **kwargs):
		    print(f'({ 
     args[0].result()})')
		
		if __name__ == '__main__':
		    for i in range(7):
		        pool.submit(run, i).add_done_callback(func)


1. 在 submit 方法后使用方法 add_done_callback,当任务有结果是调用括号里的函数对结果进行操作
args[0].result()也是任务函数的返回值。

今天的文章python线程池并发_编译python源码分享到此就结束了,感谢您的阅读。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:http://bianchenghao.cn/71803.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注