Spark Lab 3
Overview
For this lab we’ll reuse the bike trips dataset. Instead of working with raw arrays, we have a pair of helper Scala classes that parse each line of the CSV files into case classes.
The objective of this lab is to familiarize yourself with the different operations and their impact on performance and job scheduling. By the end you should have a better understanding of some of the more advanced Pair RDD operations and of when to avoid groupByKey.
import java.text.SimpleDateFormat
import java.sql.Timestamp

object utils extends Serializable {
  // One parsed row of the trips CSV.
  case class Trip(
    id: Int,
    duration: Int,
    startDate: Timestamp,
    startStation: String,
    startTerminal: Int,
    endDate: Timestamp,
    endStation: String,
    endTerminal: Int,
    bike: Int,
    subscriberType: String,
    zipCode: Option[String]
  )

  object Trip {
    def parse(i: Array[String]): Trip = {
      val fmt = new SimpleDateFormat("M/d/yyyy HH:mm")
      val zip = i.length match { // the zip code column is optional
        case 11 => Some(i(10))
        case _  => None
      }
      Trip(i(0).toInt, i(1).toInt, new Timestamp(fmt.parse(i(2)).getTime), i(3),
        i(4).toInt, new Timestamp(fmt.parse(i(5)).getTime), i(6), i(7).toInt,
        i(8).toInt, i(9), zip)
    }
  }

  // One parsed row of the stations CSV.
  case class Station(
    id: Int,
    name: String,
    lat: Double,
    lon: Double,
    docks: Int,
    landmark: String,
    installDate: Timestamp
  )

  object Station {
    def parse(i: Array[String]): Station = {
      val fmt = new SimpleDateFormat("M/d/yyyy")
      Station(i(0).toInt, i(1), i(2).toDouble, i(3).toDouble, i(4).toInt, i(5),
        new Timestamp(fmt.parse(i(6)).getTime))
    }
  }
}
val input1 = sc.textFile("/user/student/data/trips/*")
val header1 = input1.first // grab the header row so we can filter it out
val trips = input1.filter(_ != header1).map(_.split(",")).map(utils.Trip.parse)

val input2 = sc.textFile("/user/student/data/stations/*")
val header2 = input2.first // grab the header row so we can filter it out
val stations = input2.filter(_ != header2).map(_.split(",")).map(utils.Station.parse)
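The trips RDD is reused by every job below, so it can be worth caching it to avoid re-reading and re-parsing the CSV files on each action. A minimal sketch; whether this actually pays off depends on how much memory your executors have:

trips.cache() // keep the parsed Trip objects in memory once first computed
trips.count() // any action materializes the cache for later jobs to reuse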
val byStartTerminal = trips.keyBy(_.startStation) // Pair RDD of (startStation, Trip)
val durationByStart = byStartTerminal.mapValues(_.duration) // (startStation, duration)
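keyBy builds a Pair RDD keyed by the start station, and mapValues then swaps each Trip for just its duration without touching the keys (or the partitioning). A quick peek to confirm the shape of the data:

durationByStart.take(5).foreach(println) // prints a few (startStation, duration) tuples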
//val grouped = durationByStart.groupByKey().mapValues(list => list.sum / list.size)
//grouped.toDebugString
//grouped.take(10).foreach(println)
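The commented-out groupByKey version above produces the same averages, but it shuffles every individual duration across the network before anything is summed, and a heavily used station piles all of its values onto a single executor. aggregateByKey (next) and reduceByKey both combine values map-side first, so only small per-partition (sum, count) pairs cross the network. For comparison, a minimal reduceByKey sketch of the same computation:

val sumCount = durationByStart
  .mapValues(d => (d, 1)) // seed each duration with a count of 1
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)) // merge (sum, count) pairs, map-side first
val avgByReduce = sumCount.mapValues { case (sum, n) => sum.toDouble / n }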
val results = durationByStart.aggregateByKey((0, 0))(
  (acc, value) => (acc._1 + value, acc._2 + 1), // within a partition: add the duration, bump the count
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // across partitions: sum the sums and the counts
)
val finalAvg = results.mapValues(i => i._1.toDouble / i._2) // toDouble avoids truncating integer division
finalAvg.take(10).foreach(println)
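To see how the choice of operation affects job scheduling, print the lineage of the result, just as the commented-out grouped.toDebugString does above; the ShuffledRDD entry marks where Spark cuts a new stage for the shuffle that aggregateByKey introduces.

println(finalAvg.toDebugString)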