Paramore

Libraries and supporting examples for use with the Ports and Adapters and CQRS architectural styles for .NET, with support for Task Queues

View the Project on GitHub BrighterCommand/Brighter

Paramore Home

Next

Prev

Brighter

Tutorial

This tutorial takes you building the Greetings project, which is Hello World via a Task Queue. The walkthrough will build the example availabe in the Examples folder of Brighter available in the public repo at Greetings Example if you want to follow along there instead of typing in the code.

Note that you will need to have RabbitMQ installed to step through this example as a tutorial.

Step One

Create a C# Console Application, targeting .NET 4.5.

Note that you can use any kind of application with Brighter.ServiceActivator, it's purpose is to take messages of a task queue and route to a Request Handler seamlessly. We use a console application as we intend to build a Windows Service which is one of the most common use cases for the Service Activator pattern.

Step Two

Install the Paramore.Brighter.ServiceActivator package from NuGet

This will install Paramore.Brighter.ServiceActivator and it's dependencies Paramore.Brighter, Polly, and Newtonsoft.Json.

Although the Service Activator provides support for a consumer reading messages of an Input Channel, we need to supply a concrete implementation of IAmAMessageConsumer which abstracts the Message-Oriented-Middleware used by that Input Channel for the task queue implementation.

Install the Paramore.Brighter.MessagingGateway.RMQ package from NuGet

This will install RabbitMQ.Client as a dependency.

This will install the Paramore.Brighter.MessagingGateway.RMQ package which provides support for a Task Queue implemented in Rabbit MQ

Step Three

Use Topshelf to run the console application as a service

Install the Topshelf package from NuGet

Please see Topshelf's own documentation for how to use it in more depth.

Step Four

We use TinyIoC as a DI container within Greetings, so we need to add that package into the solution as well. Brighter is a DI Friendly Frameworks so you can use the DI container of your choice with Brighter.

Step Five

Brighter uses LibLog to abstract the implementation details of a client's logger. Greetings uses log4net as the concrete logger so we need to add a NuGet reference to that project too.

Step Six

We use boiler plate code to implement the Main method to configure the Topshelf service.


public static void Main()
{
    /*
    * Send a message in this format to this service and it will print it out
    * We document this here so that you can simply paste this into the RMQ web portal
    * to see commands flowing through the system.
    * {"Greeting":"hello world","Id":"0a81cbbc-5f82-4912-99ee-19f0b7ee4bc8"}
    */

    HostFactory.Run(x => x.Service<GreetingService>(sc =>
    {
        sc.ConstructUsing(() => new GreetingService());

        // the start and stop methods for the service
        sc.WhenStarted((s, hostcontrol) => s.Start(hostcontrol));
        sc.WhenStopped((s, hostcontrol) => s.Stop(hostcontrol));

        // optional, when shutdown is supported
        sc.WhenShutdown((s, hostcontrol) => s.Shutdown(hostcontrol));
    }));
}
        

A summary of this code is: it provides callbacks for Topshelf to call in response to OS instructions to a Windows Service to start, stop or shutdown. In other words it configures how we respond to service lifetime events. We use a class called GreetingService to implement our response.

Step Seven

We now need to implement the GreetingsService to respond to the control messages. Add a new class to the project called GreetingService and enter the following code:


using System;
using Paramore.Brighter;
using Paramore.Brighter.MessagingGateway.RMQ;
using Paramore.Brighter.MessagingGateway.RMQ.MessagingGatewayConfiguration;
using Paramore.Brighter.ServiceActivator;
using Polly;
using TinyIoC;
using Topshelf;

namespace Greetings.Adapters.ServiceHost
{
    internal class GreetingService : ServiceControl
    {
        private Dispatcher _dispatcher;

