Migrating from a MySQL database to Amazon DynamoDB

Go language: migrating from a MySQL database to Amazon DynamoDB

1, Foreword

I am learning Go, and it happened that my team lead asked me to investigate whether a migration from MySQL to DynamoDB could be done with a script.

This post describes a simple migration I implemented in Go. As a beginner, I have not made the code especially elegant and have not applied any algorithmic optimization.

Functionally, it is a simple one-to-one copy of the data into DynamoDB: whatever the original MySQL column type, every value is stored as a String in DynamoDB.

I will keep improving the program, and suggestions for changes and optimizations from experienced Go developers are very welcome.

1.1 migration background

Many companies are considering migrating from relational databases such as MySQL to Amazon DynamoDB.

Amazon DynamoDB is a fully managed, fast, highly scalable, and flexible NoSQL database. DynamoDB can scale capacity up or down with traffic, so its total cost of service can be optimized more easily than that of a typical media-based RDBMS.

1.2 migration issues

  • Service interruption during downtime, especially when the service must be available seamlessly 24/7/365
  • Different key designs in an RDBMS and in DynamoDB

1.3 official AWS migration methods

AWS describes two migration approaches based on its managed services: https://aws.amazon.com/cn/blogs/big-data/near-zero-downtime-migration-from-mysql-to-dynamodb/

Related video: https://www.youtube.com/watch?v=j88icq7JArI

2, Ideas and functions

2.1 ideas

  1. Initialize the MySQL database connection and the DynamoDB client

  2. Read the MySQL databases

  3. Get the MySQL data tables in each database

  4. Convert the data table structure to a DynamoDB structure (fields, types)

    • Get the table fields and determine whether each field is a primary key

  5. Create the DynamoDB table:

    • Define the DynamoDB table name as: MySQL database name + "_" + data table name

    • Create the DynamoDB table

  6. Loop over the data of each MySQL data table and load it into DynamoDB

    • Get all column data and the number of rows

    • Read data from the data table

    • Write data to DynamoDB
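The table-naming rule in step 5 can be sketched as a small helper. The name dynamoTableName is hypothetical and not part of the original code. It assumes the MySQL names may need cleaning: DynamoDB table names may only contain letters, digits, "_", "-", and ".", and must be 3 to 255 characters long, whereas MySQL identifiers can also contain characters such as "$", so this sketch replaces any other character with an underscore.

```go
package main

import (
	"fmt"
	"strings"
)

// dynamoTableName builds a DynamoDB table name of the form database_table.
// Hypothetical helper: characters DynamoDB does not allow in table names
// (anything outside a-z, A-Z, 0-9, '_', '-', '.') are replaced with '_'.
func dynamoTableName(database, table string) (string, error) {
	raw := database + "_" + table
	var b strings.Builder
	for _, r := range raw {
		switch {
		case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9',
			r == '_', r == '-', r == '.':
			b.WriteRune(r)
		default:
			b.WriteRune('_')
		}
	}
	name := b.String()
	// DynamoDB table names must be between 3 and 255 characters long
	if len(name) < 3 || len(name) > 255 {
		return "", fmt.Errorf("table name %q has invalid length %d", name, len(name))
	}
	return name, nil
}
```

Two different MySQL tables could end up with the same cleaned name (for example price$list and price_list), so a real migration would also need to check for such collisions.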

2.2 main function

The main function simply calls the other functions.

It first initializes the database connection and the Amazon DynamoDB client:

// Initialize the database connection
db := Mysql.ConnectDB()
if db == nil {
	fmt.Printf("init db failed!\n")
	return
}
// Initialize the DynamoDB client with the default credential chain and a fixed region
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("ap-southeast-1"))
if err != nil {
	log.Fatalf("unable to load SDK config, %v", err)
}
// Using the Config value, create the DynamoDB client
svc := dynamodb.NewFromConfig(cfg)

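It then calls the other functions in this order, where database[i] and table[j] are the current database and data table in the surrounding loops: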

// 1 read databases
Mysql.DatabaseInfo(db)
// 2 read the data tables of database[i]
Mysql.TableInfo(db, database[i])
// 3 get the fields of table[j]
Mysql.TableFiledInfo(db, database[i], table[j])
// 4 create the DynamoDB table
DynamoDB.CreateDynamoDB(svc, field, tableName)
// 5.1 obtain all column data and the number of rows
Mysql.TableData(db, field, database[i], table[j])
// 5.2 build itemMap from each row (see section 4.2)
// 5.3 write data to DynamoDB
DynamoDB.PutItemDynamoDB(svc, itemMap, tableName)
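The numbered calls run inside two nested loops, one over the databases and one over the tables of each database. The sketch below shows only that control flow and is an illustration under assumptions: the steps struct, its function fields, and the Field type are hypothetical stand-ins for the Mysql and DynamoDB packages, so the loop can be exercised with fake functions instead of a real database and AWS account. Items are plain string maps here rather than SDK attribute values.

```go
package main

import "fmt"

// Field mirrors the article's Mysql.Field (name, column key, data type).
type Field struct {
	Fname, ColumnKey, DataType string
}

// steps is a hypothetical bundle of the migration steps, so the control
// flow can be read (and tested) without MySQL or AWS.
type steps struct {
	Databases   func() []string
	Tables      func(database string) []string
	Fields      func(database, table string) []Field
	Data        func(fields []Field, database, table string) (map[string][]string, int)
	CreateTable func(fields []Field, tableName string) error
	PutItem     func(item map[string]string, tableName string) error
}

// migrate walks every database and table, creates one DynamoDB table per
// MySQL table (named database_table), and copies the rows one item at a time.
func migrate(s steps) error {
	for _, database := range s.Databases() {
		for _, table := range s.Tables(database) {
			fields := s.Fields(database, table)
			tableName := database + "_" + table
			if err := s.CreateTable(fields, tableName); err != nil {
				return fmt.Errorf("create %s: %w", tableName, err)
			}
			data, rowCount := s.Data(fields, database, table)
			for k := 0; k < rowCount; k++ {
				// Turn the column-oriented map into one item for row k
				item := make(map[string]string, len(data))
				for column, values := range data {
					item[column] = values[k]
				}
				if err := s.PutItem(item, tableName); err != nil {
					return fmt.Errorf("put row %d into %s: %w", k, tableName, err)
				}
			}
		}
	}
	return nil
}
```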

3, MySQL functions

3.1 query all non-system databases

In MySQL, the information_schema database stores metadata, and its TABLES table lists every table in every database.

This metadata mainly covers the tables in each database and their columns. The following query reads the database names from information_schema.tables:

SELECT table_schema databaseName
FROM INFORMATION_SCHEMA.TABLES 
WHERE UPPER(table_type)='BASE TABLE'
AND table_schema NOT IN ('mysql','performance_schema','sys')
GROUP BY table_schema
ORDER BY table_schema asc

UPPER(table_type)='BASE TABLE' keeps only ordinary tables and skips views and system views, so only databases that contain at least one base table are returned. Tables created by users are of the BASE TABLE type.

However, the mysql, performance_schema, and sys databases also contain BASE TABLE tables. They are MySQL's own system databases, not user databases, so they are excluded explicitly.

func DatabaseInfo(db *sql.DB) []string {
	sqlStr := `SELECT table_schema databaseName
			FROM INFORMATION_SCHEMA.TABLES
			WHERE UPPER(table_type)='BASE TABLE'
			AND table_schema NOT IN ('mysql','performance_schema','sys')
			GROUP BY table_schema
			ORDER BY table_schema asc`

	rows, err := db.Query(sqlStr)
	if err != nil {
		fmt.Printf("query database name error!err:%v\n", err)
		return nil
	}
	// Close the result set only after the error check: rows is nil when Query fails
	defer rows.Close()
	var result []string
	for rows.Next() {
		var databaseName string
		if err = rows.Scan(&databaseName); err != nil {
			fmt.Printf("scan database name error!err:%v\n", err)
			return nil
		}
		result = append(result, databaseName)
	}
	// Report any error that ended the iteration early
	if err = rows.Err(); err != nil {
		fmt.Printf("iterate database names error!err:%v\n", err)
		return nil
	}
	return result
}

The function iterates over the query results and returns the database names as a string slice.

3.2 query data table information

The tables in each database are also queried from information_schema.tables, and the code logic is the same as for querying the databases.

In the sqlStr query, the ? placeholder receives the database name:

sqlStr := `SELECT table_name tableName
			FROM INFORMATION_SCHEMA.TABLES 
			WHERE UPPER(table_type)='BASE TABLE'
			AND table_schema = ?
			GROUP BY table_name
			ORDER BY table_name asc`

As before, the function iterates over the query results and returns the data table names as a string slice.

3.3 query data table field information

The fields of each data table are queried from information_schema.columns, and the code logic is again the same as for querying the databases.

Data table field structure:

  • Fname: table field name
  • ColumnKey: the field's key attribute; PRI marks a primary key column
  • dataType: table field type (the DATA_TYPE value)
type Field struct {
	Fname     string
	ColumnKey string
	dataType  string
}

In the sqlStr query, the two ? placeholders receive the database name and the data table name:

sqlStr := `SELECT COLUMN_NAME fName,COLUMN_KEY columnKey,DATA_TYPE dataType
			FROM information_schema.columns 
			WHERE table_schema = ? AND table_name = ?
			ORDER BY ORDINAL_POSITION`

The function returns a slice of Field structs, one per column.

3.4 query all data in the table

With Go's database/sql package and the github.com/go-sql-driver/mysql driver, rows.Scan must be passed exactly as many destination variables as there are columns in the query result.

Official documentation: https://pkg.go.dev/database/sql#Row.Scan

Because the number of columns differs from table to table, I query the fields returned by the previous function one column at a time and collect the values into a map keyed by column name:

  1. Iterate over the field slice and, for each field, use db.Query to read that column's values from the data table
  2. Use rows.Next to iterate over the result and rows.Scan to read each value, appending it to a slice
  3. Store all the data of a table in a map of the form key (field name): value (slice of column values)
func TableData(db *sql.DB, field []Field, database, table string) (map[string][]string, int) {
	result := make(map[string][]string)
	var rowsLength int
	for i := 0; i < len(field); i++ {
		// Quote identifiers with backticks so reserved words and unusual names still work
		sqlStr := "SELECT `" + field[i].Fname + "` FROM `" + database + "`.`" + table + "`"
		rows, err := db.Query(sqlStr)
		if err != nil {
			fmt.Printf("Failed to query table! error: %v\n", err)
			return nil, 0
		}
		var oneResult []string
		for rows.Next() {
			// sql.NullString accepts NULL; scanning NULL into a plain string fails
			var columnValue sql.NullString
			if err = rows.Scan(&columnValue); err != nil {
				rows.Close()
				fmt.Printf("Failed to scan a field in a table!err:%v\n", err)
				return nil, 0
			}
			// NULL is stored as an empty string
			oneResult = append(oneResult, columnValue.String)
		}
		// Close each result set before the next query rather than deferring inside the loop
		err = rows.Err()
		rows.Close()
		if err != nil {
			fmt.Printf("Failed to read rows from table!err:%v\n", err)
			return nil, 0
		}
		if len(oneResult) == 0 {
			fmt.Printf("%v.%v has no data!\n", database, table)
			return nil, 0
		}
		result[field[i].Fname] = oneResult
		rowsLength = len(oneResult)
	}
	return result, rowsLength
}
  • db: *sql.DB, the database connection

  • field: []Field, the data table fields

  • database: string, the database name

  • table: string, the data table name

  • return: a map[string][]string with the values of each column, and an int with the number of rows

4, Amazon DynamoDB

4.1 creating DynamoDB table

In DynamoDB, a table's primary key consists of exactly one partition key and, optionally, one sort key. DynamoDB also supports global and local secondary indexes, but that approach is more complex, so only the partition key and sort key are used here.

In other words, a DynamoDB table key has at most two attributes, and the partition key is mandatory.

In MySQL, a table's primary key may span more than two columns, or the table may have no primary key at all. To handle these cases, I check the ColumnKey value of each field obtained earlier.

If there are two or more primary key columns, the first two in the query results are used as the partition key and the sort key respectively; a single primary key column becomes the partition key; if there is no primary key, the first column of the query results is used as the partition key.
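Because PutItem replaces any existing item with the same key, rows whose chosen key values are not unique (for example when a table without a primary key uses its first column as the partition key) would silently overwrite each other in DynamoDB. A minimal pre-check could count such duplicates before any data is written. The helper duplicateKeys below is hypothetical and assumes the column map and row count returned by TableData.

```go
package main

// duplicateKeys reports how many rows share a (partition key, sort key)
// combination with an earlier row. Hypothetical helper: data is the
// column -> values map returned by TableData, and both key columns are
// assumed to exist in it; sortKey may be "" when there is no sort key.
func duplicateKeys(data map[string][]string, rowCount int, partitionKey, sortKey string) int {
	seen := make(map[[2]string]bool, rowCount)
	duplicates := 0
	for k := 0; k < rowCount; k++ {
		key := [2]string{data[partitionKey][k], ""}
		if sortKey != "" {
			key[1] = data[sortKey][k]
		}
		if seen[key] {
			// This row would overwrite an earlier item with the same key
			duplicates++
			continue
		}
		seen[key] = true
	}
	return duplicates
}
```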

In production, a better approach would be an interface that lets you configure how each data table is mapped to DynamoDB.

In addition, every key attribute is created with the String type, without checking the original field type. I think a better way would be to determine the DynamoDB attribute type from the original field type, for example with Go's reflection.
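Since the DATA_TYPE value is already read into the Field struct, a simple lookup could be an alternative to reflection. The sketch below maps MySQL data types to DynamoDB scalar type codes: "N" (number), "B" (binary), and "S" (string), which correspond to types.ScalarAttributeTypeN, types.ScalarAttributeTypeB, and types.ScalarAttributeTypeS in the SDK. The function name and the grouping of MySQL types are assumptions, and anything unrecognized falls back to "S".

```go
package main

import "strings"

// dynamoScalarType maps a MySQL information_schema DATA_TYPE value to a
// DynamoDB scalar attribute type code: "N" (number), "B" (binary), or "S" (string).
// The grouping below is an assumption; anything unrecognized falls back to "S".
func dynamoScalarType(mysqlDataType string) string {
	switch strings.ToLower(mysqlDataType) {
	case "tinyint", "smallint", "mediumint", "int", "bigint",
		"decimal", "float", "double":
		return "N"
	case "binary", "varbinary", "tinyblob", "blob", "mediumblob", "longblob":
		return "B"
	default:
		// char, varchar, text, date, datetime, timestamp, json, enum, ...
		return "S"
	}
}
```

For numeric columns, the value read from MySQL can still be passed as a decimal string, because the SDK's types.AttributeValueMemberN holds its Value as a string; binary columns, by contrast, would need TableData to return []byte instead of string.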

func CreateDynamoDB(svc *dynamodb.Client, field []Mysql.Field, tableName string) *dynamodb.CreateTableOutput {
	var attributeDefinitions []types.AttributeDefinition
	var keySchema []types.KeySchemaElement
	for i := 0; i < len(field); i++ {
		if field[i].ColumnKey == "PRI" && len(attributeDefinitions) < 1 {
			// The first primary key acts as the partition key
			Attribute := []types.AttributeDefinition{
				{
					AttributeName: aws.String(field[i].Fname),
					AttributeType: types.ScalarAttributeTypeS,
				},
			}
			schemaElement := []types.KeySchemaElement{
				{
					AttributeName: aws.String(field[i].Fname),
					KeyType:       types.KeyTypeHash,
				},
			}
			attributeDefinitions = append(attributeDefinitions, Attribute...)
			keySchema = append(keySchema, schemaElement...)
		} else if (field[i].ColumnKey == "PRI") && (len(attributeDefinitions) >= 1) {
			// The second primary key is used as the sort key
			Attribute := []types.AttributeDefinition{
				{
					AttributeName: aws.String(field[i].Fname),
					AttributeType: types.ScalarAttributeTypeS,
				},
			}
			schemaElement := []types.KeySchemaElement{
				{
					AttributeName: aws.String(field[i].Fname),
					KeyType:       types.KeyTypeRange,
				},
			}
			attributeDefinitions = append(attributeDefinitions, Attribute...)
			keySchema = append(keySchema, schemaElement...)
		}
		// When there are more than two primary keys, only the first two primary keys are selected
		if len(attributeDefinitions) >= 2 {
			fmt.Printf("The database primary key is greater than or equal to 2!tableName:%v\n", tableName)
			break
		}
	}
	// If there is no primary key, the first table field is the partition key of DynamoDB
	if len(attributeDefinitions) == 0 {
		attributeDefinitions = []types.AttributeDefinition{
			{
				AttributeName: aws.String(field[0].Fname),
				AttributeType: types.ScalarAttributeTypeS,
			},
		}
		keySchema = []types.KeySchemaElement{
			{
				AttributeName: aws.String(field[0].Fname),
				KeyType:       types.KeyTypeHash,
			},
		}
		fmt.Printf("No primary key exists in the database!tableName:%v\n", tableName)
	}
	//fmt.Println(attributeDefinitions[1].AttributeName)
	input := &dynamodb.CreateTableInput{
		AttributeDefinitions: attributeDefinitions,
		KeySchema: keySchema,
		ProvisionedThroughput: &types.ProvisionedThroughput{
			ReadCapacityUnits:  aws.Int64(5),
			WriteCapacityUnits: aws.Int64(5),
		},
		TableName: aws.String(tableName),
	}

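	result, err := svc.CreateTable(context.TODO(), input)
	if err != nil {
		fmt.Printf("Failed to create DynamoDB! error: %v\n", err)
		return nil
	}
	// CreateTable is asynchronous: the table starts in CREATING status. Instead of
	// sleeping for a fixed time, wait until DescribeTable reports it as ACTIVE.
	waiter := dynamodb.NewTableExistsWaiter(svc)
	err = waiter.Wait(context.TODO(), &dynamodb.DescribeTableInput{TableName: aws.String(tableName)}, 2*time.Minute)
	if err != nil {
		fmt.Printf("Table did not become active! error: %v\n", err)
		return nil
	}

	return result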
}

4.2 insert data

For each row read from the MySQL data table, the column values are put into a map in DynamoDB's item format, and the item is written with PutItem through a PutItemInput.

All values are written as String attributes, and the original field type is not checked. As with table creation, I think the attribute type should ideally be derived from the original field type, for example with Go's reflection.

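for k := 0; k < rowLength; k++ {
	// Build one DynamoDB item per MySQL row; every value is a String (S) attribute
	itemMap := make(map[string]types.AttributeValue)
	for itemName, item := range tableData {
		itemMap[itemName] = &types.AttributeValueMemberS{Value: item[k]}
	}
	// 5.3 write data to DynamoDB
	putItemResult := DynamoDB.PutItemDynamoDB(svc, itemMap, tableName)
	if putItemResult == nil {
		// PutItemDynamoDB returns nil on failure; panic(nil) would hide the cause
		panic(fmt.Sprintf("failed to put item %d into %s", k, tableName))
	}
	fmt.Println("put Item succeed!")
}
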
func PutItemDynamoDB(svc *dynamodb.Client, itemMap map[string]types.AttributeValue, tableName string) *dynamodb.PutItemOutput{

	input := &dynamodb.PutItemInput{
		Item: itemMap,
		ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal,
		TableName:              aws.String(tableName),
	}
	result, err := svc.PutItem(context.TODO(),input)
	if err != nil {
		fmt.Printf("Failed to put Item! error: %v\n", err)
		return nil
	}

	return result
}
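Writing one item per PutItem call is simple but slow for large tables. DynamoDB's BatchWriteItem accepts at most 25 put or delete requests per call, so a common approach is to split the items into groups of 25 first. The generic chunk helper below is a hypothetical sketch of that splitting step only: each group would then be turned into the []types.WriteRequest for one BatchWriteItem call, and any UnprocessedItems returned in the response would have to be retried.

```go
package main

// maxBatchWriteItems is DynamoDB's per-request limit for BatchWriteItem.
const maxBatchWriteItems = 25

// chunk splits items into consecutive groups of at most size elements.
// Each group is a sub-slice of items, so no elements are copied.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		return nil
	}
	var batches [][]T
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		batches = append(batches, items[start:end])
	}
	return batches
}
```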

Appendix:

References

MySQL Driver: https://github.com/Go-SQL-Driver/MySQL/

AWS Go DynamoDB SDKv2: https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/service/dynamodb#Client.PutItem

Source code

GitHub: https://github.com/MoGD2018/Go-mysql-convert-to-dynamodb

Gitee: https://gitee.com/MoGD/Go-mysql-convert-to-dynamodb

Keywords: Go Database MySQL

Added by astoller on Mon, 08 Nov 2021 14:13:10 +0200