文章出處

Flink運行時之TaskManager執行Task:當一個任務被JobManager部署到TaskManager之后,它將會被執行。本篇我們將分析任務的執行細節。

submitTask方法分析

一個任務實例被部署所產生的實際影響就是JobManager會將一個TaskDeploymentDescriptor對象封裝在SubmitTask消息中發送給TaskManager。而處理該消息的入口方法是submitTask方法,它是TaskManager接收任務部署并啟動任務執行的入口方法,值得我們關注一下它的實現細節。

submitTask方法中的第一個關鍵點是它先構建一個Task對象:

val task = new Task(    tdd,    memoryManager,    ioManager,    network,    bcVarManager,    selfGateway,    jobManagerGateway,    config.timeout,    libCache,    fileCache,    runtimeInfo,    taskMetricGroup)

該Task封裝了其在TaskManager中執行時需要的一些關鍵對象。task對象將會被加入TaskManager中的一個ExecutionAttemptID與Task的Map中,如果發現該ExecutionAttemptID所對應的Task對象已存在于Map中,則將原先的Task實例重新放回到Map中,同時拋出異常:

val execId = tdd.getExecutionIdval prevTask = runningTasks.put(execId, task)if (prevTask != null) {    runningTasks.put(execId, prevTask)    throw new IllegalStateException("TaskManager already contains a task for id " + execId)}

如果一切正常,接下來就啟動線程并執行任務,接著發送應答消息進行回復:

task.startTaskThread()sender ! decorateMessage(Acknowledge)

submitTask方法比起JobManager的submitJob方法,邏輯和代碼量都相對簡單。我們會進一步分析兩個過程:

Task對象的構造方法 Task作為一個線程,其run方法的實現

首先關注的是Task的構造方法,Task作為TaskManager的啟動對象,其需要的參數基本都跟其執行有關,參數如下:

public Task(TaskDeploymentDescriptor tdd,            //任務描述符            MemoryManager memManager,                        //內存管理器    IOManager ioManager,                             //IO管理器    NetworkEnvironment networkEnvironment,           //網絡環境對象,處理網絡請求    BroadcastVariableManager bcVarManager,           //廣播變量管理器    ActorGateway taskManagerActor,                   //TaskManager對應的actor通信網關    ActorGateway jobManagerActor,                    //JobManager對應的actor通信網關    FiniteDuration actorAskTimeout,                  //actor響應超時時間    LibraryCacheManager libraryCache,                //用戶程序的Jar、類庫緩存             FileCache fileCache,                             //用戶定義的文件緩存,執行時需要    TaskManagerRuntimeInfo taskManagerConfig         //TaskManager運行時配置)

構造方法的第一段代碼是將TaskDeploymentDescriptor封裝的大量信息“轉交”給Task對象。

接下來會根據結果分區部署描述符ResultPartitionDeploymentDescriptor和輸入網關部署描述符InputGateDeploymentDescriptor來初始化結果分區以及輸入網關,其中結果分區是當前的task實例產生的,而輸入網關是用來從網絡上消費前一個任務的結果分區。首先看一下結果分區的初始化:

this.producedPartitions = new ResultPartition[partitions.size()];this.writers = new ResultPartitionWriter[partitions.size()];for (int i = 0; i < this.producedPartitions.length; i++) {       ResultPartitionDeploymentDescriptor desc = partitions.get(i);       ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), executionId);       this.producedPartitions[i] = new ResultPartition(                 taskNameWithSubtaskAndId,                 jobId,                 partitionId,                 desc.getPartitionType(),                 desc.getEagerlyDeployConsumers(),                 desc.getNumberOfSubpartitions(),                 networkEnvironment.getPartitionManager(),                 networkEnvironment.getPartitionConsumableNotifier(),                 ioManager,                 networkEnvironment.getDefaultIOMode());       this.writers[i] = new ResultPartitionWriter(this.producedPartitions[i]);}

