Sunday, July 14, 2024

Building an Apache Kafka data processing Java application using the AWS CDK

Piotr Chotkowski, Cloud Application Development Consultant, AWS Professional Services

Using a Java application to process data queued in Apache Kafka is a common use case across many industries. Event-driven and microservices architectures, for example, often rely on Apache Kafka for data streaming and component decoupling. You can use it as a message queue or an event bus, as well as a way to improve resilience and reproducibility of events occurring in the application.

In this post, I walk you through the process of creating a simple end-to-end data processing application using AWS tools and services as well as other industry standard techniques. We start with a brief architecture overview and an infrastructure definition. You then see how, with just a few lines of code, you can set up an Apache Kafka cluster using Amazon Managed Streaming for Apache Kafka (Amazon MSK) and the AWS Cloud Development Kit (AWS CDK). Next, I show you how to shape your project structure and package your application for deployment. We also look at the implementation details and how we can create Kafka topics in an Amazon MSK cluster as well as send and receive messages from Apache Kafka using services such as AWS Lambda and AWS Fargate.

I use the AWS CDK to automate infrastructure creation and application deployment. The AWS CDK is an open-source software development framework to define your cloud application resources using familiar programming languages. For more information, see the Developer Guide, AWS CDK Intro Workshop, and the AWS CDK Examples GitHub repo.

All the code presented in this post is open sourced and available on GitHub.

Overview of solution

The following diagram illustrates our overall architecture.

Architecture diagram of the solution

Triggering the TransactionHandler Lambda function publishes messages to an Apache Kafka topic. The application, which is packaged in a container and deployed to ECS Fargate, consumes messages from the Kafka topic, processes them, and stores the results in an Amazon DynamoDB table. The KafkaTopicHandler Lambda function is called once during deployment to create the Kafka topic. Both Lambda functions and the consumer application publish logs to Amazon CloudWatch.

To follow along with this post, you need the prerequisites described in the README file of the GitHub project.

Project structure and infrastructure definition

The project consists of three main parts: the infrastructure (including the Kafka cluster and Amazon DynamoDB), a Spring Boot Java consumer application, and Lambda producer code.

Let’s start with exploring the infrastructure and deployment definition. It’s implemented using a set of AWS CDK stacks and constructs. I’ve chosen TypeScript as my language here mainly because of personal preference. However, if you prefer, you can use the CDK with other languages. At the time of writing, AWS CDK supports Python, TypeScript, Java, .NET and Go. For more information, see Working with the AWS CDK.

Let’s look at the project directory structure. All AWS CDK stacks are located in the amazon-msk-java-app-cdk/lib directory. In amazon-msk-java-app-cdk/bin, you can find the main AWS CDK app where all of the stacks are instantiated. amazon-msk-java-app-cdk/lambda contains code for TransactionHandler, which publishes messages to a Kafka topic, as well as code for KafkaTopicHandler, which is responsible for creating the Kafka topic. The business logic for the Kafka consumer, which is a Java Maven project, is in the consumer directory. The Dockerfile necessary for Fargate container creation is located in consumer/docker/Dockerfile. Finally, doc contains architecture diagrams and scripts contains the deployment script.

Setting up your Kafka cluster

The central part of the architecture is the Kafka cluster created using Amazon MSK, which is relatively easy to define and deploy with the AWS CDK. In the following code, I use the CfnCluster construct to set up my cluster:

// Defined inside an AWS CDK stack; vpcStack is the stack that holds the VPC definition.
new msk.CfnCluster(this, "kafkaCluster", {
    brokerNodeGroupInfo: {
        securityGroups: [vpcStack.kafkaSecurityGroup.securityGroupId],
        // Place the brokers in the private subnets of the VPC.
        clientSubnets: [...vpcStack.vpc.selectSubnets({
            subnetType: ec2.SubnetType.PRIVATE
        }).subnetIds],
        instanceType: "kafka.t3.small",
        storageInfo: {
            ebsStorageInfo: {
                volumeSize: 5
            }
        }
    },
    clusterName: "TransactionsKafkaCluster",
    kafkaVersion: "2.7.0",
    numberOfBrokerNodes: 2
});

vpcStack in the preceding code refers to the AWS CDK stack containing the VPC definition. Because we’re using this cluster for demonstration purposes only, I limit storage to 5 GB, the instance type to kafka.t3.small, and the number of broker nodes to 2, which is the minimum allowed number. We don’t want to connect to this cluster from outside the VPC, so I place the cluster in a private subnet of my VPC. For more information about the allowed settings, see interface CfnClusterProps. To learn more about Amazon MSK, check out the Amazon MSK Labs workshop.

Topic creation

At the time of writing, Amazon MSK doesn’t allow you to create a Kafka topic inside the cluster using the AWS service API. You can only do this by connecting directly to the Kafka cluster, either using Kafka tools or using a library from within the code of your application. In this project, I’m using the AWS CDK’s custom resource provider. It allows you to use a custom Lambda function to handle AWS CloudFormation’s lifecycle events. You can find the definitions of the CustomResource, Provider, and Lambda function resources in the kafka-topic-stack.ts file and the implementation of the handler Lambda function in the kafka-topic-handler.ts file. Let’s look at the code of the function:

export const handler = async (event: any, context: any = {}): Promise<any> => {
    try {
        if (event.RequestType === 'Create' || event.RequestType === 'Update') {
            // createTopic resolves to false when the topic already exists.
            let result = await createTopic(event.ResourceProperties.topicConfig);
            response.send(event, context, response.SUCCESS, {alreadyExists: !result});
        } else if (event.RequestType === 'Delete') {
            await deleteTopic(event.ResourceProperties.topicConfig.topic);
            response.send(event, context, response.SUCCESS, {deleted: true});
        }
    } catch (e) {
        // Report the failure back to AWS CloudFormation.
        response.send(event, context, response.FAILED, {reason: e});
    }
}

The handler is called once when the KafkaTopicStack is deployed and once when it’s destroyed. I use the admin client from the KafkaJS open-source library to create the Kafka topic on the ‘Create’ AWS CloudFormation event and to destroy it on the ‘Delete’ event. Calling KafkaJS’s createTopics method resolves to true if the topic was created successfully or false if it already exists.
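The createTopic and deleteTopic helpers called by the handler are not shown above. The following is a minimal sketch of what they might look like, assuming an admin object shaped like the KafkaJS admin client (connect, createTopics, deleteTopics, disconnect). The TopicAdmin interface, the TopicConfig fields, the extra admin parameter, and the in-memory InMemoryAdmin class are illustrative assumptions, not the project's actual code.

```typescript
// Hypothetical shapes, modeled on the parts of the KafkaJS admin client used here.
interface TopicConfig {
    topic: string;
    numPartitions?: number;
    replicationFactor?: number;
}

interface TopicAdmin {
    connect(): Promise<void>;
    disconnect(): Promise<void>;
    // Resolves to true if the topic was created, false if it already existed.
    createTopics(options: { topics: TopicConfig[] }): Promise<boolean>;
    deleteTopics(options: { topics: string[] }): Promise<void>;
}

// Creates a single topic and always releases the connection afterwards.
async function createTopic(admin: TopicAdmin, config: TopicConfig): Promise<boolean> {
    await admin.connect();
    try {
        return await admin.createTopics({ topics: [config] });
    } finally {
        await admin.disconnect();
    }
}

// Deletes a single topic and always releases the connection afterwards.
async function deleteTopic(admin: TopicAdmin, topic: string): Promise<void> {
    await admin.connect();
    try {
        await admin.deleteTopics({ topics: [topic] });
    } finally {
        await admin.disconnect();
    }
}

// In-memory stand-in for a Kafka cluster, for local experiments only.
class InMemoryAdmin implements TopicAdmin {
    readonly topics = new Set<string>();
    async connect(): Promise<void> {}
    async disconnect(): Promise<void> {}
    async createTopics(options: { topics: TopicConfig[] }): Promise<boolean> {
        const missing = options.topics.filter(t => !this.topics.has(t.topic));
        missing.forEach(t => this.topics.add(t.topic));
        // Simplification: report "created" only when at least one topic was new.
        return missing.length > 0;
    }
    async deleteTopics(options: { topics: string[] }): Promise<void> {
        options.topics.forEach(t => this.topics.delete(t));
    }
}
```

Passing the admin object in as a parameter keeps the helpers easy to exercise without a running cluster. In a deployed handler, an object with this shape would presumably be obtained from a KafkaJS client instead of the fake.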

Consumer implementation details

The main purpose of the Kafka consumer part of this project is to process and validate incoming transaction messages and store results in the DynamoDB table. The consumer application is written in Java with the use of the Spring Boot framework. The core part of the functionality is implemented in the KafkaConsumer class. I use the KafkaListener annotation to define the entry point for incoming messages. Spring takes care of most of the boilerplate code for us; in particular, we don’t need to write the logic to manually pull messages from the Kafka topic or worry about deserialization. All you need to do is provide the necessary elements in the configuration class. In the following code, the Spring Boot configuration is located in the ApplicationConfiguration class:

@Bean
public ConsumerFactory<String, byte[]> consumerFactory(KafkaConsumerProperties properties) {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapAddress());
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Values stay as raw bytes; the message converter below turns them into objects.
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, properties.getTrustStoreLocation());

    return new DefaultKafkaConsumerFactory<>(configs);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConsumerFactory<String, byte[]> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setMessageConverter(new ByteArrayJsonMessageConverter());
    return factory;
}

The preceding code sets up the Kafka consumer configuration. We get the bootstrap servers address string and Kafka consumer group ID from the environment variables that are set up during application deployment. By default, Amazon MSK uses TLS 1.2 for secure communication, so we need to set up SSL configuration in our application as well. For more information about encryption, see Amazon MSK Encryption.

For the deserialization of incoming Kafka messages, I use classes provided by the Apache Kafka library. To enable Spring to deserialize Kafka JSON messages into POJOs, I use the ByteArrayDeserializer class combined with ByteArrayJsonMessageConverter. That way, Spring simply passes bytes as is from the deserializer to the message converter, and the converter transforms bytes into Java objects using Jackson’s ObjectMapper underneath. I use this approach because it allows me to send plaintext JSON messages. We don’t need anything more sophisticated for the purpose of this post. Depending on your needs, you can use different combinations of deserializers and message converters or dedicated deserializers, such as KafkaAvroDeserializer, which uses the schema registry to figure out the target type.

For more information about how to use Apache Kafka with the Spring framework, refer to the Spring documentation.
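To make the byte-passing approach described above more concrete, here is a small illustration written in TypeScript (the language of this project's Lambda handlers), not the Spring code itself. It decodes a UTF-8 byte payload, parses it as JSON, and checks the shape before treating it as a typed object, which is roughly the role the deserializer and message converter pair plays in the consumer. The Transaction type and its accountId and value fields are assumptions based on the sample payload used later in this post, and toTransaction is a hypothetical helper name.

```typescript
// Assumed message shape, based on the sample payload used in this post.
interface Transaction {
    accountId: string;
    value: number;
}

// Deserializer role: the payload arrives as raw bytes and is not interpreted.
// Converter role: decode the bytes and map the JSON onto a typed object.
function toTransaction(payload: Uint8Array): Transaction {
    const text = new TextDecoder("utf-8").decode(payload);
    const parsed: unknown = JSON.parse(text);
    if (typeof parsed !== "object" || parsed === null) {
        throw new Error("Payload is not a JSON object");
    }
    const candidate = parsed as { accountId?: unknown; value?: unknown };
    if (typeof candidate.accountId !== "string" || typeof candidate.value !== "number") {
        throw new Error("Payload does not match the Transaction shape");
    }
    return { accountId: candidate.accountId, value: candidate.value };
}
```

Keeping the bytes untouched until the conversion step is what makes it possible to swap in another converter without changing how messages are read from the topic.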

Consumer deployment

We complete three high-level steps to deploy the consumer application into Fargate.

First, we need to build and package our application into an executable JAR. I use the Apache Maven Shade plugin with the Spring Boot Maven plugin dependency. It’s configured in the consumer application pom.xml. The JAR is created during the package phase of the Maven project build and placed in the consumer/docker directory next to the Dockerfile.

Next, we define the image used to create the ECS task container. To do that, we create a Dockerfile, which is a text file containing all the instructions and configuration necessary to assemble a Docker image. I use Amazon Linux 2 as a base for the image, additionally installing the Java 11 Amazon Corretto distribution, awslogs, and a CloudWatch agent. For the SSL configuration, we also need to copy the truststore file. In line 9 of the Dockerfile, we copy the executable JAR built in the previous step from the local location into the image. The last line in the Dockerfile is an entry point starting the consumer application. It’s a standard Java command:

java -cp kafka-consumer-1.0-SNAPSHOT-shaded.jar

Finally, we reference the Dockerfile in the AWS CDK stack. We do this inside the fargate-stack.ts file. We define the infrastructure necessary to run our containerized application in the ECS task. To use the local Dockerfile image definition inside the AWS CDK stack, you need to create the asset DockerImageAsset:

const image = new assets.DockerImageAsset(this, "ConsumerImage", {
    directory: '../consumer/docker'
});

Next, we reference this image asset in the definition of the ECS task using the ContainerImage.fromDockerImageAsset method:

fargateTaskDefinition.addContainer("KafkaConsumer", {
    image: ecs.ContainerImage.fromDockerImageAsset(image),
    logging: ecs.LogDrivers.awsLogs({streamPrefix: 'KafkaConsumer'}),
    // Environment variables read by the consumer application at startup.
    environment: {
        'TABLE_NAME': this.tableName,
        'GROUP_ID': this.groupId,
        'BOOTSTRAP_ADDRESS': bootstrapAddress.valueAsString,
        'REGION': this.region,
        'TOPIC_NAME': topicName.valueAsString
    }
});

During the AWS CDK stack deployment, the image defined in the Dockerfile is created and uploaded to an Amazon Elastic Container Registry (Amazon ECR) repository. That image is used to create and start the ECS task container, thereby starting our consumer application. For more information about other ways of obtaining images, see the Amazon ECS Construct Library.

Producer implementation details

We now have our Kafka cluster and consumer application defined. Next, we need to publish messages to Kafka, and I use a Lambda function to do so. All the code of the producer is located in the transaction-handler.ts file. I use the KafkaJS open-source library to communicate with the Kafka cluster and send messages.
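The producer code itself is not reproduced in this post. As a rough sketch of the idea, the function below serializes a transaction to JSON and sends it to a topic through an object shaped like the KafkaJS producer (connect, send, disconnect). The MessageProducer interface, the Transaction fields, the publishTransaction name, and the choice of accountId as the message key are assumptions for illustration; the actual transaction-handler.ts may differ.

```typescript
// Assumed message shape, based on the sample payload used in this post.
interface Transaction {
    accountId: string;
    value: number;
}

// Hypothetical subset of a Kafka producer, modeled on the KafkaJS producer API.
interface MessageProducer {
    connect(): Promise<void>;
    disconnect(): Promise<void>;
    send(record: { topic: string; messages: { key?: string; value: string }[] }): Promise<unknown>;
}

// Sends one transaction as a plaintext JSON message and always disconnects afterwards.
async function publishTransaction(
    producer: MessageProducer,
    topic: string,
    tx: Transaction
): Promise<void> {
    await producer.connect();
    try {
        await producer.send({
            topic,
            // Using the account ID as the key is an assumption, not a requirement.
            messages: [{ key: tx.accountId, value: JSON.stringify(tx) }],
        });
    } finally {
        await producer.disconnect();
    }
}
```

Because the message value is plain JSON text, it matches the byte array plus JSON converter setup on the consumer side. Inside a Lambda function, the topic name would typically come from an environment variable such as TOPIC_NAME.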

Producer deployment

Now let’s deploy our Kafka producer code. The AWS CDK stack definition for that part is located in the lambda-stack.ts file.

let transactionHandler = new NodejsFunction(this, "TransactionHandler", {
    runtime: Runtime.NODEJS_14_X,
    // Path to the handler source, relative to the AWS CDK root directory.
    entry: 'lambda/transaction-handler.ts',
    handler: 'handler',
    vpc: vpcStack.vpc,
    securityGroups: [vpcStack.lambdaSecurityGroup],
    functionName: 'TransactionHandler',
    timeout: Duration.minutes(5),
    environment: {
        'BOOTSTRAP_ADDRESS': bootstrapAddress.valueAsString,
        'TOPIC_NAME': topicName.valueAsString
    }
});

This is a relatively short piece of code. The AWS CDK NodejsFunction construct allows us to package our business logic code and deploy it as a Node.js Lambda function to the AWS Cloud. Due to internal AWS CDK packaging and deployment logic, it makes your life easier if you place the directory containing your Lambda code in the AWS CDK root directory next to the bin and lib directories. In the properties, in the entry field, you have to point to the local file containing your code. This is the relative path from the AWS CDK root directory. You can pass environment variables in the environment field. For this post, I pass Kafka’s bootstrap address string and topic name, which I need in order to communicate with the Kafka cluster and send messages from within the Lambda function. If esbuild is available, it’s used to bundle your code in your environment. Otherwise, bundling occurs in a Docker container. This means that if you don’t want to use esbuild, you have to start a Docker daemon before deploying your AWS CDK stack. For more information about the NodejsFunction construct, see the Amazon Lambda Node.js Library.

Execution walkthrough

Once we deploy the application, it’s time to test it. To trigger the Lambda function and send a message to the Kafka queue, you can use the following AWS CLI command.

aws lambda invoke --cli-binary-format raw-in-base64-out --function-name TransactionHandler --log-type Tail --payload '{ "accountId": "account_123", "value": 456}' /dev/stdout --query 'LogResult' --output text | base64 -d

Here you are adding 456 to the balance of the account account_123. The Lambda function sends a JSON message to the Amazon MSK cluster. The consumer application pulls the message from the Kafka topic in the form of bytes and transforms it to an instance of a POJO class. Next, the consumer business logic executes and the application stores the results in the Amazon DynamoDB table. You can run the following command to see the content of the table.

aws dynamodb scan --table-name Accounts --query "Items[*].[id.S,Balance.N]" --output text
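As a worked illustration of the balance update described above, the sketch below applies transactions to per-account balances held in a Map, which stands in for the DynamoDB table. The validation rule (the value must be a finite number) and the starting balance of 0 for unknown accounts are assumptions; the real consumer's business logic lives in the Java KafkaConsumer class and may validate differently.

```typescript
// Assumed message shape, based on the sample payload used in this post.
interface Transaction {
    accountId: string;
    value: number;
}

// Applies one transaction to an in-memory balance table and returns the new balance.
function applyTransaction(balances: Map<string, number>, tx: Transaction): number {
    if (!Number.isFinite(tx.value)) {
        throw new Error(`Invalid transaction value for ${tx.accountId}`);
    }
    // Unknown accounts are assumed to start at a balance of 0.
    const updated = (balances.get(tx.accountId) ?? 0) + tx.value;
    balances.set(tx.accountId, updated);
    return updated;
}

const balances = new Map<string, number>();
applyTransaction(balances, { accountId: "account_123", value: 456 });
applyTransaction(balances, { accountId: "account_123", value: -56 });
console.log(balances.get("account_123")); // 400
```

Sending the sample payload twice with different values, as in the two calls above, is a quick way to check that the stored balance accumulates rather than being overwritten.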

All the logs from the execution are stored in Amazon CloudWatch. To view them, you can go to the AWS console or run the aws logs tail command with the specified CloudWatch Logs group.

You can experiment with the application by sending multiple messages with different values of the accountId and value fields of the JSON payload.


In this post, we discussed different techniques to implement and deploy your application using AWS CDK constructs and Java and TypeScript application code. High-level AWS CDK constructs enable you to quickly define the cloud infrastructure of your system and let you focus more on implementing your business logic. You can use a mix of programming languages that best fit your use case and keep all your code and infrastructure definitions in one place.

To run the code presented in this post, follow the prerequisites and usage steps described in the README file of the GitHub project.

Stay tuned for more content about cloud application development. If you have any questions or suggestions, please leave a comment. I hope you have enjoyed reading this post and learned something new. If you did, please share with your colleagues. Happy coding!
