淺談Rust中錯誤處理與響應構建
Rust作為一門系統(tǒng)編程語言,以其內存安全和零成本抽象而聞名。在錯誤處理方面,Rust采用了獨特而強大的機制,摒棄了傳統(tǒng)的異常處理方式,轉而使用類型系統(tǒng)來強制開發(fā)者顯式地處理錯誤。本文將深入探討Rust中的錯誤處理機制、最佳實踐以及如何構建健壯的錯誤響應系統(tǒng)。
第一部分:Rust錯誤處理基礎
1.1 Result類型:錯誤處理的核心
在Rust中,Result<T, E>是錯誤處理的基石。它是一個枚舉類型,定義如下:
enum Result<T, E> {
Ok(T),
Err(E),
}
這個設計迫使開發(fā)者必須處理可能出現(xiàn)的錯誤,編譯器會檢查是否所有的錯誤情況都得到了處理。
基本使用示例:
use std::fs::File;
use std::io::Read;
fn read_file_content(path: &str) -> Result<String, std::io::Error> {
let mut file = File::open(path)?;
let mut content = String::new();
file.read_to_string(&mut content)?;
Ok(content)
}
fn main() {
match read_file_content("example.txt") {
Ok(content) => println!("文件內容: {}", content),
Err(e) => eprintln!("讀取文件錯誤: {}", e),
}
}
1.2 Option類型:處理可能不存在的值
Option<T>用于表示值可能存在也可能不存在的情況:
enum Option<T> {
Some(T),
None,
}
實際應用場景:
fn find_user_by_id(id: u32, users: &Vec<User>) -> Option<&User> {
users.iter().find(|user| user.id == id)
}
fn get_user_email(id: u32, users: &Vec<User>) -> Option<String> {
find_user_by_id(id, users)
.and_then(|user| user.email.clone())
}
1.3 ?操作符:簡化錯誤傳播
?操作符是Rust中最優(yōu)雅的錯誤處理特性之一。它會自動進行錯誤傳播,如果遇到錯誤就提前返回。
fn process_data() -> Result<(), Box<dyn std::error::Error>> {
let data = fetch_data()?;
let parsed = parse_data(&data)?;
let validated = validate_data(parsed)?;
save_data(validated)?;
Ok(())
}
第二部分:自定義錯誤類型
2.1 實現(xiàn)std::error::Error trait
創(chuàng)建自定義錯誤類型需要實現(xiàn)std::error::Error trait:
use std::fmt;
use std::error::Error;
#[derive(Debug)]
enum DatabaseError {
ConnectionFailed(String),
QueryFailed(String),
DataNotFound(String),
InvalidData(String),
}
impl fmt::Display for DatabaseError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
DatabaseError::ConnectionFailed(msg) =>
write!(f, "數(shù)據(jù)庫連接失敗: {}", msg),
DatabaseError::QueryFailed(msg) =>
write!(f, "查詢執(zhí)行失敗: {}", msg),
DatabaseError::DataNotFound(msg) =>
write!(f, "數(shù)據(jù)未找到: {}", msg),
DatabaseError::InvalidData(msg) =>
write!(f, "數(shù)據(jù)格式無效: {}", msg),
}
}
}
impl Error for DatabaseError {}
2.2 使用thiserror庫簡化錯誤定義
thiserror是一個流行的庫,可以大幅簡化錯誤類型的定義:
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("數(shù)據(jù)庫錯誤: {0}")]
Database(#[from] sqlx::Error),
#[error("IO錯誤: {0}")]
Io(#[from] std::io::Error),
#[error("序列化錯誤: {0}")]
Serialization(#[from] serde_json::Error),
#[error("驗證失敗: {field} - {message}")]
Validation {
field: String,
message: String,
},
#[error("未授權訪問")]
Unauthorized,
#[error("資源未找到: {0}")]
NotFound(String),
}
2.3 錯誤上下文與錯誤鏈
使用anyhow庫可以輕松添加錯誤上下文:
use anyhow::{Context, Result};
fn load_config() -> Result<Config> {
let content = std::fs::read_to_string("config.toml")
.context("無法讀取配置文件 config.toml")?;
let config: Config = toml::from_str(&content)
.context("配置文件格式錯誤")?;
Ok(config)
}
fn initialize_app() -> Result<App> {
let config = load_config()
.context("應用初始化失敗")?;
let database = connect_database(&config.database_url)
.context("數(shù)據(jù)庫連接失敗")?;
Ok(App { config, database })
}
第三部分:Web應用中的錯誤處理
3.1 Actix-web框架的錯誤處理
在Actix-web中,錯誤處理需要實現(xiàn)ResponseError trait:
use actix_web::{error, http::StatusCode, HttpResponse};
use serde::Serialize;
#[derive(Debug, Serialize)]
struct ErrorResponse {
code: String,
message: String,
details: Option<Vec<String>>,
}
impl error::ResponseError for AppError {
fn error_response(&self) -> HttpResponse {
let (status, code, message) = match self {
AppError::Database(_) => (
StatusCode::INTERNAL_SERVER_ERROR,
"DATABASE_ERROR",
"數(shù)據(jù)庫操作失敗",
),
AppError::Validation { field, message } => (
StatusCode::BAD_REQUEST,
"VALIDATION_ERROR",
message.as_str(),
),
AppError::Unauthorized => (
StatusCode::UNAUTHORIZED,
"UNAUTHORIZED",
"未授權訪問",
),
AppError::NotFound(resource) => (
StatusCode::NOT_FOUND,
"NOT_FOUND",
&format!("資源未找到: {}", resource),
),
_ => (
StatusCode::INTERNAL_SERVER_ERROR,
"INTERNAL_ERROR",
"服務器內部錯誤",
),
};
HttpResponse::build(status).json(ErrorResponse {
code: code.to_string(),
message: message.to_string(),
details: None,
})
}
fn status_code(&self) -> StatusCode {
match self {
AppError::Validation { .. } => StatusCode::BAD_REQUEST,
AppError::Unauthorized => StatusCode::UNAUTHORIZED,
AppError::NotFound(_) => StatusCode::NOT_FOUND,
_ => StatusCode::INTERNAL_SERVER_ERROR,
}
}
}
3.2 中間件層的錯誤處理
實現(xiàn)全局錯誤處理中間件:
use actix_web::{
dev::{ServiceRequest, ServiceResponse},
middleware::{ErrorHandlerResponse, ErrorHandlers},
Result,
};
fn error_handler<B>(
res: ServiceResponse<B>,
) -> Result<ErrorHandlerResponse<B>> {
let status = res.status();
// 記錄錯誤日志
if status.is_server_error() {
log::error!("服務器錯誤: {} - {:?}", status, res.request().path());
} else if status.is_client_error() {
log::warn!("客戶端錯誤: {} - {:?}", status, res.request().path());
}
Ok(ErrorHandlerResponse::Response(res.map_into_left_body()))
}
// 在App中注冊
App::new()
.wrap(
ErrorHandlers::new()
.handler(StatusCode::INTERNAL_SERVER_ERROR, error_handler)
.handler(StatusCode::BAD_REQUEST, error_handler)
.handler(StatusCode::NOT_FOUND, error_handler)
)
3.3 RESTful API的錯誤響應設計
設計統(tǒng)一的API錯誤響應格式:
#[derive(Serialize, Debug)]
struct ApiResponse<T> {
success: bool,
data: Option<T>,
error: Option<ApiError>,
timestamp: i64,
}
#[derive(Serialize, Debug)]
struct ApiError {
code: String,
message: String,
details: Option<serde_json::Value>,
trace_id: Option<String>,
}
impl<T: Serialize> ApiResponse<T> {
fn success(data: T) -> Self {
Self {
success: true,
data: Some(data),
error: None,
timestamp: chrono::Utc::now().timestamp(),
}
}
fn error(code: &str, message: &str) -> ApiResponse<()> {
ApiResponse {
success: false,
data: None,
error: Some(ApiError {
code: code.to_string(),
message: message.to_string(),
details: None,
trace_id: Some(uuid::Uuid::new_v4().to_string()),
}),
timestamp: chrono::Utc::now().timestamp(),
}
}
}
// 使用示例
async fn get_user(id: web::Path<u32>) -> Result<HttpResponse, AppError> {
let user = fetch_user(*id).await?;
Ok(HttpResponse::Ok().json(ApiResponse::success(user)))
}
第四部分:高級錯誤處理模式
4.1 Result類型的組合器
Rust提供了豐富的組合器方法來處理Result:
fn process_user_data(user_id: u32) -> Result<ProcessedData, AppError> {
// map: 轉換Ok值
let user = get_user(user_id)
.map(|u| User {
name: u.name.to_uppercase(),
..u
})?;
// and_then: 鏈式調用返回Result的函數(shù)
let profile = get_user_profile(user_id)
.and_then(|p| validate_profile(p))?;
// or_else: 處理錯誤情況
let settings = get_user_settings(user_id)
.or_else(|_| Ok(UserSettings::default()))?;
// map_err: 轉換錯誤類型
let preferences = load_preferences(user_id)
.map_err(|e| AppError::Database(e.into()))?;
Ok(ProcessedData {
user,
profile,
settings,
preferences,
})
}
4.2 提前返回與錯誤恢復
實現(xiàn)優(yōu)雅的錯誤恢復策略:
async fn fetch_data_with_fallback(id: u32) -> Result<Data, AppError> {
// 嘗試從主數(shù)據(jù)源獲取
match fetch_from_primary(id).await {
Ok(data) => return Ok(data),
Err(e) => {
log::warn!("主數(shù)據(jù)源失敗: {}, 嘗試備用源", e);
}
}
// 嘗試從緩存獲取
match fetch_from_cache(id).await {
Ok(data) => {
log::info!("從緩存獲取數(shù)據(jù)成功");
return Ok(data);
}
Err(e) => {
log::warn!("緩存獲取失敗: {}", e);
}
}
// 最后嘗試從備用數(shù)據(jù)源
fetch_from_backup(id).await
.map_err(|e| {
log::error!("所有數(shù)據(jù)源都失敗");
AppError::DataUnavailable(format!("無法獲取ID為{}的數(shù)據(jù)", id))
})
}
4.3 并發(fā)錯誤處理
在異步環(huán)境中處理多個并發(fā)操作的錯誤:
use futures::future::join_all;
async fn fetch_multiple_users(
ids: Vec<u32>
) -> Result<Vec<User>, AppError> {
let futures: Vec<_> = ids
.into_iter()
.map(|id| async move {
fetch_user(id).await
})
.collect();
let results = join_all(futures).await;
// 收集所有成功的結果和錯誤
let mut users = Vec::new();
let mut errors = Vec::new();
for (idx, result) in results.into_iter().enumerate() {
match result {
Ok(user) => users.push(user),
Err(e) => errors.push((idx, e)),
}
}
if !errors.is_empty() {
log::warn!("部分用戶獲取失敗: {:?}", errors);
// 根據(jù)業(yè)務需求決定是返回部分結果還是完全失敗
if users.is_empty() {
return Err(AppError::BatchOperationFailed(
format!("所有用戶獲取都失敗了")
));
}
}
Ok(users)
}
4.4 重試機制
實現(xiàn)智能重試邏輯:
use std::time::Duration;
use tokio::time::sleep;
async fn retry_with_backoff<F, T, E>(
mut operation: F,
max_retries: u32,
initial_delay: Duration,
) -> Result<T, E>
where
F: FnMut() -> futures::future::BoxFuture<'static, Result<T, E>>,
E: std::fmt::Display,
{
let mut delay = initial_delay;
for attempt in 0..max_retries {
match operation().await {
Ok(result) => return Ok(result),
Err(e) => {
if attempt == max_retries - 1 {
log::error!("操作失敗,已達最大重試次數(shù): {}", e);
return Err(e);
}
log::warn!(
"操作失敗 (嘗試 {}/{}): {}. {}秒后重試...",
attempt + 1,
max_retries,
e,
delay.as_secs()
);
sleep(delay).await;
delay *= 2; // 指數(shù)退避
}
}
}
unreachable!()
}
// 使用示例
async fn fetch_with_retry(url: &str) -> Result<Response, reqwest::Error> {
retry_with_backoff(
|| Box::pin(reqwest::get(url)),
3,
Duration::from_secs(1),
).await
}
第五部分:錯誤日志與監(jiān)控
5.1 結構化日志記錄
使用tracing庫實現(xiàn)結構化日志:
use tracing::{error, warn, info, debug, instrument};
#[instrument(skip(db))]
async fn process_order(
order_id: u32,
db: &Database,
) -> Result<Order, AppError> {
info!(order_id, "開始處理訂單");
let order = db.get_order(order_id).await
.map_err(|e| {
error!(
error = %e,
order_id,
"獲取訂單失敗"
);
AppError::Database(e)
})?;
debug!(order_id, status = ?order.status, "訂單狀態(tài)檢查");
if !order.is_valid() {
warn!(order_id, "訂單驗證失敗");
return Err(AppError::Validation {
field: "order".to_string(),
message: "訂單數(shù)據(jù)無效".to_string(),
});
}
let processed = order.process().await
.map_err(|e| {
error!(
error = %e,
order_id,
"訂單處理失敗"
);
AppError::ProcessingFailed(e.to_string())
})?;
info!(order_id, "訂單處理完成");
Ok(processed)
}
5.2 錯誤度量與監(jiān)控
集成Prometheus進行錯誤監(jiān)控:
use prometheus::{IntCounterVec, HistogramVec, register_int_counter_vec, register_histogram_vec};
use lazy_static::lazy_static;
lazy_static! {
static ref ERROR_COUNTER: IntCounterVec = register_int_counter_vec!(
"app_errors_total",
"應用錯誤總數(shù)",
&["error_type", "severity"]
).unwrap();
static ref REQUEST_DURATION: HistogramVec = register_histogram_vec!(
"request_duration_seconds",
"請求處理時間",
&["endpoint", "status"]
).unwrap();
}
fn record_error(error: &AppError) {
let (error_type, severity) = match error {
AppError::Database(_) => ("database", "high"),
AppError::Validation { .. } => ("validation", "low"),
AppError::Unauthorized => ("auth", "medium"),
AppError::NotFound(_) => ("not_found", "low"),
_ => ("unknown", "medium"),
};
ERROR_COUNTER
.with_label_values(&[error_type, severity])
.inc();
}
async fn handle_request<F, T>(
endpoint: &str,
handler: F,
) -> Result<T, AppError>
where
F: Future<Output = Result<T, AppError>>,
{
let timer = REQUEST_DURATION
.with_label_values(&[endpoint, "processing"])
.start_timer();
let result = handler.await;
let status = if result.is_ok() { "success" } else { "error" };
timer.observe_duration();
REQUEST_DURATION
.with_label_values(&[endpoint, status])
.observe(timer.stop_and_record());
if let Err(ref e) = result {
record_error(e);
}
result
}
5.3 分布式追蹤
實現(xiàn)OpenTelemetry追蹤:
use opentelemetry::{global, trace::{Tracer, Span, Status}};
use tracing_opentelemetry::OpenTelemetrySpanExt;
async fn traced_operation(
trace_id: String,
) -> Result<(), AppError> {
let tracer = global::tracer("app");
let mut span = tracer.start("process_operation");
span.set_attribute(
opentelemetry::KeyValue::new("trace_id", trace_id.clone())
);
let result = perform_operation().await;
match &result {
Ok(_) => {
span.set_status(Status::Ok);
}
Err(e) => {
span.set_status(Status::error(e.to_string()));
span.set_attribute(
opentelemetry::KeyValue::new("error.type", format!("{:?}", e))
);
}
}
span.end();
result
}
第六部分:測試錯誤處理
6.1 單元測試
編寫全面的錯誤處理測試:
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_validation_error() {
let result = validate_email("");
assert!(result.is_err());
match result {
Err(AppError::Validation { field, message }) => {
assert_eq!(field, "email");
assert!(message.contains("不能為空"));
}
_ => panic!("期望ValidationError"),
}
}
#[tokio::test]
async fn test_database_error_handling() {
let mock_db = MockDatabase::new();
mock_db.expect_query()
.returning(|_| Err(DatabaseError::ConnectionFailed("連接超時".into())));
let result = fetch_user_from_db(1, &mock_db).await;
assert!(result.is_err());
assert!(matches!(result, Err(AppError::Database(_))));
}
#[tokio::test]
async fn test_retry_mechanism() {
let mut attempt = 0;
let operation = || {
attempt += 1;
async move {
if attempt < 3 {
Err(AppError::TemporaryError)
} else {
Ok(())
}
}
};
let result = retry_with_backoff(
operation,
5,
Duration::from_millis(10),
).await;
assert!(result.is_ok());
assert_eq!(attempt, 3);
}
}
6.2 集成測試
測試完整的錯誤處理流程:
#[actix_web::test]
async fn test_api_error_response() {
let app = test::init_service(
App::new()
.service(web::resource("/users/{id}").to(get_user))
).await;
// 測試404錯誤
let req = test::TestRequest::get()
.uri("/users/99999")
.to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
let body: ApiResponse<()> = test::read_body_json(resp).await;
assert!(!body.success);
assert!(body.error.is_some());
assert_eq!(body.error.unwrap().code, "NOT_FOUND");
}
#[actix_web::test]
async fn test_validation_error_response() {
let app = test::init_service(
App::new()
.service(web::resource("/users").route(web::post().to(create_user)))
).await;
let invalid_user = json!({
"email": "invalid-email",
"age": -5,
});
let req = test::TestRequest::post()
.uri("/users")
.set_json(&invalid_user)
.to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
}
6.3 錯誤場景的屬性測試
使用proptest進行屬性測試:
use proptest::prelude::*;
proptest! {
#[test]
fn test_email_validation_properties(
email in "[a-z]{1,10}@[a-z]{1,10}\\.[a-z]{2,3}"
) {
// 有效的郵箱格式應該通過驗證
let result = validate_email(&email);
prop_assert!(result.is_ok());
}
#[test]
fn test_invalid_email_rejection(
invalid in "[^@]*"
) {
// 不包含@的字符串應該被拒絕
if !invalid.contains('@') {
let result = validate_email(&invalid);
prop_assert!(result.is_err());
}
}
}
第七部分:性能優(yōu)化
7.1 避免不必要的錯誤分配
使用引用和借用來減少分配:
// 不好的做法:每次都分配新的錯誤
fn bad_validation(input: &str) -> Result<(), String> {
if input.is_empty() {
return Err("輸入不能為空".to_string());
}
Ok(())
}
// 好的做法:使用靜態(tài)字符串或Cow
fn good_validation(input: &str) -> Result<(), &'static str> {
if input.is_empty() {
return Err("輸入不能為空");
}
Ok(())
}
// 更好的做法:使用枚舉
#[derive(Debug)]
enum ValidationError {
Empty,
TooLong,
InvalidFormat,
}
fn best_validation(input: &str) -> Result<(), ValidationError> {
if input.is_empty() {
return Err(ValidationError::Empty);
}
Ok(())
}
7.2 錯誤類型的大小優(yōu)化
保持錯誤類型尺寸合理:
// 檢查錯誤類型大小
fn check_error_size() {
println!("AppError size: {}", std::mem::size_of::<AppError>());
println!("Result<(), AppError> size: {}",
std::mem::size_of::<Result<(), AppError>>());
}
// 如果錯誤類型過大,考慮使用Box
#[derive(Debug)]
enum LargeError {
BigVariant(Box<VeryLargeStruct>),
SmallVariant(u32),
}
7.3 快速路徑優(yōu)化
#[inline]
fn fast_path_check(input: &str) -> Result<(), AppError> {
// 快速路徑:常見的成功情況
if likely(input.len() > 0 && input.len() < 100) {
return Ok(());
}
// 慢速路徑:詳細的錯誤檢查
validate_detailed(input)
}
// 使用likely宏提示編譯器優(yōu)化分支預測
#[inline(always)]
fn likely(b: bool) -> bool {
if !b {
core::hint::unreachable_unchecked();
}
b
}
第八部分:實戰(zhàn)案例
8.1 構建完整的Web API錯誤處理系統(tǒng)
use actix_web::{web, App, HttpServer, middleware};
use serde::{Deserialize, Serialize};
// 應用狀態(tài)
struct AppState {
db: Database,
cache: Cache,
config: Config,
}
// 請求處理器
#[derive(Deserialize)]
struct CreateUserRequest {
email: String,
name: String,
age: u8,
}
async fn create_user(
data: web::Json<CreateUserRequest>,
state: web::Data<AppState>,
) -> Result<HttpResponse, AppError> {
// 輸入驗證
validate_user_input(&data)?;
// 檢查用戶是否已存在
if user_exists(&data.email, &state.db).await? {
return Err(AppError::Conflict(
format!("郵箱 {} 已被注冊", data.email)
));
}
// 創(chuàng)建用戶
let user = User::new(
data.email.clone(),
data.name.clone(),
data.age,
);
// 保存到數(shù)據(jù)庫
let saved_user = state.db
.insert_user(user)
.await
.map_err(|e| {
log::error!("保存用戶失敗: {}", e);
AppError::Database(e)
})?;
// 發(fā)送歡迎郵件(失敗不影響主流程)
if let Err(e) = send_welcome_email(&saved_user).await {
log::warn!("發(fā)送歡迎郵件失敗: {}", e);
}
// 更新緩存
state.cache.set_user(&saved_user).await?;
Ok(HttpResponse::Created().json(ApiResponse::success(saved_user)))
}
fn validate_user_input(input: &CreateUserRequest) -> Result<(), AppError> {
if input.email.is_empty() {
return Err(AppError::Validation {
field: "email".to_string(),
message: "郵箱不能為空".to_string(),
});
}
if !is_valid_email(&input.email) {
return Err(AppError::Validation {
field: "email".to_string(),
message: "郵箱格式無效".to_string(),
});
}
if input.name.len() < 2 || input.name.len() > 50 {
return Err(AppError::Validation {
field: "name".to_string(),
message: "姓名長度必須在2-50個字符之間".to_string(),
});
}
if input.age < 18 {
return Err(AppError::Validation {
field: "age".to_string(),
message: "年齡必須大于18歲".to_string(),
});
}
Ok(())
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
// 初始化日志
tracing_subscriber::fmt::init();
// 初始化應用狀態(tài)
let state = web::Data::new(AppState {
db: Database::connect().await.unwrap(),
cache: Cache::new(),
config: Config::load().unwrap(),
});
HttpServer::new(move || {
App::new()
.app_data(state.clone())
.wrap(middleware::Logger::default())
.wrap(ErrorHandlers::new
``` .wrap(middleware::Compress::default())
.service(
web::scope("/api/v1")
.service(
web::resource("/users")
.route(web::post().to(create_user))
.route(web::get().to(list_users))
)
.service(
web::resource("/users/{id}")
.route(web::get().to(get_user))
.route(web::put().to(update_user))
.route(web::delete().to(delete_user))
)
)
.default_service(web::route().to(not_found))
})
.bind("127.0.0.1:8080")?
.run()
.await
}
async fn not_found() -> Result<HttpResponse, AppError> {
Err(AppError::NotFound("請求的資源不存在".to_string()))
}
8.2 數(shù)據(jù)庫事務中的錯誤處理
use sqlx::{PgPool, Postgres, Transaction};
async fn transfer_funds(
from_account: u32,
to_account: u32,
amount: f64,
pool: &PgPool,
) -> Result<TransferResult, AppError> {
// 開始事務
let mut tx = pool.begin().await
.map_err(|e| AppError::Database(e.into()))?;
// 檢查源賬戶余額
let from_balance = get_account_balance(&mut tx, from_account).await?;
if from_balance < amount {
return Err(AppError::InsufficientFunds {
account_id: from_account,
available: from_balance,
requested: amount,
});
}
// 扣款
deduct_from_account(&mut tx, from_account, amount)
.await
.map_err(|e| {
log::error!("扣款失敗: account={}, amount={}, error={}",
from_account, amount, e);
AppError::TransactionFailed(format!("扣款操作失敗: {}", e))
})?;
// 入賬
add_to_account(&mut tx, to_account, amount)
.await
.map_err(|e| {
log::error!("入賬失敗: account={}, amount={}, error={}",
to_account, amount, e);
// 事務會自動回滾
AppError::TransactionFailed(format!("入賬操作失敗: {}", e))
})?;
// 記錄轉賬歷史
record_transfer(&mut tx, from_account, to_account, amount)
.await
.map_err(|e| {
log::warn!("記錄轉賬歷史失敗: {}", e);
// 即使記錄失敗,也不影響主要轉賬操作
e
})
.ok();
// 提交事務
tx.commit().await
.map_err(|e| AppError::Database(e.into()))?;
log::info!("轉賬成功: {} -> {}, 金額: {}",
from_account, to_account, amount);
Ok(TransferResult {
from_account,
to_account,
amount,
timestamp: chrono::Utc::now(),
})
}
async fn get_account_balance(
tx: &mut Transaction<'_, Postgres>,
account_id: u32,
) -> Result<f64, AppError> {
sqlx::query_scalar("SELECT balance FROM accounts WHERE id = $1")
.bind(account_id)
.fetch_optional(tx)
.await
.map_err(|e| AppError::Database(e.into()))?
.ok_or_else(|| AppError::NotFound(
format!("賬戶 {} 不存在", account_id)
))
}
8.3 外部API調用的錯誤處理
use reqwest::{Client, StatusCode};
use serde_json::Value;
async fn call_external_api(
endpoint: &str,
payload: Value,
) -> Result<Value, AppError> {
let client = Client::new();
let response = client
.post(endpoint)
.json(&payload)
.timeout(Duration::from_secs(30))
.send()
.await
.map_err(|e| {
if e.is_timeout() {
AppError::ExternalServiceTimeout {
service: endpoint.to_string(),
duration: Duration::from_secs(30),
}
} else if e.is_connect() {
AppError::ExternalServiceUnavailable {
service: endpoint.to_string(),
reason: "連接失敗".to_string(),
}
} else {
AppError::ExternalServiceError {
service: endpoint.to_string(),
message: e.to_string(),
}
}
})?;
match response.status() {
StatusCode::OK => {
response.json().await
.map_err(|e| AppError::Serialization(e.into()))
}
StatusCode::BAD_REQUEST => {
let error_body: Value = response.json().await
.unwrap_or(json!({"error": "未知錯誤"}));
Err(AppError::ExternalServiceBadRequest {
service: endpoint.to_string(),
details: error_body,
})
}
StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
Err(AppError::ExternalServiceAuth {
service: endpoint.to_string(),
status: response.status().as_u16(),
})
}
StatusCode::NOT_FOUND => {
Err(AppError::ExternalServiceNotFound {
service: endpoint.to_string(),
resource: payload.to_string(),
})
}
StatusCode::TOO_MANY_REQUESTS => {
let retry_after = response
.headers()
.get("Retry-After")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse().ok())
.unwrap_or(60);
Err(AppError::RateLimitExceeded {
service: endpoint.to_string(),
retry_after: Duration::from_secs(retry_after),
})
}
status if status.is_server_error() => {
Err(AppError::ExternalServiceError {
service: endpoint.to_string(),
message: format!("服務器錯誤: {}", status),
})
}
_ => {
Err(AppError::ExternalServiceError {
service: endpoint.to_string(),
message: format!("未預期的狀態(tài)碼: {}", response.status()),
})
}
}
}
// 帶重試的外部API調用
async fn call_external_api_with_retry(
endpoint: &str,
payload: Value,
) -> Result<Value, AppError> {
let max_retries = 3;
let mut last_error = None;
for attempt in 0..max_retries {
match call_external_api(endpoint, payload.clone()).await {
Ok(result) => return Ok(result),
Err(e) => {
match &e {
AppError::ExternalServiceTimeout { .. }
| AppError::ExternalServiceUnavailable { .. } => {
// 可重試的錯誤
last_error = Some(e);
if attempt < max_retries - 1 {
let delay = Duration::from_secs(2_u64.pow(attempt));
log::warn!(
"API調用失敗,{}秒后重試 (嘗試 {}/{})",
delay.as_secs(),
attempt + 1,
max_retries
);
tokio::time::sleep(delay).await;
}
}
AppError::RateLimitExceeded { retry_after, .. } => {
// 速率限制,等待指定時間后重試
if attempt < max_retries - 1 {
log::warn!(
"觸發(fā)速率限制,等待{}秒后重試",
retry_after.as_secs()
);
tokio::time::sleep(*retry_after).await;
last_error = Some(e);
} else {
return Err(e);
}
}
_ => {
// 不可重試的錯誤,直接返回
return Err(e);
}
}
}
}
}
Err(last_error.unwrap_or_else(|| AppError::Unknown(
"API調用失敗但未記錄錯誤".to_string()
)))
}
8.4 文件處理的錯誤處理
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
async fn process_uploaded_file(
file_path: &str,
content_type: &str,
) -> Result<FileInfo, AppError> {
// 驗證文件類型
validate_file_type(content_type)?;
// 讀取文件
let mut file = File::open(file_path)
.await
.map_err(|e| AppError::FileOperation {
operation: "open".to_string(),
path: file_path.to_string(),
source: e,
})?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer)
.await
.map_err(|e| AppError::FileOperation {
operation: "read".to_string(),
path: file_path.to_string(),
source: e,
})?;
// 驗證文件大小
const MAX_SIZE: usize = 10 * 1024 * 1024; // 10MB
if buffer.len() > MAX_SIZE {
return Err(AppError::FileTooLarge {
size: buffer.len(),
max_size: MAX_SIZE,
});
}
// 驗證文件內容
validate_file_content(&buffer, content_type)?;
// 生成安全的文件名
let safe_filename = generate_safe_filename(file_path)?;
let storage_path = format!("uploads/{}", safe_filename);
// 保存文件
let mut output_file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&storage_path)
.await
.map_err(|e| AppError::FileOperation {
operation: "create".to_string(),
path: storage_path.clone(),
source: e,
})?;
output_file.write_all(&buffer)
.await
.map_err(|e| AppError::FileOperation {
operation: "write".to_string(),
path: storage_path.clone(),
source: e,
})?;
output_file.sync_all()
.await
.map_err(|e| AppError::FileOperation {
operation: "sync".to_string(),
path: storage_path.clone(),
source: e,
})?;
Ok(FileInfo {
original_name: file_path.to_string(),
storage_path,
size: buffer.len(),
content_type: content_type.to_string(),
uploaded_at: chrono::Utc::now(),
})
}
fn validate_file_type(content_type: &str) -> Result<(), AppError> {
const ALLOWED_TYPES: &[&str] = &[
"image/jpeg",
"image/png",
"image/gif",
"application/pdf",
];
if !ALLOWED_TYPES.contains(&content_type) {
return Err(AppError::InvalidFileType {
provided: content_type.to_string(),
allowed: ALLOWED_TYPES.iter().map(|s| s.to_string()).collect(),
});
}
Ok(())
}
fn validate_file_content(
buffer: &[u8],
expected_type: &str,
) -> Result<(), AppError> {
// 驗證文件魔數(shù)(文件頭)
let magic_bytes = &buffer[..std::cmp::min(16, buffer.len())];
let is_valid = match expected_type {
"image/jpeg" => magic_bytes.starts_with(&[0xFF, 0xD8, 0xFF]),
"image/png" => magic_bytes.starts_with(&[0x89, 0x50, 0x4E, 0x47]),
"image/gif" => magic_bytes.starts_with(b"GIF87a")
|| magic_bytes.starts_with(b"GIF89a"),
"application/pdf" => magic_bytes.starts_with(b"%PDF"),
_ => true, // 其他類型跳過驗證
};
if !is_valid {
return Err(AppError::FileContentMismatch {
declared_type: expected_type.to_string(),
message: "文件內容與聲明類型不匹配".to_string(),
});
}
Ok(())
}
第九部分:領域特定錯誤處理
9.1 認證與授權錯誤
#[derive(Error, Debug)]
pub enum AuthError {
#[error("無效的憑證")]
InvalidCredentials,
#[error("令牌已過期")]
TokenExpired,
#[error("令牌無效: {0}")]
InvalidToken(String),
#[error("缺少認證信息")]
MissingAuth,
#[error("權限不足: 需要 {required}, 當前 {current}")]
InsufficientPermissions {
required: String,
current: String,
},
#[error("賬戶已鎖定: {reason}")]
AccountLocked {
reason: String,
locked_until: Option<chrono::DateTime<chrono::Utc>>,
},
}
async fn authenticate_user(
email: &str,
password: &str,
db: &Database,
) -> Result<User, AuthError> {
// 查找用戶
let user = db.find_user_by_email(email)
.await
.map_err(|_| AuthError::InvalidCredentials)?;
// 檢查賬戶狀態(tài)
if user.is_locked() {
return Err(AuthError::AccountLocked {
reason: "多次登錄失敗".to_string(),
locked_until: user.locked_until,
});
}
// 驗證密碼
if !verify_password(password, &user.password_hash)? {
// 記錄失敗嘗試
db.record_failed_login(&user.id).await.ok();
// 檢查是否需要鎖定賬戶
let failed_attempts = db.get_failed_login_count(&user.id).await?;
if failed_attempts >= 5 {
db.lock_account(&user.id, Duration::from_secs(1800)).await?;
return Err(AuthError::AccountLocked {
reason: "連續(xù)登錄失敗超過5次".to_string(),
locked_until: Some(chrono::Utc::now() + chrono::Duration::minutes(30)),
});
}
return Err(AuthError::InvalidCredentials);
}
// 重置失敗計數(shù)
db.reset_failed_login_count(&user.id).await.ok();
Ok(user)
}
// 權限檢查中間件
async fn check_permissions(
req: ServiceRequest,
required_permission: &str,
) -> Result<ServiceRequest, actix_web::Error> {
let token = extract_token(&req)
.ok_or(AuthError::MissingAuth)?;
let claims = validate_token(&token)
.map_err(|e| AuthError::InvalidToken(e.to_string()))?;
if !claims.permissions.contains(&required_permission.to_string()) {
return Err(AuthError::InsufficientPermissions {
required: required_permission.to_string(),
current: claims.permissions.join(", "),
}.into());
}
Ok(req)
}
9.2 支付處理錯誤
#[derive(Error, Debug)]
pub enum PaymentError {
#[error("支付金額無效: {0}")]
InvalidAmount(f64),
#[error("余額不足: 可用 {available}, 需要 {required}")]
InsufficientFunds {
available: f64,
required: f64,
},
#[error("支付方式不支持: {0}")]
UnsupportedPaymentMethod(String),
#[error("支付處理失敗: {reason}")]
ProcessingFailed {
reason: String,
transaction_id: Option<String>,
},
#[error("支付網(wǎng)關錯誤: {gateway} - {message}")]
GatewayError {
gateway: String,
message: String,
error_code: Option<String>,
},
#[error("重復支付: 訂單 {order_id} 已支付")]
DuplicatePayment {
order_id: String,
existing_transaction: String,
},
}
async fn process_payment(
order: &Order,
payment_method: PaymentMethod,
gateway: &PaymentGateway,
) -> Result<PaymentResult, PaymentError> {
// 驗證支付金額
if order.total <= 0.0 {
return Err(PaymentError::InvalidAmount(order.total));
}
// 檢查訂單狀態(tài)
if order.is_paid() {
return Err(PaymentError::DuplicatePayment {
order_id: order.id.to_string(),
existing_transaction: order.payment_transaction.clone()
.unwrap_or_default(),
});
}
// 驗證支付方式
if !is_payment_method_supported(&payment_method) {
return Err(PaymentError::UnsupportedPaymentMethod(
format!("{:?}", payment_method)
));
}
// 調用支付網(wǎng)關
let payment_request = PaymentRequest {
amount: order.total,
currency: order.currency.clone(),
order_id: order.id.to_string(),
payment_method,
customer: order.customer.clone(),
};
let result = gateway
.process_payment(payment_request)
.await
.map_err(|e| match e {
GatewayError::InsufficientFunds { available, required } => {
PaymentError::InsufficientFunds { available, required }
}
GatewayError::NetworkError(msg) => {
PaymentError::GatewayError {
gateway: gateway.name().to_string(),
message: format!("網(wǎng)絡錯誤: {}", msg),
error_code: None,
}
}
GatewayError::ApiError { code, message } => {
PaymentError::GatewayError {
gateway: gateway.name().to_string(),
message,
error_code: Some(code),
}
}
_ => PaymentError::ProcessingFailed {
reason: e.to_string(),
transaction_id: None,
},
})?;
// 記錄支付結果
log::info!(
"支付成功: order={}, transaction={}, amount={}",
order.id,
result.transaction_id,
order.total
);
Ok(result)
}
// 支付失敗后的補償處理
async fn handle_payment_failure(
order: &Order,
error: &PaymentError,
db: &Database,
) -> Result<(), AppError> {
// 記錄失敗原因
db.record_payment_failure(order.id, error).await?;
// 發(fā)送通知
match error {
PaymentError::InsufficientFunds { .. } => {
notify_insufficient_funds(&order.customer).await?;
}
PaymentError::GatewayError { .. } => {
notify_technical_issue(&order.customer).await?;
// 通知技術團隊
alert_ops_team(format!("支付網(wǎng)關錯誤: {:?}", error)).await?;
}
_ => {
notify_payment_failed(&order.customer, error).await?;
}
}
// 更新訂單狀態(tài)
db.update_order_status(order.id, OrderStatus::PaymentFailed).await?;
Ok(())
}
9.3 數(shù)據(jù)驗證錯誤
use validator::{Validate, ValidationError};
#[derive(Debug, Validate, Deserialize)]
struct UserRegistration {
#[validate(email(message = "郵箱格式無效"))]
email: String,
#[validate(length(min = 8, message = "密碼長度至少8位"))]
#[validate(custom = "validate_password_strength")]
password: String,
#[validate(length(min = 2, max = 50, message = "用戶名長度必須在2-50之間"))]
#[validate(regex(path = "USERNAME_REGEX", message = "用戶名只能包含字母、數(shù)字和下劃線"))]
username: String,
#[validate(range(min = 18, max = 120, message = "年齡必須在18-120之間"))]
age: u8,
#[validate(phone(message = "手機號格式無效"))]
phone: Option<String>,
}
lazy_static! {
static ref USERNAME_REGEX: regex::Regex =
regex::Regex::new(r"^[a-zA-Z0-9_]+$").unwrap();
}
fn validate_password_strength(password: &str) -> Result<(), ValidationError> {
let has_uppercase = password.chars().any(|c| c.is_uppercase());
let has_lowercase = password.chars().any(|c| c.is_lowercase());
let has_digit = password.chars().any(|c| c.is_numeric());
let has_special = password.chars().any(|c| "!@#$%^&*()".contains(c));
if !(has_uppercase && has_lowercase && has_digit && has_special) {
return Err(ValidationError::new("weak_password")
.with_message(std::borrow::Cow::Borrowed(
"密碼必須包含大小寫字母、數(shù)字和特殊字符"
)));
}
Ok(())
}
async fn register_user(
data: web::Json<UserRegistration>,
db: web::Data<Database>,
) -> Result<HttpResponse, AppError> {
// 使用validator進行驗證
data.validate()
.map_err(|e| AppError::ValidationErrors(e))?;
// 額外的業(yè)務邏輯驗證
validate_business_rules(&data, &db).await?;
// 創(chuàng)建用戶
let user = create_user_from_registration(&data).await?;
Ok(HttpResponse::Created().json(ApiResponse::success(user)))
}
async fn validate_business_rules(
data: &UserRegistration,
db: &Database,
) -> Result<(), AppError> {
// 檢查郵箱是否已被使用
if db.email_exists(&data.email).await? {
return Err(AppError::Validation {
field: "email".to_string(),
message: "該郵箱已被注冊".to_string(),
});
}
// 檢查用戶名是否已被使用
if db.username_exists(&data.username).await? {
return Err(AppError::Validation {
field: "username".to_string(),
message: "該用戶名已被占用".to_string(),
});
}
// 檢查是否在黑名單中
if is_email_blacklisted(&data.email) {
return Err(AppError::Validation {
field: "email".to_string(),
message: "該郵箱域名不被支持".to_string(),
});
}
Ok(())
}
// 將validator的錯誤轉換為應用錯誤
impl From<validator::ValidationErrors> for AppError {
fn from(errors: validator::ValidationErrors) -> Self {
let mut messages = Vec::new();
for (field, errors) in errors.field_errors() {
for error in errors {
let message = error
.message
.clone()
.unwrap_or_else(|| std::borrow::Cow::Borrowed("驗證失敗"));
messages.push(format!("{}: {}", field, message));
}
}
AppError::ValidationErrors {
fields: messages,
}
}
}
第十部分:錯誤恢復策略
10.1 斷路器模式
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Clone)]
pub struct CircuitBreaker {
state: Arc<RwLock<CircuitState>>,
config: CircuitConfig,
}
#[derive(Debug)]
enum CircuitState {
Closed {
failure_count: u32,
},
Open {
opened_at: Instant,
},
HalfOpen {
success_count: u32,
failure_count: u32,
},
}
struct CircuitConfig {
failure_threshold: u32,
timeout: Duration,
half_open_max_calls: u32,
}
impl CircuitBreaker {
pub fn new(config: CircuitConfig) -> Self {
Self {
state: Arc::new(RwLock::new(CircuitState::Closed {
failure_count: 0,
})),
config,
}
}
pub async fn call<F, T, E>(&self, operation: F) -> Result<T, CircuitBreakerError<E>>
where
F: Future<Output = Result<T, E>>,
E: std::fmt::Display,
{
// 檢查斷路器狀態(tài)
{
let state = self.state.read().await;
match *state {
CircuitState::Open { opened_at } => {
if opened_at.elapsed() < self.config.timeout {
return Err(CircuitBreakerError::Open);
}
// 超時后進入半開狀態(tài)
}
_ => {}
}
}
// 執(zhí)行操作
let result = operation.await;
// 更新狀態(tài)
self.handle_result(&result).await;
result.map_err(CircuitBreakerError::Inner)
}
async fn handle_result<T, E>(&self, result: &Result<T, E>) {
let mut state = self.state.write().await;
match &mut *state {
CircuitState::Closed { failure_count } => {
if result.is_err() {
*failure_count += 1;
if *failure_count >= self.config.failure_threshold {
log::warn!("斷路器打開: 失敗次數(shù)達到閾值");
*state = CircuitState::Open {
opened_at: Instant::now(),
};
}
} else {
*failure_count = 0;
}
}
CircuitState::Open { opened_at } => {
if opened_at.elapsed() >= self.config.timeout {
log::info!("斷路器進入半開狀態(tài)");
*state = CircuitState::HalfOpen {
success_count: 0,
failure_count: 0,
};
}
}
CircuitState::HalfOpen {
success_count,
failure_count,
} => {
if result.is_ok() {
*success_count += 1;
if *success_count >= self.config.half_open_max_calls {
log::info!("斷路器關閉: 測試調用成功");
*state = CircuitState::Closed { failure_count: 0 };
}
} else {
*failure_count += 1;
log::warn!("斷路器重新打開: 測試調用失敗");
*state = CircuitState::Open {
opened_at: Instant::now(),
};
}
}
}
}
}
#[derive(Error, Debug)]
pub enum CircuitBreakerError<E> {
#[error("斷路器打開")]
Open,
#[error("操作失敗: {0}")]
Inner(E),
}
// 使用示例
async fn call_external_service_with_circuit_breaker(
circuit_breaker: &CircuitBreaker,
request: Request,
) -> Result<Response, AppError> {
circuit_breaker
.call(async {
call_external_service(request).await
})
.await
.map_err(|e| match e {
CircuitBreakerError::Open => {
AppError::ServiceUnavailable {
service: "external_api".to_string(),
reason: "斷路器打開".to_string(),
}
}
CircuitBreakerError::Inner(e) => e,
})
}
10.2 降級策略
async fn get_user_with_fallback(
user_id: u32,
services: &Services,
) -> Result<User, AppError> {
// 第一級:嘗試從主數(shù)據(jù)庫獲取
match services.primary_db.get_user(user_id).await {
Ok(user) => {
log::debug!("從主數(shù)據(jù)庫獲取用戶成功: {}", user_id);
return Ok(user);
}
Err(e) => {
log::warn!("主數(shù)據(jù)庫失敗: {}, 嘗試緩存", e);
}
}
// 第二級:嘗試從緩存獲取
match services.cache.get_user(user_id).await {
Ok(Some(user)) => {
log::info!("從緩存獲取用戶成功: {}", user_id);
return Ok(user);
}
Ok(None) => {
log::debug!("緩存中不存在用戶: {}", user_id);
}
Err(e) => {
log::warn!("緩存獲取失敗: {}", e);
}
}
// 第三級:嘗試從只讀副本獲取
match services.read_replica.get_user(user_id).await {
Ok(user) => {
log::info!("從只讀副本獲取用戶成功: {}", user_id);
// 異步更新緩存
let cache = services.cache.clone();
let user_clone = user.clone();
tokio::spawn(async move {
if let Err(e) = cache.set_user(&user_clone).await {
log::error!("更新緩存失敗: {}", e);
}
});
return Ok(user);
}
Err(e) => {
log::error!("所有數(shù)據(jù)源都失敗: {}", e);
}
}
// 第四級:返回默認用戶(最后的降級)
log::error!("無法獲取用戶 {}, 返回默認用戶", user_id);
Ok(User::guest_user())
}
// 功能降級裝飾器
async fn with_graceful_degradation<F, T>(
operation: F,
fallback: T,
operation_name: &str,
) -> T
where
F: Future<Output = Result<T, AppError>>,
{
match operation.await {
Ok(result) => result,
Err(e) => {
log::warn!(
"操作 {} 失敗: {}, 使用降級方案",
operation_name,
e
);
// 記錄降級事件
metrics::counter!("degradation_events", 1, "operation" => operation_name);
fallback
}
}
}
// 使用示例
async fn get_user_recommendations(
user_id: u32,
services: &Services,
) -> Vec<Recommendation> {
with_graceful_degradation(
services.recommendation_engine.get_recommendations(user_id),
vec![], // 降級:返回空列表
"user_recommendations",
).await
}
async fn get_user_profile_with_degradation(
user_id: u32,
services: &Services,
) -> UserProfile {
let user = with_graceful_degradation(
services.db.get_user(user_id),
User::guest_user(),
"get_user",
).await;
let preferences = with_graceful_degradation(
services.db.get_preferences(user_id),
Preferences::default(),
"get_preferences",
).await;
let stats = with_graceful_degradation(
services.analytics.get_user_stats(user_id),
UserStats::default(),
"get_user_stats",
).await;
UserProfile {
user,
preferences,
stats,
}
}
10.3 補償事務(Saga模式)
use async_trait::async_trait;
#[async_trait]
trait SagaStep: Send + Sync {
async fn execute(&self) -> Result<(), AppError>;
async fn compensate(&self) -> Result<(), AppError>;
}
struct CreateOrderStep {
order_data: OrderData,
db: Arc<Database>,
}
#[async_trait]
impl SagaStep for CreateOrderStep {
async fn execute(&self) -> Result<(), AppError> {
self.db.create_order(&self.order_data).await?;
log::info!("訂單創(chuàng)建成功: {}", self.order_data.id);
Ok(())
}
async fn compensate(&self) -> Result<(), AppError> {
self.db.delete_order(self.order_data.id).await?;
log::info!("訂單回滾: {}", self.order_data.id);
Ok(())
}
}
struct ReserveInventoryStep {
order_id: u32,
items: Vec<OrderItem>,
inventory_service: Arc<InventoryService>,
}
#[async_trait]
impl SagaStep for ReserveInventoryStep {
async fn execute(&self) -> Result<(), AppError> {
for item in &self.items {
self.inventory_service
.reserve(item.product_id, item.quantity)
.await?;
}
log::info!("庫存預留成功: order={}", self.order_id);
Ok(())
}
async fn compensate(&self) -> Result<(), AppError> {
for item in &self.items {
self.inventory_service
.release_reservation(item.product_id, item.quantity)
.await?;
}
log::info!("庫存預留回滾: order={}", self.order_id);
Ok(())
}
}
struct ProcessPaymentStep {
order_id: u32,
amount: f64,
payment_service: Arc<PaymentService>,
}
#[async_trait]
impl SagaStep for ProcessPaymentStep {
async fn execute(&self) -> Result<(), AppError> {
self.payment_service
.charge(self.order_id, self.amount)
.await?;
log::info!("支付處理成功: order={}, amount={}", self.order_id, self.amount);
Ok(())
}
async fn compensate(&self) -> Result<(), AppError> {
self.payment_service
.refund(self.order_id, self.amount)
.await?;
log::info!("支付退款: order={}, amount={}", self.order_id, self.amount);
Ok(())
}
}
struct Saga {
steps: Vec<Box<dyn SagaStep>>,
executed_steps: Vec<usize>,
}
impl Saga {
fn new() -> Self {
Self {
steps: Vec::new(),
executed_steps: Vec::new(),
}
}
fn add_step(&mut self, step: Box<dyn SagaStep>) {
self.steps.push(step);
}
async fn execute(&mut self) -> Result<(), AppError> {
for (index, step) in self.steps.iter().enumerate() {
match step.execute().await {
Ok(_) => {
self.executed_steps.push(index);
}
Err(e) => {
log::error!("Saga步驟失敗: step={}, error={}", index, e);
// 執(zhí)行補償
self.compensate().await?;
return Err(e);
}
}
}
Ok(())
}
async fn compensate(&self) -> Result<(), AppError> {
log::warn!("開始Saga補償事務");
// 按相反順序執(zhí)行補償
for &index in self.executed_steps.iter().rev() {
if let Some(step) = self.steps.get(index) {
if let Err(e) = step.compensate().await {
log::error!("補償步驟失敗: step={}, error={}", index, e);
// 繼續(xù)執(zhí)行其他補償,不要中斷
}
}
}
log::info!("Saga補償事務完成");
Ok(())
}
}
// 使用示例
async fn process_order_with_saga(
order_data: OrderData,
services: &Services,
) -> Result<Order, AppError> {
let mut saga = Saga::new();
// 添加創(chuàng)建訂單步驟
saga.add_step(Box::new(CreateOrderStep {
order_data: order_data.clone(),
db: services.db.clone(),
}));
// 添加預留庫存步驟
saga.add_step(Box::new(ReserveInventoryStep {
order_id: order_data.id,
items: order_data.items.clone(),
inventory_service: services.inventory.clone(),
}));
// 添加支付處理步驟
saga.add_step(Box::new(ProcessPaymentStep {
order_id: order_data.id,
amount: order_data.total,
payment_service: services.payment.clone(),
}));
// 執(zhí)行Saga
saga.execute().await?;
// 獲取完整的訂單信息
let order = services.db.get_order(order_data.id).await?;
log::info!("訂單處理完成: {}", order_data.id);
Ok(order)
}
10.4 冪等性處理
use uuid::Uuid;
struct IdempotencyKey(String);
impl IdempotencyKey {
fn new() -> Self {
Self(Uuid::new_v4().to_string())
}
fn from_request(req: &HttpRequest) -> Option<Self> {
req.headers()
.get("Idempotency-Key")
.and_then(|v| v.to_str().ok())
.map(|s| Self(s.to_string()))
}
}
struct IdempotencyStore {
redis: redis::Client,
}
impl IdempotencyStore {
async fn check_and_store(
&self,
key: &IdempotencyKey,
ttl: Duration,
) -> Result<IdempotencyStatus, AppError> {
let mut conn = self.redis.get_async_connection().await
.map_err(|e| AppError::Cache(e.into()))?;
let exists: bool = redis::cmd("EXISTS")
.arg(&key.0)
.query_async(&mut conn)
.await
.map_err(|e| AppError::Cache(e.into()))?;
if exists {
// 獲取之前的結果
let result: Option<String> = redis::cmd("GET")
.arg(&key.0)
.query_async(&mut conn)
.await
.map_err(|e| AppError::Cache(e.into()))?;
return Ok(IdempotencyStatus::Duplicate(result));
}
// 設置處理中狀態(tài)
redis::cmd("SETEX")
.arg(&key.0)
.arg(ttl.as_secs())
.arg("processing")
.query_async(&mut conn)
.await
.map_err(|e| AppError::Cache(e.into()))?;
Ok(IdempotencyStatus::New)
}
async fn store_result(
&self,
key: &IdempotencyKey,
result: &str,
ttl: Duration,
) -> Result<(), AppError> {
let mut conn = self.redis.get_async_connection().await
.map_err(|e| AppError::Cache(e.into()))?;
redis::cmd("SETEX")
.arg(&key.0)
.arg(ttl.as_secs())
.arg(result)
.query_async(&mut conn)
.await
.map_err(|e| AppError::Cache(e.into()))?;
Ok(())
}
}
enum IdempotencyStatus {
New,
Duplicate(Option<String>),
}
// 冪等性中間件
async fn idempotent_handler<F, T>(
req: HttpRequest,
handler: F,
store: &IdempotencyStore,
) -> Result<HttpResponse, AppError>
where
F: Future<Output = Result<T, AppError>>,
T: Serialize,
{
let idempotency_key = IdempotencyKey::from_request(&req)
.ok_or_else(|| AppError::MissingIdempotencyKey)?;
match store.check_and_store(&idempotency_key, Duration::from_secs(86400)).await? {
IdempotencyStatus::New => {
// 執(zhí)行操作
let result = handler.await?;
// 存儲結果
let result_json = serde_json::to_string(&result)
.map_err(|e| AppError::Serialization(e.into()))?;
store.store_result(
&idempotency_key,
&result_json,
Duration::from_secs(86400),
).await?;
Ok(HttpResponse::Ok().json(result))
}
IdempotencyStatus::Duplicate(Some(cached_result)) => {
// 返回緩存的結果
log::info!("冪等請求,返回緩存結果");
Ok(HttpResponse::Ok()
.content_type("application/json")
.body(cached_result))
}
IdempotencyStatus::Duplicate(None) => {
// 正在處理中
Err(AppError::RequestInProgress)
}
}
}
// 使用示例
async fn create_payment_idempotent(
req: HttpRequest,
data: web::Json<PaymentRequest>,
state: web::Data<AppState>,
) -> Result<HttpResponse, AppError> {
idempotent_handler(
req,
async {
process_payment(&data, &state.payment_service).await
},
&state.idempotency_store,
).await
}
第十一部分:錯誤文檔化與溝通
11.1 錯誤代碼體系
// 定義標準化的錯誤代碼
pub mod error_codes {
pub const VALIDATION_ERROR: &str = "E1000";
pub const MISSING_FIELD: &str = "E1001";
pub const INVALID_FORMAT: &str = "E1002";
pub const OUT_OF_RANGE: &str = "E1003";
pub const AUTH_ERROR: &str = "E2000";
pub const INVALID_CREDENTIALS: &str = "E2001";
pub const TOKEN_EXPIRED: &str = "E2002";
pub const INSUFFICIENT_PERMISSIONS: &str = "E2003";
pub const DATABASE_ERROR: &str = "E3000";
pub const CONNECTION_FAILED: &str = "E3001";
pub const QUERY_FAILED: &str = "E3002";
pub const CONSTRAINT_VIOLATION: &str = "E3003";
pub const BUSINESS_ERROR: &str = "E4000";
pub const INSUFFICIENT_FUNDS: &str = "E4001";
pub const DUPLICATE_OPERATION: &str = "E4002";
pub const RESOURCE_LOCKED: &str = "E4003";
}
#[derive(Debug, Serialize)]
struct DetailedErrorResponse {
// 錯誤代碼
code: String,
// 面向用戶的錯誤消息
message: String,
// 面向開發(fā)者的詳細信息
details: Option<String>,
// 錯誤發(fā)生的時間
timestamp: chrono::DateTime<chrono::Utc>,
// 請求追蹤ID
trace_id: String,
// 相關文檔鏈接
documentation_url: Option<String>,
// 建議的解決方案
suggestions: Vec<String>,
}
impl AppError {
fn to_detailed_response(&self, trace_id: String) -> DetailedErrorResponse {
let (code, message, details, doc_url, suggestions) = match self {
AppError::Validation { field, message } => (
error_codes::VALIDATION_ERROR,
format!("驗證失敗: {}", message),
Some(format!("字段 '{}' 的值不符合要求", field)),
Some("https://docs.example.com/errors/validation".to_string()),
vec![
"檢查輸入格式是否正確".to_string(),
"參考API文檔了解字段要求".to_string(),
],
),
AppError::Unauthorized => (
error_codes::AUTH_ERROR,
"未授權訪問".to_string(),
Some("需要有效的認證憑證".to_string()),
Some("https://docs.example.com/errors/auth".to_string()),
vec![
"確保請求包含有效的認證令牌".to_string(),
"檢查令牌是否已過期".to_string(),
"聯(lián)系管理員獲取訪問權限".to_string(),
],
),
AppError::InsufficientFunds { available, required, .. } => (
error_codes::INSUFFICIENT_FUNDS,
"余額不足".to_string(),
Some(format!("可用余額: {}, 需要: {}", available, required)),
Some("https://docs.example.com/errors/payment".to_string()),
vec![
"充值賬戶".to_string(),
"選擇其他支付方式".to_string(),
"減少交易金額".to_string(),
],
),
_ => (
"E9999",
"內部服務器錯誤".to_string(),
None,
Some("https://docs.example.com/errors/general".to_string()),
vec![
"稍后重試".to_string(),
"如果問題持續(xù),請聯(lián)系技術支持".to_string(),
],
),
};
DetailedErrorResponse {
code: code.to_string(),
message,
details,
timestamp: chrono::Utc::now(),
trace_id,
documentation_url: doc_url,
suggestions,
}
}
}
11.2 多語言錯誤消息
use std::collections::HashMap;
struct ErrorMessageCatalog {
messages: HashMap<String, HashMap<String, String>>,
}
impl ErrorMessageCatalog {
fn new() -> Self {
let mut messages = HashMap::new();
// 英文消息
let mut en = HashMap::new();
en.insert("validation.required".to_string(), "This field is required".to_string());
en.insert("validation.email".to_string(), "Invalid email format".to_string());
en.insert("auth.invalid_credentials".to_string(), "Invalid username or password".to_string());
messages.insert("en".to_string(), en);
// 中文消息
let mut zh = HashMap::new();
zh.insert("validation.required".to_string(), "此字段為必填項".to_string());
zh.insert("validation.email".to_string(), "郵箱格式無效".to_string());
zh.insert("auth.invalid_credentials".to_string(), "用戶名或密碼錯誤".to_string());
messages.insert("zh".to_string(), zh);
Self { messages }
}
fn get_message(&self, key: &str, locale: &str) -> String {
self.messages
.get(locale)
.and_then(|m| m.get(key))
.or_else(|| {
self.messages
.get("en")
.and_then(|m| m.get(key))
})
.cloned()
.unwrap_or_else(|| key.to_string())
}
}
// 從請求中獲取語言偏好
fn get_preferred_locale(req: &HttpRequest) -> String {
req.headers()
.get("Accept-Language")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.split(',').next())
.and_then(|s| s.split('-').next())
.unwrap_or("en")
.to_string()
}
// 本地化的錯誤響應
fn localized_error_response(
error: &AppError,
locale: &str,
catalog: &ErrorMessageCatalog,
) -> ErrorResponse {
let (message_key, context) = match error {
AppError::Validation { field, .. } => (
"validation.required",
Some(json!({ "field": field })),
),
AppError::Unauthorized => (
"auth.invalid_credentials",
None,
),
_ => (
"error.general",
None,
),
};
ErrorResponse {
code: error.code(),
message: catalog.get_message(message_key, locale),
context,
}
}
11.3 錯誤報告與反饋
use sentry::{ClientOptions, Hub};
struct ErrorReporter {
sentry_hub: Hub,
slack_webhook: Option<String>,
}
impl ErrorReporter {
fn new(sentry_dsn: &str, slack_webhook: Option<String>) -> Self {
let options = ClientOptions {
dsn: Some(sentry_dsn.parse().unwrap()),
..Default::default()
};
let hub = Hub::new(Some(sentry::init(options).into()));
Self {
sentry_hub: hub,
slack_webhook,
}
}
async fn report_error(
&self,
error: &AppError,
context: ErrorContext,
) {
// 發(fā)送到Sentry
self.sentry_hub.with_active(|| {
sentry::capture_error(error);
sentry::configure_scope(|scope| {
scope.set_tag("environment", &context.environment);
scope.set_tag("service", &context.service_name);
scope.set_extra("trace_id", context.trace_id.clone().into());
scope.set_extra("user_id", context.user_id.map(|id| id.into()).unwrap_or_default());
});
});
// 關鍵錯誤發(fā)送到Slack
if error.is_critical() {
if let Some(webhook_url) = &self.slack_webhook {
self.send_slack_alert(webhook_url, error, &context).await.ok();
}
}
}
async fn send_slack_alert(
&self,
webhook_url: &str,
error: &AppError,
context: &ErrorContext,
) -> Result<(), reqwest::Error> {
let message = json!({
"text": format!("?? Critical Error in {}", context.service_name),
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": format!("*Error:* {}\n*Trace ID:* {}\n*Environment:* {}",
error,
context.trace_id,
context.environment)
}
}
]
});
let client = reqwest::Client::new();
client.post(webhook_url)
.json(&message)
.send()
.await?;
Ok(())
}
}
struct ErrorContext {
trace_id: String,
user_id: Option<u32>,
service_name: String,
environment: String,
timestamp: chrono::DateTime<chrono::Utc>,
}
impl AppError {
fn is_critical(&self) -> bool {
matches!(
self,
AppError::Database(_)
| AppError::ExternalServiceError { .. }
| AppError::DataCorruption { .. }
)
}
}
第十二部分:總結與最佳實踐
12.1 錯誤處理原則清單
// ? 好的做法
fn good_error_handling() -> Result<Data, AppError> {
// 1. 使用具體的錯誤類型
let data = fetch_data()
.map_err(|e| AppError::DataFetchFailed(e.to_string()))?;
// 2. 添加上下文信息
let parsed = parse_data(&data)
.context("解析用戶數(shù)據(jù)失敗")?;
// 3. 記錄錯誤日志
if let Err(e) = validate(&parsed) {
log::error!("數(shù)據(jù)驗證失敗: {}", e);
return Err(AppError::ValidationFailed(e.to_string()));
}
Ok(parsed)
}
// ? 不好的做法
fn bad_error_handling() -> Result<Data, String> {
// 1. 使用String作為錯誤類型
let data = fetch_data()
.map_err(|e| format!("錯誤: {}", e))?;
// 2. 丟失錯誤上下文
let parsed = parse_data(&data)
.map_err(|_| "解析失敗".to_string())?;
// 3. 不記錄錯誤
validate(&parsed)
.map_err(|e| e.to_string())?;
Ok(parsed)
}
12.2 關鍵要點總結
1. 類型安全的錯誤處理
- 使用
Result<T, E>而不是panic或異常 - 定義清晰的錯誤類型層次結構
- 利用類型系統(tǒng)在編譯時捕獲錯誤
2. 錯誤傳播與轉換
- 使用
?操作符簡化錯誤傳播 - 實現(xiàn)
Fromtrait進行錯誤類型轉換 - 使用
thiserror和anyhow簡化錯誤定義
3. 上下文與可追溯性
- 為錯誤添加豐富的上下文信息
- 實現(xiàn)分布式追蹤
- 保留完整的錯誤鏈
4. 用戶體驗
- 區(qū)分用戶錯誤和系統(tǒng)錯誤
- 提供清晰的錯誤消息和建議
- 支持多語言錯誤消息
5. 可靠性保障
- 實現(xiàn)重試機制
- 使用斷路器模式
- 設計降級方案
- 確保冪等性
6. 監(jiān)控與告警
- 實施結構化日志
- 集成錯誤追蹤系統(tǒng)
- 設置關鍵指標告警
- 定期審查錯誤趨勢
7. 測試覆蓋
- 編寫錯誤場景的單元測試
- 實施混沌工程測試
- 使用屬性測試驗證錯誤處理邏輯
8. 文檔化
- 文檔化所有可能的錯誤類型
- 提供錯誤代碼參考
- 包含錯誤處理示例
這份全面的指南涵蓋了Rust中錯誤處理的方方面面,從基礎概念到高級模式,從實戰(zhàn)案例到最佳實踐。通過遵循這些原則和模式,你可以構建出健壯、可維護、用戶友好的Rust應用程序。
錯誤處理不僅僅是技術問題,更是關乎用戶體驗和系統(tǒng)可靠性的關鍵因素。在Rust的幫助下,我們可以在編譯時就捕獲大量潛在錯誤,在運行時優(yōu)雅地處理異常情況,最終交付高質量的軟件產(chǎn)品。
到此這篇關于淺談Rust中錯誤處理與響應構建的文章就介紹到這了,更多相關Rust錯誤處理與響應構建內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Rust?實現(xiàn)?async/await的詳細代碼
異步編程在 Rust 中的地位非常高,很多 crate 尤其是多IO操作的都使用了 async/await,這篇文章主要介紹了Rust?如何實現(xiàn)?async/await,需要的朋友可以參考下2022-09-09
Rust中的Iterator和IntoIterator介紹及應用小結
Iterator即迭代器,它可以用于對數(shù)據(jù)結構進行迭代,被迭代的數(shù)據(jù)結構是可迭代的(iterable),所謂的可迭代就是這個數(shù)據(jù)結構有返回迭代器的方法,這篇文章主要介紹了Rust中的Iterator和IntoIterator介紹及應用,需要的朋友可以參考下2023-07-07

