Core function design of RPC service caller

In the previous article, we introduced the implementation of RPC basic services. Now we will go on to how to make an RPC service into a standard product and what needs to be implemented.

It is not enough for our service consumers to only realize remote invocation, which is still a long way from commercialization.

The core functions of Consumer include:

  • Connection management
  • load balancing
  • Request routing
  • timeout handler
  • health examination

The core functions of the Provider include:

  • Queue / thread pool
  • Timeout discard
  • Elegant close
  • overload protection

1, Design and implementation of RPC service consumer core functions

1. Connection management

First, let's analyze the connection management function design of consumer.

The business logic initiates a remote call service, calls it through the proxy, first serializes it, then finds the connection, sends the request, and receives the result.

The consumer needs to maintain a long connection with the service provider to transmit the request data and return the result.

Connection management includes the following function points:

  • Initialization timing: when to create a connection, different modules have different requirements.
    • Gateway: the gateway needs to connect many modules. Some modules may have few requests. If they can be created in advance, there may be a waste of resources. Therefore, it is better for the gateway to adopt the lazy loading method, and create a connection with the corresponding module when calling the service
    • Business services can be created in advance
  • Connection number maintenance
  • Heartbeat / reconnection

2. Load balancing

Load balancing needs to ensure that the traffic of multiple service provider nodes is uniform / reasonable, and support node expansion and gray-scale publishing.

There are many load balancing algorithms

  • polling
  • random
  • Take mold
  • Weighted
  • Consistency Hash

We all know about polling, randomization and module taking. We won't talk about it here. We'll mainly talk about the weighted algorithm.

(1) Weighted load balancing design

  • Value within the weight range of 0 ~ 10
  • A higher value means a higher weight
  • The higher the weight, the greater the proportion of allocated traffic

Algorithm design:

data structure

  • Array, filled according to weight values
  • The positions of 0 and 1 are randomly disrupted

Algorithm description

  • Load balancing selects a node
  • Generate random numbers between 0 and 9, or choose an array?
  • Corresponding value: 0 uses this node, 1 does not use this node

Creation of weight array:

//  num indicates the number of 1 in the generated array. In the array, 1 indicates discarding the request and 0 indicates accepting the request
public static byte[] randomGenerator(int limit, int num) {

    byte[] tempArray = new byte[limit];

    if (num <= 0) {
        for (int i = 0; i < limit; i++) {
            tempArray[i] = 0;
        }
        return tempArray;
    }
    if (num >= limit) {
        for (int i = 0; i < limit; i++) {
            tempArray[i] = 1;
        }
        return tempArray;
    }

    //Randomly fill the array with num 1s
    Random random = new Random();
    for (int i = 0; i < num; i++) {
        int temp = Math.abs(random.nextInt()) % limit;
        while (tempArray[temp] == 1) {
            temp = Math.abs(random.nextInt()) % limit;
        }
        tempArray[temp] = 1;
    }
    return tempArray;
}

(2) Implementation of polling + weight load balancing

  • Polling to a server node
  • Then filter again according to the weight
  • Poll to next node
for (int i = start; i < start + count; i++) {
  int index = i % count;
  Server server = servers.get(index);
  if (needChooseAnotherOne.test(server)) {
    requestCount.getAndIncrement();
    continue;
  }
  
  int requestTime = this.getRequestTimeCountAndSet(server, count);
  if (server.getWeights() <10 && server.getWeights() > -1) {
    byte[] abandonArray = server.getAbandonArray();
    // abandonTimes[i] == 1 indicates that the server does not accept the request
    if (abandonArray[requestTime % abandonArray.length] == 1) {
      requestCount.getAndIncrement();
    	continue;
    }
  }
  
  if (serverState.NORMAL == server.getState()) {
    result = server;
    break;
  }
  
  requestCount.getAndIncrement();
}

(3) Request routing

The function of request routing is to filter out the list of service providing nodes that can be selected through a series of rules. It plays an important role in application isolation, read-write separation and gray-scale publishing.

The functional design of routing includes

  • Matching rules
  • behavior
  • Linked list

