Python實(shí)現(xiàn)將MySQL中所有表的數(shù)據(jù)都導(dǎo)出為CSV文件并壓縮
Python將MySQL數(shù)據(jù)庫中所有表的數(shù)據(jù)都導(dǎo)出為CSV文件到一個(gè)目錄,并壓縮為zip文件到另一個(gè)目錄下,然后解壓縮這個(gè)目錄中的所有zip文件到第三個(gè)目錄下。不使用Pandas庫,需要考慮SQL結(jié)果集是大數(shù)據(jù)量分批數(shù)據(jù)導(dǎo)出的情況,通過多線程和異步操作來提高程序性能,程序需要異常處理和輸出,輸出出錯(cuò)時(shí)的錯(cuò)誤信息,每次每個(gè)查詢導(dǎo)出數(shù)據(jù)的運(yùn)行狀態(tài)和表數(shù)據(jù)行數(shù)以及運(yùn)行時(shí)間戳,導(dǎo)出時(shí)間,輸出每個(gè)文件記錄數(shù)量的日志。
該腳本已在考慮大數(shù)據(jù)量、異常處理和性能優(yōu)化的基礎(chǔ)上進(jìn)行了全面設(shè)計(jì),能夠處理大多數(shù)常見場(chǎng)景。根據(jù)具體需求可進(jìn)一步調(diào)整批量大?。╞atch_size)和線程數(shù)(max_workers)以獲得最佳性能。
import os
import csv
import zipfile
import logging
import mysql.connector
from datetime import datetime
import time
import concurrent.futures
import glob
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('data_export.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def export_table_to_csv(table_name, csv_path, db_config, batch_size=1000):
"""導(dǎo)出單個(gè)表的數(shù)據(jù)到CSV文件,分批處理"""
conn = None
cursor = None
total_rows = 0
try:
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
# 獲取數(shù)據(jù)并寫入CSV
with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
# 執(zhí)行查詢并獲取列名
cursor.execute(f"SELECT * FROM `{table_name}`")
columns = [col[0] for col in cursor.description]
writer.writerow(columns)
# 分批獲取數(shù)據(jù)
while True:
rows = cursor.fetchmany(batch_size)
if not rows:
break
writer.writerows(rows)
total_rows += len(rows)
logger.debug(f"{table_name} 已導(dǎo)出 {total_rows} 行")
logger.info(f"{table_name} CSV導(dǎo)出完成,總行數(shù):{total_rows}")
return total_rows
except Exception as e:
logger.error(f"導(dǎo)出表 {table_name} 失敗: {str(e)}", exc_info=True)
raise
finally:
if cursor:
cursor.close()
if conn and conn.is_connected():
conn.close()
def compress_to_zip(source_path, zip_path):
"""壓縮文件為ZIP格式"""
try:
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
zipf.write(source_path, arcname=os.path.basename(source_path))
logger.info(f"成功壓縮 {source_path} 到 {zip_path}")
except Exception as e:
logger.error(f"壓縮 {source_path} 失敗: {str(e)}", exc_info=True)
raise
def process_table(table_name, db_config, csv_dir, zip_dir):
"""處理單個(gè)表的導(dǎo)出和壓縮"""
start_time = time.time()
logger.info(f"開始處理表: {table_name}")
status = "成功"
rows_exported = 0
try:
# 定義文件路徑
csv_filename = f"{table_name}.csv"
zip_filename = f"{table_name}.zip"
csv_path = os.path.join(csv_dir, csv_filename)
zip_path = os.path.join(zip_dir, zip_filename)
# 導(dǎo)出CSV
rows_exported = export_table_to_csv(table_name, csv_path, db_config)
# 壓縮文件
compress_to_zip(csv_path, zip_path)
except Exception as e:
status = f"失敗: {str(e)}"
# 清理可能存在的中間文件
for path in [csv_path, zip_path]:
if path and os.path.exists(path):
try:
os.remove(path)
logger.warning(f"已清理文件: {path}")
except Exception as clean_error:
logger.error(f"清理文件失敗: {clean_error}")
finally:
duration = time.time() - start_time
log_message = (
f"表處理完成 - 表名: {table_name}, "
f"狀態(tài): {status}, "
f"導(dǎo)出行數(shù): {rows_exported}, "
f"耗時(shí): {duration:.2f}秒"
)
logger.info(log_message)
def unzip_files(zip_dir, unzip_dir):
"""解壓指定目錄中的所有ZIP文件"""
zip_files = glob.glob(os.path.join(zip_dir, '*.zip'))
if not zip_files:
logger.warning("未找到ZIP文件,跳過解壓")
return
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for zip_path in zip_files:
futures.append(executor.submit(
lambda: extract_zip(zip_path, unzip_dir)
))
for future in concurrent.futures.as_completed(futures):
try:
future.result()
except Exception as e:
logger.error(f"解壓過程中發(fā)生錯(cuò)誤: {str(e)}")
def extract_zip(zip_path, unzip_dir):
"""解壓單個(gè)ZIP文件"""
try:
start_time = time.time()
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(unzip_dir)
duration = time.time() - start_time
logger.info(f"解壓完成: {zip_path} => {unzip_dir} (耗時(shí): {duration:.2f}秒)")
except Exception as e:
logger.error(f"解壓 {zip_path} 失敗: {str(e)}", exc_info=True)
raise
def main():
# 配置參數(shù)
db_config = {
'host': 'localhost',
'user': 'your_username',
'password': 'your_password',
'database': 'your_database'
}
# 目錄配置
base_dir = os.path.dirname(os.path.abspath(__file__))
csv_dir = os.path.join(base_dir, 'csv_exports')
zip_dir = os.path.join(base_dir, 'zip_archives')
unzip_dir = os.path.join(base_dir, 'unzipped_files')
# 創(chuàng)建目錄
for dir_path in [csv_dir, zip_dir, unzip_dir]:
os.makedirs(dir_path, exist_ok=True)
logger.info(f"目錄已準(zhǔn)備: {dir_path}")
# 獲取所有表名
try:
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
logger.info(f"發(fā)現(xiàn) {len(tables)} 個(gè)需要處理的表")
except Exception as e:
logger.error(f"獲取數(shù)據(jù)庫表失敗: {str(e)}", exc_info=True)
return
finally:
if 'cursor' in locals():
cursor.close()
if 'conn' in locals() and conn.is_connected():
conn.close()
# 處理所有表(多線程導(dǎo)出和壓縮)
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = []
for table in tables:
futures.append(executor.submit(
process_table,
table,
db_config,
csv_dir,
zip_dir
))
# 處理任務(wù)結(jié)果
for future in concurrent.futures.as_completed(futures):
try:
future.result()
except Exception as e:
logger.error(f"表處理異常: {str(e)}")
# 解壓所有ZIP文件(多線程解壓)
logger.info("開始解壓所有ZIP文件")
unzip_files(zip_dir, unzip_dir)
logger.info("全部處理流程完成")
if __name__ == "__main__":
main()
關(guān)鍵特性說明:
1.分批處理大數(shù)據(jù):
- 使用fetchmany(batch_size)分批獲取數(shù)據(jù)(默認(rèn)每批1000行)
- 流式處理減少內(nèi)存占用
2.多線程處理:
- 使用ThreadPoolExecutor并行處理不同表的導(dǎo)出和壓縮
- 獨(dú)立的數(shù)據(jù)庫連接池(每個(gè)線程有自己的連接)
- 并行解壓處理
3.異常處理:
- 全面的try-except塊覆蓋所有關(guān)鍵操作
- 自動(dòng)清理失敗時(shí)產(chǎn)生的中間文件
- 詳細(xì)的錯(cuò)誤日志記錄(包含堆棧跟蹤)
4.日志記錄:
- 同時(shí)輸出到文件和終端
- 記錄時(shí)間戳、操作類型、狀態(tài)、耗時(shí)等關(guān)鍵信息
- 包含每個(gè)表的處理結(jié)果統(tǒng)計(jì)
5.文件管理:
- 自動(dòng)創(chuàng)建所需目錄
- 使用ZIP_DEFLATED進(jìn)行高效壓縮
- 安全的文件路徑處理
6.性能優(yōu)化:
- 使用服務(wù)器端游標(biāo)避免內(nèi)存過載
- 可配置的批量大小和線程數(shù)
- 異步I/O操作
使用說明:
安裝依賴:
pip install mysql-connector-python
修改配置:
更新db_config中的數(shù)據(jù)庫連接信息
根據(jù)需要調(diào)整目錄路徑(csv_dir, zip_dir, unzip_dir)
運(yùn)行腳本:
python script.py
查看日志:
實(shí)時(shí)終端輸出
詳細(xì)日志文件data_export.log
擴(kuò)展建議:
通過命令行參數(shù)接受數(shù)據(jù)庫配置和目錄路徑
添加郵件通知功能(處理完成或失敗時(shí)通知)
實(shí)現(xiàn)斷點(diǎn)續(xù)傳功能
添加文件校驗(yàn)(MD5校驗(yàn)和)
支持配置文件(YAML/JSON格式)
添加進(jìn)度條顯示
到此這篇關(guān)于Python實(shí)現(xiàn)將MySQL中所有表的數(shù)據(jù)都導(dǎo)出為CSV文件并壓縮的文章就介紹到這了,更多相關(guān)Python MySQL數(shù)據(jù)導(dǎo)出為CSV內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 使用python將csv數(shù)據(jù)導(dǎo)入mysql數(shù)據(jù)庫
- 使用python的pandas庫讀取csv文件保存至mysql數(shù)據(jù)庫
- Python之csv文件從MySQL數(shù)據(jù)庫導(dǎo)入導(dǎo)出的方法
- python 從csv讀數(shù)據(jù)到mysql的實(shí)例
- Python實(shí)現(xiàn)將MySQL數(shù)據(jù)庫表中的數(shù)據(jù)導(dǎo)出生成csv格式文件的方法
- 利用Python批量導(dǎo)出mysql數(shù)據(jù)庫表結(jié)構(gòu)的操作實(shí)例
相關(guān)文章
python實(shí)現(xiàn)動(dòng)態(tài)創(chuàng)建類的方法分析
這篇文章主要介紹了python實(shí)現(xiàn)動(dòng)態(tài)創(chuàng)建類的方法,結(jié)合實(shí)例形式分析了Python動(dòng)態(tài)創(chuàng)建類的原理、實(shí)現(xiàn)方法及相關(guān)操作技巧,需要的朋友可以參考下2019-06-06
python多線程并發(fā)讓兩個(gè)LED同時(shí)亮的方法
今天小編就為大家分享一篇python多線程并發(fā)讓兩個(gè)LED同時(shí)亮的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-02-02
Python任意字符串轉(zhuǎn)16, 32, 64進(jìn)制的方法
今天小編就為大家分享一篇Python任意字符串轉(zhuǎn)16, 32, 64進(jìn)制的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-06-06
將自己的數(shù)據(jù)集制作成TFRecord格式教程
今天小編就為大家分享一篇將自己的數(shù)據(jù)集制作成TFRecord格式教程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-02-02
Python 通過requests實(shí)現(xiàn)騰訊新聞抓取爬蟲的方法
今天小編就為大家分享一篇Python 通過requests實(shí)現(xiàn)騰訊新聞抓取爬蟲的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-02-02
python使用redis實(shí)現(xiàn)消息隊(duì)列(異步)的實(shí)現(xiàn)完整例程
本文主要介紹了python使用redis實(shí)現(xiàn)消息隊(duì)列(異步)的實(shí)現(xiàn)完整例程,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01
Python實(shí)現(xiàn)爬取馬云的微博功能示例
這篇文章主要介紹了Python實(shí)現(xiàn)爬取馬云的微博功能,結(jié)合實(shí)例形式較為詳細(xì)的分析了Python模擬ajax請(qǐng)求爬取馬云微博的相關(guān)操作技巧與注意事項(xiàng),需要的朋友可以參考下2019-02-02

