Drop duplicates by some condition

Initial data:

user   hour code count
=============================
bob    8    A    8
bob    8    B    12
bob    9    A    10
bob    13   D    3
bob    13   C    3

Result that we want:

user   hour  code   count
=============================
bob    8     B      12 ==> keep record with maximum count
bob    9     A      10
bob    13    D      3  ==> When the count is equal, keep one random record

We want that, for each user and within each hour, there is only one record. And we keep the record whose "count" is the maximum.

My naive approach

At first I didn't know about the dropDuplicates() function [read here], so I came up with a much more difficult approach.

I looked for the maximum count in each group of user & hour, and then joined the result back to the original dataset on user, hour, and the maximum value of count. I still ran into the problem of 2 records belonging to the same user and hour having the same count: I should keep only one of them, not two.
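For reference, here is a rough PySpark sketch of that naive approach (my reconstruction, not the exact code of the original job): compute the maximum count per user & hour, then join it back to the original data frame.

import pyspark.sql.functions as F

# Maximum count for each (user, hour) group.
maxDF = myDF.groupBy("user", "hour").agg(F.max("count").alias("count"))

# Join back on user, hour and the maximum count. If two records in the same
# group share the maximum count, both of them survive the join -- which is
# exactly the problem described above.
naiveDF = myDF.join(maxDF, ["user", "hour", "count"])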

After a little bit of reading

A much more sophisticated solution I found came from these 2 questions on StackOverflow.

Removing duplicates from rows based on specific columns in an RDD/Spark DataFrame

David Griffin provided a simple answer with groupBy and then agg. We can do something like:

myDF.groupBy("user", "hour").agg(max("count"))

However, this doesn't return a data frame that still contains the cgi column (the "code" column in the toy example). We can try further with:

myDF.groupBy("user", "hour").agg(max("count"), max("cgi"))

And this doesn't guarantee that the max("count") and the max("cgi") returned actually come from the same row of the original data frame myDF.

SPARK DataFrame: select the first row of each group

zero323 gave an excellent answer on how to return only the first row of each group. A group here is defined as a set of records with the same user and hour values. In the original dataset at the beginning of the post, we have 3 groups in total.
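That answer is based on window functions. A rough PySpark sketch of the idea, adapted to the toy columns of this post (not taken verbatim from the answer): number the rows of each (user, hour) group by descending count and keep only row number 1.

from pyspark.sql import Window
import pyspark.sql.functions as F

# One window per (user, hour) group, ordered so the largest count comes first.
w = Window.partitionBy("user", "hour").orderBy(F.col("count").desc())

firstPerGroup = myDF\
    .withColumn("rn", F.row_number().over(w))\
    .where(F.col("rn") == 1)\
    .drop("rn")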

Solution

The idea is that we sort the data so that, within each group, the record we want comes first, and then keep only the first record of each group with dropDuplicates. The final code is:

import pyspark.sql.functions as F

myDF\
    .orderBy("user", "hour", F.col("count").desc())\
    .dropDuplicates(["user", "hour"])\
    .show(10)
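To see the whole thing end to end, here is a minimal, self-contained sketch on the toy data from the beginning of the post (it assumes spark is an existing SparkSession, e.g. the one created by pyspark):

import pyspark.sql.functions as F

toyDF = spark.createDataFrame(
    [("bob", 8, "A", 8),
     ("bob", 8, "B", 12),
     ("bob", 9, "A", 10),
     ("bob", 13, "D", 3),
     ("bob", 13, "C", 3)],
    ["user", "hour", "code", "count"])

# Keep one record per (user, hour), preferring the largest count.
toyDF\
    .orderBy("user", "hour", F.col("count").desc())\
    .dropDuplicates(["user", "hour"])\
    .show()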

The result on my real data set: the column names are a little different (generationSnHash, hhour, cgi, eventCount instead of user, hour, code, count), but they are basically the same as the toy example in this post.

# CORRECT RESULT
+--------------------+-----+------------------+----------+
|    generationSnHash|hhour|               cgi|eventCount|
+--------------------+-----+------------------+----------+
|0049d73e3492daeb9...|night|222-01-00002-51723|         4|
|0d772216b15bb7f81...|night|222-01-00001-00995|         4|
|1077b05f12df3311e...|night|222-01-00004-00909|         5|
|2269b2c133b1515a0...|night|222-01-00003-17650|        13|

Without the descending sort on the count column, the result would be wrong. For example, compare the second rows: the correct DF has eventCount=4 and cgi=222-01-00001-00995, while the wrong DF has eventCount=3 and a different cgi.

# WRONG RESULT
+--------------------+-----+------------------+----------+
|    generationSnHash|hhour|               cgi|eventCount|
+--------------------+-----+------------------+----------+
|0049d73e3492daeb9...|night|222-01-00002-51723|         4|
|0d772216b15bb7f81...|night|222-01-00001-16453|         3|
|1077b05f12df3311e...|night|222-01-00004-00909|         5|
|2269b2c133b1515a0...|night|222-01-00003-00102|         9|
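For instance, the wrong result above is roughly what you would get from the same pipeline without the descending sort on count, something like:

myDF\
    .orderBy("user", "hour")\
    .dropDuplicates(["user", "hour"])\
    .show(10)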

How to test Spark

Problem: how to verify that we have Spark installed correctly.

spark-shell

Suppose we have set up everything correctly for Spark in the directory $SPARK_HOME/conf. Now invoke spark-shell on the command line.

Suppose we have a file on the machine where we are running the shell, for example at /home/hadoop/spark/README.md. Then in the spark-shell we invoke:

scala> val textFile = sc.textFile("file:/home/hadoop/spark/README.md")
scala> textFile.count()

For more functions to test, you can check out the Spark Tutorial – Quick Start.

spark-submit

Follow the section Self-Contained Applications in the Quick Start. Remember to modify the variable logFile.
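For reference, SimpleApp.py looks roughly like the following (paraphrased from the Quick Start, so check the official guide for the exact code; the logFile path below is only a placeholder and must point at a file that exists on your machine):

"""SimpleApp.py -- count lines containing 'a' and 'b' in a text file."""
from pyspark import SparkContext

logFile = "/home/hadoop/spark/README.md"  # modify this, as noted above
sc = SparkContext(appName="SimpleApp")
logData = sc.textFile(logFile).cache()

numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
sc.stop()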

$ spark-submit SimpleApp.py
Lines with a: 60, lines with b: 29


Or you can also run the example code that ships with Spark. The result is not going to show up in your currently running shell. Let's just execute:

$ spark-submit $SPARK_HOME/examples/src/main/python/pi.py 4

You can check the log directory of Spark. Try one of these commands:

$ cat $SPARK_HOME/conf/spark-env.sh
export SPARK_DAEMON_JAVA_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70"
export SPARK_LOCAL_DIRS=/mnt/spark,/mnt1/spark,/mnt2/spark,/mnt3/spark
export SPARK_LOG_DIR=/mnt/var/log/apps
export SPARK_CLASSPATH="/home/hadoop/spark/conf:/home/hadoop/conf:/home/hadoop/spark/classpath/distsupplied/*:/home/hadoop/spark/classpath/emr/*:/home/hadoop/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/share/hadoop/common/lib/hadoop-lzo.jar:/usr/share/aws/emr/auxlib/*"

$ echo $SPARK_LOG_DIR
/mnt/var/log/apps

So now go to the /mnt/var/log/apps directory and read the stdout file.