python开发-多任务

python开发

多任务

并发:单核执行多个任务,或者任务数大于核数

并行:多核多个任务,任务数大于核数时也存在并发

进程

一个运行的程序就可以认为是一个进程,进程负责申请资源(它是操作系统分配资源的基本单位),真正干活儿的是线程。

比如现实中的公司可以认为是一个进程,公司提供办公资源(电脑、办公桌活的是员工,员工可以理解成线程

注意:
一个程序运行后至少有一个进程,一个进程默认有一个线程,进程里面可以创建多个线程,线程是依附在进程里面的,没有进程就没有线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import multiprocessing

def dance(n):
for i in range(n):
print(f"dance {i}")

def sing(n):
for i in range(n):
print(f"sing {i}")

if __name__ == '__main__':
# multiprocessing.freeze_support()
dance_process = multiprocessing.Process(target=dance,args=(100,))
sing_process = multiprocessing.Process(target=sing,args=(100,))

dance_process.start()
sing_process.start()

multiprocessing

.Process()参数

.Process()参数:

group:进程组,目前不能用也很少用,只能为None

target:执行任务函数名

args:以元组的方式给任务参数

kwargs:以字典方式给任务传参

name:进程名,一般不会去使用,默认Process-1…2

基本方法

主进程、子进程、进程id、父进程id、杀死进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import multiprocessing
import os

def dance(n):
ppid = os.getppid()
pid = os.getpid()
process_name = multiprocessing.current_process()
print(f"当前进程id:{pid}",f"进程:{process_name}",f"父进程:{ppid}")
for i in range(n):
print(f"dance {i}")
os.kill(pid,9)
def sing(n):
ppid = os.getppid()
pid = os.getpid()
process_name = multiprocessing.current_process()
print(f"当前进程id:{pid}",f"进程:{process_name}",f"父进程:{ppid}")
for i in range(n):
print(f"sing {i}")
os.kill(pid,9)

if __name__ == '__main__':
# multiprocessing.freeze_support()
main_pid = os.getpid()
process_name = multiprocessing.current_process()
print(f"当前进程id:{main_pid}",f"进程:{process_name}")
dance_process = multiprocessing.Process(target=dance,args=(100,)) # ,name="dance_process"
sing_process = multiprocessing.Process(target=sing,args=(100,)) # ,name="sing_process"

dance_process.start()
sing_process.start()
join进程等待
传参

可以使用元组(传入顺序和功能函数参数顺序一致)或者字典传参(key名和参数名一致)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import multiprocessing
import os

def show_info(name,age):
print(name,age)

if __name__ == '__main__':
# multiprocessing.freeze_support()
main_pid = os.getpid()
process_name = multiprocessing.current_process()
print(f"当前进程id:{main_pid}",f"进程:{process_name}")
tu=("ty",30)
di = {"name":"ty","age":30}
# dance_process = multiprocessing.Process(target=show_info,args=tu)
dance_process = multiprocessing.Process(target=show_info,kwargs=di)
# dance_process = multiprocessing.Process(target=show_info,args=("ty",),kwargs={"age":30})
dance_process.start()



进程注意点
进程之间不共享全局变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import multiprocessing
import os
import time

g_list = []

def add_data():
for i in range(5):
g_list.append(i) # 这里不用global声明全局变量是因为,list是可变变量,修改值不会改变内存地址
time.sleep(0.3)
print("add完成:",g_list)

def read_data():
print("read:",g_list)

if __name__ == '__main__':
# multiprocessing.freeze_support()
main_pid = os.getpid()
process_name = multiprocessing.current_process()
print(f"当前进程id:{main_pid}",f"进程:{process_name}")
sub_process1 = multiprocessing.Process(target=add_data)
sub_process2 = multiprocessing.Process(target=read_data)
sub_process1.start()
sub_process1.join() # 主进程等待上一个子进程执行完,再执行后续代码
print("主进程g_list:",g_list)
sub_process2.start()

提示:对应linux和mac主进程执行的代码不会拷贝进子进程,但是对应window系统来说主进程执行的代码也会进行拷贝进子进程,对应window来说创建子进程的代码如果进子进程相当于无限制创建子进程会报错
解决:windows需要加上if –name– == “–main–”

它的作用:

  • 防止别人导入文件的时候执行main里面的代码
  • 防止windows系统无限递归创建子进程
1
2
3
4
5
(venv) D:\code\PYTHON>d:/code/PYTHON/venv/Scripts/python.exe d:/code/PYTHON/src/muti_global_var.py
当前进程id:1904 进程:<_MainProcess name='MainProcess' parent=None started>
add完成: [0, 1, 2, 3, 4]
主进程g_list: []
read: []
主进程会等待子进程执行完再退出(默认)
1
2
3
4
5
6
7
8
9
10
11
12
13
import multiprocessing
import time

def task():
for i in range(10):
print("procssing!")
time.sleep(0.3)


if __name__ == "__main__":
sub_process = multiprocessing.Process(target=task)
sub_process.start()
print("over!")
1
2
3
4
5
6
7
8
9
10
11
12
(venv) D:\code\PYTHON>d:/code/PYTHON/venv/Scripts/python.exe d:/code/PYTHON/src/主进程会等待子进程执行完才结束.py
over!
procssing!
procssing!
procssing!
procssing!
procssing!
procssing!
procssing!
procssing!
procssing!
procssing!
守护主进程.daemon和.terminate()实现让子进程不能无限时执行

结论:主进程会等待子进程执行完成以后再退出
解决办法:主进程退出子进程退出
1.让子进程设置成为守护主进程,主进程退出子进程销毁

2.让主进程退出前,子进程销毁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import multiprocessing
import time

def task():
for i in range(10):
print("procssing!")
time.sleep(0.3)


if __name__ == "__main__":
sub_process = multiprocessing.Process(target=task)
sub_process.daemon = True
sub_process.start()
time.sleep(5)
print("over!")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import multiprocessing
import time

def task():
for i in range(10):
print("procssing!")
time.sleep(0.3)


if __name__ == "__main__":
sub_process = multiprocessing.Process(target=task)
# sub_process.daemon = True
sub_process.start()
time.sleep(5)
sub_process.terminate()
print("over!")

线程

概念

线程是进程中执行代码的一个分支,每个执行分支(线程)要想工作执行代码需要cpu进行调度,也就是说线程是cpu调度的基本单位,每个进程至少都有一个线程,而这个线程就是我们通常说的主线程

说明:程序启动默认会有一个主线程,程序员自己创健的线程可以称为子线程,多线程可以完成多任务。

单进程里的多线程共享资源

使用

.Process()参数:

group:线程组,目前不能用也很少用,只能为None

target:执行任务函数名

args:以元组的方式给任务传参

kwargs:以字典方式给任务传参

name:线程名,一般不会去使用,默认thread-1…2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import threading
import time

def sing():
for i in range(10):
thread_name = threading.current_thread()
print("current thread:",thread_name)
print("singing!")
time.sleep(0.5)

def dance():
for i in range(10):
thread_name = threading.current_thread()
print("current thread:",thread_name)
print("dancing!")
time.sleep(0.5)

if __name__ == "__main__":
thread_name = threading.current_thread()
print("current thread:",thread_name)
t = threading.Thread(target=sing)
t = threading.Thread(target=dance)
t.start()
t.start()
传参

和进程那节一模一样

注意

1.线程之问执行是无序的(具体哪个线程执行是由CPU调度的)(多进程执行顺序,是由操作系统调度进程决定的)
2.主线程会等待所有的子线程执行结束再结束(可以使用守护主线程或者terminate()改变)
3.线程之间共享全局变量
4.线程之间共享全局变量数据出现错误问题

验证无序
1
2
3
4
5
6
7
8
9
10
11
12
13
import threading
import time

def task():
time.sleep(0.5)
print(threading.current_thread())

if __name__ == "__main__":
thread_name = threading.current_thread()
print("current thread:",thread_name)
for i in range(20):
t = threading.Thread(target=task)
t.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
(venv) D:\code\PYTHON>d:/code/PYTHON/venv/Scripts/python.exe d:/code/PYTHON/src/multi_thread/验证多线程执行无序.py
current thread: <_MainThread(MainThread, started 20140)>
<Thread(Thread-1 (task), started 5608)>
<Thread(Thread-3 (task), started 5192)>
<Thread(Thread-4 (task), started 11116)>
<Thread(Thread-2 (task), started 18808)>
<Thread(Thread-6 (task), started 13096)>
<Thread(Thread-5 (task), started 11696)>
<Thread(Thread-7 (task), started 20192)>
<Thread(Thread-9 (task), started 9732)>
<Thread(Thread-8 (task), started 2900)>
<Thread(Thread-10 (task), started 21060)>
<Thread(Thread-12 (task), started 18512)>
<Thread(Thread-11 (task), started 21540)>
<Thread(Thread-13 (task), started 1544)>
<Thread(Thread-14 (task), started 18260)>
<Thread(Thread-15 (task), started 15168)>
<Thread(Thread-16 (task), started 19872)>
<Thread(Thread-17 (task), started 21984)>
<Thread(Thread-18 (task), started 19772)>
<Thread(Thread-19 (task), started 12420)>
<Thread(Thread-20 (task), started 19504)>
验证等待
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import threading
import time

def task():
while True:
print("procssing!")
time.sleep(0.3)


if __name__ == "__main__":
sub_thread = threading.Thread(target=task)
# sub_thread = threading.Thread(target=task,daemon=True)
sub_thread.daemon = True
sub_thread.start()
time.sleep(3)
# sub_thread.terminate()
print("over!")
验证共享变量

多个线程在同一个进程中,共享全局变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import threading
import os
import time

g_list = []

def add_data():
for i in range(5):
g_list.append(i)
time.sleep(0.3)
print("add完成:",g_list)

def read_data():
print("read:",g_list)

if __name__ == '__main__':
# multiprocessing.freeze_support()
thread_name = threading.current_thread()
print(f"线程:{thread_name}")
sub_thread1 = threading.Thread(target=add_data)
sub_thread2 = threading.Thread(target=read_data)
sub_thread1.start()
# sub_thread1.join()
sub_thread2.start()
print("主进程g_list:",g_list)
1
2
3
4
5
(venv) D:\code\PYTHON>d:/code/PYTHON/venv/Scripts/python.exe d:/code/PYTHON/src/multi_thread/验证多线程共享变量.py
线程:<_MainThread(MainThread, started 19340)>
read: [0]
主进程g_list: [0]
add完成: [0, 1, 2, 3, 4]
验证同一进程里的多线程共享同一全局变量可能会出错

g_sum += 1实际上分三步,这三步可能被其他线程中断

  1. 读取 g_sum 的值到寄存器
  2. 在寄存器中加1
  3. 将结果写回 g_sum
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import threading

g_sum = 0
def add():
for i in range(10000000):
global g_sum
g_sum += 1
print(g_sum)

def add2():
for i in range(10000000):
global g_sum
g_sum += 1
print(g_sum)


if __name__ == "__main__":
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=add2)
t1.start()
t2.start()

