kube-proxy source code analysis
ipvs mode has higher performance and stability than iptables mode, and this article focuses on the source code of the ipvs mode. If you want to understand how the iptables mode works you can also refer to this, since there is no architectural difference.
The main job of kube-proxy is to listen for service and endpoint events and then apply the corresponding proxy rules on the node. Under the hood it calls docker/libnetwork, and libnetwork in turn uses netlink and netns to create the ipvs rules and perform similar actions.
<!--more-->
Initialization configuration
Code entry: cmd/kube-proxy/app/server.go Run() function
The proxyServer configuration is initialized from the command-line parameters:
```go
proxyServer, err := NewProxyServer(o)
```
```go
type ProxyServer struct {
	// k8s client
	Client      clientset.Interface
	EventClient v1core.EventsGetter

	// ipvs related interfaces
	IptInterface   utiliptables.Interface
	IpvsInterface  utilipvs.Interface
	IpsetInterface utilipset.Interface

	// Proxier performs the synchronization
	Proxier proxy.ProxyProvider

	// Proxy mode: ipvs, iptables, userspace, kernelspace (windows)
	ProxyMode string

	// Configuration sync period
	ConfigSyncPeriod time.Duration

	// service and endpoint event handlers
	ServiceEventHandler   config.ServiceHandler
	EndpointsEventHandler config.EndpointsHandler
}
```
Proxier is the main entry point; the interface abstracts two functions:
```go
type ProxyProvider interface {
	// Sync immediately synchronizes the ProxyProvider's current state to iptables.
	Sync()
	// SyncLoop runs periodically.
	SyncLoop()
}
```
The interface of ipvs is very important:
```go
type Interface interface {
	// Flush deletes all rules
	Flush() error
	// Add a virtual server
	AddVirtualServer(*VirtualServer) error
	UpdateVirtualServer(*VirtualServer) error
	DeleteVirtualServer(*VirtualServer) error
	GetVirtualServer(*VirtualServer) (*VirtualServer, error)
	GetVirtualServers() ([]*VirtualServer, error)
	// Add a real server to a virtual server. A virtual server is, for example,
	// a cluster IP; a real server is a pod (or a custom endpoint).
	AddRealServer(*VirtualServer, *RealServer) error
	GetRealServers(*VirtualServer) ([]*RealServer, error)
	DeleteRealServer(*VirtualServer, *RealServer) error
}
```
Let's look at how ipvs_linux implements the above interface in more detail below.
For a virtual server and a real server, the most important fields are the ip:port pair, plus some proxying options such as the scheduler and session affinity:
```go
type VirtualServer struct {
	Address   net.IP
	Protocol  string
	Port      uint16
	Scheduler string
	Flags     ServiceFlags
	Timeout   uint32
}

type RealServer struct {
	Address net.IP
	Port    uint16
	Weight  int
}
```
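To make the relationship concrete, here is a minimal sketch (not actual kube-proxy code; the addresses and the helper name are made up) of how a cluster IP service with two pod endpoints could be expressed through the Interface above, assuming a `net` import and the types just shown:
```go
// addClusterIPService sketches how a cluster IP service maps onto the
// Interface above: one VirtualServer (the cluster IP) plus one RealServer
// per pod endpoint. The names and addresses are illustrative only.
func addClusterIPService(handle Interface) error {
	vs := &VirtualServer{
		Address:   net.ParseIP("10.96.0.100"), // the service's cluster IP
		Protocol:  "TCP",
		Port:      80,
		Scheduler: "rr", // round robin
	}
	if err := handle.AddVirtualServer(vs); err != nil {
		return err
	}
	// One RealServer per pod (or custom endpoint) backing the service.
	for _, podIP := range []string{"172.17.0.2", "172.17.0.3"} {
		rs := &RealServer{Address: net.ParseIP(podIP), Port: 8080, Weight: 1}
		if err := handle.AddRealServer(vs, rs); err != nil {
			return err
		}
	}
	return nil
}
```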
Create apiserver client
```go
client, eventClient, err := createClients(config.ClientConnection, master)
```
Create the Proxier. Here we focus only on the ipvs-mode Proxier:
```go
else if proxyMode == proxyModeIPVS {
	glog.V(0).Info("Using ipvs Proxier.")
	proxierIPVS, err := ipvs.NewProxier(
		iptInterface,
		ipvsInterface,
		ipsetInterface,
		utilsysctl.New(),
		execer,
		config.IPVS.SyncPeriod.Duration,
		config.IPVS.MinSyncPeriod.Duration,
		config.IPTables.MasqueradeAll,
		int(*config.IPTables.MasqueradeBit),
		config.ClusterCIDR,
		hostname,
		getNodeIP(client, hostname),
		recorder,
		healthzServer,
		config.IPVS.Scheduler,
	)
	...
	proxier = proxierIPVS
	serviceEventHandler = proxierIPVS
	endpointsEventHandler = proxierIPVS
```
This Proxier has the following methods:
```
+OnEndpointsAdd(endpoints *api.Endpoints)
+OnEndpointsDelete(endpoints *api.Endpoints)
+OnEndpointsSynced()
+OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints)
+OnServiceAdd(service *api.Service)
+OnServiceDelete(service *api.Service)
+OnServiceSynced()
+OnServiceUpdate(oldService, service *api.Service)
+Sync()
+SyncLoop()
```
So the ipvs Proxier implements most of the interfaces we need.
To sum up:
```
                              +-----------> endpointHandler
                              |
                              +-----------> serviceHandler
                              |                    ^
                              |                    |
                              +-----------> Sync (periodic sync, etc.)
                              |                    |
ProxyServer ----> Proxier ----+-----------> service event callback
                              |                    |
                              +-----------> endpoint event callback
                              |                    | triggers
                              +-----------> ipvs interface (ipvs handler) <-----+
```
Start proxyServer
1. Check whether the clean-up parameter is set; if so, clear all the proxy rules and exit.
2. The OOM adjuster does not appear to be implemented; ignore it.
3. resourceContainer is not implemented either; ignore it.
4. Start the metrics server. This matters for monitoring: pass the metrics-bind-address parameter and Prometheus metrics will be exposed there.
5. Start the informers, begin listening for events, and handle them in separate goroutines.

Steps 1 to 4 need little attention; step 5 is the one to look at carefully:
```go
informerFactory := informers.NewSharedInformerFactory(s.Client, s.ConfigSyncPeriod)

serviceConfig := config.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), s.ConfigSyncPeriod)
// Register the service handler and start it. This just hands ServiceEventHandler to the informer as a callback.
serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
go serviceConfig.Run(wait.NeverStop)

endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), s.ConfigSyncPeriod)
// Register the endpoints handler
endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler)
go endpointsConfig.Run(wait.NeverStop)

go informerFactory.Start(wait.NeverStop)
```
serviceConfig.Run and endpointsConfig.Run only register the callback functions; the registered handlers are handed to the informer, and the informer calls them back whenever it observes an event:
```go
for i := range c.eventHandlers {
	glog.V(3).Infof("Calling handler.OnServiceSynced()")
	c.eventHandlers[i].OnServiceSynced()
}
```
So the question is: what handler was registered? Looking back at the code above:
```go
serviceEventHandler = proxierIPVS
endpointsEventHandler = proxierIPVS
```
So both of them are the ipvs proxier (proxierIPVS).
The handlers are simply callback functions that the informer invokes, so when developing our own proxy we only need to implement this interface and register it:
```go
type ServiceHandler interface {
	// OnServiceAdd is called whenever creation of new service object
	// is observed.
	OnServiceAdd(service *api.Service)
	// OnServiceUpdate is called whenever modification of an existing
	// service object is observed.
	OnServiceUpdate(oldService, service *api.Service)
	// OnServiceDelete is called whenever deletion of an existing service
	// object is observed.
	OnServiceDelete(service *api.Service)
	// OnServiceSynced is called once all the initial event handlers were
	// called and the state is fully propagated to local cache.
	OnServiceSynced()
}
```
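For example, a hypothetical handler that only logs events could look like this (a sketch; loggingServiceHandler is a made-up name, api and glog are the same packages used elsewhere in the quoted code, and the imports are omitted because they depend on the kubernetes version):
```go
// loggingServiceHandler is a hypothetical ServiceHandler implementation
// that only logs the events the informer delivers to it.
type loggingServiceHandler struct{}

func (h *loggingServiceHandler) OnServiceAdd(service *api.Service) {
	glog.Infof("service added: %s/%s", service.Namespace, service.Name)
}

func (h *loggingServiceHandler) OnServiceUpdate(oldService, service *api.Service) {
	glog.Infof("service updated: %s/%s", service.Namespace, service.Name)
}

func (h *loggingServiceHandler) OnServiceDelete(service *api.Service) {
	glog.Infof("service deleted: %s/%s", service.Namespace, service.Name)
}

func (h *loggingServiceHandler) OnServiceSynced() {
	glog.Info("initial services synced")
}

// Registration would then mirror what the ipvs proxier does:
//   serviceConfig.RegisterEventHandler(&loggingServiceHandler{})
//   go serviceConfig.Run(wait.NeverStop)
```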
Start listening
```go
go informerFactory.Start(wait.NeverStop)
```
Once this runs, create/delete operations on services, endpoints and so on will be observed and the callbacks will fire. Looking back at the diagram above, everything ultimately lands in the Proxier, so from here on we only need to focus on the Proxier.
```go
s.Proxier.SyncLoop()
```
Then start SyncLoop, which is described below.
Proxier Implementation
When we create a service, the OnServiceAdd method is called. Here we record the previous state and the current state, and send a signal to syncRunner to process:
```go
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
	if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() {
		proxier.syncRunner.Run()
	}
}
```
update() only records the service information; you can see that nothing else is done here. The service is stored in a map, and if nothing actually changed the map entry is deleted without any further processing:
```go
change, exists := scm.items[*namespacedName]
if !exists {
	change = &serviceChange{}
	// Previous service information
	change.previous = serviceToServiceMap(previous)
	scm.items[*namespacedName] = change
}
// Current service information from the event
change.current = serviceToServiceMap(current)
// If nothing changed, drop the entry directly
if reflect.DeepEqual(change.previous, change.current) {
	delete(scm.items, *namespacedName)
}
```
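The same coalescing idea, reduced to a self-contained sketch with simplified types (these are not the real kube-proxy structs):
```go
package main

import (
	"fmt"
	"reflect"
)

// change tracks the state before the first event and the latest observed state.
type change struct {
	previous map[string]string
	current  map[string]string
}

// update records a state transition and returns true if a sync is still pending;
// if current ends up equal to previous, the entry is dropped because nothing
// actually needs to be done.
func update(items map[string]*change, name string, previous, current map[string]string) bool {
	c, exists := items[name]
	if !exists {
		c = &change{previous: previous}
		items[name] = c
	}
	c.current = current
	if reflect.DeepEqual(c.previous, c.current) {
		delete(items, name)
	}
	return len(items) > 0
}

func main() {
	items := map[string]*change{}
	svc := map[string]string{"clusterIP": "10.96.0.100", "port": "80"}
	// Add event: previous state was empty, current is the new service.
	fmt.Println(update(items, "default/svc", nil, svc)) // true: a sync is pending
	// Delete event before any sync ran: current becomes empty again,
	// equal to previous, so the pending change is coalesced away.
	fmt.Println(update(items, "default/svc", svc, nil)) // false: nothing to do
}
```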
A signal is sent in proxier.syncRunner.Run().
```go
select {
case bfr.run <- struct{}{}:
default:
}
```
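This is the classic non-blocking send on a buffered channel of capacity 1: if a signal is already pending, the new one is simply dropped, so bursts of events are coalesced into one sync. A self-contained sketch of the pattern (not kube-proxy code):
```go
package main

import (
	"fmt"
	"time"
)

func main() {
	run := make(chan struct{}, 1) // buffered: at most one pending signal

	// Consumer: each received signal triggers one (possibly batched) sync.
	go func() {
		for range run {
			fmt.Println("sync triggered")
			time.Sleep(50 * time.Millisecond) // pretend to sync rules
		}
	}()

	// Producer: fire many events; extra signals are dropped while one is pending.
	for i := 0; i < 10; i++ {
		select {
		case run <- struct{}{}:
		default: // a signal is already queued, coalesce this one
		}
	}
	time.Sleep(200 * time.Millisecond)
}
```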
Here the signal is processed.
```go
s.Proxier.SyncLoop()

func (proxier *Proxier) SyncLoop() {
	// Update healthz timestamp at beginning in case Sync() never succeeds.
	if proxier.healthzServer != nil {
		proxier.healthzServer.UpdateTimestamp()
	}
	proxier.syncRunner.Loop(wait.NeverStop)
}
```
The runner executes when it receives a signal; otherwise it executes periodically:
```go
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
	glog.V(3).Infof("%s Loop running", bfr.name)
	bfr.timer.Reset(bfr.maxInterval)
	for {
		select {
		case <-stop:
			bfr.stop()
			glog.V(3).Infof("%s Loop stopping", bfr.name)
			return
		case <-bfr.timer.C(): // periodic execution
			bfr.tryRun()
		case <-bfr.run: // execution triggered by an event signal
			bfr.tryRun()
		}
	}
}
```
The most important part of this BoundedFrequencyRunner is the callback function fn; tryRun checks whether the callback is allowed to run under the configured frequency limits:
```go
type BoundedFrequencyRunner struct {
	name        string        // the name of this instance
	minInterval time.Duration // the min time between runs, modulo bursts
	maxInterval time.Duration // the max time between runs

	run chan struct{} // try an async run

	mu      sync.Mutex  // guards runs of fn and all mutations
	fn      func()      // function to run, this is the callback
	lastRun time.Time   // time of last run
	timer   timer       // timer for deferred runs
	limiter rateLimiter // rate limiter for on-demand runs
}

// The callback passed in is proxier.syncProxyRules
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
```
The callback, syncProxyRules, is a sprawling function of about 600 lines, and it is where the main logic is handled.
syncProxyRules
- Set up some iptables rules, such as mark and comment
- Make sure the required network interface exists on the machine, since ipvs needs to bind service addresses to it
- Make sure the required ipsets exist. ipset is an extension of iptables that can apply one iptables rule to a whole set of addresses. The rest of the function is long and repetitive, so I will leave the details for you to explore
- What we are most concerned about is how the virtual servers are handled:
```go
serv := &utilipvs.VirtualServer{
	Address:   net.ParseIP(ingress.IP),
	Port:      uint16(svcInfo.port),
	Protocol:  string(svcInfo.protocol),
	Scheduler: proxier.ipvsScheduler,
}
if err := proxier.syncService(svcNameString, serv, false); err == nil {
	if err := proxier.syncEndpoint(svcName, svcInfo.onlyNodeLocalEndpoints, serv); err != nil {
	}
}
```
Looking at the implementation of syncService: if the virtual server does not exist it is created, and if it already exists it is updated; the service's cluster IP is also bound to the dummy network interface:
```go
func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool) error {
	appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
	if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
		if appliedVirtualServer == nil {
			if err := proxier.ipvs.AddVirtualServer(vs); err != nil {
				return err
			}
		} else {
			if err := proxier.ipvs.UpdateVirtualServer(appliedVirtualServer); err != nil {
				return err
			}
		}
	}
	// bind service address to dummy interface even if service not changed,
	// in case that service IP was removed by other processes
	if bindAddr {
		_, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice)
		if err != nil {
			return err
		}
	}
	return nil
}
```
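Conceptually, binding the address to the dummy device just means adding the cluster IP to the kube-ipvs0 interface so the kernel accepts traffic destined for it. A rough illustration of that idea using the vishvananda/netlink package (this is not the actual kube-proxy code, and kube-proxy's own netlinkHandle wrapper behaves differently in details, e.g. treating "already bound" as success):
```go
package main

import (
	"log"

	"github.com/vishvananda/netlink"
)

// bindServiceIP sketches what EnsureAddressBind boils down to: add a
// service's cluster IP to the dummy interface so the node answers for it.
func bindServiceIP(ip string) error {
	link, err := netlink.LinkByName("kube-ipvs0") // DefaultDummyDevice
	if err != nil {
		return err
	}
	addr, err := netlink.ParseAddr(ip + "/32")
	if err != nil {
		return err
	}
	// AddrAdd errors if the address is already present; a real
	// implementation would treat that case as success.
	return netlink.AddrAdd(link, addr)
}

func main() {
	if err := bindServiceIP("10.96.0.100"); err != nil {
		log.Println("bind failed:", err)
	}
}
```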
Create service implementation
Now we can look at how AddVirtualServer is implemented for ipvs. The main idea is to communicate with the kernel over a socket. The runner struct in pkg/util/ipvs/ipvs_linux.go implements these methods using the docker/libnetwork/ipvs library:
```go
// runner implements Interface.
type runner struct {
	exec       utilexec.Interface
	ipvsHandle *ipvs.Handle
}

// New returns a new Interface which will call ipvs APIs.
func New(exec utilexec.Interface) Interface {
	ihandle, err := ipvs.New("") // github.com/docker/libnetwork/ipvs
	if err != nil {
		glog.Errorf("IPVS interface can't be initialized, error: %v", err)
		return nil
	}
	return &runner{
		exec:       exec,
		ipvsHandle: ihandle,
	}
}
```
ipvs.New creates a special socket. This is no different from ordinary socket programming; the key is the syscall.AF_NETLINK family, which means the socket talks to the kernel:
```go
sock, err := nl.GetNetlinkSocketAt(n, netns.None(), syscall.NETLINK_GENERIC)

func getNetlinkSocket(protocol int) (*NetlinkSocket, error) {
	fd, err := syscall.Socket(syscall.AF_NETLINK, syscall.SOCK_RAW|syscall.SOCK_CLOEXEC, protocol)
	if err != nil {
		return nil, err
	}
	s := &NetlinkSocket{
		fd: int32(fd),
	}
	s.lsa.Family = syscall.AF_NETLINK
	if err := syscall.Bind(fd, &s.lsa); err != nil {
		syscall.Close(fd)
		return nil, err
	}
	return s, nil
}
```
To create a service, the service is converted into the libnetwork service format and the handle is called directly:
```go
// AddVirtualServer is part of Interface.
func (runner *runner) AddVirtualServer(vs *VirtualServer) error {
	eSvc, err := toBackendService(vs)
	if err != nil {
		return err
	}
	return runner.ipvsHandle.NewService(eSvc)
}
```
The service information is then packed into a request and written to the socket:
```go
func (i *Handle) doCmdwithResponse(s *Service, d *Destination, cmd uint8) ([][]byte, error) {
	req := newIPVSRequest(cmd)
	req.Seq = atomic.AddUint32(&i.seq, 1)

	if s == nil {
		req.Flags |= syscall.NLM_F_DUMP                    // flag to dump all messages
		req.AddData(nl.NewRtAttr(ipvsCmdAttrService, nil)) // add a dummy attribute
	} else {
		req.AddData(fillService(s)) // pack the service into the request
	}

	if d == nil {
		if cmd == ipvsCmdGetDest {
			req.Flags |= syscall.NLM_F_DUMP
		}
	} else {
		req.AddData(fillDestinaton(d))
	}

	// Send the service information to the kernel
	res, err := execute(i.sock, req, 0)
	if err != nil {
		return [][]byte{}, err
	}

	return res, nil
}
```
Construct Request
```go
func newIPVSRequest(cmd uint8) *nl.NetlinkRequest {
	return newGenlRequest(ipvsFamily, cmd)
}
```
The ipvs protocol family is passed in when the request is constructed.
Then a message header for communicating with the kernel is constructed:
```go
func NewNetlinkRequest(proto, flags int) *NetlinkRequest {
	return &NetlinkRequest{
		NlMsghdr: syscall.NlMsghdr{
			Len:   uint32(syscall.SizeofNlMsghdr),
			Type:  uint16(proto),
			Flags: syscall.NLM_F_REQUEST | uint16(flags),
			Seq:   atomic.AddUint32(&nextSeqNr, 1),
		},
	}
}
```
The Data added to the message is a list of attributes, each of which must implement two methods:
```go
type NetlinkRequestData interface {
	Len() int          // length
	Serialize() []byte // serialization; the kernel expects data in a specific format, so the service information must implement this too
}
```
For example, the header is serialized like this. It can be confusing at first glance, so let's take it apart: [unsafe.Sizeof(*hdr)]byte is a byte array type whose length is the size of the struct; unsafe.Pointer(hdr) converts the struct pointer into a raw pointer, which is then cast to a pointer to that byte array; dereferencing it yields the array, and [:] turns it into a []byte:
```go
func (hdr *genlMsgHdr) Serialize() []byte {
	return (*(*[unsafe.Sizeof(*hdr)]byte)(unsafe.Pointer(hdr)))[:]
}
```
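To see the trick in isolation, here is a small self-contained example with a made-up struct (not the real genlMsgHdr) that serializes a struct's memory into a []byte in the same way:
```go
package main

import (
	"fmt"
	"unsafe"
)

// demoHdr stands in for genlMsgHdr; the field layout is made up.
type demoHdr struct {
	cmd      uint8
	version  uint8
	reserved uint16
}

// serialize reinterprets the struct's memory as a fixed-size byte array
// and returns a slice over it, exactly like genlMsgHdr.Serialize above.
func serialize(hdr *demoHdr) []byte {
	return (*(*[unsafe.Sizeof(*hdr)]byte)(unsafe.Pointer(hdr)))[:]
}

func main() {
	h := demoHdr{cmd: 1, version: 2, reserved: 0x0304}
	// On a little-endian machine this prints [1 2 4 3].
	fmt.Println(serialize(&h))
}
```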
Send service information to the kernel
This is a very ordinary socket send/receive loop:
```go
func execute(s *nl.NetlinkSocket, req *nl.NetlinkRequest, resType uint16) ([][]byte, error) {
	var (
		err error
	)

	if err := s.Send(req); err != nil {
		return nil, err
	}

	pid, err := s.GetPid()
	if err != nil {
		return nil, err
	}

	var res [][]byte

done:
	for {
		msgs, err := s.Receive()
		if err != nil {
			return nil, err
		}
		for _, m := range msgs {
			if m.Header.Seq != req.Seq {
				continue
			}
			if m.Header.Pid != pid {
				return nil, fmt.Errorf("Wrong pid %d, expected %d", m.Header.Pid, pid)
			}
			if m.Header.Type == syscall.NLMSG_DONE {
				break done
			}
			if m.Header.Type == syscall.NLMSG_ERROR {
				error := int32(native.Uint32(m.Data[0:4]))
				if error == 0 {
					break done
				}
				return nil, syscall.Errno(-error)
			}
			if resType != 0 && m.Header.Type != resType {
				continue
			}
			res = append(res, m.Data)
			if m.Header.Flags&syscall.NLM_F_MULTI == 0 {
				break done
			}
		}
	}
	return res, nil
}
```
Packing the service data: the core idea is that the kernel only accepts data in a fixed, standard format, so the service information is packed according to that standard and sent to the kernel. I will not go into the packing details:
```go
func fillService(s *Service) nl.NetlinkRequestData {
	cmdAttr := nl.NewRtAttr(ipvsCmdAttrService, nil)
	nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddressFamily, nl.Uint16Attr(s.AddressFamily))
	if s.FWMark != 0 {
		nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFWMark, nl.Uint32Attr(s.FWMark))
	} else {
		nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrProtocol, nl.Uint16Attr(s.Protocol))
		nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddress, rawIPData(s.Address))

		// Port needs to be in network byte order.
		portBuf := new(bytes.Buffer)
		binary.Write(portBuf, binary.BigEndian, s.Port)
		nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPort, portBuf.Bytes())
	}

	nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrSchedName, nl.ZeroTerminated(s.SchedName))
	if s.PEName != "" {
		nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPEName, nl.ZeroTerminated(s.PEName))
	}

	f := &ipvsFlags{
		flags: s.Flags,
		mask:  0xFFFFFFFF,
	}
	nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFlags, f.Serialize())
	nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrTimeout, nl.Uint32Attr(s.Timeout))
	nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrNetmask, nl.Uint32Attr(s.Netmask))
	return cmdAttr
}
```
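A side note on the port handling above: network byte order is big-endian, so port 80 becomes the two bytes 0x00 0x50. A tiny standalone sketch of the same binary.Write call:
```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

func main() {
	var port uint16 = 80
	portBuf := new(bytes.Buffer)
	// Same call as in fillService: write the port in network (big-endian) order.
	binary.Write(portBuf, binary.BigEndian, port)
	fmt.Printf("% x\n", portBuf.Bytes()) // 00 50
}
```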
Summary
Generally speaking the code is fairly simple, although some parts feel a bit roundabout rather than simple and direct. Overall, kube-proxy watches apiserver events, diffs the desired state against the current state, and periodically executes the synchronization logic.
Scan the QR code to follow sealyun. QQ discussion group: 98488045