Building a high-concurrency, high-availability distributed system in Go: designing a Kafka-like storage mechanism for massive data (1)

In the previous section we implemented the log microservice, which runs as an HTTP server: the client POSTs log data as JSON and reads logs back through HTTP GET. In that implementation we simply appended every log record to the end of an array, which means all log data lives in memory. But the volume of logs in a distributed system is enormous. Twitter, for example, generates logs on the order of a trillion records a day, and large domestic systems such as Weibo, WeChat and Taobao presumably operate at a similar scale. Suppose the log microservice runs on 100 servers and each server ends up handling on the order of 1 billion records. If a record is 64 bytes, keeping them all in memory costs 64 GB per server. Considering that many of these logs are likely to be read again after they are stored (so they cannot simply be discarded), and that each server also needs memory to run other programs, keeping all the logs directly in memory would be a huge waste.

Therefore, we need an efficient file-based storage system to hold this much log data, and the storage mechanism must also support fast queries. We could of course store logs in MySQL or another database, but such databases struggle to query at the speed we need. So it is worth designing our own storage system, one that can quickly find the required records within massive amounts of data.

The basic idea for handling massive data or high concurrency is divide and conquer. Think of our country's 1.4 billion people: when you mail a parcel, how does a courier such as SF Express deliver it to the recipient quickly and accurately? The idea is simple. First determine the recipient's province, then the city and district, then the township, then the street and community, and finally the building and unit. By repeatedly "partitioning" like this and rapidly narrowing the search range, you can quickly locate the target. For our 1 billion logs we adopt the same partitioning idea and split them into 100 parts of 10 million records each: the first part holds records 0 to 9,999,999, the second holds 10,000,000 to 19,999,999, and so on. If we want to query the log numbered 11,000,000, we know it lives in the second part. If we need to be faster still, each part can be split further.
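
To make the partitioning arithmetic concrete, here is a minimal sketch (an illustration only, not part of the project code; the partitionOf function and the recordsPerPartition constant are made-up names) that maps a record number to its partition:

package main

import "fmt"

const recordsPerPartition = 10_000_000 //assumed fixed partition size of 10 million records

//partitionOf returns which partition a record lives in and its position inside
//that partition, assuming records are numbered from 0
func partitionOf(recordNo uint64) (partition, offsetInPartition uint64) {
    return recordNo / recordsPerPartition, recordNo % recordsPerPartition
}

func main() {
    p, off := partitionOf(11_000_000)
    fmt.Println(p, off) //prints "1 1000000": the record lives in the second partition
}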

First, let's look at how to store the binary data of the log. A log record is just a chunk of binary data, so we use the simplest possible layout: length, data | length, data | ... That is, when storing a record we first write its length, then its binary data, then the length of the second record followed by the second record's data, and so on, using 8 bytes for each "length" field. The problem with this layout is that queries are very slow. To read the nth record we must start from the very beginning of the file: read the length of the first record, skip over that many bytes, read the length of the second record, skip over it again, and so on. The time complexity of reading the nth record is O(n).
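
To see why this layout forces an O(n) read, here is a minimal sketch (an illustration only; readNth is a hypothetical helper, not part of the project code) of the sequential scan it requires, assuming the 8-byte big-endian length prefix described above:

package sketch

import (
    "encoding/binary"
    "os"
)

//readNth walks a length-prefixed file from the beginning until it reaches the
//record with subscript n (counting from 0). Every earlier record has to be
//visited first, which is why the cost is O(n)
func readNth(f *os.File, n uint64) ([]byte, error) {
    var off int64
    lenBuf := make([]byte, 8)
    for i := uint64(0); ; i++ {
        //Read the 8-byte length prefix of the current record
        if _, err := f.ReadAt(lenBuf, off); err != nil {
            return nil, err //io.EOF if n is beyond the last record
        }
        size := binary.BigEndian.Uint64(lenBuf)
        if i == n {
            data := make([]byte, size)
            _, err := f.ReadAt(data, off+8)
            return data, err
        }
        //Skip the length prefix and the record body, then look at the next record
        off += 8 + int64(size)
    }
}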

The reason queries are slow is that every record has a different length, so to read the nth record we have to parse the lengths of the previous n-1 records one by one. To speed this up we add an index file that directly records each record's offset in the binary file. Suppose the binary file holds three records whose data lengths are 4 bytes, 8 bytes and 12 bytes respectively; the index file then looks like: 0,0|1,12|2,28|. Reading this index: "0,0" means record 0 starts at offset 0 of the binary file, "1,12" means record 1 starts at offset 12, and "2,28" means record 2 starts at offset 28.

Record 0's length field sits at the very beginning of the binary file, followed by its 4 bytes of data, so its offset is 0. Since 8 bytes are used for the length field and the data occupies 4 bytes, record 0 takes up 12 bytes in total, which means the 8-byte length field of record 1 starts at offset 12. Record 1's data is 8 bytes long, so it occupies 8 + 8 = 16 bytes, and record 2's length field therefore starts at offset 12 + 16 = 28. That is why the index file contains "2,28": the record with subscript 2 starts at offset 28 in the binary file.

Each index entry uses 4 bytes for the record subscript and 8 bytes for the record's offset, so every 12 bytes of the index file describes where one record starts in the binary file. To quickly locate the record with subscript n, we simply read the 8 bytes at offset n*12 + 4 of the index file (skipping the 4-byte subscript), which gives us the record's starting position in the binary file.
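
As a sketch of the O(1) lookup this index layout enables (an illustration only; lookup is a hypothetical helper that reads the index file directly, whereas the real implementation below uses a memory-mapped file):

package sketch

import (
    "encoding/binary"
    "os"
)

const (
    offWidth = 4                   //bytes used for the record subscript
    posWidth = 8                   //bytes used for the record's offset in the binary file
    entWidth = offWidth + posWidth //each index entry occupies 12 bytes
)

//lookup returns the offset in the binary file of the record with subscript n
//by reading its index entry directly, without touching any earlier records
func lookup(indexFile *os.File, n uint64) (uint64, error) {
    buf := make([]byte, posWidth)
    //The entry for record n starts at n*entWidth; skip the 4-byte subscript to reach the 8-byte offset
    if _, err := indexFile.ReadAt(buf, int64(n*entWidth+offWidth)); err != nil {
        return 0, err
    }
    return binary.BigEndian.Uint64(buf), nil
}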

We use "store" for the object that manages the binary data file (".store" will be the suffix of that file) and "index" for the index (".index" will be the suffix of the index file). Let's start with the implementation of the binary data file: create a folder named log under the internal directory, then create store.go in it and enter the following code:

package log

import (
    "bufio"
    "encoding/binary"
    "os"
    "sync"
)

var (
    enc = binary.BigEndian  //Big-endian encoding for the record length, so the byte order is unambiguous when the data is sent over the network
)

const (
    lenWidth = 8 //8 bytes for storing data length
)

type store struct {
    *os.File  //Corresponding binary file
    mu sync.Mutex  //Since multiple read and write requests may occur at the same time, locking is required
    buf *bufio.Writer //Buffered writer, so each append does not hit the disk directly
    size uint64  //Size of the entire file
}

func newStore(f *os.File) (*store, error) { //Pass in a file handle to create the store object
    fi, err := os.Stat(f.Name())
    if err != nil {
        return nil, err
    }
    size := uint64(fi.Size())
    return &store{
        File: f,
        size: size,
        buf:  bufio.NewWriter(f),
    }, nil
}

func (s *store) Append(p []byte) (n uint64, pos uint64, err error) {
    //Append a record. n is the number of bytes written (8-byte length prefix plus data), and pos is the offset at which the record starts in the binary file
    s.mu.Lock()
    defer s.mu.Unlock()
    pos = s.size 
    //Write the length of data with 8 bytes before writing data
    if err := binary.Write(s.buf, enc, uint64(len(p))); err != nil {
        return 0, 0, err 
    }
    //Then write the data
    w, err := s.buf.Write(p)
    if err != nil {
        return 0, 0, err 
    }

    //After adding a record, the size of the store will change accordingly
    w += lenWidth 
    s.size += uint64(w)
    return uint64(w), pos, nil
}

func (s *store) Read(pos uint64) ([]byte, error) {
    //Read record information from offset pos
    s.mu.Lock()
    defer s.mu.Unlock()
    //Now write all the data in the buffer to the file
    if err := s.buf.Flush(); err != nil {
        return nil, err 
    }
    //Gets the length of the record
    size := make([]byte, lenWidth)
    if _, err := s.File.ReadAt(size, int64(pos)); err != nil {
        return nil , err 
    }
    //Read recorded binary data
    b := make([]byte, enc.Uint64(size))
    if _, err := s.File.ReadAt(b, int64(pos + lenWidth)); err != nil {
        return nil, err 
    }

    return b, nil
}

func (s *store) ReadAt(p []byte, off int64) (int, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if err := s.buf.Flush(); err != nil {
        return 0, err
    }
    //Data is read into the buffer from the binary offset of off
    return s.File.ReadAt(p, off)
}

func (s *store) Close() error {
    s.mu.Lock()
    defer s.mu.Unlock()
    err := s.buf.Flush()
    if err != nil {
        return err 
    }

    return s.File.Close()
}

The object that stores a record's binary data is called store, and it corresponds to one binary file; its read and write logic is exactly what we described above. Next, let's exercise that logic with tests: create store_test.go in the same directory and enter the following test cases:

package log

import (
    "io/ioutil"
    "os"
    "testing"
    "github.com/stretchr/testify/require"
)

var (
    write = []byte("this is a record")
    //Total size of one stored record: lenWidth (8) bytes for the length prefix plus the length of the content itself
    width = uint64(len(write)) + lenWidth
)

func TestStoreAppendRead(t *testing.T) {
    //First create a binary file for storing data
    f, err := ioutil.TempFile("", "store_append_read_test")
    require.NoError(t, err)
    defer os.Remove(f.Name())

    s, err := newStore(f) 
    require.NoError(t, err)
    //Test insert record
    testAppend(t, s)
    //Test read a record
    testReadAt(t, s)

    s, err = newStore(f)
    require.NoError(t, err)
    testRead(t, s)
}

func testAppend(t *testing.T, s *store) {
    t.Helper()
    //It should be able to insert several records normally
    for i := uint64(1); i < 4; i++ {
        n, pos, err := s.Append(write)
        require.NoError(t, err)
        require.Equal(t, pos + n, width * i)
    }
}

func testRead(t *testing.T, s *store) {
    t.Helper()
    var pos uint64 
    //It should be able to read the inserted records normally
    for i := uint64(1); i < 4; i++ {
        read, err := s.Read(pos)
        require.NoError(t, err)
        require.Equal(t, write, read)
        pos += width 
    }
}

func testReadAt(t *testing.T, s *store) {
    t.Helper()
    for i, off := uint64(1), int64(0); i < 4; i++ {
        //First read the 8-byte length prefix of the record
        b := make([]byte, lenWidth)
        n, err := s.ReadAt(b, off)
        require.NoError(t, err)
        //The length of the read data should be equal to the length of the buffer
        require.Equal(t, lenWidth, n)
        off += int64(n)

        size := enc.Uint64(b)
        b = make([]byte, size)
        //Read binary data of log
        n, err = s.ReadAt(b, off)
        require.NoError(t, err)
        //The read content shall be consistent with the written content
        require.Equal(t, write, b)
        //The read data length shall be consistent with the length indicated by the first 8 bytes
        require.Equal(t, int(size), n)
        off += int64(n)
    }
}

func TestStoreClose(t *testing.T) {
    f, err := ioutil.TempFile("", "store_close_test")
    require.NoError(t, err)
    defer os.Remove(f.Name())

    s, err := newStore(f)
    require.NoError(t, err)
    _, _, err = s.Append(write)
    require.NoError(t, err)

    f, beforeSize, err := openFile(f.Name())
    require.NoError(t, err)
    //Closing the store should flush the buffered data to the file
    err = s.Close()
    require.NoError(t, err)
    _, afterSize, err := openFile(f.Name())
    require.NoError(t, err)
    //Since the cached data must be written to the disk when the file is closed, the size of the file after closing is larger than that before closing
    require.True(t, afterSize > beforeSize)
}

func openFile(name string) (file *os.File, size int64, err error) {
    //Open (or create) the file and return its current size
    f, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
    if err != nil {
        return nil, 0, err 
    }

    fi, err := f.Stat()
    if err != nil {
        return nil, 0, err 
    }

    return f, fi.Size(), nil 
}

Next, let's look at the implementation of the index file. Create index.go in the same directory and enter the following code:

package log 

import (
    "io"
    "os"
    "github.com/tysonmote/gommap"
)

var (
    offWidth uint64 = 4
    posWidth uint64 = 8
    //4 bytes are used to represent the subscript of the record, and 8 bytes represent the offset of the record in the binary file
    entWidth = offWidth + posWidth 
)

type index struct {
    file *os.File  //The index file on disk
    mmap gommap.MMap  //Memory mapping of the file contents
    size uint64  //Number of bytes of index entries actually written
}

func newIndex(f *os.File, c Config) (*index, error) {
    idx := &index{
        file: f,
    }

    fi , err := os.Stat(f.Name())
    if err != nil {
        return nil , err
    }

    idx.size = uint64(fi.Size())
    //Now expand the file to the specified size so that you can use memory mapping next
    if err = os.Truncate(f.Name(), int64(c.Segment.MaxIndexBytes)); err != nil {
        return nil, err 
    }
    //Enable memory mapping to speed up file reading and writing
    if idx.mmap, err = gommap.Map(idx.file.Fd(), gommap.PROT_READ|gommap.PROT_WRITE, gommap.MAP_SHARED); err != nil {
        return nil, err 
    }

    return idx, nil
}

func (i *index) Close() error  {
    //Before closing, flush the memory-mapped data back to the file. Note that this code targets Linux; the memory mapping may fail on Windows
    if err := i.mmap.Sync(gommap.MS_SYNC); err != nil {
        return err
    }

    //Write file cached data to disk
    if err := i.file.Sync(); err != nil {
        return err 
    }

    //Set the size of the file to the size of the actual written data
    if err := i.file.Truncate(int64(i.size)); err != nil {
        return err 
    }

    return i.file.Close() 
}

func (i *index) Read(in int64) (out uint32, pos uint64, err error) {
    if i.size == 0 {
        return 0, 0, io.EOF 
    }
    //in==-1 means to read the last record
    if in == -1 {
        out = uint32((i.size / entWidth) - 1)
    } else {
        out = uint32(in)
    }

    pos = uint64(out) * entWidth 
    if i.size < pos + entWidth {
        return 0, 0, io.EOF 
    }

    out = enc.Uint32(i.mmap[pos : pos + offWidth]) //Subscript of record
    pos = enc.Uint64(i.mmap[pos + offWidth : pos + entWidth])  //Offset recorded in binary file

    return out, pos, nil 
}

func (i *index) Write(off uint32, pos uint64) error {
    //Index of a new record
    if uint64(len(i.mmap)) < i.size + entWidth {
        return io.EOF 
    }

    enc.PutUint32(i.mmap[i.size : i.size + offWidth], off)  //Subscript of new record
    enc.PutUint64(i.mmap[i.size + offWidth : i.size + entWidth], pos) //New offset recorded in binary file
    i.size += uint64(entWidth)

    return nil 
}

func (i *index) Name() string {
    return i.file.Name()
}
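
One note before testing: newIndex takes a Config parameter and reads c.Segment.MaxIndexBytes from it, but the Config type itself is not shown in this section. A minimal sketch that would make the code above and the test below compile, assuming only the field actually used here, is a config.go in the same package along these lines:

package log

//Config carries the limits for the log. Only MaxIndexBytes, the maximum size of
//the index file in bytes, is needed by newIndex in this section
type Config struct {
    Segment struct {
        MaxIndexBytes uint64
    }
}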

With the index implemented, let's test its logic. Create index_test.go in the same directory and enter the following code:

package log 

import (
    "io"
    "io/ioutil"
    "os"
    "testing"
    "github.com/stretchr/testify/require"
)

func TestIndex(t *testing.T) {
    f, err := ioutil.TempFile(os.TempDir(), "index_test")
    require.NoError(t, err)
    defer os.Remove(f.Name())

    c := Config{}
    c.Segment.MaxIndexBytes = 1024 //Maximum size of the index file in bytes
    idx, err := newIndex(f, c)
    require.NoError(t, err)

    _, _, err = idx.Read(-1)
    require.Error(t, err)
    require.Equal(t, f.Name(), idx.Name())

    entries := []struct {
        Off uint32 
        Pos uint64 
    } {
        //Here is the index corresponding to the two records
        {Off: 0, Pos: 2},
        {Off: 1, Pos: 10},
    }

    //Test that the read index content is consistent with the written content
    for _, want := range entries {
        err = idx.Write(want.Off, want.Pos)
        require.NoError(t, err)

        _, pos, err := idx.Read(int64(want.Off))
        require.NoError(t, err)
        require.Equal(t, want.Pos, pos)
    }

    //Reading past the entries that have been written returns an error. Here only two index entries exist, so reading the entry with subscript 2 (a third entry) returns io.EOF
    _, _, err = idx.Read(int64(len(entries)))
    require.Equal(t, io.EOF, err)
    err = idx.Close()
    require.NoError(t, err)

    f, _ = os.OpenFile(f.Name(), os.O_RDWR, 0600)
    idx , err = newIndex(f, c)
    require.NoError(t, err)
    off, pos, err := idx.Read(-1)

    require.NoError(t, err)
    require.Equal(t, uint32(1), off)
    require.Equal(t, entries[1].Pos, pos)    
}

So far we have only completed the data storage and indexing. What remains is to split the massive data across multiple stores and their corresponding indexes; that work will be carried out in the next section. The code for this section is available at https://github.com/wycl16514/go_distribute_system_store_index.git
