kafka is liked by many enterprises because of its high-performance sending and consumption ability. So let's look at some source code implementations of kafka as follows:
1 public void run() { 2 int messageNo = 1; 3 while (true) { 4 String messageStr = "Message_" + messageNo; 5 long startTime = System.currentTimeMillis(); 6 if (isAsync) { 7 producer.send(new ProducerRecord<>(topic, 8 messageNo, 9 messageStr), new DemoCallBack(startTime, messageNo, messageStr));// Asynchronous transmission 10 } else { 11 try { 12 producer.send(new ProducerRecord<>(topic, 13 messageNo, 14 messageStr)).get();// Synchronous transmission 15 System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")"); 16 } catch (InterruptedException | ExecutionException e) { 17 e.printStackTrace(); 18 } 19 } 20 ++messageNo; 21 } 22 }
This code excerpt is a fragment in the message demo (kafka.examples.Producer) sent by the kafka source producer, which mainly involves two knowledge points: one is to send messages asynchronously,
The implementation of callback function, the other is synchronous sending, the realization of multi-threaded Future.get mode. Now I will elaborate the two ways of realization.
Asynchronous callback mode
In fact, this method is mainly used in calling multiple threads to execute a task, without waiting for the thread to complete to get the corresponding feedback information. For example, the Client side needs to call the Server side to perform a task, and wants the Server side to complete the execution
Actively tell the corresponding results to the Client. This process is called callback. The following code:
1 public class Client implements CSCallBack { 2 3 private volatile boolean stopThread = false; 4 private Server server; 5 6 public Client(Server server) { 7 this.server = server; 8 } 9 10 public void sendMsg(final String msg){ 11 System.out.println("ThreadName="+Thread.currentThread().getName()+" Client: the message sent is:" + msg); 12 new Thread(new Runnable() { 13 @Override 14 public void run() { 15 server.getClientMsg(Client.this,msg);// Core code 1: take callee as parameter(client)Pass to caller(Server) 16 17 while(!stopThread) {// The simulation waits for another server-side code to complete 18 System.out.println("ThreadName="+Thread.currentThread().getName()+"Client: simulation waiting for callback to complete"); 19 20 try { 21 Thread.sleep(50); 22 } catch (InterruptedException e) { 23 e.printStackTrace(); 24 } 25 } 26 } 27 }).start(); 28 System.out.println("ThreadName="+Thread.currentThread().getName()+" Client: asynchronous sending succeeded"); 29 } 30 31 @Override 32 public void process(String status) { 33 stopThread = true; 34 System.out.println("ThreadName="+Thread.currentThread().getName()+" Client: the status of the callback received from the server is:" + status); 35 } 36 }
1 public class Server { 2 3 public void getClientMsg(CSCallBack csCallBack , String msg) { 4 5 6 // Simulation server needs to process data 7 try { 8 new Thread(new Runnable() { 9 @Override 10 public void run() { 11 System.out.println("ThreadName="+Thread.currentThread().getName()+" Server: the server receives the message sent by the client as:" + msg); 12 while(true) { 13 int max=10,min=1; 14 int ranNum = (int) (Math.random()*(max-min)+min); 15 16 if(ranNum >6) {// When the random number is greater than 5, the task is considered completed 17 System.out.println("ThreadName="+Thread.currentThread().getName()+" Server side:Data processing successful, return to success status 200"); 18 String status = "200"; 19 csCallBack.process(status);// Core code 2: Caller(Server)After the task processing completes the corresponding task, call the callee(Client)Method to inform the completion of the task 20 break; 21 } 22 23 try { 24 Thread.sleep(80); 25 } catch (InterruptedException e) { 26 e.printStackTrace(); 27 } 28 } 29 } 30 }).start(); 31 32 } catch (Exception e) { 33 e.printStackTrace(); 34 } 35 36 } 37 }
In fact, there are two core codes:
Client side: the callee itself is passed to the caller (Server) as a parameter (client).
Server side: after the caller (Server) completes the corresponding task processing, the method of the callee (Client) is called to inform the task to complete.
Synchronous sending multithreaded Future.get mode implementation
This method is mainly used to wait for a task to be completed and then execute a task in sequence. As in the above example, the client side requests the server side to complete a task, and expects the server side to return the result after completing the task
Example code is as follows:
1 public class FutureDemo { 2 3 protected RealData realdata = null; 4 protected boolean isReady = false; 5 public synchronized void requestData(RealData realdata) {// client request server Complete a task 6 if (isReady) { 7 return; 8 } 9 this.realdata = realdata; 10 isReady = true; 11 notifyAll();//Core code 2: wake up the waiting thread when the requested task processing is completed 12 } 13 14 public synchronized String getResult() {// client wait for server Return after completing the task,This is equivalent to Future.get 15 while (!isReady) { 16 try { 17 wait();//Core code 1: wait for the thread to be activated after the request is made 18 } catch (InterruptedException e) { 19 } 20 } 21 return realdata.result; 22 } 23 }
The core implementation code is actually the implementation of wait and notify in multithreading. The biggest difference between asynchronous callback and synchronous Future get mode, for example,
The wife (client side) loves her husband very much. The husband (server side) works overtime very late every day. The wife will wait for the husband to go home and make him a midnight snack (synchronous Future get mode).
The wife (client side) loves her husband very much. The husband (server side) works overtime very late every day. The wife thinks it's too tired to wait all the time, so she goes to bed first, informs the wife (callback) when the husband comes back, and then the wife makes a midnight snack for her husband (asynchronous callback mode).
So we all expect our wife to be Future get mode or asynchronous callback mode?