package kafka import ( "fmt" "git.aionnect.com/aionnect/go-common/utils" "git.aionnect.com/aionnect/go-common/utils/jsonutil" "git.aionnect.com/aionnect/go-common/utils/logger" "git.aionnect.com/aionnect/go-common/utils/mq" "github.com/Shopify/sarama" "github.com/spf13/viper" ) // kafka消息构造函数 func NewMsg(topic string, value interface{}) (msg *sarama.ProducerMessage, err error) { var bytes []byte if nil == value { bytes = []byte{} } else { bytes, err = jsonutil.Marshal(value) if nil != err { return } } msg = &sarama.ProducerMessage{ Topic: topic, Partition: int32(-1), // 用于指定partition,仅当采用NewManualPartitioner时生效,但不同topic的partition数不一,手工指定很容易出现越界错误,一般不实用 Key: sarama.StringEncoder(utils.NextId().String()), // 当采用NewHashPartitioner时,是根据Key的hash值选取partition Value: sarama.ByteEncoder(bytes), } return } // -------------------------------------------------------------------------------------------------------------------- // 生产者对象 type Producer struct { producer mq.IProducer log *logger.Logger } // 新建生产者 func NewProducer() *Producer { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Partitioner = sarama.NewRandomPartitioner // 随机选取partition,还可以用NewRoundRobinPartitioner轮流选取,或者如前面的注释,可hash选取或手工指定 config.Producer.Return.Successes = true viper.SetDefault("kafka.brokers", []string{"127.0.0.1:9092"}) brokers := viper.GetStringSlice("kafka.brokers") return &Producer{ producer: NewSyncProducer(brokers, config), // 初始化同步或异步生产者 } } // 将底层类库的日志输出到指定日志记录器 func (p *Producer) SetLogger(log *logger.Logger) { if nil == log { return } p.log = log sarama.Logger = log } // 获取当前日志记录器 func (p *Producer) GetLogger() *logger.Logger { if nil == p.log { p.log = logger.New() } return p.log } // 发送单条消息 func (p *Producer) SendJSON(topic string, value interface{}) (result interface{}, err error) { return p.producer.SendJSON(topic, value) } // 批量发送消息 func (p *Producer) SendJSONs(messages []*mq.ProducerMessage) (err error) { return p.producer.SendJSONs(messages) } // 关闭 func (p *Producer) Close() error { return p.producer.Close() } // -------------------------------------------------------------------------------------------------------------------- // 同步生产者 type SyncProducer struct { sarama.SyncProducer } // 新建同步生产者 func NewSyncProducer(brokers []string, config *sarama.Config) *SyncProducer { producer, err := sarama.NewSyncProducer(brokers, config) if err != nil { panic(fmt.Sprintf("Failed to produce message: %s", err)) } return &SyncProducer{SyncProducer: producer} } // 同步生产者发送单条消息 func (p *SyncProducer) SendJSON(topic string, value interface{}) (result interface{}, err error) { if topic == "" { return "", nil } var msg *sarama.ProducerMessage msg, err = NewMsg(topic, value) if nil != err { return "", err } var partition int32 var offset int64 partition, offset, err = p.SendMessage(msg) if nil != err { return "", err } return fmt.Sprintf("partition=%d, offset=%d\n", partition, offset), nil } // 同步生产者批量发送消息 func (p *SyncProducer) SendJSONs(messages []*mq.ProducerMessage) (err error) { if nil == messages || len(messages) == 0 { return } var msgList []*sarama.ProducerMessage for i := 0; i < len(messages); i++ { if messages[i].Topic == "" { continue } var msg *sarama.ProducerMessage msg, err = NewMsg(messages[i].Topic, messages[i].Value) if nil != err { continue } msgList = append(msgList, msg) } return p.SendMessages(msgList) } // -------------------------------------------------------------------------------------------------------------------- // 异步生产者 type AsyncProducer struct { sarama.AsyncProducer } // 新建异步生产者 func NewAsyncProducer(brokers []string, config *sarama.Config) *AsyncProducer { producer, err := sarama.NewAsyncProducer(brokers, config) if err != nil { panic(fmt.Sprintf("Failed to produce message: %s", err)) } return &AsyncProducer{AsyncProducer: producer} } // 异步生产者发送单条消息 func (p *AsyncProducer) SendJSON(topic string, value interface{}) (result interface{}, err error) { if topic == "" { return "", nil } var msg *sarama.ProducerMessage msg, err = NewMsg(topic, value) if nil != err { return "", err } p.Input() <- msg return "success", nil } // 异步生产者批量发送消息 func (p *AsyncProducer) SendJSONs(messages []*mq.ProducerMessage) (err error) { if nil == messages || len(messages) == 0 { return } for i := 0; i < len(messages); i++ { if messages[i].Topic == "" { continue } var msg *sarama.ProducerMessage msg, err = NewMsg(messages[i].Topic, messages[i].Value) if nil != err { continue } p.Input() <- msg } return } // 注意异步生产者其实还有一个异步关闭的方法,且其需搭配下列结果处理代码使用 // 异步生消息生产者发送结果处理 //for { // select { // case suc := <-producer.Successes(): // fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition) // case fail := <-producer.Errors(): // fmt.Println("err: ", fail.Err) // } //}