国产无遮挡裸体免费直播视频,久久精品国产蜜臀av,动漫在线视频一区二区,欧亚日韩一区二区三区,久艹在线 免费视频,国产精品美女网站免费,正在播放 97超级视频在线观看,斗破苍穹年番在线观看免费,51最新乱码中文字幕

PostgreSQL高級特性與性能優(yōu)化的實(shí)戰(zhàn)指南

 更新時間:2026年02月09日 08:54:04   作者:閑人編程  
本文將深入探討PostgreSQL的高級特性與性能優(yōu)化技術(shù),結(jié)合Python實(shí)踐,幫助開發(fā)者充分發(fā)揮PostgreSQL的潛力,文中的示例代碼講解詳細(xì),需要的小伙伴可以了解下

1. 引言:PostgreSQL的技術(shù)演進(jìn)與現(xiàn)狀

PostgreSQL作為全球最先進(jìn)的開源關(guān)系型數(shù)據(jù)庫,自1986年誕生以來,歷經(jīng)30多年的持續(xù)發(fā)展,已成為企業(yè)級應(yīng)用的首選數(shù)據(jù)庫之一。根據(jù)2023年DB-Engines排名數(shù)據(jù)顯示,PostgreSQL在"流行度"和"功能完備性"兩方面均位列開源數(shù)據(jù)庫第一。其強(qiáng)大的擴(kuò)展性、嚴(yán)格的數(shù)據(jù)完整性和豐富的功能特性,使其在處理復(fù)雜查詢、海量數(shù)據(jù)和高并發(fā)場景時表現(xiàn)出色。

本文將深入探討PostgreSQL的高級特性與性能優(yōu)化技術(shù),結(jié)合Python實(shí)踐,幫助開發(fā)者充分發(fā)揮PostgreSQL的潛力。根據(jù)國際數(shù)據(jù)公司(IDC)的報告,使用PostgreSQL的企業(yè)在數(shù)據(jù)庫運(yùn)營成本上平均降低65%,同時查詢性能提升300% 以上。

2. PostgreSQL高級特性詳解

2.1 JSONB與半結(jié)構(gòu)化數(shù)據(jù)處理

PostgreSQL的JSONB類型提供了對JSON數(shù)據(jù)的二進(jìn)制存儲,支持索引、查詢和修改操作,實(shí)現(xiàn)了關(guān)系型數(shù)據(jù)庫與文檔數(shù)據(jù)庫的完美結(jié)合。

2.1.1 JSONB性能對比分析

操作類型JSONB性能JSON性能性能提升
數(shù)據(jù)插入O(log n)O(n)5-10倍
路徑查詢O(log n)O(n)20-100倍
索引構(gòu)建O(n log n)不支持無限
數(shù)據(jù)更新O(log n)O(n)10-50倍

JSONB的存儲格式優(yōu)勢體現(xiàn)在:

2.2 全文搜索與文本分析

PostgreSQL內(nèi)置了強(qiáng)大的全文搜索功能,支持多語言、詞干提取、相關(guān)性排序等高級特性。

"""
PostgreSQL全文搜索高級應(yīng)用示例
"""
import psycopg2
from psycopg2.extras import Json, DictCursor
import json
from typing import List, Dict, Any, Optional
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class PostgreSQLFullTextSearch:
    """PostgreSQL全文搜索高級功能封裝"""
    
    def __init__(self, dsn: str):
        """
        初始化數(shù)據(jù)庫連接
        
        Args:
            dsn: 數(shù)據(jù)庫連接字符串
        """
        self.conn = psycopg2.connect(dsn)
        self.conn.autocommit = False
        self.cursor = self.conn.cursor(cursor_factory=DictCursor)
        
    def create_fulltext_configuration(self, lang: str = 'english'):
        """
        創(chuàng)建自定義全文搜索配置
        
        Args:
            lang: 語言配置
        """
        config_sql = f"""
        -- 創(chuàng)建自定義文本搜索配置
        CREATE TEXT SEARCH CONFIGURATION {lang}_custom (
            COPY = {lang}
        );
        
        -- 添加同義詞字典
        CREATE TEXT SEARCH DICTIONARY synonym_dict (
            TEMPLATE = synonym,
            SYNONYMS = synonym_sample
        );
        
        -- 添加自定義字典到配置
        ALTER TEXT SEARCH CONFIGURATION {lang}_custom
            ALTER MAPPING FOR asciiword, asciihword, hword_asciipart
            WITH synonym_dict, english_stem;
        """
        
        try:
            self.cursor.execute(config_sql)
            self.conn.commit()
            logger.info(f"全文搜索配置創(chuàng)建成功: {lang}_custom")
        except Exception as e:
            self.conn.rollback()
            logger.error(f"創(chuàng)建配置失敗: {e}")
            
    def create_searchable_table(self):
        """
        創(chuàng)建支持全文搜索的表
        """
        create_table_sql = """
        -- 創(chuàng)建文檔表
        CREATE TABLE IF NOT EXISTS documents (
            id SERIAL PRIMARY KEY,
            title VARCHAR(500) NOT NULL,
            content TEXT NOT NULL,
            author VARCHAR(200),
            category VARCHAR(100),
            tags JSONB DEFAULT '[]',
            publication_date DATE DEFAULT CURRENT_DATE,
            
            -- 生成列:用于全文搜索
            content_search_vector TSVECTOR GENERATED ALWAYS AS (
                setweight(to_tsvector('english_custom', coalesce(title, '')), 'A') ||
                setweight(to_tsvector('english_custom', coalesce(content, '')), 'B')
            ) STORED,
            
            -- 生成列:用于元數(shù)據(jù)搜索
            metadata_search_vector TSVECTOR GENERATED ALWAYS AS (
                to_tsvector('english_custom', 
                    coalesce(author, '') || ' ' || 
                    coalesce(category, '') || ' ' ||
                    (tags::text)
                )
            ) STORED,
            
            -- 創(chuàng)建GIN索引優(yōu)化搜索性能
            CONSTRAINT valid_tags CHECK (jsonb_typeof(tags) = 'array')
        );
        
        -- 創(chuàng)建GIN索引
        CREATE INDEX IF NOT EXISTS idx_documents_content_search 
        ON documents USING GIN(content_search_vector);
        
        CREATE INDEX IF NOT EXISTS idx_documents_metadata_search 
        ON documents USING GIN(metadata_search_vector);
        
        -- 創(chuàng)建部分索引優(yōu)化特定查詢
        CREATE INDEX IF NOT EXISTS idx_documents_recent 
        ON documents(publication_date) 
        WHERE publication_date > CURRENT_DATE - INTERVAL '1 year';
        
        -- 創(chuàng)建BRIN索引用于時間范圍查詢
        CREATE INDEX IF NOT EXISTS idx_documents_date_brin 
        ON documents USING BRIN(publication_date);
        """
        
        try:
            self.cursor.execute(create_table_sql)
            self.conn.commit()
            logger.info("全文搜索表創(chuàng)建成功")
        except Exception as e:
            self.conn.rollback()
            logger.error(f"創(chuàng)建表失敗: {e}")
            
    def insert_document(self, document: Dict[str, Any]) -> Optional[int]:
        """
        插入文檔數(shù)據(jù)
        
        Args:
            document: 文檔數(shù)據(jù)
            
        Returns:
            插入的文檔ID
        """
        insert_sql = """
        INSERT INTO documents (title, content, author, category, tags, publication_date)
        VALUES (%s, %s, %s, %s, %s, %s)
        RETURNING id
        """
        
        try:
            self.cursor.execute(
                insert_sql,
                (
                    document.get('title'),
                    document.get('content'),
                    document.get('author'),
                    document.get('category'),
                    Json(document.get('tags', [])),
                    document.get('publication_date')
                )
            )
            doc_id = self.cursor.fetchone()['id']
            self.conn.commit()
            logger.info(f"文檔插入成功,ID: {doc_id}")
            return doc_id
        except Exception as e:
            self.conn.rollback()
            logger.error(f"插入文檔失敗: {e}")
            return None
            
    def search_documents(
        self, 
        query: str,
        categories: List[str] = None,
        start_date: str = None,
        end_date: str = None,
        min_relevance: float = 0.1,
        limit: int = 20,
        offset: int = 0
    ) -> List[Dict[str, Any]]:
        """
        高級全文搜索
        
        Args:
            query: 搜索查詢詞
            categories: 分類篩選
            start_date: 開始日期
            end_date: 結(jié)束日期
            min_relevance: 最小相關(guān)性閾值
            limit: 返回結(jié)果數(shù)量
            offset: 偏移量
            
        Returns:
            搜索結(jié)果列表
        """
        search_sql = """
        SELECT 
            id,
            title,
            author,
            category,
            publication_date,
            tags,
            
            -- 計算相關(guān)性得分
            ts_rank(
                content_search_vector, 
                plainto_tsquery('english_custom', %s)
            ) AS relevance_score,
            
            -- 高亮顯示匹配內(nèi)容
            ts_headline(
                'english_custom',
                content,
                plainto_tsquery('english_custom', %s),
                'StartSel=<mark>, StopSel=</mark>, MaxWords=50, MinWords=10'
            ) AS content_highlight,
            
            -- 提取匹配片段
            ts_headline(
                'english_custom',
                title,
                plainto_tsquery('english_custom', %s),
                'StartSel=<mark>, StopSel=</mark>'
            ) AS title_highlight
            
        FROM documents
        WHERE 
            -- 全文搜索條件
            content_search_vector @@ plainto_tsquery('english_custom', %s)
            
            -- 分類篩選
            {category_filter}
            
            -- 日期范圍篩選
            {date_filter}
            
            -- 相關(guān)性閾值篩選
            AND ts_rank(
                content_search_vector, 
                plainto_tsquery('english_custom', %s)
            ) > %s
            
        ORDER BY 
            -- 按相關(guān)性和時間加權(quán)排序
            (ts_rank(
                content_search_vector, 
                plainto_tsquery('english_custom', %s)
            ) * 0.7 + 
            (CASE WHEN publication_date > CURRENT_DATE - INTERVAL '30 days' 
                  THEN 0.3 ELSE 0 END)) DESC,
            publication_date DESC
            
        LIMIT %s OFFSET %s
        """
        
        # 構(gòu)建動態(tài)WHERE條件
        category_filter = ""
        date_filter = ""
        params = [query, query, query, query]
        
        if categories:
            placeholders = ', '.join(['%s'] * len(categories))
            category_filter = f"AND category IN ({placeholders})"
            params.extend(categories)
            
        if start_date and end_date:
            date_filter = "AND publication_date BETWEEN %s AND %s"
            params.extend([start_date, end_date])
        elif start_date:
            date_filter = "AND publication_date >= %s"
            params.append(start_date)
        elif end_date:
            date_filter = "AND publication_date <= %s"
            params.append(end_date)
            
        # 添加剩余參數(shù)
        params.extend([query, min_relevance, query, limit, offset])
        
        # 格式化SQL
        formatted_sql = search_sql.format(
            category_filter=category_filter,
            date_filter=date_filter
        )
        
        try:
            self.cursor.execute(formatted_sql, params)
            results = self.cursor.fetchall()
            
            # 轉(zhuǎn)換為字典列表
            return [
                {
                    'id': row['id'],
                    'title': row['title'],
                    'author': row['author'],
                    'category': row['category'],
                    'publication_date': row['publication_date'],
                    'tags': row['tags'],
                    'relevance_score': float(row['relevance_score']),
                    'content_highlight': row['content_highlight'],
                    'title_highlight': row['title_highlight']
                }
                for row in results
            ]
            
        except Exception as e:
            logger.error(f"搜索失敗: {e}")
            return []
            
    def search_similar_documents(self, doc_id: int, limit: int = 10) -> List[Dict[str, Any]]:
        """
        查找相似文檔(基于內(nèi)容相似度)
        
        Args:
            doc_id: 參考文檔ID
            limit: 返回結(jié)果數(shù)量
            
        Returns:
            相似文檔列表
        """
        similarity_sql = """
        WITH target_doc AS (
            SELECT content_search_vector 
            FROM documents 
            WHERE id = %s
        )
        SELECT 
            d.id,
            d.title,
            d.author,
            d.category,
            
            -- 計算余弦相似度
            (d.content_search_vector <=> td.content_search_vector) AS similarity,
            
            -- 提取共同標(biāo)簽
            (
                SELECT jsonb_agg(tag)
                FROM jsonb_array_elements_text(d.tags) AS tag
                WHERE tag IN (
                    SELECT jsonb_array_elements_text(td.tags)
                    FROM documents td 
                    WHERE td.id = %s
                )
            ) AS common_tags
            
        FROM documents d, target_doc td
        WHERE d.id != %s
        ORDER BY similarity DESC
        LIMIT %s
        """
        
        try:
            self.cursor.execute(similarity_sql, (doc_id, doc_id, doc_id, limit))
            results = self.cursor.fetchall()
            
            return [
                {
                    'id': row['id'],
                    'title': row['title'],
                    'author': row['author'],
                    'category': row['category'],
                    'similarity': float(row['similarity']),
                    'common_tags': row['common_tags']
                }
                for row in results
            ]
            
        except Exception as e:
            logger.error(f"查找相似文檔失敗: {e}")
            return []
            
    def get_search_statistics(self, time_period: str = '1 month') -> Dict[str, Any]:
        """
        獲取搜索統(tǒng)計信息
        
        Args:
            time_period: 統(tǒng)計時間周期
            
        Returns:
            統(tǒng)計信息字典
        """
        stats_sql = """
        -- 總文檔數(shù)
        SELECT COUNT(*) as total_documents FROM documents;
        
        -- 按分類統(tǒng)計
        SELECT 
            category,
            COUNT(*) as count,
            ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM documents), 2) as percentage
        FROM documents 
        WHERE category IS NOT NULL
        GROUP BY category 
        ORDER BY count DESC;
        
        -- 時間分布統(tǒng)計
        SELECT 
            DATE_TRUNC('month', publication_date) as month,
            COUNT(*) as documents_count
        FROM documents
        WHERE publication_date > CURRENT_DATE - INTERVAL %s
        GROUP BY DATE_TRUNC('month', publication_date)
        ORDER BY month DESC;
        
        -- 標(biāo)簽使用統(tǒng)計
        SELECT 
            tag,
            COUNT(*) as usage_count
        FROM documents, jsonb_array_elements_text(tags) as tag
        GROUP BY tag
        ORDER BY usage_count DESC
        LIMIT 20;
        """
        
        try:
            stats = {}
            
            # 執(zhí)行多個統(tǒng)計查詢
            self.cursor.execute("SELECT COUNT(*) as total_documents FROM documents")
            stats['total_documents'] = self.cursor.fetchone()['total_documents']
            
            self.cursor.execute("""
                SELECT category, COUNT(*) as count
                FROM documents 
                WHERE category IS NOT NULL
                GROUP BY category 
                ORDER BY count DESC
            """)
            stats['category_distribution'] = self.cursor.fetchall()
            
            self.cursor.execute(f"""
                SELECT 
                    DATE_TRUNC('month', publication_date) as month,
                    COUNT(*) as documents_count
                FROM documents
                WHERE publication_date > CURRENT_DATE - INTERVAL '{time_period}'
                GROUP BY DATE_TRUNC('month', publication_date)
                ORDER BY month DESC
            """)
            stats['time_distribution'] = self.cursor.fetchall()
            
            self.cursor.execute("""
                SELECT 
                    tag,
                    COUNT(*) as usage_count
                FROM documents, jsonb_array_elements_text(tags) as tag
                GROUP BY tag
                ORDER BY usage_count DESC
                LIMIT 20
            """)
            stats['popular_tags'] = self.cursor.fetchall()
            
            return stats
            
        except Exception as e:
            logger.error(f"獲取統(tǒng)計信息失敗: {e}")
            return {}
            
    def optimize_search_indexes(self):
        """
        優(yōu)化全文搜索索引
        """
        optimize_sql = """
        -- 重新構(gòu)建GIN索引以提高搜索性能
        REINDEX INDEX CONCURRENTLY idx_documents_content_search;
        REINDEX INDEX CONCURRENTLY idx_documents_metadata_search;
        
        -- 更新表統(tǒng)計信息
        ANALYZE documents;
        
        -- 清理索引膨脹
        VACUUM ANALYZE documents;
        
        -- 更新全文搜索配置字典
        ALTER TEXT SEARCH CONFIGURATION english_custom 
        REFRESH VERSION;
        """
        
        try:
            # 分步執(zhí)行優(yōu)化操作
            self.cursor.execute("REINDEX INDEX CONCURRENTLY idx_documents_content_search")
            logger.info("內(nèi)容搜索索引重建完成")
            
            self.cursor.execute("REINDEX INDEX CONCURRENTLY idx_documents_metadata_search")
            logger.info("元數(shù)據(jù)搜索索引重建完成")
            
            self.cursor.execute("ANALYZE documents")
            logger.info("表統(tǒng)計信息更新完成")
            
            self.cursor.execute("VACUUM ANALYZE documents")
            logger.info("表清理完成")
            
            self.conn.commit()
            logger.info("全文搜索索引優(yōu)化完成")
            
        except Exception as e:
            self.conn.rollback()
            logger.error(f"索引優(yōu)化失敗: {e}")
            
    def close(self):
        """關(guān)閉數(shù)據(jù)庫連接"""
        if self.cursor:
            self.cursor.close()
        if self.conn:
            self.conn.close()
        logger.info("數(shù)據(jù)庫連接已關(guān)閉")


def example_usage():
    """使用示例"""
    # 數(shù)據(jù)庫連接字符串
    dsn = "dbname=testdb user=postgres password=password host=localhost port=5432"
    
    # 創(chuàng)建全文搜索實(shí)例
    search = PostgreSQLFullTextSearch(dsn)
    
    try:
        # 1. 創(chuàng)建全文搜索配置
        search.create_fulltext_configuration('english')
        
        # 2. 創(chuàng)建搜索表
        search.create_searchable_table()
        
        # 3. 插入示例文檔
        sample_documents = [
            {
                'title': 'PostgreSQL Performance Optimization',
                'content': 'PostgreSQL provides advanced optimization techniques including query planning, indexing strategies, and configuration tuning.',
                'author': 'John Doe',
                'category': 'Database',
                'tags': ['postgresql', 'optimization', 'performance'],
                'publication_date': '2024-01-15'
            },
            {
                'title': 'Full Text Search in Modern Applications',
                'content': 'Implementing efficient full-text search using PostgreSQL GIN indexes and relevance scoring algorithms.',
                'author': 'Jane Smith',
                'category': 'Search',
                'tags': ['search', 'full-text', 'postgresql'],
                'publication_date': '2024-02-01'
            }
        ]
        
        for doc in sample_documents:
            search.insert_document(doc)
        
        # 4. 執(zhí)行高級搜索
        print("執(zhí)行全文搜索...")
        results = search.search_documents(
            query='PostgreSQL optimization',
            categories=['Database'],
            min_relevance=0.05,
            limit=10
        )
        
        print(f"找到 {len(results)} 個結(jié)果:")
        for result in results:
            print(f"- {result['title']} (相關(guān)性: {result['relevance_score']:.3f})")
        
        # 5. 查找相似文檔
        if results:
            similar = search.search_similar_documents(results[0]['id'])
            print(f"\n相似文檔: {len(similar)} 個")
        
        # 6. 獲取統(tǒng)計信息
        stats = search.get_search_statistics()
        print(f"\n總文檔數(shù): {stats.get('total_documents', 0)}")
        
    finally:
        search.close()


if __name__ == "__main__":
    example_usage()

2.3 分區(qū)表與數(shù)據(jù)管理

PostgreSQL的分區(qū)表功能通過繼承和約束排除實(shí)現(xiàn),顯著提升大數(shù)據(jù)量查詢性能。

2.3.1 分區(qū)策略對比

分區(qū)類型適用場景優(yōu)勢限制
范圍分區(qū)時間序列數(shù)據(jù)支持自動分區(qū)創(chuàng)建分區(qū)鍵必須有序
列表分區(qū)離散值分類支持非連續(xù)值分區(qū)數(shù)量有限
哈希分區(qū)均勻分布數(shù)據(jù)分布均勻不支持范圍查詢

2.3.2 分區(qū)性能公式

分區(qū)表的查詢性能提升可以通過以下公式估算:

?

其中:

  • Tpartitioned?:分區(qū)表查詢時間
  • Tfull?:未分區(qū)表查詢時間
  • n:相關(guān)分區(qū)數(shù)量
  • Coverhead?:分區(qū)管理開銷

3. PostgreSQL性能優(yōu)化實(shí)戰(zhàn)

3.1 查詢性能分析與優(yōu)化

