Amazon Managed Streaming for Apache Kafka (Amazon MSK) with schema registry :

Swetav Kamal
4 min readDec 27, 2019

Schema registry will use Confluent open source schema registry project and it will be talking to MSK(Managed streaming Kafka) cluster.

We need to have an Ec2 instance in the same VPC as your MSK cluster. This EC2 instance will have confluent schema registry installed on it. Once we have schema registry started on to the EC2 instance,

  1. Either you can use the kafka-avro-console-producer and kafka-avro-console-consumer script which is available with confluent hub to test the schema registry
  2. you can use the sample java project at below location to test https://github.com/swetavkamal/SchemaRegistryMSK

Follow the below steps :

  1. Launch a MSK cluster
  2. Create a client machine in the same VPC as MSK cluster and keep the security group such that MSK cluster and client machine can talk to each other. you can follow the below link to set up client machine : https://docs.aws.amazon.com/msk/latest/developerguide/create-client-machine.html
  3. Download Confluent from https://www.confluent.io/previous-versions/ [ I used 5.1.2 with MSK 1.1.1 ] and untar it or even you can install confluent hub on your client machine.
1. wget http://packages.confluent.io/archive/5.3/confluent-community-5.3.1-2.12.tar.gz?_ga=2.262062221.128806676.1578718223-1914692183.15707699062. tar -xvzf confluent-community-5.3.1-2.12.tar.gz\?_ga\=2.262062221.128806676.1578718223-1914692183.1570769906

4. Once you have the client machine ready and confluent downloaded and extracted, Go to the schema-registry settings file. You would find file “schema-registry.properties” at location “confluent-5.1.2/etc/schema-registry”

Make below changes :

listeners=http://0.0.0.0:8082 [ You can take port of your choice where your schema registery listners will listen ]

kafkastore.connection.url=

kafkastore.bootstrap.servers=PLAINTEXT://<your-bootstrap-servers>

5. Once you have the settings ready, you need to start the schema-registry server.

Go to confluent-5.1.2/bin folder and run command

./schema-registry-start ../etc/schema-registry/schema-registry.properties

6. You would see something similar :

INFO Server started, listening for requests… (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:44)

7. You are now ready to test your schema registry

Testing through kafka-avro-console-producer script of confluent

7.1 Go to the confluent-5.1.2/bin folder

7.2 ./kafka-avro-console-producer — broker-list <your-broker-list> — topic test — property value.schema=’{“type”:”record”,”name”:”myrecord”,”fields”:[{“name”:”f1",”type”:”string”}]}’

You will see screen waiting for input where you need to pass some Inputs:

{“f1”: “value1”}

{“f2”: “value1”}

{“f3”: “value1”}

7.2 Then Run the consumer to test the avro-consumer

./kafka-avro-console-consumer — topic test — bootstrap-server <your-bootstrap-server> — from-beginning

You would see the output

{“f1”:”value1"}
{“f1”:”value2"}
{“f1”:”value3"}

8. Testing through java sample code : [Optional ]

Now import the java project from below github project in your eclipse and build it, you would see schema files generated. Make changes as per your requirement.

  1. https://github.com/swetavkamal/SchemaRegistryMSK

Once, every thing is ready, export each version of the producers and consumers separately in your client machine and start running:

java -jar producer1.jar java -jar consumer1.jar

jave -jar producer2.jar java -jar consumer2.jar

Now, Let us take it a step further where we would like to run the schema registry in production environment.

In such a case, You need to follow below architecture

Steps to the architecture :

  1. We need to use the steps 1 to 4 from above to launch an EC2 instance .This will prepare an Ec2 instance with Confluent schema registry on it.
  2. Now we need to create an AMI using this instance. Follow below link : https://docs.aws.amazon.com/toolkit-for-visual-studio/latest/user-guide/tkv-create-ami-from-instance.html
  3. Now we will create an Autoscaling group for which we will use above AMI.

3.1 Create a launch configuration using the AMI created by you.

Go to Ec2 console, click on Create Launch Configuration and select “My-AMI” and select your AMI, Give further details as per below doc and craete the configuration https://docs.aws.amazon.com/autoscaling/ec2/userguide/create-launch-config.html

Note : while creating launch configuration, in configuration details under user data, you need to add below so that schema registry service gets up when new node comes up

#!/bin/bash

/home/ec2-user/confluent-5.3.1/bin/schema-registry-start /home/ec2-user/confluent-5.3.1/etc/schema-registry/schema-registry.properties

Once the configuration is ready, use this configuration to create an Autoscaling group : https://docs.aws.amazon.com/autoscaling/ec2/userguide/create-asg.html

4. Now, we need to create a target group and attach our scaling group to the target group.

https://docs.aws.amazon.com/autoscaling/ec2/userguide/attach-load-balancer-asg.html

Finally create a Load balancer and attach the target group to this.

https://docs.aws.amazon.com/elasticloadbalancing/latest/network/create-network-load-balancer.html

Now once every thing is ready, you need to go your code and change the schema registry IP and port to the DNS of loadbalancer : props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, “http://<load-baalncer-DNS> :8082");

Now as your code would run, it will pass the requests through load balancer to EC2 on which schema registry is running

--

--