Article Source
- Title: Spark 1.5 DataFrame API Highlights: Date/Time/String Handling, Time Intervals, and UDAFs
- Authors: Michael Armbrust, Yin Huai, Davies Liu and Reynold Xin
Spark 1.5 DataFrame API Highlights
Date/Time/String Handling, Time Intervals, and UDAFs
A few days ago, we announced the release of Spark 1.5. This release contains major under-the-hood changes that improve Spark’s performance, usability, and operational stability. Besides these changes, we have been continuously improving the DataFrame API. In this blog post, we’d like to highlight three major improvements to the DataFrame API in Spark 1.5:
- New built-in functions;
- Time intervals; and
- Experimental user-defined aggregation function (UDAF) interface.
New Built-in Functions in Spark 1.5
In Spark 1.5, we have added a comprehensive list of built-in functions to the DataFrame API, complete with optimized code generation for execution. This code generation allows pipelines that call functions to take full advantage of the efficiency changes made as part of Project Tungsten. With these new additions, Spark SQL now supports a wide range of built-in functions for various use cases, including:
Category | Functions |
---|---|
Aggregate Functions | approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct |
Collection Functions | array_contains, explode, size, sort_array |
Date/time Functions | Date/timestamp conversion: unix_timestamp, from_unixtime, to_date, to_utc_timestamp, from_utc_timestamp; Extracting fields from a date/timestamp value: year, quarter, month, weekofyear, dayofyear, dayofmonth, hour, minute, second; Date/timestamp calculation: datediff, date_add, date_sub, add_months, months_between, last_day, next_day; Misc.: current_date, current_timestamp, trunc, date_format |
Math Functions | abs, acos, asin, atan, atan2, bin, cbrt, ceil, conv, cos, cosh, exp, expm1, factorial, floor, hex, hypot, log, log10, log1p, log2, pmod, pow, rint, round, shiftLeft, shiftRight, shiftRightUnsigned, signum, sin, sinh, sqrt, tan, tanh, toDegrees, toRadians, unhex |
Misc. Functions | array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, isNaN, isnotnull, isnull, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha1, sha2, sparkPartitionId, struct, when |
String Functions | ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper |
Window Functions (in addition to Aggregate Functions) | cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber |
```python
# Create a simple DataFrame
data = [
  (234.5, "row1"),
  (23.45, "row2"),
  (2.345, "row3"),
  (0.2345, "row4")]
df = sqlContext.createDataFrame(data, ["i", "j"])

# Import functions provided by Spark's DataFrame API
from pyspark.sql.functions import *

# Call round function directly
df.select(
  round(df['i'], 1),
  round(df['i'], 0),
  round(df['i'], -1)).show()

+-----------+-----------+------------+
|round(i, 1)|round(i, 0)|round(i, -1)|
+-----------+-----------+------------+
|      234.5|      235.0|       230.0|
|       23.5|       23.0|        20.0|
|        2.3|        2.0|         0.0|
|        0.2|        0.0|         0.0|
+-----------+-----------+------------+
```
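The other built-in functions follow the same pattern. As a quick sketch (the `people` DataFrame and its columns are made up for illustration), a few of the string and date/time functions from the table can be combined in a single select:

```python
from pyspark.sql.functions import concat_ws, upper, to_date, year

# A hypothetical DataFrame with string columns (for illustration only).
people = sqlContext.createDataFrame(
    [("Ada", "Lovelace", "1815-12-10")],
    ["first_name", "last_name", "birth_date"])

# Combine string functions (concat_ws, upper) with date functions (to_date, year).
people.select(
    upper(concat_ws(" ", people.first_name, people.last_name)).alias("full_name"),
    year(to_date(people.birth_date)).alias("birth_year")).show()
```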
Alternatively, all of the added functions are also available from SQL using standard syntax:
```sql
SELECT
  round(i, 1)
FROM
  dataFrame
```
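To run SQL like the statement above against a DataFrame, the DataFrame first has to be exposed as a table. A minimal sketch, reusing the `df` from the round example and registering it under the (arbitrary) name `dataFrame`:

```python
# Register the DataFrame from the round() example as a temporary table;
# the name "dataFrame" matches the table referenced in the SQL above.
df.registerTempTable("dataFrame")

# Run the same query through the SQL interface.
sqlContext.sql("SELECT round(i, 1) FROM dataFrame").show()
```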
Finally, you can even mix and match SQL syntax with DataFrame operations by using the `expr` function. With `expr`, you can construct a DataFrame column expression from a SQL expression string.
```python
df.select(
  expr("round(i, 1) AS rounded1"),
  expr("round(i, 0) AS rounded2"),
  expr("round(i, -1) AS rounded3")).show()
```
Time Interval Literals
In the last section, we introduced several new date and time functions that were added in Spark 1.5 (e.g. `datediff`, `date_add`, `date_sub`), but that is not the only new feature that will help users dealing with date or timestamp values. Another related feature is a new data type, interval, that allows developers to represent fixed periods of time (e.g. 1 day or 2 months) as interval literals. Using interval literals, it is possible to perform subtraction or addition of an arbitrary amount of time from a date or timestamp value. This representation can be useful when you want to add or subtract a time period from a fixed point in time. For example, users can now easily express queries like “Find all transactions that have happened during the past hour”.
An interval literal is constructed using the following syntax:
```sql
INTERVAL value unit
```
Breaking the above expression down, all time intervals start with the `INTERVAL` keyword. Next, the value and unit together specify the time difference. Available units are `YEAR`, `MONTH`, `DAY`, `HOUR`, `MINUTE`, `SECOND`, `MILLISECOND`, and `MICROSECOND`. For example, the following interval literal represents 3 years.
```sql
INTERVAL 3 YEAR
```
In addition to specifying an interval literal with a single unit, users can also combine different units. For example, the following interval literal represents a 3-year and 3-hour time difference.
```sql
INTERVAL 3 YEAR 3 HOUR
```
In the DataFrame API, the `expr` function can be used to create a `Column` representing an interval. The following Python code is an example of using an interval literal to select records where `start_time` and `end_time` are in the same day and differ by no more than an hour.
```python
# Import functions.
from pyspark.sql.functions import *

# Create a simple DataFrame.
data = [
  ("2015-01-01 23:59:59", "2015-01-02 00:01:02", 1),
  ("2015-01-02 23:00:00", "2015-01-02 23:59:59", 2),
  ("2015-01-02 22:59:58", "2015-01-02 23:59:59", 3)]
df = sqlContext.createDataFrame(data, ["start_time", "end_time", "id"])
df = df.select(
  df.start_time.cast("timestamp").alias("start_time"),
  df.end_time.cast("timestamp").alias("end_time"),
  df.id)

# Get all records that have a start_time and end_time in the
# same day, and the difference between the end_time and start_time
# is less or equal to 1 hour.
condition = \
  (to_date(df.start_time) == to_date(df.end_time)) & \
  (df.start_time + expr("INTERVAL 1 HOUR") >= df.end_time)

df.filter(condition).show()

+---------------------+---------------------+---+
|           start_time|             end_time| id|
+---------------------+---------------------+---+
|2015-01-02 23:00:00.0|2015-01-02 23:59:59.0|  2|
+---------------------+---------------------+---+
```
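The same approach covers the query mentioned at the start of this section, finding all transactions that have happened during the past hour. A minimal sketch, assuming a hypothetical `transactions` table with a timestamp column `ts`:

```python
from pyspark.sql.functions import expr

# Hypothetical table with a timestamp column "ts".
transactions = sqlContext.table("transactions")

# Keep only the rows whose timestamp falls within the last hour.
recent = transactions.filter(expr("ts >= current_timestamp() - INTERVAL 1 HOUR"))
recent.show()
```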
User-defined Aggregate Function Interface
For power users, Spark 1.5 introduces an experimental API for user-defined aggregate functions (UDAFs). These UDAFs can be used to compute custom calculations over groups of input data (in contrast, UDFs compute a value by looking at a single input row), such as calculating the geometric mean or the product of values for every group.
A UDAF maintains an aggregation buffer to store intermediate results for every group of input data. It updates this buffer for every input row. Once it has processed all input rows, it generates a result value based on values of the aggregation buffer.
A UDAF inherits from the base class `UserDefinedAggregateFunction` and implements the following eight methods:
- `inputSchema`: returns a `StructType`; every field of this `StructType` represents an input argument of this UDAF.
- `bufferSchema`: returns a `StructType`; every field of this `StructType` represents a field of this UDAF’s intermediate results.
- `dataType`: returns a `DataType` representing the data type of this UDAF’s returned value.
- `deterministic`: returns a boolean indicating whether this UDAF always generates the same result for a given set of input values.
- `initialize`: initializes the values of an aggregation buffer, represented by a `MutableAggregationBuffer`.
- `update`: updates an aggregation buffer, represented by a `MutableAggregationBuffer`, for an input `Row`.
- `merge`: merges two aggregation buffers and stores the result in a `MutableAggregationBuffer`.
- `evaluate`: generates the final result value of this UDAF based on the values stored in an aggregation buffer, represented by a `Row`.
Below is an example UDAF implemented in Scala that calculates the geometric mean of a given set of double values. The geometric mean can be used as an indicator of the typical value of an input set of numbers by using the product of their values (as opposed to the standard built-in mean, which is based on the sum of the input values). For simplicity, null-handling logic is not shown in the following code.
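Concretely, for n values x1, x2, …, xn the geometric mean is (x1 · x2 · ⋯ · xn)^(1/n): the UDAF keeps a running count and a running product in its aggregation buffer and takes the n-th root in `evaluate`.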
```scala
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

class GeometricMean extends UserDefinedAggregateFunction {
  def inputSchema: org.apache.spark.sql.types.StructType =
    StructType(StructField("value", DoubleType) :: Nil)

  def bufferSchema: StructType = StructType(
    StructField("count", LongType) ::
    StructField("product", DoubleType) :: Nil
  )

  def dataType: DataType = DoubleType

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 1.0
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Long](0) + 1
    buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0)
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
    buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1)
  }

  def evaluate(buffer: Row): Any = {
    math.pow(buffer.getDouble(1), 1.toDouble / buffer.getLong(0))
  }
}
```
A UDAF can be used in two ways. First, an instance of a UDAF can be used immediately as a function. Second, users can register a UDAF with Spark SQL’s function registry and call it by the assigned name. Example code is shown below.
```scala
import org.apache.spark.sql.functions._

// Create a simple DataFrame with a single column called "id"
// containing numbers 1 to 10.
val df = sqlContext.range(1, 11)

// Create an instance of UDAF GeometricMean.
val gm = new GeometricMean

// Show the geometric mean of values of column "id".
df.groupBy().agg(gm(col("id")).as("GeometricMean")).show()

// Register the UDAF and call it "gm".
sqlContext.udf.register("gm", gm)

// Invoke the UDAF by its assigned name.
df.groupBy().agg(expr("gm(id) as GeometricMean")).show()
```
Summary
In this blog post, we introduced three major additions to the DataFrame API: a set of built-in functions, time interval literals, and an experimental user-defined aggregate function (UDAF) interface. With the new built-in functions, it is easier to manipulate string data and date/timestamp data, and to apply math operations. If your existing programs use user-defined functions that do the same work as these built-in functions, we strongly recommend migrating your code to the new built-in functions to take full advantage of the efficiency changes made as part of Project Tungsten. Combining date/time functions and interval literals makes it much easier to work with date/timestamp data and to calculate date/timestamp values for various use cases. With the user-defined aggregate function interface, users can apply custom aggregations over groups of input data in the DataFrame API.
To try out these new features, download Spark 1.5 or sign up for a free Databricks trial.