Command execution in Kubernetes is carried out jointly by the apiserver, kubelet, CRI, Docker and other components. The most complex parts are the protocol switching and the various stream copies. Let's walk through the key implementations. Although there is a lot of code, it should be understandable even without development experience. Good luck
1. Basic concepts
Command execution in Kubernetes involves a lot of protocol handling. Let's first go over the basic concepts behind it
1.1 Connection and Upgrade in HTTP protocol
HTTP/1.1 allows switching protocols on the same connection via the Connection and Upgrade headers. In short, the two sides can keep communicating over the connection established via HTTP but using another protocol, and this is the key to the protocol upgrades used by command execution in Kubernetes
1.2 101 status code in HTTP protocol
Besides the familiar HTTP/1.1, servers and clients can also speak protocols such as WebSocket or SPDY. How do they switch protocols over HTTP? The first ingredient is the 101 (Switching Protocols) status code: the server tells the client that it is switching to the protocol named in the Upgrade header, reusing the current connection
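For intuition, the handshake on the wire looks roughly like this (the path and query parameters are illustrative; as we will see later, Kubernetes upgrades to SPDY/3.1):

POST /api/v1/namespaces/default/pods/mypod/exec?command=sh&stdin=true&stdout=true HTTP/1.1
Connection: Upgrade
Upgrade: SPDY/3.1

HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Upgrade: SPDY/3.1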
1.3 stream in spdy protocol
SPDY is a TCP session-layer protocol developed by Google. In SPDY, an HTTP request/response pair is called a stream, and the protocol supports multiplexing a TCP connection: multiple streams are distinguished by their stream IDs. In short, a single connection can carry multiple concurrent request/response exchanges that do not interfere with each other. Command execution in Kubernetes mainly uses streams for message transfer
1.4 file descriptor redirection
In Linux, a process usually has three file descriptors: standard input, standard output and standard error. Command execution in Kubernetes redirects the corresponding FDs in order to capture the output of the command running in the container. Where are they redirected to? The streams mentioned above, of course (since I am not familiar with Docker, I cannot guarantee the accuracy of the Docker part)
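As a plain illustration of FD redirection (this is ordinary Go using os/exec, not Kubernetes code), the child process's stdin/stdout/stderr can be pointed at arbitrary readers and writers, which is conceptually what happens when a container command's FDs are wired to remote streams:

package main

import (
	"bytes"
	"fmt"
	"os/exec"
	"strings"
)

func main() {
	// Redirect the child's three FDs: stdin comes from a reader,
	// stdout/stderr go to in-memory buffers.
	var stdout, stderr bytes.Buffer
	cmd := exec.Command("sh", "-c", "echo hello; echo oops >&2")
	cmd.Stdin = strings.NewReader("")
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr
	if err := cmd.Run(); err != nil {
		fmt.Println("run error:", err)
	}
	fmt.Printf("stdout=%q stderr=%q\n", stdout.String(), stderr.String())
}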
1.5 Hijacker in http
After the client and server have switched protocols on the current connection via the 101 status code and the Connection/Upgrade headers, the data flowing over that connection is no longer HTTP/1.1; it belongs to the upgraded protocol. During command execution, Kubernetes uses the http Hijacker interface to obtain the underlying TCP connection from the request and response, and then keeps forwarding data over it
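A minimal sketch (not the Kubernetes code) of how a handler can take over the raw connection via http.Hijacker and answer 101 by hand; the path and protocol string are illustrative:

package main

import "net/http"

// upgradeHandler hijacks the underlying connection and writes the 101 response
// itself; after that, any protocol can be spoken over conn.
func upgradeHandler(w http.ResponseWriter, r *http.Request) {
	hj, ok := w.(http.Hijacker)
	if !ok {
		http.Error(w, "hijacking not supported", http.StatusInternalServerError)
		return
	}
	conn, bufrw, err := hj.Hijack() // raw net.Conn; the HTTP server no longer manages it
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	defer conn.Close()
	bufrw.WriteString("HTTP/1.1 101 Switching Protocols\r\nConnection: Upgrade\r\nUpgrade: SPDY/3.1\r\n\r\n")
	bufrw.Flush()
	// conn is now a plain TCP connection; read/write whatever the upgraded protocol requires.
}

func main() {
	http.HandleFunc("/exec", upgradeHandler)
	http.ListenAndServe(":8080", nil)
}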
1.6 Request forwarding based on TCP stream copying
Once the two underlying TCP ReadWriters are obtained through Hijacker, the data on the two connections can be copied directly with io.Copy. The apiserver then does not need to understand the upgraded protocol at all; it simply forwards requests and results by copying the TCP streams
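To make the idea concrete, here is a minimal sketch (not apiserver code) of a TCP proxy built purely on io.Copy; the listen and backend addresses are illustrative:

package main

import (
	"io"
	"log"
	"net"
)

// proxy copies bytes in both directions between the client and backend
// connections; it never needs to understand the protocol flowing over them.
func proxy(client net.Conn, backendAddr string) {
	defer client.Close()
	backend, err := net.Dial("tcp", backendAddr)
	if err != nil {
		log.Printf("dial backend: %v", err)
		return
	}
	defer backend.Close()

	done := make(chan struct{}, 2)
	go func() { io.Copy(backend, client); done <- struct{}{} }() // client -> backend
	go func() { io.Copy(client, backend); done <- struct{}{} }() // backend -> client
	<-done // stop when either direction finishes
}

func main() {
	ln, err := net.Listen("tcp", ":8080")
	if err != nil {
		log.Fatal(err)
	}
	for {
		conn, err := ln.Accept()
		if err != nil {
			log.Fatal(err)
		}
		go proxy(conn, "127.0.0.1:9090") // backend address is illustrative
	}
}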
That concludes the basics. Now let's look at the concrete implementation underneath, starting from kubectl and working down layer by layer
2.kubectl
kubectl's execution of a command is split into two parts: pod validity checking and command execution. The pod validity check mainly fetches the state of the pod and verifies that it is running. Here we focus on the command execution part
2.1 core process of command execution
The core of command execution consists of two steps: 1) establish a connection using the SPDY protocol; 2) create the streams over that connection
func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
	exec, err := remotecommand.NewSPDYExecutor(config, method, url)
	if err != nil {
		return err
	}
	return exec.Stream(remotecommand.StreamOptions{
		Stdin:             stdin,
		Stdout:            stdout,
		Stderr:            stderr,
		Tty:               tty,
		TerminalSizeQueue: terminalSizeQueue,
	})
}
2.2 exec request building
The URL assembled here, /pods/{namespace}/{podName}/exec, is in fact the exec subresource of the pod on the apiserver; next we will look at how the apiserver handles this request
// Create an exec request
req := restClient.Post().
	Resource("pods").
	Name(pod.Name).
	Namespace(pod.Namespace).
	SubResource("exec")
req.VersionedParams(&corev1.PodExecOptions{
	Container: containerName,
	Command:   p.Command,
	Stdin:     p.Stdin,
	Stdout:    p.Out != nil,
	Stderr:    p.ErrOut != nil,
	TTY:       t.Raw,
}, scheme.ParameterCodec)

return p.Executor.Execute("POST", req.URL(), p.Config, p.In, p.Out, p.ErrOut, t.Raw, sizeQueue)
2.3 create Stream
In exec.Stream, the main job is to tell the server, via headers, which type of stream is being created, and to negotiate this with the server
// set up stdin stream
if p.Stdin != nil {
	headers.Set(v1.StreamType, v1.StreamTypeStdin)
	p.remoteStdin, err = conn.CreateStream(headers)
	if err != nil {
		return err
	}
}

// set up stdout stream
if p.Stdout != nil {
	headers.Set(v1.StreamType, v1.StreamTypeStdout)
	p.remoteStdout, err = conn.CreateStream(headers)
	if err != nil {
		return err
	}
}

// set up stderr stream
if p.Stderr != nil && !p.Tty {
	headers.Set(v1.StreamType, v1.StreamTypeStderr)
	p.remoteStderr, err = conn.CreateStream(headers)
	if err != nil {
		return err
	}
}
3.APIServer
The APIServer acts as a proxy during command execution: it is responsible for forwarding traffic between kubectl and the kubelet. Note that this forwarding is mainly done by copying TCP streams, because kubectl and the kubelet actually communicate using the SPDY protocol. Let's look at the key implementation
3.1 Connection
The exec SPDY request is first sent to the Connect interface. Connect is responsible for establishing the connection with the backend kubelet and relaying the response. Inside Connect, the Node information is first obtained through the Pod; the location is the URL of the backend kubelet and transport is the transport used to reach it
func (r *ExecREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
	execOpts, ok := opts.(*api.PodExecOptions)
	if !ok {
		return nil, fmt.Errorf("invalid options object: %#v", opts)
	}
	// Return the corresponding address and establish the connection
	location, transport, err := pod.ExecLocation(r.Store, r.KubeletConn, ctx, name, execOpts)
	if err != nil {
		return nil, err
	}
	return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, true, responder), nil
}
3.2 get back-end service address
Obtaining the address mainly means building the location of the backend. The host and port of the node are taken from the information reported by the kubelet, and the pod's exec path is assembled, i.e. the Path field /exec/{namespace}/{podName}/{containerName}
loc := &url.URL{
	Scheme:   nodeInfo.Scheme,
	Host:     net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port), // host and port of the node
	Path:     fmt.Sprintf("/%s/%s/%s/%s", path, pod.Namespace, pod.Name, container), // route
	RawQuery: params.Encode(),
}
3.3 Initialization of the protocol upgrade handler
The protocol upgrade is mainly handled by the UpgradeAwareHandler. After receiving the request, the handler first tries to upgrade the protocol, which essentially means checking whether the Connection header of the HTTP request is set to Upgrade. From the earlier kubectl analysis we know that it is
func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired, interceptRedirects bool, responder rest.Responder) *proxy.UpgradeAwareHandler {
	handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder))
	handler.InterceptRedirects = interceptRedirects && utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StreamingProxyRedirects)
	handler.RequireSameHostRedirects = utilfeature.DefaultFeatureGate.Enabled(genericfeatures.ValidateProxyRedirects)
	handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
	return handler
}

func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	// If the protocol upgrade succeeds, the upgraded path handles everything
	if h.tryUpgrade(w, req) {
		return
	}
	// ... lots of code omitted
}
3.4 Protocol upgrade
There is a lot of logic in the protocol upgrade handling, so it is described in several parts. In essence, the handler takes the request from the HTTP connection and forwards it, then holds both connections at the same time and copies the TCP streams between them
3.4.1 Connecting to the kubelet
The first step of the protocol upgrade is to establish a connection with the backend kubelet. The request sent by kubectl is copied and sent to the backend kubelet, and the HTTP connection established with the kubelet is obtained here as well; it will be used later for stream copying. Note that the status code of the response to this HTTP request is 101, i.e. the kubelet has in fact set up an SPDY-protocol handler for the communication
// Build the http request
req, err := http.NewRequest(method, location.String(), body)
if err != nil {
	return nil, nil, err
}
req.Header = header

// Send the request to establish the connection
intermediateConn, err = dialer.Dial(req)
if err != nil {
	return nil, nil, err
}

// Peek at the backend response.
rawResponse.Reset()
respReader := bufio.NewReader(io.TeeReader(
	io.LimitReader(intermediateConn, maxResponseSize), // Don't read more than maxResponseSize bytes.
	rawResponse)) // Save the raw response.

// Read the response
resp, err := http.ReadResponse(respReader, nil)
3.4.2 Hijacking the client request
The client request is also based on the SPDY protocol. After getting the underlying connection via Hijack, the response read above must first be forwarded back to the client, which triggers kubectl to send the subsequent stream-creation requests. The Write below forwards the kubelet's response to the client
requestHijackedConn, _, err := requestHijacker.Hijack()

// Forward raw response bytes back to client.
if len(rawResponse) > 0 {
	klog.V(6).Infof("Writing %d bytes to hijacked connection", len(rawResponse))
	if _, err = requestHijackedConn.Write(rawResponse); err != nil {
		utilruntime.HandleError(fmt.Errorf("Error proxying response from backend to client: %v", err))
	}
}
3.4.3 Bidirectional stream copying
After the two steps above, the apiserver holds two connections. Since the protocol flowing over them is no longer HTTP, the apiserver cannot operate on the traffic directly; it can only forward requests and responses by copying the streams
// Two-way copy between the connections
go func() {
	var writer io.WriteCloser
	if h.MaxBytesPerSec > 0 {
		writer = flowrate.NewWriter(backendConn, h.MaxBytesPerSec)
	} else {
		writer = backendConn
	}
	_, err := io.Copy(writer, requestHijackedConn)
	if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
		klog.Errorf("Error proxying data from client to backend: %v", err)
	}
	close(writerComplete)
}()

go func() {
	var reader io.ReadCloser
	if h.MaxBytesPerSec > 0 {
		reader = flowrate.NewReader(backendConn, h.MaxBytesPerSec)
	} else {
		reader = backendConn
	}
	_, err := io.Copy(requestHijackedConn, reader)
	if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
		klog.Errorf("Error proxying data from backend to client: %v", err)
	}
	close(readerComplete)
}()
4.kubelet
Command execution on the kubelet mainly relies on CRI.RuntimeService. The kubelet is only responsible for forwarding the request: it ultimately builds a stream proxy to forward the subsequent traffic, and with that its job is done
4.1 execute command main process
The main flow gets the command to execute, checks the corresponding pod, calls host.GetExec to obtain a URL, and then hands the subsequent request over to proxyStream. Let's go deeper step by step
func (s *Server) getExec(request *restful.Request, response *restful.Response) {
	// Get the command to execute
	params := getExecRequestParams(request)
	streamOpts, err := remotecommandserver.NewOptions(request.Request)
	// Get pod information
	pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
	podFullName := kubecontainer.GetPodFullName(pod)
	url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
	proxyStream(response.ResponseWriter, request.Request, url)
}
4.2 GetExec returns the execution URL
host.GetExec ultimately calls the Exec interface of the CRI runtimeService (cri.RuntimeService) to handle the request. The interface returns an address of the form /exec/{token}; at this point the command is not actually executed yet, only a command execution request has been created
func (m *kubeGenericRuntimeManager) GetExec(id kubecontainer.ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) {
	// request construction omitted
	// Issue the exec request
	resp, err := m.runtimeService.Exec(req)
	return url.Parse(resp.Url)
}
This ends up calling the Exec interface of the CRI. For now, let's ignore what exactly that interface returns and finish reading the rest of the kubelet logic first
func (c *runtimeServiceClient) Exec(ctx context.Context, in *ExecRequest, opts ...grpc.CallOption) (*ExecResponse, error) {
	err := c.cc.Invoke(ctx, "/runtime.v1alpha2.RuntimeService/Exec", in, out, opts...)
}
4.3 proxyStream
Here we meet the UpgradeAwareHandler we saw before, except this time the URL is the one returned by the backend exec call. The rest is similar to the apiserver: a stream copy between two HTTP connections
Think about what Request and Response are here: they belong to the connection established between the apiserver and the kubelet, and that connection carries SPDY traffic. Keep that in mind as the handler now establishes a connection with the backend, which is itself an SPDY-protocol server. So far we are still missing the last piece: where does the returned URL come from, and which handler serves it? Head to the CRI part in the next section
// proxyStream proxies stream to url.
func proxyStream(w http.ResponseWriter, r *http.Request, url *url.URL) {
	// TODO(random-liu): Set MaxBytesPerSec to throttle the stream.
	handler := proxy.NewUpgradeAwareHandler(url, nil /*transport*/, false /*wrapTransport*/, true /*upgradeRequired*/, &responder{})
	handler.ServeHTTP(w, r)
}
5.CRI
CRI.RuntimeService is responsible for the final command execution; this is where the command really runs. It also involves many protocol-handling operations. Let's look at the key implementation
5.1 Registration of the docker runtime
We called the Exec interface of RuntimeService above; tracing it back in the kubelet we find the following code, which creates a DockerServer and starts it
ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
	/* remaining arguments omitted */)

dockerServer := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
if err := dockerServer.Start(); err != nil {
	return err
}
Inside the Start function, the following two services are registered. Anyone who has written gRPC knows this registers the implementations of the corresponding RPC interfaces; in other words, our calls end up on the DockerService implementation
runtimeapi.RegisterRuntimeServiceServer(s.server, s.service)
runtimeapi.RegisterImageServiceServer(s.server, s.service)
5.2 Exec implementation of dockerservice
Looking at the final implementation of Exec, we find that it simply calls the GetExec interface of the streamingServer and returns a /exec/{token} endpoint
func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
	// Forward the Exec request to the streaming server
	return ds.streamingServer.GetExec(req)
}
Continuing to trace into the streaming server, we can see that GetExec is implemented as follows: it builds url=/exec/{token}. Note that the current request is actually stored in a cache
func (s *server) GetExec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
	// Generate a token and cache the request
	token, err := s.cache.Insert(req)
	return &runtimeapi.ExecResponse{
		Url: s.buildURL("exec", token),
	}, nil
}
5.3 Building the command parameters and executing Exec
First, the previously cached request is retrieved by token, then StreamOpts is built from the exec request's parameters, and finally ServeExec is called to do the actual work. What follows is the hardest part to understand; brace yourself
func (s *server) serveExec(req *restful.Request, resp *restful.Response) {
	// Get the token
	token := req.PathParameter("token")
	// Fetch the cached request
	cachedRequest, ok := s.cache.Consume(token)
	// Build the exec parameters
	exec, ok := cachedRequest.(*runtimeapi.ExecRequest)
	streamOpts := &remotecommandserver.Options{
		Stdin:  exec.Stdin,
		Stdout: exec.Stdout,
		Stderr: exec.Stderr,
		TTY:    exec.Tty,
	}
	// Call ServeExec to handle the request
	remotecommandserver.ServeExec(
		resp.ResponseWriter,
		req.Request,
		s.runtime,
		"", // unused: podName
		"", // unused: podUID
		exec.ContainerId,
		exec.Cmd,
		streamOpts,
		s.config.StreamIdleTimeout,
		s.config.StreamCreationTimeout,
		s.config.SupportedRemoteCommandProtocols)
}
5.4 ServeExec
ServeExec has two key steps: 1) create the streams; 2) execute the request. The more complex part is creating the streams. Pay attention to the parameters of ExecInContainer: the streams obtained from createStreams (carried in ctx) are passed in as the command's file descriptors. createStreams supports two protocols, websocket and HTTPS (SPDY); here we mainly analyze the HTTPS path, which is what kubectl uses
func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
	// Create the streams
	ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
	defer ctx.conn.Close()
	// ExecInContainer blocks until the command finishes; err reports whether it
	// succeeded. All the streams in ctx are passed in as the command's FDs.
	err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0)
}
5.5 create HTTPS Stream
Establishing the streams can be summarized in three steps: 1) complete the protocol handshake; 2) upgrade the protocol to SPDY; 3) wait for the streams to be created. Let's look at them in turn
1. Complete the protocol handshake
protocol, err := httpstream.Handshake(req, w, supportedStreamProtocols)
2. Protocol upgrade
// Channel of newly created streams
streamCh := make(chan streamAndReply)

upgrader := spdy.NewResponseUpgrader()
// Upgrade the connection to SPDY
conn := upgrader.UpgradeResponse(w, req, func(stream httpstream.Stream, replySent <-chan struct{}) error {
	// Each newly created stream is appended to streamCh
	streamCh <- streamAndReply{Stream: stream, replySent: replySent}
	return nil
})
A key mechanism here is passing the func callback together with streamCh. After the connection is upgraded, a server is created for it, and the func callback is handed to that server as the handler for new streams: every time a stream is created on this connection, it is pushed into streamCh. What follows is the most complex part of the network handling, so it gets its own section
5.6 establishment of spdy stream
The overall flow looks simple: switch protocols according to the request, return 101, set up SPDY request handling on top of the current connection, then wait for the streams that kubectl (via the apiserver) asks to create, and finish establishing the communication streams with each other
5.6.1 Responding to the protocol upgrade
The first step is to respond to the protocol upgrade. Pay attention to a few key parts: the Hijacker, the SPDY protocol header, and the 101 status code
// Hijack the connection
hijacker, ok := w.(http.Hijacker)
if !ok {
	errorMsg := fmt.Sprintf("unable to upgrade: unable to hijack response")
	http.Error(w, errorMsg, http.StatusInternalServerError)
	return nil
}

w.Header().Add(httpstream.HeaderConnection, httpstream.HeaderUpgrade)
// SPDY protocol
w.Header().Add(httpstream.HeaderUpgrade, HeaderSpdy31)
// 101 status code
w.WriteHeader(http.StatusSwitchingProtocols)
5.6.2 establish spdyServer
spdyConn, err := NewServerConnection(connWithBuf, newStreamHandler)
Ultimately, newConnection is responsible for handling newly established streams on the connection
func NewServerConnection(conn net.Conn, newStreamHandler httpstream.NewStreamHandler) (httpstream.Connection, error) {
	// Create an SPDY connection on top of the existing network connection
	spdyConn, err := spdystream.NewConnection(conn, true)
	return newConnection(spdyConn, newStreamHandler), nil
}
Here we can see that a background server goroutine is started to handle requests on the connection
func newConnection(conn *spdystream.Connection, newStreamHandler httpstream.NewStreamHandler) httpstream.Connection {
	c := &connection{conn: conn, newStreamHandler: newStreamHandler}
	// When a SYN request to create a stream arrives after the connection is established, newSpdyStream is called
	go conn.Serve(c.newSpdyStream)
	return c
}
5.6.3 Serve
1. First, multiple goroutines are started to handle frames; there are 5 workers, each with a queue of size 20
frameQueues := make([]*PriorityFrameQueue, FRAME_WORKERS)
for i := 0; i < FRAME_WORKERS; i++ {
	frameQueues[i] = NewPriorityFrameQueue(QUEUE_SIZE)

	// Ensure frame queue is drained when connection is closed
	go func(frameQueue *PriorityFrameQueue) {
		<-s.closeChan
		frameQueue.Drain()
	}(frameQueues[i])

	wg.Add(1)
	go func(frameQueue *PriorityFrameQueue) {
		// let the WaitGroup know this worker is done
		defer wg.Done()
		s.frameHandler(frameQueue, newHandler)
	}(frameQueues[i])
}
2. Listen for SynStreamFrame and data frames, and hash each frame onto one of the frame queues according to its StreamId
case *spdy.SynStreamFrame:
	if s.checkStreamFrame(frame) {
		priority = frame.Priority
		partition = int(frame.StreamId % FRAME_WORKERS)
		debugMessage("(%p) Add stream frame: %d ", s, frame.StreamId)
		// Record the frame against its StreamId
		s.addStreamFrame(frame)
	} else {
		debugMessage("(%p) Rejected stream frame: %d ", s, frame.StreamId)
		continue
	}
// ...
// Finally the frame is pushed into the priority queue chosen above
frameQueues[partition].Push(readFrame, priority)
3. Pop frames from the queue and hand the resulting streams to the handler registered earlier via newHandler
func (s *Connection) frameHandler(frameQueue *PriorityFrameQueue, newHandler StreamHandler) {
	for {
		popFrame := frameQueue.Pop()
		if popFrame == nil {
			return
		}

		var frameErr error
		switch frame := popFrame.(type) {
		case *spdy.SynStreamFrame:
			frameErr = s.handleStreamFrame(frame, newHandler)
		}
	}
}
Who consumes these streams is covered in the next section
5.7 wait for Stream to be established
Waiting for the streams to be established is driven mainly by the StreamType header. Here, each expected stream (for example stdinStream) is bound to the corresponding SPDY stream as it arrives; the other types work the same way
func (*v3ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
	ctx := &context{}
	receivedStreams := 0
	replyChan := make(chan struct{})
	stop := make(chan struct{})
	defer close(stop)
WaitForStreams:
	for {
		select {
		case stream := <-streams:
			streamType := stream.Headers().Get(api.StreamType)
			switch streamType {
			case api.StreamTypeError:
				ctx.writeStatus = v1WriteStatusFunc(stream)
				go waitStreamReply(stream.replySent, replyChan, stop)
			case api.StreamTypeStdin:
				ctx.stdinStream = stream
				go waitStreamReply(stream.replySent, replyChan, stop)
			case api.StreamTypeStdout:
				ctx.stdoutStream = stream
				go waitStreamReply(stream.replySent, replyChan, stop)
			case api.StreamTypeStderr:
				ctx.stderrStream = stream
				go waitStreamReply(stream.replySent, replyChan, stop)
			case api.StreamTypeResize:
				ctx.resizeStream = stream
				go waitStreamReply(stream.replySent, replyChan, stop)
			default:
				runtime.HandleError(fmt.Errorf("unexpected stream type: %q", streamType))
			}
		case <-replyChan:
			receivedStreams++
			if receivedStreams == expectedStreams {
				break WaitForStreams
			}
		case <-expired:
			// TODO find a way to return the error to the user. Maybe use a separate
			// stream to report errors?
			return nil, errors.New("timed out waiting for client to create streams")
		}
	}

	return ctx, nil
}
5.8 The CRI adapter executes the command
Tracing the call chain, we finally arrive at the following calls, which lead to the execHandler.ExecInContainer interface that executes the command inside the container
func (a *criAdapter) ExecInContainer(podName string, podUID types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
	// Execute the command
	return a.Runtime.Exec(container, cmd, in, out, err, tty, resize)
}

func (r *streamingRuntime) Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
	// Execute in the container
	return r.exec(containerID, cmd, in, out, err, tty, resize, 0)
}

// Internal version of Exec adds a timeout.
func (r *streamingRuntime) exec(containerID string, cmd []string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
	// Delegate to the exec handler
	return r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize, timeout)
}
5.9 command execution main process
The main flow of command execution has two parts: 1) create the exec task; 2) start the exec task
func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
	done := make(chan struct{})
	defer close(done)

	// Build the exec options
	createOpts := dockertypes.ExecConfig{
		Cmd:          cmd,
		AttachStdin:  stdin != nil,
		AttachStdout: stdout != nil,
		AttachStderr: stderr != nil,
		Tty:          tty,
	}
	// Create the exec task
	execObj, err := client.CreateExec(container.ID, createOpts)

	startOpts := dockertypes.ExecStartCheck{Detach: false, Tty: tty}
	// The streams obtained earlier are wrapped here and passed in as the FDs of the command
	streamOpts := libdocker.StreamOptions{
		InputStream:  stdin,
		OutputStream: stdout,
		ErrorStream:  stderr,
		RawTerminal:  tty,
		ExecStarted:  execStarted,
	}
	// Start the exec task
	err = client.StartExec(execObj.ID, startOpts, streamOpts)

	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	count := 0
	for {
		// Poll the execution result
		inspect, err2 := client.InspectExec(execObj.ID)
		if !inspect.Running {
			if inspect.ExitCode != 0 {
				err = &dockerExitError{inspect}
			}
			break
		}
		<-ticker.C
	}
	return err
}
The Docker API call that creates the exec:
func (cli *Client) ContainerExecCreate(ctx context.Context, container string, config types.ExecConfig) (types.IDResponse, error) {
	resp, err := cli.post(ctx, "/containers/"+container+"/exec", nil, config, nil)
	return response, err
}
5.10 core implementation of command execution
The core implementation of command execution consists of two steps: 1) send the exec start request; 2) start the exec and collect the results. The more complicated part, again, is the SPDY-related stream logic
func (d *kubeDockerClient) StartExec(startExec string, opts dockertypes.ExecStartCheck, sopts StreamOptions) error {
	// Start the exec and attach to get the results
	resp, err := d.client.ContainerExecAttach(ctx, startExec, dockertypes.ExecStartCheck{
		Detach: opts.Detach,
		Tty:    opts.Tty,
	})
	// Copy the input stream to the connection and the response in resp back to the output streams
	return d.holdHijackedConnection(sopts.RawTerminal || opts.Tty, sopts.InputStream, sopts.OutputStream, sopts.ErrorStream, resp)
}
5.10.1 command execution request
cli.postHijacked(ctx, "/exec/"+execID+"/start", nil, config, headers)
5.10.2 send request to get connection
setupHijackConn works much like what was described earlier: its core is to establish the HTTP connection and then upgrade the protocol. Its conn is the underlying TCP connection, and keep-alive is set on it (30s). At this point we have yet another connection for SPDY-style bidirectional communication
func (cli *Client) postHijacked(ctx context.Context, path string, query url.Values, body interface{}, headers map[string][]string) (types.HijackedResponse, error) {
	conn, err := cli.setupHijackConn(ctx, req, "tcp")
	return types.HijackedResponse{Conn: conn, Reader: bufio.NewReader(conn)}, err
}
5.10.3 establish stream copy
At this point, on the kubelet side we hold both the stream to the backend that executes the command and the stream established with the apiserver; all that remains is to copy the two streams into each other to move the data
func (d *kubeDockerClient) holdHijackedConnection(tty bool, inputStream io.Reader, outputStream, errorStream io.Writer, resp dockertypes.HijackedResponse) error {
	receiveStdout := make(chan error)
	if outputStream != nil || errorStream != nil {
		// Copy the response results to the output stream
		go func() {
			receiveStdout <- d.redirectResponseToOutputStream(tty, outputStream, errorStream, resp.Reader)
		}()
	}

	stdinDone := make(chan struct{})
	go func() {
		if inputStream != nil {
			io.Copy(resp.Conn, inputStream)
		}
		resp.CloseWrite()
		close(stdinDone)
	}()

	return nil
}
5.10.4 Checking the execution status
After the exec is started, its status is checked every 2s; once the command is found to have finished, the loop exits
func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	count := 0
	for {
		// Get the execution result
		inspect, err2 := client.InspectExec(execObj.ID)
		if err2 != nil {
			return err2
		}
		if !inspect.Running {
			if inspect.ExitCode != 0 {
				err = &dockerExitError{inspect}
			}
			break
		}
		<-ticker.C
	}
	return err
}

func (cli *Client) ContainerExecInspect(ctx context.Context, execID string) (types.ContainerExecInspect, error) {
	resp, err := cli.get(ctx, "/exec/"+execID+"/json", nil, nil)
	return response, err
}
6. summary
The whole command execution flow is quite complex, mainly because of the protocol switching. As we saw, the whole path is built on the SPDY protocol, and in the CRI.RuntimeService part the stream handling is done concurrently by multiple goroutines. I admire the authors' design. If anything here is wrong, feel free to discuss. Thanks for reading this far
kubernetes learning notes address: https://www.yuque.com/baxiaoshi/tyado3
WeChat: baxiaoshi2020. Follow the official account to read more source code analysis articles. More articles at www.sreguide.com