Published in:2024-10-24 |

进程和线程

一、引言

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1. 程序:是完成特定任务的一系列的指令(代码)的集合,或者是打包好的二进制的可执行的文件.(程序本身是一个静态实体)

2. 进程:是一个具有一定功能的程序在一个数据集上的一次动态执行过程,是操作系统进行资源分配和调度的一个独立单位(进程是动态实体)
- 一个应用程序的实例对应着一个进程
- 进程是操作系统动态执行的基本单元,它可以申请和拥有系统资源
- 进程有自己独立的空间(数据空间:文本域、数据域、堆栈域)
特征:
- 动态性:进程的实质是程序在操作系统中的一次执行过程,进程是动态产生的,动态消亡的+
- 并发性:任何的进程都可以同其它的进程一起并发执行(多任务)
- 独立性:进程是一个能独立运行的基本单位,同时也是操作系统分配资源和调度的独立单位.


3. 线程:是操作系统能够进行运算调度的最小单位.
- 一个进程可以有多个线程,各个线程之间共享程序的空间(进程资源)
- 一个进程中至少包含一个主线程 (一条河流至少有主流)
- 线程与线程之间可以相互独立,但资源是共享的
- 一个线程对应一个任务
1
2
3
4
# 并行:
指在同一时刻,有多条指令在多个处理器上同时执行,所以无论是从微观还是宏观来看,二者都是一起执行的.
# 并发:
指在同一时刻只能有一条指令执行,但是多个进程指令可以被快速的轮换着去执行,使得在宏观上具有多个进程同时执行的效果,但是在微观上并不是同时执行。(宏观并行微观串行)

二、发展背景

1
2
3
4
5
6
7
8
9
10
11
12
13
1. 几乎所有的现代操作系统都是多任务的操作系统.
- 一边在使用浏览器上网,一边听音乐,一边在用Pycharm写代码

2. 多核CPU可以并行地执行多个任务,但是由于任务数量远远大于CPU的核心数量,因此在单个CPU上会出现并发地执行多个任务的情况

3. 单核CPU:实际上对于单核CPU在同一时刻,只能运行一个任务(进程),CPU不断地在这些进程间轮换执行.
- 一个任务执行一小段时间后,会被强制的暂停,切换到另一个任务去执行,每个任务轮流执行
- CPU执行效率很高,时间片短,各个任务之间的切换就很快,人感觉不到(宏观并行,微观串行)
- 任务调度采用的是时间片轮转的抢占式调度(抢占式优先级调度策略)
- 时间片:每一个进程分配一定的时间片,它是进程允许执行的时间

抢占式:优先级高的先执行
非抢占式:按顺序执行

三、多任务

1
2
3
1. 一种是启动多个进程,每个进程有一个主线程,但是多个进程可以同时执行多个任务
2. 启动一个进程,在一个进程内启动多个线程,这样多个线程就可以同时执行多个任务
3. 多进程+多线程(一般不用)

四、多线程

1
2
3
4
单线程和多线程:
1. 单线程:一个进程中默认只有一个主线程,程序的所有资源都供给了主线程(之前写的代码都是运行在主线程中)

2. 多线程:一个进程中可以有多个线程,多个线程之间同时执行,之前会相互抢占资源
1
2
3
4
5
6
7
8
9
10
11
12
13
def music(name, loop):
for i in range(loop):
print('Listen the music %s' % name)


def movie(name, loop):
for i in range(loop):
print('watch the movie %s' % name)


if __name__ == '__main__':
music('凉凉',3)
movie('小电影',3) # 先执行了听音乐再执行了看电影 不能同时进行
多线程模块
1
2
1. Python2: thread,threading 
2. Python3: threading
1
2
3
4
5
6
7
8
9
10
# threading.Thread类
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None):
pass

- group: 是一个预留参数,用于将来扩展功能
- target: 目标,这个线程要执行的任务(函数)
- name: 线程的名字 默认Thread-N(1,2,3,4,5,6...)
- args: 位置参数
- kwargs: 关键字参数
创建多线程的方式
  • 使用Thread类直接创建线程(子线程)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import threading
import time

def music(name, loop):
print(threading.current_thread().getName()) # -> Thread-1
for i in range(loop):
time.sleep(0.1)
print('Listen the music %s,%d' % (name,i))


def movie(name, loop):
print(threading.current_thread().getName()) # -> Thread-2
for i in range(loop):
time.sleep(0.1)
print('watch the movie %s,%d' % (name,i))


