Custom scheduling based on the K8s scheduler

In essence, this means writing code against a fixed contract, much like writing a MySQL UDF or a Hive custom function for the first time.

The official documentation on configuring multiple schedulers:

https://kubernetes.io/zh/docs/tasks/extend-kubernetes/configure-multiple-schedulers/

 

Background: implement scheduling for database services on top of K8s.
 
Difficulties: 1. Native K8s only schedules on CPU and memory, but MySQL scheduling also has to take disk resources into account.
2. The default scheduling policies do not fit our production environment, e.g. containers and physical machines run mixed on the same hosts, and some services need customized policies.
 
Scheme: 1. Customize the scheduler based on the K8s scheduler source code; services that need the custom policies specify the new scheduler in their pod spec.
2. Synchronize the required metadata, such as MySQL port capacity and server disk capacity, into K8s annotations through scripts (a minimal sketch of such a sync follows below).
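For point 2, here is a minimal sketch of such a sync, assuming a recent client-go; the annotation keys, the GB units, and the way capacity is collected are assumptions for illustration, not the internal implementation:

package main

import (
       "context"
       "fmt"

       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
       "k8s.io/apimachinery/pkg/types"
       "k8s.io/client-go/kubernetes"
       "k8s.io/client-go/tools/clientcmd"
)

func main() {
       // Example kubeconfig path; adjust for the environment.
       cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
       if err != nil {
              panic(err)
       }
       client := kubernetes.NewForConfigOrDie(cfg)

       // Hypothetical values collected by an inventory script on the node.
       nodeName := "node-1"
       totalDiskGB, freeDiskGB := int64(3600), int64(1200)

       // Merge-patch only the annotations this sync owns.
       patch := []byte(fmt.Sprintf(
              `{"metadata":{"annotations":{"xxxx/disk-total-gb":"%d","xxxx/disk-free-gb":"%d"}}}`,
              totalDiskGB, freeDiskGB))
       if _, err := client.CoreV1().Nodes().Patch(context.TODO(), nodeName,
              types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
              panic(err)
       }
}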
 
Workflow:
1. Download the scheduler source code locally.
It can be downloaded from the official repository; the internal git address originally used here is omitted.
2. After the local modifications are done, compile:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64  make all WHAT=cmd/kube-scheduler/
3. Once the compilation finishes without errors, build the image and push it to the registry:
docker build -f Dockerfile -t registry.xxxx.xxxx.com.cn/xxxx/scheduler:1.0 .
docker login registry.xxx.xxx.com.cn -u xxxx
docker push registry.xxxx.xxxx.com.cn/xxxx/scheduler
4. On the target K8s cluster, find the running scheduler pod and delete it. The scheduler is deployed as a Deployment, so after deleting the old pod, confirm that the new scheduler pod is running:
kubectl get pod -n kube-system | grep kube-scheduler-db
kubectl delete pod kube-scheduler-db-xxxxxx-xxxxxxx -n kube-system
5. At this point the scheduler process has been updated and the effect of the modifications can be observed (a sketch of pointing a pod at the new scheduler follows below). For major updates it is recommended to test first and only then roll out to production.
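Workloads opt into the custom scheduler via spec.schedulerName. A minimal sketch, again assuming a recent client-go; the scheduler name "kube-scheduler-db", the namespace, and the annotation key/value are assumptions for illustration:

package main

import (
       "context"

       "k8s.io/api/core/v1"
       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
       "k8s.io/client-go/kubernetes"
       "k8s.io/client-go/tools/clientcmd"
)

func main() {
       cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") // example path
       if err != nil {
              panic(err)
       }
       client := kubernetes.NewForConfigOrDie(cfg)

       pod := &v1.Pod{
              ObjectMeta: metav1.ObjectMeta{
                     Name: "mysql-test",
                     Annotations: map[string]string{
                            // hypothetical key/value read by the custom predicates
                            "xxxx/scheduler-service": "mysql",
                     },
              },
              Spec: v1.PodSpec{
                     SchedulerName: "kube-scheduler-db", // route this pod to the custom scheduler
                     Containers: []v1.Container{
                            {Name: "mysql", Image: "mysql:5.7"},
                     },
              },
       }
       if _, err := client.CoreV1().Pods("default").Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
              panic(err)
       }
}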
 
Modification details (example)
The directory structure screenshot of the scheduler is omitted here.
The parts we need to modify are concentrated under pkg/scheduler/algorithm in the kubernetes source tree.
 
There are two folders in this directory, and each file in them implements one policy:
predicates: the filtering stage. Policies here eliminate servers that fail hard requirements.
priorities: the scoring stage. Policies here score every server that passed filtering, and the server with the highest score is chosen for scheduling.
 
Take xxxx_disk_predicate.go under predicates as an example. This predicate filters out servers that do not meet MySQL's disk space requirements (the actual code is subject to what runs online):
package predicates

import (
       "fmt"

       "k8s.io/api/core/v1"
       "k8s.io/kubernetes/pkg/scheduler/algorithm"
       schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
)

func xxxxDiskPredicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {

       var remainDiskPercentage int64 = 10 // keep at least 10% of the disk free
       var requestedDisk int64             // disk already requested by pods on this node
       var holdDiskMin int64 = 250         // minimum reserved capacity, in GB

       node := nodeInfo.Node() // the node currently being evaluated
       if node == nil {        // invalid node, bail out
              return false, nil, fmt.Errorf("node not found")
       }

       // Disk scheduling currently only applies to MySQL and Pika, so check which
       // service (resource pool) the pod belongs to.
       scheduleService := pod.Annotations[AnnotationSchedulerService]

       requestDiskCurrentPod := xxxxPodDISKRequest(pod) // disk request of the pod being scheduled
       for _, p := range nodeInfo.Pods() {              // sum the disk requests of all pods already on the node
              requestedDisk += xxxxPodDISKRequest(p)
       }

       nodeFreeDisk := xxxxNodeDISKFree(node)   // free disk of the node
       nodeTotalDisk := xxxxNodeDISKTotal(node) // total disk of the node
       // xxxxNodeDISKFree and xxxxNodeDISKTotal live in utils.go and read the node
       // annotations. The annotations may be missing or malformed online, so sensible
       // defaults are required, otherwise the checks below are skewed.

       // Filtering rules:
       // 1. requested disk (existing + incoming) must stay below 90% of the total disk
       // 2. free disk minus the incoming request must stay above 10% of the total disk
       // 3. for servers with small disks (< 4T) the reserve is raised to a 250G floor
       // The rules also have to cope with containers and physical machines running
       // mixed on the same hosts online.

       holdDisk := nodeTotalDisk * remainDiskPercentage / 100
       if holdDisk < holdDiskMin {
              holdDisk = holdDiskMin
       }

       if scheduleService == SchedulerServiceMySQL || scheduleService == SchedulerServicePika {
              // Check the resource pool and filter out unqualified servers directly.
              // Ideally the service-type check would sit at the top of the function to
              // avoid useless work; it is left here for the sake of the example.
              if requestedDisk+requestDiskCurrentPod > nodeTotalDisk-holdDisk ||
                     nodeFreeDisk-requestDiskCurrentPod < holdDisk {
                     return false, []algorithm.PredicateFailureReason{
                            &PredicateFailureError{
                                   PredicateName: "xxxxDiskPredicate",
                                   PredicateDesc: fmt.Sprintf("node doesn't have enough DISK: request %d, already requested %d", requestDiskCurrentPod, requestedDisk),
                            },
                     }, nil
              }
              return true, nil, nil
       }
       return true, nil, nil
}

func init() {
       predicates := Ordering()
       predicates = append(predicates, xxxxDiskPred)
       SetPredicatesOrdering(predicates)
}

 

Next, take priorities_mem.go as another example. This priority gives higher scores to servers with more free memory (the actual code is subject to what runs online):
package priorities

import (
       "encoding/json"
       "fmt"
       "net/http"
       "time"

       cv1 "github.com/google/cadvisor/info/v1" // cadvisor v1 types returned by the kubelet /stats endpoint
       "k8s.io/api/core/v1"
       "k8s.io/klog"
       schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
       schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
)

func CalculateNodeMoreMemMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {

       node := nodeInfo.Node()
       if node == nil {
              return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
       }

       var allMemory int64  // total memory of the server
       var usedMemory int64 // memory already used on the server
       allMemory = xxxxNodeMemAll(node)
       if allMemory <= 0 { // metadata missing or malformed: return a neutral score
              return schedulerapi.HostPriority{Host: node.Name, Score: 0}, nil
       }

       // The node's memory usage is fetched through the kubelet read-only port 10255.
       // Calls like this should normally be kept out of scheduling logic, because they
       // run for every node that passed filtering and can slow scheduling down. The
       // database workloads are scheduled infrequently, however, and this is very
       // simple to implement, so it is done this way here.
       resp, err := (&http.Client{Timeout: 5 * time.Second}).Get("http://" + node.Status.Addresses[0].Address + ":10255/stats")
       if err != nil {
              klog.Warningf("get node status failed: %s", err)
       } else {
              var ci cv1.ContainerInfo
              if err := json.NewDecoder(resp.Body).Decode(&ci); err != nil {
                     klog.Warningf("decode node status failed: %s", err)
              } else if len(ci.Stats) > 0 {
                     usedMemory = int64(ci.Stats[0].Memory.RSS)
              }
              resp.Body.Close()
       }

       // Scoring rule: 10 points if no memory is used, 0 points if it is all used,
       // allocated proportionally in between (multiply before dividing to avoid
       // integer truncation).
       count := 10 - usedMemory*10/allMemory
       // Log the score; it can be inspected with
       // kubectl logs kube-scheduler-db-xxxx-xxxx -n kube-system
       klog.Warningf("CalculateNodeMoreMemMap: node %s score %d", node.Name, count)
       // Return the score entry; only the format matters here.
       return schedulerapi.HostPriority{
              Host:  node.Name,
              Score: int(count),
       }, nil
}
 
// The final scores are computed map-reduce style (as in Hadoop): the map function above scores each node, and the reduce below normalizes the results across nodes.
var CalculateNodeMoreMemReduce = NormalizeReduce(schedulerapi.MaxPriority, false)
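For intuition, this is roughly what a normalizing reduce achieves, shown over a plain map of node to raw score; it is illustrative only, not the upstream NormalizeReduce implementation:

package main

import "fmt"

// normalize scales the raw scores so the best node ends up at maxPriority.
func normalize(scores map[string]int, maxPriority int) {
       max := 0
       for _, s := range scores {
              if s > max {
                     max = s
              }
       }
       if max == 0 {
              return
       }
       for node, s := range scores {
              scores[node] = s * maxPriority / max
       }
}

func main() {
       raw := map[string]int{"node-a": 3, "node-b": 7} // raw scores from the map phase
       normalize(raw, 10)
       fmt.Println(raw) // map[node-a:4 node-b:10]
}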

The program also contains some helper functions, which live in utils.go and are omitted here.
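As a rough idea of what those helpers look like, here is a minimal sketch that reads the metadata from annotations with safe defaults; the annotation keys and GB units are assumptions matching the hypothetical sync sketch earlier, not the real internal keys:

package predicates

import (
       "strconv"

       "k8s.io/api/core/v1"
)

// xxxxNodeDISKTotal returns the node's total disk in GB, falling back to 0
// when the annotation is missing or malformed.
func xxxxNodeDISKTotal(node *v1.Node) int64 {
       return annotationInt64(node.Annotations, "xxxx/disk-total-gb", 0)
}

// xxxxNodeDISKFree returns the node's free disk in GB with the same fallback.
func xxxxNodeDISKFree(node *v1.Node) int64 {
       return annotationInt64(node.Annotations, "xxxx/disk-free-gb", 0)
}

// xxxxPodDISKRequest returns the disk request declared on the pod, 0 if absent.
func xxxxPodDISKRequest(pod *v1.Pod) int64 {
       return annotationInt64(pod.Annotations, "xxxx/disk-request-gb", 0)
}

// annotationInt64 parses an integer annotation, returning def on any problem.
func annotationInt64(annotations map[string]string, key string, def int64) int64 {
       v, ok := annotations[key]
       if !ok {
              return def
       }
       n, err := strconv.ParseInt(v, 10, 64)
       if err != nil {
              return def
       }
       return n
}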

After writing all the scheduling policies we are still not done: the new policies have to be registered with the factory functions.
The location is pkg/scheduler/algorithmprovider/defaults/defaults.go.
 
In func defaultPredicates(), register the new predicate, for example:
factory.RegisterFitPredicate(predicates.xxxxDiskPred, predicates.xxxxDiskPredicate),
 
In func defaultPriorities(), register the new priority, for example:
factory.RegisterPriorityFunction2("MoreCpu", priorities.CalculateNodeMoreMemMap, priorities.CalculateNodeMoreMemReduce, 1000000),
Note that the last argument is the weight, here 1000000, which sets the relative weight of the different priority policies. A node's actual score is the sum over all priority policies of score * weight.
Such a large number is used to make this policy the dominant factor in the decision; for now we do not care about the other policies, and this can be revisited in future development.
There are six other built-in priority policies; except for NodePreferAvoidPodsPriority, whose weight is 10000, they all have weight 1. They are easy to understand, just read the code.
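A back-of-the-envelope check of why 1000000 dominates, with made-up scores (each priority score lies in the 0-10 range):

package main

import "fmt"

func main() {
       memScore := int64(7) // 0..10 from CalculateNodeMoreMemMap
       // Upper bound of everything else: 5 built-in priorities at weight 1 plus
       // NodePreferAvoidPodsPriority at weight 10000, each scoring at most 10.
       othersMax := int64(5*10*1 + 10*10000)
       total := memScore*1000000 + othersMax
       // Even a one-point change in memScore (worth 1000000) outweighs the maximum
       // possible swing of all other priorities combined (100050).
       fmt.Println(total) // 7100050
}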
 
 
Finally, a summary of the modifications made so far.
Predicates (filtering)
Function name: xxxxDiskPredicate
Applies to: MySQL & Pika
Goal: filter out all servers that do not meet the disk space requirements
Strategy: all_request_disk + request_disk < total_disk * 90%
                  free_disk - request_disk > total_disk * 10%
 
 
Function name: xxxxMemoryMysqlPredicate
Applies to: MySQL
Goal: filter out servers that do not meet the memory requirements
Strategy: free_mem - request_mem > total_mem * 10%
         
 
Function name: xxxxCpuPredicate
Applies to: all services
Goal: filter out servers that do not meet the CPU requirements
Strategy: free_cpu > request_cpu
        
Function name: xxxxMemoryPredicate
Applies to: Redis
Goal: filter out servers that do not meet the memory requirements
Strategy: must cover both the degraded and non-degraded modes of Redis; see the code for details, it is too long to spell out here
 
 
Priorities (scoring)
Function name: CalculateNodeMoreMemMap
Applies to: MySQL
Goal: give higher scores to servers with more free memory
Strategy: (free_mem / total_mem) * 10 * weight
 
https://www.cnblogs.com/zzyhogwarts/p/15077619.html
