Azure Queue Storage 介紹與操作
前言
Azure Queue Storage 適合用於儲存大量的訊息,但又不會需要長期保存時的情境,一個 Queus Storage 可以純存約百萬則訊息,
存取訊息的方式支援 HTTP 與 HTTPS 。 如果要使用 Azure Queue Storage, 則需在 Azure Storage Account 中進行建立與管理。
特性
Azure Queue Storage
- FIFO : 先儲存的會優先被取出來處理,但由於是軟刪除,所以發生 concurrency 不一定會保持此特性
- 每一筆訊息最大容量為 64 KB , 儲存期限為 7 天
- 一個 queue 可容納 500TB 的資訊
- 易於與 Azure 其他服務整合,擴張 (Azure Function)
- 支援 HTTP ,HTTPS 通訊協定存取
儲存的訊息
- 可以是 UTF -8 字串 或是 二進位制 (Byte Arrays) 的格式
- XML 文件,CSV ,TSV 檔案等
應用時機
主要被應用在於非即時回應的非同步處理,想降低不同系統之間的耦合性,或是做為後端伺服器的系統緩衝。
- Email , SMS 的訊息發送
- 後端 Server 紀錄 Log 資料的管道
- 微服務系統之間的溝通橋樑
實作介紹
因為筆者主要使用 Java 開發,接下來 Queue 的實作皆會已 Java 語法進行介紹常用的操作:
- Storage Account 建立 Queue
- 設定 Queue 連線與授權資訊
- 傳送訊息至 Queue
- 取出 Queue 中的訊息
- 更新 Queue 的訊息
- 刪除 Queue 的訊息
- 計算 Queue 中訊息的數量
- Queue 的例外處理
- Storage Account 建立 Queue
登入 Azure Portal 後,選擇 Storage Account , 點選側邊欄的 Queue 後按上面的 + 進行新增
設定 Queue 連線與授權資訊
設定 Queue 服務的 url 與認證方式,認證方式主要有三種 (分別為 SAS Token , Connection String , Storage Account Key) ,
取得認證資訊方式可參考官方文件 Authenticate the client
1
2String queueURL = String.format("https://%s.queue.core.windows.net/%s", ACCOUNT_NAME, queueName);
QueueClient queueClient = new QueueClientBuilder().endpoint(queueURL).sasToken(SAS_TOKEN).buildClient();傳送訊息至 Queue
1
2
3
4
5String queueURL = String.format("https://%s.queue.core.windows.net", ACCOUNT_NAME);
QueueClient queueClient = new QueueClientBuilder().endpoint(queueURL).sasToken(SAS_TOKEN).queueName("myqueue")
.buildClient();
queueClient.sendMessage("myMessage");取出 Queue 中的訊息
取出 Queue 的 Message 有提供其他額外參數設定,由以下多做說明:
- maxMessages: 批次從 queue 中取出訊息的最大數量 。最大為 32 筆,超過設定會出現 exception
- visibilityTimeout: 取出後隔多少時間才能再次看到該 Message, 預設為 30 秒
- timeout: 與 queue 連線多久沒回應的時效限制
- context: 使用服務前,需要額外新增近 Http Pipeline 的資訊
1
2
3
4
5String queueURL = String.format("https://%s.queue.core.windows.net", ACCOUNT_NAME);
QueueClient queueClient = new QueueClientBuilder().endpoint(queueURL).sasToken(SAS_TOKEN).queueName("myqueue")
.buildClient();
queueClient.receiveMessages(10).forEach(message ->
System.out.println(message.getBody().toString()));更新 Queue 的訊息
1
2
3
4// @param messageId: Message 的Id
// @param popReceipt: 用於辨識哪個Message 要被更新
// @param visibilityTimeout: 更新後多久才能再看到
queueClient.updateMessage(messageId, popReceipt, "new message", visibilityTimeout);刪除 Queue 的訊息
1
2
3
4
5String queueURL = String.format("https://%s.queue.core.windows.net", ACCOUNT_NAME);
QueueClient queueClient = new QueueClientBuilder().endpoint(queueURL).sasToken(SAS_TOKEN).queueName("myqueue")
.buildClient();
queueClient.deleteMessage(messageId, popReceipt);計算 Queue 中訊息的數量
1
2
3QueueClient queueClient =
QueueUtils.createQueueClient(queueName, QueueUtils.getDefaultConnString());
queueClient.getProperties().getApproximateMessagesCount();Queue 的例外處理
Azure SDK 有提供額外的 Error Code 讓開發人員進行處理,詳情請見 Queue Storage error codes
1
2
3
4
5
6
7
8String queueServiceURL = String.format("https://%s.queue.core.windows.net", ACCOUNT_NAME);
QueueServiceClient queueServiceClient = new QueueServiceClientBuilder().endpoint(queueServiceURL)
.sasToken(SAS_TOKEN).buildClient();
try {
queueServiceClient.createQueue("myQueue");
} catch (QueueStorageException e) {
logger.error("Failed to create a queue with error code: " + e.getErrorCode());
}
補充說明
除了 Azure Queue Storage , 微軟雲端有再推出另外一種 Queue 型態的服務 - Azure Service Bus 。相較於 Queue Storage , Service Bus 的功能與 Rabbit MQ , Kafaka , GCP PubSub 較為類似,比較偏向 Producer - Consumer 的架構,所以如果希望要較為即時的非同步處理功能,請珍惜生命,不要走錯棚!