Sunday, December 18, 2011

Building a Windows service that never fails

A process that runs as a Windows service is meant to provide a service or do work without stopping: it should always be there, do the work when it needs to be done, and never fail. These services often run mission-critical processes: executing trades for banks, processing transactions for e-commerce platforms, handling insurance claims for insurers, the list goes on. Almost all companies have these systems, and the impact a failed service can have on a company and its resources can be significant. People either have to scramble to work through a backlog, or are called out of office hours to restart a failed system.
These services fail all too often though, and for really bad reasons. For mature products (that have gone through a thorough QA cycle) the cause is most often the unavailability of another resource. This can be a database that's busy and generating timeout errors, or that has gone temporarily offline (a reboot or similar), an external webservice that's not available, or any number of other resources with availability issues.
In this blog I'm going to show you how to build high-availability Windows services that can deal with these and many other issues, recover from or even fix failures, and continue to execute their mission-critical task with minimal delay.
I started working on this problem at Merrill Lynch. The director I was working for suggested, at one of the many design meetings I had with him, that I figure out a way to design a service pattern that could withstand temporary database, network and other service outages. That initial design has evolved through many iterations; this one is, I think, the most concise and cleanest. It can be adapted to work outside of the service framework as well, by incorporating it into an exception management framework for example.
The Service State Engine
This solution uses the 'State Engine' or 'State' design pattern from the 'Gang of Four' to solve the problem. A service is typically in the 'Running' state, but when exceptions occur that can't be handled, the service changes into its 'Diagnostic' state. In this state, the service tries to determine what happened; it may probe the environment to come to a conclusion, but typically a regular expression analysis of the exception message is enough to diagnose the error. The service then changes to the 'Recovery' state. Diagnostic information is provided to the recoverer, and depending on the diagnosis it can take different actions to get the system back to the 'Running' state. In the case of an outage, for example, the recovery state may simply wait until the outage is resolved and then resume work by changing back to the 'Running' state. Often the recoverer will actively take steps to 'fix' a problem; this could be starting an external service that the system depends on, or any other action a user would have to take to get the system back in working condition.
Design & Implementation
This solution is easy to apply to any given service because the RunningState is essentially the service as it would be implemented without the Diagnostic/Recovery states. By separating the Diagnostic and Recovery states into their own classes we give each a single responsibility; it keeps the solution clean and ensures that the states don't muddle each other's logic. Transitions from one state to another are managed through the ServiceController. The service states fire events as necessary, and the controller receives them and assigns a new state to handle them (shown below).
Services tend to follow a set pattern: a list of work items is retrieved and each item is then processed; when the list of items is complete, a new batch of work items is retrieved and worked on, ad infinitum. Here's what the RunningState's Start() method might look like in the generic case.
       public void Start()
        {
            try
            {
                Queue workList = getWorkItems();
                while (workList.Count > 0)
                {
                    try
                    {
                        var workItem = workList.Dequeue();
                        doWork(workItem);
                        if (workList.Count == 0)
                            workList = getWorkItems();
                    }
                    catch (Exception ex)
                    {
                        logger.Error("Exception encountered during gateway processing", ex);
                        SendStatusEvent(new StateEventArgs { state = StateEvent.Errored, stateDetails = ex });
                    }
                }
            }
            catch (Exception outerEx)
            {
                logger.Error("Outer exception raised", outerEx);
                SendStatusEvent(new StateEventArgs { state = StateEvent.Errored, stateDetails = outerEx });
            }
        }
         
This type of solution needs two exception catchers: one for the inner loop that works on each work item, and an outer catch-all that catches everything else. The inner catch could also have been incorporated into the doWork() method; it ensures that the remaining work items in the list still get processed.
When an exception is caught, it is logged and a status-change event is fired for the controller to catch.
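For reference, here is a minimal sketch of the plumbing these snippets rely on. The real types live in the companion CodePlex solution; member names below that don't appear in the snippets are assumptions.

public enum StateEvent { Errored, Diagnosed, Recovered, Stopped }

public class StateEventArgs
{
    public StateEvent state { get; set; }
    public object stateDetails { get; set; }
}

// Generic event-args wrapper used by the controller's event handler.
public class EventArgs<T> : System.EventArgs
{
    public EventArgs(T eventData) { EventData = eventData; }
    public T EventData { get; private set; }
}

public abstract class ServiceStateBase
{
    public event EventHandler<EventArgs<StateEventArgs>> StatusChanged;

    public abstract void Start();

    // States call this to hand control back to the ServiceController.
    protected void SendStatusEvent(StateEventArgs args)
    {
        var handler = StatusChanged;
        if (handler != null)
            handler(this, new EventArgs<StateEventArgs>(args));
    }
}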
The code below is a snippet from the service controller; it is really the heart, or brains, of the controller. When the states fire events, the controller intercepts them and assigns a new state manager as necessary to deal with the event. The StateEvent.Errored case shows the controller handling an error event: it changes state to the diagnosticState and sets the exception property for the diagnostic to analyze. The responsibility of the diagnostic is, as its name implies, to diagnose the error. It is usually enough to just analyze the error message, but it may sometimes be necessary to probe the environment for more information if the error string doesn't provide enough for a complete diagnosis.
void ServiceStateBase_StatusChanged(object sender, EventArgs<StateEventArgs> e)
{
    switch (e.EventData.state)
    {
        case StateEvent.Errored:
            ChangeState(diagnosticState);
            diagnosticState.SetException((Exception) e.EventData.stateDetails);
            diagnosticState.Start();
            break;
        case StateEvent.Diagnosed:
            ChangeState(recoveryState);
            recoveryState.Diagnosis = (Diagnostic) e.EventData.stateDetails;
            recoveryState.Start();
            break;
        case StateEvent.Recovered:
            ChangeState(runningState);
            runningState.Start();
            break;
        case StateEvent.Stopped:
            ChangeState(runningState);
            // doesn't call Start automatically; waits for the user to do so
            break;
    }
}
        
The diagnosis of an error is very specific to the environment, dependencies and logic of the solution that has been implemented. Solutions tend to exhibit failure trends that start to show themselves as soon as a product goes into integration testing; by the time the product goes into production, you'll have encountered most of the failure scenarios. As you encounter them, you should update the diagnostic to recognize them, and the recoverer to deal with them. The code snippet below deals with the most common errors discussed above. We first deal with the sudden unavailability of external systems; for the sample solution that accompanies this blog, those are the database and message bus. We can recognize those issues by simply looking at the exception type or analyzing the exception message.
protected void Diagnose(Exception e)
{
    if (MessageBusError(e))
    {
        diagnosis = Diagnostic.MessageBusUnavailable;
    }
    else if (DatabaseError(e))
    {
        diagnosis = Diagnostic.DatabaseError;
    }
    else if (errorsInInterval() < 100)
    {
        diagnosis = Diagnostic.Unknown;
    }
    else
    {
        logger.Fatal("Fatal error occurred", e);
        diagnosis = Diagnostic.FatalForUnknownReasons;
    }
}  
        
Other exceptions may occur, and I've added a counter and interval check to recognize a sudden surge in exceptions that could indicate a new, as yet unhandled failure scenario. If that's the case, the system will report it and diagnose the error as fatal.
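The helper methods used in Diagnose aren't shown; here is a minimal sketch of what they could look like. The exception types, message patterns and five-minute interval are all assumptions - in practice you'd match whatever your database driver and message bus client actually throw.

private int errorCount;
private DateTime intervalStart = DateTime.UtcNow;

private bool MessageBusError(Exception e)
{
    // Match on exception type or message content, whichever is more reliable for your client.
    return e is RabbitMQ.Client.Exceptions.BrokerUnreachableException
        || Regex.IsMatch(e.Message, @"connection.*(refused|closed|unreachable)", RegexOptions.IgnoreCase);
}

private bool DatabaseError(Exception e)
{
    return e is System.Data.SqlClient.SqlException
        || Regex.IsMatch(e.Message, @"timeout|deadlock|network-related", RegexOptions.IgnoreCase);
}

private int errorsInInterval()
{
    // Reset the counter once the measurement interval has elapsed.
    if (DateTime.UtcNow - intervalStart > TimeSpan.FromMinutes(5))
    {
        errorCount = 0;
        intervalStart = DateTime.UtcNow;
    }
    return ++errorCount;
}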
Recovery from a database failure is very simple: you wait a set amount of time and then test the connectivity; when the database is available again, a "recovered" event is fired and the controller sets the state back to the runningState. This recovery scenario applies to most external systems that go offline for some reason.
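For example, a minimal sketch of the recovery state's Start() for the database case could look like this - the connection-test helper, the connectionString field and the 30-second retry interval are assumptions:

public void Start()
{
    if (Diagnosis == Diagnostic.DatabaseError)
    {
        // Poll until the database accepts connections again.
        while (!CanConnectToDatabase())
            System.Threading.Thread.Sleep(TimeSpan.FromSeconds(30));
    }
    // Hand control back to the controller, which switches to the RunningState.
    SendStatusEvent(new StateEventArgs { state = StateEvent.Recovered });
}

private bool CanConnectToDatabase()
{
    try
    {
        using (var connection = new System.Data.SqlClient.SqlConnection(connectionString))
        {
            connection.Open();
            return true;
        }
    }
    catch (Exception)
    {
        return false;
    }
}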
In "real life" scenarios, I've had to deal with external services that would periodically fail, but who I could programatically restart from within the service I was building. In those instances, the recoverer would execute the restart logic and a previous failure that would have killed the service was recognized and fixed within milliseconds of occurring. In fact this previously common failure scenario was all but considered fixed, even though the issue kept recurring, but it no longer had an impact on dependent services. By far the most common 'recovery' scenario though is simply waiting for an external system to come back online.
The beauty of this system is that it really works well; in fact it works so well that it's sometimes easier to recover from a defect in another system than it is to fix that system. And that is something you do have to keep in mind: don't just treat the symptoms, still go after the root cause. The benefit of having a system that can solve problems instead of just quitting is that it creates a much more stable environment. In an enterprise of dependent systems, failures occur often, and it's nice not to be called out of bed to restart a service and try to figure out what happened. Rebooting a machine or restarting a database no longer requires the coordinated effort of an operations team spread out over many divisions when the services that depend on it can deal with the temporary outage.
The entire solution can be found on this blog's companion website, the source code repository at CodePlex. The LogService is built using the Service State Engine described in this blog.
Happy Coding.

Saturday, November 12, 2011

Enterprise solution for Logging

This blog and its companion blog are about building example solutions for a distributed architecture. The whole idea behind a distributed architecture is to loosely couple expert systems together using a messaging infrastructure. As examples for expert systems we use Notifications and Logging because almost all projects include them, and as such they are great candidates for being transformed into expert systems. This blog focuses on Logging.
Almost all development projects include a logging solution. Projects typically choose log4j or log4net, or a custom solution. For large companies these log solutions can cause problems. The benefit of logging is that logs contain information about the running of an application, the work it has done and the problems it has encountered along the way. This information is valuable to the development teams, who learn how the application behaved and what needs to be done to improve it. This great benefit can also become a liability: in the wrong hands, this information can be used to discover the application's vulnerabilities. Log systems also typically generate volumes of content. In an enterprise with a great many applications under management, this volume of information can be excessive and very expensive to store, organize and generally manage. Just making sure that developers configure their applications according to the corporation's standards requires many meetings and a lot of coordination. Disk capacity for logs has to be ensured to avoid failure. Ultimately, the typical log solution tightly couples the system to a component that is not mission critical.
In conclusion, logging needs to be a managed process that is standardized, secured, monitored and decoupled from the primary application, leaving the application to do what it needs to do without having to worry about something that is not central to its task. This way each environment can be set up according to its needs: a development environment will save all log events, successive environments will log less, until the product reaches production, at which point only the most important messages are logged. Having a centralized logging system ensures that all environments behave in the same way; there's no need for coordination, and app developers are given components to integrate with and are not allowed to write to disk. An expert system is required. As this is a series on distributed expert systems, let's look at a solution that uses an advanced messaging system at its core. As you can see from the image, the solution will end up including a notification system, but that will be described in another blog.

Applying the pattern to Logging

We've discussed advanced messaging before, as well as the ever-so-reliable product RabbitMQ, which is built on Erlang, the distributed telecom language. The message bus solution can be clustered and therefore scales well; it is very fast and reliable. It is also available as a cloud solution from VMWare. The advantage of using a message bus between an application and an expert system is that it completely decouples the two: there's one universal solution to logging, one standard set by the corporation, and it can include any number of components to ensure that the content is secured and far away from 'prying eyes'.
Putting such a system together is very easy. All it requires is a very simple Facade that hands off the logging functionality to the message bus, to be picked up by the 'expert' logging system. That expert system is at the very least as complicated as the original solution (that was implemented umpteen times across all applications), but now that it is centralized we can focus a little more attention on making sure the content is secured - both in terms of content and access.

Implementation

Before I start to code a solution, I usually try to define my namespaces by looking at my layer diagram. Logging is a utility, and there will be many other utilities: notifications, exception management, session management, caching, WMI instance logging, the list goes on. Most of these utilities are candidates for standardization, even going so far as to create dedicated expert systems for them. So a Utilities namespace makes sense at the root level. The MessageBus is part of the infrastructure, much like a database server or the web services we may consume, so an Infrastructure namespace will also exist off the root. The expert systems that provide these types of plumbing services I have aggregated into a 'Monitor' namespace; I'm not sure that's the right name, but it will do for now.
The layer diagram looks like this.

The logging interface: for the purposes of this example we define an extremely simple interface.
namespace YEA.Utilities.Logging
{
    public interface ILog
    {
        void write(string message, LogType level);
    }
}
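The LogType level referenced by the interface isn't shown above; a plausible definition simply mirrors the usual log levels (the actual enum lives in the CodePlex solution):

namespace YEA.Utilities.Logging
{
    public enum LogType
    {
        Debug,
        Info,
        Warn,
        Error,
        Fatal
    }
}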
The Interface is implemented by a Logger class that implements the write method as follows.
        public void write(string message, LogType level)
        {
            var sc = new LogToMessageConverter(level);
            var publisher = _gf.GetPublisher(_publisherName, sc.ConvertObjectToMessage);
            publisher.Publish(message);
        }
And the serialization of the log message is implemented by the following:
public static class LogPropertiesStandards
    {
        public const string LogLevelPropertyName = "LogLevel";
    }
    public class LogToMessageConverter : YEA.Infrastructure.Gateway.Converter.StringToMessageConverter
    {
        public LogType LogLevel { get; set; }
        public LogToMessageConverter(LogType level)
        {
            LogLevel = level;
        }
        public override Message ConvertObjectToMessage(RabbitMQ.Client.IModel channel, object packetToSend)
        {
            Message message = base.ConvertObjectToMessage(channel, packetToSend);
            message.Properties.Headers.Add(LogPropertiesStandards.LogLevelPropertyName, (int)LogLevel);
            message.RoutingKey = UtilitiesHelper.getMyProgramFullName();
            return message;
        }
    }
The converter derives from the simple string-to-message converter, and we get the message by calling the base class conversion method. All that remains is to set a few properties and the routing key appropriately. The routing key is essentially the application namespace of the calling executable. The log directories on the receiving side will replicate the application namespace; if the namespace follows industry standards, the log files of each department will be located in the same subtree, and securing and granting access to the subtree is simplified.
namespace: Company.Organization.Department.BusinessUnit.ApplicationGroup.Application will result in
directory: <drive>:\Company\Organization\Department\BusinessUnit\ApplicationGroup\Application.log
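The UtilitiesHelper.getMyProgramFullName() call used to set the routing key isn't shown above; a minimal implementation, assuming the entry assembly is named after the application namespace, could be:

namespace YEA.Utilities
{
    public static class UtilitiesHelper
    {
        public static string getMyProgramFullName()
        {
            // The entry assembly's simple name, e.g.
            // "Company.Organization.Department.BusinessUnit.ApplicationGroup.Application".
            return System.Reflection.Assembly.GetEntryAssembly().GetName().Name;
        }
    }
}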
The expert logging system uses log4net with dynamic configuration to save log entries to the directories shown above. Here's a code extract of the important bits.
using log4net;
using log4net.Repository.Hierarchy;
using log4net.Core;
using log4net.Appender;
using log4net.Layout;
using System;
using System.IO;
using System.Configuration;
using System.Text;

namespace YEA.Monitor.LogManager
{
    public static class Logger
    {
        private const string LOG_PATTERN = "%d [%t] %-5p %m%n";
        private const string ConfigDriveKey = "LogDrive";
        private static char Drive;

        static Logger()
        {
            Drive = GetDriveLetter();
            log4net.LogManager.GetRepository().Configured = true;
        }
        private static char GetDriveLetter()
        {
            var driveLetter = ConfigurationManager.AppSettings[ConfigDriveKey];
            if (driveLetter == null)
                throw new ConfigurationErrorsException(string.Format("Configuration key: {0} is expected", ConfigDriveKey));
            return Convert.ToChar(driveLetter);
        }

        static void CreateDir(string name)
        {
            Directory.CreateDirectory(name);
        }
        private static string ConvertToPath(string name)
        {
            if( string.IsNullOrWhiteSpace(name) )
                throw new ArgumentException("A valid name has to be supplied to the logging application");
            var names = name.Split('.');
            if (names.Length == 1)
                return Drive + @":\InCorrectNamespaceDump\" + name + ".log";

            var builder = new StringBuilder(Drive + @":");
            for (int i = 0; i < names.Length - 1; i++)
            {
                builder.Append(@"\");
                builder.Append(names[i]);
            }
            
            return builder.ToString();
        }
        static IAppender CreateAppender(string repositoryName)
        {
            PatternLayout patternLayout = new PatternLayout();

            patternLayout.ConversionPattern = LOG_PATTERN;
            patternLayout.ActivateOptions();

            var path = ConvertToPath(repositoryName);
            CreateDir(path);
            var builder = new StringBuilder(path);
            builder.Append(@"\");
            builder.Append(repositoryName.Substring(repositoryName.LastIndexOf('.') + 1)); // +1 skips the dot itself
            builder.Append(".log");
            var fileName = builder.ToString();

            RollingFileAppender roller = new RollingFileAppender();
            roller.Name = repositoryName;
            roller.Layout = patternLayout;
            roller.AppendToFile = true;
            roller.RollingStyle = RollingFileAppender.RollingMode.Size;
            roller.MaxSizeRollBackups = 4;
            roller.MaximumFileSize = "100KB";
            roller.StaticLogFileName = true;
            roller.File = fileName ;
            roller.ActivateOptions();
            return roller;
        }

        public static ILog Get(string name)
        {
            var log = log4net.LogManager.GetLogger(name);
            var l = (log4net.Repository.Hierarchy.Logger)log.Logger;

            if (l.GetAppender(name) == null)
            {
                l.RemoveAllAppenders();
                l.AddAppender(CreateAppender(name));
                l.Level = Level.All;
            }
            
            return log;
        }
    }
}
The interesting bit in this implementation of log4net is that it has to dynamically create an appender as a new application (represented by its namespace) starts sending log messages. The CreateAppender method does that, and the Get method assigns the appender if the Logger it has retrieved doesn't have one.
So what happens in this scenario if the log directory accidentally gets full? The 'expert system' will likely fail. But because it is loosely coupled to the application that sent the log request, thanks to the message bus that sits between them, the application isn't affected. In fact, it isn't even aware of the failure. The message bus will continue to collect log requests, queueing them up until the 'expert system' has resolved the issue, at which point all log requests are processed and it's as if the failure never happened. The message bus gives the system the buffer (depending on its disk capacity) it needs to ride out the inevitable temporary outages. The other advantage is that maintenance windows can be implemented without forcing all dependent applications to stop working. All of this adds up to a more stable system that can tolerate failure without causing a chain reaction of failures downstream, potentially affecting mission-critical applications.
The added advantage of decoupling the logging system from the applications that use it is that there's now no direct relationship between the location of the log files and the location of the applications that generate them. The two could be located on completely separate networks or domains, with one not giving a clue to the location of the other. That makes it that much easier to prevent unwanted eyes from gaining access to an application's vulnerabilities.
Here is where you can find the entire source code solution discussed in this blog

Monday, November 7, 2011

An Enterprise Solution to Notifications

In an earlier blog post I discussed the many benefits of a distributed architecture of expert systems. This post is about implementing this architecture to solve a problem we all have. Consider the act of sending notifications. It is another typical task that most development projects include in some way (the other one being logging, which I've also written about). Notifications can be used to alert users and administrators to state changes that occurred in an application, particularly when exceptions are raised, and for a variety of other reasons: automatically sending business reports to vested parties, or sending auto-generated confirmation emails to end users.

Notifications, however, also represent a significant security risk. Imagine those automated reports being emailed to a business person's personal email address, and then that person joins the competition. Even with policies in place, it's difficult to ensure that nothing falls between the cracks. In addition, those same applications that log vulnerable application data often send that same information to users in emails. Anyone with the right equipment can pick those up and use them to compromise those systems. To summarize, if we were going to build an expert notification system, it would have to protect us from these email violations:
  1. Emails containing information about the internal workings of applications cannot be sent to recipients outside the corporate network.
  2. Emails containing confidential customer information, such as social security numbers, credit card numbers, etc., cannot be sent to people outside of the corporate network.
  3. Emails containing confidential financial business reports. Business applications often include reports measuring the business unit's performance, and these reports are often automatically sent out to managers. These too cannot be sent to users outside of the corporate network.
These do not represent all of the security risks notifications can include, but indicate that these systems need to be tightly controlled and managed.
Additionally, almost every company I've ever consulted with has had a situation where an out-of-control application sent thousands or millions of exception emails to users, bringing the corporate email exchange to its knees.
An expert system is needed, one that is dedicated to monitoring what is sent out and making sure that the security of the company isn't being compromised.

Distributed Notifications

The approach to designing a system like that is to leverage messaging to create a very flexible solution that is scalable, fault tolerant and decoupled.
A generic approach to implementing such a solution is to have applications implement the desired features by integrating with a Facade to the expert system. The facade sends the information to a message bus, and the expert system receives the information and processes it. The advantage of this design is that the transmitting and receiving applications are completely decoupled from each other. The receiver can be maintained, developed and scaled completely independently from the clients that use it. As long as the recipient can read the messages sent by the transmitters, the recipient can go through its release cycles without affecting the deployed applications that make use of it in any way.

The design approach to Notifications and Logging looks as follows.

The diagram above shows how various applications send notification and log messages to exchanges. The LogManager reads its queue, analyzes each message and saves it in a secured, centralized storage environment. The NotificationManager analyzes the messages coming off its queue; those that violate the email policy are rejected, and notifications are sent back to the offending applications. Throttling rules are also applied to ensure that the infrastructure is not overwhelmed by processes gone amok. If everything checks out, the notification is sent to the email exchange.

Technical Design

The following is the layer design of this system. The Notification and Log wrappers are bundled in a Utilities assembly. The Gateway solution includes all of the technologies necessary to communicate with the message bus and is incorporated into an Infrastructure assembly that will also include interface components for other infrastructure systems, such as a database. The expert utility systems are wrapped up in a Monitor assembly that will include other standard utility-type expert systems.

The gateway solution that's referred to above is documented in an earlier blog that you can read here.

Implementation

The Notification Facade is extremely simple to implement. All it does is take the information and serialize it to a message that the message bus can handle.
Here's an example of the Notification Facade, using the Gateway implementation

But first we define the notification interface; for the purposes of this example it is extremely simple.
public interface INotify
{
    void Transmit(Notification message);
}
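The Notification type itself is a plain data holder. A minimal sketch follows; Subject and Body appear in the snippets later in this post, while the sender and recipient properties are assumptions:

using System.Collections.Generic;

public class Notification
{
    public string From { get; set; }
    public List<string> To { get; set; }
    public string Subject { get; set; }
    public string Body { get; set; }
}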
The Interface is implemented by a Notifier class that implements the Transmit method as follows.
public void Transmit(Notification message)
{
    server.Send(message);
}
And the serialization of the Notification message is implemented with a JSON serializer as follows:
public Message ConvertObjectToMessage(RabbitMQ.Client.IModel channel, object packetToSend)
        {
            Notification message = (Notification)packetToSend;
            if (channel == null)
                throw new ApplicationException("The channel is null, a valid channel is required to create a message");
            
            byte[] bytes = null;
            var properties = channel.CreateBasicProperties();
            var json = JsonConvert.SerializeObject(message);
            bytes = Encoding.GetEncoding(CharSet).GetBytes(json);
            properties.ContentType = CONTENT_TYPE;
            properties.ContentEncoding = CharSet;
            properties.Headers = new Dictionary<string, string>();
            properties.Headers.Add("type", message.GetType().ToString());
            return new Message() { Body = bytes, Properties = properties, RoutingKey = string.Empty };
        }
The converter uses the JSON serializer to produce the message body. All that remains is to set the content type, the encoding and the type header appropriately.
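On the receiving side, the NotificationManager reverses this conversion. A sketch, assuming the same Json.NET library and the Message and Notification types shown in this post:

public Notification ConvertMessageToNotification(Message message)
{
    // Decode the body with the charset advertised in the message properties.
    var encoding = Encoding.GetEncoding(message.Properties.ContentEncoding ?? "utf-8");
    var json = encoding.GetString(message.Body);
    return JsonConvert.DeserializeObject<Notification>(json);
}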
I'm not an expert on the trigger words that should block emails; that would not only be a project unto itself, it would probably constitute an entire business model. However, the general solution would take the form below. A list of words triggers warnings; those words I have in one regular expression. If there are regex performance experts out there, I'd be curious to know whether one long or'd expression is more efficient than running through a long list of one-word regular expressions; I'm assuming the or'd way is the way to go. Here we have regex lists for the three violations mentioned earlier.
namespace YEA.Monitor.NotificationManager
{
    public interface IContentAnalyzer
    {
        ContentFlag Analyze(string content);
    }
    public class ContentAnalyzer : IContentAnalyzer
    {
        private Regex BlockedWords;
        private Regex IUOWords;
        private Regex ApplicationData;

        public ContentAnalyzer()
        {
            RegexOptions options = RegexOptions.IgnoreCase | RegexOptions.Singleline;
            var blockedPattern = @"(credit\s+card|creditcard|\d{14})";
        var iuoPattern = @"(report|social\s+security\s+number|ss#|\d{3}\s+\d{2}\s+\d{4})";
            var appData = @"(stacktrace|exception)";
            BlockedWords = new Regex(blockedPattern, options);
            IUOWords = new Regex(iuoPattern, options);
            ApplicationData = new Regex(appData, options);
        }
        public ContentFlag Analyze(string content)
        {
            ContentFlag result = ContentFlag.Ok;
            if (BlockedWords.IsMatch(content))
                result |= ContentFlag.Blocked;
            if (IUOWords.IsMatch(content))
                result |= ContentFlag.InternalUseOnly;
            if (ApplicationData.IsMatch(content))
                result |= ContentFlag.InternalUseOnly;
            return result;
        }
    }
}
     
The result is an enumeration flag that can be combined the same way RegexOptions can. I found a nice blog that told me exactly how to create an enum that behaves as a bit flag here. Not that it's really needed, but it's cool, and using it isn't entirely ridiculous.
[Flags]
public enum ContentFlag
{
    Ok = 0,              // Ok must be 0 so the other flags can be or'd onto it
    Blocked = 1,
    InternalUseOnly = 2
}
     
The bit flag is used to capture the result of the analysis, and that is used to determine whether to send the notification or block it and respond with an error.
public ContentFlag Check(Notification notification)
{
    var content = notification.Subject + " " + notification.Body;
    var result = Analyzer.Analyze(content);
    bool isOutgoingMail = checkEmailAddresses(notification);

    return isOutgoingMail ? result : ContentFlag.Ok;
}
The Analyze call returns the result from the content analyzer, which determines whether the content violates any of the requirements noted at the beginning of this blog. That result is only returned from this check if one of the email recipients is outside of the network. We could also have pulled that email address out of the recipient list and returned some sort of hybrid result to the caller, letting it know that there was an outside recipient but that it was removed. In this case we've decided not to do that, but to just return an error to the sender for the entire post. The notification (given the nature of the system) is probably automated, and having internal emails go to an outside recipient is never a good idea - good habits need to be established.
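The checkEmailAddresses helper isn't shown; a sketch that flags the notification as outgoing when any recipient is outside the corporate domain could look like the following (the domain name and the To property are assumptions):

private const string InternalDomain = "@mycompany.com";

private bool checkEmailAddresses(Notification notification)
{
    // True when at least one recipient address is outside the corporate network.
    foreach (var recipient in notification.To)
    {
        if (!recipient.EndsWith(InternalDomain, StringComparison.OrdinalIgnoreCase))
            return true;
    }
    return false;
}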
var result = Check(notification);
if (result > ContentFlag.Ok)
    SendErrorNotification(ContentFlag.Blocked);
else
    client.SendAsync(notification, deliveryTag);
Based on the result, the email is either sent via SendAsync or an error notification is sent back instead.
Here is where you can find the entire source code solution discussed in this blog

Sunday, November 6, 2011

Creating an AMQP Sample App using RabbitMQ

This sample app is written using .NET 4.0 in C#.
The entire solution can be found at the codeplex repository.

RabbitMQ provides a .net client library appropriately called RabbitMQ.Client.
Documentation on the client can be found here.
As with all new technologies, the documentation is bare bones and the API is kind of raw and disorganized, in that almost everything is accessible from one object - IModel. So one of the first things to do is to:
  1. Make it a little easier to use by organizing the functionality around our most common use cases:
    1. Publish something.
    2. Receive something.
  2. Separate the declarative from the functional:
    • the process of creating exchanges, queues and bindings is separate from
    • the process of publishing and receiving messages.
  3. Make it configurable using the app.config file.
Before designing the simplified APIs described above, let's take a brief tour of the functionality the RabbitMQ.Client provides.
Here's example code to highlight how the RabbitMQ.Client api allows us to interact with the RabbitMQ server.
// Creating Connections
RabbitMQ.Client.ConnectionFactory factory = new RabbitMQ.Client.ConnectionFactory();
factory.Endpoint = new AmqpTcpEndpoint(server);
factory.UserName = userName;
factory.Password = password;

IConnection Connection = factory.CreateConnection();
To connect to the RabbitMQ server, a connection has to be established between the client and the server as shown above. The last line creates the connection that we're going to use in the following examples.

The connection is not what we're going to use to communicate with the server; instead we use a dedicated communication channel, an IModel object, that the connection object creates for us.

IModel is the communication channel between the client and the broker, and multiple channels can be created.

One of the RabbitMQ oddities is that an IModel object can't be shared between threads, and we need to take that into account when designing our applications.
The next section shows some of the most commonly used features that the channel object provides.
using (IModel channel = Connection.CreateModel())
{
    ... // Object declaration //
    channel.ExchangeDeclare( /* parameters */);
    channel.QueueDeclare( /* parameters */);
    channel.QueueBind( /* parameters */);

// Publishing 
     channel.BasicPublish( /* object parameters */);

// Synchronous receiving
    channel.BasicGet( /* parameters */);

// Acknowledging a message
   channel.BasicAck( /* parameters */);

// Reject a message
   channel.BasicReject( /* parameters */);

// Requeue a message
   channel.BasicReject( /* parameters - make sure to set requeue = true */);
}
Oddly enough, requeueing messages uses the BasicReject(...) method: BasicReject has a requeue parameter, and by setting that to true, the message is requeued.

Creating a dedicated AMQP App Config Section

When we use RabbitMQ, we have to configure the server to appropriately receive and distribute messages to clients, as well as configure our application to send or receive messages from the appropriate exchanges or queues.

The easiest way to do that is to use the configuration file; this also allows us to easily change these values as our application gets deployed to various environments.

I chose to create a dedicated configuration section for RabbitMQ because there's a lot to configure and I wanted to keep all of the RabbitMQ configuration elements together. The result is shown below (the code can be found at the CodePlex project: codeplex_link).
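Since the exact schema is defined by the configuration classes in the CodePlex project, the fragment below is only a sketch of what the section might look like; the element and attribute names are illustrative, based on the description that follows:

<configSections>
  <sectionGroup name="AMQPAdmin">
    <section name="AMQPObjectsDeclaration"
             type="Sample.Configuration.AMQP.Admin.AMQPObjectsDeclarationSection, Sample.Configuration" />
  </sectionGroup>
</configSections>

<AMQPAdmin>
  <AMQPObjectsDeclaration>
    <Exchanges>
      <add name="orders" type="topic" durable="true" autoDelete="false" />
    </Exchanges>
    <Queues>
      <add name="uk_orders" durable="true" exclusive="false" autoDelete="false" />
    </Queues>
    <Bindings>
      <add queue="uk_orders" exchange="orders" subscriptionKey="order.uk.#" />
    </Bindings>
  </AMQPObjectsDeclaration>
</AMQPAdmin>

<appSettings>
  <add key="AMQPConnection" value="amqp://guest:guest@localhost:5672" />
  <add key="Publisher" value="orders" />
  <add key="AsyncReceiver" value="uk_orders;threads=4" />
</appSettings>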
Description of the configuration file
  • The configSections entries map the configuration section interpreter to the appropriate class in your project; each XML element represents an object and has to be interpreted.
  • The exchange declaration creates a topic exchange called "orders" that is durable (meaning it will survive a server reboot) and not autodelete (if the autodelete flag is set to true, the exchange is removed when all clients are done publishing to it).
  • The queue declaration creates a queue called "uk_orders" to represent all orders for UK customers. The queue is also set to durable and not autodelete.
  • The binding ties the "uk_orders" queue to the "orders" exchange using the "order.uk.#" subscription key; that way all orders whose routing key starts with "order.uk" will end up in that queue.
  • The connection setting configures the connection string to the server.
  • The publisher setting configures the publisher to publish messages to the orders exchange.
  • The receiver setting configures the asynchronous receiver to use 4 threads to pick up orders from the "uk_orders" queue as they arrive.

The configuration sections can accept lists of each type of AMQP object (exchange, queue and binding).

RabbitAdmin

So what does this look like when used in creating exchanges, queues and their bindings?
using System.Configuration;
using System.Threading.Tasks;
using RabbitMQ.Client;

namespace Sample.Configuration.AMQP.Admin
{
    public class RabbitAdmin
    {
        internal static void InitializeObjects(IConnection Connection)
        {
            var config = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
            var objects = config.GetSection("AMQPAdmin/AMQPObjectsDeclaration") as AMQPObjectsDeclarationSection;
            if (objects != null)
            {
                Parallel.For(0, objects.ExchangeList.Count, i =>
                {
                    using (IModel channel = Connection.CreateModel())
                    {
                        var exchange = objects.ExchangeList[i];
                        channel.ExchangeDeclare(exchange.Name, exchange.Type.ToString(), exchange.Durable, exchange.AutoDelete, null);
                    }
                });
                Parallel.For(0, objects.QueueList.Count, i =>
                {
                    using (IModel channel = Connection.CreateModel())
                    {
                        var queue = objects.QueueList[i];
                        channel.QueueDeclare(queue.Name, queue.Durable, queue.Exclusive, queue.AutoDelete, null);
                    }
                });
                Parallel.For(0, objects.BindingList.Count, i =>
                {
                    using (IModel channel = Connection.CreateModel())
                    {
                        var binding = objects.BindingList[i];
                        channel.QueueBind(binding.Queue, binding.Exchange, binding.SuscriptionKey);
                    }
                });
            }
        }
    }
}
The RabbitAdmin class creates the exchanges, queues and their bindings. It takes a connection object and uses it to communicate with the RabbitMQ server. InitializeObjects uses parallel for loops to create each type of object: exchanges are created first, then queues; bindings are declared last, since they need both the queue and the exchange to be there to bind the two.

GatewayFactory

The GatewayFactory creates the connection to the server, calls RabbitAdmin to declare all the objects, and provides methods to create the publisher and asynchronous listener helper objects. Messages are passed to the publisher and received from the queues; the message object is very simple, containing a header and a body.
    /// <summary>
    /// The object encapsulating all the common message properties transmitted to and received from the message bus.
    /// </summary>
    public class Message { 
        public IBasicProperties Properties { get; set;}
        public byte[] Body { get; set; }
        public string RoutingKey { get; set; }
    }
The body is a simple byte array that your internal data structure is serialized to.
To be able to generically handle conversions between data structures and messages, the following delegate and interface are included in the package.
namespace Sample.Configuration.AMQP.Gateway
{
    public delegate Message ConvertToMessage(IModel channel, object packetToSend);
        
    public interface IConvertToMessage {
        Message ConvertObjectToMessage( IModel channel, object packetToSend);
    }
}
The client of the system takes care of converting from its data structures to the message structure.

Publishing

 
             using (var gf = new GatewayFactory())
            {
                var mc = new Sample.Configuration.AMQP.Gateway.Converter.StringToMessageConverter();
                var publisher = gf.GetPublisher(mc.ConvertObjectToMessage);
                publisher.Publish("Hello world");
            }
With configuration in place, publishing is a four-line affair. Simply create a GatewayFactory, which internally reads the configuration file and sets everything up, then request a publisher and pass it an object-to-message conversion handler. The library comes with a default string-to-message converter, used above, which is useful for XML documents; otherwise a custom converter will have to be created. To give you an idea of how to create a converter, let's take a look at the string converter.
 
    public class StringToMessageConverter : IConvertToMessage
    {
        public static readonly string PLAIN_TEXT = "text/plain";
        public const string _defaultCharSet = "utf-8";
        public string CharSet { get; set; }
        public StringToMessageConverter()
        {
            CharSet = _defaultCharSet;
        }
        public virtual Message ConvertObjectToMessage(RabbitMQ.Client.IModel channel, object packetToSend)
        {
            var properties = channel.CreateBasicProperties();
            var bytes = Encoding.GetEncoding(CharSet).GetBytes((string)packetToSend);
            properties.ContentType = PLAIN_TEXT;
            properties.ContentEncoding = CharSet;
            return new Message() { Body = bytes, Properties = properties, RoutingKey = string.Empty };

        }
    }
Conversion to a message is primarily about converting whatever needs to be sent to a byte array, letting the receiver know what the content is, and setting the RoutingKey.

The Asynchronous Receiver

Receiving messages asynchronously is as simple as publishing. The receiver has to communicate the status of each message back to the server - it can be acknowledged, rejected or requeued.
 
    class Program
    {
        static void Main(string[] args)
        {
            var mp = new MessageProcessor();
            using (var cf = new GatewayFactory())
            {
                cf.GetAsyncReceiver(mp.ConsumeMessage);
            }
        }
    }

    class MessageProcessor : IMessageConsumer
    {
        public void ConsumeMessage(Message message, RabbitMQ.Client.IModel channel, DeliveryHeader header)
        {
            try
            {
                var str = ConvertFromMessageToString(message);
                channel.BasicAck(header.DeliveryTag, false);
            }
            catch (Exception ex)
            {
                channel.BasicReject(header.DeliveryTag, true);
            }
        }
        public string ConvertFromMessageToString(Message message)
        {
            var content = string.Empty;
            if (message.Properties.ContentType == StringToMessageConverter.PLAIN_TEXT)
            {
                var encoding = Encoding.GetEncoding(message.Properties.ContentEncoding ?? "utf-8");
                var ms = new MemoryStream(message.Body);
                var reader = new StreamReader(ms, encoding, false);
                content = reader.ReadToEnd();
            }
            return content;
        }
    }
The call to GetAsyncReceiver passes the message handler to the asynchronous receiver. The async receiver is multi-threaded, and the handler is expected to be stateless so that it can be used by multiple threads at the same time. Responding to the server is done through the IModel object. The DeliveryHeader contains a delivery tag and a flag indicating whether this was the first time the message was delivered.
Something useful can be done with the converted message between the conversion and the acknowledgement.

Results in Action

To test this, I simulated the e-commerce flow I've used as an example previously.

This is the result
It shows how orders flow into the order exchange and from there to the queues that are interested in orders.
Here we see how shipments are sent to an exchange and then transmitted to queues that are bound to it.
You can find the source code for a working example of a distributed expert system using RabbitMQ at my source code repository.

AMQP provider RabbitMQ


When we think of introducing advanced messaging to the enterprise, and realize that this new technology may become the backbone of our enterprise, we get a little concerned about issues like scalability, performance, stability, maturity of the product and its history in other enterprises.


We need the message bus to never fail, be blisteringly fast and to have proven itself in large industries.
RabbitMQ fits the bill. It is used by Google, Mozilla and NASA, as well as various financial institutions. You can see a list of companies using RabbitMQ here.


Erlang:
RabbitMQ is built using Erlang. Erlang is a language developed by Ericsson to, and I quote them here:
'Build massively scalable, soft real-time systems with requirements on high availability'
In addition to Erlang, there is a set of libraries called OTP (Open Telecom Platform) that has also been developed in Erlang. The types of applications Erlang/OTP is particularly suited for are distributed, reliable, soft real-time concurrent systems. A number of telecommunications companies use Erlang, as does Facebook, which uses Erlang for its Chat backend.


VMWare - Cloud Foundry - vFabric:

Rabbit Technologies developed RabbitMQ and was recently bought by VMWare. VMWare has included RabbitMQ in their cloud solution called 'Cloud Foundry', or vFabric.


Rabbit Technologies, by building its AMQP implementation in Erlang, has done a great job of creating a message bus that is highly stable and massively scalable. Exactly what we need as our enterprise backbone.
RabbitMQ has another great feature: it's free.


If you're interested, you can read more here:


RabbitMQ Download
RabbitMQ Installation
VMWare vFabric
Erlang
AMQP
AMQP Users

If you've chosen to install RabbitMQ on your own hardware, you'll also need management software. There are a number of Nagios plugins that can be used to monitor your RabbitMQ installation, but RabbitMQ also comes with its own management plugins. You can find them here:
RabbitMQ Management Plugins


I personally like the Management Visualizer; it allows you to visualize the topology of connected exchanges and queues.


Installing is a breeze: each plugin is compiled against the latest version, and all it takes is to copy the plugins to the RabbitMQ plugin directory (rabbitmq_server_<version number>/plugins).


The installation and configuration instructions are located on two different pages; it's best to have both open in separate tabs/windows.


plugin installation instructions
management configuration


The management plugins make use of the mochiweb web server, which defaults to port 55672, so if you already have another web server installed on the machine listening on the default HTTP port (80), they should not interfere with each other. I installed mine on a Windows 2008 R2 web server and they both worked at the same time.

You then have to remove the RabbitMQ service, install the plugins, and then reinstall and start the RabbitMQ service again, like so:


rabbitmq-service.bat remove 

rabbitmq-service.bat install 

rabbitmq-service.bat start
Then, add yourself as a user and give yourself the administrator role, like so:


rabbitmqctl.bat add_user (username) (password)
rabbitmqctl.bat set_user_tags (username) administrator


To also be able to manage exchanges, queues and bindings, you have to give yourself some additional privileges.


rabbitmqctl.bat set_permissions (username) ".*" ".*" ".*"


There are three levels to the permissions - Config, Write and Read.
For each level you can enter a regular expression matching the object names the user can configure, write or read. For an administrator that would be everything, and the ".*" regular expression matches all names.


Now you can open your favorite browser and enter http://localhost:55672/mgmt/; you'll see the console below, and you'll be able to manage all aspects of your RabbitMQ installation.







Introducing Advanced Messaging

History:
JPMorgan Chase developed the Advanced Message Queuing Protocol (AMQP) to reinvigorate messaging technologies that had become stagnant. This opened the door for many other companies to enter the industry: companies such as RedHat, Microsoft, Novell, Credit Suisse, Deutsche Boerse Systems, Goldman Sachs, and a number of smaller companies, such as Iona Technologies, 29West and Rabbit Technologies.
AMQP has now been taken over by the AMQP working group, which includes a number of very large and smaller companies. More information can be found at the AMQP website.


Advanced messaging is taking off. Judging by the list of large companies embracing this technology, I'd say that AMQP is here to stay and it's time to get familiar with it.


What is AMQP
AMQP is a wire-level protocol: it describes the data going over the wire, much as SOAP or RMI do. The intention is for AMQP to promote interoperability between messaging systems.
It also provides the advanced publish and subscribe features that this blog is going to introduce.


What can we do with AMQP?





Advanced messaging introduces exchanges and routing/subscription keys to messaging. A publisher transmits messages that contain routing keys to an exchange, and a queue is bound to an exchange using subscription keys. This separates the publisher from the queue and from the consumer, and allows for a number of interesting implementation variations.


Let's cover these new features in a little more detail.
Exchanges, Queues and Bindings
Exchanges are essentially brokers that determine which queue or queues will receive a message. Queues are bound to exchanges using subscription keys, and the broker determines which queue to send a message to by comparing the message's routing key with each binding's subscription key. When a match is made, the message is passed to the queue. This architecture allows multiple copies of a message to be sent to the various queues whose keys match.
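In terms of the RabbitMQ.Client API introduced earlier, the whole arrangement takes only a few calls; the exchange, queue and key names below are made up for the example:

using (IModel channel = Connection.CreateModel())
{
    // The exchange brokers messages; the queue subscribes to it with a binding key.
    channel.ExchangeDeclare("orders", "topic", true, false, null);
    channel.QueueDeclare("uk_orders", true, false, false, null);
    channel.QueueBind("uk_orders", "orders", "order.uk.#");

    // The publisher only needs to know the exchange and a routing key.
    var body = Encoding.UTF8.GetBytes("order details");
    channel.BasicPublish("orders", "order.uk.store1", null, body);
}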


Routing and Subscription keys.
Routing keys are composed of dot-separated strings; each string is considered a level in the key hierarchy.
Subscription keys are composed the same way but can include two wildcard characters:
* will match a single string within one level; you can use more than one of these in a subscription key.
# will match anything below a certain level - only one is allowed.
Combinations of * and # are allowed.
For example, the following table illustrates how subscription keys match routing keys.



routing key                 order.#    order.UK.*
order.UK.store1             match      match
order.US.store1             match      no match
order.CA.store2             match      no match
order.UK.store2             match      match
order.UK.store2.shipment    match      no match
shipment.UK.store2          no match   no match


A queue can be bound to multiple exchanges, and to each exchange with multiple subscription keys. That allows almost limitless variations in how we can bind a queue to exchanges.


E-Commerce Gateway.


Let's review an example of a retail system. The front end is either a point-of-sale system, an e-commerce site or a wholesale order entry system. It's an international company with retail operations all over the world.


Regardless of the system, all orders are sent to the order exchange, and queues are bound to the exchange so that they pick up what they're interested in.








In the above example we have three e-commerce systems transmitting orders to the order exchange. The e-commerce applications are unaware of the queues that are bound to the exchange, nor are they aware of what will be done with the orders once they're put in the exchange. Depending on the business need, some systems will pick an order up immediately, like the warehouse management system: the business wants to account for the revenues as fast as possible, and can only do so when the order ships, so the warehouse is set up to handle orders for its region immediately as they come onto the queue. The accounting system doesn't have this sense of urgency; it recognizes revenue on a daily basis and picks the orders up twice a day, once at noon and once just before the end of the business day. Not shown is an email marketing system that listens to all orders and sends order confirmation emails on the hour. There's also an archiving system that archives all messages as they come in, for auditing purposes.


The beauty and elegance of this system is that many systems can be tied together without impeding each other in any way. The failure of one component does not impact another: messages continue to get queued, and once the failed system comes online again, processing starts where it left off with no loss of information. Integration is simple and the system is robust.


RPC Mode.








AMQP has another feature that allows us to use it in an 'RPC' mode. The client transmits a request to the exchange and multiple listeners pick it up; this allows us to distribute load amongst a variable number of programs and have them respond to the client using a 'replyto' queue.


The 'replyto' queue can be made exclusive to the client and last only as long as the client is connected to the AMQP application. Messages are assigned a 'replyto' queue, and the remote process responds to the client by placing a message on that queue. The client associates the response with the request using a correlation_id; the remote process has to make sure that it applies the correlation_id to its response message and responds using the 'reply_to' queue.
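In terms of the RabbitMQ.Client API, the pattern comes down to two message properties. A minimal client-side sketch follows; the exchange, routing key and message content are made up for the example:

// Declare a server-named reply queue that is exclusive to this client
// and disappears when the connection closes.
string replyQueue = channel.QueueDeclare("", false, true, true, null).QueueName;

var properties = channel.CreateBasicProperties();
properties.ReplyTo = replyQueue;                       // where responders should answer
properties.CorrelationId = Guid.NewGuid().ToString();  // ties responses to this request

var body = Encoding.UTF8.GetBytes("search terms");
channel.BasicPublish("search", "query.all", properties, body);

// A consumer on replyQueue then matches each incoming response to its request
// by comparing the message's CorrelationId with the one sent above.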


In the example above, the query is distributed amongst several search engines. For this design to be effective, the searchable space has to be divided into many smaller pieces; each search engine queries the searchable space it is responsible for, ranks the results and responds to the client. The client collects the responses from the various engines, sorts them appropriately and returns the results to its own client.


The only drawback that I can see is that the client doesn't know when the response is complete. There is no 'end of response' marker to let the client know that all respondents have answered. One way around that is for the client to know how many responses to expect and simply count until all have been received, but at that point the client is strongly coupled to the number of processes behind the message wall. Another way is for the apps to coordinate amongst each other and determine who is the last to respond; the process that is last adds an 'end of response' marker that the client recognizes. This approach is overly complex for my taste. Perhaps the simplest way out is to have one process handle the client's request; a response is then by default the only response the client will receive. The problem with all of these solutions is that the system is no longer fault tolerant or robust, and implementing a webservice call or something similar probably makes more sense.


Parallel processing.
The simplest implementation of advanced messaging is one we're familiar with: parallel task distribution.
The producer sends messages to an exchange, a queue is bound to it, and multiple consumers listen to the queue and process the messages. The number of consumers listening to the queue can be varied; each consumer processes requests in parallel with the others, and the system can make short work of the queue.

Introducing Expert Systems and Distributed Architecture


Expert Systems.
The enterprise is typically composed of expert systems that perform core business functions and perform them well. A retail system consists of point-of-sale systems, e-commerce websites, inventory management systems, accounting systems, marketing systems, customer service systems, advertising systems and business analytics. Each supports a core function, and the primary business process ties them all together to execute its main use case.

In addition to these business expert systems, there are a number of process-oriented expert systems that take care of security, notifications, logging, search, exception management, monitoring, management, etc.

In a distributed environment, the most common ways to interact with expert systems are webservices, file transfers, shared databases, or including references to expert components in an application. The problem with most of these methods is that they strongly couple the systems to each other. The webservice has to be available when the application needs it, and if it's not there, it can cause cascading failures downstream. These failures can lead to data loss, or require the development of expensive, complex, redundant systems that can handle the failures and recover at a later time. Managing changes to expert components that are referenced by other applications is very difficult, often requiring versioning when interfaces change, rebuilding dependent applications to accommodate changes, or having system-wide downtimes to upgrade just one expert system. In an enterprise, this can get out of hand very quickly.
Ideally you want to loosely couple these expert systems. In addition to using webservices or referencing expert components directly in an application, we can add advanced messaging to our arsenal and add some stability to the system. Messaging has been around for a very long time and has a wide and varied implementation base, but its features are limited. Advanced messaging adds several important features that change the way we use messaging; it's basically messaging re-engineered from the bottom up.

One important feature of the advanced messaging protocol is that it applies the publisher/subscriber pattern to message queues. In the typical queue architecture, an application puts a message on a queue specifically for another application, which picks it up and processes the work, perhaps responding on another queue. In the exchange model, an application puts a message in an exchange and provides a routing topic (publishing). An application can listen to an exchange using a specific subscription key (subscribing). The exchange acts as a broker and matches routing keys to subscription keys; when a match is made, the message is delivered to every subscriber whose key matches. This mechanism does one very important thing: it completely decouples the publisher from the subscriber. This has large implications for how the enterprise can be organized into expert systems.
Distributed Systems
The diagram above shows an example e-commerce website taking an order and fulfilling it, with integration points between accounting, marketing and inventory management systems.
When you compare advanced messaging to standard message queues, the main difference is the addition of an exchange. On the surface this may seem like a small and insignificant change, but that one addition fundamentally changes the way we can architect the enterprise. Perhaps the most complex of software systems, rich graphical, UI-driven applications, are all event driven. The event-driven model allows various components in the UI landscape to change as the user changes the way they use the application, creating a very rich, satisfying experience. By comparison, enterprise systems have typically been standalone applications that communicate with other systems in a very linear way, where the weakest link truly defines the stability of the entire system. By adding exchanges to the message bus, we've introduced an event model to the enterprise. This will allow us to create a simple network of loosely coupled expert systems that is more stable and easier to manage than what we have had to date. The following entries will explain how to build such a system with advanced messaging at its core.