The Backstory

I am busy building a SaaS product that has to reliably send large numbers of emails. At ZappiStore we use the Ruby on Rails stack extensively. I think Rails is an excellent choice for building web applications. Rails makes you super productive and the community support is excellent. Naturally I want to use Rails for the web application layer of the system. There are a couple of different options built into Rails for sending emails.

I don’t want a user to have to wait one or two seconds for an SMTP request to finish before they get a response, so sending will have to be done asynchronously. The ActionMailer API in Rails is built for sending emails but ideally I want to pass off the load of sending mails to something that is good at making large amounts of requests and handling them asynchronously. There is support for asynchronous requests in Ruby but it is not as geared towards it as JavaScript is. Also, since Ruby is single-threaded with a global interpreter lock, I want it to do as little processing as possible and rather play to its strengths.

Node.js is built from the ground up with asynchronous operations in mind. Functions are first-class citizens in JavaScript and this makes programming with callbacks easy. Therefore it is a good choice for the creation of a microservice that sends emails. I am using the Mailgun API in order to send emails using POST requests.

Since the web application layer is written in Ruby and I have a microservice written in Node.js, these need to pass data between one-another in some way. The communication solution should have the following properties:

  • Robust. If a service goes down, the message should be persisted in the queue until it is handled.
  • Decoupled. The individual microservices should ideally not have any direct knowledge of each other. Each service should communicate via the message broker.

Enter RabbitMQ. Rabbit has been around for a long time and has proven to be a high-performance and reliable message queue. The idea is to use Ruby to push a message onto the queue and then pop the message off of the queue in the Node.js microservice.

Getting Started with RabbitMQ

For those of us that use OS X, installing RabbitMQ is easiest done with Homebrew:

brew install rabbitmq

Once RabbitMQ is installed, start it up as a service with the following command:

brew services start rabbitmq

The Ruby Layer

In order to connect to RabbitMQ from Ruby we are going to use the bunny gem. Add it to your Gemfile:

gem 'bunny', '~> 2.3.1'

Then run a bundle install.

The bunny gem gives us a lot of great functionality out of the box like robust connection handling. If the connection to RabbitMQ drops, bunny will attempt to reconnect until the connection is restored. The bunny gem also gives us an easy API for leveraging RabbitMQ from within our application.

The connection to RabbitMQ should be established at application startup and remain open for the duration of the application’s lifetime. To this end we create a MessagingService singleton class:

# The messaging service class implements the Singleton pattern
class MessagingService

  # Class variable holding the instance of the class
  @@instance = nil

  def initialize
    # Set the connection object to an instance variable on the object
    @connection = Bunny.new(:host => Settings.rabbitmq.host, :port => Settings.rabbitmq.port)
    @connection.start
  end

  def self.instance
    @@instance ||= MessagingService.new
  end

  def connection
    @connection
  end

end

# Acquire the connection
MessagingService.instance

By using the Singleton pattern for the MessageService class we can ensure that the connection is not closed resulting in us having to reconnect every time we want to send an email. The EmailQueueService class uses the connection provided by the MessageService class to create a queue for our email pipeline:

require_relative 'messaging_service'

# Email queue service also implements the Singleton pattern
class EmailQueueService

  @@instance = nil

  def initialize
    # We can get the connection from the MassagingService
    @connection = MessagingService.instance.connection
    # Create a channel. Since the application is single-threaded, all communication
    # will flow through this single channel
    @channel = @connection.create_channel
    # Create a queue with a name that was defined in the settings.yml file
    @queue = @channel.queue(Settings.queues.email, :durable => true, :auto_delete => false)
  end

  def self.instance
    @@instance ||= EmailQueueService.new
  end

  # Method for adding a message object to the queue
  # The message object is serialized to a JSON string
  def add(msg_object)
    @queue.publish(Oj.dump(msg_object), :persistent => true)
  end

end

# Initialze the object
EmailQueueService.instance

The AccountActivationService class is specific to sending account activation emails when new users sign up:

require_relative 'email_queue_service'

# Service specific to sending account activation emails
class AccountActivationService

  @@instance = nil

  def initialize
    # Load the email template into memory
    @text_template = File.open('app/views/email/account_activation_email.text.erb').read
  end

  def self.instance
    @@instance ||= AccountActivationService.new
  end

  def send_email(user)
    # Set template variables
    @user = user
    @settings = Settings

    msg_object = {}
    msg_object['to'] = create_email_address_string(first_name: user.first_name, last_name: user.last_name, email: user.email)
    msg_object['from'] = create_email_address_string(first_name: Settings.app.name, email: Settings.app.emails.activation)
    msg_object['subject'] = "Account Activation"
    # Pass the object binding to the template so that it can access the instance variables
    msg_object['text'] = ERB.new(@text_template).result(binding)
    # Add the message object to the email queue
    EmailQueueService.instance.add(msg_object)
  end

  # Create an email address string of this form: "Timmy Droptables <[email protected]>"
  def create_email_address_string(first_name: nil, last_name: nil, email: nil)
    [first_name, last_name, "<#{email}>"].compact.join(" ")
  end

