Thursday, June 6, 2024

Mastering Go: Part 14 - Messaging with Apache Kafka (Go Implementation)

In this post, we will explore how to implement Apache Kafka messaging in Golang. Several packages are available, and the best choice depends on your specific use case.

Here are some of the most popular Kafka client libraries for Golang:

  • Kafka-go (MIT license): This library provides both low-level and high-level APIs for Kafka messaging, exposed through idiomatic Go types for ease of use. https://github.com/segmentio/kafka-go
  • kafkapubsub (Apache 2.0 license): Part of the Go Cloud Development Kit (gocloud.dev), this package offers a portable pub/sub implementation for Go that works with Kafka. https://github.com/google/go-cloud/blob/master/pubsub/kafkapubsub/kafka.go
  • Sarama (MIT license): This library is a popular Go client for Kafka. It provides options for both asynchronous and synchronous message producers, allowing for low-level control over messaging if needed. The low-level API lets developers handle individual messages and connections precisely. https://github.com/IBM/sarama
  • Goka (MIT license): Goka is a distributed stream processing library that simplifies building microservices. It can maintain a consumer state by persisting it in Kafka. Goka relies on Sarama for actual communication with Kafka. https://github.com/lovoo/goka
  • confluent-kafka-go (Apache 2.0 license): This is Confluent's official Go client for Kafka. It comes with extensive documentation, making it a valuable resource for development. https://docs.confluent.io/home/overview.html

Kafka-go

The Kafka-go package provides both high-level and low-level APIs for working with messaging. The high-level APIs abstract the detailed implementation of the topic, while the low-level APIs provide developers with flexibility in handling connections, readers, writers, partitions, replication, and more.

For the low-level approach, developers can use the connection object to create and configure Kafka topics. For the high-level approach, the Reader and Writer abstractions can be used to read from and write to topics. This approach is particularly useful and straightforward for implementing a single topic-partition pair.
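For illustration, here is a minimal sketch of the low-level approach, assuming the single local broker set up in Part 13 (on a multi-broker cluster, the topic-creation request would need to reach the controller broker):

package main

import (
    "log"

    "github.com/segmentio/kafka-go"
)

func main() {
    // Dial the broker directly to obtain a low-level connection
    conn, err := kafka.Dial("tcp", "localhost:9092")
    if err != nil {
        log.Fatal("failed to dial broker:", err)
    }
    defer conn.Close()

    // Create the topic explicitly with one partition and replication factor 1
    err = conn.CreateTopics(kafka.TopicConfig{
        Topic:             "learngo-person",
        NumPartitions:     1,
        ReplicationFactor: 1,
    })
    if err != nil {
        log.Fatal("failed to create topic:", err)
    }
}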

The Kafka-go Writer type offers several key features, including automatic retries, synchronous and asynchronous writes, and the creation of missing topics before publishing messages.
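These features map directly onto Writer fields. A minimal sketch, assuming the kafka-go import from the examples below; the field values are illustrative, not recommendations:

// newWriter shows the main Writer options for retries, async writes, and acks
func newWriter(brokers []string) *kafka.Writer {
    return &kafka.Writer{
        Addr:         kafka.TCP(brokers...),
        Topic:        "learngo-person",
        Async:        true,             // asynchronous writes: WriteMessages returns without waiting
        MaxAttempts:  5,                // automatic retries before giving up
        RequiredAcks: kafka.RequireAll, // wait for all in-sync replicas to acknowledge
    }
}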

In this exercise, we will leverage the high-level types provided by the Kafka-go library to publish and subscribe to messages.

Setup Project

Step 1: First, we need the project and environment set up for Kafka. Please complete Part 13 - Messaging with Apache Kafka (Setup) of this series to install Kafka and prepare the environment.

Step 2: Create a producer module
Add a folder named producer under the project learngo and initialize the producer module by executing the command:
$go mod init learngo/producer

Step 3: Create a consumer module
Add a folder named consumer under learngo and initialize the consumer module by executing the command:
$go mod init learngo/consumer

Step 4: Add the kafka-go library to each module:
$ go get github.com/segmentio/kafka-go
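Note: producer and consumer are modules of their own, separate from the root learngo module. Depending on how your root module references them, you may need a Go workspace (or replace directives in go.mod) so imports like learngo/producer resolve locally; for example, from the learngo root:

$ go work init . ./producer ./consumer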

Create Topic and Publish Message

Step 1: Create a file named producer.go in the producer folder and add the following code:

producer.go
package producer

import (
    "context"

    "github.com/segmentio/kafka-go"
)

// Producer struct holds the Kafka writer
type Producer struct {
    messageWriter *kafka.Writer
}

// NewProducer creates a new Kafka producer.
// The broker addresses are passed in by the caller, e.g. "localhost:9092".
func NewProducer(brokers []string) (*Producer, error) {
    messageWriter := &kafka.Writer{
        Addr:  kafka.TCP(brokers...),
        Topic: "learngo-person",
    }
    return &Producer{messageWriter: messageWriter}, nil
}

// PublishMessage sends a message to the Kafka topic
func (p *Producer) PublishMessage(message string) error {
    msg := kafka.Message{
        Value: []byte(message),
    }
    err := p.messageWriter.WriteMessages(context.Background(), msg)
    return err
}


In the above code, we use the Kafka Writer to publish to a topic named learngo-person. The constructor takes the broker addresses, and the topic is created on the first write if it doesn't already exist; by default, the Kafka broker allows automatic topic creation. We can also explicitly configure the Kafka Writer to allow or disallow automatic topic creation:

    messageWriter := &kafka.Writer{
        Addr:                   kafka.TCP(brokers...),
        Topic:                  "learngo-person",
        AllowAutoTopicCreation: true,
    }
    return &Producer{messageWriter: messageWriter}, nil
}

Step 2: Modify the main.go file within the main module and add the following function:

// This function publishes a message to the Kafka topic. The Kafka broker is hosted on localhost.
func PublishPersonDataToKafka(personName string) {
    brokers := []string{"localhost:9092"} // Kafka broker address
    producer, err := producer.NewProducer(brokers)
    if err != nil {
        fmt.Println("Error creating producer:", err)
        return
    }

    // Call the PublishMessage function from the producer
    err = producer.PublishMessage(personName)
    if err != nil {
        fmt.Println("Error publishing message:", err)
        return
    }

    fmt.Println("Message published successfully!")
}

Here, we are creating an instance of the Kafka message producer by providing the broker address.
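One detail the listing leaves out: the Writer buffers messages and holds network connections, so a long-running application should close it during shutdown. A minimal helper that could be added to producer.go (an addition, not part of the original listing):

// Close flushes any pending messages and releases the writer's resources
func (p *Producer) Close() error {
    return p.messageWriter.Close()
}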

Step 3: Call the function PublishPersonDataToKafka for each person's data retrieved from the database.

// Check if the persons collection has values
if len(persons) > 0 {
    // Loop through the collection and read properties
    for _, person := range persons {
        personName := &stringformater.PersonName{FirstName: person.FirstName, LastName: person.LastName}
        go formatString(personName, ch)
        formatedName, status := <-ch // receive formatted data from the channel along with the channel status
        channelStatus = status
        fmt.Println(formatedName)

        // Publish the formatted name to the Kafka topic
        PublishPersonDataToKafka(formatedName)
    }
} else {
    fmt.Println("No objects in the persons collection")
}

Step 4: Execute the main module
$go run main.go

After executing the program, the learngo-person topic will be created in the local Kafka cluster if it doesn't already exist.

Let's verify that the topic was created and that the messages were published.

Step 5: Find the Kafka container ID and open a shell inside the container
$ docker exec -it <container_id> /bin/bash

Step 6: List all topics inside the Kafka container

root@385270269b49:/# kafka-topics.sh --list --bootstrap-server localhost:9092
Here, 385270269b49 is the Kafka container ID.

Step 7: Verify whether the message was published or not using the command:
root@385270269b49:/# kafka-console-consumer.sh --topic learngo-person --from-beginning --bootstrap-server localhost:9092
{"person":{"firstName":"learn",lastName:"go"}}
{"PersonID":0,"FirstName":"Test First Name","LastName":"Test Last Name","CreatedDate":"2024-06-04T13:47:42.220169-04:00","UserID":"learnGo"}

Subscribe and Consume the Message

Step 1: Create a file named consumer.go in the consumer folder and add the following code:

consumer.go
package main

import (
    "context"
    "fmt"
    "log"

    "github.com/segmentio/kafka-go"
)

// Consumer struct holds the Kafka reader
type Consumer struct {
    messageReader *kafka.Reader
}

// NewConsumer creates a new reader that consumes from the learngo-person topic, partition 0
func NewConsumer(brokers []string) (*Consumer, error) {
    messageReader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:   brokers,
        Topic:     "learngo-person",
        Partition: 0,
        MaxBytes:  10e6, // 10MB
    })
    return &Consumer{messageReader: messageReader}, nil
}

// ConsumeMessages reads messages from the reader until the context is canceled
func (c *Consumer) ConsumeMessages(ctx context.Context) {
    for {
        log.Printf("Looking for a new message on the topic")

        msg, err := c.messageReader.ReadMessage(ctx)
        if err != nil {
            log.Printf("Errored out, exiting wait")
            break
        }

        // Write the message to the console
        fmt.Printf("Received message: Person Name: %s\n", msg.Value)
    }

    if err := c.messageReader.Close(); err != nil {
        log.Fatal("failed to close reader:", err)
    }
}

func main() {
    fmt.Println("Consumer Started")

    brokers := []string{"localhost:9092"}

    // Create a new consumer
    consumer, err := NewConsumer(brokers)
    if err != nil {
        fmt.Println("Exception occurred")
        log.Fatal(err)
    }

    // Create a context for the consumer loop
    ctx := context.Background()

    // Call the ConsumeMessages function; it blocks until the reader
    // errors out or the context is canceled
    consumer.ConsumeMessages(ctx)

    fmt.Println("Consumer stopped")
}


In the code above, we created a consumer for the topic "learngo-person" on the localhost:9092 Kafka broker. The ConsumeMessages function continuously reads messages from the topic using the reader's ReadMessage function.
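A note on scaling: the reader above pins partition 0, so it cannot be load-balanced. kafka-go also supports consumer groups; setting GroupID instead of Partition lets Kafka distribute partitions across consumer instances. A minimal variant of the ReaderConfig in NewConsumer (the group name is hypothetical):

    messageReader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:  brokers,
        GroupID:  "learngo-consumer-group", // consumers sharing this ID split the topic's partitions
        Topic:    "learngo-person",
        MaxBytes: 10e6, // 10MB
    })

Note that Partition must be left unset when GroupID is used.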

Step 2: Open a new terminal and run the consumer module using the following command:
$go run consumer.go

The consumer will start looking for messages from the learngo-person topic and print the received message content (the person's name) to the terminal.

We will see the recently published messages in the terminal:

2024/06/04 16:38:03 Looking for a new message on the topic
Received message: Person Name: Jhonney Walker
2024/06/04 16:38:03 Looking for a new message on the topic
Received message: Person Name: Bil Gates
2024/06/04 16:38:03 Looking for a new message on the topic


Reference

https://pkg.go.dev/github.com/segmentio/kafka-go#section-readme
https://pkg.go.dev/github.com/DarkMristov/kafka-go
https://kafka.apache.org/documentation/#intro_distribution
http://cloudurable.com/blog/kafka-architecture-topics/index.html


 


Friday, May 31, 2024

Mastering Go: Part 13 - Messaging with Apache Kafka (Setup)

Messaging

Effective communication between software components, modules, and applications is essential for building robust and efficient systems. This communication allows data exchange and interaction between various parts. There are several methods and needs for communication, both internally within a system and externally with other systems.

Messaging is a powerful approach that decouples components and systems from each other. This decoupling promotes scalability and maintainability. The concept of messaging itself has evolved from the need for asynchronous communication, where components can exchange data without needing to be available at the same time.

Asynchronous Communication

Asynchronous communication allows applications to exchange data without needing to be aware of each other's availability. This decoupled approach offers several benefits for building scalable and maintainable systems.

There are various methods and techniques for asynchronous communication between applications or services. A popular method involves using topics. In this approach, a publisher sends a message to a topic, and subscribers interested in that topic receive the message. This mechanism ensures that the publisher and subscriber don't necessarily need to be aware of each other's existence.

There are several messaging systems; in this post, we will focus mostly on the basics of Apache Kafka.

Pub/Sub

Pub/Sub is an asynchronous messaging service commonly used to broadcast messages. To use Google Cloud Pub/Sub, we must have access to Google Cloud services. Google provides a Go client package (cloud.google.com/go/pubsub) that offers an easy way to publish and receive Pub/Sub messages. With this package, we can create a topic to publish messages and create a subscriber to receive notifications.

We can achieve similar functionality offered by Pub/Sub using open-source platforms like Apache Kafka and/or RabbitMQ. For more details on Pub/Sub, please refer to the documentation provided by Google Cloud. Since we are not using Google Cloud services in this learning series, we will not go deeply into Pub/Sub in this post.
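For a flavor of the API (not used further in this series), here is a minimal publish sketch with the Go client; the project and topic IDs are hypothetical:

package main

import (
    "context"
    "log"

    "cloud.google.com/go/pubsub"
)

func main() {
    ctx := context.Background()

    // Connect to Google Cloud Pub/Sub (project ID is hypothetical)
    client, err := pubsub.NewClient(ctx, "my-project")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Publish a message and block until the server acknowledges it
    result := client.Topic("my-topic").Publish(ctx, &pubsub.Message{Data: []byte("hello")})
    id, err := result.Get(ctx)
    if err != nil {
        log.Fatal(err)
    }
    log.Println("published message with ID:", id)
}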

For more information, visit https://cloud.google.com/pubsub/docs/overview and https://pkg.go.dev/cloud.google.com/go/pubsub.


Apache Kafka


Apache Kafka is a messaging and distributed streaming platform that facilitates publish/subscribe communication between applications and services. It is one of the most popular open-source alternatives for messaging platforms.

Kafka operates as a distributed messaging system, with servers and clients communicating over the TCP protocol. Here are the key components and concepts of Kafka:

Kafka Concepts

  • Server: Kafka runs as a cluster on multiple servers. The Kafka cluster is scalable and fault-tolerant, meaning if one server fails, the other servers will take over to ensure continuous operation.
  • Client: Kafka clients allow applications to read messages from Kafka topics/servers and process them. Kafka clients are available in several programming languages including Go.
  • Topics: A topic is a named category to which messages/events are published and from which they are consumed and organized.
  • Producer: The client application that publishes events/messages to Kafka topics.
  • Consumer: The client application that subscribes to and consumes messages/events from Kafka topics.

Setting Up Kafka

To use Kafka, you need to set up the Kafka environment, which can be quite complex. You need a cluster management system or platform (a coordination service) to run Kafka clusters. The options include:

  • ZooKeeper: Used for managing Kafka clusters in earlier versions. It handles tasks like broker election, configuration management, and partition assignment.
  • KRaft: A newer, built-in consensus mechanism that eliminates the need for a separate ZooKeeper service. It replicates metadata across Kafka brokers for fault tolerance.
  • Docker: Docker simplifies deployment and management by containerizing Kafka and its dependencies. Docker Compose is a helpful tool for defining and running multi-container applications, including Kafka clusters.
  • Kubernetes: Kubernetes is a container orchestration tool that can be used to manage and scale the Kafka cluster.

Exercise: Setting Up Apache Kafka

This exercise will guide you through setting up Apache Kafka with Docker and producing a test message.

Step 1: Install Docker

Option 1: Using Homebrew (macOS)

$brew install docker

Option 2: Using Docker Desktop

  1. Download the Docker Desktop installer from https://www.docker.com/products/docker-desktop/.
  2. Run the installer and follow the on-screen instructions.

Step 2: Verify Docker Installation

Open a terminal and run:

$docker -v

This command should display the installed Docker version, confirming Docker is installed.

Step 3: Docker Compose

Docker Compose is a tool for defining and running multi-container applications. It's often bundled with Docker installations. Docker Compose allows you to specify the applications and their configurations in a single file (usually named docker-compose.yml).

Step 4: Start Docker

Verify Docker Status

$docker info

This command will list detailed information about the Docker environment.

List Docker Images

$docker image ls

This command will display a list of Docker images currently available on your machine.

Step 5: Install Kafka

We'll use Docker containers to set up Kafka. Apache Kafka versions 3.7 and later offer an official Docker image. This simplifies the local setup process.
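(For reference, the official image can start a single-node broker with one command, though this exercise uses Docker Compose instead; the image tag below is illustrative:)

$ docker run -p 9092:9092 apache/kafka:3.7.0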

Step 6: Create a Local Kafka Environment

To create a local Kafka environment, we'll utilize a Docker Compose configuration file (docker-compose.yml). This file defines the containers needed for the Kafka cluster to run on your local machine. I'm using the project (learngo) from this series (see Part 12 - Program Debugging, Profiling and Performance Evaluation) to add my compose file.

docker-compose.yml

version: '3.8'

services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      # Listen on all interfaces inside the container
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      # Address advertised to clients; localhost lets programs on the host connect
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # Connect the broker to the zookeeper service defined above
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - zookeeper

Step 7: Start the Kafka Cluster

Run the following command to download the Kafka image defined in your docker-compose.yml file and start the Kafka container in the background:

$docker-compose up -d

S*******s-MacBook-Pro:learngo s**********i$ docker compose up -d
WARN[0000] /Users/s*************i/Documents/learngo/docker-compose.yml: `version` is obsolete
[+] Running 8/8
  kafka Pulled                       25.6s
    42c077c10790 Pull complete       11.5s
    44b062e78fd7 Pull complete       11.9s
    b3ba9647f279 Pull complete       11.9s
    10c9a58bd495 Pull complete       16.1s
    ed9bd501c190 Pull complete       16.2s
    03346d650161 Pull complete       24.8s
    539ec416bc55 Pull complete       24.8s
[+] Running 2/2
  Network learngo_default    Created  0.1s
  Container learngo-kafka-1  Started

Complete Commands Summary:

  • Stop Existing Containers:
    $docker-compose down
    
  • Start Kafka Service:
    $docker-compose up -d
    
  • Check Container Status:
    $docker-compose ps
    
  • View Container Logs:
    $docker-compose logs
    
  • List Running Containers:
    $docker ps
    

Step 8: Create Topics

We'll use Kafka commands to create topics where messages will be published.

a. List Running Containers:

$docker ps
S*****s-MacBook-Pro:learngo s*********i$docker ps
CONTAINER ID   IMAGE                    COMMAND                  CREATED        STATUS        PORTS                                                NAMES
385270269b49   wurstmeister/kafka       "start-kafka.sh"         24 hours ago   Up 24 hours   0.0.0.0:9092->9092/tcp                               learngo-kafka-1
752d1dcbe150   wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   24 hours ago   Up 24 hours   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   learngo-zookeeper-1

b. Access the Kafka Container:

$docker exec -it <container-id> /bin/bash
S*****s-MacBook-Pro:learngo s******i$ docker exec -it 385270269b49  /bin/bash
root@385270269b49:/# 

Here, the 385270269b49 is the container ID of your Kafka container from the previous command.

c. Create a Topic:

Run the command below to create the topic:
$kafka-topics.sh --create --topic learngo-person --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

 root@385270269b49:/# kafka-topics.sh --create --topic learngo-person --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

Created topic learngo-person.

root@385270269b49:/# 

This command creates a topic named learngo-person with one partition and a replication factor of 1 (suitable for a local development environment).

Step 9: View Topic Details

Once you've created a topic, you can view its details using the command:

$kafka-topics.sh --describe --topic learngo-person --bootstrap-server localhost:9092

This command will display information about the newly created topic, including the number of partitions, replication factor, and configuration settings.


root@385270269b49:/# kafka-topics.sh --describe --topic learngo-person --bootstrap-server localhost:9092

Topic: learngo-person   TopicId: 3gZMZ-tmTlesfCfmO9Dwng PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824

        Topic: learngo-person   Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

root@385270269b49:/# 

Step 10: Produce Messages

You can write a producer application to send messages to the Kafka topic (detailed in Part 14 of this series). Alternatively, you can use the kafka-console-producer.sh tool:

$kafka-console-producer.sh --topic learngo-person --bootstrap-server localhost:9092

At the prompt, type your message and press Enter. For example:

>{"person":{"firstName":"learn","lastName":"go"}}

Example:

root@385270269b49:/# kafka-console-producer.sh --topic learngo-person --bootstrap-server localhost:9092

>{"person":{"firstName":"learn",lastName:"go"}}

>

Step 11: Consume Messages


Write a consumer application to read messages from the Kafka topic (detailed in Part 14 of this series). Again, you can use the kafka-console-consumer.sh tool. First, open a shell inside the Kafka container:
$docker exec -it <container_id> /bin/bash

Replace <container_id> with the actual ID of your Kafka container.

Example:
S*****s-MacBook-Pro:learngo s******i$ docker exec -it 385270269b49  /bin/bash
root@385270269b49:/# 

Inside the container, run:

$kafka-console-consumer.sh --topic learngo-person --from-beginning --bootstrap-server localhost:9092

This command will start consuming messages from the learngo-person topic, printing them to the console.

Example:

root@385270269b49:/# kafka-console-consumer.sh --topic learngo-person --from-beginning --bootstrap-server localhost:9092

{"person":{"firstName":"learn",lastName:"go"}}

Testing and Verification

You can publish more messages from the producer terminal (Step 10) and observe them being consumed in the consumer terminal (Step 11). Press Ctrl+C in each terminal to stop the producer and consumer.

Docker Desktop Verification

Open Docker Desktop and you should see two running containers: one for ZooKeeper and one for Kafka.



This guide demonstrated how to set up a local Kafka environment using Docker Compose, create topics, and use console tools to produce and consume messages. In the next part, we'll explore how to interact with Kafka using Go programs.

Reference


https://pkg.go.dev/github.com/segmentio/kafka-go

https://kafka.apache.org/intro

https://kafka.apache.org/quickstart

https://www.conduktor.io/kafka/how-to-install-apache-kafka-on-mac-with-homebrew/

https://www.conduktor.io/kafka/how-to-start-kafka-using-docker/

https://pkg.go.dev/cloud.google.com/go/pubsub

https://cloud.google.com/pubsub/docs/overview