Matching rule design

  • Attributes to be compared
  • operator
  • Attribute matching value
  • Matching node

Data structure design

  • Linked list

IP streaming rules: attribute=IP, operator=IN, value={IP1, IP2}, servers={Node1, Node2}

The above is a classic Routing Strategy of Dubbo. We have four service providers. Now we publish in grayscale and grayscale ProviderA-1. There are two rules in the rule chain: the first rule is that all traffic will not call ProviderA-1, so there will be no traffic to ProviderA-1; The second rule is that when gray traffic reaches ProviderA-1, other traffic is accessed normally.

(4) Timeout processing

Caller timeout processing:

  • Worker thread blocking
    • Waiting for packet return notification
  • Timeout logic
    • Worker waiting for notification
    • Data return termination wait
    • Timeout throw exception
  1. Caller timeout processing
/**Proxy class requests send and receive call logic
 * 1,Register WindowData
 * 2,Asynchronous transmission
 * 3,Wait for data to arrive
 * 4,Log out of WindowData
 */
public Protocol request(Protocol requestProtocol) throws Exception {
  // Server status judgment
  if (ServerState.Reboot == state || ServerState.Dead == state) {
    throw new RebootException();
  }
  
  increaseCU();
  
  CSocket socket = null;
  try {
    try {
      socket = socketPool.getSocket();
      byte[] data = requestProtocol.toBytes(socket.isRights(), socket.getDESKey()); // serialize
      // Register WondowData and put it into the Map of session windowdata
      socket.registerRec(requestProtocol.getSessionId());
      // Send asynchronously and put the data into the sending queue
      socket.send(data);
    } catch (TimeoutException e) {
      timeout();
      throw e;
    } catch (IOException e) {
      if (socket == null || !socket.connecting()) {
        if (testServerSocket() != ServerState.Normal) {
          this.asDeath();
          logger.info("this server : {}  is dead , will choose another one !", address);
//        logger.error(String.format("server %s is dead", new Object[]{this.address}), e);
          throw new RebootException();
        }
      }
      throw e;
    } catch (Exception e) {
      throw e;
    } finally {
      if (socket != null) {
        socket.dispose();
      }
    }
    
    // receive data 
    byte[] buffer = socket.receive(requestProtocol.getSessionId(), currUserCount);
    Protocol receiveProtocol = Protocol.fromBytes(buffer, socket.isRights(), socket.getDESKey());
    
    return receiveProtocol;
  } finally {
    if (socket != null) {
      if (sockett != null) {
        // Log out of windowsdata
        socket.unregisterRec(requestProtocol.getSessionId());
      }
    }
  }
}

// Data transmission implementation logic
// 1. Register the Map that sends the event sessionid windowdata
public void registerRec(int sessionId) {
  AutoResetEvent event = new AutoResetEvent();
  WindowData wd = new WindowData(event);
  WaitWindows.put(sessionId, wd);
}

// 2. Asynchronous transmission
public void send(byte[] data) {
  try {
    if (null != transmitter) {
      TiresiasClientHelper.getInstance().setEndPoint(channel);
      TransmitterTask task = new TransmitterTask(this, data);
      transmitter.invoke(task);
    }
  } catch (NotYetConnectedException ex) {
    _connecting = false;
    throw ex;
  }
}

public void invoke(TransmitterTask task) {
  int size = wqueue.size();
  if (size > 1024 * 64) {
    logger.warn(Version.ID + " send queue is to max size is:" + size);
  }
  //Put in queue and send asynchronously
  wqueue.offer(task);
}

