Use and Analysis of [Curator] Node Cache

Node Cache

NodeCache keeps the data of a single ZooKeeper node as a local cache. The class watches that node, handles its creation and deletion, keeps the cached data in sync, and so on. You can also register custom listeners to react to these changes in more detail.

1. Key APIs

org.apache.curator.framework.recipes.cache.NodeCache

org.apache.curator.framework.recipes.cache.NodeCacheListener

org.apache.curator.framework.recipes.cache.ChildData

2. Description of the mechanism

  • Caches the data of a single node only
  • Internally uses a state machine to drive the different operations

3. Usage

3.1 Creation

public NodeCache(CuratorFramework client, String path)

3.2 Use

As with the other Curator caches, you need to call start() before using it and close() when you are done with it.

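getCurrentData() can be called at any time to get the current cached state: a ChildData holding the node's Stat and data, or null if the node does not exist (or, without an initial build, before the first background read has completed).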

You can also obtain the listener container through getListenable() and add custom listeners to it:

public void addListener(NodeCacheListener listener)
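NodeCacheListener.nodeChanged() takes no arguments, so a listener typically calls getCurrentData() itself to see the new value. The sketch below is a JDK-only model of that usage pattern, not Curator code: CacheModel, ChangeListener and update() are hypothetical stand-ins for NodeCache, NodeCacheListener and the internal update path, and no ZooKeeper is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for NodeCacheListener: the callback carries no payload.
interface ChangeListener {
    void nodeChanged() throws Exception;
}

// Hypothetical, ZooKeeper-free model of the NodeCache usage surface.
class CacheModel {
    private final AtomicReference<String> data = new AtomicReference<>(null);
    private final List<ChangeListener> listeners = new CopyOnWriteArrayList<>();

    void addListener(ChangeListener listener) {
        listeners.add(listener);
    }

    // Like getCurrentData(): returns null when the node does not exist.
    String getCurrentData() {
        return data.get();
    }

    // Hypothetical stand-in for the internal path that applies a value read from ZooKeeper.
    void update(String newData) {
        String previous = data.getAndSet(newData);
        if (Objects.equals(previous, newData)) {
            return; // unchanged: no event
        }
        for (ChangeListener l : listeners) {
            try {
                l.nodeChanged();
            } catch (Exception e) {
                // Like NodeCache, log listener failures instead of propagating them.
                System.err.println("Calling listener: " + e);
            }
        }
    }
}

class ListenerUsageDemo {
    static List<String> run() {
        CacheModel cache = new CacheModel();
        List<String> seen = new ArrayList<>();
        // The listener pulls the new value itself, as a NodeCacheListener must.
        cache.addListener(() -> seen.add(String.valueOf(cache.getCurrentData())));
        cache.update("v1");
        cache.update("v1"); // same value: listeners are not called
        cache.update(null); // node deleted
        return seen;
    }
}
```

Because the callback carries no data, a listener that reads getCurrentData() sees whatever is cached at that moment, which may already be newer than the change that triggered it.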

4. Error handling

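The NodeCache instance already comes with a ConnectionStateListener that handles connection state changes: while the connection is suspended or lost it stops issuing requests, and once the connection is re-established it calls reset() to re-register its watch and reload the data.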
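The connection handling boils down to an AtomicBoolean gate. The sketch below models it with the JDK only; ConnState is a hypothetical mirror of a subset of Curator's ConnectionState values, and reset() merely counts calls instead of issuing the background checkExists() the real class performs.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical mirror of the ConnectionState values relevant here.
enum ConnState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

// ZooKeeper-free model of the connectionStateListener logic in NodeCache.
class ConnectionGate {
    // Starts as true, just like NodeCache.isConnected.
    private final AtomicBoolean isConnected = new AtomicBoolean(true);
    // Hypothetical counter: how often reset() would have been issued.
    final AtomicInteger resets = new AtomicInteger();

    void stateChanged(ConnState newState) {
        if (newState == ConnState.CONNECTED || newState == ConnState.RECONNECTED) {
            // Only the false -> true transition triggers a reset, so duplicate
            // "connected" notifications do not re-register the watch.
            if (isConnected.compareAndSet(false, true)) {
                reset();
            }
        } else {
            // SUSPENDED or LOST: stop issuing requests until reconnected.
            isConnected.set(false);
        }
    }

    boolean isConnected() {
        return isConnected.get();
    }

    private void reset() {
        // The real reset() re-runs checkExists() with a watcher in the background.
        resets.incrementAndGet();
    }
}
```

Since isConnected starts as true, the very first CONNECTED event does not trigger a reset; the initial watch is registered by start() instead.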

5. Source Code Analysis

5.1 Class Definition

public class NodeCache implements Closeable{}
  • Implements java.io.Closeable

5.2 Member Variables

public class NodeCache implements Closeable
{
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final CuratorFramework client;
    private final String path;
    private final boolean dataIsCompressed;
    private final AtomicReference<ChildData> data = new AtomicReference<ChildData>(null);
    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
    private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
    private final AtomicBoolean isConnected = new AtomicBoolean(true);
    private ConnectionStateListener connectionStateListener = new ConnectionStateListener()
    {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState)
        {
            if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )
            {
                if ( isConnected.compareAndSet(false, true) )
                {
                    try
                    {
                        reset();
                    }
                    catch ( Exception e )
                    {
                        ThreadUtils.checkInterrupted(e);
                        log.error("Trying to reset after reconnection", e);
                    }
                }
            }
            else
            {
                isConnected.set(false);
            }
        }
    };

    private Watcher watcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            try
            {
                reset();
            }
            catch(Exception e)
            {
                ThreadUtils.checkInterrupted(e);
                handleException(e);
            }
        }
    };

    private enum State
    {
        LATENT,
        STARTED,
        CLOSED
    }

    private final BackgroundCallback backgroundCallback = new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
        {
            processBackgroundResult(event);
        }
    };
}
  • log
  • client
  • path
    • Path of the cached node
  • dataIsCompressed
    • Whether the node data is stored compressed
  • data
    • AtomicReference
    • Holds the locally cached data
    • The cached data is wrapped in a ChildData
  • state
    • AtomicReference
    • Lifecycle state of the cache
    • Internal enum
      • LATENT (default)
      • STARTED
      • CLOSED
  • listeners
    • org.apache.curator.framework.listen.ListenerContainer
    • Listener container
  • isConnected
    • Whether the ZooKeeper connection is currently up
    • AtomicBoolean
  • connectionStateListener
    • Built-in connection state listener
  • watcher
    • Built-in node watcher
    • Calls reset() whenever the node changes; ZooKeeper watches fire only once, so reset() registers it again
  • backgroundCallback
    • Callback for the asynchronous (background) node operations
    • Keeps the calling thread from blocking on ZooKeeper

5.3 Constructor

public NodeCache(CuratorFramework client, String path)
{
    this(client, path, false);
}

public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)
{
    this.client = client;
    this.path = PathUtils.validatePath(path);
    this.dataIsCompressed = dataIsCompressed;
}

The constructors only validate the path and assign fields, so most of the logic lives in start().

5.4 Startup

public void     start() throws Exception
{
    start(false);
}

public void     start(boolean buildInitial) throws Exception
{
    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

    client.getConnectionStateListenable().addListener(connectionStateListener);

    if ( buildInitial )
    {
        client.checkExists().creatingParentContainersIfNeeded().forPath(path);
        internalRebuild();
    }
    reset();
}

private void     internalRebuild() throws Exception
{
    try
    {
        Stat    stat = new Stat();
        byte[]  bytes = dataIsCompressed ? client.getData().decompressed().storingStatIn(stat).forPath(path) : client.getData().storingStatIn(stat).forPath(path);
        data.set(new ChildData(path, stat, bytes));
    }
    catch ( KeeperException.NoNodeException e )
    {
        data.set(null);
    }
}

private void     reset() throws Exception
{
    if ( (state.get() == State.STARTED) && isConnected.get() )
    {
        client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
    }
}

As you can see:

  • By default (start() with no argument) the cache is not populated up front; the data arrives asynchronously

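Now walk through the startup process again:

  1. Atomically move the state from LATENT to STARTED (a second start() fails)
  2. Register connectionStateListener on the client's connection state
  3. If an initial build is requested (buildInitial = true)
    1. Create any missing parent container nodes (the node itself is not created)
    2. Call internalRebuild() to load the initial data synchronously
      1. Read the node's data and Stat
      2. Write them to the local cache (null if the node does not exist)
  4. Call reset()
    1. If the cache is started and connected, check in the background whether the node exists
      • Step 3 may already have filled the cache
      • A watcher is registered on the node
      • The result is delivered to backgroundCallback
        • which calls the processBackgroundResult() method (the state machine)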
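The lifecycle guard and the optional initial build can be modelled without ZooKeeper. In this sketch StartupModel is hypothetical, a Map stands in for the server, and reset() only counts calls; it mirrors the compareAndSet check, the synchronous internalRebuild() and the final reset() described above, not Curator's actual code.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// ZooKeeper-free model of NodeCache.start(buildInitial); the Map stands in for the server.
class StartupModel {
    enum State { LATENT, STARTED, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
    private final AtomicReference<String> data = new AtomicReference<>(null);
    private final Map<String, String> server; // hypothetical in-memory "ZooKeeper"
    private final String path;
    int resetCalls = 0; // hypothetical counter for illustration

    StartupModel(Map<String, String> server, String path) {
        this.server = server;
        this.path = path;
    }

    void start(boolean buildInitial) {
        // Same guard as Preconditions.checkState(state.compareAndSet(...)).
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
        if (buildInitial) {
            // internalRebuild(): synchronous read; a missing node caches null.
            data.set(server.get(path));
        }
        reset();
    }

    private void reset() {
        // The real reset() issues a background checkExists() with a watcher.
        if (state.get() == State.STARTED) {
            resetCalls++;
        }
    }

    String getCurrentData() {
        return data.get();
    }
}
```

With buildInitial = false, getCurrentData() stays null until the asynchronous callbacks deliver the data.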

5.4.1 processBackgroundResult method

Much of the startup logic lives in processBackgroundResult(), so let's look at that method next:

private void processBackgroundResult(CuratorEvent event) throws Exception
{
    switch ( event.getType() )
    {
        case GET_DATA:
        {
            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
            {
                ChildData childData = new ChildData(path, event.getStat(), event.getData());
                setNewData(childData);
            }
            break;
        }

        case EXISTS:
        {
            if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
            {
                setNewData(null);
            }
            else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
            {
                if ( dataIsCompressed )
                {
                    client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
                }
                else
                {
                    client.getData().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
                }
            }
            break;
        }
    }
}

Since startup comes first, let's look at the EXISTS event first:

  1. If the node does not exist
    1. Call setNewData(null) to set the cached data to null
  2. If the node exists
    1. If the data is stored compressed
      1. Read it with decompression
    2. Otherwise, read the data directly
      • In both cases the read also registers the watcher and runs in the background with backgroundCallback, so the result comes back to processBackgroundResult() as a new event type:
      • GET_DATA
      • This is how the state machine advances

Now the GET_DATA event:

  1. If the read succeeded
    1. Build a ChildData from the event's Stat and data
    2. Call setNewData() to store it
  2. Any other result code is ignored
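The EXISTS to GET_DATA chain can be simulated with a queue of pending events standing in for Curator's background thread. Everything here (CallbackStateMachine, Event, the Map used as the server, drain()) is a hypothetical JDK-only model of the switch above, not the real CuratorEvent API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// ZooKeeper-free model of the EXISTS -> GET_DATA callback chain.
class CallbackStateMachine {
    enum EventType { EXISTS, GET_DATA }
    enum Code { OK, NONODE }

    // Hypothetical event: type, result code, and payload (GET_DATA only).
    static final class Event {
        final EventType type;
        final Code code;
        final String data;
        Event(EventType type, Code code, String data) {
            this.type = type; this.code = code; this.data = data;
        }
    }

    private final Map<String, String> server; // hypothetical in-memory "ZooKeeper"
    private final String path;
    private final Deque<Event> pending = new ArrayDeque<>();
    private final AtomicReference<String> data = new AtomicReference<>(null);

    CallbackStateMachine(Map<String, String> server, String path) {
        this.server = server; this.path = path;
    }

    // Like reset(): queue an asynchronous exists check.
    void reset() {
        Code code = server.get(path) != null ? Code.OK : Code.NONODE;
        pending.add(new Event(EventType.EXISTS, code, null));
    }

    // Deliver queued callbacks, as the background thread would.
    void drain() {
        Event e;
        while ((e = pending.poll()) != null) {
            processBackgroundResult(e);
        }
    }

    private void processBackgroundResult(Event event) {
        switch (event.type) {
            case GET_DATA:
                if (event.code == Code.OK) data.set(event.data); // other codes ignored
                break;
            case EXISTS:
                if (event.code == Code.NONODE) {
                    data.set(null);
                } else if (event.code == Code.OK) {
                    // Node exists: queue a getData(), which comes back as GET_DATA.
                    String value = server.get(path);
                    pending.add(new Event(EventType.GET_DATA, value != null ? Code.OK : Code.NONODE, value));
                }
                break;
        }
    }

    String getCurrentData() {
        return data.get();
    }
}
```

In the real class the watcher fires reset() on every change; in the model the caller triggers it by hand before draining.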

5.4.2 setNewData method

Next, the setNewData method:

private void setNewData(ChildData newData) throws InterruptedException
{
    ChildData   previousData = data.getAndSet(newData);
    if ( !Objects.equal(previousData, newData) )
    {
        listeners.forEach
        (
            new Function<NodeCacheListener, Void>()
            {
                @Override
                public Void apply(NodeCacheListener listener)
                {
                    try
                    {
                        listener.nodeChanged();
                    }
                    catch ( Exception e )
                    {
                        ThreadUtils.checkInterrupted(e);
                        log.error("Calling listener", e);
                    }
                    return null;
                }
            }
        );

        if ( rebuildTestExchanger != null )
        {
            try
            {
                rebuildTestExchanger.exchange(new Object());
            }
            catch ( InterruptedException e )
            {
                Thread.currentThread().interrupt();
            }
        }
    }
}
  1. Atomically swap the new value into the local cache, keeping the previous one
  2. If the value actually changed
    1. Notify every listener in the container (synchronously, on the calling thread; a listener exception is logged, not propagated)
    2. rebuildTestExchanger
      • Can basically be ignored
      • Used only in tests to hand a signal object to another thread
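The change check relies on value equality: Curator's ChildData compares the path, the Stat and the data bytes. The sketch below is a JDK-only model in which a hypothetical Snapshot class uses a version number in place of the full Stat, and ChangeNotifier mimics the getAndSet-then-compare logic of setNewData().

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical simplified ChildData: equality over path, version and bytes.
final class Snapshot {
    final String path;
    final int version;
    final byte[] data;

    Snapshot(String path, int version, byte[] data) {
        this.path = path; this.version = version; this.data = data;
    }

    @Override public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Snapshot)) return false;
        Snapshot s = (Snapshot) o;
        return version == s.version && path.equals(s.path) && Arrays.equals(data, s.data);
    }

    @Override public int hashCode() {
        return Objects.hash(path, version) * 31 + Arrays.hashCode(data);
    }
}

// Model of setNewData(): swap atomically, notify only when the value changed.
class ChangeNotifier {
    private final AtomicReference<Snapshot> data = new AtomicReference<>(null);
    private final List<Runnable> listeners = new CopyOnWriteArrayList<>();

    void addListener(Runnable listener) {
        listeners.add(listener);
    }

    void setNewData(Snapshot newData) {
        Snapshot previous = data.getAndSet(newData);
        if (!Objects.equals(previous, newData)) {
            for (Runnable l : listeners) {
                try {
                    l.run(); // synchronous, on the calling (callback) thread
                } catch (RuntimeException e) {
                    // One failing listener must not stop the others.
                    System.err.println("Calling listener: " + e);
                }
            }
        }
    }
}
```

Because the Stat is part of the comparison, rewriting a node with identical bytes still bumps its version and produces an event.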

5.4.3 Summary

The startup process can be roughly divided into:

  1. Register the connection state listener
  2. If an initial build is requested, create missing parent nodes and pull the node's data into the local cache
  3. Register a watcher on the node and attach the background callback
  4. Keep the cached data in sync through the callback state machine

5.5 Getting Cached Data

public ChildData getCurrentData()
{
    return data.get();
}

This simply reads the cached data: callers only ever touch the local cache, never ZooKeeper directly. The local cache is kept in sync with the ZooKeeper node by the watcher and the callback state machine.

5.6 Shutdown

When you are done with the cache, call close():

public void close() throws IOException
{
    if ( state.compareAndSet(State.STARTED, State.CLOSED) )
    {
        listeners.clear();
        client.clearWatcherReferences(watcher);
        client.getConnectionStateListenable().removeListener(connectionStateListener);

        // TODO
        // From PathChildrenCache
        // This seems to enable even more GC - I'm not sure why yet - it
        // has something to do with Guava's cache and circular references
        connectionStateListener = null;
        watcher = null;
    }        
}
  • Atomically update the state (only a started cache is closed, and only once)
  • Clear the listener container
  • Clear the watcher references registered on the node
  • Remove the connection state listener

Note that the local cache data is not cleared.
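The close semantics (idempotent, listeners dropped, data kept) can be checked with a small JDK-only model. CloseModel and its cleanups counter are hypothetical; the comment marks the watcher and connection-listener cleanup that the real close() performs but the model omits.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;

// ZooKeeper-free model of NodeCache.close(): idempotent, clears listeners, keeps data.
class CloseModel {
    enum State { LATENT, STARTED, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
    private final AtomicReference<String> data = new AtomicReference<>(null);
    final List<Runnable> listeners = new CopyOnWriteArrayList<>();
    int cleanups = 0; // hypothetical counter: how many times cleanup actually ran

    void start(String initial) {
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
        data.set(initial);
    }

    void close() {
        // Only the first close() of a started cache does any work.
        if (state.compareAndSet(State.STARTED, State.CLOSED)) {
            listeners.clear();
            // The real close() also clears watcher references and removes the
            // connection state listener; the cached data is left untouched.
            cleanups++;
        }
    }

    String getCurrentData() {
        return data.get();
    }
}
```

Since the cache is not emptied, code that still holds the instance can read the last value after close(); that value is no longer kept in sync and should be treated as stale.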

6. Summary

Compared with Path Cache, Node Cache is much simpler.

  • Path Cache
    • More like a cache manager
    • Manages multiple caches under a path
    • Because there are multiple caches
      • The synchronization logic is complex
      • Concurrency issues are more serious
        • So internally it uses
          • The command pattern
          • Asynchronous, sequential execution
  • Node Cache
    • Caches the data of a single node only
    • By the nature of a cache, the data does not need strict concurrency control (briefly stale data is acceptable)
    • Uses a callback state machine to handle the different data states


Added by chuckwgn on Sun, 23 Jun 2019 19:48:33 +0300