nameko pitfall note: avoid frequently instantiating ClusterRpcProxy

I came across an article, "Use and precautions of nameko", which has a paragraph about initiating remote calls:

rpc is followed by the value of the class variable name in the microservice definition (that is, the microservice name), and then by the RPC method. call_async makes an asynchronous call, and calling result() on the object it returns waits for the asynchronous task to return its result. Note that running ClusterRpcProxy(config) creates a connection to the queue, which is time-consuming. If there are many microservice calls, do not create the connection repeatedly; complete all the calls inside the with statement block. The result of an asynchronous call can only be obtained inside that block, by calling result() there and waiting for it; once the connection is closed outside the block, the result can no longer be retrieved.
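The quoted advice (send calls with call_async, then collect them with result() before leaving the with block) can be sketched as a small helper. It assumes cluster_rpc is the object bound by with ClusterRpcProxy(config) as cluster_rpc, and it reuses the console_service.upload method that appears later in this note; the helper name upload_all is hypothetical.

```python
from typing import Any, Iterable, List


def upload_all(cluster_rpc: Any, payloads: Iterable[str]) -> List[Any]:
    """Send every upload asynchronously, then wait for all replies.

    Hypothetical helper: cluster_rpc is assumed to be the object bound by
    ``with ClusterRpcProxy(config) as cluster_rpc:`` and must still be open.
    """
    # call_async() publishes the request and returns a reply handle at once,
    # so every request is in flight before we start waiting
    replies = [
        cluster_rpc.console_service.upload.call_async(data=payload)
        for payload in payloads
    ]
    # result() blocks until the matching reply arrives; it has to run before
    # the with block exits, because leaving the block stops the reply listener
    return [reply.result() for reply in replies]
```

It would be called from inside a single proxy session, e.g. with ClusterRpcProxy(config) as cluster_rpc: results = upload_all(cluster_rpc, payloads), so all calls share one connection and one reply queue.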

In short: do not instantiate ClusterRpcProxy frequently. Combined with a strange phenomenon I had noticed in the RabbitMQ management console, I felt it was worth looking into.

Below is a common way to call a nameko service:

api.py

from fastapi import FastAPI
from loguru import logger
from rpc import (
    upload_service_rpc
)
from schemas import (
    UploadRequestBody
)

app = FastAPI()

@app.get('/')
async def root():
    return {"message": "Hello World"}

@app.post('/upload/')
def upload(data: UploadRequestBody):
    logger.debug(data.json(ensure_ascii=False))

    success: bool = upload_service_rpc(data)  # rpc call initiated here
    return {
        'status': success
    }

upload_service_rpc is a thin function wrapper around the ClusterRpcProxy provided by nameko:

rpc.py

from nameko.standalone.rpc import ClusterRpcProxy
import settings
from schemas import (
    UploadRequestBody,
)
from loguru import logger

config = {
    'AMQP_URI': f'amqp://{settings.AMQP_URI.RABBIT_USER}:'
                f'{settings.AMQP_URI.RABBIT_PASSWORD}@{settings.AMQP_URI.RABBIT_HOST}:'
                f'{settings.AMQP_URI.RABBIT_PORT}/{settings.AMQP_URI.RABBIT_VHOST}'
}

def upload_service_rpc(data: UploadRequestBody) -> bool:
    """ to fatapi exposed rpc Interface """
    with ClusterRpcProxy(config) as cluster_rpc:   # Initiate RPC request through ClusterRpcProxy
        success: bool = cluster_rpc.console_service.upload(
            data=data.json(ensure_ascii=False)
        )
        return success

The code above looks fine, but in nameko every ClusterRpcProxy that is instantiated and started (which the with block does on entry) creates a new reply queue in RabbitMQ. If we instantiate a ClusterRpcProxy for every RPC request, as the code above does, a lot of time is spent creating queues.

The following figure is a screenshot of the RabbitMQ management console. When multiple requests are sent, a large number of queues named in the form rpc.reply-standalone_rpc_proxy-{routing_key} appear.

These rpc.reply-standalone_rpc_proxy-{routing_key} queues disappear a few seconds after they stop receiving messages; they do not exist forever.
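To make the difference concrete without a broker, here is a stdlib-only simulation. FakeStandaloneProxy is a hypothetical stand-in for ClusterRpcProxy that mimics only the part that matters here: start() (which __enter__ calls) declares one reply queue. The sketch counts how many queues each calling pattern would declare; it does not measure real RabbitMQ latency.

```python
import uuid
from typing import List


class FakeStandaloneProxy:
    """Hypothetical stand-in for ClusterRpcProxy; no broker involved."""

    declared_queues: List[str] = []  # shared log of every "declared" reply queue

    def start(self) -> "FakeStandaloneProxy":
        # the real start() sets up the reply listener, which declares a fresh
        # reply queue; here we only record a queue name
        FakeStandaloneProxy.declared_queues.append(
            f"rpc.reply-standalone_rpc_proxy-{uuid.uuid4()}"
        )
        return self

    def stop(self) -> None:
        pass

    def __enter__(self) -> "FakeStandaloneProxy":
        return self.start()

    def __exit__(self, *exc_info) -> None:
        self.stop()

    def call(self, payload: str) -> bool:
        return True  # pretend the remote upload succeeded


def per_request_pattern(n: int) -> int:
    """One proxy per request, like the first upload_service_rpc."""
    before = len(FakeStandaloneProxy.declared_queues)
    for i in range(n):
        with FakeStandaloneProxy() as proxy:
            proxy.call(str(i))
    return len(FakeStandaloneProxy.declared_queues) - before


def shared_pattern(n: int) -> int:
    """One proxy started once and reused for every request."""
    before = len(FakeStandaloneProxy.declared_queues)
    proxy = FakeStandaloneProxy().start()
    for i in range(n):
        proxy.call(str(i))
    proxy.stop()
    return len(FakeStandaloneProxy.declared_queues) - before


print(per_request_pattern(20), shared_pattern(20))  # 20 1
```

Twenty requests through per-request proxies declare twenty reply queues, while a single shared proxy declares one, which matches the pile of queues seen in the management console.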

Next, modify the code:

api.py

import settings
from loguru import logger
from fastapi import FastAPI
from schemas import (
    UploadRequestBody
)
from rpc import (
    init_rpc_proxy
)

app = FastAPI()


rpc_proxy = init_rpc_proxy()    # rpc_proxy is now a global object whose lifetime spans the whole program


@app.post('/upload/')
def upload(data: UploadRequestBody):
    logger.debug(data.json(ensure_ascii=False))

    success: bool = rpc_proxy.console_service.upload(  # call the upload method of console_service over RPC
        data=data.json(ensure_ascii=False)
    )

    return {
        'status': success
    }


rpc.py

# coding=utf-8

from nameko.standalone.rpc import ClusterRpcProxy
import settings
from schemas import (
    UploadRequestBody,
)
from loguru import logger

config = {
    'AMQP_URI': f'amqp://{settings.AMQP_URI.RABBIT_USER}:'
                f'{settings.AMQP_URI.RABBIT_PASSWORD}@{settings.AMQP_URI.RABBIT_HOST}:'
                f'{settings.AMQP_URI.RABBIT_PORT}/{settings.AMQP_URI.RABBIT_VHOST}'
}


def init_rpc_proxy():
    return ClusterRpcProxy(config)  # only constructs the proxy; start()/__enter__ is not called here

But running the new code raises an error:

AttributeError: 'ClusterRpcProxy' object has no attribute 'console_service'

Why? The cause lies in the __enter__ method of the ClusterRpcProxy class. Without the with context manager, __enter__ is never executed, and that is exactly where the important work happens. Let's look at what __enter__ does:

nameko/standalone/rpc.py

class StandaloneProxyBase(object):   # StandaloneProxyBase is the parent class of ClusterRpcProxy
    class ServiceContainer(object):
        """ Implements a minimum interface of the
        :class:`~containers.ServiceContainer` to be used by the subclasses
        and rpc imports in this module.
        """
        service_name = "standalone_rpc_proxy"

        def __init__(self, config):
            self.config = config
            self.shared_extensions = {}

    class Dummy(Entrypoint):
        method_name = "call"

    _proxy = None

    def __init__(
        self, config, context_data=None, timeout=None,
        reply_listener_cls=SingleThreadedReplyListener
    ):
        container = self.ServiceContainer(config)

        self._worker_ctx = WorkerContext(
            container, service=None, entrypoint=self.Dummy,
            data=context_data)
        self._reply_listener = reply_listener_cls(
            timeout=timeout).bind(container)

    def __enter__(self):
        return self.start()

    def __exit__(self, tpe, value, traceback):
        self.stop()

    def start(self):
        self._reply_listener.setup()
        return self._proxy

    def stop(self):
        self._reply_listener.stop()

class ClusterRpcProxy(StandaloneProxyBase):
    def __init__(self, *args, **kwargs):
        super(ClusterRpcProxy, self).__init__(*args, **kwargs)
        self._proxy = ClusterProxy(self._worker_ctx, self._reply_listener)

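StandaloneProxyBase is the parent class of ClusterRpcProxy. Its __enter__ method executes return self.start(), and start() first calls self._reply_listener.setup() and then returns self._proxy (the ClusterProxy created in ClusterRpcProxy.__init__) instead of the usual return self. So in with ClusterRpcProxy(config) as cluster_rpc, cluster_rpc is the ClusterProxy, which is what resolves service names such as console_service; the ClusterRpcProxy instance itself has no such attribute. That is what caused our error.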
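The behaviour can be reproduced without nameko. In this sketch Outer plays the role of ClusterRpcProxy and Inner plays the role of the object stored in _proxy; both classes are hypothetical, and Inner only imitates the idea that service names are resolved dynamically through __getattr__.

```python
class Inner:
    """Hypothetical stand-in for the object kept in _proxy."""

    def __getattr__(self, name: str) -> str:
        # only called for attributes that do not exist normally,
        # e.g. inner.console_service
        return f"service proxy for {name}"


class Outer:
    """Hypothetical stand-in for ClusterRpcProxy."""

    def __init__(self) -> None:
        self._proxy = Inner()
        self.started = False

    def start(self) -> Inner:
        self.started = True  # nameko sets up the reply listener at this point
        return self._proxy   # note: not `return self`

    def stop(self) -> None:
        self.started = False

    def __enter__(self) -> Inner:
        return self.start()

    def __exit__(self, *exc_info) -> None:
        self.stop()


outer = Outer()
with outer as proxy:
    print(proxy.console_service)  # service proxy for console_service

try:
    outer.console_service  # Outer has no such attribute and no __getattr__
except AttributeError as exc:
    print(exc)  # 'Outer' object has no attribute 'console_service'

fixed = Outer()
print(fixed.start().console_service)  # the fix: use what start() returns
```

The with statement binds whatever __enter__ returns, so the name after as is the inner object; calling start() directly and keeping its return value gives the same object without the context manager.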

Once the cause is known, the fix is quick: call start() ourselves and keep the object it returns.

api.py

import settings
from loguru import logger
from fastapi import FastAPI
from schemas import (
    UploadRequestBody
)
from rpc import (
    init_rpc_proxy
)

app = FastAPI()


_rpc_proxy = init_rpc_proxy()  # _rpc_proxy is the ClusterRpcProxy itself
rpc_proxy = _rpc_proxy.start()  # rpc_proxy is the ClusterProxy returned by start()


@app.post('/upload/')
def upload(data: UploadRequestBody):
    logger.debug(data.json(ensure_ascii=False))

    success: bool = rpc_proxy.console_service.upload(  # call the RPC method through rpc_proxy
        data=data.json(ensure_ascii=False)
    )

    return {
        'status': success
    }

rpc.py

# coding=utf-8

from nameko.standalone.rpc import ClusterRpcProxy
import settings
from schemas import (
    UploadRequestBody,
)
from loguru import logger

config = {
    'AMQP_URI': f'amqp://{settings.AMQP_URI.RABBIT_USER}:'
                f'{settings.AMQP_URI.RABBIT_PASSWORD}@{settings.AMQP_URI.RABBIT_HOST}:'
                f'{settings.AMQP_URI.RABBIT_PORT}/{settings.AMQP_URI.RABBIT_VHOST}'
}


def init_rpc_proxy():
    return ClusterRpcProxy(config)
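One thing the final version leaves out is shutdown: start() is called but stop() never is, so the reply listener is simply dropped when the process exits. A possible refinement, sketched below, wraps the start/stop pair in a small helper; SharedRpcProxy is hypothetical and not part of nameko. The factory argument would be something like lambda: ClusterRpcProxy(config), and the lock only protects starting and stopping, not the RPC calls made through the returned proxy.

```python
import threading
from typing import Any, Callable, Optional


class SharedRpcProxy:
    """Hypothetical helper: start one standalone proxy lazily and reuse it."""

    def __init__(self, factory: Callable[[], Any]) -> None:
        self._factory = factory          # e.g. lambda: ClusterRpcProxy(config)
        self._standalone: Optional[Any] = None
        self._proxy: Optional[Any] = None
        self._lock = threading.Lock()    # guards start/stop only

    def get(self) -> Any:
        """Return the started proxy, starting it on first use."""
        with self._lock:
            if self._proxy is None:
                self._standalone = self._factory()
                # start() returns the ClusterProxy-like object, not self
                self._proxy = self._standalone.start()
            return self._proxy

    def close(self) -> None:
        """Stop the underlying proxy once; safe to call repeatedly."""
        with self._lock:
            if self._standalone is not None:
                self._standalone.stop()
            self._standalone = None
            self._proxy = None
```

In api.py, shared.get().console_service.upload(...) would replace the module-level rpc_proxy, and shared.close could be registered with atexit.register or called from a FastAPI shutdown handler so the reply listener is stopped cleanly.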

OK, let's look at the speed difference before and after:

Test code:

import requests

data = {
    # This part is hidden
}

for i in range(20):
    response = requests.post('http://localhost:63000/upload/', json=data)
    print(response.status_code, response.text)
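The runs below are timed with the shell's time command, which also counts interpreter start-up and importing requests. For per-request numbers, a small timing helper could wrap the loop instead; time_calls is a hypothetical name, and in the real test fn would be something like lambda: requests.post('http://localhost:63000/upload/', json=data). Here it is exercised with time.sleep so it runs on its own.

```python
import time
from statistics import mean
from typing import Callable, List


def time_calls(fn: Callable[[], object], n: int) -> List[float]:
    """Call fn n times and return each call's wall-clock duration in seconds."""
    durations: List[float] = []
    for _ in range(n):
        start = time.perf_counter()  # monotonic, high-resolution clock
        fn()
        durations.append(time.perf_counter() - start)
    return durations


durations = time_calls(lambda: time.sleep(0.01), 5)
print(f"total {sum(durations):.3f}s, mean {mean(durations):.3f}s")
```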

Looping 20 times:

Before modification:

─➤  time python test_api.py
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
python test_api.py  0.14s user 0.05s system 1% cpu 14.696 total

After modification:

─➤  time python test_api.py
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
200 {"status":true}
python test_api.py  0.14s user 0.05s system 2% cpu 7.271 total

Because it avoids creating a queue for each RPC request, the speed is greatly improved.

14.7 seconds versus 7.3 seconds: roughly twice as fast!
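Dividing the measured totals by the 20 requests, and treating the fixed cost of starting Python and importing requests as small next to the RPC calls, gives roughly:

```latex
\frac{14.696\ \text{s}}{20} \approx 0.735\ \text{s per request (before)}, \qquad
\frac{7.271\ \text{s}}{20} \approx 0.364\ \text{s per request (after)}

\frac{14.696}{7.271} \approx 2.02, \qquad
0.735 - 0.364 \approx 0.371\ \text{s saved per request}
```

On these measurements, about 0.37 s per request was the price of setting up a new proxy and reply queue each time; the exact figure will depend on the broker and network.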

Keywords: Python Docker Microservices Cloud Native

Added by lavender on Wed, 05 Jan 2022 15:42:16 +0200