Stop Thinking, Just Do!

Sung-Soo Kim's Blog

Transformations on DStreams

tagsTags

6 April 2015


Transformations on DStreams

Similar to that of RDDs, transformations allow the data from the input DStream to be modified. DStreams support many of the transformations available on normal Spark RDD’s. Some of the common ones are as follows.

TransformationMeaning
map(func) Return a new DStream by passing each element of the source DStream through a function func.
flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items.
filter(func) Return a new DStream by selecting only the records of the source DStream on which func returns true.
repartition(numPartitions) Changes the level of parallelism in this DStream by creating more or fewer partitions.
union(otherStream) Return a new DStream that contains the union of the elements in the source DStream and otherDStream.
count() Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream.
reduce(func) Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel.
countByValue() When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.
reduceByKey(func, [numTasks]) When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
join(otherStream, [numTasks]) When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.
cogroup(otherStream, [numTasks]) When called on DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples.
transform(func) Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream.
updateStateByKey(func) Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.

A few of these transformations are worth discussing in more detail.

UpdateStateByKey Operation

The updateStateByKey operation allows you to maintain arbitrary state while continuously updating it with new information. To use this, you will have to do two steps.

  1. Define the state - The state can be of arbitrary data type.
  2. Define the state update function - Specify with a function how to update the state using the previous state and the new values from input stream.

Let’s illustrate this with an example. Say you want to maintain a running count of each word seen in a text data stream. Here, the running count is the state and it is an integer. We define the update function as

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
	val newCount = ...  // add the new values with the previous running count to get the new count
	Some(newCount)
}

This is applied on a DStream containing words (say, the pairs DStream containing (word, 1) pairs in the earlier example).

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

The update function will be called for each word, with newValues having a sequence of 1’s (from the (word, 1) pairs) and the runningCount having the previous count. For the complete Scala code, take a look at the example StatefulNetworkWordCount.scala.

    import com.google.common.base.Optional;
    Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
      new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
        @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
          Integer newSum = ...  // add the new values with the previous running count to get the new count
          return Optional.of(newSum);
        }
      };

This is applied on a DStream containing words (say, the pairs DStream containing (word, 1) pairs in the quick example).

    JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

The update function will be called for each word, with newValues having a sequence of 1’s (from the (word, 1) pairs) and the runningCount having the previous count. For the complete Java code, take a look at the example JavaStatefulNetworkWordCount.java.

    def updateFunction(newValues, runningCount):
        if runningCount is None:
           runningCount = 0
        return sum(newValues, runningCount)  # add the new values with the previous running count to get the new count

This is applied on a DStream containing words (say, the pairs DStream containing (word, 1) pairs in the earlier example).

    runningCounts = pairs.updateStateByKey(updateFunction)

The update function will be called for each word, with newValues having a sequence of 1’s (from the (word, 1) pairs) and the runningCount having the previous count. For the complete Python code, take a look at the example stateful_network_wordcount.py.

Note that using updateStateByKey requires the checkpoint directory to be configured, which is discussed in detail in the checkpointing section.

Transform Operation

The transform operation (along with its variations like transformWith) allows arbitrary RDD-to-RDD functions to be applied on a DStream. It can be used to apply any RDD operation that is not exposed in the DStream API. For example, the functionality of joining every batch in a data stream with another dataset is not directly exposed in the DStream API. However, you can easily use transform to do this. This enables very powerful possibilities. For example, if you want to do real-time data cleaning by joining the input data stream with precomputed spam information (maybe generated with Spark as well) and then filtering based on it.

    val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

    val cleanedDStream = wordCounts.transform(rdd => {
      rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
      ...
    })

    import org.apache.spark.streaming.api.java.*;
    // RDD containing spam information
    final JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);

    JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
      new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
        @Override public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
          rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
          ...
        }
      });

    spamInfoRDD = sc.pickleFile(...) # RDD containing spam information

    # join data stream with spam information to do data cleaning
    cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))

In fact, you can also use machine learning and graph computation algorithms in the transform method.

Window Operations

Spark Streaming also provides windowed computations, which allow you to apply transformations over a sliding window of data. This following figure illustrates this sliding window.

Spark
Streaming