# print(g_sum)

1
2
3
4
(venv) D:\code\PYTHON>d:/code/PYTHON/venv/Scripts/python.exe d:/code/PYTHON/src/multi_thread/验证同一进程里的多线程共享变量可能会出
错.py
19353154
20000000
join线程同步(等待)

线程同步(等待):保证同一时刻只能有一个线程去操作全局变量

同步:就是协同步调,按预定的先后次序进行运行。如:你说完,我再说,好比现实生活中的对讲机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import threading

g_sum = 0
def add():
for i in range(10000000):
global g_sum
g_sum += 1
print(g_sum)

def add2():
for i in range(10000000):
global g_sum
g_sum += 1
print(g_sum)


if __name__ == "__main__":
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=add2)
t1.start()
t1.join()
t2.start()

# print(g_sum)

互斥锁

互斥锁:对共享数据进行锁定,保证网一时刻只能有一个线程去操作。
注意:
互斥锁是多个线程一起去抢,抢到锁的线程先执行,没有抢到锁的线程需要等待,等互斥锁使用完释放后,其它等待的线程再去抢这个锁。

要加锁,涉及修改全局变量的地方都要加锁,不然还是有问题

互斥锁可以保证同一时刻只有一个线程去执行代码,能够保证全局变量的数据没有问题。线程等待和互斥锁都是把多任务改成单任务去执行,保证了数据的准确性,但是执行性能会下降

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import threading
lock = threading.Lock()

