
Kafka Client -- Go Version

「 Kafka 」
A distributed messaging platform implemented in Scala. This project uses Go to implement simple message production and consumption.

Kafka Clients for Go

| project | feature | weakness |
| --- | --- | --- |
| Shopify/sarama | most popular | consumer-group ("cluster") consumption is hard to implement; no Context support |
| bsm/sarama-cluster | adds consumer-group consumption on top of sarama | no Context support |
| confluentinc/confluent-kafka-go | | depends on a C library; no Context support |
| lovoo/goka | | depends on sarama; no Context support |
| segmentio/kafka-go | supports consumer groups out of the box; easy to integrate with | not yet formally released; does support Context |
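Since Context support is the main differentiator in the table, here is roughly what a consumer looks like with segmentio/kafka-go. This is a minimal sketch for comparison only; the broker address, topic, and group ID are placeholder assumptions:

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	// Placeholder broker/topic/group values for illustration.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "kafka_test",
		GroupID: "example-group",
	})
	defer r.Close()

	// The read is bounded by the context deadline and stops
	// cleanly when the context is cancelled.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	msg, err := r.ReadMessage(ctx)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("got message: %s\n", msg.Value)
}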

Below, we implement a simple producer and consumer based on Shopify/sarama.

Basic Data Structures

  • Reporter, the producer: holds a sarama SyncProducer and a logger for log output

  • Subscriber, the consumer: holds a sarama Consumer and a logger

Implementation

Producer

package reporter

import (
	"github.com/Shopify/sarama"
	"github.com/sirupsen/logrus"

	"../error_const"
)

// KafkaCfg holds the Kafka server-side configuration.
type KafkaCfg struct {
	Host  string `json:"host"`
	Topic string `json:"topic"`
}

// Reporter is the message producer: a synchronous sarama producer
// plus a logger for recording send results.
type Reporter struct {
	Producer sarama.SyncProducer
	logger   *logrus.Logger
}

// NewReporter is the constructor.
func NewReporter(cfg *KafkaCfg, log *logrus.Logger) *Reporter {
	reporter := &Reporter{
		logger: log,
	}

	reporter.setProducer(cfg)
	return reporter
}

func (reporter *Reporter) setProducer(cfg *KafkaCfg) {
	var broker = []string{cfg.Host}

	config := sarama.NewConfig()
	// SyncProducer requires Return.Successes = true;
	// NewSyncProducer rejects the config otherwise.
	config.Producer.Return.Successes = true
	producer, err := sarama.NewSyncProducer(broker, config)
	if err != nil {
		reporter.logger.Errorf(error_const.InitProducerError, err)
		return
	}

	reporter.Producer = producer
}

func (reporter *Reporter) DoReport(topic string, msg []byte) {
	reporter.do(topic, msg)
}

func (reporter *Reporter) do(topic string, msg []byte) {
	kafkaMsg := generateProducerMessage(topic, msg)
	_, _, err := reporter.Producer.SendMessage(kafkaMsg)
	if err != nil {
		reporter.logger.Errorf(error_const.ReportKafkaMsgError, err, string(msg))
		return // do not log success after a failed send
	}
	reporter.logger.Infof(error_const.ReportKafkaMsgSuccess, string(msg))
}

func generateProducerMessage(topic string, message []byte) *sarama.ProducerMessage {
	// The configured partitioner chooses the partition,
	// so it is not set explicitly here.
	return &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(message),
	}
}
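
Before moving on, note that a production setup would usually tune a few more producer options than the single Return.Successes flag above. The following values are illustrative assumptions, not the project's configuration:

config := sarama.NewConfig()
config.Producer.Return.Successes = true                  // required by SyncProducer
config.Producer.RequiredAcks = sarama.WaitForAll         // wait for all in-sync replicas
config.Producer.Retry.Max = 3                            // retry transient send failures
config.Producer.Partitioner = sarama.NewHashPartitioner  // stable partition per message key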

Consumer

package subscriber

import (
	"github.com/Shopify/sarama"
	"github.com/sirupsen/logrus"

	"../error_const"
	"../reporter"
)

type Subscriber struct {
	Consumer sarama.Consumer
	logger   *logrus.Logger
}

func NewSubscriber(cfg *reporter.KafkaCfg, log *logrus.Logger) *Subscriber {
	subscriber := &Subscriber{
		logger: log,
	}
	subscriber.setConsumer(cfg)
	return subscriber
}

func (subscriber *Subscriber) setConsumer(cfg *reporter.KafkaCfg) {
	consumer, err := sarama.NewConsumer([]string{cfg.Host}, nil)
	if err != nil {
		panic(err)
	}
	subscriber.Consumer = consumer
}

// Consume starts one goroutine per partition and returns immediately.
// The Consumer must stay open while those goroutines run, so closing
// it is left to the caller (a deferred Close here would tear the
// partition consumers down as soon as Consume returns).
func (subscriber *Subscriber) Consume(topic string, ch chan string) {
	// fetch all partitions of the topic
	partitionList, err := subscriber.Consumer.Partitions(topic)
	if err != nil {
		subscriber.logger.Errorf(error_const.SubScriberGetPartitionsError, err, topic)
		return
	}

	// consume every partition, starting from the newest offset
	for _, partition := range partitionList {
		pc, err := subscriber.Consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
		if err != nil {
			subscriber.logger.Errorf("consume partition %d failed: %v", partition, err)
			continue
		}
		// one goroutine per partition, continuously listening for messages
		go func(pc sarama.PartitionConsumer) {
			for message := range pc.Messages() {
				msg := messageReceived(message)
				subscriber.logger.Infof("%s", msg)
				ch <- msg
			}
		}(pc)
	}
}

func messageReceived(message *sarama.ConsumerMessage) string {
	return string(message.Value)
}
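
Because Consume returns as soon as the goroutines are started, nothing ever stops the partition consumers. One way to add a clean shutdown is to pass a done channel and close each PartitionConsumer when it fires. A sketch only; the ConsumeUntil name and the done parameter are hypothetical, not part of the project code:

// Sketch: a variant of Consume that shuts down on a done channel.
func (subscriber *Subscriber) ConsumeUntil(topic string, ch chan string, done <-chan struct{}) {
	partitionList, err := subscriber.Consumer.Partitions(topic)
	if err != nil {
		subscriber.logger.Errorf("get partitions for %s failed: %v", topic, err)
		return
	}

	for _, partition := range partitionList {
		pc, err := subscriber.Consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
		if err != nil {
			subscriber.logger.Errorf("consume partition %d failed: %v", partition, err)
			continue
		}
		go func(pc sarama.PartitionConsumer) {
			defer pc.Close() // release the partition consumer on exit
			for {
				select {
				case message, ok := <-pc.Messages():
					if !ok {
						return
					}
					ch <- string(message.Value)
				case <-done:
					return
				}
			}
		}(pc)
	}
}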

main

package main

import (
	"encoding/json"
	"fmt"
	"os"

	"github.com/sirupsen/logrus"

	"./reporter"
	"./subscriber"
)

var (
	config = `{
	"host": "localhost:9092",
	"topic": "kafka_test"
  }`
)

func main() {
	var cfg reporter.KafkaCfg
	if err := json.Unmarshal([]byte(config), &cfg); err != nil {
		panic(err)
	}
	// logrus.New() fills in a default formatter; building the struct
	// literal directly would leave it nil and panic on first use.
	log := logrus.New()
	log.Out = os.Stdout

	// producer
	producer := reporter.NewReporter(&cfg, log)
	// consumer
	consumer := subscriber.NewSubscriber(&cfg, log)
	message := "Hello Kafka World."

	ch := make(chan string)
	// start the partition consumers before producing, so the message
	// sent below is picked up from the newest offset
	consumer.Consume(cfg.Topic, ch)
	producer.DoReport(cfg.Topic, []byte(message))

	// block until the message comes back through the consumer
	fmt.Println("Got msg: ", <-ch)
}

Other Notes

To avoid missing messages, you can specify a consumer group: Kafka guarantees that each partition is consumed by only one consumer within the same group. To consume the same messages again, switch to a different group. A consumer-group sketch follows below.
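
sarama itself has shipped consumer-group support since v1.19, so bsm/sarama-cluster is no longer required for this. A minimal sketch; the group ID, Kafka version, and helper names here are assumptions, not project code:

package subscriber

import (
	"context"

	"github.com/Shopify/sarama"
)

// groupHandler implements sarama.ConsumerGroupHandler (hypothetical helper).
type groupHandler struct {
	ch chan string
}

func (groupHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (groupHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		h.ch <- string(msg.Value)
		sess.MarkMessage(msg, "") // commit the offset for this group
	}
	return nil
}

func ConsumeGroup(host, topic, group string, ch chan string) error {
	config := sarama.NewConfig()
	// consumer groups need Kafka >= 0.10.2; the exact version is an assumption
	config.Version = sarama.V2_1_0_0

	cg, err := sarama.NewConsumerGroup([]string{host}, group, config)
	if err != nil {
		return err
	}
	defer cg.Close()

	ctx := context.Background()
	for {
		// Consume blocks for one session and returns on rebalance; loop to rejoin.
		if err := cg.Consume(ctx, []string{topic}, groupHandler{ch: ch}); err != nil {
			return err
		}
	}
}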

Project code: kafka-client-golang. Project layout:

.
├── README.md
├── error_const
│   └── error_const.go
├── main.go
├── reporter
│   └── report.go
└── subscriber
    └── subscribe.go

👏 Comments welcome 👏

Updated 2020-05-08