Disruptor Concurrency Framework, Core Component RingBuffer

Introduction to 1.1 Disruptor Concurrency Framework

Martin Fowler wrote an article on his website about the LMAX architecture, in which he described LMAX as a new retail financial trading platform that can generate a large number of transactions with very low latency.This system is built on the JVM platform and its core is a business logic processor that can process 6 million orders per second in one thread.Business logic processors run entirely in memory, using an event source-driven approach.The core of the business logic processor is Disruptor.
Disruptor is an open source concurrency framework, and won the 2011 Duke's Program Framework Innovation Award, enabling the Queue concurrent operation of the network without locks.
Disruptor is a high-performance asynchronous processing framework, which can be considered the fastest messaging framework (lightweight JMS), an implementation of the observer mode, or an implementation of the event listening mode.

1.2 Disruptor Concurrency Framework uses

At present, we have updated the disruptor to version 3.x, which provides better performance and more ways to use the API than the previous version 2.x.
Download disruptor-3.3.2.jar to introduce our project and start a disruptor journey.
Before using it, explain the main features of disruptor, which you can understand as an efficient producer-consumer model.It also has much higher performance than traditional BlockingQueue containers.
Official Learning Website: http://ifeve.com/disruptor-get-start/

In Disruptor, we want to implement hello world in the following steps:
First: Create an Event class//a specific data
Second: Create a factory Event class for creating Event class instance objects//data type specification
Third, there needs to be a listening event class for processing data (Event class)
Fourth, we need to write test code.Instantiate a Disruptor instance and configure a series of parameters.We then bind the listen event class to the Disruptor instance to accept and process the data.
Fifth: In Disruptor, the core of the real data storage is called RingBuffer. We get it through the Disruptor instance, then we produce the data and add it to the RingBuffer instance object.

Let's take a look at this HelloWorld program: com.bjsxt.base
HelloWorld1:

package bhz.base;

//http://ifeve.com/disruptor-getting-started/
public class LongEvent { 
    private long value;
    public long getValue() { 
        return value; 
    } 
 
    public void setValue(long value) { 
        this.value = value; 
    } 
} 
package bhz.base;

import com.lmax.disruptor.EventFactory;
// We need disruptor to create events for us, and we also declare an EventFactory to instantiate Event objects.
public class LongEventFactory implements EventFactory { 

    @Override 
    public Object newInstance() { 
        return new LongEvent(); 
    } 
} 
package bhz.base;

import com.lmax.disruptor.EventHandler;

//We also need an event consumer, an event handler.This event handler simply prints the data stored in the event to the terminal:
public class LongEventHandler implements EventHandler<LongEvent>  {

	@Override
	public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
		System.out.println(longEvent.getValue());
	}

}
package bhz.base;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class LongEventMain {

	public static void main(String[] args) throws Exception {
		//Create Buffer Pool
		ExecutorService  executor = Executors.newCachedThreadPool();
		//Create a factory
		LongEventFactory factory = new LongEventFactory();
		//To create a bufferSize, which is the RingBuffer size, it must be 2nd power
		int ringBufferSize = 1024 * 1024; // 

		/**
		//BlockingWaitStrategy Is the least efficient strategy, but it consumes the least CPU and provides more consistent performance across different deployment environments
		WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
		//SleepingWaitStrategy Performance is similar to BlockingWaitStrategy and CPU consumption, but it has the least impact on producer threads and is suitable for scenarios similar to asynchronous logging
		WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
		//YieldingWaitStrategy Performance is optimal for systems with low latency.This strategy is recommended in scenarios where extremely high performance is required and the number of event processing lines is less than the logical core of the CPU; for example, the CPU turns on the hyperthreading feature
		WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
		*/
		
		//Create disruptor
		Disruptor<LongEvent> disruptor = 
				new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
		// Connect Consumption Event Method
		disruptor.handleEventsWith(new LongEventHandler());
		
		// start-up
		disruptor.start();
		
		//Disruptor's event publishing process is a two-phase commit process:
		//Publish Events
		RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
		
		LongEventProducer producer = new LongEventProducer(ringBuffer); 
		//LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
		ByteBuffer byteBuffer = ByteBuffer.allocate(8);
		for(long l = 0; l<100; l++){
			byteBuffer.putLong(0, l);
			producer.onData(byteBuffer);
			//Thread.sleep(1000);
		}

		
		disruptor.shutdown();//Close the disruptor and the method will be blocked until all events have been dealt with;
		executor.shutdown();//Close the thread pool used by disruptor; if necessary, it must be closed manually, and disruptor will not shut down automatically when shutdown occurs;		
	
	}
}
package bhz.base;