g_sum = 0
def add():
lock.acquire_lock() # 上锁
for i in range(10000000):
global g_sum
g_sum += 1
print(g_sum)
lock.release_lock() # 释放锁

def add2():
lock.acquire_lock() # 上锁
for i in range(10000000):
global g_sum
g_sum += 1
print(g_sum)
lock.release_lock() # 释放锁


if __name__ == "__main__":
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=add2)
t1.start()
t2.start()

# print(g_sum)
死锁

死锁:一直等待对方释放锁的情景就是死锁

死锁要在合适的地方进行释放,防止死锁
取值不成功,也需要释放互斥锁,不要影响后面的线程去取值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import threading

lock = threading.Lock()
def get_ele(index):
lock.acquire_lock()
my_list = [1,2,3,4]
if index >= len(my_list):
print("index超出范围!")
# lock.release_lock() # 这里有注释时就是死锁案例
return
print(my_list[index])
lock.release_lock()

if __name__ == "__main__":
for i in range(10):
t = threading.Thread(target=get_ele,args=(i,))
t.start()
1
2
3
4
5
6
7
(venv) D:\code\PYTHON>d:/code/PYTHON/venv/Scripts/python.exe d:/code/PYTHON/src/multi_thread/死锁.py
1
2
3
4
index超出范围!

