Using Observers for Pub Sub in Project Orleans

Orleans has a cool feature for pub-sub messaging. It lets you register for grain-originated events without having to poll the grain. Instead, you can ask to be called back when the interesting thing happens.

To do this you create an ‘observer’ object. You then create a reference to this object, and pass it to the grain. The grain maintains a list of all observers. The grain can then send them all a message with a single call.

To create an observer, create an interface which inherits from IGrainObserver. You should put this in your Grain Interfaces project.

public interface IFooGrainObserver : IGrainObserver
{
    void Foo(string message);
}

Note that Foo returns void. This is a requirement for every method on an observer interface: notifications are one-way, so the grain does not wait for a reply.

Next, create an implementation of this interface. It doesn’t really matter which project you put it in, but it’s probably best placed in the project where you want the code to actually run. So if you’re sending a message back to the client, put it alongside your client code.

class FooObserver : IFooGrainObserver
{
    public void Foo(string message)
    {
        Console.WriteLine(message);
    }
}

We’ll need a grain which is going to receive the subscriptions, and perform the publication.

This is the grain interface. It has one method for registering a subscriber (Subscribe) and another to call when you wish to publish a message (Publish):

public interface IPubSubGrain : IGrain
{
    Task Subscribe(IFooGrainObserver observer);
    Task Publish(string message);
}

And this is the implementation:

public class PubSubGrain : GrainBase, IPubSubGrain
{

    ObserverSubscriptionManager<IFooGrainObserver> subscribers = new ObserverSubscriptionManager<IFooGrainObserver>();

    public Task Subscribe(IFooGrainObserver observer)
    {
        subscribers.Subscribe(observer);
        return TaskDone.Done;
    }

    public Task Publish(string message)
    {
        subscribers.Notify(x => x.Foo(message));
        return TaskDone.Done;
    }
}

Note that Orleans provides an ObserverSubscriptionManager, which helps you manage the subscriptions and send notifications.
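To get a feel for what a subscription manager is doing for the grain, here is a minimal sketch in plain C# with no Orleans types at all. Everything in it (SimpleSubscriptionManager, IFooObserver, RecordingObserver, Demo) is a hypothetical name made up for illustration. It is a simplified model of the idea, not how Orleans actually implements it: it just keeps a set of observers and applies a callback to each one.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an observer interface (no Orleans involved).
public interface IFooObserver
{
    void Foo(string message);
}

// Hypothetical, simplified model of a subscription manager:
// it holds a set of observers and invokes a callback on each of them.
public class SimpleSubscriptionManager<T>
{
    private readonly HashSet<T> observers = new HashSet<T>();

    public int Count
    {
        get { return observers.Count; }
    }

    // Adding the same observer twice has no effect in this model.
    public void Subscribe(T observer)
    {
        observers.Add(observer);
    }

    // Apply the notification to a snapshot of the observers, so the set
    // can safely change while notifications are being delivered.
    public void Notify(Action<T> notification)
    {
        foreach (var observer in new List<T>(observers))
        {
            notification(observer);
        }
    }
}

// Hypothetical observer that records what it receives.
public class RecordingObserver : IFooObserver
{
    public readonly List<string> Received = new List<string>();

    public void Foo(string message)
    {
        Received.Add(message);
    }
}

public static class Demo
{
    // Subscribes two observers, publishes one message,
    // and returns everything the observers received.
    public static List<string> Run()
    {
        var manager = new SimpleSubscriptionManager<IFooObserver>();
        var first = new RecordingObserver();
        var second = new RecordingObserver();
        manager.Subscribe(first);
        manager.Subscribe(second);

        manager.Notify(x => x.Foo("Hello World"));

        var all = new List<string>(first.Received);
        all.AddRange(second.Received);
        return all;
    }
}
```

The shape of Notify is the same as in the grain above: the caller passes a lambda describing which observer method to invoke, and the manager takes care of calling it on every subscriber.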

Now to actually make this work.

When the Grain Interfaces project compiles, a factory is created for our observer (FooGrainObserverFactory), just like the factories that are created for the grains.

To use the factory, we pass in an instance of a class that implements IFooGrainObserver (here, a FooObserver). This gives us back an object reference that we can then pass to the Subscribe method of our grain.

That completes the subscription. Now just call Publish.

Your client code (perhaps this is in your Dev/Test Silo) will look something like this:

var grain = PubSubGrainFactory.GetGrain(0);

// register a subscriber.
var observerRef = await FooGrainObserverFactory.CreateObjectReference(new FooObserver());
await grain.Subscribe(observerRef);

// send a publish
await grain.Publish("Hello World");

// Hello World is printed on the console by the instance of FooObserver

Simple!
