Stop Thinking, Just Do!

Sungsoo Kim's Blog

Runtime API in Apache Tez

tagsTags

16 January 2014


Overview

Apache Tez models data processing as a dataflow graph, with the vertices in the graph representing processing of data and edges representing movement of data between the processing. Thus user logic, that analyses and modifies the data, sits in the vertices. Edges determine the consumer of the data, how the data is transferred and the dependency between the producer and consumer vertices.

For users of MapReduce (MR), the most primitive functionality that Tez can provide is an ability to run a chain of Reduce stages as compared to a single Reduce stage in the current MR implementation. Via the Task API, Tez can do this and much more by facilitating execution of any form of processing logic that does not need to be retrofitted into a Map or Reduce task and also by supporting multiple options of data transfer between different vertices that are not restricted to the MapReduce shuffle transport mechanism.

The Building Blocks of Tez

The Task API provides the building blocks for a user to plug-in their logic to analyze and modify data into the vertex and augment this processing logic with the necessary plugins to transfer and route data between vertices.

Tez models the user logic running in each vertex as a composition of a set of Inputs, a Processor and a set of Outputs.

  • Input: An input represents a pipe through which a processor can accept input data from a data source such as HDFS or the output generated by another vertex.
  • Processor: The entity responsible for consuming one or more Inputs and producing one or more Outputs.
  • Output: An output represents a pipe through which a processor can generate output data for another vertex to consume or to a data sink such as HDFS.

Given that an edge in a DAG is a logical entity that represents a number of physical connections between the tasks of 2 connected vertices, to improve ease of programmability for a developer implementing a new Processor, there are 2 kinds of Inputs and Outputs to either expose or hide the level of complexity:

  • Logical: A corresponding pair of a LogicalInput and a LogicalOutput represent the logical edge between 2 vertices. The implementation of Logical objects hides all the underlying physical connections and exposes a single view to the data.
  • Physical: The pair of Physical Input and Output represents the connection between a task of the Source vertex and a task of a Destination vertex.

An example of the Reduce stage within an MR job would be a Reduce Processor that receives data from the maps via ShuffleInput and generates output to HDFS. Likewise, an intermediate Reduce stage in an MRR chain would be quite similar to the final Reduce stage except for the difference in the Output type.

tez1

Tez Runtime API

tez3

To implement a new Input, Processor or Output, a user to implement the appropriate interfaces mentioned above. All objects are given a Context object in their initialize functions. This context is the hook for these objects to communicate to the Tez framework. The Inputs and Outputs are expected to provide implementations for their respective Readers and Writers which are then used by the Processor to read/write data. In a task, after the Tez framework has initialized all the necessary Inputs, Outputs and the Processor, the Tez framework invokes the Processor’s run function and passes the appropriate handles to all the Inputs and Outputs for that particular task.

Tez allows all inputs and outputs to be pluggable. This requires support for passing of information from the Output of a source vertex to the Input of the destination vertex. For example, let us assume that the Output of a source vertex writes all of its data to a key-value store. The Output would need to communicate the “key” to the Input of the next stage so that the Input can retrieve the correct data from the key-value store. To facilitate this, Tez uses Events.

Events in Tez

Events in Tez are a way to pass information amongst different components.

  • The Tez framework uses Events to pass information of system events such as task failures to the required components.
  • Inputs of a vertex can inform the framework of any failures encountered when trying to retrieve data from the source vertex’s Output that in turn can be used by the framework to take failure recovery measures.
  • An Output can pass information of the location of the data, which it generates, to the Inputs of the destination vertex. An example of this is described in the Shuffle Event diagram which shows how the output of a Map stage informs the Shuffle Input of the Reduce stage of the location of its output via a Data Movement Event.

tez2

Another use of Events is to enable run-time changes to the DAG execution plan. For example, based on the amount of the data being generated by a Map stage, it may be more optimal to run less reduce tasks within the following Reduce stage. Events generated by Outputs are routed to the pluggable Vertex/Edge management modules, allowing them to make the necessary decisions to modify some run-time parameters as needed.

Available implementations of Inputs/Processors/Outputs

The flexibility of Tez allows anyone to implement their Inputs and Outputs, whether they use blocking/non-blocking transport protocols, handle data in the form of raw bytes/records/key-value pairs etc., and build Processors to handle these variety of Inputs and Outputs.

