RabbitMQ, AMQP, Publishing Messages to the Queue

We recently set up a RabbitMQ server, and I wanted to publish and receive messages from our Rails apps. After a bit of research I decided to try out the AMQP gem:

https://github.com/ruby-amqp/amqp

My biggest concern was running EventMachine inside the app without blocking its threads. None of the examples I found were usable in a production environment. Then we stumbled upon EventMachine-with-Rails, which shows how its authors start EventMachine alongside their app.

Our initializer is shown below. The AMQP gem requires a running EventMachine reactor in order to function:

config/initializers/eventmachine.rb

module AdmitEventMachine  
  def self.start 
    if defined?(PhusionPassenger)
      PhusionPassenger.on_event(:starting_worker_process) do |forked|
        if forked && EM.reactor_running?
          EM.stop
        end
        Thread.new { 
          EM.run do
            puts "EM up!"
          end 
        }
        die_gracefully_on_signal
      end
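    else # Development
      Thread.abort_on_exception = true
      # Thin already runs its own reactor, so only start one when Thin is absent.
      Thread.new {
        EM.run do
          # The reactor is always running inside this block.
          puts "EM up!"
        end
      } unless defined?(Thin)
    end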
  end

  def self.die_gracefully_on_signal
    Signal.trap("INT")  { EM.stop }
    Signal.trap("TERM") { EM.stop }
  end
end

AdmitEventMachine.start

Now when I fire up the Rails console or server, EventMachine starts; the else branch is hit when running in the dev environment. As research I played around with adding all sorts of event hooks and error handlers, and the file ended up quite large, so I stripped the module back to the basics.

If your app publishes messages frequently, it may be appropriate to open the connection to the RabbitMQ server once, in the initializer, and reuse the channel:

require 'amqp'
RMQ_CONFIG = YAML.load_file("#{RAILS_ROOT}/config/rabbit.yml")[RAILS_ENV]  

module AdmitEventMachine  
  def self.start 
    if defined?(PhusionPassenger)
      PhusionPassenger.on_event(:starting_worker_process) do |forked|
        if forked && EM.reactor_running?
          EM.stop
        end
        Thread.new { 
          EM.run do
            config_connect_open_channel
          end 
        }
        die_gracefully_on_signal
      end
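    else # Development
      Thread.abort_on_exception = true
      # Thin already runs its own reactor, so only start one when Thin is absent.
      Thread.new {
        EM.run do
          # The reactor is always running inside this block.
          config_connect_open_channel
        end
      } unless defined?(Thin)
    end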
  end

  def self.config_connect_open_channel
    AMQP.channel ||= AMQP::Channel.new(AMQP.connect(self.conf))         
  end  

  def self.conf
    { :host     => RMQ_CONFIG['server'],
      :port     => RMQ_CONFIG['port'],
      :vhost    => RMQ_CONFIG['vhost'],
      :user     => RMQ_CONFIG['user'],
      :password => RMQ_CONFIG['pass'],
      :timeout  => 0.3 }
  end

  def self.die_gracefully_on_signal
    Signal.trap("INT")  { EM.stop }
    Signal.trap("TERM") { EM.stop }
  end
end

AdmitEventMachine.start

config/rabbit.yml

production:
  server: "yourserver.rabbit.com"
  vhost: "/"
  port: "5672"
  user: "guest"
  pass: "guest"
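
The lookup by environment name returns nil when that environment has no section in the file, and the error only shows up later when the connection is attempted. A minimal sketch of checking the settings up front, assuming the key names used in the file above; the helper name rabbit_settings and the inline sample string are made up for illustration:

```ruby
require 'yaml'

# Hypothetical helper: turn one environment's section of rabbit.yml
# into the options hash that the initializer passes to AMQP.connect.
def rabbit_settings(yaml_text, env)
  config = YAML.load(yaml_text)[env]
  raise ArgumentError, "no RabbitMQ settings for #{env.inspect}" if config.nil?

  missing = %w[server port vhost user pass] - config.keys
  raise ArgumentError, "missing keys: #{missing.join(', ')}" unless missing.empty?

  { :host     => config['server'],
    :port     => Integer(config['port']), # accepts "5672" or 5672
    :vhost    => config['vhost'],
    :user     => config['user'],
    :password => config['pass'],
    :timeout  => 0.3 }
end

sample_yaml = <<-YAML
production:
  server: "yourserver.rabbit.com"
  vhost: "/"
  port: "5672"
  user: "guest"
  pass: "guest"
YAML

settings = rabbit_settings(sample_yaml, 'production')
puts settings[:host]
puts settings[:port]
```

In the app you would feed it File.read of the real file and RAILS_ENV instead of the sample string. Asking for an environment that is not in the file raises ArgumentError at boot instead of failing on the first publish.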

Next I wrote a simple class for publishing messages. You call it with the JSON string to put in the payload (body) of the message and the name of the queue you want to publish to. I have placed comments where you would want to re-queue your message or handle errors.

EventMachine.next_tick is the line of code that pushes your message asynchronously (non-blocking): the block is handed to the reactor, and publish returns straight away.
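
To make the non-blocking part concrete, here is a toy stand-in for a reactor built only on the standard library. It is not EventMachine and does not reproduce its internals; the class name TinyReactor is hypothetical. It only illustrates the scheduling idea: the caller queues a block and carries on while another thread does the slow work.

```ruby
require 'thread'

# Toy stand-in for a reactor: one background thread runs queued blocks in order.
# This is NOT EventMachine, only an illustration of deferred execution.
class TinyReactor
  def initialize
    @jobs = Queue.new
    @thread = Thread.new do
      # Pop and run blocks until the :stop marker arrives.
      while (job = @jobs.pop) != :stop
        job.call
      end
    end
  end

  # Queue the block and return immediately; the caller never waits for it.
  def next_tick(&block)
    @jobs << block
    true
  end

  # Let already-queued blocks finish, then end the loop.
  def stop
    @jobs << :stop
    @thread.join
  end
end

reactor = TinyReactor.new
published = []

# Simulate a slow publish; the call to next_tick itself returns at once.
started = Time.now
reactor.next_tick { sleep 0.2; published << "test" }
scheduling_time = Time.now - started

reactor.stop
puts "scheduling took #{scheduling_time} seconds"
puts "published: #{published.inspect}"
```

The scheduling time printed is far smaller than the 0.2 second sleep inside the block, which is the same reason a request that calls publish does not wait on RabbitMQ.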

lib/rabbitq/client.rb

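require "rubygems"
require "yaml"
require "amqp"

# The initializer above already defines RMQ_CONFIG; only load it here if it is missing.
RMQ_CONFIG = YAML.load_file("#{RAILS_ROOT}/config/rabbit.yml")[RAILS_ENV] unless defined?(RMQ_CONFIG)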

module Rabbitq   
  class Client
    attr_accessor :thread

    def self.publish json_string, queue_name
      new.publish json_string, queue_name 
    end

    def publish json_string, queue_name 
      begin
        if EM.reactor_running?  
          connection_settings = {:host => RMQ_CONFIG['server'],
                                 :port => RMQ_CONFIG['port'],
                                 :vhost => RMQ_CONFIG['vhost'],
                                 :user => RMQ_CONFIG['user'],
                                 :password => RMQ_CONFIG['pass'],
                                 :timeout  => 0.3}
          AMQP.channel ||= AMQP::Channel.new(AMQP.connect(connection_settings))  
          EventMachine.next_tick { AMQP.channel.queue(queue_name).publish(json_string)}  
          return true  
        else
          # EventMachine is not running!
          # Requeue?
          return false
        end
      rescue StandardError => exc
        puts exc
        # Store the error / send an email
        # Requeue the send, e.g. with Delayed Job
        # Error example: EventMachine::ConnectionError: unable to resolve server address
        return false # report failure instead of returning the nil from puts
      end
    end            

  end   
end 

# Call Method
# require 'rabbitq/client' 
# Rabbitq::Client::publish "test", "testing.qq" 

# New Instance of Class
# x = Rabbitq::Client.new
# x.publish "test", "testing.qq"
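
The comments in the client mark where a failed message could be re-queued. One way that might look is sketched below, under stated assumptions: the FallbackPublisher class is hypothetical, the in-memory array stands in for a durable store such as a Delayed Job table, and the publisher is any callable that returns a truthy value on success.

```ruby
# Hypothetical wrapper: try to publish, and park the message locally when
# the publisher reports failure (returns false or nil) or raises.
class FallbackPublisher
  attr_reader :pending

  # publisher: any object responding to call(json_string, queue_name)
  def initialize(publisher)
    @publisher = publisher
    @pending = [] # stand-in for a durable store
  end

  def publish(json_string, queue_name)
    return true if @publisher.call(json_string, queue_name)
    park(json_string, queue_name)
  rescue StandardError => e
    warn "publish failed: #{e.message}"
    park(json_string, queue_name)
  end

  # Retry everything parked so far; whatever still fails is parked again.
  def flush
    retrying, @pending = @pending, []
    retrying.each { |json_string, queue_name| publish(json_string, queue_name) }
    @pending.empty?
  end

  private

  def park(json_string, queue_name)
    @pending << [json_string, queue_name]
    false
  end
end

# Usage with a fake publisher that fails until the "reactor" comes up.
reactor_up = false
safe = FallbackPublisher.new(lambda { |json, queue| reactor_up })

safe.publish('{"id":1}', "testing.qq") # parked, reactor is down
reactor_up = true
safe.flush                             # retried and delivered
puts "still pending: #{safe.pending.size}"
```

In the app the lambda would wrap Rabbitq::Client.publish instead of the fake flag. Keeping the retry logic outside the client leaves the publish method itself small.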

 
