In this post, we will explore how to implement Apache Kafka messaging in Golang. Several packages are available, and the best choice depends on your specific use case.
Here are some of the most popular Kafka client libraries for Golang:
- Kafka-go (MIT license): This library provides both low-level and high-level implementations of Kafka messaging. It leverages interfaces for ease of use. https://github.com/segmentio/kafka-go
- kafkapubsub (Apache 2.0 license): This package is the Kafka driver for the Go CDK (gocloud.dev) pubsub API, offering a portable pub/sub implementation that works with Kafka. https://github.com/google/go-cloud/blob/master/pubsub/kafkapubsub/kafka.go
- Sarama (MIT license): This library is a popular Go client for Kafka. It provides options for both asynchronous and synchronous message producers, allowing for low-level control over messaging if needed. The low-level API lets developers handle individual messages and connections precisely. https://github.com/IBM/sarama
- Goka (BSD 3-Clause license): Goka is a distributed stream processing library that simplifies building microservices. It can maintain consumer state by persisting it in Kafka. Goka relies on Sarama for the actual communication with Kafka. https://github.com/lovoo/goka
- confluent-kafka-go (Apache 2.0 license): This is Confluent's official Go client for Kafka. It comes with extensive documentation, making it a valuable resource for development. https://docs.confluent.io/home/overview.html
Kafka-go
The Kafka-go package provides both high-level and low-level APIs for working with messaging. The high-level APIs abstract the detailed implementation of the topic, while the low-level APIs provide developers with flexibility in handling connections, readers, writers, partitions, replication, and more.
For the low-level approach, developers can use the connection object to create and configure Kafka topics. For the high-level approach, the Reader and Writer types can be used to read from and write to topics. A Reader is particularly straightforward when consuming from a single topic-partition pair.
The Kafka-go Writer type offers several key features, including automatic retries, synchronous and asynchronous writes, and the creation of missing topics before publishing messages.
In this exercise, we will leverage the high-level types provided by the Kafka-go library to publish and subscribe to messages.
Setup Project
Step 1: First, we need to set up the project and get the environment ready for Kafka. Please complete Part 13 (Part 13 - Messaging with Apache Kafka (Setup)) of this series to install Kafka and prepare the environment.
Step 2: Create a producer module
Add a folder named producer under the project learngo and initialize the producer module by executing the command
$ go mod init learngo/producer
Step 3: Create a consumer module
Add a folder named consumer under the project learngo and initialize the consumer module by executing the command
$ go mod init learngo/consumer
Step 4: Add the kafka-go library to each module
$ go get github.com/segmentio/kafka-go
Create Topic and Publish Message
Step 1: Create a file named producer.go in the producer folder and add the following code:
producer.go
package producer

import (
	"context"

	"github.com/segmentio/kafka-go"
)

// Producer struct holds the Kafka writer
type Producer struct {
	messageWriter *kafka.Writer
}

// NewProducer creates a new Kafka producer.
// The broker address is passed in by the caller, e.g. "localhost:9092"
func NewProducer(brokers []string) (*Producer, error) {
	messageWriter := &kafka.Writer{
		Addr:  kafka.TCP(brokers...),
		Topic: "learngo-person",
	}
	return &Producer{messageWriter: messageWriter}, nil
}

// PublishMessage sends a message to the Kafka topic
func (p *Producer) PublishMessage(message string) error {
	msg := kafka.Message{
		Value: []byte(message),
	}
	return p.messageWriter.WriteMessages(context.Background(), msg)
}
In the above code, we configure a Kafka Writer for the topic 'learngo-person'. The constructor takes the broker addresses and returns a Producer that wraps the Writer. The Writer connects to the broker when the first message is written. If the topic doesn't exist at that point, it can be created automatically. By default, the Kafka broker allows automatic topic creation.
We can also explicitly configure the Kafka Writer to allow or disallow automatic topic creation:
func NewProducer(brokers []string) (*Producer, error) {
	messageWriter := &kafka.Writer{
		Addr:                   kafka.TCP(brokers...),
		Topic:                  "learngo-person",
		AllowAutoTopicCreation: true,
	}
	return &Producer{messageWriter: messageWriter}, nil
}
Step 2: Modify the main.go file within the main module and add the following function:
// PublishPersonDataToKafka publishes the message to the Kafka topic.
// The Kafka broker is hosted on localhost.
func PublishPersonDataToKafka(personName string) {
	brokers := []string{"localhost:9092"} // Kafka broker address
	// Use a variable name that does not shadow the producer package
	kafkaProducer, err := producer.NewProducer(brokers)
	if err != nil {
		fmt.Println("Error creating producer:", err)
		return
	}
	// Call the PublishMessage function from the producer and keep its error
	err = kafkaProducer.PublishMessage(personName)
	if err != nil {
		fmt.Println("Error publishing message:", err)
		return
	}
	fmt.Println("Message published successfully!")
}
Here, we are creating an instance of the Kafka message producer by providing the broker address.
Step 3: Call the function PublishPersonDataToKafka for each person's data retrieved from the database.
// Check if the persons collection has values
if len(persons) > 0 {
	// Loop through the collection and read properties
	for _, person := range persons {
		personName := &stringformater.PersonName{FirstName: person.FirstName, LastName: person.LastName}
		go formatString(personName, ch)
		formatedName, status := <-ch // receive formatted data from channel with channel status
		channelStatus = status
		fmt.Println(formatedName)
		// Publish the formatted name to the Kafka topic
		PublishPersonDataToKafka(formatedName)
	}
} else {
	fmt.Println("No objects in the persons collection")
}
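The loop above calls PublishPersonDataToKafka once per person, and that function builds a new producer on every call. A minimal sketch of an alternative is to create the producer once and pass it to a helper that publishes every name. The Publisher interface, publishAll, and fakePublisher below are hypothetical names introduced for this illustration; the fake stands in for Kafka so the sketch runs on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// Publisher is a hypothetical interface. The Producer type from producer.go
// would satisfy it through its PublishMessage method.
type Publisher interface {
	PublishMessage(message string) error
}

// publishAll sends every name through the same publisher and stops at the
// first failure, reporting which name could not be published.
func publishAll(p Publisher, names []string) error {
	for _, name := range names {
		if err := p.PublishMessage(name); err != nil {
			return fmt.Errorf("publishing %q: %w", name, err)
		}
	}
	return nil
}

// fakePublisher records messages in memory so the sketch runs without Kafka.
// When failOn is non-empty, publishing that exact message returns an error.
type fakePublisher struct {
	sent   []string
	failOn string
}

func (f *fakePublisher) PublishMessage(message string) error {
	if f.failOn != "" && message == f.failOn {
		return errors.New("broker unavailable")
	}
	f.sent = append(f.sent, message)
	return nil
}

func main() {
	p := &fakePublisher{}
	if err := publishAll(p, []string{"Jhonney Walker", "Bil Gates"}); err != nil {
		fmt.Println("Error publishing message:", err)
		return
	}
	fmt.Println("published", len(p.sent), "messages")
}
```

With the real Producer, the same helper would reuse one Kafka Writer for the whole collection instead of constructing one per message.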
Step 4: Execute the main module
$ go run main.go
After executing the program, the learngo-person topic will be created in the local Kafka cluster if it doesn't already exist.
Let's verify that the topic was created and the message was published.
Step 5: Find the Kafka container ID and open a shell inside the container
$ docker exec -it <container_id> /bin/bash
Step 6: List all topics inside the Kafka container
root@385270269b49:/# kafka-topics.sh --list --bootstrap-server localhost:9092
Here, 385270269b49 is the Kafka container ID.
Step 7: Verify whether the message was published or not using the command:
root@385270269b49:/# kafka-console-consumer.sh --topic learngo-person --from-beginning --bootstrap-server localhost:9092
{"person":{"firstName":"learn",lastName:"go"}}
{"PersonID":0,"FirstName":"Test First Name","LastName":"Test Last Name","CreatedDate":"2024-06-04T13:47:42.220169-04:00","UserID":"learnGo"}
Subscribe and Consume the message
Step 1: Add a file named consumer.go to the consumer folder and add the following code to the file.
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/segmentio/kafka-go"
)

// Consumer struct holds the Kafka reader
type Consumer struct {
	messageReader *kafka.Reader
}

// NewConsumer creates a reader that consumes from partition 0 of the topic "learngo-person"
func NewConsumer(brokers []string) (*Consumer, error) {
	messageReader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   brokers,
		Topic:     "learngo-person",
		Partition: 0,
		MaxBytes:  10e6, // 10MB
	})
	return &Consumer{messageReader: messageReader}, nil
}

// ConsumeMessages reads messages from the reader until the context is canceled or an error occurs
func (c *Consumer) ConsumeMessages(ctx context.Context) {
	for {
		log.Printf("Looking for a new message on the topic")
		msg, err := c.messageReader.ReadMessage(ctx)
		if err != nil {
			log.Printf("Errored out, exiting wait: %v", err)
			break
		}
		// Write the message to the console
		fmt.Printf("Received message: Person Name: %s\n", msg.Value)
	}
	if err := c.messageReader.Close(); err != nil {
		log.Fatal("failed to close reader:", err)
	}
}

func main() {
	fmt.Println("Consumer Started")
	brokers := []string{"localhost:9092"}
	// Create a new consumer
	consumer, err := NewConsumer(brokers)
	if err != nil {
		fmt.Println("Exception occurred")
		log.Fatal(err)
	}
	// Create a context for the consumer loop
	ctx := context.Background()
	// ConsumeMessages blocks until reading fails and logs any error itself
	consumer.ConsumeMessages(ctx)
	fmt.Println("Consumer stopped")
}
In the code above, we've created a consumer for the topic "learngo-person" on the localhost:9092 Kafka broker. The ConsumeMessages function continuously reads messages from the topic using the reader's ReadMessage function and prints each one to the console.
Step 2: Open a new terminal and run the consumer module using the following command:
$ go run consumer.go
The consumer will start looking for messages on the learngo-person topic and print the received message content (person name) to the terminal.
We will see the recently published messages in the terminal:
2024/06/04 16:38:03 Looking for a new message on the topic
Received message: Person Name: Jhonney Walker
2024/06/04 16:38:03 Looking for a new message on the topic
Received message: Person Name: Bil Gates
2024/06/04 16:38:03 Looking for a new message on the topic
Reference
https://pkg.go.dev/github.com/segmentio/kafka-go#section-readme
https://pkg.go.dev/github.com/DarkMristov/kafka-go
https://kafka.apache.org/documentation/#intro_distribution
http://cloudurable.com/blog/kafka-architecture-topics/index.html