Implementing a message queue with the Redis PUSH-POP mechanism

Readers can also read this article on the author's personal website: Huanggwenwei - Interline: Using Redis PUSH-POP mechanism to achieve message queuing

Background

To improve management efficiency and optimize workflows and team structure, more and more traditional industries are adopting SaaS to standardize and upgrade their businesses with software, so that they can adapt to the commercial Internet more quickly and more effectively.
In April 2017, I was hired as a remote collaborator by a Hangzhou start-up that builds SaaS software for the used-car industry. The company's predecessor had used Java to develop a trading market management system for car lots. At present, the company maintains a multi-platform (PC and app) SaaS product that tracks vehicles from warehousing to sale. At the end of August, the company began cooperating with a subsidiary of a large domestic automobile group, and we integrated and improved the two existing systems to meet each side's needs. There was only one month from kickoff to launch. With such a short schedule, the project was a considerable test of the team's skills as well as its physical and mental stamina.

Malformed system structure

The system structure of the project is as follows. The trading market management system (implemented in Java) manages the vehicle data and daily operations of all the car dealers in the car lot. The app comes in a car dealer version and a trading market version, used by the car dealer and the car dealer administrator roles respectively. A Rails web service, chelaike, provides the API for the app.
Given the project schedule and system architecture, the trading market management system and the chelaike back end use the same database. The corresponding database tables of the two systems are as follows:

Exchange Market (ERP)   chelaike    Remarks
trade                   cars        vehicle
market                  companies   company
agency                  shop        car dealer
staff                   user        user

The business logic of the trading market only modifies its own tables, and the same is true of chelaike. However, the corresponding tables of the two systems must be kept in sync. For example, when a vehicle draft is saved in the chelaike mobile app, the newly entered vehicle should appear in the vehicle list of the trading market: a record is created in chelaike's cars table, and the synchronization program creates a matching record in the trade table.
Besides synchronizing data, the message queue also serves as a way for the trading market and chelaike to call each other. For example, when a new system announcement is created in the trading market's operations management, it needs to call chelaike's function for pushing messages to the app.

MQ Open Source Framework Selection

There are many open-source options for message queuing. I had previously used RabbitMQ as a simple messaging service between Bitcoin-core and a web service (callback messages for confirmations in the Bitcoin hot wallet were sent to a Rails application), so it was one candidate. I then asked former colleagues how they would implement this, and they mentioned the pub/sub (publish-subscribe) mechanism that Redis ships with, so that became another candidate. The last option was proposed by Party A's CTO: use Redis's PUSH/POP mechanism on the list data structure, where the producer sends messages with lpush and the consumer receives them with brpop. After weighing the options, we chose the last one. A Zhihu Q&A explains the pros and cons of using Redis as a message queue very well: How does redis do message queues?
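
As a rough illustration of the producer/consumer pattern described above, the following sketch pushes JSON messages onto the head of a list and pops them from the tail, which gives first-in, first-out order. The InMemoryListStore class, the produce and consume helpers, and the demo:queue key are hypothetical names used only for this example. The store mimics just the lpush and brpop calls so the sketch runs without a Redis server; with the redis gem, a real client could presumably be passed in its place.

```ruby
require 'json'

# Hypothetical in-memory stand-in for a Redis client; it mimics only the
# two list commands used here so the example runs without a server.
class InMemoryListStore
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  # Like LPUSH: insert at the head of the list and return the new length.
  def lpush(key, value)
    @lists[key].unshift(value)
    @lists[key].length
  end

  # Like BRPOP: remove from the tail and return [key, value], or nil if the
  # list is empty (a real BRPOP would block until the timeout expires).
  def brpop(key, timeout: 0)
    value = @lists[key].pop
    value.nil? ? nil : [key, value]
  end
end

# Producer: serialize the payload and push it onto the queue.
def produce(store, queue, payload)
  store.lpush(queue, JSON.generate(payload))
end

# Consumer: pop the oldest message and parse it, or return nil when empty.
def consume(store, queue)
  _key, raw = store.brpop(queue, timeout: 1)
  raw.nil? ? nil : JSON.parse(raw)
end

store = InMemoryListStore.new
produce(store, 'demo:queue', { 'channel' => 'company_sync', 'syn_source_id' => '1' })
produce(store, 'demo:queue', { 'channel' => 'shop_sync', 'syn_source_id' => '2' })

puts consume(store, 'demo:queue')['channel']
puts consume(store, 'demo:queue')['channel']
```

Because the producer pushes on one end and the consumer pops from the other, the message pushed first is the first one consumed.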

Once the Redis PUSH-POP approach was chosen, implementing push and pop on the queue for producers and consumers was not difficult. To avoid reinventing the wheel, I searched GitHub for an existing gem and found one called chasqui:

Chasqui adds persistent publish-subscribe (pub-sub) messaging capabilities to Sidekiq and Resque workers.

Reading the source code, I found that it also implements a simple message queue for Sidekiq and Resque workers through Redis's PUSH-POP mechanism. This matched the project's requirements well, so we chose it.

Integrating chasqui into Rails project

Configuration

Sidekiq is already used in the project for background job processing, so we only need to register the Sidekiq workers that process the message queue in config/initializers/sidekiq.rb.

redis_config = if Rails.env.production?
                 { url: ENV["REDIS_URL"], password: ENV["REDIS_PWD"] }
               else
                 { url: ENV["REDIS_URL"] }
               end

Sidekiq.configure_server do |config|
  config.average_scheduled_poll_interval = 2
  config.redis = redis_config
end

Sidekiq.configure_client do |config|
  config.redis = redis_config
end

# Bind the worker that processes the message queue to the corresponding channel
# As a consumer, worker listens for messages from the Chasqui daemon
Chasqui.subscribe do
  on 'company_sync', DataSync::CompanyWorker
  on 'shop_sync', DataSync::ShopWorker
  on 'user_sync', DataSync::UserWorker
  on 'car_sync', DataSync::CarWorker
  on 'announcement_sync', DataSync::AnnouncementWorker
  on 'pc_token_sync', DataSync::PcTokenWorker
end

The first argument of on is the channel name, and I use one channel for each type of data. The second argument is an ordinary Sidekiq worker, which processes the messages sent on the corresponding channel.

Finally, you need to configure the Redis connection for chasqui:

# config/initializers/chasqui.rb
Chasqui.configure do |c|
  c.redis = if Rails.env.production?
              Redis.new(url: ENV["REDIS_URL"], password: ENV["REDIS_PWD"])
            else
              Redis.new(url: ENV["REDIS_URL"])
            end
end

Subscriber and Business Logic Implementation

Below we only walk through data synchronization for the trading market. When a new market record is created in the ERP, a message must be sent to the queue. The synchronization program then creates a new company record in chelaike as well as a record for the market's administrator user. The administrator user needs two records: one in the ERP's staff table and one in chelaike's user table. The staff record is used to log in to the ERP (the Java trading market system), and the user record is used to log in to the chelaike app.
Take a look at CompanyWorker's code:

class DataSync::CompanyWorker
  include Sidekiq::Worker
  sidekiq_options retry: 1, queue: :data_sync

  # {"channel"=>"company_sync", "syn_source_id"=>"1", "to"=>"chelaike"}
  def perform(body, *args)
    to = body.try(:fetch, "to")
    syn_source_id = body.try(:fetch, "syn_source_id")

    Rails.logger.info ">>>>> company_sync json #{body}"

    syn_source = (to == "chelaike") ? (Erp::Market.find(syn_source_id)) : (Company.find(syn_source_id))
    DataSynService::Company.new(syn_source).execute
  end
end

The data_sync queue is shared by all data-synchronization workers, and this queue is deployed on only one application server. I standardized the JSON format of the messages in the queue, for example:

{
  "channel"=>"company_sync",
  "syn_source_id"=>"1",
  "to"=>"chelaike"
}

channel is the message type, syn_source_id is the ID of the database record to synchronize, and to names the system that needs the synchronized data. For example, when a new trading market is created in the ERP, channel is company_sync, syn_source_id is the ID of the new record in the Market table, and to is chelaike, indicating that the chelaike system needs the data.
The Java system should therefore push data to Redis in the following format (shown here in Ruby):

redis.lpush 'chasqui:chasqui-inbox',
            {
              "channel": "company_sync",
              "syn_source_id": "1",
              "to": "chelaike"
            }.to_json
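
The message format above can be built and checked with a small helper. This is only a sketch: build_sync_message, parse_sync_message and REQUIRED_KEYS are hypothetical names that do not appear in the project, and the only assumption is that every message carries the three keys described in the text.

```ruby
require 'json'

# Hypothetical helpers; only the three key names come from the article.
REQUIRED_KEYS = %w[channel syn_source_id to].freeze

# Build the JSON string a producer would push onto the queue.
def build_sync_message(channel, syn_source_id, to)
  JSON.generate({
    'channel' => channel,
    'syn_source_id' => syn_source_id.to_s,
    'to' => to
  })
end

# Parse a raw message and reject it if any required key is missing.
def parse_sync_message(raw)
  message = JSON.parse(raw)
  raise ArgumentError, 'message must be a JSON object' unless message.is_a?(Hash)

  missing = REQUIRED_KEYS.reject { |key| message.key?(key) }
  raise ArgumentError, "missing keys: #{missing.join(', ')}" unless missing.empty?

  message
end

raw = build_sync_message('company_sync', 1, 'chelaike')
puts parse_sync_message(raw)['to']
```

Validating on the consumer side means a malformed message from the other system fails loudly instead of reaching the database lookup with a nil ID.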

The business logic of the synchronization program is abstracted and placed in the service directory. Let's take a look at service/data_sync/company.rb.

module DataSynService
  class Company < Base
    def execute
      syn_obj
    end

    private
    # Records requiring synchronization
    # For example: @syn_source is a Company record of chelaike, and @obj is a Market record of erp
    def syn_obj
      ActiveRecord::Base.transaction do
        case @syn_source.class.to_s
        when "Company"
          Rails.logger.info "Company sync >>>> chelaike to erp"
          obj = @syn_source.erp_market
          obj.blank? ? (obj = ::Erp::Market.create(erp_params)) : (obj.update_attributes(erp_params))
          users = @syn_source.users  # User data synchronization
          Rails.logger.info "New Market Success Corresponding to Company: #{obj.try(:market_name)}"
        when "Erp::Market"
          Rails.logger.info "Company sync >>>> erp to chelaike"
          obj = @syn_source.chelaike_company
          obj.blank? ? (obj = ::Company.create(chelaike_params)) : (obj.update_attributes(chelaike_params))
          obj.update_attributes(erp_market_id: @syn_source.id) if obj.present?
          staffs = @syn_source.staffs # User data synchronization
          Rails.logger.info "Success in building new market-oriented companies: #{obj.try(:name)}"
        end
        DataSynService::SyncWithUser.execute(@syn_source) unless (staffs.present? || users.present?)
      end
    end

    def chelaike_params
      {
        name: @syn_source.market_name,
        contact: @syn_source.market_linkman_name,
        contact_mobile: @syn_source.market_linkman_mobile,
        company_state: @syn_source.market_status,
        erp_market_id: @syn_source.id
      }
    end

    def erp_params
      {
        market_name: @syn_source.name,
        market_linkman_name: @syn_source.contact,
        market_linkman_mobile: @syn_source.contact_mobile,
        market_status: @syn_source.company_state
      }
    end
  end
end

The logic is actually very simple: the target system copies the data from the source record, matching the corresponding fields of the two tables as the synchronization requires.
The above covers data synchronization between market and company across the trading market system and the chelaike back end. The other workers and services are similar, so I will not list them one by one.
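
The field matching described above can also be expressed as a single mapping table used in both directions. The sketch below is an alternative illustration rather than the project's code: COMPANY_TO_MARKET and translate are hypothetical names, the column pairs are taken from chelaike_params and erp_params, and the sample values are invented. The extra erp_market_id column is not covered.

```ruby
# Column pairs taken from chelaike_params and erp_params:
# chelaike Company column => ERP Market column.
COMPANY_TO_MARKET = {
  name: :market_name,
  contact: :market_linkman_name,
  contact_mobile: :market_linkman_mobile,
  company_state: :market_status
}.freeze

# Rename the keys of an attribute hash according to a mapping,
# skipping columns that the source hash does not contain.
def translate(attributes, mapping)
  mapping.each_with_object({}) do |(from, to), result|
    result[to] = attributes[from] if attributes.key?(from)
  end
end

# Invented sample data for illustration only.
company = { name: 'Demo Market', contact: 'Li', contact_mobile: '000', company_state: 'enabled' }

market = translate(company, COMPANY_TO_MARKET)            # chelaike to erp
restored = translate(market, COMPANY_TO_MARKET.invert)    # erp to chelaike
puts market.keys.inspect
puts restored == company
```

Keeping one table and inverting it for the opposite direction avoids the two parameter methods drifting apart when a column is added.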

chasqui broker

The chasqui documentation describes the broker as follows:

The broker is a ruby daemon that listens for events (messages) published to channels (topics) and forwards those events to registered subscribers. In order to work, your broker must use the same Redis database as your Sidekiq/Resque workers.

In other words, a daemon listens for the messages that producers publish to channels and forwards them to the Sidekiq workers bound to those channels. Note that consumers and producers must connect to the same database of the same Redis instance.
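
To make the forwarding idea concrete, here is a deliberately simplified sketch of a broker. It is not chasqui's actual implementation: TinyBroker, subscribe and drain are hypothetical names, handlers are plain blocks instead of Sidekiq workers, and an Array of JSON strings stands in for the Redis inbox list.

```ruby
require 'json'

# Hypothetical, simplified broker; NOT chasqui's actual implementation.
class TinyBroker
  def initialize
    @subscribers = Hash.new { |hash, channel| hash[channel] = [] }
  end

  # Register a handler block for a channel.
  def subscribe(channel, &handler)
    @subscribers[channel] << handler
  end

  # Pop every message from the tail of the inbox and forward it to the
  # handlers of its channel. Messages on unknown channels are dropped.
  # Returns the number of handler calls made.
  def drain(inbox)
    delivered = 0
    while (raw = inbox.pop)
      message = JSON.parse(raw)
      @subscribers.fetch(message['channel'], []).each do |handler|
        handler.call(message)
        delivered += 1
      end
    end
    delivered
  end
end

broker = TinyBroker.new
received = []
broker.subscribe('company_sync') { |message| received << message['syn_source_id'] }

inbox = []
inbox.unshift(JSON.generate({ 'channel' => 'company_sync', 'syn_source_id' => '1' }))
inbox.unshift(JSON.generate({ 'channel' => 'unknown', 'syn_source_id' => '2' }))

puts broker.drain(inbox)
puts received.inspect
```

The sketch also shows why both sides must share one Redis database: the broker can only forward what it finds in the inbox it reads from.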
The chasqui broker can be started from the command line (chasqui -r redis://localhost:6379/0), but I found that when the Redis password contains special characters, bash cannot parse the command correctly. I therefore put a broker startup script in the Rails bin directory instead. The startup script is as follows:

#!/usr/bin/env ruby

require 'chasqui'
require 'redis'

redis_url = "redis://redis_url:port"
redis_pwd = "pwd"

Chasqui.configure do |c|
  c.redis = Redis.new(url: redis_url, password: redis_pwd)
end

Chasqui::Broker.start
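
For the command-line route mentioned earlier, one possible way to keep bash from misreading special characters is to quote the URL with Ruby's Shellwords module before building the command. This is a sketch only: broker_command is a hypothetical helper, the URL and password are invented, and it addresses shell quoting alone. Whether it would have resolved the problem I ran into has not been verified.

```ruby
require 'shellwords'

# Hypothetical helper: build the broker command with the URL escaped so
# that the shell passes it through as a single, unmodified argument.
def broker_command(redis_url)
  "chasqui -r #{Shellwords.escape(redis_url)}"
end

# Invented URL whose password contains shell metacharacters.
redis_url = 'redis://:p$ss&w!rd@localhost:6379/0'

command = broker_command(redis_url)
puts command

# Splitting the command the way a shell would recovers the original URL.
puts Shellwords.split(command).last == redis_url
```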

That is everything involved in using chasqui and Redis PUSH-POP to implement a simple MQ, which is now running in production. Choosing MQ-based data synchronization was a compromise made to get the project online quickly, and it should be reworked or discarded in the future.

Deployment pitfalls

The chelaike back end is deployed with Capistrano. As mentioned above, the chasqui broker startup script is in the bin directory, so the cap deployment should start the broker script automatically. The idea is simple: cd to the root directory of the project and run the Ruby executable through nohup. However, I ran into the problem that the process started with nohup exited as soon as the ssh command finished. I described the details on Ruby-China: How does capistrano execute executable files in Rails project bin directory?
The correct script fragment in deploy.rb is as follows:

after :restart, :clear_cache do
  on roles(:app), in: :groups, limit: 3, wait: 10 do
  end

  on roles(:chasqui) do
    execute "for i in $( ps ax | awk '/chasqui_start/ {print $1}' ); do kill ${i}; done"
    execute "bash -l -c 'cd #{deploy_to}/current && (nohup rvm use 2.2.4 do ruby bin/chasqui_start 2>&1 &) && sleep 2 && ps -ef | grep chasqui_start' "
  end
end

The chasqui role also needs to be added in config/deploy/production.rb:

server "server_a_ip", user: "deploy", roles: %w{app db web lina migration etl car_publisher data_sync chasqui}
server "server_b_ip", user: "deploy", roles: %w{app db web lina migration etl car_publisher}

Closing thoughts on entrepreneurship

Starting a business means looking for trouble for yourself. Through this cooperation with Party A, we hope to exchange technology for more non-technical resources. I also hope that the month the team spent concentrating on development was not in vain.

Keywords: Redis Ruby Java Database

Added by fingerprn on Tue, 21 May 2019 01:51:59 +0300