HTTP/2 protocol frame format
All network transmission is ultimately binary, so at the lowest level every protocol is a stream of bytes. This raises a problem: when the client sends a packet to the server, how does the server know whether the packet is complete or still arriving? If a message is too large, the client has to split it across several packets; if messages are small, the client may combine several of them into one packet. How does the server tell them apart? To solve these problems, the client and the server agree on a set of rules that both sides understand. That set of rules is the protocol.
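One common rule of this kind is length-prefixed framing: the sender writes the size of each message before the message itself, so the receiver always knows how many bytes to wait for. The following is a minimal sketch of that idea, not the gRPC or HTTP/2 wire format; the 4-byte prefix and the names `writeMsg`, `readMsg`, and `roundTrip` are assumptions made for illustration. The reader is deliberately fed one byte at a time to imitate a badly fragmented connection.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"testing/iotest"
)

// writeMsg frames one message: a 4-byte big-endian length, then the payload.
// The 4-byte prefix is an assumption for this sketch, not a real wire format.
func writeMsg(w io.Writer, payload []byte) error {
	var prefix [4]byte
	binary.BigEndian.PutUint32(prefix[:], uint32(len(payload)))
	if _, err := w.Write(prefix[:]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readMsg returns one complete message, however the underlying reader
// happens to chop the bytes up.
func readMsg(r io.Reader) ([]byte, error) {
	var prefix [4]byte
	if _, err := io.ReadFull(r, prefix[:]); err != nil {
		return nil, err
	}
	payload := make([]byte, binary.BigEndian.Uint32(prefix[:]))
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}

// roundTrip writes all messages back to back into one buffer, then reads
// them back one byte at a time to simulate fragmented delivery.
func roundTrip(msgs ...string) ([]string, error) {
	var wire bytes.Buffer
	for _, m := range msgs {
		if err := writeMsg(&wire, []byte(m)); err != nil {
			return nil, err
		}
	}
	r := iotest.OneByteReader(&wire)
	var out []string
	for {
		msg, err := readMsg(r)
		if err == io.EOF {
			return out, nil // clean end: no partial message pending
		}
		if err != nil {
			return out, err
		}
		out = append(out, string(msg))
	}
}

func main() {
	got, err := roundTrip("hello", "grpc")
	fmt.Println(got, err)
}
```

Both messages come back intact even though they were written into a single buffer and read in one-byte pieces, because the receiver relies on the length prefix rather than on packet boundaries.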
The gRPC transport layer is built on the HTTP/2 protocol, so we first need to understand the HTTP/2 frame format, which is as follows:
Frame Format

All frames begin with a fixed 9-octet header followed by a variable-length payload.

    +-----------------------------------------------+
    |                 Length (24)                   |
    +---------------+---------------+---------------+
    |   Type (8)    |   Flags (8)   |
    +-+-------------+---------------+-------------------------------+
    |R|                 Stream Identifier (31)                      |
    +=+=============================================================+
    |                   Frame Payload (0...)                      ...
    +---------------------------------------------------------------+
To handle a network packet you must first know its format; only then can you parse it according to the agreed layout. So what does a gRPC packet look like? Reading the source code answers this directly: a gRPC message travels inside HTTP/2 frames, so the frame layer has to be parsed first.
An HTTP/2 frame consists of: Length (3 bytes) + Type (1 byte) + Flags (1 byte) + R (1 bit) + Stream Identifier (31 bits) + payload. The payload is the actual content of the message.
The first nine bytes are the frame header. Length is the length of the payload (the 9-byte header itself is not counted), and Type identifies the kind of frame. HTTP/2 defines 10 frame types:
- HEADERS frame: carries header fields, corresponding to the HTTP headers
- DATA frame: carries the message body, corresponding to the HTTP request or response body
- PRIORITY frame: adjusts the priority of a stream
- RST_STREAM frame: stream termination frame, used to abort the transmission of a stream
- SETTINGS frame: exchanges connection configuration parameters between client and server
- PUSH_PROMISE frame: the server announces a resource it intends to push to the client
- GOAWAY frame: notifies the peer that the connection is being shut down
- PING frame: heartbeat frame, used to measure round-trip time and check that the connection is alive
- WINDOW_UPDATE frame: adjusts the flow-control window size
- CONTINUATION frame: continues a header block when the HEADERS are too large to fit in one frame
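The list above maps onto the 8-bit Type field of the frame header. As a small sketch, the type codes assigned by RFC 7540 can be written as Go constants; the names `FrameType`, `frameNames`, and the individual constants are chosen here for illustration and are not copied from any package.

```go
package main

import "fmt"

// FrameType is the 8-bit Type field of an HTTP/2 frame header.
type FrameType uint8

// Type codes as assigned by RFC 7540, Section 6.
const (
	FrameData         FrameType = 0x0
	FrameHeaders      FrameType = 0x1
	FramePriority     FrameType = 0x2
	FrameRSTStream    FrameType = 0x3
	FrameSettings     FrameType = 0x4
	FramePushPromise  FrameType = 0x5
	FramePing         FrameType = 0x6
	FrameGoAway       FrameType = 0x7
	FrameWindowUpdate FrameType = 0x8
	FrameContinuation FrameType = 0x9
)

// frameNames is indexed by type code.
var frameNames = [...]string{
	"DATA", "HEADERS", "PRIORITY", "RST_STREAM", "SETTINGS",
	"PUSH_PROMISE", "PING", "GOAWAY", "WINDOW_UPDATE", "CONTINUATION",
}

// String returns the frame name, or a placeholder for unknown codes.
func (t FrameType) String() string {
	if int(t) < len(frameNames) {
		return frameNames[t]
	}
	return fmt.Sprintf("UNKNOWN_FRAME_TYPE_%d", uint8(t))
}

func main() {
	fmt.Println(FrameHeaders, FrameWindowUpdate, FrameType(0x2a))
	// prints: HEADERS WINDOW_UPDATE UNKNOWN_FRAME_TYPE_42
}
```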
Flags holds the flag bits. Their meaning depends on the frame type; three that appear often are:
- END_STREAM: end-of-stream flag, indicating that the current frame is the last one the sender will send on this stream
- END_HEADERS: end-of-headers flag, indicating that the current frame is the last frame of the header block
- PADDED: padding flag, indicating that the payload contains padding bytes that carry no information, which hides the real message size from anyone monitoring the channel
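Because each flag is a single bit in the Flags byte, checking one is a bitwise AND. The sketch below uses the bit values from RFC 7540 (END_STREAM 0x1, END_HEADERS 0x4, PADDED 0x8); the type `Flags` and the method `Has` are names assumed for this example.

```go
package main

import "fmt"

// Flags is the 8-bit Flags field of an HTTP/2 frame header.
type Flags uint8

// Bit values from RFC 7540. Flags are frame-type specific: for example,
// bit 0x1 means ACK rather than END_STREAM on SETTINGS and PING frames.
const (
	FlagEndStream  Flags = 0x1
	FlagEndHeaders Flags = 0x4
	FlagPadded     Flags = 0x8
)

// Has reports whether every bit in v is set in f.
func (f Flags) Has(v Flags) bool { return f&v == v }

func main() {
	// A HEADERS frame that both ends the header block and ends the stream.
	f := FlagEndStream | FlagEndHeaders
	fmt.Println(f.Has(FlagEndStream), f.Has(FlagEndHeaders), f.Has(FlagPadded))
	// prints: true true false
}
```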
R is a 1-bit reserved field. Stream Identifier is the stream ID; HTTP/2 assigns an ID to every stream.
For details, refer to the frame definitions in the HTTP/2 specification (RFC 7540).
Parsing the HTTP/2 frame header
Go back to the helloworld directory under examples and trace s.Serve in the server's main function. The call chain is: s.Serve() -> s.handleRawConn(rawConn) -> s.serveStreams(st)
Take a look at the serveStreams method:
    func (s *Server) serveStreams(st transport.ServerTransport) {
        defer st.Close()
        var wg sync.WaitGroup
        st.HandleStreams(func(stream *transport.Stream) {
            wg.Add(1)
            go func() {
                defer wg.Done()
                s.handleStream(st, stream, s.traceInfo(st, stream))
            }()
        }, func(ctx context.Context, method string) context.Context {
            if !EnableTracing {
                return ctx
            }
            tr := trace.New("grpc.Recv."+methodFamily(method), method)
            return trace.NewContext(ctx, tr)
        })
        wg.Wait()
    }
Here the transport's HandleStreams method is called; this is where HTTP/2 frames are actually processed. Underneath, it calls the ReadFrame method of the http2 package directly to read one HTTP/2 frame.
    type framer struct {
        writer *bufWriter
        fr     *http2.Framer
    }

    func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
        defer close(t.readerDone)
        for {
            frame, err := t.framer.fr.ReadFrame()
            atomic.StoreUint32(&t.activity, 1)
            ...
            switch frame := frame.(type) {
            case *http2.MetaHeadersFrame:
                if t.operateHeaders(frame, handle, traceCtx) {
                    t.Close()
                    break
                }
            case *http2.DataFrame:
                t.handleData(frame)
            case *http2.RSTStreamFrame:
                t.handleRSTStream(frame)
            case *http2.SettingsFrame:
                t.handleSettings(frame)
            case *http2.PingFrame:
                t.handlePing(frame)
            case *http2.WindowUpdateFrame:
                t.handleWindowUpdate(frame)
            case *http2.GoAwayFrame:
                // TODO: Handle GoAway from the client appropriately.
            default:
                errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
            }
        }
    }
ReadFrame in the http2 package reads one complete frame and returns it.
    func (fr *Framer) ReadFrame() (Frame, error) {
        fr.errDetail = nil
        if fr.lastFrame != nil {
            fr.lastFrame.invalidate()
        }
        fh, err := readFrameHeader(fr.headerBuf[:], fr.r)
        if err != nil {
            return nil, err
        }
        if fh.Length > fr.maxReadSize {
            return nil, ErrFrameTooLarge
        }
        payload := fr.getReadBuf(fh.Length)
        if _, err := io.ReadFull(fr.r, payload); err != nil {
            return nil, err
        }
        f, err := typeFrameParser(fh.Type)(fr.frameCache, fh, payload)
        if err != nil {
            if ce, ok := err.(connError); ok {
                return nil, fr.connError(ce.Code, ce.Reason)
            }
            return nil, err
        }
        if err := fr.checkFrameOrder(f); err != nil {
            return nil, err
        }
        if fr.logReads {
            fr.debugReadLoggerf("http2: Framer %p: read %v", fr, summarizeFrame(f))
        }
        if fh.Type == FrameHeaders && fr.ReadMetaHeaders != nil {
            return fr.readMetaFrame(f.(*HeadersFrame))
        }
        return f, nil
    }
fh, err := readFrameHeader(fr.headerBuf[:], fr.r) reads the HTTP/2 frame header. Looking at the length of headerBuf, we find that it is 9 bytes.
    const frameHeaderLen = 9

    func readFrameHeader(buf []byte, r io.Reader) (FrameHeader, error) {
        _, err := io.ReadFull(r, buf[:frameHeaderLen])
        if err != nil {
            return FrameHeader{}, err
        }
        return FrameHeader{
            Length:   (uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2])),
            Type:     FrameType(buf[3]),
            Flags:    Flags(buf[4]),
            StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1),
            valid:    true,
        }, nil
    }
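To see how those nine bytes are laid out, it can help to run the decoding next to its inverse. The following is a self-contained sketch using only the standard library; the type `frameHeader` and the functions `encodeHeader` and `decodeHeader` are hypothetical names for this example, and `decodeHeader` simply repeats the bit arithmetic of readFrameHeader shown above.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const frameHeaderLen = 9

// frameHeader holds the decoded fields of the fixed 9-byte header.
type frameHeader struct {
	Length   uint32 // 24-bit payload length
	Type     uint8
	Flags    uint8
	StreamID uint32 // 31-bit stream identifier
}

// encodeHeader packs the fields into 9 bytes in network byte order.
func encodeHeader(h frameHeader) [frameHeaderLen]byte {
	var b [frameHeaderLen]byte
	b[0] = byte(h.Length >> 16) // Length occupies 3 bytes, big-endian
	b[1] = byte(h.Length >> 8)
	b[2] = byte(h.Length)
	b[3] = h.Type
	b[4] = h.Flags
	// Clear the top bit: it is the reserved R bit, not part of the ID.
	binary.BigEndian.PutUint32(b[5:], h.StreamID&(1<<31-1))
	return b
}

// decodeHeader reverses encodeHeader with the same arithmetic as
// readFrameHeader.
func decodeHeader(b [frameHeaderLen]byte) frameHeader {
	return frameHeader{
		Length:   uint32(b[0])<<16 | uint32(b[1])<<8 | uint32(b[2]),
		Type:     b[3],
		Flags:    b[4],
		StreamID: binary.BigEndian.Uint32(b[5:]) & (1<<31 - 1),
	}
}

func main() {
	// A DATA frame (type 0x0) with END_STREAM (0x1) on stream 3.
	raw := encodeHeader(frameHeader{Length: 16384, Type: 0x0, Flags: 0x1, StreamID: 3})
	fmt.Printf("% x\n", raw[:]) // prints: 00 40 00 00 01 00 00 00 03
	fmt.Printf("%+v\n", decodeHeader(raw))
}
```

The mask `1<<31 - 1` is what drops the reserved R bit, so a stray high bit in the last four bytes never leaks into the stream ID.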
Parsing the business data
After the steps above, we have finally read the HTTP/2 frame header. As mentioned earlier, once the HTTP/2 frame has been read, the gRPC protocol header still has to be parsed. How is that part handled?
Return to the frame-reading loop in the HTTP/2 transport. When the frame turns out to be a MetaHeadersFrame, that is, the first frame of a stream, the operateHeaders method is called:
    case *http2.MetaHeadersFrame:
        if t.operateHeaders(frame, handle, traceCtx) {
            t.Close()
            break
        }
Take a look at operateHeaders: it calls handle(s). This handle is the callback passed to HandleStreams along the path s.Serve() -> s.handleRawConn(rawConn) -> s.serveStreams(st); in other words, it ends up calling the handleStream method:
    st.HandleStreams(func(stream *transport.Stream) {
        wg.Add(1)
        go func() {
            defer wg.Done()
            s.handleStream(st, stream, s.traceInfo(st, stream))
        }()
    }, func(ctx context.Context, method string) context.Context {
        if !EnableTracing {
            return ctx
        }
        tr := trace.New("grpc.Recv."+methodFamily(method), method)
        return trace.NewContext(ctx, tr)
    })
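The pattern here is a callback that starts one goroutine per stream, with a sync.WaitGroup keeping the caller alive until every stream has been handled. A stripped-down sketch of that shape follows; `handleStreams` and `serve` are hypothetical stand-ins written for this example, and plain stream IDs replace the real transport types.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// handleStreams is a hypothetical stand-in for the transport's
// HandleStreams: it invokes the callback once for every stream it sees.
func handleStreams(streamIDs []uint32, handle func(id uint32)) {
	for _, id := range streamIDs {
		handle(id)
	}
}

// serve mirrors the shape of serveStreams: each stream is handled in its
// own goroutine, and serve returns only after all of them have finished.
func serve(streamIDs []uint32) []string {
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex // guards handled
		handled []string
	)
	handleStreams(streamIDs, func(id uint32) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			defer mu.Unlock()
			handled = append(handled, fmt.Sprintf("stream %d", id))
		}()
	})
	wg.Wait()
	sort.Strings(handled) // goroutines finish in no fixed order
	return handled
}

func main() {
	fmt.Println(serve([]uint32{1, 3, 5}))
}
```

Calling wg.Add(1) inside the callback, before the goroutine starts, is what guarantees that wg.Wait() cannot return while a stream is still being handled.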
The call chain continues: s.handleStream(st, stream, s.traceInfo(st, stream)) -> s.processUnaryRPC(t, stream, srv, md, trInfo) -> d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp). Inside recvAndDecompress, the following call is made:
pf, d, err := p.recvMsg(maxReceiveMessageSize)
recvMsg is the function that parses the gRPC protocol. It first reads the 5-byte protocol header, takes the length of the message body from that header, and then reads the body into a byte slice of exactly that length.
    type parser struct {
        r      io.Reader
        header [5]byte
    }

    func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
        if _, err := p.r.Read(p.header[:]); err != nil {
            return 0, nil, err
        }
        pf = payloadFormat(p.header[0])
        length := binary.BigEndian.Uint32(p.header[1:])
        ...
        msg = make([]byte, int(length))
        if _, err := p.r.Read(msg); err != nil {
            if err == io.EOF {
                err = io.ErrUnexpectedEOF
            }
            return 0, nil, err
        }
        return pf, msg, nil
    }
Return to the gRPC message layout. As mentioned earlier, the gRPC protocol header is 5 bytes: a 1-byte Compressed flag followed by a 4-byte Length.
Compressed flag indicates whether the message body is compressed: 1 means the body is compressed, 0 means it is not.
Length is the length of the message body in bytes. Now we finally know where this data structure comes from!
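The 5-byte header is easy to reproduce by hand. Below is a minimal sketch that writes and reads it with the standard library only; `frameMessage`, `parseMessage`, and `parseBytes` are hypothetical helpers for this example, not grpc-go functions. It uses io.ReadFull on a plain io.Reader where recvMsg calls Read on the stream.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// frameMessage prepends the 5-byte header: 1 byte for the compressed flag
// and 4 bytes for the big-endian length of msg.
func frameMessage(compressed bool, msg []byte) []byte {
	buf := make([]byte, 5+len(msg))
	if compressed {
		buf[0] = 1
	}
	binary.BigEndian.PutUint32(buf[1:5], uint32(len(msg)))
	copy(buf[5:], msg)
	return buf
}

// parseMessage reads one header and then exactly Length bytes of body.
func parseMessage(r io.Reader, maxSize uint32) (compressed bool, msg []byte, err error) {
	var hdr [5]byte
	if _, err = io.ReadFull(r, hdr[:]); err != nil {
		return false, nil, err
	}
	length := binary.BigEndian.Uint32(hdr[1:])
	if length > maxSize {
		return false, nil, errors.New("message larger than maxSize")
	}
	msg = make([]byte, length)
	if _, err = io.ReadFull(r, msg); err != nil {
		if err == io.EOF {
			err = io.ErrUnexpectedEOF // header promised a body that never came
		}
		return false, nil, err
	}
	return hdr[0] == 1, msg, nil
}

// parseBytes is a convenience wrapper for in-memory input.
func parseBytes(b []byte, maxSize uint32) (bool, []byte, error) {
	return parseMessage(bytes.NewReader(b), maxSize)
}

func main() {
	wire := frameMessage(false, []byte("hello"))
	fmt.Printf("% x\n", wire[:5]) // prints: 00 00 00 00 05
	compressed, msg, err := parseBytes(wire, 4<<20)
	fmt.Println(compressed, string(msg), err)
}
```

The maxSize check comes before the allocation so that a corrupt or hostile Length value cannot make the receiver allocate an arbitrarily large buffer.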
The data read at this point is raw binary. Once the raw bytes have been read, they can be unpacked into the corresponding message. For that step you can refer to another article of mine: