Переглянути джерело

添加消息队列支持模块

marion 5 роки тому
батько
коміт
69dcd15aaf

+ 5 - 0
utils/mq/consumer_message.go

@@ -0,0 +1,5 @@
+package mq
+
// ConsumerMessage wraps a single message received from a message queue.
type ConsumerMessage struct {
	Value interface{} // message payload; concrete type depends on the consumer implementation
}

+ 9 - 0
utils/mq/i_consumer.go

@@ -0,0 +1,9 @@
+package mq
+
+import "io"
+
// IConsumer is the common interface implemented by all message-queue consumers.
type IConsumer interface {
	io.Closer
	BytesMessages() <-chan []byte     // returns a channel of raw message payloads
	BindJSONChan(channel interface{}) // binds a channel that receives JSON-deserialized objects
}

+ 9 - 0
utils/mq/i_producer.go

@@ -0,0 +1,9 @@
+package mq
+
+import "io"
+
// IProducer is the common interface implemented by all message-queue producers.
type IProducer interface {
	io.Closer
	SendJSON(topic string, value interface{}) (interface{}, error) // publish a single JSON message
	SendJSONs(messages []*ProducerMessage) error                   // publish a batch of JSON messages
}

+ 92 - 0
utils/mq/kafka/consumer.go

@@ -0,0 +1,92 @@
+package kafka
+
+import (
+	"encoding/json"
+	"fmt"
+	"git.haoqitour.com/haoqi/go-common/utils/logger"
+	"github.com/Shopify/sarama"
+	"github.com/bsm/sarama-cluster"
+	"github.com/spf13/viper"
+	"reflect"
+)
+
// Consumer is a Kafka consumer-group member; it embeds the underlying
// sarama-cluster consumer and adds logging helpers.
type Consumer struct {
	*cluster.Consumer
	log *logger.Logger // lazily initialized via GetLogger
}
+
+// 新建消费者
+func NewConsumer(groupID string, topics []string) *Consumer {
+	config := cluster.NewConfig()
+	config.Consumer.Return.Errors = true
+	config.Group.Return.Notifications = false
+
+	viper.SetDefault("kafka.brokers", []string{"127.0.0.1:9092"})
+	brokers := viper.GetStringSlice("kafka.brokers")
+
+	consumer, err := cluster.NewConsumer(brokers, groupID, topics, config)
+	if err != nil {
+		panic(fmt.Sprintf("Failed to start consumer: %s", err))
+	}
+	return &Consumer{Consumer: consumer}
+}
+
+// 将底层类库的日志输出到指定日志记录器
+func (c *Consumer) SetLogger(log *logger.Logger) {
+	if nil == log {
+		return
+	}
+	c.log = log
+	sarama.Logger = log
+}
+
+// 获取当前日志记录器
+func (c *Consumer) GetLogger() *logger.Logger {
+	if nil == c.log {
+		c.log = logger.New()
+	}
+	return c.log
+}
+
+// 消息读取管道,管道消息类型是byte切片
+func (c *Consumer) BytesMessages() <-chan []byte {
+	ch := make(chan []byte, 0)
+	go func(c *Consumer, ch chan []byte, oc <-chan *sarama.ConsumerMessage) {
+		for msg := range oc {
+			ch <- msg.Value
+			c.MarkOffset(msg, "") // mark message as processed
+		}
+	}(c, ch, c.Consumer.Messages())
+	return ch
+}
+
+// 将消息输出绑定到指定管道上,此方法内会进行反序列化,输出的消息类型可以是指定对象类型
+func (c *Consumer) BindJSONChan(channel interface{}) {
+	go func(c *Consumer, channel interface{}) {
+		chVal := reflect.ValueOf(channel)
+		if chVal.Kind() != reflect.Chan {
+			return
+		}
+		argType := chVal.Type().Elem()
+		for {
+			select {
+			case msg := <-c.Messages():
+				var oPtr reflect.Value
+				if nil != msg && nil != msg.Value && len(msg.Value) > 0 && string(msg.Value) != "" {
+					if argType.Kind() != reflect.Ptr {
+						oPtr = reflect.New(argType)
+					} else {
+						oPtr = reflect.New(argType.Elem())
+					}
+					_ = json.Unmarshal(msg.Value, oPtr.Interface())
+					if argType.Kind() != reflect.Ptr {
+						oPtr = reflect.Indirect(oPtr)
+					}
+				}
+				chVal.Send(oPtr)
+				c.MarkOffset(msg, "") // mark message as processed
+			}
+		}
+	}(c, channel)
+}

+ 98 - 0
utils/mq/kafka/consumer_test.go

@@ -0,0 +1,98 @@
+package kafka
+
+import (
+	"fmt"
+	"git.haoqitour.com/haoqi/go-common/utils"
+	"git.haoqitour.com/haoqi/go-common/utils/mq"
+	"git.haoqitour.com/haoqi/go-common/utils/mq/topic"
+	"sync"
+	"testing"
+)
+
var wg sync.WaitGroup

func TestNewConsumer(t *testing.T) {
	// The sample code below simulates an "interrupt + delayed-processing timer"
	// requirement; the logic is fairly involved. If you only care about Kafka
	// usage itself, just follow the numbered comments.

	// 1. Create a unicast consumer.
	// Consumers sharing the same group name receive messages unicast (each
	// message goes to exactly one group member).
	// Which consumer Kafka unicasts to is tightly coupled to the topic's
	// partition configuration: with fewer partitions than consumers, some
	// consumers never receive any messages.
	// Inspect a topic with:
	//   ./kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic topic-name
	// Adjust the partition count with:
	//   ./kafka-topics.sh --alter --topic topic-name --zookeeper 127.0.0.1:2181 --partitions 4
	// After changing partitions, restart or reset the consumers so the
	// consumer/partition assignment is recomputed.
	// Note that topic.TOP below prepends a prefix to the topic name.
	plusTopics := []string{topic.TOP("ka-alloc-plus-job")}
	plusConsumer := NewConsumer("ka-alloc-plus-job-group", plusTopics)
	defer func(plusConsumer *Consumer) {
		_ = plusConsumer.Close()
	}(plusConsumer)
	// 2. Handle errors if needed.
	go func() {
		for err := range plusConsumer.Errors() {
			t.Errorf("Error: %s\n", err.Error())
		}
	}()

	// 1. Create a broadcast consumer.
	// Consumers with distinct group names each receive every message.
	reduceTopics := []string{topic.TOP("ka-alloc-reduce-job")}
	// In production code prefer utils.GetPrivateIPv4Id() over utils.NextId()
	// for the machine id: it derives an id from the last two octets of the
	// host's private IP, so different pods in test/production environments get
	// different ids.
	machineId := utils.NextId()
	reduceConsumer := NewConsumer(fmt.Sprintf("ka-alloc-reduce-job-group-%d", machineId), reduceTopics)
	defer func(reduceConsumer *Consumer) {
		_ = reduceConsumer.Close()
	}(reduceConsumer)
	// 2. Handle errors if needed.
	go func() {
		for err := range reduceConsumer.Errors() {
			t.Errorf("Error: %s\n", err.Error())
		}
	}()

	// Consume messages.
	//wg.Add(1)
	//go func() {
	//	defer wg.Done()
	//	for msg := range plusConsumer.Messages() { // 3. block on the message channel inside a goroutine
	//		fmt.Printf( "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value) // msg.Value can be JSON-unmarshaled
	//		plusConsumer.MarkOffset(msg, "") // mark message as processed
	//	}
	//}()
	//wg.Add(1)
	//go func() {
	//	defer wg.Done()
	//	for msg := range reduceConsumer.Messages() { // 3. block on the message channel inside a goroutine
	//		fmt.Printf( "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value) // msg.Value can be JSON-unmarshaled
	//		reduceConsumer.MarkOffset(msg, "") // mark message as processed
	//	}
	//}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		var plusCh = make(chan *mq.TestMsg, 0)
		plusConsumer.BindJSONChan(plusCh)
		for { // 3. block on the bound channel inside a goroutine
			select {
			case msg := <-plusCh:
				fmt.Printf("%v\n", msg) // msg is already a deserialized object
			}
		}
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		var reduceCh = make(chan *mq.TestMsg, 0)
		reduceConsumer.BindJSONChan(reduceCh)
		for { // 3. block on the bound channel inside a goroutine
			select {
			case msg := <-reduceCh:
				fmt.Printf("%v\n", msg) // msg is already a deserialized object
			}
		}
	}()

	// NOTE(review): the consuming goroutines loop forever, so wg.Wait() blocks
	// indefinitely — this is a manual/demo harness, not an automated test.
	wg.Wait()
	t.Log("Done consuming topic")
}

+ 211 - 0
utils/mq/kafka/producer.go

@@ -0,0 +1,211 @@
+package kafka
+
+import (
+	"encoding/json"
+	"fmt"
+	"git.haoqitour.com/haoqi/go-common/utils"
+	"git.haoqitour.com/haoqi/go-common/utils/logger"
+	"git.haoqitour.com/haoqi/go-common/utils/mq"
+	"github.com/Shopify/sarama"
+	"github.com/spf13/viper"
+)
+
+// kafka消息构造函数
+func NewMsg(topic string, value interface{}) (msg *sarama.ProducerMessage, err error) {
+	var bytes []byte
+	if nil == value {
+		bytes = []byte{}
+	} else {
+		bytes, err = json.Marshal(value)
+		if nil != err {
+			return
+		}
+	}
+	msg = &sarama.ProducerMessage{
+		Topic:     topic,
+		Partition: int32(-1),                                     // 用于指定partition,仅当采用NewManualPartitioner时生效,但不同topic的partition数不一,手工指定很容易出现越界错误,一般不实用
+		Key:       sarama.StringEncoder(utils.NextId().String()), // 当采用NewHashPartitioner时,是根据Key的hash值选取partition
+		Value:     sarama.ByteEncoder(bytes),
+	}
+	return
+}
+
+// --------------------------------------------------------------------------------------------------------------------
+
// Producer publishes JSON messages to Kafka, delegating to an underlying
// sync or async implementation of mq.IProducer.
type Producer struct {
	producer mq.IProducer
	log      *logger.Logger // lazily initialized via GetLogger
}
+
// NewProducer creates a Kafka producer backed by a synchronous sarama
// producer. Broker addresses come from the "kafka.brokers" viper key
// (default 127.0.0.1:9092).
func NewProducer() *Producer {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner // random partition; alternatives: NewRoundRobinPartitioner, hash-based or manual selection (see NewMsg)
	config.Producer.Return.Successes = true

	viper.SetDefault("kafka.brokers", []string{"127.0.0.1:9092"})
	brokers := viper.GetStringSlice("kafka.brokers")

	return &Producer{
		producer: NewSyncProducer(brokers, config), // initialize the sync (or async) producer
	}
}
+
+// 将底层类库的日志输出到指定日志记录器
+func (p *Producer) SetLogger(log *logger.Logger) {
+	if nil == log {
+		return
+	}
+	p.log = log
+	sarama.Logger = log
+}
+
+// 获取当前日志记录器
+func (p *Producer) GetLogger() *logger.Logger {
+	if nil == p.log {
+		p.log = logger.New()
+	}
+	return p.log
+}
+
// SendJSON publishes a single JSON-encoded message via the underlying producer.
func (p *Producer) SendJSON(topic string, value interface{}) (result interface{}, err error) {
	return p.producer.SendJSON(topic, value)
}
+
// SendJSONs publishes a batch of JSON-encoded messages via the underlying producer.
func (p *Producer) SendJSONs(messages []*mq.ProducerMessage) (err error) {
	return p.producer.SendJSONs(messages)
}
+
// Close shuts down the underlying producer.
func (p *Producer) Close() error {
	return p.producer.Close()
}
+
+// --------------------------------------------------------------------------------------------------------------------
+
// SyncProducer is a synchronous Kafka producer: sends block until the broker
// acknowledges the message.
type SyncProducer struct {
	sarama.SyncProducer
}
+
+// 新建同步生产者
+func NewSyncProducer(brokers []string, config *sarama.Config) *SyncProducer {
+	producer, err := sarama.NewSyncProducer(brokers, config)
+	if err != nil {
+		panic(fmt.Sprintf("Failed to produce message: %s", err))
+	}
+	return &SyncProducer{SyncProducer: producer}
+}
+
+// 同步生产者发送单条消息
+func (p *SyncProducer) SendJSON(topic string, value interface{}) (result interface{}, err error) {
+	if topic == "" {
+		return "", nil
+	}
+
+	var msg *sarama.ProducerMessage
+	msg, err = NewMsg(topic, value)
+	if nil != err {
+		return "", err
+	}
+
+	var partition int32
+	var offset int64
+	partition, offset, err = p.SendMessage(msg)
+	if nil != err {
+		return "", err
+	}
+	return fmt.Sprintf("partition=%d, offset=%d\n", partition, offset), nil
+}
+
+// 同步生产者批量发送消息
+func (p *SyncProducer) SendJSONs(messages []*mq.ProducerMessage) (err error) {
+	if nil == messages || len(messages) == 0 {
+		return
+	}
+	var msgList []*sarama.ProducerMessage
+	for i := 0; i < len(messages); i++ {
+		if messages[i].Topic == "" {
+			continue
+		}
+
+		var msg *sarama.ProducerMessage
+		msg, err = NewMsg(messages[i].Topic, messages[i].Value)
+		if nil != err {
+			continue
+		}
+
+		msgList = append(msgList, msg)
+	}
+
+	return p.SendMessages(msgList)
+}
+
+// --------------------------------------------------------------------------------------------------------------------
+
// AsyncProducer is an asynchronous Kafka producer: messages are queued on the
// input channel and delivery results must be read from Successes()/Errors().
type AsyncProducer struct {
	sarama.AsyncProducer
}
+
+// 新建异步生产者
+func NewAsyncProducer(brokers []string, config *sarama.Config) *AsyncProducer {
+	producer, err := sarama.NewAsyncProducer(brokers, config)
+	if err != nil {
+		panic(fmt.Sprintf("Failed to produce message: %s", err))
+	}
+	return &AsyncProducer{AsyncProducer: producer}
+}
+
+// 异步生产者发送单条消息
+func (p *AsyncProducer) SendJSON(topic string, value interface{}) (result interface{}, err error) {
+	if topic == "" {
+		return "", nil
+	}
+
+	var msg *sarama.ProducerMessage
+	msg, err = NewMsg(topic, value)
+	if nil != err {
+		return "", err
+	}
+
+	p.Input() <- msg
+	return "success", nil
+}
+
+// 异步生产者批量发送消息
+func (p *AsyncProducer) SendJSONs(messages []*mq.ProducerMessage) (err error) {
+	if nil == messages || len(messages) == 0 {
+		return
+	}
+	for i := 0; i < len(messages); i++ {
+		if messages[i].Topic == "" {
+			continue
+		}
+
+		var msg *sarama.ProducerMessage
+		msg, err = NewMsg(messages[i].Topic, messages[i].Value)
+		if nil != err {
+			continue
+		}
+
+		p.Input() <- msg
+	}
+
+	return
+}
+
+// 注意异步生产者其实还有一个异步关闭的方法,且其需搭配下列结果处理代码使用
+// 异步生消息生产者发送结果处理
+//for {
+//	select {
+//	case suc := <-producer.Successes():
+//		fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
+//	case fail := <-producer.Errors():
+//		fmt.Println("err: ", fail.Err)
+//	}
+//}

