Stop Thinking, Just Do!

Sung-Soo Kim's Blog

Failures in YARN


7 April 2014


In the real world, user code is buggy, processes crash, and machines fail. One of the major benefits of using Hadoop is its ability to handle such failures and allow your job to complete.

Failures in YARN

For MapReduce programs running on YARN, we need to consider the failure of any of the following entities: the task, the application master, the node manager, and the resource manager.

얀에서 실행 중인 맵리듀스 프로그램도 태스크 실패, 애플리케이션 마스터 실패, 노드 매니저 실패, 리소스 매니저 실패에 대해 고려할 필요가 있다.

Task Failure

Failure of the running task is similar to the classic case. Runtime exceptions and sudden exits of the JVM are propagated back to the application master and the task attempt is marked as failed. Likewise, hanging tasks are noticed by the application master by the absence of a ping over the umbilical channel (the timeout is set by mapreduce.task.time out), and again the task attempt is marked as failed.

The configuration properties for determining when a task is considered to be failed are the same as the classic case: a task is marked as failed after four attempts (set by for map tasks and mapreduce.reduce.maxattempts for reducer tasks). A job will be failed if more than percent of the map tasks in the job fail, or more than mapreduce.reduce.failures.maxpercent percent of the reduce tasks fail.

실행 중인 태스크의 실패는 전통적인 맵리듀스와 비슷하다. JVM의 런타임 예외 상황과 갑작스런 종료는 애플리케이션 마스터로 거슬러 전파되고 태스크의 시행은 실패로 마킹된다. 같은 방식으로 애플리케이션 마스터는 태스크에 밀접하게 연결된 채널을 통해 핑(ping)의 부재를 확인(mapreduce.task.time out으로 설정된 타임아웃)하고 태스크의 행(hang)을 인지한 후 태스크의 시행을 실패로 마킹한다.

태스크 실패 여부를 결정하는 환경 설정 속성은 고전적인 맵리듀스 때와 같다. 태스크는 네 번의 시행 후 실패로 마킹된다. (맵 태스크는로 설정하고, 리듀스 태스크는 mapreduce.reduce.failures.maxpercent로 설정). 잡 실패가에 설정된 맵 태스크의 실패율보다 많거나 mapreduce.reduce.failures.maxpercent에 설정된 맵 태스크의 실패율보다 많으면 그 잡은 실패할 것이다.

Application Master Failure

Just like MapReduce tasks are given several attempts to succeed (in the face of hardware or network failures) applications in YARN are tried multiple times in the event of failure. By default, applications are marked as failed if they fail once, but this can be increased by setting the property

An application master sends periodic heartbeats to the resource manager, and in the event of application master failure, the resource manager will detect the failure and start a new instance of the master running in a new container (managed by a node manager). In the case of the MapReduce application master, it can recover the state of the tasks that had already been run by the (failed) application so they don’t have to be rerun. By default, recovery is not enabled, so failed application masters will not rerun all their tasks, but you can turn it on by setting ery.enable to true.

The client polls the application master for progress reports, so if its application master fails the client needs to locate the new instance. During job initialization the client asks the resource manager for the application master’s address, and then caches it, so it doesn’t overload the the resource manager with a request every time it needs to poll the application master. If the application master fails, however, the client will experience a timeout when it issues a status update, at which point the client will go back to the resource manager to ask for the new application master’s address.

맵리듀스 태스크가 성공적인 수행을 위해 몇 번의 시행 기회가 있는 것처럼 (하드웨어나 네트워크 장애의 경우), 얀에서 애플리케이션도 실패한 경우 몇 번 더 실행할 수 있다. 기본적으로 애플리케이션은 한 번 실패하면 바로 실패된 것으로 마킹된다. 그러나

Node Manager Failure

If a node manager fails, then it will stop sending heartbeats to the resource manager, and the node manager will be removed from the resource manager’s pool of available nodes. The property yarn.resourcemanager.nm.liveness-monitor.expiry-intervalms, which defaults to 600000 (10 minutes), determines the minimum time the resource manager waits before considering a node manager that has sent no heartbeat in that time as failed.

Any task or application master running on the failed node manager will be recovered using the mechanisms described in the previous two sections.

Node managers may be blacklisted if the number of failures for the application is high. Blacklisting is done by the application master, and for MapReduce the application master will try to reschedule tasks on different nodes if more than three tasks fail on a node manager. The threshold may be set with mapreduce.job.maxtaskfai lures.per.tracker.

