首页 热点专区 小学知识 中学知识 出国留学 考研考公
您的当前位置:首页正文

spark dataframe添加新的列

2024-12-18 来源:要发发知识网

使用spark的dataframe进行计算时有时需要添加新的列。本文介绍两种添加新列的方法,比较常见的一种方法是调用dataframe的withColumn方法,但是该方法存在一定的限制,即新添加的列只能根据现有列转换得到;另一种方法是利用UDF(user defined function)模块。下面结合例子进行说明,现有预测得到的pm2.5数据,需要添加其他污染项目的预测数据及预测时间。

1、withColumn

dataframe的withColumn方法可以用于添加新的列,但是新的列仅能根据现有列计算得到。

yHat = yHat.withColumn("pm25", yHat["pm25"]*(maxValue - minValue) + minValue)
yHat = yHat.withColumn("pm10", yHat["pm25"] + 10)
yHat = yHat.withColumn("CO", yHat["pm25"] + 20)
yHat = yHat.withColumn("NO2", yHat["pm25"] + 30)
yHat = yHat.withColumn("NO", yHat["pm25"] + 40)
yHat = yHat.withColumn("SO2", yHat["pm25"] + 50)
图1

2、udf

除了withColumn方法,还可以利用spark的udf模块添加新的列。在本例中,还需要添加相应的时间列,此时withColumn方法并不适用,需要导入udf方法,该方法有两个参数,分别为自定义的函数名及返回值类型。

global idx
idx = 0
date = gettime()
def set_date(x):
    global idx  # 将idx设置为全局变量
    if x is not None:
        idx += 1
        return date[idx - 1]
index = udf(set_date, StringType())
yHat = yHat.withColumn("date", index(yHat["pm25"]))
图2
显示全文