Schema Evolution with Glue Schema Registry and Managed Streaming for Kafka (MSK) using LocalStack
Apache Kafka is an open-source distributed event store and stream-processing platform. It is used to capture data generated by producers and distribute it among its consumers. Kafka is known for its scalability, with reports of production environments scaling to trillions of messages per day. With Amazon Managed Streaming for Apache Kafka (MSK), AWS provides a service to provision Apache Kafka clusters easily.
LocalStack Pro supports Amazon Managed Streaming for Kafka (MSK), which enables you to spin up Kafka clusters on your local machine and test the integration of your applications with Amazon MSK.
Kafka clusters are often used as the central messaging infrastructure in complex microservice environments. However, the continuous and independent development of the individual microservices (the data producers and consumers) can make it hard to coordinate and evolve data schemas over time without introducing application failures due to incompatibilities. A common solution to this problem is a schema registry, which validates schema changes and rejects unsafe ones before they can cause application failures.
AWS Glue Schema Registry can be used as such a schema registry, enabling you to validate and evolve streaming data using Apache Avro schemas. It can be easily integrated into Java applications for Apache Kafka with AWS’s official open-source serializers and deserializers.
The following chart shows the integration of producers and consumers with Amazon MSK and the AWS Glue Schema Registry:
- Before sending a record, the producer looks up the schema it uses to serialize its records in the schema registry. We can configure the producer to register a new schema version if the schema is not yet registered.
- When registering the new schema version, the schema registry checks whether the schema is compatible with the existing versions.
- If the registry detects an incompatibility, the registration is rejected. This ensures that a producer fails early and cannot publish incompatible records in the first place.
- Once the schema is valid, the producer serializes and compresses the record and sends it to the Kafka cluster.
- The consumer reads the serialized and compressed record.
- The consumer requests the schema from the schema registry (if it is not already cached), decompresses the record, and uses the schema to deserialize it.
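The steps above can be modelled without any AWS dependency. The following Java sketch uses a hypothetical in-memory registry (InMemorySchemaRegistry and RegistryAwareProducer are illustrative names, not part of the Glue serde) to show the order of operations: look up the schema, optionally auto-register it, and fail before publishing if the registry rejects the new version. The compatibility rule is passed in as a plain predicate that stands in for the registry's compatibility mode.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical in-memory stand-in for a schema registry; this is NOT the Glue API.
class InMemorySchemaRegistry {
    private final List<String> versions = new ArrayList<>();
    // (latest registered definition, candidate definition) -> may the candidate be registered?
    private final BiPredicate<String, String> compatibilityCheck;

    InMemorySchemaRegistry(BiPredicate<String, String> compatibilityCheck) {
        this.compatibilityCheck = compatibilityCheck;
    }

    /** Returns the 1-based version number of a registered definition, or -1 if it is unknown. */
    int lookup(String definition) {
        int index = versions.indexOf(definition);
        return index < 0 ? -1 : index + 1;
    }

    /** Registers a new version if it is compatible with the latest one; otherwise throws. */
    int register(String definition) {
        if (!versions.isEmpty()
                && !compatibilityCheck.test(versions.get(versions.size() - 1), definition)) {
            throw new IllegalStateException("Incompatible schema version rejected");
        }
        versions.add(definition);
        return versions.size();
    }
}

// Hypothetical producer-side logic: resolve the schema version before sending anything.
class RegistryAwareProducer {
    static int resolveSchemaVersion(InMemorySchemaRegistry registry, String definition,
                                    boolean autoRegister) {
        int version = registry.lookup(definition);
        if (version > 0) {
            return version; // already registered: serialize, compress and send
        }
        if (!autoRegister) {
            throw new IllegalStateException("Schema not registered and auto-registration is off");
        }
        return registry.register(definition); // may throw, so nothing is ever published
    }

    public static void main(String[] args) {
        // Toy compatibility rule for the demo: reject any definition marked as "breaking"
        InMemorySchemaRegistry registry =
                new InMemorySchemaRegistry((latest, candidate) -> !candidate.contains("breaking"));
        System.out.println(resolveSchemaVersion(registry, "v1", true)); // 1
        System.out.println(resolveSchemaVersion(registry, "v2", true)); // 2
    }
}
```

Because the exception is thrown while resolving the schema version, a rejected producer never reaches the point of sending a record, which is the "fail early" property described above.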
AWS Glue Schema Registry is supported by LocalStack Pro as well, which allows you to develop and test the schema evolution of your data streaming application entirely on your local machine. The code for this tutorial (including a script to execute it step by step) can be found in our LocalStack Pro samples on GitHub.
Prerequisites
For this tutorial, you will need:
- LocalStack Pro to emulate Amazon MSK and AWS Glue Schema Registry locally
  - Don’t worry if you don’t have a subscription yet: you can get a free trial license.
- awslocal
- Java 11+
- Maven 3
Initial schema
First, we will define our schema, set up our Java project, and generate the Java data classes from the schema. Our Apache Avro data schema describes a request to ride a unicorn, including the pickup and destination addresses, a fare, a duration, some preferences, and a customer record:
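The following unicorn_ride_request_v1.avsc is reconstructed from the schemaDefinition that the producer prints to its debug log later in this tutorial. Whitespace and the order of attributes within a field may differ from the file in the samples repository, but the fields, types, defaults, and doc strings are the ones shown in that log.

```json
{
  "type": "record",
  "name": "UnicornRideRequest",
  "namespace": "cloud.localstack.demos.gluemsk.schema",
  "fields": [
    {"name": "request_id", "type": "int", "doc": "customer request id"},
    {"name": "pickup_address", "type": "string", "doc": "customer pickup address"},
    {"name": "destination_address", "type": "string", "doc": "customer destination address"},
    {"name": "ride_fare", "type": "float", "doc": "ride fare amount (USD)"},
    {"name": "ride_duration", "type": "int", "doc": "ride duration in minutes"},
    {"name": "preferred_unicorn_color", "type": {"type": "enum", "name": "UnicornPreferredColor", "symbols": ["WHITE", "BLACK", "RED", "BLUE", "GREY"]}, "default": "WHITE"},
    {
      "name": "recommended_unicorn",
      "type": {
        "type": "record",
        "name": "RecommendedUnicorn",
        "fields": [
          {"name": "unicorn_id", "type": "int", "doc": "recommended unicorn id"},
          {"name": "color", "type": {"type": "enum", "name": "unicorn_color", "symbols": ["WHITE", "RED", "BLUE"]}},
          {"name": "stars_rating", "type": ["null", "int"], "default": null, "doc": "unicorn star ratings based on customers feedback"}
        ]
      }
    },
    {
      "name": "customer",
      "type": {
        "type": "record",
        "name": "Customer",
        "fields": [
          {"name": "customer_account_no", "type": "int", "doc": "customer account number"},
          {"name": "first_name", "type": "string"},
          {"name": "middle_name", "type": ["null", "string"], "default": null},
          {"name": "last_name", "type": "string"},
          {"name": "email_addresses", "type": ["null", {"type": "array", "items": "string"}]},
          {"name": "customer_address", "type": "string", "doc": "customer address"},
          {"name": "mode_of_payment", "type": {"type": "enum", "name": "ModeOfPayment", "symbols": ["CARD", "CASH"]}, "default": "CARD"},
          {"name": "customer_rating", "type": ["null", "int"], "default": null}
        ]
      }
    }
  ]
}
```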
Project setup
Now we can set up our Java project with one module for the producer and another module for the consumer. Both modules keep their schema in the src/main/resources/avro folder:
.
├── consumer
│ ├── pom.xml
│ └── src
│ └── main
│ └── resources
│ └── avro
│ └── unicorn_ride_request_v1.avsc
├── producer
│ ├── pom.xml
│ └── src
│ └── main
│ └── resources
│ └── avro
│ └── unicorn_ride_request_v1.avsc
└── pom.xml
In our root pom, we configure the producer and the consumer module, some shared dependencies (most notably software.amazon.glue:schema-registry-serde), and the avro-maven-plugin to generate Java classes for the schema:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cloud.localstack.demos.gluemsk</groupId>
<artifactId>root-pom</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Glue MSK Demo</name>
<modules>
<module>producer</module>
<module>consumer</module>
</modules>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>software.amazon.glue</groupId>
<artifactId>schema-registry-serde</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
<version>1.82</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.11.0</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
While the root pom is a bit lengthy, the pom.xml files of the two modules are quite simple. They only reference the root pom, activate the avro-maven-plugin, and define a main class (we’ll go into detail on the actual Java code in the section below).
Here is what the producer’s pom.xml looks like:
And similarly, the consumer’s pom.xml looks like:
Now the project is all set up, and we can already generate our schema classes from the Avro schema using the avro-maven-plugin:
mvn clean generate-sources
Once the Maven plugin has finished, all types are generated for both the producer and the consumer:
.
├── consumer
│ └── src
│ └── main
│ ├── java
│ └── cloud
│ └── localstack
│ └── demos
│ └── gluemsk
│ └── schema
│ ├── Customer.java
│ ├── ModeOfPayment.java
│ ├── RecommendedUnicorn.java
│ ├── unicorn_color.java
│ ├── UnicornPreferredColor.java
│ └── UnicornRideRequest.java
└── producer
└── src
└── main
└── java
└── cloud
└── localstack
└── demos
└── gluemsk
└── schema
├── Customer.java
├── ModeOfPayment.java
├── RecommendedUnicorn.java
├── unicorn_color.java
├── UnicornPreferredColor.java
└── UnicornRideRequest.java
Since we want to log the sent and received messages, we need to configure logging by adding a log4j.properties file to the src/main/resources folder of both modules:
log4j.rootLogger=DEBUG, CONSOLE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=[%c{1}][%-5p] %m%n
Implementing a Producer and Consumer
Now, all the boilerplate is done:
- We have a proper project setup.
- The logging is properly configured.
- We generated our schema classes, which we’ll use as a type-safe interface for the records.
The Producer
The next step is to implement our producer. The complete module can be found in our samples repository (along with the rest of the code of this tutorial).
We create a new class called Producer in producer/src/main/java/cloud/localstack/demos/gluemsk/producer/. The Producer contains a main method which uses jcommander to create a simple CLI:
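package cloud.localstack.demos.gluemsk.producer;
// imports for the CLI and logging
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// imports needed by the producer methods implemented in the next steps
import cloud.localstack.demos.gluemsk.schema.*;
import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import software.amazon.awssdk.services.glue.model.DataFormat;
import java.util.List;
import java.util.Properties;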
public class Producer {
// name the logger after this class (shown as [Producer] by the log4j pattern)
private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
@Parameter(names = {"--help", "-h"}, help = true)
protected boolean help = false;
@Parameter(names = {"--bootstrap-servers", "-bs"}, description = "Kafka bootstrap servers endpoint to connect to.")
protected String bootstrapServers = "localhost:4511";
@Parameter(names = {"--aws-endpoint-servers", "-ae"}, description = "AWS endpoint to use.")
protected String awsEndpoint = "https://localhost.localstack.cloud:4566";
@Parameter(names = {"--region", "-reg"}, description = "AWS Region to use.")
protected String regionName = "us-east-1";
@Parameter(names = {"--topic-name", "-topic"}, description = "Kafka topic name where you send the data records. Default is unicorn-ride-request-topic.")
protected String topic = "unicorn-ride-request-topic";
@Parameter(names = {"--num-messages", "-nm"}, description = "Number of messages you want producer to send. Default is 100.")
protected String str_numOfMessages = "100";
public static void main(String[] args) {
Producer producer = new Producer();
JCommander jc = JCommander.newBuilder().addObject(producer).build();
jc.parse(args);
if (producer.help) {
jc.usage();
return;
}
producer.startProducer();
}
public void startProducer() {
// TODO:
// - Create the producer
// - Send the UnicornRideRequest records to the topic
}
}
Now we can add a method to configure our producer:
private Properties getProducerConfig() {
Properties props = new Properties();
// use the "--bootstrap-servers" argument to define the Kafka bootstrap address to connect to
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
props.put(ProducerConfig.ACKS_CONFIG, "-1");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "glue-msk-demo-producer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// use the GlueSchemaRegistryKafkaSerializer from software.amazon.glue:schema-registry-serde
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
// configure the GlueSchemaRegistryKafkaSerializer (data format, region, Glue registry and schema,...)
props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
props.put(AWSSchemaRegistryConstants.AWS_REGION, regionName);
props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "unicorn-ride-request-registry");
props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "unicorn-ride-request-schema-avro");
props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
// define the endpoint to use - by default we use LocalStack (localhost.localstack.cloud)
props.put(AWSSchemaRegistryConstants.AWS_ENDPOINT, this.awsEndpoint);
// enable compression of records
props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB.name());
return props;
}
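The COMPRESSION_TYPE setting makes the serializer compress each serialized record with zlib before sending it, and the deserializer reverses this on the consumer side. As an illustration of that round trip only, the following sketch uses the JDK’s java.util.zip.Deflater and Inflater; it assumes nothing about the serde library’s internals and uses a stand-in payload rather than real Avro bytes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Illustrates a zlib round trip with the JDK; this is not the Glue serde's own code.
class ZlibRoundTrip {
    static byte[] compress(byte[] input) {
        Deflater deflater = new Deflater(); // default settings emit zlib-wrapped output
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            int n = deflater.deflate(buffer);
            out.write(buffer, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    static byte[] decompress(byte[] input) {
        Inflater inflater = new Inflater();
        inflater.setInput(input);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(buffer);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalArgumentException("Truncated zlib data");
                }
                out.write(buffer, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("Invalid zlib data", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // Stand-in payload; a real record would contain Avro binary data
        byte[] payload = "{\"pickup_address\": \"Melbourne, Victoria, Australia\"}".repeat(20)
                .getBytes(StandardCharsets.UTF_8);
        byte[] compressed = compress(payload);
        System.out.println(payload.length + " bytes -> " + compressed.length + " bytes");
        System.out.println("round trip ok: " + Arrays.equals(payload, decompress(compressed)));
    }
}
```

For a single small record the savings can be negligible, which matches the "0KB" sizes the serde reports in the debug logs later in this tutorial.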
In addition, we create a method to generate new dummy UnicornRideRequests and a Callback class that logs the producer metadata:
public UnicornRideRequest getRecord(int requestId) {
/*
Initialise UnicornRideRequest object of
class that is generated from AVRO Schema
*/
UnicornRideRequest rideRequest = UnicornRideRequest.newBuilder()
.setRequestId(requestId)
.setPickupAddress("Melbourne, Victoria, Australia")
.setDestinationAddress("Sydney, NSW, Aus")
.setRideFare(1200.50F)
.setRideDuration(120)
.setPreferredUnicornColor(UnicornPreferredColor.WHITE)
.setRecommendedUnicorn(RecommendedUnicorn.newBuilder()
.setUnicornId(requestId * 2)
.setColor(unicorn_color.WHITE)
.setStarsRating(5).build())
.setCustomer(Customer.newBuilder()
.setCustomerAccountNo(1001)
.setFirstName("Dummy")
.setLastName("User")
.setEmailAddresses(List.of("demo@example.com"))
.setCustomerAddress("Flinders Street Station")
.setModeOfPayment(ModeOfPayment.CARD)
.setCustomerRating(5).build()).build();
LOGGER.info(rideRequest.toString());
return rideRequest;
}
private static class ProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetaData, Exception e) {
if (e == null) {
LOGGER.info("Received new metadata. \t" +
"Topic:" + recordMetaData.topic() + "\t" +
"Partition: " + recordMetaData.partition() + "\t" +
"Offset: " + recordMetaData.offset() + "\t" +
"Timestamp: " + recordMetaData.timestamp());
} else {
// log the failure together with its stack trace via SLF4J
LOGGER.error("There's been an error from the Producer side", e);
}
}
}
And finally, we can implement our startProducer method, which creates the producer using getProducerConfig and sends records generated by getRecord:
public void startProducer() {
try (KafkaProducer<String, UnicornRideRequest> producer = new KafkaProducer<>(getProducerConfig())) {
int numberOfMessages = Integer.parseInt(str_numOfMessages);
LOGGER.info("Starting to send records...");
for (int i = 0; i < numberOfMessages; i++) {
UnicornRideRequest rideRequest = getRecord(i);
String key = "key-" + i;
ProducerRecord<String, UnicornRideRequest> record = new ProducerRecord<>(topic, key, rideRequest);
producer.send(record, new ProducerCallback());
}
}
}
The Consumer
Now that we have a Producer, we need a component which reads the data from the Kafka cluster. The complete module can be found in our samples repository (along with the rest of the code of this tutorial).
We create a new class called Consumer in consumer/src/main/java/cloud/localstack/demos/gluemsk/consumer/. Analogous to the Producer, the Consumer contains a main method which uses jcommander to create a simple CLI:
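package cloud.localstack.demos.gluemsk.consumer;
// imports for the CLI and logging
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// imports needed by the consumer methods implemented in the next steps
import cloud.localstack.demos.gluemsk.schema.UnicornRideRequest;
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;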
public class Consumer {
// name the logger after this class (not java.util.function.Consumer)
private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
@Parameter(names = {"--help", "-h"}, help = true)
protected boolean help = false;
@Parameter(names = {"--bootstrap-servers", "-bs"}, description = "kafka bootstrap servers endpoint")
protected String bootstrapServers = "localhost:4511";
@Parameter(names = {"--aws-endpoint-servers", "-ae"}, description = "AWS endpoint")
protected String awsEndpoint = "https://localhost.localstack.cloud:4566";
@Parameter(names = {"--region", "-reg"}, description = "AWS Region to use.")
protected String regionName = "us-east-1";
@Parameter(names = {"--topic-name", "-topic"}, description = "Kafka topic name where you send the data records. Default is unicorn-ride-request-topic")
protected String topic = "unicorn-ride-request-topic";
@Parameter(names = {"--num-messages", "-nm"}, description = "Number of messages you want consumer to wait for until it stops. Default is 100, use 0 if you want it to run indefinitely.")
protected String str_numOfMessages = "100";
public static void main(String[] args) {
Consumer consumer = new Consumer();
JCommander jc = JCommander.newBuilder().addObject(consumer).build();
jc.parse(args);
if (consumer.help) {
jc.usage();
return;
}
consumer.startConsumer();
}
public void startConsumer() {
// TODO:
// - Create the Kafka Consumer
// - Subscribe to the topic
// - Process the incoming records
}
}
Similar to the producer, we need to configure our consumer:
private Properties getConsumerConfig() {
Properties props = new Properties();
// use the "--bootstrap-servers" argument to define the Kafka bootstrap address to connect to
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "unicorn.riderequest.consumer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// use the GlueSchemaRegistryKafkaDeserializer from software.amazon.glue:schema-registry-serde
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
props.put(AWSSchemaRegistryConstants.AWS_REGION, this.regionName);
props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
// define the endpoint to use - by default we use LocalStack (localhost.localstack.cloud)
props.put(AWSSchemaRegistryConstants.AWS_ENDPOINT, this.awsEndpoint);
return props;
}
And finally, we can implement our startConsumer method, which creates the consumer using getConsumerConfig, reads the records from the topic, and logs them:
public void startConsumer() {
LOGGER.info("Starting consumer...");
try (KafkaConsumer<String, UnicornRideRequest> consumer = new KafkaConsumer<>(getConsumerConfig())) {
consumer.subscribe(Collections.singletonList(topic));
int outstandingMessages = Integer.parseInt(str_numOfMessages);
boolean runIndefinitely = outstandingMessages == 0;
while (outstandingMessages > 0 || runIndefinitely) {
// a real consumer would probably run in an endless loop waiting for new records here
final ConsumerRecords<String, UnicornRideRequest> records = consumer.poll(Duration.ofMillis(10));
for (final ConsumerRecord<String, UnicornRideRequest> record : records) {
final UnicornRideRequest rideRequest = record.value();
LOGGER.info(String.valueOf(rideRequest.getRequestId()));
LOGGER.info(rideRequest.toString());
outstandingMessages--;
}
}
}
LOGGER.info("Stopping consumer...");
}
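The termination logic of startConsumer can be checked without a broker. In the following sketch, consumer.poll is replaced by a hypothetical Supplier of record batches while the counter logic stays the same. It also shows a subtlety of the loop: since a whole batch is processed before the loop condition is evaluated again, the consumer can handle more records than --num-messages requested.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Broker-free model of the loop in startConsumer; poll() is replaced by a hypothetical supplier.
class ConsumerLoopModel {
    static List<String> consume(Supplier<List<String>> poll, int numMessages) {
        List<String> processed = new ArrayList<>();
        int outstandingMessages = numMessages;
        boolean runIndefinitely = outstandingMessages == 0;
        while (outstandingMessages > 0 || runIndefinitely) {
            // An empty batch (nothing arrived within the poll timeout) just leads to another poll
            for (String record : poll.get()) {
                processed.add(record);
                outstandingMessages--; // can drop below zero: the whole batch is processed
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<List<String>> batchList = List.of(List.of(), List.of("r1", "r2"), List.of("r3", "r4", "r5"));
        Iterator<List<String>> batches = batchList.iterator();
        System.out.println(consume(batches::next, 3)); // prints [r1, r2, r3, r4, r5]
    }
}
```

If processing exactly N records matters, the inner loop would need its own check of the counter; for this demo, overshooting within the last batch is harmless.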
Setting up the infrastructure
Now that the initial coding is done, we can give it a try. Let’s start LocalStack:
LOCALSTACK_AUTH_TOKEN=<your-auth-token> localstack start -d
Once LocalStack is started, we can create a new Kafka cluster using awslocal:
$ awslocal kafka create-cluster \
--cluster-name "unicorn-ride-cluster" \
--kafka-version "2.2.1" \
--number-of-broker-nodes 1 \
--broker-node-group-info "{\"ClientSubnets\": [], \"InstanceType\":\"kafka.m5.xlarge\"}"
{
"ClusterArn": "arn:aws:kafka:us-east-1:000000000000:cluster/unicorn-ride-cluster/f9b16124-baf3-459b-8507-ec6c605b7a0a-25",
"ClusterName": "unicorn-ride-cluster",
"State": "CREATING"
}
The ClusterArn is created dynamically and will be different for your run. Make sure to use your ClusterArn for the commands below.
It takes some time for the cluster to get up and running. We can monitor its state with describe-cluster:
$ awslocal kafka describe-cluster --cluster-arn "arn:aws:kafka:us-east-1:000000000000:cluster/unicorn-ride-cluster/f9b16124-baf3-459b-8507-ec6c605b7a0a-25"
{
"ClusterInfo": {
"BrokerNodeGroupInfo": {
"ClientSubnets": [],
"InstanceType": "kafka.m5.xlarge"
},
"ClusterArn": "arn:aws:kafka:us-east-1:000000000000:cluster/unicorn-ride-cluster/f9b16124-baf3-459b-8507-ec6c605b7a0a-25",
"ClusterName": "unicorn-ride-cluster",
"CreationTime": "2022-07-21T14:41:07.897000Z",
"CurrentBrokerSoftwareInfo": {
"KafkaVersion": "2.5.0"
},
"CurrentVersion": "KFABVNKTBGF6HX",
"NumberOfBrokerNodes": 1,
"State": "ACTIVE",
"ZookeeperConnectString": "localhost:4510"
}
}
Once the State is ACTIVE, the cluster is ready to be used.
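Rather than re-running describe-cluster by hand until the cluster is ACTIVE, the wait can be scripted. The following is a generic polling sketch; the Supplier that returns the current State is an assumption (it could wrap the describe-cluster call shown above or an equivalent SDK call), and the timeout and interval values are arbitrary.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Generic polling helper; the state supplier is hypothetical (e.g. a wrapper around describe-cluster).
class ClusterStateWaiter {
    static boolean waitForState(Supplier<String> currentState, String expectedState,
                                Duration timeout, Duration interval) {
        Instant deadline = Instant.now().plus(timeout);
        while (true) {
            if (expectedState.equals(currentState.get())) {
                return true;
            }
            if (Instant.now().plus(interval).isAfter(deadline)) {
                return false; // the next check would fall past the deadline
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Simulated describe-cluster results instead of real calls
        Iterator<String> simulated = List.of("CREATING", "CREATING", "ACTIVE").iterator();
        boolean ready = waitForState(simulated::next, "ACTIVE", Duration.ofSeconds(5), Duration.ofMillis(100));
        System.out.println("Cluster ready: " + ready);
    }
}
```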
Now it’s time to create our Glue Schema Registry:
$ awslocal glue create-registry --registry-name unicorn-ride-request-registry
{
"RegistryArn": "arn:aws:glue:us-east-1:000000000000:file-registry/unicorn-ride-request-registry",
"RegistryName": "unicorn-ride-request-registry"
}
In the newly created registry, we can now add our initial UnicornRideRequest schema:
$ awslocal glue create-schema \
--registry-id RegistryName="unicorn-ride-request-registry" \
--schema-name unicorn-ride-request-schema-avro \
--compatibility BACKWARD \
--data-format AVRO \
--schema-definition "file://producer/src/main/resources/avro/unicorn_ride_request_v1.avsc"
{
"RegistryName": "unicorn-ride-request-registry",
"RegistryArn": "arn:aws:glue:us-east-1:000000000000:file-registry/unicorn-ride-request-registry",
"SchemaName": "unicorn-ride-request-schema-avro",
"SchemaArn": "arn:aws:glue:us-east-1:000000000000:schema/unicorn-ride-request-registry/unicorn-ride-request-schema-avro",
"DataFormat": "AVRO",
"Compatibility": "BACKWARD",
"SchemaCheckpoint": 1,
"LatestSchemaVersion": 1,
"NextSchemaVersion": 2,
"SchemaStatus": "AVAILABLE",
"SchemaVersionId": "925c868c-ff56-4992-9f24-6df5933c15cc",
"SchemaVersionStatus": "AVAILABLE"
}
For the schema, we defined the compatibility mode BACKWARD. This means that consumers using the new schema can also read data produced with the previous schema version. For example, it allows deleting fields or adding new optional fields (fields with a default value).
You can find a thorough description of the different compatibility modes in the AWS docs on Schema Versioning and Compatibility. The Glue Schema Registry will ensure that newly registered schemas fulfill the constraints defined by the compatibility mode.
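To make the BACKWARD rule more concrete: a reader using the new schema must be able to decode data written with the previous schema. Removing a field is therefore allowed (the reader simply skips it), whereas a newly added field needs a default value for old records. The following sketch models only this field-level rule, reducing each schema to a map from field name to whether the field has a default; it ignores type changes, nested records, and aliases, so it is much simpler than a full Avro compatibility check. The field tip is a hypothetical example.

```java
import java.util.Map;

// Simplified BACKWARD check: a schema is modelled as field name -> "has a default value".
// Type changes, nested records and aliases are deliberately ignored.
class BackwardCompatibility {
    static boolean isBackwardCompatible(Map<String, Boolean> previous, Map<String, Boolean> candidate) {
        for (Map.Entry<String, Boolean> field : candidate.entrySet()) {
            boolean isNewField = !previous.containsKey(field.getKey());
            if (isNewField && !field.getValue()) {
                return false; // old records carry no value for this field and there is no default
            }
        }
        return true; // fields removed from the candidate are simply skipped by the new reader
    }

    public static void main(String[] args) {
        // Two of the top-level fields of the v1 schema
        Map<String, Boolean> v1 = Map.of("request_id", false, "customer", false);
        Map<String, Boolean> v2 = Map.of("request_id", false); // customer removed
        System.out.println(isBackwardCompatible(v1, v2)); // true
        Map<String, Boolean> v3 = Map.of("request_id", false, "tip", false); // new field, no default
        System.out.println(isBackwardCompatible(v2, v3)); // false
    }
}
```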
Running the Producer and Consumer
Finally, everything is ready to start our Producer and Consumer. First, we need to get the bootstrap server address of the Kafka cluster:
$ awslocal kafka get-bootstrap-brokers --cluster-arn "arn:aws:kafka:us-east-1:000000000000:cluster/unicorn-ride-cluster/f9b16124-baf3-459b-8507-ec6c605b7a0a-25"
{
"BootstrapBrokerString": "localhost:4511"
}
Like the ClusterArn, the BootstrapBrokerString is dynamic and can be different for your run. Please make sure to use your bootstrap server address for the runs below.
Now let’s start the Producer. By default, our producer sends 100 records and then shuts down:
# Compile the Java packages
mvn clean install
# Run the producer
mvn -pl producer exec:java -Dexec.args="--bootstrap-servers localhost:4511"
Once the producer is running, we can observe the different steps of the producer as described in the intro:
Before sending a record, the producer looks up the schema it uses to serialize its records in the schema registry:
... [GlueSchemaRegistrySerializerFactory][DEBUG] Returning Avro serializer instance from GlueSchemaRegistrySerializerFactory [AWSSchemaRegistryClient][DEBUG] Getting Schema Version Id for : schemaDefinition = {"type":"record","name":"UnicornRideRequest","namespace":"cloud.localstack.demos.gluemsk.schema","fields":[{"name":"request_id","type":"int","doc":"customer request id"},{"name":"pickup_address","type":"string","doc":"customer pickup address"},{"name":"destination_address","type":"string","doc":"customer destination address"},{"name":"ride_fare","type":"float","doc":"ride fare amount (USD)"},{"name":"ride_duration","type":"int","doc":"ride duration in minutes"},{"name":"preferred_unicorn_color","type":{"type":"enum","name":"UnicornPreferredColor","symbols":["WHITE","BLACK","RED","BLUE","GREY"]},"default":"WHITE"},{"name":"recommended_unicorn","type":{"type":"record","name":"RecommendedUnicorn","fields":[{"name":"unicorn_id","type":"int","doc":"recommended unicorn id"},{"name":"color","type":{"type":"enum","name":"unicorn_color","symbols":["WHITE","RED","BLUE"]}},{"name":"stars_rating","type":["null","int"],"doc":"unicorn star ratings based on customers feedback","default":null}]}},{"name":"customer","type":{"type":"record","name":"Customer","fields":[{"name":"customer_account_no","type":"int","doc":"customer account number"},{"name":"first_name","type":"string"},{"name":"middle_name","type":["null","string"],"default":null},{"name":"last_name","type":"string"},{"name":"email_addresses","type":["null",{"type":"array","items":"string"}]},{"name":"customer_address","type":"string","doc":"customer address"},{"name":"mode_of_payment","type":{"type":"enum","name":"ModeOfPayment","symbols":["CARD","CASH"]},"default":"CARD"},{"name":"customer_rating","type":["null","int"],"default":null}]}}]}, schemaName = unicorn-ride-request-schema-avro, dataFormat = AVRO ...
Once the schema is known to be valid, the producer serializes the record, compresses it, and sends it to the Kafka cluster.
... [GlueSchemaRegistryKafkaSerializer][DEBUG] Schema Version Id received from the from schema registry: f95edc4b-778d-4f65-b23e-7de41c5b4e53 [GlueSchemaRegistrySerializerFactory][DEBUG] Returning Avro serializer instance from GlueSchemaRegistrySerializerFactory [GlueSchemaRegistryDefaultCompression][DEBUG] Compression :: record length: 0KB [GlueSchemaRegistryDefaultCompression][DEBUG] Compression :: record length after compression: 0KB [Producer][INFO ] {"request_id": 1, "pickup_address": "Melbourne, Victoria, Australia", "destination_address": "Sydney, NSW, Aus", "ride_fare": 1200.5, "ride_duration": 120, "preferred_unicorn_color": "WHITE", "recommended_unicorn": {"unicorn_id": 2, "color": "WHITE", "stars_rating": 5}, "customer": {"customer_account_no": 1001, "first_name": "Dummy", "middle_name": null, "last_name": "User", "email_addresses": ["demo@example.com"], "customer_address": "Flinders Street Station", "mode_of_payment": "CARD", "customer_rating": 5}} ...
Now, the records sent by the producer are managed by the Kafka cluster and are waiting for a consumer to pick them up. We can start the consumer with the same bootstrap server address as the producer:
mvn -pl consumer exec:java -Dexec.args="--bootstrap-servers localhost:4511"
In the logs of the consumer, we can now observe the steps of the consumer as described in the intro:
The consumer reads the serialized and compressed record:
[NetworkClient][DEBUG] [Consumer clientId=consumer-unicorn.riderequest.consumer-1, groupId=unicorn.riderequest.consumer] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=consumer-unicorn.riderequest.consumer-1, correlationId=10) and timeout 30000 to node 0: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, topics=[FetchTopic(topic='unicorn-ride-request-topic', partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=900, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[], rackId='') [NetworkClient][DEBUG] [Consumer clientId=consumer-unicorn.riderequest.consumer-1, groupId=unicorn.riderequest.consumer] Received FETCH response from node 0 for request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=consumer-unicorn.riderequest.consumer-1, correlationId=10): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=2057133662, responses=[FetchableTopicResponse(topic='unicorn-ride-request-topic', partitionResponses=[FetchablePartitionResponse(partition=0, errorCode=0, highWatermark=1000, lastStableOffset=1000, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, recordSet=MemoryRecords(size=17141, buffer=java.nio.HeapByteBuffer[pos=0 lim=17141 cap=17144]))])])
The consumer requests the schema from the schema registry, decompresses the record, and uses the schema to deserialize it:
[request][DEBUG] Sending Request: DefaultSdkHttpFullRequest(httpMethod=POST, protocol=https, host=localhost.localstack.cloud, port=4566, encodedPath=/, headers=[amz-sdk-invocation-id, Content-Length, Content-Type, User-Agent, X-Amz-Target], queryParameters=[]) ... [request][DEBUG] Received successful response: 200, Request ID: 1EDT4G1DBSDS0XCD2N7FM9L4SRROEEML1FPDOYSANII78CXDR4F2, Extended Request ID: not available [GlueSchemaRegistryDeserializerFactory][DEBUG] Returning Avro de-serializer instance from GlueSchemaRegistryDeserializerFactory [GlueSchemaRegistryDefaultCompression][DEBUG] Decompression :: Compressed record length: 0KB [GlueSchemaRegistryDefaultCompression][DEBUG] Decompression :: Decompressed record length: 0KB [AvroDeserializer][DEBUG] Length of actual message: 121 [DatumReaderInstance][DEBUG] Using SpecificDatumReader for de-serializing Avro message, schema: {"type":"record","name":"UnicornRideRequest","namespace":"cloud.localstack.demos.gluemsk.schema","fields":[{"name":"request_id","type":"int","doc":"customer request id"},{"name":"pickup_address","type":"string","doc":"customer pickup address"},{"name":"destination_address","type":"string","doc":"customer destination address"},{"name":"ride_fare","type":"float","doc":"ride fare amount (USD)"},{"name":"ride_duration","type":"int","doc":"ride duration in minutes"},{"name":"preferred_unicorn_color","type":{"type":"enum","name":"UnicornPreferredColor","symbols":["WHITE","BLACK","RED","BLUE","GREY"]},"default":"WHITE"},{"name":"recommended_unicorn","type":{"type":"record","name":"RecommendedUnicorn","fields":[{"name":"unicorn_id","type":"int","doc":"recommended unicorn id"},{"name":"color","type":{"type":"enum","name":"unicorn_color","symbols":["WHITE","RED","BLUE"]}},{"name":"stars_rating","type":["null","int"],"doc":"unicorn star ratings based on customers 
feedback","default":null}]}},{"name":"customer","type":{"type":"record","name":"Customer","fields":[{"name":"customer_account_no","type":"int","doc":"customer account number"},{"name":"first_name","type":"string"},{"name":"middle_name","type":["null","string"],"default":null},{"name":"last_name","type":"string"},{"name":"email_addresses","type":["null",{"type":"array","items":"string"}]},{"name":"customer_address","type":"string","doc":"customer address"},{"name":"mode_of_payment","type":{"type":"enum","name":"ModeOfPayment","symbols":["CARD","CASH"]},"default":"CARD"},{"name":"customer_rating","type":["null","int"],"default":null}]}}]}) [AvroDeserializer][DEBUG] Finished de-serializing Avro message [GlueSchemaRegistryDeserializerFactory][DEBUG] Returning Avro de-serializer instance from GlueSchemaRegistryDeserializerFactory [GlueSchemaRegistryDefaultCompression][DEBUG] Decompression :: Compressed record length: 0KB [GlueSchemaRegistryDefaultCompression][DEBUG] Decompression :: Decompressed record length: 0KB [AvroDeserializer][DEBUG] Length of actual message: 121 [AvroDeserializer][DEBUG] Finished de-serializing Avro message
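The consumer can fetch the right schema because the serializer stores the schema version ID together with each record. The following sketch parses such a record and caches schemas per version ID, mirroring the "if it is not already cached" step from the introduction. The byte layout (one header-version byte, one compression byte, the 16-byte schema version UUID, then the payload) is our assumption about the serde library’s wire format and should be checked against its source; GlueRecordHeader, SchemaCache, and the fetchSchema function are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// ASSUMED layout: [header version: 1 byte][compression flag: 1 byte][schema version ID: 16 bytes][payload]
class GlueRecordHeader {
    static final int HEADER_LENGTH = 18;

    final byte headerVersion;
    final byte compressionFlag;
    final UUID schemaVersionId;
    final byte[] payload;

    private GlueRecordHeader(byte headerVersion, byte compressionFlag, UUID schemaVersionId, byte[] payload) {
        this.headerVersion = headerVersion;
        this.compressionFlag = compressionFlag;
        this.schemaVersionId = schemaVersionId;
        this.payload = payload;
    }

    static GlueRecordHeader parse(byte[] record) {
        if (record.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Record is shorter than the assumed header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(record); // big-endian by default
        byte version = buffer.get();
        byte compression = buffer.get();
        UUID id = new UUID(buffer.getLong(), buffer.getLong()); // most significant bits first
        return new GlueRecordHeader(version, compression, id,
                Arrays.copyOfRange(record, HEADER_LENGTH, record.length));
    }
}

// Asks the registry at most once per schema version ID; fetchSchema is a hypothetical lookup.
class SchemaCache {
    private final Map<UUID, String> schemas = new ConcurrentHashMap<>();
    private final Function<UUID, String> fetchSchema;

    SchemaCache(Function<UUID, String> fetchSchema) {
        this.fetchSchema = fetchSchema;
    }

    String schemaFor(UUID schemaVersionId) {
        return schemas.computeIfAbsent(schemaVersionId, fetchSchema);
    }

    public static void main(String[] args) {
        UUID id = UUID.fromString("f95edc4b-778d-4f65-b23e-7de41c5b4e53");
        byte[] record = ByteBuffer.allocate(GlueRecordHeader.HEADER_LENGTH + 2)
                .put((byte) 3).put((byte) 0) // illustrative header-version and compression bytes
                .putLong(id.getMostSignificantBits()).putLong(id.getLeastSignificantBits())
                .put(new byte[] {42, 7}) // stand-in for the serialized Avro payload
                .array();
        GlueRecordHeader header = GlueRecordHeader.parse(record);
        SchemaCache cache = new SchemaCache(versionId -> {
            System.out.println("fetching schema " + versionId + " from the registry");
            return "<schema definition>";
        });
        cache.schemaFor(header.schemaVersionId);
        cache.schemaFor(header.schemaVersionId); // served from the cache, no second fetch
    }
}
```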
Schema Evolution
So far in this tutorial, we have implemented a Kafka producer and a consumer that integrate with the Glue Schema Registry. However, the full potential of the Glue Schema Registry is only unlocked when performing a schema evolution, i.e., when running producers and consumers with a new version of an already registered schema.
Therefore, we will run a few more interesting scenarios to illustrate the benefits of the Schema Registry:
- Running a producer which automatically registers a new, compatible schema version.
- Running a producer which is rejected when trying to register a new, incompatible schema version.
- Running an old consumer which is rejected since it is not compatible with the newly registered schema version.
- Running an updated consumer which can consume the new schema / records.
Producer registering a new schema version
In this step, we will create a new producer which uses a new, BACKWARD compatible schema version. The complete module can be found in our samples repository (along with the rest of the code of this tutorial).
The producer will register the new schema version automatically on its own.
We create the new producer by executing the following steps:
Copy the
producer
directory and rename it toproducer-2
.Set a new artifact ID in the
pom.xml
of the module:... <artifactId>producer-2</artifactId> ...
Add the new module to the root
pom.xml
:<modules> <module>producer</module> <module>producer-2</module> <module>consumer</module> </modules>
Create a new version of the schema:
Rename the schema to
unicorn_ride_request_v2.avsc
.In the schema, remove the previously required field
customer
:$ diff -u producer/src/main/resources/avro/unicorn_ride_request_v1.avsc producer-2/src/main/resources/avro/unicorn_ride_request_v2.avsc --- producer/src/main/resources/avro/unicorn_ride_request_v1.avsc 2022-05-13 08:27:08.219354922 +0200 +++ producer-2/src/main/resources/avro/unicorn_ride_request_v2.avsc 2022-05-13 08:27:08.219354922 +0200 @@ -20,23 +20,6 @@ {"name": "stars_rating", "type": ["null", "int"], "default": null, "doc": "unicorn star ratings based on customers feedback"} ] } - }, - { - "name": "customer", - "type": { - "type": "record", - "name": "Customer", - "fields": [ - {"name": "customer_account_no","type": "int", "doc": "customer account number"}, - {"name": "first_name","type": "string"}, - {"name": "middle_name","type": ["null","string"], "default": null}, - {"name": "last_name","type": "string"}, - {"name": "email_addresses","type": ["null", {"type":"array", "items":"string" }]}, - {"name": "customer_address","type": "string","doc": "customer address"}, - {"name": "mode_of_payment","type": {"type": "enum","name": "ModeOfPayment","symbols": ["CARD","CASH"]}, "default": "CARD"}, - {"name": "customer_rating", "type": ["null", "int"], "default": null} - ] - } } ] }
This change is BACKWARD compatible, because an updated consumer can read both current and previous records: new consumers don't need the customer data, so they don't care whether it is present or not.
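A minimal sketch of why this holds, assuming a heavily simplified model of Avro schema resolution: only top-level field names and whether each field declares a default are considered (no types, type promotion, or nested records). Under that assumption, a reader can decode a record if every field it requires without a default is present in the writer's schema. The BackwardCheck class and canRead method are hypothetical names for illustration, not part of Avro or the Glue serializers.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class BackwardCheck {

    /** A top-level record field: its name and whether the schema declares a default value. */
    record Field(String name, boolean hasDefault) {}

    /**
     * Simplified check: a consumer using {@code reader} can decode records written with
     * {@code writer} if every reader field without a default also exists in the writer.
     * Fields that only the writer declares are skipped by the reader.
     */
    static boolean canRead(List<Field> reader, List<Field> writer) {
        Set<String> writerNames = writer.stream().map(Field::name).collect(Collectors.toSet());
        return reader.stream().allMatch(f -> f.hasDefault() || writerNames.contains(f.name()));
    }

    public static void main(String[] args) {
        // Top-level fields of unicorn_ride_request_v1.avsc (nested records are not modeled).
        List<Field> v1 = List.of(
                new Field("request_id", false),
                new Field("pickup_address", false),
                new Field("destination_address", false),
                new Field("ride_fare", false),
                new Field("ride_duration", false),
                new Field("preferred_unicorn_color", true),
                new Field("recommended_unicorn", false),
                new Field("customer", false));
        // unicorn_ride_request_v2.avsc: the same fields without "customer".
        List<Field> v2 = v1.subList(0, 7);

        // A new consumer (v2) reading an old record (v1) simply ignores "customer".
        System.out.println("v2 consumer can read v1 records: " + canRead(v2, v1));
    }
}
```

Running it prints "v2 consumer can read v1 records: true". For real schemas, Avro's own SchemaCompatibility.checkReaderWriterCompatibility(reader, writer) applies the complete resolution rules and is the better choice than a hand-written check like this one.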
Re-generate the Java classes for the schema:
mvn clean generate-sources
Once the classes have been generated, the producer code needs to be adjusted: remove the usage of setCustomer in the producer's getRecord, since the method does not exist anymore.
Configure the producer to automatically register its schema version in case it's not yet registered, by setting the additional property AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING to true:
    ...
        props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB.name());
        // Automatically register the new schema version of this producer!
        props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
        return props;
    }
    ...
Now we can run the new producer with the same bootstrap server address as before:
mvn clean install
mvn -pl producer-2 exec:java -Dexec.args="--bootstrap-servers localhost:4511"
In the logs we can see that the producer registered a new schema version before successfully publishing the new records:
...
[AWSSchemaRegistryClient][INFO ] Registered the schema version with schema version id = 02922438-aa47-41e0-80e0-42238d07565f and with version number = 2 and status AVAILABLE
...
Producer trying to register an incompatible schema version
In our next scenario, we will create a new producer which tries to register a schema that is not compatible with the schema in the registry. The producer will be rejected as soon as it tries to register the new schema version, before it even sends a record.
The complete module can be found in our samples repository (along with the rest of the code of this tutorial).
Similar to the previous scenario, we create a new producer by executing the following steps:
- Copy the producer-2 directory and rename it to producer-3.
- Set a new artifact ID in the pom.xml of the module:
    ...
    <artifactId>producer-3</artifactId>
    ...
- Add the new module to the root pom.xml:
    <modules>
        <module>producer</module>
        <module>producer-2</module>
        <module>producer-3</module>
        <module>consumer</module>
    </modules>
- Create a new version of the schema:
  - Rename the schema to unicorn_ride_request_v3.avsc.
  - In the schema, add a new required field unicorn_food:
      $ diff -u producer-2/src/main/resources/avro/unicorn_ride_request_v2.avsc producer-3/src/main/resources/avro/unicorn_ride_request_v3.avsc
      --- producer-2/src/main/resources/avro/unicorn_ride_request_v2.avsc 2022-05-13 08:27:08.219354922 +0200
      +++ producer-3/src/main/resources/avro/unicorn_ride_request_v3.avsc 2022-05-13 08:27:08.219354922 +0200
      @@ -20,6 +20,17 @@
                 {"name": "stars_rating", "type": ["null", "int"], "default": null, "doc": "unicorn star ratings based on customers feedback"}
               ]
             }
      +    },
      +    {
      +      "name": "unicorn_food",
      +      "type": {
      +        "type": "record",
      +        "name": "Food",
      +        "fields": [
      +          {"name": "price","type": "float","doc": "price per pound of food (USD)"},
      +          {"name": "name","type": "string"}
      +        ]
      +      }
           }
         ]
       }
This change is not BACKWARD compatible, because an updated consumer cannot read both current and previous records: new consumers would expect the required field unicorn_food, which has no default value and is not present in old records.
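The registry-side decision in this scenario can be sketched with a hypothetical in-memory registry. The sketch assumes that, in BACKWARD mode (as opposed to BACKWARD_ALL), a new version is compared only against the latest available version, and that a rejected version still receives a version number with status FAILURE, as in the log output of this tutorial. Schemas are reduced to their top-level fields (name mapped to whether a default is declared); InMemoryRegistry, register, and this representation are illustrative names, not the Glue Schema Registry API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class InMemoryRegistry {

    enum Status { AVAILABLE, FAILURE }

    /** A schema version: its number, status, and top-level fields (name -> declares a default). */
    record Version(int number, Status status, Map<String, Boolean> fields) {}

    private final List<Version> versions = new ArrayList<>();

    /** Simplified BACKWARD rule: each new field without a default must exist in the previous schema. */
    private static boolean backwardCompatible(Map<String, Boolean> next, Map<String, Boolean> previous) {
        return next.entrySet().stream()
                .allMatch(field -> field.getValue() || previous.containsKey(field.getKey()));
    }

    /** Registers a version, comparing it only against the latest AVAILABLE version. */
    Version register(Map<String, Boolean> fields) {
        Version latest = null;
        for (Version v : versions) {
            if (v.status() == Status.AVAILABLE) {
                latest = v;
            }
        }
        boolean compatible = latest == null || backwardCompatible(fields, latest.fields());
        Version created = new Version(versions.size() + 1,
                compatible ? Status.AVAILABLE : Status.FAILURE, fields);
        versions.add(created);
        return created;
    }

    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        // Abbreviated top-level fields of v1, v2 and v3; true = field declares a default.
        Map<String, Boolean> v1 = Map.of("request_id", false, "preferred_unicorn_color", true, "customer", false);
        Map<String, Boolean> v2 = Map.of("request_id", false, "preferred_unicorn_color", true);
        Map<String, Boolean> v3 = Map.of("request_id", false, "preferred_unicorn_color", true, "unicorn_food", false);

        for (Map<String, Boolean> schema : List.of(v1, v2, v3)) {
            Version version = registry.register(schema);
            System.out.println("version " + version.number() + ": " + version.status());
            if (version.status() == Status.FAILURE) {
                // A producer with auto-registration enabled would stop here, before sending any record.
                System.out.println("producer stops before publishing records");
            }
        }
    }
}
```

With these inputs, the sketch reports AVAILABLE for versions 1 and 2 and FAILURE for version 3, which mirrors the producer-2 and producer-3 runs.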
Re-generate the Java classes for the schema:
mvn clean generate-sources
Once the classes have been generated, the producer code needs to be adjusted (set the new required unicorn_food in the producer's getRecord):
    ...
                .setStarsRating(5).build())
            // we removed the (previously required) customer data here in the new version of the producer (v1)
            // and added a new field without a default (which isn't backward compatible)
            .setUnicornFood(Food.newBuilder().setPrice(133.7f).setName("Rainbow").build())
            .build();
    ...
We can run the new producer with the same bootstrap server address as before:
mvn clean install
mvn -pl producer-3 exec:java -Dexec.args="--bootstrap-servers localhost:4511"
In the logs we can see that the producer fails when trying to register its new, but incompatible, version of the schema:
...
[AWSSchemaRegistryClient][INFO ] Registered the schema version with schema version id = 8f625284-07b4-442e-83ae-395cd7853746 and with version number = 3 and status FAILURE
...
[WARNING]
com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Register schema :: Call failed when registering the schema with the schema registry for schema name = unicorn-ride-request-schema-avro
at com.amazonaws.services.schemaregistry.common.AWSSchemaRegistryClient.registerSchemaVersion (AWSSchemaRegistryClient.java:310)
...
Outdated consumers
We’ve seen how the producer’s schema evolution works in the previous scenarios. Now, we’ll take a closer look at our consumer.
In our first schema evolution scenario, the producer registered a new version of the schema and afterwards published records with that schema.
BACKWARD schema compatibility guarantees that updated consumers can read older records. It does not guarantee that old consumers can read newer records, so the consumers need to be updated before the producers.
Therefore, our old consumer fails to consume these records, because it has not yet been updated to the new schema registered by the new producer:
mvn -pl consumer exec:java -Dexec.args="--bootstrap-servers localhost:4511"
In the logs, we can see that the consumer fails because the Avro schema used by the consumer expects a required field (customer) that the new records no longer contain:
[request][DEBUG] Sending Request: DefaultSdkHttpFullRequest(httpMethod=POST, protocol=https, host=localhost.localstack.cloud, port=4566, encodedPath=/, headers=[amz-sdk-invocation-id, Content-Length, Content-Type, User-Agent, X-Amz-Target], queryParameters=[])
...
[DatumReaderInstance][DEBUG] Using SpecificDatumReader for de-serializing Avro message, schema: {"type":"record","name":"UnicornRideRequest","namespace":"cloud.localstack.demos.gluemsk.schema","fields":[{"name":"request_id","type":"int","doc":"customer request id"},{"name":"pickup_address","type":"string","doc":"customer pickup address"},{"name":"destination_address","type":"string","doc":"customer destination address"},{"name":"ride_fare","type":"float","doc":"ride fare amount (USD)"},{"name":"ride_duration","type":"int","doc":"ride duration in minutes"},{"name":"preferred_unicorn_color","type":{"type":"enum","name":"UnicornPreferredColor","symbols":["WHITE","BLACK","RED","BLUE","GREY"]},"default":"WHITE"},{"name":"recommended_unicorn","type":{"type":"record","name":"RecommendedUnicorn","fields":[{"name":"unicorn_id","type":"int","doc":"recommended unicorn id"},{"name":"color","type":{"type":"enum","name":"unicorn_color","symbols":["WHITE","RED","BLUE"]}},{"name":"stars_rating","type":["null","int"],"doc":"unicorn star ratings based on customers feedback","default":null}]}},{"name":"customer","type":{"type":"record","name":"Customer","fields":[{"name":"customer_account_no","type":"int","doc":"customer account number"},{"name":"first_name","type":"string"},{"name":"middle_name","type":["null","string"],"default":null},{"name":"last_name","type":"string"},{"name":"email_addresses","type":["null",{"type":"array","items":"string"}]},{"name":"customer_address","type":"string","doc":"customer address"},{"name":"mode_of_payment","type":{"type":"enum","name":"ModeOfPayment","symbols":["CARD","CASH"]},"default":"CARD"},{"name":"customer_rating","type":["null","int"],"default":null}]}}]})
...
[WARNING]
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition unicorn-ride-request-topic-0 at offset 100. If needed, please seek past the record to continue consumption.
Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Exception occurred while de-serializing Avro message
at com.amazonaws.services.schemaregistry.deserializers.avro.AvroDeserializer.deserialize (AvroDeserializer.java:103)
...
Caused by: org.apache.avro.AvroTypeException: Found cloud.localstack.demos.gluemsk.schema.UnicornRideRequest, expecting cloud.localstack.demos.gluemsk.schema.UnicornRideRequest, missing required field customer
at org.apache.avro.io.ResolvingDecoder.doAction (ResolvingDecoder.java:308)
...
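The error above originates in Avro's schema resolution on the consumer side: the decoder walks the consumer's (reader) schema and, for customer, finds neither a value in the written record nor a default. The following Java sketch mimics that per-record step under simplifying assumptions: records are flat Map values rather than Avro binary data, only top-level fields are resolved, and RecordResolver, resolve, and the exception message are hypothetical, only loosely modeled on the AvroTypeException in the log.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RecordResolver {

    /** A reader-schema field; hasDefault tells whether defaultValue may fill in a missing value. */
    record ReaderField(String name, boolean hasDefault, Object defaultValue) {}

    /** Builds the reader's view of a written record, failing on a missing required field. */
    static Map<String, Object> resolve(List<ReaderField> readerSchema, Map<String, Object> written) {
        Map<String, Object> resolved = new LinkedHashMap<>();
        for (ReaderField field : readerSchema) {
            if (written.containsKey(field.name())) {
                resolved.put(field.name(), written.get(field.name()));
            } else if (field.hasDefault()) {
                resolved.put(field.name(), field.defaultValue());
            } else {
                throw new IllegalStateException("missing required field " + field.name());
            }
        }
        // Fields only the writer knows about are not copied: the reader schema does not declare them.
        return resolved;
    }

    public static void main(String[] args) {
        // A record published by producer-2 (schema v2, no "customer"), flattened to a map.
        Map<String, Object> writtenWithV2 = Map.of("request_id", 100, "preferred_unicorn_color", "RED");

        // Old consumer: its v1 reader schema still requires "customer".
        List<ReaderField> v1Reader = List.of(
                new ReaderField("request_id", false, null),
                new ReaderField("preferred_unicorn_color", true, "WHITE"),
                new ReaderField("customer", false, null));
        try {
            resolve(v1Reader, writtenWithV2);
        } catch (IllegalStateException e) {
            System.out.println("old consumer: " + e.getMessage());
        }

        // Updated consumer: the v2 reader schema no longer declares "customer".
        List<ReaderField> v2Reader = v1Reader.subList(0, 2);
        System.out.println("updated consumer: " + resolve(v2Reader, writtenWithV2));
    }
}
```

The old consumer prints "old consumer: missing required field customer", while the reader schema without customer resolves the same record to {request_id=100, preferred_unicorn_color=RED}.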
Updated consumers
Finally, we will update our consumer so that it is compatible with the new version of the schema. The complete module can be found in our samples repository (along with the rest of the code of this tutorial).
Similar to the previous producer scenarios, we create a new consumer by executing the following steps:
- Copy the consumer directory and rename it to consumer-2.
- Set a new artifact ID in the pom.xml of the module:
    ...
    <artifactId>consumer-2</artifactId>
    ...
- Add the new module to the root pom.xml:
    <modules>
        <module>producer</module>
        <module>producer-2</module>
        <module>producer-3</module>
        <module>consumer</module>
        <module>consumer-2</module>
    </modules>
- Replace the unicorn_ride_request_v1.avsc with the new version used by producer-2 (unicorn_ride_request_v2.avsc).
Re-generate the Java classes for the schema:
mvn clean generate-sources
Our new consumer, based on the latest version of the schema, will be able to successfully consume the records published by the new producer:
mvn -pl consumer-2 exec:java -Dexec.args="--bootstrap-servers localhost:4511"
Conclusion
Apache Kafka is used as the core messaging system in complex environments with independent producers and consumers. The independent development of these components makes it hard to coordinate and evolve data schemas over time. The AWS Glue Schema Registry can help you prevent incompatible schema versions from being registered and used by producers.
With LocalStack emulating Amazon Managed Streaming for Kafka and AWS Glue Schema Registry, you can develop and test the next evolution of your data schema locally on your own machine.