For version 2.1.6, the use of coprocessor requires the introduction of HBase common dependency.
Next, record the detailed steps of debugging the test coprocessor
Case background
There are two tables, user and people. Each table has a person column family. Now, before inserting "people:name" into the user table, insert its rowkey into "person:lastname" of the person table. This is equivalent to the concept of secondary index.
step
1. Modify the configuration file hbase-site.xml to ensure that the regional server can still provide services when the coprocessor loads and runs abnormally.
<!--To solve the error of loading coprocessor: ERROR: org.apache.hadoop.hbase.DoNotRetryIOException--> <property> <name>hbase.table.sanity.checks</name> <value>false</value> </property> <!--Prevent coprocessor startup failure regionServer Hang up, resulting in hbase collapse--> <property> <name>hbase.coprocessor.abortonerror</name> <value>false</value> </property>
2. First, a new module HBase coprocessor is created under the current project to develop coprocessor. The directory structure is as follows:
3. Develop coprocessor logic implementation class
package com.asn.bigdata.hbase.coprocessor; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WALEdit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Optional; public class PrePutObserver implements RegionObserver, RegionCoprocessor { private static final Logger logger = LoggerFactory.getLogger(PrePutObserver.class); private static Configuration conf = null; private static Connection connection = null; private static Table table = null; private RegionCoprocessorEnvironment env = null; static{ conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "flink1,flink2,flink3"); conf.set("hbase.zookeeper.property.clientPort", "2181"); try { connection = ConnectionFactory.createConnection(conf); table = connection.getTable(TableName.valueOf("people")); } catch (IOException e) { e.printStackTrace(); } } @Override public void start(CoprocessorEnvironment e) throws IOException { logger.info("CoprocessorEnvironment............start..........................."); this.env = (RegionCoprocessorEnvironment) e; } @Override public void stop(CoprocessorEnvironment env) throws IOException { logger.info("CoprocessorEnvironment............stop..........................."); } /** * This method must be added, otherwise an error may be reported */ @Override public Optional<RegionObserver> getRegionObserver() { return Optional.of(this); } @Override public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, Durability durability) throws IOException { logger.info("run PrePutObserver............prePut..........................."); try { byte[] userKey = put.getRow(); Cell cell = put.get(Bytes.toBytes("person"), Bytes.toBytes("name")).get(0); Put o_put = new Put(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()); o_put.addColumn(Bytes.toBytes("person"), Bytes.toBytes("lastname"), userKey); table.put(o_put); table.close(); } catch (IOException e) { logger.error(e.getMessage()); throw e; } } }
4. Package to hdfs
This thing is the key to packaging. If the packaging method is not correct, it will lead to an error when mounting the coprocessor or executing the put operation. The correct packaging steps are as follows:
1) Click file - > project structure - > artifacts - > number - > Jar - > empty to enter the following interface, and then operate according to the serial number
2) Click build - > build artifacts - > select the jar named just now - > build to package
3) Upload the generated jar package to hdfs
5. Enter hbase shell, first disable user table
6. Load the coprocessor to the user table
hbase(main):011:0> alter 'user',METHOD => 'table_att','coprocessor' => 'hdfs://flink1:9000/test/hbase/coprocessor/hbase-coprocessor.jar|com.asn.bigdata.hbase.coprocessor.PrePutObserver|1001|'
If the above configuration is not configured in hbase-site.xml, you may encounter the following error:
ERROR: org.apache.hadoop.hbase.DoNotRetryIOException: hdfs://flink1:9000/test/hbase/coprocessor/d.jar Set hbase.table.sanity.checks to false at conf or table descriptor if you want to bypass sanity checks at org.apache.hadoop.hbase.master.HMaster.warnOrThrowExceptionForFailure(HMaster.java:2228) at org.apache.hadoop.hbase.master.HMaster.sanityCheckTableDescriptor(HMaster.java:2075) at org.apache.hadoop.hbase.master.HMaster.access$200(HMaster.java:246) at org.apache.hadoop.hbase.master.HMaster$12.run(HMaster.java:2562) at org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.submitProcedure(MasterProcedureUtil.java:134) at org.apache.hadoop.hbase.master.HMaster.modifyTable(HMaster.java:2558) at org.apache.hadoop.hbase.master.HMaster.modifyTable(HMaster.java:2595) at org.apache.hadoop.hbase.master.MasterRpcServices.modifyTable(MasterRpcServices.java:1347) at org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos$MasterService$2.callBlockingMethod(MasterProtos.java) at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:413) at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:132) at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:324) at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:304)
The way to solve this problem is to add the following configuration in hbase-site.xml:
<property> <name>hbase.table.sanity.checks</name> <value>false</value> </property>
7. View the user table information and enable the user table
8. test
Insert data into user table
hbase(main):011:0> put 'user','0001','person:name','asn'
View person table contents
hbase(main):011:0> scan 'people' ROW COLUMN+CELL wangsen column=person:lastname, timestamp=1581819549421, value=0001 1 row(s) Took 0.0806 seconds
Finish!