Python操作Spark常用命令指南
Python操作Spark的常用命令指南,涵蓋從環(huán)境配置到數(shù)據(jù)分析的核心操作。
一、環(huán)境配置與Spark初始化
PySpark是Apache Spark的Python API,它結(jié)合了Python的易用性和Spark的分布式計(jì)算能力。
1. 安裝與基礎(chǔ)導(dǎo)入
# 安裝PySpark !pip install pyspark # 導(dǎo)入必要的庫(kù) from pyspark.sql import SparkSession import pyspark.sql.functions as F # 常用函數(shù) import pyspark.sql.types as T # 數(shù)據(jù)類型
2. 創(chuàng)建SparkSession
SparkSession是與Spark集群交互的統(tǒng)一入口點(diǎn),絕大多數(shù)操作都從這里開(kāi)始。
spark = SparkSession.builder \
.appName("MySparkApp") \ # 設(shè)置應(yīng)用名稱
.config("spark.driver.memory", "2g") \ # 配置參數(shù)(可選)
.getOrCreate()
3. 基礎(chǔ)環(huán)境檢查
# 檢查Spark版本
print(spark.version)
# 查看當(dāng)前配置
print(spark.conf.getAll())
# 列出數(shù)據(jù)庫(kù)和表
spark.sql("SHOW DATABASES").show()
spark.sql("USE your_database") # 切換數(shù)據(jù)庫(kù)
spark.sql("SHOW TABLES").show()
二、核心操作:數(shù)據(jù)讀寫
1. 創(chuàng)建DataFrame
有多種方式創(chuàng)建DataFrame,這是Spark中的核心數(shù)據(jù)結(jié)構(gòu)。
# 方式1:從Python列表創(chuàng)建
data = [("Alice", 29), ("Bob", 35)]
columns = ["name", "age"]
df = spark.createDataFrame(data, schema=columns)
# 方式2:指定詳細(xì)模式(Schema)
schema = T.StructType([
T.StructField("name", T.StringType(), True),
T.StructField("age", T.IntegerType(), True)
])
df = spark.createDataFrame(data, schema=schema)2. 從文件讀取數(shù)據(jù)
PySpark支持CSV、Parquet、JSON等多種格式。
# 讀取CSV文件(常用)
df = spark.read.csv(
"path/to/file.csv",
header=True, # 第一行作為列名
inferSchema=True # 自動(dòng)推斷列類型
)
# 讀取Parquet文件(列式存儲(chǔ),高效)
df = spark.read.parquet("path/to/file.parquet")
# 讀取JSON文件
df = spark.read.json("path/to/file.json")3. 數(shù)據(jù)寫入與保存
# 保存為Parquet格式(推薦,壓縮率高)
df.write.mode("overwrite").parquet("output_path.parquet")
# 保存為CSV格式
df.write.mode("overwrite") \
.option("header", True) \
.csv("output_path.csv")
# 保存為Spark表(可在集群中持久化)
df.write.saveAsTable("table_name")三、數(shù)據(jù)處理與轉(zhuǎn)換
數(shù)據(jù)處理的核心是對(duì)DataFrame進(jìn)行列和行的操作。
1. 列操作
# 選擇特定列
df.select("name", "age").show()
# 創(chuàng)建新列(示例:年齡加1)
df = df.withColumn("age_plus_one", F.col("age") + 1)
# 重命名列
df = df.withColumnRenamed("old_name", "new_name")
# 更改列類型(示例:整型轉(zhuǎn)字符串)
df = df.withColumn("age_str", F.col("age").cast(T.StringType()))
# 刪除列
df = df.drop("column_to_remove")2. 行操作(過(guò)濾與排序)
# 過(guò)濾行(示例:年齡大于30)
df_filtered = df.filter(F.col("age") > 30)
# 等價(jià)寫法
df_filtered = df.where(df["age"] > 30)
# 排序
df_sorted = df.orderBy(F.col("age").desc()) # 按年齡降序3. 處理缺失值
# 刪除包含任何空值的行
df_clean = df.dropna()
# 填充缺失值(示例:用0填充特定列)
df_filled = df.fillna({"age": 0, "name": "Unknown"})四、數(shù)據(jù)聚合與高級(jí)分析
1. 分組與聚合
這是數(shù)據(jù)分析中最常用的操作之一。
# 基礎(chǔ)分組聚合(示例:按部門計(jì)算平均工資)
df.groupBy("department").agg(
F.avg("salary").alias("avg_salary"),
F.count("*").alias("employee_count")
).show()
2. 連接(Join)操作
用于合并兩個(gè)DataFrame。
# 假設(shè)有另一個(gè)部門信息表df_dept
df_joined = df.join(
df_dept,
on="department_id", # 連接鍵
how="inner" # 連接方式:inner, left, right, outer等
)
3. 窗口函數(shù)
用于計(jì)算排名、移動(dòng)平均等高級(jí)分析。
from pyspark.sql.window import Window
# 定義窗口:按部門分區(qū),按工資降序
window_spec = Window.partitionBy("department").orderBy(F.col("salary").desc())
# 計(jì)算部門內(nèi)工資排名
df.withColumn("salary_rank", F.rank().over(window_spec)).show()4. 用戶自定義函數(shù)(UDF)
當(dāng)內(nèi)置函數(shù)無(wú)法滿足需求時(shí)使用。
from pyspark.sql.functions import udf
# 定義Python函數(shù)
def categorize_age(age):
return "Young" if age < 30 else "Senior"
# 注冊(cè)為UDF(需指定返回類型)
categorize_udf = udf(categorize_age, T.StringType())
# 應(yīng)用UDF
df.withColumn("age_group", categorize_udf(F.col("age"))).show()五、在PySpark中運(yùn)行SQL查詢
你可以直接在PySpark中執(zhí)行SQL語(yǔ)句,這為熟悉SQL的用戶提供了便利。
# 將DataFrame注冊(cè)為臨時(shí)視圖
df.createOrReplaceTempView("people")
# 執(zhí)行SQL查詢
result = spark.sql("""
SELECT department, AVG(salary) as avg_sal
FROM people
WHERE age > 25
GROUP BY department
ORDER BY avg_sal DESC
""")
result.show()六、性能優(yōu)化與最佳實(shí)踐
- 避免數(shù)據(jù)混洗:
groupBy、join等操作可能導(dǎo)致數(shù)據(jù)在節(jié)點(diǎn)間大量移動(dòng),應(yīng)盡量減少這類操作或提前過(guò)濾數(shù)據(jù)。 - 選擇合適的數(shù)據(jù)格式:生產(chǎn)環(huán)境中,Parquet通常是比CSV更好的選擇,因?yàn)樗С至惺酱鎯?chǔ)和謂詞下推,能顯著提高查詢性能。
- 利用緩存:對(duì)需要多次使用的中間結(jié)果進(jìn)行緩存。
df.cache() # 將DataFrame緩存到內(nèi)存 df.unpersist() # 使用后釋放緩存
- 及時(shí)關(guān)閉會(huì)話:處理完成后關(guān)閉SparkSession以釋放資源。
spark.stop()
七、一個(gè)完整的示例
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 1. 初始化
spark = SparkSession.builder.appName("Example").getOrCreate()
# 2. 讀取數(shù)據(jù)
df = spark.read.csv("sales.csv", header=True, inferSchema=True)
# 3. 數(shù)據(jù)處理
result = (df
.filter(F.col("amount") > 100) # 篩選大額交易
.groupBy("region", "product") # 按地區(qū)和產(chǎn)品分組
.agg(F.sum("amount").alias("total_sales")) # 計(jì)算總銷售額
.orderBy(F.col("total_sales").desc()) # 按銷售額降序排序
)
# 4. 輸出
result.show()
result.write.mode("overwrite").parquet("sales_summary.parquet")
# 5. 清理
spark.stop()到此這篇關(guān)于Python操作Spark常用命令指南的文章就介紹到這了,更多相關(guān)python spark命令內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Django錯(cuò)誤:TypeError at / ''bool'' object is not callable解決
這篇文章主要介紹了Django 錯(cuò)誤:TypeError at / 'bool' object is not callable解決,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-08-08
Python基礎(chǔ)之?dāng)?shù)據(jù)結(jié)構(gòu)詳解
這篇文章主要介紹了Python基礎(chǔ)之?dāng)?shù)據(jù)結(jié)構(gòu)詳解,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)python基礎(chǔ)的小伙伴們有非常好的幫助,需要的朋友可以參考下2021-04-04
Python自己定義一個(gè)求累加和的函數(shù)方式
定義了一個(gè)函數(shù)來(lái)計(jì)算不確定數(shù)量參數(shù)的累加和,使用`*args`來(lái)接收不定數(shù)量的位置參數(shù),函數(shù)內(nèi)部通過(guò)遍歷`args`并使用`sum()`函數(shù)計(jì)算所有參數(shù)的累加和,運(yùn)行結(jié)果是累加和的值2025-11-11
Python Traceback異常代碼排錯(cuò)利器使用指南
這篇文章主要為大家介紹了Python Traceback異常代碼排錯(cuò)利器使用指南,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01
PyCharm使用Docker鏡像搭建Python開(kāi)發(fā)環(huán)境
這篇文章主要介紹了PyCharm使用Docker鏡像搭建Python開(kāi)發(fā)環(huán)境,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-12-12
Python基于FastAPI和WebSocket實(shí)現(xiàn)實(shí)時(shí)聊天應(yīng)用
這篇文章主要為大家詳細(xì)介紹了Python如何基于FastAPI和WebSocket實(shí)現(xiàn)實(shí)時(shí)聊天應(yīng)用,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2025-04-04
使用matplotlib動(dòng)態(tài)刷新指定曲線實(shí)例
這篇文章主要介紹了使用matplotlib動(dòng)態(tài)刷新指定曲線實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-04-04

