Post

CDC in AWS: Change Data Capture from AWS RDS MySQL into an AWS MSK Kafka topic using Debezium

Debezium, built on top of Kafka Connect, is a powerful open-source platform for Change Data Capture (CDC), enabling real-time event streaming from databases like MySQL. In this post, we'll explore how to set up Debezium, one of the most popular open-source CDC tools, to capture changes from an AWS MySQL RDS instance and publish them to a Kafka topic. We'll also dive into the Debezium message format and explain the difference between standalone and distributed modes.

What problem does it solve?

Debezium allows you to:

  • Stream real-time changes from a MySQL RDS instance.
  • Track inserts, updates, and deletes as they occur.
  • Publish these changes to Kafka topics for downstream processing.

[Figure: infrastructure overview - MySQL RDS, Debezium on Kafka Connect, AWS MSK]

This is particularly useful for building event-driven architectures, data pipelines, and synchronizing databases with other systems.

Besides Debezium, there are multiple open-source connectors available on the Confluent platform that provide integration points with different sink and source systems such as AWS S3, Elasticsearch, MongoDB, etc.

Setting Up Debezium for MySQL RDS

1. Enable Binary Logging on MySQL RDS:

Log in to your RDS instance and ensure binary logging is enabled in your parameter group. Configure the following parameters:

  • binlog_format = ROW
  • binlog_row_image = FULL

  • Ensure binlog retention is set to a duration sufficient for your use case (see the note after this list).
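On RDS, binlog retention is not a plain server variable; it is configured through a stored procedure. A minimal example that keeps 24 hours of binlogs (pick a window long enough for the connector to recover after downtime):

CALL mysql.rds_set_configuration('binlog retention hours', 24);
-- confirm the current setting
CALL mysql.rds_show_configuration;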
SHOW VARIABLES LIKE 'binlog_format';

Variable_name    Value
binlog_format    ROW

2. Verify the Binlog Settings:

SHOW VARIABLES LIKE 'log_bin%';
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'binlog_row_image';

If binlog_format is not ROW, switch it:

SET SESSION binlog_format = 'ROW';
SET GLOBAL binlog_format = 'ROW';
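Note that SET GLOBAL requires the SUPER privilege, which the RDS master user does not have, so on RDS the change goes through a custom DB parameter group instead. A sketch with the AWS CLI, assuming a parameter group named my-mysql-params is attached to the instance:

aws rds modify-db-parameter-group \
  --db-parameter-group-name my-mysql-params \
  --parameters "ParameterName=binlog_format,ParameterValue=ROW,ApplyMethod=immediate"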

3. Run Debezium with Kafka Connect:

Use Docker Compose to start Debezium and Kafka Connect:

services:
  debezium:
    image: debezium/connect:2.7.3.Final
    ports:
      - "8083:8083"
    healthcheck:
      test: [ "CMD-SHELL", "curl -f http://localhost:8083/ || exit 1" ]
    networks:
      - kafka-cluster
    environment:
      - BOOTSTRAP_SERVERS=b-4.kafka-dev-0.xxxx.c6.kafka.us-east-1.amazonaws.com:9092,b-3.kafka-dev-0.xxxx.c6.kafka.us-east-1.amazonaws.com:9092,b-1.kafka-dev-0.xxxx.c6.kafka.us-east-1.amazonaws.com:9092
      - GROUP_ID=MYSQL_1
      - CONFIG_STORAGE_TOPIC=debezium_connect_configs
      - OFFSET_STORAGE_TOPIC=debezium_connect_offsets
      - STATUS_STORAGE_TOPIC=debezium_source_connect_statuses
      - CONFIG_STORAGE_REPLICATION_FACTOR=1
      - OFFSET_STORAGE_REPLICATION_FACTOR=1
      - STATUS_STORAGE_REPLICATION_FACTOR=1
networks:
  kafka-cluster:
    driver: bridge

CONFIG_STORAGE_TOPIC, OFFSET_STORAGE_TOPIC, and STATUS_STORAGE_TOPIC are the Kafka topics where Kafka Connect stores its own configuration, offsets, and status and synchronizes between workers. CONFIG_STORAGE_REPLICATION_FACTOR, OFFSET_STORAGE_REPLICATION_FACTOR, and STATUS_STORAGE_REPLICATION_FACTOR must not exceed the number of brokers; 1 is fine for a development setup, but production clusters should use a higher value such as 3.
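With the compose file in place, start the worker and confirm that the REST API answers on port 8083:

docker compose up -d
curl http://localhost:8083/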

4. Detect the server_id that will be used in the connector configuration:

SHOW VARIABLES LIKE 'server_id';
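The output looks like this (the value is instance-specific; the one below is a placeholder):

Variable_name    Value
server_id        1234567890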

5. Configure the MySQL Connector

Kafka Connect exposes an HTTP REST API (port 8083 in our compose file) for creating and monitoring connectors.
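For example, listing the installed plugins confirms that the MySQL connector class is on the worker's classpath:

curl http://localhost:8083/connector-plugins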

Create a MySQL source connector by sending a POST request to the Kafka Connect REST API:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d @../payload.json
{
  "name": "mysql-moderation-comments-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "xxxx.us-east-1.rds.amazonaws.com",
    "database.port": "3306",
    "database.user": "user",
    "database.password": "password",
    "database.server.id": "this_is_mysql_server_id",
    "topic.prefix": "mysql-cdc",
    "database.include.list": "public",
    "table.include.list": "public.users",
    "schema.history.internal.kafka.bootstrap.servers": "b-4.kafka-dev-0.xxxx.c6.kafka.us-east-1.amazonaws.com:9092,b-3.kafka-dev-0.xxxx.c6.kafka.us-east-1.amazonaws.com:9092,b-1.kafka-dev-0.xxxx.c6.kafka.us-east-1.amazonaws.com:9092",
    "schema.history.internal.kafka.topic": "schema-changes.moderation",
    "include.schema.changes": true,
    "key.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "database.history.skip.unparseable.ddl": true
  }
}
Parameter reference:

  • connector.class: Class name of the connector, loaded from the Java classpath of the running container. Pre-built connectors ("plugins") are available; you can also add an existing plugin to the container or write your own.
  • tasks.max: Only one task should operate at a time to preserve the ordering of the binlog. The Kafka Connect service uses connectors to start one or more tasks that do the actual work and distributes the running tasks across the cluster of Kafka Connect services; if a service stops or crashes, its tasks are redistributed to the remaining services.
  • database.hostname: RDS endpoint IP or DNS name.
  • database.port: MySQL port (3306 by default).
  • database.user: Database user the connector authenticates as.
  • database.password: Password for that user.
  • database.server.id: Unique identifier of the MySQL server; this is the master server ID detected in step 4.
  • database.server.name: Logical ID of the server or cluster of services, used as a prefix for all Kafka topics; replaced by topic.prefix in Debezium 2.x.
  • topic.prefix: Prefix added to Kafka topic names to distinguish them from other existing topics ("mysql-cdc" in our config).
  • database.include.list: Comma-separated list of databases whose changes should be captured.
  • schema.history.internal.kafka.topic: The connector writes all DDL statements to this topic while reading the binlog; on restart it recovers the schema of the database as it existed at a given point in time.
  • key.converter: Class name of the converter or transformer for the event key.
  • value.converter: Converter for the event value.
  • table.include.list: Comma-separated list of tables to capture.
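Once the POST succeeds, the same REST API reports the state of the connector and its tasks:

curl http://localhost:8083/connectors/mysql-moderation-comments-connector/status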

6. Export from the MySQL Table

Once the connector is set up, it creates an initial snapshot of the data and ingests it into Kafka. After that, the connector monitors the binlog and captures new inserts, updates, and deletes.

debezium-1 | INFO   MySQL|mysql-cdc|snapshot    Exported 496538 of 548957 records for table 'public.users' after 00:41:53.279   [io.debezium.relational.RelationalSnapshotChangeEventSource]
debezium-1 | INFO   MySQL|mysql-cdc|snapshot    Exported 605082 of 548957 records for table 'public.users' after 00:53:07.497   [io.debezium.relational.RelationalSnapshotChangeEventSource]
debezium-1 | INFO   MySQL|mysql-cdc|snapshot    Finished exporting 605274 records for table 'public.users' (1 of 1 tables); total duration '00:53:07.522'   [io.debezium.relational.RelationalSnapshotChangeEventSource]
debezium-1 | INFO   MySQL|mysql-cdc|snapshot  Releasing table read lock to enable MySQL writes   [io.debezium.connector.binlog.BinlogSnapshotChangeEventSource]
debezium-1 | INFO   MySQL|mysql-cdc|snapshot  Writes to MySQL tables prevented for a total of 00:53:13.755   [io.debezium.connector.binlog.BinlogSnapshotChangeEventSource]
debezium-1 | INFO   ||  WorkerSourceTask{id=mysql-moderation-comments-connector-0} Committing offsets for 10242 acknowledged messages   [org.apache.kafka.connect.runtime.WorkerSourceTask]

7. Verify Kafka Topic

Use kafka-console-consumer to check the topic for messages:

kafka-console-consumer --bootstrap-server localhost:9092 --topic mysql-cdc.your_database.your_table --from-beginning

Debezium Data Format

Debezium emits messages to Kafka in a JSON format with two main parts:

  • key (Identifies the specific database row.)
  • value (Contains the actual change event, with fields like before, after, op, ts_ms, etc.)

Fields:

  • before: State of the row before the change.
  • after: State of the row after the change.
  • op: Type of operation (c for create, u for update, d for delete).
  • source: Metadata about the event source.
  • ts_ms: Timestamp of the event.
{
  "op": "u",
  "source": {
    ...
  },
  "ts_ms": "...",
  "ts_us": "...",
  "ts_ns": "...",
  "before": {
    "userid": "1",
    "name": "bob"
  },
  "after": {
    "userid": "1",
    "field2": "alice"
  }
}
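Delete events use the same envelope: op is "d", before holds the last state of the row, and after is null. Debezium then emits a tombstone (a message with a null value) for the same key so that log-compacted topics can eventually drop it. A sketch of the value payload:

{
  "op": "d",
  "before": {
    "userid": "1",
    "name": "alice"
  },
  "after": null
}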

Hide schema from payload

There are multiple configurations and data formats that allow you to transform events, drop unneeded fields, and register custom Single Message Transforms (SMTs); see the sketch below.
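For example, Debezium ships an ExtractNewRecordState SMT that unwraps the envelope and emits only the after state of each row. Adding it to the connector config from step 5 would look like this:

"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false"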

Here we instruct Debezium to exclude the schema from the payload (these properties use the Debezium Server format; in the Kafka Connect deployment above, the equivalent settings are key.converter.schemas.enable and value.converter.schemas.enable in the connector config):

debezium.source.value.converter=org.apache.kafka.connect.json.JsonConverter
debezium.source.value.converter.schemas.enable=false

Conclusion

Debezium is an excellent choice for capturing data changes in real time from MySQL RDS and streaming them into Kafka. Its support for schema change tracking, rich message format, and scalability make it ideal for modern data pipelines.

Whether you choose standalone mode for simplicity or distributed mode for fault tolerance, Debezium provides the flexibility to meet your needs.

This post is licensed under CC BY 4.0 by the author.