Ways to write UDF for Spark

UDF stands for User-Defined Function. You can write a custom function and have Spark apply it to do more complex things for you.

First way

The first way is to write a normal function, then turn it into a UDF by calling udf(custom_function, <return type>).

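from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def rename(col):
    # Prepend a fixed prefix to the string form of the column value
    return "some prefix: " + str(col)

rename_udf = udf(rename, StringType())
# show() prints the table itself and returns None, so no print is needed
sample_data_df.select('features').show()
sample_data_df.select(rename_udf("features")).show()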

|            features|
|[[0,mouse], [1,bl...|
|[[0,cat], [1,tabb...|
|[[0,bear], [1,bla...|

|      some prefix: [Row...|
|      some prefix: [Row...|
|      some prefix: [Row...|

Second way: returning a UDF

Another way is to write a function that returns a UDF. Pay attention to rename_udf()("features"): the rename_udf function returns a UDF, and that UDF is then applied to the column features. That’s why we need ()("features").

def rename_udf():
    # No return type is given, so udf defaults to StringType
    return udf(lambda col: "new string")

sample_data_df.select(rename_udf()("features")).show()

UDF with extra input parameters

def rename_udf(string_to_replace):
    # The lambda captures string_to_replace from the enclosing call
    return udf(lambda col: string_to_replace)

new_string = "hello"
sample_data_df.select(rename_udf(new_string)("features")).show()

|                       hello|
|                       hello|
|                       hello|
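The rename_udf(new_string)("features") pattern is an ordinary Python closure: the outer call captures the parameter and returns a function, and the second call applies that function. A minimal sketch without Spark, where make_renamer is a hypothetical name used only for illustration:

```python
def make_renamer(string_to_replace):
    """Return a one-argument function that ignores its input and
    always returns the captured string (mirrors the lambda above)."""
    return lambda col: string_to_replace

# The first call builds the function, the second call applies it to a value.
renamer = make_renamer("hello")
print(renamer("any column value"))        # hello
print(make_renamer("hello")("features"))  # hello
```

In the Spark version, the only difference is that the returned lambda is wrapped in udf(...), so the second call receives a column name instead of a plain value.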

More complicated example with passing the broadcast variable

one_hot_encoding() takes a single Row and transforms it into a one-hot-encoded value.

to_ohe() returns a UDF that calls the one_hot_encoding() function on every Row. Spark then builds a new DataFrame from the value returned for each row.

def to_ohe(ohe_dict_broadcast):
    # Number of keys in the broadcast dictionary, passed on as the vector length
    length = len(ohe_dict_broadcast.value.keys())
    return udf(lambda x: one_hot_encoding(x, ohe_dict_broadcast, length), VectorUDT())

print("Original dataset")
sample_data_df.show(truncate=False)

sample_ohe_dict_manual_broadcast = sc.broadcast(sample_ohe_dict_manual)
print("Dataset with one-hot-encoding")
sample_data_df.select(to_ohe(sample_ohe_dict_manual_broadcast)("features")).show(truncate=False)

Original dataset
|features                         |
|[[0,mouse], [1,black]]           |
|[[0,cat], [1,tabby], [2,mouse]]  |
|[[0,bear], [1,black], [2,salmon]]|

Dataset with one-hot-encoding
|(7,[2,3],[1.0,1.0])         |
|(7,[1,4,5],[1.0,1.0,1.0])   |
|(7,[0,3,6],[1.0,1.0,1.0])   |
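The post does not show one_hot_encoding() itself. The sketch below is one plausible plain-Python version of its core lookup, not the author's code: one_hot_indices and sample_ohe_dict are hypothetical names, and the dictionary contents are an assumption chosen to be consistent with the output above. It returns the (size, indices, values) triple that the sparse vectors above display; inside Spark the function would build a sparse vector from that triple instead.

```python
def one_hot_indices(raw_feats, ohe_dict):
    """Map a list of (feature_id, value) pairs to a sparse one-hot triple:
    (vector size, sorted active indices, one 1.0 per active index)."""
    indices = sorted(ohe_dict[(fid, val)] for fid, val in raw_feats)
    return len(ohe_dict), indices, [1.0] * len(indices)

# Assumed dictionary: each (feature_id, value) pair gets its own index.
sample_ohe_dict = {
    (0, "bear"): 0, (0, "cat"): 1, (0, "mouse"): 2, (1, "black"): 3,
    (1, "tabby"): 4, (2, "mouse"): 5, (2, "salmon"): 6,
}

print(one_hot_indices([(0, "mouse"), (1, "black")], sample_ohe_dict))
# (7, [2, 3], [1.0, 1.0])
```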



Drop duplicates by some condition

Initial data:

user   hour code count
bob    8    A    8
bob    8    B    12
bob    9    A    10
bob    13   D    3
bob    13   C    3

Result that we want:

user   hour  code   count
bob    8     B      12 ==> keep record with maximum count
bob    9     A      10
bob    13    D      3  ==> When the count is equal, keep one random record

We want only one record for each user within each hour, namely the record whose “count” is the maximum.
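To pin down the intended result, here is a plain-Python reference of the rule, useful for checking a Spark result on a small sample. It is a sketch, not Spark code: keep_max_per_group is a hypothetical helper, and on ties it keeps the first record seen rather than a random one.

```python
def keep_max_per_group(records):
    """records: iterable of (user, hour, code, count) tuples.
    Returns one tuple per (user, hour): the one with the largest count."""
    best = {}
    for rec in records:
        key = (rec[0], rec[1])
        # Strict '>' keeps the first record seen when counts are equal.
        if key not in best or rec[3] > best[key][3]:
            best[key] = rec
    return list(best.values())

data = [("bob", 8, "A", 8), ("bob", 8, "B", 12), ("bob", 9, "A", 10),
        ("bob", 13, "D", 3), ("bob", 13, "C", 3)]
for row in keep_max_per_group(data):
    print(row)
```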

My naive approach

At first I didn’t know about the dropDuplicates() function [read here], so I came up with a much more difficult approach.

I looked for the maximum count for each group of user and hour, and then joined the result with the original dataset on user, hour and the maximum count. This still runs into a problem when 2 records belonging to the same user and hour have the same count: the join keeps both of them, when I should keep only one.

After a little bit of reading

I found a much better solution in these 2 StackOverflow questions.

Removing duplicates from rows based on specific columns in an RDD/Spark DataFrame

David Griffin provided a simple answer with groupBy and then agg. We can do things like:

myDF.groupBy("user", "hour").agg(max("count"))

However, the returned data frame doesn’t contain the cgi column. We can try further with:

myDF.groupBy("user", "hour").agg(max("count"), max("cgi"))

But this doesn’t guarantee that the max("count") and max("cgi") returned actually come from the same row of the original data frame myDF.

SPARK DataFrame: select the first row of each group

zero323 gave an excellent answer on how to return only the first row of each group. A group here is defined as the set of records with the same user and hour values. The original dataset at the beginning of the post has 3 groups in total.


The idea is to sort the data so that, within each group, the record we want comes first, and then keep only the first record of each group with dropDuplicates. The final code is:

import pyspark.sql.functions as F

# myDF is the data frame from the groupBy examples above
myDF\
    .orderBy("user", "hour", F.col("count").desc())\
    .dropDuplicates(["user", "hour"]).show(10)

The result: the column names are a little bit different, but they correspond to those of the toy example in this post.

|    generationSnHash|hhour|               cgi|eventCount|
|0049d73e3492daeb9...|night|222-01-00002-51723|         4|
|0d772216b15bb7f81...|night|222-01-00001-00995|         4|
|1077b05f12df3311e...|night|222-01-00004-00909|         5|
|2269b2c133b1515a0...|night|222-01-00003-17650|        13|

Without the descending order on the count column, the result would be wrong. For example, compare the second row: the correct DF has eventCount=4 and cgi=222-01-00001-00995, while the wrong DF below has eventCount=3 and a different cgi.

|    generationSnHash|hhour|               cgi|eventCount|
|0049d73e3492daeb9...|night|222-01-00002-51723|         4|
|0d772216b15bb7f81...|night|222-01-00001-16453|         3|
|1077b05f12df3311e...|night|222-01-00004-00909|         5|
|2269b2c133b1515a0...|night|222-01-00003-00102|         9|
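One caveat with orderBy followed by dropDuplicates: as far as I know, Spark does not document which row of each duplicate group dropDuplicates keeps, so the sort order may not always be honored. A commonly used alternative is to rank the rows inside each group with a window function and keep the row ranked first. A minimal sketch under that assumption; keep_top_per_group is a hypothetical helper, and the imports sit inside the function only so that the snippet loads without Spark installed:

```python
def keep_top_per_group(df, keys, order_col):
    """Keep, for each combination of `keys`, the row with the largest
    `order_col`. Ties are broken arbitrarily by row_number()."""
    from pyspark.sql import Window
    import pyspark.sql.functions as F

    # Number the rows of each group, largest order_col first.
    w = Window.partitionBy(*keys).orderBy(F.col(order_col).desc())
    return (df.withColumn("_rn", F.row_number().over(w))
              .filter(F.col("_rn") == 1)
              .drop("_rn"))

# Usage, assuming myDF has the columns of the toy example:
# keep_top_per_group(myDF, ["user", "hour"], "count").show(10)
```

The temporary column name _rn is arbitrary; pick another one if the data frame already has a column with that name.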

Spark local mode

Spark local mode is one of the 4 ways to run Spark (the others are (i) standalone mode, (ii) YARN mode and (iii) Mesos mode).

By default, the Web UI for jobs running in local mode can be found at http://localhost:4040

You can change the port of the Spark Web UI through a pyspark.SparkConf object.

The default spark.ui.port is 4040, and we can change it to whatever we want. In this example, I changed it to 4041.

All the other configuration options can be found in the Spark Configuration documentation.

from pyspark import SparkConf, SparkContext
SPARK_MASTER = "local[*]"

# setMaster makes the configuration actually use the master URL defined above
sparkConf = SparkConf().setMaster(SPARK_MASTER).setAll([
    ("spark.cores.max", "4"),
    ("spark.executor.memory", "2G"),
    ("spark.ui.port", "4041")
])

sc = SparkContext(conf=sparkConf)
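A quick way to confirm that the UI came up on the new port is to try a TCP connection to it. This is a sketch using only the standard library; port_is_open is a hypothetical helper, and 4041 is the port from the configuration above.

```python
import socket

def port_is_open(port, host="127.0.0.1", timeout=1.0):
    """Return True if something accepts TCP connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable.
        return False

# Run this while the SparkContext is alive; False suggests the UI is not
# listening on that port.
print(port_is_open(4041))
```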


How to test Spark

Problem: how to verify that we have Spark installed correctly.


Suppose we have set up everything for Spark correctly in the directory $SPARK_HOME/conf. Now invoke spark-shell on the command line.

Suppose there is a file on the machine where we are running the shell, at the location /home/hadoop/spark/README.md. Then in the spark-shell we invoke:

scala> val textFile = sc.textFile("file:/home/hadoop/spark/README.md")
scala> textFile.count()

For more functions to test, check out the Spark Tutorial – Quick Start.


Follow the section Self-Contained Applications in the Quick Start. Remember to modify the variable logFile.

$ spark-submit SimpleApp.py
Lines with a: 60, lines with b: 29
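The two numbers printed by SimpleApp.py are the counts of lines in logFile that contain the letter a and the letter b. To cross-check them without Spark, a plain-Python sketch can count the same thing; count_lines_with is a hypothetical helper, and the path in the usage comment is the README location assumed earlier in this post.

```python
def count_lines_with(path, letters=("a", "b")):
    """Return {letter: number of lines in the file that contain it}."""
    counts = {letter: 0 for letter in letters}
    with open(path, encoding="utf-8") as fh:
        for line in fh:
            for letter in letters:
                if letter in line:
                    counts[letter] += 1
    return counts

# Usage (the path is an assumption; use the same file as logFile):
# print(count_lines_with("/home/hadoop/spark/README.md"))
```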


Or you can run the example code that ships with Spark. The result is not going to show in your currently running shell. Let’s just execute it:

$ spark-submit $SPARK_HOME/examples/src/main/python/pi.py 4

You can check where Spark writes its logs by looking for SPARK_LOG_DIR in the environment file:

$ cat $SPARK_HOME/conf/spark-env.sh
export SPARK_DAEMON_JAVA_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70"
export SPARK_LOCAL_DIRS=/mnt/spark,/mnt1/spark,/mnt2/spark,/mnt3/spark
export SPARK_LOG_DIR=/mnt/var/log/apps
export SPARK_CLASSPATH="/home/hadoop/spark/conf:/home/hadoop/conf:/home/hadoop/spark/classpath/distsupplied/*:/home/hadoop/spark/classpath/emr/*:/home/hadoop/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/share/hadoop/common/lib/hadoop-lzo.jar:/usr/share/aws/emr/auxlib/*"


So now go to the /mnt/var/log/apps directory and read the stdout file.



Hive on Spark is not working

Problem: in the Hive CLI, a simple command doesn’t return a result.

Solution: make sure you have at least one worker (or slave) registered with the Spark master.

hive> select count(*) from subset1_data_stream_with_cgi;

Status: Running (Hive on Spark job[0])
Job Progress Format
CurrentTime StageId_StageAttemptId: SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]
2016-06-30 15:09:54,526    Stage-0_0: 0/1    Stage-1_0: 0/1
2016-06-30 15:09:57,545    Stage-0_0: 0/1    Stage-1_0: 0/1
2016-06-30 15:10:00,561    Stage-0_0: 0/1    Stage-1_0: 0/1


PySpark and Jupyter

Problem: how to run PySpark in Jupyter notebook.

Some assumptions before starting:

  • You have Anaconda installed.
  • You have Spark installed. District Data Lab has an exceptional article on how to get started with Spark in Python. It’s long, but detailed.
  • pyspark is in the $PATH variable.

There are 2 solutions:

  1. The first one modifies the environment variables that pyspark reads, so that a Jupyter/IPython notebook with the pyspark environment is started instead of the pyspark console.
  2. The second one is to install a separate Spark kernel for Jupyter. This solution is more flexible, because the spark-kernel from IBM can run code in Scala, Python, Java and SparkSQL.
