文章出處

在大型企業中,由于業務復雜、數據量大、數據格式不同、數據交互格式繁雜,并非所有的操作都能通過交互界面進行處理。而有一些操作需要定期讀取大批量的數據,然后進行一系列的后續處理。這樣的過程就是“批處理”。

批處理應用通常有以下特點:

  • 數據量大,從數萬到數百萬甚至上億不等;
  • 整個過程全部自動化,并預留一定接口進行自定義配置;
  • 這樣的應用通常是周期性運行,比如按日、周、月運行;
  • 對數據處理的準確性要求高,并且需要容錯機制、回滾機制、完善的日志監控等。

什么是Spring batch

Spring batch是一個輕量級的全面的批處理框架,它專為大型企業而設計,幫助開發健壯的批處理應用。Spring batch為處理大批量數據提供了很多必要的可重用的功能,比如日志追蹤、事務管理、job執行統計、重啟job和資源管理等。同時它也提供了優化和分片技術用于實現高性能的批處理任務。

它的核心功能包括:

  • 事務管理
  • 基于塊的處理過程
  • 聲明式的輸入/輸出操作
  • 啟動、終止、重啟任務
  • 重試/跳過任務
  • 基于Web的管理員接口

筆者所在的部門屬于國外某大型金融公司的CRM部門,在日常工作中我們經常需要開發一些批處理應用,對Spring Batch有著豐富的使用經驗。近段時間筆者特意總結了這些經驗。

使用Spring Batch 3.0以及Spring Boot

在使用Spring Batch時推薦使用最新的Spring Batch 3.0版本。相比Spring Batch2.2,它做了以下方面的提升:

  • 支持JSR-352標準
  • 支持Spring4以及Java8
  • 增強了Spring Batch Integration的功能
  • 支持JobScope
  • 支持SQLite

支持Spring4和Java8是一個重大的提升。這樣就可以使用Spring4引入的Spring boot組件,從而開發效率方面有了一個質的飛躍。引入Spring-batch框架只需要在build.gradle中加入一行代碼即可:

1
compile("org.springframework.boot:spring-boot-starter-batch")

而增強Spring Batch Integration的功能后,我們就可以很方便的和Spring家族的其他組件集成,還可以以多種方式來調用job,也支持遠程分區操作以及遠程塊處理。

而支持JobScope后我們可以隨時為對象注入當前Job實例的上下文信息。只要我們制定Bean的scope為job scope,那么就可以隨時使用jobParameters和jobExecutionContext等信息。

1
2
3
4
5
6
7
<bean id="..." class="..." scope="job">
    <property name="name" value="#{jobParameters[input]}" />
</bean>
                
<bean id="..." class="..." scope="job">
    <property name="name" value="#{jobExecutionContext['input.name']}.txt" />
</bean>

使用Java Config而不是xml的配置方式

之前我們在配置job和step的時候都習慣用xml的配置方式,但是隨著時間的推移發現問題頗多。

  • xml文件數急劇膨脹,配置塊長且復雜,可讀性很差;
  • xml文件缺少語法檢查,有些低級錯誤只有在運行集成測試的時候才能發現;
  • 在xml文件中進行代碼跳轉時IDE的支持力度不夠;

我們漸漸發現使用純Java類的配置方式更靈活,它是類型安全的,而且IDE的支持更好。在構建job或step時采用的流式語法相比xml更加簡潔易懂。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

        @Bean
        public Step step(){
                return stepBuilders.get("step")
                                .<Partner,Partner>chunk(1)
                                .reader(reader())
                                .processor(processor())
                                .writer(writer())
                                .listener(logProcessListener())
                                .faultTolerant()
                                .skipLimit(10)
                                .skip(UnknownGenderException.class)
                                .listener(logSkipListener())
                                .build();
        }

在這個例子中可以很清楚的看到該step的配置,比如reader/processor/writer組件,以及配置了哪些listener等。

本地集成測試中使用內存數據庫

Spring batch在運行時需要數據庫支持,因為它需要在數據庫中建立一套schema來存儲job和step運行的統計信息。而在本地集成測試中我們可以借助Spring batch提供的內存Repository來存儲Spring batch的任務執行信息,這樣即避免了在本地配置一個數據庫,又可以加快job的執行。