        public GreetingService()
        {
            log4net.Config.XmlConfigurator.Configure();

            var container = new TinyIoCContainer();

            var handlerFactory = new TinyIocHandlerFactory(container);
            var messageMapperFactory = new TinyIoCMessageMapperFactory(container);
            container.Register<IHandleRequests<GreetingEvent>, GreetingEventHandler>();

            var subscriberRegistry = new SubscriberRegistry();
            subscriberRegistry.Register<GreetingEvent, GreetingEventHandler>();

            //create policies
            var retryPolicy = Policy
                .Handle<Exception>()
                .WaitAndRetry(new[]
                {
                    TimeSpan.FromMilliseconds(50),
                    TimeSpan.FromMilliseconds(100),
                    TimeSpan.FromMilliseconds(150)
                });

            var circuitBreakerPolicy = Policy
                .Handle<Exception>()
                .CircuitBreaker(1, TimeSpan.FromMilliseconds(500));

            var policyRegistry = new PolicyRegistry
            {
                {CommandProcessor.RETRYPOLICY, retryPolicy},
                {CommandProcessor.CIRCUITBREAKER, circuitBreakerPolicy}
            };

            //create message mappers
            var messageMapperRegistry = new MessageMapperRegistry(messageMapperFactory)
            {
                {typeof(GreetingEvent), typeof(GreetingEventMessageMapper)}
            };

            //create the gateway
            var rmqConnnection = new RmqMessagingGatewayConnection
            {
                AmpqUri = new AmqpUriSpecification(new Uri("amqp://guest:guest@localhost:5672/%2f")),
                Exchange = new Exchange("paramore.brighter.exchange"),
            };

            var rmqMessageConsumerFactory = new RmqMessageConsumerFactory(rmqConnnection);

            _dispatcher = DispatchBuilder.With()
                .CommandProcessor(CommandProcessorBuilder.With()
                    .Handlers(new HandlerConfiguration(subscriberRegistry, handlerFactory))
                    .Policies(policyRegistry)
                    .NoTaskQueues()
                    .RequestContextFactory(new InMemoryRequestContextFactory())
                    .Build())
                .MessageMappers(messageMapperRegistry)
                .DefaultChannelFactory(new InputChannelFactory(rmqMessageConsumerFactory))
                .Connections(new[]
                {
                    new Connection<GreetingEvent>(
                        new ConnectionName("paramore.example.greeting"),
                        new ChannelName("greeting.event"),
                        new RoutingKey("greeting.event"),
                        timeoutInMilliseconds: 200)
                }).Build();
        }

        public bool Start(HostControl hostControl)
        {
            _dispatcher.Receive();
            return true;
        }

        public bool Stop(HostControl hostControl)
        {
            _dispatcher.End().Wait();
            _dispatcher = null;
            return false;
        }

        public void Shutdown(HostControl hostcontrol)
        {
            _dispatcher?.End();
        }
    }
}

The key behavior of Greeting is to configure the Command Processor and the Dispatcher. We covered the basics of the CommandProcessor in the Hello World Example.

We use a DispatchBuilder to build a Dispatcher, which dispatches messages from a Task Queue to a Command Handler. The principle is that once configured you can send messages to handlers in the service without having to write the infrastructure code around reading from a queue, translating the message body into an IRequest (Command or Event), and dispatching to a handler. The goal here is that the task queue should remain transparent to the developer, who simply uses IAmACommandProcessor.Post to send a message from one process and then uses the Dispatcher to read that same message and pass to a handler in another.

We create a Command Processor as part of creating our Dispatcher to map de-serialized Commands or Events to handlers. Note that it may seem counter-intuitive that we set no Task Queue on the Command Processor. This is because we are not sending to a task queue from this service, just reading, so we do not need to configure Command Processor for sending only receiving. The Tasks Example shows an application that has both sending and receiving components.

We add both a Retry Policy and a Circuit Breaker Policy using the Polly library. We create policies to decide what intervals to retry at in the event of failure, and how long to break a circuit for in the presence of persistent failure. We register these policies in the PolicyRegistry, using the well-known names CommandProcessor.RETRYPOLICY and CommandProcessor.CIRCUITBREAKER. Internally, CommandProcessor uses the policies you register when you call IAmACommandProcessor.Post to push a message onto a Task Queue, but you can re-use them yourself. As discussed above, we are not doing a Post here.

(You can also use policies in your own handlers as discussed here).

We register implementations of IAmAMessageMapper with the MessageMapperRegistry to map the message body from the Task Queue into Commands and Events. In this case we only have one: GreetingEventMessageMapper which we use to map a GreetingEvent to and from the message body (as JSON).

In order to read messages from a Task Queue we need a IAmAMessageConsumerFactory. In this case we are reading from a RabbitMQ Task Queue so we use RmqMessageConsumerFactory. We set this as the parameter to an InputChannelFactory and pass to the DispatchBuilder

The Input Channel is an abstraction over the stream from which we read messages - mostly implemented using Message-Oriented Middleware - and Dispatcher uses the InputChannelFactory to create instances of the stream to read from, as specified in configuration. We pass the application protocol specific factory to this, so that we can create input channels for that protocol. The use of abstraction is intended to allow support for different protocols and implementations of those protocols to be used as the stream that underlies the Task Queue