import java.nio.ByteBuffer;

import com.lmax.disruptor.RingBuffer;
/**
 * It is clear that more detail is involved when publishing events in a simple queue because event objects also need to be pre-created.
 * Publishing an event requires at least two steps: getting the next event slot and publishing the event (use try/finnally when publishing the event to ensure that it will be published).
 * If we use RingBuffer.next() to get an event slot, we must publish the corresponding event.
 * If events cannot be published, then the Disruptor state can be disrupted.
 * Especially with multiple event producers, event consumers will stall and have to restart the application to recover.
 */
public class LongEventProducer {

	private final RingBuffer<LongEvent> ringBuffer;
	
	public LongEventProducer(RingBuffer<LongEvent> ringBuffer){
		this.ringBuffer = ringBuffer;
	}
	
	/**
	 * onData Used to publish events, one event per call
	 * Its parameters are passed to consumers using events
	 */
	public void onData(ByteBuffer bb){
		//1. You can think of ringBuffer as an event queue, so next is the next event slot
		long sequence = ringBuffer.next();
		try {
			//2. Use the index above to fetch an empty event to populate (get the event object corresponding to the ordinal)
			LongEvent event = ringBuffer.get(sequence);
			//3. Get business data to be delivered through events
			event.setValue(bb.getLong(0));
		} finally {
			//4. Publish Events
			//Note that the final ringBuffer.publish method must be included in finally to ensure it must be called; if a request's sequence is not committed, subsequent publishing operations or other producer s will be blocked.
			ringBuffer.publish(sequence);
		}
	}
}
package bhz.base;

import java.nio.ByteBuffer;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

/**
 * Disruptor 3.0 A lambda-style API is provided.This puts some complicated operations in Ring Buffer.
 * Event Publisher or Event Translator is the best way to publish events after Disruptor 3.0
 * <B>System name: </B><BR>
 * <B>Module name: </B><BR>
 * <B>Chinese Class Name: </B><BR>
 * <B>Summary description: </B><BR>
 * @author Alenware, Beijing
 * @since 2015 November 23, 2001
 */
public class LongEventProducerWithTranslator {

	//A translator can be seen as an event initializer, which is called by the publicEvent method
	//Fill Event s
	private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = 
			new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
				@Override
				public void translateTo(LongEvent event, long sequeue, ByteBuffer buffer) {
					event.setValue(buffer.getLong(0));
				}
			};
	
	private final RingBuffer<LongEvent> ringBuffer;
	
	public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
		this.ringBuffer = ringBuffer;
	}
	
	public void onData(ByteBuffer buffer){
		ringBuffer.publishEvent(TRANSLATOR, buffer);
	}
}

HelloWorld2:

package bhz.multi;

import java.util.concurrent.atomic.AtomicInteger;

import com.lmax.disruptor.WorkHandler;

public class Consumer implements WorkHandler<Order>{
	
	private String consumerId;
	
	private static AtomicInteger count = new AtomicInteger(0);
	
	public Consumer(String consumerId){
		this.consumerId = consumerId;
	}

	@Override
	public void onEvent(Order order) throws Exception {
		System.out.println("Current Consumer: " + this.consumerId + ",Consumption information:" + order.getId());
		count.incrementAndGet();
	}
	
	public int getCount(){
		return count.get();
	}

}
package bhz.multi;

