Brian Clapper, @brianclapper
RDDs are compile-time type-safe and lazy, and the RDD API is based on the Scala collections API, so the operations are familiar to Scala programmers.
So much so, in fact, that it can be confusing to new users:
For example, RDD defines two overloads of collect. One mirrors the Scala collections' collect, taking a partial function and returning a new (lazy) RDD:
    Return a new RDD that contains all matching values by applying f.
The other is an action that hauls every element of the RDD back to the driver:
    Return an array that contains all of the elements in this RDD.
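A tiny sketch of the difference, using made-up data and the same SparkContext (sc) that appears in the examples below:

val nums = sc.parallelize(1 to 10)

// Like the Scala collections' collect: a lazy transformation that returns
// a new RDD of the matching, transformed values.
val evensDoubled = nums.collect { case n if n % 2 == 0 => n * 2 }

// The zero-argument collect(): an action that pulls every element of the
// RDD back to the driver as a local Array.
val everything: Array[Int] = nums.collect()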
This blob of code creates an RDD from a file on a distributed file system, then parses each line into a (project, page, numRequests) tuple, discarding lines that don't match.
val rdd = sc.textFile("hdfs:/user/bmc/wikipedia-pagecounts.gz")
val parsedRDD = rdd.flatMap { line =>
  line.split("""\s+""") match {
    // Convert the request count to a number so it can be summed later.
    case Array(project, page, numRequests, _) => Some((project, page, numRequests.toLong))
    case _ => None
  }
}
parsedRDD.filter { case (project, page, numRequests) => project == "en" }.
  map { case (_, page, numRequests) => (page, numRequests) }.
  reduceByKey(_ + _).
  take(100).
  foreach { case (page, requests) => println(s"$page: $requests") }
RDDs are type-safe. However, they're also low-level, and they suffer from some problems, including:
parsedRDD.filter { case (project, page, numRequests) => project == "en" }.
  map { case (_, page, numRequests) => (page, numRequests) }.
  reduceByKey(_ + _).                                   <--- INEFFICIENT
  filter { case (page, _) => ! isSpecialPage(page) }.   <--- ORDERING
  take(100).
  foreach { case (page, requests) => println(s"$page: $requests") }
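With RDDs, fixing those problems is entirely up to you; Spark won't reorder the operations on your behalf. A hand-tuned version of the same chain might look like this (a sketch, assuming isSpecialPage is defined elsewhere, as in the snippet above):

parsedRDD.filter { case (project, page, _) => project == "en" && ! isSpecialPage(page) }.
  map { case (_, page, numRequests) => (page, numRequests) }.
  reduceByKey(_ + _).       // now the shuffle only aggregates pages we actually want
  take(100).
  foreach { case (page, requests) => println(s"$page: $requests") }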
The DataFrame API provides a higher-level abstraction (a DSL, really), allowing you to use a query language to manipulate data. In fact, you can use SQL, as well.
This code does essentially the same thing the previous RDD code does. Look how much easier it is to read.
import org.apache.spark.sql.functions._   // for sum()
import sqlContext.implicits._             // for toDF() and the $"..." column syntax

val df = parsedRDD.toDF("project", "page", "numRequests")
df.filter($"project" === "en").
  groupBy($"page").
  agg(sum($"numRequests").as("count")).
  limit(100).
  show(100)
Here's the same thing in SQL.
df.registerTempTable("edits")
sqlContext.sql("""|SELECT page, sum(numRequests) AS count FROM edits
                  |WHERE project = 'en'
                  |GROUP BY page LIMIT 100""".stripMargin)
  .show(100)
DataFrame (and SQL) queries run through Spark SQL's Catalyst optimizer. In a query like this one, for example, Catalyst can push the filter on events("date") down below the join, so far fewer rows have to be joined:
users.join(events, users("id") === events("uid"))
  .filter(events("date") > "2015-01-01")
Because of optimizations like that, DataFrames tend to outperform RDDs.
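If you want to see what the optimizer actually did, explain() on the resulting DataFrame prints the query plans. A minimal sketch (users and events are assumed to be existing DataFrames, as above):

val joined = users.join(events, users("id") === events("uid"))
  .filter(events("date") > "2015-01-01")

// With extended = true, Spark prints the parsed, analyzed, optimized, and
// physical plans; the optimized plan should show the date filter pushed
// down below the join.
joined.explain(true)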
What happens if we call the collect() action?
scala> :type df.collect()
Array[org.apache.spark.sql.Row]
Unfortunately, Row isn't type-safe. It's defined as
trait Row extends Serializable
Mapping it back to something useful is ugly and error-prone:
df.collect().map { row =>
  val project = row(0).asInstanceOf[String]      // Yuck.
  val numRequests = row(2).asInstanceOf[Long]    // Yuck.
  (project, numRequests)
}
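Row does have slightly friendlier accessors, such as getAs with a field name, but they're still only checked at runtime. A sketch of the same mapping:

df.collect().map { row =>
  // Still a cast under the hood: a wrong field name or a wrong type
  // parameter only blows up when the job runs.
  (row.getAs[String]("project"), row.getAs[Long]("numRequests"))
}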
Datasets are type-safe: like an RDD, a Dataset has a type.
import org.apache.spark.sql.Dataset
import sqlContext.implicits._   // provides the Encoder needed by .as[Person]

// Read a DataFrame from a JSON file
val df = sqlContext.read.json("people.json")

// Convert the data to a domain object.
case class Person(name: String, age: Long)
val ds: Dataset[Person] = df.as[Person]   // note the type: Dataset[Person]
In Spark 2.0, a DataFrame is just a Dataset[Row].
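The payoff shows up as soon as you collect: instead of Array[Row], you get your own type back. A small sketch using the Person Dataset above:

// collect() on a Dataset[Person] returns Array[Person] -- no casts, no Row.
val people: Array[Person] = ds.collect()
people.foreach(p => println(s"${p.name} is ${p.age}"))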
With Datasets, you can still access a DataFrame-like query API. (You can also go back and forth between DataFrames and Datasets.)
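For instance, in Spark 2.0 you can filter the Person Dataset above either with a DataFrame-style Column expression or with an ordinary typed lambda, and convert between the two representations freely (a sketch, assuming sqlContext.implicits._ is imported):

// DataFrame-style, Column-based query on a Dataset...
val adults1 = ds.filter($"age" >= 18)

// ...or a typed, lambda-based one. Both return a Dataset[Person].
val adults2 = ds.filter(_.age >= 18)

// Converting back and forth is easy.
val asDataFrame = ds.toDF()
val asDataset   = asDataFrame.as[Person]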
RDDs:
val lines = sc.textFile("hdfs://path/to/some/ebook.txt")
val words = lines.flatMap(_.split("""\s+""")).filter(_.nonEmpty)
val counts = words.groupBy(_.toLowerCase).map { case (w, all) => (w, all.size) }
Datasets:
val lines = sqlContext.read.text("hdfs://path/to/some/ebook.txt").as[String]
val words = lines.flatMap(_.split("""\s+""")).filter(_.nonEmpty)
val counts = words.groupByKey(_.toLowerCase).count()
// RDD
val counts = words.groupBy(_.toLowerCase).map { case (w, all) => (w, all.size) }
// Dataset
val counts = words.groupByKey(_.toLowerCase).count()
The Dataset version can use the built-in, DataFrame-like count() aggregator function.
The Dataset code is slightly more visually compact (less typing! yay!) and will tend to execute faster than the RDD counterpart (with caveats...)
Datasets also tend to use less memory.
Spark has to serialize data ... a lot. The encoders Datasets use produce a much more compact serialized format than plain Java serialization.
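One rough way to see the memory difference yourself is to cache the same data both ways and compare the sizes reported on the Spark UI's Storage tab. A sketch (the path is just an example, and the usual implicits import is assumed):

val path = "hdfs://path/to/some/ebook.txt"

// The same words, once as a plain RDD[String]...
val wordsRDD = sc.textFile(path).flatMap(_.split("""\s+""")).filter(_.nonEmpty)
// ...and once as a Dataset[String].
val wordsDS = sqlContext.read.text(path).as[String]
  .flatMap(_.split("""\s+""")).filter(_.nonEmpty)

wordsRDD.cache().count()   // cached as deserialized Java objects (the default)
wordsDS.cache().count()    // cached in Spark SQL's compact binary, columnar format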
Let's look at some code. In the demo directory of the GitHub repo, there's a notebooks.dbc file you can load into Databricks Community Edition. You'll then get the two notebooks in the demo, and you can try them yourself.
https://databricks.com/blog/2016/01/04/introducing-spark-datasets.html
http://spark.apache.org/docs/latest/sql-programming-guide.html
Try the latest version of Apache Spark and the preview of 2.0. http://databricks.com/try