"""
PostgreSQL查詢性能分析與優(yōu)化工具
"""
import psycopg2
from psycopg2.extras import DictCursor
import time
from typing import Dict, List, Any, Optional, Tuple
import statistics
import json
from datetime import datetime, timedelta
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class QueryPerformanceAnalyzer:
    """查詢性能分析器"""
    
    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)
        self.cursor = self.conn.cursor(cursor_factory=DictCursor)
        self.query_cache = {}
        
    def analyze_query_plan(self, query: str, params: tuple = None) -> Dict[str, Any]:
        """
        分析查詢執(zhí)行計劃
        
        Args:
            query: SQL查詢語句
            params: 查詢參數(shù)
            
        Returns:
            執(zhí)行計劃分析結(jié)果
        """
        try:
            # 獲取詳細(xì)執(zhí)行計劃
            explain_query = f"EXPLAIN (ANALYZE, BUFFERS, VERBOSE, FORMAT JSON) {query}"
            self.cursor.execute(explain_query, params)
            plan_result = self.cursor.fetchone()[0]
            
            plan = plan_result[0]['Plan']
            return self._parse_execution_plan(plan)
            
        except Exception as e:
            logger.error(f"分析執(zhí)行計劃失敗: {e}")
            return {}
    
    def _parse_execution_plan(self, plan: Dict[str, Any]) -> Dict[str, Any]:
        """
        解析執(zhí)行計劃
        
        Args:
            plan: 執(zhí)行計劃字典
            
        Returns:
            解析后的分析結(jié)果
        """
        analysis = {
            'operation_type': plan.get('Node Type'),
            'relation_name': plan.get('Relation Name'),
            'alias': plan.get('Alias'),
            'startup_cost': plan.get('Startup Cost'),
            'total_cost': plan.get('Total Cost'),
            'plan_rows': plan.get('Plan Rows'),
            'plan_width': plan.get('Plan Width'),
            'actual_rows': plan.get('Actual Rows'),
            'actual_time': plan.get('Actual Total Time'),
            'shared_hit_blocks': 0,
            'shared_read_blocks': 0,
            'shared_dirtied_blocks': 0,
            'shared_written_blocks': 0,
            'local_hit_blocks': 0,
            'local_read_blocks': 0,
            'local_dirtied_blocks': 0,
            'local_written_blocks': 0,
            'temp_read_blocks': 0,
            'temp_written_blocks': 0,
            'buffers': plan.get('Shared Hit Blocks', 0) + plan.get('Shared Read Blocks', 0),
            'children': [],
            'issues': []
        }
        
        # 解析緩沖區(qū)使用情況
        if 'Shared Hit Blocks' in plan:
            analysis['shared_hit_blocks'] = plan['Shared Hit Blocks']
        if 'Shared Read Blocks' in plan:
            analysis['shared_read_blocks'] = plan['Shared Read Blocks']
        
        # 遞歸解析子節(jié)點(diǎn)
        if 'Plans' in plan:
            for child_plan in plan['Plans']:
                child_analysis = self._parse_execution_plan(child_plan)
                analysis['children'].append(child_analysis)
        
        # 識別潛在問題
        self._identify_issues(analysis, plan)
        
        return analysis
    
    def _identify_issues(self, analysis: Dict[str, Any], plan: Dict[str, Any]):
        """識別查詢計劃中的潛在問題"""
        
        # 檢查全表掃描
        if analysis['operation_type'] == 'Seq Scan' and analysis['actual_rows'] > 10000:
            analysis['issues'].append({
                'type': 'FULL_TABLE_SCAN',
                'severity': 'HIGH',
                'message': '檢測到大量行的全表掃描',
                'suggestion': '考慮添加合適的索引'
            })
        
        # 檢查嵌套循環(huán)連接
        if analysis['operation_type'] == 'Nested Loop' and analysis['actual_rows'] > 1000:
            analysis['issues'].append({
                'type': 'INEFFICIENT_JOIN',
                'severity': 'MEDIUM',
                'message': '嵌套循環(huán)連接可能效率較低',
                'suggestion': '考慮使用Hash Join或Merge Join'
            })
        
        # 檢查排序操作
        if analysis['operation_type'] == 'Sort' and analysis['actual_rows'] > 10000:
            analysis['issues'].append({
                'type': 'LARGE_SORT',
                'severity': 'MEDIUM',
                'message': '大規(guī)模排序操作',
                'suggestion': '考慮添加索引以避免排序'
            })
        
        # 檢查緩沖區(qū)命中率
        total_blocks = analysis['shared_hit_blocks'] + analysis['shared_read_blocks']
        if total_blocks > 0:
            hit_ratio = analysis['shared_hit_blocks'] / total_blocks
            if hit_ratio < 0.9:
                analysis['issues'].append({
                    'type': 'LOW_BUFFER_HIT',
                    'severity': 'MEDIUM',
                    'message': f'緩沖區(qū)命中率較低: {hit_ratio:.2%}',
                    'suggestion': '考慮增加shared_buffers或優(yōu)化查詢'
                })
    
    def benchmark_query(self, query: str, params: tuple = None, 
                       iterations: int = 10) -> Dict[str, Any]:
        """
        基準(zhǔn)測試查詢性能
        
        Args:
            query: SQL查詢語句
            params: 查詢參數(shù)
            iterations: 迭代次數(shù)
            
        Returns:
            性能基準(zhǔn)測試結(jié)果
        """
        execution_times = []
        row_counts = []
        
        try:
            # 預(yù)熱緩存
            self.cursor.execute(query, params)
            _ = self.cursor.fetchall()
            
            # 執(zhí)行基準(zhǔn)測試
            for i in range(iterations):
                start_time = time.perf_counter()
                self.cursor.execute(query, params)
                rows = self.cursor.fetchall()
                end_time = time.perf_counter()
                
                execution_times.append(end_time - start_time)
                row_counts.append(len(rows))
            
            # 分析執(zhí)行計劃
            plan_analysis = self.analyze_query_plan(query, params)
            
            # 計算統(tǒng)計信息
            stats = {
                'iterations': iterations,
                'total_time': sum(execution_times),
                'avg_time': statistics.mean(execution_times),
                'min_time': min(execution_times),
                'max_time': max(execution_times),
                'std_dev': statistics.stdev(execution_times) if len(execution_times) > 1 else 0,
                'avg_rows': statistics.mean(row_counts),
                'plan_analysis': plan_analysis,
                'percentiles': {
                    'p50': sorted(execution_times)[int(len(execution_times) * 0.5)],
                    'p90': sorted(execution_times)[int(len(execution_times) * 0.9)],
                    'p95': sorted(execution_times)[int(len(execution_times) * 0.95)],
                    'p99': sorted(execution_times)[int(len(execution_times) * 0.99)],
                }
            }
            
            return stats
            
        except Exception as e:
            logger.error(f"基準(zhǔn)測試失敗: {e}")
            return {}
    
    def generate_optimization_suggestions(self, query: str, 
                                        stats: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        生成優(yōu)化建議
        
        Args:
            query: SQL查詢語句
            stats: 性能統(tǒng)計信息
            
        Returns:
            優(yōu)化建議列表
        """
        suggestions = []
        plan_analysis = stats.get('plan_analysis', {})
        
        # 基于執(zhí)行時間建議
        avg_time = stats.get('avg_time', 0)
        if avg_time > 1.0:  # 超過1秒
            suggestions.append({
                'priority': 'HIGH',
                'area': 'PERFORMANCE',
                'suggestion': '查詢執(zhí)行時間較長,考慮優(yōu)化查詢或添加索引',
                'estimated_impact': 'HIGH'
            })
        
        # 基于執(zhí)行計劃建議
        for issue in plan_analysis.get('issues', []):
            suggestions.append({
                'priority': issue['severity'],
                'area': 'QUERY_PLAN',
                'suggestion': issue['suggestion'],
                'estimated_impact': 'MEDIUM'
            })
        
        # 基于統(tǒng)計信息建議
        if stats.get('std_dev', 0) / stats.get('avg_time', 1) > 0.5:
            suggestions.append({
                'priority': 'MEDIUM',
                'area': 'CONSISTENCY',
                'suggestion': '查詢執(zhí)行時間波動較大,可能存在并發(fā)或資源競爭問題',
                'estimated_impact': 'MEDIUM'
            })
        
        return suggestions


class IndexOptimizer:
    """索引優(yōu)化器"""
    
    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)
        self.cursor = self.conn.cursor(cursor_factory=DictCursor)
    
    def analyze_table_indexes(self, table_name: str) -> List[Dict[str, Any]]:
        """
        分析表索引
        
        Args:
            table_name: 表名
            
        Returns:
            索引分析結(jié)果
        """
        query = """
        SELECT 
            i.relname as index_name,
            am.amname as index_type,
            idx.indisunique as is_unique,
            idx.indisprimary as is_primary,
            idx.indisexclusion as is_exclusion,
            idx.indisclustered as is_clustered,
            idx.indisvalid as is_valid,
            idx.indpred as partial_index_predicate,
            pg_relation_size(i.oid) as index_size_bytes,
            pg_size_pretty(pg_relation_size(i.oid)) as index_size,
            pg_stat_get_numscans(i.oid) as scan_count,
            pg_stat_get_tuples_returned(i.oid) as tuples_returned,
            pg_stat_get_tuples_fetched(i.oid) as tuples_fetched,
            
            -- 索引定義
            pg_get_indexdef(idx.indexrelid) as index_definition,
            
            -- 索引列
            array_to_string(array_agg(a.attname), ', ') as index_columns
            
        FROM pg_index idx
        JOIN pg_class i ON i.oid = idx.indexrelid
        JOIN pg_class t ON t.oid = idx.indrelid
        JOIN pg_am am ON i.relam = am.oid
        JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(idx.indkey)
        
        WHERE t.relname = %s
        GROUP BY i.relname, am.amname, idx.indisunique, idx.indisprimary,
                 idx.indisexclusion, idx.indisclustered, idx.indisvalid,
                 idx.indpred, i.oid, idx.indexrelid
        ORDER BY pg_relation_size(i.oid) DESC
        """
        
        try:
            self.cursor.execute(query, (table_name,))
            indexes = self.cursor.fetchall()
            
            analysis = []
            for idx in indexes:
                usage_ratio = 0
                if idx['tuples_returned'] > 0:
                    usage_ratio = idx['tuples_fetched'] / idx['tuples_returned']
                
                analysis.append({
                    'index_name': idx['index_name'],
                    'index_type': idx['index_type'],
                    'is_unique': idx['is_unique'],
                    'is_primary': idx['is_primary'],
                    'index_size_bytes': idx['index_size_bytes'],
                    'index_size': idx['index_size'],
                    'scan_count': idx['scan_count'],
                    'usage_ratio': usage_ratio,
                    'index_definition': idx['index_definition'],
                    'index_columns': idx['index_columns'],
                    'efficiency_score': self._calculate_index_efficiency(idx)
                })
            
            return analysis
            
        except Exception as e:
            logger.error(f"分析索引失敗: {e}")
            return []
    
    def _calculate_index_efficiency(self, index_info: Dict[str, Any]) -> float:
        """
        計算索引效率評分
        
        Args:
            index_info: 索引信息
            
        Returns:
            效率評分 (0-100)
        """
        score = 100.0
        
        # 基于使用率扣分
        if index_info['scan_count'] == 0:
            score -= 50  # 從未使用
        
        # 基于大小扣分
        size_mb = index_info['index_size_bytes'] / (1024 * 1024)
        if size_mb > 1000:  # 超過1GB
            score -= 30
        elif size_mb > 100:  # 超過100MB
            score -= 15
        
        # 基于唯一性加分
        if index_info['is_unique']:
            score += 10
        
        return max(0, min(100, score))
    
    def suggest_index_improvements(self, table_name: str, 
                                 query_patterns: List[str]) -> List[Dict[str, Any]]:
        """
        基于查詢模式建議索引改進(jìn)
        
        Args:
            table_name: 表名
            query_patterns: 查詢模式列表
            
        Returns:
            索引改進(jìn)建議
        """
        suggestions = []
        existing_indexes = self.analyze_table_indexes(table_name)
        
        for pattern in query_patterns:
            pattern_lower = pattern.lower()
            
            # 提取WHERE子句中的列
            where_start = pattern_lower.find('where ')
            if where_start != -1:
                where_clause = pattern_lower[where_start + 6:]
                
                # 簡單提取列名(實(shí)際應(yīng)用中應(yīng)使用SQL解析器)
                import re
                column_matches = re.findall(r'(\w+)\s*[=<>!]', where_clause)
                
                for column in column_matches:
                    # 檢查是否已有索引
                    has_index = False
                    for idx in existing_indexes:
                        if column in idx['index_columns'].lower():
                            has_index = True
                            break
                    
                    if not has_index:
                        suggestions.append({
                            'table': table_name,
                            'column': column,
                            'suggestion': f'在 {column} 列上創(chuàng)建索引',
                            'estimated_impact': 'HIGH',
                            'sql': f'CREATE INDEX idx_{table_name}_{column} ON {table_name}({column})'
                        })
        
        return suggestions


class PostgreSQLConfigOptimizer:
    """PostgreSQL配置優(yōu)化器"""
    
    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)
        self.cursor = self.conn.cursor(cursor_factory=DictCursor)
    
    def analyze_current_config(self) -> Dict[str, Any]:
        """
        分析當(dāng)前配置
        
        Returns:
            配置分析結(jié)果
        """
        config_queries = {
            'basic_settings': """
                SELECT name, setting, unit, context, vartype
                FROM pg_settings
                WHERE name IN (
                    'shared_buffers', 'work_mem', 'maintenance_work_mem',
                    'effective_cache_size', 'max_connections'
                )
            """,
            'performance_settings': """
                SELECT name, setting, unit, context, vartype
                FROM pg_settings
                WHERE name LIKE '%cost%' OR name LIKE '%join%' OR name LIKE '%parallel%'
                ORDER BY name
            """,
            'wal_settings': """
                SELECT name, setting, unit, context, vartype
                FROM pg_settings
                WHERE name LIKE 'wal_%'
                ORDER BY name
            """,
            'statistics': """
                SELECT 
                    datname as database_name,
                    numbackends as active_connections,
                    xact_commit as transactions_committed,
                    xact_rollback as transactions_rolled_back,
                    blks_read as blocks_read,
                    blks_hit as blocks_hit,
                    tup_returned as tuples_returned,
                    tup_fetched as tuples_fetched,
                    tup_inserted as tuples_inserted,
                    tup_updated as tuples_updated,
                    tup_deleted as tuples_deleted
                FROM pg_stat_database
                WHERE datname = current_database()
            """
        }
        
        analysis = {}
        
        try:
            for category, query in config_queries.items():
                self.cursor.execute(query)
                analysis[category] = self.cursor.fetchall()
            
            # 計算緩沖區(qū)命中率
            if 'statistics' in analysis and analysis['statistics']:
                stats = analysis['statistics'][0]
                blocks_hit = stats['blocks_hit']
                blocks_read = stats['blocks_read']
                total_blocks = blocks_hit + blocks_read
                
                if total_blocks > 0:
                    analysis['buffer_hit_ratio'] = blocks_hit / total_blocks
                else:
                    analysis['buffer_hit_ratio'] = 0
            
            return analysis
            
        except Exception as e:
            logger.error(f"分析配置失敗: {e}")
            return {}
    
    def generate_config_recommendations(self, 
                                      system_memory_gb: float = 16,
                                      expected_connections: int = 100) -> List[Dict[str, Any]]:
        """
        生成配置優(yōu)化建議
        
        Args:
            system_memory_gb: 系統(tǒng)總內(nèi)存(GB)
            expected_connections: 預(yù)期最大連接數(shù)
            
        Returns:
            配置優(yōu)化建議列表
        """
        recommendations = []
        current_config = self.analyze_current_config()
        
        # 共享緩沖區(qū)建議(通常為系統(tǒng)內(nèi)存的25%)
        recommended_shared_buffers = f"{int(system_memory_gb * 0.25 * 1024)}MB"
        recommendations.append({
            'parameter': 'shared_buffers',
            'current_value': self._get_config_value(current_config, 'shared_buffers'),
            'recommended_value': recommended_shared_buffers,
            'reason': f'設(shè)置為系統(tǒng)內(nèi)存({system_memory_gb}GB)的25%以優(yōu)化緩存性能',
            'impact': 'HIGH'
        })
        
        # 工作內(nèi)存建議
        recommended_work_mem = f"{int(system_memory_gb * 1024 / expected_connections / 4)}MB"
        recommendations.append({
            'parameter': 'work_mem',
            'current_value': self._get_config_value(current_config, 'work_mem'),
            'recommended_value': recommended_work_mem,
            'reason': f'基于{expected_connections}個并發(fā)連接和系統(tǒng)內(nèi)存計算',
            'impact': 'MEDIUM'
        })
        
        # 維護(hù)工作內(nèi)存建議
        recommended_maintenance_work_mem = f"{int(system_memory_gb * 0.1 * 1024)}MB"
        recommendations.append({
            'parameter': 'maintenance_work_mem',
            'current_value': self._get_config_value(current_config, 'maintenance_work_mem'),
            'recommended_value': recommended_maintenance_work_mem,
            'reason': '設(shè)置為系統(tǒng)內(nèi)存的10%以優(yōu)化維護(hù)操作性能',
            'impact': 'MEDIUM'
        })
        
        # 有效緩存大小建議
        recommended_effective_cache_size = f"{int(system_memory_gb * 0.5 * 1024)}MB"
        recommendations.append({
            'parameter': 'effective_cache_size',
            'current_value': self._get_config_value(current_config, 'effective_cache_size'),
            'recommended_value': recommended_effective_cache_size,
            'reason': '設(shè)置為系統(tǒng)內(nèi)存的50%以幫助查詢規(guī)劃器做出更好的決策',
            'impact': 'MEDIUM'
        })
        
        # 基于緩沖區(qū)命中率的建議
        hit_ratio = current_config.get('buffer_hit_ratio', 0)
        if hit_ratio < 0.9:
            recommendations.append({
                'parameter': 'BUFFER_HIT_RATIO',
                'current_value': f'{hit_ratio:.2%}',
                'recommended_value': '>90%',
                'reason': '緩沖區(qū)命中率較低,可能影響查詢性能',
                'impact': 'HIGH',
                'additional_suggestions': [
                    '增加shared_buffers',
                    '優(yōu)化熱點(diǎn)查詢',
                    '考慮使用pg_prewarm擴(kuò)展'
                ]
            })
        
        return recommendations
    
    def _get_config_value(self, config_analysis: Dict[str, Any], 
                         param_name: str) -> str:
        """獲取配置參數(shù)值"""
        if 'basic_settings' in config_analysis:
            for setting in config_analysis['basic_settings']:
                if setting['name'] == param_name:
                    return f"{setting['setting']} {setting['unit'] or ''}".strip()
        
        return '未找到'
    
    def close(self):
        """關(guān)閉連接"""
        if self.cursor:
            self.cursor.close()
        if self.conn:
            self.conn.close()


def comprehensive_performance_analysis(dsn: str):
    """綜合性能分析示例"""
    print("=" * 60)
    print("PostgreSQL性能綜合分析")
    print("=" * 60)
    
    # 1. 查詢性能分析
    print("\n1. 查詢性能分析")
    print("-" * 40)
    
    analyzer = QueryPerformanceAnalyzer(dsn)
    
    test_query = """
    SELECT 
        u.username,
        COUNT(o.id) as order_count,
        SUM(o.amount) as total_amount,
        AVG(o.amount) as avg_amount
    FROM users u
    JOIN orders o ON u.id = o.user_id
    WHERE u.created_at > CURRENT_DATE - INTERVAL '1 year'
    GROUP BY u.id, u.username
    HAVING COUNT(o.id) > 5
    ORDER BY total_amount DESC
    LIMIT 100
    """
    
    benchmark_results = analyzer.benchmark_query(test_query, iterations=5)
    
    if benchmark_results:
        print(f"平均執(zhí)行時間: {benchmark_results['avg_time']:.3f}秒")
        print(f"最小執(zhí)行時間: {benchmark_results['min_time']:.3f}秒")
        print(f"最大執(zhí)行時間: {benchmark_results['max_time']:.3f}秒")
        print(f"標(biāo)準(zhǔn)差: {benchmark_results['std_dev']:.3f}秒")
        
        # 生成優(yōu)化建議
        suggestions = analyzer.generate_optimization_suggestions(
            test_query, benchmark_results
        )
        
        print(f"\n優(yōu)化建議 ({len(suggestions)}條):")
        for i, suggestion in enumerate(suggestions, 1):
            print(f"{i}. [{suggestion['priority']}] {suggestion['suggestion']}")
    
    # 2. 索引分析
    print("\n2. 索引分析")
    print("-" * 40)
    
    index_optimizer = IndexOptimizer(dsn)
    table_name = "users"  # 假設(shè)的表名
    
    index_analysis = index_optimizer.analyze_table_indexes(table_name)
    
    if index_analysis:
        print(f"表 '{table_name}' 的索引分析:")
        for idx in index_analysis[:5]:  # 顯示前5個索引
            print(f"  - {idx['index_name']}: {idx['index_size']}, "
                  f"使用率: {idx['usage_ratio']:.2f}, "
                  f"效率評分: {idx['efficiency_score']:.1f}")
    
    # 3. 配置分析
    print("\n3. 配置分析")
    print("-" * 40)
    
    config_optimizer = PostgreSQLConfigOptimizer(dsn)
    config_recommendations = config_optimizer.generate_config_recommendations(
        system_memory_gb=16,
        expected_connections=200
    )
    
    print("配置優(yōu)化建議:")
    for rec in config_recommendations:
        print(f"  - {rec['parameter']}: {rec['current_value']} → {rec['recommended_value']}")
    
    # 4. 綜合報告
    print("\n4. 綜合性能報告")
    print("-" * 40)
    
    overall_score = 85.0  # 示例評分
    bottlenecks = [
        "查詢響應(yīng)時間波動較大",
        "部分索引使用率較低",
        "緩沖區(qū)命中率需要優(yōu)化"
    ]
    
    print(f"總體性能評分: {overall_score}/100")
    print("\n主要瓶頸:")
    for bottleneck in bottlenecks:
        print(f"  ? {bottleneck}")
    
    print("\n優(yōu)化優(yōu)先級:")
    print("  1. 優(yōu)化高響應(yīng)時間查詢")
    print("  2. 調(diào)整數(shù)據(jù)庫配置參數(shù)")
    print("  3. 重建低效率索引")
    
    # 清理資源
    analyzer.cursor.close()
    analyzer.conn.close()
    config_optimizer.close()


if __name__ == "__main__":
    # 數(shù)據(jù)庫連接字符串
    dsn = "dbname=testdb user=postgres password=password host=localhost port=5432"
    
    comprehensive_performance_analysis(dsn)

3.2 索引優(yōu)化策略

3.2.1 索引類型選擇矩陣

3.2.2 復(fù)合索引設(shè)計原則

復(fù)合索引的列順序設(shè)計遵循最左前綴原則,選擇性公式為:

選擇性=不同值數(shù)量/總行數(shù)?

索引設(shè)計優(yōu)先級:

  • 高選擇性列(接近1.0)放在前面
  • 經(jīng)常用于WHERE條件的列
  • 用于ORDER BY的列
  • 用于GROUP BY的列

3.3 并發(fā)控制與鎖優(yōu)化

PostgreSQL采用MVCC(多版本并發(fā)控制)機(jī)制,提供多種隔離級別和鎖類型:

"""
PostgreSQL并發(fā)控制與鎖優(yōu)化示例
"""
import psycopg2
from psycopg2.extras import DictCursor
import threading
import time
from typing import List, Dict, Any
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ConcurrentTransactionTest:
    """并發(fā)事務(wù)測試"""
    
    def __init__(self, dsn: str):
        self.dsn = dsn
        self.results = []
        self.lock = threading.Lock()
    
    def run_concurrent_updates(self, num_threads: int = 10):
        """
        運(yùn)行并發(fā)更新測試
        
        Args:
            num_threads: 并發(fā)線程數(shù)
        """
        print(f"開始并發(fā)更新測試,線程數(shù): {num_threads}")
        print("=" * 60)
        
        threads = []
        
        # 初始化測試數(shù)據(jù)
        self._setup_test_data()
        
        # 創(chuàng)建并啟動線程
        for i in range(num_threads):
            thread = threading.Thread(
                target=self._update_account_balance,
                args=(i, 100 + i, 50.0),  # 賬戶ID從100開始
                name=f"Transaction-{i}"
            )
            threads.append(thread)
            thread.start()
        
        # 等待所有線程完成
        for thread in threads:
            thread.join()
        
        # 驗(yàn)證結(jié)果
        self._verify_results()
        
        print(f"\n測試完成,總事務(wù)數(shù): {num_threads}")
        print(f"成功事務(wù): {len([r for r in self.results if r['success']])}")
        print(f"失敗事務(wù): {len([r for r in self.results if not r['success']])}")
    
    def _setup_test_data(self):
        """設(shè)置測試數(shù)據(jù)"""
        conn = psycopg2.connect(self.dsn)
        cursor = conn.cursor()
        
        try:
            # 創(chuàng)建測試表
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS accounts (
                    id SERIAL PRIMARY KEY,
                    account_number VARCHAR(50) UNIQUE,
                    balance DECIMAL(15, 2) DEFAULT 0.0,
                    version INTEGER DEFAULT 0,
                    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
            """)
            
            # 插入測試賬戶
            for i in range(10):
                account_id = 100 + i
                cursor.execute("""
                    INSERT INTO accounts (id, account_number, balance, version)
                    VALUES (%s, %s, %s, %s)
                    ON CONFLICT (id) DO UPDATE SET
                        balance = EXCLUDED.balance,
                        version = EXCLUDED.version
                """, (account_id, f"ACC{account_id}", 1000.0, 0))
            
            conn.commit()
            logger.info("測試數(shù)據(jù)設(shè)置完成")
            
        except Exception as e:
            conn.rollback()
            logger.error(f"設(shè)置測試數(shù)據(jù)失敗: {e}")
        finally:
            cursor.close()
            conn.close()
    
    def _update_account_balance(self, thread_id: int, account_id: int, amount: float):
        """
        更新賬戶余額
        
        Args:
            thread_id: 線程ID
            account_id: 賬戶ID
            amount: 更新金額
        """
        conn = None
        cursor = None
        
        try:
            conn = psycopg2.connect(self.dsn)
            cursor = conn.cursor()
            
            # 設(shè)置事務(wù)隔離級別(可測試不同級別)
            cursor.execute("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
            
            start_time = time.time()
            
            # 方法1: 使用行級鎖
            # cursor.execute("""
            #     SELECT balance FROM accounts 
            #     WHERE id = %s FOR UPDATE
            # """, (account_id,))
            
            # 方法2: 樂觀鎖(版本控制)
            cursor.execute("""
                SELECT balance, version FROM accounts 
                WHERE id = %s
            """, (account_id,))
            
            result = cursor.fetchone()
            if not result:
                raise Exception(f"賬戶 {account_id} 不存在")
            
            current_balance, current_version = result
            new_balance = current_balance + amount
            
            # 模擬一些處理時間
            time.sleep(0.01)
            
            # 使用樂觀鎖更新
            cursor.execute("""
                UPDATE accounts 
                SET balance = %s, 
                    version = version + 1,
                    last_updated = CURRENT_TIMESTAMP
                WHERE id = %s AND version = %s
            """, (new_balance, account_id, current_version))
            
            # 檢查是否更新成功
            if cursor.rowcount == 0:
                # 樂觀鎖沖突,重試或失敗
                conn.rollback()
                success = False
                error_msg = "樂觀鎖沖突,版本不匹配"
            else:
                conn.commit()
                success = True
                error_msg = None
            
            end_time = time.time()
            duration = end_time - start_time
            
            # 記錄結(jié)果
            with self.lock:
                self.results.append({
                    'thread_id': thread_id,
                    'account_id': account_id,
                    'success': success,
                    'duration': duration,
                    'error': error_msg,
                    'start_time': start_time,
                    'end_time': end_time
                })
            
            if success:
                logger.debug(f"線程 {thread_id}: 賬戶 {account_id} 更新成功,耗時 {duration:.3f}秒")
            else:
                logger.warning(f"線程 {thread_id}: 賬戶 {account_id} 更新失敗 - {error_msg}")
            
        except Exception as e:
            if conn:
                conn.rollback()
            
            with self.lock:
                self.results.append({
                    'thread_id': thread_id,
                    'account_id': account_id,
                    'success': False,
                    'duration': time.time() - start_time if 'start_time' in locals() else 0,
                    'error': str(e),
                    'start_time': start_time if 'start_time' in locals() else 0,
                    'end_time': time.time()
                })
            
            logger.error(f"線程 {thread_id} 異常: {e}")
            
        finally:
            if cursor:
                cursor.close()
            if conn:
                conn.close()
    
    def _verify_results(self):
        """驗(yàn)證測試結(jié)果"""
        conn = psycopg2.connect(self.dsn)
        cursor = conn.cursor()
        
        try:
            # 檢查賬戶最終狀態(tài)
            cursor.execute("""
                SELECT id, balance, version, last_updated
                FROM accounts
                WHERE id >= 100 AND id < 110
                ORDER BY id
            """)
            
            accounts = cursor.fetchall()
            
            print("\n賬戶最終狀態(tài):")
            print("-" * 60)
            print(f"{'賬戶ID':<10} {'余額':<15} {'版本':<10} {'最后更新'}")
            print("-" * 60)
            
            for acc in accounts:
                print(f"{acc[0]:<10} {float(acc[1]):<15.2f} {acc[2]:<10} {acc[3]}")
            
            # 分析并發(fā)問題
            success_count = len([r for r in self.results if r['success']])
            total_count = len(self.results)
            
            if success_count < total_count:
                print(f"\n發(fā)現(xiàn)并發(fā)問題: {total_count - success_count} 個事務(wù)失敗")
                print("\n失敗事務(wù)詳情:")
                for result in self.results:
                    if not result['success']:
                        print(f"  線程 {result['thread_id']}: {result['error']}")
            
            # 計算性能指標(biāo)
            if self.results:
                durations = [r['duration'] for r in self.results if r['success']]
                if durations:
                    avg_duration = sum(durations) / len(durations)
                    max_duration = max(durations)
                    min_duration = min(durations)
                    
                    print(f"\n性能指標(biāo):")
                    print(f"  平均事務(wù)時間: {avg_duration:.3f}秒")
                    print(f"  最長事務(wù)時間: {max_duration:.3f}秒")
                    print(f"  最短事務(wù)時間: {min_duration:.3f}秒")
                    print(f"  吞吐量: {success_count / sum(durations):.2f} 事務(wù)/秒")
            
        except Exception as e:
            logger.error(f"驗(yàn)證結(jié)果失敗: {e}")
        finally:
            cursor.close()
            conn.close()


class LockMonitor:
    """鎖監(jiān)控器"""
    
    def __init__(self, dsn: str):
        self.dsn = dsn
    
    def get_current_locks(self) -> List[Dict[str, Any]]:
        """
        獲取當(dāng)前鎖信息
        
        Returns:
            鎖信息列表
        """
        conn = psycopg2.connect(self.dsn)
        cursor = conn.cursor(cursor_factory=DictCursor)
        
        try:
            lock_query = """
            SELECT 
                -- 鎖信息
                pl.pid as process_id,
                pl.mode as lock_mode,
                pl.granted as is_granted,
                pl.fastpath as is_fastpath,
                
                -- 事務(wù)信息
                pa.query as current_query,
                pa.state as query_state,
                pa.wait_event_type as wait_event_type,
                pa.wait_event as wait_event,
                pa.backend_start as backend_start_time,
                pa.xact_start as transaction_start_time,
                pa.query_start as query_start_time,
                
                -- 被鎖對象信息
                pl.relation::regclass as locked_relation,
                pl.page as locked_page,
                pl.tuple as locked_tuple,
                pl.virtualxid as virtual_transaction_id,
                pl.transactionid as transaction_id,
                pl.classid::regclass as locked_class,
                pl.objid as locked_object_id,
                pl.objsubid as locked_object_subid,
                
                -- 等待圖信息
                pg_blocking_pids(pl.pid) as blocking_pids,
                
                -- 附加信息
                now() - pa.query_start as query_duration,
                now() - pa.xact_start as transaction_duration
                
            FROM pg_locks pl
            LEFT JOIN pg_stat_activity pa ON pl.pid = pa.pid
            WHERE pl.pid <> pg_backend_pid()  -- 排除當(dāng)前連接
            ORDER BY 
                pl.granted DESC,  -- 先顯示未授予的鎖
                transaction_duration DESC,
                query_duration DESC
            """
            
            cursor.execute(lock_query)
            locks = cursor.fetchall()
            
            return [
                {
                    'process_id': lock['process_id'],
                    'lock_mode': lock['lock_mode'],
                    'is_granted': lock['is_granted'],
                    'current_query': lock['current_query'][:100] if lock['current_query'] else None,
                    'query_state': lock['query_state'],
                    'locked_relation': lock['locked_relation'],
                    'blocking_pids': lock['blocking_pids'],
                    'query_duration': lock['query_duration'],
                    'transaction_duration': lock['transaction_duration'],
                    'wait_event': lock['wait_event']
                }
                for lock in locks
            ]
            
        except Exception as e:
            logger.error(f"獲取鎖信息失敗: {e}")
            return []
        finally:
            cursor.close()
            conn.close()
    
    def analyze_lock_contention(self) -> Dict[str, Any]:
        """
        分析鎖爭用情況
        
        Returns:
            鎖爭用分析報告
        """
        locks = self.get_current_locks()
        
        analysis = {
            'total_locks': len(locks),
            'granted_locks': len([l for l in locks if l['is_granted']]),
            'waiting_locks': len([l for l in locks if not l['is_granted']]),
            'lock_modes': {},
            'wait_chains': [],
            'long_running_transactions': [],
            'potential_deadlocks': []
        }
        
        # 統(tǒng)計鎖模式
        for lock in locks:
            mode = lock['lock_mode']
            analysis['lock_modes'][mode] = analysis['lock_modes'].get(mode, 0) + 1
        
        # 識別等待鏈
        waiting_processes = [l for l in locks if not l['is_granted']]
        for waiter in waiting_processes:
            if waiter['blocking_pids']:
                chain = {
                    'waiting_pid': waiter['process_id'],
                    'blocking_pids': waiter['blocking_pids'],
                    'lock_mode': waiter['lock_mode'],
                    'wait_time': waiter['query_duration']
                }
                analysis['wait_chains'].append(chain)
        
        # 識別長時間運(yùn)行的事務(wù)
        for lock in locks:
            if lock['transaction_duration'] and lock['transaction_duration'].total_seconds() > 60:
                analysis['long_running_transactions'].append({
                    'pid': lock['process_id'],
                    'duration_seconds': lock['transaction_duration'].total_seconds(),
                    'query': lock['current_query']
                })
        
        return analysis
    
    def kill_blocking_processes(self, threshold_seconds: int = 300):
        """
        終止阻塞時間過長的進(jìn)程
        
        Args:
            threshold_seconds: 阻塞時間閾值(秒)
        """
        conn = psycopg2.connect(self.dsn)
        cursor = conn.cursor()
        
        try:
            # 查找阻塞時間過長的進(jìn)程
            kill_query = """
            SELECT pid, query, now() - xact_start as duration
            FROM pg_stat_activity
            WHERE pid IN (
                SELECT DISTINCT unnest(pg_blocking_pids(pid))
                FROM pg_stat_activity
                WHERE wait_event_type = 'Lock'
                AND state = 'active'
                AND now() - query_start > INTERVAL '%s seconds'
            )
            AND state = 'active'
            """
            
            cursor.execute(kill_query % threshold_seconds)
            blocking_processes = cursor.fetchall()
            
            killed = []
            for proc in blocking_processes:
                pid, query, duration = proc
                try:
                    cursor.execute("SELECT pg_terminate_backend(%s)", (pid,))
                    killed.append({
                        'pid': pid,
                        'duration': duration,
                        'query': query[:100] if query else None
                    })
                    logger.warning(f"終止阻塞進(jìn)程 {pid},已運(yùn)行 {duration}")
                except Exception as e:
                    logger.error(f"終止進(jìn)程 {pid} 失敗: {e}")
            
            conn.commit()
            return killed
            
        except Exception as e:
            conn.rollback()
            logger.error(f"終止阻塞進(jìn)程失敗: {e}")
            return []
        finally:
            cursor.close()
            conn.close()


def test_concurrency_scenarios():
    """測試不同并發(fā)場景"""
    dsn = "dbname=testdb user=postgres password=password host=localhost port=5432"
    
    print("并發(fā)控制測試")
    print("=" * 60)
    
    # 場景1:高并發(fā)更新
    print("\n場景1: 高并發(fā)更新測試")
    test1 = ConcurrentTransactionTest(dsn)
    test1.run_concurrent_updates(num_threads=20)
    
    # 場景2:鎖監(jiān)控
    print("\n\n場景2: 鎖監(jiān)控分析")
    print("-" * 40)
    
    monitor = LockMonitor(dsn)
    lock_analysis = monitor.analyze_lock_contention()
    
    print(f"總鎖數(shù): {lock_analysis['total_locks']}")
    print(f"已授予鎖: {lock_analysis['granted_locks']}")
    print(f"等待鎖: {lock_analysis['waiting_locks']}")
    
    if lock_analysis['wait_chains']:
        print("\n等待鏈:")
        for chain in lock_analysis['wait_chains'][:5]:  # 顯示前5個
            print(f"  進(jìn)程 {chain['waiting_pid']} 等待 {chain['blocking_pids']}")
    
    if lock_analysis['long_running_transactions']:
        print("\n長時間運(yùn)行事務(wù):")
        for txn in lock_analysis['long_running_transactions'][:3]:
            print(f"  進(jìn)程 {txn['pid']}: 已運(yùn)行 {txn['duration_seconds']:.0f}秒")
    
    # 場景3:死鎖處理建議
    print("\n\n場景3: 死鎖預(yù)防建議")
    print("-" * 40)
    
    recommendations = [
        "1. 使用合適的索引減少鎖競爭范圍",
        "2. 保持事務(wù)簡短,盡快提交",
        "3. 使用顯式鎖(SELECT FOR UPDATE)時按固定順序訪問資源",
        "4. 設(shè)置合理的鎖超時(lock_timeout)",
        "5. 考慮使用樂觀鎖(版本控制)替代悲觀鎖",
        "6. 使用較低的隔離級別(READ COMMITTED)",
        "7. 監(jiān)控和調(diào)整max_connections參數(shù)",
        "8. 定期分析并優(yōu)化長時間運(yùn)行的事務(wù)"
    ]
    
    for rec in recommendations:
        print(f"  ? {rec}")


if __name__ == "__main__":
    test_concurrency_scenarios()

4. 高級特性綜合應(yīng)用

4.1 完整示例:電商系統(tǒng)數(shù)據(jù)庫設(shè)計

"""
電商系統(tǒng)PostgreSQL高級特性綜合應(yīng)用示例
"""
import psycopg2
from psycopg2.extras import Json, DictCursor
import json
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import logging
from decimal import Decimal

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ECommerceDatabase:
    """電商系統(tǒng)數(shù)據(jù)庫設(shè)計"""
    
    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)
        self.cursor = self.conn.cursor(cursor_factory=DictCursor)
    
    def create_schema(self):
        """創(chuàng)建電商系統(tǒng)數(shù)據(jù)庫架構(gòu)"""
        schema_sql = """
        -- 啟用必要擴(kuò)展
        CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
        CREATE EXTENSION IF NOT EXISTS "pg_trgm";
        CREATE EXTENSION IF NOT EXISTS "btree_gin";
        
        -- 1. 產(chǎn)品表(使用JSONB存儲變體屬性)
        CREATE TABLE IF NOT EXISTS products (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            sku VARCHAR(100) UNIQUE NOT NULL,
            name VARCHAR(500) NOT NULL,
            description TEXT,
            category_id UUID,
            brand VARCHAR(200),
            
            -- JSONB存儲動態(tài)屬性
            attributes JSONB DEFAULT '{}',
            
            -- 價格信息
            base_price DECIMAL(12, 2) NOT NULL,
            discount_price DECIMAL(12, 2),
            
            -- 庫存信息
            stock_quantity INTEGER DEFAULT 0,
            reserved_quantity INTEGER DEFAULT 0,
            
            -- 搜索優(yōu)化字段
            search_vector TSVECTOR GENERATED ALWAYS AS (
                setweight(to_tsvector('english', coalesce(name, '')), 'A') ||
                setweight(to_tsvector('english', coalesce(description, '')), 'B') ||
                setweight(to_tsvector('english', coalesce(brand, '')), 'C')
            ) STORED,
            
            -- 時間戳
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            
            -- 約束
            CONSTRAINT positive_price CHECK (base_price >= 0),
            CONSTRAINT positive_stock CHECK (stock_quantity >= 0),
            CONSTRAINT valid_discount CHECK (
                discount_price IS NULL OR 
                (discount_price >= 0 AND discount_price <= base_price)
            )
        );
        
        -- 產(chǎn)品表索引
        CREATE INDEX IF NOT EXISTS idx_products_sku ON products(sku);
        CREATE INDEX IF NOT EXISTS idx_products_category ON products(category_id);
        CREATE INDEX IF NOT EXISTS idx_products_price ON products(base_price);
        CREATE INDEX IF NOT EXISTS idx_products_search ON products USING GIN(search_vector);
        CREATE INDEX IF NOT EXISTS idx_products_attributes ON products USING GIN(attributes);
        CREATE INDEX IF NOT EXISTS idx_products_brand ON products(brand);
        
        -- 2. 產(chǎn)品變體表(范圍類型用于尺寸)
        CREATE TABLE IF NOT EXISTS product_variants (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            product_id UUID REFERENCES products(id) ON DELETE CASCADE,
            variant_name VARCHAR(200) NOT NULL,
            
            -- 使用范圍類型表示尺寸范圍
            size_range numrange,
            weight_range numrange,
            
            -- JSONB存儲變體特定屬性
            variant_attributes JSONB DEFAULT '{}',
            
            -- 價格調(diào)整
            price_adjustment DECIMAL(10, 2) DEFAULT 0,
            additional_cost DECIMAL(10, 2) DEFAULT 0,
            
            -- 庫存跟蹤
            variant_stock INTEGER DEFAULT 0,
            min_order_quantity INTEGER DEFAULT 1,
            max_order_quantity INTEGER,
            
            -- 約束
            CONSTRAINT valid_size_range CHECK (
                size_range IS NULL OR 
                (lower(size_range) >= 0 AND upper(size_range) > lower(size_range))
            ),
            CONSTRAINT valid_quantity CHECK (
                min_order_quantity > 0 AND 
                (max_order_quantity IS NULL OR max_order_quantity >= min_order_quantity)
            )
        );
        
        -- 變體表索引
        CREATE INDEX IF NOT EXISTS idx_variants_product ON product_variants(product_id);
        CREATE INDEX IF NOT EXISTS idx_variants_size ON product_variants USING GIST(size_range);
        
        -- 3. 分類表(使用遞歸CTE支持層級結(jié)構(gòu))
        CREATE TABLE IF NOT EXISTS categories (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            name VARCHAR(200) NOT NULL,
            slug VARCHAR(200) UNIQUE NOT NULL,
            description TEXT,
            parent_id UUID REFERENCES categories(id),
            sort_order INTEGER DEFAULT 0,
            
            -- JSONB存儲分類屬性
            category_attributes JSONB DEFAULT '{}',
            
            -- 層級路徑(物化路徑模式)
            path VARCHAR(1000),
            level INTEGER DEFAULT 0,
            
            -- 時間戳
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            
            -- 約束
            CONSTRAINT no_self_parent CHECK (id != parent_id)
        );
        
        -- 分類表索引
        CREATE INDEX IF NOT EXISTS idx_categories_parent ON categories(parent_id);
        CREATE INDEX IF NOT EXISTS idx_categories_path ON categories(path);
        CREATE INDEX IF NOT EXISTS idx_categories_slug ON categories(slug);
        
        -- 4. 訂單表(使用分區(qū)表)
        CREATE TABLE IF NOT EXISTS orders_partitioned (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            order_number VARCHAR(50) UNIQUE NOT NULL,
            customer_id UUID NOT NULL,
            status VARCHAR(50) NOT NULL,
            
            -- JSONB存儲訂單元數(shù)據(jù)
            order_metadata JSONB DEFAULT '{}',
            
            -- 金額信息
            subtotal DECIMAL(12, 2) NOT NULL,
            tax_amount DECIMAL(12, 2) DEFAULT 0,
            shipping_amount DECIMAL(12, 2) DEFAULT 0,
            discount_amount DECIMAL(12, 2) DEFAULT 0,
            total_amount DECIMAL(12, 2) NOT NULL,
            
            -- 時間信息
            ordered_at TIMESTAMP NOT NULL,
            shipped_at TIMESTAMP,
            delivered_at TIMESTAMP,
            
            -- 約束
            CONSTRAINT positive_amounts CHECK (
                subtotal >= 0 AND
                tax_amount >= 0 AND
                shipping_amount >= 0 AND
                discount_amount >= 0 AND
                total_amount >= 0
            )
        ) PARTITION BY RANGE (ordered_at);
        
        -- 創(chuàng)建訂單分區(qū)(每月一個分區(qū))
        CREATE TABLE IF NOT EXISTS orders_2024_01 
        PARTITION OF orders_partitioned
        FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
        
        CREATE TABLE IF NOT EXISTS orders_2024_02 
        PARTITION OF orders_partitioned
        FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
        
        -- 訂單表索引
        CREATE INDEX IF NOT EXISTS idx_orders_customer ON orders_partitioned(customer_id);
        CREATE INDEX IF NOT EXISTS idx_orders_status ON orders_partitioned(status);
        CREATE INDEX IF NOT EXISTS idx_orders_date ON orders_partitioned(ordered_at);
        CREATE INDEX IF NOT EXISTS idx_orders_metadata ON orders_partitioned USING GIN(order_metadata);
        
        -- 5. 訂單項(xiàng)表
        CREATE TABLE IF NOT EXISTS order_items (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            order_id UUID REFERENCES orders_partitioned(id) ON DELETE CASCADE,
            product_id UUID REFERENCES products(id),
            variant_id UUID REFERENCES product_variants(id),
            
            -- 產(chǎn)品快照(避免產(chǎn)品信息變更影響歷史訂單)
            product_snapshot JSONB NOT NULL,
            
            -- 購買信息
            quantity INTEGER NOT NULL,
            unit_price DECIMAL(12, 2) NOT NULL,
            discount_percentage DECIMAL(5, 2) DEFAULT 0,
            item_total DECIMAL(12, 2) NOT NULL,
            
            -- 約束
            CONSTRAINT positive_quantity CHECK (quantity > 0),
            CONSTRAINT positive_unit_price CHECK (unit_price >= 0)
        );
        
        -- 訂單項(xiàng)索引
        CREATE INDEX IF NOT EXISTS idx_order_items_order ON order_items(order_id);
        CREATE INDEX IF NOT EXISTS idx_order_items_product ON order_items(product_id);
        
        -- 6. 客戶表
        CREATE TABLE IF NOT EXISTS customers (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            email VARCHAR(255) UNIQUE NOT NULL,
            phone VARCHAR(50),
            first_name VARCHAR(100),
            last_name VARCHAR(100),
            
            -- JSONB存儲客戶屬性
            customer_profile JSONB DEFAULT '{}',
            
            -- 地址信息(使用JSONB存儲多個地址)
            addresses JSONB DEFAULT '[]',
            
            -- 賬戶信息
            is_active BOOLEAN DEFAULT TRUE,
            loyalty_points INTEGER DEFAULT 0,
            customer_tier VARCHAR(50) DEFAULT 'STANDARD',
            
            -- 時間戳
            registered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            last_login_at TIMESTAMP,
            
            -- 約束
            CONSTRAINT valid_email CHECK (email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Z|a-z]{2,}$')
        );
        
        -- 客戶表索引
        CREATE INDEX IF NOT EXISTS idx_customers_email ON customers(email);
        CREATE INDEX IF NOT EXISTS idx_customers_name ON customers(last_name, first_name);
        CREATE INDEX IF NOT EXISTS idx_customers_profile ON customers USING GIN(customer_profile);
        CREATE INDEX IF NOT EXISTS idx_customers_tier ON customers(customer_tier);
        
        -- 7. 庫存變更日志(使用BRIN索引)
        CREATE TABLE IF NOT EXISTS inventory_logs (
            id BIGSERIAL PRIMARY KEY,
            product_id UUID REFERENCES products(id),
            variant_id UUID REFERENCES product_variants(id),
            
            -- 變更信息
            change_type VARCHAR(50) NOT NULL,
            quantity_change INTEGER NOT NULL,
            previous_quantity INTEGER,
            new_quantity INTEGER,
            
            -- 關(guān)聯(lián)信息
            reference_id UUID,
            reference_type VARCHAR(100),
            
            -- 時間戳
            changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            changed_by UUID,
            
            -- 備注
            notes TEXT
        ) WITH (fillfactor = 90);
        
        -- 庫存日志索引(使用BRIN適合時間序列)
        CREATE INDEX IF NOT EXISTS idx_inventory_logs_time 
        ON inventory_logs USING BRIN(changed_at);
        
        CREATE INDEX IF NOT EXISTS idx_inventory_logs_product 
        ON inventory_logs(product_id, changed_at);
        
        -- 8. 產(chǎn)品評論表(使用數(shù)組和全文搜索)
        CREATE TABLE IF NOT EXISTS product_reviews (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            product_id UUID REFERENCES products(id) ON DELETE CASCADE,
            customer_id UUID REFERENCES customers(id),
            order_item_id UUID REFERENCES order_items(id),
            
            -- 評分
            rating INTEGER NOT NULL CHECK (rating >= 1 AND rating <= 5),
            
            -- 評論內(nèi)容
            title VARCHAR(500),
            review_text TEXT NOT NULL,
            
            -- 數(shù)組存儲優(yōu)點(diǎn)/缺點(diǎn)
            pros TEXT[],
            cons TEXT[],
            
            -- 元數(shù)據(jù)
            is_verified_purchase BOOLEAN DEFAULT FALSE,
            helpful_votes INTEGER DEFAULT 0,
            not_helpful_votes INTEGER DEFAULT 0,
            
            -- 全文搜索向量
            review_vector TSVECTOR GENERATED ALWAYS AS (
                to_tsvector('english', 
                    coalesce(title, '') || ' ' || 
                    coalesce(review_text, '')
                )
            ) STORED,
            
            -- 時間戳
            reviewed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            
            -- 約束
            CONSTRAINT one_review_per_order_item UNIQUE (order_item_id)
        );
        
        -- 評論表索引
        CREATE INDEX IF NOT EXISTS idx_reviews_product ON product_reviews(product_id);
        CREATE INDEX IF NOT EXISTS idx_reviews_rating ON product_reviews(rating);
        CREATE INDEX IF NOT EXISTS idx_reviews_search ON product_reviews USING GIN(review_vector);
        CREATE INDEX IF NOT EXISTS idx_reviews_pros ON product_reviews USING GIN(pros);
        CREATE INDEX IF NOT EXISTS idx_reviews_date ON product_reviews(reviewed_at);
        
        -- 9. 促銷規(guī)則表(使用復(fù)雜約束和JSONB)
        CREATE TABLE IF NOT EXISTS promotion_rules (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            name VARCHAR(200) NOT NULL,
            description TEXT,
            
            -- 規(guī)則條件(JSONB存儲復(fù)雜規(guī)則)
            conditions JSONB NOT NULL,
            
            -- 折扣信息
            discount_type VARCHAR(50) NOT NULL,
            discount_value DECIMAL(10, 2),
            discount_percentage DECIMAL(5, 2),
            max_discount_amount DECIMAL(12, 2),
            
            -- 時間范圍
            valid_from TIMESTAMP NOT NULL,
            valid_until TIMESTAMP,
            
            -- 使用限制
            usage_limit INTEGER,
            per_customer_limit INTEGER,
            minimum_order_amount DECIMAL(12, 2),
            
            -- 狀態(tài)
            is_active BOOLEAN DEFAULT TRUE,
            
            -- 元數(shù)據(jù)
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            
            -- 約束
            CONSTRAINT valid_discount_values CHECK (
                (discount_type = 'AMOUNT' AND discount_value IS NOT NULL) OR
                (discount_type = 'PERCENTAGE' AND discount_percentage IS NOT NULL)
            ),
            CONSTRAINT valid_discount_percentage CHECK (
                discount_percentage IS NULL OR 
                (discount_percentage >= 0 AND discount_percentage <= 100)
            )
        );
        
        -- 促銷規(guī)則索引
        CREATE INDEX IF NOT EXISTS idx_promotions_active 
        ON promotion_rules(is_active, valid_from, valid_until);
        
        CREATE INDEX IF NOT EXISTS idx_promotions_conditions 
        ON promotion_rules USING GIN(conditions);
        
        -- 10. 物化視圖:產(chǎn)品統(tǒng)計
        CREATE MATERIALIZED VIEW IF NOT EXISTS product_statistics AS
        SELECT 
            p.id as product_id,
            p.name as product_name,
            p.category_id,
            p.base_price,
            
            -- 銷售統(tǒng)計
            COUNT(DISTINCT oi.order_id) as total_orders,
            SUM(oi.quantity) as total_units_sold,
            SUM(oi.item_total) as total_revenue,
            
            -- 庫存統(tǒng)計
            p.stock_quantity,
            p.reserved_quantity,
            p.stock_quantity - p.reserved_quantity as available_quantity,
            
            -- 評分統(tǒng)計
            COALESCE(AVG(pr.rating), 0) as average_rating,
            COUNT(pr.id) as review_count,
            
            -- 時間統(tǒng)計
            MAX(o.ordered_at) as last_sale_date
            
        FROM products p
        LEFT JOIN order_items oi ON p.id = oi.product_id
        LEFT JOIN orders_partitioned o ON oi.order_id = o.id
        LEFT JOIN product_reviews pr ON p.id = pr.product_id
        
        GROUP BY p.id, p.name, p.category_id, p.base_price,
                 p.stock_quantity, p.reserved_quantity
        
        WITH DATA;
        
        -- 物化視圖索引
        CREATE UNIQUE INDEX IF NOT EXISTS idx_product_stats_product 
        ON product_statistics(product_id);
        
        CREATE INDEX IF NOT EXISTS idx_product_stats_category 
        ON product_statistics(category_id);
        
        CREATE INDEX IF NOT EXISTS idx_product_stats_revenue 
        ON product_statistics(total_revenue DESC);
        
        -- 創(chuàng)建刷新物化視圖的函數(shù)
        CREATE OR REPLACE FUNCTION refresh_product_statistics()
        RETURNS TRIGGER AS $$
        BEGIN
            REFRESH MATERIALIZED VIEW CONCURRENTLY product_statistics;
            RETURN NULL;
        END;
        $$ LANGUAGE plpgsql;
        """
        
        try:
            # 分步執(zhí)行架構(gòu)創(chuàng)建
            statements = schema_sql.split(';')
            for statement in statements:
                statement = statement.strip()
                if statement:
                    self.cursor.execute(statement)
            
            self.conn.commit()
            logger.info("電商系統(tǒng)數(shù)據(jù)庫架構(gòu)創(chuàng)建成功")
            
        except Exception as e:
            self.conn.rollback()
            logger.error(f"創(chuàng)建架構(gòu)失敗: {e}")
            raise
    
    def search_products(
        self,
        query: Optional[str] = None,
        category_id: Optional[str] = None,
        min_price: Optional[float] = None,
        max_price: Optional[float] = None,
        brands: Optional[List[str]] = None,
        min_rating: Optional[float] = None,
        in_stock_only: bool = False,
        sort_by: str = 'relevance',
        limit: int = 20,
        offset: int = 0
    ) -> List[Dict[str, Any]]:
        """
        高級產(chǎn)品搜索
        
        Args:
            query: 搜索關(guān)鍵詞
            category_id: 分類ID
            min_price: 最低價格
            max_price: 最高價格
            brands: 品牌列表
            min_rating: 最低評分
            in_stock_only: 僅顯示有貨商品
            sort_by: 排序方式
            limit: 返回數(shù)量
            offset: 偏移量
            
        Returns:
            產(chǎn)品列表
        """
        search_sql = """
        SELECT 
            p.id,
            p.sku,
            p.name,
            p.description,
            p.brand,
            p.base_price,
            p.discount_price,
            p.stock_quantity,
            p.reserved_quantity,
            p.attributes,
            
            -- 計算可用庫存
            p.stock_quantity - p.reserved_quantity as available_quantity,
            
            -- 計算折扣率
            CASE 
                WHEN p.discount_price IS NOT NULL 
                THEN ROUND((1 - p.discount_price / p.base_price) * 100, 1)
                ELSE 0
            END as discount_percentage,
            
            -- 獲取評分信息
            COALESCE(ps.average_rating, 0) as average_rating,
            COALESCE(ps.review_count, 0) as review_count,
            
            -- 計算相關(guān)性得分(如果有關(guān)鍵詞)
            {relevance_score}
            
        FROM products p
        LEFT JOIN product_statistics ps ON p.id = ps.product_id
        
        WHERE 1=1
            {search_condition}
            {category_condition}
            {price_condition}
            {brand_condition}
            {rating_condition}
            {stock_condition}
            
        {order_clause}
        
        LIMIT %s OFFSET %s
        """
        
        # 構(gòu)建查詢條件
        conditions = []
        params = []
        
        # 全文搜索條件
        if query:
            conditions.append("p.search_vector @@ plainto_tsquery('english', %s)")
            params.append(query)
        
        # 分類條件
        if category_id:
            # 獲取分類及其所有子分類
            subcategories = self._get_all_subcategories(category_id)
            if subcategories:
                placeholders = ', '.join(['%s'] * len(subcategories))
                conditions.append(f"p.category_id IN ({placeholders})")
                params.extend(subcategories)
        
        # 價格條件
        if min_price is not None:
            conditions.append("p.base_price >= %s")
            params.append(min_price)
        if max_price is not None:
            conditions.append("p.base_price <= %s")
            params.append(max_price)
        
        # 品牌條件
        if brands:
            placeholders = ', '.join(['%s'] * len(brands))
            conditions.append(f"p.brand IN ({placeholders})")
            params.extend(brands)
        
        # 評分條件
        if min_rating is not None:
            conditions.append("COALESCE(ps.average_rating, 0) >= %s")
            params.append(min_rating)
        
        # 庫存條件
        if in_stock_only:
            conditions.append("(p.stock_quantity - p.reserved_quantity) > 0")
        
        # 構(gòu)建相關(guān)性得分計算
        relevance_score = ""
        if query:
            relevance_score = """
            , ts_rank(
                p.search_vector, 
                plainto_tsquery('english', %s)
            ) as relevance_score
            """
        
        # 構(gòu)建排序子句
        order_clause_map = {
            'relevance': "ORDER BY relevance_score DESC NULLS LAST",
            'price_asc': "ORDER BY p.base_price ASC",
            'price_desc': "ORDER BY p.base_price DESC",
            'rating': "ORDER BY ps.average_rating DESC NULLS LAST",
            'popularity': "ORDER BY ps.total_units_sold DESC NULLS LAST",
            'newest': "ORDER BY p.created_at DESC"
        }
        
        order_clause = order_clause_map.get(sort_by, "ORDER BY p.created_at DESC")
        
        # 格式化SQL
        formatted_sql = search_sql.format(
            relevance_score=relevance_score,
            search_condition=f"AND {' AND '.join(conditions)}" if conditions else "",
            category_condition="",
            price_condition="",
            brand_condition="",
            rating_condition="",
            stock_condition="",
            order_clause=order_clause
        )
        
        # 添加分頁參數(shù)
        params.extend([limit, offset])
        
        try:
            self.cursor.execute(formatted_sql, params)
            products = self.cursor.fetchall()
            
            return [
                {
                    'id': str(product['id']),
                    'sku': product['sku'],
                    'name': product['name'],
                    'brand': product['brand'],
                    'base_price': float(product['base_price']),
                    'discount_price': float(product['discount_price']) if product['discount_price'] else None,
                    'available_quantity': product['available_quantity'],
                    'discount_percentage': product['discount_percentage'],
                    'average_rating': float(product['average_rating']),
                    'review_count': product['review_count'],
                    'attributes': product['attributes']
                }
                for product in products
            ]
            
        except Exception as e:
            logger.error(f"產(chǎn)品搜索失敗: {e}")
            return []
    
    def _get_all_subcategories(self, category_id: str) -> List[str]:
        """
        獲取分類及其所有子分類
        
        Args:
            category_id: 分類ID
            
        Returns:
            子分類ID列表
        """
        recursive_sql = """
        WITH RECURSIVE category_tree AS (
            -- 基礎(chǔ)分類
            SELECT id, parent_id
            FROM categories
            WHERE id = %s
            
            UNION ALL
            
            -- 遞歸獲取子分類
            SELECT c.id, c.parent_id
            FROM categories c
            INNER JOIN category_tree ct ON c.parent_id = ct.id
        )
        SELECT id FROM category_tree
        """
        
        try:
            self.cursor.execute(recursive_sql, (category_id,))
            results = self.cursor.fetchall()
            return [str(row['id']) for row in results]
        except Exception as e:
            logger.error(f"獲取子分類失敗: {e}")
            return []
    
    def get_product_recommendations(
        self, 
        product_id: str, 
        customer_id: Optional[str] = None,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """
        獲取產(chǎn)品推薦
        
        Args:
            product_id: 產(chǎn)品ID
            customer_id: 客戶ID(可選)
            limit: 返回數(shù)量
            
        Returns:
            推薦產(chǎn)品列表
        """
        recommendations_sql = """
        -- 基于多種策略的混合推薦
        
        (
            -- 策略1: 同品牌產(chǎn)品
            SELECT 
                p.id,
                p.name,
                p.brand,
                p.base_price,
                'same_brand' as recommendation_reason,
                0.7 as recommendation_score
            FROM products p
            WHERE p.brand = (
                SELECT brand FROM products WHERE id = %s
            )
            AND p.id != %s
            AND (p.stock_quantity - p.reserved_quantity) > 0
            LIMIT 3
        )
        
        UNION ALL
        
        (
            -- 策略2: 同分類熱門產(chǎn)品
            SELECT 
                p.id,
                p.name,
                p.brand,
                p.base_price,
                'popular_in_category' as recommendation_reason,
                ps.total_units_sold::float / 
                    (SELECT MAX(total_units_sold) FROM product_statistics) as recommendation_score
            FROM products p
            JOIN product_statistics ps ON p.id = ps.product_id
            WHERE p.category_id = (
                SELECT category_id FROM products WHERE id = %s
            )
            AND p.id != %s
            AND (p.stock_quantity - p.reserved_quantity) > 0
            ORDER BY ps.total_units_sold DESC
            LIMIT 3
        )
        
        UNION ALL
        
        (
            -- 策略3: 經(jīng)常一起購買的產(chǎn)品
            SELECT 
                p.id,
                p.name,
                p.brand,
                p.base_price,
                'frequently_bought_together' as recommendation_reason,
                COUNT(DISTINCT oi2.order_id)::float / 
                    (SELECT COUNT(DISTINCT oi3.order_id) 
                     FROM order_items oi3 
                     WHERE oi3.product_id = %s) as recommendation_score
            FROM order_items oi1
            JOIN order_items oi2 ON oi1.order_id = oi2.order_id
            JOIN products p ON oi2.product_id = p.id
            WHERE oi1.product_id = %s
            AND oi2.product_id != %s
            AND (p.stock_quantity - p.reserved_quantity) > 0
            GROUP BY p.id, p.name, p.brand, p.base_price
            HAVING COUNT(DISTINCT oi2.order_id) >= 2
            ORDER BY recommendation_score DESC
            LIMIT 3
        )
        
        {customer_based_recommendations}
        
        ORDER BY recommendation_score DESC
        LIMIT %s
        """
        
        # 如果有客戶ID,添加個性化推薦
        customer_recommendations = ""
        if customer_id:
            customer_recommendations = """
            UNION ALL
            
            (
                -- 策略4: 基于客戶購買歷史的推薦
                SELECT 
                    p.id,
                    p.name,
                    p.brand,
                    p.base_price,
                    'based_on_your_purchases' as recommendation_reason,
                    COUNT(DISTINCT oi.order_id)::float / 10 as recommendation_score
                FROM order_items oi
                JOIN products p ON oi.product_id = p.id
                WHERE oi.order_id IN (
                    SELECT order_id 
                    FROM order_items 
                    WHERE product_id = %s
                )
                AND p.id != %s
                AND (p.stock_quantity - p.reserved_quantity) > 0
                GROUP BY p.id, p.name, p.brand, p.base_price
                HAVING COUNT(DISTINCT oi.order_id) >= 1
                ORDER BY recommendation_score DESC
                LIMIT 2
            )
            """
        
        # 格式化SQL
        formatted_sql = recommendations_sql.format(
            customer_based_recommendations=customer_recommendations
        )
        
        # 準(zhǔn)備參數(shù)
        params = [product_id, product_id, product_id, product_id, 
                 product_id, product_id, product_id]
        
        if customer_id:
            params.extend([customer_id, customer_id, product_id, product_id])
        
        params.append(limit)
        
        try:
            self.cursor.execute(formatted_sql, params)
            recommendations = self.cursor.fetchall()
            
            return [
                {
                    'id': str(rec['id']),
                    'name': rec['name'],
                    'brand': rec['brand'],
                    'price': float(rec['base_price']),
                    'recommendation_reason': rec['recommendation_reason'],
                    'recommendation_score': float(rec['recommendation_score'])
                }
                for rec in recommendations
            ]
            
        except Exception as e:
            logger.error(f"獲取推薦失敗: {e}")
            return []
    
    def create_order(
        self,
        customer_id: str,
        items: List[Dict[str, Any]],
        shipping_address: Dict[str, Any],
        promotion_code: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """
        創(chuàng)建訂單
        
        Args:
            customer_id: 客戶ID
            items: 訂單項(xiàng)列表
            shipping_address: 配送地址
            promotion_code: 促銷代碼
            
        Returns:
            創(chuàng)建的訂單信息
        """
        try:
            # 開始事務(wù)
            self.conn.autocommit = False
            
            # 生成訂單號
            order_number = f"ORD-{datetime.now().strftime('%Y%m%d')}-{self._generate_order_suffix()}"
            
            # 計算訂單金額
            order_calculation = self._calculate_order_amounts(items, promotion_code)
            
            # 插入訂單
            order_sql = """
            INSERT INTO orders_partitioned (
                order_number, customer_id, status, order_metadata,
                subtotal, tax_amount, shipping_amount, 
                discount_amount, total_amount, ordered_at
            )
            VALUES (%s, %s, 'PENDING', %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
            RETURNING id, order_number, total_amount
            """
            
            self.cursor.execute(
                order_sql,
                (
                    order_number,
                    customer_id,
                    Json({
                        'shipping_address': shipping_address,
                        'promotion_code': promotion_code
                    }),
                    order_calculation['subtotal'],
                    order_calculation['tax_amount'],
                    order_calculation['shipping_amount'],
                    order_calculation['discount_amount'],
                    order_calculation['total_amount']
                )
            )
            
            order_result = self.cursor.fetchone()
            order_id = order_result['id']
            
            # 插入訂單項(xiàng)
            for item in items:
                # 獲取產(chǎn)品快照
                product_snapshot = self._get_product_snapshot(item['product_id'])
                
                # 插入訂單項(xiàng)
                item_sql = """
                INSERT INTO order_items (
                    order_id, product_id, variant_id,
                    product_snapshot, quantity, unit_price,
                    discount_percentage, item_total
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                """
                
                self.cursor.execute(
                    item_sql,
                    (
                        order_id,
                        item['product_id'],
                        item.get('variant_id'),
                        Json(product_snapshot),
                        item['quantity'],
                        item['unit_price'],
                        item.get('discount_percentage', 0),
                        item['item_total']
                    )
                )
                
                # 更新庫存
                self._update_inventory(
                    product_id=item['product_id'],
                    variant_id=item.get('variant_id'),
                    quantity_change=-item['quantity'],
                    reference_id=order_id,
                    reference_type='ORDER',
                    change_type='SALE'
                )
            
            # 提交事務(wù)
            self.conn.commit()
            
            return {
                'order_id': str(order_id),
                'order_number': order_result['order_number'],
                'total_amount': float(order_result['total_amount']),
                'items_count': len(items)
            }
            
        except Exception as e:
            self.conn.rollback()
            logger.error(f"創(chuàng)建訂單失敗: {e}")
            return None
    
    def _calculate_order_amounts(
        self,
        items: List[Dict[str, Any]],
        promotion_code: Optional[str] = None
    ) -> Dict[str, float]:
        """計算訂單金額"""
        subtotal = sum(item['item_total'] for item in items)
        
        # 應(yīng)用促銷折扣
        discount_amount = 0
        if promotion_code:
            # 這里可以添加促銷邏輯
            pass
        
        # 計算稅費(fèi)(簡化示例)
        tax_amount = subtotal * 0.1  # 10%稅率
        
        # 計算運(yùn)費(fèi)(簡化示例)
        shipping_amount = 5.99 if subtotal < 50 else 0
        
        # 計算總額
        total_amount = subtotal + tax_amount + shipping_amount - discount_amount
        
        return {
            'subtotal': subtotal,
            'tax_amount': tax_amount,
            'shipping_amount': shipping_amount,
            'discount_amount': discount_amount,
            'total_amount': total_amount
        }
    
    def _get_product_snapshot(self, product_id: str) -> Dict[str, Any]:
        """獲取產(chǎn)品快照"""
        snapshot_sql = """
        SELECT 
            id, sku, name, description, brand,
            base_price, attributes
        FROM products
        WHERE id = %s
        """
        
        self.cursor.execute(snapshot_sql, (product_id,))
        product = self.cursor.fetchone()
        
        return {
            'product_id': str(product['id']),
            'sku': product['sku'],
            'name': product['name'],
            'brand': product['brand'],
            'price_at_time_of_purchase': float(product['base_price']),
            'attributes': product['attributes']
        }
    
    def _update_inventory(
        self,
        product_id: str,
        variant_id: Optional[str],
        quantity_change: int,
        reference_id: str,
        reference_type: str,
        change_type: str
    ):
        """更新庫存"""
        # 獲取當(dāng)前庫存
        if variant_id:
            stock_sql = """
            SELECT variant_stock as current_quantity
            FROM product_variants
            WHERE id = %s
            """
            self.cursor.execute(stock_sql, (variant_id,))
        else:
            stock_sql = """
            SELECT stock_quantity as current_quantity
            FROM products
            WHERE id = %s
            """
            self.cursor.execute(stock_sql, (product_id,))
        
        result = self.cursor.fetchone()
        if not result:
            raise Exception("產(chǎn)品不存在")
        
        current_quantity = result['current_quantity']
        new_quantity = current_quantity + quantity_change
        
        if new_quantity < 0:
            raise Exception("庫存不足")
        
        # 更新庫存
        if variant_id:
            update_sql = """
            UPDATE product_variants
            SET variant_stock = %s
            WHERE id = %s
            """
            self.cursor.execute(update_sql, (new_quantity, variant_id))
        else:
            update_sql = """
            UPDATE products
            SET stock_quantity = %s
            WHERE id = %s
            """
            self.cursor.execute(update_sql, (new_quantity, product_id))
        
        # 記錄庫存變更
        log_sql = """
        INSERT INTO inventory_logs (
            product_id, variant_id, change_type,
            quantity_change, previous_quantity, new_quantity,
            reference_id, reference_type
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """
        
        self.cursor.execute(
            log_sql,
            (
                product_id,
                variant_id,
                change_type,
                quantity_change,
                current_quantity,
                new_quantity,
                reference_id,
                reference_type
            )
        )
    
    def _generate_order_suffix(self) -> str:
        """生成訂單號后綴"""
        import random
        import string
        
        # 生成6位隨機(jī)字母數(shù)字
        return ''.join(random.choices(
            string.ascii_uppercase + string.digits, k=6
        ))
    
    def refresh_materialized_views(self):
        """刷新物化視圖"""
        refresh_sql = """
        REFRESH MATERIALIZED VIEW CONCURRENTLY product_statistics;
        """
        
        try:
            self.cursor.execute(refresh_sql)
            self.conn.commit()
            logger.info("物化視圖刷新成功")
        except Exception as e:
            self.conn.rollback()
            logger.error(f"刷新物化視圖失敗: {e}")
    
    def close(self):
        """關(guān)閉連接"""
        if self.cursor:
            self.cursor.close()
        if self.conn:
            self.conn.close()
        logger.info("數(shù)據(jù)庫連接已關(guān)閉")


def demonstrate_ecommerce_features():
    """演示電商系統(tǒng)高級特性"""
    dsn = "dbname=ecommerce user=postgres password=password host=localhost port=5432"
    
    print("電商系統(tǒng)PostgreSQL高級特性演示")
    print("=" * 60)
    
    # 創(chuàng)建數(shù)據(jù)庫實(shí)例
    db = ECommerceDatabase(dsn)
    
    try:
        # 1. 創(chuàng)建架構(gòu)
        print("\n1. 創(chuàng)建數(shù)據(jù)庫架構(gòu)...")
        db.create_schema()
        print("   架構(gòu)創(chuàng)建完成")
        
        # 2. 演示搜索功能
        print("\n2. 演示高級產(chǎn)品搜索...")
        products = db.search_products(
            query="wireless headphone",
            min_price=50,
            max_price=200,
            min_rating=4.0,
            in_stock_only=True,
            sort_by='rating',
            limit=5
        )
        
        print(f"   找到 {len(products)} 個產(chǎn)品:")
        for product in products:
            print(f"   - {product['name']} (評分: {product['average_rating']:.1f}, "
                  f"價格: ${product['base_price']:.2f})")
        
        # 3. 演示推薦系統(tǒng)
        if products:
            print("\n3. 演示產(chǎn)品推薦系統(tǒng)...")
            recommendations = db.get_product_recommendations(
                product_id=products[0]['id'],
                limit=5
            )
            
            print(f"   為 '{products[0]['name']}' 的推薦:")
            for rec in recommendations:
                print(f"   - {rec['name']} (原因: {rec['recommendation_reason']}, "
                      f"得分: {rec['recommendation_score']:.2f})")
        
        # 4. 演示訂單創(chuàng)建
        print("\n4. 演示訂單創(chuàng)建...")
        # 這里需要實(shí)際的產(chǎn)品數(shù)據(jù),所以只是演示代碼結(jié)構(gòu)
        print("   訂單創(chuàng)建功能就緒")
        
        # 5. 刷新物化視圖
        print("\n5. 刷新物化視圖...")
        db.refresh_materialized_views()
        print("   物化視圖刷新完成")
        
        print("\n演示完成!")
        
    except Exception as e:
        print(f"演示過程中出錯: {e}")
    finally:
        db.close()


if __name__ == "__main__":
    demonstrate_ecommerce_features()

5. 性能監(jiān)控與調(diào)優(yōu)工具

5.1 PostgreSQL性能監(jiān)控指標(biāo)體系

5.2 關(guān)鍵性能指標(biāo)計算公式

緩沖區(qū)命中率

索引使用效率

緩存命中率

6. 完整代碼示例

"""
PostgreSQL高級特性與性能優(yōu)化完整示例
"""
import psycopg2
from psycopg2.extras import DictCursor, Json
import time
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class PostgreSQLAdvancedOptimizer:
    """PostgreSQL高級優(yōu)化器"""
    
    def __init__(self, dsn: str):
        """
        初始化優(yōu)化器
        
        Args:
            dsn: 數(shù)據(jù)庫連接字符串
        """
        self.dsn = dsn
        self.conn = psycopg2.connect(dsn)
        self.cursor = self.conn.cursor(cursor_factory=DictCursor)
    
    def analyze_database_health(self) -> Dict[str, Any]:
        """
        全面分析數(shù)據(jù)庫健康狀態(tài)
        
        Returns:
            數(shù)據(jù)庫健康報告
        """
        health_report = {
            'timestamp': datetime.now().isoformat(),
            'overall_score': 0,
            'categories': {},
            'issues': [],
            'recommendations': []
        }
        
        # 分析各個維度
        categories = [
            ('連接與并發(fā)', self._analyze_connections),
            ('查詢性能', self._analyze_query_performance),
            ('索引效率', self._analyze_index_efficiency),
            ('緩存性能', self._analyze_cache_performance),
            ('存儲效率', self._analyze_storage_efficiency),
            ('配置優(yōu)化', self._analyze_configuration)
        ]
        
        total_score = 0
        category_count = 0
        
        for category_name, analyzer_func in categories:
            try:
                category_result = analyzer_func()
                health_report['categories'][category_name] = category_result
                
                if 'score' in category_result:
                    total_score += category_result['score']
                    category_count += 1
                
                if 'issues' in category_result:
                    health_report['issues'].extend(category_result['issues'])
                
                if 'recommendations' in category_result:
                    health_report['recommendations'].extend(category_result['recommendations'])
                    
            except Exception as e:
                logger.error(f"分析{category_name}失敗: {e}")
                health_report['issues'].append({
                    'category': category_name,
                    'severity': 'ERROR',
                    'message': f'分析失敗: {str(e)}'
                })
        
        # 計算總體評分
        if category_count > 0:
            health_report['overall_score'] = total_score / category_count
        
        # 排序問題和建議
        health_report['issues'] = sorted(
            health_report['issues'], 
            key=lambda x: {'CRITICAL': 3, 'HIGH': 2, 'MEDIUM': 1, 'LOW': 0}.get(x.get('severity', 'LOW'), 0),
            reverse=True
        )
        
        health_report['recommendations'] = sorted(
            health_report['recommendations'],
            key=lambda x: x.get('priority', 3),
            reverse=False
        )
        
        return health_report
    
    def _analyze_connections(self) -> Dict[str, Any]:
        """分析連接與并發(fā)"""
        analysis = {
            'score': 100,
            'metrics': {},
            'issues': [],
            'recommendations': []
        }
        
        try:
            # 獲取連接統(tǒng)計
            self.cursor.execute("""
                SELECT 
                    COUNT(*) as total_connections,
                    COUNT(*) FILTER (WHERE state = 'active') as active_connections,
                    COUNT(*) FILTER (WHERE state = 'idle') as idle_connections,
                    COUNT(*) FILTER (WHERE wait_event_type IS NOT NULL) as waiting_connections,
                    MAX(now() - backend_start) as oldest_connection_age
                FROM pg_stat_activity
                WHERE pid <> pg_backend_pid()
            """)
            conn_stats = self.cursor.fetchone()
            
            # 獲取配置參數(shù)
            self.cursor.execute("""
                SELECT setting::integer as max_connections
                FROM pg_settings
                WHERE name = 'max_connections'
            """)
            max_conns = self.cursor.fetchone()['max_connections']
            
            # 計算連接使用率
            conn_usage = conn_stats['total_connections'] / max_conns
            analysis['metrics'] = {
                'total_connections': conn_stats['total_connections'],
                'active_connections': conn_stats['active_connections'],
                'idle_connections': conn_stats['idle_connections'],
                'waiting_connections': conn_stats['waiting_connections'],
                'connection_usage_percentage': round(conn_usage * 100, 1),
                'max_connections': max_conns
            }
            
            # 分析問題
            if conn_usage > 0.8:
                analysis['score'] -= 30
                analysis['issues'].append({
                    'severity': 'HIGH',
                    'message': f'連接使用率過高: {conn_usage:.1%}',
                    'details': '接近最大連接數(shù)限制'
                })
                analysis['recommendations'].append({
                    'priority': 1,
                    'action': '考慮增加max_connections參數(shù)或優(yōu)化連接池配置'
                })
            
            if conn_stats['waiting_connections'] > 5:
                analysis['score'] -= 20
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'等待連接數(shù)較多: {conn_stats["waiting_connections"]}',
                    'details': '可能存在鎖爭用或資源競爭'
                })
            
            if conn_stats['oldest_connection_age'] and \
               conn_stats['oldest_connection_age'].total_seconds() > 3600:
                analysis['score'] -= 10
                analysis['issues'].append({
                    'severity': 'LOW',
                    'message': f'存在長時間連接: {conn_stats["oldest_connection_age"]}',
                    'details': '考慮優(yōu)化連接生命周期'
                })
            
        except Exception as e:
            logger.error(f"連接分析失敗: {e}")
            analysis['score'] = 0
        
        return analysis
    
    def _analyze_query_performance(self) -> Dict[str, Any]:
        """分析查詢性能"""
        analysis = {
            'score': 100,
            'metrics': {},
            'issues': [],
            'recommendations': []
        }
        
        try:
            # 獲取慢查詢統(tǒng)計
            self.cursor.execute("""
                WITH query_stats AS (
                    SELECT 
                        query,
                        calls,
                        total_time,
                        mean_time,
                        rows,
                        100.0 * shared_blks_hit / nullif(shared_blks_hit + shared_blks_read, 0) as buffer_hit_rate,
                        ROW_NUMBER() OVER (ORDER BY total_time DESC) as time_rank
                    FROM pg_stat_statements
                    WHERE query NOT LIKE '%pg_stat_statements%'
                    AND calls > 0
                )
                SELECT 
                    COUNT(*) as total_queries,
                    SUM(calls) as total_calls,
                    SUM(total_time) as total_time_ms,
                    AVG(mean_time) as avg_query_time_ms,
                    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY mean_time) as p95_query_time_ms,
                    SUM(CASE WHEN mean_time > 100 THEN 1 ELSE 0 END) as slow_queries_count,
                    AVG(buffer_hit_rate) as avg_buffer_hit_rate
                FROM query_stats
            """)
            query_stats = self.cursor.fetchone()
            
            analysis['metrics'] = {
                'total_queries': query_stats['total_queries'],
                'total_calls': query_stats['total_calls'],
                'total_time_seconds': round(query_stats['total_time_ms'] / 1000, 1),
                'avg_query_time_ms': round(query_stats['avg_query_time_ms'], 2),
                'p95_query_time_ms': round(query_stats['p95_query_time_ms'], 2),
                'slow_queries_count': query_stats['slow_queries_count'],
                'avg_buffer_hit_rate': round(query_stats['avg_buffer_hit_rate'], 1)
            }
            
            # 分析問題
            if query_stats['avg_query_time_ms'] > 50:
                analysis['score'] -= 20
                analysis['issues'].append({
                    'severity': 'HIGH',
                    'message': f'平均查詢時間較高: {query_stats["avg_query_time_ms"]:.2f}ms',
                    'details': '可能存在查詢優(yōu)化空間'
                })
                analysis['recommendations'].append({
                    'priority': 1,
                    'action': '分析并優(yōu)化最耗時的查詢'
                })
            
            if query_stats['slow_queries_count'] > 10:
                analysis['score'] -= 15
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'發(fā)現(xiàn) {query_stats["slow_queries_count"]} 個慢查詢',
                    'details': '定義: 平均執(zhí)行時間 > 100ms'
                })
            
            if query_stats['avg_buffer_hit_rate'] < 90:
                analysis['score'] -= 10
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'平均緩沖區(qū)命中率較低: {query_stats["avg_buffer_hit_rate"]:.1f}%',
                    'details': '建議增加shared_buffers或優(yōu)化查詢'
                })
            
        except Exception as e:
            logger.error(f"查詢性能分析失敗: {e}")
            analysis['score'] = 0
        
        return analysis
    
    def _analyze_index_efficiency(self) -> Dict[str, Any]:
        """分析索引效率"""
        analysis = {
            'score': 100,
            'metrics': {},
            'issues': [],
            'recommendations': []
        }
        
        try:
            # 獲取索引使用統(tǒng)計
            self.cursor.execute("""
                WITH index_stats AS (
                    SELECT 
                        schemaname,
                        tablename,
                        indexname,
                        idx_scan as index_scans,
                        idx_tup_read as tuples_read,
                        idx_tup_fetch as tuples_fetched,
                        pg_relation_size(indexname::regclass) as index_size_bytes,
                        CASE 
                            WHEN idx_scan = 0 THEN 0
                            ELSE idx_tup_fetch::float / idx_tup_read * 100
                        END as index_efficiency
                    FROM pg_stat_user_indexes
                    WHERE schemaname NOT LIKE 'pg_%'
                )
                SELECT 
                    COUNT(*) as total_indexes,
                    SUM(index_size_bytes) as total_index_size_bytes,
                    SUM(CASE WHEN index_scans = 0 THEN 1 ELSE 0 END) as unused_indexes_count,
                    AVG(index_efficiency) as avg_index_efficiency,
                    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY index_efficiency) as median_index_efficiency
                FROM index_stats
            """)
            index_stats = self.cursor.fetchone()
            
            analysis['metrics'] = {
                'total_indexes': index_stats['total_indexes'],
                'total_index_size_gb': round(index_stats['total_index_size_bytes'] / (1024**3), 2),
                'unused_indexes_count': index_stats['unused_indexes_count'],
                'avg_index_efficiency': round(index_stats['avg_index_efficiency'], 1),
                'median_index_efficiency': round(index_stats['median_index_efficiency'], 1)
            }
            
            # 分析問題
            unused_ratio = index_stats['unused_indexes_count'] / index_stats['total_indexes'] \
                if index_stats['total_indexes'] > 0 else 0
            
            if unused_ratio > 0.1:
                analysis['score'] -= 25
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'未使用索引比例較高: {unused_ratio:.1%}',
                    'details': f'{index_stats["unused_indexes_count"]} 個索引從未使用'
                })
                analysis['recommendations'].append({
                    'priority': 2,
                    'action': '考慮刪除未使用的索引以節(jié)省空間和提高寫入性能'
                })
            
            if index_stats['avg_index_efficiency'] < 50:
                analysis['score'] -= 15
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'平均索引效率較低: {index_stats["avg_index_efficiency"]:.1f}%',
                    'details': '索引可能不是最優(yōu)選擇'
                })
            
        except Exception as e:
            logger.error(f"索引效率分析失敗: {e}")
            analysis['score'] = 0
        
        return analysis
    
    def _analyze_cache_performance(self) -> Dict[str, Any]:
        """分析緩存性能"""
        analysis = {
            'score': 100,
            'metrics': {},
            'issues': [],
            'recommendations': []
        }
        
        try:
            # 獲取緩存統(tǒng)計
            self.cursor.execute("""
                SELECT 
                    -- 共享緩沖區(qū)命中率
                    CASE 
                        WHEN blks_hit + blks_read > 0 
                        THEN blks_hit::float / (blks_hit + blks_read) * 100
                        ELSE 0 
                    END as shared_buffer_hit_rate,
                    
                    -- TOAST緩沖區(qū)命中率
                    CASE 
                        WHEN toast_blks_hit + toast_blks_read > 0 
                        THEN toast_blks_hit::float / (toast_blks_hit + toast_blks_read) * 100
                        ELSE 0 
                    END as toast_buffer_hit_rate,
                    
                    -- 臨時文件使用
                    temp_files,
                    temp_bytes,
                    
                    -- 檢查點(diǎn)統(tǒng)計
                    checkpoints_timed,
                    checkpoints_req,
                    checkpoint_write_time,
                    checkpoint_sync_time
                    
                FROM pg_stat_bgwriter
            """)
            cache_stats = self.cursor.fetchone()
            
            # 獲取緩沖區(qū)配置
            self.cursor.execute("""
                SELECT 
                    name,
                    setting,
                    unit
                FROM pg_settings
                WHERE name IN ('shared_buffers', 'effective_cache_size')
            """)
            cache_config = {row['name']: row for row in self.cursor.fetchall()}
            
            analysis['metrics'] = {
                'shared_buffer_hit_rate': round(cache_stats['shared_buffer_hit_rate'], 1),
                'toast_buffer_hit_rate': round(cache_stats['toast_buffer_hit_rate'], 1),
                'temp_files_count': cache_stats['temp_files'],
                'temp_files_size_gb': round(cache_stats['temp_bytes'] / (1024**3), 2),
                'shared_buffers': cache_config.get('shared_buffers', {}).get('setting', 'N/A'),
                'effective_cache_size': cache_config.get('effective_cache_size', {}).get('setting', 'N/A')
            }
            
            # 分析問題
            if cache_stats['shared_buffer_hit_rate'] < 90:
                analysis['score'] -= 20
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'共享緩沖區(qū)命中率較低: {cache_stats["shared_buffer_hit_rate"]:.1f}%',
                    'details': '建議增加shared_buffers或優(yōu)化工作集'
                })
                analysis['recommendations'].append({
                    'priority': 2,
                    'action': '考慮增加shared_buffers參數(shù)'
                })
            
            if cache_stats['temp_files'] > 100:
                analysis['score'] -= 15
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'臨時文件使用較多: {cache_stats["temp_files"]} 個文件',
                    'details': '可能存在排序或哈希操作溢出到磁盤'
                })
                analysis['recommendations'].append({
                    'priority': 2,
                    'action': '增加work_mem參數(shù)以減少臨時文件使用'
                })
            
        except Exception as e:
            logger.error(f"緩存性能分析失敗: {e}")
            analysis['score'] = 0
        
        return analysis
    
    def _analyze_storage_efficiency(self) -> Dict[str, Any]:
        """分析存儲效率"""
        analysis = {
            'score': 100,
            'metrics': {},
            'issues': [],
            'recommendations': []
        }
        
        try:
            # 獲取表膨脹信息
            self.cursor.execute("""
                SELECT 
                    schemaname,
                    tablename,
                    n_dead_tup as dead_tuples,
                    n_live_tup as live_tuples,
                    CASE 
                        WHEN n_live_tup > 0 
                        THEN n_dead_tup::float / n_live_tup * 100
                        ELSE 0 
                    END as dead_tuple_ratio,
                    last_vacuum,
                    last_autovacuum,
                    last_analyze,
                    last_autoanalyze
                FROM pg_stat_user_tables
                WHERE schemaname NOT LIKE 'pg_%'
                ORDER BY dead_tuple_ratio DESC
                LIMIT 10
            """)
            table_stats = self.cursor.fetchall()
            
            # 獲取數(shù)據(jù)庫大小
            self.cursor.execute("""
                SELECT 
                    pg_database_size(current_database()) as database_size_bytes,
                    pg_size_pretty(pg_database_size(current_database())) as database_size_pretty
            """)
            db_size = self.cursor.fetchone()
            
            analysis['metrics'] = {
                'database_size': db_size['database_size_pretty'],
                'tables_analyzed': len(table_stats),
                'top_tables_by_dead_tuples': [
                    {
                        'table': f"{row['schemaname']}.{row['tablename']}",
                        'dead_tuple_ratio': round(row['dead_tuple_ratio'], 1),
                        'dead_tuples': row['dead_tuples'],
                        'last_vacuum': row['last_vacuum']
                    }
                    for row in table_stats[:5]
                ]
            }
            
            # 分析問題
            high_dead_tables = [
                row for row in table_stats 
                if row['dead_tuple_ratio'] > 20
            ]
            
            if high_dead_tables:
                analysis['score'] -= 25
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'發(fā)現(xiàn) {len(high_dead_tables)} 個表死元組比例超過20%',
                    'details': '可能導(dǎo)致查詢性能下降和存儲空間浪費(fèi)'
                })
                analysis['recommendations'].append({
                    'priority': 2,
                    'action': '對高死元組比例的表執(zhí)行VACUUM操作'
                })
            
        except Exception as e:
            logger.error(f"存儲效率分析失敗: {e}")
            analysis['score'] = 0
        
        return analysis
    
    def _analyze_configuration(self) -> Dict[str, Any]:
        """分析配置優(yōu)化"""
        analysis = {
            'score': 100,
            'metrics': {},
            'issues': [],
            'recommendations': []
        }
        
        try:
            # 獲取關(guān)鍵配置參數(shù)
            self.cursor.execute("""
                SELECT 
                    name,
                    setting,
                    unit,
                    context,
                    vartype,
                    source
                FROM pg_settings
                WHERE name IN (
                    'shared_buffers',
                    'work_mem',
                    'maintenance_work_mem',
                    'effective_cache_size',
                    'max_connections',
                    'checkpoint_timeout',
                    'checkpoint_completion_target',
                    'wal_buffers',
                    'random_page_cost',
                    'seq_page_cost',
                    'effective_io_concurrency'
                )
                ORDER BY name
            """)
            configs = {row['name']: row for row in self.cursor.fetchall()}
            
            analysis['metrics'] = {
                'key_parameters': configs
            }
            
            # 檢查配置合理性
            issues = []
            recommendations = []
            
            # 檢查shared_buffers(應(yīng)為系統(tǒng)內(nèi)存的25%)
            if 'shared_buffers' in configs:
                shared_buffers = configs['shared_buffers']['setting']
                if shared_buffers.endswith('MB'):
                    mb_value = int(shared_buffers[:-2])
                    if mb_value < 128:  # 小于128MB
                        issues.append('shared_buffers設(shè)置可能過小')
                        recommendations.append('考慮增加shared_buffers到系統(tǒng)內(nèi)存的25%')
            
            # 檢查work_mem
            if 'work_mem' in configs:
                work_mem = configs['work_mem']['setting']
                if work_mem.endswith('kB'):
                    kb_value = int(work_mem[:-2])
                    if kb_value < 4096:  # 小于4MB
                        issues.append('work_mem設(shè)置可能過小')
                        recommendations.append('適當(dāng)增加work_mem以減少臨時文件使用')
            
            # 檢查checkpoint配置
            if 'checkpoint_timeout' in configs:
                checkpoint_timeout = int(configs['checkpoint_timeout']['setting'])
                if checkpoint_timeout > 900:  # 超過15分鐘
                    issues.append('checkpoint_timeout設(shè)置過長')
                    recommendations.append('考慮減少checkpoint_timeout以降低恢復(fù)時間')
            
            if issues:
                analysis['score'] -= len(issues) * 10
                for issue in issues:
                    analysis['issues'].append({
                        'severity': 'MEDIUM',
                        'message': issue
                    })
                
                for rec in recommendations:
                    analysis['recommendations'].append({
                        'priority': 3,
                        'action': rec
                    })
            
        except Exception as e:
            logger.error(f"配置分析失敗: {e}")
            analysis['score'] = 0
        
        return analysis
    
    def generate_optimization_report(self) -> Dict[str, Any]:
        """
        生成優(yōu)化報告
        
        Returns:
            優(yōu)化報告
        """
        health_report = self.analyze_database_health()
        
        report = {
            'summary': {
                'overall_score': health_report['overall_score'],
                'assessment': self._get_assessment(health_report['overall_score']),
                'total_issues': len(health_report['issues']),
                'total_recommendations': len(health_report['recommendations'])
            },
            'detailed_analysis': health_report['categories'],
            'critical_issues': [
                issue for issue in health_report['issues']
                if issue.get('severity') in ['CRITICAL', 'HIGH']
            ],
            'optimization_plan': self._create_optimization_plan(health_report),
            'execution_checklist': self._create_execution_checklist()
        }
        
        return report
    
    def _get_assessment(self, score: float) -> str:
        """根據(jù)評分獲取評估結(jié)果"""
        if score >= 90:
            return 'EXCELLENT'
        elif score >= 80:
            return 'GOOD'
        elif score >= 70:
            return 'FAIR'
        elif score >= 60:
            return 'NEEDS_IMPROVEMENT'
        else:
            return 'POOR'
    
    def _create_optimization_plan(self, health_report: Dict[str, Any]) -> List[Dict[str, Any]]:
        """創(chuàng)建優(yōu)化計劃"""
        plan = []
        
        # 按優(yōu)先級排序建議
        sorted_recommendations = sorted(
            health_report['recommendations'],
            key=lambda x: x.get('priority', 3)
        )
        
        for i, rec in enumerate(sorted_recommendations[:10], 1):  # 取前10個
            plan.append({
                'step': i,
                'action': rec['action'],
                'priority': rec.get('priority', 3),
                'estimated_effort': self._estimate_effort(rec['action']),
                'expected_impact': self._estimate_impact(rec['action'])
            })
        
        return plan
    
    def _estimate_effort(self, action: str) -> str:
        """估計實(shí)施難度"""
        low_effort_keywords = ['調(diào)整', '設(shè)置', '啟用', '禁用']
        medium_effort_keywords = ['優(yōu)化', '重構(gòu)', '重建', '遷移']
        high_effort_keywords = ['重寫', '重構(gòu)架構(gòu)', '數(shù)據(jù)遷移', '集群擴(kuò)展']
        
        action_lower = action.lower()
        
        if any(keyword in action_lower for keyword in high_effort_keywords):
            return 'HIGH'
        elif any(keyword in action_lower for keyword in medium_effort_keywords):
            return 'MEDIUM'
        else:
            return 'LOW'
    
    def _estimate_impact(self, action: str) -> str:
        """估計影響程度"""
        high_impact_keywords = ['性能提升', '顯著改善', '根本解決', '關(guān)鍵修復(fù)']
        medium_impact_keywords = ['優(yōu)化', '改進(jìn)', '增強(qiáng)', '調(diào)整']
        low_impact_keywords = ['微調(diào)', '小優(yōu)化', '維護(hù)', '清理']
        
        action_lower = action.lower()
        
        if any(keyword in action_lower for keyword in high_impact_keywords):
            return 'HIGH'
        elif any(keyword in action_lower for keyword in medium_impact_keywords):
            return 'MEDIUM'
        else:
            return 'LOW'
    
    def _create_execution_checklist(self) -> List[Dict[str, Any]]:
        """創(chuàng)建執(zhí)行檢查清單"""
        checklist = [
            {
                'phase': '準(zhǔn)備階段',
                'tasks': [
                    {'task': '備份數(shù)據(jù)庫', 'completed': False},
                    {'task': '驗(yàn)證備份完整性', 'completed': False},
                    {'task': '準(zhǔn)備回滾計劃', 'completed': False},
                    {'task': '安排維護(hù)窗口', 'completed': False}
                ]
            },
            {
                'phase': '實(shí)施階段',
                'tasks': [
                    {'task': '應(yīng)用配置變更', 'completed': False},
                    {'task': '執(zhí)行索引優(yōu)化', 'completed': False},
                    {'task': '運(yùn)行VACUUM操作', 'completed': False},
                    {'task': '更新統(tǒng)計信息', 'completed': False}
                ]
            },
            {
                'phase': '驗(yàn)證階段',
                'tasks': [
                    {'task': '驗(yàn)證性能改進(jìn)', 'completed': False},
                    {'task': '運(yùn)行回歸測試', 'completed': False},
                    {'task': '監(jiān)控系統(tǒng)穩(wěn)定性', 'completed': False},
                    {'task': '更新文檔記錄', 'completed': False}
                ]
            }
        ]
        
        return checklist
    
    def close(self):
        """關(guān)閉連接"""
        if self.cursor:
            self.cursor.close()
        if self.conn:
            self.conn.close()
        logger.info("數(shù)據(jù)庫連接已關(guān)閉")


