Node Cache
Caches the data of a single node locally. This class watches the node, handles its creation and deletion, keeps the data synchronized, and so on. You can also register custom listeners to react to these changes in more detail.
1. Key APIs
org.apache.curator.framework.recipes.cache.NodeCache
org.apache.curator.framework.recipes.cache.NodeCacheListener
org.apache.curator.framework.recipes.cache.ChildData
2. Description of the mechanism
- Caches the data of a single node only
- Internally uses a state machine to control how the different operations are handled
3. Usage
3.1 Creation
public NodeCache(CuratorFramework client, String path)
3.2 Use
Again, you need to call start() before using the cache and close() once you are done with it.
getCurrentData() can be called at any time to get the current state and data of the cache.
You can also get the listener container through getListenable(), and add custom listeners on this basis:
public void addListener(NodeCacheListener listener)
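A minimal usage sketch that ties these calls together. It assumes the Curator recipes library is on the classpath and a ZooKeeper server is reachable at localhost:2181; the connection string, the retry settings, the path /example/node, and the ten-second wait are illustrative assumptions, not values from this article.

```java
import java.nio.charset.StandardCharsets;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;

class NodeCacheUsageSketch {
    public static void main(String[] args) throws Exception {
        // Assumption: a ZooKeeper server is listening on localhost:2181.
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // Hypothetical node path, chosen only for illustration.
        NodeCache cache = new NodeCache(client, "/example/node");
        try {
            // Register the listener before start() so the first update is seen.
            cache.getListenable().addListener(() -> {
                ChildData current = cache.getCurrentData();
                if (current == null) {
                    System.out.println("node is absent");
                } else {
                    byte[] bytes = current.getData();
                    String text = (bytes == null) ? "" : new String(bytes, StandardCharsets.UTF_8);
                    System.out.println("node data: " + text);
                }
            });
            cache.start();

            // Keep the process alive for a while so changes can be observed.
            Thread.sleep(10_000);
        } finally {
            cache.close();
            client.close();
        }
    }
}
```

The listener takes no arguments, so it reads the new value back through getCurrentData(); a null result means the node does not currently exist.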
4. Error handling
The NodeCache instance already comes with a ConnectionStateListener to handle connection state changes.
5. Source Code Analysis
5.1 Class Definition
public class NodeCache implements Closeable{}
- Implements java.io.Closeable
5.2 Member Variables
public class NodeCache implements Closeable {
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final CuratorFramework client;
    private final String path;
    private final boolean dataIsCompressed;
    private final AtomicReference<ChildData> data = new AtomicReference<ChildData>(null);
    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
    private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
    private final AtomicBoolean isConnected = new AtomicBoolean(true);

    private ConnectionStateListener connectionStateListener = new ConnectionStateListener() {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) ) {
                if ( isConnected.compareAndSet(false, true) ) {
                    try {
                        reset();
                    } catch ( Exception e ) {
                        ThreadUtils.checkInterrupted(e);
                        log.error("Trying to reset after reconnection", e);
                    }
                }
            } else {
                isConnected.set(false);
            }
        }
    };

    private Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            try {
                reset();
            } catch ( Exception e ) {
                ThreadUtils.checkInterrupted(e);
                handleException(e);
            }
        }
    };

    private enum State {
        LATENT,
        STARTED,
        CLOSED
    }

    private final BackgroundCallback backgroundCallback = new BackgroundCallback() {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
            processBackgroundResult(event);
        }
    };
}
- log
- client
- path
  - The node path
- dataIsCompressed
  - Whether the data is compressed
- data
  - AtomicReference
  - Stores the locally cached data
  - The cached data is wrapped as ChildData
- state
  - AtomicReference
  - The cache state
  - Internal enumeration
    - LATENT (default)
    - STARTED
    - CLOSED
- listeners
  - org.apache.curator.framework.listen.ListenerContainer
  - The listener container
- isConnected
  - Whether ZK is connected
  - AtomicBoolean
- connectionStateListener
  - Built-in connection state listener
- watcher
  - Built-in node watcher
  - Calls reset() whenever the node changes
- backgroundCallback
  - Callback for node data operations
  - Avoids blocking the calling thread
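The isConnected flag and connectionStateListener work together so that reset() runs only on the transition from disconnected back to connected, not on every CONNECTED or RECONNECTED event. The following is a simplified, JDK-only model of that edge detection. The class ReconnectGuard, its onStateChange method, and the resetCount counter are hypothetical stand-ins, not Curator code.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class ReconnectGuard {
    // Mirrors NodeCache.isConnected, which also starts as true.
    private final AtomicBoolean isConnected = new AtomicBoolean(true);
    // Hypothetical counter standing in for calls to reset().
    final AtomicInteger resetCount = new AtomicInteger();

    // connected == true models CONNECTED/RECONNECTED; false models any other state.
    void onStateChange(boolean connected) {
        if (connected) {
            // Only the false -> true transition counts as a reset.
            if (isConnected.compareAndSet(false, true)) {
                resetCount.incrementAndGet();
            }
        } else {
            isConnected.set(false);
        }
    }

    public static void main(String[] args) {
        ReconnectGuard guard = new ReconnectGuard();
        guard.onStateChange(true);   // already connected: no reset
        guard.onStateChange(false);  // connection lost
        guard.onStateChange(true);   // reconnected: one reset
        guard.onStateChange(true);   // duplicate event: no further reset
        System.out.println("resets: " + guard.resetCount.get());
    }
}
```

Because the flag starts as true, the very first CONNECTED event does not trigger a reset in this model; the initial synchronization is left to start().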
5.3 Constructor
public NodeCache(CuratorFramework client, String path) {
    this(client, path, false);
}

public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed) {
    this.client = client;
    this.path = PathUtils.validatePath(path);
    this.dataIsCompressed = dataIsCompressed;
}
The constructor is simple: it only validates the path and assigns fields. Most of the logic therefore lives in start().
5.4 Startup
public void start() throws Exception {
    start(false);
}

public void start(boolean buildInitial) throws Exception {
    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

    client.getConnectionStateListenable().addListener(connectionStateListener);

    if ( buildInitial ) {
        client.checkExists().creatingParentContainersIfNeeded().forPath(path);
        internalRebuild();
    }
    reset();
}

private void internalRebuild() throws Exception {
    try {
        Stat stat = new Stat();
        byte[] bytes = dataIsCompressed
            ? client.getData().decompressed().storingStatIn(stat).forPath(path)
            : client.getData().storingStatIn(stat).forPath(path);
        data.set(new ChildData(path, stat, bytes));
    } catch ( KeeperException.NoNodeException e ) {
        data.set(null);
    }
}

private void reset() throws Exception {
    if ( (state.get() == State.STARTED) && isConnected.get() ) {
        client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
    }
}
As you can see:
- By default, the cached data is not initialized at startup
Now walk through the startup process:
- Atomically update the state (LATENT to STARTED)
- Register connectionStateListener on the connection
- If the cache needs to be initialized (buildInitial is true)
  - Check the node, creating its parent containers if needed
  - Call internalRebuild() to load the initial data
    - Synchronously read the node data and stat
    - Write them to the local cache
- Call reset()
  - Only when the cache is STARTED and connected, check whether the cached node exists
    - The buildInitial step above may already have initialized the data
    - Register the watcher on the node
    - The result triggers backgroundCallback
      - Which calls processBackgroundResult() (the state machine)
5.4.1 processBackgroundResult method
A large part of the startup logic lives in the processBackgroundResult method, so let's take a closer look at it:
private void processBackgroundResult(CuratorEvent event) throws Exception {
    switch ( event.getType() ) {
        case GET_DATA: {
            if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) {
                ChildData childData = new ChildData(path, event.getStat(), event.getData());
                setNewData(childData);
            }
            break;
        }

        case EXISTS: {
            if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) {
                setNewData(null);
            } else if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) {
                if ( dataIsCompressed ) {
                    client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
                } else {
                    client.getData().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
                }
            }
            break;
        }
    }
}
Since startup comes first, let's look at the EXISTS event first:
- If the node does not exist
  - Call setNewData(null) to set the cached data to null
- If the node exists
  - If the data is compressed (dataIsCompressed)
    - Read it with decompressed() so the value is decompressed
  - Otherwise, read the data directly
  - Either way, the read is issued in the background through backgroundCallback, so its result re-enters processBackgroundResult as a new event
    - GET_DATA
    - This is the state machine at work
Now take a look at the GET_DATA event:
- If the read succeeded
  - Build a ChildData
  - Call setNewData to store it
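The two event types form a small state machine: an EXISTS result either clears the cache or schedules a read, and the GET_DATA result stores the value. The sketch below models that flow with plain JDK types. The in-memory store map, the EventType enum, and the synchronous dispatch are assumptions made for illustration; in Curator the results arrive asynchronously through backgroundCallback.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class CallbackStateMachineSketch {
    enum EventType { EXISTS, GET_DATA }

    // Hypothetical in-memory stand-in for ZooKeeper: path -> data.
    final Map<String, String> store = new HashMap<>();
    // Stand-in for the locally cached ChildData.
    final AtomicReference<String> cached = new AtomicReference<>(null);
    final String path;

    CallbackStateMachineSketch(String path) {
        this.path = path;
    }

    // Models reset(): issue an existence check whose result feeds the state machine.
    void reset() {
        process(EventType.EXISTS, store.containsKey(path), null);
    }

    // Models processBackgroundResult(): each result decides the next step.
    void process(EventType type, boolean ok, String data) {
        switch (type) {
            case EXISTS:
                if (!ok) {
                    cached.set(null); // node missing: clear the cache
                } else {
                    // Node present: read it; the result re-enters as GET_DATA.
                    process(EventType.GET_DATA, true, store.get(path));
                }
                break;
            case GET_DATA:
                if (ok) {
                    cached.set(data); // store the value that was read
                }
                break;
        }
    }

    public static void main(String[] args) {
        CallbackStateMachineSketch sm = new CallbackStateMachineSketch("/example/node");
        sm.reset();
        System.out.println("before create: " + sm.cached.get());
        sm.store.put("/example/node", "v1");
        sm.reset();
        System.out.println("after create: " + sm.cached.get());
        sm.store.remove("/example/node");
        sm.reset();
        System.out.println("after delete: " + sm.cached.get());
    }
}
```

In the real class each watcher notification calls reset() again, which is what restarts this EXISTS, then GET_DATA cycle after every change.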
5.4.2 setNewData method
So, you also need to look at the setNewData method:
private void setNewData(ChildData newData) throws InterruptedException {
    ChildData previousData = data.getAndSet(newData);
    if ( !Objects.equal(previousData, newData) ) {
        listeners.forEach(
            new Function<NodeCacheListener, Void>() {
                @Override
                public Void apply(NodeCacheListener listener) {
                    try {
                        listener.nodeChanged();
                    } catch ( Exception e ) {
                        ThreadUtils.checkInterrupted(e);
                        log.error("Calling listener", e);
                    }
                    return null;
                }
            }
        );

        if ( rebuildTestExchanger != null ) {
            try {
                rebuildTestExchanger.exchange(new Object());
            } catch ( InterruptedException e ) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
- Atomically replace the locally cached data with the new value
- If the cached value actually changed
  - Invoke the listeners in the listener container (synchronous call)
  - rebuildTestExchanger
    - Can basically be skipped
    - Hands a signal object to a waiting test thread during testing
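setNewData swaps the cached value first and notifies listeners only when the old and new values differ. A JDK-only sketch of that pattern follows. ChangeNotifyingCell and its Runnable listeners are hypothetical, and java.util.Objects.equals stands in for the Guava Objects.equal call used in the source.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;

class ChangeNotifyingCell {
    private final AtomicReference<String> data = new AtomicReference<>(null);
    private final List<Runnable> listeners = new CopyOnWriteArrayList<>();

    void addListener(Runnable listener) {
        listeners.add(listener);
    }

    String get() {
        return data.get();
    }

    void set(String newData) {
        // Swap first, then compare against the value that was replaced.
        String previous = data.getAndSet(newData);
        if (!Objects.equals(previous, newData)) {
            for (Runnable listener : listeners) {
                try {
                    listener.run();
                } catch (RuntimeException e) {
                    // One failing listener must not stop the others.
                    System.err.println("Calling listener failed: " + e);
                }
            }
        }
    }

    public static void main(String[] args) {
        ChangeNotifyingCell cell = new ChangeNotifyingCell();
        cell.addListener(() -> System.out.println("changed to " + cell.get()));
        cell.set("a"); // notifies
        cell.set("a"); // same value: no notification
        cell.set("b"); // notifies
    }
}
```

Like nodeChanged(), the listener here receives no payload and reads the new value back from the cell.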
5.4.3 Summary
The startup process can be roughly divided into:
- Register the connection state listener
- If initial data is requested, make sure the parent path exists and pull the node data into the local cache
- Register the watcher on the node and attach the callback
- Synchronize the cached data through the callback state machine
5.5 Getting Cached Data
public ChildData getCurrentData() { return data.get(); }
This reads the cached data directly. Users only need to work with the local cache; the local cache and the ZK node data are kept in sync by the watcher and the callback state machine.
5.6 Shutdown
When you are finished with the cache, call close():
public void close() throws IOException {
    if ( state.compareAndSet(State.STARTED, State.CLOSED) ) {
        listeners.clear();
        client.clearWatcherReferences(watcher);
        client.getConnectionStateListenable().removeListener(connectionStateListener);

        // TODO
        // From PathChildrenCache
        // This seems to enable even more GC - I'm not sure why yet - it
        // has something to do with Guava's cache and circular references
        connectionStateListener = null;
        watcher = null;
    }
}
- Atomically update the state
- Clear the listener container
- Clear the client's references to the node watcher
- Remove the listener from the connection
Note that the locally cached data is not cleared.
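The lifecycle guard used by start() and close() can be modelled with a single AtomicReference. This sketch is JDK-only and hypothetical (LifecycleSketch is not a Curator class, and the "initial" value is a placeholder). It shows that a second start() fails, a second close() is a no-op, and the cached value is still readable after close().

```java
import java.util.concurrent.atomic.AtomicReference;

class LifecycleSketch {
    enum State { LATENT, STARTED, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
    private final AtomicReference<String> data = new AtomicReference<>(null);

    void start() {
        // Same guard as Preconditions.checkState(...) in the source.
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
        data.set("initial"); // placeholder for the first synchronized value
    }

    // Returns true only for the call that actually performed the transition.
    boolean close() {
        // The cached data is deliberately left untouched, as in NodeCache.close().
        return state.compareAndSet(State.STARTED, State.CLOSED);
    }

    String getCurrentData() {
        return data.get();
    }

    public static void main(String[] args) {
        LifecycleSketch cache = new LifecycleSketch();
        cache.start();
        System.out.println("first close: " + cache.close());
        System.out.println("second close: " + cache.close());
        System.out.println("data after close: " + cache.getCurrentData());
    }
}
```

Because the guard only allows LATENT to STARTED, an instance cannot be restarted after close(); a new instance is needed instead.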
6. Summary
Compared with Path Cache, Node Cache is much simpler.
- Path Cache
  - More like a cache manager
    - Manages multiple caches under a path
  - Because there are multiple caches
    - Synchronization logic is complex
    - Concurrency issues are more serious
    - So internally it uses
      - The command pattern
      - Asynchronous sequential execution
- Node Cache
  - Caches the data of a single node only
  - Also, cached data by nature does not require strict concurrency control (stale data is acceptable)
  - Uses a callback state machine to handle the different data states