Introduction
Change Data Capture (CDC) is a transformative data integration and replication concept that captures and propagates data modifications within a database, enabling near real-time synchronization between multiple systems or applications. CDC represents a paradigm shift from traditional batch-based methods, offering organizations a more efficient and precise way to handle data changes.
At its core, CDC captures individual data alterations, such as inserts, updates, and deletes, as they occur in the source database. Rather than periodically extracting and processing entire datasets, CDC selectively captures the relevant changes and immediately transfers them to the target systems. This approach minimizes data transfer volume, reduces latency, and optimizes resource utilization, making it ideal for scenarios where timeliness and accuracy are crucial.
Several tools on the market provide Change Data Capture (CDC) functionality; in this example, we will use Debezium.
This solution is explained in three parts. In Part 1, we created the VPC and launched a private Amazon EC2 instance that can be reached over SSH without a bastion host by using an Amazon EC2 Instance Connect (EIC) Endpoint. In Part 2, we launched Amazon RDS, installed Apache Kafka, and configured Debezium on the private Amazon EC2 instance. This is Part 3, where we perform CRUD operations on the MySQL database to check whether Debezium is working.
Steps to perform CRUD Operations on MySQL Database
Step 1: Let's perform CRUD operations on the MySQL database to check whether Debezium is working.
Keep multiple terminal tabs open, such as one for MySQL CRUD operations, one for the running Debezium connector, and one for Apache Kafka topic consumption.
We already have the Debezium Connector running.
MySQL tab
Step 2: Now let's open another tab and start a consumer to read the data from the Apache Kafka topic.
./kafka/bin/kafka-console-consumer.sh --bootstrap-server EC2PrivateIPAddress:9092 --topic mysql.mysqldb.sample --from-beginning
Step 3: Now insert a record into the sample table, as sketched below.
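The record can be inserted from the MySQL tab with whichever client you prefer. Here is a minimal Python sketch using mysql.connector, assuming the same placeholder RDS endpoint and credentials used in the Lambda function later in this post; the sample table has the id and name columns seen in the Debezium event that follows.

import mysql.connector

# Sketch only: insert one row into the source table that Debezium is watching.
# The host and credentials are placeholders; reuse the values configured in Part 2.
conn = mysql.connector.connect(
    host="RDS-MySQL-Endpoint",
    user="admin",
    password="adminpass",
    database="mysqldb"
)
cursor = conn.cursor()
cursor.execute("INSERT INTO sample (id, name) VALUES (%s, %s)", (3, "suresh"))
conn.commit()
cursor.close()
conn.close()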
Step 4: Go to the consumer tab and check the output; a change event similar to the following appears (payload portion shown):
"payload":{"before":null,"after":{"id":3,"name":"suresh"},"source":{"version":"1.3.1.Final","connector":"mysql","name":"mysql","ts_ms":1688897274000,"snapshot":"false","db":"mysqldb","table":"sample","server_id":2096180673,"gtid":null,"file":"mysql-bin-changelog.000019","pos":754,"row":0,"thread":22,"query":null},"op":"c","ts_ms":1688897274460,"transaction":null}}
As you can see, whenever an operation is performed on the database, Debezium records it in the Apache Kafka topic.
Step 5: Delete the NAT Gateway and release the Elastic IP address, because they will keep incurring cost if left in place.
Step 6: Create an AWS Lambda function with the Apache Kafka topic as an event source, to consume the CDC events and write them to another table in the database.
Step 7: First, go to the MySQL tab and create a table called destinationtable, as sketched below.
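A minimal sketch follows, assuming the destination table only needs to exist with an initial id column; the Lambda function shown later adds any missing columns dynamically. The endpoint and credentials are again placeholders.

import mysql.connector

# Sketch only: create a minimal destination table. The Lambda function adds
# further columns on the fly, so only an assumed initial column is defined here.
conn = mysql.connector.connect(
    host="RDS-MySQL-Endpoint",
    user="admin",
    password="adminpass",
    database="mysqldb"
)
cursor = conn.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS destinationtable (id INT)")
conn.commit()
conn.close()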
Step 8: Go to Amazon EC2 and edit the Security Group (see the sketch below).
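The exact rule depends on your setup; as an assumption, the security group should allow traffic from within the VPC to the Kafka broker (port 9092) and to MySQL (port 3306) so that the Lambda function and the VPC endpoints can reach them. A rough boto3 sketch with placeholder IDs and CIDR:

import boto3

ec2 = boto3.client("ec2")

# Sketch only: allow Kafka (9092) and MySQL (3306) traffic from inside the VPC.
# The security group ID and CIDR block below are hypothetical placeholders.
ec2.authorize_security_group_ingress(
    GroupId="sg-0123456789abcdef0",
    IpPermissions=[
        {"IpProtocol": "tcp", "FromPort": 9092, "ToPort": 9092,
         "IpRanges": [{"CidrIp": "10.0.0.0/16", "Description": "Kafka from within the VPC"}]},
        {"IpProtocol": "tcp", "FromPort": 3306, "ToPort": 3306,
         "IpRanges": [{"CidrIp": "10.0.0.0/16", "Description": "MySQL from within the VPC"}]},
    ],
)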
Steps to Create Endpoints
Step 1: Create an STS endpoint.
Step 2: Create AWS Lambda and Amazon EC2 endpoints as well; a sketch covering all three follows.
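These are interface VPC endpoints. A boto3 sketch, assuming a us-east-1 region and placeholder VPC, subnet, and security group IDs; the AWS Lambda and Amazon EC2 endpoints are created the same way by changing the ServiceName.

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")  # assumed region

# Sketch only: create the STS interface endpoint inside the VPC. Repeat with
# ServiceName "com.amazonaws.us-east-1.lambda" and "com.amazonaws.us-east-1.ec2"
# for the AWS Lambda and Amazon EC2 endpoints. All IDs are placeholders.
ec2.create_vpc_endpoint(
    VpcEndpointType="Interface",
    VpcId="vpc-0123456789abcdef0",
    ServiceName="com.amazonaws.us-east-1.sts",
    SubnetIds=["subnet-0123456789abcdef0"],
    SecurityGroupIds=["sg-0123456789abcdef0"],
    PrivateDnsEnabled=True,
)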
Steps to Create an AWS Lambda Function
AWS Lambda is a serverless compute service provided by Amazon Web Services (AWS). It allows users to run their code without provisioning or managing servers. With AWS Lambda, developers upload their code, and the service automatically scales and manages the infrastructure needed to run it. AWS Lambda supports a wide range of programming languages and can be triggered by various events, such as changes in data, API requests, or scheduled intervals. It offers a pay-per-use pricing model, where users are billed only for the actual compute time consumed by their code. AWS Lambda simplifies the process of building scalable, event-driven applications, letting developers focus on writing code rather than managing servers. We will use self-managed Apache Kafka as the event source for AWS Lambda: the trigger invokes the function with the CDC event data, and the function writes it to another table (in the same or a different database), creating attributes dynamically.
Step 1: Now go to the AWS Lambda service and create a function.
Step 2: Attach the required AWS IAM role to the AWS Lambda function (a sketch follows).
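As an assumption, a VPC-connected function with a self-managed Apache Kafka trigger needs at least the permissions in the AWS managed policy AWSLambdaVPCAccessExecutionRole (network interface management plus CloudWatch Logs). A boto3 sketch with a hypothetical role name:

import boto3

iam = boto3.client("iam")

# Sketch only: attach the managed policy to the function's execution role.
# The role name is a hypothetical placeholder.
iam.attach_role_policy(
    RoleName="cdc-lambda-role",
    PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole",
)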
Step 3: Go to AWS Lambda Configuration -> Amazon VPC.
Step 4: Now go to the Trigger section and add the necessary details (see the sketch below for the equivalent API call).
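In the console, this is the self-managed Apache Kafka trigger. The equivalent can be sketched with boto3's create_event_source_mapping; the broker address and topic come from the earlier consumer command, while the function name, subnet, and security group IDs are hypothetical placeholders.

import boto3

lambda_client = boto3.client("lambda")

# Sketch only: register the self-managed Apache Kafka topic as an event source
# for the function. All IDs and the function name are placeholders.
lambda_client.create_event_source_mapping(
    FunctionName="cdc-to-destination-table",
    Topics=["mysql.mysqldb.sample"],
    SelfManagedEventSource={
        "Endpoints": {"KAFKA_BOOTSTRAP_SERVERS": ["EC2PrivateIPAddress:9092"]}
    },
    SourceAccessConfigurations=[
        {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0123456789abcdef0"},
        {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789abcdef0"},
    ],
    StartingPosition="TRIM_HORIZON",
    BatchSize=100,
)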
Step 5: Add the MySQL connector library to the AWS Lambda function with the help of the video below.
Step 6: Add the code below to the AWS Lambda function.
import json
import base64
import mysql.connector

def lambda_handler(event, context):
    print("Event: ", event)
    if 'mysql.mysqldb.sample-0' in event['records']:
        # Kafka records arrive base64-encoded; decode the first record's value
        value = event['records']['mysql.mysqldb.sample-0'][0]['value']
        print("value : ", value)
        decoded = base64.b64decode(value).decode('utf-8')
        print("after decoded : ", decoded)
        decoded_value = json.loads(decoded)

        # Derive column names and MySQL types from the Debezium schema section
        fields = decoded_value['schema']['fields']
        print("fields: ", fields)
        final_fields = []
        final_types = []
        for field in fields:
            # print("field :", field)
            nested_fields = field['fields']
            # print("Nested :", nested_fields)
            for nested_field in nested_fields:
                final_fields.append(nested_field['field'])
                if nested_field['type'] == "int32":
                    final_types.append("int")
                elif nested_field['type'] == "string":
                    final_types.append("varchar(100)")
            break  # the first struct already lists every column

        field_names = final_fields
        field_types = final_types
        print("field Names: ", field_names)
        print("field types: ", field_types)

        conn = mysql.connector.connect(
            host="RDS-MySQL-Endpoint",
            user="admin",
            password="adminpass",
            database="mysqldb"
        )
        cursor = conn.cursor()
        table_name = "destinationtable"
        cursor.execute(f"DESCRIBE {table_name}")
        existing_fields = [field[0] for field in cursor.fetchall()]
        print("Existing Fields: ", existing_fields)

        # Add the new fields to the table
        for i in range(len(field_names)):
            field_name = field_names[i]
            field_type = field_types[i]
            # Check if the field already exists in the table
            if field_name not in existing_fields:
                # Alter the table to add the new field
                alter_sql = f"ALTER TABLE {table_name} ADD COLUMN {field_name} {field_type}"
                cursor.execute(alter_sql)
                print("table altered")

        insert_sql = f"INSERT INTO {table_name} ({', '.join(field_names)}) VALUES ({', '.join(['%s'] * len(field_names))})"
        print("insert sql: ", insert_sql)
        data = decoded_value['payload']['after']
        print("payload: ", data)
        cursor.execute(insert_sql, tuple(data[field_name] for field_name in field_names))
        conn.commit()
        print("Data inserted")
Amazon CloudWatch Logs
Step 7: Final result: when we insert a record into the source table, the CDC data is stored in the destination table, and the attributes are created dynamically if they don't already exist in the table.
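To double-check from the MySQL tab, here is a quick query sketch against the destination table, using the same placeholder endpoint and credentials as before.

import mysql.connector

# Sketch only: after inserting into the source table, the same row should
# appear in destinationtable with any dynamically added columns.
conn = mysql.connector.connect(
    host="RDS-MySQL-Endpoint",
    user="admin",
    password="adminpass",
    database="mysqldb"
)
cursor = conn.cursor()
cursor.execute("SELECT * FROM destinationtable")
for row in cursor.fetchall():
    print(row)
conn.close()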
Conclusion
Finally, the entire pipeline is in place: from RDS MySQL through the Debezium connector and Apache Kafka topics to AWS Lambda, which processes the CDC data into another table in RDS MySQL, using VPC endpoints for communication. The Debezium connector sends CDC data almost instantaneously, as seen in the examples above. AWS Lambda is triggered whenever the topic receives data from MySQL, and the trigger offers various options such as batch size, batching window, etc.
Click here to check out Part 1 and Part 2.
Drop a query if you have any questions regarding AWS Lambda, and we will get back to you quickly.
About CloudThat
CloudThat is an official AWS (Amazon Web Services) Advanced Consulting Partner and Training Partner, AWS Migration Partner, AWS Data and Analytics Partner, AWS DevOps Competency Partner, Amazon QuickSight Service Delivery Partner, AWS EKS Service Delivery Partner, and Microsoft Gold Partner, helping people develop knowledge of the cloud and helping their businesses aim for higher goals using best-in-industry cloud computing practices and expertise. We are on a mission to build a robust cloud computing ecosystem by disseminating knowledge on technological intricacies within the cloud space. Our blogs, webinars, case studies, and white papers enable all the stakeholders in the cloud computing sphere.
To get started, go through our Consultancy page and Managed Services Package to explore CloudThat's offerings.
FAQs
1. Do VPC endpoints incur any cost?
ANS: – Yes, please refer to AWS service pricing for details.
2. Can we have multiple topics in the same trigger?
ANS: – No, only one topic per trigger. If you need more, add another trigger.
WRITTEN BY Suresh Kumar Reddy
Yerraballi Suresh Kumar Reddy is working as a Research Associate - Data and AI/ML at CloudThat. He is a self-motivated and hard-working Cloud Data Science aspirant who is adept at using analytical tools for analyzing and extracting meaningful insights from data.