Aggregating Results in Orleans

Orleans is great for maintaining hundreds of thousands of grains, each with its own internal state. However, it’s sometimes difficult to get aggregate figures out of the system, such as the total for a particular value across all grains. A fan-out request to every grain isn’t practical: such a request would likely time out, and you probably don’t know which grains you’ve got. The answer is to set up a single grain that all other grains report to. We’ll look at a simple scenario to see how this could be done.

Let’s suppose that we have grains that hold a score, and we’d like the total score for the entire system. Every time a grain’s score changes, it calls a ‘total score grain’, which maintains the overall total. The grain looks like this:

    // Total Score Grain interface
    public interface ITotalScoreGrain : IGrain
    {
        Task AddScore(int value);

        Task<int> GetTotalScore();
    }


    // Total Score Grain implementation
    public class TotalScoreGrain : GrainBase, ITotalScoreGrain
    {
        // Running total of every score reported so far
        int total;

        public Task AddScore(int value)
        {
            this.total += value;
            return TaskDone.Done;
        }

        public Task<int> GetTotalScore()
        {
            return Task.FromResult(total);
        }
    }

This looks simple enough, but it’s a really bad idea, for a couple of reasons:

  • We now have a single grain acting as a bottleneck for the entire system. We’ve defeated the point of having a distributed system.
  • If you’re running the system on several servers, the chances are that the ‘total score grain’ will not be in the same silo as the grain calling it. This makes the call to the ‘total score grain’ a remote call, which is more expensive (takes more time).

Let’s improve on this by introducing a ‘sub total grain’, which will be responsible for collecting the scores within each silo and reporting them up to the ‘total score grain’:

    // Sub Total Grain interface
    [StatelessWorker]
    public interface ISubTotalGrain : Orleans.IGrain
    {
        Task AddScore(int value);
    }

    // Sub Total Grain implementation
    public class SubTotalGrain : GrainBase, ISubTotalGrain
    {
        int score;

        public override Task ActivateAsync()
        {
            RegisterTimer(SendUpdate, null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));
            return base.ActivateAsync();
        }

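        async Task SendUpdate(object _)
        {
            if (score == 0) return;
            // Capture and reset before awaiting: a score that arrives while the
            // call is in flight would otherwise be zeroed without being forwarded.
            var pending = score;
            score = 0;
            var totalScoreGrain = TotalScoreGrainFactory.GetGrain(0);
            await totalScoreGrain.AddScore(pending);
        }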

        public Task AddScore(int value)
        {
            this.score += value;
            return TaskDone.Done;
        }
    }

This grain has two interesting properties. First, it’s a Stateless Worker (see the attribute on the interface). Second, it has a timer, so it doesn’t forward every score update; it just updates some internal state and, if there is anything to report, forwards it every 5 seconds. This means that:

  • The grain will scale out automatically as required, removing the bottleneck.
  • The grain will always be created in the local silo, which removes the need for a network hop to report score changes. The cross-silo hop will only be triggered every 5 seconds.

This design introduces ‘eventual consistency’ to the total score, as the reported total can be up to 5 seconds behind. However, the system should scale nicely with no bottlenecks.
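
To get a feel for what the batching buys us without spinning up Orleans, here’s a minimal sketch in plain C#. The names TotalScore, SubTotalBuffer and Flush are made up for illustration; they stand in for the two grains and the timer callback, and nothing here uses the Orleans API. It only shows that the total lags until the buffers flush, and that the total then receives one call per buffer rather than one per score update.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the 'total score grain' (no Orleans types).
public class TotalScore
{
    public int Total { get; private set; }
    public int CallsReceived { get; private set; }

    public void AddScore(int value)
    {
        Total += value;
        CallsReceived++;
    }
}

// Hypothetical stand-in for one 'sub total grain' activation:
// it buffers scores locally and only forwards them when Flush is called.
public class SubTotalBuffer
{
    private readonly TotalScore total;
    private int pending;

    public SubTotalBuffer(TotalScore total)
    {
        this.total = total;
    }

    public void AddScore(int value)
    {
        pending += value;
    }

    // Plays the role of the timer callback (SendUpdate in the grain above).
    public void Flush()
    {
        if (pending == 0) return;
        total.AddScore(pending);
        pending = 0;
    }
}

public static class Demo
{
    public static void Main()
    {
        var total = new TotalScore();
        var buffers = new List<SubTotalBuffer>();
        for (int i = 0; i < 3; i++) buffers.Add(new SubTotalBuffer(total));

        // 300 score updates of 1 point each, spread round-robin over the buffers.
        for (int i = 0; i < 300; i++) buffers[i % buffers.Count].AddScore(1);

        Console.WriteLine(total.Total);          // 0: nothing has been forwarded yet

        foreach (var b in buffers) b.Flush();    // the "timer" fires on each buffer

        Console.WriteLine(total.Total);          // 300
        Console.WriteLine(total.CallsReceived);  // 3 calls instead of 300
    }
}
```

How the updates are spread across the buffers doesn’t matter to the final figure, because the partial sums simply add up.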

Is it right to store state in a Stateless Worker? It’s not how Stateless Worker grains are intended to be used, but they are activated and work in the same way as normal grains; there are just multiple copies of them, and they’re created locally. This is fine if you can reconcile the state distributed across all the activations, which in our case is easy, because we can just add it up.

Acknowledgements to Sergey Bykov for helping to foster the idea.
