Exploration of pika in codis

Background

The volume of kv data stored in the company keeps growing, and much of it is not latency-sensitive, so the cost of storing it with the original codis scheme keeps rising. This calls for an alternative that balances cost and performance. Pika is therefore introduced as the underlying storage of codis to replace the costly codis-server, and a series of design changes are made around this pika-based scheme.

Principle design of codis

The codis project consists of four main components: codis-fe, codis-dashboard, codis-proxy and codis-server.

codis-fe provides unified management of multiple codis-dashboard instances through a management interface that is friendly to operation and maintenance.

codis-dashboard keeps the state of the slots, codis-proxy and zk (or etcd) consistent, and manages the operational status of the whole cluster, the expansion and contraction of data, and the high availability of components. Its role is similar to that of the API server in k8s.

codis-proxy is the access proxy exposed to the business side. It parses each request and routes it to the back-end group that owns the key. Its other important function comes into play when the cluster is expanded or shrunk through codis-fe: based on the migration status of the slot, codis-proxy triggers a check or migration of the accessed key, so that data is hot-migrated without interrupting the business service and availability is preserved.
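The routing step can be illustrated with a minimal Go sketch. It assumes the mapping that codis documents, slot = crc32(key) % 1024 with an optional {tag} inside the key; the slot table and its three-group layout are hypothetical and only stand in for the routing information that codis-dashboard pushes to the proxy.

```go
package main

import (
	"bytes"
	"fmt"
	"hash/crc32"
)

// numSlots is the number of slots in a codis cluster (slots 0-1023).
const numSlots = 1024

// slotOf maps a key to a slot. If the key contains a {tag}, only the tag
// is hashed, so keys that share a tag land in the same slot.
func slotOf(key []byte) int {
	if beg := bytes.IndexByte(key, '{'); beg >= 0 {
		if end := bytes.IndexByte(key[beg+1:], '}'); end >= 0 {
			key = key[beg+1 : beg+1+end]
		}
	}
	return int(crc32.ChecksumIEEE(key) % numSlots)
}

// groupOf looks up the back-end group that owns the slot of a key.
// The table is a hypothetical stand-in for the proxy's routing state.
func groupOf(table *[numSlots]int, key []byte) int {
	return table[slotOf(key)]
}

func main() {
	var table [numSlots]int
	for i := range table {
		table[i] = i*3/numSlots + 1 // hypothetical layout: three groups of similar size
	}
	for _, k := range []string{"user:1", "{user:1}:profile", "{user:1}:orders"} {
		fmt.Printf("%-18s slot=%4d group=%d\n", k, slotOf([]byte(k)), groupOf(&table, []byte(k)))
	}
}
```

All three keys in the example hash the same bytes (user:1), so they resolve to the same slot and therefore to the same group.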

Operation principle of codis

codis runs in accordance with the schematic diagram given on the official website.

The path that codis-proxy registers in zk can be used for service discovery and load balancing across proxies, either through the officially provided jodis client or through a redis client wrapper implemented by ourselves.

codis-dashboard receives management instructions from codis-fe, such as capacity expansion, bringing nodes online or offline, and changing the master-slave status of the cluster. After receiving an instruction, it stores the new status and pushes it to codis-proxy, so that management through codis-fe takes effect online and codis-proxy routes data dynamically. All cluster status is kept consistent through codis-dashboard.

Expansion and contraction principle of codis

With this brief overview of how codis operates, we can look further at how codis expands and shrinks dynamically.

Taking the addition of a group as an example, the whole process is as follows.

The original cluster is as follows. At this time, group4 needs to be added to the cluster and slots 901-1023 of group3 migrated to group4.

The status after migration is completed is as follows.

At this time, group4 has been added to the codis cluster and serves slots 901-1023, which originally belonged to group3.
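The before and after states can be modelled as a simple slot table. The following Go sketch is only an illustration: the initial layout (group1 owning 0-400, group2 owning 401-800, group3 owning 801-1023) is an assumption, since the article only states that slots 901-1023 start on group3, and the helper names are hypothetical.

```go
package main

import "fmt"

const numSlots = 1024

// initialLayout builds a hypothetical slot table before expansion:
// group1 owns 0-400, group2 owns 401-800 and group3 owns 801-1023.
func initialLayout() [numSlots]int {
	var table [numSlots]int
	for sid := range table {
		switch {
		case sid <= 400:
			table[sid] = 1
		case sid <= 800:
			table[sid] = 2
		default:
			table[sid] = 3
		}
	}
	return table
}

// migrateRange reassigns slots beg..end (inclusive) to group gid and
// returns how many slots changed owner.
func migrateRange(table *[numSlots]int, beg, end, gid int) int {
	moved := 0
	for sid := beg; sid <= end; sid++ {
		if table[sid] != gid {
			table[sid] = gid
			moved++
		}
	}
	return moved
}

// slotsOwned counts the slots currently assigned to group gid.
func slotsOwned(table *[numSlots]int, gid int) int {
	n := 0
	for _, g := range table {
		if g == gid {
			n++
		}
	}
	return n
}

func main() {
	table := initialLayout()
	fmt.Println("group3 before:", slotsOwned(&table, 3))
	fmt.Println("slots moved:  ", migrateRange(&table, 901, 1023, 4))
	fmt.Println("group3 after: ", slotsOwned(&table, 3))
	fmt.Println("group4 after: ", slotsOwned(&table, 4))
}
```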

The migration can be divided into the following steps.

codis-fe sends the status to codis-dashboard

On codis-fe, enter slots 901-1023 in Migrate Range and select group4 as the target.

codis-fe then sends codis-dashboard a request whose URI contains /api/topom/slots/action/create-range/.../901/1023/1, and codis-dashboard performs the following operation.

func (s *Topom) SlotCreateActionSome(groupFrom, groupTo int, numSlots int) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	ctx, err := s.newContext()
	if err != nil {
		return err
	}

	g, err := ctx.getGroup(groupTo)
	if err != nil {
		return err
	}
	if len(g.Servers) == 0 {
		return errors.Errorf("group-[%d] is empty", g.Id)
	}

	var pending []int
	for _, m := range ctx.slots {   // Verify the status of the slot
		if len(pending) >= numSlots {
			break
		}
		if m.Action.State != models.ActionNothing {
			continue
		}
		if m.GroupId != groupFrom {
			continue
		}
		if m.GroupId == g.Id {
			continue
		}
		pending = append(pending, m.Id)
	}

	for _, sid := range pending {
		m, err := ctx.getSlotMapping(sid)
		if err != nil {
			return err
		}
		defer s.dirtySlotsCache(m.Id)   // Mark the cached slot as dirty so that its status is reloaded from zk or etcd

		m.Action.State = models.ActionPending
		m.Action.Index = ctx.maxSlotActionIndex() + 1
		m.Action.TargetId = g.Id
		if err := s.storeUpdateSlotMapping(m); err != nil {  // Persist the slot mapping, i.e. the group-to-slot relationship used to set the routing of each slot in codis-proxy
			return err
		}
	}
	return nil
}

As the code shows, creating a migration only writes the migration information to zk or etcd to record the migration state. The state transition of each step is then driven by the state machine started by codis-dashboard.
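The order of these state transitions can be summarized in a small Go sketch. The constant names follow the models.Action* identifiers used in the dashboard code; the string values and the helper functions next and path are assumptions made for illustration only.

```go
package main

import "fmt"

// Action states of a slot. The string values are illustrative assumptions.
const (
	ActionNothing   = ""
	ActionPending   = "pending"
	ActionPreparing = "preparing"
	ActionPrepared  = "prepared"
	ActionMigrating = "migrating"
	ActionFinished  = "finished"
)

// next returns the state that follows s, or false if s has no successor
// in this simplified model.
func next(s string) (string, bool) {
	switch s {
	case ActionPending:
		return ActionPreparing, true
	case ActionPreparing:
		return ActionPrepared, true
	case ActionPrepared:
		return ActionMigrating, true
	case ActionMigrating:
		return ActionFinished, true
	}
	return "", false
}

// path lists every state visited from start until no successor exists.
func path(start string) []string {
	states := []string{start}
	for {
		n, ok := next(states[len(states)-1])
		if !ok {
			return states
		}
		states = append(states, n)
	}
}

func main() {
	fmt.Println(path(ActionPending))
}
```

In the real dashboard each transition is also persisted to zk or etcd, and some are pushed to the proxies, before the next one is attempted; the sketch only captures the ordering.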

codis-dashboard accepts the status and starts the migration

After the migration information has been written to zk or etcd, codis-dashboard detects the migration status and starts the data migration.

//Processing slot operations
	go func() {
		for !s.IsClosed() {
			if s.IsOnline() {
				if err := s.ProcessSlotAction(); err != nil {
					log.WarnErrorf(err, "process slot action failed")
					time.Sleep(time.Second * 5)
				}
			}
			time.Sleep(time.Second)
		}
	}()

This goroutine polls continuously for slots that have a migration status and processes any it finds. The core function for handling this state is as follows.

func (s *Topom) SlotActionPrepareFilter(accept, update func(m *models.SlotMapping) bool) (int, bool, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// context
	ctx, err := s.newContext()
	if err != nil {
		return 0, false, err
	}

	// Take the smallest action index
	var minActionIndex = func(filter func(m *models.SlotMapping) bool) (picked *models.SlotMapping) {
		for _, m := range ctx.slots {
			// Traverse all slot mappings; if m.Action.State == "", skip to the next iteration
			if m.Action.State == models.ActionNothing {
				continue
			}

			// Pass the slot mapping to the filter function; the statements in the if run only when the filter matches
			if filter(m) {
				if picked != nil && picked.Action.Index < m.Action.Index {
					continue
				}

				// accept returns true only if the slot has not yet gone through update; in other words, a slot is processed only once
				// marks stores the ids of groups that are already assigned or about to be assigned
				// If the target group of m is in marks, accept(m) returns false, which ensures that only one slot at a time is migrated to the same group (the same redis)
				if accept == nil || accept(m) {
					picked = m
				}
			}
		}
		return picked
	}

	// First case: among the slots being migrated, take the one with the smallest Action.Index
	// Second case: only if the first yields nothing, take the pending slot with the smallest Action.Index
	var m = func() *models.SlotMapping {
		// Among the slots with m.Action.State != "" and m.Action.State != models.ActionPending,
		// i.e. the slots already being migrated, take the one with the smallest Action.Index,
		// assign it to picked
		// and return it
		var picked = minActionIndex(func(m *models.SlotMapping) bool {
			return m.Action.State != models.ActionPending
		})
		if picked != nil {
			return picked
		}

		// If not, execute the following statement

		if s.action.disabled.IsTrue() {
			return nil
		}

		// Among the slots with m.Action.State != "" and m.Action.State == models.ActionPending,
		// i.e. the slots in pending status, take the one with the smallest Action.Index,
		// assign it to picked
		// and return it
		return minActionIndex(func(m *models.SlotMapping) bool {
			return m.Action.State == models.ActionPending
		})
	}()

	// If neither case yields a slot, no slot migration is required
	if m == nil {
		return 0, false, nil
	}


	if update != nil && !update(m) {
		return 0, false, nil
	}

	log.Warnf("slot-[%d] action prepare:\n%s", m.Id, m.Encode())


	// Advance Action.State of the slot mapping and persist it to zk
	// When Action.State becomes preparing or prepared, the slot mapping must also be synchronized to the proxies
	switch m.Action.State {

	case models.ActionPending:

		defer s.dirtySlotsCache(m.Id)

		// Change the action state to preparing
		m.Action.State = models.ActionPreparing

		// Write to zk
		if err := s.storeUpdateSlotMapping(m); err != nil {
			return 0, false, err
		}

		// Unconditionally continue to execute the statements in the following case
		fallthrough

	case models.ActionPreparing:

		defer s.dirtySlotsCache(m.Id)

		log.Warnf("slot-[%d] resync to prepared", m.Id)

		// Change the action state to ActionPrepared
		m.Action.State = models.ActionPrepared

		// Push the slot mapping to the proxies. If that fails, change m.Action.State back to ActionPreparing and return
		if err := s.resyncSlotMappings(ctx, m); err != nil {
			log.Warnf("slot-[%d] resync-rollback to preparing", m.Id)

			// Pushing the slot mapping to the proxies failed; change m.Action.State back to ActionPreparing
			m.Action.State = models.ActionPreparing
			s.resyncSlotMappings(ctx, m)
			log.Warnf("slot-[%d] resync-rollback to preparing, done", m.Id)
			return 0, false, err
		}

		// After the proxies are refreshed successfully, write m.Action.State = models.ActionPrepared to zk
		if err := s.storeUpdateSlotMapping(m); err != nil {
			return 0, false, err
		}

		// Unconditionally continue to execute the statements in the following case
		fallthrough

	case models.ActionPrepared:

		defer s.dirtySlotsCache(m.Id)

		log.Warnf("slot-[%d] resync to migrating", m.Id)

		// Change the action state to ActionMigrating
		m.Action.State = models.ActionMigrating

		// Push the slot mapping to the proxies. If that fails, return
		if err := s.resyncSlotMappings(ctx, m); err != nil {
			log.Warnf("slot-[%d] resync to migrating failed", m.Id)
			return 0, false, err
		}

		// After the push succeeds, write m.Action.State = models.ActionMigrating to zk
		if err := s.storeUpdateSlotMapping(m); err != nil {
			return 0, false, err
		}

		// Unconditionally continue to execute the statements in the following case
		fallthrough

	case models.ActionMigrating:

		return m.Id, true, nil

	case models.ActionFinished:

		return m.Id, true, nil

	default:

		return 0, false, errors.Errorf("slot-[%d] action state is invalid", m.Id)

	}
}

When the state is ActionPreparing, the new state is synchronized to codis-proxy and written to zk or etcd.
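The selection rule inside SlotActionPrepareFilter (slots already in progress first, then pending slots, each time the smallest action index) can be isolated in a simplified Go sketch. The slot type and the functions pick and nextSlot are hypothetical, and the accept and update callbacks and the disabled flag are deliberately left out.

```go
package main

import "fmt"

// slot is a reduced, hypothetical version of a slot mapping.
type slot struct {
	ID    int
	State string // "" means no action is attached to the slot
	Index int    // action index; a smaller value was created earlier
}

// pick returns the slot with the smallest action index among the slots
// that have an action and are accepted by filter, or nil if none match.
func pick(slots []slot, filter func(slot) bool) *slot {
	var picked *slot
	for i := range slots {
		s := &slots[i]
		if s.State == "" || !filter(*s) {
			continue
		}
		if picked == nil || s.Index < picked.Index {
			picked = s
		}
	}
	return picked
}

// nextSlot prefers slots that are already in progress and only falls
// back to pending slots when nothing is in progress.
func nextSlot(slots []slot) *slot {
	inProgress := func(s slot) bool { return s.State != "pending" }
	if s := pick(slots, inProgress); s != nil {
		return s
	}
	return pick(slots, func(s slot) bool { return s.State == "pending" })
}

func main() {
	slots := []slot{
		{ID: 901, State: "pending", Index: 2},
		{ID: 902, State: "migrating", Index: 5},
		{ID: 903, State: "pending", Index: 1},
		{ID: 904},
	}
	fmt.Println("first pick:", nextSlot(slots).ID)
	slots[1].State = "" // the in-progress slot has completed
	fmt.Println("then pick: ", nextSlot(slots).ID)
}
```

Preferring in-progress slots means an interrupted migration is resumed before any new one is started.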

In codis-dashboard, the back-end data is migrated by the ProcessSlotAction function, whose core is newSlotActionExecutor.

// Call the slotsmgrttagslot command of redis to migrate the redis slot
func (s *Topom) newSlotActionExecutor(sid int) (func(db int) (remains int, nextdb int, err error), error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// context
	ctx, err := s.newContext()
	if err != nil {
		return nil, err
	}

	// Obtain the SlotMapping by the id of the slot; the core of the method is: return ctx.slots[sid], nil
	m, err := ctx.getSlotMapping(sid)
	if err != nil {
		return nil, err
	}

	switch m.Action.State {

	// Initially the slot is still being migrated, i.e. in the migrating state
	case models.ActionMigrating:

		if s.action.disabled.IsTrue() {
			return nil, nil
		}
		// Do not migrate the slot while the group m.GroupId is performing a master-slave switch
		if ctx.isGroupPromoting(m.GroupId) {
			return nil, nil
		}
		// Do not migrate the slot while the group m.Action.TargetId is performing a master-slave switch
		if ctx.isGroupPromoting(m.Action.TargetId) {
			return nil, nil
		}

		// Before migration, the promoting state of both the slot's own group and the target group must be empty
		from := ctx.getGroupMaster(m.GroupId)
		// Take the first server of the target group, which is its master
		dest := ctx.getGroupMaster(m.Action.TargetId)

		//The counter in Topom's action is incremented by one
		s.action.executor.Incr()

		return func(db int) (int, int, error) {
			//For each slot migration operation, the counter in Topom's action is decremented by 1
			defer s.action.executor.Decr()

			if from == "" {
				return 0, -1, nil
			}

			// Get the redis client of the source group from the cache, or create a new one if none is cached; the client consists of addr, auth, timeout, Database and a redigo Conn
			c, err := s.action.redisp.GetClient(from)
			if err != nil {
				return 0, -1, err
			}
			// Return the redis client (newly created or taken from the cache) to s.action.redisp
			defer s.action.redisp.PutClient(c)

			//Here db is 0, which is equivalent to redis selecting 0 from 16 databases
			if err := c.Select(db); err != nil {
				return 0, -1, err
			}

			var do func() (int, error)

			method, _ := models.ParseForwardMethod(s.config.MigrationMethod)
			switch method {
			case models.ForwardSync:
				do = func() (int, error) {
					// Call the slotsmgrttagslot command of redis: randomly select a key of the current slot and migrate all k-v pairs that share the tag of this key to the target machine
					return c.MigrateSlot(sid, dest)
				}
			case models.ForwardSemiAsync:
				var option = &redis.MigrateSlotAsyncOption{
					MaxBulks: s.config.MigrationAsyncMaxBulks,
					MaxBytes: s.config.MigrationAsyncMaxBytes.AsInt(),
					NumKeys:  s.config.MigrationAsyncNumKeys,
					Timeout: math2.MinDuration(time.Second*5,
						s.config.MigrationTimeout.Duration()),
				}
				// Call the slotsmgrttagslot-async command of redis. The parameters include the ip and port of the target redis
				do = func() (int, error) {
					return c.MigrateSlotAsync(sid, dest, option)
				}
			default:
				log.Panicf("unknown forward method %d", int(method))
			}

			n, err := do()
			if err != nil {
				return 0, -1, err
			} else if n != 0 {
				return n, db, nil
			}

			nextdb := -1
			//Check and process the keyspace information through the info command. The m retrieved here is empty
			m, err := c.InfoKeySpace()
			if err != nil {
				return 0, -1, err
			}
			for i := range m {
				if (nextdb == -1 || i < nextdb) && db < i {
					nextdb = i
				}
			}
			return 0, nextdb, nil

		}, nil

	case models.ActionFinished:

		return func(int) (int, int, error) {
			return 0, -1, nil
		}, nil

	default:

		return nil, errors.Errorf("slot-[%d] action state is invalid", m.Id)

	}
}

The executor calls the codis-server that holds the data to be migrated, and that server actively pushes the data to the target.
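How a caller might drive the closure returned by newSlotActionExecutor can be sketched as follows. This is a simplified assumption about the loop, not the dashboard's actual code: fakeSource is a hypothetical in-memory stand-in for the source codis-server, and drive simply repeats the call until no keys remain and no further database is reported.

```go
package main

import "fmt"

// executor has the same shape as the closure returned by
// newSlotActionExecutor: it migrates one batch from database db and
// reports the keys remaining there and the next database (-1 if none).
type executor func(db int) (remains int, nextdb int, err error)

// fakeSource is a hypothetical stand-in for the source codis-server.
type fakeSource struct {
	keys  map[int]int // keys of the slot still stored in each database
	batch int         // keys moved per call
}

// exec moves up to one batch out of db and mimics the executor contract.
func (f *fakeSource) exec(db int) (int, int, error) {
	moved := f.batch
	if moved > f.keys[db] {
		moved = f.keys[db]
	}
	f.keys[db] -= moved
	if f.keys[db] != 0 {
		return f.keys[db], db, nil
	}
	// The current database is empty: find the smallest later database
	// that still holds keys.
	nextdb := -1
	for i, n := range f.keys {
		if n > 0 && db < i && (nextdb == -1 || i < nextdb) {
			nextdb = i
		}
	}
	return 0, nextdb, nil
}

// drive calls exec until every database is drained and returns the
// number of calls it needed.
func drive(exec executor) (int, error) {
	calls, db := 0, 0
	for {
		calls++
		remains, nextdb, err := exec(db)
		if err != nil {
			return calls, err
		}
		if remains != 0 {
			continue // more keys left in the same database
		}
		if nextdb == -1 {
			return calls, nil // nothing left anywhere: the slot is done
		}
		db = nextdb
	}
}

func main() {
	src := &fakeSource{keys: map[int]int{0: 5, 2: 3}, batch: 2}
	calls, err := drive(src.exec)
	fmt.Println("calls:", calls, "err:", err, "left:", src.keys)
}
```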

If a key in a migrating slot is accessed through codis-proxy during data synchronization, codis-proxy uses its slot configuration to check whether the key already exists on the new node. If it does not, the proxy first has the key migrated to the new node and then serves the request, which keeps the data consistent.
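The migrate-on-access behaviour can be modelled with two in-memory maps. The Go sketch below is an illustration under assumptions: store and migratingSlot are hypothetical types, and a real proxy issues commands to the back-end servers rather than moving map entries.

```go
package main

import "fmt"

// store is a hypothetical in-memory stand-in for one back-end group.
type store map[string]string

// migratingSlot describes a slot whose keys are moving from one group
// to another while requests keep arriving.
type migratingSlot struct {
	from, to store
}

// migrateKey moves a single key to the target if it is still on the source.
func (m *migratingSlot) migrateKey(key string) {
	if v, ok := m.from[key]; ok {
		m.to[key] = v
		delete(m.from, key)
	}
}

// get makes sure the key is on the target and then reads it there.
func (m *migratingSlot) get(key string) (string, bool) {
	m.migrateKey(key)
	v, ok := m.to[key]
	return v, ok
}

// set makes sure the key is on the target and then writes it there, so
// the background migration cannot overwrite the new value later.
func (m *migratingSlot) set(key, value string) {
	m.migrateKey(key)
	m.to[key] = value
}

func main() {
	s := &migratingSlot{from: store{"a": "1", "b": "2"}, to: store{}}
	v, _ := s.get("a")
	s.set("b", "3")
	fmt.Println("a =", v, "source:", s.from, "target:", s.to)
}
```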

codis-dashboard completes the migration

When all the slots have been migrated, the current slot status is updated in zk or etcd, which completes the whole migration process.

The whole migration process is as follows.

Introduction to pika

Pika is an open source, redis-protocol-compatible storage system from the 360 team, built on rocksdb as its underlying kv storage engine. The project has joined the OpenAtom Foundation and, in its mainstream versions, provides the ability to be accessed by codis. We therefore consider replacing the codis-server component of codis with pika.

pika supports both a single-node mode and a distributed mode, in which each slot is managed separately and can be migrated. In our business practice the amount of data is relatively large, so the distributed mode was used from the beginning and is also assumed in the subsequent design and modification.

Challenges of connecting pika to codis

codis-server commands officially supported by pika

In the source code of pika (version 3.4.0), the following is found in the pika_command.h header file.

//Codis Slots
const std::string kCmdNameSlotsInfo = "slotsinfo";
const std::string kCmdNameSlotsHashKey = "slotshashkey";
const std::string kCmdNameSlotsMgrtTagSlotAsync = "slotsmgrttagslot-async";
const std::string kCmdNameSlotsMgrtSlotAsync = "slotsmgrtslot-async";
const std::string kCmdNameSlotsDel = "slotsdel";
const std::string kCmdNameSlotsScan = "slotsscan";
const std::string kCmdNameSlotsMgrtExecWrapper = "slotsmgrt-exec-wrapper";
const std::string kCmdNameSlotsMgrtAsyncStatus = "slotsmgrt-async-status";
const std::string kCmdNameSlotsMgrtAsyncCancel = "slotsmgrt-async-cancel";
const std::string kCmdNameSlotsMgrtSlot = "slotsmgrtslot";
const std::string kCmdNameSlotsMgrtTagSlot = "slotsmgrttagslot";
const std::string kCmdNameSlotsMgrtOne = "slotsmgrtone";
const std::string kCmdNameSlotsMgrtTagOne = "slotsmgrttagone";

For comparison, the commands supported by codis-server are as follows.

    {"slotsinfo",slotsinfoCommand,-1,"rF",0,NULL,0,0,0,0,0},
    {"slotsscan",slotsscanCommand,-3,"rR",0,NULL,0,0,0,0,0},
    {"slotsdel",slotsdelCommand,-2,"w",0,NULL,1,-1,1,0,0},
    {"slotsmgrtslot",slotsmgrtslotCommand,5,"w",0,NULL,0,0,0,0,0},
    {"slotsmgrttagslot",slotsmgrttagslotCommand,5,"w",0,NULL,0,0,0,0,0},
    {"slotsmgrtone",slotsmgrtoneCommand,5,"w",0,NULL,0,0,0,0,0},
    {"slotsmgrttagone",slotsmgrttagoneCommand,5,"w",0,NULL,0,0,0,0,0},
    {"slotshashkey",slotshashkeyCommand,-1,"rF",0,NULL,0,0,0,0,0},
    {"slotscheck",slotscheckCommand,0,"r",0,NULL,0,0,0,0,0},
    {"slotsrestore",slotsrestoreCommand,-4,"wm",0,NULL,0,0,0,0,0},
    {"slotsmgrtslot-async",slotsmgrtSlotAsyncCommand,8,"ws",0,NULL,0,0,0,0,0},
    {"slotsmgrttagslot-async",slotsmgrtTagSlotAsyncCommand,8,"ws",0,NULL,0,0,0,0,0},
    {"slotsmgrtone-async",slotsmgrtOneAsyncCommand,-7,"ws",0,NULL,0,0,0,0,0},
    {"slotsmgrttagone-async",slotsmgrtTagOneAsyncCommand,-7,"ws",0,NULL,0,0,0,0,0},
    {"slotsmgrtone-async-dump",slotsmgrtOneAsyncDumpCommand,-4,"rm",0,NULL,0,0,0,0,0},
    {"slotsmgrttagone-async-dump",slotsmgrtTagOneAsyncDumpCommand,-4,"rm",0,NULL,0,0,0,0,0},
    {"slotsmgrt-async-fence",slotsmgrtAsyncFenceCommand,0,"rs",0,NULL,0,0,0,0,0},
    {"slotsmgrt-async-cancel",slotsmgrtAsyncCancelCommand,0,"F",0,NULL,0,0,0,0,0},
    {"slotsmgrt-async-status",slotsmgrtAsyncStatusCommand,0,"F",0,NULL,0,0,0,0,0},
    {"slotsmgrt-exec-wrapper",slotsmgrtExecWrapperCommand,-3,"wm",0,NULL,0,0,0,0,0},
    {"slotsrestore-async",slotsrestoreAsyncCommand,-2,"wm",0,NULL,0,0,0,0,0},
    {"slotsrestore-async-auth",slotsrestoreAsyncAuthCommand,2,"sltF",0,NULL,0,0,0,0,0},
    {"slotsrestore-async-select",slotsrestoreAsyncSelectCommand,2,"lF",0,NULL,0,0,0,0,0},
    {"slotsrestore-async-ack",slotsrestoreAsyncAckCommand,3,"w",0,NULL,0,0,0,0,0},

The comparison shows that pika implements relatively few of these commands, so it remains to be seen whether pika works normally once it is connected to codis. In addition, the syntax supported by codis-server and by pika currently differs.
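The gap between the two command sets can be computed mechanically. The following Go sketch copies the command names from the two listings above; the helper function missing is hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// Slot commands defined in pika_command.h (pika 3.4.0), as listed above.
var pikaCmds = []string{
	"slotsinfo", "slotshashkey", "slotsmgrttagslot-async", "slotsmgrtslot-async",
	"slotsdel", "slotsscan", "slotsmgrt-exec-wrapper", "slotsmgrt-async-status",
	"slotsmgrt-async-cancel", "slotsmgrtslot", "slotsmgrttagslot", "slotsmgrtone",
	"slotsmgrttagone",
}

// Slot commands in the codis-server command table, as listed above.
var codisCmds = []string{
	"slotsinfo", "slotsscan", "slotsdel", "slotsmgrtslot", "slotsmgrttagslot",
	"slotsmgrtone", "slotsmgrttagone", "slotshashkey", "slotscheck", "slotsrestore",
	"slotsmgrtslot-async", "slotsmgrttagslot-async", "slotsmgrtone-async",
	"slotsmgrttagone-async", "slotsmgrtone-async-dump", "slotsmgrttagone-async-dump",
	"slotsmgrt-async-fence", "slotsmgrt-async-cancel", "slotsmgrt-async-status",
	"slotsmgrt-exec-wrapper", "slotsrestore-async", "slotsrestore-async-auth",
	"slotsrestore-async-select", "slotsrestore-async-ack",
}

// missing returns, in sorted order, the entries of want that are absent from have.
func missing(want, have []string) []string {
	seen := make(map[string]bool, len(have))
	for _, c := range have {
		seen[c] = true
	}
	var out []string
	for _, c := range want {
		if !seen[c] {
			out = append(out, c)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	gap := missing(codisCmds, pikaCmds)
	fmt.Printf("%d of %d codis-server slot commands are not defined in pika:\n", len(gap), len(codisCmds))
	for _, c := range gap {
		fmt.Println(" ", c)
	}
}
```

Every command name that pika defines also appears in the codis-server table, so the difference runs in one direction only. A matching name does not guarantee matching behaviour, though.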

In the cluster mode of pika, slot information is queried with an instruction such as the following.

redis-cli -p 9221 pkcluster slot info 1

This means that the command scheduling and management layer must also support the syntax of pika.

In the early research stage, thanks to strong support from our R&D colleagues, some source code logic in the codis-dashboard layer was modified to support pika commands such as master-slave synchronization and master-slave promotion, so that these operations can be completed from codis-fe.

After these changes, data migration was tested again. The codis-fe interface showed the migration as completed, but the data had not actually been moved to the target group, and codis-fe displayed no obvious error message. What was the problem?

To find out, we continued by checking the slot-related source code of pika.

void SlotsMgrtSlotAsyncCmd::Do(std::shared_ptr<Partition> partition) {
  int64_t moved = 0;
  int64_t remained = 0;
  res_.AppendArrayLen(2);
  res_.AppendInteger(moved);
  res_.AppendInteger(remained);
}

The source code shows that pika answers the migration instruction sent by codis-dashboard with an immediate success reply (0 moved, 0 remaining) without doing any work. codis-dashboard therefore receives the success signal as soon as the migration starts and marks the migration as successful, although no data has actually been migrated.
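The effect of such a stub on a migration loop can be reproduced in a few lines of Go. This is a simplified model, not the dashboard's code: migrateFn, stubMigrate and runMigration are hypothetical names, and the loop only captures the rule that a reply of zero remaining keys ends the migration.

```go
package main

import "fmt"

// migrateFn reports how many keys were moved by one call and how many
// keys remain in the slot.
type migrateFn func(slot int) (moved, remained int64)

// stubMigrate mimics the reply of the stubbed command: it always
// answers (0, 0) and moves nothing.
func stubMigrate(slot int) (int64, int64) {
	return 0, 0
}

// runMigration keeps calling migrate until it reports that nothing
// remains and returns the number of calls it made.
func runMigration(slot int, migrate migrateFn) int {
	calls := 0
	for {
		calls++
		if _, remained := migrate(slot); remained == 0 {
			return calls
		}
	}
}

func main() {
	keysOnSource := 1000 // hypothetical number of keys still stored in the slot
	calls := runMigration(901, stubMigrate)
	fmt.Printf("migration reported finished after %d call(s); keys still on source: %d\n", calls, keysOnSource)
}
```

The loop stops after the first call because the reply is indistinguishable from that of an empty slot, which matches the symptom seen in codis-fe: a completed migration with no data moved.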

In view of this, we consulted the official pika document "Capacity expansion case of Pika with Codis".

According to the official document, that migration scheme may lose data, so a migration scheme has to be designed separately.

Design of pika migration tool

The overall process of the migration tool is as follows:

The original cluster information is as follows

At this time, slots 901-1023 need to be migrated to a new group, i.e. group4 provides services as a new instance.

First, develop a pika migration tool that can forward requests from codis-proxy, and migrate slots 801-1023 to the pika migration tool.

The pika migration tool then sends writes for slots 801-900 to group3 and writes for slots 901-1023 to group4. For reads, it queries group4 first and, if the key is not found there, queries group3.
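The routing rule of the migration tool can be sketched in Go as follows. The types store and router are hypothetical, the back ends are plain in-memory maps, and the slot of each key is passed in explicitly to keep the example short.

```go
package main

import "fmt"

// store is a hypothetical in-memory stand-in for one pika group.
type store map[string]string

// router models the migration tool while it owns slots 801-1023.
type router struct {
	group3, group4 store
}

// moving reports whether a slot belongs to the range that is being
// moved to group4.
func moving(slot int) bool {
	return slot >= 901 && slot <= 1023
}

// set writes slots 901-1023 to group4 and all other owned slots to group3.
func (r *router) set(slot int, key, value string) {
	if moving(slot) {
		r.group4[key] = value
		return
	}
	r.group3[key] = value
}

// get reads from group4 first for moving slots and falls back to group3
// when the key has not arrived there yet.
func (r *router) get(slot int, key string) (string, bool) {
	if moving(slot) {
		if v, ok := r.group4[key]; ok {
			return v, true
		}
	}
	v, ok := r.group3[key]
	return v, ok
}

func main() {
	r := &router{group3: store{"old": "v1"}, group4: store{}}
	v, _ := r.get(950, "old") // not yet synchronized: served from group3
	fmt.Println("old =", v)
	r.set(950, "new", "v2") // new write for a moving slot lands on group4
	r.set(850, "keep", "v3") // slot 850 stays on group3
	fmt.Println("group3:", r.group3, "group4:", r.group4)
}
```

Because new writes for slots 901-1023 never reach group3, the data still to be copied from group3 stops growing, and the read fallback covers keys that have not been synchronized yet.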

Once the pika migration tool is in place and forwarding proxy requests to the back-end services, master-slave synchronization is set up so that group3 is synchronized to group4.

Because no new data is written to slots 901-1023 on group3, you only need to wait for the synchronization of slots 901-1023 from group3 to group4 to complete.

After the migration, disconnect the master-slave relationship, and then migrate the slots held by the pika migration tool back: 801-900 to group3 and 901-1023 to group4. At this point the data migration is complete.

So far, pika has expanded the cluster through the migration tool. Most of the functions of this tool are similar to those of codis-proxy, except that the routing rules need to be converted and the pika-specific command syntax needs to be added.

Summary

This article records only some of our thinking and exploration of pika in the codis scenario. Our knowledge is limited, so corrections of any mistakes are welcome.

Keywords: Java Operation & Maintenance

Added by TecBrat on Mon, 31 Jan 2022 01:54:39 +0200