Python应用随笔3——pyspark读写数据库
from pyspark.sql import SparkSession
from pyspark.sql.types import *
# 初始化Spark会话
spark = SparkSession.builder \
.appName("pyspark_example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
# 定义一个DataFrame结构
schema = StructType([
StructField("id", LongType(), True),
StructField("first_name", StringType(), True),
StructField("last_name", StringType(), True),
# 添加其他字段...
])
# 读取数据库中的数据
df = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://host:port/database") \
.option("dbtable", "schema.table") \
.option("user", "username") \
.option("password", "password") \
.option("driver", "org.postgresql.Driver") \
.load()
# 显示读取的数据
df.show()
# 写入数据到数据库
df.write \
.format("jdbc") \
.option("url", "jdbc:postgresql://host:port/database") \
.option("dbtable", "schema.table") \
.option("user", "username") \
.option("password", "password") \
.option("driver", "org.postgresql.Driver") \
.save()
# 停止Spark会话
spark.stop()
这段代码展示了如何使用PySpark读取和写入数据库。需要注意的是,这里使用的是PostgreSQL数据库,并且需要提前添加PostgreSQL的JDBC驱动到Spark的classpath中。此外,需要根据实际情况替换数据库的连接信息和表名。
评论已关闭