1
2
3
4
<bean id="jobRepository"
  class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
    <property name="transactionManager" ref="transactionManager"/>
</bean>

我們在build.gradle中加入對hsqldb的依賴:

1
runtime(‘org.hsqldb:hsqldb:2.3.2’)

然后在測試類中添加對DataSource的配置。

1
2
3
4
5
6
7
@EnableAutoConfiguration
@EnableBatchProcessing
@DataJpaTest
@Import({DataSourceAutoConfiguration.class, BatchAutoConfiguration.class})
public class TestConfiguration {
    
}

并且在applicaton.properties配置中添加初始化Database的配置:

1
spring.batch.initializer.enable=true

合理的使用Chunk機制

Spring batch在配置Step時采用的是基于Chunk的機制。即每次讀取一條數據,再處理一條數據,累積到一定數量后再一次性交給writer進行寫入操作。這樣可以最大化的優化寫入效率,整個事務也是基于Chunk來進行。

當我們在需要將數據寫入到文件、數據庫中之類的操作時可以適當設置Chunk的值以滿足寫入效率最大化。但有些場景下我們的寫入操作其實是調用一個web service或者將消息發送到某個消息隊列中,那么這些場景下我們就需要設置Chunk的值為1,這樣既可以及時的處理寫入,也不會由于整個Chunk中發生異常后,在重試時出現重復調用服務或者重復發送消息的情況。

使用Listener來監視job執行情況并及時做相應的處理

Spring batch提供了大量的Listener來對job的各個執行環節進行全面的監控。

在job層面Spring batch提供了JobExecutionListener接口,其支持在Job開始或結束時進行一些額外處理。在step層面Spring batch提供了StepExecutionListener,ChunkListener,ItemReadListener,ItemProcessListener,ItemWriteListener,SkipListener等接口,同時對Retry和Skip操作也提供了RetryListener及SkipListener。

通常我們會為每個job都實現一個JobExecutionListener,在afterJob操作中我們輸出job的執行信息,包括執行時間、job參數、退出代碼、執行的step以及每個step的詳細信息。這樣無論是開發、測試還是運維人員對整個job的執行情況了如指掌。

如果某個step會發生skip的操作,我們也會為其實現一個SkipListener,并在其中記錄skip的數據條目,用于下一步的處理。

實現Listener有兩種方式,一種是繼承自相應的接口,比如繼承JobExecutionListener接口,另一種是使用annoation(注解)的方式。經過實踐我們認為使用注解的方式更好一些,因為使用接口你需要實現接口的所有方法,而使用注解則只需要對相應的方法添加annoation即可。

下面的這個類采用了繼承接口的方式,我們看到其實我們只用到了第一個方法,第二個和第三個都沒有用到。但是我們必須提供一個空的實現。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class CustomSkipListener implements SkipListener<String, String> {
    @Override
    public void onSkipInRead(Throwable t) {
        // business logic
    }

    @Override
    public void onSkipInWrite(String item, Throwable t) {
        // no need
    }

    @Override
    public void onSkipInProcess(String item, Throwable t) {
        // no need
    }
}

而使用annoation的方式可以簡寫為:

1
2
3
4
5
6
7
public class CustomSkipListener {
    
    @OnSkipInRead
    public void onSkipInRead(Throwable t) {
        // business logic
    }
}

使用Retry和Skip增強批處理工作的健壯性

在處理百萬級的數據過程過程中難免會出現異常。如果一旦出現異常而導致整個批處理工作終止的話那么會導致后續的數據無法被處理。Spring Batch內置了Retry(重試)和Skip(跳過)機制幫助我們輕松處理各種異常。適合Retry的異常的特點是這些異常可能會隨著時間推移而消失,比如數據庫目前有鎖無法寫入、web服務當前不可用、web服務滿載等。所以對這些異常我們可以配置Retry機制。而有些異常則不應該配置Retry,比如解析文件出現異常等,因為這些異常即使Retry也會始終失敗。

