Basic Concepts
Next, we move beyond the simple example and elaborate on the basics of Spark Streaming.
Linking
Similar to Spark, Spark Streaming is available through Maven Central. To write your own Spark Streaming program, you will have to add the following dependency to your SBT or Maven project.
For Maven:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.3.0</version>
</dependency>
For SBT:
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.3.0"
For ingesting data from sources like Kafka, Flume, and Kinesis that are not present in the Spark Streaming core API, you will have to add the corresponding artifact spark-streaming-xyz_2.10 to the dependencies. For example, some of the common ones are as follows.
------------------------------------------------------------------------
Source     Artifact
---------- -------------------------------------------------------------
Kafka      spark-streaming-kafka_2.10
Flume      spark-streaming-flume_2.10
Kinesis    spark-streaming-kinesis-asl_2.10 [Amazon Software License]
Twitter    spark-streaming-twitter_2.10
ZeroMQ     spark-streaming-zeromq_2.10
MQTT       spark-streaming-mqtt_2.10
------------------------------------------------------------------------
Please refer to the Maven repository for the full, up-to-date list of supported sources and artifacts.
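For example, pulling in the Kafka integration with SBT (assuming the same Spark version as the core dependency above) might look like this:
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.3.0"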
Initializing StreamingContext
To initialize a Spark Streaming program, a StreamingContext object has to be created; it is the main entry point of all Spark Streaming functionality.
Scala
A StreamingContext object can be created from a SparkConf object.
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
The appName parameter is a name for your application to show on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or a special “local[*]” string to run in local mode. In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass “local[*]” to run Spark Streaming in-process (it detects the number of cores in the local system). Note that this internally creates a SparkContext (the starting point of all Spark functionality), which can be accessed as ssc.sparkContext.
The batch interval must be set based on the latency requirements of your application and available cluster resources. See the Performance Tuning section for more details.
A StreamingContext object can also be created from an existing SparkContext object.
import org.apache.spark.streaming._
val sc = ... // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
Java
A JavaStreamingContext object can be created from a SparkConf object.
import org.apache.spark.*;
import org.apache.spark.streaming.api.java.*;
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
The appName parameter is a name for your application to show on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or a special “local[*]” string to run in local mode. In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass “local[*]” to run Spark Streaming in-process. Note that this internally creates a JavaSparkContext (the starting point of all Spark functionality), which can be accessed as ssc.sparkContext.
The batch interval must be set based on the latency requirements of your application and available cluster resources. See the Performance Tuning section for more details.
A JavaStreamingContext object can also be created from an existing JavaSparkContext.
import org.apache.spark.streaming.api.java.*;
JavaSparkContext sc = ... // existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));
Python
A StreamingContext object can be created from a SparkContext object.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)
The appName parameter is a name for your application to show on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or a special “local[*]” string to run in local mode. In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass “local[*]” to run Spark Streaming in-process (it detects the number of cores in the local system).
The batch interval must be set based on the latency requirements of your application and available cluster resources. See the Performance Tuning section for more details.
After a context is defined, you have to do the following (a minimal end-to-end sketch follows this list).
- Define the input sources by creating input DStreams.
- Define the streaming computations by applying transformation and output operations to DStreams.
- Start receiving data and processing it using streamingContext.start().
- Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
- The processing can be manually stopped using streamingContext.stop().
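The following is a minimal Scala sketch of these steps; the application name, the “local[*]” master, and the socket host and port are placeholders chosen for illustration.
import org.apache.spark._
import org.apache.spark.streaming._

// Create a context with a 1-second batch interval.
val conf = new SparkConf().setAppName("LifecycleSketch").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(1))

// 1. Define an input source: a text stream from a (hypothetical) socket.
val lines = ssc.socketTextStream("localhost", 9999)

// 2. Define the computation: a transformation followed by an output operation.
val words = lines.flatMap(_.split(" "))
words.print()

// 3. Start the computation and wait for it to be stopped.
ssc.start()
ssc.awaitTermination()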
Points to remember:
- Once a context has been started, no new streaming computations can be set up or added to it.
- Once a context has been stopped, it cannot be restarted.
- Only one StreamingContext can be active in a JVM at the same time.
- stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional stopSparkContext parameter of stop() to false.
- A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created, as sketched below.
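As a rough sketch of the last two points, assuming ssc is an active StreamingContext and sc is its underlying SparkContext:
import org.apache.spark.streaming._

// Stop only the streaming computation; the SparkContext stays alive.
ssc.stop(stopSparkContext = false)

// The same SparkContext can then back a new StreamingContext
// (the 5-second batch interval here is just an example).
val ssc2 = new StreamingContext(sc, Seconds(5))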