Stop Thinking, Just Do!

Sung-Soo Kim's Blog

YARN (MapReduce 2)


12 December 2013


For very large clusters, in the region of 4,000 nodes and higher, the MapReduce system described in the previous section begins to hit scalability bottlenecks, so in 2010 a group at Yahoo! began to design the next generation of MapReduce. The result was YARN, short for Yet Another Resource Negotiator (or, if you prefer recursive acronyms, YARN Application Resource Negotiator).
YARN meets the scalability shortcomings of “classic” MapReduce by splitting the responsibilities of the jobtracker into separate entities. The jobtracker takes care of both job scheduling (matching tasks with tasktrackers) and task progress monitoring (keeping track of tasks and restarting failed or slow tasks, and doing task bookkeeping such as maintaining counter totals).
YARN separates these two roles into two independent daemons: a resource manager to manage the use of resources across the cluster, and an application master to manage the lifecycle of applications running on the cluster. The idea is that an application master negotiates with the resource manager for cluster resources—described in terms of a number of containers each with a certain memory limit—then runs application-specific processes in those containers. The containers are overseen by node managers running on cluster nodes, which ensure that the application does not use more resources than it has been allocated.
In contrast to the jobtracker, each instance of an application (here, a MapReduce job) has a dedicated application master, which runs for the duration of the application. This model is actually closer to the original Google MapReduce paper, which describes how a master process is started to coordinate map and reduce tasks running on a set of workers.

As described, YARN is more general than MapReduce, and in fact MapReduce is just one type of YARN application. There are a few other YARN applications, such as a distributed shell that can run a script on a set of nodes in the cluster, and others are actively being worked on (some are listed at PoweredByYarn). The beauty of YARN's design is that different YARN applications can coexist on the same cluster (so a MapReduce application can run at the same time as an MPI application, for example), which brings great benefits for manageability and cluster utilization.

Furthermore, it is even possible for users to run different versions of MapReduce on the same YARN cluster, which makes the process of upgrading MapReduce more manageable. (Note that some parts of MapReduce, like the job history server and the shuffle handler, as well as YARN itself, still need to be upgraded across the cluster.)

MapReduce on YARN

MapReduce on YARN involves more entities than classic MapReduce. They are:

  • The client, which submits the MapReduce job.
  • The YARN resource manager, which coordinates the allocation of compute resources on the cluster.
  • The YARN node managers, which launch and monitor the compute containers on machines in the cluster.
  • The MapReduce application master, which coordinates the tasks running the MapReduce job. The application master and the MapReduce tasks run in containers that are scheduled by the resource manager, and managed by the node managers.
  • The distributed filesystem (normally HDFS), which is used for sharing job files between the other entities.

The process of running a job is shown in the figure below and described in the following sections.

Job Submission

Jobs are submitted in MapReduce 2 using the same user API as MapReduce 1 (step 1). MapReduce 2 has an implementation of ClientProtocol that is activated when mapreduce.framework.name is set to yarn. The submission process is very similar to the classic implementation. The new job ID is retrieved from the resource manager (rather than the jobtracker), although in the nomenclature of YARN it is an application ID (step 2). The job client checks the output specification of the job; computes input splits (although there is an option to generate them on the cluster, yarn.app.mapreduce.am.compute-splits-in-cluster, which can be beneficial for jobs with many splits); and copies job resources (including the job JAR, configuration, and split information) to HDFS (step 3). Finally, the job is submitted by calling submitApplication() on the resource manager (step 4).
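For reference, the ClientProtocol implementation for YARN is selected by the mapreduce.framework.name property; a minimal mapred-site.xml entry might look like this:

```xml
<!-- mapred-site.xml: select the YARN (MapReduce 2) runtime -->
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>
```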

Job Initialization

When the resource manager receives a call to its submitApplication() method, it hands off the request to the scheduler. The scheduler allocates a container, and the resource manager then launches the application master's process there, under the node manager's management (steps 5a and 5b).

The application master for MapReduce jobs is a Java application whose main class is MRAppMaster. It initializes the job by creating a number of bookkeeping objects to keep track of the job’s progress, as it will receive progress and completion reports from the tasks (step 6). Next, it retrieves the input splits computed in the client from the shared filesystem (step 7). It then creates a map task object for each split, and a number of reduce task objects determined by the mapreduce.job.reduces property.
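The task-creation step can be sketched in a few lines. This is a sketch only: the tuple representation and the `conf` dictionary are hypothetical stand-ins for MRAppMaster's real task objects and job configuration, though the mapreduce.job.reduces property name is the real one.

```python
# Sketch: one map task per input split; reduce count from configuration.
def create_tasks(splits, conf):
    num_reduces = int(conf.get("mapreduce.job.reduces", 1))
    # Each map task is bound to one input split; reduce tasks have no split.
    maps = [("map", i, split) for i, split in enumerate(splits)]
    reduces = [("reduce", i, None) for i in range(num_reduces)]
    return maps + reduces

tasks = create_tasks(["split0", "split1", "split2"],
                     {"mapreduce.job.reduces": "2"})
print(len(tasks))  # 5 tasks: 3 maps + 2 reduces
```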

The next thing the application master does is decide how to run the tasks that make up the MapReduce job. If the job is small, the application master may choose to run the tasks in the same JVM as itself, since it judges that the overhead of allocating new containers and running tasks in them outweighs the gain of running them in parallel, compared to running them sequentially on one node. (This is different from MapReduce 1, where small jobs are never run on a single tasktracker.) Such a job is said to be uberized, or run as an uber task.

What qualifies as a small job? By default, one that has fewer than 10 mappers, only one reducer, and an input size smaller than one HDFS block. (These values may be changed for a job by setting mapreduce.job.ubertask.maxmaps, mapreduce.job.ubertask.maxreduces, and mapreduce.job.ubertask.maxbytes.) It's also possible to disable uber tasks entirely (by setting mapreduce.job.ubertask.enable to false).
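The small-job test can be sketched as a simple predicate. The thresholds mirror the mapreduce.job.ubertask.* properties above; the function itself, and the assumed 128 MB block size, are hypothetical simplifications of the real check.

```python
# Sketch of the uber-task eligibility check described above.
HDFS_BLOCK_SIZE = 128 * 1024 * 1024  # assumed block size; clusters vary

def is_uber_job(num_maps, num_reduces, input_bytes,
                enabled=True,          # mapreduce.job.ubertask.enable
                max_maps=10,           # mapreduce.job.ubertask.maxmaps
                max_reduces=1,         # mapreduce.job.ubertask.maxreduces
                max_bytes=HDFS_BLOCK_SIZE):  # mapreduce.job.ubertask.maxbytes
    """Return True if the job may run as a single uber task."""
    if not enabled:
        return False
    return (num_maps < max_maps
            and num_reduces <= max_reduces
            and input_bytes < max_bytes)

print(is_uber_job(3, 1, 10 * 1024 * 1024))    # True: tiny job
print(is_uber_job(100, 1, 10 * 1024 * 1024))  # False: too many mappers
```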

Before any tasks can be run, the job setup method is called (on the job's OutputCommitter) to create the job's output directory. In contrast to MapReduce 1, where it is called in a special task run by the tasktracker, in the YARN implementation the method is called directly by the application master.

Task Assignment

If the job does not qualify for running as an uber task, the application master requests containers for all the map and reduce tasks in the job from the resource manager (step 8). The requests, which are piggybacked on heartbeat calls, include information about each map task's data locality: in particular, the hosts and corresponding racks that the input split resides on. The scheduler uses this information to make scheduling decisions (just like a jobtracker's scheduler does): it attempts to place tasks on data-local nodes in the ideal case, but if this is not possible it prefers rack-local placement to non-local placement.
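The placement preference can be sketched as ranking candidate nodes. The data structures here are hypothetical; the real scheduler works from locality hints carried in heartbeat-driven container requests, but the ordering (data-local, then rack-local, then off-rack) is the one described above.

```python
# Sketch of data-local > rack-local > off-rack placement preference.
def locality_of(node, rack, split_hosts, split_racks):
    """Rank a candidate node for a task: 0 = data-local, 1 = rack-local, 2 = off-rack."""
    if node in split_hosts:
        return 0
    if rack in split_racks:
        return 1
    return 2

def pick_node(free_nodes, split_hosts, split_racks):
    """free_nodes: list of (hostname, rack) pairs with spare capacity."""
    return min(free_nodes,
               key=lambda nr: locality_of(nr[0], nr[1], split_hosts, split_racks))

nodes = [("h3", "rackB"), ("h2", "rackA"), ("h1", "rackA")]
# The split lives on h1 (rackA), so h1 wins outright.
print(pick_node(nodes, {"h1"}, {"rackA"}))  # ('h1', 'rackA')
```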

Requests also specify memory requirements for tasks. By default both map and reduce tasks are allocated 1024 MB of memory, but this is configurable by setting mapreduce.map.memory.mb and mapreduce.reduce.memory.mb.

The way memory is allocated is different from MapReduce 1, where tasktrackers have a fixed number of "slots", set at cluster configuration time, and each task runs in a single slot. Slots have a maximum memory allowance, which is also fixed for a cluster. This leads both to underutilization when tasks use less memory than the allowance (since other waiting tasks cannot take advantage of the unused memory) and to job failure when a task can't complete because it can't get enough memory to run correctly.

In YARN, resources are more fine-grained, so both of these problems can be avoided. In particular, applications may request a memory capability anywhere between the minimum allocation and the maximum allocation, which must be a multiple of the minimum allocation. Default memory allocations are scheduler-specific; for the capacity scheduler the default minimum is 1024 MB (set by yarn.scheduler.capacity.minimum-allocation-mb) and the default maximum is 10240 MB (set by yarn.scheduler.capacity.maximum-allocation-mb). Thus, tasks can request any memory allocation between 1 and 10 GB (inclusive), in multiples of 1 GB (the scheduler will round to the nearest multiple if needed), by setting mapreduce.map.memory.mb and mapreduce.reduce.memory.mb appropriately.
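The allocation rule can be sketched as clamping a request into the [minimum, maximum] range and rounding to a multiple of the minimum. Rounding up, rather than to the nearest multiple in either direction, is an assumption in this sketch.

```python
# Sketch of capacity-scheduler-style memory normalization.
MIN_MB = 1024   # yarn.scheduler.capacity.minimum-allocation-mb default
MAX_MB = 10240  # yarn.scheduler.capacity.maximum-allocation-mb default

def normalize_memory(requested_mb, min_mb=MIN_MB, max_mb=MAX_MB):
    """Clamp the request into [min_mb, max_mb], then round up to a multiple of min_mb."""
    mb = max(min_mb, min(requested_mb, max_mb))
    # Round up to the next multiple of the minimum allocation.
    return ((mb + min_mb - 1) // min_mb) * min_mb

print(normalize_memory(1500))   # 2048: rounded up to the next 1 GB multiple
print(normalize_memory(500))    # 1024: raised to the minimum allocation
print(normalize_memory(20000))  # 10240: capped at the maximum allocation
```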

Task Execution

Once a task has been assigned a container by the resource manager's scheduler, the application master starts the container by contacting the node manager (steps 9a and 9b). The task is executed by a Java application whose main class is YarnChild. Before it can run the task, it localizes the resources that the task needs, including the job configuration and JAR file, and any files from the distributed cache (step 10). Finally, it runs the map or reduce task (step 11).

The YarnChild runs in a dedicated JVM, for the same reason that tasktrackers spawn new JVMs for tasks in MapReduce 1: to isolate user code from long-running system daemons. Unlike MapReduce 1, however, YARN does not support JVM reuse, so each task runs in a new JVM.

Streaming and Pipes programs work in the same way as in MapReduce 1. YarnChild launches the Streaming or Pipes process and communicates with it using standard input/output or a socket (respectively), as shown in Figure 6-2 (except that the child and subprocesses run on node managers, not tasktrackers).

Progress and Status Updates

When running under YARN, the task reports its progress and status (including counters) back to its application master every three seconds (over the umbilical interface), and the application master maintains an aggregate view of the job. The process is illustrated in the figure below. Contrast this with MapReduce 1, where progress updates flow from the child through the tasktracker to the jobtracker for aggregation.

The client polls the application master every second (set via mapreduce.client.progressmonitor.pollinterval) to receive progress updates, which are usually displayed to the user.

Job Completion

As well as polling the application master for progress, the client checks every five seconds whether the job has completed (when using the waitForCompletion() method on Job). The polling interval can be set via the mapreduce.client.completion.pollinterval configuration property.
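Both client-side poll intervals are ordinary configuration properties, set in milliseconds; the values below correspond to the defaults described above:

```xml
<!-- Client polling intervals, in milliseconds -->
<configuration>
  <property>
    <name>mapreduce.client.progressmonitor.pollinterval</name>
    <value>1000</value> <!-- poll for progress every second -->
  </property>
  <property>
    <name>mapreduce.client.completion.pollinterval</name>
    <value>5000</value> <!-- check for completion every five seconds -->
  </property>
</configuration>
```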

Notification of job completion via an HTTP callback is also supported, as in MapReduce 1. In MapReduce 2, however, it is the application master that initiates the callback.

On job completion the application master and the task containers clean up their working state, and the OutputCommitter’s job cleanup method is called. Job information is archived by the job history server to enable later interrogation by users if desired.

Studying Hadoop or MapReduce

Studying Hadoop or MapReduce can be a daunting task if you get your hands dirty right at the start.
I followed this schedule:

  1. Start with the very basics of MapReduce.
  2. Then go through the first two lectures in a very good introductory course on MapReduce and Hadoop:

  • Thursday September 25: Introduction/Overview (<a href=lectures/Intro.ppt>ppt</a>)
    Reading: "Introduction to Distributed System Design"
    Reading: "Lessons from Giant-Scale Services"
    Reading: "Web Search for a Planet: The Google Cluster Architecture"
  • Tuesday September 30: Functional programming and MapReduce (ppt)
    Reading: "Introduction to Parallel Programming and MapReduce"
    Reading: "MapReduce: Simplified Data Processing on Large Clusters" (updated version at page 107 <a href=readings/communications200801-dl.pdf>here</a>)
  • Thursday October 2: Hadoop nuts and bolts (ppt)
    Reading: "Hadoop Map/Reduce Tutorial"
  • Tuesday October 7, 3:30-4:20: Jeff Dean (Google) Distinguished Lecture, <a href=lectures/Jeff.Dean.colloq.pdf>"Research Problems Inspired by Large-Scale Computing at Google"</a>
  • Tuesday October 7: Jeff Dean (Google), <a href=lectures/Jeff.Dean.class.pdf>"Handling Large Datasets at Google: Current Systems and Future Directions"</a>
  • Thursday October 9: <a href=lectures/490h-asn.pdf>Architecture, Systems, and Networking in 80 minutes</a> (<a href=lectures/490h-asn.ppt>ppt</a>)
  • Tuesday October 14, 3:30-4:20: Vint Cerf (Google) Distinguished Lecture, <a href=lectures/cerf.dls.pdf>"Internet Evolution and Some Challenges for the Early 21st Century"</a>
  • Tuesday October 14: MapReduce algorithms (ppt) <a href=lectures/2008_10_490h_route_stability.pdf>Internet route stability example</a>
    Reading: "The Anatomy of a Large-Scale Hypertextual Web Search Engine"
  • Thursday October 16: <a href=lectures/490h-gfs.pdf>The Google File System</a> (<a href=lectures/490h-gfs.ppt>ppt</a>)
    Reading: "The Google File System"
  • Tuesday October 21: Mike Cafarella (UW), <a href=lectures/490h_nutch.pdf>Nutch, and Search Engine History</a>
    Reading: "Building Nutch: Open Source Search"
  • Thursday October 23: Barry Brumitt (Google), <a href=lectures/MapReduceDesignPatterns-UW2.pdf>"MapReduce Design Patterns"</a>
    Reading: "Map-Reduce for Machine Learning on Multicore"
    Reading: "Highway Hierarchies Hasten Exact Shortest Path Queries"
    Reading: "Reach for A*: Efficient Point-to-Point Shortest Path Algorithms"
  • Tuesday October 28: Phil Bernstein (Microsoft), <a href=lectures/PhilBeCSE490H-F08.pdf>"Transactions and Replication"</a>
    Reading: <a href=readings/ReplicationByBernstein&Newcomer.pdf>"Replication"</a> (from Principles of Transaction Processing, Bernstein & Newcomer, Elsevier, Inc.)
    Reading: "Paxos Made Live - An Engineering Perspective"
  • Thursday October 30: Additional topics in reliability / availability / consistency (ppt)
    Reading: "Eventually Consistent - Revisited"
    Reading: "The Chubby Lock Service for Loosely-Coupled Distributed Systems"
  • Tuesday November 4: [Aaron away] Steve Gribble (UW and Google), <a href=lectures/cse490_virtualization.pdf>"Virtual Machine Monitors: Implementation and Applications"</a>
    Reading: "Xen and the Art of Virtualization"
    Reading: "Remus: High Availability via Asynchronous Virtual Machine Replication"
  • Thursday November 6: [Aaron away] <a href=lectures/490h-bt.pdf>BigTable</a> (<a href=lectures/490h-bt.ppt>ppt</a>)
    Reading: "Bigtable: A Distributed Storage System for Structured Data"
    (See Jeff Dean's October 2005 colloquium.)
  • Tuesday November 11: Veteran's Day -- UW holiday
  • Thursday November 13: EC2 (ppt)
    Reading: "Amazon Elastic Compute Cloud Getting Started Guide"
    Reading: "Using Amazon S3 from Amazon EC2 with Ruby"
    Reading: "Building GrepTheWeb in the Cloud, Part 1: Cloud Architectures"
    Reading: "Building GrepTheWeb in the Cloud, Part 2: Best Practices"
  • Tuesday November 18, 3:30-4:20: Werner Vogels (Amazon.com) Distinguished Lecture, "Ahead in the Cloud: Amazon Web Services"
  • Tuesday November 18: <a href=lectures/490h-power.pdf>Hardware issues: power, reliability, etc.</a> (<a href=lectures/490h-power.ppt>ppt</a>)
    Reading: "Disk failures in the real world: What does an MTTF of 1,000,000 hours mean to you?"
    Reading: "Failure Trends in a Large Disk Drive Population"
    Reading: "The Case for Energy-Proportional Computing"
  • Thursday November 20: [Ed away] Yoshi Kohno (UW), <a href=lectures/security-overview.pdf>Security, Privacy, and Cryptography</a>
    Reading: "Why Cryptosystems Fail"
    Reading: "How to think like a security professional"
  • Tuesday November 25: Werner Vogels (Amazon.com), Software and hardware architecture of large-scale web services
    Reading: "Dynamo: Amazon's Highly Available Key-value Store"
  • Thursday November 27: Thanksgiving -- UW holiday
  • Tuesday December 2: [Aaron away] James Hamilton (Microsoft), "Designing and Deploying Internet-Scale Services" and "Where Does the Power Go and What to do About it?"
    Reading: "An Architecture for Modular Datacenters"
    Reading: "Power Provisioning for a Warehouse-sized Computer"
    Reading: "On Designing and Deploying Internet-Scale Services"

  3. Read the seminal MapReduce paper and its improvements in the updated version.
  4. Then go through all the other videos in the U. Washington link given above.
  5. Try searching YouTube for "MapReduce" and "Hadoop" to find videos by O'Reilly and the Google RoundTable for a good overview of the future of Hadoop and MapReduce.
  6. Then move on to the most important videos: the Cloudera videos and the Google MiniLecture series.

Along with all the multimedia above, we need good written material.

Documents:

  1. Architecture diagrams are good to have on your wall.
  2. *Hadoop: The Definitive Guide* goes more into the nuts and bolts of the whole system, whereas *Hadoop in Action* is a good read with lots of teaching examples for learning the concepts of Hadoop. *Pro Hadoop* is not for beginners.
  3. PDFs of the documentation from the Apache Foundation will help you learn how to model your problem as a MapReduce solution so as to gain the advantages of Hadoop in full.
  4. The HDFS paper by Yahoo! Research is also a good read for gaining in-depth knowledge of Hadoop.
  5. Subscribe to the user mailing lists of Common, MapReduce, and HDFS to keep up with problems, solutions, and future plans.
  6. Try the beginner-to-expert learning path to Hadoop.

For any queries, contact Apache, Google, Bing, or Yahoo!

References
---

[1] Tom White, *Hadoop: The Definitive Guide*, pp. 194–200, Third Edition, 2012.
[2] Apache Tez.
[3] Netty project.
[4] Storm.
[5] H. Ballani, P. Costa, T. Karagiannis, and A. I. Rowstron. Towards predictable datacenter networks. In *SIGCOMM*, volume 11, pages 242–253, 2011.
[6] F. P. Brooks, Jr. *The Mythical Man-Month (anniversary ed.)*. Addison-Wesley Longman Publishing Co., Inc., Boston, MA, USA, 1995.
[7] N. Capit, G. Da Costa, Y. Georgiou, G. Huard, C. Martin, G. Mounie, P. Neyron, and O. Richard. A batch scheduler with high level components. In *Cluster Computing and the Grid (CCGrid 2005), IEEE International Symposium on*, volume 2, pages 776–783, 2005.
[8] R. Chaiken, B. Jenkins, P.-A. Larson, B. Ramsey, D. Shakib, S. Weaver, and J. Zhou. SCOPE: easy and efficient parallel processing of massive data sets. *Proc. VLDB Endow.*, 1(2):1265–1276, Aug. 2008.
[9] M. Chowdhury, M. Zaharia, J. Ma, M. I. Jordan, and I. Stoica. Managing data transfers in computer clusters with Orchestra. *SIGCOMM Computer Communication Review*, 41(4):98, 2011.
[10] B.-G. Chun, T. Condie, C. Curino, R. Ramakrishnan, R. Sears, and M. Weimer. REEF: Retainable evaluator execution framework. In *VLDB 2013*, demo, 2013.
[11] B. F. Cooper, E. Baldeschwieler, R. Fonseca, J. J. Kistler, P. Narayan, C. Neerdaels, T. Negrin, R. Ramakrishnan, A. Silberstein, U. Srivastava, et al. Building a cloud for Yahoo! *IEEE Data Eng. Bull.*, 32(1):36–43, 2009.
[12] J. Dean and S. Ghemawat. MapReduce: simplified data processing on large clusters. *Commun. ACM*, 51(1):107–113, Jan. 2008.
[13] W. Emeneker, D. Jackson, J. Butikofer, and D. Stanzione. Dynamic virtual clustering with Xen and Moab. In *Frontiers of High Performance Computing and Networking, ISPA 2006 Workshops*, volume 4331 of Lecture Notes in Computer Science, pages 440–451. Springer Berlin Heidelberg, 2006.
[14] Facebook Engineering Team. *Under the Hood: Scheduling MapReduce jobs more efficiently with Corona*, 2012.
[15] D. Gottfrid. *Self-service prorated super-computing fun*, 2007.
[16] T. Graves. *GraySort and MinuteSort at Yahoo on Hadoop 0.23*. http://sortbenchmark.org/Yahoo2013Sort.pdf, 2013.
[17] B. Hindman, A. Konwinski, M. Zaharia, A. Ghodsi, A. D. Joseph, R. Katz, S. Shenker, and I. Stoica. Mesos: a platform for fine-grained resource sharing in the data center. In *Proceedings of the 8th USENIX Conference on Networked Systems Design and Implementation (NSDI '11)*, pages 22–22, Berkeley, CA, USA, 2011. USENIX Association.
[18] M. Isard, M. Budiu, Y. Yu, A. Birrell, and D. Fetterly. Dryad: distributed data-parallel programs from sequential building blocks. In *Proceedings of the 2nd ACM SIGOPS/EuroSys European Conference on Computer Systems (EuroSys '07)*, pages 59–72, New York, NY, USA, 2007. ACM.
[19] M. Islam, A. K. Huang, M. Battisha, M. Chiang, S. Srinivasan, C. Peters, A. Neumann, and A. Abdelnur. Oozie: towards a scalable workflow management system for Hadoop. In *Proceedings of the 1st ACM SIGMOD Workshop on Scalable Workflow Execution Engines and Technologies*, page 4. ACM, 2012.
[20] D. B. Jackson, Q. Snell, and M. J. Clement. Core algorithms of the Maui scheduler. In *Revised Papers from the 7th International Workshop on Job Scheduling Strategies for Parallel Processing (JSSPP '01)*, pages 87–102, London, UK, 2001. Springer-Verlag.
[21] S. Loughran, D. Das, and E. Baldeschwieler. *Introducing Hoya – HBase on YARN*, 2013.
[22] G. Malewicz, M. H. Austern, A. J. Bik, J. C. Dehnert, I. Horn, N. Leiser, and G. Czajkowski. Pregel: a system for large-scale graph processing. In *Proceedings of the 2010 ACM SIGMOD International Conference on Management of Data (SIGMOD '10)*, pages 135–146, New York, NY, USA, 2010. ACM.
[23] R. O. Nambiar and M. Poess. The making of TPC-DS. In *Proceedings of the 32nd International Conference on Very Large Data Bases (VLDB '06)*, pages 1049–1058. VLDB Endowment, 2006.
[24] C. Olston, B. Reed, U. Srivastava, R. Kumar, and A. Tomkins. Pig Latin: a not-so-foreign language for data processing. In *Proceedings of the 2008 ACM SIGMOD International Conference on Management of Data (SIGMOD '08)*, pages 1099–1110, New York, NY, USA, 2008. ACM.
[25] O. O'Malley. *Hadoop: The Definitive Guide*, chapter Hadoop at Yahoo!, pages 11–12. O'Reilly Media, 2012.
[26] M. Schwarzkopf, A. Konwinski, M. Abd-El-Malek, and J. Wilkes. Omega: flexible, scalable schedulers for large compute clusters. In *Proceedings of the 8th ACM European Conference on Computer Systems (EuroSys '13)*, pages 351–364, New York, NY, USA, 2013. ACM.
[27] K. Shvachko, H. Kuang, S. Radia, and R. Chansler. The Hadoop Distributed File System. In *Proceedings of the 2010 IEEE 26th Symposium on Mass Storage Systems and Technologies (MSST '10)*, pages 1–10, Washington, DC, USA, 2010. IEEE Computer Society.
[28] T.-W. N. Sze. *The two quadrillionth bit of π is 0!*
[29] D. Thain, T. Tannenbaum, and M. Livny. Distributed computing in practice: the Condor experience. *Concurrency and Computation: Practice and Experience*, 17(2-4):323–356, 2005.
[30] A. Thusoo, J. S. Sarma, N. Jain, Z. Shao, P. Chakka, N. Zhang, S. Anthony, H. Liu, and R. Murthy. Hive: a petabyte scale data warehouse using Hadoop. In *Proceedings of the 26th International Conference on Data Engineering (ICDE 2010)*, pages 996–1005. IEEE, 2010.
[31] Y. Yu, M. Isard, D. Fetterly, M. Budiu, U. Erlingsson, P. K. Gunda, and J. Currey. DryadLINQ: a system for general-purpose distributed data-parallel computing using a high-level language. In *Proceedings of the 8th USENIX Conference on Operating Systems Design and Implementation (OSDI '08)*, pages 1–14, Berkeley, CA, USA, 2008. USENIX Association.
[32] M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, and I. Stoica. Spark: cluster computing with working sets. In *Proceedings of the 2nd USENIX Conference on Hot Topics in Cloud Computing (HotCloud '10)*, pages 10–10, Berkeley, CA, USA, 2010. USENIX Association.
[33] V. K. Vavilapalli et al. Apache Hadoop YARN: Yet Another Resource Negotiator. In *SoCC '13*, 1–3 Oct. 2013, Santa Clara, California, USA.
