Drop duplicates by some condition

Initial data:

user   hour code count
bob    8    A    8
bob    8    B    12
bob    9    A    10
bob    13   D    3
bob    13   C    3

Result that we want:

user   hour  code   count
bob    8     B      12 ==> keep record with maximum count
bob    9     A      10
bob    13    D      3  ==> When the count is equal, keep one random record

We want only one record for each user within each hour, and we keep the record whose “count” is the maximum.

My naive approach

At first I didn’t know about the dropDuplicates() function [read here], so I came up with a much more complicated approach.

I looked for the maximum count for each group of user and hour, and then joined the result with the original dataset on user, hour and the maximum count. I still ran into a problem: when two records belonging to the same user and hour have the same count, the join keeps both of them, while I should keep only one.
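To make the tie problem concrete, here is a minimal sketch in plain Python (no Spark involved) that mimics the "maximum per group, then join back" idea on the toy data from the top of this post. The variable names are only for illustration.

```python
# Toy data from the top of the post: (user, hour, code, count)
rows = [
    ("bob", 8, "A", 8),
    ("bob", 8, "B", 12),
    ("bob", 9, "A", 10),
    ("bob", 13, "D", 3),
    ("bob", 13, "C", 3),
]

# Step 1: maximum count for each (user, hour) group
max_count = {}
for user, hour, code, count in rows:
    key = (user, hour)
    max_count[key] = max(count, max_count.get(key, count))

# Step 2: "join" back to the original rows on user, hour and the maximum count
joined = [r for r in rows if r[3] == max_count[(r[0], r[1])]]

for r in joined:
    print(r)
```

Hour 13 still comes back with two rows (codes D and C), because both of them match the maximum count of 3.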

After a little bit of reading

I found more sophisticated solutions in these 2 questions on StackOverflow.

Removing duplicates from rows based on specific columns in an RDD/Spark DataFrame

David Griffin provided a simple answer using groupBy followed by agg. We can do something like:

import pyspark.sql.functions as F
myDF.groupBy("user", "hour").agg(F.max("count"))

However, this one doesn’t return the cgi column (the column that plays the role of code in the toy example). We can go further with:

myDF.groupBy("user", "hour").agg(F.max("count"), F.max("cgi"))

But this one doesn’t guarantee that max(“count”) and max(“cgi”) come from the same row of the original data frame myDF, because each maximum is computed independently within the group.
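A small plain-Python sketch of why two independent maxima can produce a row that never existed. The data here is hypothetical: it is the toy data with the two codes of hour 8 swapped, and code stands in for cgi.

```python
# Hypothetical variation of the toy data: (user, hour, code, count)
rows = [
    ("bob", 8, "B", 8),
    ("bob", 8, "A", 12),
    ("bob", 9, "A", 10),
]

# Group the records by (user, hour)
groups = {}
for user, hour, code, count in rows:
    groups.setdefault((user, hour), []).append((code, count))

# Take the maximum of each column independently, as two max() aggregates would
results = []
for (user, hour), recs in groups.items():
    max_code = max(code for code, _ in recs)
    max_cnt = max(count for _, count in recs)
    results.append((user, hour, max_code, max_cnt))

for r in results:
    print(r, "exists in the original data:", r in rows)
```

For hour 8 the aggregated row combines code B with count 12, although no original record has that pair.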

SPARK DataFrame: select the first row of each group

zero323 gave an excellent answer on how to return only the first row of each group. A group here is defined as the set of records with the same user and hour values. In the original dataset at the beginning of the post, we have 3 groups in total.


The idea is to sort the data so that, within each group, the record we want to keep comes first. Then we keep only the first record of each group with dropDuplicates. The final code is:

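import pyspark.sql.functions as F
# within each (user, hour) group, put the highest count first,
# then keep a single row per (user, hour)
myDF\
.orderBy("user", "hour", F.col("count").desc())\
.dropDuplicates(["user", "hour"]).show(10)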

The result is below. The column names are a little bit different, but they correspond to the columns of the toy example in this post.

|    generationSnHash|hhour|               cgi|eventCount|
|0049d73e3492daeb9...|night|222-01-00002-51723|         4|
|0d772216b15bb7f81...|night|222-01-00001-00995|         4|
|1077b05f12df3311e...|night|222-01-00004-00909|         5|
|2269b2c133b1515a0...|night|222-01-00003-17650|        13|

Without sorting the count column in descending order, the result would be wrong. For example, compare the second row of the two outputs: the correct DF has eventCount=4 and cgi=222-01-00001-00995, while the wrong DF below has eventCount=3 and a different cgi.

|    generationSnHash|hhour|               cgi|eventCount|
|0049d73e3492daeb9...|night|222-01-00002-51723|         4|
|0d772216b15bb7f81...|night|222-01-00001-16453|         3|
|1077b05f12df3311e...|night|222-01-00004-00909|         5|
|2269b2c133b1515a0...|night|222-01-00003-00102|         9|
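The effect of the descending sort can be mimicked on the toy data with plain Python. This is only a sketch of the "sort, then keep the first row per key" logic on an in-memory list. It does not model Spark's distributed execution, and first_per_group is a hypothetical helper written for this illustration.

```python
# Toy data from the top of the post: (user, hour, code, count)
rows = [
    ("bob", 8, "A", 8),
    ("bob", 8, "B", 12),
    ("bob", 9, "A", 10),
    ("bob", 13, "D", 3),
    ("bob", 13, "C", 3),
]

def first_per_group(sorted_rows):
    """Keep the first row encountered for each (user, hour) key."""
    kept = {}
    for row in sorted_rows:
        kept.setdefault((row[0], row[1]), row)
    return list(kept.values())

# Sorted by user, hour, then count descending
with_desc = sorted(rows, key=lambda r: (r[0], r[1], -r[3]))
# Sorted by user and hour only, count left in its original order
without_desc = sorted(rows, key=lambda r: (r[0], r[1]))

print(first_per_group(with_desc))
print(first_per_group(without_desc))
```

With the descending sort, hour 8 keeps code B with count 12. Without it, hour 8 keeps code A with count 8, which is the same kind of mistake as in the wrong DF above.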
