博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
hadoop之MapReduce框架JobTracker端心跳机制分析(源码分析第七篇)
阅读量:7091 次
发布时间:2019-06-28

本文共 23150 字,大约阅读时间需要 77 分钟。

一、小概述

    JobTracker端的心跳机制主要任务是处理TaskTracker发送过来的心跳信息,判断TaskTrakcer是否存活、及时让JobTracker获取到各个节点上的资源使用情况和任务运行情况和为TaskTracker下达各种命令等功能。

二、心跳处理过程详解

    JobTracker端的心跳机制处理代码只要是JobTracker类的heartbeat方法,通过逐步的分析其中大概的流程比较多,但是万变不离其中都是先各种容错性检测,对Task、TaskTracker、job状态信息的更新以及临时资源的清理,处理心跳信息,返回心跳应答信息等。下面看看简单的流程图:

(1)过程一,检测当前TaskTracker是否可以接入JobTracker。

在该过程中,JobTracker会对TaskTracker是否可以进行连接进行检测。

当且仅当该TaskTracker在host list(由参数mapred.hosts配置)中,但是不在exclude list(由mapred.hosts.exclude指定)中,才允许接入。如果不允许接入则会直接抛出异常终止本次心跳处理。代码定位到JobTracker类的heartbeat方法:

...//JobTracker检测该TaskTracker是否可以进行连接,当且仅当该TaskTracker在host list(由参数mapred.hosts配置)中,    //但是不在exclude list(由mapred.hosts.exclude指定)中,才允许接入。    if (!acceptTaskTracker(status)) {      throw new DisallowedTaskTrackerException(status);    }...

追踪进入acceptTaskTracker方法,可以清楚的看到:

.../**   * Returns true if the tasktracker is in the hosts list and   * not in the exclude list.   */  private boolean acceptTaskTracker(TaskTrackerStatus status) {    return (inHostsList(status) && !inExcludedHostsList(status));  }...

(2)过程二,检测TaskTracker是否已经被重启

   在JobTracker允许TaskTracker接入的前提下,检查该TaskTracker是否已经被重启过了(是否重启过的restarted参数有TaskTracker通过心跳传给JobTracker)。如果TaskTracker被重启过,则将之标注为健康状态的TaskTracker,并且从黑名或者灰名单中清除,否则启动容错机制以检测它是否处于健康状态。

(3)过程三,检测TaskTracker是否是第一次连接JobTracker。

   这是一个非常重要的容错步骤。如果是非初次心跳,正常情况下在JobTracker中的trackerToHeartbeatResponseMap对象中会存在该TaskTracker上一次的心跳应答对象信息HeartbeatResponse,初次心跳连接则不会有该TaskTracker上一次的应答对象。但是在非正常情况下会出现,非初次心跳,但是trackerToHeartbeatResponseMap没有该TaskTracker的应答对象信息存在,这是可能由于网络等各方面的原因造成的,另外重要的一点是initialContact标识是从TaskTracker端通过心跳发送过来的,JobTracker端记录的该TaskTracker状态不一定是一致的。因此,在这一步要进行比较多的容错性校验。

   首先,通过TaskTracker通过心跳带来的initialContact标志判断该TaskTracker是否是初次链接,如果是则直接进入处理心跳信息环节。如果非初次连接并且上一次心跳应答对象不存在,则会去判断该JobTracker是否被重启过,如果不是,说明上次心跳应答丢失那么直接给该TaskTracker组装重新初始化命令让其重新初始化,如果JobTracker被重启过则会自动先检测是否存在需要恢复运行状态的作业。如果有,则可以通过日志恢复这些作业的运行状态,并重新调度那么些没有运行的tasks,当然也包括了已经产生部分结果的tasks(前提是启用作业恢复机制,有参数mapred.jobtracker.restart.recover配置,具体地内容可以参考我的blog源码分析系列第4篇)。在JobTracker重启过程中各个TaskTracker依旧还活着的,此时通知RecoveryManager该TaskTracker已经重新连接,并且从recoveryManager中remove掉。另外的一种情况,非初次连接并且针对该TaskTracker的上次的心跳应答对象信息还存在,则判断JobTracker端的上一次对该TaskTracker的心跳应答ID是否与该TaskTracker带过来的应答ID一致,如果不一致,属于丢失心跳情况,返回上一次心跳信息给该TaskTracker,这样可以防止处理重复的心跳请求你。这部分的容错校验比较复杂步骤比较多,想整体了解还是看上面的流程图吧。贴一下这段代码:

