Python应用随笔3——pyspark读写数据库
    		       		warning:
    		            这篇文章距离上次修改已过420天,其中的内容可能已经有所变动。
    		        
        		                
                
from pyspark.sql import SparkSession
from pyspark.sql.types import *
 
# 初始化Spark会话
spark = SparkSession.builder \
    .appName("pyspark_example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
 
# 定义一个DataFrame结构
schema = StructType([
    StructField("id", LongType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    # 添加其他字段...
])
 
# 读取数据库中的数据
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host:port/database") \
    .option("dbtable", "schema.table") \
    .option("user", "username") \
    .option("password", "password") \
    .option("driver", "org.postgresql.Driver") \
    .load()
 
# 显示读取的数据
df.show()
 
# 写入数据到数据库
df.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host:port/database") \
    .option("dbtable", "schema.table") \
    .option("user", "username") \
    .option("password", "password") \
    .option("driver", "org.postgresql.Driver") \
    .save()
 
# 停止Spark会话
spark.stop()这段代码展示了如何使用PySpark读取和写入数据库。需要注意的是,这里使用的是PostgreSQL数据库,并且需要提前添加PostgreSQL的JDBC驱动到Spark的classpath中。此外,需要根据实际情况替换数据库的连接信息和表名。
评论已关闭