concept
-
When we develop stand-alone applications and synchronize level concurrency, we often use synchronized or Lock to solve the problem of code synchronization between multiple threads. At this time, multiple threads run under the same JVM without any problem.
-
However, when our application works in a distributed cluster, it belongs to a working environment with multiple JVMs. E-jing across JVMs cannot solve the synchronization problem through multi-threaded locks.
-
Then a more advanced locking mechanism is needed to deal with the problem of data synchronization between processes across machines - this is distributed locking.
-
Where resources are, distributed locks are placed there
What distributed locks do: data synchronization between processes across machines
Zookeeper distributed lock principle
Core idea: when the client wants to obtain a lock, it creates a node. After using the lock, it deletes the node.
Steps:
- When a client acquires a lock, a temporary sequence node is created under the lock node (this node is created casually).
- Temporary: it must be released anyway, so it is temporary. To avoid downtime, this node cannot be deleted
- Order: determines the chronological order in which nodes are created
- Then, the client obtains all the child nodes under the lock. After the client obtains all the child nodes, if it finds that the serial number of the child node it created is the smallest, it is considered that the client has obtained the lock. After using the lock, delete the node.
- If you find that the node you created is not the smallest of all the child nodes of lock, it means that you have not obtained the lock. At this time, the client needs to find the node that is smaller and register an event listener for it to listen for deletion events. (just find one smaller than yourself)
- If it is found that the node smaller than itself is deleted, the Watcher of the client will receive the corresponding notification. At this time, judge again whether the node you created is the one with the lowest sequence number among the lock child nodes. If so, you will obtain the lock. If not, repeat the above steps to continue to obtain a node smaller than yourself and register to listen.
Cursor implements distributed lock API
There are five locking schemes in cursor:
- InterProcessSemaphoreMutex: distributed exclusive lock (non reentrant lock)
- Once before
- Non reentrant: after the client obtains the shared resource, if it is non reentrant, if it wants to enter the resource again, it must release the lock before it can enter, and then compete with other lock resources
- Reentrant: if you want to go in again after going in once, you can go in again
- InterProcessMutex: distributed reentrant exclusive lock
- InterProcessReadWriteLock: distributed read / write lock
- InterProcessMultilock: move a lock as a container managed by a single entity
- InterProcessSemaphoreV2: shared semaphore
Simulated 12306 ticketing case
-
Implement the client through threads
public class LockTest { public static void main(String[] args) { Ticket12306 ticket12306 = new Ticket12306(); //Create client Thread t1 = new Thread(ticket12306,"Ctrip"); Thread t2 = new Thread(ticket12306,"Fliggy"); t1.start(); t2.start(); } }
-
Implementation of distributed lock
public class Ticket12306 implements Runnable{ private int tickets = 10;//Number of votes in the database //A class of distributed lock implementation private InterProcessMutex lock ; public Ticket12306(){ //Retry policy RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); //2. The second method //CuratorFrameworkFactory.builder(); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("192.168.149.135:2181") .sessionTimeoutMs(60 * 1000) .connectionTimeoutMs(15 * 1000) .retryPolicy(retryPolicy) .build(); //Open connection client.start(); //Create locks and initialize locks. The lock node will be created automatically lock = new InterProcessMutex(client,"/lock"); } @Override public void run() { while(true){ //Acquire lock try { lock.acquire(3, TimeUnit.SECONDS);//Lock if(tickets > 0){ System.out.println(Thread.currentThread()+":"+tickets); Thread.sleep(100); tickets--; } } catch (Exception e) { e.printStackTrace(); }finally { //Release lock try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } } }