Preface
This article examines the FederationManager of ActiveMQ Artemis.
FederationManager
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java
/**
 * Keeps the {@link Federation} instances of a broker in a map keyed by federation name and
 * drives their deploy / start / stop lifecycle.
 * <p>
 * All methods that change the map or the lifecycle state are declared {@code synchronized};
 * {@link #isStarted()} and {@link #get(String)} are not.
 */
public class FederationManager implements ActiveMQComponent {

    private final ActiveMQServer server;

    // Federations currently known to this manager, keyed by FederationConfiguration#getName().
    private Map<String, Federation> federations = new HashMap<>();

    // Lifecycle state. It is not assigned by the constructor, so it stays null until
    // deploy(), start() or stop() sets it.
    private State state;

    enum State {
        STOPPED,
        STOPPING,
        /**
         * Deployed means {@link FederationManager#deploy()} was called but
         * {@link FederationManager#start()} was not called.
         * <p>
         * We need the distinction if {@link FederationManager#stop()} is called before 'start'. As
         * otherwise we would leak locators.
         */
        DEPLOYED,
        STARTED,
    }

    /**
     * Creates a manager bound to the given server; no federation is deployed yet.
     *
     * @param server the server whose configuration is read by {@link #deploy()} and on which
     *               broker plugins are registered
     */
    public FederationManager(final ActiveMQServer server) {
        this.server = server;
    }

    /**
     * Deploys every configured federation, starts each federation held in the map and then marks
     * the manager as {@code STARTED}. Does nothing when the manager is already {@code STARTED}.
     *
     * @throws ActiveMQException propagated from {@link #deploy()} or {@code Federation.start()}
     */
    @Override
    public synchronized void start() throws ActiveMQException {
        if (state == State.STARTED) return;
        // Deploy first so that the map is populated before anything is started.
        deploy();
        for (Federation federation : federations.values()) {
            federation.start();
        }
        state = State.STARTED;
    }

    /**
     * Stops every federation held in the map, empties the map and marks the manager as
     * {@code STOPPED}. Does nothing when the manager is already {@code STOPPED}.
     */
    @Override
    public synchronized void stop() {
        if (state == State.STOPPED) return;
        // STOPPING is visible only while the federations are being stopped.
        state = State.STOPPING;
        for (Federation federation : federations.values()) {
            federation.stop();
        }
        federations.clear();
        state = State.STOPPED;
    }

    /**
     * @return {@code true} only when the state is {@code STARTED}
     */
    @Override
    public boolean isStarted() {
        return state == State.STARTED;
    }

    /**
     * Deploys every federation configuration returned by
     * {@code server.getConfiguration().getFederationConfigurations()}. Afterwards the state
     * becomes {@code DEPLOYED}, unless the manager is already {@code STARTED}.
     *
     * @throws ActiveMQException propagated from {@link #deploy(FederationConfiguration)}
     */
    public synchronized void deploy() throws ActiveMQException {
        for (FederationConfiguration federationConfiguration : server.getConfiguration().getFederationConfigurations()) {
            deploy(federationConfiguration);
        }
        // Do not downgrade a running manager back to DEPLOYED.
        if (state != State.STARTED) {
            state = State.DEPLOYED;
        }
    }

    /**
     * Removes the federation registered under {@code name} and stops it if one was present.
     *
     * @param name the federation name
     * @return always {@code true}, whether or not a federation was registered under that name
     */
    public synchronized boolean undeploy(String name) {
        Federation federation = federations.remove(name);
        if (federation != null) {
            federation.stop();
        }
        return true;
    }

    /**
     * Deploys a single federation configuration.
     * <ul>
     * <li>No federation with that name yet: a new one is created.</li>
     * <li>A federation exists but its credentials differ from the configuration's: the existing
     * one is undeployed and a new one is created.</li>
     * <li>Otherwise the existing federation is reused.</li>
     * </ul>
     * In every case {@code Federation.deploy()} is then called on the resulting federation.
     *
     * @param federationConfiguration the configuration to deploy
     * @return always {@code true}
     * @throws ActiveMQException propagated from {@code newFederation} or {@code Federation.deploy()}
     */
    public synchronized boolean deploy(FederationConfiguration federationConfiguration) throws ActiveMQException {
        Federation federation = federations.get(federationConfiguration.getName());
        if (federation == null) {
            federation = newFederation(federationConfiguration);
        } else if (!Objects.equals(federation.getConfig().getCredentials(), federationConfiguration.getCredentials())) {
            // Credentials changed: replace the federation instead of reusing it.
            undeploy(federationConfiguration.getName());
            federation = newFederation(federationConfiguration);
        }
        federation.deploy();
        return true;
    }