...if (initialContact != true) {//该TaskTracker不是是第一次连接JobTracker      // If this isn't the 'initial contact' from the tasktracker,      // there is something seriously wrong if the JobTracker has      // no record of the 'previous heartbeat'; if so, ask the      // tasktracker to re-initialize itself.      //TaskTracker非第一次连接JobTracker,并且上一次心跳应答对象为空      if (prevHeartbeatResponse == null) {        // This is the first heartbeat from the old tracker to the newly        // started JobTracker        if (hasRestarted()) {//判断JobTracker是否重启过          addRestartInfo = true;          // inform the recovery manager about this tracker joining back          recoveryManager.unMarkTracker(trackerName);        } else {//心跳丢失,向该TaskTracker返回重新进行初始化命令          // Jobtracker might have restarted but no recovery is needed          // otherwise this code should not be reached          LOG.warn("Serious problem, cannot find record of 'previous' " +                   "heartbeat for '" + trackerName +                   "'; reinitializing the tasktracker");          return new HeartbeatResponse(responseId,              new TaskTrackerAction[] {new ReinitTrackerAction()});        }      } else {//TaskTracker非第一次连接JobTracker,并且上一次心跳应答对象不为空                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         // It is completely safe to not process a 'duplicate' heartbeat from a        // {@link TaskTracker} since it resends the heartbeat when rpcs are        // lost see {@link TaskTracker.transmitHeartbeat()};        // acknowledge it by re-sending the previous response to let the        // {@link TaskTracker} go forward.        //判断JobTracker端的上一次对该TaskTracker的心跳应答ID是否与该TaskTracker带过来的应答ID一致        if (prevHeartbeatResponse.getResponseId() != responseId) {//不一致,属于丢失心跳情况,返回上一次心跳信息给该TaskTracker          LOG.info("Ignoring 'duplicate' heartbeat from '" +              trackerName + "'; resending the previous 'lost' response");          return prevHeartbeatResponse;        }      }    }...

(4)过程四,处理心跳请求。

处理心跳请求环节是整个JobTracker心跳的机制的核心。这里最主要的任务就是根据心跳带来的TraskTracker状态信息去更新当前集群的资源使用情况、Task信息和节点健康信息。代码如下:

// Process this heartbeat   short newResponseId = (short)(responseId + 1);//心跳相应加1   //记录心跳的发送时间,以便发现在一定时间内未发送心跳的TaskTracker,并将之标注为死亡状态,以后不再向其分配新的Tasks   status.setLastSeen(now);   if (!processHeartbeat(status, initialContact, now)) {     //如果存在针对该TaskTracker的上一次心跳应答时,将其清空以便将新的应答放进来     if (prevHeartbeatResponse != null) {       trackerToHeartbeatResponseMap.remove(trackerName);     }     //当处理心跳请求失败时将返回“重新初始化”给该TaskTracker     return new HeartbeatResponse(newResponseId,                  new TaskTrackerAction[] {new ReinitTrackerAction()});   }

其他的小的细节可以自己看代码和注释,这个主要讲讲processHeartbeat方法,跟进去:

/**   * Process incoming heartbeat messages from the task trackers.   */  private synchronized boolean processHeartbeat(                                 TaskTrackerStatus trackerStatus,                                 boolean initialContact,                                 long timeStamp) throws UnknownHostException {    getInstrumentation().heartbeat();    String trackerName = trackerStatus.getTrackerName();    synchronized (taskTrackers) {      synchronized (trackerExpiryQueue) {         //通过旧的以及当前心跳传来的TaskTracker状态信息更新TaskTracker对象记录的状态资源信息,seenBefore返回值代表是否在JobTracker端的上一次存在的该TaskTracker的状态信息(true表示存在)        boolean seenBefore = updateTaskTrackerStatus(trackerName,        TaskTracker taskTracker = getTaskTracker(trackerName);        if (initialContact) {//初次心跳连接          // If it's first contact, then clear out          // any state hanging around          if (seenBefore) {//在JobTracker端,存在上一次该TaskTracker的状态信息,则将其清空            lostTaskTracker(taskTracker);          }        } else {//非初次连接          // If not first contact, there should be some record of the tracker          if (!seenBefore) {//在JobTracker端,不存在上一次该TaskTracker的状态信息            LOG.warn("Status from unknown Tracker : " + trackerName);            updateTaskTrackerStatus(trackerName, null);//更新该TaskTrakcer的资源状况            return false;          }        }        if (initialContact) {          // if this is lost tracker that came back now, and if it's blacklisted          // increment the count of blacklisted trackers in the cluster          if (isBlacklisted(trackerName)) {//判断该TaskTracker是否在黑名单中            faultyTrackers.incrBlacklistedTrackers(1);          }          // This could now throw an UnknownHostException but only if the          // TaskTracker status itself has an invalid name          addNewTracker(taskTracker);        }      }    }    updateTaskStatuses(trackerStatus);//更新Task信息    updateNodeHealthStatus(trackerStatus, timeStamp);//更新节点健康状态信息    return true;  }

该方法处理新的心跳请求,简单地来说就是通过旧的TaskTracker状态信息和当前心跳带来的TaskTracker对象的状态信息去更新集群当前的资源信息。另外还会去更新该TraskTracker上的Tasks信息和更新发来当前心跳节点的健康状态信息。上面处理心跳信息的代码中,还会判断是否是TT和JT的初次连接,如果非初次连接并且上次的TaskTracker状态信息不存在则该次心跳处理失败直接返回false,对该TaskTracker下重新初始化命令,让其重启初始化。再看看具体的更新方法updateTaskTrackerStatus

private boolean updateTaskTrackerStatus(String trackerName,                                          TaskTrackerStatus status) {    TaskTracker tt = getTaskTracker(trackerName);    //获得上一次TT在JT中的状态信息对象    TaskTrackerStatus oldStatus = (tt == null) ? null : tt.getStatus();    //根据上一次TT在JT中的状态信息对象存在时,更新资源信息。    if (oldStatus != null) {      totalMaps -= oldStatus.countMapTasks();      totalReduces -= oldStatus.countReduceTasks();      occupiedMapSlots -= oldStatus.countOccupiedMapSlots();      occupiedReduceSlots -= oldStatus.countOccupiedReduceSlots();      getInstrumentation().decRunningMaps(oldStatus.countMapTasks());      getInstrumentation().decRunningReduces(oldStatus.countReduceTasks());      getInstrumentation().decOccupiedMapSlots(oldStatus.countOccupiedMapSlots());      getInstrumentation().decOccupiedReduceSlots(oldStatus.countOccupiedReduceSlots());...    //根据当前心跳传过来的TaskTracker状态信息对象更新当前资源信息。    if (status != null) {      totalMaps += status.countMapTasks();      totalReduces += status.countReduceTasks();      occupiedMapSlots += status.countOccupiedMapSlots();      occupiedReduceSlots += status.countOccupiedReduceSlots();      getInstrumentation().addRunningMaps(status.countMapTasks());      getInstrumentation().addRunningReduces(status.countReduceTasks());      getInstrumentation().addOccupiedMapSlots(status.countOccupiedMapSlots());      getInstrumentation().addOccupiedReduceSlots(status.countOccupiedReduceSlots());... getInstrumentation().setMapSlots(totalMapTaskCapacity); getInstrumentation().setReduceSlots(totalReduceTaskCapacity);...

(5)过程五,为TaskTracker下达命令过程。

   该过程主要是遍历JobTracker中的各种对象映射集合(可以看我源码分析第二篇文章的第三点),取得相关命令,然后为TaskTracker下达命令。

...// Check for tasks to be killed    List
 killTasksList = getTasksToKill(trackerName);    if (killTasksList != null) {      actions.addAll(killTasksList);    }                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       // Check for jobs to be killed/cleanedup    List
 killJobsList = getJobsForCleanup(trackerName);    if (killJobsList != null) {      actions.addAll(killJobsList);    }    // Check for tasks whose outputs can be saved    List
 commitTasksList = getTasksToSave(status);    if (commitTasksList != null) {      actions.addAll(commitTasksList);    }    // calculate next heartbeat interval and put in heartbeat response    int nextInterval = getNextHeartbeatInterval();//生成下一次心跳汇报时间    response.setHeartbeatInterval(nextInterval);    response.setActions(                        actions.toArray(new TaskTrackerAction[actions.size()]));                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      // check if the restart info is req    if (addRestartInfo) {      response.setRecoveredJobs(recoveryManager.getJobsToRecover());    }                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          // Update the trackerToHeartbeatResponseMap    trackerToHeartbeatResponseMap.put(trackerName, response);    // Done processing the hearbeat, now remove 'marked' tasks    removeMarkedTasks(trackerName);...

详细说说各种命令:

  • ReinitTrackerAction,重新初始化命令

上面的过程分析中,已经多次提到过ReinitTrackerAction命令,它在容错性检测出错的时候往往会被执行,JobTracker发送这个命令对TaskTracker重新进行初始化。在JobTracker端的整个心跳处理过程中,运行ReinitTrackerAction命令大体有2种(在上面的分析过程中都有提到,可以回头看看):

  • 丢失上次心跳应道信息:JobTracker保存了向每个TaskTracker发送的最近心跳应答信息。如果JobTracker不是刚刚重启并且TaskTracker不是初次链接JobTracker,而最近的心跳应答信息  丢失,这会造成不一致情况。

  • 丢失TaskTracker的状态信息:JobTracker接收到任何一个心跳信息后,都会将其TaskTracker的对象信息(TaskTrackerStatus类)保存起来。如果一个TaskTracker 非初次连接JobTracker但是其状态信息又不存在,造成不一致的情况。

  • LaunchTaskAction

该类封装了TaskTracker分配的新任务,新任务的类型分两种:计算型任务和辅助型任务。其中计算型任务负责处理实际数据的任务,也就是我们常说的Reduce和Map任务,他们由专门的任务调度器去调用(默认是FIFO调度器)。另外辅助型任务包括:job-setup task、job-cleanup task和task-cleanup task三种。task-cleanup task的主要作用是清理失败的map或者reduce任务的部分结果数据,它由jobTracker直接调度,而且其调度的优先级比map和reduce任务都要高。至于job-setup task和job-cleanup task任务的作用我认为比较简单,主要是标志map和reduce作业运行开始和运行结束的同步标志。jobTracker为一个有空闲slot并且不在黑名单中的taskTracker分配任务的选择顺序是先辅助型任务然后计算型任务,这点我们可以从heartbeat方法看出来:

HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);   List
 actions = new ArrayList
();   //faultyTrackers队列,当该TaskTracker上有任务运行失败,那么其对应的host会被添加到此列表中,   //同时错误信息以及发生错误的时间、错误次数也会被记录起来.按一定的规则(没有详细研究)组成黑名单和灰名单。   boolean isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());   // Check for new tasks to be executed on the tasktracker   if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {       //获得所有TT状态信息     TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);     if (taskTrackerStatus == null) {       LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);     } else {       //有限选择辅助型任务,选择优先顺序为:job-cleanup task、task-cleanup task和job-setup task       //这样可以让运行完成的作业快速结束,新提交的作业立刻进入到运行状态       List
 tasks = getSetupAndCleanupTasks(taskTrackerStatus);      //如果没有辅助型任务就选择执行计算行任务       if (tasks == null ) {         //直接有任务调度器去选择一个或者多个计算行任务         tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));       }       if (tasks != null) {         for (Task task : tasks) {           expireLaunchingTasks.addNewTask(task.getTaskID());           if(LOG.isDebugEnabled()) {             LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());           }           actions.add(new LaunchTaskAction(task));         }       }     }
  • KillTaskAction

    该类封装了TaskTracker需要杀死的任务。TaskTracker收到该任务后会杀掉对应的任务以及清理掉工作目录和释放slot。JobTracker主要在下面情况下会向TrackerTracker发送KillTaskAction命令(不全):

  (1)整个作业运行失败,其他正在运行的Task也会被杀死。

  (2)启动推测执行任务机制(默认启动)后,同一份数据可能有2个Task Attempt处理。只要一个Task Attempt运行成功后,两外一个处理同一份数据的任务就会被kill掉。

  (3)TaskTracker在指定时间内没有向JobTracker汇报心跳,则JobTracker会任务它已经死掉,那么在它上面运行的所有task就会被标注为死亡。

  (4)用户自己直接调用命令"bin/hadoop job -kill-task"或者"bin/hadoop job -fail-task",使一个或者多个任务失败。

  • KillTaskAction

KillTaskAction封装了TaskTracker待清理的作业。当TaskTracker接收到KillTaskAction命令后会清理此作业工作目录。下面几个主要的场景会出现这种情况:

  (1)用户主动杀死作业,调用"bin/hadoop job -kill"或者"bin/hadoop job -fail"命令。

  (2)作业运行完毕,通知TaskTracker清理作业目录。

(3)作业运行失败,通知TaskTracker清理作业目录。

  • CommitTaskAction

    该类封装了TaskTracker需要提交的任务。为了防止同一个TaskInProgress的两个同时运行的Task Attempt(例如打开推测执行功能,一个任务可能有多个备份任务)同时打开一个文件或者往同一个文件中写数据而生产冲突,Hadoop让每个Task Attempt写到单独一个文件中(以taskAttemptID命名,比如attempt_201402171212_0009_r_000000_0)中。通常而言,Hadoop让每个Task Attempt将计算结果写到临时目录${mapred.output.dir}/_tmporary/_${taskID}中,当某个Task Attempt运行成功后,再将运行结果转移到最终目录${mapred.output.dir}中。Hadoop将一个成功运行的Task Attempt结果文件从临时目录移动到最终目录的过程,称为“任务提交”。当TaskProgress中的一个任务被提交后,其他任务将被杀死,同时意味着该TaskTracker运行完成。

集群规模调整动态调整心跳间隔    

    从在上一篇blog中分析TaskTracker端的心跳机制中,我们知道TaskTracker会通过“外带心跳”和根据Task完成数目来动态调整心跳间距。 JobTracker端的心跳处理也有相应的机制以用来调整TaskTracker心跳的汇报时间间隔。这种调整的意义在于可以提高系统的吞吐量,避免心跳间隔过小造成JobTracker要进行并发处理的请求压力大,和心跳间隔过大造成资源浪费不能是TaskTracker及时汇报资源信息给JobTracker,进而为之分配任务。JobTracker监控者整个集群的规模情况,可以通过这个动态地调整每个TaskTracker下次汇报心跳的时间间隔,并通过心跳机制告诉TaskTracker。        最重要的是这种机制还是可控制的,JobTracker允许用户通过参数设置心跳的时间间隔加速比例,简单说就是每增加mapred.heartbeats.in.second(最小是1,默认为100)个节点,心跳时间间隔就增加mapreduce.jobtracker.heartbeats.scaling.factor(最小是0.01,默认1)秒,简单说默认情况下增加100个节点心跳就增加1秒。但是,为了防止用户参数设置不合理而对JobTracker产生较大负载,JobTracker要求心跳间隔大于或者等于3秒。具体算法看代码,JobTracker类的getNextHeartbeatInterval方法:

/**   * Calculates next heartbeat interval using cluster size.   * Heartbeat interval is incremented by 1 second for every 100 nodes by default.   * @return next heartbeat interval.   */  public int getNextHeartbeatInterval() {    // get the no of task trackers    //获得当前集群的TaskTracker数目    int clusterSize = getClusterStatus().getTaskTrackers();    int heartbeatInterval =  Math.max(                                (int)(1000 * HEARTBEATS_SCALING_FACTOR *                                      Math.ceil((double)clusterSize /                                                NUM_HEARTBEATS_IN_SECOND)),                                HEARTBEAT_INTERVAL_MIN) ;    return heartbeatInterval;  }

其中HEARTBEATS_SCALING_FACTOR对应上面所说的mapreduce.jobtracker.heartbeats.scaling.factor参数,NUM_HEARTBEATS_IN_SECOND对应着mapred.heartbeats.in.second参数,HEARTBEAT_INTERVAL_MIN默认值为3 * 1000。

参考文献:

[1]《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》

[2]

转载地址:http://dpyql.baihongyu.com/

你可能感兴趣的文章
poj3067 Japan(树状数组)
查看>>
[java面试]关于多态性的理解
查看>>
常见的MIME类型
查看>>
Leetcode_Wildcard Matching
查看>>
docker 私有仓库简易搭建
查看>>
WCF系列教程之客户端异步调用服务
查看>>
P1201 [USACO1.1]贪婪的送礼者Greedy Gift Givers
查看>>
Android自带的分享功能案例
查看>>
Android广播机制分析
查看>>
Android ADB工具-截图和录制视频(五)
查看>>
PHP/Javascript 数组定义 及JSON中的使用 ---OK
查看>>
php中urldecode()和urlencode()起什么作用啊
查看>>
UVA 11542 Square 高斯消元 异或方程组求解
查看>>
Nginx的内部(进程)模型
查看>>
基于设备树的controller学习(1)
查看>>
递归--练习1--noi3089爬楼梯
查看>>
慢慢过渡到个人博客
查看>>
深度学习 Deep Learning UFLDL 最新Tutorial 学习笔记 4:Debugging: Gradient Checking
查看>>
【转】spring boot web相关配置
查看>>
oc53--autorelease注意事项
查看>>