Go language: migrating from a MySQL database to Amazon DynamoDB
1, Foreword
I am learning Go, and my team lead happened to ask me to investigate whether a migration from a MySQL database to DynamoDB could be done with a script.
This post presents a simple migration that I implemented in Go. The code is not elegant and has no algorithmic optimization, but it may be useful to beginners learning Go.
Functionally, it is just a one-to-one copy of the data into DynamoDB. Every MySQL column, whatever its original type, is stored as a String in DynamoDB.
I will continue to optimize the program in the future, and I welcome suggestions for changes and optimizations from experienced Go developers.
1.1 migration background
Many companies consider migrating from relational databases such as MySQL to Amazon DynamoDB.
Amazon DynamoDB is a fully managed, fast, highly scalable, and flexible NoSQL database. DynamoDB can increase or decrease capacity according to traffic requirements, so the total cost of service can be optimized more easily than with a typical media-based RDBMS.
1.2 migration issues
- Service interruption due to downtime, especially when the service must be seamlessly available to customers 24/7/365
- Different key designs for RDBMS and DynamoDB
1.3 official migration method of AWS
Two types of migration based on AWS managed services: https://aws.amazon.com/cn/blogs/big-data/near-zero-downtime-migration-from-mysql-to-dynamodb/
Learning video: https://www.youtube.com/watch?v=j88icq7JArI
2, Ideas and functions
2.1 ideas
- Initialize the database connection and the DynamoDB client
- Read the MySQL databases
- Get the MySQL data tables in each database
- Convert the data table structure to a DynamoDB structure (field, type)
  - Get the table fields and determine whether each field is a primary key
- Create the DynamoDB table:
  - Define the DynamoDB table name: <MySQL database name>_<data table name>
  - Create the DynamoDB table
- Loop over the data of each MySQL data table and load it into DynamoDB
  - Get all column data and the number of rows
  - Read data from the data table
  - Write data to DynamoDB
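The naming rule in the steps above can be sketched as a small helper. This is only an illustration: targetTableName and planTables are hypothetical names that do not appear in the project, and the input map stands in for the results of the MySQL metadata queries.

```go
package main

import (
	"fmt"
	"sort"
)

// targetTableName builds the DynamoDB table name from the MySQL database
// and table names, following the "<database>_<table>" rule.
func targetTableName(database, table string) string {
	return database + "_" + table
}

// planTables flattens a database -> tables mapping into a sorted list of
// DynamoDB table names. The map is an assumed stand-in for the metadata
// that the migration reads from MySQL.
func planTables(tablesByDatabase map[string][]string) []string {
	var plan []string
	for database, tables := range tablesByDatabase {
		for _, table := range tables {
			plan = append(plan, targetTableName(database, table))
		}
	}
	// Map iteration order is random, so sort for stable output.
	sort.Strings(plan)
	return plan
}

func main() {
	plan := planTables(map[string][]string{
		"shop": {"orders", "users"},
		"blog": {"posts"},
	})
	fmt.Println(plan) // prints [blog_posts shop_orders shop_users]
}
```

Listing the target names up front makes it easy to spot collisions, for example when two databases produce the same combined name.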
2.2 main function
The main function only calls the other functions.
First, it initializes the database connection and the Amazon DynamoDB client:
    // Initialize database connection
    db := Mysql.ConnectDB()
    if db == nil {
        fmt.Printf("init db failed!\n")
        return
    }
    // Initialize DynamoDB client
    cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("ap-southeast-1"))
    if err != nil {
        log.Fatalf("unable to load SDK config, %v", err)
    }
    // Using the Config value, create the DynamoDB client
    svc := dynamodb.NewFromConfig(cfg)
Function calls:
    // 1 read database
    Mysql.DatabaseInfo(db)
    // 2 read data table
    Mysql.TableInfo(db, database[i])
    // 3 get table fields
    Mysql.TableFiledInfo(db, database[i], table[j])
    // 4 create DynamoDB table
    DynamoDB.CreateDynamoDB(svc, field, tableName)
    // 5.1 obtain all column data information and the number of rows
    Mysql.TableData(db, field, database[i], table[j])
    // 5.3 write data to DynamoDB
    DynamoDB.PutItemDynamoDB(svc, itemMap, tableName)
3, MySQL function
3.1 query all non system databases
In MySQL, the information_schema.tables table stores metadata about the tables in every database.
The metadata mainly covers table information and table field information. The database names are queried from the information_schema.tables table:
    SELECT table_schema databaseName
    FROM INFORMATION_SCHEMA.TABLES
    WHERE UPPER(table_type)='BASE TABLE'
      AND table_schema NOT IN ('mysql','performance_schema','sys')
    GROUP BY table_schema
    ORDER BY table_schema asc
UPPER(table_type)='BASE TABLE' selects only base tables, so the query returns the databases that contain at least one base table. Tables created by users are of the BASE TABLE type.
The mysql, performance_schema, and sys databases also contain tables of the BASE TABLE type. However, these three are MySQL's own databases, not user databases, so they need to be excluded.
    func DatabaseInfo(db *sql.DB) []string {
        sqlStr := `SELECT table_schema databaseName
            FROM INFORMATION_SCHEMA.TABLES
            WHERE UPPER(table_type)='BASE TABLE'
            AND table_schema NOT IN ('mysql','performance_schema','sys')
            GROUP BY table_schema
            ORDER BY table_schema asc`
        rows, err := db.Query(sqlStr)
        if err != nil {
            fmt.Printf("query table name error!err:%v\n", err)
            return nil
        }
        // Register Close only after the error check: rows is nil when Query fails
        defer rows.Close()
        var result []string
        for rows.Next() {
            var databaseName string
            err = rows.Scan(&databaseName)
            if err != nil {
                fmt.Printf("scan table name error!err:%v\n", err)
                return nil
            }
            result = append(result, databaseName)
        }
        return result
    }
The function iterates over the query results and returns the database names as a slice.
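The same exclusion can also be applied on the Go side, for example when the database names come from a source other than the query above. The following is a minimal sketch; userDatabases and systemSchemas are hypothetical names, and adding information_schema to the list is my own assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// systemSchemas lists the schemas that ship with MySQL itself.
// information_schema is included here as an extra precaution.
var systemSchemas = map[string]bool{
	"mysql":              true,
	"performance_schema": true,
	"sys":                true,
	"information_schema": true,
}

// userDatabases returns only the names that are not MySQL system schemas.
// The comparison ignores case; the original spelling is preserved.
func userDatabases(names []string) []string {
	var result []string
	for _, name := range names {
		if !systemSchemas[strings.ToLower(name)] {
			result = append(result, name)
		}
	}
	return result
}

func main() {
	fmt.Println(userDatabases([]string{"blog", "mysql", "shop", "sys"})) // prints [blog shop]
}
```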
3.2 query data table information
All tables in a database are also queried from the information_schema.tables table. The code logic is the same as for querying the databases.
The sqlStr query statement is shown below; the ? placeholder receives the database name:
    sqlStr := `SELECT table_name tableName
        FROM INFORMATION_SCHEMA.TABLES
        WHERE UPPER(table_type)='BASE TABLE'
        AND LOWER(table_schema) = ?
        GROUP BY table_name
        ORDER BY table_name asc`
The function iterates over the query results and returns the data table names as a slice.
3.3 query data table field information
All fields of a data table are queried from the information_schema.columns table. The code logic is the same as for querying the databases.
Data table field structure:
- Fname: table field name
- ColumnKey: whether the table field attribute is a primary key (PRI)
- dataType: table field type
    type Field struct {
        Fname     string
        ColumnKey string
        dataType  string
    }
The sqlStr query statement is shown below; the two ? placeholders receive the database name and the data table name:
    sqlStr := `SELECT COLUMN_NAME fName, COLUMN_KEY columnKey, DATA_TYPE dataType
        FROM information_schema.columns
        WHERE table_schema = ? AND table_name = ?`
The function returns the table's fields as a slice of Field structs.
3.4 query all information in the table
When querying through Go's native database/sql package with the github.com/go-sql-driver/mysql driver, Scan must be given as many destination variables as the query result has columns.
Official documentation: https://pkg.go.dev/database/sql#Row.Scan
Therefore, I query the table fields obtained by the previous function one column at a time and collect the results into a map keyed by column name.
- Iterate over the field slice and query the values of each column in the data table with db.Query
- Use rows.Next to iterate over the query result and rows.Scan to read each column value and append it to a slice
- Store all data of a table in a map of the form key (field name): value (slice of column values)
    func TableData(db *sql.DB, field []Field, database, table string) (map[string][]string, int) {
        result := make(map[string][]string)
        var rowsLength int
        for i := 0; i < len(field); i++ {
            // Query one column at a time
            sqlStr := "SELECT " + field[i].Fname + " from " + database + "." + table
            rows, err := db.Query(sqlStr)
            if err != nil {
                fmt.Printf("Failed to query table! error: %v\n", err)
                return nil, 0
            }
            var columnValue string
            var oneResult []string
            for rows.Next() {
                err = rows.Scan(&columnValue)
                if err != nil {
                    rows.Close()
                    fmt.Printf("Failed to scan a field in a table!err:%v\n", err)
                    return nil, 0
                }
                oneResult = append(oneResult, columnValue)
            }
            // Close inside the loop: a defer here would keep every
            // result set open until the whole function returns
            rows.Close()
            if len(oneResult) == 0 {
                fmt.Printf("%v.%v not data!\n", database, table)
                return nil, 0
            }
            result[field[i].Fname] = oneResult
            rowsLength = len(oneResult)
        }
        return result, rowsLength
    }
- db: *sql.DB, the database connection
- field: []Field, the data table fields
- database: string, the database name
- table: string, the data table name
- return: map[string][]string, the data of each column, together with the number of rows (int)
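As an alternative to one query per column, database/sql can report the column names of a result set through Rows.Columns, which allows a destination slice of the right length to be built at run time. The sketch below is not the project's code: scanAll, rowSource, memRows, and demoRows are hypothetical names, and memRows is only an in-memory stand-in so the example runs without a database. With a real connection, the *sql.Rows returned by db.Query could be passed to scanAll directly.

```go
package main

import (
	"database/sql"
	"fmt"
)

// rowSource is the subset of *sql.Rows used below; *sql.Rows satisfies it.
type rowSource interface {
	Columns() ([]string, error)
	Next() bool
	Scan(dest ...any) error
	Err() error
}

// scanAll reads every row into a map keyed by column name. NULL values
// become nil pointers instead of causing a Scan error.
func scanAll(rows rowSource) ([]map[string]*string, error) {
	columns, err := rows.Columns()
	if err != nil {
		return nil, err
	}
	// One sql.NullString per column; dest holds pointers to them for Scan.
	values := make([]sql.NullString, len(columns))
	dest := make([]any, len(columns))
	for i := range values {
		dest[i] = &values[i]
	}
	var result []map[string]*string
	for rows.Next() {
		if err := rows.Scan(dest...); err != nil {
			return nil, err
		}
		row := make(map[string]*string, len(columns))
		for i, name := range columns {
			if values[i].Valid {
				v := values[i].String
				row[name] = &v
			} else {
				row[name] = nil
			}
		}
		result = append(result, row)
	}
	return result, rows.Err()
}

// memRows is an illustrative in-memory replacement for *sql.Rows.
type memRows struct {
	cols []string
	data [][]sql.NullString
	pos  int
}

func (m *memRows) Columns() ([]string, error) { return m.cols, nil }
func (m *memRows) Next() bool                 { m.pos++; return m.pos <= len(m.data) }
func (m *memRows) Err() error                 { return nil }
func (m *memRows) Scan(dest ...any) error {
	for i, d := range dest {
		*(d.(*sql.NullString)) = m.data[m.pos-1][i]
	}
	return nil
}

// demoRows returns two sample rows; the second has a NULL name.
func demoRows() rowSource {
	return &memRows{
		cols: []string{"id", "name"},
		data: [][]sql.NullString{
			{{String: "1", Valid: true}, {String: "alice", Valid: true}},
			{{String: "2", Valid: true}, {}},
		},
	}
}

func main() {
	items, err := scanAll(demoRows())
	if err != nil {
		panic(err)
	}
	fmt.Println(len(items), *items[0]["name"], items[1]["name"] == nil) // prints 2 alice true
}
```

Reading whole rows in a single query also keeps the values of one row together, which the per-column approach can only guarantee if every query returns rows in the same order.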
4, Amazon DynamoDB
4.1 creating DynamoDB table
In DynamoDB's design, the primary key of a table consists of one partition key and, optionally, one sort key. DynamoDB also supports global and local secondary indexes, but they are more complex, so only the partition key and the sort key are used here.
Because the primary key has at most these two attributes and the partition key is mandatory, the MySQL primary key has to be mapped onto them.
A MySQL table may have more than two primary key columns, or none at all. To handle both cases, I check the ColumnKey attribute of the fields obtained earlier.
If there are two or more primary key columns, the first two in the query result are used as the partition key and the sort key respectively. If there is exactly one, it is used as the partition key. If there is no primary key, the first column of the query result is used as the partition key.
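The selection rule above can be isolated into a small function that is easy to test without AWS. This is a sketch under my own assumptions: chooseKeys is a hypothetical helper, and the Field type here repeats only the two fields it needs.

```go
package main

import (
	"errors"
	"fmt"
)

// Field mirrors the struct from section 3.3 (only the fields used here).
type Field struct {
	Fname     string
	ColumnKey string
}

// chooseKeys returns the partition key and sort key column names.
// sortKey is empty when the table has fewer than two primary key columns.
func chooseKeys(fields []Field) (partitionKey, sortKey string, err error) {
	if len(fields) == 0 {
		return "", "", errors.New("table has no columns")
	}
	var primary []string
	for _, f := range fields {
		if f.ColumnKey == "PRI" {
			primary = append(primary, f.Fname)
		}
	}
	switch len(primary) {
	case 0:
		// No primary key: fall back to the first column.
		return fields[0].Fname, "", nil
	case 1:
		return primary[0], "", nil
	default:
		// Two or more: only the first two are used.
		return primary[0], primary[1], nil
	}
}

func main() {
	pk, sk, err := chooseKeys([]Field{
		{Fname: "note", ColumnKey: ""},
		{Fname: "order_id", ColumnKey: "PRI"},
		{Fname: "line_no", ColumnKey: "PRI"},
	})
	fmt.Println(pk, sk, err) // prints order_id line_no <nil>
}
```

Returning an error for a table without columns avoids indexing into an empty slice.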
A better approach would be to write an interface through which the conversion of each data table to DynamoDB can be adjusted to the needs of the actual production environment.
In addition, all key attributes are created as String type by default, and the original field type is not checked. My understanding is that the best approach would be to use Go's reflection mechanism to determine the DynamoDB type of each converted field.
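One option that does not need reflection is to use the DATA_TYPE value already read from information_schema.columns. The sketch below maps it to DynamoDB's scalar type codes "S", "N", and "B". dynamoScalarType is a hypothetical function, and the grouping of MySQL types is an assumption that would need to be reviewed against the real schema.

```go
package main

import (
	"fmt"
	"strings"
)

// dynamoScalarType maps a MySQL DATA_TYPE value to a DynamoDB scalar type
// code: "N" for numbers, "B" for binary, and "S" for everything else.
// The lists below are illustrative, not a complete mapping.
func dynamoScalarType(mysqlType string) string {
	switch strings.ToLower(mysqlType) {
	case "tinyint", "smallint", "mediumint", "int", "bigint",
		"decimal", "float", "double":
		return "N"
	case "binary", "varbinary", "tinyblob", "blob", "mediumblob", "longblob":
		return "B"
	default:
		// Text, date and time, enum, JSON, and unknown types stay strings.
		return "S"
	}
}

func main() {
	for _, t := range []string{"int", "varchar", "blob", "datetime"} {
		fmt.Printf("%s -> %s\n", t, dynamoScalarType(t))
	}
}
```

At table creation the type only matters for the key attributes, because DynamoDB requires attribute definitions only for attributes used in a key schema. When items are written, the same mapping could decide which attribute value type to use for each column.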
    func CreateDynamoDB(svc *dynamodb.Client, field []Mysql.Field, tableName string) *dynamodb.CreateTableOutput {
        var attributeDefinitions []types.AttributeDefinition
        var keySchema []types.KeySchemaElement
        for i := 0; i < len(field); i++ {
            if (field[i].ColumnKey == "PRI") && (len(attributeDefinitions) < 1) {
                // The first primary key acts as the partition key
                attributeDefinitions = append(attributeDefinitions, types.AttributeDefinition{
                    AttributeName: aws.String(field[i].Fname),
                    AttributeType: types.ScalarAttributeTypeS,
                })
                keySchema = append(keySchema, types.KeySchemaElement{
                    AttributeName: aws.String(field[i].Fname),
                    KeyType:       types.KeyTypeHash,
                })
            } else if (field[i].ColumnKey == "PRI") && (len(attributeDefinitions) >= 1) {
                // The second primary key is used as the sort key
                attributeDefinitions = append(attributeDefinitions, types.AttributeDefinition{
                    AttributeName: aws.String(field[i].Fname),
                    AttributeType: types.ScalarAttributeTypeS,
                })
                keySchema = append(keySchema, types.KeySchemaElement{
                    AttributeName: aws.String(field[i].Fname),
                    KeyType:       types.KeyTypeRange,
                })
            }
            // When there are more than two primary keys, only the first two primary keys are selected
            if len(attributeDefinitions) >= 2 {
                fmt.Printf("The database primary key is greater than or equal to 2!tableName:%v\n", tableName)
                break
            }
        }
        // If there is no primary key, the first table field is the partition key of DynamoDB
        if len(attributeDefinitions) == 0 {
            attributeDefinitions = []types.AttributeDefinition{
                {
                    AttributeName: aws.String(field[0].Fname),
                    AttributeType: types.ScalarAttributeTypeS,
                },
            }
            keySchema = []types.KeySchemaElement{
                {
                    AttributeName: aws.String(field[0].Fname),
                    KeyType:       types.KeyTypeHash,
                },
            }
            fmt.Printf("No primary key exists in the database!tableName:%v\n", tableName)
        }
        input := &dynamodb.CreateTableInput{
            AttributeDefinitions: attributeDefinitions,
            KeySchema:            keySchema,
            ProvisionedThroughput: &types.ProvisionedThroughput{
                ReadCapacityUnits:  aws.Int64(5),
                WriteCapacityUnits: aws.Int64(5),
            },
            TableName: aws.String(tableName),
        }
        result, err := svc.CreateTable(context.TODO(), input)
        if err != nil {
            fmt.Printf("Failed to create DynamoDB! error: %v\n", err)
            return nil
        }
        // CreateTable is an asynchronous operation. You need to wait for a certain time to continue with the next step
        time.Sleep(time.Second * 5)
        return result
    }
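A fixed five-second sleep may be too short or needlessly long, because table creation time varies. A possible alternative is to poll until the table is ready. The sketch below shows only the generic polling loop with the standard library: waitUntil and demoWait are hypothetical helpers, and the check function is a placeholder. In the real program the check would presumably call DescribeTable and compare the table status with ACTIVE. The AWS SDK for Go v2 also provides a table-exists waiter in the dynamodb package, which may make a hand-written loop unnecessary.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitUntil calls check immediately and then once per interval until it
// reports true, returns an error, or the context is done.
func waitUntil(ctx context.Context, interval time.Duration, check func() (bool, error)) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		done, err := check()
		if err != nil {
			return err
		}
		if done {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// demoWait simulates a table that becomes ready on the readyAfter-th poll
// and returns how many polls were needed. It exists only for this example.
func demoWait(readyAfter int) (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	calls := 0
	err := waitUntil(ctx, 10*time.Millisecond, func() (bool, error) {
		calls++
		return calls >= readyAfter, nil
	})
	return calls, err
}

func main() {
	calls, err := demoWait(3)
	fmt.Println(calls, err) // prints 3 <nil>
}
```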
4.2 insert data
To insert the data, iterate over all rows read from the MySQL data table, put each row into a map in DynamoDB's item format, and call the PutItem API with a PutItemInput to write it.
All values are written as String type, and the original field type is not checked. My understanding is that the best approach would be to use Go's reflection mechanism to determine the DynamoDB type of each converted field.
    for k := 0; k < rowLength; k++ {
        // Build one DynamoDB item from the k-th value of every column
        itemMap := make(map[string]types.AttributeValue)
        for itemName, item := range tableData {
            itemMap[itemName] = &types.AttributeValueMemberS{Value: item[k]}
        }
        // 5.3 write data to DynamoDB
        putItemResult := DynamoDB.PutItemDynamoDB(svc, itemMap, tableName)
        if putItemResult != nil {
            fmt.Println("put Item succeed!")
        } else {
            // PutItemDynamoDB returns nil on failure and has already printed the error
            panic("put Item failed!")
        }
    }
    func PutItemDynamoDB(svc *dynamodb.Client, itemMap map[string]types.AttributeValue, tableName string) *dynamodb.PutItemOutput {
        input := &dynamodb.PutItemInput{
            Item:                   itemMap,
            ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal,
            TableName:              aws.String(tableName),
        }
        result, err := svc.PutItem(context.TODO(), input)
        if err != nil {
            fmt.Printf("Failed to put Item! error: %v\n", err)
            return nil
        }
        return result
    }
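Writing one item per request is slow for large tables. DynamoDB's BatchWriteItem operation accepts up to 25 put or delete requests per call, so one possible optimization is to group rows into batches first. The sketch below shows only the grouping step with plain maps: chunkRows and maxBatchSize are hypothetical names, and the conversion to write requests and the handling of unprocessed items returned by the service are left out.

```go
package main

import "fmt"

// maxBatchSize is the BatchWriteItem limit of 25 requests per call.
const maxBatchSize = 25

// chunkRows splits rows into consecutive batches of at most size elements.
// It returns nil when size is not positive.
func chunkRows(rows []map[string]string, size int) [][]map[string]string {
	if size <= 0 {
		return nil
	}
	var batches [][]map[string]string
	for start := 0; start < len(rows); start += size {
		end := start + size
		if end > len(rows) {
			end = len(rows)
		}
		batches = append(batches, rows[start:end])
	}
	return batches
}

func main() {
	// 60 sample rows are split into batches of 25, 25, and 10.
	rows := make([]map[string]string, 60)
	for i := range rows {
		rows[i] = map[string]string{"id": fmt.Sprint(i)}
	}
	for _, batch := range chunkRows(rows, maxBatchSize) {
		fmt.Println(len(batch))
	}
}
```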
Appendix:
References
MySQL Driver: https://github.com/Go-SQL-Driver/MySQL/
AWS Go DynamoDB SDKv2: https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/service/dynamodb#Client.PutItem
Source code
GitHub: https://github.com/MoGD2018/Go-mysql-convert-to-dynamodb
Gitee: https://gitee.com/MoGD/Go-mysql-convert-to-dynamodb