Play with zookeeper using the three core languages Java, Python and Golang

This article video tutorial

https://www.bilibili.com/vide...

Pay attention to the official account and learn more exciting courses

Play with zookeeper using the three core languages Java, Python and Golang

Operate zookeeper using Java Native api

Create a maven project

Create a maven project

Add project dependency

<dependencies>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.8</version>
    </dependency>
</dependencies>

https://mvnrepository.com/

Implementation code

package com.duoke360;

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class Test {
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        String connStr = "192.168.18.128:2181";
        CountDownLatch countDown = new CountDownLatch(1);

        Watcher watcher= event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                System.err.println("eventType:"+event.getType());
                if(event.getType()== Watcher.Event.EventType.None){
                    countDown.countDown();
                }else if(event.getType()== Watcher.Event.EventType.NodeCreated){
                    System.out.println("listen:Node creation");
                }else if(event.getType()== Watcher.Event.EventType.NodeChildrenChanged){
                    System.out.println("listen:Child node modification");
                }
            }
        };

        ZooKeeper zookeeper = new ZooKeeper(connStr, 5000,watcher);
        countDown.await();

        //Register to listen. You should register again every time, otherwise you won't be able to listen
        // First create a root node root
        zookeeper.exists("/root/ghz", watcher);

        // Create node
        String result = zookeeper.create("/root/ghz", "Lao Guo".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(result);

        Thread.sleep(10);

        // Get node
        byte[] bs = zookeeper.getData("/root/ghz", true, null);
        result = new String(bs);
        System.out.println("The data after node creation is:" + result);

        // Modify node
        zookeeper.setData("/root/ghz", "Multi course Network-Lao Guo".getBytes(), -1);

        Thread.sleep(10);

        bs = zookeeper.getData("/root/ghz", true, null);
        result = new String(bs);


        System.out.println("The data after modifying the node is:" + result);
        // Delete node
        zookeeper.delete("/root/ghz", -1);
        System.out.println("Node deleted successfully");
    }
}

Operation results

eventType:None
eventType:NodeCreated
listen:Node creation
/root/ghz
 The data after node creation is:Lao Guo
eventType:NodeDataChanged
 The data after modifying the node is:Multi course Network-Lao Guo
 Node deleted successfully
eventType:NodeDeleted

Use the java zkclient library to operate zookeeper md

Create a maven project

Create a maven project

Add dependency

<!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>

Implementation code

package com.duoke360;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.*;

import java.util.List;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        String connStr = "192.168.18.128:2181";
        ZkClient zk = new ZkClient(connStr);

        // Registration [data] event
        zk.subscribeDataChanges("/root/ghz", new IZkDataListener() {

            @Override
            public void handleDataDeleted(String arg0) throws Exception {
                System.err.println("Data deletion:" + arg0);

            }

            @Override
            public void handleDataChange(String arg0, Object arg1) throws Exception {
                System.err.println("Data modification:" + arg0 + "------" + arg1);

            }
        });

        zk.subscribeChildChanges("/root", (arg0, arg1) -> {
            System.err.println("Child nodes change:" + arg0);
            arg1.forEach(f -> {
                System.out.println("content: " + f);
            });
        });

        List<String> list = zk.getChildren("/");
        list.forEach(e -> {
            System.out.println(e);
        });

        String res = zk.create("/root/ghz", "Multi course Network-Lao Guo", CreateMode.PERSISTENT);
        System.out.println("Create node/root/ghz success:" + res);

        zk.writeData("/root/ghz", "Multi course Network-zookeeper");
        System.out.println("Modify node/root/ghz Data success");

        res = zk.readData("/root/ghz");
        System.out.println("Node data:" + res);

        Thread.sleep(1000);

        zk.delete("/root/ghz");
        System.out.println("Delete node/root/ghz success");

        Thread.sleep(1000);

        System.out.println("------------------------------------------------");

        for (int i = 0; i < 10; i++) {
            zk.create("/root/ghz", "Multi course Network-Lao Guo", CreateMode.PERSISTENT);
            Thread.sleep(1000);
            zk.delete("/root/ghz");
            Thread.sleep(1000);
        }
    }
}

Operation results

node20000000002
zookeeper
node10000000001
root
 Create node/root/ghz success:/root/ghz
 Modify node/root/ghz Data success
 Node data:Multi course Network-zookeeper