即使Retry多次仍然失敗也無需讓整個step失敗,可以對指定的異常設置Skip選項從而保證后續的數據能夠被繼續處理。我們也可以配置SkipLimit選項保證當Skip的數據條目達到一定數量后及時終止整個Job。

有時候我們需要在每次Retry中間隔做一些操作,比如延長Retry時間,恢復操作現場等,Spring Batch提供了BackOffPolicy來達到目的。下面是一個配置了Retry機制、Skip機制以及BackOffPolicy的step示例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Bean
public Step step(){
    return stepBuilders.get("step")
            .<Partner,Partner>chunk(1)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .listener(logProcessListener())
            .faultTolerant()
            .skipLimit(10)
            .skip(UnknownGenderException.class)
            .retryLimit(5)
            .retry(ServiceUnavailableException.class)
            .backOffPolicy(backoffPolicy)
            .listener(logSkipListener())
            .build();
}

使用自定義的Decider來實現Job flow

在Job執行過程中不一定都是順序執行的,我們經常需要根據某個job的輸出數據或執行結果來決定下一步的走向。以前我們會把一些判斷放置在下游step中進行,這樣可能會導致有些step實際運行了,但其實并沒有做任何事情。比如一個step執行過程中會將失敗的數據條目記錄到一個報告中,而下一個step會判斷有沒有生成報告,如果生成了報告則將該報告發送給指定聯系人,如果沒有則不做任何事情。這種情況下可以通過Decider機制來實現Job的執行流程。在Spring batch 3.0中Decider已經從Step中獨立出來,和Step處于同一級別。

1
2
3
4
5
6
7
8
9
10
public class ReportDecider implements JobExecutionDecider {
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        if (report.isExist()) {
            return new FlowExecutionStatus(“SEND");
        }
        
        return new FlowExecutionStatus(“SKIP");
    }
}

而在job配置中可以這樣來使用Decider。這樣整個Job的執行流程會更加清晰易懂。

1
2
3
4
5
6
7
8
public Job job() {
    return new JobBuilder("petstore")
            .start(orderProcess())
            .next(reportDecider)
            .on("SEND").to(sendReportStep)
            .on("SKIP").end().build()
            .build()
}

采用多種機制加速Job的執行

批處理工作處理的數據量大,而執行窗口一般又要求比較小。所以必須要通過多種方式來加速Job的執行。一般我們有四種方式來實現:

  • 在單個step中多線程執行任務
  • 并行執行不同的Step
  • 并行執行同一個Step
  • 遠程執行Chunk任務

在單個step多線程執行任務可以借助于taskExecutor來實現。這種情況適合于reader、writer是線程安全的并且是無狀態的場景。我們還可以設置線程數量。

1
2
3
4
5
6
public Step step() {
    return stepBuilders.get("step")
            .tasklet(tasklet)
            .throttleLimit(20)
            .build();
}

上述示例中的tasklet需要實現TaskExecutor,Spring Batch提供了一個簡單的多線程TaskExecutor供我們使用:SimpleAsyncTaskExecutor。

并行執行不同的Step在Spring batch中很容易實現,以下是一個示例:

1
2
3
4
5
6
7
public Job job() {
    return stepBuilders.get("parallelSteps")
            .start(step1)
            .split(asyncTaskExecutor).add(flow1, flow2)
            .next(step3)
            .build();
}

在這個示例中我們先執行step1,然后并行執行flow1和flow2,最后再執行step3。

Spring batch提供了PartitionStep來實現對同一個step在多個進程中實現并行處理。通過PartitonStep再配合PartitionHandler可以將一個step擴展到多個Slave上實現并行運行。

遠程執行Chunk任務則是將某個Step的processer操作分割到多個進程中,多個進程通過一些中間件進行通訊(比如采用消息的方式)。這種方式適合于Processer是瓶頸而Reader和Writer不是瓶頸的場景。

結語


Spring Batch對批處理場景進行了合理的抽象,封裝了大量的實用功能,使用它來開發批處理應用可以達到事半功倍的效果。在使用的過程中我們仍需要堅持總結一些最佳實踐,從而能夠交付高質量的可維護的批處理應用,滿足企業級應用的苛刻要求。


文章列表


不含病毒。www.avast.com
arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()