if __name__ == '__main__':
# music('凉凉',3)
# movie('小电影',3)
t1 = threading.Thread(target=music,args=('凉凉',3))
t2 = threading.Thread(target=movie,args=('小电影',3))
t1.start()
t2.start() # 启动后处理就绪状态
  • 继承Thread类,并实现run方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import threading


class MyThread(threading.Thread):
def __init__(self, name):
super().__init__()
self.name = name


def run(self):
for i in range(30):
print(self.name)


if __name__ == '__main__':
t1 = MyThread('1111')
t2 = MyThread('2222')
t1.start()
t2.start()
Thread类中的方法
1
2
3
4
5
1. getName()  
获取当前线程的名字
主线程:MainThread
- threading.current_thread().getName()
子线程:Thread-N(1,2,3,4...)
1
2
2. setName(name)
设置线程的名字
1
2
3. ident: 
返回当前线程的id
1
2
3
4
4. is_alive(): 
返回当前线程的状态
当线程未结束时,返回为True
当任务结束,返回为False
1
2
3
4
5. join([timeout])  
在什么地方调用join,就会阻塞哪里的线程(一般都加在主线程中)
join方法的功能是在程序的指定位置,优先让该方法的调用者使用CPU资源
timeout参数是一个可选参数,其功能是指定线程最多可以占用CPU资源的时间,如果不使用该参数,默认是线程执行结束
1
2
3
4
5
6
6. setDaemon(bool)
当程序中有多个线程时,主线程的结束并不会影响子线程的继续执行,换句话只有当程序中所有的线程执行完毕后,程序才算真正的结束
设置守护线程,必须要在启动线程之前设置
设置为守护线程后,当主线程任务结束,所有的守护线程都会被强制结束

# Python解释器的垃圾回收机制就是守护线程,当程序中所有的主线程及非守护线程执行完毕后,垃圾回收机制也就没有继续执行的必要了

五、进程和线程的状态

1
2
3
4
# 进程的三个状态:
(1)运行状态:进程占用CPU,正在运行
(2)就绪状态:进程具备运行条件,等待系统分配CPU资源以便运行
(3)等待状态:又称为阻塞状态或睡眠状态,指进程不具备运行条件,正在等待某个事件完成
1
2
3
4
5
6
7
8
9
# 线程的五个状态:
(1)创建状态:程序还没有开始运行线程中的代码,刚创建好线程
(2)就绪状态:线程对象调用start()方法后启动了线程,然后进入就绪状态
(3)运行状态:当线程获取到了CPU资源后,它才进入到运行状态
(4)阻塞状态:线程中的任务暂停,不再占有CPU资源 sleep()、input
(5)死亡状态:线程中的任务正常执行完毕或异常退出

创建状态 -> 就绪状态 -> 运行状态 -> 死亡状态
阻塞状态

六、GIL锁

1
2
3
4
5
6
7
8
9
10
11
12
13
GIL:全局解释锁 Global Interpreter Lock

1. GIL并不是Python语言的特性,Python不依赖于GIL,CPython中有GIL锁而JPython就没有

2. GIL锁是为了解决多线程之间数据完整性和状态同步的问题(安全),设计为在任意时刻只有一个线程在解释器中运行.

3. GIL锁极大的影响了多线程的效率,让多线程几乎等同于单线程

4. 由于GIL锁的存在,只有拥有GIL锁的线程才会拥有CPU资源,执行任务

5. 对于计算密集型的任务(进行数学计算、矩阵运算、图像处理、视频编解码等),使用多线程(伪多线程)去处理,反而会由于多线程之间来回切换(获取锁和释放锁)导致效率很低(一个进程只有一个GIL锁) - 使用多进程解决

6. 虽然存在GIL锁,但是Python的多线程切换速度很快,所在在宏观上依然是'并行',但是微观上是串行.

七、线程安全

1
2
3
4
5
6
7
8
9
10
11
12
13
1. 线程同步:
同步:线程A在执行某个任务时,线程B需要线程A执行完毕之后,才能执行
异步:线程A在执行某个任务时,线程B不需要等待可以直接执行

2. 数据污染/脏读:
进程中的多个线程共享进程资源, 就有可能造成多个线程同时访问数据而导致数据不一致.