import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {
	
	public static void main(String[] args) throws Exception {

		//Create ringBuffer
		RingBuffer<Order> ringBuffer = 
				RingBuffer.create(ProducerType.MULTI, 
						new EventFactory<Order>() {  
				            @Override  
				            public Order newInstance() {  
				                return new Order();  
				            }  
				        }, 
				        1024 * 1024, 
						new YieldingWaitStrategy());
		
		SequenceBarrier barriers = ringBuffer.newBarrier();
		
		Consumer[] consumers = new Consumer[3];
		for(int i = 0; i < consumers.length; i++){
			consumers[i] = new Consumer("c" + i);
		}
		
		WorkerPool<Order> workerPool = 
				new WorkerPool<Order>(ringBuffer, 
						barriers, 
						new IntEventExceptionHandler(),
						consumers);
		
        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());  
        workerPool.start(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));  
        
        final CountDownLatch latch = new CountDownLatch(1);
        for (int i = 0; i < 100; i++) {  
        	final Producer p = new Producer(ringBuffer);
        	new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						latch.await();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					for(int j = 0; j < 100; j ++){
						p.onData(UUID.randomUUID().toString());
					}
				}
			}).start();
        } 
        Thread.sleep(2000);
        System.out.println("---------------Start production-----------------");
        latch.countDown();
        Thread.sleep(5000);
        System.out.println("Total:" + consumers[0].getCount() );
	}
	
	static class IntEventExceptionHandler implements ExceptionHandler {  
	    public void handleEventException(Throwable ex, long sequence, Object event) {}  
	    public void handleOnStartException(Throwable ex) {}  
	    public void handleOnShutdownException(Throwable ex) {}  
	} 
}
package bhz.multi;


public class Order {  
	
	private String id;//ID  
	private String name;
	private double price;//Amount of money  
	
	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public double getPrice() {
		return price;
	}
	public void setPrice(double price) {
		this.price = price;
	}
	  
}  
package bhz.multi;

import java.nio.ByteBuffer;
import java.util.UUID;

import bhz.base.LongEvent;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

/**
 * <B>System name: </B><BR>
 * <B>Module name: </B><BR>
 * <B>Chinese Class Name: </B><BR>
 * <B>Summary description: </B><BR>
 * @author Alenware, Beijing
 * @since 2015 November 23, 2001
 */
public class Producer {

	private final RingBuffer<Order> ringBuffer;
	
	public Producer(RingBuffer<Order> ringBuffer){
		this.ringBuffer = ringBuffer;
	}
	
	/**
	 * onData Used to publish events, one event per call
	 * Its parameters are passed to consumers using events
	 */
	public void onData(String data){
		//You can think of ringBuffer as an event queue, so next is the next event slot
		long sequence = ringBuffer.next();
		try {
			//Use the index above to fetch an empty event to populate (get the event object corresponding to the ordinal)
			Order order = ringBuffer.get(sequence);
			//Get business data to be passed through events
			order.setId(data);
		} finally {
			//Publish Events
			//Note that the final ringBuffer.publish method must be included in finally to ensure it must be called; if a request's sequence is not committed, subsequent publishing operations or other producer s will be blocked.
			ringBuffer.publish(sequence);
		}
	}
	
	
}

helloWorld3:

package bhz.generate1;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventProcessor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.YieldingWaitStrategy;

public class Main1 {  
   
	public static void main(String[] args) throws Exception {  
        int BUFFER_SIZE=1024;  
        int THREAD_NUMBERS=4;  
        /* 
         * createSingleProducer Create a single producer RingBuffer, 
         * The first parameter, EventFactory, is known by name as Event Factory, but its function is to generate data to fill RingBuffer's blocks. 
         * The second parameter is the size of RingBuffer, which must be an exponential multiple of 2 for the purpose of converting modulo operations to & operations to improve efficiency 
         * The third parameter is that RingBuffer's production is waiting strategy when there are no available blocks (possibly consumers (or event processors) are too slow) 
         */  
        final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {  
            @Override  
            public Trade newInstance() {  
                return new Trade();  
            }  
        }, BUFFER_SIZE, new YieldingWaitStrategy());  
        
