Streaming

Brian Clapper
ArdenTex Inc.

bmc@ardentex.com, @brianclapper

Topics

  • (Brief!) Overview of Spark
    • Quick Architectural Overview
    • RDDs
    • Transformations and Actions
    • DataFrames
  • What is Spark Streaming?
  • How does it work?
  • Things you can stream
  • The Databricks notebook environment
  • Some actual code

Credits

Portions of this slide deck are original. Portions have been adapted, with permission, from Databricks training material.

Learning More

NewCircle will be running a public Spark Foundations course here in Philadelphia, November 3 through November 5. (I'll be teaching it.)

If you're interested, you can find more information here: http://tinyurl.com/spark-philly-11-3 (I also have a discount code for anyone who wants to register.)

Brief Overview of Spark

Spark is a general computing engine for distributed processing of data.

Spark is:

  • Fast. Unlike Hadoop MapReduce, which writes intermediate results to disk, Spark keeps data in memory as much as possible.
  • Compact. In general, you'll write a lot less code to get a job done in Spark than you would in a framework like Hadoop MapReduce.
  • Multi-language. You can write Spark applications in Scala, Java, Python and R.

Write Less Code

Computing an Average, in Hadoop

    
// Mapper: emit every value under the same key, so one reducer sees them all.
private IntWritable one = new IntWritable(1);
private IntWritable output = new IntWritable();

protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String[] fields = value.toString().split("\t");
    output.set(Integer.parseInt(fields[1]));
    context.write(one, output);
}

// Reducer: sum and count the values, then divide.
DoubleWritable average = new DoubleWritable();

protected void reduce(IntWritable key,
                      Iterable<IntWritable> values,
                      Context context)
        throws IOException, InterruptedException {
    int sum = 0;
    int count = 0;
    for (IntWritable value: values) {
        sum += value.get();
        count++;
    }
    average.set(sum / (double) count);
    context.write(key, average);
}
    
  

Write Less Code

Computing an Average, in Spark

    
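// Pair each value with a count of 1, sum both per key, then divide.
val data = sc.textFile(...).map(_.split("\t"))
data.map { x => (x(0), (x(1).toInt, 1)) }
    .reduceByKey { (x, y) => (x._1 + y._1, x._2 + y._2) }
    .map { case (key, (sum, count)) => (key, sum / count.toDouble) }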
    .collect()
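
The (sum, count) trick can be tried without a cluster. The sketch below is plain Scala, not Spark: an ordinary Seq stands in for the RDD, and groupBy followed by reduce stands in for reduceByKey. The object name, method name, and sample data are hypothetical.

```scala
// Plain-Scala sketch of the pipeline above; no Spark involved.
object LocalAverage {
  def averageByKey(lines: Seq[String]): Map[String, Double] = {
    val pairs = lines
      .map(_.split("\t"))                  // split each line into fields
      .map(x => (x(0), (x(1).toInt, 1)))   // (key, (value, 1))

    pairs
      .groupBy(_._1)                       // gather the pairs for each key
      .map { case (key, kvs) =>
        // Add the (value, 1) pairs component-wise, as reduceByKey would.
        val (sum, count) = kvs.map(_._2).reduce((x, y) => (x._1 + y._1, x._2 + y._2))
        (key, sum / count.toDouble)
      }
  }

  def main(args: Array[String]): Unit = {
    val sample = Seq("a\t10", "b\t4", "a\t20", "b\t6", "b\t8")
    println(averageByKey(sample))
  }
}
```

Carrying the count alongside the sum is what makes the combine step associative, which is why the same function works when Spark reduces partial results from different partitions.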
    
  

Quick Architectural Overview

  • Your Spark program is the Driver.
  • So is the Spark shell.
  • The driver program is just one part of a Spark application.
  • Other parts of the application run on the cluster.

Application Flow

[Diagram: application flow on a 2-node cluster, showing the Driver Program, the Master, and two Workers.]

  1. The Spark application starts on a 2-node cluster.
  2. The Driver contacts the Master, requesting resources.
  3. The Master contacts the Workers.
  4. The Workers spawn Executor JVMs.
  5. The Executors connect to the Driver.
  6. The Executors work on distributed data (RDDs).

What is an RDD?

  • "RDD" stands for "Resilient Distributed Dataset".
  • Conceptually, an RDD is a distributed collection of elements. Think of it as a distributed array or list...
  • ...like an array or list in your program, but spread out across multiple nodes in the cluster.
  • An RDD represents partitioned data.
  • Within your program (the Driver), an RDD object is a handle to that distributed data.
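
To make "partitioned data" concrete, here is a plain-Scala sketch (no Spark) that splits a local collection into contiguous, nearly equal slices, similar in spirit to how sc.parallelize divides a collection among partitions. The names are hypothetical, and the split formula is an illustrative assumption rather than a copy of Spark's code.

```scala
object PartitionSketch {
  // Split `items` into `numPartitions` contiguous slices of nearly equal size.
  def slice[T](items: Seq[T], numPartitions: Int): Seq[Seq[T]] = {
    require(numPartitions > 0, "need at least one partition")
    val n = items.length
    (0 until numPartitions).map { i =>
      val start = (i.toLong * n / numPartitions).toInt
      val end = ((i + 1).toLong * n / numPartitions).toInt
      items.slice(start, end)
    }
  }

  def main(args: Array[String]): Unit = {
    val items = (1 to 25).map(i => s"item-$i")
    // 25 items in 5 partitions: five items per partition.
    slice(items, 5).zipWithIndex.foreach { case (part, i) =>
      println(s"partition $i: ${part.mkString(", ")}")
    }
  }
}
```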

What is an RDD?

[Diagram: an RDD of 25 elements (item-1 through item-25), split into five partitions of five items each and spread across the Executors on three Workers.]

What is an RDD?

An RDD can be created in several ways:

  • By parallelizing a collection.
  • By reading data from an external (usually distributed) source, like S3, Cassandra, HDFS, etc.
  • By transforming an existing RDD into a new RDD.
  • Via Spark Streaming. (More on this in a bit.)

Creating an RDD

    
val wordsRDD = sc.parallelize(Array("Nobody", "expects", "the",
                                    "Spanish", "Inquisition"))

val linesRDD = sc.textFile("s3n://my-spark-bucket/foobar.txt")
    
  

Transformations

[Diagram: LinesRDD holds timestamped log lines (Error, Warn, and Info messages) in several partitions. Applying filter(_ contains "Error") produces ErrorsRDD, which keeps only the Error lines, partition by partition.]

Transformations

  • Transformations are lazy.
  • They contribute to the execution plan (called the DAG, for Directed Acyclic Graph), but they don't execute anything.
  • No file is read and no transformation is performed until an action is called.
  • An RDD transformation is conceptually similar to a transformation (like filter() or map()) on a Scala collection view.
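
The collection-view analogy can be checked directly in plain Scala (no Spark). This sketch counts how often a filter predicate runs: defining the filter on a view runs nothing, and only converting the view to a List, which plays the role of an action here, does the work. Object and method names are hypothetical.

```scala
object LazySketch {
  // Returns (predicate calls before forcing, calls after forcing, result).
  def demo(lines: List[String]): (Int, Int, List[String]) = {
    var predicateCalls = 0
    // Defining a filter on a view records the operation but runs nothing.
    val errors = lines.view.filter { line =>
      predicateCalls += 1
      line.contains("Error")
    }
    val before = predicateCalls
    // Forcing the view (here with toList) is the "action".
    val result = errors.toList
    (before, predicateCalls, result)
  }

  def main(args: Array[String]): Unit = {
    val (before, after, result) =
      demo(List("Error a", "Warn b", "Error c", "Info d"))
    println(s"predicate calls before forcing: $before, after: $after")
    println(result)
  }
}
```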

Actions

[Diagram: calling collect() on ErrorsRDD sends the Error lines from every partition back to the Driver Program.]

Actions

  • Actions trigger execution of the DAG, forcing:
    • the data source to be read (modulo any caching) and
    • the transformations to be run.
  • There are two kinds of actions.
    • Driver: The action returns its result to the driver. Examples: collect(), count()
    • Distributed: The action executes on the nodes in the cluster. Examples: saveAsTextFile(), foreach()
  • Calling an action like collect() is conceptually similar to calling toArray() on a Scala collection view.
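
The "(modulo any caching)" point has a plain-Scala analogue too. In this sketch (no Spark; names hypothetical), forcing the same view twice re-runs the mapping function each time, much as two actions on an uncached RDD re-read and re-transform the source. Materializing the result once into a List stands in for caching. This is only an analogy: Spark's cache() keeps partitions in executor memory rather than in a local List.

```scala
object RecomputeSketch {
  // Returns how many times the mapping function ran when the results were
  // used twice: (without materializing, with materializing first).
  def demo(data: List[Int]): (Int, Int) = {
    // Like an uncached RDD: each forcing re-runs the pipeline from the source.
    var uncachedCalls = 0
    val lazyDoubled = data.view.map { x =>
      uncachedCalls += 1
      x * 2
    }
    val first = lazyDoubled.toList   // first "action"
    val second = lazyDoubled.toList  // second "action" recomputes everything
    require(first == second)

    // Like a cached RDD: materialize once, then reuse the stored values.
    var cachedCalls = 0
    val stored = data.view.map { x =>
      cachedCalls += 1
      x * 2
    }.toList
    require(stored == first)         // reusing `stored` runs nothing again
    require(stored.sum == first.sum)

    (uncachedCalls, cachedCalls)
  }

  def main(args: Array[String]): Unit =
    println(demo(List(1, 2, 3)))
}
```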

Let's look at some code

DataFrames

Since version 1.3, Spark has had DataFrames.

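  • Inspired by data frames in R and in Python's pandas library.
  • Higher level, and easier to use, than RDDs.
  • Often significantly faster, because Spark SQL's Catalyst optimizer can optimize DataFrame queries before running them.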
  • DataFrames are the preferred interface to Spark.
    • Use them instead of RDDs, unless you can't do what you need to do with DataFrames.

DataFrames are:

  • Schema-oriented.
  • A schema consists of named columns and their types.
  • Examples of data sources with a schema:
    • Parquet
    • RDBMS
    • Hive
    • JSON (inferred schema)
    • CSV (inferred schema)

DataFrames

The DataFrames API is a declarative API, providing functions like select(), filter(), orderBy(), groupBy(), etc.

DataFrames are tightly tied to Spark SQL.

The DataFrames API often requires even less code than comparable RDD solutions.

Computing an Average (again)

    
// RDDs
val data = sc.textFile(...).map(_.split("\t"))
data.map { x => (x(0), (x(1).toInt, 1)) }
    .reduceByKey { (x, y) => (x._1 + y._1, x._2 + y._2) }
    .map { case (key, (sum, count)) => (key, sum / count.toDouble) }
    .collect()

// DataFrames
import org.apache.spark.sql.functions.avg

val df = sqlContext.read.parquet("...")
df.groupBy("name").agg(avg("age")).collect()
    
  

DataFrames

Here's an example of using a DataFrame to read a Parquet file. Parquet files have a self-describing schema, so they can be used directly from the DataFrame API.


    
val df = sqlContext.read.parquet(
   "hdfs://mynamenode:9000/data/employees.parquet"
)
val df2 = df.filter(df("age") < 30)
val youngsters = df2.collect()
    
  

DataFrames

Here's an example of creating a DataFrame from a file without a self-describing schema. We start with an RDD and "teach" the DataFrames API the schema using a case class.


    
val rdd = sc.textFile(
  "hdfs://mynamenode:9000/data/people.csv"
)

case class Person(name: String, gender: String, age: Int)

val personRDD = rdd.map { line =>
  val cols = line.split(",")
  Person(name=cols(0), gender=cols(1), age=cols(2).toInt)
}

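// toDF() on an RDD of case classes comes from the SQLContext implicits.
import sqlContext.implicits._
val df = personRDD.toDF()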
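
The line-to-Person parsing is ordinary Scala, so it can be checked locally before it runs on a cluster. In this sketch, parsePerson is a hypothetical helper that assumes the same name,gender,age layout; unlike the code above, it trims whitespace and returns None for malformed lines (a header row, for example) instead of throwing.

```scala
import scala.util.Try

case class Person(name: String, gender: String, age: Int)

object PersonParsing {
  // Parse one "name,gender,age" line; None if the line doesn't fit that layout.
  def parsePerson(line: String): Option[Person] = {
    val cols = line.split(",").map(_.trim)
    if (cols.length != 3) None
    else Try(cols(2).toInt).toOption.map(age => Person(cols(0), cols(1), age))
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq("name,gender,age", "Alice,F,34", "Bob, M ,27", "broken line")
    // On an RDD, the same helper could be applied with
    // rdd.flatMap(line => parsePerson(line)) to drop bad lines.
    lines.flatMap(parsePerson).foreach(println)
  }
}
```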
    
  

DataFrames

That's a super-brief overview of DataFrames. We'll be seeing a few of them in some of the code examples. For more information, see the Spark SQL and DataFrame Guide, at http://spark.apache.org/docs/latest/sql-programming-guide.html

Enough background. What about streaming?

Now that we have some background, let's talk about Spark Streaming.

Spark Streaming

  • Provides a way to consume continuous streams of data.
  • Scalable, high-throughput, fault-tolerant.
  • Built on top of Spark Core.
    • API is very similar to Core Spark API.
    • If you already know Spark, learning Streaming is easier than learning the API for a completely unrelated product (like Apache Storm).
  • Supports many inputs, like TCP sockets, Kafka, Flume, HDFS, S3, Kinesis (and even Twitter, which is cool for demos).
  • Currently based on RDDs, but work is underway to integrate DataFrames.

Spark Streaming

[Diagram: an input data stream flows into Spark Streaming, which emits batches of RDDs every N seconds; Spark Core processes those batches and produces batches of processed data.]

Spark Streaming

  • Spark Streaming uses a "micro-batch" architecture.
  • Spark Streaming continuously receives live input data streams and divides the data into batches.
  • New batches are created at regular time intervals called batch intervals.
    • At the beginning of each time interval a new batch is created.
    • Any data that arrives during that interval gets added to that batch.

Spark Streaming

  • At the end of the time interval the batch is done growing.
    • The size of the time intervals is determined by a parameter called the batch interval.
    • The batch interval is typically between 500 milliseconds and several seconds and is configured by the application developer.
  • Each input batch forms an RDD.
  • Each RDD is passed to Spark for processing in a job.
  • Thus, the stream is broken into pieces (RDDs) that are processed independently of one another.
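
The micro-batch idea can be modeled in a few lines of plain Scala (no Spark). This sketch assumes each record carries an arrival time in milliseconds and that batch k covers the interval [k * batchInterval, (k + 1) * batchInterval). Record, toBatches, and the sample data are hypothetical. One simplification: intervals with no data produce no batch here, whereas Spark Streaming still produces an (empty) RDD for them.

```scala
object MicroBatchSketch {
  // A record plus the time (in milliseconds) at which it arrived.
  final case class Record(arrivalMs: Long, value: String)

  // Group records into batches by interval; returns (batch index, values)
  // pairs in time order, with values in arrival order.
  def toBatches(records: Seq[Record], batchIntervalMs: Long): Seq[(Long, Seq[String])] = {
    require(batchIntervalMs > 0, "batch interval must be positive")
    records
      .groupBy(_.arrivalMs / batchIntervalMs)
      .toSeq
      .sortBy(_._1)
      .map { case (batch, rs) => (batch, rs.sortBy(_.arrivalMs).map(_.value)) }
  }

  def main(args: Array[String]): Unit = {
    val records = Seq(
      Record(100, "a"), Record(900, "b"), Record(1200, "c"), Record(2600, "d"))
    // With 1-second batches, each batch could be processed on its own.
    toBatches(records, 1000).foreach { case (batch, values) =>
      println(s"batch $batch: $values")
    }
  }
}
```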

Spark Streaming

Spark Streaming is built on an abstraction called discretized streams, or DStreams.

A DStream is a sequence of data arriving over time.

Internally, each DStream is represented as a sequence of RDDs arriving at each time step. Each RDD in a DStream contains data from a certain interval.

DStreams

[Diagram: an input DStream with a 5-second batch interval. The data received during each interval (blocks 1 to 3) forms one RDD: the RDD @ T=5, then the RDD @ T=10.]

DStream Operations

  • A DStream is created from a StreamingContext.
  • Once a DStream has been created, it offers two kinds of operations:
    • Transformations, each of which yields a new DStream
    • Output operations, which write data to an external system
  • Any operation applied to a DStream translates to operations on the underlying RDDs that are created during stream processing.
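
A toy model shows why a DStream operation amounts to operations on its RDDs. This is plain Scala, not Spark's API: ToyDStream is a hypothetical stand-in holding one (batch time, contents) entry per batch, where Spark would hold an RDD. Transformations return a new ToyDStream by applying the function to every batch, and foreachBatchContents loosely mimics foreachRDD by handing each batch and its time to a side-effecting function.

```scala
object DStreamModel {
  // Toy stand-in for a DStream: one (batchTimeMs, contents) entry per batch.
  final case class ToyDStream[T](batches: Seq[(Long, Seq[T])]) {
    // Transformations: apply the function to each batch, yielding a new stream.
    def map[U](f: T => U): ToyDStream[U] =
      ToyDStream(batches.map { case (t, data) => (t, data.map(f)) })

    def filter(p: T => Boolean): ToyDStream[T] =
      ToyDStream(batches.map { case (t, data) => (t, data.filter(p)) })

    // Output operation: give each batch and its time to a side-effecting function.
    def foreachBatchContents(f: (Seq[T], Long) => Unit): Unit =
      batches.foreach { case (t, data) => f(data, t) }
  }

  def main(args: Array[String]): Unit = {
    val lines = ToyDStream(Seq(
      5000L -> Seq("error: disk", "ok"),
      10000L -> Seq("ok", "error: net", "error: cpu")))
    lines.filter(_.contains("error")).foreachBatchContents { (batch, time) =>
      println(s"$time: $batch")
    }
  }
}
```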

DStreams and RDDs

How is a DStream mapped to RDDs?

  • There are actually two time intervals: the batch interval and the block interval.
  • A new RDD is created at the end of each batch interval.
  • Data that comes in during the batch interval is broken into blocks, one block per block interval (which defaults to 200ms).
  • Thus, a batch consists of multiple blocks.
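
The relationship between the two intervals is simple arithmetic: one receiver produces roughly (batch interval / block interval) blocks per batch. The helpers below are a hypothetical plain-Scala sketch of that arithmetic, plus the block a record falls into; they assume the default 200 ms block interval and batches that start at multiples of the batch interval.

```scala
object BlockMath {
  // Approximate number of blocks one receiver produces in one batch.
  def blocksPerBatch(batchIntervalMs: Long, blockIntervalMs: Long = 200L): Long = {
    require(batchIntervalMs > 0 && blockIntervalMs > 0, "intervals must be positive")
    batchIntervalMs / blockIntervalMs
  }

  // Which block of its batch a record falls into, given its arrival time.
  def blockIndex(arrivalMs: Long, batchIntervalMs: Long, blockIntervalMs: Long = 200L): Long =
    (arrivalMs % batchIntervalMs) / blockIntervalMs

  def main(args: Array[String]): Unit = {
    println(blocksPerBatch(5000))    // 5 s batches, 200 ms blocks: 25
    println(blocksPerBatch(2000))    // 2 s batches, 200 ms blocks: 10
    println(blockIndex(5450, 5000))  // 450 ms into its batch: block 2
  }
}
```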

DStreams and RDDs

Batches become RDDs. Blocks become partitions.

[Diagram: at T = 5, blocks 1, 2, and 3 of the batch become partitions 1, 2, and 3 of the batch's RDD.]

Example

    
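import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._

val ssc = new StreamingContext(sc, Seconds(1))

// Connect to the stream on port 7777 of the streaming host.
val linesStream = ssc.socketTextStream("somehost", 7777)

// Filter our DStream for lines with "error" and dump the matching
// lines to files. saveAsTextFile fails if the output directory
// already exists, so each batch gets its own directory.
linesStream.filter { _ contains "error" }
           .foreachRDD { (rdd: RDD[String], time: Time) =>
             rdd.saveAsTextFile(s"...-${time.milliseconds}")
           }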

// Start the streaming context and wait for it to "finish"
ssc.start()
ssc.awaitTermination()
    
  

The Receiver Thread

[Diagram: a 3-node cluster plus the Driver. A receiver (R), running in one Executor, reads from the source of streaming data and stores what it receives as blocks across the Executors. Each batch's blocks become the partitions (P1, P2, P3) of one RDD (RDD 1, then RDD 2, and so on), and tasks (T) in the Executors process those partitions.]
    
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._

val ssc = new StreamingContext(sc, Seconds(1))

// Connect to the stream on port 7777 of the streaming host.
val linesStream = ssc.socketTextStream("somehost", 7777)

// Filter our DStream for lines with "error" and dump the matching
// lines to files. saveAsTextFile fails if the output directory
// already exists, so each batch gets its own directory.
linesStream.filter { _ contains "error" }
           .foreachRDD { (rdd: RDD[String], time: Time) =>
             rdd.saveAsTextFile(s"hdfs://mynamenode:9000/data/stream-${time.milliseconds}")
           }

// Start the streaming context and wait for it to "finish"
ssc.start()
ssc.awaitTermination()
    
  

The Receiver Thread

If there are multiple input DStreams, there are multiple receiver threads, one per input DStream.

[Diagram: the same 3-node cluster with a second source, Kafka, feeding a second receiver (R) in another Executor.]

Things you can stream

The Scala Streaming API supports streaming from the following sources:

  • TCP (text) socket
  • Kafka
  • Flume
  • HDFS/S3
  • Kinesis

In addition, it's easy to write your own custom Streaming receiver class.

Let's look at some code.

Demo time...