Week 45

Running rubocop on changed files

A really nice article describing how to use rubocop when developing new features.

# before committing
git ls-files -m | xargs ls -1 2>/dev/null | xargs rubocop
# before pushing
git diff-tree -r --no-commit-id --name-only master@\{u\} HEAD | xargs ls -1 2>/dev/null | xargs rubocop

Drop duplicates by some condition

Initial data:

user   hour  code  count
bob    8     A     8
bob    8     B     12
bob    9     A     10
bob    13    D     3
bob    13    C     3

Result that we want:

user   hour  code   count
bob    8     B      12 ==> keep record with maximum count
bob    9     A      10
bob    13    D      3  ==> When the count is equal, keep one random record

We want only one record for each user within each hour, and the record we keep should be the one with the maximum "count".

My naive approach

At first I didn't know about the dropDuplicates() function [read here], so I came up with a much more difficult approach.

I looked for the maximum count for each group of user and hour, and then joined the result with the original dataset on user, hour and the maximum count. This still leaves a problem: when 2 records belonging to the same user and hour have the same count, the join returns both of them, whereas I should keep only one.
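
The tie problem is easy to reproduce. Below is a minimal plain-Python sketch of the same logic (maximum per group, then a join back on that maximum) applied to the toy data above. It does not use Spark, and naive_dedup is a hypothetical helper name used only for illustration.

```python
# Toy data from this post: (user, hour, code, count)
rows = [
    ("bob", 8, "A", 8),
    ("bob", 8, "B", 12),
    ("bob", 9, "A", 10),
    ("bob", 13, "D", 3),
    ("bob", 13, "C", 3),
]


def naive_dedup(rows):
    """Find the max count per (user, hour), then 'join' back on that max."""
    max_count = {}
    for user, hour, _code, count in rows:
        key = (user, hour)
        max_count[key] = max(count, max_count.get(key, count))
    # Equivalent of joining on user, hour and the maximum count
    return [r for r in rows if r[3] == max_count[(r[0], r[1])]]


result = naive_dedup(rows)
for r in result:
    print(r)
```

Both records for hour 13 survive, because each of them matches the maximum count of its group.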

After a little bit of reading

A much more sophisticated solution came from these 2 questions on Stack Overflow.

Removing duplicates from rows based on specific columns in an RDD/Spark DataFrame

David Griffin provided a simple answer using groupBy followed by agg. We can do something like:

myDF.groupBy("user", "hour").agg(max("count"))

However, this does not return a data frame with the cgi column (the code column in the toy example). We can go further with:

myDF.groupBy("user", "hour").agg(max("count"), max("cgi"))

However, this does not guarantee that the max("count") and max("cgi") values returned actually come from the same row of the original data frame myDF.
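
A small plain-Python sketch shows why. The two rows below are hypothetical (they are not the post's data), and the code column stands in for cgi. Taking each maximum independently yields a combination that matches no original row.

```python
# Hypothetical rows for illustration: (user, hour, code, count)
rows = [
    ("bob", 8, "A", 12),
    ("bob", 8, "B", 8),
]

# Independent aggregates, in the spirit of agg(max("count"), max("cgi"))
max_count = max(r[3] for r in rows)  # taken from the "A" row
max_code = max(r[2] for r in rows)   # taken from the "B" row

combined = ("bob", 8, max_code, max_count)
print(combined)
print(combined in rows)
```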

SPARK DataFrame: select the first row of each group

zero323 gave an excellent answer on how to return only the first row of each group. A group here is defined as the set of records with the same user and hour values. In the original dataset at the beginning of the post, we have 3 groups in total.


The idea is to sort the data so that, within each group, the record with the highest count comes first, and then keep only the first record of each group with dropDuplicates. The final code is:

import pyspark.sql.functions as F

# myDF is the data frame used in the examples above
myDF\
.orderBy("user", "hour", F.col("count").desc())\
.dropDuplicates(["user", "hour"]).show(10)

The result: the column names are a little different, but the data has basically the same shape as the toy example in this post.

|    generationSnHash|hhour|               cgi|eventCount|
|0049d73e3492daeb9...|night|222-01-00002-51723|         4|
|0d772216b15bb7f81...|night|222-01-00001-00995|         4|
|1077b05f12df3311e...|night|222-01-00004-00909|         5|
|2269b2c133b1515a0...|night|222-01-00003-17650|        13|

Without sorting the count column in descending order, the result would be wrong. For example, compare the second row of the two outputs: the correct DF (above) has eventCount=4 and cgi=222-01-00001-00995, while the wrong DF (below) has eventCount=3 and a different cgi.

|    generationSnHash|hhour|               cgi|eventCount|
|0049d73e3492daeb9...|night|222-01-00002-51723|         4|
|0d772216b15bb7f81...|night|222-01-00001-16453|         3|
|1077b05f12df3311e...|night|222-01-00004-00909|         5|
|2269b2c133b1515a0...|night|222-01-00003-00102|         9|
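
The sort-then-keep-first idea from this post can also be checked on the toy example without a cluster. The following is a plain-Python sketch of the logic only (first_per_group is a hypothetical helper name); it mirrors the ordering and the "first row per group" step, and makes no claim about how Spark executes them.

```python
from itertools import groupby

# Toy data from this post: (user, hour, code, count)
rows = [
    ("bob", 8, "A", 8),
    ("bob", 8, "B", 12),
    ("bob", 9, "A", 10),
    ("bob", 13, "D", 3),
    ("bob", 13, "C", 3),
]


def first_per_group(rows, descending=True):
    """Sort by user, hour and count, then keep the first row of each group."""
    sign = -1 if descending else 1
    ordered = sorted(rows, key=lambda r: (r[0], r[1], sign * r[3]))
    grouped = groupby(ordered, key=lambda r: (r[0], r[1]))
    return [next(group) for _key, group in grouped]


best = first_per_group(rows)
for r in best:
    print(r)
```

Calling first_per_group(rows, descending=False) keeps the record with the smallest count in each group instead, which is the kind of wrong result shown in the second table.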

Spark local mode

Spark local mode is one of the 4 ways to run Spark (the others are (i) standalone mode, (ii) YARN mode and (iii) Mesos mode).

The Web UI for jobs running in local mode by default can be found in: http://localhost:4040

You can change the port of the Spark Web UI (Jobs) through a pyspark.SparkConf object.

The default spark.ui.port is 4040, but we can change it to whatever we want. In this example, I changed it to 4041.

All the other configuration options can be found in the Spark Configuration documentation.

from pyspark import SparkConf, SparkContext
SPARK_MASTER = "local[*]"

# Use the local master defined above and override a few settings
sparkConf = SparkConf().setMaster(SPARK_MASTER).setAll([
    ("spark.cores.max", "4"),
    ("spark.executor.memory", "2G"),
    ("spark.ui.port", "4041")
])

sc = SparkContext(conf=sparkConf)


Problem with configuring Scala Spark Application

I want to pass the configuration for the memory of the executors, the number of executor instances and the cores they are allowed to use.

Working solutions

val conf = new SparkConf()
.set("spark.executor.memory", "2G")

But only spark.executor.memory seems to take effect. Other settings such as:

val conf = new SparkConf()
.set("spark.executor.cores", "2")
.set("spark.executor.instances", "2")

do not work.

Spark 1.2 vs 1.6

I was reading a rather old book, "Learning Spark" from O'Reilly. It targets Spark 1.1, and since Spark 1.3 lots of new features have been incorporated; the most notable is the DataFrame.

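# Code to load csv file into RDD
import csv
import io

# Column names are an assumption for illustration; use your file's columns
FIELDNAMES = ["name", "favouriteAnimal"]

def loadRecord(line):
    """Parse a single CSV line into a dict keyed by FIELDNAMES"""
    input = io.StringIO(line)
    # Without fieldnames, DictReader would consume the line as a header
    reader = csv.DictReader(input, fieldnames=FIELDNAMES)
    return next(reader)

input = sc.textFile(file_path).map(loadRecord)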

The blog post DataFrame Spark 1.5 from csv file – NodalPoint encourages using the spark-csv library from Databricks.

sqlContext.read.format('com.databricks.spark.csv').options(header='true').load(file_path)

Automatically load spark-csv library

You don't want to specify pyspark --packages com.databricks:spark-csv every time you need to use the pyspark shell. You can do this instead:

# Add to $SPARK_HOME/conf/spark-defaults.conf
spark.jars.packages com.databricks:spark-csv_2.11:1.4.0

A PySpark notebook in Jupyter benefits from this setting as well.
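
To double-check that the setting is in place, the file can be read back with a few lines of Python. This is a minimal sketch, assuming the layout shown above: one key and one value per line, separated by whitespace, with # starting a comment. The function name parse_spark_defaults and the sample text are hypothetical and exist only for illustration.

```python
def parse_spark_defaults(text):
    """Parse spark-defaults.conf style text into a dict of key -> value."""
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        parts = line.split(None, 1)  # split on the first run of whitespace
        key = parts[0]
        value = parts[1].strip() if len(parts) > 1 else ""
        conf[key] = value
    return conf


sample = """
# Add to $SPARK_HOME/conf/spark-defaults.conf
spark.jars.packages com.databricks:spark-csv_2.11:1.4.0
"""

conf = parse_spark_defaults(sample)
print(conf["spark.jars.packages"])
```

In practice the text would come from reading the real file under $SPARK_HOME/conf instead of the sample string.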

How to test Spark

Problem: how to verify that we have Spark installed correctly.


Suppose we have set up everything for Spark correctly in the directory $SPARK_HOME/conf. Now invoke spark-shell on the command line.

Suppose that we have a file on the machine where we are running the shell, located at /home/hadoop/spark/README.md. Then in the spark-shell we invoke:

scala> val textFile = sc.textFile("file:/home/hadoop/spark/README.md")
scala> textFile.count()

For more functions to test, you can check out Spark Tutorial – Quick Start


Follow the section Self-Contained Applications in the Quick Start. Remember to modify the variable logFile.

$ spark-submit SimpleApp.py
Lines with a: 60, lines with b: 29


Or you can also run the example code that ships with Spark. The result is not going to show up in your currently running shell. Now let's just execute it.

$ spark-submit $SPARK_HOME/examples/src/main/python/pi.py 4

You can look up the log directory of Spark. Try this command:

$ cat $SPARK_HOME/conf/spark-env.sh
export SPARK_DAEMON_JAVA_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70"
export SPARK_LOCAL_DIRS=/mnt/spark,/mnt1/spark,/mnt2/spark,/mnt3/spark
export SPARK_LOG_DIR=/mnt/var/log/apps
export SPARK_CLASSPATH="/home/hadoop/spark/conf:/home/hadoop/conf:/home/hadoop/spark/classpath/distsupplied/*:/home/hadoop/spark/classpath/emr/*:/home/hadoop/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/share/hadoop/common/lib/hadoop-lzo.jar:/usr/share/aws/emr/auxlib/*"


So now go to the /mnt/var/log/apps directory and read the stdout file.
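
If you would rather pull the log directory out programmatically, a short Python sketch along these lines could do it. It assumes the file consists of simple export NAME=value lines like the ones shown above, and it does not expand shell variables. The function name read_exports and the sample text are hypothetical.

```python
import shlex


def read_exports(text):
    """Collect `export NAME=value` lines from shell-style text into a dict."""
    exports = {}
    for line in text.splitlines():
        line = line.strip()
        if not line.startswith("export "):
            continue
        # shlex.split removes shell quoting, e.g. NAME="a b" -> 'NAME=a b'
        for token in shlex.split(line)[1:]:
            name, sep, value = token.partition("=")
            if sep:
                exports[name] = value
    return exports


sample = """
export SPARK_LOCAL_DIRS=/mnt/spark,/mnt1/spark,/mnt2/spark,/mnt3/spark
export SPARK_LOG_DIR=/mnt/var/log/apps
"""

log_dir = read_exports(sample)["SPARK_LOG_DIR"]
print(log_dir)
```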