The core implementation of kubernetes command execution

Command execution in Kubernetes is handled jointly by the apiserver, kubelet, CRI, Docker and other components. The most complex parts are the protocol switching and the various stream copies. Let's walk through the key implementation. There is a lot of code, but it should be understandable even without development experience. Good luck.

1. Basic concepts

Command execution in Kubernetes involves a lot of protocol handling. Let's first look at the basic concepts behind that protocol handling.

1.1 Connection and Upgrade in HTTP protocol

HTTP/1.1 allows the protocol to be switched on the same connection via the Connection and Upgrade headers. In short, another protocol can be spoken over a connection that was originally established via HTTP, and this is the key to the protocol upgrade used by command execution in Kubernetes.

1.2 101 status code in HTTP protocol

Besides the familiar HTTP/1.1, the HTTP stack also supports protocols such as WebSocket and SPDY. How do the server and the client switch protocols on top of HTTP? The first ingredient is the 101 (Switching Protocols) status code: with it, the server tells the client that it is switching to the protocol named in the Upgrade header, reusing the current connection.
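
As a concrete illustration of 1.1 and 1.2, an upgrade handshake on the wire looks roughly like the following sketch (the path and header values here are only examples; the exact ones depend on the client and the protocol being negotiated):

POST /api/v1/namespaces/default/pods/mypod/exec?command=sh&stdin=true&stdout=true&tty=true HTTP/1.1
Host: apiserver:6443
Connection: Upgrade
Upgrade: SPDY/3.1

HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Upgrade: SPDY/3.1

From this point on, the bytes flowing over this TCP connection follow the upgraded protocol rather than HTTP/1.1.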

1.3 stream in spdy protocol

SPDY is a TCP session-layer protocol developed by Google. In SPDY, an HTTP request/response pair is called a stream, and the protocol supports connection reuse: multiple streams are distinguished by their stream IDs, so a single connection can carry multiple request/response exchanges at the same time without them interfering with each other. Command execution in Kubernetes mainly uses these streams for message transport.

1.4 file descriptor redirection

In Linux, a process usually has three file descriptors: standard input, standard output and standard error. Command execution in Kubernetes redirects these FDs in order to capture the output of the command running in the container. Redirects them to where? To the streams mentioned above, of course (I am not very familiar with Docker, so I cannot guarantee this is fully accurate on the Docker side).
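
As a minimal illustration of the redirection idea (using plain os/exec on the host, not the actual container runtime code), a parent process can point a child's stdin/stdout/stderr at arbitrary readers and writers; in the Kubernetes case those readers and writers end up being the streams described above:

package main

import (
	"bytes"
	"fmt"
	"os/exec"
	"strings"
)

func main() {
	var stdout, stderr bytes.Buffer
	cmd := exec.Command("sh", "-c", "cat; echo done")
	cmd.Stdin = strings.NewReader("hello\n") // stdin redirected to an in-memory reader
	cmd.Stdout = &stdout                     // stdout redirected to a buffer
	cmd.Stderr = &stderr                     // stderr redirected to a buffer
	if err := cmd.Run(); err != nil {
		panic(err)
	}
	fmt.Print(stdout.String()) // prints "hello" followed by "done"
}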

1.5 Hijacker in http

After the client and the server have completed the switch over the current connection via the 101 status code and the Connection and Upgrade headers, the data carried on that connection is no longer HTTP/1.1; from then on it follows the upgraded protocol. During command execution, Kubernetes uses the http Hijacker interface to obtain the underlying TCP connection from the request and response, so that request forwarding can continue over it.
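
Here is a minimal sketch of what hijacking looks like in a plain net/http handler (illustrative only, not the apiserver's actual handler; the /exec path and the hand-written reply are made up for the example):

package main

import "net/http"

func upgrade(w http.ResponseWriter, r *http.Request) {
	hj, ok := w.(http.Hijacker)
	if !ok {
		http.Error(w, "hijacking not supported", http.StatusInternalServerError)
		return
	}
	conn, bufrw, err := hj.Hijack()
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	defer conn.Close()
	// net/http no longer manages this connection; write the 101 response by hand.
	bufrw.WriteString("HTTP/1.1 101 Switching Protocols\r\nConnection: Upgrade\r\nUpgrade: SPDY/3.1\r\n\r\n")
	bufrw.Flush()
	// From here on, conn carries the bytes of the upgraded protocol.
}

func main() {
	http.HandleFunc("/exec", upgrade)
	http.ListenAndServe(":8080", nil)
}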

1.6 TCP-based stream copy forwarding

After obtaining the two underlying TCP reader/writers via the Hijacker, the data on the two streams can be copied directly with io.Copy. The apiserver therefore does not need to understand the upgraded protocol; it simply forwards requests and results by copying the TCP streams.
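
The forwarding itself boils down to something like the following sketch (the package, function and parameter names are illustrative, not the apiserver's actual code):

package streamproxy

import (
	"io"
	"net"
)

// Proxy copies bytes in both directions between two connections and
// returns as soon as either direction finishes.
func Proxy(client, backend net.Conn) {
	done := make(chan struct{}, 2)
	go func() {
		io.Copy(backend, client) // client -> backend
		done <- struct{}{}
	}()
	go func() {
		io.Copy(client, backend) // backend -> client
		done <- struct{}{}
	}()
	<-done
}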

That's it for the basic concepts. Let's now look at the concrete implementation of the lower layers, starting from the kubectl side and analyzing it layer by layer.

2.kubectl

The kubectl side of command execution is split into two parts: pod validity checking and command execution. Pod validity checking mainly fetches the state of the target pod and verifies that it is running. Here we focus on the command execution part.

2.1 core process of command execution

The core of command execution consists of two steps: 1) establish a connection using the SPDY protocol; 2) create the streams over that connection.

func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
    exec, err := remotecommand.NewSPDYExecutor(config, method, url)
    if err != nil {
        return err
    }
    return exec.Stream(remotecommand.StreamOptions{
        Stdin:             stdin,
        Stdout:            stdout,
        Stderr:            stderr,
        Tty:               tty,
        TerminalSizeQueue: terminalSizeQueue,
    })
}

2.2 exec request building

The URL built here, /pods/{namespace}/{podName}/exec, is actually the exec subresource of the pod on the apiserver; an example of the final URL is shown after the snippet below. Later we will look at how the apiserver handles this request.

	// Create an exec
		req := restClient.Post().
			Resource("pods").
			Name(pod.Name).
			Namespace(pod.Namespace).
			SubResource("exec")
		req.VersionedParams(&corev1.PodExecOptions{
			Container: containerName,
			Command:   p.Command,
			Stdin:     p.Stdin,
			Stdout:    p.Out != nil,
			Stderr:    p.ErrOut != nil,
			TTY:       t.Raw,
		}, scheme.ParameterCodec)
return p.Executor.Execute("POST", req.URL(), p.Config, p.In, p.Out, p.ErrOut, t.Raw, sizeQueue)
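
For reference, the request built above resolves to a URL of roughly the following form (the host and the parameter values are only examples; they depend on the kubectl flags):

POST https://<apiserver>/api/v1/namespaces/<namespace>/pods/<podName>/exec?command=sh&container=<containerName>&stdin=true&stdout=true&stderr=true&tty=false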

2.3 create Stream

The main job of exec.Stream is to tell the server, via headers, which types of streams to create, and to negotiate them with the server one by one.

	// set up stdin stream
	if p.Stdin != nil {
		headers.Set(v1.StreamType, v1.StreamTypeStdin)
		p.remoteStdin, err = conn.CreateStream(headers)
		if err != nil {
			return err
		}
	}

	// set up stdout stream
	if p.Stdout != nil {
		headers.Set(v1.StreamType, v1.StreamTypeStdout)
		p.remoteStdout, err = conn.CreateStream(headers)
		if err != nil {
			return err
		}
	}

	// set up stderr stream
	if p.Stderr != nil && !p.Tty {
		headers.Set(v1.StreamType, v1.StreamTypeStderr)
		p.remoteStderr, err = conn.CreateStream(headers)
		if err != nil {
			return err
		}
	}

3.APIServer

The apiserver acts as a proxy during command execution: it is responsible for forwarding requests between kubectl and the kubelet. Note that this forwarding is done mainly by copying TCP streams, because kubectl and the kubelet actually talk to each other using the SPDY protocol. Let's look at the key implementation.

3.1 Connection

The SPDY exec request is first sent to the Connect interface, which is responsible for establishing the connection to the backend kubelet and relaying the response. In Connect, the node information is first obtained from the Pod; the Location is the address of the backend kubelet, and the transport is used to reach it.

func (r *ExecREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
	execOpts, ok := opts.(*api.PodExecOptions)
	if !ok {
		return nil, fmt.Errorf("invalid options object: %#v", opts)
	}
	// Return the corresponding address and establish the link
	location, transport, err := pod.ExecLocation(r.Store, r.KubeletConn, ctx, name, execOpts)
	if err != nil {
		return nil, err
	}
	return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, true, responder), nil
}

3.2 get back-end service address

Getting the backend address mainly means building the backend location: the host and port of the node are taken from the information the kubelet reports, and the final path for the pod is assembled, i.e. the Path field /exec/{namespace}/{podName}/{containerName} (an example of the resulting URL follows the snippet below).

	loc := &url.URL{
		Scheme:   nodeInfo.Scheme,
		Host:     net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port),	// Port of node
		Path:     fmt.Sprintf("/%s/%s/%s/%s", path, pod.Namespace, pod.Name, container),	// Route
		RawQuery: params.Encode(),
	}
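
Assuming the default kubelet port (10250), the resulting location looks roughly like this (values are illustrative; the query string carries the command and stream options):

https://<nodeHostname>:10250/exec/<namespace>/<podName>/<containerName>?command=sh&...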

3.3 initialization of the protocol upgrade handler

The protocol upgrade is handled by the UpgradeAwareHandler. After receiving a request, the handler first tries to upgrade the protocol, which mainly means checking whether the Connection value in the HTTP header is Upgrade. From the earlier analysis of the kubectl side we know that it is.

func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired, interceptRedirects bool, responder rest.Responder) *proxy.UpgradeAwareHandler {

	handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder))
	handler.InterceptRedirects = interceptRedirects && utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StreamingProxyRedirects)
	handler.RequireSameHostRedirects = utilfeature.DefaultFeatureGate.Enabled(genericfeatures.ValidateProxyRedirects)
	handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
	return handler
}

func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	// If the protocol upgrade succeeds, the upgraded protocol takes over from here
	if h.tryUpgrade(w, req) {
		return
	}
	// ... remaining code omitted
}

3.4 protocol upgrade

There is quite a lot of logic in the protocol upgrade handling, so it is described here in several parts. The main idea is to take over the HTTP connection from the client, forward the request to the backend, then hold both connections and copy the TCP streams between them.

3.4.1 connection with the kubelet

The first step of the protocol upgrade is to establish a connection with the backend kubelet. The request received from the client is copied and sent to the kubelet, and the HTTP connection established with the kubelet is also obtained here; it is used later for the stream copy. Note that the status code of this HTTP response is 101, i.e. the kubelet has actually set up an SPDY handler for the rest of the communication.

		// Build http request
		req, err := http.NewRequest(method, location.String(), body)
		if err != nil {
			return nil, nil, err
		}

		req.Header = header

		// Send the request to establish the connection
		intermediateConn, err = dialer.Dial(req)
		if err != nil {
			return nil, nil, err
		}

		// Peek at the backend response.
		rawResponse.Reset()
		respReader := bufio.NewReader(io.TeeReader(
			io.LimitReader(intermediateConn, maxResponseSize), // Don't read more than maxResponseSize bytes.
			rawResponse)) // Save the raw response.
		// Read the response
		resp, err := http.ReadResponse(respReader, nil)

3.4.2 hijacking the client request

This request is in fact carried over the SPDY protocol. After obtaining the underlying connection via Hijack, the response read above must first be forwarded back to the client, which triggers the client to send the subsequent stream-creation requests. The Write here forwards the kubelet's response to the client.

	requestHijackedConn, _, err := requestHijacker.Hijack()
	// Forward raw response bytes back to client.
	if len(rawResponse) > 0 {
		klog.V(6).Infof("Writing %d bytes to hijacked connection", len(rawResponse))
		if _, err = requestHijackedConn.Write(rawResponse); err != nil {
			utilruntime.HandleError(fmt.Errorf("Error proxying response from backend to client: %v", err))
		}
	}

3.4.3 two-way stream copying

After the two steps above, the apiserver holds two connections. Because they no longer speak HTTP, the apiserver cannot operate on them directly and can only forward requests and responses by copying the streams.

	// Two-way copy between the two connections
	go func() {
		var writer io.WriteCloser
		if h.MaxBytesPerSec > 0 {
			writer = flowrate.NewWriter(backendConn, h.MaxBytesPerSec)
		} else {
			writer = backendConn
		}
		_, err := io.Copy(writer, requestHijackedConn)
		if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
			klog.Errorf("Error proxying data from client to backend: %v", err)
		}
		close(writerComplete)
	}()

	go func() {
		var reader io.ReadCloser
		if h.MaxBytesPerSec > 0 {
			reader = flowrate.NewReader(backendConn, h.MaxBytesPerSec)
		} else {
			reader = backendConn
		}
		_, err := io.Copy(requestHijackedConn, reader)
		if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
			klog.Errorf("Error proxying data from backend to client: %v", err)
		}
		close(readerComplete)
	}()

4.kubelet

Command execution on the kubelet mainly relies on CRI.RuntimeService. The kubelet is only responsible for forwarding the request; in the end it builds a stream proxy to forward the subsequent requests, and with that its job is done.

4.1 execute command main process

The main flow is: obtain the command to be executed, look up the corresponding Pod, call host.GetExec to get back a URL, and hand the subsequent requests over to proxyStream. Let's go deeper step by step.

func (s *Server) getExec(request *restful.Request, response *restful.Response) {
	// Get execution command
	params := getExecRequestParams(request)
	streamOpts, err := remotecommandserver.NewOptions(request.Request)
	// Get pod information
	pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
	podFullName := kubecontainer.GetPodFullName(pod)
	url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
	proxyStream(response.ResponseWriter, request.Request, url)
}

4.2 Exec returns the exec URL

host.GetExec eventually calls the Exec interface of the runtimeService, i.e. CRI.RuntimeService, to issue the request. The interface returns an address of the form /exec/{token}. At this point the command has not actually been executed; only a command execution request has been created.

func (m *kubeGenericRuntimeManager) GetExec(id kubecontainer.ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) {
	// Request construction omitted
	// Issue the exec request to the CRI runtime
	resp, err := m.runtimeService.Exec(req)
	return url.Parse(resp.Url)
}

Ultimately this calls the Exec interface of the CRI. Let's ignore for now what that interface returns and first finish reading the rest of the kubelet logic.

func (c *runtimeServiceClient) Exec(ctx context.Context, in *ExecRequest, opts ...grpc.CallOption) (*ExecResponse, error) {
	out := new(ExecResponse)
	err := c.cc.Invoke(ctx, "/runtime.v1alpha2.RuntimeService/Exec", in, out, opts...)
	return out, err
}

4.3 proxyStream

Here we meet the UpgradeAwareHandler we saw before, but this time the URL is the one returned by the backend Exec call. The rest is similar to the apiserver: a stream copy between two HTTP connections.

Note that the Request and Response here belong to the connection established between the apiserver and the kubelet, and that connection carries SPDY headers. Keep that in mind; the handler then establishes a connection with the backend, and the backend is actually an SPDY server. The only piece still missing is who serves the returned URL, i.e. which handler sits on the other end. That is covered in the CRI part in the next section.

// proxyStream proxies stream to url.
func proxyStream(w http.ResponseWriter, r *http.Request, url *url.URL) {
	// TODO(random-liu): Set MaxBytesPerSec to throttle the stream.
	handler := proxy.NewUpgradeAwareHandler(url, nil /*transport*/, false /*wrapTransport*/, true /*upgradeRequired*/, &responder{})
	handler.ServeHTTP(w, r)
}

5.CRI

CRI.RuntimeService is responsible for the final command execution; it is where the command actually runs. It also involves a lot of protocol handling. Let's look at the key implementation.

5.1 registration of the docker runtime

We called the Exec interface of RuntimeService above; tracing it down, we find the following code in the kubelet, which creates a DockerServer and starts it.

	ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig, ...) // remaining arguments omitted
	dockerServer := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
	if err := dockerServer.Start(); err != nil {
		return err
	}

In the Start function, the following two runtime services are registered. Anyone who has written gRPC code will recognize that this registers the implementations of the corresponding RPC interfaces, so the calls actually land in the DockerService interface.

	runtimeapi.RegisterRuntimeServiceServer(s.server, s.service)
	runtimeapi.RegisterImageServiceServer(s.server, s.service)

5.2 Exec implementation of dockerservice

Looking at the final implementation of Exec, we find that it simply calls the GetExec interface of the streamingServer and returns a /exec/{token} URL.

func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
	// Execute Exec request
	return ds.streamingServer.GetExec(req)
}

Continuing into the streaming server, we find the GetExec implementation below: it builds a URL of the form /exec/{token}. Note that the current request is stored in a cache at this point.

func (s *server) GetExec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
	// Generate token
	token, err := s.cache.Insert(req)
	return &runtimeapi.ExecResponse{
		Url: s.buildURL("exec", token),
	}, nil
}

5.3 build command parameters execute Exec

First, the previously cached request is retrieved by its token, then StreamOpts is built from the exec request's options, and finally ServeExec is called to do the work. What follows is the hardest part to understand, so brace yourself.

func (s *server) serveExec(req *restful.Request, resp *restful.Response) {
	// Get token
	token := req.PathParameter("token")
	// Retrieve the cached request
	cachedRequest, ok := s.cache.Consume(token)
	// Building exec parameters
	exec, ok := cachedRequest.(*runtimeapi.ExecRequest)

	streamOpts := &remotecommandserver.Options{
		Stdin:  exec.Stdin,
		Stdout: exec.Stdout,
		Stderr: exec.Stderr,
		TTY:    exec.Tty,
	}

	// Build ServerExec to execute requests
	remotecommandserver.ServeExec(
		resp.ResponseWriter,
		req.Request,
		s.runtime,
		"", // unused: podName
		"", // unusued: podUID
		exec.ContainerId,
		exec.Cmd,
		streamOpts,
		s.config.StreamIdleTimeout,
		s.config.StreamCreationTimeout,
		s.config.SupportedRemoteCommandProtocols)
}

5.4 ServeExec

The key steps of ServeExec are: 1) create the streams; 2) execute the request. The complexity is mostly in creating the streams. Pay attention to the parameters of ExecInContainer: the streams for the various file descriptors obtained while creating the streams are passed in via ctx. createStreams supports two protocols, WebSocket and HTTPS; here we analyze the HTTPS path (kubectl uses HTTPS).

func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
	// Create the streams
	ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)

	defer ctx.conn.Close()

	// Executing is a blocking call; err indicates whether the execution succeeded. All the streams in ctx, corresponding to the various FDs, are passed in here
	err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0)

}

5.5 create HTTPS Stream

Establishing the streams can be summarized in the following steps: 1) complete the handshake; 2) upgrade the protocol to SPDY; 3) wait for the streams to be created. Let's look at them in turn.

1. Complete the handshake (negotiate the stream protocol)

protocol, err := httpstream.Handshake(req, w, supportedStreamProtocols)

2. Protocol upgrade

	// Channel for the created streams
	streamCh := make(chan streamAndReply)

	upgrader := spdy.NewResponseUpgrader()
	// Build the SPDY connection
	conn := upgrader.UpgradeResponse(w, req, func(stream httpstream.Stream, replySent <-chan struct{}) error {
		// Whenever a new stream is created, append it to streamCh
		streamCh <- streamAndReply{Stream: stream, replySent: replySent}
		return nil
	})

A key mechanism here is the func callback and streamCh. After the connection is established, a server goroutine is started, and the callback passed to it is this func: every time a new stream is created on the connection, the callback receives the stream and appends it to streamCh. The network handling that follows is the most complex part, so it gets its own section.
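
To make the callback mechanism easier to picture, here is a toy sketch of the pattern using simplified, made-up types (it only mirrors the idea of the real code: the connection server invokes the handler for every new stream, and the handler forwards the stream into a channel like streamCh):

package callbacksketch

// stream stands in for httpstream.Stream in this toy example.
type stream struct{ id uint32 }

// newStreamHandler mirrors the func callback passed to UpgradeResponse.
type newStreamHandler func(s *stream)

// serve stands in for the background connection server: it invokes the
// handler once for every stream created on the connection.
func serve(incoming <-chan *stream, onNewStream newStreamHandler) {
	for s := range incoming {
		onNewStream(s)
	}
}

// wireUp shows how the callback simply appends each new stream to a channel
// that a later stage (waitForStreams in the real code) consumes.
func wireUp(incoming <-chan *stream) <-chan *stream {
	streamCh := make(chan *stream, 8)
	go serve(incoming, func(s *stream) { streamCh <- s })
	return streamCh
}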

5.6 establishment of spdy stream

The overall flow looks simple: first switch protocols according to the request and return 101, set up SPDY request handling on top of the current connection, then wait for kubectl to send (via the apiserver) the streams to be created, and finish establishing the communication streams on both sides.

5.6.1 responding to the protocol upgrade

The first step is to respond to the protocol upgrade. Note the key pieces here: the SPDY protocol header and the 101 status code.

	// Hijack the underlying connection
	hijacker, ok := w.(http.Hijacker)
	if !ok {
		errorMsg := fmt.Sprintf("unable to upgrade: unable to hijack response")
		http.Error(w, errorMsg, http.StatusInternalServerError)
		return nil
	}

	w.Header().Add(httpstream.HeaderConnection, httpstream.HeaderUpgrade)
	// spdy protocol
	w.Header().Add(httpstream.HeaderUpgrade, HeaderSpdy31)
	w.WriteHeader(http.StatusSwitchingProtocols)

5.6.2 establish spdyServer

spdyConn, err := NewServerConnection(connWithBuf, newStreamHandler)

In the end, the new connection is established through newConnection.

func NewServerConnection(conn net.Conn, newStreamHandler httpstream.NewStreamHandler) (httpstream.Connection, error) {
	// Create an SPDY connection on top of the existing network connection
	spdyConn, err := spdystream.NewConnection(conn, true)

	return newConnection(spdyConn, newStreamHandler), nil
}

Here we can see that a background server is started to handle the stream requests on the connection.

func newConnection(conn *spdystream.Connection, newStreamHandler httpstream.NewStreamHandler) httpstream.Connection {
	c := &connection{conn: conn, newStreamHandler: newStreamHandler}
	// When a SYN frame arrives to create a stream on the established connection, newSpdyStream is called
	go conn.Serve(c.newSpdyStream)
	return c
}

5.6.3 Serve

1. First, several worker goroutines are started to handle the frames; there are 5 workers, each with a queue of size 20.

	frameQueues := make([]*PriorityFrameQueue, FRAME_WORKERS)
	for i := 0; i < FRAME_WORKERS; i++ {
		frameQueues[i] = NewPriorityFrameQueue(QUEUE_SIZE)

		// Ensure frame queue is drained when connection is closed
		go func(frameQueue *PriorityFrameQueue) {
			<-s.closeChan
			frameQueue.Drain()
		}(frameQueues[i])

		wg.Add(1)
		go func(frameQueue *PriorityFrameQueue) {
			// let the WaitGroup know this worker is done
			defer wg.Done()

			s.frameHandler(frameQueue, newHandler)
		}(frameQueues[i])
	}

2. Listen for SynStreamFrames and data frames, and hash each frame into one of the frame queues according to its stream ID.

		case *spdy.SynStreamFrame:
			if s.checkStreamFrame(frame) {
				priority = frame.Priority
				partition = int(frame.StreamId % FRAME_WORKERS)
				debugMessage("(%p) Add stream frame: %d ", s, frame.StreamId)
				// Register the stream for this StreamId
				s.addStreamFrame(frame)
			} else {
				debugMessage("(%p) Rejected stream frame: %d ", s, frame.StreamId)
				continue
			}
			// ...
		// After the switch, the frame is pushed onto the priority queue selected above
		frameQueues[partition].Push(readFrame, priority)

3. The frames are read off the queue, and newly created streams are handed via newHandler to the callback above.

func (s *Connection) frameHandler(frameQueue *PriorityFrameQueue, newHandler StreamHandler) {
	for {
		popFrame := frameQueue.Pop()
		if popFrame == nil {
			return
		}

		var frameErr error
		switch frame := popFrame.(type) {
		case *spdy.SynStreamFrame:
			frameErr = s.handleStreamFrame(frame, newHandler)
		// ... other frame types and frameErr handling omitted
		}
	}
}

The streams consumed here are picked up in the next section.

5.7 wait for Stream to be established

Waiting for the streams to be established is driven mainly by the StreamType header. In this part, stdinStream (and the others) in ctx are bound to the corresponding SPDY streams; all stream types are handled the same way.


func (*v3ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
	ctx := &context{}
	receivedStreams := 0
	replyChan := make(chan struct{})
	stop := make(chan struct{})
	defer close(stop)
WaitForStreams:
	for {
		select {
		case stream := <-streams:
			streamType := stream.Headers().Get(api.StreamType)
			switch streamType {
			case api.StreamTypeError:
				ctx.writeStatus = v1WriteStatusFunc(stream)
				go waitStreamReply(stream.replySent, replyChan, stop)
			case api.StreamTypeStdin:
				ctx.stdinStream = stream
				go waitStreamReply(stream.replySent, replyChan, stop)
			case api.StreamTypeStdout:
				ctx.stdoutStream = stream
				go waitStreamReply(stream.replySent, replyChan, stop)
			case api.StreamTypeStderr:
				ctx.stderrStream = stream
				go waitStreamReply(stream.replySent, replyChan, stop)
			case api.StreamTypeResize:
				ctx.resizeStream = stream
				go waitStreamReply(stream.replySent, replyChan, stop)
			default:
				runtime.HandleError(fmt.Errorf("unexpected stream type: %q", streamType))
			}
		case <-replyChan:
			receivedStreams++
			if receivedStreams == expectedStreams {
				break WaitForStreams
			}
		case <-expired:
			// TODO find a way to return the error to the user. Maybe use a separate
			// stream to report errors?
			return nil, errors.New("timed out waiting for client to create streams")
		}
	}

	return ctx, nil
}

5.8 the CRI adapter executes the command

Tracing the call chain, we finally arrive at the calls below, which lead to the execHandler.ExecInContainer interface that executes the command inside the container.

func (a *criAdapter) ExecInContainer(podName string, podUID types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
	// Execute command
	return a.Runtime.Exec(container, cmd, in, out, err, tty, resize)
}
func (r *streamingRuntime) Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
	// Execute in the container
	return r.exec(containerID, cmd, in, out, err, tty, resize, 0)
}

// Internal version of Exec adds a timeout.
func (r *streamingRuntime) exec(containerID string, cmd []string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
	// exechandler
	return r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize, timeout)
}

5.9 command execution main process

The main flow of command execution is divided into two parts: 1) create the exec task; 2) start the exec task.

func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
	// Execute command in container
	done := make(chan struct{})
	defer close(done)

	// Build the exec options
	createOpts := dockertypes.ExecConfig{
		Cmd:          cmd,
		AttachStdin:  stdin != nil,
		AttachStdout: stdout != nil,
		AttachStderr: stderr != nil,
		Tty:          tty,
	}
	// Create the exec task
	execObj, err := client.CreateExec(container.ID, createOpts)

	startOpts := dockertypes.ExecStartCheck{Detach: false, Tty: tty}
	// The streams obtained earlier are passed into the container exec as its FDs
	streamOpts := libdocker.StreamOptions{
		InputStream:  stdin,
		OutputStream: stdout,
		ErrorStream:  stderr,
		RawTerminal:  tty,
		ExecStarted:  execStarted,
	}
	// Start the exec task
	err = client.StartExec(execObj.ID, startOpts, streamOpts)
	
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	count := 0
	for {
		//  Get execution results
		inspect, err2 := client.InspectExec(execObj.ID)
		if !inspect.Running {
			if inspect.ExitCode != 0 {
				err = &dockerExitError{inspect}
			}
			break
		}
		<-ticker.C
	}

	return err
}

The Docker API call that creates the exec:

func (cli *Client) ContainerExecCreate(ctx context.Context, container string, config types.ExecConfig) (types.IDResponse, error) {
	var response types.IDResponse
	resp, err := cli.post(ctx, "/containers/"+container+"/exec", nil, config, nil)
	// ... decoding of resp into response omitted
	return response, err
}

5.10 core implementation of command execution

The core implementation of command execution consists of two steps: 1) send the exec start request; 2) attach to the exec and obtain the result. The more complicated part is the stream handling logic.

func (d *kubeDockerClient) StartExec(startExec string, opts dockertypes.ExecStartCheck, sopts StreamOptions) error {
	// Start the exec and attach to obtain the results
	resp, err := d.client.ContainerExecAttach(ctx, startExec, dockertypes.ExecStartCheck{
		Detach: opts.Detach,
		Tty:    opts.Tty,
	})
	// Copy between the streams: the result in resp is copied to the output streams, and the input stream is copied to the connection
	return d.holdHijackedConnection(sopts.RawTerminal || opts.Tty, sopts.InputStream, sopts.OutputStream, sopts.ErrorStream, resp)
}

5.10.1 command execution request

cli.postHijacked(ctx, "/exec/"+execID+"/start", nil, config, headers)

5.10.2 send request to get connection

setupHijackConn here works much like what we described before: its core is to establish the HTTP connection and then upgrade the protocol. Its conn is the underlying TCP connection, with keep-alive set to 30s. At this point we have yet another connection for two-way stream communication.

func (cli *Client) postHijacked(ctx context.Context, path string, query url.Values, body interface{}, headers map[string][]string) (types.HijackedResponse, error) {
	conn, err := cli.setupHijackConn(ctx, req, "tcp")
	return types.HijackedResponse{Conn: conn, Reader: bufio.NewReader(conn)}, err
}

5.10.3 establish stream copy

At this point, on the kubelet side we hold both the stream for executing the command against the backend and the stream established with the apiserver. All that remains is to copy the two streams directly into each other to move the data.

func (d *kubeDockerClient) holdHijackedConnection(tty bool, inputStream io.Reader, outputStream, errorStream io.Writer, resp dockertypes.HijackedResponse) error {
	receiveStdout := make(chan error)
	if outputStream != nil || errorStream != nil {
		// Copy the response results to the outputstream
		go func() {
			receiveStdout <- d.redirectResponseToOutputStream(tty, outputStream, errorStream, resp.Reader)
		}()
	}

	stdinDone := make(chan struct{})
	go func() {
		if inputStream != nil {
			io.Copy(resp.Conn, inputStream)
		}
		resp.CloseWrite()
		close(stdinDone)
	}()

	return nil
}

5.10.4 checking the execution status

After the command has been started, its status is checked every 2s; as soon as the execution is found to have finished, the loop exits.

func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	count := 0
	for {
		//  Get execution results
		inspect, err2 := client.InspectExec(execObj.ID)
		if err2 != nil {
			return err2
		}
		if !inspect.Running {
			if inspect.ExitCode != 0 {
				err = &dockerExitError{inspect}
			}
			break
		}
		<-ticker.C
	}

	return err
}

func (cli *Client) ContainerExecInspect(ctx context.Context, execID string) (types.ContainerExecInspect, error) {
	var response types.ContainerExecInspect
	resp, err := cli.get(ctx, "/exec/"+execID+"/json", nil, nil)
	// ... decoding of resp into response omitted
	return response, err
}

6. summary

The whole command execution flow is quite complex, mainly in the protocol switching parts. We have seen that the whole flow is built on the SPDY protocol, and in the CRI.RuntimeService part the stream requests are handled by multiple goroutines concurrently. The design is admirable. If anything here is wrong, you are welcome to discuss it. Thanks for reading this far.

Kubernetes learning notes: https://www.yuque.com/baxiaoshi/tyado3

WeChat: baxiaoshi2020. Follow the official account for more source code analysis articles. More articles at www.sreguide.com

