使用spark的dataframe进行计算时有时需要添加新的列。本文介绍两种添加新列的方法,比较常见的一种方法是调用dataframe的withColumn方法,但是该方法存在一定的限制,即新添加的列只能根据现有列转换得到;另一种方法是利用UDF(user defined function)模块。下面结合例子进行说明,现有预测得到的pm2.5数据,需要添加其他污染项目的预测数据及预测时间。
1、withColumn
dataframe的withColumn方法可以用于添加新的列,但是新的列仅能根据现有列计算得到。
yHat = yHat.withColumn("pm25", yHat["pm25"]*(maxValue - minValue) + minValue)
yHat = yHat.withColumn("pm10", yHat["pm25"] + 10)
yHat = yHat.withColumn("CO", yHat["pm25"] + 20)
yHat = yHat.withColumn("NO2", yHat["pm25"] + 30)
yHat = yHat.withColumn("NO", yHat["pm25"] + 40)
yHat = yHat.withColumn("SO2", yHat["pm25"] + 50)
图1
2、udf
除了withColumn方法,还可以利用spark的udf模块添加新的列。在本例中,还需要添加相应的时间列,此时withColumn方法并不适用,需要导入udf方法,该方法有两个参数,分别为自定义的函数名及返回值类型。
global idx
idx = 0
date = gettime()
def set_date(x):
global idx # 将idx设置为全局变量
if x is not None:
idx += 1
return date[idx - 1]
index = udf(set_date, StringType())
yHat = yHat.withColumn("date", index(yHat["pm25"]))
图2