基于Hadoop的云计算与大数据处理(Spark Streaming WordCount)
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WordCount {
def main(args: Array[String]) {
// 检查输入参数
if (args.length < 1) {
System.err.println("Usage: WordCount <master> <hostname> <port>")
System.exit(1)
}
// 初始化Spark配置和Streaming上下文
val sparkConf = new SparkConf().setAppName("WordCount").setMaster(args(0))
val ssc = new StreamingContext(sparkConf, Seconds(args(2).toInt))
// 创建输入数据流
val lines = ssc.socketTextStream(args(1), args(2).toInt)
// 执行词频统计
val words = lines.flatMap(_.split(" "))
.map(word => (word, 1))
val wordCounts = words.reduceByKey(_ + _)
// 打印结果并启动接收数据
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
这段代码演示了如何使用Spark Streaming库来进行实时的词频统计。它接收一个主节点(master),主机名(hostname)和端口号(port)作为参数,并通过这个端口号从指定主机接收文本数据流。代码中的flatMap
, map
, reduceByKey
是Spark Streaming操作的例子,它们用于将数据流转换为词频形式,并在控制台打印出来。
评论已关闭