Resource Manager Failure

Failure of the resource manager is serious, since without it neither jobs nor task containers can be launched. The resource manager was designed from the outset to be able to recover from crashes, by using a checkpointing mechanism to save its state to persistent storage, although at the time of writing the latest release did not have a complete implementation.

After a crash, a new resource manager instance is brought up (by an adminstrator) and it recovers from the saved state. The state consists of the node managers in the system as well as the running applications. (Note that tasks are not part of the resource manager’s state, since they are managed by the application. Thus the amount of state to be stored is much more managable than that of the jobtracker.)

The storage used by the reource manager is configurable via the yarn.resourceman property. The default is org.apache.hadoop.yarn.server.resource manager.recovery.MemStore, which keeps the store in memory, and is therefore not highly-available. However, there is a ZooKeeper-based store in the works that will support reliable recovery from resource manager failures in the future.


[1] Apache hadoop.
[2] Apache tez.
[3] Netty project.
[4] Storm.
[5] H.Ballani, P.Costa, T.Karagiannis, and A.I.Rowstron. Towards predictable datacenter networks. In SIGCOMM, volume 11, pages 242–253, 2011.
[6] F.P.Brooks,Jr. The mythical man-month (anniversary ed.). Addison-Wesley Longman Publishing Co., Inc., Boston, MA, USA, 1995.
[7] N. Capit, G. Da Costa, Y. Georgiou, G. Huard, C. Martin, G. Mounie, P. Neyron, and O. Richard. A batch scheduler with high level components. In Cluster Computing and the Grid, 2005. CC-Grid 2005. IEEE International Symposium on, volume 2, pages 776–783 Vol. 2, 2005.
[8] R. Chaiken, B. Jenkins, P.-A. Larson, B. Ramsey, D. Shakib, S. Weaver, and J. Zhou. Scope: easy and efficient parallel processing of massive data sets. Proc. VLDB Endow., 1(2):1265–1276, Aug. 2008.
[9] M. Chowdhury, M. Zaharia, J. Ma, M. I. Jordan, and I. Stoica. Managing data transfers in computer clusters with orchestra. SIGCOMMComputer Communication Review, 41(4):98, 2011.
[10] B.-G. Chun, T. Condie, C. Curino, R. Ramakrishnan, R. Sears, and M. Weimer. Reef: Retainable evaluator execution framework. In VLDB 2013, Demo, 2013.
[11] B. F. Cooper, E. Baldeschwieler, R. Fonseca, J. J. Kistler, P. Narayan, C. Neerdaels, T. Negrin, R. Ramakrishnan, A. Silberstein, U. Srivastava, et al. Building a cloud for Yahoo! IEEE Data Eng. Bull., 32(1):36–43, 2009.
[12] J. Dean and S. Ghemawat. MapReduce: simplified data processing on large clusters. Commun. ACM, 51(1):107–113, Jan. 2008.
[13] W. Emeneker, D. Jackson, J. Butikofer, and D. Stanzione. Dynamic virtual clustering with xen and moab. In G. Min, B. Martino, L. Yang, M. Guo, and G. Rnger, editors, Frontiers of High Performance Computing and Networking, ISPA 2006 Workshops, volume 4331 of Lecture Notes in Computer Science, pages 440–451. Springer Berlin Heidelberg, 2006.
[14] Facebook Engineering Team. Under the Hood: Scheduling MapReduce jobs more efficiently with Corona., 2012.
[15] D. Gottfrid. Self-service prorated super-computing fun. http://open., 2007.
[16] T. Graves. GraySort and MinuteSort at Yahoo on Hadoop 0.23. http://sortbenchmark. org/Yahoo2013Sort.pdf, 2013.
[17] B. Hindman, A. Konwinski, M. Zaharia, A. Ghodsi, A. D. Joseph, R. Katz, S. Shenker, and I. Stoica. Mesos: a platform for fine-grained resource sharing in the data center. In Proceedings of the 8th USENIX conference on Networked systems design and implementation, NSDI’11, pages 22–22, Berkeley, CA, USA, 2011. USENIX Association.
[18] M. Isard, M. Budiu, Y. Yu, A. Birrell, and D. Fetterly. Dryad: distributed data-parallel programs from sequential building blocks. In Proceedings of the 2nd ACM SIGOPS/EuroSys European Conference on Computer Systems 2007, EuroSys ’07, pages 59–72, New York, NY, USA, 2007. ACM.
[19] M. Islam, A. K. Huang, M. Battisha, M. Chiang, S. Srinivasan, C. Peters, A. Neumann, and A. Abdelnur. Oozie: towards a scalable workflow management system for hadoop. In Proceedings of the 1st ACM SIGMOD Workshop on Scalable Workflow Execution Engines and Technologies, page 4. ACM, 2012.
[20] D. B. Jackson, Q. Snell, and M. J. Clement. Core algorithms of the maui scheduler. In Revised Papers from the 7th International Workshop on Job Scheduling Strategies for Parallel Processing, JSSPP ’01, pages 87–102, London, UK, UK, 2001. Springer-Verlag.
[21] S. Loughran, D. Das, and E. Baldeschwieler. Introducing Hoya – HBase on YARN., 2013.
[22] G. Malewicz, M. H. Austern, A. J. Bik, J. C. Dehnert, I. Horn, N. Leiser, and G. Czajkowski. Pregel: a system for large-scale graph processing. In Proceedings of the 2010 ACM SIGMOD International Conference on Management of data, SIGMOD ’10, pages 135–146, New York, NY, USA, 2010. ACM.
[23] R. O. Nambiar and M. Poess. The making of tpcds. In Proceedings of the 32nd international conference on Very large data bases, VLDB ’06, pages 1049–1058. VLDB Endowment, 2006.
[24] C. Olston, B. Reed, U. Srivastava, R. Kumar, and A. Tomkins. Pig Latin: a not-so-foreign language for data processing. In Proceedings of the 2008 ACM SIGMOD international conference on Management of data, SIGMOD ’08, pages 1099–1110, New York, NY, USA, 2008. ACM.
[25] O.O’Malley. Hadoop: The Definitive Guide, chapter Hadoop at Yahoo!, pages 11–12. O’Reilly Media, 2012.
[26] M. Schwarzkopf, A. Konwinski, M. Abd-El-Malek, and J. Wilkes. Omega: flexible, scalable schedulers for large compute clusters. In Proceedings of the 8th ACM European Conference on Computer Systems, EuroSys ’13, pages 351–364, New York, NY, USA, 2013. ACM.
[27] K.Shvachko, H.Kuang, S.Radia, and R.Chansler. The Hadoop Distributed File System. In Proceedings of the 2010 IEEE 26th Symposium on Mass Storage Systems and Technologies (MSST), MSST ’10, pages 1–10, Washington, DC, USA, 2010. IEEE Computer Society.
[28] T.-W. N. Sze. The two quadrillionth bit of π is 0!
[29] D. Thain, T. Tannenbaum, and M. Livny. Distributed computing in practice: the Condor experience. Concurrency and Computation: Practice and Experience, 17(2-4):323–356, 2005.
[30] A. Thusoo, J. S. Sarma, N. Jain, Z. Shao, P. Chakka, N. Z. 0002, S. Anthony, H. Liu, and R. Murthy. Hive a petabyte scale data warehouse using Hadoop. In F. Li, M. M. Moro, S. Ghandeharizadeh, J. R. Haritsa, G. Weikum, M. J. Carey, F. Casati, E. Y. Chang, I. Manolescu, S. Mehrotra, U. Dayal, and V. J. Tsotras, editors, Proceedings of the 26th International Conference on Data Engineering, ICDE 2010, March 1-6, 2010, Long Beach, California, USA, pages 996–1005. IEEE, 2010.
[31] Y. Yu, M. Isard, D. Fetterly, M. Budiu, U. Erlingsson, P. K. Gunda, and J. Currey. DryadLINQ: a system for general-purpose distributed data-parallel computing using a high-level language. In Proceedings of the 8th USENIX conference on Operating systems design and implementation, OSDI’08, pages 1–14, Berkeley, CA, USA, 2008. USENIX Association.
[32] M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, and I. Stoica. Spark: cluster computing with working sets. In Proceedings of the 2nd USENIX conference on Hot topics in cloud computing, HotCloud’10, pages 10–10, Berkeley, CA, USA, 2010. USENIX Association.
[33] Vinod Kumar Vavilapali, et. al, Apache Hadoop YARN – Yet Another Resource Negotiator, SoCC’13, 1-3 Oct. 2013, Santa Clara, California, USA.

comments powered by Disqus