with

用with语句可以自动释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import threading

lock = threading.Lock()
def get_ele(index):
# lock.acquire_lock()
with lock:
my_list = [1,2,3,4]
if index >= len(my_list):
print("index超出范围!")
# lock.release_lock()
return
print(my_list[index])
# lock.release_lock()

if __name__ == "__main__":
for i in range(10):
t = threading.Thread(target=get_ele,args=(i,))
t.start()

进程和线程对比

关系对比

1.先有进程再有线程,线程依赖于进程,一个进程默认提供一个主线程

区别对比

1.进程之间不共享全局变量,每个子进程只是对主进程的一个克隆

2.同一进程中的多线程共享全局变量。线程共享全局变量可能会出错,可以用线程同步或者互斥锁

3.创建进程的资源开销要比创建线程的资源开销大

4.进程是操作系统资源分配的基本单位(操作系统调度进程),线程是CPU调度的基本单位

5.线程不能独立执行,必须依赖于进程

6.多进程开发(一个进程崩溃不影响其他进程;内存隔离,相互独立;通信成本高)比多线程开发(一个线程崩溃可能拖垮整个进程;共享内存,可能相互破坏;通信成本低)稳定

优缺点对比

  • 优缺点对比

    进程优缺点:
    。优点:可以用多核。缺点:资源开销大

    线程优缺点

    。优点:资源开销小。缺点:不能使用多核

跟计算密集型相关的使用多进程;文件写入,文件下载,I/O操作使用多线程

小结

协程(爬虫中讲)

计数器:同一进程里不同线程的计数器是不一样的,比如线程执行的代码完全一样,但是可能每个线程计数器记录的当前执行到的行数是不一样的。

