Java EE 7 – Batch Applications for the Java Platform(二) by kapa | CodeData
top

Java EE 7 – Batch Applications for the Java Platform(二)

分享:

Java EE 7 – Batch Applications for the Java Platform(一) << 前情

前言

本次連載爲 Java EE 批次處理四個主要功能中的「流程控制」、「Checkpoint」做進一步介紹。

流程控制

在上一次的介紹中有提過流程控制的幾個元件,Flow、Split 和 Decision:

Flow 是將一群 Step 包成一個運作單位的方法,尤其遇到多個 Step 時,對於流程控制會有幫助。Split 則是用來定義可以並行計算的多個不同 Flow,來善用多執行緒的優勢來執行多個可同時執行的工作。 Decison 則是可以依照 Flow 執行狀況決定接下來要執行的 Flow 或結束。

batch flow

先將上一次介紹的範例再加上一個 step,將輸出的結果上傳到遠端的主機上。這次不使用 Chunk Oriented Processing 方式,而是改採用 Batch 方式來實作上傳部分。 job 的設定部分就會像以下範例,先完成計算( compute )後的下一步就是上傳檔案( upload )。

<?xml version="1.0" encoding="UTF-8"?>
<job id="simplejob" xmlns="http://xmlns.jcp.org/xml/ns/javaee"version="1.0">
    <step id="compute" next="upload">
        <chunk>
            <reader ref="simpleJobReader"/>
            <processor ref="simpleJobProcessor"/>
            <writer ref="simpleJobWriter"/>
        </chunk>
    </step>
    <step id="upload">
       <batchlet ref="uploader" />
    </step>
</job>

Batch 跟 Chunk 不同在於,Chunk 適合在需要一筆一筆處理資料時使用,例如讀取檔案或資料庫中的資料,並且可以逐一處理內容時;而 Batch 則適用於非資料筆數相關,需要長時間執行的動作,例如執行作業系統的命令,(解)壓縮或是上下傳檔案。以下為本次用於上傳的範例,使用Apache HttpComponents 來完成上傳檔案的 Http Form 的網站。Batch 常用在長時間的不可分割行為上,所以繼承 AbstractBatchlet 後只有一個最重要的 process 方法需要實作。最後回傳的字串主要是要決定整個處理的狀態,在這邊先不管是否上傳成功,都回 COMPLETED 來當作正確完成。

@Named 
@Dependent 
public class Uploader extends AbstractBatchlet { 
    @Override 
    public String process() throws Exception { 
        Response response = Request.Post("http://localhost:8080/sample") 
                 .body(MultipartEntityBuilder.create() 
                         .addBinaryBody("result", new File("/tmp/output.txt"))
                         .build()) 
                 .execute(); 
        return "COMPLETED"; 
    } 
}

所以有多個動作要執行時,就像上面 job xml 的設定只要一個接著一個就可以。

Flow

在這邊如果將上述的兩個 step 合併成一個單位,那就需要使用 flow 來達成,例如下面這樣。

<?xml version="1.0" encoding="UTF-8"?>
<job id="simplejob" xmlns="http://xmlns.jcp.org/xml/ns/javaee"version="1.0">
    <flow id="computeFlow" next="step2">
        <step id="compute" next="upload">
            <chunk>
                <reader ref="simpleJobReader"/>
                <processor ref="simpleJobProcessor"/>
                <writer ref="simpleJobWriter"/>
            </chunk>
        </step>
        <step id="upload">
           <batchlet ref="uploader" />
        </step>
    </flow>
    <step id="step2"...</step> 
    <flow id="flow2"...</flow>
</job>

flow 跟 step 一樣可以設定 next 指定下一個行為,另外 flow 下面除了可以放入 step 以外,還可以放同樣是 flow ,以及之後會介紹到的 split 和 decision。

Split

當有複數 flow 可以同時執行時,可以使用 split 將多個 flow 整理在一起。例如在處理資料完畢後,需要將資料用不同協定傳到多個不同的主機。

<?xml version="1.0" encoding="UTF-8"?>
<job id="simplejob" xmlns="http://xmlns.jcp.org/xml/ns/javaee"version="1.0">
    <step id="step1"...</step>
    <split id="split1" next="step2"> 
        <flow id="flow1"> 
            <step id="uploadToA"> 
                <batchlet ref="upload1"/> 
            </step> 
        </flow> 
        <flow id="flow2"> 
            <step id="uploadToB"> 
                <batchlet ref="upload2"/> 
            </step> 
        </flow> 
    </split>
    <step id="step2"...</step>
</job>

執行環境便會依照 split 的設定並行執行,因此像上面的範例便會同時執行 uploadToA 和 uploadToB 兩個工作,當 split 下的工作都完成以後便會執行下一個設定的工作。以上範例可以了解,split 本身並不困難,只要將可以同時執行的行為放在 split 下就可以了。

Decision

關於流程控制,規格中定義了 next, fail, end 和 stop 四種元件,這四種元件都有一個屬性 on,當收到 on 所指定的狀態字串( exit status )時便執行定義的行為。next 設定要導到的下一步(例如 step );fail 將整個 job 設定為 FAILED 並結束;end 將整個 job 設定為 COMPLETED 並結束;stop比較特別是除了將整個 job 設定為 STOPPED 並結束外,還另外必須設定 restart 屬性,當 job 重啟時會依照 restart  所設定的單位( step, flow, split )開始執行。

