Ways to write a UDF for Spark

UDF stands for User-Defined Function. You can write a custom function to have Spark do more complex things for you. The examples below use PySpark, Spark's Python API.

First way

The first way is to write a normal Python function and then turn it into a UDF by calling udf(custom_function, <return type>):

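from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def rename(col):
    # Prepend a fixed prefix to the string form of the column value
    return "some prefix: " + str(col)

rename_udf = udf(rename, StringType())
sample_data_df.select('features').show()
sample_data_df.select(rename_udf("features")).show()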

+--------------------+
|            features|
+--------------------+
|[[0,mouse], [1,bl...|
|[[0,cat], [1,tabb...|
|[[0,bear], [1,bla...|
+--------------------+

+--------------------------+
|PythonUDF#rename(features)|
+--------------------------+
|      some prefix: [Row...|
|      some prefix: [Row...|
|      some prefix: [Row...|
+--------------------------+
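Because rename is an ordinary Python function, its logic can be checked locally, without a Spark session, before it is wrapped with udf(). The sketch below uses a plain list of tuples as a hypothetical stand-in for one features value; in the real DataFrame the elements are Row objects, which is why the second table starts with "[Row". The trailing "..." comes from show(), which by default truncates strings longer than 20 characters.

```python
def rename(col):
    # Same logic as the function wrapped by udf() above
    return "some prefix: " + str(col)

# Hypothetical stand-in for one "features" value: a list of (feature ID, category) pairs
features = [(0, "mouse"), (1, "black")]
print(rename(features))  # some prefix: [(0, 'mouse'), (1, 'black')]
```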

Second way: returning a UDF

Another way to write a UDF is to write a function that returns a UDF. Pay attention to rename_udf()("features"): calling rename_udf() returns a UDF, and that UDF is then applied to the column features. That's why we need ()("features"). Note also that udf() is called here without a return type, so it defaults to StringType.

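from pyspark.sql.functions import udf

def rename_udf():
    # Return a UDF; with no return type given, udf() defaults to StringType
    return udf(lambda col: "new string")

sample_data_df.select(rename_udf()("features")).show()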
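The double call is ordinary Python: rename_udf() evaluates to a callable, and the second pair of parentheses calls that callable. Here is a plain-Python analogue, with a hypothetical make_renamer() standing in for rename_udf() so it runs without Spark:

```python
def make_renamer():
    # Returns a function, just as rename_udf() returns a UDF
    return lambda col: "new string"

renamer = make_renamer()           # first call: build the function
print(renamer("features"))         # second call: apply it -> new string
print(make_renamer()("features"))  # both calls in one expression -> new string
```

One difference from Spark: applying a real UDF to "features" does not run the lambda immediately. It returns a Column expression, and Spark calls the lambda for each row when the query executes.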

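UDF with extra input parameters

To pass extra parameters into a UDF, make them arguments of the outer function. The lambda it returns captures them, so every row sees the same value.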

def rename_udf(string_to_replace):
    # The lambda captures string_to_replace and returns it for every row
    return udf(lambda col: string_to_replace)

new_string = "hello"
sample_data_df.select(rename_udf(new_string)("features")).show()
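The lambda in the example above ignores its column argument. Below is a hypothetical variant that combines the captured parameter with each value, written in plain Python so the closure behaviour can be checked without Spark; in PySpark the returned lambda would be passed to udf() exactly as above.

```python
def make_prefixer(prefix):
    # prefix is captured by the closure; each call creates a separate closure
    return lambda col: prefix + str(col)

hello = make_prefixer("hello: ")
bye = make_prefixer("bye: ")
print(hello("mouse"))  # hello: mouse
print(bye("mouse"))    # bye: mouse
```

Because each call to the outer function creates a new closure, differently parameterized UDFs can coexist in the same select(). The captured value is serialized together with the function and sent to the executors, which is fine for a small value like a string; for a large lookup table, the broadcast-variable approach in the next section is the usual choice.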

+----------------------------+
|PythonUDF#<lambda>(features)|
+----------------------------+
|                       hello|
|                       hello|
|                       hello|
+----------------------------+

A more complicated example: passing a broadcast variable

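one_hot_encoding() takes the features of a single row, a list of (feature ID, category) pairs, and transforms them into a one-hot-encoded vector.

to_ohe() returns a UDF. That UDF takes the features of every row, calls one_hot_encoding() on them, and the returned vectors become a new column in the DataFrame that select() returns. The one-hot dictionary is wrapped in a broadcast variable, so Spark keeps one read-only copy cached on each executor instead of shipping a copy with every task; the dictionary itself is read through .value.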
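Neither one_hot_encoding() nor sample_ohe_dict_manual is defined in this post. The sketch below is one hypothetical version consistent with the output further down: the dictionary maps each distinct (feature ID, category) pair, in sorted order, to an index, and the function looks up the index of every pair in a row. To keep it runnable without Spark, it returns the (size, indices, values) triple that a SparseVector would be built from, and a small stand-in replaces the broadcast variable, since only its .value attribute is used.

```python
from collections import namedtuple

# Hypothetical stand-in for a Spark Broadcast: the code only reads .value
FakeBroadcast = namedtuple("FakeBroadcast", ["value"])

def build_ohe_dict(rows):
    # Map each distinct (feature ID, category) pair to an index, in sorted order
    pairs = sorted({(fid, cat) for feats in rows for fid, cat in feats})
    return {pair: idx for idx, pair in enumerate(pairs)}

def one_hot_encoding(raw_feats, ohe_dict_broadcast, num_ohe_feats):
    # Look up each pair's index; sparse vectors expect indices in ascending order
    indices = sorted(ohe_dict_broadcast.value[(fid, cat)] for fid, cat in raw_feats)
    values = [1.0] * len(indices)
    # In the PySpark UDF this would be SparseVector(num_ohe_feats, indices, values)
    return num_ohe_feats, indices, values

# The three "features" values from the dataset shown below, as plain tuples
sample_rows = [
    [(0, "mouse"), (1, "black")],
    [(0, "cat"), (1, "tabby"), (2, "mouse")],
    [(0, "bear"), (1, "black"), (2, "salmon")],
]
ohe_dict = FakeBroadcast(build_ohe_dict(sample_rows))
length = len(ohe_dict.value)
for feats in sample_rows:
    print(one_hot_encoding(feats, ohe_dict, length))
# (7, [2, 3], [1.0, 1.0])
# (7, [1, 4, 5], [1.0, 1.0, 1.0])
# (7, [0, 3, 6], [1.0, 1.0, 1.0])
```

The indices match the one-hot-encoded table below, which suggests a sorted enumeration of the distinct pairs, but the actual dictionary used for that output may have been built differently.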

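from pyspark.sql.functions import udf
from pyspark.mllib.linalg import VectorUDT  # use pyspark.ml.linalg if one_hot_encoding() returns ml vectors (Spark 2.0+)

def to_ohe(ohe_dict_broadcast):
    length = len(ohe_dict_broadcast.value)  # vector size = number of distinct (feature ID, category) pairs
    return udf(lambda x: one_hot_encoding(x, ohe_dict_broadcast, length), VectorUDT())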

print("Original dataset")
sample_data_df.show(truncate=False)

# Ship the lookup dictionary to every executor once
sample_ohe_dict_manual_broadcast = sc.broadcast(sample_ohe_dict_manual)
print("Dataset with one-hot-encoding")
sample_data_df.select(to_ohe(sample_ohe_dict_manual_broadcast)("features")).show(truncate=False)

Original dataset
+---------------------------------+
|features                         |
+---------------------------------+
|[[0,mouse], [1,black]]           |
|[[0,cat], [1,tabby], [2,mouse]]  |
|[[0,bear], [1,black], [2,salmon]]|
+---------------------------------+

Dataset with one-hot-encoding
+----------------------------+
|PythonUDF#<lambda>(features)|
+----------------------------+
|(7,[2,3],[1.0,1.0])         |
|(7,[1,4,5],[1.0,1.0,1.0])   |
|(7,[0,3,6],[1.0,1.0,1.0])   |
+----------------------------+