content: ghz
 Data modification:/root/ghz------Multi course Network-zookeeper
 Child nodes change:/root
 Data modification:/root/ghz------Multi course Network-zookeeper
 Delete node/root/ghz success
 Data deletion:/root/ghz
 Child nodes change:/root
------------------------------------------------
Data modification:/root/ghz------Multi course Network-Lao Guo
 Child nodes change:/root
content: ghz
 Data deletion:/root/ghz
 Child nodes change:/root
 Data modification:/root/ghz------Multi course Network-Lao Guo
 Child nodes change:/root
content: ghz
 Data deletion:/root/ghz
 Child nodes change:/root
 Data modification:/root/ghz------Multi course Network-Lao Guo
 Child nodes change:/root

Operate zookeeper using Java curator Library

Create a maven project

Create a maven project

Add dependency

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.2.0</version>
</dependency>

Implementation code

package com.duoke360;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class Test {
    public static void main(String[] args) throws Exception {
        String connStr = "192.168.18.128:2181";
        CuratorFramework cur= CuratorFrameworkFactory.builder()
                .connectString(connStr)
                .connectionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000,3))
                .build();
        cur.start();//connect

        //Create listener
        PathChildrenCache cache=new PathChildrenCache(cur,"/root",true);
        cache.start();
        cache.rebuild();
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework framwork, PathChildrenCacheEvent event) throws Exception {
                System.err.println("Node changes:"+event.getType());
            }
        });

        Stat stat=cur.checkExists().forPath("/root/ghz");
        if(stat!=null){
            System.out.println("/root/ghz Node exists, delete it directly");
            cur.delete().forPath("/root/ghz");
        }

        System.in.read();

        System.out.println("Ready to create /root/ghz");
        cur.create().withMode(CreateMode.PERSISTENT)
                .forPath("/root/ghz", "Multi course Network-Lao Guo".getBytes());
        System.out.println("node /root/ghz Created successfully");

        Thread.sleep(1000);

        byte[] bs=cur.getData().forPath("/root/ghz");
        System.out.println("data:"+new String(bs));

        Thread.sleep(1000);

        cur.delete().forPath("/root/ghz");

        Thread.sleep(1000);
    }
}

Operation results

test
 Ready to create /root/ghz
 node /root/ghz Created successfully
 Node changes:CHILD_ADDED
 data:Multi course Network-Lao Guo
 Node changes:CHILD_REMOVED

Using Python to operate zookeeper

install

pip install kazoo

Connect ZooKeeper

You can directly connect to ZooKeeper through the KazooClient class, support multiple host s, and the default port is 2181.

import json
from kazoo.client import KazooClient

zk = KazooClient(hosts='192.168.18.128:2181')
zk.start()

Create node

Let's first look at the definition of the create() method

def create(self, path, value=b"", acl=None, ephemeral=False,
               sequence=False, makepath=False):
        :param path: Path of node.
        :param value: Initial bytes value of node.
        :param acl: :class:`~kazoo.security.ACL` list.
        :param ephemeral: Boolean indicating whether node is ephemeral
                          (tied to this session).
        :param sequence: Boolean indicating whether path is suffixed
                         with a unique index.
        :param makepath: Whether the path should be created if it
                         doesn't exist.

Let's explain these parameters:

  • Path: node path
  • Value: the value corresponding to the node. Note that the type of value is bytes
  • ephemeral: if True, a temporary node will be created and automatically deleted after the session is interrupted. Default False
  • sequence: if it is True, 10 digits will be added after the node name you created (for example, when you create a testplatform/test node, you actually create testplatform/test0000000003, which is incremented in order). The default is False
  • makepath: if False, throw NoNodeError when the parent node does not exist. If True, the parent node does not exist, the parent node is created. Default False

example:

from kazoo.client import KazooClient

zk = KazooClient(hosts='192.168.18.128:2181')
zk.start()
# Create node: if makepath is set to True, it will be created if the parent node does not exist. If other parameters are not filled in, it will be the default
zk.create('/root/ghz',b'',makepath=True)
# Don't forget to close zk connection after operation
zk.stop()

View node

The KazooClient class provides get with_ The children () and get() methods get the values corresponding to the child nodes and nodes

from kazoo.client import KazooClient

zk = KazooClient(hosts='192.168.18.128:2181')
zk.start()
# Get all child nodes under a node
node = zk.get_children('/root')
# Get the value corresponding to a node
value = zk.get('/root/ghz')
# Don't forget to close zk connection after operation
zk.stop()
print(node,value)

Change node

Change the node value created above, using the set() method

