Code examples for the RDS trigger with eventFormat set to protobuf

Preface

Function Compute has released a new trigger, the RDS trigger, which adds Alibaba Cloud Relational Database Service (RDS) to the event sources of Function Compute. As a supplement to the official documentation, this article gives code examples for the case where eventFormat is protobuf.

protobuf format

syntax = "proto3";
package protocol;
enum DBType {
  MySQL = 0;
  Redis = 1;
  Mongo = 2;
  HBase = 3;
}
message Message {
  //Offset of the message; can be used to ack once the message has been consumed
  int64 offset = 1;
  //Timestamp of the record; can be used to ack once the message has been consumed
  int64 timestamp = 2;
  //Reserved: may be used later to transmit large rows in parts
  int32 spare_flag = 3;
  int32 spare_seq = 4;
  //Message version
  int32 version = 5;
  //Data source
  DBType db_type = 6;
  //Data rows
  repeated Entry entries = 7;
}
//operation
enum OpType {
  UNKOWN_TYPE = 0;
  BEGIN = 1;
  COMMIT = 2;
  //Queries that are not covered by the DDL operations listed below
  QUERY = 3;
  INSERT = 4;
  UPDATE = 5;
  DELETE = 6;
  CREATE = 7;
  ALTER = 8;
  DROP = 9;
  TRUNCATE = 10;
  RENAME = 11;
  //CREATE INDEX
  CINDEX = 12;
  //DROP INDEX
  DINDEX = 13;
  OPTIMIZE = 14;
  XA = 15;
}
message Entry {
  OpType operation = 1;
  //Timestamp in seconds
  int64 timestamp = 2;
  //Transaction id
  string id = 3;
  //Sequence number of the row within the transaction
  int64 sequence = 4;
  //Database name for DML operations; default database of the session for DDL operations
  string db_name = 5;
  //Table name for DML operations
  string table_name = 6;
  //After image: the row as it is after the operation
  repeated Field row = 7;
  //Before image: the row as it was before the operation
  repeated Field before_row = 8;
  //SQL text for non-DML statements
  string query = 9;
}
message Field {
  //column name
  string name = 1;
  //character set
  string charset = 3;
  //Column index
  int32 idx = 2;
  //Corresponding type in Java
  int32 type_num = 4;
  //Original type number in the database (for MySQL, the MySQL type number)
  int32 org_type = 5;
  //Name of the original type in the database
  string org_type_name = 6;
  //Reserved flag
  int32 flag = 7;
  //Whether the value is NULL
  bool is_null = 8;
  //Whether the column is a primary key column
  bool is_pk = 9;
  //Whether the value is unsigned
  bool is_unsigned = 10;
  //Whether the value is a timestamp (a timestamp depends on the time zone; the value in the standard time zone is recorded here)
  bool is_timestamp = 11;
  //The value is carried as bytes. The consumer first builds a string from the bytes using charset, then converts that string to the type indicated by type_num
  //If charset is empty, the original data type is binary
  bytes value = 12;
}
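
The comments on Field.value describe a two-step conversion: bytes to string using charset, then string to a typed value using type_num. The following is a minimal Python sketch of that step, not the official implementation. It assumes that type_num follows the java.sql.Types constants (the schema only says it corresponds to the type in Java) and that charset may carry MySQL-style names such as utf8mb4. The helper decode_field_value and both lookup tables are hypothetical names introduced for illustration, and the function takes plain arguments instead of a generated Field object so that it runs without the generated module.

```python
from decimal import Decimal

# Assumption: charset may hold MySQL names that Python does not know as codecs.
MYSQL_TO_PYTHON_CODEC = {"utf8mb4": "utf-8", "utf8mb3": "utf-8"}

# Assumption: type_num uses the java.sql.Types constants.
JAVA_SQL_TYPE_CONVERTERS = {
    -6: int,      # TINYINT
    5: int,       # SMALLINT
    4: int,       # INTEGER
    -5: int,      # BIGINT
    8: float,     # DOUBLE
    3: Decimal,   # DECIMAL
}


def decode_field_value(value, charset, type_num, is_null=False):
    """Turn the raw bytes of a Field into a Python value (hypothetical helper)."""
    if is_null:
        return None
    if not charset:
        # Per the schema comment, an empty charset means binary data: keep the bytes.
        return value
    codec = MYSQL_TO_PYTHON_CODEC.get(charset.lower(), charset)
    text = value.decode(codec)
    convert = JAVA_SQL_TYPE_CONVERTERS.get(type_num)
    # Types without a converter (for example VARCHAR, 12) stay as strings.
    return convert(text) if convert else text
```

With a parsed message, a call would look like decode_field_value(field.value, field.charset, field.type_num, field.is_null). Extend the two tables to cover the charsets and column types your tables actually use.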

Code example

For complete code examples, download the attachment at the end of this article, which contains use cases for three runtimes (Python, PHP, Node.js). Short code examples for each runtime are shown below.

For a Java runtime example, see: Redis cache obsolescence

Python

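# -*- coding: utf-8 -*-

import os

from proto import rdsEvent_pb2

# Local data for testing only; skipped when the sample file is not deployed.
bin_data = b""
if os.path.exists('./proto.bin'):
    with open('./proto.bin', 'rb') as f:
        bin_data = f.read()

def handler(event, context):
    # event holds the protobuf-encoded bytes delivered by the RDS trigger
    target = rdsEvent_pb2.Message()
    target.ParseFromString(event)
    # For a local test, parse the sample file instead:
    #target.ParseFromString(bin_data)
    print(target.offset, target.db_type)
    # Guard against messages without entries before indexing
    if target.entries:
        entry = target.entries[0]
        print(entry.operation, entry.db_name, entry.table_name)
        # Print every column of the after image rather than fixed indexes
        for field in entry.row:
            print(field.name, field.value, field.type_num)

    return "OK"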
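
The handler above prints only the first entry. For an UPDATE entry the schema carries both before_row (the before image) and row (the after image), so the changed columns can be found by comparing the two. The sketch below shows one way to do that under stated assumptions: changed_columns and make_field are hypothetical helpers, SimpleNamespace objects stand in for the generated Entry and Field messages so the snippet runs without rdsEvent_pb2, and both images are assumed to list the same columns.

```python
from types import SimpleNamespace

OP_UPDATE = 5  # OpType.UPDATE in the schema above


def changed_columns(entry):
    """Return {column name: (before bytes, after bytes)} for an UPDATE entry."""
    if entry.operation != OP_UPDATE:
        return {}
    # Index the before image by column name; NULL columns are stored as None.
    before = {f.name: (None if f.is_null else f.value) for f in entry.before_row}
    changes = {}
    for f in entry.row:
        new = None if f.is_null else f.value
        old = before.get(f.name)
        if old != new:
            changes[f.name] = (old, new)
    return changes


def make_field(name, value, is_null=False):
    """Stand-in for a generated Field message (illustration only)."""
    return SimpleNamespace(name=name, value=value, is_null=is_null)


sample_entry = SimpleNamespace(
    operation=OP_UPDATE,
    before_row=[make_field("id", b"1"), make_field("name", b"alice")],
    row=[make_field("id", b"1"), make_field("name", b"bob")],
)
print(changed_columns(sample_entry))  # {'name': (b'alice', b'bob')}
```

Inside the handler, the same function could be called on each element of target.entries, since the generated messages expose the same attribute names as the stand-ins.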

Node.js

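'use strict';

var fs = require("fs"); // only needed for the local test below
var protobuf = require("protobufjs");

module.exports.handler = function(event, context, callback) {
    protobuf.load("rdsEvent.proto", function(err, root) {
        if (err) {
            // Report the failure through the callback instead of throwing
            // inside an asynchronous callback
            callback(err);
            return;
        }

        // The type name must use the package declared in rdsEvent.proto.
        // Note: the schema listed above declares "package protocol"; with
        // that file the name would be "protocol.Message".
        var rdsEventMessage = root.lookupType("proto.Message");

        // local data only for test
        //var event = fs.readFileSync('proto.bin');

        // event is a Buffer holding the protobuf-encoded message
        var message = rdsEventMessage.decode(event);

        console.log(JSON.stringify(message));
        callback(null, 'hello world');
    });
};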

PHP

<?php

require_once __DIR__ . '/Proto/Field.php';
require_once __DIR__ . '/Proto/DBType.php';
require_once __DIR__ . '/Proto/OpType.php';
require_once __DIR__ . '/Proto/Entry.php';
require_once __DIR__ . '/Proto/Message.php';
require_once __DIR__ . '/GPBMetadata/RdsEvent.php';

function handler($event, $context) {
    $res = new \Proto\Message();
    // local data only for test: read the sample file inside the handler,
    // because a variable defined at file scope is not visible here
    //$data = file_get_contents(__DIR__ . "/proto.bin");
    //$res->mergeFromString($data);
    $res->mergeFromString($event);
    echo $res->getDbType() . PHP_EOL;
    echo $res->getOffset() . PHP_EOL;
    // Guard against messages without entries before indexing
    if (count($res->getEntries()) > 0) {
        echo $res->getEntries()[0]->getTableName() . PHP_EOL;
    }
    return "ok";
}


Added by davieboy on Mon, 02 Dec 2019 07:59:25 +0200