MongoDB Spark Connector 使用指南
import org.apache.spark.sql.SparkSession
import com.mongodb.spark.config._
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("MongoDB Spark Connector Example")
.getOrCreate()
// 连接到 MongoDB
val readConfig = Map(
"spark.mongodb.input.uri" -> "mongodb://localhost:27017/database.collection",
"spark.mongodb.output.uri" -> "mongodb://localhost:27017/database.collection"
)
// 读取 MongoDB 数据
val df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(readConfig).load()
// 对数据进行处理
// ...
// 将处理后的数据写回 MongoDB
df.write.format("com.mongodb.spark.sql.DefaultSource").options(readConfig).save()
// 关闭 SparkSession
spark.stop()
这段代码展示了如何使用MongoDB Spark Connector在Spark中读取MongoDB数据库中的数据,进行简单处理,并将处理后的数据保存回MongoDB。代码首先创建了一个SparkSession,然后定义了连接MongoDB所需的配置,接着加载数据,进行操作,并保存数据。最后关闭了SparkSession。
评论已关闭