from kazoo.client import KazooClient

zk = KazooClient(hosts='192.168.18.128:2181')
zk.start()
# Change the value corresponding to the node
zk.set('/root/ghz', b'duoke-ghz')
# Get the value corresponding to a node
value = zk.get('/root/ghz')
zk.stop()
print(value)

Delete node

Delete the node created above and use the delete() method

from kazoo.client import KazooClient

zk = KazooClient(hosts='192.168.18.128:2181')
zk.start()
# Delete the value corresponding to the node
zk.delete('/root/ghz',recursive=False)
zk.stop()

Parameter recursive: if it is False, an exception NotEmptyError will be thrown when the node to be deleted has child nodes. If True, delete this node and all child nodes of this node

watches event

All read operations of zookeeper have set the watch option (get_children(), get() and exists()). Watch is a trigger that is triggered when it is detected that there are changes in child nodes or changes in node value of zookeeper. Let's take the get () method as an example.

from kazoo.client import KazooClient

zk = KazooClient(hosts='192.168.18.128:2181')
zk.start()


def test(event):
    print('Trigger event')


if __name__ == "__main__":
    # The node must exist, otherwise it will be saved
    zk.get('/root/ghz', watch=test)
    print("First acquisition value")
    zk.set('/root/ghz', b'duoke-ghz')
    zk.get('/root/ghz', watch=test)
    zk.set('/root/ghz', b'duoke-ghz2')
    print("Second acquisition value")

    zk.stop()

Operation results

First acquisition value
 Trigger event
 Second acquisition value

Using golang to operate zookeeper

Download package

go get github.com/samuel/go-zookeeper

Connect to Server

package main

import (
    "fmt"
    "time"

    "github.com/samuel/go-zookeeper/zk"
)

func conn() *zk.Conn {
    var hosts = []string{"192.168.18.128:2181"}
    conn, _, err := zk.Connect(hosts, time.Second*5)
    defer conn.Close()
    if err != nil {
        fmt.Println(err)
        return nil
    } else {
        fmt.Println("Connection succeeded!")
        return conn
    }
}

func main() {
    conn()
}

Create node

func create() {
    var conn *zk.Conn = conn()
    defer conn.Close()
    var path = "/home"
    var data = []byte("Multi course Network")
    var flags int32 = 0
    //flags have four values:
    //0: permanent unless manually deleted
    //zk.FlagEphemeral = 1: short. If the session is disconnected, the modified node will also be deleted
    //zk. Flagsequence = 2: the sequence number will be automatically added after the node
    //3:Ephemeral and Sequence, that is, the Sequence number is added briefly and automatically
    var acls = zk.WorldACL(zk.PermAll) //Control access mode

    p, err_create := conn.Create(path, data, flags, acls)
    if err_create != nil {
        fmt.Println(err_create)
        return
    }
    fmt.Println("create:", p)
}

Modify node

func set() {
    var conn *zk.Conn = conn()
    defer conn.Close()
    var path = "/home"
    var data = []byte("Multi course Network-Lao Guo")
    conn.Set(path, data, -1)
    b, _, _ := conn.Get(path)
    fmt.Println("Data:" + string(b))
}

Delete node

func del() {
    var conn *zk.Conn = conn()
    defer conn.Close()
    var path = "/home"
    err := conn.Delete(path, -1)
    if err != nil {
        fmt.Println("Deletion failed!")
    } else {
        fmt.Println("Delete succeeded!")
    }
}

watch

func callback(event zk.Event) {
    fmt.Println("*******************")
    fmt.Println("path:", event.Path)
    fmt.Println("type:", event.Type.String())
    fmt.Println("state:", event.State.String())
    fmt.Println("*******************")
}

func watch() {

    var hosts = []string{"192.168.18.128:2181"}
    option := zk.WithEventCallback(callback)

    conn, _, err := zk.Connect(hosts, time.Second*5, option)
    defer conn.Close()
    if err != nil {
        fmt.Println(err)
        return
    }
    var path = "/home"
    _, _, _, err = conn.ExistsW(path)
    if err != nil {
        fmt.Println(err)
        return
    }

    // establish
    create(conn)

    time.Sleep(time.Second * 2)

    _, _, _, err = conn.ExistsW(path)
    if err != nil {
        fmt.Println(err)
        return
    }
    // delete
    del(conn)

}

Pay attention to the official account and learn more exciting courses

Keywords: Go

Added by jakebur01 on Tue, 28 Dec 2021 14:57:44 +0200