SuperEdge cloud-edge tunnel

Authors

Li Tengfei, container technology R&D engineer at Tencent, works on backend R&D for Tencent Cloud TKE and is a core developer of SuperEdge.

Du Yanghao, senior engineer at Tencent Cloud, is keen on open source, containers, and Kubernetes. He currently works on image registries, Kubernetes cluster high availability & backup/restore, and edge computing.

Introduction to SuperEdge

SuperEdge is a Kubernetes-native edge container solution. It extends Kubernetes' powerful container management capabilities to edge computing scenarios and provides solutions to their common technical challenges, such as a single cluster whose nodes span multiple regions, unreliable cloud-edge networks, and edge nodes sitting behind NAT. These capabilities let applications be easily deployed to edge nodes and run reliably, and help you conveniently manage computing resources distributed everywhere from one Kubernetes cluster, including but not limited to edge cloud resources, private cloud resources, and field devices, so you can build your own edge PaaS platform. SuperEdge supports all Kubernetes resource types, API interfaces, usage patterns, and operation-and-maintenance tools, with no extra learning cost. It is also compatible with other cloud-native projects, such as Prometheus, and can be used in combination with them. The project is jointly initiated by Tencent, Intel, VMware, Huya, Cambricon, Capital Online, and Meituan.

Architecture and principle of the cloud-edge tunnel

In edge scenarios the network is often one-way: only edge nodes can actively reach the cloud. The cloud-edge tunnel is mainly used to proxy cloud requests to components on edge nodes, solving the problem that the cloud cannot directly access edge nodes.

The architecture diagram is as follows:

The implementation principle is:

  • The tunnel-edge on an edge node actively connects to the tunnel-cloud Service, and the Service forwards the request to a tunnel-cloud pod according to its load-balancing policy

  • After the gRPC connection between tunnel-edge and tunnel-cloud is established, tunnel-cloud writes the mapping between its own podIp and the nodeName of the node where that tunnel-edge runs into tunnel-dns; when the gRPC connection is broken, tunnel-cloud deletes the corresponding podIp-to-nodeName mapping

The proxy forwarding process of the whole request is as follows:

  • When the apiserver or another cloud application accesses the kubelet or another application on an edge node, tunnel-dns forwards the request to a tunnel-cloud pod through DNS hijacking (resolving the node name in the host of the HTTP request to the podIp of tunnel-cloud); an illustrative hosts mapping is sketched after this list

  • Tunnel-cloud forwards the request information, keyed by the node name, onto the gRPC connection established with the tunnel-edge of that node

  • The tunnel-edge requests the application on the edge node according to the received request information
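
To make the DNS hijacking concrete, here is a hypothetical example of the hosts mapping that tunnel-cloud maintains for the hosts plug-in of tunnel-coredns (the pod IPs and node names are made up for illustration):

# hosts entries maintained by tunnel-cloud in the tunnel-coredns configmap
# format: <tunnel-cloud podIp>  <edge nodeName>
10.0.0.16  edge-node-1
10.0.0.16  edge-node-2
10.0.0.17  edge-node-3

With such a mapping, a cloud request to https://edge-node-1:10250 resolves to 10.0.0.16, the tunnel-cloud pod that holds the gRPC tunnel of edge-node-1.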

Data interaction among Tunnel's internal modules

Having introduced the configuration of the Tunnel, the following describes its internal data flow:

The figure above shows the data flow of the HTTPS proxy; the data flow of the TCP proxy is similar. The key steps are as follows:

  • HTTPS Server -> StreamServer: the HTTPS Server sends a StreamMsg to the StreamServer through a channel, where the channel is obtained by looking up the node in nodeContext via the StreamMsg.Node field and taking that node's Channel

  • StreamServer -> StreamClient: each cloud-edge tunnel is assigned a node object; sending data to the StreamClient means pushing the StreamMsg into the Channel of that node

  • StreamServer -> HTTPS Server: the StreamServer sends a StreamMsg to the HTTPS Server through a channel, where the node is obtained from nodeContext via StreamMsg.Node, and the conn.Channel of the HTTPS module is obtained by matching StreamMsg.Topic against conn.uid

Both nodeContext and connContext manage connections, but the gRPC long-lived connections managed by nodeContext and the upper-layer forwarded request connections (TCP and HTTPS) managed by connContext have different life cycles, so they must be managed separately, as sketched below.
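
A minimal Go sketch of the two registries, assuming they are plain maps of channels (the type and field names here are illustrative, not SuperEdge's actual definitions):

package tunnel

import "sync"

// StreamMsg stands in for SuperEdge's proto.StreamMsg in this sketch.
type StreamMsg struct {
    Node, Category, Type, Topic string
    Data                        []byte
}

// Node carries the send channel of one cloud-edge gRPC tunnel.
type Node struct {
    name string
    ch   chan *StreamMsg
}

// Conn carries the channel of one proxied upper-layer request (TCP/HTTPS).
type Conn struct {
    uid string
    ch  chan *StreamMsg
}

// nodeContext holds one entry per edge node; an entry lives exactly as long
// as that node's gRPC tunnel.
type nodeContext struct {
    mu    sync.RWMutex
    nodes map[string]*Node
}

// connContext holds one entry per forwarded request; an entry lives only for
// the duration of that TCP/HTTPS request.
type connContext struct {
    mu    sync.RWMutex
    conns map[string]*Conn
}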

Connection management of Tunnel

The connections managed by the Tunnel can be divided into underlying connections (the gRPC connections of the cloud-edge tunnel) and upper-layer application connections (HTTPS and TCP connections). Handling of connection exceptions covers the following scenarios:

gRPC connection is normal but the upper-layer connection is abnormal

Taking an HTTPS connection as an example: when the HTTPS Client of tunnel-edge is abnormally disconnected from the edge node server, it sends a StreamMsg (StreamMsg.Type=CLOSE) message. After receiving this StreamMsg, tunnel-cloud actively closes the connection between the HTTPS Server and the HTTPS Client.

gRPC connection exception

When the gRPC connection is abnormal, the Stream module, based on the node.connContext bound to that gRPC connection, sends StreamMsg (StreamMsg.Type=CLOSE) to the HTTPS and TCP modules; after receiving the message, those modules actively close their connections. A sketch of this broadcast follows.
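
A sketch of that broadcast, reusing the illustrative types from the earlier sketch and assuming the caller knows which conn uids are bound to the node (an assumption, not the actual SuperEdge code):

// broadcastClose pushes a CLOSE StreamMsg to every upper-layer conn bound to
// this node so the HTTPS/TCP modules can tear down their ends of the proxy.
func (n *Node) broadcastClose(conns *connContext, uids []string) {
    for _, uid := range uids {
        conns.mu.RLock()
        c, ok := conns.conns[uid]
        conns.mu.RUnlock()
        if !ok {
            continue
        }
        c.ch <- &StreamMsg{Node: n.name, Type: "CLOSE", Topic: uid}
    }
}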

Stream (gRPC cloud-edge tunnel)

func (stream *Stream) Start(mode string) {
    context.GetContext().RegisterHandler(util.STREAM_HEART_BEAT, util.STREAM, streammsg.HeartbeatHandler)
    if mode == util.CLOUD {
        ...
        //Start gRPC server
        go connect.StartServer()
        ...
        //Synchronize the configuration file of the hosts plug-in of coredns
        go connect.SynCorefile()
    } else {
        //gRPC client startup
        go connect.StartSendClient()
        ...
    }
    ...
}

Tunnel-cloud first calls RegisterHandler to register the heartbeat message handler HeartbeatHandler, then calls SynCorefile to synchronize the configuration file of the hosts plug-in of tunnel-coredns, checking hosts once per minute (allowing for the time the configmap takes to propagate to the file mounted in the tunnel-cloud pod), as follows:

func SynCorefile() {
    for {
        ...
        err := coreDns.checkHosts()
        ...
        time.Sleep(60 * time.Second)
    }
}

checkHosts performs the actual refresh of the configmap:

func (dns *CoreDns) checkHosts() error {
    nodes, flag := parseHosts()
    if !flag {
        return nil
    }
    ...
    _, err = dns.ClientSet.CoreV1().ConfigMaps(dns.Namespace).Update(cctx.TODO(), cm, metav1.UpdateOptions{})
    ...
}

checkHosts first calls parseHosts to obtain, from the local hosts file, the mapping list of edge node names to the corresponding tunnel-cloud podIp. It then compares this mapping against the node names held in memory; if anything has changed, it overwrites the content, writes it into the configmap, and updates it. The comparison idea is sketched below.
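
A minimal sketch of that comparison, assuming both mappings are held as node-name-to-podIp maps (the function and variable names are assumptions, not the actual parseHosts implementation):

// hostsChanged reports whether the mapping parsed from the mounted hosts file
// differs from the mapping tunnel-cloud currently holds in memory.
func hostsChanged(parsed, inMemory map[string]string) bool {
    if len(parsed) != len(inMemory) {
        return true
    }
    for nodeName, podIP := range parsed {
        if inMemory[nodeName] != podIP {
            return true
        }
    }
    return false
}

Only when the mapping has actually changed does checkHosts rewrite the hosts content and call Update on the configmap, which keeps apiserver write traffic low.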

In addition, the locally mounted configmap file introduced in tunnel-cloud serves to optimize performance in managed mode, where the tunnel-coredns of many clusters synchronize at the same time.

Tunnel-edge first calls StartClient to establish a gRPC connection with tunnel-cloud, returning a grpc.ClientConn:

func StartClient() (*grpc.ClientConn, ctx.Context, ctx.CancelFunc, error) {
    ...
    opts := []grpc.DialOption{
        grpc.WithKeepaliveParams(kacp),
        grpc.WithStreamInterceptor(ClientStreamInterceptor),
        grpc.WithTransportCredentials(creds),
    }
    conn, err := grpc.Dial(conf.TunnelConf.TunnlMode.EDGE.StreamEdge.Client.ServerName, opts...)
    ...
}

When grpc.Dial is called, the grpc.WithStreamInterceptor(ClientStreamInterceptor) DialOption is passed, handing ClientStreamInterceptor to the gRPC ClientConn as a StreamClientInterceptor. It waits for the connection to become ready before starting the RPC; streamClient.TunnelStreaming then invokes ClientStreamInterceptor, which returns a wrappedClientStream object:

func ClientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
    ...
    opts = append(opts, grpc.PerRPCCredentials(oauth.NewOauthAccess(&oauth2.Token{AccessToken: clientToken})))
    ...
    return newClientWrappedStream(s), nil
}

ClientStreamInterceptor packs the edge node name and token into an oauth2.Token, passing authentication via the AccessToken, and builds the wrappedClientStream.

stream.Send then concurrently calls wrappedClientStream.SendMsg and wrappedClientStream.RecvMsg, used for tunnel-edge sending and receiving respectively, and blocks waiting.

Note: tunnel-edge registers its node information with tunnel-cloud when the gRPC Stream is created, not when the grpc.ClientConn is created.

The whole process is shown in the figure below:

Correspondingly, when tunnel-cloud is initialized, grpc.StreamInterceptor(ServerStreamInterceptor) is built into a gRPC ServerOption, handing ServerStreamInterceptor to the grpc.Server as a StreamServerInterceptor:

func StartServer() {
    ...
    opts := []grpc.ServerOption{grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp), grpc.StreamInterceptor(ServerStreamInterceptor), grpc.Creds(creds)}
    s := grpc.NewServer(opts...)
    proto.RegisterStreamServer(s, &stream.Server{})
    ...
}

When the cloud gRPC service receives a tunnel-edge request to establish a Stream, it calls ServerStreamInterceptor. ServerStreamInterceptor parses the edge node name and token of this gRPC connection from the gRPC metadata and verifies the token; it then builds a wrappedServerStream, keyed by node name, as the handler object for communicating with that edge node (each edge node corresponds to one handler object). The handler function calls stream.TunnelStreaming and passes the wrappedServerStream to it (wrappedServerStream implements the proto.Stream_TunnelStreamingServer interface):

func ServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    md, ok := metadata.FromIncomingContext(ss.Context())
    ...
    tk := strings.TrimPrefix(md["authorization"][0], "Bearer ")
    auth, err := token.ParseToken(tk)
    ...
    if auth.Token != token.GetTokenFromCache(auth.NodeName) {
        klog.Errorf("invalid token node = %s", auth.NodeName)
        return ErrInvalidToken
    }
    err = handler(srv, newServerWrappedStream(ss, auth.NodeName))
    if err != nil {
        ctx.GetContext().RemoveNode(auth.NodeName)
        klog.Errorf("node disconnected node = %s err = %v", auth.NodeName, err)
    }
    return err
}

When the TunnelStreaming method returns, ServerStreamInterceptor executes the node-removal logic ctx.GetContext().RemoveNode(auth.NodeName).

TunnelStreaming concurrently calls wrappedServerStream.SendMsg and wrappedServerStream.RecvMsg, used for tunnel-cloud sending and receiving respectively, and blocks waiting:

func (s *Server) TunnelStreaming(stream proto.Stream_TunnelStreamingServer) error {
    errChan := make(chan error, 2)
    go func(sendStream proto.Stream_TunnelStreamingServer, sendChan chan error) {
        sendErr := sendStream.SendMsg(nil)
        ...
        sendChan <- sendErr
    }(stream, errChan)
    go func(recvStream proto.Stream_TunnelStreamingServer, recvChan chan error) {
        recvErr := stream.RecvMsg(nil)
        ...
        recvChan <- recvErr
    }(stream, errChan)
    e := <-errChan
    return e
}

SendMsg receives StreamMsg from the node object of the edge node corresponding to the wrappedServerStream and calls ServerStream.SendMsg to send the message to tunnel-edge:

func (w *wrappedServerStream) SendMsg(m interface{}) error {
    if m != nil {
        return w.ServerStream.SendMsg(m)
    }
    node := ctx.GetContext().AddNode(w.node)
    ...
    for {
        msg := <-node.NodeRecv()
        ...
        err := w.ServerStream.SendMsg(msg)
        ...
    }
}

RecvMsg continuously receives StreamMsg from tunnel-edge and calls the handler function registered for the corresponding StreamMsg type; the dispatch idea is sketched below.
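
A minimal sketch of the dispatch implied by RegisterHandler and RecvMsg, reusing the illustrative StreamMsg type from the earlier sketch and the standard sync and fmt packages (the registry shape is an assumption; the real one lives in SuperEdge's context package):

// handlerFunc processes one StreamMsg; handlers such as HeartbeatHandler or
// ConnectingHandler fit this shape.
type handlerFunc func(msg *StreamMsg) error

// handlerRegistry maps message type -> module category -> handler.
type handlerRegistry struct {
    mu       sync.RWMutex
    handlers map[string]map[string]handlerFunc
}

// Register records the handler for a (message type, category) pair.
func (r *handlerRegistry) Register(msgType, category string, h handlerFunc) {
    r.mu.Lock()
    defer r.mu.Unlock()
    if r.handlers[msgType] == nil {
        r.handlers[msgType] = make(map[string]handlerFunc)
    }
    r.handlers[msgType][category] = h
}

// Dispatch routes an incoming StreamMsg to the matching registered handler.
func (r *handlerRegistry) Dispatch(msg *StreamMsg) error {
    r.mu.RLock()
    h := r.handlers[msg.Type][msg.Category]
    r.mu.RUnlock()
    if h == nil {
        return fmt.Errorf("no handler for type=%s category=%s", msg.Type, msg.Category)
    }
    return h(msg)
}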

Summary:

  • The Stream module is responsible for establishing the gRPC connection and for communication (the cloud-edge tunnel)
  • The tunnel-edge on an edge node actively connects to the cloud tunnel-cloud Service, and the Service forwards requests to a tunnel-cloud pod according to its load-balancing policy
  • After tunnel-edge establishes the gRPC connection with tunnel-cloud, tunnel-cloud writes the mapping between its own podIp and the nodeName of the node where that tunnel-edge runs into tunnel-coredns; when the gRPC connection is broken, tunnel-cloud deletes the mapping
  • Tunnel-edge builds the gRPC connection with the edge node name and token, while tunnel-cloud resolves which edge node a gRPC connection belongs to from the authentication information and builds one wrappedServerStream per edge node for processing (a single tunnel-cloud can handle multiple tunnel-edge connections)
  • Every minute, tunnel-cloud synchronizes the mapping of edge node names to tunnel-cloud podIp into the configmap backing the hosts plug-in configuration file of tunnel-coredns (the interval allows for the time the configmap takes to propagate to the file mounted in the tunnel-cloud pod); the locally mounted configmap file is introduced to optimize performance in managed mode, where the tunnel-coredns of many clusters synchronize at the same time
  • Tunnel-edge sends a heartbeat StreamMsg to tunnel-cloud every minute to signal that the node is healthy, and tunnel-cloud responds on receipt (the heartbeat detects whether the gRPC Stream is still alive); see the sketch after this list
  • StreamMsg carries different message types such as heartbeat, TCP proxy, and HTTPS request; tunnel-cloud uses context.Node to distinguish the gRPC tunnels of different edge nodes
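
A sketch of the edge-side heartbeat loop under the assumptions above, reusing the illustrative StreamMsg type and the standard time package (the one-minute period comes from the article; the constant strings are made up):

// heartbeatLoop pushes a heartbeat StreamMsg into the tunnel's send channel
// once per minute until stopped; tunnel-cloud answers each one, and a missing
// answer is how a broken gRPC Stream gets detected.
func heartbeatLoop(send chan<- *StreamMsg, nodeName string, stop <-chan struct{}) {
    ticker := time.NewTicker(time.Minute)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            send <- &StreamMsg{Node: nodeName, Category: "stream", Type: "heartbeat"}
        case <-stop:
            return
        }
    }
}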

HTTPS proxy

The HTTPS module is responsible for establishing the cloud-edge HTTPS proxy and forwarding the HTTPS requests of cloud components (e.g. kube-apiserver) to edge services (e.g. kubelet):

func (https *Https) Start(mode string) {
    context.GetContext().RegisterHandler(util.CONNECTING, util.HTTPS, httpsmsg.ConnectingHandler)
    context.GetContext().RegisterHandler(util.CONNECTED, util.HTTPS, httpsmsg.ConnectedAndTransmission)
    context.GetContext().RegisterHandler(util.CLOSED, util.HTTPS, httpsmsg.ConnectedAndTransmission)
    context.GetContext().RegisterHandler(util.TRANSNMISSION, util.HTTPS, httpsmsg.ConnectedAndTransmission)
    if mode == util.CLOUD {
        go httpsmng.StartServer()
    }
}

The Start function first registers the StreamMsg handlers, where the CLOSED handler processes connection-close messages, and then starts the HTTPS Server. When a cloud component sends an HTTPS request to tunnel-cloud, serverHandler first resolves the node name from the request.Host field. If a TLS connection is established first and the HTTP request object is only written over that connection afterwards, request.Host may not be set, in which case the node name must be resolved from request.TLS.ServerName. The HTTPS Server then reads request.Body and request.Header to construct an HttpsMsg struct, serializes it, and wraps it into a StreamMsg, which is sent via Send2Node into the Channel of the node matching StreamMsg.Node, from where the Stream module sends it to tunnel-edge.

func (serverHandler *ServerHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
    var nodeName string
    nodeinfo := strings.Split(request.Host, ":")
    if context.GetContext().NodeIsExist(nodeinfo[0]) {
        nodeName = nodeinfo[0]
    } else {
        nodeName = request.TLS.ServerName
    }
    ...
    node.Send2Node(StreamMsg)
}

Tunnel-edge receives the StreamMsg and calls the ConnectingHandler function to process it:

func ConnectingHandler(msg *proto.StreamMsg) error {
    go httpsmng.Request(msg)
    return nil
}

func Request(msg *proto.StreamMsg) {
    httpConn, err := getHttpConn(msg)
    ...
    rawResponse := bytes.NewBuffer(make([]byte, 0, util.MaxResponseSize))
    rawResponse.Reset()
    respReader := bufio.NewReader(io.TeeReader(httpConn, rawResponse))
    resp, err := http.ReadResponse(respReader, nil)
    ...
    node.BindNode(msg.Topic)
    ...
    if resp.StatusCode != http.StatusSwitchingProtocols {
        handleClientHttp(resp, rawResponse, httpConn, msg, node, conn)
    } else {
        handleClientSwitchingProtocols(httpConn, rawResponse, msg, node, conn)
    }
}

ConnectingHandler calls Request to process the StreamMsg. Request first establishes a TLS connection with the edge node server through getHttpConn, then parses the data returned over the TLS connection to obtain the HTTP Response. If the status code is 200, it sends the content of the Response to tunnel-cloud; if the status code is 101, it sends the raw binary Response read from the TLS connection to tunnel-cloud. In both cases StreamMsg.Type is CONNECTED.

After tunnel-cloud receives the StreamMsg, it calls ConnectedAndTransmission to process it:

func ConnectedAndTransmission(msg *proto.StreamMsg) error {
    conn := context.GetContext().GetConn(msg.Topic)
    ...
    conn.Send2Conn(msg)
    return nil
}

The conn is obtained via msg.Topic (the conn.uid), and the message is pushed into the conn's channel through Send2Conn; this routing is sketched below.
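
A sketch of that routing under the earlier illustrative types (connContext keyed by conn.uid; fmt from the standard library):

// Send2Conn looks up the in-flight proxied connection whose uid equals the
// message's Topic and pushes the StreamMsg into its channel.
func (c *connContext) Send2Conn(msg *StreamMsg) error {
    c.mu.RLock()
    conn, ok := c.conns[msg.Topic]
    c.mu.RUnlock()
    if !ok {
        return fmt.Errorf("no conn for topic %s", msg.Topic)
    }
    conn.ch <- msg
    return nil
}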

After receiving the CONNECTED message from the edge, the cloud HTTPS Server considers the HTTPS proxy successfully established, and both ends continue with handleClientHttp or handleClientSwitchingProtocols for data transmission. Here only the data transmission of handleClientHttp (no protocol upgrade) is analyzed. The processing logic on the HTTPS Client side is as follows:

func handleClientHttp(resp *http.Response, rawResponse *bytes.Buffer, httpConn net.Conn, msg *proto.StreamMsg, node context.Node, conn context.Conn) {
    ...
    go func(read chan *proto.StreamMsg, response *http.Response, buf *bytes.Buffer, stopRead chan struct{}) {
        rrunning := true
        for rrunning {
            bbody := make([]byte, util.MaxResponseSize)
            n, err := response.Body.Read(bbody)
            respMsg := &proto.StreamMsg{
                Node:     msg.Node,
                Category: msg.Category,
                Type:     util.CONNECTED,
                Topic:    msg.Topic,
                Data:     bbody[:n],
            }
            ...
            read <- respMsg
        }
        ...
    }(readCh, resp, rawResponse, stop)
    running := true
    for running {
        select {
        case cloudMsg := <-conn.ConnRecv():
            ...
        case respMsg := <-readCh:
            ...
            node.Send2Node(respMsg)
            ...
        }
    }
    ...
}

Here handleClientHttp keeps trying to read packets from the edge component and constructs TRANSNMISSION-type StreamMsg to send to tunnel-cloud. After receiving a StreamMsg, tunnel-cloud calls the ConnectedAndTransmission function, which puts the StreamMsg into the conn.Channel of the HTTPS module corresponding to StreamMsg.Topic.

func handleServerHttp(rmsg *HttpsMsg, writer http.ResponseWriter, request *http.Request, node context.Node, conn context.Conn) {
    for k, v := range rmsg.Header {
        writer.Header().Add(k, v)
    }
    flusher, ok := writer.(http.Flusher)
    if ok {
        running := true
        for running {
            select {
            case <-request.Context().Done():
                ...
            case msg := <-conn.ConnRecv():
                ...
                _, err := writer.Write(msg.Data)
                flusher.Flush()
                ...
            }
        }
    }
    ...
}

After receiving a StreamMsg, handleServerHttp sends msg.Data, i.e. the packet from the edge component, to the cloud component. The whole data flow is unidirectional, transmitted from edge to cloud, as shown below:

For requests like kubectl exec, the data flow is bidirectional. In this case the edge component (kubelet) returns a packet with StatusCode 101, marking a protocol upgrade, after which tunnel-cloud and tunnel-edge switch to handleServerSwitchingProtocols and handleClientSwitchingProtocols respectively to read and write the underlying HTTPS connection directly, completing the bidirectional data flow.

The architecture is as follows:

The HTTPS module is summarized as follows:

Summary

  • HTTPS: responsible for establishing the cloud-edge HTTPS proxy (e.g. cloud kube-apiserver <-> edge kubelet) and transmitting data
  • Its function is similar to the TCP proxy, except that tunnel-cloud reads the edge node name carried in the cloud component's HTTPS request and establishes the HTTPS proxy with that specific edge node, instead of randomly selecting a cloud-edge tunnel for forwarding the way the TCP proxy does
  • When the cloud apiserver or another cloud application accesses the kubelet or another application on an edge node, tunnel-dns forwards the request to a tunnel-cloud pod through DNS hijacking (resolving the node name in the request host to the podIp of tunnel-cloud). Tunnel-cloud encapsulates the request information into a StreamMsg and sends it to tunnel-edge through the cloud-edge tunnel corresponding to the node name. Tunnel-edge establishes a TLS connection with the edge server using the Addr field of the received StreamMsg and the certificate in its configuration file, and writes the request information from the StreamMsg into the TLS connection. It then reads the data returned by the edge server from the TLS connection, encapsulates it into StreamMsg, and sends it to tunnel-cloud, which writes the received data into the connection established between the cloud component and tunnel-cloud.

TCP proxy

The TCP module is responsible for establishing a TCP proxy between the cloud control cluster and edge standalone clusters in multi-cluster management:

func (tcp *TcpProxy) Start(mode string) {
    context.GetContext().RegisterHandler(util.TCP_BACKEND, tcp.Name(), tcpmsg.BackendHandler)
    context.GetContext().RegisterHandler(util.TCP_FRONTEND, tcp.Name(), tcpmsg.FrontendHandler)
    context.GetContext().RegisterHandler(util.CLOSED, tcp.Name(), tcpmsg.ControlHandler)
    if mode == util.CLOUD {
        ...
        for front, backend := range Tcp.Addr {
            go func(front, backend string) {
                ln, err := net.Listen("tcp", front)
                ...
                for {
                    rawConn, err := ln.Accept()
                    ...
                    fp := tcpmng.NewTcpConn(uuid, backend, node)
                    fp.Conn = rawConn
                    fp.Type = util.TCP_FRONTEND
                    go fp.Write()
                    go fp.Read()
                }
            }(front, backend)
        }
    }
}

The Start function first registers the StreamMsg handlers, where the CLOSED handler processes connection-close messages, and then starts the TCP Server in the cloud. After receiving a request from a cloud component, the TCP Server encapsulates the request into a StreamMsg (StreamMsg.Type=FrontendHandler; StreamMsg.Node is chosen at random from the nodes with established cloud-edge tunnels, see the sketch after the summary below) and sends it through the StreamServer to tunnel-edge. After tunnel-edge receives the StreamMsg, it calls the FrontendHandler function to process it:

func FrontendHandler(msg *proto.StreamMsg) error {
    c := context.GetContext().GetConn(msg.Topic)
    if c != nil {
        c.Send2Conn(msg)
        return nil
    }
    tp := tcpmng.NewTcpConn(msg.Topic, msg.Addr, msg.Node)
    tp.Type = util.TCP_BACKEND
    tp.C.Send2Conn(msg)
    tcpAddr, err := net.ResolveTCPAddr("tcp", tp.Addr)
    if err != nil {
        ...
    }
    conn, err := net.DialTCP("tcp", nil, tcpAddr)
    ...
    tp.Conn = conn
    go tp.Read()
    go tp.Write()
    return nil
}

FrontendHandler first establishes a TCP connection with the edge server using StreamMsg.Addr, starts goroutines to asynchronously Read from and Write to that TCP connection, creates a conn object (conn.uid = StreamMsg.Topic), and writes StreamMsg.Data into the TCP connection. After receiving the data returned by the edge server, tunnel-edge encapsulates it as a StreamMsg (StreamMsg.Type=BackendHandler) and sends it to tunnel-cloud.

The whole process is shown in the figure:

Summary

  • TCP: responsible for establishing cloud-edge TCP proxies in multi-cluster management
  • A cloud component accesses the edge server through the TCP module. On receiving a request, the cloud TCP Server encapsulates it into a StreamMsg and sends it to tunnel-edge through a cloud-edge tunnel (randomly selected among the connected tunnels, so the TCP proxy is recommended only in scenarios with a single tunnel-edge; see the sketch after this list). Tunnel-edge establishes a TCP connection with the edge server using the Addr field of the received StreamMsg and writes the request into that connection. It then reads the edge server's reply from the TCP connection and sends it to tunnel-cloud through the cloud-edge tunnel; after receiving it, tunnel-cloud writes it into the connection established between the cloud component and the TCP Server
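
A sketch of the random tunnel selection, using the standard errors and math/rand packages (the function name is made up for illustration):

// pickRandomNode picks one of the currently connected cloud-edge tunnels.
// With several tunnel-edges connected, consecutive requests may land on
// different edge nodes, which is why the TCP proxy is recommended only when
// a single tunnel-edge is connected.
func pickRandomNode(nodes []string) (string, error) {
    if len(nodes) == 0 {
        return "", errors.New("no cloud-edge tunnel established")
    }
    return nodes[rand.Intn(len(nodes))], nil
}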

Outlook

  • Support more network protocols (currently HTTPS and TCP are supported)
  • Support cloud access to servers in business pods on edge nodes
  • When multiple edge nodes join the cluster at the same time, the multi-replica tunnel-cloud pods do not take a lock when updating the configmap that backs the hosts plug-in configuration file of tunnel-coredns; although the probability is low, write conflicts are theoretically possible

Refs

  • kubernetes-reading-notes
