今天我们来讲讲spark中的udf函数,我们经常会碰到某些需求需要我们对数据中某个字段的数据做自定义处理,UDF:User Defined Function,下面我用实际代码举两个例子:
两种方法:
1 dataframe中使用
# 声明自定义方法def json_decode(string): decode_json = json.loads(string) if bool(decode_json): return decode_json.get('channel-v2', '') else: return ''# 注册udfconvertUDF = udf(lambda z: json_decode(z))# 使用udfdf = df.withColumn('channel_v2', convertUDF(col('ext')))
2 sparksql中使用
# 注册udfspark.udf.register("markToId", lambda x: int(x, 16), IntegerType())# 创建临时表df.createOrReplaceTempView("mark_table")# sparksql中使用mark_df = ( spark.sql( "select markToId(mark_id) as mark_id,count(log_id) as pv,count(DISTINCT browser_id) as uv from mark_table) )