        //Create Thread Pool  
        ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);  
        
        //Create SequenceBarrier  
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
          
        //Create Message Processor  
        BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(  
                ringBuffer, sequenceBarrier, new TradeHandler());  
          
        //The goal of this step is to inject consumer location information references into the producer that can be omitted if there is only one consumer 
        ringBuffer.addGatingSequences(transProcessor.getSequence());  
          
        //Submit message processor to thread pool  
        executors.submit(transProcessor);  
        
        //If there are multiple consumers, repeat the above three lines of code to replace TradeHandler with another consumer class  
          
        Future<?> future= executors.submit(new Callable<Void>() {  
            @Override  
            public Void call() throws Exception {  
                long seq;  
                for(int i=0;i<10;i++){  
                    seq = ringBuffer.next();//Hold a hole--ringBuffer is a available block  
                    ringBuffer.get(seq).setPrice(Math.random()*9999);//Put data into this block 
                    ringBuffer.publish(seq);//Publish data for this block to make the handler(consumer) visible  
                }  
                return null;  
            }  
        }); 
        
        future.get();//Waiting for producer to finish  
        Thread.sleep(1000);//Wait a second, wait until consumption is processed  
        transProcessor.halt();//Notification Event (or Message) Processor can end (not immediately!!!)  
        executors.shutdown();//Terminate Thread  
    }  
}  
package bhz.generate1;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;

public class Main2 {  
    public static void main(String[] args) throws InterruptedException {  
        int BUFFER_SIZE=1024;  
        int THREAD_NUMBERS=4;  
        
        EventFactory<Trade> eventFactory = new EventFactory<Trade>() {  
            public Trade newInstance() {  
                return new Trade();  
            }  
        };  
        
        RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE);  
          
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
          
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBERS);  
          
        WorkHandler<Trade> handler = new TradeHandler();  

        WorkerPool<Trade> workerPool = new WorkerPool<Trade>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handler);  
          
        workerPool.start(executor);  
          
        //Here are 8 data for this production
        for(int i=0;i<8;i++){  
            long seq=ringBuffer.next();  
            ringBuffer.get(seq).setPrice(Math.random()*9999);  
            ringBuffer.publish(seq);  
        }  
          
        Thread.sleep(1000);  
        workerPool.halt();  
        executor.shutdown();  
    }  
}  
package bhz.generate1;

import java.util.concurrent.atomic.AtomicInteger;

public class Trade {  
	
	private String id;//ID  
	private String name;
	private double price;//Amount of money  
	private AtomicInteger count = new AtomicInteger(0);
	
	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public double getPrice() {
		return price;
	}
	public void setPrice(double price) {
		this.price = price;
	}
	public AtomicInteger getCount() {
		return count;
	}
	public void setCount(AtomicInteger count) {
		this.count = count;
	} 
	  
	  
}  
package bhz.generate1;

import java.util.UUID;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> {  
	  
    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
        this.onEvent(event);  
    }  
  
    @Override  
    public void onEvent(Trade event) throws Exception {  
        //Here's the specific consumption logic  
        event.setId(UUID.randomUUID().toString());//Simple Generate Lower ID  
        System.out.println(event.getId());  
    }  
}  

helloWorld4:

package bhz.generate2;

import java.util.UUID;

import bhz.generate1.Trade;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

public class Handler1 implements EventHandler<Trade>,WorkHandler<Trade> {  
	  
    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
        this.onEvent(event);  
    }  
  
    @Override  
    public void onEvent(Trade event) throws Exception {  
    	System.out.println("handler1: set name");
    	event.setName("h1");
    	Thread.sleep(1000);
    }  
}  
package bhz.generate2;

import bhz.generate1.Trade;

import com.lmax.disruptor.EventHandler;

public class Handler2 implements EventHandler<Trade> {  
	  
    @Override  
    public void onEvent(Trade event, long sequence,  boolean endOfBatch) throws Exception {  
    	System.out.println("handler2: set price");
    	event.setPrice(17.0);
    	Thread.sleep(1000);
    }  
      
}  
package bhz.generate2;

import bhz.generate1.Trade;

import com.lmax.disruptor.EventHandler;

public class Handler3 implements EventHandler<Trade> {
    @Override  
    public void onEvent(Trade event, long sequence,  boolean endOfBatch) throws Exception {  
    	System.out.println("handler3: name: " + event.getName() + " , price: " + event.getPrice() + ";  instance: " + event.toString());
    }  
}
package bhz.generate2;

