消息队列kafka中间件详解:案例解析(第10天)
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object KafkaWordCount {
def main(args: Array[String]) {
// 初始化Spark配置
val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val ssc = new StreamingContext(sc, Seconds(20))
// 配置Kafka参数
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_unique_group_id_here",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// 定义topic
val topics = Array("wordcount-input")
// 使用Direct方式从Kafka读取数据
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
// 将数据流中的内容进行分词,并统计词频
val words = stream.flatMap(_.value().split(" "))
.map(word => (word, 1))
val wordCounts = words.reduceByKey(_ + _)
// 输出结果到控制台
wordCounts.print()
// 启动流计算
ssc.start()
ssc.awaitTermination()
}
}
这段代码使用Apache Spark Streaming从Kafka中读取数据,并进行简单的词频统计。它展示了如何配置Kafka参数,如何创建Direct方式的数据流,并使用Spark的转换操作进行数据处理。这是学习Spark Streaming与Kafka集成的一个很好的起点。
评论已关闭