Ways to write a UDF for Spark

UDF stands for User-Defined Function. You can write a custom function and have Spark apply it to a DataFrame column when the built-in functions are not enough for what you need.
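
The examples below use a small DataFrame called sample_data_df, which the post does not construct explicitly. A minimal sketch of how it might be built, with the data inferred from the outputs shown below and sqlContext assumed to be available as in the pyspark shell:

# Hypothetical reconstruction of sample_data_df; the exact schema is an assumption.
# Each "features" value is a list of (featureID, category) pairs.
sample_one = [(0, 'mouse'), (1, 'black')]
sample_two = [(0, 'cat'), (1, 'tabby'), (2, 'mouse')]
sample_three = [(0, 'bear'), (1, 'black'), (2, 'salmon')]
sample_data_df = sqlContext.createDataFrame(
    [(sample_one,), (sample_two,), (sample_three,)], ['features'])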

First way

The first way is to write a normal function, then make it a UDF by calling udf(custom_function, <return type>):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def rename(col):
    return "some prefix: " + str(col)

rename_udf = udf(rename, StringType())
sample_data_df.select('features').show()
sample_data_df.select(rename_udf("features")).show()

+--------------------+
|            features|
+--------------------+
|[[0,mouse], [1,bl...|
|[[0,cat], [1,tabb...|
|[[0,bear], [1,bla...|
+--------------------+

+--------------------------+
|PythonUDF#rename(features)|
+--------------------------+
|      some prefix: [Row...|
|      some prefix: [Row...|
|      some prefix: [Row...|
+--------------------------+

Second way: returning a UDF

Another way of writing the UDF is to write a function that returns a UDF. Pay attention to rename_udf()("features"): the rename_udf function returns a UDF, and that UDF is then executed with the column features passed into it. That’s why we need ()("features"). Note that when udf() is called without an explicit return type, the return type defaults to StringType.

def rename_udf():
    return udf(lambda col: "new string")

sample_data_df.select(rename_udf()("features")).show()

UDF with extra input parameters

def rename_udf(string_to_replace):
    return udf(lambda col: string_to_replace)

new_string = "hello"
sample_data_df.select(rename_udf(new_string)("features")).show()

+----------------------------+
|PythonUDF#<lambda>(features)|
+----------------------------+
|                       hello|
|                       hello|
|                       hello|
+----------------------------+
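
Note that the lambda above ignores the column value entirely, so every row gets the same string. The closure can of course use both the extra parameter and the column; a small sketch of that (add_prefix_udf is a hypothetical name, not from the examples above):

def add_prefix_udf(prefix):
    # The returned UDF closes over `prefix` and still receives the column value.
    return udf(lambda col: prefix + ": " + str(col), StringType())

sample_data_df.select(add_prefix_udf("animal")("features")).show()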

More complicated example: passing a broadcast variable

one_hot_encoding() takes a single Row and transforms it into its one-hot-encoded value.

to_ohe() returns a UDF. That UDF takes every single Row, calls one_hot_encoding() on it, and the values it returns make up the new column of the resulting DataFrame. Because the OHE dictionary is wrapped in a broadcast variable, it is shipped to each executor once instead of being serialized with every task.
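
The post does not show one_hot_encoding() itself. A minimal sketch of what it might look like, assuming the broadcast dictionary maps (featureID, category) pairs to vector indices and the result is a SparseVector (both assumptions are based on the output shown below):

from pyspark.mllib.linalg import SparseVector

def one_hot_encoding(raw_feats, ohe_dict_broadcast, num_ohe_feats):
    # Rows behave like tuples, so each (featureID, category) pair can be looked up
    # directly in the broadcast dictionary; the matching index is set to 1.0.
    return SparseVector(num_ohe_feats,
                        {ohe_dict_broadcast.value[tuple(feat)]: 1.0 for feat in raw_feats})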

from pyspark.mllib.linalg import VectorUDT

def to_ohe(ohe_dict_broadcast):
    length = len(ohe_dict_broadcast.value.keys())
    return udf(lambda x: one_hot_encoding(x, ohe_dict_broadcast, length), VectorUDT())

print "Original dataset"
print sample_data_df.show(truncate=False)

sample_ohe_dict_manual_broadcast = sc.broadcast(sample_ohe_dict_manual)
print "Dataset with one-hot-encoding"
print sample_data_df.select(to_ohe(sample_ohe_dict_manual_broadcast)("features")).show(truncate=False)

Original dataset
+---------------------------------+
|features                         |
+---------------------------------+
|[[0,mouse], [1,black]]           |
|[[0,cat], [1,tabby], [2,mouse]]  |
|[[0,bear], [1,black], [2,salmon]]|
+---------------------------------+

Dataset with one-hot-encoding
+----------------------------+
|PythonUDF#<lambda>(features)|
+----------------------------+
|(7,[2,3],[1.0,1.0])         |
|(7,[1,4,5],[1.0,1.0,1.0])   |
|(7,[0,3,6],[1.0,1.0,1.0])   |
+----------------------------+

PySpark and Jupyter

Problem: how to run PySpark in a Jupyter notebook.

Some assumptions before starting:

  • You have Anaconda installed.
  • You have Spark installed. District Data Lab has an exceptional article on how to get started with Spark in Python. It’s long, but detailed.
  • pyspark is in the $PATH variable.

There are 2 solutions:

  1. The first one modifies the environment variables that pyspark reads, so that a Jupyter/IPython notebook with the PySpark environment is started instead of the pyspark console.
  2. The second one installs a separate Spark kernel for Jupyter. This way is more flexible, because the spark-kernel from IBM can run code in Scala, Python, Java, and SparkSQL. A quick sanity check for either setup is sketched below.
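
Either way, the end result is a notebook where the SparkContext is already available. A minimal sanity check once the notebook is running (assuming it exposes sc, just like the pyspark shell does):

print sc.version
print sc.parallelize(range(100)).sum()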
