Portions of this slide deck are original. Portions have been
adapted, with permission, from Databricks training material.
Learning More
NewCircle will be running a public Spark Foundations course here
in Philadelphia, November 3 through November 5. (I'll be teaching it.)
If you're interested, you can find more information here:
http://tinyurl.com/spark-philly-11-3
(I also have a discount code for anyone who wants to register.)
Brief Overview of Spark
Spark is a general computing engine for distributed processing of data.
Spark is:
Fast.
Unlike Hadoop MapReduce, Spark keeps data in memory as much as possible.
Compact.
In general, you'll write a lot less code to get a job done in Spark
than you would in a framework like Hadoop.
Multi-language.
You can write Spark applications in Scala, Java, Python and R.
Write Less Code
Computing an Average, in Hadoop
private IntWritable one = new IntWritable(1);
private IntWritable output = new IntWritable();
protected void map(LongWritable key, Text value, Context context) {
String[] fields = value.toString().split("\t");
output.set(Integer.parseInt(fields[1]));
context.write(one, output);
}
private IntWritable one = new IntWritable(1);
private DoubleWritable average = new DoubleWritable();
protected void reduce(IntWritable key,
Iterable<IntWritable> values,
Context context) {
int sum = 0;
int count = 0;
for (IntWritable value: values) {
sum += value.get();
count++;
}
average.set(sum / (double) count);
context.write(key, average);
}
Write Less Code
Computing an Average, in Spark
val data = sc.textFile(...).map { _.split("\t") }
data.map { x => (x(0), (x(1).toDouble, 1)) }
    .reduceByKey { case (x, y) => (x._1 + y._1, x._2 + y._2) }
    .map { x => (x._1, x._2._1 / x._2._2) }
    .collect()
Quick Architectural Overview
Your Spark program is the Driver.
So is the Spark shell.
The driver program is just one part of a Spark application.
Other parts of the application run on the cluster.
Application Flow
What is an RDD?
"RDD" stands for "Resilient Distributed Dataset".
Conceptually, an RDD is a distributed collection of elements. Think of it
as a distributed array or list...
...like an array or list in your program, but
spread out across multiple nodes in the cluster.
An RDD represents partitioned data.
Within your program (the Driver), an RDD object is a
handle to that distributed data.
What is an RDD?
What is an RDD?
An RDD can be created in several ways:
Parallelizing a collection
Reading data from an external (usually distributed) source, like
S3, Cassandra, HDFS, etc.
By transforming an existing RDD into a new RDD.
Via Spark Streaming. (More on this in a bit.)
Creating an RDD
val wordsRDD = sc.parallelize(Array("Nobody", "expects", "the",
"Spanish", "Inquisition"))
val linesRDD = sc.textFile("s3n://my-spark-bucket/foobar.txt")
Transformations
Transformations
Transformations are lazy.
They contribute to the execution plan (called the DAG, for
Directed Acyclic Graph), but they don't execute anything.
No file is read and no transformation is performed until an
action is called.
An RDD transformation is conceptually similar to a transformation
(like filter() or map()) on a Scala collection
view.
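For instance, in this minimal sketch (the path and variable names are invented),
nothing reads or processes any data; each line only adds a step to the DAG:
val lines = sc.textFile("hdfs://mynamenode:9000/data/events.log")
val errors = lines.filter { _ contains "ERROR" }
val fields = errors.map { _.split("\t") }
// Nothing has executed yet. Spark runs the DAG only when an action is called.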
Actions
Actions
Actions trigger execution of the DAG, forcing:
the data source to be read (modulo any caching) and
the transformations to be run.
There are two kinds of actions.
Driver: Invoking the action pulls data back to the driver.
Examples: collect(), count()
Distributed: The action executes on the nodes in the cluster.
Examples: saveAsTextFile(), foreach()
Calling an action like collect() is conceptually similar to
calling toArray() on a Scala collection view.
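Continuing the hedged sketch from the previous slide (the paths are invented):
val lines = sc.textFile("hdfs://mynamenode:9000/data/events.log")
val errors = lines.filter { _ contains "ERROR" }
// Driver action: the count is computed on the cluster, but the result comes
// back to the driver.
val numErrors = errors.count()
// Distributed action: each worker writes its own partitions; nothing is
// pulled back to the driver.
errors.saveAsTextFile("hdfs://mynamenode:9000/output/errors")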
Let's look at some code
DataFrames
Since version 1.3, Spark has had DataFrames.
Modeled on data frames in R and in Python's Pandas library.
Higher level, and easier to use, than RDDs.
Significantly faster.
DataFrames are the preferred interface to Spark.
Use them instead of RDDs, unless you can't do
what you need to do with DataFrames.
DataFrames are:
Schema-oriented.
A schema consists of named columns and types.
Examples:
Parquet
RDBMS
Hive
JSON (inferred schema)
CSV (inferred schema)
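For example, here's a hedged sketch of reading a JSON file (the path is
invented); the schema is inferred from the data itself:
val eventsDF = sqlContext.read.json("hdfs://mynamenode:9000/data/events.json")
eventsDF.printSchema()   // shows the column names and types Spark inferred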
DataFrames
The DataFrames API is a declarative API, providing functions like
select(), filter(), orderBy(),
groupBy(), etc.
DataFrames are tightly tied to Spark SQL.
The DataFrames API often requires even less code than comparable
RDD solutions.
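As a hedged sketch of that declarative style (reusing the employees Parquet
file that appears on a later slide):
import org.apache.spark.sql.functions._
val df = sqlContext.read.parquet("hdfs://mynamenode:9000/data/employees.parquet")
df.select("name", "age")
  .filter(df("age") > 30)
  .orderBy(df("age").desc)
  .show()                                     // prints a few rows to the console
df.groupBy("gender").agg(avg("age")).show()   // average age per gender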
Computing an Average (again)
// RDDs
val data = sc.textFile(...).map { _.split("\t") }
data.map { x => (x(0), (x(1).toDouble, 1)) }
    .reduceByKey { case (x, y) => (x._1 + y._1, x._2 + y._2) }
    .map { x => (x._1, x._2._1 / x._2._2) }
    .collect()
// DataFrames
val df = sqlContext.read.parquet("...")
df.groupBy("name").agg(avg("age")).collect()
DataFrames
Here's an example of using a DataFrame to read a Parquet file. Parquet
files have a self-describing schema, so they can be used directly from
the DataFrame API.
val df = sqlContext.read.parquet(
"hdfs://mynamenode:9000/data/employees.parquet"
)
val df2 = df.filter(df("age") < 30)
val youngsters = df2.collect()
DataFrames
Here's an example of creating a DataFrame from a file without
a self-describing schema. We start with an RDD and "teach" the DataFrames
API the schema using a case class.
val rdd = sc.textFile(
"hdfs://mynamenode:9000/data/people.csv"
)
case class Person(name: String, gender: String, age: Int)
val personRDD = rdd.map { line =>
val cols = line.split(",")
Person(name=cols(0), gender=cols(1), age=cols(2).toInt)
}
import sqlContext.implicits._   // provides the toDF conversion
val df = personRDD.toDF
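Because DataFrames are tightly tied to Spark SQL, the same DataFrame can also
be registered as a table and queried with SQL. A hedged sketch (the table name
is made up):
df.registerTempTable("people")
val adults = sqlContext.sql("SELECT name, age FROM people WHERE age >= 18")
adults.show()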
Now that we have some background, let's talk about Spark Streaming.
Spark Streaming
Provides a way to consume continual streams of data.
Scalable, high-throughput, fault-tolerant.
Built on top of Spark Core.
API is very similar to Core Spark API.
If you already know Spark, learning Streaming is easier than learning
the API for a completely unrelated product (like Twitter Storm).
Supports many inputs, like TCP socket, Kafka, Flume, HDFS, S3,
Kinesis (and even Twitter—cool for demos).
Currently based on RDDs, but work is underway to integrate DataFrames.
Spark Streaming
Spark Streaming
Spark Streaming uses a "micro-batch" architecture.
Spark Streaming continuously receives live input data streams and
divides the data into batches.
New batches are created at regular time intervals, called batch intervals.
At the beginning of each interval a new batch is created, and any data that
arrives during that interval is added to that batch.
Spark Streaming
At the end of the time interval the batch is done growing.
The size of the time intervals is determined by a parameter called
the batch interval.
The batch interval is typically between 500 milliseconds and
several seconds and is configured by the application developer.
Each input batch forms an RDD.
Each RDD is passed to Spark for
processing in a job.
Thus, the stream is broken into pieces (RDDs) that
are processed independently of the other pieces.
Spark Streaming
Spark Streaming is built on an abstraction called
discretized streams, or DStreams.
A DStream is a sequence of data arriving over time.
Internally, each DStream is represented as a sequence of RDDs arriving at
each time step. Each RDD in a DStream contains data from a certain
interval.
DStreams
DStream Operations
A DStream is created from a StreamingContext.
Once a DStream has been created, it offers two kinds of operations:
Transformations, each of which yields a new DStream
Output operations, which write data to an external system
Any operation applied to a DStream translates to operations on the
underlying RDDs that are created during stream processing.
DStreams and RDDs
How is a DStream mapped to RDDs?
There are actually two time intervals: the batch interval and
the block interval.
A new RDD is created at the end of each batch interval.
Data that comes in during the batch interval is broken into blocks,
one block per block interval (which defaults to 200ms).
Thus, a batch consists of multiple blocks.
DStreams and RDDs
Batches become RDDs. Blocks become partitions.
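The block interval is controlled by the spark.streaming.blockInterval setting.
Here's a hedged sketch for a standalone application (the application name is
made up, and the accepted value format can vary by Spark version):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf()
  .setAppName("streaming-example")
  .set("spark.streaming.blockInterval", "100ms")   // default is 200ms
// With a 1-second batch interval and a 100ms block interval, each batch RDD
// ends up with roughly 10 partitions per receiver.
val ssc = new StreamingContext(conf, Seconds(1))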
Example
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(1))
// Connect to the stream on port 7777 of the streaming host.
val linesStream = ssc.socketTextStream("somehost", 7777)
// Filter our DStream for lines with "error" and dump the
// matching lines to a file.
linesStream.filter { _ contains "error" }
.foreachRDD { _.saveAsTextFile("...") }
// Start the streaming context and wait for it to "finish"
ssc.start()
ssc.awaitTermination()
The Receiver Thread
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(1))
// Connect to the stream on port 7777 of the streaming host.
val linesStream = ssc.socketTextStream("somehost", 7777)
// Filter our DStream for lines with "error" and dump the
// matching lines to a file.
linesStream.filter { _ contains "error" }
.foreachRDD { _.saveAsTextFile("hdfs://mynamenode:9000/data/stream") }
// Start the streaming context and wait for it to "finish"
ssc.start()
ssc.awaitTermination()
The Receiver Thread
If there are multiple input DStreams, there are multiple receiver threads,
one per input DStream.
Things you can stream
The Scala Streaming API supports streaming from the following sources:
TCP (text) socket
Kafka
Flume
HDFS/S3
Kinesis
In addition, it's easy to write your own custom Streaming receiver
class.
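For instance, here's a minimal, hedged sketch of a custom receiver; the data it
produces is fake, and a real receiver would read from a socket, queue, API,
etc. in onStart():
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class MyReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  def onStart(): Unit = {
    // Start a background thread that hands records to Spark via store().
    new Thread("my-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          store("a fake record")   // push one record into Spark Streaming
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  def onStop(): Unit = {
    // Release anything opened in onStart(). The thread above exits on its own
    // once isStopped() returns true.
  }
}

// Use it like any built-in source:
// val myStream = ssc.receiverStream(new MyReceiver)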