In the previous article, we walked through the implementation of the basic RPC service. In this article we look at what it takes to turn that RPC service into a standard, production-ready product, and which features still need to be implemented.
Simply enabling service consumers to make remote calls is not enough; there is still a long way to go before the framework is ready for production use.
The core functions of the Consumer include:
- Connection management
- Load balancing
- Request routing
- Timeout handling
- Health check
The core functions of the Provider include:
- Queue / thread pool
- Timeout discard
- Graceful shutdown
- Overload protection
1. Design and implementation of the RPC service consumer's core functions
1. Connection management
First, let's analyze the design of the consumer's connection management.
The business logic initiates a remote call through the proxy: the request is serialized, a connection is selected, the request is sent, and the result is received.
The consumer needs to maintain long-lived connections to the service providers in order to send request data and receive results.
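As a rough illustration of the proxy step, a consumer-side stub can be built with a JDK dynamic proxy: business code calls an ordinary interface method, and the invocation handler forwards the call to the transport layer, which serializes it, picks a connection, sends the request, and waits for the result. This is only a minimal sketch; `HelloService`, `RpcInvoker`, and `sendAndReceive` are hypothetical names, not part of the framework discussed in this article.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical business interface, used only for illustration
interface HelloService {
    String hello(String name);
}

// Hypothetical transport abstraction: serializes the call, picks a connection,
// sends the request and blocks until the response arrives
interface RpcInvoker {
    Object sendAndReceive(String service, String method, Object[] args) throws Exception;
}

public class RpcProxyFactory {
    // Wrap the interface with a dynamic proxy so a remote call looks like a local call
    @SuppressWarnings("unchecked")
    public static <T> T create(Class<T> iface, RpcInvoker invoker) {
        InvocationHandler handler = (proxy, method, args) ->
                invoker.sendAndReceive(iface.getName(), method.getName(), args);
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }
}

// Usage (hypothetical): HelloService hello = RpcProxyFactory.create(HelloService.class, invoker);
```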
Connection management includes the following function points (a minimal sketch follows the list):
- Initialization timing: when to create a connection; different modules have different requirements.
  - Gateway: a gateway connects to many modules, and some of them receive few requests, so creating all connections in advance may waste resources. A gateway is therefore better served by lazy loading: create the connection to a module the first time one of its services is called.
  - Ordinary business services can create their connections in advance.
- Connection count maintenance
- Heartbeat / reconnection
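A minimal sketch of such a connection manager, using invented names (`ConnectionManager`, `Connection`, `ping`, `reconnect`) rather than the framework's real API: connections are created lazily on first use (the gateway style) or warmed up in advance (the business-service style), and a scheduled heartbeat reconnects dead channels.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class ConnectionManager {

    // Hypothetical connection abstraction; a real client would wrap a socket or Netty channel
    public interface Connection {
        boolean ping();      // heartbeat probe
        void reconnect();    // re-establish the underlying channel
    }

    private final Map<String, Connection> connections = new ConcurrentHashMap<>();
    private final Function<String, Connection> connector;  // how to open a connection to an address
    private final ScheduledExecutorService heartbeatTimer = Executors.newSingleThreadScheduledExecutor();

    public ConnectionManager(Function<String, Connection> connector) {
        this.connector = connector;
        // Periodic heartbeat: probe every connection and reconnect the dead ones
        heartbeatTimer.scheduleAtFixedRate(this::checkAll, 30, 30, TimeUnit.SECONDS);
    }

    // Lazy initialization (gateway style): create the connection on first use
    public Connection getOrCreate(String address) {
        return connections.computeIfAbsent(address, connector);
    }

    // Eager initialization (business-service style): warm up connections in advance
    public void warmUp(Iterable<String> addresses) {
        addresses.forEach(this::getOrCreate);
    }

    private void checkAll() {
        connections.values().forEach(c -> {
            if (!c.ping()) {
                c.reconnect();
            }
        });
    }
}
```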
2. Load balancing
Load balancing needs to spread traffic evenly (or according to policy) across the service provider nodes, and it must support node scaling and gray-scale publishing.
There are many load balancing algorithms:
- Round robin
- Random
- Modulo
- Weighted
- Consistent hashing
Round robin, random, and modulo are well known, so we won't cover them here; we'll focus mainly on the weighted algorithm.
(1) Weighted load balancing design
- Weight values range from 0 to 10
- A higher value means a higher weight
- The higher the weight, the larger the share of traffic allocated to the node
Algorithm design:
Data structure:
- An array filled with 0s and 1s according to the weight value
- The positions of the 0s and 1s are randomly shuffled
Algorithm description:
- Load balancing selects a node
- Generate a random number between 0 and 9 and use it as an index into the array
- The value at that index decides the outcome: 0 means use this node, 1 means skip it
Creation of weight array:
```java
// num indicates the number of 1s in the generated array.
// In the array, 1 means discard the request and 0 means accept the request.
public static byte[] randomGenerator(int limit, int num) {
    byte[] tempArray = new byte[limit];
    if (num <= 0) {
        for (int i = 0; i < limit; i++) {
            tempArray[i] = 0;
        }
        return tempArray;
    }
    if (num >= limit) {
        for (int i = 0; i < limit; i++) {
            tempArray[i] = 1;
        }
        return tempArray;
    }
    // Randomly fill the array with num 1s
    Random random = new Random();
    for (int i = 0; i < num; i++) {
        int temp = Math.abs(random.nextInt()) % limit;
        while (tempArray[temp] == 1) {
            temp = Math.abs(random.nextInt()) % limit;
        }
        tempArray[temp] = 1;
    }
    return tempArray;
}
```
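A brief usage sketch, under an assumption the article does not state explicitly: if weights range from 0 to 10, a node with weight w should skip (10 - w) out of every 10 requests, so its abandon array can be produced with `randomGenerator(10, 10 - w)`. The demo below assumes `randomGenerator` from the snippet above is defined in the same class.

```java
public class WeightArrayDemo {
    public static void main(String[] args) {
        int weight = 7;  // hypothetical node weight in the 0~10 range
        // Assumption: a node of weight 7 should discard 3 of every 10 requests,
        // so the abandon array of length 10 holds three 1s at random positions.
        byte[] abandonArray = randomGenerator(10, 10 - weight);
        // Possible output: 0 1 0 0 1 0 0 0 1 0   (1 = skip this node for that slot)
        for (byte b : abandonArray) {
            System.out.print(b + " ");
        }
    }
}
```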
(2) Implementation of polling + weight load balancing
- Poll to a server node
- Then filter it again according to its weight
- If the node is filtered out, poll to the next node
```java
for (int i = start; i < start + count; i++) {
    int index = i % count;
    Server server = servers.get(index);
    if (needChooseAnotherOne.test(server)) {
        requestCount.getAndIncrement();
        continue;
    }
    int requestTime = this.getRequestTimeCountAndSet(server, count);
    if (server.getWeights() < 10 && server.getWeights() > -1) {
        byte[] abandonArray = server.getAbandonArray();
        // abandonArray[i] == 1 means the server does not accept this request
        if (abandonArray[requestTime % abandonArray.length] == 1) {
            requestCount.getAndIncrement();
            continue;
        }
    }
    if (ServerState.NORMAL == server.getState()) {
        result = server;
        break;
    }
    requestCount.getAndIncrement();
}
```
3. Request routing
Request routing filters the list of candidate service provider nodes through a series of rules. It plays an important role in application isolation, read/write separation, and gray-scale publishing.
The functional design of routing includes:
- Matching rules
- Actions (what to do when a rule matches)
- A rule chain
Matching rule design:
- The attribute to compare
- The operator
- The attribute value to match against
- The matching (target) nodes
Data structure design:
- A linked list (chain) of rules
Example of an IP routing rule: attribute = IP, operator = IN, value = {IP1, IP2}, servers = {Node1, Node2}. A minimal sketch of such a rule follows.
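The sketch below uses illustrative names (`RouteRule`, `Operator`, `route`) that are assumptions rather than any framework's actual API: each rule compares a request attribute against a value set, and the first matching rule in the chain restricts the candidate provider list.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical routing rule: attribute + operator + value set + target servers
public class RouteRule {

    public enum Operator { IN, NOT_IN, EQUALS }

    private final String attribute;      // e.g. "IP"
    private final Operator operator;     // e.g. IN
    private final Set<String> values;    // e.g. {IP1, IP2}
    private final List<String> servers;  // e.g. {Node1, Node2}

    public RouteRule(String attribute, Operator operator, Set<String> values, List<String> servers) {
        this.attribute = attribute;
        this.operator = operator;
        this.values = values;
        this.servers = servers;
    }

    // A request is represented here as a plain attribute map (e.g. "IP" -> caller IP)
    public boolean matches(Map<String, String> request) {
        String actual = request.get(attribute);
        switch (operator) {
            case IN:      return values.contains(actual);
            case NOT_IN:  return !values.contains(actual);
            case EQUALS:  return values.size() == 1 && values.contains(actual);
            default:      return false;
        }
    }

    // Walk the rule chain; the first matching rule decides the candidate provider list
    public static List<String> route(List<RouteRule> chain, Map<String, String> request, List<String> allServers) {
        for (RouteRule rule : chain) {
            if (rule.matches(request)) {
                return rule.servers;
            }
        }
        return allServers; // no rule matched: all providers are candidates
    }
}
```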
The above is a classic Dubbo routing strategy. Suppose we have four service providers and want to do a gray-scale release of ProviderA-1. The rule chain contains two rules: the first rule keeps normal traffic away from ProviderA-1, so none of the regular traffic reaches it; the second rule routes gray (canary) traffic only to ProviderA-1, while all other traffic is served by the remaining nodes as usual.
4. Timeout processing
Caller-side timeout processing:
- The worker thread blocks
- It waits for the notification that the response packet has returned
- Timeout logic:
  - The worker thread waits for the notification
  - When the response data returns, the wait is terminated
  - When the timeout expires, an exception is thrown
The caller-side timeout handling is implemented in the proxy's request/receive path below.
```java
/**
 * Proxy request send/receive logic:
 * 1. Register the WindowData
 * 2. Send asynchronously
 * 3. Wait for the response data to arrive
 * 4. Unregister the WindowData
 */
public Protocol request(Protocol requestProtocol) throws Exception {
    // Check the server state first
    if (ServerState.Reboot == state || ServerState.Dead == state) {
        throw new RebootException();
    }
    increaseCU();
    CSocket socket = null;
    try {
        try {
            socket = socketPool.getSocket();
            // Serialize the request
            byte[] data = requestProtocol.toBytes(socket.isRights(), socket.getDESKey());
            // Register the WindowData and put it into the sessionId -> WindowData map
            socket.registerRec(requestProtocol.getSessionId());
            // Send asynchronously: put the data into the send queue
            socket.send(data);
        } catch (TimeoutException e) {
            timeout();
            throw e;
        } catch (IOException e) {
            if (socket == null || !socket.connecting()) {
                if (testServerSocket() != ServerState.Normal) {
                    this.asDeath();
                    logger.info("this server : {} is dead , will choose another one !", address);
                    throw new RebootException();
                }
            }
            throw e;
        } catch (Exception e) {
            throw e;
        } finally {
            if (socket != null) {
                socket.dispose();
            }
        }
        // Receive the response
        byte[] buffer = socket.receive(requestProtocol.getSessionId(), currUserCount);
        Protocol receiveProtocol = Protocol.fromBytes(buffer, socket.isRights(), socket.getDESKey());
        return receiveProtocol;
    } finally {
        if (socket != null) {
            // Unregister the WindowData
            socket.unregisterRec(requestProtocol.getSessionId());
        }
    }
}
```

Data transmission implementation logic:

```java
// 1. Register the sessionId -> WindowData entry for this request
public void registerRec(int sessionId) {
    AutoResetEvent event = new AutoResetEvent();
    WindowData wd = new WindowData(event);
    WaitWindows.put(sessionId, wd);
}

// 2. Asynchronous send
public void send(byte[] data) {
    try {
        if (null != transmitter) {
            TiresiasClientHelper.getInstance().setEndPoint(channel);
            TransmitterTask task = new TransmitterTask(this, data);
            transmitter.invoke(task);
        }
    } catch (NotYetConnectedException ex) {
        _connecting = false;
        throw ex;
    }
}

public void invoke(TransmitterTask task) {
    int size = wqueue.size();
    if (size > 1024 * 64) {
        logger.warn(Version.ID + " send queue is too large, size is:" + size);
    }
    // Put the task into the queue; the send thread flushes it asynchronously
    wqueue.offer(task);
}

// Send thread processing logic
class SendTask implements Runnable {
    @Override
    public void run() {
        int offset = 0;
        // Buffer tasks for aggregated (batched) sending
        TransmitterTask[] elementData = new TransmitterTask[5];
        int waitTime = 0;
        for (;;) {
            try {
                TransmitterTask task = wqueue.poll(waitTime, TimeUnit.MILLISECONDS);
                // The request queue is empty: flush whatever has been buffered
                if (null == task) {
                    if (elementData.length > 0 && offset > 0) {
                        send(elementData, offset);
                        offset = 0;
                        arrayClear(elementData);
                    }
                    waitTime = 10;
                    continue;
                }
                // The buffer is full: send the batch
                if (offset == 5) {
                    if (null != elementData) {
                        send(elementData, offset);
                    }
                    offset = 0;
                    arrayClear(elementData);
                }
                // Not sent immediately; buffer the task in the array
                elementData[offset] = task;
                waitTime = 0;
                ++offset;
            } catch (Exception ex) {
                offset = 0;
                arrayClear(elementData);
                ex.printStackTrace();
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
    }
}
```

The worker thread blocks in receive() until the response arrives or the timeout expires:

```java
// Worker thread: receiving data processing logic
public byte[] receive(int sessionId, int queueLen) {
    // Get the WindowData
    WindowData wd = WaitWindows.get(sessionId);
    if (wd == null) {
        throw new RuntimeException("Need invoke 'registerRec' method before invoke 'receive' method!");
    }
    AutoResetEvent event = wd.getEvent();
    // Wait for the data-arrival event
    if (!event.waitOne(socketConfig.getReceiveTimeout())) {
        throw new TimeoutException("ServiceIP:[" + this.getServiceIP()
                + "],Receive data timeout or error!timeout:" + socketConfig.getReceiveTimeout()
                + "ms,queue length:" + queueLen);
    }
    // Get the data from the WindowData
    byte[] data = wd.getData();
    int offset = SFPStruct.Version;
    int len = ByteConverter.bytesToIntLittleEndian(data, offset);
    if (len != data.length) {
        throw new ProtocolException("The data length inconsistent!datalen:" + data.length + ",check len:" + len);
    }
    return data;
}

// AutoResetEvent implementation, backed by a CountDownLatch
public class AutoResetEvent {
    CountDownLatch cdl;

    public AutoResetEvent() {
        cdl = new CountDownLatch(1);
    }

    public AutoResetEvent(int waitCount) {
        cdl = new CountDownLatch(waitCount);
    }

    public void set() {
        cdl.countDown();
    }

    public boolean waitOne(long time) {
        try {
            return cdl.await(time, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

The receive thread decodes the response packet, looks up the WindowData by sessionId, and wakes the waiting worker thread:

```java
// Receive thread processing logic
public void decode(ByteBuffer receiveBuffer, byte[] receiveArray) throws Exception {
    try {
        int limit = receiveBuffer.limit();
        int num = 0;
        for (; num < limit; num++) {
            byte b = receiveArray[num];
            receiveData.write(b);
            if (b == ProtocolConst.P_END_TAG[index]) {
                index++;
                if (index == ProtocolConst.P_END_TAG.length) {
                    byte[] pak = receiveData.toByteArray(ProtocolConst.P_START_TAG.length,
                            receiveData.size() - ProtocolConst.P_END_TAG.length - ProtocolConst.P_START_TAG.length);
                    // Resolve the sessionId in the returned packet
                    int pSessionId = ByteConverter.bytesToIntLittleEndian(pak, SFPStruct.Version + SFPStruct.TotalLen);
                    // Obtain the corresponding WindowData by sessionId
                    WindowData wd = WaitWindows.get(pSessionId);
                    if (wd != null) {
                        if (wd.getFlag() == 0) {
                            // Synchronous call: put the returned data into the WindowData
                            wd.setData(pak);
                            // countDown() the CountDownLatch to end the worker thread's wait
                            wd.getEvent().set();
                        } else if (wd.getFlag() == 1) {
                            // Asynchronous call: hand the packet to the receive handler
                            if (null != unregisterRec(pSessionId)) {
                                wd.getReceiveHandler().notify(pak, wd.getInvokeCnxn());
                            }
                        } else if (wd.getFlag() == 2) {
                            logger.info("unsupport request type");
                        }
                    }
                    index = 0;
                    receiveData.reset();
                    continue;
                }
            } else if (index != 0) {
                if (b == ProtocolConst.P_END_TAG[0]) {
                    index = 1;
                } else {
                    index = 0;
                }
            }
        }
    } catch (Exception e) {
        index = 0;
        e.printStackTrace();
        receiveData.clear();
    }
}
```