def demonstrate_optimization():
    """演示優(yōu)化功能"""
    dsn = "dbname=testdb user=postgres password=password host=localhost port=5432"
    
    print("PostgreSQL高級優(yōu)化演示")
    print("=" * 60)
    
    optimizer = PostgreSQLAdvancedOptimizer(dsn)
    
    try:
        # 生成健康報告
        print("\n生成數(shù)據(jù)庫健康報告...")
        health_report = optimizer.analyze_database_health()
        
        print(f"\n總體評分: {health_report['overall_score']:.1f}/100")
        print(f"發(fā)現(xiàn)問題: {len(health_report['issues'])} 個")
        print(f"優(yōu)化建議: {len(health_report['recommendations'])} 條")
        
        # 顯示關(guān)鍵問題
        critical_issues = [
            issue for issue in health_report['issues']
            if issue.get('severity') in ['CRITICAL', 'HIGH']
        ]
        
        if critical_issues:
            print("\n關(guān)鍵問題:")
            for issue in critical_issues[:3]:
                print(f"  ? [{issue['severity']}] {issue['message']}")
        
        # 顯示高優(yōu)先級建議
        high_priority_recs = [
            rec for rec in health_report['recommendations']
            if rec.get('priority', 3) == 1
        ]
        
        if high_priority_recs:
            print("\n高優(yōu)先級建議:")
            for rec in high_priority_recs[:3]:
                print(f"  ? {rec['action']}")
        
        # 生成詳細(xì)報告
        print("\n\n生成詳細(xì)優(yōu)化報告...")
        report = optimizer.generate_optimization_report()
        
        print(f"\n優(yōu)化計劃 ({len(report['optimization_plan'])} 個步驟):")
        for step in report['optimization_plan'][:5]:
            print(f"  步驟{step['step']}: {step['action']}")
            print(f"     優(yōu)先級: {step['priority']}, 難度: {step['estimated_effort']}, "
                  f"影響: {step['expected_impact']}")
        
        print("\n執(zhí)行檢查清單:")
        for phase in report['execution_checklist']:
            print(f"  {phase['phase']}:")
            for task in phase['tasks']:
                status = '?' if task['completed'] else '○'
                print(f"    {status} {task['task']}")
        
        print("\n優(yōu)化演示完成!")
        
    except Exception as e:
        print(f"演示過程中出錯: {e}")
    finally:
        optimizer.close()


