Stop Thinking, Just Do!

Sungsoo Kim's Blog

DataFrame and SQL Operations

tagsTags

9 April 2015


DataFrame and SQL Operations

You can easily use DataFrames and SQL operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore this has to done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example. It modifies the earlier word count example to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.

    /** Lazily instantiated singleton instance of SQLContext */
    object SQLContextSingleton {
      @transient private var instance: SQLContext = null

      // Instantiate SQLContext on demand
      def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
        if (instance == null) {
          instance = new SQLContext(sparkContext)
        }
        instance
      }
    }

    ...

    /** Case class for converting RDD to DataFrame */
    case class Row(word: String)

    ...

    /** DataFrame operations inside your streaming program */

    val words: DStream[String] = ...

    words.foreachRDD { rdd =>

      // Get the singleton instance of SQLContext
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Row(w)).toDF()

      // Register as table
      wordsDataFrame.registerTempTable("words")

      // Do word count on DataFrame using SQL and print it
      val wordCountsDataFrame = 
        sqlContext.sql("select word, count(*) as total from words group by word")
      wordCountsDataFrame.show()
    }

See the full source code.

    /** Lazily instantiated singleton instance of SQLContext */
    class JavaSQLContextSingleton {
      static private transient SQLContext instance = null;
      static public SQLContext getInstance(SparkContext sparkContext) {
        if (instance == null) {
          instance = new SQLContext(sparkContext);
        }
        return instance;
      }
    }

    ...

    /** Java Bean class for converting RDD to DataFrame */
    public class JavaRow implements java.io.Serializable {
      private String word;

      public String getWord() {
        return word;
      }

      public void setWord(String word) {
        this.word = word;
      }
    }

    ...

    /** DataFrame operations inside your streaming program */

    JavaDStream<String> words = ... 

    words.foreachRDD(
      new Function2<JavaRDD<String>, Time, Void>() {
        @Override
        public Void call(JavaRDD<String> rdd, Time time) {
          SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());

          // Convert RDD[String] to RDD[case class] to DataFrame
          JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
            public JavaRow call(String word) {
              JavaRow record = new JavaRow();
              record.setWord(word);
              return record;
            }
          });
          DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class);

          // Register as table
          wordsDataFrame.registerTempTable("words");

          // Do word count on table using SQL and print it
          DataFrame wordCountsDataFrame =
              sqlContext.sql("select word, count(*) as total from words group by word");
          wordCountsDataFrame.show();
          return null;
        }
      }
    );

See the full source code.

    # Lazily instantiated global instance of SQLContext
    def getSqlContextInstance(sparkContext):
        if ('sqlContextSingletonInstance' not in globals()):
            globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
        return globals()['sqlContextSingletonInstance']

    ...

    # DataFrame operations inside your streaming program

    words = ... # DStream of strings

    def process(time, rdd):
        print "========= %s =========" % str(time)
        try:
            # Get the singleton instance of SQLContext
            sqlContext = getSqlContextInstance(rdd.context)

            # Convert RDD[String] to RDD[Row] to DataFrame
            rowRdd = rdd.map(lambda w: Row(word=w))
            wordsDataFrame = sqlContext.createDataFrame(rowRdd)

            # Register as table
            wordsDataFrame.registerTempTable("words")

            # Do word count on table using SQL and print it
            wordCountsDataFrame = sqlContext.sql("select word, count(*) as total from words group by word")
            wordCountsDataFrame.show()
        except:
            pass

    words.foreachRDD(process)

See the full source code.

You can also run SQL queries on tables defined on streaming data from a different thread (that is, asynchronous to the running StreamingContext). Just make sure that you set the StreamingContext to remember sufficient amount of streaming data such that query can run. Otherwise the StreamingContext, which is unaware of the any asynchronous SQL queries, will delete off old streaming data before the query can complete. For example, if you want to query the last batch, but your query can take 5 minutes to run, then call streamingContext.remember(Minutes(5)) (in Scala, or equivalent in other languages).

See the DataFrames and SQL guide to learn more about DataFrames.


comments powered by Disqus