Spring 非同步程式設計

前言

為了改善應用程式部分功能的效能,像是呼叫多個第三方 API , 資料處理… 等等,往往會用非同步設計的方式達到並行處理的效果。但若採用 Thread, Runbbable ,Callable 這些寫法來實現,會產出許多笨重且較難以維護的程式碼,因此 Spring @Async 來改善程式的可讀性與設計方式。

本篇會將介紹 Spring @Async 與 Java 8 CompletableFuture 的組合來介紹非同步程式設計的方式。

概念介紹

非同步程式相當於創建一個新的子執行緒 來執行,執行緒 (Thread) 會循序等待每個子執行緒執行完任務,並獲取相關物件值,再次進行相關邏輯運算後,才會回傳最終結果。

而在 Spring Boot 中,透過 @Async 來標註方法來告知 Spring 需要建立子執行緒來執行,並且在方法回傳該執行緒完成的資料,方法回傳值都透過 Future 實現 。 因此可以結合 JDK 8 推出的 CompletableFuture 功能來做到複雜的商業邏輯處理。

應用時機

非同步程式主要是應用於效能優化,但各個資料的狀態互相獨立的場合,一個利用資源換取時間的概念。像是加速批次資料的處理,呼叫多個獨立的 API 端點,資料庫的資料查詢,信件通知等等。

基本啟用設定

在配置類別加上 @EnableAsync 啟用非同步方法功能,並建立一個名稱為 executor 的執行緒池。當標註 @Async 時可使用建立好的執行緒來執行非同步方法。 若都未設定 executor , Spring 預設 SimpleAsyncTaskExecutor` 來執行非同步方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@EnableAsync
@Configuration
public class AsyncConfig {
@Value("${spring.async.core-pool-size:30}")
private Integer corePoolSize;

@Value("${spring.async.max-pool-size:50}")
private Integer maxPoolSize;

@Value("${spring.async.queue-capacity:80}")
private Integer queueCapacity;

@Bean(name = "taskExecutor")
public Executor executor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setTaskDecorator(new MDCTaskDecorator());
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setThreadNamePrefix("async-");
executor.initialize();
return executor;
}
}

建立非同步的方法

後續在業務邏輯層 (@Service ,@Component) 標註 @Async 來告訴 Spring 該方法是一個非同步執行的任務。若方法需要回傳資料,將資料封裝在 CompletableFuture.completedFuture (results) 進行回傳,方便使用該任務的其他方法做後續的處理 (等待 Thread 執行,額外資料轉換等)。

程式範例為模擬判斷找到不同字串的電影 。

Service 層

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

@Service
public class AsyncService {

private static final Logger logger = LoggerFactory.getLogger(AsyncService.class);

private List<String> movies =
new ArrayList<>(
Arrays.asList(
"Forrest Gump",
"Titanic",
"Spirited Away",
"The Shawshank Redemption",
"Zootopia",
"Farewell ",
"Joker",
"Crawl"));

@Async
public CompletableFuture<List<String>> completableFutureTask(String start) {
logger.warn(Thread.currentThread().getName() + "start this task!");
List<String> results =
movies.stream().filter(movie -> movie.startsWith(start)).collect(Collectors.toList());

try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return CompletableFuture.completedFuture(results);
}
}

Controller 層

建立一個 API 端點來測試撰寫的非同步方法是不是用不同的 Thread 來處理資料。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@RestController
@RequestMapping("/async")
public class AsyncController {
@Autowired
AsyncService asyncService;

@GetMapping("/movies")
public String completableFutureTask() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
List<CompletableFuture<List<String>>> completableFutureList =
words.stream()
.map(word -> asyncService.completableFutureTask(word))
.collect(Collectors.toList());
// CompletableFuture.join()
List<List<String>> results = completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
// print consume time
System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
return results.toString();
}
}

啟動應用程式後,Postman 或瀏覽器對 http://localhost:8080/async/movies 發出 GET 請求。

應用程式收到請求並執行後會在 console 印出下面結果。可以觀察到該端點會新增不同的執行緒來實現非同步執行。

1
2
3
4
5
6
7
My ThreadPoolTaskExecutor-1start this task!
My ThreadPoolTaskExecutor-6start this task!
My ThreadPoolTaskExecutor-5start this task!
My ThreadPoolTaskExecutor-4start this task!
My ThreadPoolTaskExecutor-3start this task!
My ThreadPoolTaskExecutor-2start this task!
Elapsed time: 1010

總結

此篇文章簡單介紹了 SpringBoot 如何改善複雜的非同步程式設計,並介紹如何啟用 Spring Boot 非同步的功能,並如何實際開發出非同步的程式,最後則是有一個小小的 Demo 來顯示非同步的功能是如何啟用的,至於如何處理回傳的 CompletableFuture , 筆者後續會再詳細介紹有哪些語法可以使用!!!

參考資料