    /**
     * Creates a federation for the configuration, stores it in the map under the configuration's
     * name and, when the manager is already {@code STARTED}, starts it immediately.
     *
     * @param federationConfiguration the configuration of the new federation
     * @return the newly created federation
     * @throws ActiveMQException propagated from {@code Federation.start()}
     */
    private synchronized Federation newFederation(FederationConfiguration federationConfiguration) throws ActiveMQException {
        Federation federation = new Federation(server, federationConfiguration);
        federations.put(federationConfiguration.getName(), federation);
        // A federation added to a running manager must not wait for the next start() call.
        if (state == State.STARTED) {
            federation.start();
        }
        return federation;
    }

    /**
     * @param name the federation name
     * @return the federation stored under {@code name}, or {@code null} when the map has no entry
     */
    public Federation get(String name) {
        return federations.get(name);
    }

    /**
     * Registers the given object as a broker plugin on the server.
     *
     * @param federatedAbstract the plugin to register
     */
    public void register(FederatedAbstract federatedAbstract) {
        server.registerBrokerPlugin(federatedAbstract);
    }

    /**
     * Unregisters the given broker plugin from the server.
     *
     * @param federatedAbstract the plugin to unregister
     */
    public void unregister(FederatedAbstract federatedAbstract) {
        server.unRegisterBrokerPlugin(federatedAbstract);
    }
}
- FederationManager implements the ActiveMQComponent interface and provides methods such as start(), stop(), deploy() and undeploy(). The start method first calls deploy() and then iterates over federations.values(), calling federation.start() on each; the stop method iterates over federations.values(), calling federation.stop() on each, and then clears federations. The deploy(FederationConfiguration) method calls newFederation when no federation with that name exists; if one exists but its credentials differ from those in the configuration, it calls undeploy and then newFederation; in every case it finishes by calling federation.deploy(). The undeploy method removes the federation from federations and, if one was present, calls federation.stop().
Federation
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/Federation.java
/**
 * A single federation: a named group of upstream and downstream links built from one
 * {@link FederationConfiguration}.
 * <p>
 * This is an excerpt; members replaced by {@code //......} (among them the constructor and
 * {@code undeploy(String)}, which the methods below call) are not shown here.
 */
public class Federation {

    private final ActiveMQServer server;
    private final SimpleString name;

    // Deployed upstreams and downstreams, each keyed by the name taken from its configuration.
    private final Map<String, FederationUpstream> upstreams = new HashMap<>();
    private final Map<String, FederationDownstream> downstreams = new HashMap<>();

    private final FederationConfiguration config;

    // Lifecycle state; the enum declared in FederationManager is reused.
    private FederationManager.State state;

    //......

    /**
     * Deploys every upstream configuration and then every downstream configuration of this
     * federation, passing the federation's policy map to each. Afterwards the state becomes
     * {@code DEPLOYED}, unless the federation is already {@code STARTED}.
     *
     * @throws ActiveMQException propagated from the per-configuration deploy methods
     */
    public synchronized void deploy() throws ActiveMQException {
        // Upstreams go first: a downstream may reuse the connection of an upstream that is
        // already present in the upstreams map (see the private downstream deploy below).
        for (FederationUpstreamConfiguration upstreamConfiguration : config.getUpstreamConfigurations()) {
            deploy(upstreamConfiguration, config.getFederationPolicyMap());
        }
        for (FederationDownstreamConfiguration downstreamConfiguration : config.getDownstreamConfigurations()) {
            deploy(downstreamConfiguration, config.getFederationPolicyMap());
        }
        if (state != FederationManager.State.STARTED) {
            state = FederationManager.State.DEPLOYED;
        }
    }

    /**
     * Deploys one upstream. A new {@link FederationUpstream} is created when none is registered
     * under the configuration's name, or when the registered one has a connection configuration
     * that differs from the new one; otherwise the registered upstream is reused. The policies
     * are then deployed on the resulting upstream.
     *
     * @param upstreamConfiguration the upstream to deploy
     * @param federationPolicyMap   policies handed to {@code FederationUpstream.deploy}
     * @return always {@code true}
     * @throws ActiveMQException propagated from {@code FederationUpstream.deploy}
     */
    public synchronized boolean deploy(FederationUpstreamConfiguration upstreamConfiguration, Map<String, FederationPolicy> federationPolicyMap) throws ActiveMQException {
        // Note: this local shadows the SimpleString field 'name' of the federation itself.
        String name = upstreamConfiguration.getName();
        FederationUpstream upstream = upstreams.get(name);

        //If connection has changed we will need to do a full undeploy and redeploy.
        if (upstream == null) {
            // undeploy(String) is not part of this excerpt; it is called here even though the
            // upstreams map has no entry for this name.
            undeploy(name);
            upstream = deploy(name, upstreamConfiguration);
        } else if (!upstream.getConnection().getConfig().equals(upstreamConfiguration.getConnectionConfiguration())) {
            undeploy(name);
            upstream = deploy(name, upstreamConfiguration);
        }

        upstream.deploy(upstreamConfiguration.getPolicyRefs(), federationPolicyMap);
        return true;
    }

