[Hard Python] [Chapter 2 - asynchronous IO] 2. Execution of asynchronous tasks in event loop

Continuing from the previous post: once the event loop has been created, how does it run coroutine tasks and asynchronous IO tasks?

The code of asyncio.run shows that loop.run_until_complete is the method that runs a coroutine. It is defined as follows:

# base_events.py
class BaseEventLoop(events.AbstractEventLoop):
    def run_until_complete(self, future):
        self._check_closed()
        self._check_running()
        new_task = not futures.isfuture(future)
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            future._log_destroy_pending = False
        future.add_done_callback(_run_until_complete_cb)
        try:
            self.run_forever()
        except:
            if new_task and future.done() and not future.cancelled():
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')
        return future.result()
    

# tasks.py
def ensure_future(coro_or_future, *, loop=None):
    return _ensure_future(coro_or_future, loop=loop)


def _ensure_future(coro_or_future, *, loop=None):
    if futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('The future belongs to a different loop than '
                            'the one specified as the loop argument')
        return coro_or_future

    if not coroutines.iscoroutine(coro_or_future):
        if inspect.isawaitable(coro_or_future):
            coro_or_future = _wrap_awaitable(coro_or_future)
        else:
            raise TypeError('An asyncio.Future, a coroutine or an awaitable '
                            'is required')

    if loop is None:
        loop = events._get_event_loop(stacklevel=4)
    return loop.create_task(coro_or_future)

The coroutine passed to run_until_complete is wrapped into a Task instance by tasks.ensure_future. As the code above shows, this ultimately ends up in loop.create_task.
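The wrapping step can be observed from user code. The following is a minimal sketch using only public asyncio calls; the coroutine name `answer` is made up for illustration. It checks that a bare coroutine becomes a Task, while an existing future is passed through unchanged.

```python
import asyncio


async def answer():
    return 42


loop = asyncio.new_event_loop()
try:
    # A bare coroutine is wrapped in a new Task bound to the given loop.
    task = asyncio.ensure_future(answer(), loop=loop)
    is_task = isinstance(task, asyncio.Task)

    # An object that is already a future is returned as-is.
    fut = loop.create_future()
    same_future = asyncio.ensure_future(fut, loop=loop) is fut

    # run_until_complete drives the loop until the task finishes.
    result = loop.run_until_complete(task)
finally:
    loop.close()

print(is_task, same_future, result)
```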

# base_events.py
class BaseEventLoop(events.AbstractEventLoop):
    def create_task(self, coro, *, name=None):
        self._check_closed()
        if self._task_factory is None:
            task = tasks.Task(coro, loop=self, name=name)
            if task._source_traceback:
                del task._source_traceback[-1]
        else:
            task = self._task_factory(self, coro)
            tasks._set_task_name(task, name)

        return task
    

# tasks.py
class Task(futures._PyFuture):
    def __init__(self, coro, *, loop=None, name=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)


# base_events.py
class BaseEventLoop(events.AbstractEventLoop):
    def call_soon(self, callback, *args, context=None):
        self._check_closed()
        if self._debug:
            self._check_thread()
            self._check_callback(callback, 'call_soon')
        handle = self._call_soon(callback, args, context)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        return handle
    
    def _call_soon(self, callback, args, context):
        handle = events.Handle(callback, args, self, context)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        self._ready.append(handle)
        return handle

loop.create_task eventually constructs a Task instance. The Task wraps the coroutine together with a set of related state, and at the end of its initializer it calls the loop's call_soon method, passing in its own __step function. A function passed to call_soon is wrapped by events.Handle into a handle instance and appended to the event loop's _ready queue.
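The queueing behaviour of call_soon can be checked with a short experiment. This is only a sketch using public APIs: callbacks are not run when call_soon is called, but later, in FIFO order, once the loop is running.

```python
import asyncio

order = []
loop = asyncio.new_event_loop()
try:
    handle = loop.call_soon(order.append, 'first')
    loop.call_soon(order.append, 'second')
    loop.call_soon(loop.stop)  # stop the loop after the callbacks above

    is_handle = isinstance(handle, asyncio.Handle)
    before = list(order)  # nothing has run yet, the handles are only queued

    loop.run_forever()
finally:
    loop.close()

print(is_handle, before, order)
```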

The __step method starts the coroutine held by the Task instance through coro.send(None) or coro.throw(exc) and obtains its result. For a simple coroutine, coro.send(None) raises StopIteration directly, with the coroutine's return value attached to the exception. There are other cases (for example, awaiting an Awaitable that yields several times) in which the Task's __step function has to be called several times. For related examples, see this Stack Overflow post.

After that, we return to run_until_complete: once ensure_future is done, it calls loop.run_forever to start the event loop.
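Both cases can be reproduced by driving coroutines by hand, without any event loop. The sketch below is an illustration of the send/StopIteration protocol, not the actual Task code; `YieldTwice` is a hypothetical awaitable written for this example.

```python
async def simple():
    return 'done'


# Case 1: a coroutine that never suspends finishes on the first send().
coro = simple()
try:
    coro.send(None)
except StopIteration as exc:
    simple_result = exc.value  # the return value rides on the exception


class YieldTwice:
    """Hypothetical awaitable that suspends twice before returning."""

    def __await__(self):
        yield
        yield
        return 'finally'


async def multi():
    return await YieldTwice()


# Case 2: each suspension needs one more send(), i.e. one more step.
suspensions = 0
coro = multi()
while True:
    try:
        coro.send(None)
        suspensions += 1
    except StopIteration as exc:
        multi_result = exc.value
        break

print(simple_result, suspensions, multi_result)
```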

# windows_events.py
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    def run_forever(self):
        try:
            assert self._self_reading_future is None
            self.call_soon(self._loop_self_reading)
            super().run_forever()
        finally:
            if self._self_reading_future is not None:
                ov = self._self_reading_future._ov
                self._self_reading_future.cancel()
                if ov is not None:
                    self._proactor._unregister(ov)
                self._self_reading_future = None
                
                
# proactor_events.py
class BaseProactorEventLoop(base_events.BaseEventLoop):
    def _loop_self_reading(self, f=None):
        try:
            if f is not None:
                f.result()  # may raise
            if self._self_reading_future is not f:
                return
            f = self._proactor.recv(self._ssock, 4096)
        except exceptions.CancelledError:
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self.call_exception_handler({
                'message': 'Error on reading from the event loop self pipe',
                'exception': exc,
                'loop': self,
            })
        else:
            self._self_reading_future = f
            f.add_done_callback(self._loop_self_reading)


# base_events.py
class BaseEventLoop(events.AbstractEventLoop):
    def run_forever(self):
        self._check_closed()
        self._check_running()
        self._set_coroutine_origin_tracking(self._debug)
        self._thread_id = threading.get_ident()

        old_agen_hooks = sys.get_asyncgen_hooks()
        sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                               finalizer=self._asyncgen_finalizer_hook)
        try:
            events._set_running_loop(self)
            while True:
                self._run_once()
                if self._stopping:
                    break
        finally:
            self._stopping = False
            self._thread_id = None
            events._set_running_loop(None)
            self._set_coroutine_origin_tracking(False)
            sys.set_asyncgen_hooks(*old_agen_hooks)
            
    def _run_once(self):
        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        timeout = None
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            when = self._scheduled[0]._when
            timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

        event_list = self._selector.select(timeout)
        self._process_events(event_list)

        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                ...  # debug-mode code, plus timing statistics (omitted)
            else:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.

When run_forever is called on ProactorEventLoop, it first uses call_soon to schedule the _loop_self_reading method. _loop_self_reading issues a recv on the loop's self-pipe socket through the proactor, obtains a read future, and registers itself as that future's done callback, so the read is issued again each time it completes.

After that, ProactorEventLoop calls BaseEventLoop.run_forever, which runs _run_once over and over; each call is one iteration of the event loop. One round of _run_once does the following:

  • Clean up cancelled scheduled tasks in _scheduled
  • select the event list and process it
  • Take the tasks that are due out of _scheduled and append them to the _ready list
    • As shown earlier, tasks submitted through call_soon are also in the _ready list
  • Take all handles out of the _ready list in order and run them by calling _run

Through this mechanism, the event loop keeps running tasks.
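The round described above can be condensed into a toy loop. This is a simplified sketch, not asyncio's implementation: `ToyLoop` is a made-up class, there is no cancellation handling, and `time.sleep` stands in for the selector/IOCP wait because the toy has no IO.

```python
import collections
import heapq
import time


class ToyLoop:
    def __init__(self):
        self._ready = collections.deque()  # callbacks to run this round
        self._scheduled = []               # heap of (when, seq, callback, args)
        self._seq = 0                      # tie-breaker for equal deadlines
        self._stopping = False

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def call_later(self, delay, callback, *args):
        self._seq += 1
        when = time.monotonic() + delay
        heapq.heappush(self._scheduled, (when, self._seq, callback, args))

    def stop(self):
        self._stopping = True

    def _run_once(self):
        # 1. Work out how long the "select" step may block.
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            timeout = max(0, self._scheduled[0][0] - time.monotonic())
        else:
            # The real loop would block on IO here; the toy has no IO.
            raise RuntimeError('nothing to do')
        # 2. Stand-in for selector.select(timeout).
        if timeout:
            time.sleep(timeout)
        # 3. Move timers that are due into the ready queue.
        now = time.monotonic()
        while self._scheduled and self._scheduled[0][0] <= now:
            _, _, callback, args = heapq.heappop(self._scheduled)
            self._ready.append((callback, args))
        # 4. Run only the callbacks that were ready at this point.
        for _ in range(len(self._ready)):
            callback, args = self._ready.popleft()
            callback(*args)

    def run_forever(self):
        self._stopping = False
        while True:
            self._run_once()
            if self._stopping:
                break


log = []
loop = ToyLoop()
loop.call_later(0.02, log.append, 'timer')
loop.call_later(0.03, loop.stop)
loop.call_soon(log.append, 'soon')
loop.run_forever()
print(log)
```

Note that step 4 iterates over a snapshot of the queue length, as the real `_run_once` does, so callbacks scheduled while running are deferred to the next round.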

From the definition of _run_once above, IOCP comes into play at the step that selects the event list. This is because the _selector of BaseProactorEventLoop is the proactor, and what is actually passed in is an IocpProactor instance, so it is the select method of the IocpProactor instance that is ultimately called. It is only at this step that IO operations get handled.

So the question is, how does asyncio schedule IO operations? Let's first look at the implementation of IocpProactor.select:

# windows_events.py
class IocpProactor:
    def select(self, timeout=None):
        if not self._results:
            self._poll(timeout)
        tmp = self._results
        self._results = []
        return tmp
    
    def _poll(self, timeout=None):
        if timeout is None:
            ms = INFINITE
        elif timeout < 0:
            raise ValueError("negative timeout")
        else:
            ms = math.ceil(timeout * 1e3)
            if ms >= INFINITE:
                raise ValueError("timeout too big")

        while True:
            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
            if status is None:
                break
            ms = 0

            err, transferred, key, address = status
            try:
                f, ov, obj, callback = self._cache.pop(address)
            except KeyError:
                if self._loop.get_debug():
                    self._loop.call_exception_handler({
                        'message': ('GetQueuedCompletionStatus() returned an '
                                    'unexpected event'),
                        'status': ('err=%s transferred=%s key=%#x address=%#x'
                                   % (err, transferred, key, address)),
                    })
                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
                    _winapi.CloseHandle(key)
                continue

            if obj in self._stopped_serving:
                f.cancel()
            elif not f.done():
                try:
                    value = callback(transferred, key, ov)
                except OSError as e:
                    f.set_exception(e)
                    self._results.append(f)
                else:
                    f.set_result(value)
                    self._results.append(f)

        # Remove unregistered futures
        for ov in self._unregistered:
            self._cache.pop(ov.address, None)
        self._unregistered.clear()

In IocpProactor._poll, GetQueuedCompletionStatus is called to query the completion port for results. When a result arrives, the cached callback is popped out using the address carried in the result and then executed.

Analyzing a concrete IO example shows what is going on:

from multiprocessing import Process
import asyncio
import time


HOST, PORT = '127.0.0.1', 31077


async def _svr_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    data = await reader.read(1024)
    msg = data.decode()
    print(f'[Server] recv: {msg}')

    msg_back = msg[::-1]  # reverse the whole message, including its first character
    print(f'[Server] send: {msg_back}')
    writer.write(msg_back.encode())
    await writer.drain()

    writer.close()


async def _svr_task():
    svr = await asyncio.start_server(_svr_handler, host=HOST, port=PORT)
    async with svr:
        await svr.serve_forever()


def _svr():
    asyncio.run(_svr_task())


async def _test_cli(msg: str):
    reader, writer = await asyncio.open_connection(HOST, PORT)

    print(f'[Client] send: {msg}')
    writer.write(msg.encode())
    await writer.drain()

    data = await reader.read(1024)
    print(f'[Client] recv: {data.decode()}')

    writer.close()
    await writer.wait_closed()


def test_cli():
    p = Process(target=_svr, daemon=True)
    p.start()
    time.sleep(0.5)
    _msg = 'helloworld'
    asyncio.run(_test_cli(_msg))
    p.kill()


if __name__ == '__main__':
    test_cli()

This is a very simple echo server. The client sends a message to the server, and the server returns the message reversed. We take the client's write operation, writer.write, as an example to see how IO events are processed in the event loop.

First, the open_connection function creates a connection to the given host and port, and returns the reader and writer for the connection's stream.

async def open_connection(host=None, port=None, *,
                          limit=_DEFAULT_LIMIT, **kwds):
    loop = events.get_running_loop()
    reader = StreamReader(limit=limit, loop=loop)
    protocol = StreamReaderProtocol(reader, loop=loop)
    transport, _ = await loop.create_connection(
        lambda: protocol, host, port, **kwds)
    writer = StreamWriter(transport, protocol, reader, loop)
    return reader, writer

For the reader, a StreamReader instance is initialized first, and the reader is then wrapped in a StreamReaderProtocol.

For the writer, the first step is to create a transport instance for the connection through the loop's create_connection method; the transport is the abstraction of the communication pipe. The transport instance is bound to the StreamReaderProtocol instance created earlier. The transport is then bound to the writer.

In ProactorEventLoop, the transport instance is created as follows:

# proactor_events.py
class BaseProactorEventLoop(base_events.BaseEventLoop):
    def _make_socket_transport(self, sock, protocol, waiter=None,
                               extra=None, server=None):
        return _ProactorSocketTransport(self, sock, protocol, waiter,
                                        extra, server)


class _ProactorSocketTransport(_ProactorReadPipeTransport,
                               _ProactorBaseWritePipeTransport,
                               transports.Transport):

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        super().__init__(loop, sock, protocol, waiter, extra, server)
        base_events._set_nodelay(sock)

A _ProactorSocketTransport instance initializes both _ProactorReadPipeTransport and _ProactorBaseWritePipeTransport, so it supports both reading from and writing to the pipe. Its inheritance chain is as follows:

(<class 'asyncio.proactor_events._ProactorSocketTransport'>,
 <class 'asyncio.proactor_events._ProactorReadPipeTransport'>,
 <class 'asyncio.proactor_events._ProactorBaseWritePipeTransport'>,
 <class 'asyncio.proactor_events._ProactorBasePipeTransport'>,
 <class 'asyncio.transports._FlowControlMixin'>,
 <class 'asyncio.transports.Transport'>,
 <class 'asyncio.transports.ReadTransport'>,
 <class 'asyncio.transports.WriteTransport'>,
 <class 'asyncio.transports.BaseTransport'>,
 <class 'object'>)

Then, when the client starts writing and calls writer.write, the following happens:

# proactor_events.py
class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
                                      transports.WriteTransport):
    def write(self, data):
        # Some validation logic omitted
        # Observable states:
        # 1. IDLE: _write_fut and _buffer both None
        # 2. WRITING: _write_fut set; _buffer None
        # 3. BACKED UP: _write_fut set; _buffer a bytearray
        # We always copy the data, so the caller can't modify it
        # while we're still waiting for the I/O to happen.
        if self._write_fut is None:  # IDLE -> WRITING
            assert self._buffer is None
            # Pass a copy, except if it's already immutable.
            self._loop_writing(data=bytes(data))
        elif not self._buffer:  # WRITING -> BACKED UP
            # Make a mutable copy which we can extend.
            self._buffer = bytearray(data)
            self._maybe_pause_protocol()
        else:  # BACKED UP
            # Append to buffer (also copies).
            self._buffer.extend(data)
            self._maybe_pause_protocol()
            
    def _loop_writing(self, f=None, data=None):
        try:
            if f is not None and self._write_fut is None and self._closing:
                return
            assert f is self._write_fut
            self._write_fut = None
            self._pending_write = 0
            if f:
                f.result()
            if data is None:
                data = self._buffer
                self._buffer = None
            if not data:
                if self._closing:
                    self._loop.call_soon(self._call_connection_lost, None)
                if self._eof_written:
                    self._sock.shutdown(socket.SHUT_WR)
                self._maybe_resume_protocol()
            else:
                self._write_fut = self._loop._proactor.send(self._sock, data)
                if not self._write_fut.done():
                    assert self._pending_write == 0
                    self._pending_write = len(data)
                    self._write_fut.add_done_callback(self._loop_writing)
                    self._maybe_pause_protocol()
                else:
                    self._write_fut.add_done_callback(self._loop_writing)
            if self._empty_waiter is not None and self._write_fut is None:
                self._empty_waiter.set_result(None)
        except ConnectionResetError as exc:
            self._force_close(exc)
        except OSError as exc:
            self._fatal_error(exc, 'Fatal write error on pipe transport')

On the first write, _write_fut and _buffer are both empty, so the _loop_writing logic is triggered. In _loop_writing, self._loop._proactor.send(self._sock, data) is called to create the future for a write operation. The _proactor here is the IocpProactor instance held by ProactorEventLoop.
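The three states named in the comments of write (IDLE, WRITING, BACKED UP) can be modelled without any sockets. The following is a toy sketch under that assumption: `ToyWriter` and its `complete` method are invented for illustration, with `complete` playing the role of the completion callback that the real transport receives from the proactor.

```python
class ToyWriter:
    def __init__(self):
        self.in_flight = None  # stand-in for _write_fut
        self.buffer = None     # stand-in for _buffer
        self.sent = []         # chunks whose "IO" has completed

    def state(self):
        if self.in_flight is None:
            return 'IDLE'
        return 'BACKED UP' if self.buffer else 'WRITING'

    def write(self, data):
        if self.in_flight is None:    # IDLE -> WRITING
            self.in_flight = bytes(data)
        elif not self.buffer:         # WRITING -> BACKED UP
            self.buffer = bytearray(data)
        else:                         # BACKED UP: keep appending
            self.buffer.extend(data)

    def complete(self):
        """Simulate the in-flight write finishing (the _loop_writing callback)."""
        self.sent.append(self.in_flight)
        self.in_flight = None
        data, self.buffer = self.buffer, None
        if data:                      # buffered data becomes the next write
            self.in_flight = bytes(data)


w = ToyWriter()
states = [w.state()]
w.write(b'a')
states.append(w.state())
w.write(b'b')
w.write(b'c')
states.append(w.state())
w.complete()
states.append(w.state())
w.complete()
states.append(w.state())
print(states, w.sent)
```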

# windows_events.py
class IocpProactor:
    def send(self, conn, buf, flags=0):
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        if isinstance(conn, socket.socket):
            ov.WSASend(conn.fileno(), buf, flags)
        else:
            ov.WriteFile(conn.fileno(), buf)

        def finish_send(trans, key, ov):
            try:
                return ov.getresult()
            except OSError as exc:
                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
                                    _overlapped.ERROR_OPERATION_ABORTED):
                    raise ConnectionResetError(*exc.args)
                else:
                    raise

        return self._register(ov, conn, finish_send)
    
    def _register_with_iocp(self, obj):
        if obj not in self._registered:
            self._registered.add(obj)
            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
            
    def _register(self, ov, obj, callback):
        self._check_closed()

        f = _OverlappedFuture(ov, loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]
        if not ov.pending:
            try:
                value = callback(None, None, ov)
            except OSError as e:
                f.set_exception(e)
            else:
                f.set_result(value)

        self._cache[ov.address] = (f, ov, obj, callback)
        return f

The send method performs the following operations:

  • Register the socket with the completion port through CreateIoCompletionPort
  • Create an Overlapped instance and send the data to the socket through WSASend
  • Create a future associated with the Overlapped instance. If the Overlapped instance is not in the pending state, the callback is executed immediately. Either way, the future instance is then stored in _cache.


As mentioned earlier, on each iteration of the event loop the IocpProactor instance calls its _poll method, which uses GetQueuedCompletionStatus to query for completed IO operations. When an IO operation is found to be complete, the address (ov.address) is taken from the result, and the matching callback is popped from _cache and executed. In this way, by combining the IOCP model with the event loop (in essence, the event loop acts as the worker in IOCP), the writer.write operation is tied together from start to finish.

Then await writer.drain() essentially does the following:
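The register-then-poll pattern can be sketched in portable Python. This is a toy model, not the Windows implementation: `ToyProactor` is hypothetical, a deque stands in for the completion port, an integer stands in for the Overlapped address, and `concurrent.futures.Future` is used only as a loop-free stand-in for `_OverlappedFuture`.

```python
import collections
from concurrent.futures import Future


class ToyProactor:
    def __init__(self):
        self._cache = {}                  # address -> (future, callback)
        self._port = collections.deque()  # stand-in for the completion port
        self._next_address = 0

    def send(self, data):
        self._next_address += 1
        address = self._next_address

        def finish_send(transferred):
            # The real callback calls ov.getresult(); here we echo the count.
            return transferred

        fut = Future()
        self._cache[address] = (fut, finish_send)
        # Pretend the OS finishes the write later and posts a completion packet.
        self._port.append((address, len(data)))
        return fut

    def select(self):
        results = []
        while self._port:
            address, transferred = self._port.popleft()
            fut, callback = self._cache.pop(address)  # look up by address
            fut.set_result(callback(transferred))
            results.append(fut)
        return results


proactor = ToyProactor()
fut = proactor.send(b'helloworld')
done_before_poll = fut.done()
completed = proactor.select()
print(done_before_poll, fut.result(), len(proactor._cache))
```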

# streams.py
class StreamWriter:
    async def drain(self):
        await self._protocol._drain_helper()
        

class FlowControlMixin(protocols.Protocol):  # This will be inherited by StreamReaderProtocol
    async def _drain_helper(self):
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        waiter = self._drain_waiter
        assert waiter is None or waiter.cancelled()
        waiter = self._loop.create_future()
        self._drain_waiter = waiter
        await waiter

writer.drain essentially awaits the _drain_helper coroutine of the StreamReaderProtocol instance. That coroutine does some preliminary checks, then creates a future on the current event loop, stores it as _drain_waiter, and awaits it. Why is it done this way?

First, observe the logic of _run_once: if the _ready queue has tasks or there are _scheduled tasks, the GetQueuedCompletionStatus call in IocpProactor._poll is given a timeout; otherwise its timeout is INFINITE and it blocks until an IO event completes. Interested readers can write a coroutine that calls create_future and awaits the result; try it and you will see.

Then, go back to the _loop_writing method of _ProactorBaseWritePipeTransport. After _write_fut is created, _loop_writing is added to it directly as its done callback. When the IocpProactor instance obtains a completion event from GetQueuedCompletionStatus, it calls ov.getresult() (in finish_send of the send method) to get the result, and the result is set on _write_fut as its final result. Because _write_fut is now complete, it invokes its callback _loop_writing; this time there is no data in the buffer, so execution reaches _maybe_resume_protocol.
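One way to run that experiment is sketched below. It assumes a timeout added with asyncio.wait_for so that the script terminates; that timer is a scheduled task, so the loop's select step gets a finite timeout. Without it, nothing would be ready or scheduled and the loop would simply block waiting for IO.

```python
import asyncio


async def wait_on_unresolved_future():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()  # nobody ever sets a result on this future
    try:
        # The timeout is the only thing that can wake the loop up here.
        await asyncio.wait_for(fut, timeout=0.05)
    except asyncio.TimeoutError:
        return 'timed out'
    return 'completed'


outcome = asyncio.run(wait_on_unresolved_future())
print(outcome)
```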

# transports.py
class _FlowControlMixin(Transport):
    def _maybe_resume_protocol(self):
        if (self._protocol_paused and
                self.get_write_buffer_size() <= self._low_water):
            self._protocol_paused = False
            try:
                self._protocol.resume_writing()
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._loop.call_exception_handler({
                    'message': 'protocol.resume_writing() failed',
                    'exception': exc,
                    'transport': self,
                    'protocol': self._protocol,
                })
                

# streams.py
class FlowControlMixin(protocols.Protocol):
    def resume_writing(self):
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        waiter = self._drain_waiter
        if waiter is not None:
            self._drain_waiter = None
            if not waiter.done():
                waiter.set_result(None)

In writer.drain, we have in fact been awaiting _drain_waiter all along. The call to _maybe_resume_protocol ends up in the resume_writing method of the StreamReaderProtocol instance, which is defined in the FlowControlMixin class. This method does two things:

  • Sets the _paused state to False
    • In _loop_writing, if the data has not been sent yet, the other branch calls _maybe_pause_protocol, which sets this state to True when the pending write size exceeds the transport's high-water mark. When await writer.drain() is then called, it goes straight to awaiting _drain_waiter
  • Marks _drain_waiter as done, so that await writer.drain() can finish
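The pause/drain/resume handshake can be reproduced in isolation. The sketch below is a stripped-down model of the mechanism, not the asyncio class: `ToyFlowControl` is a made-up name, and a `call_later` timer stands in for the write-completion callback that would normally trigger resume_writing.

```python
import asyncio


class ToyFlowControl:
    def __init__(self):
        self._paused = False
        self._drain_waiter = None

    def pause_writing(self):
        self._paused = True

    def resume_writing(self):
        self._paused = False
        waiter, self._drain_waiter = self._drain_waiter, None
        if waiter is not None and not waiter.done():
            waiter.set_result(None)  # wakes up whoever is awaiting drain()

    async def drain(self):
        if not self._paused:
            return  # nothing is backed up, return immediately
        self._drain_waiter = asyncio.get_running_loop().create_future()
        await self._drain_waiter


async def main():
    events = []
    fc = ToyFlowControl()

    await fc.drain()
    events.append('drain returned immediately')

    fc.pause_writing()
    # Stand-in for the write completing a little later.
    asyncio.get_running_loop().call_later(0.01, fc.resume_writing)
    await fc.drain()
    events.append('drain returned after resume')
    return events, fc._paused


events, still_paused = asyncio.run(main())
print(events, still_paused)
```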

The client's write operation is scheduled in the event loop in the rather involved way described above. In general, the steps are as follows:

  • The user calls writer.write, which hands the data to the transport's write path
  • The transport's _loop_writing sees that there is data to send and creates a write future
    • Bind the socket to the completion port through CreateIoCompletionPort
    • Send the data via WSASend
    • Set a callback that fetches the result of the send
    • Write the result into the future
    • The write future has _loop_writing registered as its done callback, so the next round of _loop_writing runs once the result is available
  • The user calls await writer.drain()
    • After the write future is created, if it is found not to be complete, _maybe_pause_protocol is called first; it sets _paused to True when the pending write size exceeds the high-water mark
    • writer.drain sees that the protocol is _paused, sets _drain_waiter to a new future, and awaits it
    • When the write completes, the write future's callback _loop_writing is triggered. This round of _loop_writing finds no data left to send and calls _maybe_resume_protocol, which sets the protocol's _paused to False and marks _drain_waiter as done
    • Once _drain_waiter is done, await writer.drain() returns

For read operations and other IO operations, interested readers can dig deeper on their own.

Keywords: Python asyncio

Added by ashrust on Sun, 20 Feb 2022 15:45:08 +0200