Deep analysis of KubeSphere backend source code

In this article, we will debug and explore the KubeSphere back-end module architecture using the Remote - SSH plug-in of VS Code.

Prerequisites

  • Install VS Code and the Remote - SSH plug-in;
  • Install Kubernetes, the container "operating system", and KubeSphere >= v3.1.0, the cloud "control panel", on the remote host;
  • Install Go >= 1.16;
  • Activate the ks components you need to debug on KubeSphere, such as devops or kubeedge; components that are activated by default, such as monitoring, do not need to be activated again.

Configure launch file

$ cat .vscode/launch.json
{
    // Use IntelliSense to learn about possible attributes.
    // Hover to view descriptions of existing attributes.
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [
        {
            "name": "ks-apiserver",
            "type": "go",
            "request": "launch",
            "mode": "auto",
            "program": "${workspaceFolder}/cmd/ks-apiserver/apiserver.go"
        }
        
    ]
}

ks-apiserver debug dependency file

Configure kubesphere.yaml under the relative path cmd/ks-apiserver/.

First, view the cm configuration file in the cluster:

$ kubectl -n kubesphere-system get cm kubesphere-config -oyaml

Because the configmap above contains little kubeconfig-related configuration, you need to copy its yaml content and merge in the additions below.

Why do we need to add a kubeconfig file?

This is mainly because k8s requires such a file when creating a client outside the cluster; inside a container, InClusterConfig is used instead, so there is no need to add it there.

If you are interested, see the following client-go examples:

https://github.com/kubernetes...

https://github.com/kubernetes...
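For reference, here is a minimal sketch of that client-go pattern (not KubeSphere's exact code; the helper name is made up): outside the cluster the config is built from a kubeconfig file, while inside a pod InClusterConfig reads the mounted service account.

package main

import (
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
)

// newConfig is a hypothetical helper: use the kubeconfig file when given,
// otherwise fall back to the in-cluster service account config.
func newConfig(kubeconfig string) (*rest.Config, error) {
    if kubeconfig != "" {
        return clientcmd.BuildConfigFromFlags("", kubeconfig)
    }
    return rest.InClusterConfig()
}

func main() {
    config, err := newConfig("/root/.kube/config")
    if err != nil {
        panic(err)
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }
    _ = clientset // use the clientset to list pods, etc.
}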

Therefore, the complete configuration startup file is as follows:

$ cat ./cmd/ks-apiserver/kubesphere.yaml
kubernetes:
  kubeconfig: "/root/.kube/config"
  master: https://192.168.88.6:6443
  qps: 1e+06
  burst: 1000000
authentication:
  authenticateRateLimiterMaxTries: 10
  authenticateRateLimiterDuration: 10m0s
  loginHistoryRetentionPeriod: 168h
  maximumClockSkew: 10s
  multipleLogin: True
  kubectlImage: kubesphere/kubectl:v1.20.0
  jwtSecret: "Xtc8ZWUf9f3cJN89bglrTJhfUPMZR87d"
  oauthOptions:
    clients:
    - name: kubesphere
      secret: kubesphere
      redirectURIs:
      - '*'
network:
  ippoolType: none
monitoring:
  endpoint: http://prometheus-operated.kubesphere-monitoring-system.svc:9090
  enableGPUMonitoring: false
gpu:
  kinds:
  - resourceName: nvidia.com/gpu
    resourceType: GPU
    default: True
notification:
  endpoint: http://notification-manager-svc.kubesphere-monitoring-system.svc:19093
  
kubeedge:
  endpoint: http://edge-watcher.kubeedge.svc/api/

gateway:
  watchesPath: /var/helm-charts/watches.yaml
  namespace: kubesphere-controls-system

Apart from kubernetes, each top-level key corresponds to a ks component that is activated in our cluster, whether manually or by default. Now you can start debugging with F5.

Before debugging, you may ask why this configuration file has to be placed at cmd/ks-apiserver/kubesphere.yaml.

Let's first explore the startup logic of ks-apiserver.

Start ks-apiserver

View the logic in cmd/ks-apiserver/app/server.go:

// Load configuration from file
conf, err := apiserverconfig.TryLoadFromDisk()

The logic of TryLoadFromDisk is as follows:

viper.SetConfigName(defaultConfigurationName) // kubesphere
viper.AddConfigPath(defaultConfigurationPath) // /etc/kubesphere

// Load from current working directory, only used for debugging
viper.AddConfigPath(".")

// Load from Environment variables
viper.SetEnvPrefix("kubesphere")
viper.AutomaticEnv()
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))

// With the configuration above, step-through debugging shows that the file paths read in ReadInConfig are
// v.configPaths: ["/etc/kubesphere","/root/go/src/kubesphere.io/kubesphere/cmd/ks-apiserver"]
if err := viper.ReadInConfig(); err != nil {
    if _, ok := err.(viper.ConfigFileNotFoundError); ok {
        return nil, err
    } else {
        return nil, fmt.Errorf("error parsing configuration file %s", err)
    }
}

conf := New() // Initialize the configuration of each component

// Deserialize to the conf struct from the read actual path configuration file
if err := viper.Unmarshal(conf); err != nil {
    return nil, err
}

return conf, nil

The comments above explain why kubesphere.yaml must be added under the specified path before starting the ks-apiserver command line.
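Incidentally, because of SetEnvPrefix, AutomaticEnv and the "." to "_" replacer above, keys present in kubesphere.yaml should also be overridable by environment variables, e.g. (hypothetical value):

$ KUBESPHERE_MONITORING_ENDPOINT=http://127.0.0.1:9090 ./ks-apiserver   # overrides monitoring.endpoint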

Let's move on: the Cobra command package is used for command-line integration:

func Run(s *options.ServerRunOptions, ctx context.Context) error {
    // NewAPIServer creates the apiserver instance from the given configuration and binds the clients of the instantiated components
    // In this step, custom GVKs can also be registered with k8s through AddToScheme and eventually exposed under the apis routes
    // The rest.Config and scheme are used to initialize the runtime cache and runtime client
    apiserver, err := s.NewAPIServer(ctx.Done())
    if err != nil {
        return err
    }
    
    // PrepareRun mainly uses go-restful to integrate the kapis APIs
    // The previous step bound each component's client; this step can call those clients to access the server side of the corresponding component
    // Guess what the 4.0 back-end pluggable architecture will look like?
    err = apiserver.PrepareRun(ctx.Done())
    if err != nil {
        return nil
    }
    
    // Run the various informers to synchronize resources, then start ks-apiserver to listen for requests
    return apiserver.Run(ctx)
}

s.NewAPIServer(ctx.Done()) mainly creates the apiserver instance. In this step, ks custom GVKs can also be registered with k8s through the scheme, exposing them under the apis request path.

PrepareRun mainly uses the go-restful framework to integrate each sub-module's proxy requests or aggregation services, exposing APIs under the kapis request path.

apiserver.Run(ctx) synchronizes resources and starts server listening.

The following is a separate description.

NewAPIServer

First, bind the various clients and informers:

// Call the NewForConfig method of each component to integrate the clientset
kubernetesClient, err := k8s.NewKubernetesClient(s.KubernetesOptions)
if err != nil {
    return nil, err
}
apiServer.KubernetesClient = kubernetesClient
informerFactory := informers.NewInformerFactories(kubernetesClient.Kubernetes(), kubernetesClient.KubeSphere(),kubernetesClient.Istio(), kubernetesClient.Snapshot(), kubernetesClient.ApiExtensions(), kubernetesClient.Prometheus())
apiServer.InformerFactory = informerFactory
...
// Bind the clients of ks components according to kubesphere.yaml or the kubesphere-config configmap
...

After the bindings are initialized, a server needs to be started to respond to requests, so an address is bound here:

...
server := &http.Server{
    Addr: fmt.Sprintf(":%d", s.GenericServerRunOptions.InsecurePort),
}

if s.GenericServerRunOptions.SecurePort != 0 {
    certificate, err := tls.LoadX509KeyPair(s.GenericServerRunOptions.TlsCertFile, s.GenericServerRunOptions.TlsPrivateKey)
    if err != nil {
        return nil, err
    }

    server.TLSConfig = &tls.Config{
        Certificates: []tls.Certificate{certificate},
    }
    server.Addr = fmt.Sprintf(":%d", s.GenericServerRunOptions.SecurePort)
}

sch := scheme.Scheme
if err := apis.AddToScheme(sch); err != nil {
    klog.Fatalf("unable add APIs to scheme: %v", err)
}
...

Pay attention to this step: apis.AddToScheme(sch) registers the GVKs we defined with k8s.

Incidentally, GVK stands for Group, Version, Kind; for example:

{Group: "", Version: "v1", Resource: "namespaces"}
{Group: "", Version: "v1", Resource: "nodes"}
{Group: "", Version: "v1", Resource: "resourcequotas"}
...
{Group: "tenant.kubesphere.io", Version: "v1alpha1", Resource: "workspaces"}
{Group: "cluster.kubesphere.io", Version: "v1alpha1", Resource: "clusters"}
...

Scheme manages the relationship between GVKs and Go types: a GVK corresponds to exactly one reflect.Type, while one reflect.Type may correspond to multiple GVKs. In addition, the scheme aggregates a converter and a cloner, used to convert between different versions of a struct and to obtain copies of struct values. Space is limited, so interested readers can explore this in depth on their own.
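As a small illustration of that GVK-to-type mapping (using the built-in core v1 types, not KubeSphere code), you can ask a scheme which GVKs a Go type maps to:

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
)

func main() {
    sch := runtime.NewScheme()
    if err := clientgoscheme.AddToScheme(sch); err != nil {
        panic(err)
    }
    // Ask the scheme which GVKs correspond to the Go type corev1.Pod
    gvks, _, err := sch.ObjectKinds(&corev1.Pod{})
    if err != nil {
        panic(err)
    }
    fmt.Println(gvks) // [/v1, Kind=Pod]
}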

Returning to the text, let's see how to inject scheme:

// AddToSchemes may be used to add all resources defined in the project to a Scheme
var AddToSchemes runtime.SchemeBuilder

// AddToScheme adds all Resources to the Scheme
func AddToScheme(s *runtime.Scheme) error {
    return AddToSchemes.AddToScheme(s)
}

AddToSchemes is an alias for []func(*Scheme) error. You only need to write an init() function in a file under package apis that imports an implemented versioned API, and it will be injected into the Scheme.

For instance:

$ cat pkg/apis/addtoscheme_dashboard_v1alpha2.go
package apis
import monitoringdashboardv1alpha2 "kubesphere.io/monitoring-dashboard/api/v1alpha2"
func init() {    
  AddToSchemes = append(AddToSchemes, monitoringdashboardv1alpha2.SchemeBuilder.AddToScheme)
}

That is, the versioned resources integrated by the plug-ins we develop must implement the xxx.SchemeBuilder.AddToScheme function so that they can be registered in the scheme and finally exposed as apis routes.
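For example, a typical kubebuilder-style groupversion_info.go (the group name here is hypothetical) is what provides such a SchemeBuilder:

package v1alpha1

import (
    "k8s.io/apimachinery/pkg/runtime/schema"
    "sigs.k8s.io/controller-runtime/pkg/scheme"
)

var (
    // GroupVersion identifies this API group; the group name is hypothetical
    GroupVersion = schema.GroupVersion{Group: "example.kubesphere.io", Version: "v1alpha1"}

    // SchemeBuilder registers this group's Go types with a Scheme
    SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion}

    // AddToScheme is what the addtoscheme_xxx.go files append to AddToSchemes
    AddToScheme = SchemeBuilder.AddToScheme
)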

So far, the clients corresponding to all sub-modules have been bound to this apiserver.

PrepareRun

Next, let's discuss how PrepareRun registers kapis and binds handlers.

It is mainly implemented through the go-restful framework.

In go-restful, a Container holds WebServices, each serving a specific GVR; a WebService can bind multiple routes, and custom interceptors can be added to the Container or the WebService by calling the Filter method.
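As a standalone sketch of that Container/WebService/Filter relationship (a hypothetical group and route, not ks-apiserver code):

package main

import (
    "net/http"

    restful "github.com/emicklei/go-restful"
)

func main() {
    container := restful.NewContainer()
    container.Router(restful.CurlyRouter{})

    // A container-level filter (interceptor) that runs for every request
    container.Filter(func(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
        // e.g. log the request here, then continue down the chain
        chain.ProcessFilter(req, resp)
    })

    // A webservice rooted at a hypothetical kapis group/version path
    ws := new(restful.WebService)
    ws.Path("/kapis/example.kubesphere.io/v1alpha1").Produces(restful.MIME_JSON)
    ws.Route(ws.GET("/hello").To(func(req *restful.Request, resp *restful.Response) {
        resp.WriteAsJson(map[string]string{"message": "hello"})
    }))
    container.Add(ws)

    http.ListenAndServe(":8080", container)
}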

func (s *APIServer) PrepareRun(stopCh <-chan struct{}) error {
  // The container holds the webservices with specific GVRs
    s.container = restful.NewContainer()
    // Add Request log interceptor
    s.container.Filter(logRequestAndResponse)
    s.container.Router(restful.CurlyRouter{})
    
    // Bind a log handler when Recover occurs
    s.container.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
        logStackOnRecover(panicReason, httpWriter)
    })
    
    // Each API group builds a web service, and then binds the callback function according to the routing rules
  // Complete the binding through AddToContainer
    s.installKubeSphereAPIs()
    
    // Register metrics: ks_server_request_total, ks_server_request_duration_seconds
    // Bind metrics handler
    s.installMetricsAPI()
    
    // Increase monitoring count for valid requests
    s.container.Filter(monitorRequest)

    for _, ws := range s.container.RegisteredWebServices() {
        klog.V(2).Infof("%s", ws.RootPath())
    }
    
    s.Server.Handler = s.container
    
    // Add interceptors for each call chain for authentication and routing distribution
    s.buildHandlerChain(stopCh)

    return nil
}

The code above uses the go-restful framework to bind a container to s.Server.Handler and adds various interceptors.

s.installKubeSphereAPIs() installs the GVRs and binds the kapis proxies; the specific implementation is as follows:

// Call the AddToContainer method of each api group to register kapis with the container:
urlruntime.Must(monitoringv1alpha3.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.MetricsClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions))

// In detail, each component implements its own AddToContainer method
// Add routes to the webservice with GroupVersion information, binding different handlers to different routing paths
ws := runtime.NewWebService(GroupVersion)
// Bind callback function to sub route
ws.Route(ws.GET("/kubesphere").
    To(h.handleKubeSphereMetricsQuery).
    Doc("Get platform-level metric data.").
    Metadata(restfulspec.KeyOpenAPITags, []string{constants.KubeSphereMetricsTag}).
    Writes(model.Metrics{}).
    Returns(http.StatusOK, respOK, model.Metrics{})).
    Produces(restful.MIME_JSON)

We know that apis requests correspond to k8s, while kapis requests correspond to proxy requests for ks sub-components, whose responses are provided either by ks-apiserver itself or by the target component server it forwards to. How does ks-apiserver distinguish these requests?

The answer: they are dispatched through buildHandlerChain.

buildHandlerChain

As mentioned above, buildHandlerChain builds interceptors for the various services; they are listed below in order.

handler = filters.WithKubeAPIServer(handler, s.KubernetesClient.Config(), &errorResponder{})

if s.Config.AuditingOptions.Enable {
    handler = filters.WithAuditing(handler,
        audit.NewAuditing(s.InformerFactory, s.Config.AuditingOptions, stopCh))
}

handler = filters.WithAuthorization(handler, authorizers)
if s.Config.MultiClusterOptions.Enable {
    clusterDispatcher := dispatch.NewClusterDispatch(s.InformerFactory.KubeSphereSharedInformerFactory().Cluster().V1alpha1().Clusters())
    handler = filters.WithMultipleClusterDispatcher(handler, clusterDispatcher)
}

handler = filters.WithAuthentication(handler, authn)
handler = filters.WithRequestInfo(handler, requestInfoResolver)
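Each of these filters follows the standard Go middleware pattern of wrapping an http.Handler; because handler is reassigned from the inside out, the filter added last (WithRequestInfo) executes first. A minimal sketch of that decorator shape (a hypothetical filter, not the real implementations):

package filters

import (
    "log"
    "net/http"
)

// withLogging is a hypothetical filter with the same shape as WithAuthentication etc.:
// it wraps the next handler and runs its own logic before delegating to the chain.
func withLogging(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
        log.Printf("%s %s", req.Method, req.URL.Path)
        next.ServeHTTP(w, req)
    })
}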

The WithRequestInfo filter defines the following logic:

info, err := resolver.NewRequestInfo(req)
---
func (r *RequestInfoFactory) NewRequestInfo(req *http.Request) (*RequestInfo, error) {
   ...
   defer func() {
        prefix := requestInfo.APIPrefix
        if prefix == "" {
            currentParts := splitPath(requestInfo.Path)
            //Proxy discovery API
            if len(currentParts) > 0 && len(currentParts) < 3 {
                prefix = currentParts[0]
            }
        }
    // You can distinguish between apis and kapis in the api routing path
        if kubernetesAPIPrefixes.Has(prefix) {
            requestInfo.IsKubernetesRequest = true
        }
    }()
    
    ...
    // URL forms: /clusters/{cluster}/*
    if currentParts[0] == "clusters" {
        if len(currentParts) > 1 {
            requestInfo.Cluster = currentParts[1]
        }
        if len(currentParts) > 2 {
            currentParts = currentParts[2:]
        }
    }
    ...
}

There is a lot of code, so I won't walk through it piece by piece. The general idea can be seen from the comments:

// NewRequestInfo returns the information from the http request.  If error is not nil, RequestInfo holds the information as best it is known before the failure
// It handles both resource and non-resource requests and fills in all the pertinent information for each.
// Valid Inputs:
//
// /apis/{api-group}/{version}/namespaces
// /api/{version}/namespaces
// /api/{version}/namespaces/{namespace}
// /api/{version}/namespaces/{namespace}/{resource}
// /api/{version}/namespaces/{namespace}/{resource}/{resourceName}
// /api/{version}/{resource}
// /api/{version}/{resource}/{resourceName}
//
// Special verbs without subresources:
// /api/{version}/proxy/{resource}/{resourceName}
// /api/{version}/proxy/namespaces/{namespace}/{resource}/{resourceName}
//
// Special verbs with subresources:
// /api/{version}/watch/{resource}
// /api/{version}/watch/namespaces/{namespace}/{resource}
//
// /kapis/{api-group}/{version}/workspaces/{workspace}/{resource}/{resourceName}
// /
// /kapis/{api-group}/{version}/namespaces/{namespace}/{resource}
// /kapis/{api-group}/{version}/namespaces/{namespace}/{resource}/{resourceName}
// With workspaces:
// /kapis/clusters/{cluster}/{api-group}/{version}/namespaces/{namespace}/{resource}
// /kapis/clusters/{cluster}/{api-group}/{version}/namespaces/{namespace}/{resource}/{resourceName}

Through the information defined by these routes, you can distinguish the level of a request and the server to which it should be dispatched.
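In spirit, the apis/kapis distinction boils down to checking the first segment of the URL path, roughly like this sketch:

package main

import (
    "fmt"
    "strings"
)

// Mirrors the idea of kubernetesAPIPrefixes in WithRequestInfo: requests whose
// first segment is api/apis go to the k8s apiserver; kapis stays in ks-apiserver.
var kubernetesAPIPrefixes = map[string]bool{"api": true, "apis": true}

func isKubernetesRequest(path string) bool {
    parts := strings.Split(strings.Trim(path, "/"), "/")
    return len(parts) > 0 && kubernetesAPIPrefixes[parts[0]]
}

func main() {
    fmt.Println(isKubernetesRequest("/api/v1/namespaces"))                      // true
    fmt.Println(isKubernetesRequest("/kapis/iam.kubesphere.io/v1alpha2/users")) // false
}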

Let's add breakpoints in the callback functions of each filter and run a small experiment to see the interception order of the interceptors.

Suppose the service on the remote virtual machine has been started on port 9090, and you have granted the anonymous global role access to the clusterdashboards resource type under the monitoring.kubesphere.io group. Of course, you can also test directly with an account that has the access rights.

Next, let's send a kapis request to see how the link jumps:

curl -d '{"grafanaDashboardUrl":"https://grafana.com/api/dashboards/7362/revisions/5/download", "description":"this is a test dashboard."}' -H "Content-Type: application/json" localhost:9090/kapis/monitoring.kubesphere.io/v1alpha3/clusterdashboards/test1/template

The test results are as follows:

WithRequestInfo -> WithAuthentication -> WithAuthorization -> WithKubeAPIServer

Run

This method mainly does two things: start the informers to synchronize resources, and start ks-apiserver itself.

func (s *APIServer) Run(ctx context.Context) (err error) {
    // Start the informer factory, including the k8s and ks informers
    // Synchronize resources, including the GVRs of k8s and ks
    // Check whether each GVR exists; warn on error, otherwise synchronize
    err = s.waitForResourceSync(ctx)
    if err != nil {
        return err
    }

    shutdownCtx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go func() {
        <-ctx.Done()
        _ = s.Server.Shutdown(shutdownCtx)
    }()
    
    // Start server
    klog.V(0).Infof("Start listening on %s", s.Server.Addr)
    if s.Server.TLSConfig != nil {
        err = s.Server.ListenAndServeTLS("", "")
    } else {
        err = s.Server.ListenAndServe()
    }

    return err
}

At this point, after the Run method is called, ks-apiserver is up and running.

Now let's make a brief summary:

  • Create a ks-apiserver instance from the configuration file; the instance calls three key methods: NewAPIServer, PrepareRun and Run;
  • NewAPIServer binds the clients of each module through the given configuration, registers the custom GVKs with the Scheme, and exposes the apis routing services;
  • PrepareRun registers and binds the kapis routes and callback functions through the go-restful framework, answering requests itself or querying component servers and returning the merged data to the client;
  • Finally, the Run method is called to synchronize resources and start the ks-apiserver service.

GVK exploration practice

Obviously, we only need to focus on the AddToContainer method of each module.

iam.kubesphere.io

pkg/kapis/iam/v1alpha2/register.go

From the code comments, this module manages CRUD for users, clustermembers, globalroles, clusterroles, workspaceroles, roles, workspace groups, workspace members, devops members and other account-role resources.

Now we can place breakpoints in the handlers and request these APIs:

$ curl "localhost:9090/kapis/iam.kubesphere.io/v1alpha2/users"
$ curl "localhost:9090/kapis/iam.kubesphere.io/v1alpha2/clustermembers"
$ curl "localhost:9090/kapis/iam.kubesphere.io/v1alpha2/users/admin/globalroles"
...

kubeedge.kubesphere.io

pkg/kapis/kubeedge/v1alpha1/register.go

The code uses proxy forwarding for these requests:

func AddToContainer(container *restful.Container, endpoint string) error {
    proxy, err := generic.NewGenericProxy(endpoint, GroupVersion.Group, GroupVersion.Version)
    if err != nil {
        return nil
    }

    return proxy.AddToContainer(container)
}

That is, requests to kapis/kubeedge.kubesphere.io are forwarded to the kubeedge endpoint configured above (http://edge-watcher.kubeedge.svc/api/), i.e. the service under the kubeedge namespace, where the relevant interfaces are integrated.
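A sketch of what such a generic proxy amounts to, using only the standard library (the real NewGenericProxy additionally rewrites the group/version prefix and handles authentication):

package main

import (
    "net/http"
    "net/http/httputil"
    "net/url"
)

func main() {
    // The backend comes from the kubeedge.endpoint field in kubesphere.yaml
    target, err := url.Parse("http://edge-watcher.kubeedge.svc/api/")
    if err != nil {
        panic(err)
    }
    proxy := httputil.NewSingleHostReverseProxy(target)

    // Forward everything under the kubeedge kapis group to edge-watcher
    http.Handle("/kapis/kubeedge.kubesphere.io/v1alpha1/", proxy)
    http.ListenAndServe(":9090", nil)
}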

For integrating an edge computing platform, besides rapidly installing and integrating a mainstream edge framework, an adapter similar to an edge shim could also be integrated, which needs to be considered from the following aspects:

  • Proxy endpoint: the current kubeedge integration uses proxy forwarding;
  • Health check interface: at a minimum, ensure that the cloud components have been deployed successfully;
  • Support for observability components such as events, logging and auditing;
  • Other edge auxiliary functions, such as file or configuration distribution.

notification.kubesphere.io

pkg/kapis/notification/v2beta1/register.go

The API under this group mainly implements CRUD for the global- or tenant-level config and receiver resources of notifications.

Config resource

Used to configure the parameters for docking with notification channels; divided into global and tenant-level config resources.

Receiver resource

Used to configure recipient information, distinguishing global and tenant-level receivers.

We select a callback function for analysis:

ws.Route(ws.GET("/{resources}").
        To(h.ListResource).
        Doc("list the notification configs or receivers").
        Metadata(KeyOpenAPITags, []string{constants.NotificationTag}).
        Param(ws.PathParameter("resources", "known values include configs, receivers, secrets")).
        Param(ws.QueryParameter(query.ParameterName, "name used for filtering").Required(false)).
        Param(ws.QueryParameter(query.ParameterLabelSelector, "label selector used for filtering").Required(false)).
        Param(ws.QueryParameter("type", "config or receiver type, known values include dingtalk, email, slack, webhook, wechat").Required(false)).
        Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d").DefaultValue("page=1")).
        Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
        Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. ascending=false").Required(false).DefaultValue("ascending=false")).
        Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
        Returns(http.StatusOK, api.StatusOK, api.ListResult{Items: []interface{}{}}))
        
func (h *handler) ListResource(req *restful.Request, resp *restful.Response) {
    // The name of the tenant or user
    user := req.PathParameter("user")
    // Resource type, configs / receivers / secrets
    resource := req.PathParameter("resources")
    // Notification channel dingtalk/slack/email/webhook/wechat
    subresource := req.QueryParameter("type")
    q := query.ParseQueryParameter(req)

    if !h.operator.IsKnownResource(resource, subresource) {
        api.HandleBadRequest(resp, req, servererr.New("unknown resource type %s/%s", resource, subresource))
        return
    }

    objs, err := h.operator.List(user, resource, subresource, q)
    handleResponse(req, resp, objs, err)
}

Let's look at the logic of listing objects:

// List objects.
func (o *operator) List(user, resource, subresource string, q *query.Query) (*api.ListResult, error) {
    if len(q.LabelSelector) > 0 {
        q.LabelSelector = q.LabelSelector + ","
    }

    filter := ""
    // If the name of the tenant is not given, the global object is obtained
    if user == "" {
        if isConfig(o.GetObject(resource)) {
            // for config resources, type=default marks the global level
            filter = "type=default"
        } else {
            // for receiver resources, type=global marks the global level
            filter = "type=global"
        }
    } else {
    // Otherwise, bind the tenant name to the filter
        filter = "type=tenant,user=" + user
    }
    // Assemble filter label
    q.LabelSelector = q.LabelSelector + filter
    ...
    // Obtain the specified resources in the cluster or namespace through the filter tag
    res, err := o.resourceGetter.List(resource, ns, q)
    if err != nil {
        return nil, err
    }

    if subresource == "" || resource == Secret {
        return res, nil
    }

    results := &api.ListResult{}
    ...
}

In this way, CRUD for the tenant-level notification/alert CR configuration is implemented. These CRs are classified as follows (the example requests below show both levels):

  • config is divided into two levels: global (type=default) and tenant (type=tenant);
  • receivers are divided into two levels: global (type=global) and tenant (type=tenant).
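Judging from the handler above, listing with and without the user path parameter should hit the two levels respectively (paths inferred from register.go, so treat them as illustrative):

# Global receivers (the filter becomes type=global)
$ curl "localhost:9090/kapis/notification.kubesphere.io/v2beta1/receivers"

# Tenant-level receivers of user "test" (the filter becomes type=tenant,user=test)
$ curl "localhost:9090/kapis/notification.kubesphere.io/v2beta1/users/test/receivers"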

So how do config and receiver bind to each other, and how do alerts send messages to tenants through the channels?

https://github.com/kubesphere...

https://github.com/kubesphere...

Notification Manager is abbreviated as nm below. I'll briefly answer out of band.

Functional aspects:

  • A globally configured receiver sends all alerts to its defined recipient list through the configured channels, while a receiver configured with tenant information only sends the alerts of its own namespace through its channels;
  • Within a receiver, alert messages can be further filtered by configuring the alertSelector parameter;
  • The notification message template can be customized by modifying the configmap named notification-manager-template.

Process from alert to notification:

  • nm listens on port 19093 with API path /api/v2/alerts to receive alerts sent from Alertmanager;
  • The callback function converts the alerts into notification template data and groups the alert data by namespace;
  • All receivers are traversed, and a goroutine is started for each namespace to send the messages; since each namespace corresponds to multiple notification channels, a WaitGroup is used to orchestrate the concurrent tasks, as the sketch below shows.
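The per-namespace fan-out in the last step is plain Go concurrency; a sketch under assumed types (Notification is a made-up stand-in for nm's template data):

package main

import "sync"

// Notification is a hypothetical stand-in for nm's notification template data.
type Notification struct{ Message string }

// sendAll starts one goroutine per namespace and waits for all of them;
// inside each goroutine, every configured channel of that namespace is used.
func sendAll(byNamespace map[string][]Notification) {
    var wg sync.WaitGroup
    for ns, items := range byNamespace {
        wg.Add(1)
        go func(ns string, items []Notification) {
            defer wg.Done()
            for range items {
                // send via dingtalk/email/slack/webhook/wechat ...
            }
        }(ns, items)
    }
    wg.Wait()
}

func main() {
    sendAll(map[string][]Notification{"default": {{Message: "test alert"}}})
}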

monitoring.kubesphere.io

pkg/kapis/monitoring/v1alpha3/register.go

Monitoring metrics are divided into platform-level, node-level, workspace, namespace, pod and other levels; you can obtain not only the aggregate statistics but also all the monitoring metrics of pods/containers under nodes/namespaces/workspaces.

Let's look at the callback functions, taking handleNamedMetricsQuery as an example:

  • Traverse the legal metrics at the given metric level, filtering the metric names by the metricFilter in the request parameters;
  • Determine whether it is a range query or an instant query, call the relevant methods in the monitoring package, and fetch the results from the back end through the corresponding client.

The code is as follows:

func (h handler) handleNamedMetricsQuery(resp *restful.Response, q queryOptions) {
    var res model.Metrics

    var metrics []string
    // q.namedMetrics is an array of complete metric names with PromQL expr definitions, classified by monitoring level
    // The level classification is based on monitoring.Levelxxx determined earlier in the call stack, e.g. monitoring.LevelPod
    for _, metric := range q.namedMetrics {
        if strings.HasPrefix(metric, model.MetricMeterPrefix) {
            // skip meter metric
            continue
        }
        // Filter according to the indicator name in the request parameter
        ok, _ := regexp.MatchString(q.metricFilter, metric)
        if ok {
            metrics = append(metrics, metric)
        }
    }
    if len(metrics) == 0 {
        resp.WriteAsJson(res)
        return
    }
    
    // Determine whether it is a range query or an instant query, and call the related functions
    // PromQL is mainly queried via the prometheus client; metrics of edge nodes are currently queried via metrics-server
    if q.isRangeQuery() {
        res = h.mo.GetNamedMetricsOverTime(metrics, q.start, q.end, q.step, q.option)
    } else {
        res = h.mo.GetNamedMetrics(metrics, q.time, q.option)
        if q.shouldSort() {
            res = *res.Sort(q.target, q.order, q.identifier).Page(q.page, q.limit)
        }
    }
    resp.WriteAsJson(res)
}

Now, let's shift our perspective to:

pkg/models/monitoring/monitoring.go:156

Taking GetNamedMetricsOverTime as an example, the query results of Prometheus and metrics-server are merged and returned:

func (mo monitoringOperator) GetNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, opt monitoring.QueryOption) Metrics {
    // Get the query results from the prometheus client; sync.WaitGroup is used to query concurrently, starting a goroutine per metric, then aggregating and returning the results
    ress := mo.prometheus.GetNamedMetricsOverTime(metrics, start, end, step, opt)
    // If metrics server is activated
    if mo.metricsserver != nil {

        //Merge edge node data
        edgeMetrics := make(map[string]monitoring.MetricData)

        for i, ressMetric := range ress {
            metricName := ressMetric.MetricName
            ressMetricValues := ressMetric.MetricData.MetricValues
            if len(ressMetricValues) == 0 {
                // this metric has no prometheus metrics data
                if len(edgeMetrics) == 0 {
                    // start to request monitoring metricsApi data
                    mr := mo.metricsserver.GetNamedMetricsOverTime(metrics, start, end, step, opt)
                    for _, mrMetric := range mr {
                        edgeMetrics[mrMetric.MetricName] = mrMetric.MetricData
                    }
                }
                if val, ok := edgeMetrics[metricName]; ok {
                    ress[i].MetricData.MetricValues = append(ress[i].MetricData.MetricValues, val.MetricValues...)
                }
            }
        }
    }

    return Metrics{Results: ress}
}

In addition, the monitoring package also defines the interface methods of the monitoring query clients, which can be explored as needed; they are sketched as a Go interface below:

  • GetMetric(expr string, time time.Time) Metric
  • GetMetricOverTime(expr string, start, end time.Time, step time.Duration) Metric
  • GetNamedMetrics(metrics []string, time time.Time, opt QueryOption) []Metric
  • GetNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, opt QueryOption) []Metric
  • GetMetadata(namespace string) []Metadata
  • GetMetricLabelSet(expr string, start, end time.Time) []map[string]string
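Collected together, these signatures form roughly the following interface (a sketch assembled from the list above; per the code earlier, both the prometheus and metrics-server clients in the monitoring package implement it):

// A sketch of the monitoring query client interface, assembled from the methods above;
// Metric, Metadata and QueryOption are types of the monitoring package.
type Interface interface {
    GetMetric(expr string, time time.Time) Metric
    GetMetricOverTime(expr string, start, end time.Time, step time.Duration) Metric
    GetNamedMetrics(metrics []string, time time.Time, opt QueryOption) []Metric
    GetNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, opt QueryOption) []Metric
    GetMetadata(namespace string) []Metadata
    GetMetricLabelSet(expr string, start, end time.Time) []map[string]string
}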

tenant.kubesphere.io

Before we talk about the API, a side note: in terms of isolation security, multi-tenancy can be divided into soft multi-tenancy and hard multi-tenancy.

  • Soft isolation is oriented more toward multi-tenant needs within an enterprise;
  • Hard isolation is oriented more toward service providers serving external users, where stricter isolation is required as a security guarantee.

The more important part of this group is to query logs/audits/events for tenants:

Take querying logs as an example:

func (h *tenantHandler) QueryLogs(req *restful.Request, resp *restful.Response) {
    // Query the tenant information carried in the context
    user, ok := request.UserFrom(req.Request.Context())
    if !ok {
        err := fmt.Errorf("cannot obtain user info")
        klog.Errorln(err)
        api.HandleForbidden(resp, req, err)
        return
    }
    // Parse the query parameters, such as the ns/workload/pod/container to query, the time range, and the query type
    queryParam, err := loggingv1alpha2.ParseQueryParameter(req)
    if err != nil {
        klog.Errorln(err)
        api.HandleInternalError(resp, req, err)
        return
    }
    // Export data
    if queryParam.Operation == loggingv1alpha2.OperationExport {
        resp.Header().Set(restful.HEADER_ContentType, "text/plain")
        resp.Header().Set("Content-Disposition", "attachment")
        // Verify whether the account has permission
        // The admin account can export the logs of all ns, and the tenant can only export the logs of this ns
        // Assemble loggingclient for log export
        err := h.tenant.ExportLogs(user, queryParam, resp)
        if err != nil {
            klog.Errorln(err)
            api.HandleInternalError(resp, req, err)
            return
        }
    } else {
        // Verify whether the account has permission
        // The admin account can view the logs of all ns, and the tenant can only view the logs of this ns
        // Assemble loggingclient for log return
        result, err := h.tenant.QueryLogs(user, queryParam)
        if err != nil {
            klog.Errorln(err)
            api.HandleInternalError(resp, req, err)
            return
        }
        resp.WriteAsJson(result)
    }
}

Due to limited space, only the GVRs above have been debugged here. If you are interested, you can dig deeper on your own~
