[Zookeeper Learning Notes] 12. Implementing Master Election with Zookeeper

1. Introduction

1. Overview

a: Master election is a very common scenario in distributed systems. A master-slave architecture is often used to avoid single points of failure and improve service availability.
b: Normally, the Master node coordinates the other units in the cluster, maintains system state information, or handles complex logic and then synchronizes the results to the other nodes.
When the Master node goes down or cannot provide service for some other reason, a Master election is started to pick a new Master from the candidate nodes so that service continues.

2. Scenarios

In applications that separate reads from writes, the Master node handles the clients' write requests and synchronizes the results to the slave nodes after processing.

3. Election algorithm

Well-known algorithms used for elections include Paxos, Raft, and Bully.

4. Strong Consistency

a: Zookeeper's strong consistency guarantees that node creation is globally unique, even under highly concurrent distributed access.

b: Zookeeper ensures that a client cannot create a data node that already exists.

c: If multiple clients request creation of the same node at the same time, exactly one of the requests succeeds.
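
The "exactly one creator wins" property is what makes a simple election possible: every candidate tries to create the same node, and whoever succeeds is the Master. The sketch below illustrates only that property. It is an in-memory stand-in, not Zookeeper: the class CreateOnceSketch, the path "/master", and the method names are all hypothetical, and ConcurrentHashMap.putIfAbsent plays the role of the atomic create.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory stand-in for a Zookeeper namespace; not a real client.
class CreateOnceSketch {
    private final Map<String, String> nodes = new ConcurrentHashMap<>();

    /** Returns true only for the caller whose create actually succeeded. */
    boolean tryCreate(String path, String owner) {
        // putIfAbsent is atomic: exactly one concurrent caller sees null.
        return nodes.putIfAbsent(path, owner) == null;
    }

    String ownerOf(String path) {
        return nodes.get(path);
    }

    /** Races the given number of threads on one path and returns how many of them won. */
    static int race(int candidates) {
        CreateOnceSketch zk = new CreateOnceSketch();
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(candidates);
        AtomicInteger winners = new AtomicInteger();
        for (int i = 1; i <= candidates; i++) {
            String name = "client#" + i;
            new Thread(() -> {
                try {
                    start.await(); // release all candidates at the same moment
                    if (zk.tryCreate("/master", name)) {
                        winners.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }
        start.countDown();
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for candidates", e);
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("winners = " + race(5)); // prints "winners = 1"
    }
}
```

With a real Zookeeper server the losing clients receive a NodeExistsException instead of a false return value, and the node is normally ephemeral so that it disappears when the Master's session ends.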

2. Apache Curator implements Master elections

1. Curator offers two election recipes: Leader Latch (the LeaderLatch class) and Leader Election (the LeaderSelector class).

2. Scheme 1. Leader Latch
1) Overview

Curator picks one of the candidates as leader; callers should treat the choice as arbitrary. That candidate keeps leadership until it calls close() to release it, and only then is a new leader chosen from the remaining candidates. Until that happens, no other candidate can become leader.

2) Simulating nodes in a distributed environment using threads

a: Each thread creates a Zookeeper client and a LeaderLatch object for the election.

b: Each thread has a name containing a number so that the threads can be told apart.

c: Each thread lives for number * 10 seconds, after which its LeaderLatch object and client are closed, indicating that the 'node' is down.

d: If the node that goes down is the Master, the system starts a new Master election.

3) Code

package com.hao.demo.zookeeper.masterelection;

import com.hao.demo.zookeeper.publishsubscribe.ZKUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Curator election scheme 1: LeaderLatch.
 *
 * @author haojunhu
 * @date 2020-06-06
 */
@Slf4j
public class LeaderLatchTest {
    private static final String zkServerIps = "localhost:2181";

    private static final String masterPath = "/testZK/leader_latch";

    public static void main(String[] args) {
        final int clientNums = 5; // Number of clients for simulation
        final CountDownLatch countDownLatch = new CountDownLatch(clientNums);

        List<LeaderLatch> latchList = new CopyOnWriteArrayList<>();
        List<CuratorFramework> clientList = new CopyOnWriteArrayList<>();
        AtomicInteger atomicInteger = new AtomicInteger(1);

        for (int i = 0; i < clientNums; i++) {
            new Thread(() -> {

                CuratorFramework client = ZKUtils.getClient(); // Create Client
                clientList.add(client);
                int number = atomicInteger.getAndIncrement();
                final LeaderLatch latch = new LeaderLatch(client, masterPath, "client#" + number);
                log.info("create client =>{}", latch.getId());
                // Register a listener that is notified when this latch gains or loses leadership
                latch.addListener(new LeaderLatchListener() {
                    @Override
                    public void isLeader() {
                        log.info("{} says: I have been elected leader and am starting work.", latch.getId());
                    }

                    @Override
                    public void notLeader() {
                        log.info("{} says: I am no longer the leader; standing by.", latch.getId());
                    }
                });
                latchList.add(latch);
                try {
                    latch.start();
                    Thread.sleep(number * 10_000L); // Keep this client alive for number * 10 seconds, then shut it down
                } catch (Exception e) {
                    log.error("e=>{}", e.getMessage());
                } finally {
                    log.info("client =>{} is close", latch.getId());
                    CloseableUtils.closeQuietly(latch);
                    CloseableUtils.closeQuietly(client);
                    countDownLatch.countDown();
                }

            }).start();
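        }

        try {
            countDownLatch.await(); // Block until every simulated client has shut down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }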

    /**
     * Synchronized so that clients are created one at a time, in a defined order.
     * @return
     */
//    private static synchronized CuratorFramework getClient() {
//        CuratorFramework client = CuratorFrameworkFactory.builder()
//                .connectString(zkServerIps)
//                .sessionTimeoutMs(6000)
//                .connectionTimeoutMs(3000)
//                //.namespace("LeaderLatchTest")
//                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
//                .build();
//        client.start();
//        return client;
//    }

}

3. Scheme 2. Leader Election
1) Overview

a: The Leader Election recipe requires a LeaderSelectorListener, which controls what happens when leadership is gained.

b: When a node is chosen as leader, its takeLeadership method is called to run the business logic, and leadership is released as soon as that method returns.

c: A new Master election then runs, so every node has a chance to become leader.

d: Calling autoRequeue() puts this instance back into the election queue after it releases leadership, so it may become leader again.
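
The take-release-requeue cycle described in these points can be tried out without a Zookeeper server. The following is a simulation under stated assumptions, not Curator's implementation: a fair ReentrantLock stands in for the election, the lock's critical section stands in for takeLeadership(), and the for loop stands in for autoRequeue(). The class name RequeueSketch and its method are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// In-memory simulation only: a fair lock stands in for the Zookeeper-backed election.
class RequeueSketch {
    /** Runs the simulation and returns the leader names, one entry per leadership term. */
    static List<String> run(int candidates, int termsPerCandidate) {
        ReentrantLock leadership = new ReentrantLock(true); // fair: waiting candidates take turns
        List<String> terms = new CopyOnWriteArrayList<>();
        AtomicInteger activeLeaders = new AtomicInteger();
        Thread[] threads = new Thread[candidates];
        for (int i = 0; i < candidates; i++) {
            String name = "client#" + (i + 1);
            threads[i] = new Thread(() -> {
                // This loop plays the role of autoRequeue(): rejoin the election after every term.
                for (int t = 0; t < termsPerCandidate; t++) {
                    leadership.lock(); // wait to be elected
                    try {
                        // Simulated takeLeadership(): at most one thread is ever in here.
                        if (activeLeaders.incrementAndGet() != 1) {
                            throw new IllegalStateException("two leaders at once");
                        }
                        terms.add(name);
                        activeLeaders.decrementAndGet();
                    } finally {
                        leadership.unlock(); // returning from takeLeadership() gives up leadership
                    }
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for candidates", e);
            }
        }
        return terms;
    }

    public static void main(String[] args) {
        // Six terms in total, two per candidate; the order depends on thread scheduling.
        System.out.println(run(3, 2));
    }
}
```

Without the requeue loop each candidate would lead at most once and then drop out, which is the behavior the text attributes to a selector that never calls autoRequeue().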

2) Code

package com.hao.demo.zookeeper.masterelection;

import com.hao.demo.zookeeper.publishsubscribe.ZKUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.utils.CloseableUtils;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Curator election scheme 2: Leader Election (LeaderSelector).
 *
 * @author haojunhu
 * @date 2020-06-06
 */
@Slf4j
public class LeaderElectionTest {
    private static final String zkServerIps = "localhost:2181";

    private static final String masterPath = "/testZK/leader_selector";

    public static void main(String[] args) {
        final int clientNums = 5; // Number of clients used for simulation
        final CountDownLatch countDownLatch = new CountDownLatch(clientNums);

        List<LeaderSelector> selectorList = new CopyOnWriteArrayList<>();
        List<CuratorFramework> clientList = new CopyOnWriteArrayList<>();

        AtomicInteger atomicInteger = new AtomicInteger(1);

        try {
            for (int i = 0; i < clientNums; i++) {
                new Thread(() -> {

                    CuratorFramework client = ZKUtils.getClient();
                    clientList.add(client);

                    int number = atomicInteger.getAndIncrement();
                    final String name = "client#" + number;
                    final LeaderSelector selector = new LeaderSelector(client, masterPath, new LeaderSelectorListener() {
                        @Override
                        public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                            log.info("{} says: I have been elected leader and am starting work.", name);
                            Thread.sleep(3000);
                        }

                        @Override
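                        public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                            // Intentionally empty in this demo. Real code should react to SUSPENDED and LOST,
                            // as described in the notes that follow this listing.
                        }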
                    });
                    log.info("create client =>{}", name);
                    try {
                        selector.autoRequeue();
                        selector.start();
                        selectorList.add(selector);
                        Thread.sleep(number * 10_000L); // Keep this client alive for number * 10 seconds, then shut it down
                    } catch (Exception e) {
                        log.error("e=>{}", e.getMessage()); // Do not swallow failures silently
                    } finally {
                        log.info("client =>{} is closing", name);
                        CloseableUtils.closeQuietly(selector);
                        if (!client.getState().equals(CuratorFrameworkState.STOPPED)) {
                            CloseableUtils.closeQuietly(client);
                        }
                        countDownLatch.countDown(); // Signal completion only after resources are released
                    }
                }).start();
            }
            countDownLatch.await(); // Block until every client thread has finished
        } catch (Exception e) {
            log.error("e=>{}", e.getMessage());
        }

    }

    /**
     * Synchronized so that clients are created one at a time, in a defined order.
     *
     * @return
     */
//    private static synchronized CuratorFramework getClient() {
//        CuratorFramework client = CuratorFrameworkFactory.builder()
//                .connectString(zkServerIps)
//                .sessionTimeoutMs(6000)
//                .connectionTimeoutMs(3000)
//                //.namespace("LeaderLatchTest")
//                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
//                .build();
//        client.start();
//        return client;
//    }
}

3) Notes

a: The LeaderSelectorListener interface extends ConnectionStateListener.

b: Once a LeaderSelector is started, it registers a connection state listener with the Curator client. Code that uses LeaderSelector must always watch for connection changes.

c: If SUSPENDED is reported, the instance must assume that it might no longer be the leader until it receives RECONNECTED.

d: If LOST is reported, the instance is no longer the leader and its takeLeadership() method should exit immediately.

4) Handling connection state changes

a: When SUSPENDED or LOST is reported, the recommended response is to throw CancelLeadershipException from stateChanged(). The LeaderSelector instance then attempts to interrupt and cancel the thread that is executing takeLeadership(). stateChanged() is available on the listener because of this declaration:

public interface LeaderSelectorListener extends ConnectionStateListener

b: The recommended approach is to extend LeaderSelectorListenerAdapter, which already provides this handling:

public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorListener {
    public LeaderSelectorListenerAdapter() {
    }

    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        if (client.getConnectionStateErrorPolicy().isErrorState(newState)) {
            throw new CancelLeadershipException();
        }
    }
}


c: Two ways to extend the adapter

package com.hao.demo.zookeeper.masterelection;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;

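/**
 * Option 1: a named subclass of LeaderSelectorListenerAdapter.
 *
 * @author haojunhu
 * @date 2020-06-09
 */
public class MyLeaderSelectorListenerAdapter extends LeaderSelectorListenerAdapter {
    @Override
    public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
        // Business logic goes here; leadership is released when this method returns.
    }
}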

// Option 2: an anonymous subclass created inline
final LeaderSelector selector = new LeaderSelector(client, masterPath, new LeaderSelectorListenerAdapter() {
    @Override
    public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
        log.info("{} says: I have been elected leader and am starting work.", name);
        Thread.sleep(3000);
    }
});
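
Because cancellation arrives as a thread interrupt, a takeLeadership() body has to cooperate by exiting when it is interrupted. The sketch below illustrates that idea in isolation and uses no Curator classes. The class CancelLeadershipSketch, its State enum, and its methods are hypothetical, and as a simplification stateChanged() interrupts the leader thread directly, whereas the text above says that in Curator the interrupt is issued by LeaderSelector after CancelLeadershipException is thrown.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// In-memory illustration of cooperative cancellation; no Curator classes are involved.
class CancelLeadershipSketch {
    // Hypothetical mirror of the connection states named in the text.
    enum State { CONNECTED, SUSPENDED, RECONNECTED, LOST }

    private final AtomicReference<Thread> leaderThread = new AtomicReference<>();
    private final CountDownLatch leading = new CountDownLatch(1);
    private volatile String outcome = "not started";

    /** Simulated takeLeadership(): holds leadership until the work ends or it is interrupted. */
    void takeLeadership() {
        leaderThread.set(Thread.currentThread());
        leading.countDown();
        try {
            Thread.sleep(60_000); // pretend to do long-running leader work
            outcome = "finished normally";
        } catch (InterruptedException e) {
            outcome = "cancelled"; // exit promptly instead of continuing to act as leader
        } finally {
            leaderThread.set(null);
        }
    }

    /** Simulated reaction to a connection state change. */
    void stateChanged(State newState) {
        if (newState == State.SUSPENDED || newState == State.LOST) {
            Thread current = leaderThread.get();
            if (current != null) {
                current.interrupt();
            }
        }
    }

    /** Starts a leader, reports LOST, and returns how the leader's work ended. */
    static String demo() {
        CancelLeadershipSketch sketch = new CancelLeadershipSketch();
        Thread worker = new Thread(sketch::takeLeadership);
        worker.start();
        try {
            sketch.leading.await(); // wait until the worker holds leadership
            sketch.stateChanged(State.LOST);
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the leader", e);
        }
        return sketch.outcome;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "cancelled"
    }
}
```

A takeLeadership() body that catches InterruptedException and keeps working would defeat this mechanism, so long-running leader work should either let the exception propagate or return as soon as it is caught.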

Keywords: Apache Java Zookeeper Lombok

Added by fredcool on Wed, 10 Jun 2020 04:16:55 +0300