协程:就是指线程里的代码不是从上到下依次执行的,比如代码分为三个函数,它可能执行1函数一段时间后,又去执行2一段时间,又去执行3函数一段时间,然后再去执行1

进程有操作系统调度,线程的执行CPU调度,协程的执行顺序由程序员编写的程序控制

image-20260509145228317

生成器yield能实现这样的效果,但是控制起来很麻烦,所以有了协程库。

协程库

切换的场景在程序中发生阻塞的时候(读磁盘,读写文件,网络IO操作,收发hTTP请求)。

gevent、eventlet协程库自动切换

1
2
3
4
5
6
7
8
9
import eventlet
eventlet.monkey_patch() # 直接将python原生代码中涉及上面阻塞场景的代码全部替换掉,使用起来没什么区别

import socketio
import eventlet.wsgi

sio = socketio.Server(async_mode = 'event_let') # 指明在eventlet模式下
app = socketio.Middleware(sio)
eventlet.wsgi.server(eventlet.listen(('',8000)),app)

并发执行

实际产品案例

1
2
3
4
5
from qcloud_cos.cos_threadpool import SimpleThreadPool
pool = SimpleThreadPool()
for file in file_infos:
pool.add_task(self.client.download_file, bucket_name, file_cos_key, localName)
pool.wait_completion()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import os
import threading
from typing import Any, Optional,Dict, List
from cos_browser import COSBrowser
import yaml
# from cos_api import CosUtils



class CosFactory:
"""COS客户端工厂类,支持多账号管理和通过桶名选择对应的cosutil实例
线程安全版本,使用锁机制保护共享资源
"""

# 桶名到账号配置的映射字典
_bucket_account_map = {}

# 账号名称到配置的映射
_account_configs = {}
# 单例缓存,存储已创建的cosutil实例
_cos_util_instances = {}

# 线程锁,用于保护共享资源的访问
_lock = threading.RLock()

@classmethod
def load_config_from_yaml(cls, config_path: str) -> bool:
"""
从YAML配置文件加载账号信息

Args:
config_path: YAML配置文件路径

Returns:
是否成功加载配置
"""
try:
with open(config_path, 'r', encoding='utf-8') as f:
config_data = yaml.safe_load(f)

if not config_data or 'cosfacotyconfiger' not in config_data:
print(f"Invalid config file format: {config_path}")
return False

cos_configer = config_data['cosfacotyconfiger']
if 'account' not in cos_configer:
print(f"No accounts found in config file: {config_path}")
return False

# 加载所有账号
for account_info in cos_configer['account']:
account_name = account_info.get('name')
account_config = account_info.get('congiger', {})

if not account_name or not account_config:
print(f"Invalid account information: {account_info}")
continue

# 注册账号(暂时没有桶信息,只注册账号配置)
with cls._lock:
cls._account_configs[account_name] = account_config.copy()

# cos_util= CosUtils(account_config)
client = COSBrowser(account_config)
buckets= client.get_bucket_list()

# 如果有对应的cosutil实例,清除它
if account_name in cls._cos_util_instances:
del cls._cos_util_instances[account_name]
else:
cls._cos_util_instances[account_name]=client

# 更新桶名到账号名称的映射
for bucket in buckets:
cls._bucket_account_map[bucket] = account_name

print(f"Successfully loaded account: {account_name}")

return True

except Exception as e:
print(f"Error loading config file {config_path}: {e}")
return False

@classmethod
def get_account_by_bucket(cls, bucket_name: str) -> Optional[str]:
"""
根据桶名获取对应的账号名称
线程安全的实现,使用锁保护共享资源的读取

Args:
bucket_name: 桶名称

Returns:
账号名称,如果未找到返回None
"""
# 使用锁保护共享资源的读取
with cls._lock:
return cls._bucket_account_map.get(bucket_name)

