Azure Queue Storage 介紹與操作

前言

Azure Queue Storage 適合用於儲存大量的訊息,但又不會需要長期保存時的情境,一個 Queus Storage 可以純存約百萬則訊息,

存取訊息的方式支援 HTTP 與 HTTPS 。 如果要使用 Azure Queue Storage, 則需在 Azure Storage Account 中進行建立與管理。

Azure Queue


特性

Azure Queue Storage

  • FIFO : 先儲存的會優先被取出來處理,但由於是軟刪除,所以發生 concurrency 不一定會保持此特性
  • 每一筆訊息最大容量為 64 KB , 儲存期限為 7 天
  • 一個 queue 可容納 500TB 的資訊
  • 易於與 Azure 其他服務整合,擴張 (Azure Function)
  • 支援 HTTP ,HTTPS 通訊協定存取

儲存的訊息

  • 可以是 UTF -8 字串 或是 二進位制 (Byte Arrays) 的格式
  • XML 文件,CSV ,TSV 檔案等

應用時機

image-20240105162500590

主要被應用在於非即時回應的非同步處理,想降低不同系統之間的耦合性,或是做為後端伺服器的系統緩衝。

  • Email , SMS 的訊息發送
  • 後端 Server 紀錄 Log 資料的管道
  • 微服務系統之間的溝通橋樑

實作介紹

因為筆者主要使用 Java 開發,接下來 Queue 的實作皆會已 Java 語法進行介紹常用的操作:

  1. Storage Account 建立 Queue
  2. 設定 Queue 連線與授權資訊
  3. 傳送訊息至 Queue
  4. 取出 Queue 中的訊息
  5. 更新 Queue 的訊息
  6. 刪除 Queue 的訊息
  7. 計算 Queue 中訊息的數量
  8. Queue 的例外處理

  1. Storage Account 建立 Queue

登入 Azure Portal 後,選擇 Storage Account , 點選側邊欄的 Queue 後按上面的 + 進行新增

azure-portal
  1. 設定 Queue 連線與授權資訊

    設定 Queue 服務的 url 與認證方式,認證方式主要有三種 (分別為 SAS Token , Connection String , Storage Account Key) ,

    取得認證資訊方式可參考官方文件 Authenticate the client

    1
    2
    String queueURL = String.format("https://%s.queue.core.windows.net/%s", ACCOUNT_NAME, queueName);
    QueueClient queueClient = new QueueClientBuilder().endpoint(queueURL).sasToken(SAS_TOKEN).buildClient();
  2. 傳送訊息至 Queue

    1
    2
    3
    4
    5
    String queueURL = String.format("https://%s.queue.core.windows.net", ACCOUNT_NAME);
    QueueClient queueClient = new QueueClientBuilder().endpoint(queueURL).sasToken(SAS_TOKEN).queueName("myqueue")
    .buildClient();

    queueClient.sendMessage("myMessage");
  3. 取出 Queue 中的訊息

    取出 Queue 的 Message 有提供其他額外參數設定,由以下多做說明:

    • maxMessages: 批次從 queue 中取出訊息的最大數量 。最大為 32 筆,超過設定會出現 exception
    • visibilityTimeout: 取出後隔多少時間才能再次看到該 Message, 預設為 30 秒
    • timeout: 與 queue 連線多久沒回應的時效限制
    • context: 使用服務前,需要額外新增近 Http Pipeline 的資訊
    1
    2
    3
    4
    5
    String queueURL = String.format("https://%s.queue.core.windows.net", ACCOUNT_NAME);
    QueueClient queueClient = new QueueClientBuilder().endpoint(queueURL).sasToken(SAS_TOKEN).queueName("myqueue")
    .buildClient();
    queueClient.receiveMessages(10).forEach(message ->
    System.out.println(message.getBody().toString()));
  4. 更新 Queue 的訊息

    1
    2
    3
    4
    // @param messageId: Message 的Id
    // @param popReceipt: 用於辨識哪個Message 要被更新
    // @param visibilityTimeout: 更新後多久才能再看到
    queueClient.updateMessage(messageId, popReceipt, "new message", visibilityTimeout);
  5. 刪除 Queue 的訊息

    1
    2
    3
    4
    5
    String queueURL = String.format("https://%s.queue.core.windows.net", ACCOUNT_NAME);
    QueueClient queueClient = new QueueClientBuilder().endpoint(queueURL).sasToken(SAS_TOKEN).queueName("myqueue")
    .buildClient();

    queueClient.deleteMessage(messageId, popReceipt);
  6. 計算 Queue 中訊息的數量

    1
    2
    3
    QueueClient queueClient =
    QueueUtils.createQueueClient(queueName, QueueUtils.getDefaultConnString());
    queueClient.getProperties().getApproximateMessagesCount();
  7. Queue 的例外處理

    Azure SDK 有提供額外的 Error Code 讓開發人員進行處理,詳情請見 Queue Storage error codes

    1
    2
    3
    4
    5
    6
    7
    8
    String queueServiceURL = String.format("https://%s.queue.core.windows.net", ACCOUNT_NAME);
    QueueServiceClient queueServiceClient = new QueueServiceClientBuilder().endpoint(queueServiceURL)
    .sasToken(SAS_TOKEN).buildClient();
    try {
    queueServiceClient.createQueue("myQueue");
    } catch (QueueStorageException e) {
    logger.error("Failed to create a queue with error code: " + e.getErrorCode());
    }

補充說明

除了 Azure Queue Storage , 微軟雲端有再推出另外一種 Queue 型態的服務 - Azure Service Bus 。相較於 Queue Storage , Service Bus 的功能與 Rabbit MQ , Kafaka , GCP PubSub 較為類似,比較偏向 Producer - Consumer 的架構,所以如果希望要較為即時的非同步處理功能,請珍惜生命,不要走錯棚!

參考資料