以上代碼主要的邏輯是循環初始化結果分區對象數組producedPartitions以及結果分區寫入器數組writers。結果分區對象初始化時,會根據ResultPartitionType的類型來判斷是創建阻塞式的子分區還是創建管道式的子分區,這涉及到數據傳輸的方式。ResultPartitionWriter是面向結果分區的運行時結果寫入器對象。

下面的代碼用于輸入網關的初始化:

this.inputGates = new SingleInputGate[consumedPartitions.size()];this.inputGatesById = new HashMap();for (int i = 0; i < this.inputGates.length; i++) {       SingleInputGate gate = SingleInputGate.create(                 taskNameWithSubtaskAndId, jobId, executionId, consumedPartitions.get(i), networkEnvironment);       this.inputGates[i] = gate;   inputGatesById.put(gate.getConsumedResultId(), gate);},>

輸入網關的初始化則是根據上游task產生的結果分區來進行挨個初始化。

最終它會為該任務的執行創建一個線程:

executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);

其實Task類實現了Runnable接口,它的實例本身就可以被線程執行,然后它又在內部實例化了一個線程對象并保存了執行它自身的線程引用,進而獲得了對該線程的完全控制。比如,用startTaskThread方法來啟動執行Task的線程。Task線程的執行細節,我們將會在接下來進行分析。

從這里我們也能看到,每個任務的部署會產生一個Task對象,而一個Task對象恰好對應一個執行它的線程實例。

Task線程的執行

Task實現了Runnable接口,那么毫無疑問其run方法承載了Task被執行的核心邏輯。而之前,我們將會分析Task線程的執行流程。

首先,第一步先對Task的執行狀態進行轉換:

while (true) {    ExecutionState current = this.executionState;    //如果當前的執行狀態為CREATED,則對其應用CAS操作,將其設置為DEPLOYING狀態,如果設置成功,將退出while無限循環    if (current == ExecutionState.CREATED) {        if (STATE_UPDATER.compareAndSet(this, ExecutionState.CREATED, ExecutionState.DEPLOYING)) {            // success, we can start our work            break;        }    }    //如果當前執行狀態為FAILED,則發出最終狀態的通知消息,并退出run方法的執行    else if (current == ExecutionState.FAILED) {        notifyFinalState();        return;    }    //如果當前執行狀態為CANCELING,則對其應用cas操作,并將其修改為CANCELED狀態,如果修改成功則發出最終狀態通知消息,    //同時退出run方法的執行    else if (current == ExecutionState.CANCELING) {        if (STATE_UPDATER.compareAndSet(this, ExecutionState.CANCELING, ExecutionState.CANCELED)) {            notifyFinalState();            return;        }    }    //如果當前的執行狀態為其他狀態,則拋出異常    else {        throw new IllegalStateException("Invalid state for beginning of task operation");    }}

接下來,是對用戶代碼所打成的jar包的加載并生成對應的類加載器,同時獲取到程序的執行配置ExecutionConfig。根據類加載器以及用戶的可執行體在Flink中所對應的具體的實現類名來加載該類:

invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);

Flink中所有類型的操作都有特定的可執行體,它們無一例外都是對AbstractInvokable類的擴展。每個的可執行體的名稱在生產JobGraph時就已確定。

緊接著的一個關鍵操作就是向網絡棧注冊該任務對象:

network.registerTask(this);

這個操作是為了讓Task之間可以基于網絡互相進行數據交換,包含了分配網絡緩沖、結果分區注冊等一系列內部操作,并且有可能會由于系統無足夠的內存而發生失敗。

然后會把各種配置、管理對象都打包到Task在執行時的統一環境對象Environment中,并將該環境對象賦予可執行體:

invokable.setEnvironment(env);

在此之后,對于有狀態的任務,如果它們的狀態不為空,則會對這些有狀態的任務進行狀態初始化:

