Drop duplicates by some condition

Initial data:

user   hour code count
=============================
bob    8    A    8
bob    8    B    12
bob    9    A    10
bob    13   D    3
bob    13   C    3

Result that we want:

user   hour  code   count
=============================
bob    8     B      12 ==> keep record with maximum count
bob    9     A      10
bob    13    D      3  ==> When the count is equal, keep one random record

We want that, for each user, within each hour, there is only one record, and we keep the record whose "count" is the maximum.

My naive approach

At first I didn't know about the dropDuplicates() function [read here], so I came up with a much more difficult approach.

I look for the maximum count for each group of user & hour, and then perform a join with the original dataset on user, hour and the maximum value of count. I still run into the problem of two records belonging to the same user and hour having the same count: I should keep only one of them, not both.
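
A rough sketch of that naive approach, using the toy column names (myDF stands for the original data frame, and the variable names here are mine):

import pyspark.sql.functions as F

# maximum count per (user, hour) group
max_df = myDF.groupBy("user", "hour").agg(F.max("count").alias("count"))

# join back on all three columns to recover the full rows;
# when two rows of the same group share the maximum count, both survive the join,
# which is exactly the duplicate problem described above
candidates = myDF.join(max_df, on=["user", "hour", "count"])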

After a little bit of reading

A much more elegant solution I found came from these two questions on StackOverflow.

Removing duplicates from rows based on specific columns in an RDD/Spark DataFrame

David Griffin provided a simple answer using groupBy followed by agg. We can do something like:

myDF.groupBy("user", "hour").agg(max("count"))

However, this one doesn't return the data frame with the cgi column (the "code" column in the toy example). We can try further with:

myDF.groupBy("user", "hour").agg(max("count"), max("cgi"))

And this one doesn't guarantee that the max("count") and max("cgi") returned actually come from the same row of the original data frame myDF.

SPARK DataFrame: select the first row of each group

zero323 gave an excellent answer on how to return only the first row of each group. A group here is defined as the set of records with the same user and hour values. In the original dataset at the beginning of the post, we have 3 groups in total.
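
One way to get the first row of each group, in the spirit of that answer, is a window function: number the rows within each (user, hour) group by descending count and keep only the first one. A sketch with the toy column names (myDF is the original data frame):

from pyspark.sql import Window
import pyspark.sql.functions as F

# rank rows within each (user, hour) group, largest count first
w = Window.partitionBy("user", "hour").orderBy(F.col("count").desc())

result = myDF\
    .withColumn("rn", F.row_number().over(w))\
    .filter(F.col("rn") == 1)\
    .drop("rn")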

Solution

The idea is to sort the data so that, within each group, the record we want comes first (ordering by count descending), and then keep only the first record of each group with dropDuplicates. The final code is:

import pyspark.sql.functions as F

# sort so that, within each (user, hour) group, the row with the largest count comes first,
# then keep only the first row of each group
myDF\
    .orderBy("user", "hour", F.col("count").desc())\
    .dropDuplicates(["user", "hour"])\
    .show(10)

The result: the column names are a little different, but they play the same roles as in the toy example in this post.

# CORRECT RESULT
+--------------------+-----+------------------+----------+
|    generationSnHash|hhour|               cgi|eventCount|
+--------------------+-----+------------------+----------+
|0049d73e3492daeb9...|night|222-01-00002-51723|         4|
|0d772216b15bb7f81...|night|222-01-00001-00995|         4|
|1077b05f12df3311e...|night|222-01-00004-00909|         5|
|2269b2c133b1515a0...|night|222-01-00003-17650|        13|

Without the descending ordering on the count column, the result would be wrong. For example, compare the second rows: the correct data frame has eventCount 4 and cgi=222-01-00001-00995, while the wrong one has eventCount 3 and a different cgi.

# WRONG RESULT
+--------------------+-----+------------------+----------+
|    generationSnHash|hhour|               cgi|eventCount|
+--------------------+-----+------------------+----------+
|0049d73e3492daeb9...|night|222-01-00002-51723|         4|
|0d772216b15bb7f81...|night|222-01-00001-16453|         3|
|1077b05f12df3311e...|night|222-01-00004-00909|         5|
|2269b2c133b1515a0...|night|222-01-00003-00102|         9|

Spark local mode

Spark local mode is one of the 4 ways to run Spark (the others are (i) standalone mode, (ii) YARN mode and (iii) Mesos mode).

By default, the Web UI for jobs running in local mode can be found at http://localhost:4040

You can change the port of the Spark Web UI by setting it in a pyspark.SparkConf object.

The default spark.ui.port is 4040; we can change it to whatever we want. In the example below, I changed it to 4041.

The full list of configuration options can be found in the Spark Configuration documentation.

from pyspark import SparkConf, SparkContext

SPARK_MASTER = "local[*]"

sparkConf = SparkConf().setAll([
    ("spark.cores.max", "4"),
    ("spark.executor.memory", "2G"),
    ("spark.ui.port", "4041"),  # move the Web UI from the default 4040 to 4041
]).setMaster(SPARK_MASTER).setAppName("Preprocessing")

sc = SparkContext(conf=sparkConf)

 

Missing “=” sign

I was struggling with this piece of code. I had a similar piece of code that was running perfectly, so I had to go back and forth, gradually deleting pieces of code to compare the two.

It turned out that the mistake was really, really small, as shown in line 38 below.

I'M MISSING THE =

The problem I got:

Name: org.apache.spark.SparkException
Message: Job aborted due to stage failure: Task 0 in stage 365.0 failed 4 times, most recent failure: Lost task 0.3 in stage 365.0 (TID 12524, lamar.homenet.telecomitalia.it): java.lang.ClassCastException


Problem with configuring Scala Spark Application

I want to pass the configuration for the memory of the executors, the number of executor instances, and the cores they are allowed to use.

Working solutions

val conf = new SparkConf()
.setAppName("WordCount")
.setMaster("spark://localhost:7077")
.set("spark.executor.memory", "2G")

But only spark.executor.memory seems to take effect. Other settings, such as:

val conf = new SparkConf()
.setAppName("WordCount")
.setMaster("spark://localhost:7077")
.set("spark.executor.cores", "2")
.set("spark.executor.instances", "2")

do not work.


Cannot write to csv with spark-csv in Scala

Problem:

Name: java.lang.NoClassDefFoundError
Message: Could not initialize class com.databricks.spark.csv.util.CompressionCodecs$

[screenshot: spark-scala-jupyter-csv-error.png]

One-line answer (see the example below): make sure that the Scala version is the same for:

  • the Scala used to compile Spark
  • the spark-csv module
  • the Spark running on your system
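
For example, the spark-shell startup banner prints the Scala version that Spark was built with, and the spark-csv artifact carries its Scala version in the suffix of its name. A sketch of how the two should line up (the exact coordinates below are only illustrative):

# spark-shell banner says "Using Scala version 2.10.x"  ==> pick the _2.10 artifact
spark.jars.packages com.databricks:spark-csv_2.10:1.4.0
# spark-shell banner says "Using Scala version 2.11.x"  ==> use com.databricks:spark-csv_2.11:1.4.0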


Spark 1.2 vs 1.6

I was reading the rather old book “Learning Spark” by O'Reilly. It was targeted at Spark 1.1, and since Spark 1.3 lots of new features have been incorporated, the most notable being the DataFrame.

# Code to load a csv file into an RDD
import csv
import io

def loadRecord(line):
    """Parse a single CSV line."""
    # Each line is parsed on its own, so the column names must be given explicitly;
    # otherwise DictReader treats the line itself as the header row.
    # The fieldnames below are only illustrative; adapt them to the actual columns of file.csv.
    reader = csv.DictReader(io.StringIO(line), fieldnames=["user", "hour", "code", "count"])
    return next(reader)

file_path = 'file:/home/hduser/Desktop/file.csv'
records = sc.textFile(file_path).map(loadRecord)

The blog post DataFrame Spark 1.5 from csv file – NodalPoint encourages using the spark-csv library from Databricks.

<code class="python plain">sqlContext.read.</code><code class="python functions">format</code><code class="python plain">(</code><code class="python string">'com.databricks.spark.csv'</code><code class="python plain">).options(header</code><code class="python keyword">=</code><code class="python string">'true'</code><code class="python plain">).load(</code><code class="python string">file_path</code><code class="python plain">)</code>

Automatically load spark-csv library

You don't want to specify pyspark --packages com.databricks:spark-csv every time you need to use the pyspark shell. You can do this instead:

# Add to $SPARK_HOME/conf/spark-defaults.conf
spark.jars.packages com.databricks:spark-csv_2.11:1.4.0

A PySpark notebook in Jupyter can then benefit from this as well.
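
With that line in spark-defaults.conf, a PySpark shell or notebook session can read CSV directly, without --packages. For illustration (sqlContext is the SQLContext that the shell or notebook kernel provides, and the file path is the one from the RDD example above):

df = sqlContext.read\
    .format('com.databricks.spark.csv')\
    .options(header='true')\
    .load('file:/home/hduser/Desktop/file.csv')
df.show(5)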

How to test Spark

Problem: how to verify that we have Spark installed correctly.

spark-shell

Suppose we have set up everything correctly for Spark in the directory $SPARK_HOME/conf. Now invoke spark-shell on the command line.

Suppose we have a file on the machine where we are running the shell, located at /home/hadoop/spark/README.md. Then in the spark-shell we invoke:

scala> val textFile = sc.textFile("file:/home/hadoop/spark/README.md")
scala> textFile.count()

For more functions to test, you can check out the Spark Tutorial – Quick Start.

spark-submit

Follow the section Self-Contained Applications in the Quick Start. Remember to modify the variable logFile.
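
For reference, SimpleApp.py from the Quick Start looks roughly like the following; logFile is the variable to point at a file that actually exists on your machine:

"""SimpleApp.py"""
from pyspark import SparkContext

logFile = "file:/home/hadoop/spark/README.md"  # should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))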

$ spark-submit SimpleApp.py
Lines with a: 60, lines with b: 29

 

Or you can run the example code that ships with Spark. The result is not going to show up in your currently running shell. Let's just execute:

$ spark-submit $SPARK_HOME/examples/src/main/python/pi.py 4

You can check the log directory of Spark. Try one of these commands:

$ cat $SPARK_HOME/conf/spark-env.sh
export SPARK_DAEMON_JAVA_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70"
export SPARK_LOCAL_DIRS=/mnt/spark,/mnt1/spark,/mnt2/spark,/mnt3/spark
export SPARK_LOG_DIR=/mnt/var/log/apps
export SPARK_CLASSPATH="/home/hadoop/spark/conf:/home/hadoop/conf:/home/hadoop/spark/classpath/distsupplied/*:/home/hadoop/spark/classpath/emr/*:/home/hadoop/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/share/hadoop/common/lib/hadoop-lzo.jar:/usr/share/aws/emr/auxlib/*"

$ echo $SPARK_LOG_DIR
/mnt/var/log/apps

So now go to the /mnt/var/log/apps directory and read the stdout file.