3. 原子操作:
原子是不可分割的最小工作单位,在执行某个任务完毕之前不会被其它的任务或事件中断.(一个线程中的任务一次性执行完毕)

4. 锁:
保护原子操作不被破坏 锁:同步锁/互斥锁
threading.Lock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 这里会发生数据脏读问题
import threading, time

lst = ['A', 'B', '', '', '', ]
index = 2

def fun(ch):
global lst
global index
lst[index] = ch
time.sleep(0.1)
index += 1

if __name__ == '__main__':
t1 = threading.Thread(target=fun, args=('C',))
t2 = threading.Thread(target=fun, args=('D',))
t1.start()
t2.start()
t1.join() # 让t1和t2优先拥有CPU资源
t2.join()
print(lst)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 加锁之后就是线程同步 one by one 
import threading, time

lst = ['A', 'B', '', '', '', ]
index = 2

lock = threading.Lock() # 创建一把锁

def fun(ch):
global lst
global index
lock.acquire() # 获取这把锁
lst[index] = ch
time.sleep(0.1)
index += 1
lock.release()


if __name__ == '__main__':
t1 = threading.Thread(target=fun, args=('C',))
t2 = threading.Thread(target=fun, args=('D',))
t1.start()
t2.start()
t1.join() # 让t1和t2优先拥有CPU资源
t2.join()
print(lst)
  • 死锁问题
1
Lock对象在一个线程中对同一个原子操作,只有一次机会使用acquire来给线程加锁,如果出现了多次,则就会出现死锁状态,无法正常使用
1
2
3
4
5
6
7
8
9
10
def fun(ch):
global lst
global index
lock.acquire() # 获取这把锁
lock.acquire() # 获取这把锁两次 就会产生死锁

lst[index] = ch
time.sleep(0.1)
index += 1
lock.release()
1
2
3
解决死锁问题:
使用RLock对象来解决死锁问题,允许同一个原子操作来重复获取锁
lock = threading.RLock()

八、多进程模块

1、多进程模块
1
2
3
# multiprocessing 多进程模块
1. 解决多线程于计算密集型任务处理速度慢的情况
2. 多进程不受GIL锁的限制,可以达到并行状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import multiprocessing
import time, random


def run(name):
print('%s running' % name, multiprocessing.current_process().pid)
time.sleep(random.randrange(1, 5))
print('%s running end' % name)


if __name__ == '__main__':
p1 = multiprocessing.Process(target=run, args=('anne',))
p2 = multiprocessing.Process(target=run, args=('alice',))
p3 = multiprocessing.Process(target=run, args=('biantai',))
p1.start()
p2.start()
p3.start()
p1.join() # 需要等待p1任务结束
print('主进程')
2、进程之间数据不共享
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import multiprocessing, time

lst = ['A', 'B', '', '', '', ]
index = 2


def fun(ch):
global lst
global index
lst[index] = ch
time.sleep(0.1)
index += 1
print(lst) # 打印全局变量lst

if __name__ == '__main__':
t1 = multiprocessing.Process(target=fun, args=('C',))
t2 = multiprocessing.Process(target=fun, args=('D',))
t1.start()
t2.start()
t1.join() # 让t1和t2优先拥有CPU资源
t2.join()

# 每个进程拥有自己独立的空间
3、进程锁
1
2
3
1. 进程之间的数据是不共享的,但是共享同一套文件系统,所以访问同一个文件时可能会有问题

2. 所以此时需要加锁处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import time
from multiprocessing import Process,Lock


def buy_ticket(i, lock):
lock.acquire() # 加锁
with open('ticket.txt') as f:
count = int(f.read())
time.sleep(0.1)
if count > 0:
count -= 1
print(f'\033[32m{i}买到票了\033[0m')
else:
print(f'\033[31m{i}没有买到票\033[0m')
time.sleep(0.1)
with open('ticket.txt', 'w') as f:
f.write(str(count))
lock.release() # 释放锁


if __name__ == '__main__':
lock = Lock()

for i in range(1, 11):
p = Process(target=buy_ticket, args=(i, lock))
p.start()
4、进程池
1
2
3
1. 进程池:在进程任务特殊多的情况下,手动挨个来管理进程比较麻烦,可以通过设置进程池来进程管理

2. 可以设置指定数量的进程,当有新的请求提交到进程池中,如果进程池还没满,则可以直接创建进的进程去执行,如果进程满了(达到最大数量),则会等待进程池中某个进程结束,才会创建新的进程并执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import multiprocessing, time


