Schema Evolution with Glue Schema Registry and Managed Streaming for Kafka (MSK) using LocalStack

Find incompatibilities early or even avoid them altogether when developing Kafka producers or consumers! Learn how to test data schema evolution by using Managed Streaming for Kafka (MSK) with the Glue Schema Registry in LocalStack.

Apache Kafka is an open-source distributed event store and stream-processing platform. It is used to capture data generated by producers and distribute it among its consumers. Kafka is known for its scalability, with reports of production environments scaling to trillions of messages per day. With Amazon Managed Streaming for Apache Kafka (MSK), AWS provides a service to provision Apache Kafka clusters easily.

LocalStack Pro supports Amazon Managed Streaming for Kafka (MSK), which enables you to spin up Kafka clusters on your local machine and test the integration of your applications with Amazon MSK.

Kafka clusters are often used as the central messaging infrastructure in complex microservice environments. However, the continuous and independent development of the individual microservices - the data producers and consumers - can make it hard to coordinate and evolve data schemas over time without introducing application failures due to incompatibilities. A common solution to this problem is to use a schema registry which provides for the validation of schema changes, preventing any unsafe changes and subsequent application failures.

AWS Glue Schema Registry can be used as such a schema registry, enabling you to validate and evolve streaming data using Apache Avro schemas. It can be easily integrated into Java applications for Apache Kafka with AWS’s official open-source serializers and deserializers.

The following chart shows the integration of producers and consumers with Amazon MSK and the AWS Glue Schema Registry:



Workflow: Glue Schema Registry with MSK


  1. Before sending a record, the producer checks with the registry that the schema it uses to serialize its records is valid. We can configure the producer to register a new schema version if the schema is not yet registered.
    • When registering the new schema version, the schema registry checks whether the schema is compatible with the versions already registered.
    • If the registry detects an incompatibility, the registration is rejected. This ensures that a producer fails early and cannot publish incompatible records in the first place.
  2. Once the schema is valid, the producer serializes and compresses the record and sends it to the Kafka cluster.
  3. The consumer reads the serialized and compressed record.
  4. The consumer requests the schema from the schema registry (if it is not already cached) and uses the schema to decompress and deserialize the record.
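
The four steps above can be sketched with a toy, in-memory model. Everything in it is an assumption made for illustration: `ToyRegistry`, `encode`, and `decode` are hypothetical names, and the byte layout (a 36-character version ID followed by a zlib-compressed payload) is invented here. It is not the API or wire format of the Glue Schema Registry library, only a way to see the "register or look up, compress, send, look up by ID, decompress" flow in one place.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Toy model of the four workflow steps. All names and the byte layout are
// invented for this sketch; this is NOT the Glue Schema Registry API or wire format.
final class SchemaWorkflowSketch {
    private static final int ID_LENGTH = 36; // length of UUID.toString()

    // Stand-in for the schema registry: hands out one version ID per schema definition.
    static final class ToyRegistry {
        private final Map<String, String> idByDefinition = new HashMap<>();
        private final Map<String, String> definitionById = new HashMap<>();

        // Step 1: return the ID of an already registered schema, or register it.
        String getOrRegister(String definition) {
            String id = idByDefinition.get(definition);
            if (id == null) {
                id = UUID.randomUUID().toString();
                idByDefinition.put(definition, id);
                definitionById.put(id, definition);
            }
            return id;
        }

        String lookup(String versionId) {
            String definition = definitionById.get(versionId);
            if (definition == null) {
                throw new IllegalArgumentException("Unknown schema version: " + versionId);
            }
            return definition;
        }
    }

    // Step 2: compress the payload and prefix it with the schema version ID.
    static byte[] encode(String versionId, String payload) {
        Deflater deflater = new Deflater();
        deflater.setInput(payload.getBytes(StandardCharsets.UTF_8));
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(versionId.getBytes(StandardCharsets.US_ASCII), 0, ID_LENGTH);
        byte[] buffer = new byte[256];
        while (!deflater.finished()) {
            out.write(buffer, 0, deflater.deflate(buffer));
        }
        deflater.end();
        return out.toByteArray();
    }

    // Steps 3 and 4: read the ID, resolve the schema (cache first), then decompress.
    static String decode(byte[] message, ToyRegistry registry, Map<String, String> schemaCache) {
        String versionId = new String(message, 0, ID_LENGTH, StandardCharsets.US_ASCII);
        // Only the first record of a given version triggers a registry lookup.
        schemaCache.computeIfAbsent(versionId, registry::lookup);
        Inflater inflater = new Inflater();
        inflater.setInput(message, ID_LENGTH, message.length - ID_LENGTH);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[256];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(buffer);
                if (n == 0 && inflater.needsInput()) {
                    throw new IllegalStateException("Truncated record");
                }
                out.write(buffer, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException("Corrupt record", e);
        } finally {
            inflater.end();
        }
        // A real deserializer would now decode the bytes with the resolved schema;
        // in this sketch the payload is plain UTF-8 text.
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ToyRegistry registry = new ToyRegistry();
        Map<String, String> consumerCache = new HashMap<>();
        String id = registry.getOrRegister("{\"type\":\"string\"}");
        byte[] message = encode(id, "ride request 1");
        System.out.println(decode(message, registry, consumerCache)); // prints "ride request 1"
    }
}
```

Carrying the version ID inside each message is what lets the consumer in this sketch resolve the schema on its own and cache it after the first lookup.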

AWS Glue Schema Registry is also supported by LocalStack Pro, allowing you to develop and test your application’s data schema evolution entirely on your local machine. The code for this tutorial (including a script to execute it step-by-step) can be found in our LocalStack Pro samples on GitHub.

Prerequisites

For this tutorial you will need:

  • LocalStack Pro to emulate Amazon MSK and AWS Glue Schema Registry locally
    • If you don’t have a subscription yet, you can get a free trial license.
  • awslocal
  • Java 11+
  • Maven 3

Initial schema

First, we will define our schema, set up our Java project, and generate the Java data classes from the schema. Our Apache Avro schema describes a request to ride a unicorn, including the pickup and destination addresses, a fare, a duration, color preferences, and a customer record:


Project setup

Now we can set up our Java project with one module for the producer and another module for the consumer. Each module keeps its own copy of the schema in its src/main/resources/avro folder.

.
├── consumer
│   ├── pom.xml
│   └── src
│       └── main
│           └── resources
│               └── avro
│                   └── unicorn_ride_request_v1.avsc
├── producer
│   ├── pom.xml
│   └── src
│       └── main
│           └── resources
│               └── avro
│                   └── unicorn_ride_request_v1.avsc
└── pom.xml

In our root pom, we configure the producer and the consumer module, some shared dependencies (most notably software.amazon.glue:schema-registry-serde), and the avro-maven-plugin to generate Java classes for the schema:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cloud.localstack.demos.gluemsk</groupId>
    <artifactId>root-pom</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>
    <name>Glue MSK Demo</name>

    <modules>
        <module>producer</module>
        <module>consumer</module>
    </modules>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>software.amazon.glue</groupId>
            <artifactId>schema-registry-serde</artifactId>
            <version>1.1.10</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-reload4j</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.11.0</version>
        </dependency>
        <dependency>
            <groupId>com.beust</groupId>
            <artifactId>jcommander</artifactId>
            <version>1.82</version>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.avro</groupId>
                    <artifactId>avro-maven-plugin</artifactId>
                    <version>1.11.0</version>
                    <executions>
                        <execution>
                            <phase>generate-sources</phase>
                            <goals>
                                <goal>schema</goal>
                            </goals>
                            <configuration>
                                <sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
                                <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

While the root pom is a bit lengthy, the pom.xml files of the two modules are quite simple. They only reference the root pom, activate the avro-maven-plugin, and define a main class (we’ll go into detail on the actual Java code in the section below).

Here is what the producer’s pom.xml looks like:


And similarly the consumer’s pom.xml looks like:


Now the project is all set up and we can generate our schema classes from the Avro schema using the avro-maven-plugin:

mvn clean generate-sources

After the Maven plugin is done, we have all types generated for both the producer and the consumer:

.
├── consumer
│   └── src
│       └── main
│           └── java
│               └── cloud
│                   └── localstack
│                       └── demos
│                           └── gluemsk
│                               └── schema
│                                   ├── Customer.java
│                                   ├── ModeOfPayment.java
│                                   ├── RecommendedUnicorn.java
│                                   ├── unicorn_color.java
│                                   ├── UnicornPreferredColor.java
│                                   └── UnicornRideRequest.java
└── producer
    └── src
        └── main
            └── java
                └── cloud
                    └── localstack
                        └── demos
                            └── gluemsk
                                └── schema
                                    ├── Customer.java
                                    ├── ModeOfPayment.java
                                    ├── RecommendedUnicorn.java
                                    ├── unicorn_color.java
                                    ├── UnicornPreferredColor.java
                                    └── UnicornRideRequest.java

Since we want to log the sent and received messages, we need to configure the logging by adding a log4j.properties to the src/main/resources folder of the two modules:

log4j.rootLogger=DEBUG, CONSOLE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=[%c{1}][%-5p] %m%n

Implementing a Producer and Consumer

Now, all the boilerplate is done:

  • We have a proper project setup.
  • The logging is properly configured.
  • We generated our schema classes which we’ll use as a typesafe interface for the records.

The Producer

The next step is to implement our producer. The complete module can be found in our samples repository (along with the rest of the code of this tutorial).

We create a new class called Producer in producer/src/main/java/cloud/localstack/demos/gluemsk/producer/. The Producer contains a main method which uses jcommander to create a simple command-line interface:

package cloud.localstack.demos.gluemsk.producer;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Producer {
    // name the logger after this class rather than Kafka's Producer interface
    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    @Parameter(names = {"--help", "-h"}, help = true)
    protected boolean help = false;
    @Parameter(names = {"--bootstrap-servers", "-bs"}, description = "Kafka bootstrap servers endpoint to connect to.")
    protected String bootstrapServers = "localhost:4511";
    @Parameter(names = {"--aws-endpoint-servers", "-ae"}, description = "AWS endpoint to use.")
    protected String awsEndpoint = "https://localhost.localstack.cloud:4566";
    @Parameter(names = {"--region", "-reg"}, description = "AWS Region to use.")
    protected String regionName = "us-east-1";
    @Parameter(names = {"--topic-name", "-topic"}, description = "Kafka topic name where you send the data records. Default is unicorn-ride-request-topic.")
    protected String topic = "unicorn-ride-request-topic";
    @Parameter(names = {"--num-messages", "-nm"}, description = "Number of messages you want producer to send. Default is 100.")
    protected String str_numOfMessages = "100";


    public static void main(String[] args) {
        Producer producer = new Producer();
        JCommander jc = JCommander.newBuilder().addObject(producer).build();
        jc.parse(args);
        if (producer.help) {
            jc.usage();
            return;
        }
        producer.startProducer();
    }

    public void startProducer() {
        // TODO:
        // - Create the producer
        // - Send the UnicornRideRequest records to the topic
    }
}

Now we can add a method to configure our producer:

private Properties getProducerConfig() {
    Properties props = new Properties();
    // use the "--bootstrap-servers" argument to define the Kafka bootstrap address to connect to
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
    props.put(ProducerConfig.ACKS_CONFIG, "-1");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "glue-msk-demo-producer");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // use the GlueSchemaRegistryKafkaSerializer from software.amazon.glue:schema-registry-serde
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
    // configure the GlueSchemaRegistryKafkaSerializer (data format, region, Glue registry and schema,...)
    props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
    props.put(AWSSchemaRegistryConstants.AWS_REGION, regionName);
    props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "unicorn-ride-request-registry");
    props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "unicorn-ride-request-schema-avro");
    props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
    // define the endpoint to use - by default we use LocalStack (localhost.localstack.cloud)
    props.put(AWSSchemaRegistryConstants.AWS_ENDPOINT, this.awsEndpoint);
    // enable compression of records
    props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB.name());
    return props;
}

In addition, we create a method to generate new dummy UnicornRideRequests and a Callback class logging the producer metadata:

public UnicornRideRequest getRecord(int requestId) {
    /*
      Initialise UnicornRideRequest object of
      class that is generated from AVRO Schema
    */
    UnicornRideRequest rideRequest = UnicornRideRequest.newBuilder()
            .setRequestId(requestId)
            .setPickupAddress("Melbourne, Victoria, Australia")
            .setDestinationAddress("Sydney, NSW, Aus")
            .setRideFare(1200.50F)
            .setRideDuration(120)
            .setPreferredUnicornColor(UnicornPreferredColor.WHITE)
            .setRecommendedUnicorn(RecommendedUnicorn.newBuilder()
                    .setUnicornId(requestId * 2)
                    .setColor(unicorn_color.WHITE)
                    .setStarsRating(5).build())
            .setCustomer(Customer.newBuilder()
                    .setCustomerAccountNo(1001)
                    .setFirstName("Dummy")
                    .setLastName("User")
                    .setEmailAddresses(List.of("demo@example.com"))
                    .setCustomerAddress("Flinders Street Station")
                    .setModeOfPayment(ModeOfPayment.CARD)
                    .setCustomerRating(5).build()).build();
    LOGGER.info(rideRequest.toString());
    return rideRequest;
}

private static class ProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetaData, Exception e) {
        if (e == null) {
            LOGGER.info("Received new metadata. \t" +
                    "Topic:" + recordMetaData.topic() + "\t" +
                    "Partition: " + recordMetaData.partition() + "\t" +
                    "Offset: " + recordMetaData.offset() + "\t" +
                    "Timestamp: " + recordMetaData.timestamp());
        } else {
            // log the failure, including the stack trace, through the logger instead of stderr
            LOGGER.error("There's been an error from the Producer side", e);
        }
    }
}

And finally, we can implement our startProducer method which creates the producer using getProducerConfig and sends records generated by getRecord:

public void startProducer() {
    try (KafkaProducer<String, UnicornRideRequest> producer = new KafkaProducer<>(getProducerConfig())) {
        int numberOfMessages = Integer.parseInt(str_numOfMessages);
        LOGGER.info("Starting to send records...");
        for (int i = 0; i < numberOfMessages; i++) {
            UnicornRideRequest rideRequest = getRecord(i);
            String key = "key-" + i;
            ProducerRecord<String, UnicornRideRequest> record = new ProducerRecord<>(topic, key, rideRequest);
            producer.send(record, new ProducerCallback());
        }
    }
}

The Consumer

Now that we have a Producer, we need a component which reads the data from the Kafka cluster.

The complete module can be found in our samples repository (along with the rest of the code of this tutorial).

We create a new class called Consumer in consumer/src/main/java/cloud/localstack/demos/gluemsk/consumer/.

Analogous to the Producer, the Consumer contains a main method which uses jcommander to create a simple command-line interface:

package cloud.localstack.demos.gluemsk.consumer;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Consumer {
    // name the logger after this class rather than java.util.function.Consumer
    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

    @Parameter(names = {"--help", "-h"}, help = true)
    protected boolean help = false;
    @Parameter(names = {"--bootstrap-servers", "-bs"}, description = "kafka bootstrap servers endpoint")
    protected String bootstrapServers = "localhost:4511";
    @Parameter(names = {"--aws-endpoint-servers", "-ae"}, description = "AWS endpoint")
    protected String awsEndpoint = "https://localhost.localstack.cloud:4566";
    @Parameter(names = {"--region", "-reg"}, description = "AWS Region to use.")
    protected String regionName = "us-east-1";
    @Parameter(names = {"--topic-name", "-topic"}, description = "Kafka topic name where you send the data records. Default is unicorn-ride-request-topic")
    protected String topic = "unicorn-ride-request-topic";
    @Parameter(names = {"--num-messages", "-nm"}, description = "Number of messages you want consumer to wait for until it stops. Default is 100, use 0 if you want it to run indefinitely.")
    protected String str_numOfMessages = "100";


    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        JCommander jc = JCommander.newBuilder().addObject(consumer).build();
        jc.parse(args);
        if (consumer.help) {
            jc.usage();
            return;
        }
        consumer.startConsumer();
    }

    public void startConsumer() {
        // TODO:
        // - Create the Kafka Consumer
        // - Subscribe to the topic
        // - Process the incoming records
    }
}

Similar to the producer, we need to configure our consumer:

private Properties getConsumerConfig() {
    Properties props = new Properties();
    // use the "--bootstrap-servers" argument to define the Kafka bootstrap address to connect to
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "unicorn.riderequest.consumer");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // use the GlueSchemaRegistryKafkaDeserializer from software.amazon.glue:schema-registry-serde
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
    props.put(AWSSchemaRegistryConstants.AWS_REGION, this.regionName);
    props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
    // define the endpoint to use - by default we use LocalStack (localhost.localstack.cloud)
    props.put(AWSSchemaRegistryConstants.AWS_ENDPOINT, this.awsEndpoint);
    return props;
}

And finally, we can implement our startConsumer method which creates the consumer using getConsumerConfig, reads the records from the topic, and logs them:

public void startConsumer() {
    LOGGER.info("Starting consumer...");
    try (KafkaConsumer<String, UnicornRideRequest> consumer = new KafkaConsumer<>(getConsumerConfig())) {
        consumer.subscribe(Collections.singletonList(topic));
        int outstandingMessages = Integer.parseInt(str_numOfMessages);
        boolean runIndefinitely = outstandingMessages == 0;
        while (outstandingMessages > 0 || runIndefinitely) {
            // a real consumer would probably run in an endless loop waiting for new records here
            final ConsumerRecords<String, UnicornRideRequest> records = consumer.poll(Duration.ofMillis(10));
            for (final ConsumerRecord<String, UnicornRideRequest> record : records) {
                final UnicornRideRequest rideRequest = record.value();
                LOGGER.info(String.valueOf(rideRequest.getRequestId()));
                LOGGER.info(rideRequest.toString());
                outstandingMessages--;
            }
        }
    }
    LOGGER.info("Stopping consumer...");
}

Setting up the infrastructure

Now that the initial coding is done, we can give it a try. Let’s start LocalStack:

LOCALSTACK_AUTH_TOKEN=<your-auth-token> localstack start -d

Once LocalStack is started, we can create a new Kafka cluster using awslocal:

$ awslocal kafka create-cluster \
  --cluster-name "unicorn-ride-cluster" \
  --kafka-version "2.2.1" \
  --number-of-broker-nodes 1 \
  --broker-node-group-info "{\"ClientSubnets\": [], \"InstanceType\":\"kafka.m5.xlarge\"}"
{
    "ClusterArn": "arn:aws:kafka:us-east-1:000000000000:cluster/unicorn-ride-cluster/f9b16124-baf3-459b-8507-ec6c605b7a0a-25",
    "ClusterName": "unicorn-ride-cluster",
    "State": "CREATING"
}

The ClusterArn is created dynamically and will be different for your run. Make sure to use your ClusterArn for the commands below.

It takes some time for the cluster to get up and running. We can monitor the state with describe-cluster:

$ awslocal kafka describe-cluster --cluster-arn "arn:aws:kafka:us-east-1:000000000000:cluster/unicorn-ride-cluster/f9b16124-baf3-459b-8507-ec6c605b7a0a-25"
{
    "ClusterInfo": {
        "BrokerNodeGroupInfo": {
            "ClientSubnets": [],
            "InstanceType": "kafka.m5.xlarge"
        },
        "ClusterArn": "arn:aws:kafka:us-east-1:000000000000:cluster/unicorn-ride-cluster/f9b16124-baf3-459b-8507-ec6c605b7a0a-25",
        "ClusterName": "unicorn-ride-cluster",
        "CreationTime": "2022-07-21T14:41:07.897000Z",
        "CurrentBrokerSoftwareInfo": {
            "KafkaVersion": "2.5.0"
        },
        "CurrentVersion": "KFABVNKTBGF6HX",
        "NumberOfBrokerNodes": 1,
        "State": "ACTIVE",
        "ZookeeperConnectString": "localhost:4510"
    }
}
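
If you script this step, the wait can be automated with a small polling loop. The following is a minimal sketch under stated assumptions: `ClusterStatePoller` and `waitForState` are hypothetical names, and `stateSupplier` stands in for whatever call you use to read the `State` field shown above. It does not call any AWS API itself; the demo feeds it a fake sequence of states.

```java
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Generic "poll until the expected state is reported" helper. The name and
// signature are hypothetical; stateSupplier stands in for whatever call you
// use to read the cluster state.
final class ClusterStatePoller {

    // Returns true as soon as the supplier reports the target state,
    // false if the state was not seen within maxAttempts polls.
    static boolean waitForState(Supplier<String> stateSupplier, String target,
                                int maxAttempts, Duration delay) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (target.equals(stateSupplier.get())) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delay.toMillis());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Fake state source for the demo: reports CREATING twice, then ACTIVE.
        Iterator<String> states = List.of("CREATING", "CREATING", "ACTIVE").iterator();
        boolean active = waitForState(states::next, "ACTIVE", 10, Duration.ofMillis(100));
        System.out.println("Cluster active: " + active); // prints "Cluster active: true"
    }
}
```

Bounding the number of attempts keeps a setup script from hanging forever if the cluster never becomes active.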

Once the State is ACTIVE, the cluster is ready to be used. Now it’s time to create our Glue Schema Registry:

$ awslocal glue create-registry --registry-name unicorn-ride-request-registry
{
    "RegistryArn": "arn:aws:glue:us-east-1:000000000000:file-registry/unicorn-ride-request-registry",
    "RegistryName": "unicorn-ride-request-registry"
}

In the newly created registry, we can now add our initial UnicornRideRequest schema:

$ awslocal glue create-schema \
  --registry-id RegistryName="unicorn-ride-request-registry" \
  --schema-name unicorn-ride-request-schema-avro \
  --compatibility BACKWARD \
  --data-format AVRO \
  --schema-definition "file://producer/src/main/resources/avro/unicorn_ride_request_v1.avsc"
{
    "RegistryName": "unicorn-ride-request-registry",
    "RegistryArn": "arn:aws:glue:us-east-1:000000000000:file-registry/unicorn-ride-request-registry",
    "SchemaName": "unicorn-ride-request-schema-avro",
    "SchemaArn": "arn:aws:glue:us-east-1:000000000000:schema/unicorn-ride-request-registry/unicorn-ride-request-schema-avro",
    "DataFormat": "AVRO",
    "Compatibility": "BACKWARD",
    "SchemaCheckpoint": 1,
    "LatestSchemaVersion": 1,
    "NextSchemaVersion": 2,
    "SchemaStatus": "AVAILABLE",
    "SchemaVersionId": "925c868c-ff56-4992-9f24-6df5933c15cc",
    "SchemaVersionStatus": "AVAILABLE"
}

When creating the schema, we set the compatibility mode to BACKWARD. This means that consumers using a new schema version can also read data produced with the previous version. For example, this allows deleting fields or adding new optional fields.

You can find a thorough description of the different compatibility modes in the AWS docs on Schema Versioning and Compatibility. The Glue Schema Registry will ensure that newly registered schemas fulfill the constraints defined by the compatibility mode.
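
To make the BACKWARD rule concrete, here is a deliberately simplified sketch. It only compares flat field lists and treats a change as compatible when every field of the new schema either already exists in the old schema with the same type or carries a default value. Real Avro schema resolution also covers type promotion, unions, enums, aliases, and nested records, none of which is modeled here. `Field` and `isBackwardCompatible` are hypothetical names for this illustration and are not part of Avro or the Glue Schema Registry library.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified BACKWARD check on flat field lists. Hypothetical names; this is
// not how Avro or the Glue Schema Registry implement compatibility checks.
final class BackwardCompatibilitySketch {

    static final class Field {
        final String name;
        final String type;
        final boolean hasDefault;

        Field(String name, String type, boolean hasDefault) {
            this.name = name;
            this.type = type;
            this.hasDefault = hasDefault;
        }
    }

    // True if a reader using newSchema can read data written with oldSchema.
    static boolean isBackwardCompatible(List<Field> oldSchema, List<Field> newSchema) {
        Map<String, Field> oldByName = new HashMap<>();
        for (Field field : oldSchema) {
            oldByName.put(field.name, field);
        }
        for (Field field : newSchema) {
            Field old = oldByName.get(field.name);
            if (old == null) {
                // New field: old records do not contain it, so a default is required.
                if (!field.hasDefault) {
                    return false;
                }
            } else if (!old.type.equals(field.type)) {
                // Simplification: any type change counts as incompatible.
                return false;
            }
        }
        // Fields that exist only in the old schema are simply ignored by the reader.
        return true;
    }

    public static void main(String[] args) {
        List<Field> v1 = List.of(
                new Field("request_id", "int", false),
                new Field("ride_fare", "float", false));
        List<Field> fieldRemoved = List.of(
                new Field("request_id", "int", false));
        List<Field> optionalAdded = List.of(
                new Field("request_id", "int", false),
                new Field("ride_fare", "float", false),
                new Field("coupon_code", "string", true));
        List<Field> requiredAdded = List.of(
                new Field("request_id", "int", false),
                new Field("ride_fare", "float", false),
                new Field("coupon_code", "string", false));

        System.out.println(isBackwardCompatible(v1, fieldRemoved));  // prints "true"
        System.out.println(isBackwardCompatible(v1, optionalAdded)); // prints "true"
        System.out.println(isBackwardCompatible(v1, requiredAdded)); // prints "false"
    }
}
```

The third case is the kind of change the registry is expected to reject under BACKWARD: a new required field has no value in records written with the old schema.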

Running the Producer and Consumer

Finally, everything is ready to start our Producer and Consumer. First, we need to get the bootstrap server address from the Kafka cluster:

$ awslocal kafka get-bootstrap-brokers --cluster-arn "arn:aws:kafka:us-east-1:000000000000:cluster/unicorn-ride-cluster/f9b16124-baf3-459b-8507-ec6c605b7a0a-25"
{
    "BootstrapBrokerString": "localhost:4511"
}

Like the ClusterArn, the BootstrapBrokerString is assigned dynamically and may differ on your machine. Please make sure to use your bootstrap server address for the runs below.
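
Because the address changes between runs, a helper that reads it from the command output can save some copy and paste. The sketch below is an assumption-laden illustration: `BootstrapBrokers` and `extract` are hypothetical names, and it uses a regular expression only to stay dependency-free. It assumes the value contains no escaped quotes; a real tool would more likely use a JSON parser.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Pulls the BootstrapBrokerString value out of the JSON printed by
// get-bootstrap-brokers. Hypothetical helper; regex-based for brevity only.
final class BootstrapBrokers {
    private static final Pattern BROKERS =
            Pattern.compile("\"BootstrapBrokerString\"\\s*:\\s*\"([^\"]*)\"");

    // Returns the broker string, or an empty Optional if the key is missing.
    static Optional<String> extract(String json) {
        Matcher matcher = BROKERS.matcher(json);
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        String output = "{\n    \"BootstrapBrokerString\": \"localhost:4511\"\n}";
        System.out.println(extract(output).orElse("<not found>")); // prints "localhost:4511"
    }
}
```

The extracted value is what you would pass to the `--bootstrap-servers` argument of the producer and consumer.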

Now let’s start the Producer. By default, our producer will just send 100 records and shut down.

# Compile the Java packages
mvn clean install
# Run the producer
mvn -pl producer exec:java -Dexec.args="--bootstrap-servers localhost:4511"

Once the producer is running, we can observe the different steps of the producer as described in the intro:

  1. Before sending a record, the producer checks that the schema it uses to serialize its records is valid:

    ...
    [GlueSchemaRegistrySerializerFactory][DEBUG] Returning Avro serializer instance from GlueSchemaRegistrySerializerFactory
    [AWSSchemaRegistryClient][DEBUG] Getting Schema Version Id for : schemaDefinition = {"type":"record","name":"UnicornRideRequest","namespace":"cloud.localstack.demos.gluemsk.schema","fields":[{"name":"request_id","type":"int","doc":"customer request id"},{"name":"pickup_address","type":"string","doc":"customer pickup address"},{"name":"destination_address","type":"string","doc":"customer destination address"},{"name":"ride_fare","type":"float","doc":"ride fare amount (USD)"},{"name":"ride_duration","type":"int","doc":"ride duration in minutes"},{"name":"preferred_unicorn_color","type":{"type":"enum","name":"UnicornPreferredColor","symbols":["WHITE","BLACK","RED","BLUE","GREY"]},"default":"WHITE"},{"name":"recommended_unicorn","type":{"type":"record","name":"RecommendedUnicorn","fields":[{"name":"unicorn_id","type":"int","doc":"recommended unicorn id"},{"name":"color","type":{"type":"enum","name":"unicorn_color","symbols":["WHITE","RED","BLUE"]}},{"name":"stars_rating","type":["null","int"],"doc":"unicorn star ratings based on customers feedback","default":null}]}},{"name":"customer","type":{"type":"record","name":"Customer","fields":[{"name":"customer_account_no","type":"int","doc":"customer account number"},{"name":"first_name","type":"string"},{"name":"middle_name","type":["null","string"],"default":null},{"name":"last_name","type":"string"},{"name":"email_addresses","type":["null",{"type":"array","items":"string"}]},{"name":"customer_address","type":"string","doc":"customer address"},{"name":"mode_of_payment","type":{"type":"enum","name":"ModeOfPayment","symbols":["CARD","CASH"]},"default":"CARD"},{"name":"customer_rating","type":["null","int"],"default":null}]}}]}, schemaName = unicorn-ride-request-schema-avro, dataFormat = AVRO
    ...
    
  2. Once the schema is known to be valid, the producer serializes the record, compresses it, and sends it to the Kafka cluster.

    ...
    [GlueSchemaRegistryKafkaSerializer][DEBUG] Schema Version Id received from the from schema registry: f95edc4b-778d-4f65-b23e-7de41c5b4e53
    [GlueSchemaRegistrySerializerFactory][DEBUG] Returning Avro serializer instance from GlueSchemaRegistrySerializerFactory
    [GlueSchemaRegistryDefaultCompression][DEBUG] Compression :: record length: 0KB
    [GlueSchemaRegistryDefaultCompression][DEBUG] Compression :: record length after compression: 0KB
    [Producer][INFO ] {"request_id": 1, "pickup_address": "Melbourne, Victoria, Australia", "destination_address": "Sydney, NSW, Aus", "ride_fare": 1200.5, "ride_duration": 120, "preferred_unicorn_color": "WHITE", "recommended_unicorn": {"unicorn_id": 2, "color": "WHITE", "stars_rating": 5}, "customer": {"customer_account_no": 1001, "first_name": "Dummy", "middle_name": null, "last_name": "User", "email_addresses": ["demo@example.com"], "customer_address": "Flinders Street Station", "mode_of_payment": "CARD", "customer_rating": 5}}
    ...
    

    Now, the records sent by the producer are managed by the Kafka cluster and are waiting for a consumer to pick them up. We can start the consumer with the same bootstrap server address as the producer:

    mvn -pl consumer exec:java -Dexec.args="--bootstrap-servers localhost:4511"
    

    In the logs of the consumer, we can now observe the steps of the consumer as described in the intro:

  3. The consumer reads the serialized and compressed record:

    [NetworkClient][DEBUG] [Consumer clientId=consumer-unicorn.riderequest.consumer-1, groupId=unicorn.riderequest.consumer] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=consumer-unicorn.riderequest.consumer-1, correlationId=10) and timeout 30000 to node 0: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, topics=[FetchTopic(topic='unicorn-ride-request-topic', partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=900, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[], rackId='')
    [NetworkClient][DEBUG] [Consumer clientId=consumer-unicorn.riderequest.consumer-1, groupId=unicorn.riderequest.consumer] Received FETCH response from node 0 for request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=consumer-unicorn.riderequest.consumer-1, correlationId=10): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=2057133662, responses=[FetchableTopicResponse(topic='unicorn-ride-request-topic', partitionResponses=[FetchablePartitionResponse(partition=0, errorCode=0, highWatermark=1000, lastStableOffset=1000, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, recordSet=MemoryRecords(size=17141, buffer=java.nio.HeapByteBuffer[pos=0 lim=17141 cap=17144]))])])
    
  4. The consumer requests the schema from the schema registry, and uses the schema to decompress and deserialize the record.

    [request][DEBUG] Sending Request: DefaultSdkHttpFullRequest(httpMethod=POST, protocol=https, host=localhost.localstack.cloud, port=4566, encodedPath=/, headers=[amz-sdk-invocation-id, Content-Length, Content-Type, User-Agent, X-Amz-Target], queryParameters=[])
    ...
    [request][DEBUG] Received successful response: 200, Request ID: 1EDT4G1DBSDS0XCD2N7FM9L4SRROEEML1FPDOYSANII78CXDR4F2, Extended Request ID: not available
    [GlueSchemaRegistryDeserializerFactory][DEBUG] Returning Avro de-serializer instance from GlueSchemaRegistryDeserializerFactory
    [GlueSchemaRegistryDefaultCompression][DEBUG] Decompression :: Compressed record length: 0KB
    [GlueSchemaRegistryDefaultCompression][DEBUG] Decompression :: Decompressed record length: 0KB
    [AvroDeserializer][DEBUG] Length of actual message: 121
    [DatumReaderInstance][DEBUG] Using SpecificDatumReader for de-serializing Avro message, schema: {"type":"record","name":"UnicornRideRequest","namespace":"cloud.localstack.demos.gluemsk.schema","fields":[{"name":"request_id","type":"int","doc":"customer request id"},{"name":"pickup_address","type":"string","doc":"customer pickup address"},{"name":"destination_address","type":"string","doc":"customer destination address"},{"name":"ride_fare","type":"float","doc":"ride fare amount (USD)"},{"name":"ride_duration","type":"int","doc":"ride duration in minutes"},{"name":"preferred_unicorn_color","type":{"type":"enum","name":"UnicornPreferredColor","symbols":["WHITE","BLACK","RED","BLUE","GREY"]},"default":"WHITE"},{"name":"recommended_unicorn","type":{"type":"record","name":"RecommendedUnicorn","fields":[{"name":"unicorn_id","type":"int","doc":"recommended unicorn id"},{"name":"color","type":{"type":"enum","name":"unicorn_color","symbols":["WHITE","RED","BLUE"]}},{"name":"stars_rating","type":["null","int"],"doc":"unicorn star ratings based on customers feedback","default":null}]}},{"name":"customer","type":{"type":"record","name":"Customer","fields":[{"name":"customer_account_no","type":"int","doc":"customer account number"},{"name":"first_name","type":"string"},{"name":"middle_name","type":["null","string"],"default":null},{"name":"last_name","type":"string"},{"name":"email_addresses","type":["null",{"type":"array","items":"string"}]},{"name":"customer_address","type":"string","doc":"customer address"},{"name":"mode_of_payment","type":{"type":"enum","name":"ModeOfPayment","symbols":["CARD","CASH"]},"default":"CARD"},{"name":"customer_rating","type":["null","int"],"default":null}]}}]})
    [AvroDeserializer][DEBUG] Finished de-serializing Avro message
    [GlueSchemaRegistryDeserializerFactory][DEBUG] Returning Avro de-serializer instance from GlueSchemaRegistryDeserializerFactory
    [GlueSchemaRegistryDefaultCompression][DEBUG] Decompression :: Compressed record length: 0KB
    [GlueSchemaRegistryDefaultCompression][DEBUG] Decompression :: Decompressed record length: 0KB
    [AvroDeserializer][DEBUG] Length of actual message: 121
    [AvroDeserializer][DEBUG] Finished de-serializing Avro message
    

Schema Evolution

In the course of this tutorial, we have implemented a Kafka producer and a consumer which integrate with the Glue Schema Registry. But the full potential of the Glue Schema Registry is unlocked when performing a schema evolution, i.e., when running producers and consumers with a new version of an already registered schema.

Therefore, we will run a few more interesting scenarios to illustrate the benefits of the Schema Registry:

  1. Running a producer which automatically registers a new, compatible schema version.
  2. Running a producer which is rejected when trying to register a new, incompatible schema version.
  3. Running an old consumer which fails because it is not compatible with the newly registered schema version.
  4. Running an updated consumer which can consume the new schema / records.

Producer registering a new schema version

In this step, we will create a new producer which uses a new, BACKWARD compatible schema version. The complete module can be found in our samples repository (along with the rest of the code of this tutorial).

The producer should register the new schema version automatically on its own.

We create the new producer by executing the following steps:

  • Copy the producer directory and rename it to producer-2.

  • Set a new artifact ID in the pom.xml of the module:

      ...
      <artifactId>producer-2</artifactId>
      ...
    
  • Add the new module to the root pom.xml:

    <modules>
      <module>producer</module>
      <module>producer-2</module>
      <module>consumer</module>
    </modules>
    
  • Create a new version of the schema:

    • Rename the schema to unicorn_ride_request_v2.avsc.

    • In the schema, remove the previously required field customer:

      $ diff -u producer/src/main/resources/avro/unicorn_ride_request_v1.avsc producer-2/src/main/resources/avro/unicorn_ride_request_v2.avsc
      --- producer/src/main/resources/avro/unicorn_ride_request_v1.avsc 2022-05-13 08:27:08.219354922 +0200
      +++ producer-2/src/main/resources/avro/unicorn_ride_request_v2.avsc 2022-05-13 08:27:08.219354922 +0200
      @@ -20,23 +20,6 @@
              {"name": "stars_rating", "type": ["null", "int"], "default": null, "doc": "unicorn star ratings based on customers feedback"}
              ]
          }
      -    },
      -    {
      -      "name": "customer",
      -      "type": {
      -        "type": "record",
      -        "name": "Customer",
      -        "fields": [
      -          {"name": "customer_account_no","type": "int", "doc": "customer account number"},
      -          {"name": "first_name","type": "string"},
      -          {"name": "middle_name","type": ["null","string"], "default": null},
      -          {"name": "last_name","type": "string"},
      -          {"name": "email_addresses","type": ["null", {"type":"array", "items":"string" }]},
      -          {"name": "customer_address","type": "string","doc": "customer address"},
      -          {"name": "mode_of_payment","type": {"type": "enum","name": "ModeOfPayment","symbols": ["CARD","CASH"]}, "default": "CARD"},
      -          {"name": "customer_rating", "type": ["null", "int"], "default": null}
      -        ]
      -      }
          }
      ]
      }
      

      This change is BACKWARD compatible because an updated consumer can read both the current and the previous records: new consumers don’t need the customer data, so they don’t care whether it is present.

  • Re-generate the Java classes for the schema:

    mvn clean generate-sources
    
  • Once the classes have been generated, the producer code needs to be adjusted: remove the usage of setCustomer in the producer’s getRecord, since the method no longer exists.

  • Configure the producer to automatically register its schema version in case it’s not yet registered by setting the additional property AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING to true:

        ...
        props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB.name());
        // Automatically register the new schema version of this producer!
        props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
        return props;
    }
    ...
    

Now we can run the new producer with the same bootstrap server address as before:

mvn clean install
mvn -pl producer-2 exec:java -Dexec.args="--bootstrap-servers localhost:4511"

In the logs we can see that the producer registered a new schema version before successfully publishing the new records:

...
[AWSSchemaRegistryClient][INFO ] Registered the schema version with schema version id = 02922438-aa47-41e0-80e0-42238d07565f and with version number = 2 and status AVAILABLE
...
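
The lookup-then-register behavior seen in this log can be illustrated with a small in-memory model. This is only a sketch, not the Glue Schema Registry client: the class ToyRegistry and its methods are hypothetical, schema definitions are reduced to plain strings, the compatibility check is left out, and it assumes that an unknown schema is an error when auto-registration is turned off.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory stand-in for a schema registry (hypothetical, not the Glue API). */
final class ToyRegistry {
    // Index i holds the definition of version i + 1.
    private final List<String> versions = new ArrayList<>();

    /** Returns the 1-based version number of a definition, or -1 if it is unknown. */
    int findVersion(String definition) {
        int index = versions.indexOf(definition);
        return index < 0 ? -1 : index + 1;
    }

    /** Stores the definition as the next version and returns its version number. */
    int register(String definition) {
        versions.add(definition);
        return versions.size();
    }

    /**
     * Models what happens before a record is sent: reuse a known version,
     * register an unknown one if allowed, otherwise fail.
     */
    int resolveVersion(String definition, boolean autoRegister) {
        int existing = findVersion(definition);
        if (existing > 0) {
            return existing;
        }
        if (!autoRegister) {
            throw new IllegalStateException("schema not registered and auto-registration is disabled");
        }
        return register(definition);
    }

    public static void main(String[] args) {
        ToyRegistry registry = new ToyRegistry();
        registry.register("v1-definition");                                  // the original producer's schema
        System.out.println(registry.resolveVersion("v1-definition", false)); // prints 1
        System.out.println(registry.resolveVersion("v2-definition", true));  // prints 2
    }
}
```

In the real setup the registry additionally validates the new definition against the configured compatibility mode before it marks the version as AVAILABLE.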

Producer trying to register an incompatible schema version

In our next scenario, we will create a new producer which tries to register a schema that is not compatible with the schema in the registry. The producer will be rejected as soon as it tries to register the new schema version, before even sending a record.

The complete module can be found in our samples repository (along with the rest of the code of this tutorial).

Similar to the previous scenario, we create a new producer by executing the following steps:

  • Copy the producer-2 directory and rename it to producer-3.

  • Set a new artifact ID in the pom.xml of the module:

      ...
      <artifactId>producer-3</artifactId>
      ...
    
  • Add the new module to the root pom.xml:

    <modules>
      <module>producer</module>
      <module>producer-2</module>
      <module>producer-3</module>
      <module>consumer</module>
    </modules>
    
  • Create a new version of the schema:

    • Rename the schema to unicorn_ride_request_v3.avsc.

    • In the schema, add a new required field unicorn_food:

      $ diff -u producer-2/src/main/resources/avro/unicorn_ride_request_v2.avsc producer-3/src/main/resources/avro/unicorn_ride_request_v3.avsc
      --- producer-2/src/main/resources/avro/unicorn_ride_request_v2.avsc 2022-05-13 08:27:08.219354922 +0200
      +++ producer-3/src/main/resources/avro/unicorn_ride_request_v3.avsc 2022-05-13 08:27:08.219354922 +0200
      @@ -20,6 +20,17 @@
              {"name": "stars_rating", "type": ["null", "int"], "default": null, "doc": "unicorn star ratings based on customers feedback"}
              ]
          }
      +    },
      +    {
      +      "name": "unicorn_food",
      +      "type": {
      +        "type": "record",
      +        "name": "Food",
      +        "fields": [
      +          {"name": "price","type": "float","doc": "price per pound of food (USD)"},
      +          {"name": "name","type": "string"}
      +        ]
      +      }
          }
      ]
      }
      

      This change is not BACKWARD compatible because an updated consumer cannot read both the current and the previous records: new consumers would expect unicorn_food, which is not present in old records and has no default value.

  • Re-generate the Java classes for the schema:

    mvn clean generate-sources
    
  • Once the classes have been generated, the producer code needs to be adjusted (set the new required unicorn_food in the producer’s getRecord):

    ...
            .setStarsRating(5).build())
    // we removed the (previously required) customer data here in the new version of the producer (v1)
    // and added a new field without a default (which isn't backward compatible)
    .setUnicornFood(Food.newBuilder().setPrice(133.7f).setName("Rainbow").build())
    .build();
    ...
    

We can run the new producer with the same bootstrap server address as before:

mvn clean install
mvn -pl producer-3 exec:java -Dexec.args="--bootstrap-servers localhost:4511"

In the logs we can see that the producer fails when trying to register its new, but incompatible, version of the schema:

...
[AWSSchemaRegistryClient][INFO ] Registered the schema version with schema version id = 8f625284-07b4-442e-83ae-395cd7853746 and with version number = 3 and status FAILURE
...
[WARNING]
com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Register schema :: Call failed when registering the schema with the schema registry for schema name = unicorn-ride-request-schema-avro
    at com.amazonaws.services.schemaregistry.common.AWSSchemaRegistryClient.registerSchemaVersion (AWSSchemaRegistryClient.java:310)
...
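
The rule behind this rejection can be approximated in a few lines. The following is a deliberately simplified sketch and not the check that Avro or the Glue Schema Registry actually runs: the class BackwardCheck is hypothetical, a schema is reduced to a map from top-level field name to "has a default value", only a subset of the tutorial's fields is listed, and type changes and nested records are ignored. Under these assumptions, a new schema is BACKWARD compatible if every field it adds has a default.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Simplified BACKWARD check on top-level fields only (hypothetical helper). */
final class BackwardCheck {

    /**
     * Returns the fields that break BACKWARD compatibility: fields which exist
     * only in the next schema and have no default value.
     * Both maps go from field name to "has a default".
     */
    static List<String> violations(Map<String, Boolean> previous, Map<String, Boolean> next) {
        return next.entrySet().stream()
                .filter(field -> !previous.containsKey(field.getKey())) // field is new
                .filter(field -> !field.getValue())                     // and has no default
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Reduced versions of the tutorial's schemas (assumption: only three fields each).
        Map<String, Boolean> v1 = Map.of("request_id", false, "recommended_unicorn", false, "customer", false);
        Map<String, Boolean> v2 = Map.of("request_id", false, "recommended_unicorn", false);
        Map<String, Boolean> v3 = Map.of("request_id", false, "recommended_unicorn", false, "unicorn_food", false);

        System.out.println(violations(v1, v2)); // prints []
        System.out.println(violations(v2, v3)); // prints [unicorn_food]
    }
}
```

In this model, giving unicorn_food a default value would clear the violation, which is why adding optional fields is the usual way to extend a schema under BACKWARD compatibility.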

Outdated consumers

We’ve seen how the producer’s schema evolution works in the previous scenarios. Now, we’ll take a closer look at our consumer.

In our first schema evolution scenario, the producer registered a new version of the schema and afterwards published records with that schema. The BACKWARD schema compatibility guarantees that updated consumers can read older records, i.e., the consumers need to be updated before the producers.

Therefore, our old consumer will fail to consume these records, because it is not yet compatible with the new schema registered by the new producer:

mvn -pl consumer exec:java -Dexec.args="--bootstrap-servers localhost:4511"

In the logs, we can see that the consumer fails because the Avro schema used by the consumer expects a required field (customer) which the new records no longer contain:

[request][DEBUG] Sending Request: DefaultSdkHttpFullRequest(httpMethod=POST, protocol=https, host=localhost.localstack.cloud, port=4566, encodedPath=/, headers=[amz-sdk-invocation-id, Content-Length, Content-Type, User-Agent, X-Amz-Target], queryParameters=[])
...
[DatumReaderInstance][DEBUG] Using SpecificDatumReader for de-serializing Avro message, schema: {"type":"record","name":"UnicornRideRequest","namespace":"cloud.localstack.demos.gluemsk.schema","fields":[{"name":"request_id","type":"int","doc":"customer request id"},{"name":"pickup_address","type":"string","doc":"customer pickup address"},{"name":"destination_address","type":"string","doc":"customer destination address"},{"name":"ride_fare","type":"float","doc":"ride fare amount (USD)"},{"name":"ride_duration","type":"int","doc":"ride duration in minutes"},{"name":"preferred_unicorn_color","type":{"type":"enum","name":"UnicornPreferredColor","symbols":["WHITE","BLACK","RED","BLUE","GREY"]},"default":"WHITE"},{"name":"recommended_unicorn","type":{"type":"record","name":"RecommendedUnicorn","fields":[{"name":"unicorn_id","type":"int","doc":"recommended unicorn id"},{"name":"color","type":{"type":"enum","name":"unicorn_color","symbols":["WHITE","RED","BLUE"]}},{"name":"stars_rating","type":["null","int"],"doc":"unicorn star ratings based on customers feedback","default":null}]}},{"name":"customer","type":{"type":"record","name":"Customer","fields":[{"name":"customer_account_no","type":"int","doc":"customer account number"},{"name":"first_name","type":"string"},{"name":"middle_name","type":["null","string"],"default":null},{"name":"last_name","type":"string"},{"name":"email_addresses","type":["null",{"type":"array","items":"string"}]},{"name":"customer_address","type":"string","doc":"customer address"},{"name":"mode_of_payment","type":{"type":"enum","name":"ModeOfPayment","symbols":["CARD","CASH"]},"default":"CARD"},{"name":"customer_rating","type":["null","int"],"default":null}]}}]})
...
[WARNING]
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition unicorn-ride-request-topic-0 at offset 100. If needed, please seek past the record to continue consumption.
Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Exception occurred while de-serializing Avro message
    at com.amazonaws.services.schemaregistry.deserializers.avro.AvroDeserializer.deserialize (AvroDeserializer.java:103)
...
Caused by: org.apache.avro.AvroTypeException: Found cloud.localstack.demos.gluemsk.schema.UnicornRideRequest, expecting cloud.localstack.demos.gluemsk.schema.UnicornRideRequest, missing required field customer
    at org.apache.avro.io.ResolvingDecoder.doAction (ResolvingDecoder.java:308)
...
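
The failure comes from resolving the writer's schema against the reader's schema at read time. A rough model of that resolution is sketched below. It is not Avro's ResolvingDecoder: the class ToyResolver is hypothetical, records are plain maps, and a reader schema is reduced to a map from field name to an optional default value (an empty Optional marks a required field).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Toy model of reader/writer schema resolution (hypothetical, not Avro's decoder). */
final class ToyResolver {

    /**
     * Projects a written record onto the reader's schema: fields the reader does not
     * know are dropped, missing fields are filled from the reader's default, and a
     * missing field without a default is an error.
     */
    static Map<String, Object> read(Map<String, Object> written, Map<String, Optional<Object>> readerSchema) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Optional<Object>> field : readerSchema.entrySet()) {
            String name = field.getKey();
            if (written.containsKey(name)) {
                result.put(name, written.get(name));
            } else if (field.getValue().isPresent()) {
                result.put(name, field.getValue().get());
            } else {
                throw new IllegalArgumentException("missing required field " + name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Reduced reader schemas (assumption: one or two fields stand in for the full schema).
        Map<String, Optional<Object>> readerV1 = Map.of("request_id", Optional.empty(), "customer", Optional.empty());
        Map<String, Optional<Object>> readerV2 = Map.of("request_id", Optional.empty());
        Map<String, Object> recordV1 = Map.of("request_id", 41, "customer", "Jane");
        Map<String, Object> recordV2 = Map.of("request_id", 42);

        // An updated (v2) reader can read an old (v1) record: the extra field is ignored.
        System.out.println(read(recordV1, readerV2)); // prints {request_id=41}

        // An old (v1) reader cannot read a new (v2) record.
        try {
            read(recordV2, readerV1);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints missing required field customer
        }
    }
}
```

The second call mirrors the AvroTypeException in the log above, while the first call shows why a consumer built against the newer schema can still read records written with the older one.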

Updated consumers

Finally, we will update our consumer so that it is compatible with the new version of the schema. The complete module can be found in our samples repository (along with the rest of the code of this tutorial).

Similar to the previous producer scenarios, we create a new consumer by executing the following steps:

  • Copy the consumer directory and rename it to consumer-2.

  • Set a new artifact ID in the pom.xml of the module:

      ...
      <artifactId>consumer-2</artifactId>
      ...
    
  • Add the new module to the root pom.xml:

    <modules>
      <module>producer</module>
      <module>producer-2</module>
      <module>producer-3</module>
      <module>consumer</module>
      <module>consumer-2</module>
    </modules>
    
  • Replace the unicorn_ride_request_v1.avsc with the new version used by producer-2 (unicorn_ride_request_v2.avsc).

  • Re-generate the Java classes for the schema:

    mvn clean generate-sources
    

Our new consumer, based on the latest version of the schema, will be able to successfully consume the records published by the new producer:

mvn -pl consumer-2 exec:java -Dexec.args="--bootstrap-servers localhost:4511"

Conclusion

Apache Kafka is used as the core messaging system in complex environments with independent producers and consumers. The independent development of these components makes it hard to coordinate and evolve data schemas over time. Using the AWS Glue Schema Registry can help you prevent the use of incompatible schemas.

With LocalStack, emulating Amazon Managed Streaming for Kafka and AWS Glue Schema Registry, you can develop and test the next evolution of your data schema locally on your own machine.


Last modified July 18, 2024: setup markdownlint (#1382) (f2ebb421e)