+ 70 - 0
utils/mq/kafka/producer_test.go

@@ -0,0 +1,70 @@
+package kafka
+
+import (
+	"git.haoqitour.com/haoqi/go-common/utils/date"
+	"git.haoqitour.com/haoqi/go-common/utils/mq"
+	"git.haoqitour.com/haoqi/go-common/utils/mq/topic"
+	"testing"
+)
+
func TestProducer_SendJSON(t *testing.T) {
	// The sample code below simulates an "interrupt + delayed-processing timer"
	// requirement; if you only care about Kafka usage itself, just follow the
	// numbered comments.

	// 1. Create a producer.
	producer := NewProducer()
	defer func(producer *Producer) {
		_ = producer.Close()
	}(producer)

	plusMsg := &mq.TestMsg{Message: "你好, 世界++!", Time: date.Now()}
	plusTopic := topic.TOP("ka-alloc-plus-job")
	reduceMsg := &mq.TestMsg{Message: "你好, 世界--!", Time: date.Now()}
	reduceTopic := topic.TOP("ka-alloc-reduce-job")
	var plusMsgList []*mq.ProducerMessage
	var reduceMsgList []*mq.ProducerMessage
	for i := 0; i < 5; i++ {
		plusMsgList = append(plusMsgList, &mq.ProducerMessage{
			Topic: plusTopic,
			Value: plusMsg,
		})
		result, err := producer.SendJSON(plusTopic, plusMsg) // 2. send a single message
		if err != nil {
			t.Error("Failed to produce message: ", err)
		}
		if nil != result {
			t.Log(result.(string))
		}

		reduceMsgList = append(reduceMsgList, &mq.ProducerMessage{
			Topic: reduceTopic,
			Value: reduceMsg,
		})
		result, err = producer.SendJSON(reduceTopic, reduceMsg) // 2. send a single message
		if err != nil {
			t.Error("Failed to produce message: ", err)
		}
		if nil != result {
			t.Log(result.(string))
		}
	}

	// Result handling when using the async producer instead:
	//for i := 0; i < 5; i++ {
	//	select {
	//	case suc := <-producer.Successes():
	//		fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
	//	case fail := <-producer.Errors():
	//		fmt.Println("err: ", fail.Err)
	//	}
	//}

	err := producer.SendJSONs(plusMsgList)
	if err != nil {
		t.Error("Failed to produce message: ", err) // 3. send a batch
	}

	err = producer.SendJSONs(reduceMsgList)
	if err != nil {
		t.Error("Failed to produce message: ", err) // 3. send a batch
	}
}