def do_thing(n):
print(f'第{n}个人上厕所')
time.sleep(2)
print(f'第{n}个人上厕所结束')


if __name__ == '__main__':
pool = multiprocessing.Pool(3) # 三个坑

for i in range(5):
# async:异步(同时)
pool.apply_async(func=do_thing, args=(i + 1,))

pool.close() # 关闭进程池 不能再让新的进程进来了
pool.join()

常用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
1. apply() 
同步,阻塞池中的进程(等待状态)

2. apply_async()
异步,非阻塞式,池中的进程可以同时执行

3. close()
关闭进程池,不再接受新的请求(任务)

4. teminate()
结束工作进程,不再处理未完成的任务(正在执行中的任务中断)

5. join()
阻塞主进程,等待子进程的结束

九、进程间通信

1
2
3
4
5
6
7
8
9
10
11
1. 操作系统会为每一个进程分配一个独立的地址空间,不同进程的地址空间是完全隔离的

2. 虽然可以用文件共享数据实现进程间通信(买票的案例),但是问题是:
- 效率低(对文件读写操作)
- 需要自己加锁处理

3. Python中进程间通信:
- 队列 Queue
- 管道 Pipe

4. 队列和管道都是将数据存放在内存中而不是文件
1、队列

队列并不是数据的共享,而是数据的传递

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import multiprocessing, time


def put(queue):
for i in ['A', 'B', 'C']:
print(f'发送{i}到队列中')
queue.put(i)
time.sleep(1)


def get(queue):
while 1:
value = queue.get() # 从队列取数据,并将数据从队列中删除
print(f'从队列中取到{value}')



if __name__ == '__main__':
queue = multiprocessing.Queue()
# 这个进程是向队列中存数据
p_put = multiprocessing.Process(target=put, args=(queue,))
p_get = multiprocessing.Process(target=get, args=(queue,))

p_put.start()
p_get.start()

p_put.join()
p_get.terminate() # 终止正在执行的进程
2、管道
1
2
3
4
5
6
1. Pipe(duplex=True)
返回的是一个元组(conn1,conn2)表示管道的两端
duplex默认为True,表示管道是双工模式
duplex是False,conn1只负责接收消息,conn2来发送消息

2. send()、recv() 发送消息和接收消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import multiprocessing, time


def put(pipe):
for i in ['A', 'B', 'C']:
print(f'发送{i}到管道中')
pipe[1].send(i)
time.sleep(1)


def get(pipe):
while 1:
value = pipe[0].recv() # 从队列取数据,并将数据从队列中删除
print(f'从管道中取到{value}')



if __name__ == '__main__':
pipe = multiprocessing.Pipe()
# 这个进程是向队列中存数据
p_put = multiprocessing.Process(target=put, args=(pipe,))
p_get = multiprocessing.Process(target=get, args=(pipe,))

p_put.start()
p_get.start()

p_put.join()
p_get.terminate() # 终止正在执行的进程

十、生产者与消费者

1
生产者与消费者模式:生产者与消费不直接通讯,而是通过队列进行通讯,队列是一个缓冲区,生产者产生了数据丢给队列(缓冲区),消费者从队列取数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 消费者
def consumer(q):
while 1:
res = q.get()
if res is None:
break
# time.sleep(random.randrange(3,5))
print('\033[32m %s 吃 %s\033[0m' % (os.getpid(), res))


# 生产者
def producer(q):
for i in range(10):
res = '包子%s' % (i + 1)
time.sleep(random.randrange(1, 3))
q.put(res)
print('\033[31m %s 生产了 %s\033[0m' % (os.getpid(), res))
q.put(None)


if __name__ == '__main__':
queue = Queue()
# 生产者
p1 = Process(target=producer, args=(queue,))
# 消费者
c1 = Process(target=consumer, args=(queue,))
p1.start()
c1.start()
print('主...')

补充:threadlocal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 每一个线程会创建一个副本,访问的是自己内容的局部变量

import threading

local = threading.local() # 全局变量


def f(value):
local.val = value
print('%s 中的值为 %s' % (threading.current_thread(), local.val))


if __name__ == '__main__':
t1 = threading.Thread(target=f, args=('t1',))
t2 = threading.Thread(target=f, args=('t2',))
t1.start()
t2.start()
t1.join()
t2.join()
Prev:
Next: