Thanos source code analysis

Third-party packages used

  • Learns from Prometheus: uses kingpin to parse commands and flags
  • Uses the github.com/oklog/run package to start a group of goroutines: the http server, the grpc server, and the dynamic discovery of downstream components that implement the Store API (a minimal sketch of the run.Group pattern follows this list)

  • The github.com/go-kit/kit microservice framework
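For reference, the run.Group pattern that Thanos builds on looks roughly like this. This is a minimal, self-contained sketch, not Thanos code; the HTTP server and its address are placeholders:

package main

import (
	"context"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"syscall"

	"github.com/oklog/run"
)

func main() {
	var g run.Group

	// Actor 1: an http server.
	srv := &http.Server{Addr: ":9090"}
	g.Add(func() error {
		return srv.ListenAndServe()
	}, func(error) {
		srv.Shutdown(context.Background())
	})

	// Actor 2: signal handling. When any actor returns, the interrupt
	// functions of all other actors are called, so the group shuts down together.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	cancel := make(chan struct{})
	g.Add(func() error {
		select {
		case s := <-sig:
			return fmt.Errorf("received signal %v", s)
		case <-cancel:
			return nil
		}
	}, func(error) {
		close(cancel)
	})

	if err := g.Run(); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}

Thanos main.go (shown below in the code logic section) follows exactly this shape: each component's setup function adds actors to the group, and g.Run() drives them all.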

Best practices

  • 1. Prometheus is a stateful node. We use the underlying Prometheus instances and the store gateway as data sources
  • 2. The ruler is also a data source. One or more rule groups can be enabled for fast precomputation, placed according to the location of the data center
  • 3. The querier is stateless, so it can be scaled out as much as needed for different workloads, for example queries that require fast responses

query

framework

startup parameters

  1. The thanos code is built using a standard Go project directory layout
  2. All components are compiled into one binary with a single entry point; every component is started as a "thanos <subcommand>". Each component registers itself at startup, and the parsed command decides which component runs this time
thanos query \
--log.level=debug \
--query.auto-downsampling \
--grpc-address=0.0.0.0:10901 \
--http-address=0.0.0.0:9090 \
--query.partial-response \
--query.replica-label=prometheus_replica \
--query.replica-label=rule_replica \
--store=dnssrv+_grpc._tcp.prometheus-headless.thanos.svc.cluster.local \
--store=dnssrv+_grpc._tcp.thanos-rule.thanos.svc.cluster.local \
--store=dnssrv+_grpc._tcp.thanos-store.thanos.svc.cluster.local 

Be careful:
1. The partial-response flag has to be chosen deliberately

This flag trades data consistency for availability.
If one of the configured stores (1, 2 or 3 above) returns empty data or times out, should the overall result still be returned?
When it is enabled, a partial answer is returned, which can paint a misleading picture (the "three men make a tiger" effect).
When it is disabled, every upstream must return a result, which is truly consistent.
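Besides the command-line default, the behaviour can also be controlled per request through the Query HTTP API. A minimal sketch of issuing such a request from Go, assuming the partial_response query parameter exposed by the Thanos Query HTTP API; the address is a placeholder and error handling is trimmed:

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// Placeholder address of a thanos query instance.
	u, _ := url.Parse("http://localhost:9090/api/v1/query")
	q := u.Query()
	q.Set("query", "up")
	q.Set("partial_response", "true") // allow a partial answer if a store is down or times out
	u.RawQuery = q.Encode()

	resp, err := http.Get(u.String())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // partial results are flagged through the warnings in the response
}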

2. Deduplication

Without deduplication, you will see the same data duplicated once per data source, especially for the Prometheus and ruler replicas.
With deduplication enabled, the querier compares the replicas behind each response in the background and picks the better one as the result for this request.
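Conceptually, deduplication first drops the configured replica labels (prometheus_replica and rule_replica in the command above) and then merges series that have become identical. A toy illustration of that idea, using plain maps instead of Thanos' label types:

package main

import "fmt"

// stripReplicaLabels returns a copy of a series' label set without the
// configured replica labels; series that become identical are then merged.
func stripReplicaLabels(series map[string]string, replicaLabels ...string) map[string]string {
	out := map[string]string{}
	for k, v := range series {
		out[k] = v
	}
	for _, rl := range replicaLabels {
		delete(out, rl)
	}
	return out
}

func main() {
	a := map[string]string{"__name__": "up", "job": "node", "prometheus_replica": "prometheus-0"}
	b := map[string]string{"__name__": "up", "job": "node", "prometheus_replica": "prometheus-1"}
	// Both print map[__name__:up job:node]: the two replicas collapse into one series.
	fmt.Println(stripReplicaLabels(a, "prometheus_replica"))
	fmt.Println(stripReplicaLabels(b, "prometheus_replica"))
}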

Code logic

  • 1. The main method creates an app object. The app object holds the startup functions of all Thanos components, but only one function is taken from that map at startup; which one depends on the command given on the command line.
func main() {
	app := extkingpin.NewApp(kingpin.New(filepath.Base(os.Args[0]), "A block storage based long-term storage for Prometheus").Version(version.Print("thanos")))

	// Put the startup logic of all components into the setups list in the app object
	registerSidecar(app)
	registerStore(app)
	registerQuery(app)
	registerRule(app)
	registerCompact(app)
	registerTools(app)
	registerReceive(app)
	registerQueryFrontend(app)

	// Based on the command line, pick one component's setup logic from the setups list of the app object
	cmd, setup := app.Parse()
	logger := logging.NewLogger(*logLevel, *logFormat, *debugName)

	var g run.Group
	var tracer opentracing.Tracer
	
	/*
		tracing Related codes
	*/
	
	
	reloadCh := make(chan struct{}, 1)

	// Set up the chosen component (sidecar, query, store, etc.); under the hood this still calls g.Add(...)
	if err := setup(&g, logger, metrics, tracer, reloadCh, *logLevel == "debug"); err != nil {		
		os.Exit(1)
	}

	// Listen for kill signals from the system
	{
		cancel := make(chan struct{})
		g.Add(func() error {
			return interrupt(logger, cancel)
		}, func(error) {
			close(cancel)
		})
	}

	// Listen for configuration reload signals
	{
		cancel := make(chan struct{})
		g.Add(func() error {
			return reload(logger, cancel, reloadCh)
		}, func(error) {
			close(cancel)
		})
	}

	// Block until all goroutines in the group exit.
	// When one goroutine returns, the others are interrupted and return as well.
	if err := g.Run(); err != nil {
		level.Error(logger).Log("err", fmt.Sprintf("%+v", errors.Wrapf(err, "%s command failed", cmd)))
		os.Exit(1)
	}
	
	// When you get here, the whole program is over.
	level.Info(logger).Log("msg", "exiting")
}
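The interrupt helper used by the signal-handling actor above is not shown in this excerpt. A hedged, self-contained sketch of what such a function typically looks like (the real Thanos helper also takes a go-kit logger and may differ in details such as the exact signals and log message):

package main

import (
	"errors"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// interrupt blocks until a termination signal arrives or cancel is closed,
// mirroring the role of the helper called from the g.Add above.
func interrupt(cancel <-chan struct{}) error {
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
	select {
	case s := <-c:
		fmt.Println("caught signal, exiting:", s)
		return nil
	case <-cancel:
		return errors.New("canceled")
	}
}

func main() {
	cancel := make(chan struct{})
	// Press Ctrl+C (or send SIGTERM) to unblock.
	if err := interrupt(cancel); err != nil {
		fmt.Println(err)
	}
}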
  • 2. registerQuery function
func registerQuery(app *extkingpin.App) {
	cmd := app.Command(comp.String(), "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes")	
	/*
		Parsing command line parameters
	*/

	// The function passed to Setup() is added to the setups list of the app object
	// Its core is the runQuery() method
	cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
		
		return runQuery(
			g,
			logger,
			reg,
			tracer,
			*requestLoggingDecision,
			*grpcBindAddr,
			time.Duration(*grpcGracePeriod),
			*grpcCert,
			*grpcKey,
			*grpcClientCA,
			/*
				Other codes
			*/
		)
	})

}
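The register/parse flow above boils down to a map from command name to setup function. A toy, self-contained sketch of that dispatch idea (made-up names, not the real extkingpin code):

package main

import "fmt"

type setupFunc func() error

// app mimics the role of extkingpin.App: each register call adds one entry,
// and parsing the command line picks exactly one of them to run.
type app struct {
	setups map[string]setupFunc
}

func (a *app) register(name string, s setupFunc) { a.setups[name] = s }

func (a *app) parse(cmd string) (string, setupFunc) { return cmd, a.setups[cmd] }

func main() {
	a := &app{setups: map[string]setupFunc{}}
	a.register("query", func() error { fmt.Println("runQuery"); return nil })
	a.register("sidecar", func() error { fmt.Println("runSidecar"); return nil })
	a.register("store", func() error { fmt.Println("runStore"); return nil })

	// "thanos query ..." means only the query setup function is executed.
	cmd, setup := a.parse("query")
	if setup == nil {
		fmt.Println("unknown command:", cmd)
		return
	}
	_ = setup()
}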

  • 3. runQuery function
// Uses the run.Group object to start the http server, the grpc server and the service-discovery goroutines.
func runQuery(
	g *run.Group,		//In fact, it comes from the main() method
	logger log.Logger,
	reg *prometheus.Registry,
	tracer opentracing.Tracer,
	requestLoggingDecision string,
	grpcBindAddr string,
	grpcGracePeriod time.Duration,
	grpcCert string,
	grpcKey string,
	grpcClientCA string,
	/*
		Other codes
	*/
) error {
	
	var (
		// The stores object is of type StoreSet. It holds a set of store components
		// (downstream components that implement the Store API); this set can change dynamically.
		/*
		type StoreSet struct {
			//Other properties
			stores       map[string]*storeRef
		}		
		*/		
		stores = query.NewStoreSet(...)
		
		// The proxy object, i.e. a proxy in front of the downstream Store API components.
		// The list of downstream components is obtained through the stores.Get function passed to the constructor.
		proxy            = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
		rulesProxy       = rules.NewProxy(logger, stores.GetRulesClients)
				
		/*
			queryableCreator is a factory for creating querier objects;
			the querier's proxy field is the proxy object above, which wraps a dynamically changing group of
			thanos store components (the set changes because dedicated goroutines are started to update it).
		*/
		queryableCreator = query.NewQueryableCreator(
			logger,
			extprom.WrapRegistererWithPrefix("thanos_query_", reg),
			proxy,
			maxConcurrentSelects,
			queryTimeout,
		)
	)

	/*
		The code elided here starts several goroutines that periodically detect changes in the
		Store API components and update the map[string]*storeRef field of the stores object
		(a hedged sketch of this refresh loop is given after this code listing).
	*/
				
		
		// Create an http server, register an http handler, and start the server
		{

			router := route.New()
			//New QueryAPI structure object
			api := v1.NewQueryAPI(
						logger,
						stores,
						engine,
						queryableCreator,
						rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels),
						enableAutodownsampling,
						enableQueryPartialResponse,
						enableRulePartialResponse,
						queryReplicaLabels,
						flagsMap,
						instantDefaultMaxSourceResolution,
						defaultMetadataTimeRange,
						gate.New(
							extprom.WrapRegistererWithPrefix("thanos_query_concurrent_", reg),
							maxConcurrentQueries,
						),
					)
					
			// Register the http handlers on the router object
			api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware)
			
			srv := httpserver.New(logger, reg, comp, httpProbe,
					httpserver.WithListen(httpBindAddr),
					httpserver.WithGracePeriod(httpGracePeriod),
			)
			// The http server uses the router object
			srv.Handle("/", router)
			
			g.Add(func() error {
				statusProber.Healthy()
				// Start http server
				return srv.ListenAndServe()		
			}, func(err error) {
				statusProber.NotReady(err)
				defer statusProber.NotHealthy(err)
				srv.Shutdown(err)
			})
		}
			
		// Create the grpc server, register the grpc handlers, and start the server
		{
			tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), grpcCert, grpcKey, grpcClientCA)
			if err != nil {
				return errors.Wrap(err, "setup gRPC server")
			}

			s := grpcserver.New(logger, reg, tracer, comp, grpcProbe,				
				grpcserver.WithServer(store.RegisterStoreServer(proxy)),		// Register grpc handler
				grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)),   // Register grpc handler
				grpcserver.WithListen(grpcBindAddr),
				grpcserver.WithGracePeriod(grpcGracePeriod),
				grpcserver.WithTLSConfig(tlsCfg),
			)

			g.Add(func() error {
				statusProber.Ready()
				// Start grpc server
				return s.ListenAndServe()		
			}, func(err error) {
				statusProber.NotReady(err)
				s.Shutdown(err)
			})
		}

		// At this point both the http server and the grpc server have been registered with the run.Group; they actually start when g.Run() executes in main().
		level.Info(logger).Log("msg", "starting query node")
		return nil			
	
}
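The discovery goroutines mentioned in the comment above essentially re-resolve the --store targets on a schedule (for example the dnssrv+ entries from the startup parameters) and feed the fresh gRPC addresses to the StoreSet. A hedged, self-contained sketch of that loop; resolveStores and the print statement stand in for the real StoreSet update:

package main

import (
	"fmt"
	"net"
	"time"
)

// resolveStores resolves an SRV record into a list of host:port gRPC addresses,
// roughly what a dnssrv+ store flag asks the querier to do.
func resolveStores(record string) ([]string, error) {
	_, srvs, err := net.LookupSRV("", "", record)
	if err != nil {
		return nil, err
	}
	var addrs []string
	for _, s := range srvs {
		addrs = append(addrs, fmt.Sprintf("%s:%d", s.Target, s.Port))
	}
	return addrs, nil
}

func main() {
	record := "_grpc._tcp.thanos-store.thanos.svc.cluster.local"
	for range time.Tick(30 * time.Second) {
		addrs, err := resolveStores(record)
		if err != nil {
			continue // transient DNS failures are simply retried on the next tick
		}
		// In runQuery this is where the StoreSet would be updated with the new addresses.
		fmt.Println("resolved store addresses:", addrs)
	}
}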

  • 4. QueryAPI structure and its method
// QueryAPI is an API used by Thanos Query.
type QueryAPI struct {
	baseAPI         *api.BaseAPI
	logger          log.Logger
	gate            gate.Gate
	
	// Factory function used to create querier objects
	queryableCreate query.QueryableCreator
	
	queryEngine     *promql.Engine
	ruleGroups      rules.UnaryClient
	/*
	Other codes
	*/
	replicaLabels []string
	storeSet      *query.StoreSet
}


func (qapi *QueryAPI) Register(r *route.Router, tracer opentracing.Tracer, logger log.Logger, ins extpromhttp.InstrumentationMiddleware, logMiddleware *logging.HTTPServerMiddleware) {
	qapi.baseAPI.Register(r, tracer, logger, ins, logMiddleware)
	instr := api.GetInstr(tracer, logger, ins, logMiddleware)
	/*
		Other codes
	*/
	
	// Register qapi.query, qapi.series and qapi.stores on the input parameter r, completing the http handler registration.
	// For both the /query and /series endpoints, a new querier object is created for every incoming request; that querier holds the set of Store API components.
	r.Get("/query", instr("query", qapi.query))
	r.Get("/series", instr("series", qapi.series))
	r.Get("/stores", instr("stores", qapi.stores))
}

// Returns the series (metric) data
func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiError) {
	/*
	Other codes
	*/
	
	// Create a querier object
	// The querier's proxy field holds the set of thanos store components
	q, err := qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, math.MaxInt64, enablePartialResponse, true).
		Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
		
	/*
	Other codes
	*/
	
	var (
		metrics = []labels.Labels{}
		sets    []storage.SeriesSet
	)
	for _, mset := range matcherSets {
		// Call the querier's Select() method to fetch the series
		sets = append(sets, q.Select(false, nil, mset...))
	}
	set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
	for set.Next() {
		metrics = append(metrics, set.At().Labels())
	}	
	return metrics, set.Warnings(), nil
}

  • 5. querier structure and its method
type querier struct {
	ctx                 context.Context
	logger              log.Logger
	cancel              func()
	mint, maxt          int64
	replicaLabels       map[string]struct{}
	storeDebugMatchers  [][]*labels.Matcher
	
	// proxy contains a set of dynamic thanos store components
	proxy               storepb.StoreServer
	
	deduplicate         bool	
	maxResolutionMillis int64
	partialResponse     bool
	skipChunks          bool
	selectGate          gate.Gate
	selectTimeout       time.Duration
}


func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
	/*
		Other codes
	*/	
	promise := make(chan storage.SeriesSet, 1)
	go func() {
		defer close(promise)
		var err error
		/*
			Other codes
		*/	
		// Fetch the series data
		set, err := q.selectFn(ctx, hints, ms...)
		if err != nil {
			// Send the error to the channel and exit this goroutine
			promise <- storage.ErrSeriesSet(err)
			return
		}
		// Send the series data to the channel
		promise <- set
	}()

	// Return a lazy wrapper around the series
	return &lazySeriesSet{
		create: func() (storage.SeriesSet, bool) {
			/*
				Other codes
			*/	
			// Read the series from the channel (this only happens on the first Next() call)
			set, ok := <-promise
			if !ok {
				return storage.ErrSeriesSet(errors.New("channel closed before a response was received")), false
			}
			return set, set.Next()
		},
	}
}


// Fetches the series by calling the Series(...) method of the proxy field
func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, error) {

	/*
		Other codes
	*/	
	
	// The seriesServer struct implements the Send() method; the data returned over gRPC is stored in its seriesSet field
	resp := &seriesServer{ctx: ctx}	
	
	// q.proxy is implemented by the ProxyStore struct
	// q.proxy.Series() is the (streaming) gRPC method
	// After q.proxy.Series() returns, the seriesSet field of resp has been filled in
	if err := q.proxy.Series(&storepb.SeriesRequest{
		MinTime:                 hints.Start,
		MaxTime:                 hints.End,
		Matchers:                sms,
		/*
			Other codes
		*/
	}, resp); err != nil {
		return nil, errors.Wrap(err, "proxy Series()")
	}
	
	/*
		Other codes
	*/
	
	
	set := &promSeriesSet{
		mint:  q.mint,
		maxt:  q.maxt,
		set:   newStoreSeriesSet(resp.seriesSet),  // Wrap the seriesSet field of resp
		aggrs: aggrs,
		warns: warns,
	}
	// set holds the series; wrap it with deduplication before returning
	return newDedupSeriesSet(set, q.replicaLabels, len(aggrs) == 1 && aggrs[0] == storepb.Aggr_COUNTER), nil
}
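The Select()/lazySeriesSet combination above is essentially a promise: the expensive fetch runs in a goroutine and the result is consumed lazily on first use. A self-contained toy version of that pattern, using generic names rather than Thanos types:

package main

import "fmt"

// lazyResult defers reading from the promise channel until First() is called.
type lazyResult struct {
	create func() (string, bool)
}

func (l *lazyResult) First() (string, bool) { return l.create() }

func selectLazily(fetch func() (string, error)) *lazyResult {
	promise := make(chan string, 1) // buffered so the producer never blocks
	go func() {
		defer close(promise)
		v, err := fetch()
		if err != nil {
			promise <- "error: " + err.Error()
			return
		}
		promise <- v
	}()
	return &lazyResult{create: func() (string, bool) {
		v, ok := <-promise // blocks only when the result is actually needed
		return v, ok
	}}
}

func main() {
	res := selectLazily(func() (string, error) { return "series data", nil })
	fmt.Println(res.First())
}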


  • 6. ProxyStore structure and its method
// ProxyStore implements the store API that proxies request to all given underlying stores.
type ProxyStore struct {
	logger         log.Logger
	
	// Returns the downstream clients that implement the Store API. This field is used when querying series
	stores         func() []Client
	
	component      component.StoreAPI
	selectorLabels labels.Labels

	responseTimeout time.Duration
	metrics         *proxyStoreMetrics
}

/*
Series() serves a client request by querying series from all downstream Store API components,
merging and deduplicating them, and finally streaming the result back through the input parameter srv.
This is a gRPC streaming interface.
*/
func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
	
	/*
		Other codes
	*/
	g, gctx := errgroup.WithContext(srv.Context())
	respSender, respCh := newCancelableRespChannel(gctx, 10)


	// Producer goroutine
	g.Go(func() error {		
		/*
			This goroutine fetches series from the backend thanos store components and merges them.
			When it finishes and closes respCh, the consumer goroutine exits as well.
		*/
		
		var (
			seriesSet      []storepb.SeriesSet
			storeDebugMsgs []string		
			wg = &sync.WaitGroup{}
		)

		defer func() {
			wg.Wait()
			// Closing respCh causes the consumer goroutine to exit its range loop
			close(respCh)
		}()

		// Iterate over the backend Store API components
		for _, st := range s.stores() {
		
			/*
				Other codes
			*/	
			
			sc, err := st.Series(seriesCtx, r)		
			seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries,
				wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout, s.metrics.emptyStreamResponses))
		
			/*
				Other codes
			*/
		}

		// Merge the series sets and send them to the respCh channel
		mergedSet := storepb.MergeSeriesSets(seriesSet...)
		for mergedSet.Next() {		
			lset, chk := mergedSet.At()
			// respSender.send(...) actually writes the series to the respCh channel
			respSender.send(storepb.NewSeriesResponse(&storepb.Series{Labels: labelpb.ZLabelsFromPromLabels(lset), Chunks: chk}))
		}
		return mergedSet.Err()
	})
	
	// Consumer goroutine
	g.Go(func() error {				
		// This goroutine takes the (already merged) responses from respCh and sends them to the method's input parameter srv
		for resp := range respCh {
			if err := srv.Send(resp); err != nil {
				return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
			}
		}
		return nil
	})
	
	// Wait for the producer and consumer goroutines to finish
	if err := g.Wait(); err != nil {			
		return err
	}
	return nil
}
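The producer/consumer shape of ProxyStore.Series() (errgroup plus a channel, with the producer closing the channel so the consumer's range loop ends) is a reusable pattern. A self-contained toy version for reference, with no Thanos types involved:

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	g, _ := errgroup.WithContext(context.Background())
	respCh := make(chan string, 10)

	// Producer: gathers results from several "stores" and closes the channel when done.
	g.Go(func() error {
		defer close(respCh) // unblocks the consumer's range loop
		for _, store := range []string{"prometheus", "rule", "store-gateway"} {
			respCh <- "series from " + store
		}
		return nil
	})

	// Consumer: forwards everything it receives (srv.Send in the real code).
	g.Go(func() error {
		for resp := range respCh {
			fmt.Println(resp)
		}
		return nil
	})

	if err := g.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}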