以上四種元件放在 step, split 和 flow 之下,並依照執行結果做流程控制。下面是一個 step 下設定流程的範例。在 chunk 過程中如果結果為 status1 那麼接著就會到 step2 執行,而如果是 status2 和 status3 則是結束並設定 job 的狀態為 FAILED 或是 STOPPED。

<?xml version="1.0" encoding="UTF-8"?>
<job id="simplejob" xmlns="http://xmlns.jcp.org/xml/ns/javaee"version="1.0">
    <step id="step1" >
        <chunk> 
            <reader ref="simpleJobReader"/> 
            <processor ref="simpleJobProcessor"/> 
            <writer ref="simpleJobWriter"/> 
        </chunk>
        <next on="status1" to="step2"/>
        <fail on="status2"/>
        <end  on="status3"/>
    </step>
    <step id="step2"...</step>
</job>

Batchlet 的 process 方法可以直接回傳狀態字串,而如果是用 Chunk 的方式來設計,那麼就得將 StepContext 注入到對應的 reader, processor 或 writer 後,依照狀況使用 setExitStatus 來設定狀態字串。

@Named 
@Dependent 
public class SimpleJobReader extends AbstractItemReader { 
    @Inject StepContext stepContext; 
    @Override public Object readItem() throws Exception { 
        ...
        if(...) {
            stepContext.setExitStatus("fail"); 
            ....
        }
        ....
    } 
}

除了上面這種方式外,另一個方式就是獨立出一個 Decider 來判斷。Decider 的  decide 方法會取得來源的執行結果,因此可以透過這些結果來控制流程。以下為 job xml 設定和程式示意,當來源的 step 的狀態為 FAILED 時便會回傳 status2 ,因此整個 job 會停下並設定為 COMPLETED。

<?xml version="1.0" encoding="UTF-8"?>
<job id="simplejob" xmlns="http://xmlns.jcp.org/xml/ns/javaee"version="1.0">
    <step id="step1" next="decider"...</step>
    <decision id="decider" ref="decider"> 
        <next on="status1" to="step2"/>
        <fail on="status2"/>
        <end  on="status3" />
        <stop on="status4" restart="step2"/>
    </decision>
    <step id="step2"...</step>
</job>
@Named 
@Dependent 
public class Decider1 implements Decider { 
    @Override public String decide(StepExecution[] executions) throws Exception { 
        ...
        if(executions[0].getExitStatus().equals("FAILED")) {
            return "status2"; 
        }
        ....
        return "status1";
    } 
}

Checkpoint

在上一次的介紹中有大略提過Checkpoint:

需要長時間運作的程式,最怕的就是執行失敗後必須重來,如果是秒到分鐘單位的運算,可能不會讓人覺得麻煩,但如果一個工作在執行小時為單位後必須重來,那麼就真的是浪費時間了。為了解決此問題,我們通常會將運作的過程,一個階段一個階段儲存起來,就算過程中發生意外,我們也可以從最後執行到的資料開始接著,而不需要重頭開始。

Chunk 的 ItemReader 和 ItemWriter 有 checkpointInfo 方法讓我們回傳相關資訊,每當 Chunk 完成時便呼叫 checkpointInfo 方法,然後在 open 時會傳入記錄的 checkpoint。。預設每 Chunk 的筆數為10筆,如果想要修改可以改變 job xml 中 chunk 的 item-count 屬性就可以了。

<?xml version="1.0" encoding="UTF-8"?>
<job id="simplejob" xmlns="http://xmlns.jcp.org/xml/ns/javaee"version="1.0">
    <step id="step1" >
        <chunk item-count="10"> 
            ... 
        </chunk>
    </step>
</job>

以下為一個簡單的記錄 Checkpoint 和重新執行後,從指定筆數開始讀取。

@Named 
@Dependent public class SimpleJobReader extends AbstractItemReader { 
    private BufferedReader inputReader; 
    private int count;

    @Override public void open(Serializable checkpoint) throws Exception { 
        inputReader = new BufferedReader(new FileReader("/tmp/input.txt")); 
        if(checkpoint == null){
            return;
        }
        count = (int) checkpoint;
        for(int i = 0; i < count; ++i){
            inputReader.readLine();
        }
    } 
    @Override public void close() throws Exception { 
        inputReader.close(); 
    } 

    @Override public Object readItem() throws Exception { 
        String line = inputReader.readLine(); 
        if(line == null) { 
            return null; 
        }
        count++;
        String[] fields = line.split(","); 
        return fields[1]; 
    } 
    @Override public Serializable checkpointInfo() throws Exception { 
        return count;
    }   
}

Checkpoint 的設定除了上述的 item-count 以外,也有另一種方式來設定是否該設定 checkpoint。將 job xml 中 chunk 的 checkpoint-policy 設定為 custom,並新增一個實作 CheckpointAlgorithm 的類別和對應的設定來達到複雜一點的 checkpoint 判斷。

結尾

本次稍微再詳細說明了 JSR-352 的其中兩個主要功能,下一次將會對 Batch 剩下的兩個重要功能做介紹。

後續 >> Java EE 7 – Batch Applications for the Java Platform(三)

分享:
按讚!加入 CodeData Facebook 粉絲群

相關文章

留言

留言請先。還沒帳號註冊也可以使用FacebookGoogle+登錄留言

j4tony09/25

請問一下,在Decision 節的 Decider段落下面的xml範例。範列中decision node 的 屬性ref 值是否為decider class 名稱呢?

關於作者

愛好 Java 的工程師,前幾年主要開發 Web Application,最近則在接觸 Mobile App 開發。

熱門論壇文章

熱門技術文章