ZooKeeper: implementing a distributed lock

Reasons for locking:

  1. Multiple tasks compete for the same resource
  2. The tasks need to write to that resource
  3. Access to the resource must be mutually exclusive

1. What happens with multiple threads and no lock

Define a class that generates order IDs. This is the shared resource the threads compete for

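import java.text.SimpleDateFormat;
import java.util.Date;

public class OrderNumGenerator {
    //Global order counter, shared by every instance and deliberately left unguarded
    public  static int count = 0;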
    //Generate order ID
    public   String getNumber() {
        SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
        return simpt.format(new Date()) + "-" + ++count;
    }
}

Use CountDownLatch, a concurrency utility, to release 500 threads at the same moment so that they all hit the resource together

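import java.util.Collections;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;

public class OrderService implements Runnable {
    private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();

    //Starting gun: the threads block on it until all 500 have been started
    private static CountDownLatch countDownLatch = new CountDownLatch(500);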

    private static List<String> result = new Vector<String>();

    public void run() {
        try {
            countDownLatch.await();
            result.add(orderNumGenerator.getNumber());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
    	System.out.println("####Generate unique order number###");
    	for (int i = 0; i < 500; i++) {
    		new Thread(new OrderService()).start();
    		countDownLatch.countDown();
    	}
    	countDownLatch.await();
    	Thread.sleep(1000);

    	Collections.sort(result);
    	for(String str:result) {
    		System.out.println(str);
    	}
    }
}
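The starting-gun idea can also be written with two latches: one that releases all workers at once and one that lets the main thread wait until they have finished, which removes the need for a fixed Thread.sleep. This is only a minimal sketch; StartGateDemo and runAll are illustrative names, not part of the code above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class StartGateDemo {
    // Runs `threads` copies of `task`, released together; returns how many completed.
    static int runAll(int threads, Runnable task) {
        CountDownLatch startGate = new CountDownLatch(1);   // opened once by the caller
        CountDownLatch done = new CountDownLatch(threads);  // counted down by each worker
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < threads; i++) {
            new Thread(() -> {
                try {
                    startGate.await();                      // block until the gun fires
                    task.run();
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }
        startGate.countDown();                              // fire the starting gun
        try {
            done.await();                                   // wait for every worker to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(500, () -> { }) + " tasks finished");
    }
}
```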

The output contains duplicate order IDs, because ++count is not atomic and several threads can read the same value before any of them writes it back
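To see where a duplicate comes from, it helps to split ++count into its read, add and write steps. The sketch below replays one bad interleaving in a single thread, so the result is deterministic; LostUpdateDemo and its variable names are made up for illustration.

```java
class LostUpdateDemo {
    static int count = 0;

    // Replays the interleaving in which two threads both read the counter
    // before either of them has written its result back.
    static int[] replay() {
        count = 0;
        int readByA = count;      // thread A reads 0
        int readByB = count;      // thread B reads 0 before A has written
        int idA = readByA + 1;    // A computes 1
        count = idA;              // A writes 1
        int idB = readByB + 1;    // B also computes 1
        count = idB;              // B writes 1 again: A's increment is lost
        return new int[] { idA, idB };
    }

    public static void main(String[] args) {
        int[] ids = replay();
        System.out.println("A got " + ids[0] + ", B got " + ids[1]);
    }
}
```

Both "threads" end up with the same number, which is exactly the duplicate suffix seen in the order IDs.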

Outside a distributed environment, the usual fix is synchronized or Lock

2. synchronized

Put the lock directly on the resource, that is, around the code that generates the order ID

public class OrderNumGenerator {
    //Global order id
    public  static int count = 0;
    public  static Object lock = new Object();

    public  String getNumber() {
        synchronized(lock){
            SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
            return simpt.format(new Date()) + "-" + ++count/*+"_"+Thread.currentThread().getId()*/;
        }
    }
}
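One way to check that the synchronized version really removes the duplicates is to collect the IDs in a set and count them. The following is a minimal sketch under simplified assumptions: SyncOrderIdCheck is a hypothetical class, and the ID is reduced to a plain "order-N" string so that only the counter matters.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class SyncOrderIdCheck {
    private static int count = 0;
    private static final Object LOCK = new Object();

    // Same idea as getNumber(): the increment happens inside the shared monitor.
    static String nextId() {
        synchronized (LOCK) {
            return "order-" + (++count);
        }
    }

    // Starts `threads` threads, takes one ID in each, returns the number of distinct IDs.
    static int distinctIds(int threads) {
        Set<String> ids = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> ids.add(nextId()));
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return ids.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctIds(500) + " distinct IDs from 500 threads");
    }
}
```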

3. Lock

The same problem can be solved with an explicit Lock around the resource

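import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class OrderNumGenerator {
    //Global order id
    public  static int count = 0;
    //static: every OrderService creates its own OrderNumGenerator, so the lock must be shared by all instances
    private static final Lock lock = new ReentrantLock();

    //Solve it by lock
    public  String getNumber() {
        //Acquire outside try, so that finally never unlocks a lock that was not taken
        lock.lock();
        try {
            SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
            String s = simpt.format(new Date()) + "-" + ++count;
            return s;
        }finally {
            lock.unlock();
        }
    }
}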
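A lock only excludes threads that use the same lock object, so a lock guarding a static counter has to be shared by every instance that touches the counter. The sketch below illustrates this with one object per thread and a single static ReentrantLock; SharedLockCounter is a hypothetical class written for this example.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class SharedLockCounter {
    private static int count = 0;
    // One lock for all instances, matching the static counter it guards.
    private static final Lock LOCK = new ReentrantLock();

    void increment() {
        LOCK.lock();
        try {
            count++;
        } finally {
            LOCK.unlock();
        }
    }

    // Each thread works through its own SharedLockCounter instance.
    static int run(int threads, int perThread) {
        count = 0;
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                SharedLockCounter own = new SharedLockCounter();
                for (int n = 0; n < perThread; n++) {
                    own.increment();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(run(8, 1000));
    }
}
```

With an instance-level lock field instead of the static one, every thread would lock its own private object and increments could be lost again.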

However, neither of the two approaches above works in a distributed environment, because both locks exist only inside a single JVM. Before looking at the distributed lock, first review the template method pattern

That is, the parent class defines the main flow, and some of the steps in that flow are deferred to subclasses

4. Template method

Parent class

public abstract class FatherTemplate {
	
	public void A() {
		System.out.println("A");
	}

	public abstract void B() ;

	public void C() {
		System.out.println("C");
	}
	
	public void D() {
		A();
		B();
		C();
	}
}

Subclass

public class SonTemplate extends FatherTemplate{

	@Override
	public void B() {
		System.out.println("B");
		return;
	}
	
	public static void main(String[] args) {
		FatherTemplate sonTemplate = new SonTemplate();
		sonTemplate.D();
	}
}

This way, each subclass can define its own B method, and the fixed flow in D() calls that subclass's B at run time.

5. Implementing a distributed lock with ZooKeeper

Define an interface

//Interface
public interface Lock {
    //Acquire the lock, blocking until it is available
    public void getLock();
    // Release lock
    public void unLock();
}

Define a template

public abstract class AbstractLock implements Lock{
    public void getLock() {
        if (tryLock()) {
            System.out.println("##Acquire lock####");
        } else {
            waitLock();
            getLock();
        }
    }

    public abstract boolean tryLock();

    public abstract void waitLock();
}
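The template can be exercised without a ZooKeeper server by plugging in a trivial in-JVM subclass. The sketch below restates the template under the hypothetical name TemplateLock so that it compiles on its own, uses a loop instead of the recursive call to avoid deep stacks under contention, and lets an AtomicBoolean stand in for the lock node. InMemoryLock and TemplateLockDemo are illustrative names as well.

```java
import java.util.concurrent.atomic.AtomicBoolean;

abstract class TemplateLock {
    // Template method: the retry flow is fixed here, the steps are left to subclasses.
    public final void getLock() {
        while (!tryLock()) {
            waitLock();
        }
    }

    public abstract boolean tryLock();
    public abstract void waitLock();
    public abstract void unLock();
}

// Hypothetical subclass: an AtomicBoolean plays the role of the lock node.
class InMemoryLock extends TemplateLock {
    private static final AtomicBoolean HELD = new AtomicBoolean(false);

    public boolean tryLock() { return HELD.compareAndSet(false, true); }
    public void waitLock() { Thread.yield(); }    // no notification here, just back off briefly
    public void unLock() { HELD.set(false); }
}

class TemplateLockDemo {
    private static int counter = 0;

    static int run(int threads, int perThread) {
        counter = 0;
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                TemplateLock lock = new InMemoryLock();
                for (int n = 0; n < perThread; n++) {
                    lock.getLock();
                    try {
                        counter++;                // plain int, protected only by the lock
                    } finally {
                        lock.unLock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter;
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000));
    }
}
```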

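Next, use ZooKeeper to implement tryLock(), waitLock(), and unLock(). The class below extends ZookeeperAbstractLock, which is not listed here; judging from how it is used, it is the template above plus a zkClient field and the PATH constant

tryLock(): use ZooKeeper to create an ephemeral node. If the creation succeeds, the lock is acquired; if the node already exists, the creation fails and so does the attempt

waitLock(): use ZooKeeper to register a listener on the ephemeral node, together with a CountDownLatch. If the node still exists, wait; once the listener reports that it has been deleted, continue

unLock(): delete the node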
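The create, watch, wait and retry cycle described above can be tried out without a server. The sketch below is a simulation, not the ZooKeeper or ZkClient API: FakeNodeStore is a hypothetical in-memory stand-in that only knows how to create a node, test for it, delete it and notify listeners on deletion, and FakeNodeLock builds the lock on top of it.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the server: a set of node paths plus delete listeners.
class FakeNodeStore {
    private final Set<String> nodes = ConcurrentHashMap.newKeySet();
    private final List<Runnable> deleteListeners = new CopyOnWriteArrayList<>();

    boolean create(String path) { return nodes.add(path); }   // false if the node already exists
    boolean exists(String path) { return nodes.contains(path); }
    void subscribe(Runnable listener) { deleteListeners.add(listener); }
    void unsubscribe(Runnable listener) { deleteListeners.remove(listener); }

    void delete(String path) {
        if (nodes.remove(path)) {
            for (Runnable listener : deleteListeners) {
                listener.run();                               // every waiter is notified
            }
        }
    }
}

class FakeNodeLock {
    private final FakeNodeStore store;
    private final String path;

    FakeNodeLock(FakeNodeStore store, String path) {
        this.store = store;
        this.path = path;
    }

    void getLock() {
        while (!store.create(path)) {              // tryLock(): creating the node takes the lock
            CountDownLatch deleted = new CountDownLatch(1);
            Runnable listener = deleted::countDown;
            store.subscribe(listener);             // waitLock(): subscribe first, then check
            try {
                if (store.exists(path)) {
                    deleted.await();               // sleep until the node is deleted
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the lock", e);
            } finally {
                store.unsubscribe(listener);
            }
        }
    }

    void unLock() { store.delete(path); }          // deleting the node releases the lock
}

class FakeNodeLockDemo {
    // Returns the largest number of threads ever observed inside the critical section.
    static int maxConcurrentHolders(int threads, int rounds) {
        FakeNodeStore store = new FakeNodeStore();
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                FakeNodeLock lock = new FakeNodeLock(store, "/lock");
                for (int r = 0; r < rounds; r++) {
                    lock.getLock();
                    try {
                        maxSeen.accumulateAndGet(inside.incrementAndGet(), Math::max);
                    } finally {
                        inside.decrementAndGet();
                        lock.unLock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return maxSeen.get();
    }
}
```

Subscribing before checking for the node matters: if the check came first, the node could be deleted between the check and the subscription, and the waiter would sleep forever.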

public class ZookeeperDistrbuteLock extends ZookeeperAbstractLock {
    @Override
    //Attempt to acquire lock
    public  boolean tryLock() {
        try {
            zkClient.createEphemeral(PATH);
            return true;
        } catch (Exception e) {
            //Creation fails with an exception when the node already exists, i.e. the lock is taken
            return false;
        }
    }

    @Override
    public void waitLock() {
        //Create the latch before subscribing, so that a deletion arriving early cannot be missed
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        IZkDataListener izkDataListener = new IZkDataListener() {
            public void handleDataDeleted(String path) throws Exception {
                // Wake up the waiting thread
                countDownLatch.countDown();
            }
            public void handleDataChange(String path, Object data) throws Exception {
            }
        };
        // Register the listener
        zkClient.subscribeDataChanges(PATH, izkDataListener);

        //If the node still exists, the lock is held by someone else
        if (zkClient.exists(PATH)) {
            try {
                //Wait until the deletion event is received
                countDownLatch.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        // Remove the listener
        zkClient.unsubscribeDataChanges(PATH, izkDataListener);
    }

    public void unLock() {
        //Release lock
        if (zkClient != null) {
            zkClient.delete(PATH);
            zkClient.close();
            System.out.println("Release lock resource...");
        }
    }
}

In this way, we can use the ZooKeeper distributed lock we have just implemented

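public void getNumber() {
    //unLock() also closes zkClient, so a fresh lock object is created for every use
    Lock lock = new ZookeeperDistrbuteLock();
    //Acquire before try, so that finally only releases a lock that is actually held
    lock.getLock();
    try {
        //Business operation
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        lock.unLock();
    }
}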

6. Optimizing the ZooKeeper distributed lock

The approach above suffers from the herd effect: while one thread holds the lock, all the other threads wait on the same node, and when the lock is released every one of them wakes up and tries to grab it, although only one can succeed
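A rough count shows how quickly the wasted wake-ups grow. The sketch below assumes that every client wants the lock exactly once and that all of them are already waiting before the first release; HerdEffectCount and its method names are illustrative only.

```java
class HerdEffectCount {
    // Single shared node: each release wakes every client that is still waiting.
    static long sharedNodeWakeups(int clients) {
        long total = 0;
        for (int waiting = clients - 1; waiting > 0; waiting--) {
            total += waiting;
        }
        return total;
    }

    // Number of releases that actually hand the lock over to a waiting client.
    static long handovers(int clients) {
        return Math.max(clients - 1, 0);
    }

    public static void main(String[] args) {
        int clients = 500;
        System.out.println("wake-ups:  " + sharedNodeWakeups(clients));
        System.out.println("handovers: " + handovers(clients));
    }
}
```

For 500 clients this gives 124750 wake-ups for only 499 useful handovers, so nearly all notifications end in a failed attempt.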

tryLock(): use ZooKeeper to create an ephemeral sequential node. If the current node ranks first among all the ephemeral sequential nodes, the lock is acquired; if not, find the node immediately in front of it

waitLock(): use ZooKeeper to register a listener on the node in front. When the listener detects that this node has been deleted, continue

unLock(): delete the node

In this way, one node is created per request, in request order. Each node watches only the node in front of it; when that node is deleted, its owner continues and tries to get the lock again. There is no herd effect
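The decision each client makes after listing the children can be isolated into a small pure function. The sketch below works on plain strings and does not touch ZooKeeper; PredecessorPicker and nodeToWatch are hypothetical names, and the node names follow the zero-padded pattern of sequential nodes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class PredecessorPicker {
    // Returns null when `self` is the smallest child (the lock is acquired),
    // otherwise the name of the child immediately in front of it.
    static String nodeToWatch(List<String> children, String self) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted);                 // zero-padded names sort in creation order
        int index = sorted.indexOf(self);
        if (index < 0) {
            throw new IllegalArgumentException("own node is not among the children: " + self);
        }
        return index == 0 ? null : sorted.get(index - 1);
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList("0000000402", "0000000400", "0000000401");
        System.out.println(nodeToWatch(children, "0000000400")); // null: holds the lock
        System.out.println(nodeToWatch(children, "0000000402")); // 0000000401
    }
}
```

Because each client watches a different node, a release notifies exactly one waiter.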

public class ZookeeperDistrbuteLock2 extends ZookeeperAbstractLock {
    private String beforePath;//Node immediately in front of the current node
    private String currentPath;//Node created for the current request

    public ZookeeperDistrbuteLock2() {
        if (!this.zkClient.exists(PATH2)) {
            this.zkClient.createPersistent(PATH2);
        }
    }

    @Override
    public boolean  tryLock() {
        //If currentPath is empty, this is the first attempt: create the node and remember its path
        if(currentPath == null || currentPath.length()<= 0){
            //Create an ephemeral sequential node
            currentPath = this.zkClient.createEphemeralSequential(PATH2 + '/',"lock");
        }
        //Get all children and sort them. Each name is a zero-padded, auto-incremented number such as 0000000400
        List<String> children = this.zkClient.getChildren(PATH2);
        Collections.sort(children);

        if (currentPath.equals(PATH2 + '/'+children.get(0))) {//If the current node ranks first among all nodes, the lock is obtained successfully
            return true;
        } else {//Otherwise, look up the node immediately in front and assign it to beforePath
            //Strip the parent path by its real length instead of the hard-coded substring(7)
            int wz = Collections.binarySearch(children,
                    currentPath.substring(PATH2.length() + 1));
            beforePath = PATH2 + '/'+children.get(wz-1);
        }
        return false;
    }

    @Override
    public void waitLock() {
        //Create the latch before subscribing, so that a deletion arriving early cannot be missed
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        IZkDataListener listener = new IZkDataListener() {
            public void handleDataDeleted(String dataPath) throws Exception {
                countDownLatch.countDown();
            }

            public void handleDataChange(String dataPath, Object data) throws Exception {
            }
        };
        //Watch only the node in front for deletion
        this.zkClient.subscribeDataChanges(beforePath, listener);

        if(this.zkClient.exists(beforePath)){
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.zkClient.unsubscribeDataChanges(beforePath, listener);
    }


    public void unLock() {
        //Delete current temporary node
        zkClient.delete(currentPath);
        zkClient.close();
    }
}

Keywords: Zookeeper Distribution Cloud Native

Added by jason213123 on Tue, 15 Feb 2022 07:07:12 +0200