【Guava 教學】(9)ListenableFuture 聽取未來需求
【Guava 教學】(8)你需要的其實是範圍(Range)? << 前情 在許多語言中,多少都內建了一些非同步處理的方案,像是 Java,在 JDK5 後為了簡化非同步處理,提出了 ... public static void readFile(String file, Consumer<String> success, Consumer<Throwable> fail, ExecutorService service) { service.submit(() -> { try { success.accept(new String(Files.readAllBytes(Paths.get(file)))); } catch (IOException ex) { fail.accept(ex); } }); } ... 這麼一來,你就可以用類似 Node.js 的風格來讀取一個文字檔案: readFile(args[0], content -> out.println(content), // success ex -> ex.printStackTrace(), // error Executors.newFixedThreadPool(10) ); 可以看到,就算搭配了 JDK8 的 Lambda,使用 Guava 的 ... public static void readFile(String file, Consumer<String> success, Consumer<Throwable> fail, ListeningExecutorService service) { ListenableFuture<String> future = service.submit( () -> new String(Files.readAllBytes(Paths.get(file))) ); Futures.addCallback(future, new FutureCallback<String>() { public void onSuccess(String content) { success.accept(content); } public void onFailure(Throwable thrown) { fail.accept(thrown); } }); } ...
readFile(args[0], content -> out.println(content), // success ex -> ex.printStackTrace(), // error MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)) ); 然而在非同步操作使用回呼(Callback)風格,在每次回呼中又再度進行非同步操作及回呼,很容易寫出回呼地獄(Callback hell),造成可讀性不佳。例如若有個類似 readFile(args[0], content -> asyncProcess(content, processedContent -> out.println(processedContent) , errorHandler, service), errorHandler, service); 為了避免可讀性變差的問題,你可以使用 .... public static <R> ListenableFuture<R> asyncFuncFuture(Callable<R> callable, ListeningExecutorService service) { return service.submit(() -> callable.call()); } public static <P, R> ListenableFuture<R> asyncComposeFuture( ListenableFuture<P> future, Function<P, R> f, ListeningExecutorService service) { AsyncFunction<P, R> asyncFunc = new AsyncFunction<P, R>() { public ListenableFuture<R> apply(P content) throws Exception { return service.submit(() -> f.apply(content)); } }; return Futures.transform(future, asyncFunc, service); } .... 你可以傳給 有了 ... ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); ListenableFuture<String> contentFuture = asyncFuncFuture( () -> new String(Files.readAllBytes(Paths.get(args[0]))), service); ListenableFuture<String> processedContentFuture = asyncComposeFuture(contentFuture, content -> process(content), service); Futures.addCallback(processedContentFuture, new FutureCallback<String>() { public void onSuccess(String result) { out.println(result); } public void onFailure(Throwable t) { t.printStackTrace(); } }); ... 在撰寫非同步處理時,其實有許多方面在考量上與同步處理是不同的,可讀性是一個問題,上面的 有時候你會想要非同步但按照順序地執行一連串任務,例如非同步地讀取一連串檔案,但讀取時按照指定的順序: readFile(args[0], content1 -> readFile(args[1], content2 -> readFile(args[2], content3 -> /* 依序處理 content1, content2 與 content3 */ , errHandler, service), errHandler, service), errHandler, service); 在最後一次的回呼中,才會依序處理 ... public static void readAllFiles(String[] files, Consumer<List<String>> success, Consumer<Throwable> fail, ListeningExecutorService service) { List<ListenableFuture<String>> listenables = new ArrayList<>(files.length); for(String file : files) { listenables.add(service.submit( () -> new String(Files.readAllBytes(Paths.get(file))) )); } ListenableFuture<List<String>> contentsFuture = Futures.allAsList(listenables); Futures.addCallback(contentsFuture, new FutureCallback<List<String>>() { public void onSuccess(List<String> contents) { success.accept(contents); } public void onFailure(Throwable thrwb) { fail.accept(thrwb); } }); } ... 這麼一來,你就可以使用這個 readAllFiles(args, contents -> process(contents), ex -> ex.printStackTrace(), service); 實際上非同步處理時要注意的事項不只如此,可否注意到第一個的程式碼示範如何處理例外? ... try { readFile(args[0], content -> process(content), service); } catch(Exception ex) { // 處理錯誤 } ... 實際上 類似這類與同步與非同步程式執行習慣不同的情況還有不少…大部份情況下,我們都習慣同步處理,面對非同步處理時,還有不少要學習的地方,JDK5 之後有了些標準 API 可以使用,一直到 JDK8 還在補充,可見得非同步處理日益重要,像 Guava 這類的程式庫也有不少簡化之處,然而最重要的,還是我們的思考方式,沒有這層思考,使用 Guava 只是少打了幾個字,可讀性也不見得會好到哪裡去,有了這層思考,在打造自己程式的可讀性時,使用 Guava 這類的程式庫,才可以得到它簡化上的益處。 |