+ 83 - 0
utils/mq/nats/consumer.go

@@ -0,0 +1,83 @@
+package nats
+
+import (
+	"git.haoqitour.com/haoqi/go-common/utils/logger"
+	"github.com/nats-io/go-nats"
+	"github.com/spf13/viper"
+	"time"
+)
+
// Consumer subscribes to a NATS subject. An empty queue name selects broadcast
// delivery (every consumer receives each message); a non-empty queue name
// selects unicast (queue-group) delivery.
type Consumer struct {
	conn  *nats.EncodedConn
	queue string         // queue group name; "" selects broadcast mode
	topic string         // subject to subscribe to
	log   *logger.Logger // lazily initialized via GetLogger
}
+
+// 新建消费者
+func NewConsumer(queue string, topic string) *Consumer {
+	viper.SetDefault("nats.servers", "nats://127.0.0.1:4222")
+	servers := viper.GetString("nats.servers")
+
+	var consumer *Consumer
+	nc, err := nats.Connect(servers, nats.MaxReconnects(5), nats.ReconnectWait(2*time.Second),
+		nats.DisconnectHandler(func(nc *nats.Conn) {
+			consumer.GetLogger().Info("Got disconnected!\n")
+		}),
+		nats.ReconnectHandler(func(nc *nats.Conn) {
+			consumer.GetLogger().Infof("Got reconnected to %v!\n", nc.ConnectedUrl())
+		}),
+		nats.ClosedHandler(func(nc *nats.Conn) {
+			consumer.GetLogger().Infof("Connection closed. Reason: %q\n", nc.LastError())
+		}),
+	)
+	if nil != err {
+		panic(err)
+	}
+	c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
+	consumer = &Consumer{conn: c, queue: queue, topic: topic}
+	return consumer
+}
+
+// 指定日志记录器
+func (c *Consumer) SetLogger(log *logger.Logger) {
+	if nil == log {
+		return
+	}
+	c.log = log
+}
+
+// 获取当前日志记录器
+func (c *Consumer) GetLogger() *logger.Logger {
+	if nil == c.log {
+		c.log = logger.New()
+	}
+	return c.log
+}
+
// BytesMessages returns a channel yielding raw []byte payloads for the
// consumer's subject, using broadcast or queue-group delivery depending on
// whether a queue name was set.
// NOTE(review): errors from BindRecvChan/BindRecvQueueChan are silently
// discarded — a failed bind leaves the returned channel permanently silent;
// consider surfacing the error.
func (c *Consumer) BytesMessages() <-chan []byte {
	var ch = make(chan []byte, 0)
	if c.queue == "" {
		_, _ = c.conn.BindRecvChan(c.topic, ch) // broadcast
	} else {
		_, _ = c.conn.BindRecvQueueChan(c.topic, c.queue, ch) // unicast via queue group
	}
	return ch
}
+
// BindJSONChan binds message delivery to the supplied channel; the NATS JSON
// encoder deserializes each payload into the channel's element type.
// NOTE(review): bind errors are silently discarded here, as in BytesMessages.
func (c *Consumer) BindJSONChan(channel interface{}) {
	if c.queue == "" {
		_, _ = c.conn.BindRecvChan(c.topic, channel) // broadcast
	} else {
		_, _ = c.conn.BindRecvQueueChan(c.topic, c.queue, channel) // unicast via queue group
	}
}
+
// Close shuts down the NATS connection; it always returns nil.
func (c *Consumer) Close() error {
	c.conn.Close()
	return nil
}