// Send thread processing logic
class SendTask implements Runnable {
  @Override
  public void run() {
    int offset = 0;
    // Cache data for aggregate sending
    TransmitterTask[] elementData = new TransmitterTask[5];
    int waitTime = 0;
    for (;;) {
      try {
        TransmitterTask task = wqueue.poll(waitTime, TimeUnit.MILLISECONDS);
        
        // There is no request data in the request queue. Send data
        if (null == task) {
          if (elementData.length > 0 && offset > 0) {
            send(elementData, offset);
            offset = 0;
            arrayClear(elementData);
          }
          waitTime = 10;
          continue;
        }
        
        // Array full, send data
        if (offset == 5) {
          // send out
          if (null != elementData) {
            send(elementData, offset);
          }
          offset = 0;
          arrayClear(elementData);
        }
        
        // It is not sent in real time and is temporarily put into the array
        elementData[offset] = task;
        waitTime = 0;
        ++offset;
      } catch (Exception ex) {
        offset = 0;
        arrayClear(elementData);
        ex.printStackTrace();
      } catch (Throwable e) {
        e.printStackTrace();
      }
    }
  }
}

// Worker thread receiving data processing logic
public byte[] receive(int sessionId, int queueLen) {
  // Get WindowData
  WindowData wd = WaitWindons.get(sessionId);
  if (wd == null) {
    throw new RuntimeException("Need invoke 'registerRec' method before invoke 'receive' method!");
  }
  
  AutoResetEvent event = wd.getEvent();
  // Wait for data to arrive event
  if (!event.waitOne(socketConfig.getReceiveTimeout())) {
    throw new TimeoutException("ServiceIP:[" + this.getServiceIP() + "],Receive data timeout or error!timeout:" +	 		 socketConfig.getReceiveTimeout() + "ms,queue length:"
                + queueLen);
  }
  
  // Get Data from WindowData
  byte[] data = wd.getData();
  int offset = SFPStruct.Version;
  int len = ByteConverter.bytesToIntLitterEndian(data, offset);
  if (len != data.length) {
    throw new ProtocolException("The data length inconsistent!datalen:" + data.length + ",check len:" + len);
  }
  return data;
}

// AutoResetEvent implementation
public class AutoResetEvent {
  CountDownLatch cdl;
  
  public AutoResetEvent() {
        cdl = new CountDownLatch(1);
    }

    public AutoResetEvent(int waitCount) {
        cdl = new CountDownLatch(waitCount);
    }

    public void set() {
        cdl.countDown();
    }

    public boolean waitOne(long time) {
        try {
            return cdl.await(time, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

// Receive thread processing logic
public void decode(ByteBuffer receiveBuffer, byte[] receiveArray) throws Exception {
  try {
    int limit = receiveBuffer.limit();
    int num = 0;
    for (; num < limit; num++) {
      byte b = receiveArray[num];
      receiveData.write(b);
      
      if (b == ProtocolConst.P_END_TAG[index]) {
        index++;
        if (index == ProtocolConst.P_END_TAG.length) {
          byte[] pak = receiveData.toByteArray(ProtocolConst.P_START_TAG.length, receiveData.size() - ProtocolConst.P_END_TAG.length - ProtocolConst.P_START_TAG.length);
          
          // Resolve the SessionId in the returned package
          int pSessionId = ByteConverter.bytesToIntLittleEndian(pak, SFPStruct.Version + SFPStruct.TotalLen);
          
          // Obtain the corresponding windowsdata according to the sessionId
          WindowData wd = WaitWindows.get(pSessionId);
          if (wd != null) {
            if (wd.getFlag() == 0) {
              // Put the returned data into WindowData
              wd.setData(pak);
              // Call countDown of CountDownlatch to end the waiting of the worker thread
              wd.getEvent().set();
            } else if (wd.getFlag() == 1) {
              // asynchronous
              if (null != unregisterRec(pSessionId)) {
                wd.getReceiveHandler().notify(pak, wd.getInvokeCnxn());
              }
            } else if (wd.getFlag() == 2) {
              // asynchronous
              logger.info("unsupport request type");
            }
          }
          
          index = 0;
          receiveData.reset();
          continue;
        }
      } else if (index != 0) {
        if (b == ProtocolConst.P_END_TAG[0]) {
          index = 1;
        } else {
          index = 0;
        }
      }
    }
  } catch (Exception e) {
    index = 0;
    ex.printStackTrace();
    receiveData.clear()
  }
}

Keywords: Java Distribution rpc

Added by Nunners on Sat, 18 Sep 2021 15:04:42 +0300