Pattern for processing Azure Queue Messages in parallel

By using a function that dequeus messages as an IEnumerable, you can easily plug in the parallel task library to process message simultaneously.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.StorageClient;

class Program
{
    static void Main(string[] args)
    {
        var account = CloudStorageAccount.DevelopmentStorageAccount;
        var client = account.CreateCloudQueueClient();
        var queue = client.GetQueueReference("foo");

        Parallel.ForEach<CloudQueueMessage>(GetMessages(queue), (message) =>
        {
            // process your message here
            Console.WriteLine(message.AsString);
            queue.DeleteMessage(message);
        });
    }

    private static IEnumerable<CloudQueueMessage> GetMessages(CloudQueue queue)
    {
        while (true)
        {
            var message = queue.GetMessage();
            if (message != null)
            {
                yield return message;
            }
            else
            {
                yield break;
            }
        }
    }
}

The added advantage to this approach, is that messages added to the queue once the loop has started will still get processed. You could wrap the whole Parallel.ForEach in a loop, and use Thread.Sleep to pause whilst there is nothing in the queue to process. 
Advertisements