import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.elasticsearch.spark.rdd.EsSpark
object SparkEsIntegration {
def main(args: Array[String]): Unit = {
// 初始化Spark配置
val conf = new SparkConf().setAppName("SparkEsIntegration").setMaster("local")
val jsc = new JavaSparkContext(conf)
// 指定Elasticsearch索引和类型
val index = "spark_test_index"
val `type` = "spark_test_type"
// 创建一个包含文档的RDD
val data = jsc.parallelize(Seq("Spark", "Elasticsearch", "Integration"))
// 将RDD保存到Elasticsearch
EsSpark.saveJsonToEs(data, Seq(index, `type`))
// 执行全文搜索
val query = s"""{"query": {"match": {"_all": "Spark"}}}"""
val searchResults = EsSpark.esJsonRDD(jsc, index, `type`, query)
// 输出搜索结果
searchResults.collect().foreach(println)
// 关闭Spark上下文
jsc.stop()
}
}
这段代码展示了如何在Spark应用程序中使用Elasticsearch。首先,我们创建了一个Spark配置并初始化了一个JavaSparkContext。然后,我们指定了Elasticsearch索引和类型。接着,我们创建了一个包含文档的RDD,并使用EsSpark.saveJsonToEs
方法将其保存到Elasticsearch。最后,我们执行了一个全文搜索,并输出了搜索结果。这个例子简单明了地展示了如何将Spark与Elasticsearch集成,并进行数据的索引和搜索操作。