背景
之前我的微服务项目用的Nats消息队列,最初用是因为nats官网上有一个性能对比图,nats最好,就用了。后来才发现nats没有持久化保证,怪不得快呢,笑。
后来虽然用jetstream插件保证了持久性,即最少一次的到达,但是感觉单纯从业务使用上来看,kafka用着更舒服(消费者组),也没有什么大的短板,就换到kafka了。
正文
部署kafka集群
- docker-compose.yaml配置
extra_hosts中的ip换成你自己的,用ifconfig看,找那个docker0的ip填进去(其他的应该也行,但我用的这个)
version: "3"
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
restart: always
kafka1:
image: wurstmeister/kafka
depends_on: [ zookeeper ]
container_name: kafka1
ports:
- "9091:9091"
environment:
HOSTNAME: kafka1
# KAFKA_BROKER_ID: 0
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 6000
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9091
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9091
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka
extra_hosts:
- "kafka1:172.17.0.1"
volumes:
- ./data/kafka1:/kafka
kafka2:
image: wurstmeister/kafka
depends_on: [ zookeeper ]
container_name: kafka2
ports:
- "9092:9092"
environment:
HOSTNAME: kafka2
# KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 6000
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka
extra_hosts:
- "kafka2:172.17.0.1"
volumes:
- ./data/kafka2:/kafka
kafka3:
image: wurstmeister/kafka
depends_on: [ zookeeper ]
container_name: kafka3
ports:
- "9093:9093"
environment:
HOSTNAME: kafka3
# KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 6000
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9093
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka
extra_hosts:
- "kafka3:172.17.0.1"
volumes:
- ./data/kafka3:/kafka
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui
ports:
# 本机8081映射到容器内8080
- "8081:8080"
environment:
DYNAMIC_CONFIG_ENABLED: 'true'
volumes:
- ./configs/kafka-ui/dynamic_config.yaml:/etc/kafkaui/dynamic_config.yaml
下面是kafka-ui的配置,也就是dynamic_config.yaml
,换上去
kafka:
clusters:
-
name: ttms-kafka
bootstrapServers: kafka1:9091,kafka2:9092,kafka3:9093
metrics:
port: 9997
type: JMX
执行 docker-compose up -d启动,之后docker ps看看都起来了没,起来之后,浏览器访问8081端口就能访问到kafka-ui的界面,可以进行kafka的一些初始配置,比如创建topic。
2. 此时kafka和对应的ui界面都启动起来了,可以尝试用go代码连一下kafka集群。
- 这是生产者的示例代码,代码是对的,但我猜你第一次跑不起来,先把代码弄好,咱接着往下看
func TestSend(t *testing.T) {
// make a writer that produces to topic-A, using the least-bytes distribution
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9091", "localhost:9092", "localhost:9093"),
Topic: "topic-A",
Balancer: &kafka.LeastBytes{
}, //Hash, RoundRobin, LeastBytes
RequiredAcks: kafka.RequireOne, //Leader写入成功即可
}
defer func() {
if err := w.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
}()
err := w.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("Key-A"),
Value: []byte("nero!"),
},
kafka.Message{
Key: []byte("Key-B"),
Value: []byte("One!"),
},
kafka.Message{
Key: []byte("Key-C"),
Value: []byte("Two!"),
},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}
}
- 这是消费者的示例代码,消费者组用着真舒服。你可以之后多开几个TestRead的进程试一试。
func TestRead(t *testing.T) {
// make a new reader that consumes from topic-A
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{
"localhost:9091", "localhost:9092", "localhost:9093"},
GroupID: "order",
Topic: "topic-A",
MaxBytes: 10e6, // 10MB
})
defer func() {
if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
}
}()
ctx := context.Background()
for {
m, err := r.FetchMessage(ctx)
if err != nil {
break
}
fmt.Printf("%v message at topic/partition/offset %v/%v/%v: %s = %s\n", time.Now(), m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
if err := r.CommitMessages(ctx, m); err != nil {
log.Fatal("failed to commit messages:", err)
}
}
}
go mod tidy拉一下依赖
这时你应该已经运行了生产者代码,你最终会发现一个报错:kafka1之类的连不上。(直接找答案的去配置host就行,有兴趣的可以看看我的心路历程)
你如果用docker pull过一些单节点kafka运行过,你就会发现,直接就能连上,很顺利。
那这里连不上的问题我们用排除法就能发现的集群的问题,在正常情况下,如果客户端首次连接到 Follower 节点来发送消息,Follower 节点不会处理这个写请求,而是会返回一个错误信息给客户端,告知客户端正确的 Leader 节点位置。
容器内使用kafka1,kafka2这些host,因为我配置了host,所以要在本机上也配置一下host。
- /etc/hosts配置如下:把这个追加到你的hosts文件中
172.17.0.1 kafka1
172.17.0.1 kafka2
172.17.0.1 kafka3
结束
应该没什么问题了,拜拜,有问题或者我哪里写漏了什么可以评论。