There is already a small repository of various implementations of Inputs/Outputs/Processors:

  • MRInput and MROutput: Basic input and outputs to handle data to/from HDFS that are MapReduce compatible as they use MapReduce constructs such as InputFormat, RecordReader, OutputFormat and RecordWriter.
  • OnFileSortedOutput and ShuffleMergedInput: A pair of key-value based Input and Output that use the local disk for all I/O and provide the same sort+merge functionality that is required for the “shuffle” edge between the Map and Reduce stages in a MapReduce job.
  • OnFileUnorderedKVOutput and ShuffledUnorderedKVInput: These are similar to the shuffle pair mentioned earlier except that the data is not sorted implicitly. This can be a big performance boost in various situations.
  • MapProcessor and ReduceProcessor: As the names suggest, these processors are available for anyone trying to run a MapReduce job on the Tez execution framework. They can be used to run an MRR chain too.

As the Hive and Pig projects adapt to use Tez, we hope this repository will grow to house a common set of building blocks for use across the different projects.

References

[1] Bikas Saha, Runtime API in Apache Tez, Hortonworks, September 24th, 2013.
[2] Apache tez. http://incubator.apache.org/projects/tez.html.
[3] Netty project. http://netty.io.
[4] Storm. http://storm-project.net/.
[5] H.Ballani, P.Costa, T.Karagiannis, and A.I.Rowstron. Towards predictable datacenter networks. In SIGCOMM, volume 11, pages 242–253, 2011.
[6] F.P.Brooks,Jr. The mythical man-month (anniversary ed.). Addison-Wesley Longman Publishing Co., Inc., Boston, MA, USA, 1995.
[7] N. Capit, G. Da Costa, Y. Georgiou, G. Huard, C. Martin, G. Mounie, P. Neyron, and O. Richard. A batch scheduler with high level components. In Cluster Computing and the Grid, 2005. CC-Grid 2005. IEEE International Symposium on, volume 2, pages 776–783 Vol. 2, 2005.
[8] R. Chaiken, B. Jenkins, P.-A. Larson, B. Ramsey, D. Shakib, S. Weaver, and J. Zhou. Scope: easy and efficient parallel processing of massive data sets. Proc. VLDB Endow., 1(2):1265–1276, Aug. 2008.
[9] M. Chowdhury, M. Zaharia, J. Ma, M. I. Jordan, and I. Stoica. Managing data transfers in computer clusters with orchestra. SIGCOMM- Computer Communication Review, 41(4):98, 2011.
[10] B.-G. Chun, T. Condie, C. Curino, R. Ramakrishnan, R. Sears, and M. Weimer. Reef: Retainable evaluator execution framework. In VLDB 2013, Demo, 2013.
[11] B. F. Cooper, E. Baldeschwieler, R. Fonseca, J. J. Kistler, P. Narayan, C. Neerdaels, T. Negrin, R. Ramakrishnan, A. Silberstein, U. Srivastava, et al. Building a cloud for Yahoo! IEEE Data Eng. Bull., 32(1):36–43, 2009.
[12] J. Dean and S. Ghemawat. MapReduce: simplified data processing on large clusters. Commun. ACM, 51(1):107–113, Jan. 2008.
[13] W. Emeneker, D. Jackson, J. Butikofer, and D. Stanzione. Dynamic virtual clustering with xen and moab. In G. Min, B. Martino, L. Yang, M. Guo, and G. Rnger, editors, Frontiers of High Performance Computing and Networking, ISPA 2006 Workshops, volume 4331 of Lecture Notes in Computer Science, pages 440–451. Springer Berlin Heidelberg, 2006.
[14] Facebook Engineering Team. Under the Hood: Scheduling MapReduce jobs more efficiently with Corona. http://on.fb.me/TxUsYN, 2012.
[15] D. Gottfrid. Self-service prorated super-computing fun. http://open. blogs.nytimes.com/2007/11/01/self-service-prorated-super-computing-fun, 2007.
[16] T. Graves. GraySort and MinuteSort at Yahoo on Hadoop 0.23. http://sortbenchmark. org/Yahoo2013Sort.pdf, 2013.
[17] B. Hindman, A. Konwinski, M. Zaharia, A. Ghodsi, A. D. Joseph, R. Katz, S. Shenker, and I. Stoica. Mesos: a platform for fine-grained resource sharing in the data center. In Proceedings of the 8th USENIX conference on Networked systems design and implementation, NSDI’11, pages 22–22, Berkeley, CA, USA, 2011. USENIX Association.
[18] M. Isard, M. Budiu, Y. Yu, A. Birrell, and D. Fetterly. Dryad: distributed data-parallel programs from sequential building blocks. In Proceedings of the 2nd ACM SIGOPS/EuroSys European Conference on Computer Systems 2007, EuroSys ’07, pages 59–72, New York, NY, USA, 2007. ACM.
[19] M. Islam, A. K. Huang, M. Battisha, M. Chiang, S. Srinivasan, C. Peters, A. Neumann, and A. Abdelnur. Oozie: towards a scalable workflow management system for hadoop. In Proceedings of the 1st ACM SIGMOD Workshop on Scalable Workflow Execution Engines and Technologies, page 4. ACM, 2012.
[20] D. B. Jackson, Q. Snell, and M. J. Clement. Core algorithms of the maui scheduler. In Revised Papers from the 7th International Workshop on Job Scheduling Strategies for Parallel Processing, JSSPP ’01, pages 87–102, London, UK, UK, 2001. Springer-Verlag.
[21] S. Loughran, D. Das, and E. Baldeschwieler. Introducing Hoya – HBase on YARN. http://hortonworks.com/blog/introducing-hoya-hbase-on-yarn/, 2013.
[22] G. Malewicz, M. H. Austern, A. J. Bik, J. C. Dehnert, I. Horn, N. Leiser, and G. Czajkowski. Pregel: a system for large-scale graph processing. In Proceedings of the 2010 ACM SIGMOD International Conference on Management of data, SIGMOD ’10, pages 135–146, New York, NY, USA, 2010. ACM.
[23] R. O. Nambiar and M. Poess. The making of tpcds. In Proceedings of the 32nd international conference on Very large data bases, VLDB ’06, pages 1049–1058. VLDB Endowment, 2006.
[24] C. Olston, B. Reed, U. Srivastava, R. Kumar, and A. Tomkins. Pig Latin: a not-so-foreign language for data processing. In Proceedings of the 2008 ACM SIGMOD international conference on Management of data, SIGMOD ’08, pages 1099–1110, New York, NY, USA, 2008. ACM.
[25] O.O’Malley. Hadoop: The Definitive Guide, chapter Hadoop at Yahoo!, pages 11–12. O’Reilly Media, 2012.
[26] M. Schwarzkopf, A. Konwinski, M. Abd-El-Malek, and J. Wilkes. Omega: flexible, scalable schedulers for large compute clusters. In Proceedings of the 8th ACM European Conference on Computer Systems, EuroSys ’13, pages 351–364, New York, NY, USA, 2013. ACM.
[27] K.Shvachko, H.Kuang, S.Radia, and R.Chansler. The Hadoop Distributed File System. In Proceedings of the 2010 IEEE 26th Symposium on Mass Storage Systems and Technologies (MSST), MSST ’10, pages 1–10, Washington, DC, USA, 2010. IEEE Computer Society.
[28] T.-W. N. Sze. The two quadrillionth bit of π is 0! http://developer.yahoo.com/blogs/hadoop/two-quadrillionth-bit-0-467.html.
[29] D. Thain, T. Tannenbaum, and M. Livny. Distributed computing in practice: the Condor experience. Concurrency and Computation: Practice and Experience, 17(2-4):323–356, 2005.
[30] A. Thusoo, J. S. Sarma, N. Jain, Z. Shao, P. Chakka, N. Z. 0002, S. Anthony, H. Liu, and R. Murthy. Hive - a petabyte scale data warehouse using Hadoop. In F. Li, M. M. Moro, S. Ghandeharizadeh, J. R. Haritsa, G. Weikum, M. J. Carey, F. Casati, E. Y. Chang, I. Manolescu, S. Mehrotra, U. Dayal, and V. J. Tsotras, editors, Proceedings of the 26th International Conference on Data Engineering, ICDE 2010, March 1-6, 2010, Long Beach, California, USA, pages 996–1005. IEEE, 2010.
[31] Y. Yu, M. Isard, D. Fetterly, M. Budiu, U. Erlingsson, P. K. Gunda, and J. Currey. DryadLINQ: a system for general-purpose distributed data-parallel computing using a high-level language. In Proceedings of the 8th USENIX conference on Operating systems design and implementation, OSDI’08, pages 1–14, Berkeley, CA, USA, 2008. USENIX Association.
[32] M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, and I. Stoica. Spark: cluster computing with working sets. In Proceedings of the 2nd USENIX conference on Hot topics in cloud computing, HotCloud’10, pages 10–10, Berkeley, CA, USA, 2010. USENIX Association.
[33] Vinod Kumar Vavilapali, et. al, Apache Hadoop YARN – Yet Another Resource Negotiator, SoCC’13, 1-3 Oct. 2013, Santa Clara, California, USA.


comments powered by Disqus