SerializedValue> operatorState = this.operatorState;if (operatorState != null) {    if (invokable instanceof StatefulTask) {    try {        StateHandle state = operatorState.deserializeValue(userCodeClassLoader);        StatefulTask op = (StatefulTask) invokable;        StateUtils.setOperatorState(op, state);    }    catch (Exception e) {        throw new RuntimeException("Failed to deserialize state handle and "             + " setup initial operator state.", e);    }    }    else {        throw new IllegalStateException("Found operator state for a non-stateful task invokable");    }}<?>

通常什么情況下任務會有初始狀態呢?當任務并不是首次運行,比如之前發生過失敗從某個檢查點恢復時會從檢查點中獲取當前任務的狀態,在執行之前先進行初始化。

接下來,會將任務的執行狀態變更為RUNNING,并向觀察者以及TaskManager發送通知:

if (!STATE_UPDATER.compareAndSet(this, ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {    throw new CancelTaskException();}notifyObservers(ExecutionState.RUNNING, null);taskManager.tell(new UpdateTaskExecutionState(    new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING)));

然后將執行線程的類加載器設置為用戶代碼的類加載器,然后調用可執行體的invoke方法,invoke方法實現了每個可執行體所要執行的核心邏輯。

executingThread.setContextClassLoader(userCodeClassLoader);invokable.invoke();

invoke方法的執行是個分界點,在執行之前用戶邏輯還沒有被觸發執行;而該方法被執行之后,說明用戶邏輯已被執行完成。

然后對當前任務所生產的所有結果分區調用finish方法進行資源釋放:

for (ResultPartition partition : producedPartitions) {    if (partition != null) {        partition.finish();    }}

最后將任務的執行狀態修改為FINISHED,并發出通知:

if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, ExecutionState.FINISHED)) {    notifyObservers(ExecutionState.FINISHED, null);}else {    throw new CancelTaskException();}

接下來在finally塊里進行一系列資源釋放操作。

最終的可執行體

Task是在TaskManager中執行任務的統一抽象,它的核心仍然是如何執行,而不是如何表述。比如,批處理任務和流處理任務,它們有很大的差別,但我們需要一種表述層面上的抽象,使得它們最終都能被Task所接收,然后得到執行。而該表述層面上的抽象即為AbstractInvokable。它是所有在TaskManager中真正被執行的主體。其類圖如下:

AbstractInvokable定義了一系列的“上下文”對象,同時提供了核心兩個方法:

invoke:該抽象方法是描述用戶邏輯的核心方法,最終在Task線程中被執行的就是該方法; cancel:取消執行用戶邏輯的方法,提供了默認為空的實現,用戶取消執行或者執行失敗會觸發該方法的調用;

跟Flink提供了流處理和批處理的API一致,AbstractInvokable也相應的具有兩個派生類:

StreamTask:所有流處理任務的基類,實現位于flink-streaming-java模塊中; BatchTask:所有批處理任務的基類,實現位于runtime模塊中;

無論是哪種形式的任務,在生成JobGraph階段就已經被確定并加入到JobVertex中:

public void setInvokableClass(Class invokable) {       Preconditions.checkNotNull(invokable);       this.invokableClassName = invokable.getName();       this.isStoppable = StoppableTask.class.isAssignableFrom(invokable);}

隨后被一直攜帶到Task類中,并通過反射的機制從特定的類加載器中創建其實例,最終調用其invoke方法執行:

private AbstractInvokable loadAndInstantiateInvokable(ClassLoader classLoader, String className)     throws Exception {       Class invokableClass;       try {              invokableClass = Class.forName(className, true, classLoader)                        .asSubclass(AbstractInvokable.class);       }   catch (Throwable t) {              throw new Exception("Could not load the task's invokable class.", t);       }       try {              return invokableClass.newInstance();       }   catch (Throwable t) {              throw new Exception("Could not instantiate the task's invokable class.", t);       }}

關于更多用戶邏輯的執行細節,我們后續會進行分析。

看文倉www.kanwencang.com網友整理上傳,為您提供最全的知識大全,期待您的分享,轉載請注明出處。
歡迎轉載:http://www.kanwencang.com/bangong/20170126/95347.html

文章列表


不含病毒。www.avast.com
arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()