Write HDFS data to ES through MapReduce, and read ES data back to HDFS

Environment preparation

System: CentOS 7

Java 1.8

Hadoop 2.7

ES 7.15.2 (for installing the single-node version of ES, see: https://blog.csdn.net/weixin_36340771/article/details/121389741 )

Prepare a local Hadoop runtime environment

Get Hadoop files

Link: https://pan.baidu.com/s/1MGriraZ8ekvzsJyWdPssrw
Extraction code: u4uc

Configure HADOOP_HOME

Extract the files above and configure HADOOP_HOME; remember to change the path to your own location.

Get the project code

https://github.com/BaoPiao/elasticsearch_demo

Read HDFS and write to ES

Read the HDFS data with FileInputFormat, convert each record to the MapWritable type in a Mapper, and finally write the documents to ES through EsOutputFormat.
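
The EsMapper class used in the driver below lives in the GitHub repository; as a rough sketch, a mapper of this kind might look like the following (the comma-separated id,name input layout is an assumption):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Turns each input line into a MapWritable document for EsOutputFormat.
public class EsMapper extends Mapper<LongWritable, Text, NullWritable, MapWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Assumed input layout: one "id,name" pair per line
        String[] fields = value.toString().split(",");
        MapWritable doc = new MapWritable();
        doc.put(new Text("id"), new Text(fields[0]));
        doc.put(new Text("name"), new Text(fields[1]));
        // EsOutputFormat only looks at the value, so the key can be NullWritable
        context.write(NullWritable.get(), doc);
    }
}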

Parameter description

Basic parameters

Network parameters

es.nodes the ES node(s) to connect to

es.port the port of the ES cluster; the default is 9200

es.nodes.path.prefix the path prefix added to every request; empty by default (for example, if every request should go to es.node:9200/custom/path/prefix, set this to /custom/path/prefix)
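
As a minimal sketch, these network settings go on the job's Hadoop Configuration, just as in the driver code later in this post (the host, port, and prefix values here are placeholders):

Configuration conf = new Configuration();
conf.set("es.nodes", "192.168.10.108");                  // node(s) to connect to
conf.set("es.port", "9200");                             // default is 9200
conf.set("es.nodes.path.prefix", "/custom/path/prefix"); // default is empty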

Write parameters

es.resource specifies the ES index

es.resource = twitter/tweet #The index is called twitter and the type is tweet

es.resource.write defaults to the value of es.resource

Writing to different types depending on the data is supported (media_type is a field here):

es.resource.write = my-collection/{media_type}

Documents can also be routed to different indexes based on the data, and date fields can be formatted to generate rolling log indexes; see reference 2 and the sketch after these examples.

es.resource.write = my-collection_{media_type}
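
As a sketch, these are plain Configuration values; only one es.resource.write can be in effect at a time, and the date-format pattern on the last line follows the elasticsearch-hadoop documentation (the index names here are placeholders):

// Pick one of the following (each conf.set overwrites the previous value):
conf.set("es.resource.write", "my-collection/{media_type}");   // type chosen per document
conf.set("es.resource.write", "my-collection_{media_type}");   // index chosen per document
conf.set("es.resource.write", "logs_{@timestamp|yyyy.MM.dd}"); // daily rolling log index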

Document related parameters

es.input.json is false by default; set it to true when the input data is already JSON

es.write.operation is index by default:

  1. index (default): new documents are added; an existing document with the same id is replaced
  2. create: adds new documents; an exception is raised if the document already exists
  3. update: updates existing documents; an exception is raised if the document does not exist
  4. upsert: inserts the document if it does not exist, updates it if it does

es.output.json is false by default; when true, documents are returned in their raw JSON format

es.ingest.pipeline is none by default; specifies the ingest pipeline that documents should be processed by

es.mapping.id specifies the field whose value is used as the document id; blank by default

es.mapping.parent specifies the document's parent; fill in a field name or a constant

es.mapping.version specifies the document version; fill in a field name or a constant

es.mapping.include specifies which fields are written to ES

es.mapping.exclude specifies which fields are not written to ES
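
Putting the document-related settings together, a minimal sketch might look like this (the field and pipeline names are placeholders; note that update and upsert need es.mapping.id so the connector knows which document to modify):

conf.set("es.write.operation", "upsert");        // insert new documents, update existing ones
conf.set("es.mapping.id", "id");                 // field holding the document id
conf.set("es.mapping.exclude", "internal_note"); // field(s) not sent to ES
conf.set("es.ingest.pipeline", "my_pipeline");   // process documents through this ingest pipeline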

Code reference

Only the driver code is posted here. For the rest, please refer to GitHub: https://github.com/BaoPiao/elasticsearch_demo

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

public class ElasticsearchHadoop {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // Disable speculative execution so documents are not written to ES twice
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
        conf.set("es.nodes", "192.168.10.108:9200"); // ES node to connect to
        conf.set("es.resource", "/artists_2");       // target index
        conf.set("es.write.operation", "upsert");    // insert new documents, update existing ones
        conf.set("es.mapping.include", "id");        // only send the id field to ES
        conf.set("es.mapping.id", "id");             // use the id field as the document id
        Job job = Job.getInstance(conf);
        job.setJarByClass(ElasticsearchHadoop.class);
        job.setOutputFormatClass(EsOutputFormat.class);
        job.setMapperClass(EsMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(MapWritable.class);
        FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.10.108:9000/test"));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

Read ES and write to HDFS

Read the ES data through EsInputFormat, which returns <Text, MapWritable> pairs: the Text key is the document id and the MapWritable value holds the fields and their values. Finally, write your own Reducer to store the results in HDFS.
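
The EsReduce class used in the driver below lives in the GitHub repository; as a rough sketch, a reducer of this kind might flatten each document into a tab-separated text line (the output layout here is an assumption):

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;

// Writes each ES document as one line: docId <TAB> field=value ...
public class EsReduce extends Reducer<Text, MapWritable, NullWritable, Text> {
    @Override
    protected void reduce(Text docId, Iterable<MapWritable> docs, Context context)
            throws IOException, InterruptedException {
        for (MapWritable doc : docs) {
            StringBuilder line = new StringBuilder(docId.toString());
            for (Map.Entry<Writable, Writable> field : doc.entrySet()) {
                line.append('\t').append(field.getKey()).append('=').append(field.getValue());
            }
            context.write(NullWritable.get(), new Text(line.toString()));
        }
    }
}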

Parameter description

Metadata information

es.read.metadata is false by default. Whether to also return document metadata, such as the document id and version number

es.read.metadata.field defaults to _metadata. If and only if es.read.metadata is set to true, the metadata is returned under this field, encapsulated as a map

es.read.metadata.version is false by default. It takes effect if and only if es.read.metadata is set to true; when enabled, the version number is returned.
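
For example, with es.read.metadata enabled as above, the metadata map can be pulled out of a returned document like this (a sketch; the helper class is hypothetical, and the _id key follows the usual ES metadata field names):

import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;

public class MetadataHelper {
    // Returns the document id from the map stored under the _metadata field
    // (present only when es.read.metadata is true).
    public static Text documentId(MapWritable doc) {
        MapWritable meta = (MapWritable) doc.get(new Text("_metadata"));
        return (Text) meta.get(new Text("_id"));
    }
}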

Query parameters

es.resource specifies the ES index

es.resource = twitter/tweet #The index is called twitter and the type is tweet

es.query is empty by default; it supports three forms:

  1. URI (parameter) query
  2. Query DSL (recommended)
  3. External resource file
# uri (or parameter) query
es.query = ?q=costinl

# query dsl
es.query = { "query" : { "term" : { "user" : "costinl" } } }

# external resource
es.query = org/mypackage/myquery.json

es.resource.read defaults to the value of es.resource

Code reference

Only the driver part is posted here. For details, please refer to GitHub: https://github.com/BaoPiao/elasticsearch_demo

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.elasticsearch.hadoop.mr.EsInputFormat;

public class ReadElasticsearch {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.set("es.nodes", "192.168.10.108:9200");     // ES node to connect to
        conf.set("es.resource", "artists_2");            // index to read from
        conf.set("es.read.metadata", "true");            // also return document metadata
        conf.set("es.read.metadata.field", "_metadata"); // field the metadata is returned under
        // Query DSL: fetch every document in the index
        conf.set("es.query", "{\n" +
                "    \"query\":{\n" +
                "        \"match_all\":{}\n" +
                "    }\n" +
                "}");
        Job job = Job.getInstance(conf);
        job.setJarByClass(ReadElasticsearch.class);
        job.setInputFormatClass(EsInputFormat.class);
        job.setReducerClass(EsReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MapWritable.class);
        FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\outputES"));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

Finally

Many parameters are not covered in detail here, such as write batch size, write retry count, timeouts, and the scroll settings used for reads; see the references for details.
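
For reference, a few of those knobs are set on the Configuration like everything else (the values here are illustrative, not recommendations; see the configuration page below for the defaults):

conf.set("es.batch.size.entries", "1000");   // documents per bulk request
conf.set("es.batch.size.bytes", "1mb");      // bytes per bulk request
conf.set("es.batch.write.retry.count", "3"); // retries for a failed bulk request
conf.set("es.http.timeout", "1m");           // HTTP connection timeout
conf.set("es.scroll.size", "500");           // documents per scroll page when reading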

References

https://www.elastic.co/guide/en/elasticsearch/hadoop/current/mapreduce.html

https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html#_essential_settings
