1. selectors: I/O Multiplexing Abstractions
The selectors module provides a platform-independent abstraction layer on top of platform-specific I/O monitoring functions in select.
1.1 Operational Model
The API in selectors is event-based, similar to poll() in select. It has multiple implementations, and the module automatically sets the alias DefaultSelector to refer to the most efficient implementation for the current system configuration.
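To see which implementation the alias resolves to on a given machine, a minimal sketch is enough. The variable names sel and impl_name are chosen here for illustration, and the class reported will differ between platforms.

```python
import selectors

# DefaultSelector is an alias for one of the concrete selector classes,
# so inspecting an instance reveals which implementation was chosen.
sel = selectors.DefaultSelector()
impl_name = type(sel).__name__
print('DefaultSelector resolves to', impl_name)

# Selectors hold operating system resources and should be closed.
sel.close()
```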
A selector object provides methods to specify which events to look for on a socket, and then lets the caller wait for events in a platform-independent manner. Registering interest in an event creates a SelectorKey, which holds the socket, information about the events of interest, and optional application data. The owner of the selector calls its select() method to learn about events. The return value is a sequence of tuples, each containing a key object and a bitmask indicating what events have occurred. A program using a selector should call select() repeatedly and handle the events appropriately.
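The register-then-select cycle can be tried in a single process. The following is a rough sketch, assuming a connected pair of sockets from socket.socketpair() stands in for a real network connection; the names left, right, and results, and the 'right side' application data, are illustrative only.

```python
import selectors
import socket

sel = selectors.DefaultSelector()

# A connected pair of sockets stands in for a network connection.
left, right = socket.socketpair()
left.setblocking(False)
right.setblocking(False)

# register() returns the SelectorKey that ties together the socket,
# the events of interest, and the optional application data.
key = sel.register(right, selectors.EVENT_READ, data='right side')

# Writing to one end makes the other end readable.
left.sendall(b'ping')

# select() returns (key, mask) tuples for the sockets that are ready.
results = []
for ready_key, mask in sel.select(timeout=1):
    if mask & selectors.EVENT_READ:
        payload = ready_key.fileobj.recv(1024)
        results.append((ready_key.data, payload))
print(results)

sel.unregister(right)
sel.close()
left.close()
right.close()
```

Because the application data travels with the key, the code that handles an event does not need a separate lookup table to find out what the socket is for.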
1.2 Echo Server
The echo server example presented here uses the application data in the SelectorKey to register a callback function to be invoked on a new event. The main loop gets the callback from the key and passes the socket and event mask to it. As the server starts, it registers the accept() function to be called for read events on the main server socket. Accepting a connection produces a new socket, which is then registered with the read() function as the callback for read events.
    import selectors
    import socket

    mysel = selectors.DefaultSelector()
    keep_running = True


    def read(connection, mask):
        "Callback for read events"
        global keep_running

        client_address = connection.getpeername()
        print('read({})'.format(client_address))
        data = connection.recv(1024)
        if data:
            # A readable client socket has data
            print(' received {!r}'.format(data))
            connection.sendall(data)
        else:
            # Interpret empty result as closed connection
            print(' closing')
            mysel.unregister(connection)
            connection.close()
            # Tell the main loop to stop
            keep_running = False


    def accept(sock, mask):
        "Callback for new connections"
        new_connection, addr = sock.accept()
        print('accept({})'.format(addr))
        new_connection.setblocking(False)
        mysel.register(new_connection, selectors.EVENT_READ, read)


    server_address = ('localhost', 9999)
    print('starting up on {} port {}'.format(*server_address))
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setblocking(False)
    server.bind(server_address)
    server.listen(5)

    mysel.register(server, selectors.EVENT_READ, accept)

    while keep_running:
        print('waiting for I/O')
        for key, mask in mysel.select(timeout=1):
            callback = key.data
            callback(key.fileobj, mask)

    print('shutting down')
    mysel.close()
When read() receives no data from the socket, it interprets the read event as the other side of the connection being closed, rather than as data to echo back. It then removes the socket from the selector and closes it. Because this is only an example program, the server also shuts itself down after it has finished communicating with its single client.
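The "empty read means the peer closed" convention can be observed without a network. The following is a small sketch, assuming socket.socketpair() as a stand-in for the accepted connection; server_side, client_side, and closed_seen are names invented for the illustration.

```python
import selectors
import socket

sel = selectors.DefaultSelector()

# server_side plays the role of the accepted connection.
server_side, client_side = socket.socketpair()
server_side.setblocking(False)
sel.register(server_side, selectors.EVENT_READ)

# Closing the peer makes the remaining socket readable, but the read
# returns an empty bytes object instead of data.
client_side.close()

closed_seen = False
for key, mask in sel.select(timeout=1):
    data = key.fileobj.recv(1024)
    if not data:
        closed_seen = True
        # Mirror the cleanup done by the server: unregister, then close.
        sel.unregister(key.fileobj)
        key.fileobj.close()

sel.close()
print('peer closed:', closed_seen)
```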
1.3 Echo Client
The next example, an echo client, processes all of the I/O events in the main loop instead of using callbacks. It sets up the selector to report read events on the socket, as well as to report when the socket is ready to send data. Because it watches for two types of events, the client must check which one occurred by examining the mask value. After all of its outgoing data has been sent, it changes the selector configuration to report only when there is data to read.
    import selectors
    import socket

    mysel = selectors.DefaultSelector()
    keep_running = True
    outgoing = [
        b'It will be repeated.',
        b'This is the message. ',
    ]
    bytes_sent = 0
    bytes_received = 0

    # Connecting is a blocking operation, so call setblocking()
    # after it returns.
    server_address = ('localhost', 9999)
    print('connecting to {} port {}'.format(*server_address))
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(server_address)
    sock.setblocking(False)

    # Set up the selector to watch for when the socket is ready
    # to send data as well as when there is data to read.
    mysel.register(
        sock,
        selectors.EVENT_READ | selectors.EVENT_WRITE,
    )

    while keep_running:
        print('waiting for I/O')
        for key, mask in mysel.select(timeout=1):
            connection = key.fileobj
            client_address = connection.getpeername()
            print('client({})'.format(client_address))

            if mask & selectors.EVENT_READ:
                print(' ready to read')
                data = connection.recv(1024)
                if data:
                    # A readable client socket has data
                    print(' received {!r}'.format(data))
                    bytes_received += len(data)

                # Interpret empty result as closed connection,
                # and also close when we have received a copy
                # of all of the data sent.
                keep_running = not (
                    not data or
                    (bytes_received and
                     (bytes_received == bytes_sent))
                )

            if mask & selectors.EVENT_WRITE:
                print(' ready to write')
                if not outgoing:
                    # We are out of messages, so we no longer need to
                    # write anything. Change our registration to let
                    # us keep reading responses from the server.
                    print(' switching to read-only')
                    mysel.modify(sock, selectors.EVENT_READ)
                else:
                    # Send the next message.
                    next_msg = outgoing.pop()
                    print(' sending {!r}'.format(next_msg))
                    sock.sendall(next_msg)
                    bytes_sent += len(next_msg)

    print('shutting down')
    mysel.unregister(connection)
    connection.close()
    mysel.close()
The client tracks the amount of data it has sent as well as the amount it has received. When those values match and are non-zero, the client exits the processing loop and shuts down cleanly by removing the socket from the selector and closing both the socket and the selector.
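The effect of switching a registration from read-write to read-only can be isolated in a short sketch. It assumes a socket.socketpair() in place of the server connection, and the names a, b, first, second, and third are illustrative.

```python
import selectors
import socket

sel = selectors.DefaultSelector()
a, b = socket.socketpair()
a.setblocking(False)

# Watch for both kinds of events, as the client does at startup.
sel.register(a, selectors.EVENT_READ | selectors.EVENT_WRITE)

# Nothing has been sent to a yet, so only write readiness is reported.
first = [mask for key, mask in sel.select(timeout=1)]

# After modify(), write readiness is no longer reported. With nothing
# to read, a non-blocking select() returns an empty list.
sel.modify(a, selectors.EVENT_READ)
second = sel.select(timeout=0)

# Once the peer sends data, the read event is reported.
b.sendall(b'reply')
third = [mask for key, mask in sel.select(timeout=1)]

print(first == [selectors.EVENT_WRITE], second == [],
      third == [selectors.EVENT_READ])

sel.unregister(a)
sel.close()
a.close()
b.close()
```

Dropping EVENT_WRITE once there is nothing left to send matters because a socket is writable almost all of the time. A loop that keeps watching for write readiness would wake up on every pass without having any work to do.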
1.4 Server and Client Together
Run the client and the server in separate terminal windows so that they can communicate with each other. The server output shows the incoming connection and data, as well as the response sent back to the client.
The client output shows the messages sent and the response from the server.