Java EE 7 – Batch Applications for the Java Platform(三)
|
Java EE 7 – Batch Applications for the Java Platform(二) << 前情 前言本次連載爲 Java EE 批次處理四個主要功能中的「例外處理」、「平行處理」做進一步介紹。 例外處理在第一篇的時候提過對不同的錯誤原因,就必須要有不同的處理方式:
JSR-352 提供了兩種方式來對應當遇到錯誤時該怎麼處理,遇到錯誤時可以設定「跳過」錯誤的資料,「重試」,或是不設定上述兩種直接讓整個 Job 停止,等之後排除後再重新執行 Job,以下將簡單介紹一下如何設定,以及設定後當 Job 出現錯誤時會產生的行為。 假設在處理資料時可能遇到的其中一個問題為資料格式不符,例如要處理的資料中原本預期會有值但不存在,需要為數字但結果是字串…等等。這邊修改第一篇的輸入,假設其中一個資料在當初產生時發生錯誤。 A1,1000,2 A2,2000,3 B2,???,1 . . 如果設定沒有改變,那麼當處理到「 ??? 」這筆資料時便會發生 NumberFormatException,可以發現我們的輸出結果檔案寫到一半便會停止,然後在 Application Server 上也會看到 Job 的狀態變成 FAILED。 跳過錯誤資料遇到就算重複在處理也沒用的狀況時,就是需要跳過資料的時候。這時我們可以在 Job 的 xml 內設定 <skippable-exception-classes>,當遇到會丟出設定的錯誤類型時,便會跳過該筆資料。 .
<chunk>
<reader ref="simpleJobReader"/>
<processor ref="simpleJobProcessor"/>
<writer ref="simpleJobWriter"/>
<skippable-exception-classes>
<include class="java.lang.NumberFormatException"/>
</skippable-exception-classes>
</chunk>
.
執行後便會發現除了錯誤那筆資料以外的結果都能順利寫出到檔案中,Job 狀態也會順利變成 COMPLETED。如果錯誤是發生在 Writer 呢?因為進入 Writer 寫出資料 writeItems 方法時已經是複數筆資料,所以會一次跳過複數筆結果。 我們也可以設定 Listener ,當遇到跳過時可以做一些處理, 總共有三種介面可以實作,分別為 SkipReadListener,SkipProcessListener 和 SkipWriteListener,以下是當 Process 階段發生錯誤時將錯誤資料寫到 log 的程式。 @Named
@Dependent
public class MySkipProcessListener implements SkipProcessListener{
.
.
@Override
public void onSkipProcessItem(Object item, Exception ex) throws Exception {
logger.logw(item, ex);
}
.
}
接著在 job.xml 中將 listener 設定好。 <job id="simpleJob" xmlns="https://xmlns.jcp.org/xml/ns/javaee" version="1.0">
<step id="step1">
<listeners>
<listener ref="mySkipProcessListener"/>
</listeners>
<chunk>
<reader ref="simpleJobReader"/>
<processor ref="simpleJobProcessor"/>
<writer ref="simpleJobWriter"/>
<skippable-exception-classes>
<include class="java.lang.NumberFormatException"/>
</skippable-exception-classes>
</chunk>
</step>
</job>
執行後便可以在 log 內看到發生錯誤的訊息。因為是「 例外」狀況,通常不會去預期這件事情常常發生,如果太常發生代表應該要有人為介入去處理這類事情。所以最好能設定例外發生的上限次數,如果達到一定數量代表已經不是偶爾會發生的狀況。只需要在 <chunk> 上設定 skip-limit 就可以。 .
<chunk skip-limit="3">
<reader ref="..."/>
.
例如上述設定在發生錯誤超過三次時便會讓 Job 變成 FAILED。以上範例介紹了遇到錯誤時可以用跳過的方式來對應。要注意的是錯誤發生不應該為常態,上面的範例通常可以在產生這些輸入資料時,或是在 Job 讀取時便可以過濾掉,而不需要等到處理或寫出時才做對應。 重試除了跳過以外,另一種方式就是重試了。例如處理資料的過程中需要從外部系統取得資料,這時候可能會因為網路或是環境的關係暫時出錯,但只要重新嘗試便可以在取得的狀況時,就適合用重試的機制。我們只需要在 <chunk> 下加上 <retryable-exception-classes> ,當處理過程中發生對應的狀況時便會重新再處理。 .
<chunk>
<reader ref="simpleJobReader"/>
<processor ref="simpleJobProcessor"/>
<writer ref="simpleJobWriter"/>
<retryable-exception-classes>
<include class="java.io.IOException"/>
</retryable-exception-classes>
</chunk>
.
要注意這樣在發生錯誤時會一直重複執行下去,所以通常會在 chunk 設定 retry-limit 以避免失敗和重試無限輪迴。另外重試會馬上再開,所以需要自行處理是不是要等待一段時間再開。 .
<chunk retry-limit="3">
<reader ref="..."/>
.
同樣我們也可以實作 RetryReadListener,RetryProcessListener 和 RetryWriteListener 來撰寫遇到錯誤時的行為。 重試跟跳過可以組合使用,當同時符合條件時會先重試,如果持續發生錯誤而超過重試次數便會跳過,如果跳過次數再超過設定的跳過上限才會算 Job 失敗。 .
<chunk retry-limit="3" skip-limit="5">
<reader ref="simpleJobReader"/>
<processor ref="simpleJobProcessor"/>
<writer ref="simpleJobWriter"/>
<retryable-exception-classes>
...
</retryable-exception-classes>
<skippable-exception-classes>
...
</skippable-exception-classes>
</chunk>
.
重新執行 Job如果沒有處理錯誤狀況,或是超過上面兩種所設定的上限數量時,Job 會變成 FAILED,這時候我們可以透過 JobOperator 的 restart 方法來重新執行 Job。 JobOperator jobOperator = BatchRuntime.getJobOperator(); Properties props = new Properties(); jobOperator.restart(jobExecutionId, props); 在這邊我們可以透過 JobOperator 取得 JobInstance 跟 JobExecution 和判斷狀態,然後再將要重新執行的 JobExecution 的 id 傳給 restart 執行。通常一連串的 step 中如果發生錯誤,重新啟動時會從失敗的 step 開始執行。例如以下範例,如果錯誤發生在 step2,那麼透過 restart 再次啟動時,step1 不會執行。 .
<step id="step1" next="step2">
<chunk item-count="2">
<reader ref="simpleJobReader"/>
<processor ref="simpleJobProcessor"/>
<writer ref="simpleJobWriter"/>
</chunk>
</step>
<step id="step2" next="step3">
<batchlet ref="batch1"/>
</step>
<step id="step3">
<batchlet ref="batch2"/>
</step>
.
但是有時候重新啟動時,之前產生的資料可能已經過期,那麼我們會希望重啟時可以重新執行之前順利完成的 step,這時就只要在 step 上設定 allow-start-if-complete 為 true 就可以了。 .
<step id="step1" next="step2" allow-start-if-complete="true">
<chunk item-count="2">
<reader ref="simpleJobReader"/>
<processor ref="simpleJobProcessor"/>
<writer ref="simpleJobWriter"/>
</chunk>
</step>
<step id="step2" next="step3">
<batchlet ref="batch1"/>
</step>
<step id="step3">
<batchlet ref="batch2"/>
</step>
.
上面範例如果在 batch1 或 batch2 發生錯誤後重新啟動的話也會執行 step1。 以上簡單介紹了 Java EE 7 Batch 在處理過程中發生錯誤時提供的對應方式,當然在正式環境中錯誤處理的狀況應該會更加複雜,請依情況選擇合適的方式。 平行處理
批次處理通常都是處理大量資料,這時候如果可以善加利用機器的效能,讓工作能在多個執行緒上執行,便有機會(多少執行緒,處理資料的特性都會影響效能,不一定越多執行緒越好)在較短的時間內完成工作。JSR-352 提供了兩種方式設定工作該如何分割,一種為靜態決定切割方式,執行前就決定好執行的切割數量,適合在開發或部署時便已經知道需要多少資源處理多少資料時使用;一種為動態,則是執行時決定,適合在執行時才知道要處理的資料數或能使用的資源多寡的狀況。 靜態決定靜態設定主要就是在 job.xml 直接設定好數量,以下為使用 <partition> 分割成兩個部分,並且設定最多可以利用的執行緒數。 .
<step id="step1">
<chunk item-count="2">
<reader ref="simpleJobReader">
<properties>
<property name="start" value="#{partitionPlan['start']}" />
<property name="end" value="#{partitionPlan['end']}" />
</properties>
</reader>
<processor ref="simpleJobProcessor"/>
<writer ref="simpleJobWriter"/>
</chunk>
<partition>
<plan partitions="2" threads="2">
<properties partition="0">
<property name="start" value="1"/>
<property name="end" value="10"/>
</properties>
<properties partition="1">
<property name="start" value="11"/>
<property name="end" value="20"/>
</properties>
</plan>
</partition>
</step>
.
可以看到 <plan> 定義了切割的數量,另外針對每個 partition 都有不同的變數,然後 reader 中透過 #{paritionPlan['start']} 跟 #{paritionPlan['end']} 取得當下執行時要處理的資料範圍。執行時便可以發現最多會同時有兩個執行緒執行 chunk 內定義的動作,分別處理 1 到 10 跟 11到 20 的資料。以下為對應的 SimpleJobReader 的示意程式。 @Named
@Dependent
public class SimpleJobReader extends AbstractItemReader {
@Inject
@BatchProperty(name = "start")
private String start;
@Inject
@BatchProperty(name = "end")
private String end;
@Override
public void open(Serializable checkpoint) throws Exception {
// 讀取指定的 start ~ end 區間的資料
.
}
.
}
動態決定動態設定則是透過實作 PartitionMapper 來達成,它只有一個方法 mapPartitions,讓我們自己實作以動態產生 PartitionPlan 並回傳,例如可以先查詢總共要處理的資料數量後再決定切割跟執行緒量。下面的範例是仿上面靜態設定的內容。 @Dependent
@Named
public class SimplePartitionMapper implements PartitionMapper {
@Override
public PartitionPlan mapPartitions() throws Exception {
PartitionPlanImpl partitionPlan = new PartitionPlanImpl();
partitionPlan.setPartitions(2);
partitionPlan.setThreads(2);
Properties[] partitionProperties = new Properties[partitionPlan.getPartitions()];
for (int i = 0; i < partitionPlan.getPartitions(); i++) {
Properties props = new Properties();
props.setProperty("start", Integer.toString(1 + i * 10));
props.setProperty("end", Integer.toString((i + 1) * 10));
partitionProperties[i] = props;
}
partitionPlan.setPartitionProperties(partitionProperties);
return partitionPlan;
}
}
然後只要在 job.xml 設定好要使用剛剛寫好的 PartitionMapper 即可。 <step>
.
<partition>
<mapper ref="simplePartitionMapper"/>
</partition>
</step>
除了動態靜態方式分割外,partition 下還有其他用於控制和分析用的 reducer,collector 和 analyzer,本文就不詳細介紹這部分,讀者可以到 Java EE 7 Tutorial 了解這部分。 結尾本次說明了 JSR-352 剩下的兩個主要功能,下一次將會對 Java EE 7 的其他功能做介紹。
|

Java 學習之路