python后端流式處理LLM響應(yīng)數(shù)據(jù)使用解讀
本文檔詳細(xì)描述了我們?nèi)绾瓮ㄟ^流式處理(Streaming)來優(yōu)化大語言模型(LLM)的響應(yīng),以提升用戶體驗。我們將通過代碼實現(xiàn)和性能對比,來展示流式處理的優(yōu)勢。
實際效果
先
1、問題背景
在傳統(tǒng)的請求-響應(yīng)模式中,客戶端向服務(wù)器發(fā)送一個請求,然后必須等待服務(wù)器完全處理完該請求并生成完整的響應(yīng)后,才能接收到數(shù)據(jù)。
對于 LLM 應(yīng)用來說,這意味著用戶在輸入問題后,需要等待模型生成完整的答案(可能需要幾十秒),這期間界面沒有任何反饋,用戶體驗較差。
2、解決方案:流式處理
為了解決這個問題,我們采用了流式處理技術(shù)。
其核心思想是,服務(wù)器不再一次性地返回完整的響應(yīng),而是將響應(yīng)分割成多個小的數(shù)據(jù)塊(chunks),并逐個地、實時地發(fā)送給客戶端。
在我們的實現(xiàn)中,我們使用了 FastAPI 的 StreamingResponse,它允許我們創(chuàng)建一個異步生成器,該生成器可以持續(xù)地 yield 數(shù)據(jù)塊,直到響應(yīng)完全結(jié)束。
2.1. 后端實現(xiàn)
我們的后端實現(xiàn)主要涉及以下幾個部分:
deepseek_chat_model.py:負(fù)責(zé)初始化 ChatDeepSeek 模型。llm_service.py:封裝了與 LLM 模型的交互。chat_router.py:定義了 FastAPI 路由和流式生成器。
src/llm/deepseek_chat_model.py
這個文件負(fù)責(zé)從配置文件中讀取 API 密鑰和基礎(chǔ) URL,并初始化 ChatDeepSeek 模型。
from langchain_deepseek import ChatDeepSeek
from src.configs.config import yaml_configs
def get_deepseek_llm():
"""
Initializes and returns a ChatDeepSeek instance.
"""
api_key = yaml_configs["deepseek"]["api-key"]
base_url = yaml_configs["deepseek"]["base-url"]
llm = ChatDeepSeek(
model="deepseek-chat",
temperature=0.2,
max_tokens=None,
api_key=api_key,
base_url=base_url,
)
return llm
```
2.1.2. src/services/llm_service.py
這個服務(wù)類封裝了與 LLM 模型的交互,提供了 ainvoke(非流式)和 astream(流式)兩種方法。
```python
from langchain_core.language_models import BaseChatModel
from loguru import logger
class LLMService:
def __init__(self, llm: BaseChatModel):
logger.info("Initializing LLMService...")
self.llm = llm
logger.info("LLMService initialized.")
async def ainvoke(self, prompt: str):
logger.info(f"LLMService ainvoking with prompt: {prompt}")
response = await self.llm.ainvoke(prompt)
logger.info("LLMService ainvocation complete.")
return response
def astream(self, prompt: str):
"""Streams the response from the LLM."""
logger.info(f"LLMService astreaming with prompt: {prompt}")
return self.llm.astream(prompt)
src/routers/chat_router.py
這是我們的 FastAPI 路由,它定義了 /api/v1/chat 這個端點。它使用 Depends 來注入 LLMService,并通過一個異步生成器 stream_generator 來調(diào)用 LLMService 的 astream 方法,然后將返回的數(shù)據(jù)塊逐個地 yield 給客戶端。
from fastapi import APIRouter, Depends
from starlette.responses import StreamingResponse
from src.llm.deepseek_chat_model import get_deepseek_llm
from src.services.llm_service import LLMService
# ...
def get_llm_service():
"""Dependency to get a singleton instance of LLMService."""
try:
deepseek_model = get_deepseek_llm()
return LLMService(llm=deepseek_model)
except Exception as e:
logger.error(f"Failed to initialize LLM service for dependency: {e}")
return None
# ...
async def stream_generator(prompt: str, llm_service: LLMService):
"""Async generator that yields response chunks from the LLM stream."""
# ...
try:
# Call the astream method on the service
llm_stream = llm_service.astream(prompt)
# Iterate over the stream and yield each chunk to the client
async for chunk in llm_stream:
if hasattr(chunk, 'content') and chunk.content:
yield f"data: {chunk.content}\n\n"
# ...
@router.post("/chat")
async def chat(request: ChatRequest, llm_service: LLMService = Depends(get_llm_service)):
# ...
return StreamingResponse(stream_wrapper(), media_type="text/event-stream")
2.2. yield 的作用:流式處理的核心
在我們的實現(xiàn)中,yield 關(guān)鍵字是實現(xiàn)流式處理的核心。它在兩個關(guān)鍵位置發(fā)揮作用:
在 stream_generator 中:
yield f"data: {chunk.content}\n\n"
在 StreamingResponse 中: FastAPI 在內(nèi)部處理 stream_wrapper 生成器時,也是通過 yield 來逐塊發(fā)送數(shù)據(jù)。
專業(yè)解釋
當(dāng)一個函數(shù)包含 yield 關(guān)鍵字時,它就不再是一個普通的函數(shù),而是一個生成器(Generator)。
yield 在 stream_generator 中:
- 當(dāng)我們調(diào)用 stream_generator 時,它不會立即執(zhí)行函數(shù)體,而是返回一個生成器對象。
- async for chunk in llm_stream: 循環(huán)每次從 LLM 模型獲取一個數(shù)據(jù)塊(chunk)。
yield f"data: {chunk.content}\n\n"
這行代碼的作用是:暫停函數(shù)的執(zhí)行,并將 f"data: {chunk.content}\n\n" 這個值作為當(dāng)前迭代的結(jié)果發(fā)送出去。
當(dāng)外部代碼(在這里是 stream_wrapper)請求下一個值時,stream_generator 會從上次暫停的地方恢復(fù)執(zhí)行,直到遇到下一個 yield 或函數(shù)結(jié)束。
StreamingResponse 如何使用 yield:
- FastAPI 的 StreamingResponse 接收一個生成器(在我們的例子中是 stream_wrapper)。
- 它在內(nèi)部迭代這個生成器。每當(dāng) stream_generator yield 一個數(shù)據(jù)塊時,StreamingResponse 就會立即將這個數(shù)據(jù)塊通過 HTTP 連接發(fā)送給客戶端,而不會等待整個響應(yīng)生成完畢。
- 這個過程會一直持續(xù),直到生成器執(zhí)行完畢,此時 StreamingResponse 會關(guān)閉 HTTP 連接。
- 這個機(jī)制實現(xiàn)了服務(wù)器和客戶端之間的持續(xù)通信通道,數(shù)據(jù)可以源源不斷地從服務(wù)器流向客戶端。
通俗解釋
我們可以把這個過程比作看一場正在直播的足球比賽。
非流式處理(傳統(tǒng)方式):
這就像是等待比賽結(jié)束后,一次性地觀看完整的比賽錄像。你必須等到終場哨聲吹響,電視臺把90分鐘的比賽全部錄制打包好,然后才能開始觀看。在等待的90分鐘里,你什么也看不到,只能干等。
流式處理(我們的實現(xiàn)):
這就像是直接觀看電視直播。
- yield 在 stream_generator 中:可以看作是解說員。他不會等到比賽結(jié)束才說話,而是每當(dāng)有精彩瞬間(比如進(jìn)球、犯規(guī)、換人),他就會立即 yield 一句解說詞(一個數(shù)據(jù)塊)。
- StreamingResponse:可以看作是電視臺。它不會把所有解說詞都攢起來,而是一旦收到解說員 yield 的一句話,就立刻通過電視信號(HTTP 連接)把它廣播給千家萬戶(客戶端)。
- 你(客戶端):坐在電視機(jī)前,幾乎是實時地聽到解說員的每一句話,看到球場上的每一個動作。雖然整場比賽(完整的 LLM 響應(yīng))還沒結(jié)束,但你已經(jīng)可以持續(xù)地獲取信息,體驗非常好。
通過這種方式,yield 就像一個聰明的“暫停-繼續(xù)”按鈕,它讓服務(wù)器能夠“邊說邊送”,而不是“說完再說”,從而實現(xiàn)了流暢的流式體驗。
2.3. 前端實現(xiàn)
在前端,我們可以使用 fetch API 或 EventSource API 來接收服務(wù)器發(fā)送的流式數(shù)據(jù)。
每當(dāng)接收到一個新的數(shù)據(jù)塊時,我們就可以立即將其追加到界面上,從而實現(xiàn)打字機(jī)一樣的效果。
3、性能對比:流式 vs. 非流式
為了量化流式處理的優(yōu)勢,我們編寫了一個 pytest 測試用例,分別測試了 invoke(非流式)和 stream(流式)兩種模式下的響應(yīng)時間。
test/services/test_deepseek_chat_model.py:
def test_deepseek_chat_model_invoke():
# ...
start_time = time.time()
result = model.invoke(prompt)
end_time = time.time()
response_time = end_time - start_time
logger.info(f"Invoke mode - Total response time: {response_time:.4f} seconds")
# ...
def test_deepseek_chat_model_stream():
# ...
start_time = time.time()
stream = model.stream(prompt)
# ...
for chunk in stream:
if not first_chunk_received:
time_to_first_chunk = time.time() - start_time
logger.info(f"Stream mode - Time to first chunk: {time_to_first_chunk:.4f} seconds")
first_chunk_received = True
# ...
total_stream_time = time.time() - start_time
logger.info(f"Stream mode - Total response time: {total_stream_time:.4f} seconds")
# ...
3.1. 測試結(jié)果
我們的測試結(jié)果非常清晰地展示了流式處理的優(yōu)勢:
非流式(Invoke)模式:
- 總響應(yīng)時間:7.31 秒
流式(Stream)模式:
- 首個數(shù)據(jù)塊響應(yīng)時間:1.03 秒
- 總響應(yīng)時間:30.93 秒
3.2. 結(jié)果分析
從測試結(jié)果中我們可以看到:
- 用戶感知延遲:在流式模式下,用戶在 1.03 秒 內(nèi)就看到了第一個數(shù)據(jù)塊,這給了用戶一個“系統(tǒng)正在工作”的即時反饋。而在非流式模式下,用戶需要等待 7.31 秒 才能看到任何內(nèi)容。
- 總響應(yīng)時間:流式處理的總時間(30.93 秒)比非流式處理(7.31 秒)要長。這是因為流式處理需要逐塊地生成和傳輸數(shù)據(jù),而 invoke 模式則是一次性地在服務(wù)器端生成所有內(nèi)容。
總結(jié)
盡管流式處理的總時間更長,但它通過顯著降低首次響應(yīng)時間,極大地提升了用戶體驗。對于任何需要與 LLM 進(jìn)行交互的應(yīng)用來說,流式處理都是一個必不可少的優(yōu)化手段。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
用python生成與調(diào)用cntk模型代碼演示方法
今天小編就為大家分享一篇用python生成與調(diào)用cntk模型代碼演示方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-08-08
Python數(shù)據(jù)清洗之抽取jsonl文件數(shù)據(jù)字段并合并
這篇文章主要為大家詳細(xì)介紹了Python數(shù)據(jù)清洗之抽取jsonl文件數(shù)據(jù)字段并合并的相關(guān)知識,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以了解下2025-03-03
Python多進(jìn)程multiprocessing.Pool類詳解
這篇文章主要為大家詳細(xì)介紹了Python多進(jìn)程multiprocessing.Pool類,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-04-04
基于Python實現(xiàn)excel表格數(shù)據(jù)一鍵轉(zhuǎn)json格式小工具
這篇文章主要為大家詳細(xì)介紹了如何使用Python開發(fā)excel一鍵轉(zhuǎn)json小工具,實現(xiàn)任意選中excel的xlsx和xls文件轉(zhuǎn)化成json文件,并把結(jié)果顯示在界面中,需要的可以了解下2025-09-09
Python?Requests使用Cookie的幾種方式詳解
這篇文章主要給大家介紹了關(guān)于Python?Requests使用Cookie的幾種方式,Python中的requests庫可以使用cookie來維持會話狀態(tài),實現(xiàn)登錄等操作,需要的朋友可以參考下2023-07-07
python實現(xiàn)網(wǎng)站用戶名密碼自動登錄功能
最近接到這樣的需求通過網(wǎng)頁用戶認(rèn)證登錄實現(xiàn)上網(wǎng),如何實現(xiàn)網(wǎng)站自動登錄功能呢,接下來小編給大家?guī)砹藀ython實現(xiàn)網(wǎng)站用戶名密碼自動登錄功能,需要的朋友可以參考下2019-08-08
對pandas進(jìn)行數(shù)據(jù)預(yù)處理的實例講解
下面小編就為大家分享一篇對pandas進(jìn)行數(shù)據(jù)預(yù)處理的實例講解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-04-04