import java.util.UUID;

import bhz.generate1.Trade;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

public class Handler4 implements EventHandler<Trade>,WorkHandler<Trade> {  
	  
    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
        this.onEvent(event);  
    }  
  
    @Override  
    public void onEvent(Trade event) throws Exception {  
    	System.out.println("handler4: get name : " + event.getName());
    	event.setName(event.getName() + "h4");
    }  
}  
package bhz.generate2;

import java.util.UUID;

import bhz.generate1.Trade;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

public class Handler5 implements EventHandler<Trade>,WorkHandler<Trade> {  
	  
    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
        this.onEvent(event);  
    }  
  
    @Override  
    public void onEvent(Trade event) throws Exception {  
    	System.out.println("handler5: get price : " + event.getPrice());
    	event.setPrice(event.getPrice() + 3.0);
    }  
}  
package bhz.generate2;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import bhz.generate1.Trade;
import bhz.generate1.TradeHandler;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {  
    public static void main(String[] args) throws InterruptedException {  
       
    	long beginTime=System.currentTimeMillis();  
        int bufferSize=1024;  
        ExecutorService executor=Executors.newFixedThreadPool(8);  

        Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {  
            @Override  
            public Trade newInstance() {  
                return new Trade();  
            }  
        }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());  
        
        //Diamond operation
        /**
        //Create consumer groups C1,C2 using disruptor  
        EventHandlerGroup<Trade> handlerGroup = 
        		disruptor.handleEventsWith(new Handler1(), new Handler2());
        //Declare that JMS message sending is performed after C1,C2 is complete, that is, the process goes to C3 
        handlerGroup.then(new Handler3());
        */
        
        //Sequential operation
        /**
        disruptor.handleEventsWith(new Handler1()).
        	handleEventsWith(new Handler2()).
        	handleEventsWith(new Handler3());
        */
        
        //Hexagon operation. 
        /**
        Handler1 h1 = new Handler1();
        Handler2 h2 = new Handler2();
        Handler3 h3 = new Handler3();
        Handler4 h4 = new Handler4();
        Handler5 h5 = new Handler5();
        disruptor.handleEventsWith(h1, h2);
        disruptor.after(h1).handleEventsWith(h4);
        disruptor.after(h2).handleEventsWith(h5);
        disruptor.after(h4, h5).handleEventsWith(h3);
        */
        
        
        
        disruptor.start();//start-up  
        CountDownLatch latch=new CountDownLatch(1);  
        //Producer preparation  
        executor.submit(new TradePublisher(latch, disruptor));
        
        latch.await();//Waiting for the producer to finish. 
       
        disruptor.shutdown();  
        executor.shutdown();  
        System.out.println("Time taken for tests:"+(System.currentTimeMillis()-beginTime));  
    }  
}  
package bhz.generate2;

import java.util.Random;
import java.util.concurrent.CountDownLatch;

import bhz.generate1.Trade;

import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.dsl.Disruptor;

public class TradePublisher implements Runnable {  
	
    Disruptor<Trade> disruptor;  
    private CountDownLatch latch;  
    
    private static int LOOP=10;//Simulate millions of transactions  
  
    public TradePublisher(CountDownLatch latch,Disruptor<Trade> disruptor) {  
        this.disruptor=disruptor;  
        this.latch=latch;  
    }  
  
    @Override  
    public void run() {  
    	TradeEventTranslator tradeTransloator = new TradeEventTranslator();  
        for(int i=0;i<LOOP;i++){  
            disruptor.publishEvent(tradeTransloator);
        }  
        latch.countDown();
    }  
}  
  
class TradeEventTranslator implements EventTranslator<Trade>{  
    
	private Random random=new Random();  
    
	@Override  
    public void translateTo(Trade event, long sequence) {  
        this.generateTrade(event);  
    }  
    
	private Trade generateTrade(Trade trade){  
        trade.setPrice(random.nextDouble()*9999);  
        return trade;  
    }  
	
}  
42 original articles published, 0 praised, 267 visits
Private letter follow

Keywords: Java jvm network less

Added by rwachowiak on Mon, 20 Jan 2020 03:32:15 +0200