+ 57 - 0
utils/mq/nats/consumer_test.go

@@ -0,0 +1,57 @@
+package nats
+
+import (
+	"fmt"
+	"git.haoqitour.com/haoqi/go-common/utils/mq"
+	"git.haoqitour.com/haoqi/go-common/utils/mq/topic"
+	"sync"
+	"testing"
+)
+
var wg sync.WaitGroup

func TestNewConsumer(t *testing.T) {
	// Empty queue name: broadcast — every consumer receives each message.
	consumer1 := NewConsumer("", topic.TOP("test-subject"))
	defer func(consumer1 *Consumer) {
		_ = consumer1.Close()
	}(consumer1)
	// Non-empty queue name: unicast — one member of the queue group receives each message.
	consumer2 := NewConsumer("testQueue", topic.TOP("test-subject"))
	defer func(consumer2 *Consumer) {
		_ = consumer2.Close()
	}(consumer2)

	wg.Add(1)
	go func() {
		defer wg.Done()
		i1 := 0
		var ch1 = make(chan *mq.TestMsg, 0)
		consumer1.BindJSONChan(ch1)
		for {
			select {
			case msg1 := <-ch1:
				i1++
				fmt.Printf("1-%d: Received a broadcast message: %s\n", i1, msg1)
			}
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		i2 := 0
		var ch2 = make(chan *mq.TestMsg, 0)
		consumer2.BindJSONChan(ch2)
		for {
			select {
			case msg2 := <-ch2:
				i2++
				fmt.Printf("2-%d: Received a message: %s\n", i2, msg2)
			}
		}
	}()

	// NOTE(review): the consuming goroutines loop forever, so wg.Wait() blocks
	// indefinitely — this is a manual/demo harness, not an automated test.
	wg.Wait()
	t.Log("Done consuming topic")
}

+ 90 - 0
utils/mq/nats/producer.go

@@ -0,0 +1,90 @@
+package nats
+
+import (
+	"git.haoqitour.com/haoqi/go-common/utils/logger"
+	"git.haoqitour.com/haoqi/go-common/utils/mq"
+	"github.com/nats-io/go-nats"
+	"github.com/spf13/viper"
+	"time"
+)
+
// Producer publishes JSON messages to NATS subjects.
type Producer struct {
	conn *nats.EncodedConn
	log  *logger.Logger // lazily initialized via GetLogger
}
+
+// 新建生产者
+func NewProducer() *Producer {
+	viper.SetDefault("nats.servers", "nats://127.0.0.1:4222")
+	servers := viper.GetString("nats.servers")
+
+	var producer *Producer
+	nc, err := nats.Connect(servers, nats.MaxReconnects(5), nats.ReconnectWait(2*time.Second),
+		nats.DisconnectHandler(func(nc *nats.Conn) {
+			producer.GetLogger().Info("Got disconnected!\n")
+		}),
+		nats.ReconnectHandler(func(nc *nats.Conn) {
+			producer.GetLogger().Infof("Got reconnected to %v!\n", nc.ConnectedUrl())
+		}),
+		nats.ClosedHandler(func(nc *nats.Conn) {
+			producer.GetLogger().Infof("Connection closed. Reason: %q\n", nc.LastError())
+		}),
+	)
+	if nil != err {
+		panic(err)
+	}
+	c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
+	producer = &Producer{conn: c}
+	return producer
+}
+
+// 指定日志记录器
+func (p *Producer) SetLogger(log *logger.Logger) {
+	if nil == log {
+		return
+	}
+	p.log = log
+}
+
+// 获取当前日志记录器
+func (p *Producer) GetLogger() *logger.Logger {
+	if nil == p.log {
+		p.log = logger.New()
+	}
+	return p.log
+}
+
+// 发送单条消息
+func (p *Producer) SendJSON(topic string, value interface{}) (result interface{}, err error) {
+	if topic == "" {
+		return "", nil
+	}
+
+	err = p.conn.Publish(topic, value)
+	if nil != err {
+		return "", err
+	}
+	return "success", nil
+}
+
+// 批量发送消息
+func (p *Producer) SendJSONs(messages []*mq.ProducerMessage) (err error) {
+	if nil == messages || len(messages) == 0 {
+		return
+	}
+	for i := 0; i < len(messages); i++ {
+		if messages[i].Topic == "" {
+			continue
+		}
+
+		_ = p.conn.Publish(messages[i].Topic, messages[i].Value)
+	}
+	return
+}
+
// Close shuts down the NATS connection; it always returns nil.
func (p *Producer) Close() error {
	p.conn.Close()
	return nil
}

+ 25 - 0
utils/mq/nats/producer_test.go

@@ -0,0 +1,25 @@
+package nats
+
+import (
+	"git.haoqitour.com/haoqi/go-common/utils/date"
+	"git.haoqitour.com/haoqi/go-common/utils/mq"
+	"git.haoqitour.com/haoqi/go-common/utils/mq/topic"
+	"testing"
+)
+
+func TestProducer_SendJSON(t *testing.T) {
+	producer := NewProducer()
+	defer func(producer *Producer) {
+		_ = producer.Close()
+	}(producer)
+
+	msg := &mq.TestMsg{Message: "你好, 世界!", Time: date.Now()}
+	tpc := topic.TOP("test-subject")
+	for i := 0; i < 5; i++ {
+		result, err := producer.SendJSON(tpc, msg)
+		if err != nil {
+			t.Error("Failed to produce message: ", err)
+		}
+		t.Logf(result.(string))
+	}
+}

+ 6 - 0
utils/mq/producer_message.go

@@ -0,0 +1,6 @@
+package mq
+
// ProducerMessage is a queue-agnostic message to publish: a destination topic
// plus a payload that the concrete producer JSON-encodes.
type ProducerMessage struct {
	Topic string
	Value interface{}
}

+ 8 - 0
utils/mq/test_message.go

@@ -0,0 +1,8 @@
+package mq
+
+import "git.haoqitour.com/haoqi/go-common/utils/date"
+
// TestMsg is a simple JSON payload used by the mq package tests.
type TestMsg struct {
	Message string        `json:"message"`
	Time    date.Datetime `json:"time"`
}

+ 22 - 0
utils/mq/topic/micro_topic.go

@@ -0,0 +1,22 @@
+package topic
+
+import (
+	"fmt"
+	"strings"
+)
+
const (
	// TOPPrefix is prepended (dot-separated) to every topic name built by TOP.
	TOPPrefix = "go-top"
)
+
// TOP returns the fully qualified topic name for path, namespaced under
// TOPPrefix (e.g. TOP("x") yields "go-top.x"); a blank path yields the bare prefix.
func TOP(path string) string {
	return joinPath(TOPPrefix, path)
}
+
// joinPath joins prefix and path with a dot separator. Surrounding whitespace
// and leading '/' or '\' characters are stripped from path first; a path that
// is blank — or all slashes — after trimming yields the bare prefix.
func joinPath(prefix string, path string) string {
	if strings.TrimSpace(path) == "" {
		return prefix
	}
	// Trim both slash kinds with a single cutset: the previous nested
	// TrimLeft(…, "/") then TrimLeft(…, "\\") could not strip mixed leading
	// runs such as `\/a`.
	path = strings.TrimLeft(strings.TrimSpace(path), "/\\")
	if path == "" {
		return prefix // avoid returning "prefix." for all-slash input
	}
	return fmt.Sprintf("%s.%s", prefix, path)
}