python處理多線程請(qǐng)求接口結(jié)果順序的4種方案
除了“無序收集+統(tǒng)一排序”的方案一,處理多線程請(qǐng)求接口結(jié)果順序的核心思路是 “確保結(jié)果與請(qǐng)求提交順序?qū)R” ,以下是 4 種實(shí)用方案(含進(jìn)階優(yōu)化和第三方庫方案),覆蓋不同場景需求,且均保證線程安全和并發(fā)效率
一、方案一:固定位置存儲(chǔ)(無排序,高效實(shí)時(shí))
核心邏輯
提前創(chuàng)建一個(gè)與請(qǐng)求總數(shù)長度一致的結(jié)果列表,每個(gè)線程攜帶唯一的“請(qǐng)求索引”,執(zhí)行完成后直接將結(jié)果寫入列表的對(duì)應(yīng)索引位置(如任務(wù) 5 的結(jié)果寫入 ??results[5]??)。由于索引與提交順序一一對(duì)應(yīng),所有線程完成后,列表自然是有序的。
關(guān)鍵優(yōu)勢
- 無需后續(xù)排序,效率最高(省去排序開銷);
- 可實(shí)時(shí)查看每個(gè)任務(wù)的執(zhí)行狀態(tài)(通過列表非 ?
?None?? 的位置判斷); - 僅需對(duì)“列表寫入”加鎖,鎖粒度極小,不影響并發(fā)。
完整代碼
import requests
import threading
import time
from typing import List
API_URL = "https://jsonplaceholder.typicode.com/posts/{}"
THREAD_NUM = 10
TOTAL_REQUESTS = 20
TIMEOUT = 5
# 1. 初始化:固定長度的結(jié)果列表(與請(qǐng)求順序?qū)?yīng))+ 互斥鎖(保護(hù)列表寫入)
results: List[tuple] = [None] * TOTAL_REQUESTS # 初始值為 None,完成后寫入結(jié)果
lock = threading.Lock() # 線程安全:避免多線程同時(shí)修改同一列表位置
def request_api(index: int):
"""線程執(zhí)行函數(shù):按索引寫入結(jié)果到固定位置"""
url = API_URL.format(index % 10 + 1)
try:
response = requests.get(url, timeout=TIMEOUT)
response.raise_for_status()
result = (index, True, f"響應(yīng):{response.json()['title'][:20]}...")
except Exception as e:
result = (index, False, f"失敗:{str(e)[:30]}")
# 2. 加鎖寫入結(jié)果(僅鎖定寫入操作,不影響請(qǐng)求并發(fā))
with lock:
results[index] = result # 關(guān)鍵:按請(qǐng)求索引寫入對(duì)應(yīng)位置
if __name__ == "__main__":
start_time = time.time()
threads = []
# 3. 創(chuàng)建并啟動(dòng)線程(控制線程池大小)
for i in range(TOTAL_REQUESTS):
# 限制同時(shí)運(yùn)行的線程數(shù),避免創(chuàng)建過多線程
if len(threads) >= THREAD_NUM:
# 等待任意線程完成后再創(chuàng)建新線程
threading.Thread.join(threading.Thread.wait(threads))
threads = [t for t in threads if t.is_alive()]
t = threading.Thread(target=request_api, args=(i,), name=f"Thread-{i}")
t.start()
threads.append(t)
# 4. 等待所有線程完成
for t in threads:
t.join()
# 5. 直接按列表順序輸出(已與提交順序一致)
print("固定位置存儲(chǔ) - 按請(qǐng)求順序輸出:")
for idx, is_success, msg in results:
print(f"任務(wù)[{idx}]:{'?' if is_success else '?'} {msg}")
print(f"\n總耗時(shí):{round(time.time() - start_time, 3)}s")
二、方案二:隊(duì)列(Queue)流式有序處理
核心邏輯
用兩個(gè)線程安全的隊(duì)列:
- 任務(wù)隊(duì)列:按提交順序存入所有請(qǐng)求索引(0、1、2...);
- 結(jié)果隊(duì)列:線程執(zhí)行完成后,將(索引+結(jié)果)存入隊(duì)列;
- 主線程按“期望索引”(從 0 開始)循環(huán)檢查結(jié)果隊(duì)列,只輸出當(dāng)前期望的索引結(jié)果,非期望結(jié)果放回隊(duì)列,直到所有任務(wù)完成。
關(guān)鍵優(yōu)勢
- 支持流式輸出:無需等待所有任務(wù)完成,可實(shí)時(shí)按順序打印結(jié)果;
- 隊(duì)列自帶線程安全(?
?queue.Queue?? 內(nèi)部已實(shí)現(xiàn)鎖),無需手動(dòng)加鎖; - 適合實(shí)時(shí)展示進(jìn)度(如批量操作時(shí)實(shí)時(shí)打印日志)。
完整代碼
import requests
import threading
from queue import Queue
import time
API_URL = "https://jsonplaceholder.typicode.com/posts/{}"
THREAD_NUM = 10
TOTAL_REQUESTS = 20
TIMEOUT = 5
def worker(task_queue: Queue, result_queue: Queue):
"""工作線程:從任務(wù)隊(duì)列取任務(wù),執(zhí)行后存入結(jié)果隊(duì)列"""
while not task_queue.empty():
try:
index = task_queue.get(timeout=1) # 非阻塞取任務(wù)(超時(shí)1秒退出)
url = API_URL.format(index % 10 + 1)
try:
response = requests.get(url, timeout=TIMEOUT)
response.raise_for_status()
result = (index, True, f"響應(yīng):{response.json()['title'][:20]}...")
except Exception as e:
result = (index, False, f"失?。簕str(e)[:30]}")
result_queue.put(result) # 存入結(jié)果隊(duì)列(無序)
except Exception:
break
if __name__ == "__main__":
start_time = time.time()
# 1. 初始化隊(duì)列
task_queue = Queue() # 按順序存入請(qǐng)求索引(0~19)
result_queue = Queue() # 存儲(chǔ)無序的結(jié)果
# 2. 提交任務(wù)(按順序入隊(duì))
for i in range(TOTAL_REQUESTS):
task_queue.put(i)
# 3. 啟動(dòng)工作線程
threads = [threading.Thread(target=worker, args=(task_queue, result_queue)) for _ in range(THREAD_NUM)]
for t in threads:
t.start()
# 4. 主線程按順序提取結(jié)果(流式輸出)
print("隊(duì)列流式處理 - 按請(qǐng)求順序?qū)崟r(shí)輸出:")
expected_index = 0 # 期望的下一個(gè)任務(wù)索引(從0開始)
completed = 0 # 已完成的任務(wù)數(shù)
while completed < TOTAL_REQUESTS:
if not result_queue.empty():
index, is_success, msg = result_queue.get()
# 匹配期望索引則輸出,否則放回隊(duì)列
if index == expected_index:
print(f"任務(wù)[{index}]:{'?' if is_success else '?'} {msg}")
expected_index += 1
completed += 1
else:
result_queue.put((index, is_success, msg)) # 未匹配則放回
else:
time.sleep(0.01) # 避免空循環(huán)占用CPU
# 5. 等待所有線程結(jié)束
for t in threads:
t.join()
print(f"\n總耗時(shí):{round(time.time() - start_time, 3)}s")
三、方案三:使用 ??concurrent.futures?? + 有序結(jié)果收集
核心邏輯
??ThreadPoolExecutor?? 提交任務(wù)后會(huì)返回一個(gè) ??Future?? 列表,該列表的順序與提交順序一致(即使任務(wù)完成順序無序)。通過遍歷 ??Future?? 列表(而非 ??as_completed??),直接調(diào)用 ??future.result()??,會(huì)按提交順序阻塞等待每個(gè)任務(wù)完成,從而自然得到有序結(jié)果。
關(guān)鍵優(yōu)勢
- 基于 Python 標(biāo)準(zhǔn)庫,代碼簡潔(無需手動(dòng)管理線程/鎖/隊(duì)列);
- 本質(zhì)是“按提交順序等待結(jié)果”,無需排序或額外存儲(chǔ);
- 適合需要“逐個(gè)按順序處理結(jié)果”且不想寫復(fù)雜邏輯的場景。
完整代碼
import requests
from concurrent.futures import ThreadPoolExecutor
import time
API_URL = "https://jsonplaceholder.typicode.com/posts/{}"
THREAD_NUM = 10
TOTAL_REQUESTS = 20
TIMEOUT = 5
def request_api(index: int) -> tuple:
"""單個(gè)請(qǐng)求函數(shù):返回(索引,是否成功,結(jié)果信息)"""
url = API_URL.format(index % 10 + 1)
try:
response = requests.get(url, timeout=TIMEOUT)
response.raise_for_status()
return (index, True, f"響應(yīng):{response.json()['title'][:20]}...")
except Exception as e:
return (index, False, f"失?。簕str(e)[:30]}")
if __name__ == "__main__":
start_time = time.time()
# 1. 提交任務(wù)并獲取 Future 列表(順序與提交一致)
with ThreadPoolExecutor(max_workers=THREAD_NUM) as executor:
future_list = [executor.submit(request_api, i) for i in range(TOTAL_REQUESTS)]
# 2. 按 Future 列表順序獲取結(jié)果(阻塞等待,順序與提交一致)
print("ThreadPoolExecutor 有序收集 - 按請(qǐng)求順序輸出:")
for future in future_list:
index, is_success, msg = future.result() # 按提交順序阻塞等待
print(f"任務(wù)[{index}]:{'?' if is_success else '?'} {msg}")
total_cost = round(time.time() - start_time, 3)
print(f"\n總耗時(shí):{total_cost}s")
注意
- 該方案的“有序”是通過“按提交順序等待每個(gè)任務(wù)完成”實(shí)現(xiàn)的,不會(huì)降低并發(fā)效率(任務(wù)仍在后臺(tái)并行執(zhí)行,只是結(jié)果獲取順序固定);
- 若前一個(gè)任務(wù)未完成,會(huì)阻塞等待,直到其完成后再獲取下一個(gè)任務(wù)的結(jié)果,適合需要“逐個(gè)處理結(jié)果”的場景(如按順序?qū)懭霐?shù)據(jù)庫)。
四、方案四:第三方庫 ??aiohttp??(異步并發(fā)+有序結(jié)果)
核心邏輯
雖然是“異步”而非“多線程”,但 ??aiohttp?? 是 IO 密集型接口請(qǐng)求的更優(yōu)選擇(單線程異步并發(fā),無 GIL 影響,效率更高),且天然支持有序結(jié)果——異步任務(wù)的提交順序與結(jié)果返回順序一致。
關(guān)鍵優(yōu)勢
- 并發(fā)效率高于多線程(無線程切換開銷);
- 代碼簡潔,無需處理線程安全問題;
- 適合高并發(fā)接口請(qǐng)求場景(如批量爬取、接口壓測)。
完整代碼
import aiohttp
import asyncio
import time
API_URL = "https://jsonplaceholder.typicode.com/posts/{}"
TOTAL_REQUESTS = 20
TIMEOUT = 5
async def request_api(session: aiohttp.ClientSession, index: int) -> tuple:
"""異步請(qǐng)求函數(shù)"""
url = API_URL.format(index % 10 + 1)
try:
async with session.get(url, timeout=TIMEOUT) as response:
response.raise_for_status()
data = await response.json()
return (index, True, f"響應(yīng):{data['title'][:20]}...")
except Exception as e:
return (index, False, f"失敗:{str(e)[:30]}")
async def main():
start_time = time.time()
# 1. 創(chuàng)建異步會(huì)話(復(fù)用連接,提升效率)
async with aiohttp.ClientSession() as session:
# 2. 創(chuàng)建所有異步任務(wù)(順序與提交一致)
tasks = [request_api(session, i) for i in range(TOTAL_REQUESTS)]
# 3. 并發(fā)執(zhí)行任務(wù),按提交順序獲取結(jié)果
results = await asyncio.gather(*tasks) # gather 保證結(jié)果順序與任務(wù)順序一致
# 4. 輸出結(jié)果(已有序)
print("aiohttp 異步并發(fā) - 按請(qǐng)求順序輸出:")
for idx, is_success, msg in results:
print(f"任務(wù)[{idx}]:{'?' if is_success else '?'} {msg}")
total_cost = round(time.time() - start_time, 3)
print(f"\n總耗時(shí):{total_cost}s")
if __name__ == "__main__":
# 兼容 Windows 系統(tǒng)
if __name__ == "__main__":
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())
依賴安裝
pip install aiohttp
各方案對(duì)比&選型建議
| 方案 | 技術(shù)依賴 | 核心優(yōu)勢 | 適用場景 |
|---|---|---|---|
| 固定位置存儲(chǔ) | threading + lock | 效率最高、可實(shí)時(shí)查進(jìn)度 | 高并發(fā)、需跟蹤任務(wù)狀態(tài) |
| 隊(duì)列流式處理 | threading + Queue | 流式輸出、無需等待所有任務(wù) | 實(shí)時(shí)展示進(jìn)度、邊請(qǐng)求邊處理 |
| ThreadPoolExecutor 有序收集 | concurrent.futures | 代碼最簡單、標(biāo)準(zhǔn)庫支持 | 無需實(shí)時(shí)輸出、按順序處理結(jié)果 |
| aiohttp 異步并發(fā) | aiohttp + asyncio | 并發(fā)效率最高、無線程安全問題 | 高并發(fā)接口請(qǐng)求、爬取、壓測 |
面試&實(shí)戰(zhàn)關(guān)鍵要點(diǎn)
線程安全是前提:修改共享數(shù)據(jù)(如列表)必須加鎖,或使用線程安全的數(shù)據(jù)結(jié)構(gòu)(如 ??queue.Queue??);
有序的核心是“索引綁定” :無論哪種方案,都需要通過“請(qǐng)求索引”關(guān)聯(lián)任務(wù)和結(jié)果,確保順序?qū)R;
IO 密集型優(yōu)先選異步:??aiohttp?? 異步并發(fā)效率高于多線程,且天然有序,是接口請(qǐng)求的最優(yōu)解;
避免過度設(shè)計(jì):簡單場景用 ??ThreadPoolExecutor?? 有序收集(方案四),復(fù)雜場景用隊(duì)列或固定位置存儲(chǔ),無需追求復(fù)雜邏輯。
到此這篇關(guān)于python處理多線程請(qǐng)求接口結(jié)果順序的4種方案的文章就介紹到這了,更多相關(guān)python處理多線程請(qǐng)求接口結(jié)果順序內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python使用OpenCV獲取高動(dòng)態(tài)范圍成像HDR
這篇文章主要介紹了python使用OpenCV獲取高動(dòng)態(tài)范圍成像HDR,如何使用不同曝光設(shè)置拍攝的多張圖像創(chuàng)建高動(dòng)態(tài)范圍圖像HDR,下文嗎更詳細(xì)的內(nèi)容介紹,需要的小伙伴可以參考一下2022-04-04
Python實(shí)現(xiàn)一鍵自動(dòng)分類管理文件
經(jīng)常雜亂無章的文件夾會(huì)讓我們找不到所想要的文件,所以本文小編特意為大家介紹了如何制作一個(gè)可視化GUI界面,通過輸入路徑一鍵點(diǎn)擊實(shí)現(xiàn)文件分門別類的歸檔,希望對(duì)大家有所幫助<BR>2024-01-01
在django view中給form傳入?yún)?shù)的例子
今天小編就為大家分享一篇在django view中給form傳入?yún)?shù)的例子,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-07-07
python判斷端口是否打開的實(shí)現(xiàn)代碼
python判斷端口是否打開的代碼,有需要的朋友可以參考下2013-02-02
python pandas 時(shí)間日期的處理實(shí)現(xiàn)
這篇文章主要介紹了python pandas 時(shí)間日期的處理實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-07-07
使用Python實(shí)現(xiàn)計(jì)算DICOM圖像兩點(diǎn)真實(shí)距離
這篇文章主要為大家詳細(xì)介紹了如何使用Python實(shí)現(xiàn)計(jì)算DICOM圖像兩點(diǎn)真實(shí)距離,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2023-11-11
Python+OpenCV圖片局部區(qū)域像素值處理詳解
這篇文章主要為大家詳細(xì)介紹了Python+OpenCV圖片局部區(qū)域像素值處理,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-01-01
python client使用http post 到server端的代碼
python client使用 http post 到server端的代碼,供大家學(xué)習(xí)參考2013-02-02