end

# Initialize the object
AccountActivationService.instance

The Node Layer

The package.json for the project is as follows:

{
  "name": "email_worker",
  "version": "1.0.0",
  "description": "Email sender microservice for the Mailgun API.",
  "main": "main.js",
  "dependencies": {
    "amqplib": "^0.4.2",
    "lodash": "^4.13.1",
    "mime": "^1.3.4",
    "request": "^2.72.0",
    "winston": "^2.2.0",
    "yamljs": "^0.2.7"
  },
  "devDependencies": {
    "chai": "^3.5.0",
    "mocha": "^2.5.3",
    "mock-require": "^1.3.0"
  },
  "scripts": {
    "start": "node main.js",
    "test": "node node_modules/.bin/mocha"
  },
  "author": "Sebastian Coetzee <[email protected]>"
}

The amqplib library is used to communicate with RabbitMQ, lodash as a helper library, mime to get the mime types of the email attachments, request for sending the HTTP requests, winston for logging and yamljs to parse the config.yml file containing settings.

The bulk of the work is done in the worker.js file:

var config = require('./config');
var logger = require('./logger');
var mime = require('mime');
var request = require('request');
var _ = require('lodash');
var fs = require('fs');

// Holds the number of active HTTP connections to the Mailgun API.
// I am limiting the max number of open HTTP connections so that
// the worker doesn't get overloaded and so that the work gets spread
// out over multiple workers.
var conns = 0;

module.exports = {

  // Gets a new message from RabbitMQ if one is available
  getMessage: function(conn, channel){
    if (conns >= config.max_http_connections){ return; }
    var that = this;
    channel.get(config.queue, {}, function(err, msg) {
        if (msg) {
          logger.info("Message picked up.", _.omit(msg, ['content']));
          that.send(conn, channel, msg);

          // Add to the count of active connections
          conns++;
          // Only get a new message from RabbitMQ if there are connection slots available
          if (conns < config.max_http_connections){
            // Recursive call
            that.getMessage(conn, channel);
          }
        }
      }
    );
  },

  // Accessor method for the number of active HTTP connections
  getConnections: function(){
    return conns;
  },

  // The request library expects a form data object in this format
  buildFormData: function(msg_obj){
    var formData = _.pick(msg_obj, ['from', 'to', 'cc', 'bcc', 'subject', 'text', 'html']);
    if (msg_obj['file_path']){
      formData['attachment'] = {
        value: fs.createReadStream(msg_obj['file_path']),
        options: {
          filename: msg_obj['file_name'],
          contentType: mime.lookup(msg_obj['file_name'])
        }
      };
    }

    return formData;
  },

  // Send the email using the Mailgun API
  send: function(conn, channel, msg){
    var msg_obj = JSON.parse(msg.content.toString());
    var formData = this.buildFormData(msg_obj);

    var auth = {
      'user': 'api',
      'pass': config.api_key
    }

    request.post(
      {
        url: config.api_base_url + "/messages",
        formData: formData,
        auth: auth
      },
      function(err, response, body){
        // Return a possible connection to the pool
        conns--;
        if (err) {
          logger.error("An error occured while posting to the Mailgun API: ", err);

          // Acknowledge that the handling of the message failed.
          return channel.nack(msg, false, false);
        }
        logger.info("Successfully sent mail.", _.omit(msg, ['content']));

        // Acknowledge that the handling of the message succeeded.
        channel.ack(msg);
      }
    )
  }

};

The worker is the started using the main.js file:

var config = require('./config');
var logger = require('./logger');
var worker = require('./worker');

require('amqplib/callback_api')
  .connect(
    'amqp://' + config.rabbitmq_host + ':' + config.rabbitmq_port,
    function(err, conn) {
      if (err != null){
        logger.error("Could not create connection.", err);
        process.exit(1);
      }
      conn.createChannel(
        function(err, channel) {
          if (err != null){
            logger.error("Could not create channel.", err);
            process.exit(1);
          }
          channel.assertQueue(config.queue);
          setInterval(
            function(){
              logger.info("Poll. HTTP connections: " + worker.getConnections(), null);
              worker.getMessage(conn, channel);
            },
            config.polling_rate
          );
        }
      );
    }
  );

Putting it all together

In order to send email accross the queue, the following code is executed from the Rails UsersController:

class UsersController < ApplicationController

  def create
    return unless validate_required_params(:email, :password, :password_confirmation)
    user = User.new(params.permit(:email, :password, :password_confirmation))
    if user.save

      # Send the user the activation email
      AccountActivationService.instance.send_email(user)

      render_json(200, StatusMessage::SUCCESS, nil, "User successfully registered. Please check your email for activation instructions.")
    else
      render_json(401, StatusMessage::ERROR, nil, "User registration failed.")
    end
  end

end

Summary

Node.js provides first-class support for asynchronous execution of HTTP requests. This makes it a great technology to use for sending emails via the Mailgun API. Ruby on Rails is great for building web applications. We want to leverage the best features of these two technologies together and we implemented RabbitMQ to enable robust communication between the Ruby and the JavaScript layers.