@classmethod
def get_cos_util(cls, bucket_name: Optional[str] = None) -> 'COSBrowser':
"""
根据桶名获取对应的cosutil实例
如果未提供桶名,则使用默认配置创建cosutil实例
线程安全的实现,使用双重检查锁定模式优化性能

Args:
bucket_name: 桶名称

Returns:
CosUtils实例
"""
# 根据桶名确定账号名称
account_name = cls.get_account_by_bucket(bucket_name) if bucket_name else 'default'

# 双重检查锁定模式,提高多线程环境下的性能
# 第一次检查,无锁
if account_name not in cls._cos_util_instances:
# 如果实例不存在,获取锁
# with cls._lock:
# # 再次检查,避免在获取锁期间其他线程已创建实例
# if account_name not in cls._cos_util_instances:
# # 获取账号配置
# config = cls._account_configs.get(account_name, cls._default_config)

# # 创建新的cosutil实例并缓存
# cls._cos_util_instances[account_name] = CosUtils(config)
return None

return cls._cos_util_instances[account_name]

@classmethod
def get_supported_regions(cls) -> List[str]:
"""获取支持的地区列表"""
return ["ap-beijing", "ap-shanghai", "ap-guangzhou"]

@classmethod
def clear_cache(cls) -> None:
"""
清除所有缓存的cosutil实例
线程安全的实现,使用锁保护共享资源
用于重置或清理资源的场景
"""
with cls._lock:
cls._cos_util_instances.clear()

进程

Python 并发编程实战:优雅地使用 concurrent.futures - Piper蛋窝 - 博客园

fork()
1
2
3
4
5
Unix/Linux操作系统提供了一个fork()系统调用

普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

子进程永远返回0,而父进程返回子进程的ID。
1
2
3
4
5
6
7
8
9
import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/macOS:
pid = os.fork()
if pid == 0:
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
1
2
3
4
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
由于Windows没有fork调用,上面的代码在Windows上无法运行。

concurrent

1
2
concurrent.futures 是 Python 标准库中一个强大的并发编程模块,提供了线程池和进程池的高级接口,用于简化多线程和多进程的实现。它支持异步执行任务,并通过 Future 对象管理任务的状态和结果。
concurrent.futures 提供了简洁的接口,自动管理线程或进程的生命周期,支持任务的超时和错误处理。相比传统的 threading 和 multiprocessing 模块,它更易于使用,代码更简洁,适合大多数并发场景。
1
concurrent.futures 提供了两个主要的执行器类:ThreadPoolExecutor 和 ProcessPoolExecutor。前者适用于 I/O 密集型任务,后者适用于 CPU 密集型任务。
1
2
3
4
5
6
7
8
9
10
from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
time.sleep(n)
return f"Task {n} completed"
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(task, i) for i in range(5)]
for future in futures:
print(future.result())
此代码创建了一个线程池,最大并发线程数为 3。通过 submit 方法提交任务,并通过 result() 获取任务结果。
1
2
3
4
5
6
submit():提交任务到线程池或进程池,返回一个 Future 对象。
as_completed():生成器方法,按任务完成顺序返回 Future 对象。
map():类似于 Python 内置的 map(),将函数应用于可迭代对象中的每个元素,并返回结果。

线程池 (ThreadPoolExecutor):适合 I/O 密集型任务,如文件操作或网络请求。
进程池 (ProcessPoolExecutor):适合 CPU 密集型任务,如复杂计算或数据处理。
1
2
3
4
5
6
from concurrent.futures import ProcessPoolExecutor
def compute(n):
return sum(i * i for i in range(n))
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(compute, [10**6] * 5))
print(results)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from concurrent.futures import ThreadPoolExecutor, TimeoutError
def task(n):
if n == 2:
raise ValueError("Task failed")
return f"Task {n} completed"
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(task, i) for i in range(5)]
for future in futures:
try:
print(future.result(timeout=2))
except TimeoutError:
print("Task timed out")
except Exception as e:
print(f"Task raised an exception: {e}")
实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def multiprocess_run(task_file:str, max_workers:int,config_path: str):
task_list = read_data_from_txt(task_file)
# error_list=read_data_from_txt('error_data.txt')

