Stop Thinking, Just Do!

Sung-Soo Kim's Blog

Spark 1.5 DataFrame API Highlights

23 September 2015


Date/Time/String Handling, Time Intervals, and UDAFs

A few days ago, we announced the release of Spark 1.5. This release contains major under-the-hood changes that improve Spark’s performance, usability, and operational stability. Besides these changes, we have been continuously improving the DataFrame API. In this blog post, we’d like to highlight three major improvements to the DataFrame API in Spark 1.5:

  • New built-in functions;
  • Time intervals; and
  • Experimental user-defined aggregation function (UDAF) interface.

New Built-in Functions in Spark 1.5

In Spark 1.5, we have added a comprehensive list of built-in functions to the DataFrame API, complete with optimized code generation for execution. This code generation allows pipelines that call functions to take full advantage of the efficiency changes made as part of Project Tungsten. With these new additions, Spark SQL now supports a wide range of built-in functions for various use cases, including:

| Category | Functions |
|---|---|
| Aggregate Functions | approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct |
| Collection Functions | array_contains, explode, size, sort_array |
| Date/time Functions | Date/timestamp conversion: unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp. Extracting fields from a date/timestamp value: year, month, dayofmonth, hour, minute, second. Date/timestamp calculation: datediff, date_add, date_sub, add_months, last_day, next_day, months_between. Misc.: current_date, current_timestamp, trunc, date_format |
| Math Functions | abs, acos, asin, atan, atan2, bin, cbrt, ceil, conv, cos, cosh, exp, expm1, factorial, floor, hex, hypot, log, log10, log1p, log2, pmod, pow, rint, round, shiftLeft, shiftRight, shiftRightUnsigned, signum, sin, sinh, sqrt, tan, tanh, toDegrees, toRadians, unhex |
| Misc. Functions | array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, isNaN, isnotnull, isnull, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when |
| String Functions | ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper |
| Window Functions (in addition to Aggregate Functions) | cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber |

For all available built-in functions, please refer to our API docs ([Scala Doc](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$), [Java Doc](https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html), and [Python Doc](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions)).

Unlike normal functions, which execute immediately and return a result, DataFrame functions return a Column that will be evaluated inside a parallel job. These columns can be used inside DataFrame operations such as select, filter, and groupBy. The input to a function can be either another Column (e.g. df['columnName']) or a literal value (i.e. a constant value).

To make this more concrete, let’s look at the syntax for calling the round function in Python. round is a function that rounds a numeric value to the specified precision. When the precision is a positive number, the input value is rounded to that many decimal places. When the precision is zero or a negative number, the input value is rounded within its integral part: a precision of 0 rounds to the nearest integer, and a precision of -1 rounds to the nearest ten.
# Create a simple DataFrame
data = [
  (234.5, "row1"),
  (23.45, "row2"),
  (2.345, "row3"),
  (0.2345, "row4")]
df = sqlContext.createDataFrame(data, ["i", "j"])
 
# Import functions provided by Spark’s DataFrame API
from pyspark.sql.functions import *
 
# Call round function directly
df.select(
  round(df['i'], 1),
  round(df['i'], 0),
  round(df['i'], -1)).show()
 
 
+----------+----------+-----------+
|round(i,1)|round(i,0)|round(i,-1)|
+----------+----------+-----------+
|     234.5|     235.0|      230.0|
|      23.5|      23.0|       20.0|
|       2.3|       2.0|        0.0|
|       0.2|       0.0|        0.0|
+----------+----------+-----------+
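The precision rule can also be checked without a cluster. The following is a minimal plain-Python sketch, not Spark's implementation: it assumes half-up rounding applied to the decimal digits of the value as written, which is consistent with the output table above. The helper name round_half_up is hypothetical.

```python
from decimal import Decimal, ROUND_HALF_UP

def round_half_up(value, scale):
    """Round value to `scale` decimal places (assumed half-up rounding).

    A negative scale rounds to the left of the decimal point
    (tens for -1, hundreds for -2, and so on).
    """
    # Going through str() keeps the digits as written (23.45 rather than
    # the binary approximation of 23.45).
    quantum = Decimal(1).scaleb(-scale)  # scale=1 -> 0.1, scale=-1 -> 1E+1
    rounded = Decimal(str(value)).quantize(quantum, rounding=ROUND_HALF_UP)
    return float(rounded)

values = [234.5, 23.45, 2.345, 0.2345]
# One tuple per input value: (precision 1, precision 0, precision -1).
rows = [tuple(round_half_up(v, s) for s in (1, 0, -1)) for v in values]
for row in rows:
    print(row)
```

Each printed tuple corresponds to one row of the table above, in the same column order.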

Alternatively, all of the added functions are also available from SQL using standard syntax:

SELECT round(i, 1) FROM dataFrame

Finally, you can even mix and match SQL syntax with DataFrame operations by using the expr function. By using expr, you can construct a DataFrame column expression from a SQL expression String.

df.select(
  expr("round(i, 1) AS rounded1"),
  expr("round(i, 0) AS rounded2"),
  expr("round(i, -1) AS rounded3")).show()

Time Interval Literals

In the last section, we introduced several new date and time functions that were added in Spark 1.5 (e.g. datediff, date_add, date_sub), but that is not the only new feature that will help users who work with date or timestamp values. Another related feature is a new data type, interval, that allows developers to represent fixed periods of time (e.g. 1 day or 2 months) as interval literals. Using interval literals, it is possible to add an arbitrary amount of time to a date or timestamp value, or to subtract it from one. For example, users can now easily express queries like “Find all transactions that have happened during the past hour”.

An interval literal is constructed using the following syntax:

INTERVAL value unit

Breaking the above expression down, all time intervals start with the INTERVAL keyword. Next, the value and unit together specify the time difference. Available units are YEAR, MONTH, DAY, HOUR, MINUTE, SECOND, MILLISECOND, and MICROSECOND. For example, the following interval literal represents 3 years.

INTERVAL 3 YEAR

In addition to specifying an interval literal with a single unit, users can also combine different units. For example, the following interval literal represents a 3-year and 3-hour time difference.

INTERVAL 3 YEAR 3 HOUR
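To make the literal syntax concrete, here is a small plain-Python sketch that reads a literal of the form INTERVAL value unit [value unit ...] and adds up its parts. It is an illustration only and does not use Spark's parser. The function name parse_interval, the (months, microseconds) result shape, and the treatment of a day as a fixed 24 hours are all assumptions made for this example.

```python
# Units named in the text, grouped by how this sketch normalizes them.
MONTH_UNITS = {"YEAR": 12, "MONTH": 1}
MICRO_UNITS = {
    "DAY": 86_400_000_000,  # assumption: a day is a fixed 24 hours
    "HOUR": 3_600_000_000,
    "MINUTE": 60_000_000,
    "SECOND": 1_000_000,
    "MILLISECOND": 1_000,
    "MICROSECOND": 1,
}

def parse_interval(literal):
    """Parse 'INTERVAL value unit [value unit ...]' into (months, microseconds)."""
    tokens = literal.split()
    # A valid literal is the keyword followed by one or more value/unit pairs,
    # so the token count is odd and at least 3.
    if len(tokens) < 3 or tokens[0].upper() != "INTERVAL" or len(tokens) % 2 == 0:
        raise ValueError("expected: INTERVAL value unit [value unit ...]")
    months, micros = 0, 0
    for value, unit in zip(tokens[1::2], tokens[2::2]):
        unit = unit.upper()
        if unit in MONTH_UNITS:
            months += int(value) * MONTH_UNITS[unit]
        elif unit in MICRO_UNITS:
            micros += int(value) * MICRO_UNITS[unit]
        else:
            raise ValueError("unknown unit: " + unit)
    return months, micros

print(parse_interval("INTERVAL 3 YEAR"))
print(parse_interval("INTERVAL 3 YEAR 3 HOUR"))
```

Keeping months separate from the sub-month units reflects that a month has no fixed length, so the two parts cannot be folded into a single number of microseconds.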

In the DataFrame API, the expr function can be used to create a Column representing an interval. The following Python code uses an interval literal to select records where start_time and end_time fall on the same day and differ by at most one hour.

# Import functions.
from pyspark.sql.functions import *
 
# Create a simple DataFrame.
data = [
  ("2015-01-01 23:59:59", "2015-01-02 00:01:02", 1),
  ("2015-01-02 23:00:00", "2015-01-02 23:59:59", 2),
  ("2015-01-02 22:59:58", "2015-01-02 23:59:59", 3)]
df = sqlContext.createDataFrame(data, ["start_time", "end_time", "id"])
df = df.select(
  df.start_time.cast("timestamp").alias("start_time"),
  df.end_time.cast("timestamp").alias("end_time"),
  df.id)
 
# Get all records that have a start_time and end_time in the
# same day, and where the difference between end_time and start_time
# is less than or equal to 1 hour.
condition = \
  (to_date(df.start_time) == to_date(df.end_time)) & \
  (df.start_time + expr("INTERVAL 1 HOUR") >= df.end_time)
 
df.filter(condition).show()
+---------------------+---------------------+---+
|start_time           |            end_time |id |
+---------------------+---------------------+---+
|2015-01-02 23:00:00.0|2015-01-02 23:59:59.0|2  |
+---------------------+---------------------+---+
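The same condition can be traced by hand in plain Python, which may help when checking why only the record with id 2 survives the filter. This sketch uses the standard datetime module in place of Spark, with timedelta(hours=1) standing in for INTERVAL 1 HOUR. The helper name same_day_within_hour is hypothetical.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S"

# Same rows as the DataFrame example: (start_time, end_time, id).
rows = [
    ("2015-01-01 23:59:59", "2015-01-02 00:01:02", 1),
    ("2015-01-02 23:00:00", "2015-01-02 23:59:59", 2),
    ("2015-01-02 22:59:58", "2015-01-02 23:59:59", 3),
]

def same_day_within_hour(start_text, end_text):
    start = datetime.strptime(start_text, FMT)
    end = datetime.strptime(end_text, FMT)
    same_day = start.date() == end.date()              # to_date(a) == to_date(b)
    within_hour = start + timedelta(hours=1) >= end    # a + 1 hour >= b
    return same_day and within_hour

matching_ids = [row_id for start, end, row_id in rows
                if same_day_within_hour(start, end)]
print(matching_ids)
```

Row 1 fails the same-day test, and row 3 fails the one-hour test by a single second (22:59:58 plus one hour is 23:59:58, which is earlier than 23:59:59).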

User-defined Aggregate Function Interface

For power users, Spark 1.5 introduces an experimental API for user-defined aggregate functions (UDAFs). A UDAF performs a custom calculation over a group of input rows (in contrast, a UDF computes a value from a single input row), such as calculating the geometric mean or the product of the values in every group.

A UDAF maintains an aggregation buffer to store intermediate results for every group of input data. It updates this buffer for every input row. Once it has processed all input rows, it generates a result value based on values of the aggregation buffer.

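A UDAF extends the base class UserDefinedAggregateFunction and implements the following eight methods:

  • inputSchema: a StructType whose fields describe the input arguments of the UDAF;
  • bufferSchema: a StructType whose fields describe the values held in the aggregation buffer;
  • dataType: the data type of the returned value;
  • deterministic: whether the UDAF always returns the same result for the same input;
  • initialize: sets the initial values of an aggregation buffer;
  • update: updates an aggregation buffer with a new input row;
  • merge: merges two aggregation buffers and stores the result in the first one; and
  • evaluate: computes the final result from the aggregation buffer.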

Below is an example UDAF implemented in Scala that calculates the geometric mean of a given set of double values. The geometric mean indicates the typical value of a set of numbers by using the product of their values (as opposed to the standard built-in mean, which is based on their sum). For simplicity, null handling logic is not shown in the following code.

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
 
class GeometricMean extends UserDefinedAggregateFunction {
  def inputSchema: StructType =
    StructType(StructField("value", DoubleType) :: Nil)
 
  def bufferSchema: StructType = StructType(
    StructField("count", LongType) ::
    StructField("product", DoubleType) :: Nil
  )
 
  def dataType: DataType = DoubleType
 
  def deterministic: Boolean = true
 
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 1.0
  }
 
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Long](0) + 1
    buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0)
  }
 
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
    buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1)
  }
 
  def evaluate(buffer: Row): Any = {
    math.pow(buffer.getDouble(1), 1.toDouble / buffer.getLong(0))
  }
}
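To see how initialize, update, merge, and evaluate cooperate, the buffer lifecycle can be imitated in plain Python. This is a sketch of the idea only, not of how Spark schedules the calls: the two-way split of the input into "partitions" is an assumption made for illustration, and the buffer is a plain list laid out as [count, product] to mirror bufferSchema.

```python
import math

def initialize():
    # Buffer layout mirrors bufferSchema: [count, product].
    return [0, 1.0]

def update(buffer, value):
    # Fold one input value into the buffer.
    buffer[0] += 1
    buffer[1] *= value

def merge(buffer1, buffer2):
    # Combine two partial buffers; the result is stored in buffer1.
    buffer1[0] += buffer2[0]
    buffer1[1] *= buffer2[1]

def evaluate(buffer):
    # Geometric mean: the count-th root of the product.
    return math.pow(buffer[1], 1.0 / buffer[0])

# Hypothetical split of the values 1..10 across two partitions.
partitions = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]

# Each partition builds its own partial buffer.
partials = []
for partition in partitions:
    buf = initialize()
    for value in partition:
        update(buf, value)
    partials.append(buf)

# The partial buffers are then merged and evaluated once.
final = initialize()
for partial in partials:
    merge(final, partial)

result = evaluate(final)
print(final, result)
```

Because merge only adds counts and multiplies products, the result does not depend on how the rows were divided among the partial buffers.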

A UDAF can be used in two ways. First, an instance of a UDAF can be used immediately as a function. Second, users can register a UDAF to Spark SQL’s function registry and call this UDAF by the assigned name. The example code is shown below.

import org.apache.spark.sql.functions._
// Create a simple DataFrame with a single column called "id"
// containing number 1 to 10.
val df = sqlContext.range(1, 11)
 
// Create an instance of UDAF GeometricMean.
val gm = new GeometricMean
 
// Show the geometric mean of values of column "id".
df.groupBy().agg(gm(col("id")).as("GeometricMean")).show()
 
// Register the UDAF and call it "gm".
sqlContext.udf.register("gm", gm)
// Invoke the UDAF by its assigned name.
df.groupBy().agg(expr("gm(id) as GeometricMean")).show()

Summary

In this blog post, we introduced three major additions to the DataFrame API: a set of built-in functions, time interval literals, and a user-defined aggregate function interface. With the new built-in functions, it is easier to manipulate string data and date/timestamp data, and to apply math operations. If your existing programs use user-defined functions that do the same work as these built-in functions, we strongly recommend migrating your code to the built-in functions to take full advantage of the efficiency changes made as part of Project Tungsten. By combining date/time functions and interval literals, it is much easier to work with date/timestamp data and to calculate date/timestamp values for various use cases. With user-defined aggregate functions, users can apply custom aggregations over groups of input data in the DataFrame API.

To try these new features, download Spark 1.5 or sign up for a free Databricks trial.