As shown in the figure, every time the window slides over a source DStream, the source RDDs that fall within the window are combined and operated upon to produce the RDDs of the windowed DStream. In this specific case, the operation is applied over last 3 time units of data, and slides by 2 time units. This shows that any window operation needs to specify two parameters.

  • window length - The duration of the window (3 in the figure)
  • sliding interval - The interval at which the window operation is performed (2 in the figure).

These two parameters must be multiples of the batch interval of the source DStream (1 in the figure).

Let’s illustrate the window operations with an example. Say, you want to extend the earlier example by generating word counts over last 30 seconds of data, every 10 seconds. To do this, we have to apply the reduceByKey operation on the pairs DStream of (word, 1) pairs over the last 30 seconds of data. This is done using the operation reduceByKeyAndWindow.

    // Reduce last 30 seconds of data, every 10 seconds
    val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

    // Reduce function adding two integers, defined separately for clarity
    Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer, Integer>() {
      @Override public Integer call(Integer i1, Integer i2) throws Exception {
        return i1 + i2;
      }
    };

    // Reduce last 30 seconds of data, every 10 seconds
    JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, Durations.seconds(30), Durations.seconds(10));

    # Reduce last 30 seconds of data, every 10 seconds
    windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)

Some of the common window operations are as follows. All of these operations take the said two parameters - windowLength and slideInterval.

TransformationMeaning
window(windowLength, slideInterval) Return a new DStream which is computed based on windowed batches of the source DStream.
countByWindow(windowLength, slideInterval) Return a sliding window count of elements in the stream.
reduceByWindow(func, windowLength, slideInterval) Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative so that it can be computed correctly in parallel.
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enter the sliding window, and "inverse reducing" the old data that leave the window. An example would be that of "adding" and "subtracting" counts of keys as the window slides. However, it is applicable to only "invertible reduce functions", that is, those reduce functions which have a corresponding "inverse reduce" function (taken as parameter invFunc. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. Note that [checkpointing](#checkpointing) must be enabled for using this operation.
countByValueAndWindow(windowLength, slideInterval, [numTasks]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.

Join Operations

Finally, its worth highlighting how easily you can perform different kinds of joins in Spark Streaming.

Stream-stream joins

Streams can be very easily joined with other streams.

    val stream1: DStream[String, String] = ...
    val stream2: DStream[String, String] = ...
    val joinedStream = stream1.join(stream2)

    JavaPairDStream<String, String> stream1 = ...
    JavaPairDStream<String, String> stream2 = ...
    JavaPairDStream<String, String> joinedStream = stream1.join(stream2);

    stream1 = ...
    stream2 = ...
    joinedStream = stream1.join(stream2)

Here, in each batch interval, the RDD generated by stream1 will be joined with the RDD generated by stream2. You can also do leftOuterJoin, rightOuterJoin, fullOuterJoin. Furthermore, it is often very useful to do joins over windows of the streams. That is pretty easy as well.

    val windowedStream1 = stream1.window(Seconds(20))
    val windowedStream2 = stream2.window(Minutes(1))
    val joinedStream = windowedStream1.join(windowedStream2)

    JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
    JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
    JavaPairDStream<String, String> joinedStream = windowedStream1.join(windowedStream2);

    windowedStream1 = stream1.window(20)
    windowedStream2 = stream2.window(60)
    joinedStream = windowedStream1.join(windowedStream2)

Stream-dataset joins

This has already been shown earlier while explain DStream.transform operation. Here is yet another example of joining a windowed stream with a dataset.

    val dataset: RDD[String, String] = ...
    val windowedStream = stream.window(Seconds(20))...
    val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

    JavaPairRDD<String, String> dataset = ...
    JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
    JavaPairDStream<String, String> joinedStream = windowedStream.transform(
        new Function<JavaRDD<Tuple2<String, String>>, JavaRDD<Tuple2<String, String>>>() {
            @Override 
            public JavaRDD<Tuple2<String, String>> call(JavaRDD<Tuple2<String, String>> rdd) {
                return rdd.join(dataset);
            }
        }
    );

    dataset = ... # some RDD
    windowedStream = stream.window(20)
    joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))

In fact, you can also dynamically change the dataset you want to join against. The function provided to transform is evaluated every batch interval and therefore will use the current dataset that dataset reference points to.

The complete list of DStream transformations is available in the API documentation. For the Scala API, see DStream and PairDStreamFunctions. For the Java API, see JavaDStream and JavaPairDStream. For the Python API, see DStream.


comments powered by Disqus