The most widely used Go client for Kafka is probably sarama. Older versions of sarama did not support consumer groups, so most people used sarama-cluster instead.
Later, sarama added consumer group support and sarama-cluster stopped being maintained. However, there is little analysis of sarama consumer groups online, and the official example is very simple, so this article works through them.
1, Official sample
The official example is relatively simple:
1. Create a configuration with sarama.NewConfig.
2. Create a consumer group with NewConsumerGroup.
3. Start a session of the consumer group with Consume. The third parameter of this function is a handler that provides the callbacks for the three stages of a session: Setup, Cleanup, and ConsumeClaim. They are called at the beginning of the session, after the session ends, and while the session is alive (this is where messages are read), respectively.
2, Question
1. When the specified topic does not exist in Kafka and the broker allows automatic topic creation (auto.create.topics.enable), Kafka creates a new topic. If you only want users to consume existing topics, how do you get the list of topics that already exist in Kafka?
2. What is the calling process of Setup and Cleanup? Under what circumstances are they called?
3. Since this is a consumer group, how can you see which topics and partitions a consumer in the group has been assigned?
4. How do you consume a topic starting from a specified offset?
5. How do you achieve exactly-once consumption?
Note: the sample code used in the tests below is excerpted from the author's own sample. See the end of the article for the complete code.
3, Analysis
1. In sarama, the method for listing topics belongs to the Client interface, so you first create a client with NewClient and then call its Topics method to get all topics in Kafka. The consumer group, however, has the type ConsumerGroup. How do you get one from a client? sarama provides NewConsumerGroupFromClient, which creates a ConsumerGroup from an existing client. The process therefore changes from calling NewConsumerGroup directly to:
a. Create a client using NewClient
b. Create a ConsumerGroup using NewConsumerGroupFromClient.
The specific code is as follows:
    // Create the client
    newClient, err := sarama.NewClient(brokers, config)
    if err != nil {
        log.Fatal(err)
    }

    // Get all topics
    topics, err := newClient.Topics()
    if err != nil {
        log.Fatal(err)
    }
    log.Info("topics: ", topics)

    // Create a consumer group based on the client
    client, err := sarama.NewConsumerGroupFromClient(k.group, newClient)
    if err != nil {
        log.Fatalf("Error creating consumer group client: %v", err)
    }
The advantage of this approach is that the Client interface also exposes other information, such as the current configuration, the controller, the brokers, the list of topics, the partitions of a specific topic, and the current offset of a partition. For the full set of methods, see the Client interface:
    type Client interface {
        // Config returns the Config struct of the client. This struct should not be
        // altered after it has been created.
        Config() *Config

        // Controller returns the cluster controller broker. It will return a
        // locally cached value if it's available. You can call RefreshController
        // to update the cached value. Requires Kafka 0.10 or higher.
        Controller() (*Broker, error)

        // RefreshController retrieves the cluster controller from fresh metadata
        // and stores it in the local cache. Requires Kafka 0.10 or higher.
        RefreshController() (*Broker, error)

        // Brokers returns the current set of active brokers as retrieved from cluster metadata.
        Brokers() []*Broker
        ......
    }
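Once the topic list is available, restricting users to existing topics comes down to a membership check. The following is a minimal sketch using only the standard library; splitByExistence is a hypothetical helper (not part of sarama), and the existing slice stands in for whatever the client's Topics call returned.

```go
package main

import "fmt"

// splitByExistence separates the requested topics into those present in
// existing (for example, the slice returned by the client's Topics call)
// and those that are missing. Hypothetical helper, not part of sarama.
func splitByExistence(requested, existing []string) (found, missing []string) {
	known := make(map[string]struct{}, len(existing))
	for _, t := range existing {
		known[t] = struct{}{}
	}
	for _, t := range requested {
		if _, ok := known[t]; ok {
			found = append(found, t)
		} else {
			missing = append(missing, t)
		}
	}
	return found, missing
}

func main() {
	// Assumed topic list; in the real program it comes from Kafka.
	existing := []string{"t1p4", "t2p4"}
	found, missing := splitByExistence([]string{"t2p4", "typo-topic", "t1p4"}, existing)
	fmt.Println("consume:", found)
	fmt.Println("reject:", missing)
}
```

Only the found slice would then be passed on to Consume, while the missing topics can be reported back to the user instead of being created silently.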
2. Setup, Cleanup, and ConsumeClaim are the three methods of the handler passed to Consume (referred to as s.handler in sarama's source), and users must implement them themselves. Put simply: when a session is created, Setup runs first, then messages are processed in ConsumeClaim, and finally Cleanup runs.
Setup runs at the beginning of a new session, before ConsumeClaim. The calling process is: Consume -> newSession -> newConsumerGroupSession -> handler.Setup.
After Setup has been called, a goroutine is created, and ConsumeClaim is actually called inside that goroutine. The ConsumeClaim we implement therefore runs in its own goroutine. Its calling process is: Consume -> newSession -> newConsumerGroupSession -> consume -> s.handler.ConsumeClaim.
Cleanup runs at the end of a session. The calling process is: Consume -> release -> s.handler.Cleanup.
Now that the calling process is clear, when are these methods triggered? In two cases: 1. When a consumer group is created. 2. When a rebalance happens.
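The ordering described above can be modeled without Kafka at all. The sketch below is a simplified stand-in, not sarama's implementation: the handler interface, runSession, and recorder are all hypothetical names, and a session is reduced to "Setup, one goroutine per claimed partition, then Cleanup".

```go
package main

import (
	"fmt"
	"sync"
)

// handler mirrors the three callbacks discussed above. It is a local,
// simplified stand-in for illustration, not sarama's real interface.
type handler interface {
	Setup() error
	Cleanup() error
	ConsumeClaim(partition int32) error
}

// runSession models one session: Setup first, then one goroutine per claimed
// partition running ConsumeClaim, then Cleanup once all of them have returned.
func runSession(h handler, partitions []int32) error {
	if err := h.Setup(); err != nil {
		return err
	}
	var wg sync.WaitGroup
	for _, p := range partitions {
		wg.Add(1)
		go func(p int32) {
			defer wg.Done()
			_ = h.ConsumeClaim(p)
		}(p)
	}
	wg.Wait()
	return h.Cleanup()
}

// recorder stores the order in which the callbacks fire.
type recorder struct {
	mu     sync.Mutex
	events []string
}

func (r *recorder) add(e string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.events = append(r.events, e)
}

func (r *recorder) Setup() error             { r.add("setup"); return nil }
func (r *recorder) Cleanup() error           { r.add("cleanup"); return nil }
func (r *recorder) ConsumeClaim(int32) error { r.add("consume"); return nil }

func main() {
	r := &recorder{}
	// Two consecutive sessions, as when a rebalance ends the first one.
	_ = runSession(r, []int32{0, 1, 2, 3})
	_ = runSession(r, []int32{0, 1})
	fmt.Println(r.events)
}
```

The point of the model is only the ordering: every session is bracketed by exactly one Setup and one Cleanup, and the ConsumeClaim calls run concurrently in between.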
We can add a log line to Setup and Cleanup:
    func (k *Kafka) Setup(session sarama.ConsumerGroupSession) error {
        log.Info("setup")
        close(k.ready)
        return nil
    }

    func (k *Kafka) Cleanup(sarama.ConsumerGroupSession) error {
        log.Info("cleanup")
        return nil
    }
Then start a consumer and observe the output:
    INFO[0000] setup
Then press Ctrl + C to stop the consumer and observe the output:
    INFO[0101] cleanup
This shows that Setup and Cleanup are called when a consumer is created and when it exits.
Now let's try a rebalance: start one consumer, then start a second consumer in the same group. The output is:
The first consumer prints:
    INFO[0000] setup
    INFO[0006] cleanup
    INFO[0006] setup
The second consumer prints:
    INFO[0002] setup
This shows that on a rebalance the original session is closed first and Cleanup is called, then Setup is called and a new session is created.
3. The ConsumerGroupSession interface has a Claims method, which shows the topics and partitions assigned to the current consumer. We can print it in Setup:
    func (k *Kafka) Setup(session sarama.ConsumerGroupSession) error {
        log.Info("setup")
        log.Info(session.Claims())
        close(k.ready)
        return nil
    }
The range partition strategy is used here. The subscribed topics are t1p4 and t2p4, each with four partitions, and three consumers are started one after another. The resulting output is:
consumer1:
    INFO[0000] setup
    INFO[0000] map[t1p4:[0 1 2 3] t2p4:[0 1 2 3]]
    INFO[0009] cleanup
    INFO[0009] setup
    INFO[0009] map[t1p4:[0 1] t2p4:[2 3]]
    INFO[0015] cleanup
    INFO[0015] setup
    INFO[0015] map[t1p4:[0] t2p4:[3]]
consumer2:
    INFO[0002] setup
    INFO[0002] map[t1p4:[2 3] t2p4:[0 1]]
    INFO[0009] cleanup
    INFO[0009] setup
    INFO[0009] map[t1p4:[1 2] t2p4:[0]]
consumer3:
    INFO[0000] setup
    INFO[0000] map[t1p4:[3] t2p4:[1 2]]
When there is only consumer1, it is assigned all partitions: t1p4:[0 1 2 3] t2p4:[0 1 2 3].
When consumer2 joins, consumer1 is assigned t1p4:[0 1] t2p4:[2 3], and consumer2 is assigned t1p4:[2 3] t2p4:[0 1].
When consumer3 joins, consumer1 is assigned t1p4:[0] t2p4:[3], consumer2 is assigned t1p4:[1 2] t2p4:[0], and consumer3 is assigned t1p4:[3] t2p4:[1 2].
If you are interested, you can stop consumer1 and consumer2 in turn and watch how the partitions are reassigned.
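The chunk sizes in the output above (4, then 2 + 2, then 1 + 2 + 1) are what you get from cutting each topic's partition list into contiguous ranges with rounded boundaries. The sketch below reproduces only that arithmetic. It assumes a rounding-based split and is not taken from sarama's source; rangeSplit is a hypothetical name, and the order in which members receive the chunks (which differs between t1p4 and t2p4 in the logs) is not modeled.

```go
package main

import (
	"fmt"
	"math"
)

// rangeSplit cuts partitions into `members` contiguous chunks whose
// boundaries are rounded to the nearest integer. This is an assumed model
// of a range-style assignment for a single topic, not sarama's code.
func rangeSplit(partitions []int32, members int) [][]int32 {
	if members <= 0 {
		return nil
	}
	out := make([][]int32, 0, members)
	step := float64(len(partitions)) / float64(members)
	for i := 0; i < members; i++ {
		lo := int(math.Floor(float64(i)*step + 0.5))
		hi := int(math.Floor(float64(i+1)*step + 0.5))
		out = append(out, partitions[lo:hi])
	}
	return out
}

func main() {
	partitions := []int32{0, 1, 2, 3}
	for members := 1; members <= 3; members++ {
		fmt.Println(members, "member(s):", rangeSplit(partitions, members))
	}
}
```

With four partitions and three members this yields chunks of sizes 1, 2, and 1, matching the t1p4 assignment [0], [1 2], [3] seen once consumer3 has joined.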
4. In sarama's Config, the initial offset for consumption (Consumer.Offsets.Initial) can only be one of two values: OffsetNewest and OffsetOldest. What if you want to start consuming from a specific offset?
As mentioned earlier, Setup runs at the beginning of the session, when all assigned topics and partitions are already known. The ResetOffset method of ConsumerGroupSession can therefore be used there to set the offset. The implementation is as follows (the topic used here, t2p4, already exists, and the offset in partition 0 has reached 18):
    func (k *Kafka) Setup(session sarama.ConsumerGroupSession) error {
        log.Info("setup")
        session.ResetOffset("t2p4", 0, 13, "")
        log.Info(session.Claims())
        close(k.ready)
        return nil
    }

    func (k *Kafka) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        for message := range claim.Messages() {
            log.Infof("[topic:%s] [partiton:%d] [offset:%d] [value:%s] [time:%v]",
                message.Topic, message.Partition, message.Offset, string(message.Value), message.Timestamp)
            session.MarkMessage(message, "")
        }
        return nil
    }
Now, no matter how many times the consumer is run, the messages at offsets 13 through 18 are consumed:
    INFO[0000] setup
    INFO[0000] map[t1p4:[0 1 2 3] t2p4:[0 1 2 3]]
    INFO[0000] [topic:t2p4] [partiton:0] [offset:13] [value:a] [time:2021-10-12 23:02:35.058 -0400 EDT]
    INFO[0000] [topic:t2p4] [partiton:0] [offset:14] [value:b] [time:2021-10-12 23:02:35.087 -0400 EDT]
    INFO[0000] [topic:t2p4] [partiton:0] [offset:15] [value:c] [time:2021-10-12 23:02:35.092 -0400 EDT]
    INFO[0000] [topic:t2p4] [partiton:0] [offset:16] [value:d] [time:2021-10-12 23:03:18.882 -0400 EDT]
    INFO[0000] [topic:t2p4] [partiton:0] [offset:17] [value:e] [time:2021-10-12 23:03:18.898 -0400 EDT]
    INFO[0000] [topic:t2p4] [partiton:0] [offset:18] [value:f] [time:2021-10-12 23:03:18.903 -0400 EDT]
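The Setup above hard-codes one topic and partition. With several consumers in the group, a given consumer may not own that partition after a rebalance, so one option is to apply only the overrides that match the current claims. The following is a minimal sketch of that filtering step under those assumptions; the reset type and planResets function are hypothetical, and the claims map has the same shape as the map printed from session.Claims().

```go
package main

import "fmt"

// reset describes one desired offset override. Hypothetical type.
type reset struct {
	Topic     string
	Partition int32
	Offset    int64
}

// planResets keeps only the overrides whose topic and partition appear in
// claims (topic -> partitions assigned to this consumer).
func planResets(claims map[string][]int32, wanted []reset) []reset {
	var out []reset
	for _, w := range wanted {
		for _, p := range claims[w.Topic] {
			if p == w.Partition {
				out = append(out, w)
				break
			}
		}
	}
	return out
}

func main() {
	// Assumed claims, shaped like consumer1's assignment after consumer2 joined.
	claims := map[string][]int32{"t1p4": {0, 1}, "t2p4": {2, 3}}
	wanted := []reset{{"t2p4", 0, 13}, {"t2p4", 3, 5}}
	for _, r := range planResets(claims, wanted) {
		// In a real Setup, this is where session.ResetOffset would be called.
		fmt.Printf("reset %s/%d to %d\n", r.Topic, r.Partition, r.Offset)
	}
}
```

Here only the override for t2p4 partition 3 survives, because partition 0 of t2p4 belongs to another consumer in this assumed assignment.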
5. Given the calling process of Setup and what can be done inside it, exactly-once consumption can be built as follows: record the offset of each topic partition yourself in durable storage (a file, a database, and so on), read the recorded offset back in Setup, and reset to it with ResetOffset. The consistency between updating the offset and processing the message must of course be guaranteed by the business logic itself (for example, by using a database transaction).
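The idea of keeping the offset and the processing result consistent can be sketched with an in-memory stand-in for the database. In the sketch below, store, apply, and resumeFrom are hypothetical names, and the mutex-protected update plays the role of a database transaction: the result and the next expected offset change together or not at all, so redelivered messages are detected and skipped.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a reduced stand-in for a consumed Kafka message.
type message struct {
	Partition int32
	Offset    int64
	Value     string
}

// store stands in for a transactional database: the applied results and the
// next expected offset per partition are updated in one critical section.
type store struct {
	mu      sync.Mutex
	applied []string
	next    map[int32]int64
}

func newStore() *store { return &store{next: make(map[int32]int64)} }

// apply records the message and advances the offset atomically. It returns
// false when the message was already applied (a redelivery).
func (s *store) apply(m message) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if m.Offset < s.next[m.Partition] {
		return false
	}
	s.applied = append(s.applied, m.Value)
	s.next[m.Partition] = m.Offset + 1
	return true
}

// resumeFrom is the offset a restarted consumer would reset to in Setup.
func (s *store) resumeFrom(partition int32) int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.next[partition]
}

func main() {
	s := newStore()
	first := []message{{0, 13, "a"}, {0, 14, "b"}, {0, 15, "c"}}
	// After a simulated crash, offsets 14 and 15 are delivered again.
	second := []message{{0, 14, "b"}, {0, 15, "c"}, {0, 16, "d"}}
	for _, m := range append(first, second...) {
		if !s.apply(m) {
			fmt.Println("skipped duplicate at offset", m.Offset)
		}
	}
	fmt.Println("applied:", s.applied, "resume from:", s.resumeFrom(0))
}
```

In a real system the same check-and-advance would live inside one database transaction together with the business write, and the value from resumeFrom would be the offset passed to ResetOffset in Setup.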
4, Complete sample code
    package main

    import (
        "context"
        "os"
        "os/signal"
        "sync"
        "syscall"

        "github.com/Shopify/sarama"
        log "github.com/sirupsen/logrus"
    )

    type Kafka struct {
        brokers           []string
        topics            []string
        startOffset       int64
        version           string
        ready             chan bool
        group             string
        channelBufferSize int
        assignor          string
    }

    var brokers = []string{"192.168.1.101:9092"}
    var topics = []string{"t1p4", "t2p4"}
    var group = "grp1"
    var assignor = "range"

    func NewKafka() *Kafka {
        return &Kafka{
            brokers:           brokers,
            topics:            topics,
            group:             group,
            channelBufferSize: 1000,
            ready:             make(chan bool),
            version:           "2.8.0",
            assignor:          assignor,
        }
    }

    func (k *Kafka) Connect() func() {
        log.Infoln("kafka init...")

        version, err := sarama.ParseKafkaVersion(k.version)
        if err != nil {
            log.Fatalf("Error parsing Kafka version: %v", err)
        }

        config := sarama.NewConfig()
        config.Version = version

        // Partition assignment strategy
        switch k.assignor {
        case "sticky":
            config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
        case "roundrobin":
            config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
        case "range":
            config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
        default:
            log.Panicf("Unrecognized consumer group partition assignor: %s", k.assignor)
        }
        config.Consumer.Offsets.Initial = sarama.OffsetNewest
        config.ChannelBufferSize = k.channelBufferSize // channel length

        // Create the client
        newClient, err := sarama.NewClient(k.brokers, config)
        if err != nil {
            log.Fatal(err)
        }

        // Get all topics that exist in Kafka
        existingTopics, err := newClient.Topics()
        if err != nil {
            log.Fatal(err)
        }
        log.Info("topics: ", existingTopics)

        // Create a consumer group based on the client
        client, err := sarama.NewConsumerGroupFromClient(k.group, newClient)
        if err != nil {
            log.Fatalf("Error creating consumer group client: %v", err)
        }

        ctx, cancel := context.WithCancel(context.Background())
        wg := &sync.WaitGroup{}
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                if err := client.Consume(ctx, k.topics, k); err != nil {
                    // When Setup fails, the error is returned here
                    log.Errorf("Error from consumer: %v", err)
                    return
                }
                // Check if the context was cancelled, signaling that the consumer should stop
                if ctx.Err() != nil {
                    log.Println(ctx.Err())
                    return
                }
                // Consume returned because the session ended (for example, a rebalance);
                // recreate the channel so the next Setup can close it again
                k.ready = make(chan bool)
            }
        }()
        <-k.ready
        log.Infoln("Sarama consumer up and running!...")

        // Ensure that the messages in the channel are consumed when the system exits
        return func() {
            log.Info("kafka close")
            cancel()
            wg.Wait()
            if err := client.Close(); err != nil {
                log.Errorf("Error closing client: %v", err)
            }
        }
    }

    // Setup is run at the beginning of a new session, before ConsumeClaim
    func (k *Kafka) Setup(session sarama.ConsumerGroupSession) error {
        log.Info("setup")
        session.ResetOffset("t2p4", 0, 13, "")
        log.Info(session.Claims())
        // Mark the consumer as ready
        close(k.ready)
        return nil
    }

    // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
    func (k *Kafka) Cleanup(sarama.ConsumerGroupSession) error {
        log.Info("cleanup")
        return nil
    }

    // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
    func (k *Kafka) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        // NOTE:
        // Do not move the code below to a goroutine.
        // The `ConsumeClaim` itself is called within a goroutine, see:
        // <https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29>
        // Consume the messages
        for message := range claim.Messages() {
            log.Infof("[topic:%s] [partiton:%d] [offset:%d] [value:%s] [time:%v]",
                message.Topic, message.Partition, message.Offset, string(message.Value), message.Timestamp)
            // Mark the message as consumed so its offset is committed
            session.MarkMessage(message, "")
        }
        return nil
    }

    func main() {
        k := NewKafka()
        c := k.Connect()

        sigterm := make(chan os.Signal, 1)
        signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
        <-sigterm
        log.Warnln("terminating: via signal")

        c()
    }
5, Application in dropper ADB products
ADB products are used as real-time data warehouses. An important function is importing data in real time, which is mainly implemented by the IS component of the ADB product. The main process is as follows:
1. Collect source data from various data sources.
2. Flink processes the source data in real time and writes it to Kafka.
3. The IS component reads from Kafka and writes the data into the data warehouse in real time through the parallel data import protocol.
The IS component consumes through a Kafka consumer group, so its consumption capacity can be scaled horizontally according to the data volume. It uses the transactions of the data warehouse to achieve exactly-once import, which completes the end-to-end data delivery.