Pub/Sub with ZeroMQ on Azure

I wanted to do real-time publish/subscribe, and although the Azure Service Bus supports this with topics and subscriptions, it seems to be geared towards guaranteed delivery rather than real-time. You need to make a request to receive a message, which didn’t quite fit the real-time nature of my application. Enter ZeroMQ.

ZeroMQ (or ØMQ) is a library for working with sockets. There are clients for a variety of languages, including C# – although I can’t get the node module to build :¬(. It’s a fairly cool piece of technology, and takes all the pain out of socket programming.

ZeroMQ needs to be installed on the machine, and some C++ runtime DLLs are also required. To make this super easy I created a plugin in the AzurePluginLibrary to do just this. To install, just run this command using APM.

> apm install ZeroMQ

Setting up the Publisher

Next I created a Cloud Project, and a WorkerRole to act as my publisher. To install the ZeroMQ references, you just need to run this nuget command in Visual Studio:

PM> Install-Package clrzmq -Pre

In the ServiceDefinition file for the Worker, add the ZeroMQ plugin, and two endpoints (the port numbers don’t matter):

<WorkerRole name="WorkerRole1" vmsize="Small">
  <Imports>
    <Import moduleName="Diagnostics" />
    <Import moduleName="ZeroMQ" />
  </Imports>
  <Endpoints>
    <InternalEndpoint name="Endpoint1" protocol="tcp" />
    <InternalEndpoint name="Endpoint2" protocol="tcp" />
  </Endpoints>
</WorkerRole>

Now you just need to write the C# code to receive a message from a client, and publish to all connected clients. I put this code in the ‘Run’ method of my RoleEntryPoint.

public override void Run()
{
  var context = ZmqContext.Create();

  // set up the socket which publishes messages to subscribers
  var endpoint1 = GetEndpoint("Endpoint1");
  var publisher = context.CreateSocket(SocketType.PUB);
  publisher.Bind(string.Format("tcp://*:{0}", endpoint1.IPEndpoint.Port));

  // set up the socket which receives messages from subscribers
  var endpoint2 = GetEndpoint("Endpoint2");
  var receiver = context.CreateSocket(SocketType.PULL);
  receiver.Bind(string.Format("tcp://*:{0}", endpoint2.IPEndpoint.Port));

  // publish every message you receive!
  while (true)
  {
    var message = receiver.Receive(Encoding.UTF8);
    publisher.Send(message, Encoding.UTF8);
  }
}

The ‘GetEndpoint’ function in this case just returns the required RoleInstanceEndpoint of the current instance.

private RoleInstanceEndpoint GetEndpoint(string name)
{
    return RoleEnvironment
        .CurrentRoleInstance
        .InstanceEndpoints
        .Where(x => x.Key == name)
        .Select(x => x.Value).FirstOrDefault();
}

Setting up the Subscriber

I then created a WebRole which acts as the subscriber. This also needs the ServiceDefinition file updating to include the ZeroMQ plugin. I added the code into the Global.asax, but for simplicity I have extracted it out into a single code block for this example.

var context = ZmqContext.Create();

// set up the socket which subscribed to messages from the publisher
var endpoint1 = GetEndpoint("Endpoint1");
var subscriber = this.Context.CreateSocket(SocketType.SUB);
subscriber.Connect(string.Format("tcp://{0}:{1}", endpoint1.IPEndpoint.Address, endpoint1.IPEndpoint.Port));
subscriber.Identity = Guid.NewGuid().ToByteArray();
subscriber.SubscribeAll();

// set up the socket which sends messages to the publisher
var endpoint2 = GetEndpoint("Endpoint2");
var sender = this.Context.CreateSocket(SocketType.PUSH);
sender.Connect(string.Format("tcp://{0}:{1}", endpoint2.IPEndpoint.Address, endpoint2.IPEndpoint.Port));

while (true)
{
    var message = subsriber.Receive(Encoding.UTF8);
    // do something with the messages you receive
}

To send a message, you just call:

sender.Send(message, Encoding.UTF8);

In this code snipper, the GetEndpoint function is slightly different:

private RoleInstanceEndpoint GetEndpoint(string name)
{
    return RoleEnvironment
        .Roles["WorkerRole1"]
        .Instances.First()
        .InstanceEndpoints
        .Where(x => x.Key == name)
        .Select(x => x.Value)
        .FirstOrDefault();
}

Conclusion

This sample shows a simple pub/sub scenario with ZeroMQ in Azure. Whenever the ‘Send’ function is called, the message will almost instantly pop out at each of the subscribers. Network disruption and reconnecting the socket is all handled for you. ZeroMQ is a powerful tool. This is just one way of configuring it, there are a number of different scenarios that it can support.

Advertisement