[From Getting Started to Abandoning-ZooKeeper] ZooKeeper in Practice: Distributed Lock, Upgraded

Preface

In the previous article of this series, ZooKeeper in Practice: Distributed Lock, we implemented a distributed lock using ZooKeeper's ephemeral (temporary) nodes.
However, that implementation polls to decide whether to keep trying to acquire the lock, which burns CPU cycles in a busy loop, and it is prone to the herd effect when many threads compete for the lock, because every waiter retries at once.

To solve these two problems, this article looks at how to implement an upgraded version of the distributed lock.

Design

We still implement the java.util.concurrent.locks.Lock interface.
Unlike the previous implementation, this one uses ZooKeeper's EPHEMERAL_SEQUENTIAL (ephemeral sequential) nodes.
When the lock is first requested, an ephemeral sequential node is created. If the sequence number at the end of its name is the smallest among the nodes with the same name prefix under the parent node, the lock is acquired.
Otherwise, we watch the node immediately ahead of ours, that is, the one with the largest sequence number smaller than ours, until it is deleted, and then try to acquire the lock again. Because each waiter watches only its predecessor, a release wakes one waiter rather than all of them, which avoids the contention caused by many threads trying to acquire the lock at the same time.
ZooKeeper's watch mechanism replaces polling, so the CPU does not spin while waiting.
A volatile int field named state counts acquisitions once the lock is held, which is what makes the lock reentrant.
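
The "am I first, and if not, whom do I watch?" decision can be tried out without a ZooKeeper server. The following is a minimal sketch of that step only; the class name PredecessorDemo and the child names are made up for illustration. It relies on the fact that ZooKeeper appends a zero-padded 10-digit counter to sequential node names, so plain string ordering matches creation order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical helper for illustration; not part of DistributedFairLock.
final class PredecessorDemo {

    /**
     * Returns the child that sorts immediately before {@code self},
     * or null when {@code self} is the smallest child and therefore owns the lock.
     */
    static String predecessor(List<String> children, String self) {
        SortedSet<String> sorted = new TreeSet<>(children);
        // headSet is exclusive: it holds every element strictly smaller than self
        SortedSet<String> smaller = sorted.headSet(self);
        return smaller.isEmpty() ? null : smaller.last();
    }

    public static void main(String[] args) {
        // Made-up child names in the shape ZooKeeper generates: prefix + 10-digit counter
        List<String> children = Arrays.asList(
                "lock0000000003", "lock0000000001", "lock0000000002");

        System.out.println(predecessor(children, "lock0000000001")); // null
        System.out.println(predecessor(children, "lock0000000003")); // lock0000000002
    }
}
```

The node with the lowest number has no predecessor and holds the lock; every other node watches exactly one neighbour.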

DistributedFairLock

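import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
//The logger imports assume SLF4J
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistributedFairLock implements Lock {
    private static final Logger logger = LoggerFactory.getLogger(DistributedFairLock.class);

    //ZooKeeper client used for every ZooKeeper operation
    private ZooKeeper zooKeeper;

    //Path of the root (parent) node
    private String dir;

    //Name prefix of the lock node
    private String node;

    //ACLs applied to the nodes this lock creates
    private List<ACL> acls;

    //Full path prefix of the lock node: dir + "/" + node
    private String fullPath;

    //Hold count: 0 means the lock is not held. It is incremented on each acquire and decremented on each release.
    //When it drops back to 0, the ephemeral node is deleted.
    private volatile int state;

    //Full path of the ephemeral sequential node created by this lock
    private String id;

    //Blocks the caller until the watched predecessor node changes; volatile because the watcher thread reads it
    private volatile CountDownLatch countDownLatch;

    /**
     * Constructor.
     *
     * @param zooKeeper the ZooKeeper client
     * @param dir       the path of the root node
     * @param node      the name prefix of the lock node
     * @param acls      the ACLs for created nodes
     */
    public DistributedFairLock(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
        this.zooKeeper = zooKeeper;
        this.dir = dir;
        this.node = node;
        this.acls = acls;
        this.fullPath = dir.concat("/").concat(this.node);
        init();
    }

    //Create the persistent root node if it does not exist yet
    private void init() {
        try {
            Stat stat = zooKeeper.exists(dir, false);
            if (stat == null) {
                zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
            }
        } catch (KeeperException.NodeExistsException e) {
            //Another client created the root node between exists() and create(); nothing to do
        } catch (Exception e) {
            logger.error("[DistributedFairLock#init] error : " + e.toString(), e);
        }
    }
}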

lock

@Override
public void lock() {
    try {
        //Serialize callers that share this lock instance
        synchronized (this) {
            //If the lock is not currently held, queue up for it
            if (state <= 0) {
                //Create this lock's ephemeral sequential node once
                if (id == null) {
                    id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
                }

                //Re-check after every wake-up: the lock is ours only when no smaller node remains
                while (true) {
                    //Get all nodes under the root node as sorted full paths
                    List<String> nodes = zooKeeper.getChildren(dir, false);
                    SortedSet<String> sortedSet = new TreeSet<>();
                    for (String node : nodes) {
                        sortedSet.add(dir.concat("/").concat(node));
                    }

                    //Nodes whose sequence number is smaller than ours
                    SortedSet<String> lessSet = sortedSet.headSet(id);
                    if (lessSet.isEmpty()) {
                        break;
                    }

                    //Create the latch before registering the watcher so an early notification is not lost
                    countDownLatch = new CountDownLatch(1);
                    //Watch only the immediate predecessor: fair ordering, no herd effect, no polling
                    Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher());
                    if (stat != null) {
                        countDownLatch.await();
                    }
                }
            }

            state++;
        }
    } catch (InterruptedException e) {
        logger.error("[DistributedFairLock#lock] error : " + e.toString(), e);
        Thread.currentThread().interrupt();
    } catch (KeeperException ke) {
        logger.error("[DistributedFairLock#lock] error : " + ke.toString(), ke);
        if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
            Thread.currentThread().interrupt();
        }
    }
}
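
The hand-off between the watcher and the waiting thread is sensitive to ordering: a watch registered by exists() can fire at any moment after registration, and a notification that arrives while there is no latch to count down is simply lost. The sketch below reproduces both orderings with plain java.util.concurrent classes and no ZooKeeper. LatchOrderingDemo and its method names are hypothetical stand-ins, and the 100 ms timeout exists only so the losing case returns instead of blocking forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the lock/watcher pair; no ZooKeeper involved.
final class LatchOrderingDemo {
    private volatile CountDownLatch latch;

    // Plays the role of LockWatcher.process: signal whoever is waiting, if anyone
    void onNodeDeleted() {
        CountDownLatch current = latch;
        if (current != null) {
            current.countDown();
        }
    }

    // The latch exists before the event can fire, so the signal is kept
    boolean latchFirst() {
        latch = new CountDownLatch(1);
        onNodeDeleted(); // the event arrives "early"
        return awaitBriefly(latch);
    }

    // The event fires before the latch exists, so the signal is lost and the wait times out
    boolean eventFirst() {
        latch = null;
        onNodeDeleted(); // nothing to count down yet
        latch = new CountDownLatch(1);
        return awaitBriefly(latch);
    }

    // Returns true if the latch was released within 100 ms
    private static boolean awaitBriefly(CountDownLatch l) {
        try {
            return l.await(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        LatchOrderingDemo demo = new LatchOrderingDemo();
        System.out.println("latch first: " + demo.latchFirst()); // latch first: true
        System.out.println("event first: " + demo.eventFirst()); // event first: false
    }
}
```

With an untimed await(), the second ordering would leave the caller blocked, which is why assigning the latch before calling exists() is the safer sequence.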

tryLock

@Override
public boolean tryLock() {
    try {
        synchronized (this) {
            if (state <= 0) {
                if (id == null) {
                    id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
                }

                List<String> nodes = zooKeeper.getChildren(dir, false);
                SortedSet<String> sortedSet = new TreeSet<>();
                for (String node : nodes) {
                    sortedSet.add(dir.concat("/").concat(node));
                }

                //Nodes whose sequence number is smaller than ours
                SortedSet<String> lessSet = sortedSet.headSet(id);

                if (!lessSet.isEmpty()) {
                    //Not first in line: remove our node so it does not block the clients queued behind it
                    zooKeeper.delete(id, -1);
                    id = null;
                    return false;
                }
            }
            state++;
        }
    } catch (InterruptedException e) {
        logger.error("[DistributedFairLock#tryLock] error : " + e.toString(), e);
        //Restore the interrupt flag for the caller
        Thread.currentThread().interrupt();
        return false;
    } catch (KeeperException ke) {
        logger.error("[DistributedFairLock#tryLock] error : " + ke.toString(), ke);
        //The hold count was not incremented, so the lock is not held
        return false;
    }
    return true;
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    //One deadline shared by every wait, however many predecessors come and go
    long deadline = System.nanoTime() + unit.toNanos(time);
    try {
        synchronized (this) {
            if (state <= 0) {
                if (id == null) {
                    id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
                }

                while (true) {
                    List<String> nodes = zooKeeper.getChildren(dir, false);
                    SortedSet<String> sortedSet = new TreeSet<>();
                    for (String node : nodes) {
                        sortedSet.add(dir.concat("/").concat(node));
                    }

                    SortedSet<String> lessSet = sortedSet.headSet(id);
                    if (lessSet.isEmpty()) {
                        break;
                    }

                    //Create the latch before registering the watcher so an early notification is not lost
                    countDownLatch = new CountDownLatch(1);
                    Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher());
                    if (stat != null
                            && !countDownLatch.await(deadline - System.nanoTime(), TimeUnit.NANOSECONDS)) {
                        //Timed out: leave the queue so we do not block the clients behind us
                        zooKeeper.delete(id, -1);
                        id = null;
                        return false;
                    }
                }
            }

            state++;
        }
    } catch (InterruptedException e) {
        logger.error("[DistributedFairLock#tryLock] error : " + e.toString(), e);
        //Propagate the interrupt, as the method signature promises
        throw e;
    } catch (KeeperException ke) {
        logger.error("[DistributedFairLock#tryLock] error : " + ke.toString(), ke);
        return false;
    }
    return true;
}
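
For the timed variant it helps to remember that CountDownLatch.await(long, TimeUnit) reports a timeout through its boolean return value rather than an exception, so a timed acquire has to check that value before counting the lock as held. The sketch below shows one way to wait on several latches in turn against a single shared deadline. DeadlineWaitDemo and awaitAll are hypothetical names used only for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not part of DistributedFairLock.
final class DeadlineWaitDemo {

    /**
     * Waits for each latch in turn against one shared deadline.
     * Returns true only if every latch was released before the deadline.
     */
    static boolean awaitAll(List<CountDownLatch> waits, long time, TimeUnit unit) {
        long deadline = System.nanoTime() + unit.toNanos(time);
        try {
            for (CountDownLatch latch : waits) {
                // await returns false on timeout; a non-positive timeout does not wait at all
                if (!latch.await(deadline - System.nanoTime(), TimeUnit.NANOSECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch released = new CountDownLatch(1);
        released.countDown();
        CountDownLatch stuck = new CountDownLatch(1);

        System.out.println(awaitAll(Arrays.asList(released), 50, TimeUnit.MILLISECONDS));        // true
        System.out.println(awaitAll(Arrays.asList(released, stuck), 50, TimeUnit.MILLISECONDS)); // false
    }
}
```

Computing the remaining time from a fixed deadline keeps the total wait bounded by the caller's timeout even when the waiter is woken and has to wait again.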

unlock

@Override
public void unlock() {
    synchronized (this) {
        if (state > 0) {
            state--;
        }
        //Delete the ephemeral node once the lock is no longer held; skip if no node was ever created
        if (state == 0 && zooKeeper != null && id != null) {
            try {
                zooKeeper.delete(id, -1);
                id = null;
            } catch (Exception e) {
                logger.error("[DistributedFairLock#unlock] error : " + e.toString(), e);
            }
        }
    }
}
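
The reentrant bookkeeping can be looked at in isolation. The sketch below models only the hold count: each acquire increments it, each release decrements it, and the node is removed exactly once, on the release that brings the count back to zero. HoldCountDemo is a hypothetical class, and a counter stands in for the real zooKeeper.delete call.

```java
// Hypothetical model of the hold count only; no ZooKeeper involved.
final class HoldCountDemo {
    private int state;       // hold count, 0 means the lock is not held
    private int nodeDeletes; // how many times the node would have been deleted

    synchronized void acquire() {
        state++;
    }

    synchronized void release() {
        if (state == 0) {
            return; // not held: nothing to release, nothing to delete
        }
        state--;
        if (state == 0) {
            nodeDeletes++; // stands in for zooKeeper.delete(id, -1)
        }
    }

    synchronized int holds() {
        return state;
    }

    synchronized int nodeDeletes() {
        return nodeDeletes;
    }

    public static void main(String[] args) {
        HoldCountDemo lock = new HoldCountDemo();
        lock.acquire();
        lock.acquire();                         // reentrant acquire
        lock.release();
        System.out.println(lock.nodeDeletes()); // 0
        lock.release();
        System.out.println(lock.nodeDeletes()); // 1
        lock.release();                         // extra release is ignored
        System.out.println(lock.nodeDeletes()); // 1
    }
}
```

Callers would normally pair each lock() with an unlock() in a finally block so that the count always returns to zero.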

LockWatcher

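private class LockWatcher implements Watcher {
    @Override
    public void process(WatchedEvent event) {
        //Read the field once so the null check and the countDown act on the same latch.
        //No synchronized block is needed here: "this" would be the watcher, not the lock.
        CountDownLatch latch = countDownLatch;
        if (latch != null) {
            latch.countDown();
        }
    }
}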

Summary

The above is our improved fair, reentrant distributed lock, built on ephemeral sequential nodes and the watch mechanism.
The source code is available here: aloofJr
The watch mechanism avoids the CPU idling caused by polling.
Sequential ephemeral nodes avoid the herd effect.

If you know of a better way to optimize the approach above, you are welcome to discuss it.

More articles

See my blog: https://nc2era.com

Written by AloofJr. Please credit the source when reposting.

Added by thinsoldier on Wed, 11 Dec 2019 08:43:43 +0200