As outlined in Hello World our goal is to be a DI Friendly Frameworks so we rely on the client implementing a factory to provide instances of handlers and message mappers to us. In this example we use TinyIoC as our DI framework and implement the required factories using that DI framework.

Step Eight

Add a TinyIocHandlerFactory class to the project and enter the following code


using System;
using Paramore.Brighter;
using TinyIoC;

namespace Greetings.Adapters.ServiceHost
{
    internal class TinyIocHandlerFactory : IAmAHandlerFactory
    {
       private readonly TinyIoCContainer _container;

    public TinyIocHandlerFactory(TinyIoCContainer container)
    {
        _container = container;
    }

    public IHandleRequests Create(Type handlerType)
    {
        return (IHandleRequests)_container.Resolve(handlerType);
    }

    public void Release(IHandleRequests handler)
    {
        var disposable = handler as IDisposable;
        if (disposable != null)
        {
            disposable.Dispose();
        }
            handler = null;
        }
    }
}
    

Add a TinyIoCMessageMapperFactory class to the project and enter the following code


using System;
using Paramore.Brighter;
using TinyIoC;

namespace Greetings.Adapters.ServiceHost
{
    internal class TinyIoCMessageMapperFactory : IAmAMessageMapperFactory
    {
        private readonly TinyIoCContainer _container;

        public TinyIoCMessageMapperFactory(TinyIoCContainer container)
        {
            _container = container;
        }

        public IAmAMessageMapper Create(Type messageMapperType)
        {
            return (IAmAMessageMapper)_container.Resolve(messageMapperType);
        }
    }
}
        

Step Nine

Now we need to add the GreetingEvent itself. Add a new class GreetingEvent to the project and enter the following code.


using System;
using Paramore.Brighter;

namespace Greetings.Ports.Commands
{
    public class GreetingEvent : Event
    {
        public GreetingEvent() : base(Guid.NewGuid()) { }

        public GreetingEvent(string greeting) : base(Guid.NewGuid())
        {
            Greeting = greeting;
        }

        public string Greeting { get; set; }
    }
}
        

We simply derive our class from Event and add a property that allows you to set the Greeting which we intend to send.

Step Ten

Once we have an event we need to add the code for its Message Mapper which we use to de-serialize the message from the wire protocol. Add a class GreetingEventMessageMapper to the project.


using Greetings.Ports.Commands;
using Newtonsoft.Json;
using Paramore.Brighter;

namespace Greetings.Ports.Mappers
{
    public class GreetingEventMessageMapper : IAmAMessageMapper<GreetingEvent>
    {
        public Message MapToMessage(GreetingEvent request)
        {
            var header = new MessageHeader(messageId: request.Id, topic: "greeting.event", messageType: MessageType.MT_EVENT);
            var body = new MessageBody(JsonConvert.SerializeObject(request));
            var message = new Message(header, body);
            return message;
        }

        public GreetingEvent MapToRequest(Message message)
        {
            var greetingCommand = JsonConvert.DeserializeObject<GreetingEvent>(message.Body.Value);

            return greetingCommand;
        }
    }
}

A message has a header - where we write metadata about the message - and a body - where we write the contents of the message.

When mapping to a message, on the header, we set the Message Type to MT_EVENT because we want to allow any number of handlers to handle the message. The topic is used for routing. Subscribers to the message use the topic to indicate their interest in receiving the message

The body of the message is a JSON string representing the GreetingEvent

Because we don't send from this service, we don't need MapToMessage and could simply throw a NotImplemented exception instead.

When mapping back to a request we simply serialize the entity body into the Command/Event we want to raise.

Step Eleven

Now we need to add the handler, which actually does the work. Add a new class GreetingEventHandler to the project


using System;
using Greetings.Ports.Commands;
using Paramore.Brighter;

namespace Greetings.Ports.CommandHandlers
{
    internal class GreetingEventHandler : RequestHandler<GreetingEvent>
    {
        public override GreetingEvent Handle(GreetingEvent command)
        {
            Console.WriteLine("Received Greeting. Message Follows");
            Console.WriteLine("----------------------------------");
            Console.WriteLine(command.Greeting);
            Console.WriteLine("----------------------------------");
            Console.WriteLine("Message Ends");
            return base.Handle(command);
        }
    }
}
        

