Talk about the SeedHostsProvider of elastic search

order

This paper mainly studies the seedhosts provider of elastic search

SeedHostsProvider

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/SeedHostsProvider.java

/**
 * A pluggable provider of the list of seed hosts to use for discovery.
 */
public interface SeedHostsProvider {

    /**
     * Returns a list of seed hosts to use for discovery. Called repeatedly while discovery is active (i.e. while there is no master)
     * so that this list may be dynamic.
     */
    List<TransportAddress> getSeedAddresses(HostsResolver hostsResolver);

    /**
     * Helper object that allows to resolve a list of hosts to a list of transport addresses.
     * Each host is resolved into a transport address (or a collection of addresses if the
     * number of ports is greater than one)
     */
    interface HostsResolver {
        List<TransportAddress> resolveHosts(List<String> hosts, int limitPortCounts);
    }
}
  • The SeedHostsProvider interface defines the getSeedAddresses method. The method parameter type is HostsResolver. The HostsResolver interface defines the resolveHosts method. It has several implementation classes, namely SettingsBasedSeedHostsProvider, FileBasedSeedHostsProvider, GceSeedHostsProvider, AwsEc2SeedHostsProvider and AzureSeedHostsProvider

SettingsBasedSeedHostsProvider

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/SettingsBasedSeedHostsProvider.java

public class SettingsBasedSeedHostsProvider implements SeedHostsProvider {

    private static final Logger logger = LogManager.getLogger(SettingsBasedSeedHostsProvider.class);

    public static final Setting<List<String>> LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING =
        Setting.listSetting("discovery.zen.ping.unicast.hosts", emptyList(), Function.identity(), Property.NodeScope, Property.Deprecated);

    public static final Setting<List<String>> DISCOVERY_SEED_HOSTS_SETTING =
        Setting.listSetting("discovery.seed_hosts", emptyList(), Function.identity(), Property.NodeScope);

    // these limits are per-address
    private static final int LIMIT_FOREIGN_PORTS_COUNT = 1;
    private static final int LIMIT_LOCAL_PORTS_COUNT = 5;

    private final List<String> configuredHosts;
    private final int limitPortCounts;

    public SettingsBasedSeedHostsProvider(Settings settings, TransportService transportService) {
        if (LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.exists(settings)) {
            if (DISCOVERY_SEED_HOSTS_SETTING.exists(settings)) {
                throw new IllegalArgumentException("it is forbidden to set both ["
                    + DISCOVERY_SEED_HOSTS_SETTING.getKey() + "] and ["
                    + LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey() + "]");
            }
            configuredHosts = LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
            // we only limit to 1 address, makes no sense to ping 100 ports
            limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT;
        } else if (DISCOVERY_SEED_HOSTS_SETTING.exists(settings)) {
            configuredHosts = DISCOVERY_SEED_HOSTS_SETTING.get(settings);
            // we only limit to 1 address, makes no sense to ping 100 ports
            limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT;
        } else {
            // if unicast hosts are not specified, fill with simple defaults on the local machine
            configuredHosts = transportService.getLocalAddresses();
            limitPortCounts = LIMIT_LOCAL_PORTS_COUNT;
        }

        logger.debug("using initial hosts {}", configuredHosts);
    }

    @Override
    public List<TransportAddress> getSeedAddresses(HostsResolver hostsResolver) {
        return hostsResolver.resolveHosts(configuredHosts, limitPortCounts);
    }
}
  • The SettingsBasedSeedHostsProvider mainly reads the configuration of discovery.seed'hosts or discovery.zen.ping.unicast.hosts

FileBasedSeedHostsProvider

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/FileBasedSeedHostsProvider.java

public class FileBasedSeedHostsProvider implements SeedHostsProvider {

    private static final Logger logger = LogManager.getLogger(FileBasedSeedHostsProvider.class);

    public static final String UNICAST_HOSTS_FILE = "unicast_hosts.txt";

    private final Path unicastHostsFilePath;

    public FileBasedSeedHostsProvider(Path configFile) {
        this.unicastHostsFilePath = configFile.resolve(UNICAST_HOSTS_FILE);
    }

    private List<String> getHostsList() {
        if (Files.exists(unicastHostsFilePath)) {
            try (Stream<String> lines = Files.lines(unicastHostsFilePath)) {
                return lines.filter(line -> line.startsWith("#") == false) // lines starting with `#` are comments
                    .collect(Collectors.toList());
            } catch (IOException e) {
                logger.warn(() -> new ParameterizedMessage("failed to read file [{}]", unicastHostsFilePath), e);
                return Collections.emptyList();
            }
        }

        logger.warn("expected, but did not find, a dynamic hosts list at [{}]", unicastHostsFilePath);

        return Collections.emptyList();
    }

