Goreplay, the HTTP traffic replay tool: core source code in detail

Abstract: Goreplay, formerly known as Gor, is a simple TCP/HTTP traffic recording and replay tool, written mainly in Go.

This article is shared from the Huawei Cloud community post "Analysis of goreplay core source code of traffic playback tool", author: zuozewei.

1, Foreword

Goreplay, formerly known as Gor, is a simple TCP/HTTP traffic recording and replay tool, written mainly in Go.

Github address: https://github.com/buger/goreplay

2, Engineering structure

This walkthrough uses the latest v1.3 release as the example; its code differs considerably from v1.0.

~/GoProjects/gor_org/goreplay (release-1.3) $ tree -L 1

.
├── COMM-LICENSE
├── Dockerfile
├── Dockerfile.dev
├── ELASTICSEARCH.md
├── LICENSE.txt
├── Makefile
├── Procfile
├── README.md
├── byteutils
├── capture
├── circle.yml
├── docs
├── elasticsearch.go
├── emitter.go 
├── emitter_test.go
├── examples
├── go.mod
├── go.sum
├── gor.go  
├── gor_stat.go
├── homebrew
├── http_modifier.go
├── http_modifier_settings.go
├── http_modifier_settings_test.go
├── http_modifier_test.go
├── http_prettifier.go
├── http_prettifier_test.go
├── input_dummy.go
├── input_file.go
├── input_file_test.go
├── input_http.go
├── input_http_test.go
├── input_kafka.go
├── input_kafka_test.go
├── input_raw.go
├── input_raw_test.go
├── input_tcp.go
├── input_tcp_test.go
├── kafka.go
├── limiter.go
├── limiter_test.go
├── middleware
├── middleware.go
├── middleware_test.go
├── mkdocs.yml
├── output_binary.go 
├── output_dummy.go
├── output_file.go
├── output_file_test.go
├── output_http.go
├── output_http_test.go
├── output_kafka.go
├── output_kafka_test.go
├── output_null.go
├── output_s3.go
├── output_tcp.go
├── output_tcp_test.go
├── plugins.go     
├── plugins_test.go
├── pro.go
├── proto
├── protocol.go
├── ring
├── s3
├── s3_reader.go
├── s3_test.go
├── settings.go  
├── settings_test.go
├── sidenav.css
├── simpletime
├── site
├── size
├── snapcraft.yaml
├── tcp
├── tcp_client.go
├── test_input.go
├── test_output.go
├── vendor
└── version.go

The project directory is relatively flat. It is built around a few main files, plugins.go, settings.go and emitter.go. The remaining input_xxx and output_xxx files are input and output plugins adapted to specific protocols, and the program entry point is the main function in gor.go.

Description of the main files:

  • settings.go: parses the startup command-line parameters, which decide which plugins are registered into the two lists, Inputs and Outputs, of InOutPlugins.
  • plugins.go: manages all input and output plugins.
  • emitter.go: the core event processing. It reads the input streams in Inputs, decides whether middleware processing and HTTP modification are required, and then asynchronously copies the traffic to every entry in Outputs. When an output also produces response data, that data is copied to all outputs as well.
  • input_xxx.go: the input plugins. They implement protocols such as tcp/http/raw/kafka behind a reader interface (io.Reader in older versions, PluginReader in v1.3) and are registered into the Inputs list according to the configuration.
  • output_xxx.go: the output plugins. They implement protocols such as tcp/http/raw/kafka behind a writer interface (io.Writer in older versions, PluginWriter in v1.3) and are registered into the Outputs list according to the configuration.
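
The two-list registration described above can be sketched roughly as follows. This is a simplified illustration, not goreplay's actual code: the Message fields, the Register method and the two dummy plugins are assumptions made for the example, and only the names PluginReader, PluginWriter and InOutPlugins are taken from the excerpts in this article.

```go
package main

import "fmt"

// Message is a simplified stand-in for the payload passed between plugins.
type Message struct {
	Meta []byte // metadata line
	Data []byte // raw request or response bytes
}

// PluginReader is implemented by inputs.
type PluginReader interface {
	PluginRead() (*Message, error)
}

// PluginWriter is implemented by outputs.
type PluginWriter interface {
	PluginWrite(msg *Message) (int, error)
}

// InOutPlugins keeps the registered inputs and outputs in two lists.
type InOutPlugins struct {
	Inputs  []PluginReader
	Outputs []PluginWriter
}

// Register (hypothetical helper) files a plugin under Inputs, Outputs,
// or both, depending on which interfaces it implements.
func (p *InOutPlugins) Register(plugin interface{}) {
	if r, ok := plugin.(PluginReader); ok {
		p.Inputs = append(p.Inputs, r)
	}
	if w, ok := plugin.(PluginWriter); ok {
		p.Outputs = append(p.Outputs, w)
	}
}

// dummyInput is a hypothetical input that always returns the same request.
type dummyInput struct{}

func (dummyInput) PluginRead() (*Message, error) {
	return &Message{Meta: []byte("1 abc 0\n"), Data: []byte("GET / HTTP/1.1\r\n\r\n")}, nil
}

// dummyOutput is a hypothetical output that only counts the bytes it receives.
type dummyOutput struct{ written int }

func (o *dummyOutput) PluginWrite(msg *Message) (int, error) {
	o.written += len(msg.Data)
	return len(msg.Data), nil
}

func main() {
	plugins := new(InOutPlugins)
	plugins.Register(dummyInput{})
	plugins.Register(&dummyOutput{})
	fmt.Println(len(plugins.Inputs), len(plugins.Outputs)) // prints "1 1"
}
```

A plugin that implements both interfaces would land in both lists, which is one reason to test the two interfaces independently rather than with an either/or switch.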

3, Main core processes

Goreplay has only two concepts, input and output. They are goreplay's abstraction of the data flow, and both are collectively called plugins.

The main function in gor.go mainly does the following:

1. Parse command line parameters:

// Parse parses the command-line flags from os.Args[1:]. Must be called
// after all flags are defined and before flags are accessed by the program.
func Parse() {
	// Ignore errors; CommandLine is set for ExitOnError.
	CommandLine.Parse(os.Args[1:])
}

2. Initialize the global Settings variable.

func checkSettings() {
	if Settings.OutputFileConfig.SizeLimit < 1 {
		Settings.OutputFileConfig.SizeLimit.Set("32mb")
	}
	if Settings.OutputFileConfig.OutputFileMaxSize < 1 {
		Settings.OutputFileConfig.OutputFileMaxSize.Set("1tb")
	}
	if Settings.CopyBufferSize < 1 {
		Settings.CopyBufferSize.Set("5mb")
	}
}
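
The Set("32mb") calls above imply a size type that parses human-readable values. A minimal sketch of such a type, assuming binary units (1kb = 1024 bytes); the Size type and its parsing rules here are a hypothetical stand-in, not goreplay's own implementation:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Size is a byte count that can be set from strings such as "32mb".
// Hypothetical, simplified stand-in for the type used in Settings.
type Size int64

// units maps a suffix to its multiplier (binary units are an assumption).
var units = []struct {
	suffix string
	factor int64
}{
	{"kb", 1 << 10},
	{"mb", 1 << 20},
	{"gb", 1 << 30},
	{"tb", 1 << 40},
}

// Set parses values like "512", "32mb" or "1tb" into a byte count.
func (s *Size) Set(value string) error {
	v := strings.ToLower(strings.TrimSpace(value))
	factor := int64(1)
	for _, u := range units {
		if strings.HasSuffix(v, u.suffix) {
			factor = u.factor
			v = strings.TrimSuffix(v, u.suffix)
			break
		}
	}
	n, err := strconv.ParseInt(v, 10, 64)
	if err != nil {
		return fmt.Errorf("invalid size %q: %w", value, err)
	}
	*s = Size(n * factor)
	return nil
}

// String together with Set makes *Size satisfy flag.Value.
func (s *Size) String() string { return strconv.FormatInt(int64(*s), 10) }

func main() {
	var limit Size
	if limit < 1 { // same "unset" check as in checkSettings
		if err := limit.Set("32mb"); err != nil {
			panic(err)
		}
	}
	fmt.Println(limit.String()) // prints "33554432"
}
```

Under this assumption "32mb", "1tb" and "5mb" come out as 33554432, 1099511627776 and 5242880, which are the same numbers that appear as test defaults in the init function.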

3. The command-line parameters are defined in the init function of settings.go, which runs before the main function.

func init() {
	flag.Usage = usage
	flag.StringVar(&Settings.Pprof, "http-pprof", "", "Enable profiling. Starts  http server on specified port, exposing special /debug/pprof endpoint. Example: `:8181`")
	flag.IntVar(&Settings.Verbose, "verbose", 0, "set the level of verbosity, if greater than zero then it will turn on debug output")
	flag.BoolVar(&Settings.Stats, "stats", false, "Turn on queue stats output")

	if DEMO == "" {
		flag.DurationVar(&Settings.ExitAfter, "exit-after", 0, "exit after specified duration")
	} else {
		Settings.ExitAfter = 5 * time.Minute
	}

	flag.BoolVar(&Settings.SplitOutput, "split-output", false, "By default each output gets same traffic. If set to `true` it splits traffic equally among all outputs.")
	flag.BoolVar(&Settings.RecognizeTCPSessions, "recognize-tcp-sessions", false, "[PRO] If turned on http output will create separate worker for each TCP session. Splitting output will session based as well.")
 
    ......

	// default values, using for tests
	Settings.OutputFileConfig.SizeLimit = 33554432
	Settings.OutputFileConfig.OutputFileMaxSize = 1099511627776
	Settings.CopyBufferSize = 5242880

}
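
The ordering matters: because init runs before main, every flag is already registered by the time parsing starts. A small self-contained sketch of the same pattern, using a dedicated FlagSet so that it can parse an arbitrary argument list; the AppSettings struct and the flag subset are trimmed-down assumptions, not the real Settings:

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// AppSettings is a hypothetical, trimmed-down settings struct.
type AppSettings struct {
	Verbose     int
	SplitOutput bool
	ExitAfter   time.Duration
}

// Settings is the global variable that the flags write into.
var Settings AppSettings

// flags is a dedicated FlagSet so the example does not depend on os.Args.
var flags = flag.NewFlagSet("gor-sketch", flag.ContinueOnError)

// init runs before main, so all flags exist before Parse is called.
func init() {
	flags.IntVar(&Settings.Verbose, "verbose", 0, "verbosity level")
	flags.BoolVar(&Settings.SplitOutput, "split-output", false, "split traffic among outputs")
	flags.DurationVar(&Settings.ExitAfter, "exit-after", 0, "exit after specified duration")
}

func main() {
	args := []string{"--verbose", "2", "--split-output", "--exit-after", "30s"}
	if err := flags.Parse(args); err != nil {
		panic(err)
	}
	fmt.Println(Settings.Verbose, Settings.SplitOutput, Settings.ExitAfter) // prints "2 true 30s"
}
```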

4. Initialize the plugins according to the command-line parameters: the main function calls the NewPlugins function.

// NewPlugins specify and initialize all available plugins
func NewPlugins() *InOutPlugins {
	plugins := new(InOutPlugins)

	for _, options := range Settings.InputDummy {
		plugins.registerPlugin(NewDummyInput, options)
	}
 
    ......

	return plugins
}

5. Call the Start function to start the emitter. Each input plugin gets its own goroutine, which reads the input and writes to the outputs.

// Start initialize loop for sending data from inputs to outputs
func (e *Emitter) Start(plugins *InOutPlugins, middlewareCmd string) {
	if Settings.CopyBufferSize < 1 {
		Settings.CopyBufferSize = 5 << 20
	}
	e.plugins = plugins

	if middlewareCmd != "" {
		middleware := NewMiddleware(middlewareCmd)

		for _, in := range plugins.Inputs {
			middleware.ReadFrom(in)
		}

		e.plugins.Inputs = append(e.plugins.Inputs, middleware)
		e.plugins.All = append(e.plugins.All, middleware)
		e.Add(1)
		go func() {
			defer e.Done()
			if err := CopyMulty(middleware, plugins.Outputs...); err != nil {
				Debug(2, fmt.Sprintf("[EMITTER] error during copy: %q", err))
			}
		}()
	} else {
		for _, in := range plugins.Inputs {
			e.Add(1)
			go func(in PluginReader) {
				defer e.Done()
				if err := CopyMulty(in, plugins.Outputs...); err != nil {
					Debug(2, fmt.Sprintf("[EMITTER] error during copy: %q", err))
				}
			}(in)
		}
	}
}
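
The goroutine-per-input pattern in Start can be reduced to the following sketch. Inputs are modelled as channels and the outputs as a single callback; both are simplifications assumed for illustration, and the collect helper is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// Emitter embeds a WaitGroup so callers can wait for all copy loops to end.
type Emitter struct {
	sync.WaitGroup
}

// Start launches one goroutine per input; each drains its input into sink.
func (e *Emitter) Start(inputs []<-chan string, sink func(string)) {
	for _, in := range inputs {
		e.Add(1)
		go func(in <-chan string) {
			defer e.Done()
			for msg := range in {
				sink(msg)
			}
		}(in) // pass in explicitly so each goroutine owns its input
	}
}

// collect (hypothetical helper) feeds batches through an Emitter and
// returns everything that reached the sink.
func collect(batches ...[]string) []string {
	var inputs []<-chan string
	for _, batch := range batches {
		ch := make(chan string, len(batch))
		for _, m := range batch {
			ch <- m
		}
		close(ch)
		inputs = append(inputs, ch)
	}

	var mu sync.Mutex
	var got []string
	e := new(Emitter)
	e.Start(inputs, func(m string) {
		mu.Lock() // the sink is called from several goroutines
		got = append(got, m)
		mu.Unlock()
	})
	e.Wait()
	return got
}

func main() {
	got := collect([]string{"a1", "a2"}, []string{"b1"})
	fmt.Println(len(got)) // 3 messages; order across inputs is not guaranteed
}
```

Because the inputs run concurrently, anything shared on the output side needs its own synchronization, which is why the sink in this sketch takes a mutex.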

With only one goroutine there would be a performance bottleneck, so each input gets its own. By default every message read from an input is copied to all outputs. If the --split-output parameter is passed and there are multiple outputs, a simple round-robin algorithm picks one output per message instead of writing multiple copies. Multiple inputs run in parallel, but a single input writes to its outputs serially. In older versions all inputs implement the io.Reader interface and all outputs implement io.Writer; in the v1.3 code shown here the corresponding interfaces are PluginReader and PluginWriter. So when reading the code, the entry point of an input is its Read() method (PluginRead() in v1.3) and the entry point of an output is its Write() method (PluginWrite() in v1.3).

// CopyMulty copies from 1 reader to multiple writers
func CopyMulty(src PluginReader, writers ...PluginWriter) error {
	wIndex := 0
	modifier := NewHTTPModifier(&Settings.ModifierConfig)
	filteredRequests := make(map[string]int64)
	filteredRequestsLastCleanTime := time.Now().UnixNano()
	filteredCount := 0

	for {
		msg, err := src.PluginRead()
		if err != nil {
			if err == ErrorStopped || err == io.EOF {
				return nil
			}
			return err
		}
		if msg != nil && len(msg.Data) > 0 {
			if len(msg.Data) > int(Settings.CopyBufferSize) {
				msg.Data = msg.Data[:Settings.CopyBufferSize]
			}
			meta := payloadMeta(msg.Meta)
			if len(meta) < 3 {
				Debug(2, fmt.Sprintf("[EMITTER] Found malformed record %q from %q", msg.Meta, src))
				continue
			}
			requestID := byteutils.SliceToString(meta[1])
			// start a subroutine only when necessary
			if Settings.Verbose >= 3 {
				Debug(3, "[EMITTER] input: ", byteutils.SliceToString(msg.Meta[:len(msg.Meta)-1]), " from: ", src)
			}
			if modifier != nil {
				Debug(3, "[EMITTER] modifier:", requestID, "from:", src)
				if isRequestPayload(msg.Meta) {
					msg.Data = modifier.Rewrite(msg.Data)
					// If modifier tells to skip request
					if len(msg.Data) == 0 {
						filteredRequests[requestID] = time.Now().UnixNano()
						filteredCount++
						continue
					}
					Debug(3, "[EMITTER] Rewritten input:", requestID, "from:", src)

				} else {
					if _, ok := filteredRequests[requestID]; ok {
						delete(filteredRequests, requestID)
						filteredCount--
						continue
					}
				}
			}

			if Settings.PrettifyHTTP {
				msg.Data = prettifyHTTP(msg.Data)
				if len(msg.Data) == 0 {
					continue
				}
			}

			if Settings.SplitOutput {
				if Settings.RecognizeTCPSessions {
					if !PRO {
						log.Fatal("Detailed TCP sessions work only with PRO license")
					}
					hasher := fnv.New32a()
					hasher.Write(meta[1])

					wIndex = int(hasher.Sum32()) % len(writers)
					if _, err := writers[wIndex].PluginWrite(msg); err != nil {
						return err
					}
				} else {
					// Simple round robin
					if _, err := writers[wIndex].PluginWrite(msg); err != nil {
						return err
					}

					wIndex = (wIndex + 1) % len(writers)
				}
			} else {
				for _, dst := range writers {
					if _, err := dst.PluginWrite(msg); err != nil && err != io.ErrClosedPipe {
						return err
					}
				}
			}
		}

		// Run GC on each 1000 request
		if filteredCount > 0 && filteredCount%1000 == 0 {
			// Clean up filtered requests for which we didn't get a response to filter
			now := time.Now().UnixNano()
			if now-filteredRequestsLastCleanTime > int64(60*time.Second) {
				for k, v := range filteredRequests {
					if now-v > int64(60*time.Second) {
						delete(filteredRequests, k)
						filteredCount--
					}
				}
				filteredRequestsLastCleanTime = time.Now().UnixNano()
			}
		}
	}
}

The round-robin scheduling algorithm hands each incoming request to the next target in turn, from the first to the n-th (n being the number of targets, here the outputs), and then starts over from the first.

The advantage of the algorithm is its simplicity. It does not need to record the state of the current connections, so it is a stateless form of scheduling.
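
Both selection strategies from CopyMulty fit in a few lines. The sketch below is an illustration with hypothetical names (roundRobin, sessionIndex). Unlike the excerpt, it takes the modulo on the unsigned hash value before converting to int, which keeps the index non-negative even where int is 32 bits wide.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// roundRobin hands out the indexes 0..n-1 in turn and then wraps around.
type roundRobin struct {
	next int
	n    int
}

// Pick returns the current index and advances to the next one.
func (r *roundRobin) Pick() int {
	i := r.next
	r.next = (r.next + 1) % r.n
	return i
}

// sessionIndex maps an ID to a fixed index with FNV-1a, so that all
// messages carrying the same ID go to the same writer.
func sessionIndex(id []byte, n int) int {
	h := fnv.New32a()
	h.Write(id) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(n))
}

func main() {
	rr := &roundRobin{n: 3}
	for i := 0; i < 5; i++ {
		fmt.Print(rr.Pick(), " ")
	}
	fmt.Println() // the line printed so far reads "0 1 2 0 1 "

	a := sessionIndex([]byte("session-42"), 3)
	b := sessionIndex([]byte("session-42"), 3)
	fmt.Println(a == b) // prints "true": same ID, same writer
}
```

Round robin spreads the load evenly but scatters one session over all outputs; hashing keeps a session together at the cost of a possibly uneven distribution.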

4, Other tidbits

1. goreplay captures packets by calling google/gopacket, which in turn calls libpcap through cgo. The tool as a whole is small and practical: it can capture packets from a raw socket, record and replay HTTP, and cascade multiple instances. RAW_SOCKET lets you listen for traffic on any port because raw sockets operate at the IP level. Ports are a TCP feature, as are flow control and reliable transmission, so the package implements its own TCP layer: TCP packets are parsed in tcp_packet.go, and the message flow is managed by tcp_message.go.

Reference address: http://en.wikipedia.org/wiki/Raw_socket
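
To give an idea of what implementing "its own TCP layer" involves, here is a sketch that decodes the fixed 20-byte part of a TCP header by hand. It is not goreplay's tcp_packet.go; the TCPHeader struct, ParseTCP and buildSegment are hypothetical names, and only the standard header layout is relied on.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// TCPHeader holds the header fields that matter for reassembling messages.
type TCPHeader struct {
	SrcPort, DstPort    uint16
	Seq, Ack            uint32
	DataOffset          int // header length in bytes
	IsFIN, IsSYN, IsACK bool
}

// ParseTCP decodes the fixed part of a TCP header and returns the header
// together with the payload that follows it.
func ParseTCP(segment []byte) (TCPHeader, []byte, error) {
	if len(segment) < 20 {
		return TCPHeader{}, nil, errors.New("segment shorter than minimal TCP header")
	}
	h := TCPHeader{
		SrcPort:    binary.BigEndian.Uint16(segment[0:2]),
		DstPort:    binary.BigEndian.Uint16(segment[2:4]),
		Seq:        binary.BigEndian.Uint32(segment[4:8]),
		Ack:        binary.BigEndian.Uint32(segment[8:12]),
		DataOffset: int(segment[12]>>4) * 4, // upper 4 bits count 32-bit words
	}
	flags := segment[13]
	h.IsFIN = flags&0x01 != 0
	h.IsSYN = flags&0x02 != 0
	h.IsACK = flags&0x10 != 0
	if h.DataOffset < 20 || h.DataOffset > len(segment) {
		return TCPHeader{}, nil, errors.New("invalid data offset")
	}
	return h, segment[h.DataOffset:], nil
}

// buildSegment assembles a header without options plus payload for the demo.
func buildSegment(src, dst uint16, seq, ack uint32, flags byte, payload []byte) []byte {
	seg := make([]byte, 20, 20+len(payload))
	binary.BigEndian.PutUint16(seg[0:2], src)
	binary.BigEndian.PutUint16(seg[2:4], dst)
	binary.BigEndian.PutUint32(seg[4:8], seq)
	binary.BigEndian.PutUint32(seg[8:12], ack)
	seg[12] = 5 << 4 // data offset: 5 words = 20 bytes
	seg[13] = flags
	return append(seg, payload...)
}

func main() {
	// 0x18 sets the PSH and ACK flags.
	seg := buildSegment(54321, 80, 1000, 0, 0x18, []byte("GET / HTTP/1.1\r\n\r\n"))
	h, payload, err := ParseTCP(seg)
	if err != nil {
		panic(err)
	}
	fmt.Println(h.SrcPort, h.DstPort, h.Seq, h.IsACK, len(payload))
}
```

The sequence numbers extracted this way are what allows segments of one HTTP message to be put back in order before the message is handed on.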

2. Three monkey emoji characters (🐵🙈🙉) are used as the request separator, which looks very funny at first sight.
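
A rough sketch of how recorded content could be split on that separator. The exact separator bytes (the three monkeys wrapped in newlines) and the meaning of the first and third metadata fields are assumptions of this sketch; the CopyMulty excerpt only confirms that the metadata has at least three fields and that the second one is the request ID.

```go
package main

import (
	"bytes"
	"fmt"
)

// payloadSeparator is assumed to be the three monkeys wrapped in newlines.
const payloadSeparator = "\n🐵🙈🙉\n"

// Record is one recorded message: a metadata line plus the raw payload.
type Record struct {
	Meta [][]byte // space-separated metadata fields
	Data []byte   // raw HTTP bytes after the metadata line
}

// RequestID returns the second metadata field.
func (r Record) RequestID() string { return string(r.Meta[1]) }

// ParseRecords splits content on the separator and parses each chunk.
func ParseRecords(content []byte) ([]Record, error) {
	var records []Record
	for _, chunk := range bytes.Split(content, []byte(payloadSeparator)) {
		if len(bytes.TrimSpace(chunk)) == 0 {
			continue // a trailing separator leaves an empty chunk
		}
		nl := bytes.IndexByte(chunk, '\n')
		if nl < 0 {
			return nil, fmt.Errorf("record has no metadata line: %q", chunk)
		}
		meta := bytes.Fields(chunk[:nl])
		if len(meta) < 3 {
			return nil, fmt.Errorf("malformed metadata: %q", chunk[:nl])
		}
		records = append(records, Record{Meta: meta, Data: chunk[nl+1:]})
	}
	return records, nil
}

func main() {
	content := "1 id-1 1000\nGET /a HTTP/1.1\r\n\r\n" + payloadSeparator +
		"1 id-2 2000\nGET /b HTTP/1.1\r\n\r\n" + payloadSeparator
	records, err := ParseRecords([]byte(content))
	if err != nil {
		panic(err)
	}
	for _, r := range records {
		fmt.Println(r.RequestID(), len(r.Data))
	}
}
```

An emoji sequence is very unlikely to occur inside real HTTP traffic, which is presumably why it works well as a delimiter.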

3. The configuration information depends on the startup command parameters.

For example:

/usr/local/bin/gor --input-raw :80 --input-raw-track-response   --input-raw-bpf-filter "host ! 167.xxx.xxx.xx"  --input-raw-override-snaplen --prettify-http --output-http http://192.168.3.110:80 --output-http-timeout 10s --output-http-workers 1000 --output-http-workers-min 100  --http-allow-header "Aww-Csid: xxxxx" --output-http-track-response --http-allow-method POST --middleware "/production/www/go_replay/client/middleware/sync --project {project_name}" --output-http-compatibility-mode --http-allow-url /article/detail

4. goreplay can work together with Java programs through middleware. To enable plugin mode:

gor --input-raw :80 --middleware "java -jar xxx.jar" --output-file request.gor

A command can be passed to gor through the --middleware parameter, and gor will start a process to execute that command. During recording, gor communicates with the plugin process through that process's standard input and standard output.

The data flow is roughly as follows:

+-------------+     Original request     +--------------+     Modified request      +-------------+
|  Gor input  |----------STDIN---------->|  Middleware  |----------STDOUT---------->| Gor output  |
+-------------+                          +--------------+                           +-------------+
  input-raw                              java -jar xxx.jar                            output-file   
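
The middleware does not have to be a Java program; anything that reads STDIN and writes STDOUT will do. Below is a sketch in Go, assuming the protocol in which each message arrives as one hex-encoded line and is written back the same way. The rewriteHost transform, the host names and the roundTrip helper are hypothetical and exist only for the demo.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/hex"
	"fmt"
	"io"
	"strings"
)

// process reads hex-encoded messages line by line, lets transform modify
// the decoded payload, and writes the result back hex-encoded.
func process(in io.Reader, out io.Writer, transform func([]byte) []byte) error {
	scanner := bufio.NewScanner(in)
	scanner.Buffer(make([]byte, 0, 64*1024), 16<<20) // allow long lines
	for scanner.Scan() {
		payload, err := hex.DecodeString(scanner.Text())
		if err != nil {
			continue // skip lines that are not valid hex
		}
		payload = transform(payload)
		if len(payload) == 0 {
			continue // in this sketch an empty result drops the message
		}
		if _, err := fmt.Fprintln(out, hex.EncodeToString(payload)); err != nil {
			return err
		}
	}
	return scanner.Err()
}

// rewriteHost is a hypothetical transform that points requests at staging.
func rewriteHost(payload []byte) []byte {
	return bytes.Replace(payload,
		[]byte("Host: prod.example.com"),
		[]byte("Host: staging.example.com"), 1)
}

// roundTrip (demo helper) encodes msg, runs it through process and
// decodes what comes out.
func roundTrip(msg string) (string, error) {
	in := strings.NewReader(hex.EncodeToString([]byte(msg)) + "\n")
	var out bytes.Buffer
	if err := process(in, &out, rewriteHost); err != nil {
		return "", err
	}
	decoded, err := hex.DecodeString(strings.TrimSpace(out.String()))
	return string(decoded), err
}

func main() {
	// A real middleware would call process(os.Stdin, os.Stdout, rewriteHost).
	got, err := roundTrip("1 id-1 1000\nGET / HTTP/1.1\r\nHost: prod.example.com\r\n\r\n")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", got)
}
```

Hex encoding keeps every message on a single line, so binary bodies and embedded newlines cannot break the line-based framing between gor and the middleware.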

5. Interceptor setting

Reference address: https://github.com/buger/goreplay/wiki/Dealing-with-missing-requests-and-responses

In actual use, recording starts to lose many requests once the traffic reaches a certain level. After reading the official documentation and testing, the most relevant parameter turned out to be --input-raw-buffer-size.
The main reason is that gor itself has to read the packets and parse the protocol, relying on the pcap and OS buffers. When the buffer is too small and the packets that did arrive are not enough to assemble a complete HTTP request, requests are lost or broken and cannot be processed correctly.

In listener.go, this parameter is applied to the underlying capture handle:

      inactive.SetTimeout(t.messageExpire)
      inactive.SetPromisc(true)
      inactive.SetImmediateMode(t.immediateMode)
      if t.immediateMode {
        log.Println("Setting immediate mode")
      }
      if t.bufferSize > 0 {
        inactive.SetBufferSize(int(t.bufferSize))
      }

      handle, herr := inactive.Activate()
      if herr != nil {
        log.Println("PCAP Activate error:", herr)
        wg.Done()
        return
      }

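The buffer size for the copy operation itself is defined where the copy happens (this excerpt is from an older version of CopyMulty, which still takes io.Reader and io.Writer):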

// CopyMulty copies from 1 reader to multiple writers
func CopyMulty(src io.Reader, writers ...io.Writer) (err error) {
  buf := make([]byte, Settings.copyBufferSize)
  wIndex := 0
  modifier := NewHTTPModifier(&Settings.modifierConfig)
  filteredRequests := make(map[string]time.Time)
  filteredRequestsLastCleanTime := time.Now()
 
  ......
}

5, Code call graph

Finally, a call graph of the gor code is attached.



Keywords: Go

Added by tommy445 on Sat, 19 Feb 2022 04:41:45 +0200