Introduction to 1.1 Disruptor Concurrency Framework
Martin Fowler wrote an article on his website about the LMAX architecture, in which he described LMAX as a new retail financial trading platform that can generate a large number of transactions with very low latency.This system is built on the JVM platform and its core is a business logic processor that can process 6 million orders per second in one thread.Business logic processors run entirely in memory, using an event source-driven approach.The core of the business logic processor is Disruptor.
Disruptor is an open source concurrency framework, and won the 2011 Duke's Program Framework Innovation Award, enabling the Queue concurrent operation of the network without locks.
Disruptor is a high-performance asynchronous processing framework, which can be considered the fastest messaging framework (lightweight JMS), an implementation of the observer mode, or an implementation of the event listening mode.
1.2 Disruptor Concurrency Framework uses
At present, we have updated the disruptor to version 3.x, which provides better performance and more ways to use the API than the previous version 2.x.
Download disruptor-3.3.2.jar to introduce our project and start a disruptor journey.
Before using it, explain the main features of disruptor, which you can understand as an efficient producer-consumer model.It also has much higher performance than traditional BlockingQueue containers.
Official Learning Website: http://ifeve.com/disruptor-get-start/
In Disruptor, we want to implement hello world in the following steps:
First: Create an Event class//a specific data
Second: Create a factory Event class for creating Event class instance objects//data type specification
Third, there needs to be a listening event class for processing data (Event class)
Fourth, we need to write test code.Instantiate a Disruptor instance and configure a series of parameters.We then bind the listen event class to the Disruptor instance to accept and process the data.
Fifth: In Disruptor, the core of the real data storage is called RingBuffer. We get it through the Disruptor instance, then we produce the data and add it to the RingBuffer instance object.
Let's take a look at this HelloWorld program: com.bjsxt.base
HelloWorld1:
package bhz.base; //http://ifeve.com/disruptor-getting-started/ public class LongEvent { private long value; public long getValue() { return value; } public void setValue(long value) { this.value = value; } }
package bhz.base; import com.lmax.disruptor.EventFactory; // We need disruptor to create events for us, and we also declare an EventFactory to instantiate Event objects. public class LongEventFactory implements EventFactory { @Override public Object newInstance() { return new LongEvent(); } }
package bhz.base; import com.lmax.disruptor.EventHandler; //We also need an event consumer, an event handler.This event handler simply prints the data stored in the event to the terminal: public class LongEventHandler implements EventHandler<LongEvent> { @Override public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception { System.out.println(longEvent.getValue()); } }
package bhz.base; import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; public class LongEventMain { public static void main(String[] args) throws Exception { //Create Buffer Pool ExecutorService executor = Executors.newCachedThreadPool(); //Create a factory LongEventFactory factory = new LongEventFactory(); //To create a bufferSize, which is the RingBuffer size, it must be 2nd power int ringBufferSize = 1024 * 1024; // /** //BlockingWaitStrategy Is the least efficient strategy, but it consumes the least CPU and provides more consistent performance across different deployment environments WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy(); //SleepingWaitStrategy Performance is similar to BlockingWaitStrategy and CPU consumption, but it has the least impact on producer threads and is suitable for scenarios similar to asynchronous logging WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy(); //YieldingWaitStrategy Performance is optimal for systems with low latency.This strategy is recommended in scenarios where extremely high performance is required and the number of event processing lines is less than the logical core of the CPU; for example, the CPU turns on the hyperthreading feature WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy(); */ //Create disruptor Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy()); // Connect Consumption Event Method disruptor.handleEventsWith(new LongEventHandler()); // start-up disruptor.start(); //Disruptor's event publishing process is a two-phase commit process: //Publish Events RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); LongEventProducer producer = new LongEventProducer(ringBuffer); //LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer); ByteBuffer byteBuffer = ByteBuffer.allocate(8); for(long l = 0; l<100; l++){ byteBuffer.putLong(0, l); producer.onData(byteBuffer); //Thread.sleep(1000); } disruptor.shutdown();//Close the disruptor and the method will be blocked until all events have been dealt with; executor.shutdown();//Close the thread pool used by disruptor; if necessary, it must be closed manually, and disruptor will not shut down automatically when shutdown occurs; } }
package bhz.base; import java.nio.ByteBuffer; import com.lmax.disruptor.RingBuffer; /** * It is clear that more detail is involved when publishing events in a simple queue because event objects also need to be pre-created. * Publishing an event requires at least two steps: getting the next event slot and publishing the event (use try/finnally when publishing the event to ensure that it will be published). * If we use RingBuffer.next() to get an event slot, we must publish the corresponding event. * If events cannot be published, then the Disruptor state can be disrupted. * Especially with multiple event producers, event consumers will stall and have to restart the application to recover. */ public class LongEventProducer { private final RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer){ this.ringBuffer = ringBuffer; } /** * onData Used to publish events, one event per call * Its parameters are passed to consumers using events */ public void onData(ByteBuffer bb){ //1. You can think of ringBuffer as an event queue, so next is the next event slot long sequence = ringBuffer.next(); try { //2. Use the index above to fetch an empty event to populate (get the event object corresponding to the ordinal) LongEvent event = ringBuffer.get(sequence); //3. Get business data to be delivered through events event.setValue(bb.getLong(0)); } finally { //4. Publish Events //Note that the final ringBuffer.publish method must be included in finally to ensure it must be called; if a request's sequence is not committed, subsequent publishing operations or other producer s will be blocked. ringBuffer.publish(sequence); } } }
package bhz.base; import java.nio.ByteBuffer; import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.RingBuffer; /** * Disruptor 3.0 A lambda-style API is provided.This puts some complicated operations in Ring Buffer. * Event Publisher or Event Translator is the best way to publish events after Disruptor 3.0 * <B>System name: </B><BR> * <B>Module name: </B><BR> * <B>Chinese Class Name: </B><BR> * <B>Summary description: </B><BR> * @author Alenware, Beijing * @since 2015 November 23, 2001 */ public class LongEventProducerWithTranslator { //A translator can be seen as an event initializer, which is called by the publicEvent method //Fill Event s private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() { @Override public void translateTo(LongEvent event, long sequeue, ByteBuffer buffer) { event.setValue(buffer.getLong(0)); } }; private final RingBuffer<LongEvent> ringBuffer; public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(ByteBuffer buffer){ ringBuffer.publishEvent(TRANSLATOR, buffer); } }
HelloWorld2:
package bhz.multi; import java.util.concurrent.atomic.AtomicInteger; import com.lmax.disruptor.WorkHandler; public class Consumer implements WorkHandler<Order>{ private String consumerId; private static AtomicInteger count = new AtomicInteger(0); public Consumer(String consumerId){ this.consumerId = consumerId; } @Override public void onEvent(Order order) throws Exception { System.out.println("Current Consumer: " + this.consumerId + ",Consumption information:" + order.getId()); count.incrementAndGet(); } public int getCount(){ return count.get(); } }
package bhz.multi; import java.nio.ByteBuffer; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.ExceptionHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.WorkHandler; import com.lmax.disruptor.WorkerPool; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.ProducerType; public class Main { public static void main(String[] args) throws Exception { //Create ringBuffer RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.MULTI, new EventFactory<Order>() { @Override public Order newInstance() { return new Order(); } }, 1024 * 1024, new YieldingWaitStrategy()); SequenceBarrier barriers = ringBuffer.newBarrier(); Consumer[] consumers = new Consumer[3]; for(int i = 0; i < consumers.length; i++){ consumers[i] = new Consumer("c" + i); } WorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer, barriers, new IntEventExceptionHandler(), consumers); ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); workerPool.start(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); final CountDownLatch latch = new CountDownLatch(1); for (int i = 0; i < 100; i++) { final Producer p = new Producer(ringBuffer); new Thread(new Runnable() { @Override public void run() { try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } for(int j = 0; j < 100; j ++){ p.onData(UUID.randomUUID().toString()); } } }).start(); } Thread.sleep(2000); System.out.println("---------------Start production-----------------"); latch.countDown(); Thread.sleep(5000); System.out.println("Total:" + consumers[0].getCount() ); } static class IntEventExceptionHandler implements ExceptionHandler { public void handleEventException(Throwable ex, long sequence, Object event) {} public void handleOnStartException(Throwable ex) {} public void handleOnShutdownException(Throwable ex) {} } }
package bhz.multi; public class Order { private String id;//ID private String name; private double price;//Amount of money public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } }
package bhz.multi; import java.nio.ByteBuffer; import java.util.UUID; import bhz.base.LongEvent; import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.RingBuffer; /** * <B>System name: </B><BR> * <B>Module name: </B><BR> * <B>Chinese Class Name: </B><BR> * <B>Summary description: </B><BR> * @author Alenware, Beijing * @since 2015 November 23, 2001 */ public class Producer { private final RingBuffer<Order> ringBuffer; public Producer(RingBuffer<Order> ringBuffer){ this.ringBuffer = ringBuffer; } /** * onData Used to publish events, one event per call * Its parameters are passed to consumers using events */ public void onData(String data){ //You can think of ringBuffer as an event queue, so next is the next event slot long sequence = ringBuffer.next(); try { //Use the index above to fetch an empty event to populate (get the event object corresponding to the ordinal) Order order = ringBuffer.get(sequence); //Get business data to be passed through events order.setId(data); } finally { //Publish Events //Note that the final ringBuffer.publish method must be included in finally to ensure it must be called; if a request's sequence is not committed, subsequent publishing operations or other producer s will be blocked. ringBuffer.publish(sequence); } } }
helloWorld3:
package bhz.generate1; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import com.lmax.disruptor.BatchEventProcessor; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventProcessor; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.YieldingWaitStrategy; public class Main1 { public static void main(String[] args) throws Exception { int BUFFER_SIZE=1024; int THREAD_NUMBERS=4; /* * createSingleProducer Create a single producer RingBuffer, * The first parameter, EventFactory, is known by name as Event Factory, but its function is to generate data to fill RingBuffer's blocks. * The second parameter is the size of RingBuffer, which must be an exponential multiple of 2 for the purpose of converting modulo operations to & operations to improve efficiency * The third parameter is that RingBuffer's production is waiting strategy when there are no available blocks (possibly consumers (or event processors) are too slow) */ final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() { @Override public Trade newInstance() { return new Trade(); } }, BUFFER_SIZE, new YieldingWaitStrategy()); //Create Thread Pool ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); //Create SequenceBarrier SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); //Create Message Processor BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>( ringBuffer, sequenceBarrier, new TradeHandler()); //The goal of this step is to inject consumer location information references into the producer that can be omitted if there is only one consumer ringBuffer.addGatingSequences(transProcessor.getSequence()); //Submit message processor to thread pool executors.submit(transProcessor); //If there are multiple consumers, repeat the above three lines of code to replace TradeHandler with another consumer class Future<?> future= executors.submit(new Callable<Void>() { @Override public Void call() throws Exception { long seq; for(int i=0;i<10;i++){ seq = ringBuffer.next();//Hold a hole--ringBuffer is a available block ringBuffer.get(seq).setPrice(Math.random()*9999);//Put data into this block ringBuffer.publish(seq);//Publish data for this block to make the handler(consumer) visible } return null; } }); future.get();//Waiting for producer to finish Thread.sleep(1000);//Wait a second, wait until consumption is processed transProcessor.halt();//Notification Event (or Message) Processor can end (not immediately!!!) executors.shutdown();//Terminate Thread } }
package bhz.generate1; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.IgnoreExceptionHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.WorkHandler; import com.lmax.disruptor.WorkerPool; public class Main2 { public static void main(String[] args) throws InterruptedException { int BUFFER_SIZE=1024; int THREAD_NUMBERS=4; EventFactory<Trade> eventFactory = new EventFactory<Trade>() { public Trade newInstance() { return new Trade(); } }; RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE); SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBERS); WorkHandler<Trade> handler = new TradeHandler(); WorkerPool<Trade> workerPool = new WorkerPool<Trade>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handler); workerPool.start(executor); //Here are 8 data for this production for(int i=0;i<8;i++){ long seq=ringBuffer.next(); ringBuffer.get(seq).setPrice(Math.random()*9999); ringBuffer.publish(seq); } Thread.sleep(1000); workerPool.halt(); executor.shutdown(); } }
package bhz.generate1; import java.util.concurrent.atomic.AtomicInteger; public class Trade { private String id;//ID private String name; private double price;//Amount of money private AtomicInteger count = new AtomicInteger(0); public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } public AtomicInteger getCount() { return count; } public void setCount(AtomicInteger count) { this.count = count; } }
package bhz.generate1; import java.util.UUID; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler; public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { this.onEvent(event); } @Override public void onEvent(Trade event) throws Exception { //Here's the specific consumption logic event.setId(UUID.randomUUID().toString());//Simple Generate Lower ID System.out.println(event.getId()); } }
helloWorld4:
package bhz.generate2; import java.util.UUID; import bhz.generate1.Trade; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler; public class Handler1 implements EventHandler<Trade>,WorkHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { this.onEvent(event); } @Override public void onEvent(Trade event) throws Exception { System.out.println("handler1: set name"); event.setName("h1"); Thread.sleep(1000); } }
package bhz.generate2; import bhz.generate1.Trade; import com.lmax.disruptor.EventHandler; public class Handler2 implements EventHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { System.out.println("handler2: set price"); event.setPrice(17.0); Thread.sleep(1000); } }
package bhz.generate2; import bhz.generate1.Trade; import com.lmax.disruptor.EventHandler; public class Handler3 implements EventHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { System.out.println("handler3: name: " + event.getName() + " , price: " + event.getPrice() + "; instance: " + event.toString()); } }
package bhz.generate2; import java.util.UUID; import bhz.generate1.Trade; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler; public class Handler4 implements EventHandler<Trade>,WorkHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { this.onEvent(event); } @Override public void onEvent(Trade event) throws Exception { System.out.println("handler4: get name : " + event.getName()); event.setName(event.getName() + "h4"); } }
package bhz.generate2; import java.util.UUID; import bhz.generate1.Trade; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler; public class Handler5 implements EventHandler<Trade>,WorkHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { this.onEvent(event); } @Override public void onEvent(Trade event) throws Exception { System.out.println("handler5: get price : " + event.getPrice()); event.setPrice(event.getPrice() + 3.0); } }
package bhz.generate2; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import bhz.generate1.Trade; import bhz.generate1.TradeHandler; import com.lmax.disruptor.BusySpinWaitStrategy; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.EventHandlerGroup; import com.lmax.disruptor.dsl.ProducerType; public class Main { public static void main(String[] args) throws InterruptedException { long beginTime=System.currentTimeMillis(); int bufferSize=1024; ExecutorService executor=Executors.newFixedThreadPool(8); Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() { @Override public Trade newInstance() { return new Trade(); } }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy()); //Diamond operation /** //Create consumer groups C1,C2 using disruptor EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(new Handler1(), new Handler2()); //Declare that JMS message sending is performed after C1,C2 is complete, that is, the process goes to C3 handlerGroup.then(new Handler3()); */ //Sequential operation /** disruptor.handleEventsWith(new Handler1()). handleEventsWith(new Handler2()). handleEventsWith(new Handler3()); */ //Hexagon operation. /** Handler1 h1 = new Handler1(); Handler2 h2 = new Handler2(); Handler3 h3 = new Handler3(); Handler4 h4 = new Handler4(); Handler5 h5 = new Handler5(); disruptor.handleEventsWith(h1, h2); disruptor.after(h1).handleEventsWith(h4); disruptor.after(h2).handleEventsWith(h5); disruptor.after(h4, h5).handleEventsWith(h3); */ disruptor.start();//start-up CountDownLatch latch=new CountDownLatch(1); //Producer preparation executor.submit(new TradePublisher(latch, disruptor)); latch.await();//Waiting for the producer to finish. disruptor.shutdown(); executor.shutdown(); System.out.println("Time taken for tests:"+(System.currentTimeMillis()-beginTime)); } }
package bhz.generate2; import java.util.Random; import java.util.concurrent.CountDownLatch; import bhz.generate1.Trade; import com.lmax.disruptor.EventTranslator; import com.lmax.disruptor.dsl.Disruptor; public class TradePublisher implements Runnable { Disruptor<Trade> disruptor; private CountDownLatch latch; private static int LOOP=10;//Simulate millions of transactions public TradePublisher(CountDownLatch latch,Disruptor<Trade> disruptor) { this.disruptor=disruptor; this.latch=latch; } @Override public void run() { TradeEventTranslator tradeTransloator = new TradeEventTranslator(); for(int i=0;i<LOOP;i++){ disruptor.publishEvent(tradeTransloator); } latch.countDown(); } } class TradeEventTranslator implements EventTranslator<Trade>{ private Random random=new Random(); @Override public void translateTo(Trade event, long sequence) { this.generateTrade(event); } private Trade generateTrade(Trade trade){ trade.setPrice(random.nextDouble()*9999); return trade; } }