Stop Thinking, Just Do!

Sungsoo Kim's Blog

Spark Lab 3

Tags

5 September 2015


Spark Lab 3

Overview

For this lab we’ll reuse the bike trips dataset. We have some helper Scala classes which we’ll use to parse each line of the CSV files instead of working with raw arrays.

The objective of this lab is to familiarize yourself with the different operations and their impact on performance and job scheduling. You’ll gain a better understanding of some of the more advanced Pair RDD operations and of when to avoid groupByKey.

import java.text.SimpleDateFormat
import java.sql.Timestamp

/**
 * Helper types for the bike-share lab: typed records for the trips and
 * stations CSV files, each with a `parse` that builds the record from the
 * already-split columns of one row.
 *
 * The object is `Serializable` so it can be referenced from closures.
 */
object utils extends Serializable {

  /** Parses `text` with `format` and wraps the resulting instant in a SQL timestamp. */
  private def timestampOf(format: SimpleDateFormat, text: String): Timestamp =
    new Timestamp(format.parse(text).getTime)

  /** One row of the trips file. `zipCode` is optional in the input. */
  case class Trip(
    id: Int,
    duration: Int,
    startDate: Timestamp,
    startStation: String,
    startTerminal: Int,
    endDate: Timestamp,
    endStation: String,
    endTerminal: Int,
    bike: Int,
    subscriberType: String,
    zipCode: Option[String]
  )

  object Trip {
    /**
     * Builds a [[Trip]] from the columns of one CSV row.
     *
     * @param i the row's columns, in file order; at least 10 are read
     * @return the parsed trip; `zipCode` is set only when the row has exactly 11 columns
     * @throws NumberFormatException if a numeric column is not an integer
     * @throws java.text.ParseException if a date column does not match `M/d/yyyy HH:mm`
     */
    def parse(i: Array[String]): Trip = {
      // A fresh formatter per call: SimpleDateFormat instances are not shared.
      val dateTime = new SimpleDateFormat("M/d/yyyy HH:mm")
      // zip is optional
      val maybeZip = if (i.length == 11) Some(i(10)) else None
      Trip(
        id = i(0).toInt,
        duration = i(1).toInt,
        startDate = timestampOf(dateTime, i(2)),
        startStation = i(3),
        startTerminal = i(4).toInt,
        endDate = timestampOf(dateTime, i(5)),
        endStation = i(6),
        endTerminal = i(7).toInt,
        bike = i(8).toInt,
        subscriberType = i(9),
        zipCode = maybeZip
      )
    }
  }

  /** One row of the stations file. */
  case class Station(
    id: Int,
    name: String,
    lat: Double,
    lon: Double,
    docks: Int,
    landmark: String,
    installDate: Timestamp
  )

  object Station {
    /**
     * Builds a [[Station]] from the columns of one CSV row.
     *
     * @param i the row's columns, in file order; the first 7 are read
     * @return the parsed station
     * @throws NumberFormatException if a numeric column cannot be parsed
     * @throws java.text.ParseException if the install date does not match `M/d/yyyy`
     */
    def parse(i: Array[String]): Station = {
      val dateOnly = new SimpleDateFormat("M/d/yyyy")
      Station(
        id = i(0).toInt,
        name = i(1),
        lat = i(2).toDouble,
        lon = i(3).toDouble,
        docks = i(4).toInt,
        landmark = i(5),
        installDate = timestampOf(dateOnly, i(6))
      )
    }
  }
}

// Trips: read every file under the trips directory as lines of text.
val input1 = sc.textFile("/user/student/data/trips/*")

// The first line is the header row; remember it so it can be filtered out.
val header1 = input1.first()

// Drop every line equal to the header, split the rest on commas and parse
// each row into a utils.Trip.
val trips = input1
  .filter(line => line != header1)
  .map(line => utils.Trip.parse(line.split(",")))

// Stations: same steps as for trips, parsing into utils.Station instead.
val input2 = sc.textFile("/user/student/data/stations/*")

val header2 = input2.first()

val stations = input2
  .filter(line => line != header2)
  .map(line => utils.Station.parse(line.split(",")))

// Key every trip by its start station name, then keep only the trip
// duration as the value: (startStation, duration).
val byStartTerminal = trips.keyBy(_.startStation)
val durationByStart = byStartTerminal.mapValues(_.duration)

// groupByKey variant of the same average, kept for comparison with the
// aggregateByKey version below (uncomment to inspect its lineage and output).
//val grouped = durationByStart.groupByKey().mapValues(list => list.sum / list.size)

//grouped.toDebugString
//grouped.take(10).foreach(println)

// Build a (sum of durations, number of trips) pair per start station,
// starting from the zero value (0, 0).
val results = durationByStart.aggregateByKey((0, 0))(
  // seqOp: fold one duration into an accumulator.
  (acc, value) => (acc._1 + value, acc._2 + 1),
  // combOp: merge two accumulators for the same key. Sums are added to sums
  // and counts to counts; mixing the two would corrupt the total whenever a
  // key's values are spread over more than one accumulator.
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

// Average duration per start station. Both operands are Int, so this is
// integer division and the result is truncated.
val finalAvg = results.mapValues(i => i._1 / i._2)
finalAvg.take(10).foreach(println)

comments powered by Disqus