# task_list = [clip_name for clip_name in data_dir_names if not(clip_name in error_list)]
config = load_config(config_path)

cos_config = {
"stId": config['CosConfig']['stId'],
"stKey": config['CosConfig']['stKey'],
"Region": config['CosConfig']['Region']
}

cos_utils = CosUtils(cos_config)
# target_bucket_name = config['TargetBucket'] #目标源COS桶
# dataset_path = config['dataset']['base_path']
# clip_base_path = f"{dataset_path}/CD701_LS6C3G0Y0RA400630_2025-10-08_07-38-08/DrivingInfo"
# cos_result=cos_utils.list_objects(clip_base_path,target_bucket_name)

print(f"总任务数: {len(task_list)}")
print(f"使用进程数: {max_workers}")

#统计三种结果
success_list=[]
failure_list=[] #任务返回失败
exception_list=[] #任务执行出现异常

# 使用ProcessPoolExecutor
with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
# 提交所有任务
future_to_task = {
executor.submit(process_task, task,cos_utils,config):task for task in task_list
}

print("所有任务已提交,开始并行处理...")

# 搜集任务结果
completed_count = 0
for future in concurrent.futures.as_completed(future_to_task):
task_name = future_to_task[future]
completed_count += 1

try:
# 获取任务返回结果
success = future.result()
if success:
success_list.append(task_name)
print(f"[{completed_count}/{len(task_list)}] 成功: {task_name}")
else:
failure_list.append((task_name, "process_task 返回 False"))
print(f"[{completed_count}/{len(task_list)}] 失败: {task_name}")
except Exception as exc:
# 其他执行异常
exception_list.append((task_name, str(exc)))
print(f"异常: {task_name} - 错误: {str(exc)}")

if len(success_list)>0:
with open('succcess.txt','w',encoding='utf-8') as f:
for success_name in success_list:
f.write(f"{success_name}:success+\n")
print(f"成功任务列表已保存到:success.txt (共{len(success_list)}个)")

# 如果有失败的任务,保存到文件
if failure_list or exception_list:
failed_tasks = []
with open('failed_tasks.txt', 'w', encoding='utf-8') as f:
for task_name, error in failure_list:
f.write(f"{task_name}: {error}\n")
failed_tasks.append(task_name)
for task_name, error in exception_list:
f.write(f"{task_name}: {error}\n")
failed_tasks.append(task_name)
print(f"💾 失败任务列表已保存到: failed_tasks.txt (共{len(failed_tasks)}个)")
1
2
3
4
5
6
7
创建了一个进程池执行器,最多使用 max_workers个进程
with语句确保使用完后自动关闭进程池
每个进程独立运行,不共享内存,适合CPU密集型任务

executor.submit():提交一个任务到进程池,返回一个 Future对象
future_to_task字典:{Future对象: 对应的任务名}
一次性提交 task_list中的所有任务,任务立即开始并行执行
1
2
3
4
5
for future in concurrent.futures.as_completed(future_to_task):
as_completed():返回一个迭代器,哪个任务先完成就先返回哪个
不按提交顺序,而是按完成顺序处理结果

success = future.result() # 获取任务执行结果
1
2
3
4
5
6
7
8
9
10
11
12
13
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
future_to_name = {
executor.submit(single_run, name, clipvistor, input_dic): name
for name in clip_names
}

for future in tqdm(concurrent.futures.as_completed(future_to_name), total=len(future_to_name), desc="处理进度"):
current_name = future_to_name[future]
try:
future.result()
except Exception as exc:
print(f"\n[错误] 处理 {current_name} 时发生异常: {exc}")
error_list.append(current_name)