if __name__ == "__main__":
    demonstrate_optimization()

7. 代碼自查與最佳實(shí)踐

7.1 代碼質(zhì)量檢查清單

為確保PostgreSQL相關(guān)代碼的質(zhì)量,應(yīng)遵循以下檢查清單:

"""
PostgreSQL代碼質(zhì)量自查工具
"""
import re
import ast
from typing import List, Dict, Any, Set
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class PostgreSQLCodeChecker:
    """PostgreSQL代碼質(zhì)量檢查器"""
    
    def __init__(self):
        self.rules = {
            'sql_injection': self.check_sql_injection,
            'connection_management': self.check_connection_management,
            'transaction_handling': self.check_transaction_handling,
            'index_usage': self.check_index_usage,
            'data_type_validation': self.check_data_type_validation,
            'error_handling': self.check_error_handling,
            'performance_anti_patterns': self.check_performance_anti_patterns
        }
        
    def check_file(self, filepath: str) -> Dict[str, List[Dict[str, Any]]]:
        """
        檢查Python文件中的PostgreSQL相關(guān)代碼
        
        Args:
            filepath: Python文件路徑
            
        Returns:
            檢查結(jié)果
        """
        with open(filepath, 'r', encoding='utf-8') as f:
            content = f.read()
        
        issues = {}
        
        for rule_name, rule_func in self.rules.items():
            rule_issues = rule_func(content, filepath)
            if rule_issues:
                issues[rule_name] = rule_issues
        
        return issues
    
    def check_sql_injection(self, content: str, filepath: str) -> List[Dict[str, Any]]:
        """
        檢查SQL注入漏洞
        
        Args:
            content: 文件內(nèi)容
            filepath: 文件路徑
            
        Returns:
            問題列表
        """
        issues = []
        
        # 查找可能的字符串拼接SQL
        patterns = [
            (r'execute\s*\(.*?\s*%\s*.*?\)', '使用字符串格式化執(zhí)行SQL'),
            (r'execute\s*\(.*?\s*\+\s*.*?\)', '使用字符串拼接執(zhí)行SQL'),
            (r'executemany\s*\(.*?\s*%\s*.*?\)', '使用字符串格式化執(zhí)行批量SQL'),
            (r'f"SELECT.*{.*}.*"', '使用f-string直接嵌入變量'),
        ]
        
        lines = content.split('\n')
        for i, line in enumerate(lines, 1):
            for pattern, description in patterns:
                if re.search(pattern, line, re.IGNORECASE):
                    issues.append({
                        'line': i,
                        'severity': 'CRITICAL',
                        'message': f'潛在的SQL注入漏洞: {description}',
                        'suggestion': '使用參數(shù)化查詢(%s占位符)',
                        'code_snippet': line.strip()[:100]
                    })
        
        return issues
    
    def check_connection_management(self, content: str, filepath: str) -> List[Dict[str, Any]]:
        """
        檢查數(shù)據(jù)庫連接管理
        
        Args:
            content: 文件內(nèi)容
            filepath: 文件路徑
            
        Returns:
            問題列表
        """
        issues = []
        lines = content.split('\n')
        
        # 查找連接創(chuàng)建但不關(guān)閉的情況
        connect_pattern = r'psycopg2\.connect\(|connect\('
        close_pattern = r'\.close\(\)'
        
        in_function = False
        function_start = 0
        connect_lines = []
        
        for i, line in enumerate(lines, 1):
            # 檢測函數(shù)開始
            if line.strip().startswith('def '):
                in_function = True
                function_start = i
                connect_lines = []
            
            # 檢測連接創(chuàng)建
            if re.search(connect_pattern, line):
                connect_lines.append(i)
            
            # 檢測連接關(guān)閉
            if re.search(close_pattern, line) and 'close' in line:
                if connect_lines:
                    connect_lines.pop()
            
            # 檢測函數(shù)結(jié)束
            if line.strip() == '' or i == len(lines):
                if in_function and connect_lines:
                    for connect_line in connect_lines:
                        issues.append({
                            'line': connect_line,
                            'severity': 'HIGH',
                            'message': '數(shù)據(jù)庫連接可能未正確關(guān)閉',
                            'suggestion': '確保在finally塊中關(guān)閉連接',
                            'context': f'函數(shù)開始于第{function_start}行'
                        })
                in_function = False
        
        return issues
    
    def check_transaction_handling(self, content: str, filepath: str) -> List[Dict[str, Any]]:
        """
        檢查事務(wù)處理
        
        Args:
            content: 文件內(nèi)容
            filepath: 文件路徑
            
        Returns:
            問題列表
        """
        issues = []
        lines = content.split('\n')
        
        # 查找沒有明確事務(wù)管理的操作
        write_operations = [
            'INSERT', 'UPDATE', 'DELETE', 'CREATE', 'ALTER', 'DROP'
        ]
        
        transaction_keywords = [
            'BEGIN', 'COMMIT', 'ROLLBACK', 'autocommit',
            'set_session', 'transaction'
        ]
        
        in_write_operation = False
        has_transaction_control = False
        
        for i, line in enumerate(lines, 1):
            line_upper = line.upper()
            
            # 檢查是否有寫操作
            if any(op in line_upper for op in write_operations):
                in_write_operation = True
            
            # 檢查是否有事務(wù)控制
            if any(keyword in line for keyword in transaction_keywords):
                has_transaction_control = True
            
            # 如果是空行或注釋,檢查之前的操作
            if line.strip() == '' or line.strip().startswith('#'):
                if in_write_operation and not has_transaction_control:
                    issues.append({
                        'line': i - 1,
                        'severity': 'MEDIUM',
                        'message': '寫操作沒有顯式的事務(wù)管理',
                        'suggestion': '使用明確的事務(wù)控制(BEGIN/COMMIT/ROLLBACK)',
                        'context': '多個寫操作應(yīng)該在同一事務(wù)中'
                    })
                
                in_write_operation = False
                has_transaction_control = False
        
        return issues
    
    def check_index_usage(self, content: str, filepath: str) -> List[Dict[str, Any]]:
        """
        檢查索引使用
        
        Args:
            content: 文件內(nèi)容
            filepath: 文件路徑
            
        Returns:
            問題列表
        """
        issues = []
        lines = content.split('\n')
        
        # 查找可能受益于索引的查詢模式
        index_patterns = [
            (r'WHERE\s+\w+\s*=\s*', '等值查詢'),
            (r'WHERE\s+\w+\s+IN\s*\(', 'IN列表查詢'),
            (r'WHERE\s+\w+\s+LIKE\s+\'', 'LIKE查詢(可能前綴匹配)'),
            (r'ORDER\s+BY\s+\w+', '排序操作'),
            (r'GROUP\s+BY\s+\w+', '分組操作'),
            (r'JOIN\s+\w+\s+ON\s+\w+\s*=\s*\w+', '連接操作')
        ]
        
        for i, line in enumerate(lines, 1):
            line_upper = line.upper()
            
            # 跳過注釋
            if line.strip().startswith('#'):
                continue
            
            for pattern, description in index_patterns:
                if re.search(pattern, line_upper, re.IGNORECASE):
                    issues.append({
                        'line': i,
                        'severity': 'LOW',
                        'message': f'查詢可能受益于索引: {description}',
                        'suggestion': '考慮在相關(guān)列上創(chuàng)建索引',
                        'code_snippet': line.strip()[:100]
                    })
                    break
        
        return issues
    
    def generate_report(self, issues: Dict[str, List[Dict[str, Any]]]) -> str:
        """生成檢查報告"""
        report_lines = []
        report_lines.append("=" * 60)
        report_lines.append("PostgreSQL代碼質(zhì)量檢查報告")
        report_lines.append("=" * 60)
        
        total_issues = sum(len(rule_issues) for rule_issues in issues.values())
        report_lines.append(f"\n總共發(fā)現(xiàn) {total_issues} 個問題\n")
        
        severity_counts = {'CRITICAL': 0, 'HIGH': 0, 'MEDIUM': 0, 'LOW': 0}
        
        for rule_name, rule_issues in issues.items():
            if rule_issues:
                report_lines.append(f"\n{rule_name.upper()} ({len(rule_issues)}個問題):")
                report_lines.append("-" * 40)
                
                for issue in rule_issues:
                    severity = issue.get('severity', 'UNKNOWN')
                    severity_counts[severity] = severity_counts.get(severity, 0) + 1
                    
                    report_lines.append(
                        f"[{severity}] 第{issue['line']}行: {issue['message']}"
                    )
                    if 'suggestion' in issue:
                        report_lines.append(f"    建議: {issue['suggestion']}")
                    if 'code_snippet' in issue:
                        report_lines.append(f"    代碼: {issue['code_snippet']}")
                    report_lines.append("")
        
        # 添加嚴(yán)重性統(tǒng)計
        report_lines.append("\n嚴(yán)重性統(tǒng)計:")
        report_lines.append("-" * 40)
        for severity, count in severity_counts.items():
            if count > 0:
                report_lines.append(f"  {severity}: {count} 個問題")
        
        return '\n'.join(report_lines)