    /**
     * Deploys one downstream, following the same create-or-replace rule as the upstream variant
     * but against the downstreams map. The resulting downstream is then deployed with the whole
     * federation configuration.
     *
     * @param downstreamConfiguration the downstream to deploy
     * @param federationPolicyMap     not read by this method; {@code downstream.deploy} receives
     *                                {@code config} instead
     * @return always {@code true}
     * @throws ActiveMQException propagated from {@code FederationDownstream.deploy}
     */
    public synchronized boolean deploy(FederationDownstreamConfiguration downstreamConfiguration, Map<String, FederationPolicy> federationPolicyMap) throws ActiveMQException {
        String name = downstreamConfiguration.getName();
        FederationDownstream downstream = downstreams.get(name);

        //If connection has changed we will need to do a full undeploy and redeploy.
        if (downstream == null) {
            // NOTE(review): the same undeploy(String) is used for upstreams and downstreams; its
            // body is not shown in this excerpt, so which map(s) it touches should be confirmed.
            undeploy(name);
            downstream = deploy(name, downstreamConfiguration);
        } else if (!downstream.getConnection().getConfig().equals(downstreamConfiguration.getConnectionConfiguration())) {
            undeploy(name);
            downstream = deploy(name, downstreamConfiguration);
        }

        downstream.deploy(config);
        return true;
    }

    /**
     * Creates a {@link FederationUpstream}, stores it in the upstreams map under {@code name}
     * and starts it immediately when the federation is already {@code STARTED}.
     *
     * @param name                  key under which the upstream is stored
     * @param upstreamConfiguration configuration of the new upstream
     * @return the newly created upstream
     */
    private synchronized FederationUpstream deploy(String name, FederationUpstreamConfiguration upstreamConfiguration) {
        FederationUpstream upstream = new FederationUpstream(server, this, name, upstreamConfiguration);
        upstreams.put(name, upstream);
        if (state == FederationManager.State.STARTED) {
            upstream.start();
        }
        return upstream;
    }

    /**
     * Creates a {@link FederationDownstream}, stores it in the downstreams map under
     * {@code name} and starts it immediately when the federation is already {@code STARTED}.
     * <p>
     * When the downstream's connection configuration has {@code isShareConnection()} set, the
     * upstreams map is searched for an upstream whose connection configuration equals the
     * downstream's; the connection of the first match is flagged with
     * {@code setSharedConnection(true)} and passed to the new downstream. When sharing is off or
     * nothing matches, {@code null} is passed as the connection.
     *
     * @param name                    key under which the downstream is stored
     * @param downstreamConfiguration configuration of the new downstream
     * @return the newly created downstream
     */
    private synchronized FederationDownstream deploy(String name, FederationDownstreamConfiguration downstreamConfiguration) {
        //If we have a matching upstream connection already configured then use it for the initiating downstream connection
        FederationConnection connection = null;
        if (downstreamConfiguration.getConnectionConfiguration().isShareConnection()) {
            for (FederationUpstream upstream : upstreams.values()) {
                if (upstream.getConfig().getConnectionConfiguration()
                        .equals(downstreamConfiguration.getConnectionConfiguration())) {
                    connection = upstream.getConnection();
                    connection.setSharedConnection(true);
                    // The first matching upstream wins.
                    break;
                }
            }
        }

        // NOTE(review): how FederationDownstream treats a null connection is not visible here.
        FederationDownstream downstream = new FederationDownstream(server, this, name, downstreamConfiguration, connection);
        downstreams.put(name, downstream);
        if (state == FederationManager.State.STARTED) {
            downstream.start();
        }
        return downstream;
    }

    //......
}
- The deploy() method of Federation first iterates over config.getUpstreamConfigurations() to deploy each upstream, and then iterates over config.getDownstreamConfigurations() to deploy each downstream.
Summary
FederationManager implements the ActiveMQComponent interface and provides methods such as start(), stop(), deploy() and undeploy(). The start method first calls deploy() and then iterates over federations.values(), calling federation.start() on each; the stop method iterates over federations.values(), calling federation.stop() on each, and then clears federations. The deploy(FederationConfiguration) method calls newFederation when no federation with that name exists; if one exists but its credentials differ from those in the configuration, it calls undeploy and then newFederation; in every case it finishes by calling federation.deploy(). The undeploy method removes the federation from federations and, if one was present, calls federation.stop().