We derive from RequestHandler to reduce the boiler plate code we need to write, and override the Handle() method to provide an implementation that just echoes the greeting out to the console.

Step Twelve

Build the project

Step Fourteen

Now we need to configure the service to read from the input channels

Amend your app.config file as follows

Add the following to your configSections, for the RMQ consumer and log4net


<section name="serviceActivatorConnections" type="Paramore.Brighter.ServiceActivator.ServiceActivatorConfiguration.ServiceActivatorConfigurationSection, Brighter.serviceactivator" allowLocation="true" allowDefinition="Everywhere" />
<section name="rmqMessagingGateway" type="Paramore.Brighter.messaginggateway.rmq.MessagingGatewayConfiguration.RMQMessagingGatewayConfigurationSection, Paramore.Brighter.messaginggateway.rmq" allowLocation="true" allowDefinition="Everywhere" />
<section name="log4net" type="log4net.Config.Log4NetConfigurationSectionHandler, log4net" />
        

Add the rmwMessagingGateway section and the serviceActivatorConnections, which configures both the AMQP URI for your RabbitMQ server (amend if you are not using defaults) and the channel over which you subscribe to messages


<rmqMessagingGateway>
    <amqpUri uri="amqp://guest:guest@localhost:5672/%2f" />
    <exchange name="Brighter.exchange" />
</rmqMessagingGateway>
<serviceActivatorConnections>
    <connections>
        <add connectionName="paramore.example.greeting" channelName="greeting.command" routingKey="greeting.command" dataType="Greetings.Ports.Commands.GreetingEvent" timeOutInMilliseconds="200" />
    </connections>
</serviceActivatorConnections>
        

We also need to configure log4net:


<log4net>
<appender name="ConsoleAppender" type="log4net.Appender.ConsoleAppender">
<layout type="log4net.Layout.PatternLayout">
<conversionPattern value="%date [%thread] %-5level %logger %ndc - %message%newline" />
</layout>
</appender>
<root>
<level value="DEBUG" />
<appender-ref ref="ConsoleAppender" />
</root>
</log4net>
        

For convenience, the app.config should look like this:


<?xml version="1.0" encoding="utf-8"?>
<configuration>
    <configSections>
        <section name="serviceActivatorConnections" type="Brighter.serviceactivator.ServiceActivatorConfiguration.ServiceActivatorConfigurationSection, Brighter.serviceactivator" allowLocation="true" allowDefinition="Everywhere"/>
        <section name="rmqMessagingGateway" type="Paramore.Brighter.messaginggateway.rmq.MessagingGatewayConfiguration.RMQMessagingGatewayConfigurationSection, Paramore.Brighter.messaginggateway.rmq" allowLocation="true" allowDefinition="Everywhere" />
        <section name="log4net" type="log4net.Config.Log4NetConfigurationSectionHandler, log4net" />
    </configSections>
    <log4net>
        <appender name="ConsoleAppender" type="log4net.Appender.ConsoleAppender">
            <layout type="log4net.Layout.PatternLayout">
               <conversionPattern value="%date [%thread] %-5level %logger %ndc - %message%newline" />
            </layout>
        </appender>
        <root>
            <level value="DEBUG" />
            <appender-ref ref="ConsoleAppender" />
        </root>
    </log4net>
    <runtime>
    </runtime>
    <rmqMessagingGateway>
        <amqpUri uri="amqp://guest:guest@localhost:5672/%2f" />
        <exchange name="Brighter.exchange" />
    </rmqMessagingGateway>
    <serviceActivatorConnections>
        <connections>
            <add connectionName="paramore.example.greeting" channelName="greeting.command" routingKey="greeting.command" dataType="Greetings.Ports.Commands.GreetingEvent" timeOutInMilliseconds="200" />
        </connections>
    </serviceActivatorConnections>
</configuration>
        

Step Fifteen

Once the example is built you can run it using F5 within Visual Studio, or navigate the binary and run that directly, as Topshelf supports running as a console application.

To test the service use the Rabbit MQ management website, to post to a Greeting to the queue that the service will just have created when you ran it.

To get the service working you only need to dispatch a simple message body to the queue

{"Greeting":"hello world","Id":"0a81cbbc-5f82-4912-99ee-19f0b7ee4bc8"}

You can do this in the Publish Message section of RabbitMQ

And you should be able to observe the greeting you entered being output in the console

Next Steps

The Tasks Example contains a full example of a distributed application, that contains a user-agent client, a REST API, and a Windows Service that consumes work from a Task Queue