def check_postgresql_best_practices():
    """PostgreSQL最佳實(shí)踐檢查示例"""
    checker = PostgreSQLCodeChecker()
    
    # 示例代碼
    sample_code = """
import psycopg2

# 不好的示例:字符串拼接SQL(SQL注入風(fēng)險)
def bad_example(user_input):
    conn = psycopg2.connect("dbname=test")
    cursor = conn.cursor()
    
    # SQL注入漏洞
    query = f"SELECT * FROM users WHERE username = '{user_input}'"
    cursor.execute(query)
    
    # 連接未關(guān)閉
    return cursor.fetchall()

# 好的示例:參數(shù)化查詢
def good_example(username):
    conn = None
    cursor = None
    try:
        conn = psycopg2.connect("dbname=test")
        cursor = conn.cursor()
        
        # 參數(shù)化查詢
        query = "SELECT * FROM users WHERE username = %s"
        cursor.execute(query, (username,))
        
        return cursor.fetchall()
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()

# 事務(wù)處理示例
def transaction_example(user_data):
    conn = psycopg2.connect("dbname=test")
    cursor = conn.cursor()
    
    try:
        # 開始事務(wù)
        conn.autocommit = False
        
        # 多個寫操作
        cursor.execute(
            "INSERT INTO users (name, email) VALUES (%s, %s)",
            (user_data['name'], user_data['email'])
        )
        
        cursor.execute(
            "INSERT INTO logs (action) VALUES (%s)",
            ('user_created',)
        )
        
        # 提交事務(wù)
        conn.commit()
        
    except Exception as e:
        # 回滾事務(wù)
        conn.rollback()
        raise e
    finally:
        cursor.close()
        conn.close()

# 可能受益于索引的查詢
def query_without_index():
    conn = psycopg2.connect("dbname=test")
    cursor = conn.cursor()
    
    # 這個查詢可能受益于索引
    cursor.execute("""
        SELECT * FROM orders 
        WHERE customer_id = %s 
        AND order_date > %s
        ORDER BY order_date DESC
    """, (123, '2024-01-01'))
    
    return cursor.fetchall()
    """
    
    # 將示例代碼寫入臨時文件
    import tempfile
    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
        f.write(sample_code)
        temp_file = f.name
    
    try:
        # 檢查代碼
        issues = checker.check_file(temp_file)
        report = checker.generate_report(issues)
        print(report)
    finally:
        import os
        os.unlink(temp_file)


if __name__ == "__main__":
    check_postgresql_best_practices()

7.2 PostgreSQL最佳實(shí)踐總結(jié)

連接管理最佳實(shí)踐

  • 使用連接池管理數(shù)據(jù)庫連接
  • 確保連接在finally塊中正確關(guān)閉
  • 設(shè)置合適的連接超時和重試策略

查詢優(yōu)化最佳實(shí)踐

  • 使用EXPLAIN ANALYZE分析查詢計劃
  • 為頻繁查詢的列創(chuàng)建合適索引
  • 避免SELECT *,只選擇需要的列
  • 使用LIMIT限制返回行數(shù)

事務(wù)管理最佳實(shí)踐

  • 保持事務(wù)盡可能短小
  • 使用合適的隔離級別
  • 處理異常并正確回滾
  • 避免長時間持有鎖

索引設(shè)計最佳實(shí)踐

  • 基于查詢模式設(shè)計復(fù)合索引
  • 定期分析和重建索引
  • 使用部分索引減少索引大小
  • 考慮BRIN索引用于時間序列數(shù)據(jù)

配置優(yōu)化最佳實(shí)踐

  • 根據(jù)工作負(fù)載調(diào)整shared_buffers
  • 設(shè)置合適的work_mem減少臨時文件
  • 配置有效的維護(hù)工作內(nèi)存
  • 定期更新統(tǒng)計信息

8. 總結(jié)與展望

PostgreSQL作為功能最豐富的開源數(shù)據(jù)庫,其高級特性和性能優(yōu)化能力使其能夠應(yīng)對各種復(fù)雜的應(yīng)用場景。通過本文的深入探討,我們了解到:

8.1 核心要點(diǎn)總結(jié)

  • 高級特性:JSONB、全文搜索、分區(qū)表、物化視圖等特性使PostgreSQL能夠處理多樣化數(shù)據(jù)需求
  • 性能優(yōu)化:合理的索引設(shè)計、查詢優(yōu)化、配置調(diào)整是提升性能的關(guān)鍵
  • 并發(fā)控制:MVCC機(jī)制和適當(dāng)?shù)逆i策略確保高并發(fā)下的數(shù)據(jù)一致性
  • 監(jiān)控維護(hù):系統(tǒng)化監(jiān)控和定期維護(hù)保證數(shù)據(jù)庫長期穩(wěn)定運(yùn)行

8.2 未來發(fā)展趨勢

  • 人工智能集成:PostgreSQL的MADlib擴(kuò)展支持機(jī)器學(xué)習(xí)算法
  • 時序數(shù)據(jù)優(yōu)化:TimescaleDB擴(kuò)展提供專業(yè)時序數(shù)據(jù)處理能力
  • 地理空間增強(qiáng):PostGIS繼續(xù)擴(kuò)展地理信息系統(tǒng)功能
  • 云原生支持:更好的Kubernetes集成和云服務(wù)優(yōu)化
  • 向量搜索:對AI生成內(nèi)容(AIGC)的向量相似度搜索支持

通過持續(xù)學(xué)習(xí)和實(shí)踐,開發(fā)者可以充分利用PostgreSQL的強(qiáng)大功能,構(gòu)建高效、穩(wěn)定、可擴(kuò)展的數(shù)據(jù)存儲解決方案。無論是對初創(chuàng)公司還是大型企業(yè),PostgreSQL都提供了企業(yè)級數(shù)據(jù)庫所需的一切功能,同時保持了開源軟件的靈活性和成本優(yōu)勢。

以上就是PostgreSQL高級特性與性能優(yōu)化的實(shí)戰(zhàn)指南的詳細(xì)內(nèi)容,更多關(guān)于PostgreSQL性能優(yōu)化的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • PostgreSQL 默認(rèn)隔離級別的設(shè)置

    PostgreSQL 默認(rèn)隔離級別的設(shè)置

    PostgreSQL的默認(rèn)事務(wù)隔離級別是讀已提交,這是其事務(wù)處理系統(tǒng)的基礎(chǔ)行為模式,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2025-06-06
  • PostgreSQL GIN 索引原理、應(yīng)用場景與最佳實(shí)踐

    PostgreSQL GIN 索引原理、應(yīng)用場景與最佳實(shí)踐

    本文介紹了PostgreSQL的GIN索引,涵蓋了其底層原理、支持的數(shù)據(jù)類型、工作原理、性能特征、適用場景、實(shí)戰(zhàn)案例、性能調(diào)優(yōu)、常見問題及解決方案,并提供了GIN與其它索引類型的比較,通過本文,讀者可以全面了解GIN索引的使用方法和最佳實(shí)踐,感興趣的朋友跟隨小編一起看看吧
    2025-12-12
  • 史上最全PostgreSQL?DBA最常用SQL

    史上最全PostgreSQL?DBA最常用SQL

    這篇文章主要介紹了PostgreSQL?DBA最常用SQL?,主要包括背景及常用查詢語句,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-10-10
  • PostgreSQL 序列綁定字段與不綁定字段的區(qū)別說明

    PostgreSQL 序列綁定字段與不綁定字段的區(qū)別說明

    這篇文章主要介紹了PostgreSQL 序列綁定字段與不綁定字段的區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • PostgreSQL實(shí)現(xiàn)定期備份的方法

    PostgreSQL實(shí)現(xiàn)定期備份的方法

    PostgreSQL定期備份功能可以自動備份數(shù)據(jù)庫,避免了手動備份過程中可能發(fā)生的錯誤,也極大地減輕了管理員的工作壓力,所以本文將給大家介紹一下PostgreSQL實(shí)現(xiàn)定期備份的方法,需要的朋友可以參考下
    2024-03-03
  • postgresql 存儲函數(shù)調(diào)用變量的3種方法小結(jié)

    postgresql 存儲函數(shù)調(diào)用變量的3種方法小結(jié)

    這篇文章主要介紹了postgresql 存儲函數(shù)調(diào)用變量的3種方法小結(jié),具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-01-01
  • 基于pgrouting的路徑規(guī)劃處理方法

    基于pgrouting的路徑規(guī)劃處理方法

    這篇文章主要介紹了基于pgrouting的路徑規(guī)劃處理,根據(jù)pgrouting已經(jīng)集成的Dijkstra算法來,結(jié)合postgresql數(shù)據(jù)庫來處理最短路徑,需要的朋友可以參考下
    2022-04-04
  • postgresql 利用fdw來實(shí)現(xiàn)不同數(shù)據(jù)庫之間數(shù)據(jù)互通(推薦)

    postgresql 利用fdw來實(shí)現(xiàn)不同數(shù)據(jù)庫之間數(shù)據(jù)互通(推薦)

    這篇文章主要介紹了postgresql 利用fdw來實(shí)現(xiàn)不同數(shù)據(jù)庫之間數(shù)據(jù)互通,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-02-02
  • postgresql 中的序列nextval詳解

    postgresql 中的序列nextval詳解

    這篇文章主要介紹了postgresql 中的序列nextval詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • SQLCipher數(shù)據(jù)遷移到PostgreSql詳細(xì)教程

    SQLCipher數(shù)據(jù)遷移到PostgreSql詳細(xì)教程

    這篇文章主要介紹了SQLCipher數(shù)據(jù)遷移到PostgreSql詳細(xì)教程,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2025-09-09

最新評論

