Spark编程实验五:Spark Structured Streaming编程
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
object StructuredNetworkWordCount {
def main(args: Array[String]) {
// 创建SparkSession
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.getOrCreate()
// 设置日志级别
spark.sparkContext.setLogLevel("ERROR")
// 定义流数据读取来源和格式
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// 将文本行转换为单词,并对单词进行聚合
val words = lines.as[String](spark.implicits.newStringEncoder) .flatMap(_.split(" "))
.groupBy("value")
.count()
// 设置输出模式和输出目标(控制台)
words.writeStream
.outputMode(OutputMode.Complete())
.format("console")
.start()
.awaitTermination()
}
}
这段代码使用Spark Structured Streaming来创建一个简单的网络单词计数程序。它从本地主机的9999端口上的socket读取数据,并统计接收到的文本中的单词频率。统计结果会输出到控制台。注意,在运行此代码之前,需要确保在本地主机的9999端口上有一个应用程序在发送数据。
评论已关闭