Implement near real-time eventing from Snowflake to RDS - part 2
This post delves into the essentials of implementing Change Data Capture (CDC) using Snowflake User-Defined Procedures (UDPs), generating Avro messages, and ensuring data quality.
Part 2: Implementing a Snowflake CDC Solution with Avro, Kafka, and Postgres
In Part 1, we outlined the architecture for moving data from Snowflake to AWS RDS Aurora, emphasizing Snowflake Streams, Kafka, and Avro. This part covers implementing CDC (Change Data Capture) using a Snowflake UDP (User-Defined Procedure), generating Avro messages, ensuring data quality through schema verification, and addressing key challenges such as data type compatibility, volume management, and initial loads. We also explore Kafka topics and schema management and conclude with the role of Kafka's JDBC sink connector in delivering data to a PostgreSQL instance.
Snowflake CDC with Streams Solutions
Snowflake Streams are leveraged to capture data changes (INSERT, UPDATE, DELETE) in near real-time. The captured changes are processed using a UDP, which transforms the data into Avro format for downstream consumption.
Data Flow:
Snowflake (source) → Pandas (processing) → Kafka (messaging) → AWS RDS (destination)
Using data frames allowed us to handle large datasets and avoid the memory limitations that were one of our main concerns.
Data Processing:
Snowpark/Pandas: Data extraction and transformation
Confluent Kafka: Message production, schema management, and data sink
This architecture focuses specifically on CDC operations based on the following principles:
- Proper data serialization
- Schema compatibility
- Efficient event processing
- Reliable message delivery
- Secure credential management
Snowflake Workflow:
The Snowflake workflow is based on the following components:
- The stream, which captures all changes to data in a specific table, view, or other supported object
- The secret (if you want to keep credentials secure)
- The task
- The User-Defined Procedure (UDP)
Create a Snowflake Stream:
Track changes in a data source
CREATE OR REPLACE STREAM GG_STREAM_TEST
ON TABLE GG_TABLE_TEST;
Create a Snowflake Secret
CREATE OR REPLACE SECRET gg_secret_basic
TYPE = PASSWORD
USERNAME = 'gg_username'
PASSWORD = 'gg_password'
COMMENT = 'Secret for schema registry authentication';
Create a Snowflake Task:
CREATE OR REPLACE TASK gg_task_minute
WAREHOUSE = gg_data_warehouse
SCHEDULE = '1 MINUTE'
AS
CALL POC.CDC.PROC_GG_TEST();
-- Enable the task
ALTER TASK gg_task_minute RESUME;
Define a Snowflake UDP for Avro Conversion:
- The UDP reads records from the stream and produces the messages to the Kafka topic
CREATE OR REPLACE PROCEDURE POC.CDC.PROC_GG_TEST(
"TOPIC_NAME" VARCHAR(16777216),
"SCHEMA_NAME" VARCHAR(16777216),
"STREAM_NAME" VARCHAR(16777216))
RETURNS OBJECT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
SECRETS = ('cred'=gg_secret_basic)
...
Code snippet to create the Snowflake procedure.
topic_name = the name of the topic that will receive the messages
schema_name = the name given to the schema definition in the Schema Registry
stream_name = the name given to the Snowflake stream (ex: GG_STREAM_TEST)
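The ellipsis above omits the rest of the definition. For orientation only, here is a hedged sketch of what the full header might look like; the package list, the external access integration name, and the handler name are assumptions that depend on your environment.
CREATE OR REPLACE PROCEDURE POC.CDC.PROC_GG_TEST(
    "TOPIC_NAME" VARCHAR(16777216),
    "SCHEMA_NAME" VARCHAR(16777216),
    "STREAM_NAME" VARCHAR(16777216))
RETURNS OBJECT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
-- Assumption: the Kafka client library must be made available to the procedure in your environment
PACKAGES = ('snowflake-snowpark-python', 'pandas')
-- Hypothetical integration allowing outbound calls to Kafka and the Schema Registry
EXTERNAL_ACCESS_INTEGRATIONS = (kafka_access_integration)
SECRETS = ('cred' = gg_secret_basic)
HANDLER = 'main'
AS
$$
def main(session, topic_name, schema_name, stream_name):
    # Read the stream, convert the rows, and produce to Kafka (see the snippets below)
    ...
$$;
A hedged sketch of the procedure header, not the exact definition used in the PoC.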
- Import libraries
# ------- Snowflake related
import _snowflake
from snowflake.snowpark import Session
from snowflake.snowpark.types import StructType, StructField, IntegerType, LongType, DoubleType, StringType, DateType, TimestampType, BooleanType, BinaryType, FloatType, ShortType, ByteType, DecimalType, ArrayType, MapType
# ------- Kafka related
from confluent_kafka import Producer, KafkaError
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
# ------- Dataframes related
import pandas as pd
# ------- Standard Libraries
from decimal import Decimal
from datetime import datetime
import time
...
code snippet to import the libraries used
- Applies necessary transformations
In the current example, we only needed to transform the datetime and decimal data types and handle pandas' NaT (Not a Time) value, which appears for missing timestamps. In other cases, however, more data types must be converted to achieve the correct format.
def convert_value(value):
    if isinstance(value, datetime):
        # Convert datetime to Unix timestamp in milliseconds
        return int(value.timestamp() * 1000)
    elif isinstance(value, Decimal):
        # Convert Decimal to float
        return float(value)
    return value
datetime and decimal conversion for values
def convert_rows_to_dicts_schema(rows, field_types):
    def convert_value(value, data_type):
        if isinstance(data_type, TimestampType):
            return int(value.timestamp() * 1000) if value is not None else None
        elif isinstance(data_type, DecimalType):
            return float(value) if value is not None else None
        else:
            return value

    return [{key: convert_value(row[key], field_types[key])
             for key in row.asDict()}
            for row in rows]
datetime and decimal conversion for the schema
# Replace NaT (Not a Time) values with None
df.replace({pd.NaT: None}, inplace=True)
Replacing the pandas NaT values with None.
- Serializes them into Avro format
Kafka messages are composed of the key, the value, and the header.
Key serialization
# String serializer initialization
string_serializer = StringSerializer('utf_8')
# Key serialization in producer.produce
key=string_serializer(row['ID'])
...
Code snippet to serialise the key.
The message key in this example was elementary, so we simply used the primary key. However, the key can be much more complex, which opens up possibilities in Kafka event processing (for example, partitioning related events together).
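If a composite key were needed, one hypothetical option is to concatenate several columns into a single string before serialization (TENANT_ID below is an illustrative column, not one from our test table):
# Hypothetical composite key: combine tenant and entity identifiers so that
# all events for the same entity land on the same Kafka partition
composite_key = f"{row['TENANT_ID']}|{row['ID']}"
key = string_serializer(composite_key)  # same StringSerializer as above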
Value serialization
# Avro serializer initialization
schema = get_latest_schema(schema_registry_client, "gg_schema_test")
avro_serializer = AvroSerializer(schema_registry_client, schema)
# AVRO serialization of the Value
value=avro_serializer(row, SerializationContext(topic, MessageField.VALUE))
The value serialization is based on the schema registered in the Schema Registry.
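The get_latest_schema helper is not shown above; a minimal sketch, assuming the schema is registered under a subject with that name, could look like this:
from confluent_kafka.schema_registry import SchemaRegistryClient

def get_latest_schema(schema_registry_client: SchemaRegistryClient, subject_name: str) -> str:
    # Fetch the latest registered version for the subject and return its Avro definition as a string
    latest = schema_registry_client.get_latest_version(subject_name)
    return latest.schema.schema_str
The returned schema string is what AvroSerializer expects as its second argument.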
Header serialization
# Headers are automatically serialized as strings
headers=[
('METADATA$ROW_ID', str(row['METADATA$ROW_ID'])),
('METADATA$ACTION', str(row['METADATA$ACTION'])),
('METADATA$ISUPDATE', str(row['METADATA$ISUPDATE']))
]
Snowflake Streams generate METADATA$ROW_ID, METADATA$ACTION, and METADATA$ISUPDATE automatically. Here is what they mean:
METADATA$ACTION:
Indicates the type of change that occurred:
INSERT – New row added
DELETE – Row deleted
For updates, Snowflake generates both a DELETE and INSERT pair.
METADATA$ISUPDATE:
Boolean flag indicating if the record is part of an UPDATE operation
TRUE – The row is part of an UPDATE
FALSE – The row is a regular INSERT or DELETE
METADATA$ROW_ID:
Unique identifier for each row version and allows tracking the same row across changes
Example: 23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
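On the consumer side (or inside the procedure itself), these two headers can be combined to derive the logical operation. A small sketch, assuming the string header values produced above:
def derive_operation(action: str, is_update: str) -> str:
    # Snowflake represents an UPDATE as a DELETE/INSERT pair with METADATA$ISUPDATE = True
    if is_update.lower() == 'true':
        return 'UPDATE (delete side)' if action == 'DELETE' else 'UPDATE (insert side)'
    return action  # plain INSERT or DELETE

# Example: derive_operation('INSERT', 'True') -> 'UPDATE (insert side)'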
- Write to Kafka topic.
# Initialize components
producer = get_producer()

# Process and produce messages
for df in result.to_pandas_batches():
    df.replace({pd.NaT: None}, inplace=True)
    for row in df.to_dict(orient='records'):
        # Produce message to Kafka
        producer.produce(
            topic=topic,
            # String serialization of the Key
            key=string_serializer(row['ID']),
            # AVRO serialization of the Value
            value=avro_serializer(
                row,
                SerializationContext(topic, MessageField.VALUE)
            ),
            # Header List
            headers=[
                ('METADATA$ACTION', str(row['METADATA$ACTION'])),
                ('METADATA$ISUPDATE', str(row['METADATA$ISUPDATE'])),
                ('METADATA$ROW_ID', str(row['METADATA$ROW_ID']))
            ]
        )
        # Message delivery handler - Mandatory
        producer.poll(0)

producer.flush()
This snippet demonstrates the process of producing Kafka messages.
It is also worth highlighting the use of data frames: iterating with to_pandas_batches() reduces the complexity of the code and keeps memory usage under control by processing the result set in chunks.
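The get_producer() helper is not shown either. A possible sketch, assuming a Confluent Cloud-style setup where the broker address is a placeholder and the credentials are read from the Snowflake secret created earlier via _snowflake.get_username_password('cred') (here the same secret is reused for both Kafka and the Schema Registry purely for illustration):
import _snowflake
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient

def get_producer():
    # Hypothetical broker address and SASL credentials read from the Snowflake secret
    kafka_cred = _snowflake.get_username_password('cred')
    return Producer({
        'bootstrap.servers': 'xxxxxxx.eu-north-1.aws.confluent.cloud:9092',  # placeholder
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'PLAIN',
        'sasl.username': kafka_cred.username,
        'sasl.password': kafka_cred.password,
    })

def get_schema_registry_client():
    # Schema Registry authentication via the same Snowflake secret
    sr_cred = _snowflake.get_username_password('cred')
    return SchemaRegistryClient({
        'url': 'https://xxxxxxx.eu-north-1.aws.confluent.cloud',  # placeholder
        'basic.auth.user.info': f"{sr_cred.username}:{sr_cred.password}",
    })
A hedged sketch of the producer and Schema Registry client initialization; adapt the endpoints and secret names to your environment.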
Why Generate Avro Messages?
Avro provides a compact, binary serialization format that integrates seamlessly with Kafka. The Schema Registry plays a pivotal role in:
- Generating Avro Messages: Ensures consistency in message structure.
- Verifying Message Compatibility: Enforces schema evolution rules, preventing breaking changes.
- Reducing Message Size: Compared with JSON, Avro messages are much smaller, requiring less bandwidth and less memory to process.
Steps to Generate and Validate Avro Messages:
Define the Schema:
{
  "type": "record",
  "name": "gg_schema_test",
  "title": "gg_schema_test",
  "fields": [
    {"name": "ID", "type": ["string", "null"]},
    {"name": "EMAIL", "type": ["null", "string"], "default": null},
    {"name": "CREATED_AT", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "SALES", "type": ["double", "null"]}
  ]
}
Serialize Data:
- Use the Avro serializer from "confluent_kafka.schema_registry.avro", which produces Confluent wire-format messages (the "magic byte" plus the schema ID) so that consumers can resolve the correct schema.
Validate Schema:
- Kafka producers validate each message against the Schema Registry before publishing it on a topic.
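For completeness, a schema can also be registered under a subject programmatically. A sketch, assuming the subject name gg_schema_test, a placeholder registry URL, and a trimmed schema string:
from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

avro_schema_str = """
{
  "type": "record",
  "name": "gg_schema_test",
  "fields": [
    {"name": "ID", "type": ["string", "null"]},
    {"name": "SALES", "type": ["double", "null"]}
  ]
}
"""

schema_registry_client = SchemaRegistryClient(
    {"url": "https://xxxxxxx.eu-north-1.aws.confluent.cloud"}  # placeholder URL
)

# Register (or look up) the schema and get back its ID
schema_id = schema_registry_client.register_schema(
    subject_name="gg_schema_test",
    schema=Schema(avro_schema_str, schema_type="AVRO"),
)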
Why Data Quality?
Data Type Conversion Challenges
- Problem: Inconsistent data types between Snowflake, Avro, and PostgreSQL can lead to errors and break the pipeline.
- Solution: Standardize data types during serialization. For example:
- Snowflake TIMESTAMP → Avro long (epoch milliseconds) → PostgreSQL TIMESTAMP.
- Snowflake NUMBER → Avro double → PostgreSQL FLOAT.
The helper below shows how this conversion can be extended to more Snowflake types:
def enhanced_convert_value(value, data_type):
    if value is None:
        return None
    if isinstance(data_type, TimestampType):
        return int(value.timestamp() * 1000)
    elif isinstance(data_type, DateType):
        return value.isoformat()
    elif isinstance(data_type, DecimalType):
        return float(value)
    elif isinstance(data_type, (IntegerType, LongType)):
        return int(value)
    elif isinstance(data_type, BooleanType):
        return bool(value)
    elif isinstance(data_type, ArrayType):
        return [enhanced_convert_value(v, data_type.elementType) for v in value]
    elif isinstance(data_type, MapType):
        return {str(k): enhanced_convert_value(v, data_type.valueType)
                for k, v in value.items()}
    return str(value)
We don't always need this code snippet, but it is one possible solution when more data types have to be handled.
Why is data volume a challenge?
- Problem: Large datasets may exceed memory limits during initial loads or high-velocity CDC.
- Solution:
- Data Slicing: Split the data into manageable chunks and process it chunk by chunk, as sketched below.
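A minimal sketch of data slicing with Snowpark's batched conversion, the same pattern used in the producing loop above (session and stream_name are assumed to exist in the procedure context):
import pandas as pd

# Process the Snowpark result in pandas-sized chunks instead of materializing it all at once
result = session.sql(f"SELECT * FROM {stream_name}")

total_rows = 0
for chunk in result.to_pandas_batches():   # each chunk is a regular pandas DataFrame
    chunk.replace({pd.NaT: None}, inplace=True)
    total_rows += len(chunk)
    # ... convert the rows and produce them to Kafka here ...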
Is an Initial Load needed?
- Problem: Syncing the initial state of a large Snowflake table to RDS.
- Solution:
- Create the stream with the appropriate parameter so that the existing rows are also captured and produced as Avro messages with schema validation (see the example below).
- Other options are available; however, the data quality guarantee provided by schema validation is a crucial point to consider.
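A minimal example of such a stream definition: Snowflake's SHOW_INITIAL_ROWS parameter makes the first consumption of the stream return the rows that already existed when the stream was created, so the same UDP/Kafka path used for CDC also performs the initial load.
-- The first consumption returns the pre-existing rows as INSERTs (initial load),
-- after which the stream tracks changes as usual
CREATE OR REPLACE STREAM GG_STREAM_TEST
  ON TABLE GG_TABLE_TEST
  SHOW_INITIAL_ROWS = TRUE;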
Kafka Topics and Schema Assignment
Kafka topics are the backbone of the streaming pipeline. Each topic represents a distinct data stream, often tied to a table or entity.
Topic Headers:
- Include metadata (e.g., source table, schema version, operation type).
{
  "source": "snowflake",
  "table": "gg_table_test",
  "operation": "insert",
  "schema_version": "1.2"
}
One possible example
Schema Assignment:
- Producers fetch the latest schema from the Schema Registry.
- The schema ID is embedded in each serialized message (the Confluent wire format), which consumers use to fetch the right schema and validate the payload.
Delivering Messages with Kafka Sink Connector
The Kafka JDBC Sink Connector writes Avro messages from Kafka topics to AWS RDS. It is well known and well documented, so we will not go into detail here; we only point out the most critical or tricky parameters.
Configuration:
- Connector Properties:
{
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "batch.size": 5000,
    "consumer.override.max.poll.records": 1000,
    "topics": "gg_topic_test",
    "connection.url": "jdbc:postgresql://xxxxxxx.eu-north-1.amazonaws.com:5432/poc",
    "auto.create": "false",
    "auto.evolve": "false",
    "delete.enabled": "false",
    "table.types": "TABLE",
    "table.name.format": "gg_table_test",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "ID",
    "dialect.name": "PostgreSqlDatabaseDialect",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "errors.tolerance": "all",
    "input.data.format": "AVRO",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "https://xxxxxxx.eu-north-1.aws.confluent.cloud",
    "transforms": "TC1",
    "transforms.TC1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TC1.field": "CREATED_AT",
    "transforms.TC1.target.type": "Timestamp",
    "transforms.TC1.format": "yyyy-MM-dd HH:mm:ss.SSS",
    "transforms.TC1.unix.precision": "milliseconds"
  },
  "name": "gg_poc_test"
}
Snippet of the configuration of the sink connector.
This configuration is an example with the most critical parameters. It assumes the RDS table has already been created. If that is not the case, changing "auto.create" and "auto.evolve" to "true" lets the connector create and evolve the table automatically.
Depending on the size of the messages, throughput can be tuned by tweaking "batch.size" and "consumer.override.max.poll.records".
In this example, the sink connector applies one of many possible transformations (a TimestampConverter on the CREATED_AT field).
- Schema Mapping:
- The connector automatically maps Avro fields to PostgreSQL columns (in this specific case) if the schemas align.
- Custom mappings can handle complex types or defaults.
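As an illustration of such a custom mapping, a Cast single message transform (SMT) could be added to the connector configuration to force a specific column type; the field chosen below is only for illustration.
{
  "transforms": "TC1,CastSales",
  "transforms.CastSales.type": "org.apache.kafka.connect.transforms.Cast$Value",
  "transforms.CastSales.spec": "SALES:float64"
}
These keys would be merged into the "config" block shown earlier, chaining the new transform after TC1.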
Conclusion
This architecture showcases a pipeline from Snowflake to AWS RDS, leveraging Kafka and Avro for real-time, schema-validated data integration. The solution brings meaningful improvements: it removes the JDBC source connector for Snowflake and replaces it with a "native" producer, which is one less resource to configure, maintain, and pay for. More importantly, it replaces a pull-based resource with a push-based one. It also simplifies the Snowflake procedure/function landscape to a single UDP and, thanks to batched data frames, ensures that the data volume is not an issue.
Although using an analytics platform as a data source can raise many valid questions, the steps covered here outline a process that can be adapted to various data engineering needs and provide a foundation for further innovation in real-time analytics and operational workloads.