精品首页在线观看视频| 大鸡巴操b视频在线| 青青操免费日综合视频观看| 伊人情人综合成人久久网小说 | 大鸡八强奸视频在线观看| 日韩欧美一级黄片亚洲| 一级黄片大鸡巴插入美女| 国产视频网站一区二区三区| 精品视频中文字幕在线播放| 大香蕉伊人中文字幕| 红桃av成人在线观看| 偷拍自拍 中文字幕| 91极品新人『兔兔』精品新作| 一区二区三区四区五区性感视频| 天天日天天干天天插舔舔| 久久久久久99国产精品| 99一区二区在线观看| 国产一区二区火爆视频| 欧美激情精品在线观看| 亚洲一区二区三区精品乱码| 人妻激情图片视频小说| 亚洲成人av在线一区二区| 在线播放国产黄色av| 国产成人精品久久二区91| 国产91精品拍在线观看| 亚洲精品午夜久久久久| 精品人妻每日一部精品| 国产精品国色综合久久| 亚洲高清一区二区三区视频在线| 青青色国产视频在线| 97少妇精品在线观看| 国产精品午夜国产小视频| 久久久精品999精品日本| 18禁美女羞羞免费网站| 毛片一级完整版免费| 日韩欧美中文国产在线 | 日韩欧美国产一区ab| 免费一级特黄特色大片在线观看| 欧美性受xx黑人性猛交| 伊人网中文字幕在线视频| 在线亚洲天堂色播av电影| 亚洲一级美女啪啪啪| 亚洲狠狠婷婷综合久久app| 黄色av网站免费在线| 夏目彩春在线中文字幕| 青青色国产视频在线| 一区二区三区四区五区性感视频| 国产视频在线视频播放| 国产在线观看黄色视频| 亚洲福利精品视频在线免费观看| 蜜桃视频在线欧美一区| 9国产精品久久久久老师| 中文字幕日本人妻中出| 爆乳骚货内射骚货内射在线 | 最新91九色国产在线观看| 亚洲1区2区3区精华液| 久久三久久三久久三久久| 青青青视频自偷自拍38碰| 成人国产影院在线观看| 国产不卡av在线免费| 久久99久久99精品影院| 中文字幕在线永久免费播放| 91破解版永久免费| 国产亚洲成人免费在线观看| 欧美成一区二区三区四区| 中文字幕在线免费第一页| 大鸡巴插入美女黑黑的阴毛| 亚洲一区自拍高清免费视频| 日本人妻欲求不满中文字幕| 新97超碰在线观看| 2021久久免费视频| 55夜色66夜色国产精品站| av中文字幕网址在线| 欧美亚洲自偷自拍 在线| 中文字幕中文字幕人妻| 日本成人不卡一区二区| 五月天久久激情视频| 岳太深了紧紧的中文字幕| 不卡一区一区三区在线| 大胆亚洲av日韩av| 日本一本午夜在线播放| 999九九久久久精品| 在线免费91激情四射| 日本黄色三级高清视频| caoporn蜜桃视频| 爆乳骚货内射骚货内射在线 | 喷水视频在线观看这里只有精品| 91快播视频在线观看| 日日操夜夜撸天天干| 晚上一个人看操B片| 制丝袜业一区二区三区| 国产又粗又硬又大视频| 中文字幕在线免费第一页| 国产在线自在拍91国语自产精品| 99热久久这里只有精品8| 偷拍自拍福利视频在线观看| 自拍偷拍 国产资源| 欧美偷拍亚洲一区二区| 日韩剧情片电影在线收看| 老司机欧美视频在线看| 在线免费视频 自拍| 中文字幕在线视频一区二区三区| 1000部国产精品成人观看视频| 夜夜嗨av一区二区三区中文字幕| 偷青青国产精品青青在线观看| 中文字幕第一页国产在线| 国产超码片内射在线| 日本性感美女视频网站| 少妇高潮无套内谢麻豆| 亚洲欧洲一区二区在线观看| 天天日天天干天天插舔舔| 欧美成人小视频在线免费看| 天天躁夜夜躁日日躁a麻豆| 丝袜美腿视频诱惑亚洲无| 欧美偷拍自拍色图片| 国内精品在线播放第一页| 中文字幕日本人妻中出| 欧美另类重口味极品在线观看| 亚洲va国产va欧美精品88| 99精品一区二区三区的区| 视频二区在线视频观看| 欧美精品黑人性xxxx| 538精品在线观看视频| 国产在线拍揄自揄视频网站| 国产美女午夜福利久久| 国产亚洲四十路五十路| 亚洲精品国产久久久久久| 99的爱精品免费视频| 强行扒开双腿猛烈进入免费版| 欧美色呦呦最新网址| 精彩视频99免费在线| av视网站在线观看| 亚洲欧美人精品高清| 秋霞午夜av福利经典影视| 亚洲一区二区久久久人妻| 夜夜嗨av蜜臀av| 婷婷久久久综合中文字幕| 97香蕉碰碰人妻国产樱花| 制服丝袜在线人妻中文字幕| 欧美日韩亚洲国产无线码| 中出中文字幕在线观看| 欧美成人综合色在线噜噜| 丁香花免费在线观看中文字幕| 亚洲在线观看中文字幕av| 欧美男同性恋69视频| 国产成人无码精品久久久电影| 亚洲激情,偷拍视频| 激情伦理欧美日韩中文字幕| 在线观看av亚洲情色| 人妻无码色噜噜狠狠狠狠色| 成人影片高清在线观看| 久久久久久久99精品| 亚洲天天干 夜夜操| 中文字幕一区二区人妻电影冢本| 一区二区三区四区中文| eeuss鲁片一区二区三区| 色秀欧美视频第一页| 午夜在线观看岛国av,com| 综合激情网激情五月五月婷婷| 在线免费观看日本片| 888欧美视频在线| 青青青爽视频在线播放| 成人在线欧美日韩国产| 亚洲欧美激情人妻偷拍| 9色精品视频在线观看| 国产性感美女福利视频| 91天堂天天日天天操| 中文字幕高清免费在线人妻| 果冻传媒av一区二区三区| 2025年人妻中文字幕乱码在线| 国产污污污污网站在线| 午夜在线观看岛国av,com| 亚洲欧美成人综合视频| 免费高清自慰一区二区三区网站| 欲乱人妻少妇在线视频裸| 欲乱人妻少妇在线视频裸| 国产日韩欧美视频在线导航| 天天日天天操天天摸天天舔| 福利视频一区二区三区筱慧| 97超碰最新免费在线观看| 一个人免费在线观看ww视频| 韩国亚洲欧美超一级在线播放视频| 绝色少妇高潮3在线观看| 免费啪啪啪在线观看视频| 久久免看30视频口爆视频| 日韩加勒比东京热二区| 天天操天天爽天天干| 免费大片在线观看视频网站| 国产精品一二三不卡带免费视频| 超鹏97历史在线观看| 青青青青草手机在线视频免费看| 老熟妇凹凸淫老妇女av在线观看| 激情啪啪啪啪一区二区三区 | 新婚人妻聚会被中出| 早川濑里奈av黑人番号| 日本精品一区二区三区在线视频。| 天天摸天天日天天操| 婷婷五月亚洲综合在线| aⅴ五十路av熟女中出| 福利视频广场一区二区| 天堂av在线官网中文| 日韩美女搞黄视频免费| 人妻3p真实偷拍一二区| 天天日天天干天天要| 在线观看视频 你懂的| 国产精品一区二区三区蜜臀av| 福利视频一区二区三区筱慧| 日韩a级黄色小视频| 91大屁股国产一区二区| 日本黄色三级高清视频| 黄色的网站在线免费看| 国产熟妇乱妇熟色T区| 天堂女人av一区二区| 99国产精品窥熟女精品| 日韩av中文在线免费观看| 大鸡吧插逼逼视频免费看| 婷婷色国产黑丝少妇勾搭AV| 丁香花免费在线观看中文字幕| 91中文字幕免费在线观看| 人妻少妇中文有码精品| 亚洲av琪琪男人的天堂| 亚洲成人精品女人久久久| 9色精品视频在线观看| 在线免费观看黄页视频| 激情色图一区二区三区| 日本真人性生活视频免费看| 搡老熟女一区二区在线观看| 91自产国产精品视频| 搞黄色在线免费观看| 欧洲精品第一页欧洲精品亚洲| 东游记中文字幕版哪里可以看到 | 日韩在线视频观看有码在线| 亚洲成a人片777777| 肏插流水妹子在线乐播下载| 午夜频道成人在线91| 三级黄色亚洲成人av| 大香蕉日本伊人中文在线| av成人在线观看一区| okirakuhuhu在线观看| 精品久久久久久久久久中文蒉| 岛国av高清在线成人在线| 国产精品入口麻豆啊啊啊| 亚洲va天堂va国产va久| 国产V亚洲V天堂无码欠欠| 欧美专区第八页一区在线播放| 福利片区一区二体验区| 三级黄色亚洲成人av| 社区自拍揄拍尻屁你懂的| 国产精品视频男人的天堂| eeuss鲁片一区二区三区| 黑人解禁人妻叶爱071| 97瑟瑟超碰在线香蕉| 99视频精品全部15| 精品一区二区三区在线观看| 欧美日韩一区二区电影在线观看 | 亚洲老熟妇日本老妇| 亚洲国产第一页在线观看| 欧美麻豆av在线播放| 一区二区熟女人妻视频| 亚洲免费国产在线日韩| 国产91精品拍在线观看| 一级黄色av在线观看| 9色精品视频在线观看| 老司机免费福利视频网| 欧美一区二区三区激情啪啪啪| 日本美女成人在线视频| 东游记中文字幕版哪里可以看到| 国产91久久精品一区二区字幕| 中文字幕第一页国产在线| 美女张开腿让男生操在线看| 黑人解禁人妻叶爱071| 香港一级特黄大片在线播放| 丰满的继坶3中文在线观看| 亚洲综合一区二区精品久久| 好男人视频在线免费观看网站| 国产三级片久久久久久久| 亚洲一区二区三区久久受| 青青草成人福利电影| 亚洲综合自拍视频一区| 国产麻豆91在线视频| 91色网站免费在线观看| 91极品新人『兔兔』精品新作| 沈阳熟妇28厘米大战黑人| 久久久久只精品国产三级| 亚洲av黄色在线网站| 果冻传媒av一区二区三区| 91久久精品色伊人6882| 91精品高清一区二区三区| 粉嫩欧美美人妻小视频| 青青伊人一精品视频| 午夜影院在线观看视频羞羞羞| 国产日本精品久久久久久久| 中文字幕在线欧美精品| 在线免费观看日本片| 黄片大全在线观看观看| 女同互舔一区二区三区| 日韩中文字幕精品淫| 涩涩的视频在线观看视频| 日韩a级黄色小视频| 一级黄色片夫妻性生活| 天天日天天敢天天干| 同居了嫂子在线播高清中文| 国产精品自偷自拍啪啪啪| 99视频精品全部15| 午夜激情久久不卡一区二区 | 国产一区成人在线观看视频 | 啊慢点鸡巴太大了啊舒服视频| 91试看福利一分钟| 91高清成人在线视频| 日本免费一级黄色录像| 日韩av中文在线免费观看| 国产精品中文av在线播放| 黑人巨大精品欧美视频| 在线观看免费视频网| 中文字幕一区二区三区蜜月| 国产91嫩草久久成人在线视频| av日韩在线免费播放| 国产高清在线观看1区2区| 日韩近亲视频在线观看| 亚洲老熟妇日本老妇| 污污小视频91在线观看| 97青青青手机在线视频| 久久久制服丝袜中文字幕| 小穴多水久久精品免费看| 综合精品久久久久97| 中文字幕日本人妻中出| 色婷婷综合激情五月免费观看| 九色porny九色9l自拍视频| 天天射夜夜操狠狠干| 亚洲天堂有码中文字幕视频| 精品视频中文字幕在线播放| 婷婷午夜国产精品久久久| 午夜精品一区二区三区更新| 任你操视频免费在线观看| 久久久久久久久久一区二区三区| 国产一线二线三线的区别在哪| 美女大bxxxx内射| 一区二区三区综合视频| 国产不卡av在线免费| 区一区二区三国产中文字幕| 青青青青青青青青青国产精品视频 | 国产又粗又黄又硬又爽| 欧美一区二区三区啪啪同性| 日本韩国免费一区二区三区视频| 亚洲自拍偷拍综合色| 伊人精品福利综合导航| 熟女在线视频一区二区三区| 欧美女同性恋免费a| 日本丰满熟妇BBXBBXHD| 亚洲国产免费av一区二区三区 | 欧洲日韩亚洲一区二区三区 | 欧美日韩情色在线观看| 午夜美女福利小视频| 天天操天天弄天天射| 超污视频在线观看污污污| 北条麻妃肉色丝袜视频| 91在线视频在线精品3| 一区二区在线视频中文字幕 | av欧美网站在线观看| 国产精品久久综合久久| 97青青青手机在线视频| 国产欧美日韩在线观看不卡| 亚洲av男人的天堂你懂的| 偷拍自拍福利视频在线观看| 直接观看免费黄网站| 韩国黄色一级二级三级| 夜夜嗨av蜜臀av| 天天色天天操天天透| 最新国产亚洲精品中文在线| 日韩欧美国产精品91| 最新日韩av传媒在线| 93人妻人人揉人人澡人人| 久久久噜噜噜久久熟女av| 欧美一级视频一区二区| 亚洲免费成人a v| 国产精彩对白一区二区三区| 免费人成黄页网站在线观看国产| 国产一区二区欧美三区| 五月精品丁香久久久久福利社| 日韩剧情片电影在线收看| 视频二区在线视频观看| 精品国产乱码一区二区三区乱| 久久精品久久精品亚洲人| 五十路熟女人妻一区二区9933| 成人国产小视频在线观看| 亚洲欧美自拍另类图片| 中文字幕在线永久免费播放| 扒开腿挺进肉嫩小18禁视频| 亚洲熟女综合色一区二区三区四区| 视频二区在线视频观看| 亚洲国产精品久久久久久6| 91大屁股国产一区二区| 香蕉aⅴ一区二区三区| 四虎永久在线精品免费区二区| aaa久久久久久久久| 女警官打开双腿沦为性奴| 青青在线视频性感少妇和隔壁黑丝| 中文字幕 码 在线视频| 91久久人澡人人添人人爽乱| 动色av一区二区三区| 免费69视频在线看| 男人和女人激情视频| 91在线视频在线精品3| 国产精品一区二区av国| 亚洲天堂有码中文字幕视频| 久久精品国产23696| 亚洲av在线观看尤物| 久久艹在线观看视频| 日韩激情文学在线视频| 激情内射在线免费观看| 成年午夜影片国产片| 中文字幕人妻一区二区视频| 欧洲欧美日韩国产在线| 18禁无翼鸟成人在线| 日日操夜夜撸天天干| 99婷婷在线观看视频| 都市家庭人妻激情自拍视频| 国产精品三级三级三级| 午夜美女福利小视频| 97少妇精品在线观看| 成人精品视频99第一页| 亚洲综合在线视频可播放| 家庭女教师中文字幕在线播放 | 狠狠躁夜夜躁人人爽天天天天97| 国产日韩一区二区在线看| 中文字幕中文字幕 亚洲国产| 亚洲公开视频在线观看| 日本一二三中文字幕| 真实国产乱子伦一区二区| 日本中文字幕一二区视频| 1769国产精品视频免费观看| 黑人乱偷人妻中文字幕| av天堂资源最新版在线看| 欧美日韩情色在线观看| 国产性色生活片毛片春晓精品 | 国产又粗又黄又硬又爽| 亚洲成人线上免费视频观看| 最新日韩av传媒在线| 日本高清成人一区二区三区| 青青草亚洲国产精品视频| 国产视频一区二区午夜| 国产精品熟女久久久久浪潮| 青青青激情在线观看视频| 久久久精品欧洲亚洲av| 中文字幕av一区在线观看| 四川五十路熟女av| 97超碰最新免费在线观看| 一区二区三区四区中文| 天天想要天天操天天干| 99的爱精品免费视频| 岛国黄色大片在线观看| 成人24小时免费视频| 淫秽激情视频免费观看| 国产午夜福利av导航| 日韩熟女av天堂系列| 亚洲1区2区3区精华液| 男人操女人的逼免费视频| 日本www中文字幕| 午夜影院在线观看视频羞羞羞| 男生舔女生逼逼视频| 亚洲熟女综合色一区二区三区四区 | 涩涩的视频在线观看视频| 色综合久久五月色婷婷综合| 日本xx片在线观看| 日韩美女福利视频网| 中文字幕一区二区自拍| 不卡精品视频在线观看| 久久丁香婷婷六月天| 亚洲国产最大av综合| 视频在线亚洲一区二区| 三级av中文字幕在线观看| av久久精品北条麻妃av观看| 特级无码毛片免费视频播放 | 888亚洲欧美国产va在线播放| 亚洲精品国产在线电影| 人妻熟女在线一区二区| 经典亚洲伊人第一页| 午夜激情精品福利视频| 瑟瑟视频在线观看免费视频| 国产精品人妻一区二区三区网站| 中文字幕免费福利视频6| 国产福利小视频免费观看| 亚洲欧美色一区二区| 日韩二区视频一线天婷婷五| 日韩欧美一级黄片亚洲| 蜜桃视频在线欧美一区| 青青青青青青青青青国产精品视频| 我想看操逼黄色大片| 国产欧美精品免费观看视频| 最新的中文字幕 亚洲| 黄色视频在线观看高清无码 | 91在线视频在线精品3| 任我爽精品视频在线播放| 国产又色又刺激在线视频| 国产九色91在线视频| 日本乱人一区二区三区| 97人妻无码AV碰碰视频| 中文字幕AV在线免费看 | 福利片区一区二体验区| 馒头大胆亚洲一区二区| 国产精品一区二区三区蜜臀av| 欧洲亚洲欧美日韩综合| 午夜成午夜成年片在线观看 | 天天摸天天日天天操| 国产麻豆乱子伦午夜视频观看| 中文字幕人妻三级在线观看| 嫩草aⅴ一区二区三区| 在线成人日韩av电影| 丝袜国产专区在线观看| 亚洲精品中文字幕下载| 岛国免费大片在线观看| 男人天堂av天天操| 久久麻豆亚洲精品av| 亚洲免费成人a v| 99人妻视频免费在线| 欧洲日韩亚洲一区二区三区 | 日本午夜福利免费视频| 美女小视频网站在线| av乱码一区二区三区| 天天摸天天干天天操科普| 欧美日韩激情啪啪啪| 日本人妻欲求不满中文字幕| 国产福利小视频二区| 精品美女福利在线观看| 99热99这里精品6国产| 日本裸体熟妇区二区欧美| 欧美黑人与人妻精品| 日韩精品中文字幕福利| 欧美xxx成人在线| 黄色男人的天堂视频| 国产精品人妻一区二区三区网站| 婷婷久久久久深爱网| 4个黑人操素人视频网站精品91| 99人妻视频免费在线| 日韩一区二区三区三州| 最新欧美一二三视频| 青青青青青青青在线播放视频| 端庄人妻堕落挣扎沉沦| 青青青国产免费视频| 国产揄拍高清国内精品对白| 黄色成人在线中文字幕| 非洲黑人一级特黄片| 五月婷婷在线观看视频免费| 色综合久久久久久久久中文| 首之国产AV医生和护士小芳| av一本二本在线观看| 肏插流水妹子在线乐播下载| 日本五十路熟新垣里子| 开心 色 六月 婷婷| 人妻少妇性色欲欧美日韩| mm131美女午夜爽爽爽| 亚洲午夜精品小视频| 天堂v男人视频在线观看| 日本人竟这样玩学生妹| 淫秽激情视频免费观看| 亚洲天堂精品久久久| 国产剧情演绎系列丝袜高跟| 欧美一区二区三区在线资源 | 狠狠嗨日韩综合久久| 超碰在线中文字幕一区二区| 亚洲区欧美区另类最新章节| 国产在线拍揄自揄视频网站| 日本一二三中文字幕| 十八禁在线观看地址免费| 一区二区三区日本伦理| 男人的天堂在线黄色| 一区二区三区 自拍偷拍| 国产一区二区视频观看| 在线免费观看av日韩| 欧美视频综合第一页| 日韩不卡中文在线视频网站| 国产va在线观看精品| 综合精品久久久久97| 亚洲成人黄色一区二区三区| 在线视频免费观看网| 美女福利写真在线观看视频| 欧美在线精品一区二区三区视频| 最新中文字幕乱码在线| av老司机精品在线观看| 一个人免费在线观看ww视频| 国产成人自拍视频在线免费观看| 精品区一区二区三区四区人妻| 成人av天堂丝袜在线观看| 日本一二三区不卡无| 精品成人啪啪18免费蜜臀| 午夜精品福利91av| 玖玖一区二区在线观看| 年轻的人妻被夫上司侵犯| 天天操天天弄天天射| 精品国产午夜视频一区二区| 91免费福利网91麻豆国产精品| 欧美一级色视频美日韩| 亚洲老熟妇日本老妇| 97少妇精品在线观看| 久久丁香花五月天色婷婷| 亚洲国产免费av一区二区三区 | 夜色福利视频在线观看| 免费手机黄页网址大全| 中文字幕高清在线免费播放| 久久久久只精品国产三级| 亚欧在线视频你懂的| 午夜福利人人妻人人澡人人爽| 欧美色呦呦最新网址| 国产黑丝高跟鞋视频在线播放| 夜色福利视频在线观看| 91免费福利网91麻豆国产精品| 搡老熟女一区二区在线观看| av天堂中文免费在线| 成年人该看的视频黄免费| 91老师蜜桃臀大屁股| 中文字幕日韩无敌亚洲精品 | 精品国产亚洲av一淫| 亚洲免费视频欧洲免费视频| 天天操夜夜操天天操天天操 | 色狠狠av线不卡香蕉一区二区| 亚洲av成人网在线观看| 亚洲综合一区二区精品久久| 亚洲黄色av网站免费播放| 国际av大片在线免费观看| 性欧美日本大妈母与子| 亚洲国产在线精品国偷产拍| 中文字幕一区二区亚洲一区| 人妻凌辱欧美丰满熟妇| 男生舔女生逼逼的视频| 99热99这里精品6国产| 亚洲一级特黄特黄黄色录像片| 边摸边做超爽毛片18禁色戒 | 中国把吊插入阴蒂的视频| 激情伦理欧美日韩中文字幕| 狠狠操狠狠操免费视频| 激情图片日韩欧美人妻| 免费在线看的黄片视频| 欧美一区二区三区乱码在线播放| 亚洲公开视频在线观看| 亚洲熟妇x久久av久久| 亚洲成人情色电影在线观看| 激情小视频国产在线| 人妻无码色噜噜狠狠狠狠色| 999九九久久久精品| 亚洲激情唯美亚洲激情图片| 中文字幕在线视频一区二区三区| 国产女人被做到高潮免费视频| 青青青爽视频在线播放| 日日操综合成人av| 国产日韩精品免费在线| 国内精品在线播放第一页| 天天通天天透天天插| 国产精品久久久久久久精品视频 | 99热久久极品热亚洲| 青青青青操在线观看免费| 2022国产精品视频| 久精品人妻一区二区三区 | 欧美另类一区二区视频| 51国产成人精品视频| 欧美怡红院视频在线观看| 93精品视频在线观看| 国产白嫩美女一区二区| 天天干狠狠干天天操| 啪啪啪操人视频在线播放| 国产综合高清在线观看| 人妻av无码专区久久绿巨人| 日韩av大胆在线观看| 啊用力插好舒服视频| 2025年人妻中文字幕乱码在线| 亚洲熟女女同志女同| 色哟哟国产精品入口| 啪啪啪啪啪啪啪啪av| 亚洲成人黄色一区二区三区 | 三级黄色亚洲成人av| 91av精品视频在线| 欧美日本在线视频一区| 亚洲日本一区二区久久久精品| 亚洲综合在线视频可播放| 中文字幕人妻被公上司喝醉在线| 久久久极品久久蜜桃| 动漫av网站18禁| 99人妻视频免费在线| 精品国产成人亚洲午夜| 天天做天天干天天舔| 五十路人妻熟女av一区二区| 欲乱人妻少妇在线视频裸| 欧美精品资源在线观看| 1024久久国产精品| 天天日天天操天天摸天天舔| 国产九色91在线观看精品| 亚洲av一妻不如妾| 国际av大片在线免费观看| 欧美国产亚洲中英文字幕| 亚洲中文字幕国产日韩| 日本熟女精品一区二区三区| 国产老熟女伦老熟妇ⅹ| 国产欧美精品免费观看视频| 888亚洲欧美国产va在线播放| 亚洲成人国产综合一区| 国产精品午夜国产小视频| 一区二区三区毛片国产一区| 欧美亚洲免费视频观看| 欧美久久一区二区伊人| 中国熟女@视频91| 少妇高潮无套内谢麻豆| 日本午夜久久女同精女女| 亚洲 清纯 国产com| 熟女国产一区亚洲中文字幕| 亚洲av在线观看尤物| 午夜精品久久久久久99热| 人人妻人人人操人人人爽| 国产va在线观看精品| 57pao国产一区二区| 国产性感美女福利视频| 色花堂在线av中文字幕九九| 中文字幕—97超碰网| 在线观看911精品国产| 高清成人av一区三区| 91国产在线视频免费观看| 国产精品一区二区三区蜜臀av| 搞黄色在线免费观看| 大鸡巴操娇小玲珑的女孩逼| 姐姐的朋友2在线观看中文字幕| 欧美视频不卡一区四区| 91欧美在线免费观看| 亚洲第17页国产精品| 内射久久久久综合网| 2020国产在线不卡视频| 中文字幕日韩精品就在这里| 91九色国产熟女一区二区| 成熟丰满熟妇高潮xx×xx | 中国把吊插入阴蒂的视频| av老司机精品在线观看| 亚洲人妻国产精品综合| 国产变态另类在线观看| 人人在线视频一区二区| 性感美女福利视频网站| 亚洲人人妻一区二区三区| av中文字幕网址在线| 精品久久久久久高潮| 农村胖女人操逼视频| 青青青青青青青青青国产精品视频| 欧美精品欧美极品欧美视频 | 天天做天天干天天舔| 在线观看视频污一区| 国产之丝袜脚在线一区二区三区| 非洲黑人一级特黄片| 免费男阳茎伸入女阳道视频 | 欧美男人大鸡吧插女人视频| 中文字幕无码一区二区免费| 天天插天天色天天日| 绝顶痉挛大潮喷高潮无码| 日本欧美视频在线观看三区| 日日摸夜夜添夜夜添毛片性色av| 超级av免费观看一区二区三区| huangse网站在线观看| 欧美日本在线视频一区| 日本男女操逼视频免费看 | 青青青视频自偷自拍38碰| 亚洲av成人免费网站| 丝袜肉丝一区二区三区四区在线| 强行扒开双腿猛烈进入免费版| 久久这里有免费精品| 婷婷色国产黑丝少妇勾搭AV| 一级黄色片夫妻性生活| 性欧美激情久久久久久久| 好太好爽好想要免费| 播放日本一区二区三区电影| 天美传媒mv视频在线观看| 五十路熟女av天堂| 88成人免费av网站| 91精品啪在线免费| 亚洲成人av一区久久| 成年人免费看在线视频| 国产V亚洲V天堂无码欠欠| 18禁免费av网站| 2020国产在线不卡视频| 国产伊人免费在线播放| 一区二区三区毛片国产一区| 夫妻在线观看视频91| 91色九色porny| 久久久久久性虐视频| 日韩av中文在线免费观看| 福利在线视频网址导航| 亚洲av人人澡人人爽人人爱| tube69日本少妇| 亚洲视频在线观看高清| 大鸡吧插入女阴道黄色片| 欧美怡红院视频在线观看| 亚洲高清国产拍青青草原| 动漫精品视频在线观看| 午夜精品久久久久麻豆影视| 天堂av在线播放免费| 人妻av无码专区久久绿巨人| 国产精品欧美日韩区二区| 777奇米久久精品一区| 高清一区二区欧美系列| 91欧美在线免费观看| 天天躁日日躁狠狠躁av麻豆| 夜女神免费福利视频| 国产综合高清在线观看| 青青热久免费精品视频在线观看 | 一区二区三区欧美日韩高清播放| 欧美专区日韩专区国产专区| 岛国免费大片在线观看| 少妇高潮无套内谢麻豆| 亚洲欧美色一区二区| 天天射,天天操,天天说| 国产三级片久久久久久久| 青娱乐极品视频青青草| 快点插进来操我逼啊视频| 亚洲午夜电影之麻豆| 日韩av大胆在线观看| 成人激情文学网人妻| 一级黄片大鸡巴插入美女| 伊人情人综合成人久久网小说| 九九热99视频在线观看97| 年轻的人妻被夫上司侵犯| 日韩成人免费电影二区| 国产视频网站一区二区三区| 亚欧在线视频你懂的| 亚洲精品av在线观看| 晚上一个人看操B片| 天天操天天污天天射| 国产福利小视频二区| 午夜激情高清在线观看| 少妇人妻久久久久视频黄片| 岛国av高清在线成人在线| 国产精品亚洲а∨天堂免| 成年人免费看在线视频| 成人国产影院在线观看| 日日夜夜精品一二三| 老司机免费视频网站在线看| 日本www中文字幕| avjpm亚洲伊人久久| 国产一线二线三线的区别在哪| 韩国三级aaaaa高清视频| 精品视频国产在线观看| 国产精品自拍视频大全| 久久这里只有精品热视频| 国产无遮挡裸体免费直播视频| 夫妻在线观看视频91| 蜜桃久久久久久久人妻| 欧美aa一级一区三区四区| 超碰97人人澡人人| 欧美精品 日韩国产| 国产精品黄色的av| 久久农村老妇乱69系列| 老司机99精品视频在线观看| 污污小视频91在线观看| 欧美精品亚洲精品日韩在线| 97精品成人一区二区三区 | 91麻豆精品91久久久久同性| 99热久久极品热亚洲| 亚洲一级美女啪啪啪| 天堂资源网av中文字幕| 国产一区av澳门在线观看| 在线视频国产欧美日韩| 蜜桃精品久久久一区二区| 桃色视频在线观看一区二区| 亚洲精品欧美日韩在线播放| 在线免费视频 自拍| 欧美日本aⅴ免费视频| 成人乱码一区二区三区av| 五十路人妻熟女av一区二区 | 成人伊人精品色xxxx视频| 最新国产亚洲精品中文在线| 欧美国产亚洲中英文字幕| 1区2区3区不卡视频| 亚洲国产精品免费在线观看| 激情五月婷婷综合色啪| 热思思国产99re| 夜夜嗨av一区二区三区中文字幕| 亚洲成a人片777777| 最新欧美一二三视频| 国产精品成久久久久三级蜜臀av| 在线不卡成人黄色精品| 99精品免费久久久久久久久a| 免费观看成年人视频在线观看| 人人爱人人妻人人澡39| 亚洲精品中文字幕下载| 91亚洲手机在线视频播放| 久久精品亚洲国产av香蕉| 三上悠亚和黑人665番号| 亚洲第一黄色在线观看| 91超碰青青中文字幕| 国产日韩精品电影7777| 女同久久精品秋霞网| 91精品啪在线免费| 天美传媒mv视频在线观看| 亚洲午夜精品小视频| 精品乱子伦一区二区三区免费播 | 天天日天天日天天擦| 93精品视频在线观看| 97人妻色免费视频| 动漫精品视频在线观看| 人人妻人人人操人人人爽| 91快播视频在线观看| 女生自摸在线观看一区二区三区| 99国内小视频在现欢看| 大陆精品一区二区三区久久| 2o22av在线视频| 中出中文字幕在线观看| 久久久久久性虐视频| 青青青青操在线观看免费| 欧美成人黄片一区二区三区 | 亚洲欧美国产麻豆综合| 久久丁香花五月天色婷婷| 韩国爱爱视频中文字幕| 日本福利午夜电影在线观看| 男人的天堂在线黄色| 日韩美av高清在线| 日日夜夜精品一二三| 亚洲狠狠婷婷综合久久app| 国产黄色片蝌蚪九色91| 亚洲人妻30pwc| 亚洲熟女久久久36d| 男人的天堂在线黄色| 成人av亚洲一区二区| 亚洲综合自拍视频一区| 亚洲伊人久久精品影院一美女洗澡 | 亚洲免费成人a v| 黑人借宿ntr人妻的沦陷2| 国产精品久久久久久美女校花| 亚洲精品在线资源站| 午夜久久久久久久精品熟女 | 青娱乐极品视频青青草| 久久综合老鸭窝色综合久久| 一级黄色片夫妻性生活| 国产精品久久久久久久久福交| 动漫黑丝美女的鸡巴| 红杏久久av人妻一区| 黄色录像鸡巴插进去| 久碰精品少妇中文字幕av| 日韩精品中文字幕播放| 中文字幕高清免费在线人妻| 在线观看视频 你懂的| 日视频免费在线观看| 日韩欧美一级aa大片| 天天躁日日躁狠狠躁av麻豆| 性色av一区二区三区久久久| 亚洲青青操骚货在线视频| 天天干天天操天天爽天天摸| 我想看操逼黄色大片| 亚洲欧美人精品高清| av视屏免费在线播放| 午夜在线精品偷拍一区二| 欧美一区二区三区久久久aaa| 亚洲va天堂va国产va久| 天天日天天摸天天爱| 国产午夜亚洲精品不卡在线观看| 宅男噜噜噜666国产| 最新日韩av传媒在线| 日本免费一级黄色录像| 大鸡吧插逼逼视频免费看 | 亚洲国产精品久久久久蜜桃| 亚洲成人黄色一区二区三区| 国产亚洲欧美45p| 亚洲精品无码色午夜福利理论片| 久久精品在线观看一区二区| 国产一区自拍黄视频免费观看| 国产亚洲精品品视频在线| 日本啪啪啪啪啪啪啪| jul—619中文字幕在线| 国产视频在线视频播放| 欧美 亚洲 另类综合| 国产美女精品福利在线| 被大鸡吧操的好舒服视频免费| 天天做天天干天天舔| 东游记中文字幕版哪里可以看到| 99热国产精品666| 亚洲国产欧美国产综合在线| 青青青青操在线观看免费| 日日夜夜大香蕉伊人| 好吊操视频这里只有精品| 香港三日本三韩国三欧美三级| 天天通天天透天天插| 亚洲1卡2卡三卡4卡在线观看| 美女福利视频导航网站| 美女骚逼日出水来了| 91精品国产综合久久久蜜| 福利午夜视频在线合集| 性欧美日本大妈母与子| 中文字幕日韩人妻在线三区| 国产91嫩草久久成人在线视频| 天天艹天天干天天操| 抽查舔水白紧大视频| 久久久91蜜桃精品ad| 欧美一级色视频美日韩| yellow在线播放av啊啊啊| 成人网18免费视频版国产| 在线新三级黄伊人网| 77久久久久国产精产品| 天天干狠狠干天天操| 久久免看30视频口爆视频| 极品粉嫩小泬白浆20p主播| aⅴ五十路av熟女中出| 亚洲Av无码国产综合色区| 一区二区三区 自拍偷拍| 亚洲图库另类图片区| 精品高跟鞋丝袜一区二区| 亚洲综合在线观看免费| 精品日产卡一卡二卡国色天香 | 黄页网视频在线免费观看| 夜夜操,天天操,狠狠操| 亚洲欧美精品综合图片小说| 久久永久免费精品人妻专区| 亚洲国产精品黑丝美女| 播放日本一区二区三区电影| 精品少妇一二三视频在线| 韩国男女黄色在线观看| 国产精品黄大片在线播放| 东京热男人的av天堂| 97少妇精品在线观看| 日本啪啪啪啪啪啪啪| 中文字幕免费福利视频6| 亚洲自拍偷拍精品网| 传媒在线播放国产精品一区| 亚洲va国产va欧美精品88| 日本女人一级免费片| 五月天色婷婷在线观看视频免费| 男女啪啪视频免费在线观看| 夏目彩春在线中文字幕| 夜色17s精品人妻熟女| 人妻丰满熟妇综合网| 亚洲免费视频欧洲免费视频| 在线不卡日韩视频播放| 被大鸡吧操的好舒服视频免费| 亚洲 色图 偷拍 欧美| 中文字幕高清免费在线人妻| 新婚人妻聚会被中出| 天天干天天操天天爽天天摸| 91she九色精品国产| 91精品国产观看免费| 9国产精品久久久久老师| 亚洲欧美清纯唯美另类| 中文字幕在线欧美精品| 91精品国产91青青碰| 国产一区二区神马久久| 日韩亚洲高清在线观看| 亚洲熟妇久久无码精品| 老有所依在线观看完整版| 亚洲日本一区二区久久久精品| 久久一区二区三区人妻欧美| 视频二区在线视频观看| 成人18禁网站在线播放| 日韩美女搞黄视频免费| 91高清成人在线视频| 自拍偷拍日韩欧美亚洲| 在线观看日韩激情视频| 夜夜操,天天操,狠狠操| 国产品国产三级国产普通话三级| 18禁网站一区二区三区四区| 日韩欧美在线观看不卡一区二区| 国产精品三级三级三级| 在线观看黄色成年人网站 | 国产精品一区二区av国| 九一传媒制片厂视频在线免费观看 | 亚洲麻豆一区二区三区| 女同久久精品秋霞网| 日韩精品激情在线观看| 精品国产在线手机在线| 欧美中文字幕一区最新网址| 东游记中文字幕版哪里可以看到| 92福利视频午夜1000看| 亚洲一区二区三区久久午夜| 五十路人妻熟女av一区二区| 亚洲精品在线资源站| 亚洲av午夜免费观看| 毛片一级完整版免费| 欧美色呦呦最新网址| av线天堂在线观看| 91免费放福利在线观看| 国产极品美女久久久久久| 一区二区久久成人网| 国产第一美女一区二区三区四区 | 三级av中文字幕在线观看| 日韩欧美制服诱惑一区在线| 国产日韩欧美视频在线导航| 日本特级片中文字幕| 亚洲欧美激情国产综合久久久| 成人高清在线观看视频| 亚洲一级av大片免费观看| 国产精品3p和黑人大战| 啪啪啪啪啪啪啪免费视频| 国产不卡av在线免费| 亚洲成a人片777777| 黄色成年网站午夜在线观看| 欧美日韩国产一区二区三区三州| 国产精品久久久久久久精品视频| 11久久久久久久久久久| 在线新三级黄伊人网| 中文字幕免费在线免费| 亚洲日本一区二区久久久精品| 日韩视频一区二区免费观看| 真实国模和老外性视频| 免费在线观看污污视频网站| 热99re69精品8在线播放| 大陆胖女人与丈夫操b国语高清| 黄页网视频在线免费观看| 青青青青青手机视频| 欧美成人综合色在线噜噜| 欧美视频综合第一页| 国产精品国产三级国产精东| 99精品视频之69精品视频 | 欧美一级色视频美日韩| 北条麻妃av在线免费观看| 国产精品国产精品一区二区| 蜜臀av久久久久久久| 青青青青爽手机在线| 大骚逼91抽插出水视频| 国产亚洲精品视频合集| 天堂av狠狠操蜜桃| AV无码一区二区三区不卡| 粉嫩av蜜乳av蜜臀| 久久丁香婷婷六月天| 青草久久视频在线观看| 中文字幕在线视频一区二区三区 | 国内资源最丰富的网站| 在线观看视频污一区| 97国产在线av精品| 中文字幕av第1页中文字幕| 亚洲综合一区成人在线| 大鸡巴后入爆操大屁股美女| 国产精品入口麻豆啊啊啊| 成人国产激情自拍三区| 中出中文字幕在线观看| 9久在线视频只有精品| 98精产国品一二三产区区别| 黄色成年网站午夜在线观看 | 2022精品久久久久久中文字幕| 在线观看日韩激情视频| 天堂av狠狠操蜜桃| 亚洲精品 欧美日韩| 综合一区二区三区蜜臀| 亚洲一区制服丝袜美腿| av视屏免费在线播放| 夜夜操,天天操,狠狠操| 久草电影免费在线观看| 特大黑人巨大xxxx| 日本韩国亚洲综合日韩欧美国产| 人妻丝袜精品中文字幕| 少妇系列一区二区三区视频| 天天插天天色天天日| 极品性荡少妇一区二区色欲| 小泽玛利亚视频在线观看| 中文字母永久播放1区2区3区| nagger可以指黑人吗| 中文字幕在线一区精品| 国产日韩精品免费在线| 午夜精彩视频免费一区| 五月色婷婷综合开心网4438| 极品粉嫩小泬白浆20p主播 | 成人影片高清在线观看 | 99精品国产aⅴ在线观看| 91亚洲手机在线视频播放| 欧美va不卡视频在线观看| 成人精品视频99第一页| 欧美成人一二三在线网| 欧美一级片免费在线成人观看| 顶级尤物粉嫩小尤物网站| 亚洲精品ww久久久久久| 青青青青在线视频免费观看| 日本免费午夜视频网站| 日本性感美女三级视频| 国产大学生援交正在播放| 女同久久精品秋霞网| 国产福利小视频免费观看| 黄色黄色黄片78在线| 伊人日日日草夜夜草| 中国视频一区二区三区| 国产三级片久久久久久久 | 最新中文字幕乱码在线| 一区二区三区av高清免费| 亚洲最大黄 嗯色 操 啊| 2021天天色天天干| 黄片三级三级三级在线观看| 免费岛国喷水视频在线观看| 免费观看成年人视频在线观看| 中文字幕av熟女人妻| 亚洲特黄aaaa片| 成人H精品动漫在线无码播放| 人妻3p真实偷拍一二区| 1区2区3区不卡视频| 韩国一级特黄大片做受| rct470中文字幕在线| 一区二区三区av高清免费| 日本一道二三区视频久久| 久草视频首页在线观看| 操日韩美女视频在线免费看| 99精品久久久久久久91蜜桃| 人妻无码中文字幕专区| 喷水视频在线观看这里只有精品| 大香蕉大香蕉大香蕉大香蕉大香蕉 | 久久久久五月天丁香社区| 免费在线看的黄网站| 毛片一级完整版免费| 91福利视频免费在线观看| 亚洲成人黄色一区二区三区 | 噜噜色噜噜噜久色超碰| 精品国产在线手机在线| 九九视频在线精品播放| 天天干天天操天天插天天日| 天堂av在线播放免费| 国产精品中文av在线播放| 成人乱码一区二区三区av| 日本高清撒尿pissing| 美味人妻2在线播放| 亚洲 人妻 激情 中文| 天天干狠狠干天天操| 97人妻总资源视频| 日韩av有码中文字幕| 亚洲最大黄 嗯色 操 啊| 91成人在线观看免费视频| 久草极品美女视频在线观看| 亚洲区美熟妇久久久久| 端庄人妻堕落挣扎沉沦| 少妇ww搡性bbb91| 成熟熟女国产精品一区| 亚洲福利天堂久久久久久| 亚洲中文字幕校园春色| 日本啪啪啪啪啪啪啪| 美女张开腿让男生操在线看| nagger可以指黑人吗| 国产美女一区在线观看| 男生舔女生逼逼视频| 天天草天天色天天干| 亚洲1069综合男同| 国产变态另类在线观看| 91极品大一女神正在播放| 国产麻豆乱子伦午夜视频观看| 国产精品福利小视频a| 3337p日本欧洲大胆色噜噜| 天堂女人av一区二区| 日韩写真福利视频在线观看| 视频一区二区在线免费播放| 久久久久久久亚洲午夜综合福利 | 五十路息与子猛烈交尾视频 | 韩国黄色一级二级三级| 播放日本一区二区三区电影| 大香蕉日本伊人中文在线| 青青青青青免费视频| 夜夜骑夜夜操夜夜奸| 国产精品伦理片一区二区| 亚洲在线观看中文字幕av| 亚洲 国产 成人 在线| 丁香花免费在线观看中文字幕| 大陆胖女人与丈夫操b国语高清 | 欧美xxx成人在线| 亚洲激情,偷拍视频| 欧美专区日韩专区国产专区| 东游记中文字幕版哪里可以看到| 国产麻豆剧果冻传媒app| wwwxxx一级黄色片| 国产日韩av一区二区在线| 欧美怡红院视频在线观看| 亚洲男人在线天堂网| 欧美精品免费aaaaaa| 97国产在线观看高清| 人妻少妇亚洲一区二区| 一二三中文乱码亚洲乱码one | 521精品视频在线观看| 欧美黑人性猛交xxxxⅹooo| 黑人进入丰满少妇视频| 国产真实乱子伦a视频| 黄色视频在线观看高清无码| 精品少妇一二三视频在线| 日韩人妻在线视频免费| 日本少妇精品免费视频| 亚洲一区久久免费视频| 青青草原网站在线观看| 午夜激情久久不卡一区二区| av日韩在线免费播放| 黑人3p华裔熟女普通话| 精品视频中文字幕在线播放| 中国黄片视频一区91| 嫩草aⅴ一区二区三区| 亚洲精品无码色午夜福利理论片| 亚洲av色香蕉一区二区三区| 激情五月婷婷免费视频| 综合激情网激情五月五月婷婷| chinese国产盗摄一区二区| 51国产成人精品视频| 密臀av一区在线观看| 99热国产精品666| 99热这里只有精品中文| 一区二区三区av高清免费| 国产内射中出在线观看| 97小视频人妻一区二区| 欧洲国产成人精品91铁牛tv| 懂色av蜜桃a v| 亚洲综合在线观看免费| 丰满的继坶3中文在线观看| 午夜在线一区二区免费| 沈阳熟妇28厘米大战黑人| 中文字幕日韩91人妻在线| 国产三级片久久久久久久| 久久久极品久久蜜桃| 中文字幕av第1页中文字幕| 97人妻总资源视频| 亚洲欧美人精品高清| 热思思国产99re| 国产97视频在线精品| 亚洲成a人片777777| 午夜大尺度无码福利视频| 天天日天天添天天爽| 日日操夜夜撸天天干| 欧美一区二区三区激情啪啪啪 | 日本成人一区二区不卡免费在线| 中文字幕1卡1区2区3区| 青草亚洲视频在线观看| 美女被肏内射视频网站| 亚洲在线一区二区欧美| 天天日天天日天天擦| 国产1区,2区,3区| 欧美成人猛片aaaaaaa| 日本最新一二三区不卡在线| 精品91高清在线观看| 经典亚洲伊人第一页| 93精品视频在线观看| 青青草国内在线视频精选| 亚洲一区久久免费视频| 噜噜色噜噜噜久色超碰| 天天日天天舔天天射进去| 美女av色播在线播放| 在线新三级黄伊人网| 亚洲国产精品美女在线观看| 熟妇一区二区三区高清版| 国产中文字幕四区在线观看| gay gay男男瑟瑟在线网站| 在线免费观看99视频| 啪啪啪啪啪啪啪啪av| 中文字幕人妻三级在线观看| 天天干天天插天天谢| 香蕉av影视在线观看| avjpm亚洲伊人久久| 人妻最新视频在线免费观看| 绯色av蜜臀vs少妇| 国产精品成久久久久三级蜜臀av | 青草亚洲视频在线观看| 国产乱子伦精品视频潮优女| 青草青永久在线视频18| 亚洲国产成人最新资源| 欧美日韩人妻久久精品高清国产| 国产久久久精品毛片| av资源中文字幕在线观看| 999九九久久久精品| 亚洲福利精品福利精品福利| 国产在线91观看免费观看| 91快播视频在线观看| 亚洲午夜在线视频福利| 欧美精品亚洲精品日韩在线| 精品av国产一区二区三区四区| 亚洲综合自拍视频一区| 五色婷婷综合狠狠爱| 国产夫妻视频在线观看免费| 国产性感美女福利视频| 午夜大尺度无码福利视频| 蜜桃色婷婷久久久福利在线| 亚洲精品欧美日韩在线播放| 久久久久91精品推荐99| 久久www免费人成一看片| 婷婷久久久综合中文字幕| 亚洲人妻av毛片在线| 日韩精品中文字幕福利| 亚洲综合图片20p| 把腿张开让我插进去视频| 久碰精品少妇中文字幕av| 久久久精品精品视频视频| 欧美日韩精品永久免费网址| 中文字幕在线视频一区二区三区| 午夜dv内射一区区| 后入美女人妻高清在线| 91麻豆精品久久久久| 免费在线看的黄片视频| 精品黑人一区二区三区久久国产| 国产精品福利小视频a| 人妻无码色噜噜狠狠狠狠色| 姐姐的朋友2在线观看中文字幕| 人妻丝袜诱惑我操她视频| 欧美成人精品在线观看| 操日韩美女视频在线免费看| 91传媒一区二区三区| 91传媒一区二区三区| av完全免费在线观看av| 一区二区三区国产精选在线播放| 人妻凌辱欧美丰满熟妇| 国产又大又黄免费观看| 国产性感美女福利视频| 韩国女主播精品视频网站| 国产1区,2区,3区| 国产亚洲视频在线观看| 亚洲av日韩av网站| 999九九久久久精品| 免费福利av在线一区二区三区| 阴茎插到阴道里面的视频| 在线观看av2025| 中文字幕第三十八页久久| 国产一级精品综合av| 性色蜜臀av一区二区三区| 国产午夜男女爽爽爽爽爽视频| 亚洲1区2区3区精华液| 2020av天堂网在线观看| 午夜dv内射一区区| 国产美女午夜福利久久| 亚洲 色图 偷拍 欧美| 国产亚洲成人免费在线观看| 91国内精品久久久久精品一| 亚洲高清视频在线不卡| 亚洲精品麻豆免费在线观看 | 天码人妻一区二区三区在线看| 日本脱亚入欧是指什么| www,久久久,com| 人妻丝袜榨强中文字幕| 一区二区三区四区中文| 激情人妻校园春色亚洲欧美| 亚洲女人的天堂av| 一区二区三区日韩久久| 成人亚洲精品国产精品| 中文字幕中文字幕 亚洲国产| heyzo蜜桃熟女人妻| 天天干天天搞天天摸| av完全免费在线观看av| 亚洲人妻av毛片在线| 亚洲国产精品免费在线观看| jiujiure精品视频在线| av线天堂在线观看| 精品久久久久久久久久久久人妻 | 日本真人性生活视频免费看| 亚洲福利精品福利精品福利| 又色又爽又黄又刺激av网站| 黑人解禁人妻叶爱071| 天天日天天添天天爽| 日本丰满熟妇大屁股久久| 免费费一级特黄真人片| 国产精品久久久久网| 亚洲欧美激情国产综合久久久 | 91亚洲精品干熟女蜜桃频道| 91精品高清一区二区三区| 亚洲伊人色一综合网| 大香蕉日本伊人中文在线| 2025年人妻中文字幕乱码在线| 最新欧美一二三视频| 1769国产精品视频免费观看| 2o22av在线视频| 国产成人精品av网站| 蝴蝶伊人久久中文娱乐网| 中文字幕人妻一区二区视频| 嫩草aⅴ一区二区三区| 女同性ⅹxx女同h偷拍| 天天色天天爱天天爽| 视频二区在线视频观看| 丰满熟女午夜福利视频| 国产精品久久久久久久久福交| av手机免费在线观看高潮| 亚洲午夜电影在线观看| 免费在线观看视频啪啪| 国产白嫩美女一区二区| 老司机你懂得福利视频| 92福利视频午夜1000看| 免费在线观看污污视频网站| 国产高清在线观看1区2区| 黄页网视频在线免费观看| 一区二区三区四区视频| av在线免费观看亚洲天堂| 天天日夜夜操天天摸| 午夜福利资源综合激情午夜福利资| 国产精品黄页网站视频| 五月精品丁香久久久久福利社| 动漫美女的小穴视频| 青青社区2国产视频| 亚洲图片欧美校园春色| 操操网操操伊剧情片中文字幕网| 中文字幕日韩无敌亚洲精品| 亚洲av天堂在线播放| 又粗又长 明星操逼小视频| 97人妻夜夜爽二区欧美极品| 91高清成人在线视频| 激情色图一区二区三区| 国产精品一区二区三区蜜臀av| 色在线观看视频免费的| 欧美一区二区三区在线资源| 亚洲男人让女人爽的视频| av一区二区三区人妻| 18禁美女无遮挡免费| 国产男女视频在线播放| 日韩欧美亚洲熟女人妻| 亚洲天堂精品福利成人av| 中文字幕在线观看极品视频| 亚洲推理片免费看网站| 大香蕉伊人国产在线| 亚洲中文字幕乱码区| 欧美成人黄片一区二区三区 | 天干天天天色天天日天天射| 在线观看免费视频色97| 淫秽激情视频免费观看| 成年人中文字幕在线观看| 天天干天天爱天天色| 国产一级精品综合av| 亚洲欧美激情人妻偷拍| 亚洲天堂av最新网址| 欧美性受xx黑人性猛交| 日本啪啪啪啪啪啪啪| 免费啪啪啪在线观看视频| 一二三区在线观看视频| 亚洲国产最大av综合| 美女福利写真在线观看视频| 国产性感美女福利视频| 噜噜色噜噜噜久色超碰| 中国产一级黄片免费视频播放| 男人的天堂av日韩亚洲| 老司机欧美视频在线看| 成年午夜免费无码区| 日韩三级电影华丽的外出| 在线播放一区二区三区Av无码| 国产精品久久久久久久精品视频| 大鸡巴插入美女黑黑的阴毛| 好吊操视频这里只有精品| 美女大bxxxx内射| 青青伊人一精品视频| 日韩精品二区一区久久| 天天干天天日天天干天天操| 漂亮 人妻被中出中文| 91自产国产精品视频| 国产久久久精品毛片| 精品成人午夜免费看| 亚洲女人的天堂av| 色婷婷六月亚洲综合香蕉| 国产成人精品一区在线观看| 2o22av在线视频| 动漫精品视频在线观看| 亚洲午夜电影在线观看| 五月天色婷婷在线观看视频免费| 好太好爽好想要免费| 久久人人做人人妻人人玩精品vr| 农村胖女人操逼视频| 青青草视频手机免费在线观看| 搡老妇人老女人老熟女| 精品黑人巨大在线一区| 98精产国品一二三产区区别| 亚洲超碰97人人做人人爱| 精品一区二区三区欧美| 熟女少妇激情五十路| 成人av在线资源网站| asmr福利视频在线观看| 久久久久国产成人精品亚洲午夜| 这里有精品成人国产99| 亚洲av可乐操首页| 操人妻嗷嗷叫视频一区二区| 国产亚洲天堂天天一区| 精品国产高潮中文字幕| 这里有精品成人国产99| 11久久久久久久久久久| 黄片色呦呦视频免费看| 中国视频一区二区三区| 日日操综合成人av| 国产精品国产三级国产精东| 3344免费偷拍视频| 大学生A级毛片免费视频| 老司机深夜免费福利视频在线观看| 欧美综合婷婷欧美综合| 男人靠女人的逼视频| 久草极品美女视频在线观看| 55夜色66夜色国产精品站| 亚洲高清一区二区三区视频在线| 18禁精品网站久久| 中国熟女一区二区性xx| 黑人解禁人妻叶爱071| 国产密臀av一区二区三| 无套猛戳丰满少妇人妻| 大香蕉福利在线观看| 传媒在线播放国产精品一区| 成人国产影院在线观看| 人妻丝袜av在线播放网址| 边摸边做超爽毛片18禁色戒| 偷青青国产精品青青在线观看 | 日本一区精品视频在线观看| 国产 在线 免费 精品| 亚洲av可乐操首页| 人妻少妇亚洲精品中文字幕| 福利片区一区二体验区| 亚洲激情偷拍一区二区| 婷婷六月天中文字幕| 亚洲国产精品黑丝美女| 人妻丝袜诱惑我操她视频| 日日夜夜大香蕉伊人| 999九九久久久精品| 精品少妇一二三视频在线| 老熟妇凹凸淫老妇女av在线观看| 国产视频在线视频播放| 天天日天天干天天干天天日| 国产视频在线视频播放| 午夜精品在线视频一区| 热99re69精品8在线播放| 成人av电影免费版| 天天干天天插天天谢| 日本成人不卡一区二区| 九色porny九色9l自拍视频| 天天日天天干天天舔天天射| 91国语爽死我了不卡| 亚洲国产欧美一区二区三区久久| 1000小视频在线| 亚洲av日韩高清hd| 久久这里只有精品热视频| 大鸡巴插入美女黑黑的阴毛| 超碰97人人澡人人| 亚洲精品无码久久久久不卡| 国产精品福利小视频a| 初美沙希中文字幕在线| 又色又爽又黄的美女裸体| 北条麻妃av在线免费观看| 最新国产精品网址在线观看| 男女啪啪啪啪啪的网站| 国产va在线观看精品| 超级福利视频在线观看| 都市激情校园春色狠狠| 亚洲精品在线资源站| 97小视频人妻一区二区| 天堂av在线播放免费| 亚洲欧美国产综合777| 亚洲精品三级av在线免费观看| 伊人综合免费在线视频| 欧美日韩精品永久免费网址| 国产成人午夜精品福利| 国产午夜激情福利小视频在线| 2022中文字幕在线| 亚洲午夜高清在线观看| 中国老熟女偷拍第一页| 伊人综合aⅴ在线网| 宅男噜噜噜666国产| 免费十精品十国产网站| 自拍偷拍日韩欧美亚洲| 大香蕉大香蕉在线看| 男人天堂最新地址av| 亚洲国产最大av综合| 91精品啪在线免费| 黄色男人的天堂视频| 91天堂精品一区二区| 在线网站你懂得老司机| 骚货自慰被发现爆操| 亚洲熟女女同志女同| 绯色av蜜臀vs少妇| 亚洲一区二区三区久久受| 老司机欧美视频在线看| 天堂av在线播放免费| 欧美性感尤物人妻在线免费看| 青春草视频在线免费播放| 国产一线二线三线的区别在哪| 亚洲一区制服丝袜美腿| 欧美日本aⅴ免费视频| 欧美中文字幕一区最新网址| 91九色porny蝌蚪国产成人| 91综合久久亚洲综合| 动漫av网站18禁| 最新日韩av传媒在线| 天天躁日日躁狠狠躁躁欧美av| av中文字幕网址在线| 国产97视频在线精品| 在线国产日韩欧美视频| 欧洲黄页网免费观看| 国产精品3p和黑人大战| 老师啊太大了啊啊啊尻视频| 亚洲蜜臀av一区二区三区九色 | 日韩无码国产精品强奸乱伦| 欧美国品一二三产区区别| 国产亚洲欧美45p| 热思思国产99re| 日韩视频一区二区免费观看| 久久精品国产999| 亚洲国产在人线放午夜| 亚洲va天堂va国产va久| 青青青视频自偷自拍38碰| 天天日天天干天天搡| 亚洲综合在线观看免费| 国产视频精品资源网站| 日韩美在线观看视频黄| 啊啊啊想要被插进去视频| 亚洲码av无色中文| 亚洲欧美另类自拍偷拍色图| 99国产精品窥熟女精品| 国产亚洲欧美视频网站| 2020国产在线不卡视频| 国产不卡av在线免费| 美女 午夜 在线视频| 国产精品福利小视频a| 亚洲中文精品人人免费| 黑人大几巴狂插日本少妇| 国产成人自拍视频播放| 大鸡吧插逼逼视频免费看| 9国产精品久久久久老师| 精品国产成人亚洲午夜| 欧美偷拍自拍色图片| 最近的中文字幕在线mv视频| 国产成人精品一区在线观看| 亚洲av自拍偷拍综合| 色秀欧美视频第一页| 亚洲老熟妇日本老妇| 日韩欧美在线观看不卡一区二区 | 日本少妇的秘密免费视频| av在线观看网址av| 欧美精品 日韩国产| 亚洲的电影一区二区三区| 2022精品久久久久久中文字幕| 大香蕉大香蕉大香蕉大香蕉大香蕉| 日本少妇精品免费视频| huangse网站在线观看| 青青青青爽手机在线| 2020久久躁狠狠躁夜夜躁| 人妻自拍视频中国大陆| 国产精品视频一区在线播放| 日本熟妇色熟妇在线观看| 9l人妻人人爽人人爽| 欧美亚洲一二三区蜜臀| 亚洲国产欧美一区二区丝袜黑人| 偷偷玩弄新婚人妻h视频| 国产福利小视频二区| 无忧传媒在线观看视频| 激情综合治理六月婷婷| 91中文字幕最新合集| 日本av熟女在线视频| 亚洲av天堂在线播放| 午夜精彩视频免费一区| av天堂加勒比在线| 成熟熟女国产精品一区| 国产九色91在线观看精品| 人妻少妇中文有码精品| 亚洲av在线观看尤物| 88成人免费av网站| 免费观看丰满少妇做受| 亚洲 色图 偷拍 欧美| 91色秘乱一区二区三区| 人妻久久久精品69系列| 午夜激情高清在线观看| 男人天堂最新地址av| 中文字幕在线观看国产片| 亚洲高清一区二区三区视频在线| 精品区一区二区三区四区人妻| 水蜜桃一区二区三区在线观看视频 | 白白操白白色在线免费视频| 国产日韩精品免费在线| 一区二区麻豆传媒黄片| 水蜜桃一区二区三区在线观看视频| 人妻久久无码中文成人| 大胸性感美女羞爽操逼毛片| h国产小视频福利在线观看| 国产精彩对白一区二区三区| 97小视频人妻一区二区| 天天干天天插天天谢| 日日操综合成人av| 久久精品久久精品亚洲人| 成年人中文字幕在线观看| 中文字幕日韩精品就在这里| 天天日天天透天天操| 亚洲视频乱码在线观看| 超鹏97历史在线观看| rct470中文字幕在线| 一区二区三区国产精选在线播放| 强行扒开双腿猛烈进入免费版| 国产日韩一区二区在线看| 精品老妇女久久9g国产| 久久久麻豆精亚洲av麻花| 一区二区三区蜜臀在线| 超碰97人人做人人爱| 中文字幕高清免费在线人妻| 国产自拍黄片在线观看| 伊人精品福利综合导航| 亚洲无线观看国产高清在线| 日韩美av高清在线| 亚洲2021av天堂| 熟女人妻在线中出观看完整版| 干逼又爽又黄又免费的视频| 97青青青手机在线视频| 香港一级特黄大片在线播放| 国产美女精品福利在线| 人妻激情图片视频小说| 中文字幕在线乱码一区二区| 精品高潮呻吟久久av| 国产精品黄页网站视频| 天天日天天干天天舔天天射| www天堂在线久久| 国产亚洲视频在线观看| 国产男女视频在线播放| av在线资源中文字幕| 久草视频福利在线首页| 人人爽亚洲av人人爽av| 女生被男生插的视频网站| 大香蕉伊人国产在线| 经典亚洲伊人第一页| 99国内小视频在现欢看| 婷婷激情四射在线观看视频| 亚洲欧美激情中文字幕| 91极品大一女神正在播放| 久久久久久久久久性潮| 成人国产影院在线观看| 日本少妇在线视频大香蕉在线观看| 日韩成人综艺在线播放| 不卡一区一区三区在线| 视频久久久久久久人妻| av天堂加勒比在线| 色花堂在线av中文字幕九九| 亚洲国产精品久久久久蜜桃| 国产一区二区欧美三区| 中国熟女@视频91| 韩国男女黄色在线观看| 日日夜夜精品一二三| 国产又色又刺激在线视频| 99国内小视频在现欢看| 大鸡吧插逼逼视频免费看| 欧美专区第八页一区在线播放| 精品av久久久久久久| 青娱乐极品视频青青草| 欧美日韩人妻久久精品高清国产 | 青青伊人一精品视频| 蜜桃色婷婷久久久福利在线| 91片黄在线观看喷潮| 99热国产精品666| 欧美激情电影免费在线| 日韩伦理短片在线观看| 黑人巨大精品欧美视频| 91啪国自产中文字幕在线| 国产黄色大片在线免费播放 | 2021最新热播中文字幕| 熟女视频一区,二区,三区| 精品一区二区三区三区88| 国产精品中文av在线播放| 欧美精品中文字幕久久二区| 国产日韩精品免费在线| 亚洲激情,偷拍视频| 在线可以看的视频你懂的| 1769国产精品视频免费观看| 中文字幕AV在线免费看 | 大鸡巴插入美女黑黑的阴毛| 青青青国产免费视频| 韩国女主播精品视频网站| 日韩欧美一级aa大片| 亚洲av男人天堂久久| 最新日韩av传媒在线| 粉嫩欧美美人妻小视频| 韩国黄色一级二级三级| 999九九久久久精品| 久久久久久性虐视频| 青青青激情在线观看视频| 欧美日本国产自视大全| 欧美日韩中文字幕欧美| 日本少妇人妻xxxxxhd| 337p日本大胆欧美人| 午夜精品一区二区三区4| 一区二区三区欧美日韩高清播放| 大香蕉大香蕉大香蕉大香蕉大香蕉| 国产丰满熟女成人视频| 国内资源最丰富的网站| 欧美80老妇人性视频| 亚洲免费成人a v| 小泽玛利亚视频在线观看| 日本黄色三级高清视频| 成人色综合中文字幕| 国产美女精品福利在线| 在线视频精品你懂的| 亚洲国产欧美一区二区三区久久| 91精品免费久久久久久| 天天射,天天操,天天说| 91精品国产麻豆国产| 欧美3p在线观看一区二区三区| 精品91自产拍在线观看一区| 2021年国产精品自拍| 男人和女人激情视频| 欧美综合婷婷欧美综合| 国产精品一区二区av国| 天干天天天色天天日天天射 | 亚洲精品国品乱码久久久久| 亚洲视频在线观看高清| 社区自拍揄拍尻屁你懂的| 91在线视频在线精品3| 五十路息与子猛烈交尾视频| 亚洲国产精品免费在线观看| 国产午夜亚洲精品不卡在线观看| 亚洲国产中文字幕啊啊啊不行了 | 精品久久久久久久久久中文蒉| 最新中文字幕乱码在线| 日韩av有码中文字幕| 视频一区 二区 三区 综合| 黑人乱偷人妻中文字幕| 精品suv一区二区69| 亚洲精品av在线观看| 精品国产成人亚洲午夜| 少妇深喉口爆吞精韩国| 成熟熟女国产精品一区| 国产一区二区在线欧美| 女同性ⅹxx女同h偷拍| 任你操任你干精品在线视频| 91国产资源在线视频| 久久麻豆亚洲精品av| av乱码一区二区三区| 顶级尤物粉嫩小尤物网站| 成人伊人精品色xxxx视频| 视频啪啪啪免费观看| 国产午夜男女爽爽爽爽爽视频 | 女生被男生插的视频网站| 视频一区二区三区高清在线| japanese日本熟妇另类| 干逼又爽又黄又免费的视频| 亚洲av色图18p| 国产变态另类在线观看| 国产三级精品三级在线不卡| 亚洲精品久久综合久| 78色精品一区二区三区| 国产av国片精品一区二区| jiujiure精品视频在线| 欧美成人精品欧美一级黄色| 一区二区麻豆传媒黄片| 亚洲2021av天堂| 男人的天堂一区二区在线观看| 成人色综合中文字幕| 亚洲欧美国产综合777| 91精品啪在线免费| 亚洲午夜伦理视频在线 | 四川乱子伦视频国产vip| 免费岛国喷水视频在线观看| 极品粉嫩小泬白浆20p主播 | 亚洲成人三级在线播放| 91片黄在线观看喷潮| chinese国产盗摄一区二区| 成人sm视频在线观看| 国产在线观看免费人成短视频| 啪啪啪啪啪啪啪啪av| 天天日天天舔天天射进去| 欧美xxx成人在线| 亚洲欧美成人综合视频| 骚逼被大屌狂草视频免费看| 亚洲伊人av天堂有码在线| 97人妻夜夜爽二区欧美极品| 韩国男女黄色在线观看| 久久久久久cao我的性感人妻| 国产午夜激情福利小视频在线| 9国产精品久久久久老师| 在线免费91激情四射| 天天色天天操天天透| 成人30分钟免费视频| 久久精品久久精品亚洲人| 美女小视频网站在线| av破解版在线观看| 亚洲一区二区三区久久午夜| 久久精品亚洲国产av香蕉| 国产精品国产三级麻豆| 99av国产精品欲麻豆| 国产揄拍高清国内精品对白 | 午夜精品一区二区三区4| 亚洲欧美综合另类13p| 国产精品熟女久久久久浪潮| 91大神福利视频网| 亚洲男人让女人爽的视频| 亚洲av男人天堂久久| 97青青青手机在线视频| 日韩精品激情在线观看| 2020韩国午夜女主播在线| 在线视频自拍第三页| 清纯美女在线观看国产| 国产乱弄免费视频观看| 超级福利视频在线观看| 国产麻豆精品人妻av| 91国内视频在线观看| 国产高清精品一区二区三区| 在线免费91激情四射| 91天堂天天日天天操| 干逼又爽又黄又免费的视频| 亚洲一级av无码一级久久精品| 91精品综合久久久久3d动漫| 一区国内二区日韩三区欧美| 青青青艹视频在线观看| 丝袜美腿视频诱惑亚洲无| 午夜福利人人妻人人澡人人爽| 青青青青青青青青青国产精品视频| 一区二区三区综合视频| 天天日天天干天天干天天日| 91麻豆精品久久久久| 在线不卡成人黄色精品| 亚洲一区二区三区五区| 精品美女久久久久久| 成年女人免费播放视频| 鸡巴操逼一级黄色气| 国产伦精品一区二区三区竹菊| 亚洲国产成人最新资源| 亚洲 中文字幕在线 日韩| 91精品免费久久久久久| 天天操天天弄天天射| 大尺度激情四射网站| 色婷婷精品大在线观看| 日韩人妻在线视频免费| 2018在线福利视频| 国产高清女主播在线| 亚洲熟女久久久36d| 91精品激情五月婷婷在线| 精品国产污污免费网站入口自| av久久精品北条麻妃av观看| av在线shipin| 国产激情av网站在线观看| 大尺度激情四射网站| 美女福利写真在线观看视频| 老司机福利精品视频在线| 成人av电影免费版| 青青青视频手机在线观看| 精品国产乱码一区二区三区乱| 国产免费高清视频视频| 亚洲视频在线观看高清| 日本男女操逼视频免费看| 日本裸体熟妇区二区欧美| 中国把吊插入阴蒂的视频| 日韩欧美亚洲熟女人妻| 亚洲国产精品黑丝美女| 自拍偷区二区三区麻豆| 天天色天天操天天舔| 青青青青操在线观看免费| chinese国产盗摄一区二区 | 国产清纯美女al在线| 2012中文字幕在线高清| 免费啪啪啪在线观看视频| wwwxxx一级黄色片| 特级无码毛片免费视频播放| 1769国产精品视频免费观看| 天天插天天狠天天操| 热思思国产99re| 日本一区美女福利视频| 日韩亚国产欧美三级涩爱| 中文字幕中文字幕人妻| 91人妻精品一区二区在线看| 国产黄色片在线收看| caoporm超碰国产| 在线观看亚洲人成免费网址| 欧美亚洲国产成人免费在线| 成人免费公开视频无毒| 日本少妇人妻xxxxxhd| 精品av国产一区二区三区四区| 精品成人啪啪18免费蜜臀| 成人资源在线观看免费官网| 亚洲精品欧美日韩在线播放| 国产自拍黄片在线观看| 中文字幕av一区在线观看| 91小伙伴中女熟女高潮| 青青青视频手机在线观看| 97人妻色免费视频| 久久久制服丝袜中文字幕| av视屏免费在线播放| 91久久人澡人人添人人爽乱| 欧美80老妇人性视频| 天天日天天鲁天天操| 激情五月婷婷综合色啪| 日韩精品中文字幕在线| 欧美一区二区三区久久久aaa| 999热精品视频在线| 福利午夜视频在线观看| 五十路熟女av天堂| 日本www中文字幕| 无码精品一区二区三区人| 亚洲综合一区二区精品久久| 91老师蜜桃臀大屁股| 91九色国产熟女一区二区| 天天日天天透天天操| 国产高潮无码喷水AV片在线观看| av一区二区三区人妻| 欧美老妇精品另类不卡片| 韩国亚洲欧美超一级在线播放视频| 免费无毒热热热热热热久| 岛国毛片视频免费在线观看| 午夜影院在线观看视频羞羞羞| 成人av久久精品一区二区| 99热99这里精品6国产| 亚洲av男人的天堂你懂的| 少妇人妻真实精品视频| 18禁无翼鸟成人在线| 国产污污污污网站在线| 一区二区三区日本伦理| 国产97在线视频观看| 91精品国产观看免费| 又粗又硬又猛又爽又黄的| 国产精品大陆在线2019不卡| 一区二区视频在线观看免费观看| 在线 中文字幕 一区| 喷水视频在线观看这里只有精品| 久久这里只有精彩视频免费| 9久在线视频只有精品| 天码人妻一区二区三区在线看| av日韩在线免费播放| 亚洲色偷偷综合亚洲AV伊人 | 日本福利午夜电影在线观看| 老鸭窝在线观看一区| 无码日韩人妻精品久久| 91片黄在线观看喷潮| 日本阿v视频在线免费观看| 日本中文字幕一二区视频| 97欧洲一区二区精品免费 | 日本韩国免费一区二区三区视频 | 成人蜜臀午夜久久一区| 欧美aa一级一区三区四区| 国产精彩福利精品视频| 国产亚洲欧美另类在线观看| 日日夜夜精品一二三| 五十路息与子猛烈交尾视频| 精品一区二区三区三区色爱| 成年人中文字幕在线观看| 女生自摸在线观看一区二区三区 | 亚洲中文精品人人免费| www日韩a级s片av| 亚洲天堂有码中文字幕视频| 国产精品久久久黄网站| 日本少妇在线视频大香蕉在线观看| 亚洲熟色妇av日韩熟色妇在线| 九九视频在线精品播放| 亚洲成人熟妇一区二区三区| 国产一区二区三免费视频| 成人动漫大肉棒插进去视频| 高清成人av一区三区| 天天干天天日天天干天天操| 2022中文字幕在线| 天堂av狠狠操蜜桃| 最后99天全集在线观看| 人妻在线精品录音叫床| 日韩精品一区二区三区在线播放| 91色老99久久九九爱精品| 都市家庭人妻激情自拍视频| 无码中文字幕波多野不卡| 中字幕人妻熟女人妻a62v网| jul—619中文字幕在线| 国产午夜男女爽爽爽爽爽视频 | 久久久精品999精品日本| 日韩人妻丝袜中文字幕| 青青青青草手机在线视频免费看| 亚洲av无硬久久精品蜜桃| 欧美日韩激情啪啪啪| 色吉吉影音天天干天天操 | avjpm亚洲伊人久久| 亚洲第一黄色在线观看| 超碰97免费人妻麻豆| 在线新三级黄伊人网| 97瑟瑟超碰在线香蕉| 不戴胸罩引我诱的隔壁的人妻| 国产自拍黄片在线观看| 91免费观看国产免费| 欧美香蕉人妻精品一区二区| 精品久久久久久久久久久a√国产| 天天夜天天日天天日| 大香蕉伊人国产在线| 涩涩的视频在线观看视频| 亚洲欧美成人综合在线观看| 91久久国产成人免费网站| 福利午夜视频在线观看| 亚洲va欧美va人人爽3p| 亚洲欧美人精品高清| 久久国产精品精品美女| 欧美精品欧美极品欧美视频| 亚洲av日韩精品久久久| 91成人精品亚洲国产| 中文字幕+中文字幕| 激情图片日韩欧美人妻| 国产成人精品久久二区91| 91中文字幕免费在线观看| 2012中文字幕在线高清| 欧美老妇精品另类不卡片| 中文字幕av一区在线观看 | 免费费一级特黄真人片| 日本一道二三区视频久久| 天天日天天摸天天爱| 亚洲av男人天堂久久| 亚洲中文字幕国产日韩| 日本美女成人在线视频| 99久久超碰人妻国产| 国产一级麻豆精品免费| 99热这里只有国产精品6| 护士特殊服务久久久久久久| 日韩一区二区电国产精品| 免费大片在线观看视频网站| 亚洲狠狠婷婷综合久久app| 男人天堂最新地址av| 亚洲欧美一区二区三区电影| av久久精品北条麻妃av观看| 不卡精品视频在线观看| 在线免费观看99视频| 青青社区2国产视频| 99热久久这里只有精品8| 国产黄色片蝌蚪九色91| 亚洲福利精品福利精品福利| 超碰在线观看免费在线观看| 婷婷午夜国产精品久久久| 天天日天天干天天干天天日| 色婷婷综合激情五月免费观看| 国产清纯美女al在线| 100%美女蜜桃视频| 熟女人妻一区二区精品视频| 18禁精品网站久久| 97黄网站在线观看| 中文字幕之无码色多多| 涩爱综合久久五月蜜臀| av天堂中文免费在线| 99一区二区在线观看| 热思思国产99re| 最新97国产在线视频| 大胸性感美女羞爽操逼毛片| 热久久只有这里有精品| 亚洲国产在人线放午夜| 动漫av网站18禁| 18禁美女黄网站色大片下载| 91久久精品色伊人6882| 99久久成人日韩欧美精品| 国产综合视频在线看片| av中文字幕在线观看第三页| 亚洲在线免费h观看网站| 懂色av之国产精品| 亚洲国产欧美一区二区丝袜黑人| 天天干天天搞天天摸| 欧美美女人体视频一区| 日本免费一级黄色录像| 一本一本久久a久久精品综合不卡| 91快播视频在线观看| yy96视频在线观看| 日本三极片中文字幕| 在线免费观看视频一二区| 97小视频人妻一区二区| japanese五十路熟女熟妇| 国产精品福利小视频a| 青青青青青青草国产| 亚洲福利天堂久久久久久| 欧美专区日韩专区国产专区| 18禁污污污app下载| 天堂中文字幕翔田av| 婷婷色国产黑丝少妇勾搭AV| 在线视频自拍第三页| 亚洲第一黄色在线观看| 青青青青视频在线播放| 伊人综合aⅴ在线网| 青青草国内在线视频精选| 青青青视频自偷自拍38碰| 这里只有精品双飞在线播放| 天天色天天操天天透| 日韩美女福利视频网| 国产白袜脚足J棉袜在线观看| 免费无毒热热热热热热久| 换爱交换乱高清大片| 大陆精品一区二区三区久久| 91中文字幕免费在线观看| 成人国产小视频在线观看| 国产亚洲视频在线观看| 久久久久国产成人精品亚洲午夜| 在线观看免费视频网| 中文字幕午夜免费福利视频| 中国熟女@视频91| 我想看操逼黄色大片| 少妇高潮无套内谢麻豆| 亚洲综合另类精品小说| 护士特殊服务久久久久久久| 老师啊太大了啊啊啊尻视频| 性欧美激情久久久久久久| 亚洲精品色在线观看视频| 特大黑人巨大xxxx| 2025年人妻中文字幕乱码在线| 日本少妇在线视频大香蕉在线观看| 天天插天天狠天天操| 免费av岛国天堂网站| 国产精品中文av在线播放| 不卡一不卡二不卡三| 亚洲国际青青操综合网站| 亚洲国产精品黑丝美女| 天堂av在线播放免费| 91快播视频在线观看| 97超碰国语国产97超碰| 精品成人啪啪18免费蜜臀| 美女视频福利免费看| 久久久极品久久蜜桃| 亚洲熟妇x久久av久久| 天天操天天干天天日狠狠插| 日韩欧美国产一区不卡| 老熟妇凹凸淫老妇女av在线观看| 中文字幕第三十八页久久| 中文字幕一区二 区二三区四区| 国产三级影院在线观看| 美女吃鸡巴操逼高潮视频| 欧美日韩高清午夜蜜桃大香蕉| 欧美80老妇人性视频| 天天日天天玩天天摸| 婷婷色国产黑丝少妇勾搭AV| 欧美偷拍自拍色图片| 国产又粗又硬又大视频| 老司机在线精品福利视频| 999九九久久久精品| 1769国产精品视频免费观看| 欧美伊人久久大香线蕉综合| 日本脱亚入欧是指什么| 99精品国自产在线人| 日日爽天天干夜夜操| 三级黄色亚洲成人av| 日韩视频一区二区免费观看| 天堂av在线最新版在线| 端庄人妻堕落挣扎沉沦| 沙月文乃人妻侵犯中文字幕在线| 天天日天天鲁天天操| 精品乱子伦一区二区三区免费播| 92福利视频午夜1000看| 中文人妻AV久久人妻水| 成人精品在线观看视频| 东游记中文字幕版哪里可以看到| 欧美另类重口味极品在线观看| 成人免费毛片aaaa| 欧美精品免费aaaaaa| 红桃av成人在线观看| 国产精品一区二区三区蜜臀av | 免费大片在线观看视频网站| 超碰97人人澡人人| 日韩中文字幕在线播放第二页| 91 亚洲视频在线观看| 含骚鸡巴玩逼逼视频| 国产精品sm调教视频| 男生舔女生逼逼的视频| 又色又爽又黄又刺激av网站| 99av国产精品欲麻豆| 色花堂在线av中文字幕九九| 国产精品成久久久久三级蜜臀av | 午夜激情精品福利视频| 精品区一区二区三区四区人妻| 国产精品黄大片在线播放| 骚逼被大屌狂草视频免费看| 丰满少妇翘臀后进式| 国产又粗又黄又硬又爽| 熟女在线视频一区二区三区| av高潮迭起在线观看| 国产夫妻视频在线观看免费| 18禁污污污app下载| 老司机福利精品免费视频一区二区| 男人和女人激情视频| 天天日天天操天天摸天天舔| 一二三中文乱码亚洲乱码one| 国产乱子伦精品视频潮优女| 啪啪啪啪啪啪啪啪av| 天天通天天透天天插| 狠狠躁狠狠爱网站视频| 欧美成一区二区三区四区| 亚洲熟女女同志女同| 97超碰人人搞人人| 99re6热在线精品| 人妻自拍视频中国大陆| 77久久久久国产精产品| 天码人妻一区二区三区在线看| 精品一区二区亚洲欧美| 午夜场射精嗯嗯啊啊视频| 天天日天天做天天日天天做| 午夜精品久久久久久99热| 欧美精品亚洲精品日韩在线| 欧美一区二区中文字幕电影| 青娱乐在线免费视频盛宴| 老司机福利精品视频在线| 日本少妇的秘密免费视频| aaa久久久久久久久| 大陆精品一区二区三区久久| 欧洲日韩亚洲一区二区三区 | 最新日韩av传媒在线| 国产性生活中老年人视频网站| 亚洲一区二区人妻av| 国产之丝袜脚在线一区二区三区 | 国产一区二区欧美三区| 干逼又爽又黄又免费的视频| 一区二区久久成人网| 91人妻精品久久久久久久网站| 青青青视频自偷自拍38碰| 亚洲成人激情视频免费观看了| 国产精品欧美日韩区二区 | 日本熟妇一区二区x x| 中文字幕AV在线免费看 | 老司机福利精品视频在线| 日本av高清免费网站| 日韩av中文在线免费观看| 久久艹在线观看视频| 免费大片在线观看视频网站| aⅴ精产国品一二三产品| 18禁美女黄网站色大片下载| 久久久极品久久蜜桃| 亚洲av色图18p| 黄色视频在线观看高清无码 | av中文在线天堂精品| 日本真人性生活视频免费看| 操人妻嗷嗷叫视频一区二区| huangse网站在线观看| 亚洲激情偷拍一区二区| 91天堂精品一区二区| 亚洲在线一区二区欧美| 午夜免费体验区在线观看| 一二三中文乱码亚洲乱码one| 521精品视频在线观看| 久青青草视频手机在线免费观看 | 视频一区二区三区高清在线| 色偷偷伊人大杳蕉综合网 | 玖玖一区二区在线观看| 99一区二区在线观看| 亚洲自拍偷拍综合色| caoporn蜜桃视频| 伊人开心婷婷国产av| 久久热久久视频在线观看| 日韩a级精品一区二区| 欧美第一页在线免费观看视频| 天堂女人av一区二区| 欧美亚洲国产成人免费在线| av中文字幕福利网| 2019av在线视频| 国产乱子伦一二三区| 亚洲一区二区三区精品视频在线 | 91av精品视频在线| 午夜91一区二区三区| 和邻居少妇愉情中文字幕| 亚洲日本一区二区久久久精品| 亚洲精品三级av在线免费观看| 91免费黄片可看视频| 3344免费偷拍视频| 亚洲av天堂在线播放| 国产黑丝高跟鞋视频在线播放| 亚洲 国产 成人 在线| 超碰公开大香蕉97| 性感美女诱惑福利视频| 欧美日本aⅴ免费视频| 日韩精品中文字幕播放| 2020中文字幕在线播放| 在线观看操大逼视频| av俺也去在线播放| 亚洲欧美久久久久久久久| 亚洲国产在线精品国偷产拍| 久久久久国产成人精品亚洲午夜| 久久精品亚洲成在人线a| 亚洲成人国产av在线| 亚洲1069综合男同| 亚洲天堂第一页中文字幕| 在线播放 日韩 av| 丝袜肉丝一区二区三区四区在线看| 涩涩的视频在线观看视频| 少妇人妻久久久久视频黄片| 日本美女成人在线视频| 中文字幕1卡1区2区3区| 搡老妇人老女人老熟女| jiujiure精品视频在线| 粉嫩小穴流水视频在线观看|