order
This paper mainly studies the seedhosts provider of elastic search
SeedHostsProvider
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/SeedHostsProvider.java
/** * A pluggable provider of the list of seed hosts to use for discovery. */ public interface SeedHostsProvider { /** * Returns a list of seed hosts to use for discovery. Called repeatedly while discovery is active (i.e. while there is no master) * so that this list may be dynamic. */ List<TransportAddress> getSeedAddresses(HostsResolver hostsResolver); /** * Helper object that allows to resolve a list of hosts to a list of transport addresses. * Each host is resolved into a transport address (or a collection of addresses if the * number of ports is greater than one) */ interface HostsResolver { List<TransportAddress> resolveHosts(List<String> hosts, int limitPortCounts); } }
- The SeedHostsProvider interface defines the getSeedAddresses method. The method parameter type is HostsResolver. The HostsResolver interface defines the resolveHosts method. It has several implementation classes, namely SettingsBasedSeedHostsProvider, FileBasedSeedHostsProvider, GceSeedHostsProvider, AwsEc2SeedHostsProvider and AzureSeedHostsProvider
SettingsBasedSeedHostsProvider
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/SettingsBasedSeedHostsProvider.java
public class SettingsBasedSeedHostsProvider implements SeedHostsProvider { private static final Logger logger = LogManager.getLogger(SettingsBasedSeedHostsProvider.class); public static final Setting<List<String>> LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING = Setting.listSetting("discovery.zen.ping.unicast.hosts", emptyList(), Function.identity(), Property.NodeScope, Property.Deprecated); public static final Setting<List<String>> DISCOVERY_SEED_HOSTS_SETTING = Setting.listSetting("discovery.seed_hosts", emptyList(), Function.identity(), Property.NodeScope); // these limits are per-address private static final int LIMIT_FOREIGN_PORTS_COUNT = 1; private static final int LIMIT_LOCAL_PORTS_COUNT = 5; private final List<String> configuredHosts; private final int limitPortCounts; public SettingsBasedSeedHostsProvider(Settings settings, TransportService transportService) { if (LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.exists(settings)) { if (DISCOVERY_SEED_HOSTS_SETTING.exists(settings)) { throw new IllegalArgumentException("it is forbidden to set both [" + DISCOVERY_SEED_HOSTS_SETTING.getKey() + "] and [" + LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey() + "]"); } configuredHosts = LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings); // we only limit to 1 address, makes no sense to ping 100 ports limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT; } else if (DISCOVERY_SEED_HOSTS_SETTING.exists(settings)) { configuredHosts = DISCOVERY_SEED_HOSTS_SETTING.get(settings); // we only limit to 1 address, makes no sense to ping 100 ports limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT; } else { // if unicast hosts are not specified, fill with simple defaults on the local machine configuredHosts = transportService.getLocalAddresses(); limitPortCounts = LIMIT_LOCAL_PORTS_COUNT; } logger.debug("using initial hosts {}", configuredHosts); } @Override public List<TransportAddress> getSeedAddresses(HostsResolver hostsResolver) { return hostsResolver.resolveHosts(configuredHosts, limitPortCounts); } }
- The SettingsBasedSeedHostsProvider mainly reads the configuration of discovery.seed'hosts or discovery.zen.ping.unicast.hosts
FileBasedSeedHostsProvider
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/FileBasedSeedHostsProvider.java
public class FileBasedSeedHostsProvider implements SeedHostsProvider { private static final Logger logger = LogManager.getLogger(FileBasedSeedHostsProvider.class); public static final String UNICAST_HOSTS_FILE = "unicast_hosts.txt"; private final Path unicastHostsFilePath; public FileBasedSeedHostsProvider(Path configFile) { this.unicastHostsFilePath = configFile.resolve(UNICAST_HOSTS_FILE); } private List<String> getHostsList() { if (Files.exists(unicastHostsFilePath)) { try (Stream<String> lines = Files.lines(unicastHostsFilePath)) { return lines.filter(line -> line.startsWith("#") == false) // lines starting with `#` are comments .collect(Collectors.toList()); } catch (IOException e) { logger.warn(() -> new ParameterizedMessage("failed to read file [{}]", unicastHostsFilePath), e); return Collections.emptyList(); } } logger.warn("expected, but did not find, a dynamic hosts list at [{}]", unicastHostsFilePath); return Collections.emptyList(); } @Override public List<TransportAddress> getSeedAddresses(HostsResolver hostsResolver) { final List<TransportAddress> transportAddresses = hostsResolver.resolveHosts(getHostsList(), 1); logger.debug("seed addresses: {}", transportAddresses); return transportAddresses; } }
- FileBasedSeedHostsProvider mainly reads the unicast_hosts.txt file from the specified location and resolves the hostsList
SeedHostsProvider.HostsResolver
UnicastZenPing.createHostsResolver
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java
private SeedHostsProvider.HostsResolver createHostsResolver() { return (hosts, limitPortCounts) -> SeedHostsResolver.resolveHostsLists(unicastZenPingExecutorService, logger, hosts, limitPortCounts, transportService, resolveTimeout); }
- UnicastZenPing's createHostsResolver method creates an anonymous SeedHostsProvider.HostsResolver class whose implementation is delegated to the SeedHostsResolver.resolveHostsLists method
SeedHostsResolver
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java
public class SeedHostsResolver extends AbstractLifecycleComponent implements ConfiguredHostsResolver { //...... public static List<TransportAddress> resolveHostsLists( final ExecutorService executorService, final Logger logger, final List<String> hosts, final int limitPortCounts, final TransportService transportService, final TimeValue resolveTimeout) { Objects.requireNonNull(executorService); Objects.requireNonNull(logger); Objects.requireNonNull(hosts); Objects.requireNonNull(transportService); Objects.requireNonNull(resolveTimeout); if (resolveTimeout.nanos() < 0) { throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]"); } // create tasks to submit to the executor service; we will wait up to resolveTimeout for these tasks to complete final List<Callable<TransportAddress[]>> callables = hosts .stream() .map(hn -> (Callable<TransportAddress[]>) () -> transportService.addressesFromString(hn, limitPortCounts)) .collect(Collectors.toList()); final List<Future<TransportAddress[]>> futures; try { futures = executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return Collections.emptyList(); } final List<TransportAddress> transportAddresses = new ArrayList<>(); final Set<TransportAddress> localAddresses = new HashSet<>(); localAddresses.add(transportService.boundAddress().publishAddress()); localAddresses.addAll(Arrays.asList(transportService.boundAddress().boundAddresses())); // ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the // hostname with the corresponding task by iterating together final Iterator<String> it = hosts.iterator(); for (final Future<TransportAddress[]> future : futures) { final String hostname = it.next(); if (!future.isCancelled()) { assert future.isDone(); try { final TransportAddress[] addresses = future.get(); logger.trace("resolved host [{}] to {}", hostname, addresses); for (int addressId = 0; addressId < addresses.length; addressId++) { final TransportAddress address = addresses[addressId]; // no point in pinging ourselves if (localAddresses.contains(address) == false) { transportAddresses.add(address); } } } catch (final ExecutionException e) { assert e.getCause() != null; final String message = "failed to resolve host [" + hostname + "]"; logger.warn(message, e.getCause()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // ignore } } else { logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname); } } return Collections.unmodifiableList(transportAddresses); } //...... }
- The resolveHostsLists static method of SeedHostsResolver mainly uses multithreading to execute transportService.addressesFromString method in parallel
Summary
- The SeedHostsProvider interface defines the getSeedAddresses method. The method parameter type is HostsResolver. The HostsResolver interface defines the resolveHosts method. It has several implementation classes, namely SettingsBasedSeedHostsProvider, FileBasedSeedHostsProvider, GceSeedHostsProvider, AwsEc2SeedHostsProvider and AzureSeedHostsProvider
- SettingsBasedSeedHostsProvider mainly reads the configuration of discovery.seed'hosts or discovery.zen.ping.unicast.hosts; FileBasedSeedHostsProvider mainly reads the unicast'hosts.txt file from the specified location to resolve the hostsList
- UnicastZenPing's createHostsResolver method creates an anonymous SeedHostsProvider.HostsResolver class, whose implementation is delegated to the SeedHostsResolver.resolveHostsLists method; SeedHostsResolver's resolveHostsLists static method mainly uses multithreads to execute transportService.addressesFromString method in parallel