Spark 7:Spark SQL 函数定义
在Spark SQL中,可以通过spark.udf.register
方法注册一个UDF(用户自定义函数),然后在Spark SQL查询中使用这个自定义函数。以下是一个简单的例子:
假设我们有一个自定义函数my_function
,它接受一个整数并返回一个字符串。
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession
# 创建Spark会话
spark = SparkSession.builder.appName("example").getOrCreate()
# 定义自定义函数
def my_function(i):
if i > 0:
return "Positive"
elif i < 0:
return "Negative"
else:
return "Zero"
# 注册UDF
spark.udf.register("myFunction", my_function, StringType())
# 使用UDF创建DataFrame
data = [(1,), (-1,), (0,)]
df = spark.createDataFrame(data, ["value"])
# 使用UDF
df.selectExpr("value", "myFunction(value) as sign").show()
在上述代码中,我们首先定义了一个名为my_function
的Python函数。然后,我们使用udf
装饰器将其转换为UDF,并通过spark.udf.register
方法注册。最后,我们创建了一个DataFrame,并在查询中使用了这个UDF。这个例子展示了如何在Spark SQL中定义和使用自定义函数。
评论已关闭