UDF stands for User-Defined Function. You can write a custom function to have Spark do more complex things for you.
First way
The first way is to write a normal function, then make it a UDF by calling udf(custom_function, <return type>).
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    def rename(col):
        # Prefix the string form of the column value
        return "some prefix: " + str(col)

    rename_udf = udf(rename, StringType())

    # show() prints the table itself, so wrapping it in print is not needed
    sample_data_df.select('features').show()
    sample_data_df.select(rename_udf("features")).show()

    +--------------------+
    |            features|
    +--------------------+
    |[[0,mouse], [1,bl...|
    |[[0,cat], [1,tabb...|
    |[[0,bear], [1,bla...|
    +--------------------+

    +--------------------------+
    |PythonUDF#rename(features)|
    +--------------------------+
    |      some prefix: [Row...|
    |      some prefix: [Row...|
    |      some prefix: [Row...|
    +--------------------------+
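Because rename is an ordinary Python function, it can be tried on plain values before it is wrapped with udf(). The sketch below does that without Spark. The Row namedtuple and its field names are hypothetical stand-ins for Spark's Row, used only to illustrate why the output column above starts with "some prefix: [Row".

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.sql.Row; the field names are assumptions.
Row = namedtuple("Row", ["feature_id", "category"])

def rename(col):
    # str() turns whatever the column holds into text before prefixing it.
    return "some prefix: " + str(col)

# One cell of the 'features' column, modelled as a list of Row-like tuples.
cell = [Row(0, "mouse"), Row(1, "black")]
result = rename(cell)
print(result)
```

The string form of a list of Row-like objects begins with "[Row(", which Spark then truncates to "[Row..." when show() is called without truncate=False.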
Second way: returning a UDF
Another way of writing a UDF is to write a function that returns a UDF. Pay attention to rename_udf()("features"): the rename_udf function returns a UDF, and that UDF is then called with the column features passed into it. That’s why we need ()("features").
    def rename_udf():
        # With no return type given, udf() defaults to StringType
        return udf(lambda col: "new string")

    sample_data_df.select(rename_udf()("features")).show()
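The double call can be seen without Spark. This is a minimal sketch, assuming a hypothetical fake_udf that stands in for pyspark's udf() and simply applies the wrapped function to every value in a list:

```python
def fake_udf(func):
    # Hypothetical stand-in for pyspark's udf(): returns a callable that
    # applies func to each value of a "column" (here just a Python list).
    def apply_to_column(column):
        return [func(value) for value in column]
    return apply_to_column

def rename_udf():
    # First call: builds and returns the UDF-like object.
    return fake_udf(lambda col: "new string")

features = ["row 1", "row 2", "row 3"]

the_udf = rename_udf()        # first pair of parentheses
renamed = the_udf(features)   # second pair: apply it to the column
print(renamed)

# Chaining both calls in one expression gives the same result.
print(rename_udf()(features) == renamed)
```

Splitting the expression into two steps shows that rename_udf()("features") is just those two calls chained together.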
UDF with extra input parameters
    def rename_udf(string_to_replace):
        # The lambda captures string_to_replace from the enclosing call
        return udf(lambda col: string_to_replace)

    new_string = "hello"
    sample_data_df.select(rename_udf(new_string)("features")).show()

    +----------------------------+
    |PythonUDF#<lambda>(features)|
    +----------------------------+
    |                       hello|
    |                       hello|
    |                       hello|
    +----------------------------+
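The extra parameter works because the lambda closes over string_to_replace: each call to the outer function produces a separate inner function that remembers its own argument. Here is a sketch in plain Python, with the udf() wrapper left out so it runs without Spark (make_renamer is a hypothetical name):

```python
def make_renamer(string_to_replace):
    # The returned lambda keeps a reference to string_to_replace (a closure).
    return lambda col: string_to_replace

say_hello = make_renamer("hello")
say_bye = make_renamer("bye")

# Each function ignores its input and returns the value it captured.
hello_result = say_hello("features value")
bye_result = say_bye("features value")
print(hello_result, bye_result)
```

This is the usual way to pass extra, non-column arguments to a UDF: bind them when the UDF is built rather than when it is applied.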
More complicated example: passing a broadcast variable
one_hot_encoding() takes a single Row and transforms it into its one-hot-encoded value.
to_ohe() is a function that returns a UDF. That UDF is applied to every Row: it calls one_hot_encoding() on the row, and the returned values form the column of a new DataFrame.
    def to_ohe(ohe_dict_broadcast):
        # The number of entries in the broadcast dictionary is the vector length
        length = len(ohe_dict_broadcast.value)
        return udf(lambda x: one_hot_encoding(x, ohe_dict_broadcast, length), VectorUDT())

    print("Original dataset")
    sample_data_df.show(truncate=False)

    sample_ohe_dict_manual_broadcast = sc.broadcast(sample_ohe_dict_manual)

    print("Dataset with one-hot-encoding")
    sample_data_df.select(to_ohe(sample_ohe_dict_manual_broadcast)("features")).show(truncate=False)

    Original dataset
    +---------------------------------+
    |features                         |
    +---------------------------------+
    |[[0,mouse], [1,black]]           |
    |[[0,cat], [1,tabby], [2,mouse]]  |
    |[[0,bear], [1,black], [2,salmon]]|
    +---------------------------------+

    Dataset with one-hot-encoding
    +----------------------------+
    |PythonUDF#<lambda>(features)|
    +----------------------------+
    |(7,[2,3],[1.0,1.0])         |
    |(7,[1,4,5],[1.0,1.0,1.0])   |
    |(7,[0,3,6],[1.0,1.0,1.0])   |
    +----------------------------+
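The one_hot_encoding() helper itself is not shown in this section. The following is only a sketch of what such a function might do, written in plain Python so it runs without Spark. Several things here are assumptions: the dictionary contents are inferred from the output above, plain tuples stand in for Rows, and a (size, indices, values) tuple stands in for a SparseVector. It also takes a plain dict rather than a broadcast variable; with a broadcast variable the lookup would go through .value.

```python
# Assumed mapping from (feature_id, category) to a vector index, inferred
# from the output shown above; the real sample_ohe_dict_manual may differ.
sample_ohe_dict = {
    (0, "bear"): 0, (0, "cat"): 1, (0, "mouse"): 2,
    (1, "black"): 3, (1, "tabby"): 4,
    (2, "mouse"): 5, (2, "salmon"): 6,
}

def one_hot_encoding_sketch(raw_features, ohe_dict, num_features):
    # Look up the index of every (feature_id, category) pair in the row.
    indices = sorted(ohe_dict[feature] for feature in raw_features)
    # Every active index gets the value 1.0, like a sparse vector.
    return (num_features, indices, [1.0] * len(indices))

rows = [
    [(0, "mouse"), (1, "black")],
    [(0, "cat"), (1, "tabby"), (2, "mouse")],
    [(0, "bear"), (1, "black"), (2, "salmon")],
]

encoded = [one_hot_encoding_sketch(r, sample_ohe_dict, len(sample_ohe_dict))
           for r in rows]
for vector in encoded:
    print(vector)
```

Under these assumptions the three tuples line up with the three sparse vectors in the table above: size 7, followed by the active indices and a 1.0 for each of them.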