if __name__ == "__main__":
    # Show which thread is executing the script body.
    thread_name = threading.current_thread()
    print("current thread:", thread_name)

    # Start 20 threads back to back without joining them in between.
    # `task` is defined outside this excerpt.
    for _ in range(20):
        t = threading.Thread(target=task)
        t.start()
(venv) D:\code\PYTHON>d:/code/PYTHON/venv/Scripts/python.exe d:/code/PYTHON/src/multi_thread/验证多线程执行无序.py current thread: <_MainThread(MainThread, started 20140)> <Thread(Thread-1 (task), started 5608)> <Thread(Thread-3 (task), started 5192)> <Thread(Thread-4 (task), started 11116)> <Thread(Thread-2 (task), started 18808)> <Thread(Thread-6 (task), started 13096)> <Thread(Thread-5 (task), started 11696)> <Thread(Thread-7 (task), started 20192)> <Thread(Thread-9 (task), started 9732)> <Thread(Thread-8 (task), started 2900)> <Thread(Thread-10 (task), started 21060)> <Thread(Thread-12 (task), started 18512)> <Thread(Thread-11 (task), started 21540)> <Thread(Thread-13 (task), started 1544)> <Thread(Thread-14 (task), started 18260)> <Thread(Thread-15 (task), started 15168)> <Thread(Thread-16 (task), started 19872)> <Thread(Thread-17 (task), started 21984)> <Thread(Thread-18 (task), started 19772)> <Thread(Thread-19 (task), started 12420)> <Thread(Thread-20 (task), started 19504)>
from qcloud_cos.cos_threadpool import SimpleThreadPool

# Queue one download task per entry in file_infos, then wait on the pool.
pool = SimpleThreadPool()
for file in file_infos:
    # NOTE(review): `file` is not used in this call; presumably
    # file_cos_key and localName are derived from it in lines that are
    # not part of this excerpt -- confirm.
    pool.add_task(self.client.download_file, bucket_name, file_cos_key, localName)
# Presumably blocks until every queued task has finished -- confirm
# against the SDK documentation.
pool.wait_completion()
# If a cosutil instance is already cached for this account, clear it;
# otherwise cache `client` under the account name.
if account_name in cls._cos_util_instances:
    del cls._cos_util_instances[account_name]
else:
    cls._cos_util_instances[account_name] = client
# 更新桶名到账号名称的映射 for bucket in buckets: cls._bucket_account_map[bucket] = account_name print(f"Successfully loaded account: {account_name}") returnTrue except Exception as e: print(f"Error loading config file {config_path}: {e}") returnFalse
print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/macOS:
# os.fork() returns twice: 0 in the child process, and the child's pid in
# the parent process.
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
1 2 3 4
Process (876) start... I (876) just created a child process (877). I am child process (877) and my parent is 876. 由于Windows没有fork调用,上面的代码在Windows上无法运行。
concurrent.futures 提供了两个主要的执行器类:ThreadPoolExecutor 和 ProcessPoolExecutor。前者适用于 I/O 密集型任务,后者适用于 CPU 密集型任务。
1 2 3 4 5 6 7 8 9 10
from concurrent.futures import ThreadPoolExecutor
import time


def task(n):
    """Sleep for ``n`` seconds, then return a completion message for task ``n``."""
    time.sleep(n)
    return f"Task {n} completed"


if __name__ == "__main__":
    # This code creates a thread pool with at most 3 concurrent threads.
    # Tasks are submitted with submit(), and each task's result is obtained
    # with result().
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(5)]
        for future in futures:
            print(future.result())
线程池 (ThreadPoolExecutor):适合 I/O 密集型任务,如文件操作或网络请求。 进程池 (ProcessPoolExecutor):适合 CPU 密集型任务,如复杂计算或数据处理。
1 2 3 4 5 6
from concurrent.futures import ProcessPoolExecutor


def compute(n):
    """Return the sum of the squares of the integers in ``range(n)``."""
    return sum(i * i for i in range(n))


# The guard matters for process pools: with the "spawn" start method
# (the default on Windows) each worker re-imports this module, and
# unguarded pool creation would then run again inside every worker.
if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        # map() returns results in the order of the inputs.
        results = list(executor.map(compute, [10**6] * 5))
        print(results)
1 2 3 4 5 6 7 8 9 10 11 12 13 14
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def task(n):
    """Return a completion message for task ``n``; task 2 always fails.

    Raises:
        ValueError: if ``n == 2``.
    """
    if n == 2:
        raise ValueError("Task failed")
    return f"Task {n} completed"


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(5)]
        for future in futures:
            try:
                # Wait at most 2 seconds for this task's result.
                print(future.result(timeout=2))
            except TimeoutError:
                print("Task timed out")
            except Exception as e:
                # An exception raised inside the task surfaces here, when
                # result() is called on its future.
                print(f"Task raised an exception: {e}")
# Use a ProcessPoolExecutor
with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
    # Submit all tasks, mapping each future back to the task it runs.
    # NOTE(review): arguments passed to a process pool are pickled to reach
    # the worker process -- confirm cos_utils and config are picklable.
    future_to_task = {
        executor.submit(process_task, task, cos_utils, config): task
        for task in task_list
    }
# If any tasks succeeded, save their names to a file.
# The file name matches the one reported in the message below; the original
# wrote to 'succcess.txt' while announcing 'success.txt'.
if success_list:
    with open('success.txt', 'w', encoding='utf-8') as f:
        for success_name in success_list:
            f.write(f"{success_name}:success+\n")
    print(f"成功任务列表已保存到:success.txt (共{len(success_list)}个)")

# If any tasks failed, save them to a file.
if failure_list or exception_list:
    failed_tasks = []
    with open('failed_tasks.txt', 'w', encoding='utf-8') as f:
        # Failures and exceptions go into the same file, failures first.
        for task_name, error in failure_list:
            f.write(f"{task_name}: {error}\n")
            failed_tasks.append(task_name)
        for task_name, error in exception_list:
            f.write(f"{task_name}: {error}\n")
            failed_tasks.append(task_name)
    print(f"💾 失败任务列表已保存到: failed_tasks.txt (共{len(failed_tasks)}个)")
for future in concurrent.futures.as_completed(future_to_task):
    # as_completed() returns an iterator that yields whichever task finishes
    # first, so results are processed in completion order rather than
    # submission order.
    success = future.result()  # get the task's execution result
1 2 3 4 5 6 7 8 9 10 11 12 13
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
    # Submit one single_run call per clip name, mapping each future back to
    # its name so failures can be attributed.
    future_to_name = {
        executor.submit(single_run, name, clipvistor, input_dic): name
        for name in clip_names
    }
    # Iterate in completion order, wrapped in tqdm with a known total.
    for future in tqdm(
        concurrent.futures.as_completed(future_to_name),
        total=len(future_to_name),
        desc="处理进度",
    ):
        current_name = future_to_name[future]
        try:
            future.result()
        except Exception as exc:
            # Record the failing name and keep processing the other futures.
            print(f"\n[错误] 处理 {current_name} 时发生异常: {exc}")
            error_list.append(current_name)