What is Disruptor?
Disruptor is a high-performance asynchronous processing framework. It can be regarded as one of the fastest in-process messaging frameworks (a lightweight JMS), or as an implementation of the observer pattern or the event-listener pattern.
Much higher performance than a traditional BlockingQueue
Disruptor follows the observer pattern: it actively pushes events to consumers instead of waiting for consumers to take them from a queue. It supports concurrent operations on its queue (the Ring Buffer) without locks, which is why its performance is much higher than that of a BlockingQueue.
Disruptor's Design Idea
Ring Array Structure
An array is used to avoid garbage collection, and arrays are also friendlier to the processor's cache.
The array length is 2^n, so a slot can be located quickly with a bit operation. The sequence number simply keeps incrementing, and because it is a 64-bit long, index overflow is not a practical concern.
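The bit operation mentioned above can be sketched as follows. This is only an illustration of the idea, not Disruptor's own code; the class name RingIndex and the method slotFor are hypothetical names chosen for this example.

```java
// Illustrative sketch only: RingIndex and slotFor are hypothetical names,
// not part of the Disruptor API.
final class RingIndex {

    // Maps an ever-increasing sequence number onto a slot of a power-of-two ring.
    static int slotFor(long sequence, int bufferSize) {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        // For a size of 2^n, (sequence & (size - 1)) gives the same result as
        // (sequence % size) for non-negative sequences, without a division.
        return (int) (sequence & (bufferSize - 1));
    }

    public static void main(String[] args) {
        int size = 8;
        // Sequences 6 and 7 land in slots 6 and 7; 8 and 9 wrap to slots 0 and 1.
        for (long seq = 6; seq <= 9; seq++) {
            System.out.println("sequence " + seq + " -> slot " + slotFor(seq, size));
        }
    }
}
```

The sequence itself is never reset; only the slot index wraps around.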
Lock-free design
Each producer or consumer thread first claims the position of the element it may operate on in the array, and then writes or reads data directly at that position.
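A minimal sketch of the "claim first, then write" idea, assuming an atomic counter hands out positions. SlotClaim and its methods are hypothetical names for this example and are not how Disruptor's Sequencer is actually written; the sketch also leaves out publishing and wrap-around protection.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: producers claim a unique position with an atomic
// increment instead of taking a lock, then write directly into that slot.
final class SlotClaim {
    private final long[] slots;
    private final int mask;
    private final AtomicLong nextSequence = new AtomicLong(0);

    SlotClaim(int sizePowerOfTwo) {
        slots = new long[sizePowerOfTwo];
        mask = sizePowerOfTwo - 1;
    }

    // Each caller receives a different sequence, so no two threads share a slot
    // as long as the ring does not wrap (wrap protection is omitted here).
    long claim() {
        return nextSequence.getAndIncrement();
    }

    void write(long sequence, long value) {
        slots[(int) (sequence & mask)] = value;
    }

    // Runs several producer threads and returns the slots once they have finished.
    // Assumes threads * perThread does not exceed the ring size of 1024.
    static long[] runProducers(int threads, int perThread) {
        SlotClaim ring = new SlotClaim(1024);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    long seq = ring.claim();
                    ring.write(seq, seq + 1); // store sequence + 1 as the payload
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // join makes the workers' writes visible here
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return ring.slots;
    }

    public static void main(String[] args) {
        long[] slots = runProducers(4, 100);
        boolean allWritten = true;
        for (int i = 0; i < 400; i++) {
            allWritten &= slots[i] == i + 1;
        }
        System.out.println("every claimed slot written exactly once: " + allWritten);
    }
}
```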
Implementing the Producer-Consumer Model with Disruptor
pom
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.2.1</version>
</dependency>
LongEvent
// Declare an event that carries the data to be passed
public class LongEvent {
    private Long value;

    public Long getValue() {
        return value;
    }

    public void setValue(Long value) {
        this.value = value;
    }
}
LongEventFactory
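import com.lmax.disruptor.EventFactory;

// Event factory: used by the ring buffer to preallocate its events
public class LongEventFactory implements EventFactory<LongEvent> {
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}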
LongEventHandler
import com.lmax.disruptor.EventHandler;

// Event consumer: called back for every published event
public class LongEventHandler implements EventHandler<LongEvent> {
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("Consumer:" + event.getValue());
    }
}
LongEventProducer
import java.nio.ByteBuffer;

import com.lmax.disruptor.RingBuffer;

// Event producer: claims a slot, fills it, and publishes it
public class LongEventProducer {
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer byteBuffer) {
        // Claim the next sequence (slot position) in the ring buffer
        long sequence = ringBuffer.next();
        try {
            // Get the preallocated event stored at that sequence
            LongEvent longEvent = ringBuffer.get(sequence);
            // Fill the event with data
            longEvent.setValue(byteBuffer.getLong(0));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("Producer sends data...");
            // Publish in finally so that a claimed sequence is always released to consumers
            ringBuffer.publish(sequence);
        }
    }
}
Main
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {
    public static void main(String[] args) {
        // Create a cached thread pool for the consumer threads
        ExecutorService executorService = Executors.newCachedThreadPool();
        // Create the event factory
        EventFactory<LongEvent> eventFactory = new LongEventFactory();
        // Ring buffer size; must be a power of 2
        int ringBufferSize = 1024 * 1024;
        // Create the disruptor
        // MULTI means there can be multiple producers
        Disruptor<LongEvent> longEventDisruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize,
                executorService, ProducerType.MULTI, new YieldingWaitStrategy());
        // Register the consumer
        longEventDisruptor.handleEventsWith(new LongEventHandler());
        // Start the disruptor
        longEventDisruptor.start();
        // Get the RingBuffer container
        RingBuffer<LongEvent> ringBuffer = longEventDisruptor.getRingBuffer();
        // Create the producer
        LongEventProducer longEventProducer = new LongEventProducer(ringBuffer);
        // Allocate an 8-byte buffer, enough for one long
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        for (int i = 0; i < 100; i++) {
            byteBuffer.putLong(0, i);
            longEventProducer.onData(byteBuffer);
        }
        // Shut down the disruptor first so that pending events are processed,
        // then shut down the thread pool
        longEventDisruptor.shutdown();
        executorService.shutdown();
    }
}
What is RingBuffer?
It is a ring (joined end to end) that stores data and passes it between threads.
Each RingBuffer has a sequence number that points to the next available element in the ring array.
As data keeps being written into the ring, the sequence number keeps increasing until it wraps around the ring.
Once the ring is full, new data overwrites the old data. As shown in the figure above, the slot after the one holding 12 currently holds 3; if new data arrives, the pointer moves forward and the contents of that slot are overwritten with 13. The framework provides a set of monitoring mechanisms that help with parallel consumption and keep the speed of producers and consumers under control, so that production and consumption stay balanced.
Why is RingBuffer efficient?
It uses an array, and arrays support direct access by index.
The array's memory is preallocated. Once created with the specified size, it exists for the lifetime of the RingBuffer, so little time is spent on garbage collection. A blocking queue backed by a linked list (such as LinkedBlockingQueue), by contrast, constantly creates and deletes nodes.
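The preallocation idea can be sketched like this: every entry is created once up front and then reused whenever the sequence wraps back onto the same slot. PreallocatedRing and Slot are hypothetical names for this example; it is a simplified illustration, not Disruptor's actual RingBuffer.

```java
import java.util.function.Supplier;

// Hypothetical sketch of a preallocated ring: entries are created once by a
// factory and reused, so steady-state publishing allocates no new objects.
final class PreallocatedRing<E> {
    private final Object[] entries;
    private final int mask;

    PreallocatedRing(int sizePowerOfTwo, Supplier<E> factory) {
        entries = new Object[sizePowerOfTwo];
        mask = sizePowerOfTwo - 1;
        // Fill every slot up front, in the same spirit as an event factory.
        for (int i = 0; i < entries.length; i++) {
            entries[i] = factory.get();
        }
    }

    // Returns the entry that lives in the slot for this sequence.
    @SuppressWarnings("unchecked")
    E get(long sequence) {
        return (E) entries[(int) (sequence & mask)];
    }

    // A mutable holder, similar in spirit to LongEvent.
    static final class Slot {
        long value;
    }

    public static void main(String[] args) {
        PreallocatedRing<Slot> ring = new PreallocatedRing<>(4, Slot::new);
        Slot first = ring.get(1);
        first.value = 10;
        // Sequence 5 maps to the same slot as sequence 1 in a ring of size 4,
        // so the same object is handed out again and simply overwritten.
        Slot again = ring.get(5);
        again.value = 50;
        System.out.println("same instance reused: " + (first == again));
        System.out.println("value now: " + first.value);
    }
}
```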
Core Concepts of Disruptor
- RingBuffer: The core class and the underlying data structure of Disruptor; it is the transit point for data exchanged between threads.
- Sequence: A sequence number used to track progress, both of the events in the RingBuffer and of each consumer's consumption.
- Sequencer: The bridge between producers and the RingBuffer. The single-producer and multi-producer cases correspond to two implementations, SingleProducerSequencer and MultiProducerSequencer. A Sequencer is used to claim space in the RingBuffer; its publish method notifies the SequenceBarrier, through the WaitStrategy, that events are ready to be consumed.
- SequenceBarrier: A sequence barrier that manages and coordinates the producer's cursor and the consumers' sequences. It ensures that a producer does not overwrite events that consumers have not yet processed, and that dependent consumers process events in the right order.
- WaitStrategy: Determines how a consumer waits when there are no events to consume; several implementations are available.
- Event: The unit of data passed from producer to consumer; its type is defined by the user.
- EventProcessor: Listens to the RingBuffer and consumes available events. It waits for the next available sequence number until the corresponding event is ready, then hands the event read from the RingBuffer to the actual consumer implementation class.
- EventHandler: The business handler and the interface of the actual consumer. Users implement this interface to supply their business logic; the implementation represents the consumer.
- Producer: The producer role, played by user threads that write events to the RingBuffer.
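To see how these concepts fit together, here is a heavily simplified single-producer, single-consumer sketch in plain Java. It is an assumption-laden illustration, not Disruptor's implementation: MiniRing and all of its method names are hypothetical, the "wait strategy" is a plain yield loop, and the "barrier" is just a comparison between two sequence counters.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single-producer, single-consumer ring, for illustration only.
final class MiniRing {
    private final long[] values;
    private final int mask;
    private final AtomicLong cursor = new AtomicLong(-1);   // last published sequence
    private final AtomicLong consumed = new AtomicLong(-1); // last consumed sequence
    private long nextToClaim = 0;                           // touched by the producer only

    MiniRing(int sizePowerOfTwo) {
        if (sizePowerOfTwo < 1 || Integer.bitCount(sizePowerOfTwo) != 1) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        values = new long[sizePowerOfTwo];
        mask = sizePowerOfTwo - 1;
    }

    // Producer side: claim the next sequence, waiting while the slot it maps to
    // still holds an event the consumer has not processed (wrap protection).
    long next() {
        long seq = nextToClaim++;
        while (seq - values.length > consumed.get()) {
            Thread.yield();
        }
        return seq;
    }

    void set(long seq, long value) { values[(int) (seq & mask)] = value; }

    // Publishing moves the cursor, which makes the event visible to the consumer.
    void publish(long seq) { cursor.set(seq); }

    // Consumer side: wait until the requested sequence is published and return
    // the highest published sequence, so the caller can process a batch.
    long waitFor(long seq) {
        long available;
        while ((available = cursor.get()) < seq) {
            Thread.yield();
        }
        return available;
    }

    long get(long seq) { return values[(int) (seq & mask)]; }

    void markConsumed(long seq) { consumed.set(seq); }

    // Sends 1..count through the ring and returns the sum seen by the consumer.
    static long transfer(int count, int ringSize) {
        MiniRing ring = new MiniRing(ringSize);
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= count; i++) {
                long seq = ring.next();
                ring.set(seq, i);
                ring.publish(seq);
            }
        });
        producer.start();
        long sum = 0;
        long nextToRead = 0;
        while (nextToRead < count) {
            long available = ring.waitFor(nextToRead);
            for (; nextToRead <= available; nextToRead++) {
                sum += ring.get(nextToRead);
            }
            ring.markConsumed(available);
        }
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        // 1000 events pass through only 8 slots; prints 500500.
        System.out.println(transfer(1000, 8));
    }
}
```

In this sketch the cursor plays the part of the producer's Sequence, the consumed counter is the consumer's Sequence, and the two wait loops stand in for the SequenceBarrier and WaitStrategy.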