Third-party packages used
- Following Prometheus, kingpin is used to parse commands and flags (see the sketch after this list)
- github.com/oklog/run is used to start a group of processes:
the HTTP server, the gRPC server, and dynamic discovery of downstream components that implement the Store API
- the github.com/go-kit/kit microservice toolkit
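A minimal, self-contained sketch (not Thanos code; the names are illustrative) of how the first two libraries combine: kingpin picks the subcommand, and oklog/run's run.Group ties a component's long-running goroutines and a signal handler together, so that when one actor returns the others are interrupted.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"syscall"

	"github.com/oklog/run"
	"gopkg.in/alecthomas/kingpin.v2"
)

func main() {
	app := kingpin.New("demo", "Single binary with per-component subcommands")

	queryCmd := app.Command("query", "Run the query component")
	httpAddr := queryCmd.Flag("http-address", "Listen address").Default(":9090").String()

	// Parsing tells us which subcommand (component) was requested.
	switch kingpin.MustParse(app.Parse(os.Args[1:])) {
	case queryCmd.FullCommand():
		if err := runDemoQuery(*httpAddr); err != nil {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(1)
		}
	}
}

func runDemoQuery(addr string) error {
	var g run.Group

	// Actor 1: an HTTP server.
	srv := &http.Server{Addr: addr}
	g.Add(func() error {
		return srv.ListenAndServe()
	}, func(error) {
		srv.Shutdown(context.Background())
	})

	// Actor 2: exit on SIGINT/SIGTERM; its return interrupts the server above.
	ctx, cancel := context.WithCancel(context.Background())
	g.Add(func() error {
		sig := make(chan os.Signal, 1)
		signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
		select {
		case s := <-sig:
			return fmt.Errorf("received signal %v", s)
		case <-ctx.Done():
			return ctx.Err()
		}
	}, func(error) {
		cancel()
	})

	// Blocks until the first actor returns, then interrupts the rest.
	return g.Run()
}
```

This is the same shape Thanos' main() uses below: register subcommands, parse, then hand a run.Group to the selected component's setup function.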
Best practices
- 1. Prometheus is a stateful node; the underlying Prometheus and Store components are used as data sources
- 2. The Ruler is also a data source; one or more rule groups can be enabled for fast precomputation, depending on the location of the data center
- 3. Query is stateless, so it can be scaled out without limit to meet different needs, for example scheduling scenarios that require fast responses
Query
Framework
Startup parameters
- The Thanos codebase follows a standard Go project layout
- All components are built into a single binary with a unified entry point: each component is started as a thanos subcommand, every component registers itself at startup, and the parsed command determines which component runs this time
```shell
thanos query \
  --log.level=debug \
  --query.auto-downsampling \
  --grpc-address=0.0.0.0:10901 \
  --http-address=0.0.0.0:9090 \
  --query.partial-response \
  --query.replica-label=prometheus_replica \
  --query.replica-label=rule_replica \
  --store=dnssrv+_grpc._tcp.prometheus-headless.thanos.svc.cluster.local \
  --store=dnssrv+_grpc._tcp.thanos-rule.thanos.svc.cluster.local \
  --store=dnssrv+_grpc._tcp.thanos-store.thanos.svc.cluster.local
```
Notes:
1. --query.partial-response
This flag is about the trade-off between availability and data consistency:
if one of the data sources above (1, 2 or 3) returns empty data or times out, should the overall result of this query still be returned?
When it is enabled, availability wins: an incomplete result may be returned and mistaken for the full picture.
When it is disabled, every upstream must return a result, so the answer is strictly consistent.
2. --query.replica-label (deduplication)
Without deduplication you see the same series once per data source, in particular from the Prometheus and Ruler sources.
With deduplication enabled, e.g. when a replica restart leaves gaps, Query scores the data from each replica in the background and picks the better source for this request (see the example after this list).
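The same two behaviours can also be toggled per request through query parameters on the Query HTTP API. The dedup and partial_response parameters below come from the Thanos Query API documentation, and the thanos-query:9090 host is only an example; check both against your deployment.

```shell
# Deduplicated result; partial responses allowed (availability over consistency).
curl 'http://thanos-query:9090/api/v1/query?query=up&dedup=true&partial_response=true'

# Strict mode: one series per replica is visible, and any failing or timed-out store fails the query.
curl 'http://thanos-query:9090/api/v1/query?query=up&dedup=false&partial_response=false'
```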
Code logic
- 1. The main function. It creates an app object that holds the startup functions of all Thanos components, but only one of those functions is taken from the map and run; which one depends on the startup command.
```go
func main() {
	app := extkingpin.NewApp(kingpin.New(filepath.Base(os.Args[0]),
		"A block storage based long-term storage for Prometheus").Version(version.Print("thanos")))

	// Put the startup logic of every component into the setups list of the app object.
	registerSidecar(app)
	registerStore(app)
	registerQuery(app)
	registerRule(app)
	registerCompact(app)
	registerTools(app)
	registerReceive(app)
	registerQueryFrontend(app)

	// Based on the command line, take one component's setup function out of the app object's setups list.
	cmd, setup := app.Parse()
	logger := logging.NewLogger(*logLevel, *logFormat, *debugName)

	var g run.Group
	var tracer opentracing.Tracer
	/* tracing related code */

	reloadCh := make(chan struct{}, 1)

	// Start the chosen component (sidecar, query, store, ...); under the hood this still calls g.Add(...).
	if err := setup(&g, logger, metrics, tracer, reloadCh, *logLevel == "debug"); err != nil {
		os.Exit(1)
	}

	// Listen for kill signals from the system.
	{
		cancel := make(chan struct{})
		g.Add(func() error {
			return interrupt(logger, cancel)
		}, func(error) {
			close(cancel)
		})
	}

	// Listen for configuration reload signals.
	{
		cancel := make(chan struct{})
		g.Add(func() error {
			return reload(logger, cancel, reloadCh)
		}, func(error) {
			close(cancel)
		})
	}

	// Block until all goroutines in the group exit:
	// as soon as one of them returns, the others are interrupted and return as well.
	if err := g.Run(); err != nil {
		level.Error(logger).Log("err", fmt.Sprintf("%+v", errors.Wrapf(err, "%s command failed", cmd)))
		os.Exit(1)
	}

	// Reaching this point means the whole program is done.
	level.Info(logger).Log("msg", "exiting")
}
```
- 2. registerQuery function
```go
func registerQuery(app *extkingpin.App) {
	cmd := app.Command(comp.String(), "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes")

	/* parse command line flags */

	// The function passed to Setup() is put into the setups list of the app object.
	// Its core is the runQuery() call.
	cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
		return runQuery(
			g,
			logger,
			reg,
			tracer,
			*requestLoggingDecision,
			*grpcBindAddr,
			time.Duration(*grpcGracePeriod),
			*grpcCert,
			*grpcKey,
			*grpcClientCA,
			/* other code */
		)
	})
}
```
- 3. runQuery function
```go
// Use the run.Group object to start the HTTP server, the gRPC server and the service discovery goroutines.
func runQuery(
	g *run.Group, // comes from the main() function
	logger log.Logger,
	reg *prometheus.Registry,
	tracer opentracing.Tracer,
	requestLoggingDecision string,
	grpcBindAddr string,
	grpcGracePeriod time.Duration,
	grpcCert string,
	grpcKey string,
	grpcClientCA string,
	/* other parameters */
) error {
	var (
		// stores is of type StoreSet. It holds a set of store components
		// (downstream components implementing the Store API), and this set can change dynamically.
		/*
			type StoreSet struct {
				// other fields
				stores map[string]*storeRef
			}
		*/
		stores = query.NewStoreSet(...)

		// Proxy object, i.e. the proxy for the downstream Store API components.
		// The list of downstream Store API components is obtained through the constructor argument stores.Get.
		proxy      = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
		rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)

		/*
			queryableCreator is a factory that creates querier objects;
			the querier's proxy field is the proxy object above, which holds a dynamically
			changing set of Thanos store components (it changes because extra goroutines
			are started to update the underlying slice).
		*/
		queryableCreator = query.NewQueryableCreator(
			logger,
			extprom.WrapRegistererWithPrefix("thanos_query_", reg),
			proxy,
			maxConcurrentSelects,
			queryTimeout,
		)
	)

	/*
		This part starts some goroutines that periodically discover changes in the Store API
		components and update the map[string]*storeRef field of the stores object.
	*/

	// Create the HTTP server, register the HTTP handlers and start the server.
	{
		router := route.New()

		// Create a new QueryAPI object.
		api := v1.NewQueryAPI(
			logger,
			stores,
			engine,
			queryableCreator,
			rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels),
			enableAutodownsampling,
			enableQueryPartialResponse,
			enableRulePartialResponse,
			queryReplicaLabels,
			flagsMap,
			instantDefaultMaxSourceResolution,
			defaultMetadataTimeRange,
			gate.New(
				extprom.WrapRegistererWithPrefix("thanos_query_concurrent_", reg),
				maxConcurrentQueries,
			),
		)

		// Register the HTTP handlers on the router object.
		api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware)

		srv := httpserver.New(logger, reg, comp, httpProbe,
			httpserver.WithListen(httpBindAddr),
			httpserver.WithGracePeriod(httpGracePeriod),
		)
		// The HTTP server uses the router object.
		srv.Handle("/", router)

		g.Add(func() error {
			statusProber.Healthy()
			// Start the HTTP server.
			return srv.ListenAndServe()
		}, func(err error) {
			statusProber.NotReady(err)
			defer statusProber.NotHealthy(err)
			srv.Shutdown(err)
		})
	}

	// Create the gRPC server, register the gRPC handlers and start the server.
	{
		tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), grpcCert, grpcKey, grpcClientCA)
		if err != nil {
			return errors.Wrap(err, "setup gRPC server")
		}

		s := grpcserver.New(logger, reg, tracer, comp, grpcProbe,
			grpcserver.WithServer(store.RegisterStoreServer(proxy)),      // register gRPC handler
			grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)), // register gRPC handler
			grpcserver.WithListen(grpcBindAddr),
			grpcserver.WithGracePeriod(grpcGracePeriod),
			grpcserver.WithTLSConfig(tlsCfg),
		)

		g.Add(func() error {
			statusProber.Ready()
			// Start the gRPC server.
			return s.ListenAndServe()
		}, func(error) {
			statusProber.NotReady(err)
			s.Shutdown(err)
		})
	}

	// At this point both the HTTP server and the gRPC server have been set up.
	level.Info(logger).Log("msg", "starting query node")
	return nil
}
```
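The "periodically discover Store API changes" step mentioned above can be pictured with a minimal, self-contained sketch (not Thanos code; storeSet and update are illustrative stand-ins for StoreSet and its refresh logic): a run.Group actor re-resolves the endpoints on a ticker until its context is cancelled.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/oklog/run"
)

// storeSet stands in for Thanos' StoreSet; update() is illustrative, not a Thanos API.
type storeSet struct{ endpoints []string }

func (s *storeSet) update(ctx context.Context) {
	// In Thanos, this step re-resolves DNS / file SD targets and rewrites map[string]*storeRef.
	fmt.Println("refreshing store endpoints:", s.endpoints)
}

func main() {
	var g run.Group
	stores := &storeSet{endpoints: []string{"dnssrv+_grpc._tcp.thanos-store"}}

	// Bounded context so this demo terminates on its own.
	ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
	defer cancel()

	// Refresh actor: rebuild the store list every 5 seconds until the context is cancelled.
	g.Add(func() error {
		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				stores.update(ctx)
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}, func(error) {
		cancel()
	})

	if err := g.Run(); err != nil {
		fmt.Println("exit:", err)
	}
}
```

Because all actors share one run.Group, a permanent failure in any of them (discovery, HTTP, gRPC) brings the whole component down, which is exactly the behaviour runQuery relies on.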
- 4. The QueryAPI struct and its methods
```go
// QueryAPI is an API used by Thanos Query.
type QueryAPI struct {
	baseAPI *api.BaseAPI
	logger  log.Logger
	gate    gate.Gate

	// Factory used to create querier objects.
	queryableCreate query.QueryableCreator
	queryEngine     *promql.Engine
	ruleGroups      rules.UnaryClient

	/* other fields */

	replicaLabels []string
	storeSet      *query.StoreSet
}

func (qapi *QueryAPI) Register(r *route.Router, tracer opentracing.Tracer, logger log.Logger, ins extpromhttp.InstrumentationMiddleware, logMiddleware *logging.HTTPServerMiddleware) {
	qapi.baseAPI.Register(r, tracer, logger, ins, logMiddleware)

	instr := api.GetInstr(tracer, logger, ins, logMiddleware)

	/* other code */

	// Register qapi.query, qapi.series and qapi.stores on the parameter r to complete the HTTP handler registration.
	// For both the /query and the /series endpoints, a querier object is created for every incoming request,
	// and that querier holds a set of Store API components.
	r.Get("/query", instr("query", qapi.query))
	r.Get("/series", instr("series", qapi.series))
	r.Get("/stores", instr("stores", qapi.stores))
}

// Return series data.
func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiError) {
	/* other code */

	// Create a querier object.
	// Its proxy field holds a set of Thanos store components.
	q, err := qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, math.MaxInt64, enablePartialResponse, true).
		Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))

	/* other code */

	var (
		metrics = []labels.Labels{}
		sets    []storage.SeriesSet
	)
	for _, mset := range matcherSets {
		// Call the querier's Select() method to fetch the series.
		sets = append(sets, q.Select(false, nil, mset...))
	}

	set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
	for set.Next() {
		metrics = append(metrics, set.At().Labels())
	}
	return metrics, set.Warnings(), nil
}
```
- 5. The querier struct and its methods
```go
type querier struct {
	ctx                context.Context
	logger             log.Logger
	cancel             func()
	mint, maxt         int64
	replicaLabels      map[string]struct{}
	storeDebugMatchers [][]*labels.Matcher
	// proxy holds a dynamically changing set of Thanos store components.
	proxy               storepb.StoreServer
	deduplicate         bool
	maxResolutionMillis int64
	partialResponse     bool
	skipChunks          bool
	selectGate          gate.Gate
	selectTimeout       time.Duration
}

func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
	/* other code */

	promise := make(chan storage.SeriesSet, 1)
	go func() {
		defer close(promise)
		var err error

		/* other code */

		// Fetch the series data.
		set, err := q.selectFn(ctx, hints, ms...)
		if err != nil {
			// Send the error to the channel and exit this goroutine.
			promise <- storage.ErrSeriesSet(err)
			return
		}
		// Send the series data to the channel.
		promise <- set
	}()

	// Return a lazy wrapper around the series.
	return &lazySeriesSet{create: func() (storage.SeriesSet, bool) {
		/* other code */

		// Read the series from the channel.
		set, ok := <-promise
		if !ok {
			return storage.ErrSeriesSet(errors.New("channel closed, abort")), false
		}
		return set, set.Next()
	}}
}

// Fetch the series by calling Series(...) on the proxy field.
func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, error) {
	/* other code */

	// The seriesServer struct overrides the Send() method, storing the data returned over gRPC in its seriesSet field.
	resp := &seriesServer{ctx: ctx}

	// q.proxy is implemented by the ProxyStore struct.
	// q.proxy.Series() is a (streaming) gRPC method.
	// Once q.proxy.Series() returns, the seriesSet field of resp has been filled in.
	if err := q.proxy.Series(&storepb.SeriesRequest{
		MinTime:  hints.Start,
		MaxTime:  hints.End,
		Matchers: sms,
		/* other fields */
	}, resp); err != nil {
		return nil, errors.Wrap(err, "proxy Series()")
	}

	/* other code */

	set := &promSeriesSet{
		mint:  q.mint,
		maxt:  q.maxt,
		set:   newStoreSeriesSet(resp.seriesSet), // take the seriesSet field out of resp
		aggrs: aggrs,
		warns: warns,
	}

	// set now contains the series.
	return newDedupSeriesSet(set, q.replicaLabels, len(aggrs) == 1 && aggrs[0] == storepb.Aggr_COUNTER), nil
}
```
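The promise channel in Select() is a general Go pattern: the work starts immediately in a goroutine, but the caller only blocks on the result when it first needs it (lazySeriesSet does this via its create callback). A minimal, self-contained sketch of the pattern (not Thanos code):

```go
package main

import (
	"fmt"
	"time"
)

// lazyResult defers reading the result of an already-running computation
// until Value() is first called, mirroring querier.Select's lazySeriesSet.
type lazyResult struct {
	promise <-chan string
	value   string
	done    bool
}

func startLazy(fetch func() string) *lazyResult {
	promise := make(chan string, 1) // buffered: the producer never blocks
	go func() {
		defer close(promise)
		promise <- fetch() // the work starts now, in the background
	}()
	return &lazyResult{promise: promise}
}

func (l *lazyResult) Value() string {
	if !l.done {
		l.value = <-l.promise // block only on first use
		l.done = true
	}
	return l.value
}

func main() {
	res := startLazy(func() string {
		time.Sleep(100 * time.Millisecond) // pretend to call a remote Store API
		return "series data"
	})
	// ... the caller can do other setup here while the fetch runs ...
	fmt.Println(res.Value())
	fmt.Println(res.Value()) // cached; no second fetch
}
```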
- 6. The ProxyStore struct and its methods
```go
// ProxyStore implements the Store API by proxying requests to all of the given underlying stores.
type ProxyStore struct {
	logger log.Logger
	// Returns the downstream components that implement the Store API; this field is used when querying series.
	stores          func() []Client
	component       component.StoreAPI
	selectorLabels  labels.Labels
	responseTimeout time.Duration
	metrics         *proxyStoreMetrics
}

/*
	Based on the client's request, query the series from all downstream Store API components,
	merge and deduplicate them, and finally stream the series back through the parameter srv.
	This is a streaming gRPC method.
*/
func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
	/* other code */

	g, gctx := errgroup.WithContext(srv.Context())
	respSender, respCh := newCancelableRespChannel(gctx, 10)

	// Producer goroutine.
	g.Go(func() error {
		/*
			This goroutine fetches series from the backend Thanos store components and merges them.
			When it finishes, the consumer goroutine is shut down as well.
		*/
		var (
			seriesSet      []storepb.SeriesSet
			storeDebugMsgs []string
			wg             = &sync.WaitGroup{}
		)
		defer func() {
			wg.Wait()
			// Closing respCh makes the consumer goroutine exit.
			close(respCh)
		}()

		// Iterate over the backend Store API components.
		for _, st := range s.stores() {
			/* other code */
			sc, err := st.Series(seriesCtx, r)
			seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries,
				wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout, s.metrics.emptyStreamResponses))
			/* other code */
		}

		// Merge the series and send them to the respCh channel.
		mergedSet := storepb.MergeSeriesSets(seriesSet...)
		for mergedSet.Next() {
			lset, chk := mergedSet.At()
			// respSender.send(...) actually pushes the series into the respCh channel.
			respSender.send(storepb.NewSeriesResponse(&storepb.Series{Labels: labelpb.ZLabelsFromPromLabels(lset), Chunks: chk}))
		}
		return mergedSet.Err()
	})

	// Consumer goroutine.
	g.Go(func() error {
		// This goroutine takes the (already merged) responses and sends them to the parameter srv.
		for resp := range respCh {
			if err := srv.Send(resp); err != nil {
				return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
			}
		}
		return nil
	})

	// Wait for both the producer and the consumer goroutines to finish.
	if err := g.Wait(); err != nil {
		return err
	}
	return nil
}
```
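The producer/consumer shape of ProxyStore.Series can be reduced to a small, self-contained sketch (not Thanos code) using errgroup and a channel: the producer fans in results and closes the channel, the consumer drains it, and an error on either side cancels the other through the shared context.

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	g, ctx := errgroup.WithContext(context.Background())
	respCh := make(chan string, 10)

	// Producer: fan in results from several "stores", then close the channel
	// so the consumer's range loop terminates.
	g.Go(func() error {
		defer close(respCh)
		for _, store := range []string{"prometheus", "rule", "store-gateway"} {
			select {
			case respCh <- "series from " + store:
			case <-ctx.Done(): // the consumer failed; stop producing
				return ctx.Err()
			}
		}
		return nil
	})

	// Consumer: stream each merged response to the client (here, stdout).
	g.Go(func() error {
		for resp := range respCh {
			fmt.Println(resp)
		}
		return nil
	})

	// Wait for both goroutines; the first error (if any) is returned.
	if err := g.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}
```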