Preface
Function calculation A new trigger, RDS trigger, has been released, which indicates that a new member has been added to the event source of function calculation-- Alibaba cloud Relational Database Service (RDS) According to Official documents As a supplement, this article explains the code example when eventFormat is protobuf.
protobuf format
syntax = "proto3"; package protocol; enum DBType { MySQL = 0; Redis = 1; Mongo = 2; HBase = 3; } message Message { //After consuming the message, the offset of ack can be used int64 offset = 1; //After the message, the time stamp corresponding to the record of ack can be used int64 timestamp = 2; //Save as backup: large data lines can be used for later transmission int32 spare_flag = 3; int32 spare_seq = 4; //message version int32 version = 5; //data source DBType db_type = 6; //data row repeated Entry entries = 7; } //operation enum OpType { UNKOWN_TYPE = 0; BEGIN = 1; COMMIT = 2; //Queries that do not function in the following DDL operations QUERY = 3; INSERT = 4; UPDATE = 5; DELETE = 6; CREATE = 7; ALTER = 8; DROP = 9; TRUNCATE = 10; RENAME = 11; //CREATE INDEX CINDEX = 12; //DROP INDEX DINDEX = 13; OPTIMIZE = 14; XA = 15; } message Entry { OpType operation = 1; //Time stamp in s int64 timestamp = 2; //Transaction id string id = 3; //Lines in a transaction int64 sequence = 4; //db name of DML operation, default db of session during DDL operation string db_name = 5; //Table name of DML operation string table_name = 6; //after image, the back image of the operation repeated Field row = 7; //before image, the front image of the operation repeated Field before_row = 8; //sql for non DML statements string query = 9; } message Field { //column name string name = 1; //character set string charset = 3; //The first few columns int32 idx = 2; //Corresponding to type in java int32 type_num = 4; //For MySQL, type num in MySQL; int32 org_type = 5; //name of the original type in db string org_type_name = 6; //Reserved flag int32 flag = 7; //NULL or not bool is_null = 8; //Is it pk? bool is_pk = 9; //Whether it is unsigned value bool is_unsigned = 10; //Whether it is a timestamp value(timestamp shows that the value is related to the time zone, and the value of the standard time zone is recorded here) bool is_timestamp = 11; //The value of value is expressed in bytes. The consumer needs to generate the corresponding string with charset and bytes first, and then convert it to the type value corresponding to type_num //At the same time, if charset is empty, it means that the original data type is binary bytes value = 12; }
Code example
For a complete code example, please download the attachment at the end of this article, which contains three kinds of runtime (Python, PHP, nodejs) use cases. The following shows the short code examples used by each.
For the java runtime example, please refer to: Redis cache obsolescence
python
# -*- coding: utf-8 -*- from proto import rdsEvent_pb2 bin_data = b"" # local data only for test with open('./proto.bin', 'rb') as f: bin_data = f.read() def handler(event, context): target = rdsEvent_pb2.Message() target.ParseFromString(event) #target.ParseFromString(bin_data) print(target.offset, target.db_type) entry = target.entries[0] print(entry.operation, entry.db_name, entry.table_name) print(entry.row[0].name, entry.row[1].value, entry.row[2].type_num) return "OK"
nodejs
'use strict'; var fs = require("fs"); var protobuf = require("protobufjs"); module.exports.handler = function(event, context, callback) { protobuf.load("rdsEvent.proto", function(err, root) { if (err) throw err; var rdsEventMessage = root.lookupType("proto.Message"); // local data only for test //var event = fs.readFileSync('proto.bin'); var message = rdsEventMessage.decode(event); console.log(JSON.stringify(message)) callback(null, 'hello world'); }); };
php
<?php require_once __DIR__ . '/Proto/Field.php'; require_once __DIR__ . '/Proto/DBType.php'; require_once __DIR__ . '/Proto/OpType.php'; require_once __DIR__ . '/Proto/Entry.php'; require_once __DIR__ . '/Proto/Message.php'; require_once __DIR__ . '/GPBMetadata/RdsEvent.php'; // local data only for test $filename = __DIR__ . "/proto.bin"; $handle = fopen($filename, "rb"); $data = fread($handle, filesize($filename)); fclose($handle); function handler($event, $context) { $res = new \Proto\Message(); //$res->mergeFromString($data); $res->mergeFromString($event); echo $res->getDbType() . PHP_EOL; echo $res->getOffset() . PHP_EOL; echo $res->getEntries()[0]->getTableName() . PHP_EOL; return "ok"; }