    @Override
    public List<TransportAddress> getSeedAddresses(HostsResolver hostsResolver) {
        final List<TransportAddress> transportAddresses = hostsResolver.resolveHosts(getHostsList(), 1);
        logger.debug("seed addresses: {}", transportAddresses);
        return transportAddresses;
    }
}
  • FileBasedSeedHostsProvider mainly reads the unicast_hosts.txt file from the specified location and resolves the hostsList

SeedHostsProvider.HostsResolver

UnicastZenPing.createHostsResolver

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

    private SeedHostsProvider.HostsResolver createHostsResolver() {
        return (hosts, limitPortCounts) -> SeedHostsResolver.resolveHostsLists(unicastZenPingExecutorService, logger, hosts,
            limitPortCounts, transportService, resolveTimeout);
    }
  • UnicastZenPing's createHostsResolver method creates an anonymous SeedHostsProvider.HostsResolver class whose implementation is delegated to the SeedHostsResolver.resolveHostsLists method

SeedHostsResolver

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java

public class SeedHostsResolver extends AbstractLifecycleComponent implements ConfiguredHostsResolver {

	//......

    public static List<TransportAddress> resolveHostsLists(
        final ExecutorService executorService,
        final Logger logger,
        final List<String> hosts,
        final int limitPortCounts,
        final TransportService transportService,
        final TimeValue resolveTimeout) {
        Objects.requireNonNull(executorService);
        Objects.requireNonNull(logger);
        Objects.requireNonNull(hosts);
        Objects.requireNonNull(transportService);
        Objects.requireNonNull(resolveTimeout);
        if (resolveTimeout.nanos() < 0) {
            throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]");
        }
        // create tasks to submit to the executor service; we will wait up to resolveTimeout for these tasks to complete
        final List<Callable<TransportAddress[]>> callables =
            hosts
                .stream()
                .map(hn -> (Callable<TransportAddress[]>) () -> transportService.addressesFromString(hn, limitPortCounts))
                .collect(Collectors.toList());
        final List<Future<TransportAddress[]>> futures;
        try {
            futures = executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Collections.emptyList();
        }
        final List<TransportAddress> transportAddresses = new ArrayList<>();
        final Set<TransportAddress> localAddresses = new HashSet<>();
        localAddresses.add(transportService.boundAddress().publishAddress());
        localAddresses.addAll(Arrays.asList(transportService.boundAddress().boundAddresses()));
        // ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the
        // hostname with the corresponding task by iterating together
        final Iterator<String> it = hosts.iterator();
        for (final Future<TransportAddress[]> future : futures) {
            final String hostname = it.next();
            if (!future.isCancelled()) {
                assert future.isDone();
                try {
                    final TransportAddress[] addresses = future.get();
                    logger.trace("resolved host [{}] to {}", hostname, addresses);
                    for (int addressId = 0; addressId < addresses.length; addressId++) {
                        final TransportAddress address = addresses[addressId];
                        // no point in pinging ourselves
                        if (localAddresses.contains(address) == false) {
                            transportAddresses.add(address);
                        }
                    }
                } catch (final ExecutionException e) {
                    assert e.getCause() != null;
                    final String message = "failed to resolve host [" + hostname + "]";
                    logger.warn(message, e.getCause());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    // ignore
                }
            } else {
                logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname);
            }
        }
        return Collections.unmodifiableList(transportAddresses);
    }

    //......
}
  • The resolveHostsLists static method of SeedHostsResolver mainly uses multithreading to execute transportService.addressesFromString method in parallel

Summary

  • The SeedHostsProvider interface defines the getSeedAddresses method. The method parameter type is HostsResolver. The HostsResolver interface defines the resolveHosts method. It has several implementation classes, namely SettingsBasedSeedHostsProvider, FileBasedSeedHostsProvider, GceSeedHostsProvider, AwsEc2SeedHostsProvider and AzureSeedHostsProvider
  • SettingsBasedSeedHostsProvider mainly reads the configuration of discovery.seed'hosts or discovery.zen.ping.unicast.hosts; FileBasedSeedHostsProvider mainly reads the unicast'hosts.txt file from the specified location to resolve the hostsList
  • UnicastZenPing's createHostsResolver method creates an anonymous SeedHostsProvider.HostsResolver class, whose implementation is delegated to the SeedHostsResolver.resolveHostsLists method; SeedHostsResolver's resolveHostsLists static method mainly uses multithreads to execute transportService.addressesFromString method in parallel

doc

Keywords: Programming ElasticSearch Java

Added by Rangel on Thu, 21 Nov 2019 22:40:15 +0200