Look at kafka of source code programming series

kafka is liked by many enterprises because of its high-performance sending and consumption ability. So let's look at some source code implementations of kafka as follows:

 1  public void run() {
 2         int messageNo = 1;
 3         while (true) {
 4             String messageStr = "Message_" + messageNo;
 5             long startTime = System.currentTimeMillis();
 6             if (isAsync) { 
 7                 producer.send(new ProducerRecord<>(topic,
 8                     messageNo,
 9                     messageStr), new DemoCallBack(startTime, messageNo, messageStr));// Asynchronous transmission
10             } else { 
11                 try {
12                     producer.send(new ProducerRecord<>(topic,
13                         messageNo,
14                         messageStr)).get();// Synchronous transmission
15                     System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
16                 } catch (InterruptedException | ExecutionException e) {
17                     e.printStackTrace();
18                 }
19             }
20             ++messageNo;
21         }
22     }

This code excerpt is a fragment in the message demo (kafka.examples.Producer) sent by the kafka source producer, which mainly involves two knowledge points: one is to send messages asynchronously,

The implementation of callback function, the other is synchronous sending, the realization of multi-threaded Future.get mode. Now I will elaborate the two ways of realization.

Asynchronous callback mode

In fact, this method is mainly used in calling multiple threads to execute a task, without waiting for the thread to complete to get the corresponding feedback information. For example, the Client side needs to call the Server side to perform a task, and wants the Server side to complete the execution

Actively tell the corresponding results to the Client. This process is called callback. The following code:

  

 1 public class Client implements CSCallBack {
 2     
 3     private volatile boolean stopThread = false;
 4     private Server server;
 5 
 6     public Client(Server server) {
 7         this.server = server;
 8     }
 9 
10     public void sendMsg(final String msg){
11         System.out.println("ThreadName="+Thread.currentThread().getName()+" Client: the message sent is:" + msg);
12         new Thread(new Runnable() {
13             @Override
14             public void run() {
15                 server.getClientMsg(Client.this,msg);// Core code 1: take callee as parameter(client)Pass to caller(Server)
16                 
17                 while(!stopThread) {// The simulation waits for another server-side code to complete
18                     System.out.println("ThreadName="+Thread.currentThread().getName()+"Client: simulation waiting for callback to complete");
19                     
20                     try {
21                         Thread.sleep(50);
22                     } catch (InterruptedException e) {
23                         e.printStackTrace();
24                     }
25                 }
26             }
27         }).start();
28         System.out.println("ThreadName="+Thread.currentThread().getName()+" Client: asynchronous sending succeeded");
29     }
30 
31     @Override
32     public void process(String status) {
33         stopThread = true;
34         System.out.println("ThreadName="+Thread.currentThread().getName()+" Client: the status of the callback received from the server is:" + status);
35     }
36 }

 

 1 public class Server {
 2 
 3     public void getClientMsg(CSCallBack csCallBack , String msg) {
 4         
 5 
 6         // Simulation server needs to process data
 7         try {
 8              new Thread(new Runnable() {
 9                  @Override
10                  public void run() {
11                      System.out.println("ThreadName="+Thread.currentThread().getName()+" Server: the server receives the message sent by the client as:" + msg);
12                      while(true) {
13                         int max=10,min=1;
14                          int ranNum = (int) (Math.random()*(max-min)+min); 
15                          
16                          if(ranNum >6) {//  When the random number is greater than 5, the task is considered completed
17                               System.out.println("ThreadName="+Thread.currentThread().getName()+" Server side:Data processing successful, return to success status 200");
18                              String status = "200";
19                              csCallBack.process(status);// Core code 2: Caller(Server)After the task processing completes the corresponding task, call the callee(Client)Method to inform the completion of the task
20                              break;
21                          }
22                          
23                          try {
24                             Thread.sleep(80);
25                         } catch (InterruptedException e) {
26                             e.printStackTrace();
27                         }
28                      }
29                  }
30              }).start();
31             
32         } catch (Exception e) {
33             e.printStackTrace();
34         }
35        
36     }
37 }

         

In fact, there are two core codes:

Client side: the callee itself is passed to the caller (Server) as a parameter (client).

Server side: after the caller (Server) completes the corresponding task processing, the method of the callee (Client) is called to inform the task to complete.

Synchronous sending multithreaded Future.get mode implementation

This method is mainly used to wait for a task to be completed and then execute a task in sequence. As in the above example, the client side requests the server side to complete a task, and expects the server side to return the result after completing the task

Example code is as follows:

  

 1 public class FutureDemo {
 2 
 3     protected RealData realdata = null;
 4     protected boolean isReady = false;
 5     public synchronized void requestData(RealData realdata) {// client request server Complete a task
 6         if (isReady) {                        
 7             return;     
 8         }
 9         this.realdata = realdata;
10         isReady = true;
11         notifyAll();//Core code 2: wake up the waiting thread when the requested task processing is completed
12     }
13     
14     public synchronized String getResult() {// client wait for server Return after completing the task,This is equivalent to Future.get 
15         while (!isReady) {
16             try {
17                 wait();//Core code 1: wait for the thread to be activated after the request is made
18             } catch (InterruptedException e) {
19             }
20         }
21         return realdata.result;
22     }
23 }

The core implementation code is actually the implementation of wait and notify in multithreading. The biggest difference between asynchronous callback and synchronous Future get mode, for example,

The wife (client side) loves her husband very much. The husband (server side) works overtime very late every day. The wife will wait for the husband to go home and make him a midnight snack (synchronous Future get mode).

The wife (client side) loves her husband very much. The husband (server side) works overtime very late every day. The wife thinks it's too tired to wait all the time, so she goes to bed first, informs the wife (callback) when the husband comes back, and then the wife makes a midnight snack for her husband (asynchronous callback mode).

 

So we all expect our wife to be Future get mode or asynchronous callback mode?

Keywords: Java kafka Fragment

Added by fatherlyons on Fri, 22 Nov 2019 21:53:14 +0200