package kafka

import (
	"fmt"
	"git.aionnect.com/aionnect/go-common/utils"
	"git.aionnect.com/aionnect/go-common/utils/jsonutil"
	"git.aionnect.com/aionnect/go-common/utils/logger"
	"github.com/Shopify/sarama"
	"github.com/bsm/sarama-cluster"
	"github.com/spf13/viper"
	"reflect"
)

// 消费者对象
type Consumer struct {
	*cluster.Consumer
	log *logger.Logger
}

// 新建消费者
func NewConsumer(groupID string, topics []string) *Consumer {
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = false

	viper.SetDefault("kafka.brokers", []string{"127.0.0.1:9092"})
	brokers := viper.GetStringSlice("kafka.brokers")

	consumer, err := cluster.NewConsumer(brokers, groupID, topics, config)
	if err != nil {
		panic(fmt.Sprintf("Failed to start consumer: %s", err))
	}
	return &Consumer{Consumer: consumer}
}

// 将底层类库的日志输出到指定日志记录器
func (c *Consumer) SetLogger(log *logger.Logger) {
	if nil == log {
		return
	}
	c.log = log
	sarama.Logger = log
}

// 获取当前日志记录器
func (c *Consumer) GetLogger() *logger.Logger {
	if nil == c.log {
		c.log = logger.New()
	}
	return c.log
}

// 消息读取管道,管道消息类型是byte切片
func (c *Consumer) BytesMessages() <-chan []byte {
	ch := make(chan []byte, 0)
	go func(c *Consumer, ch chan []byte, oc <-chan *sarama.ConsumerMessage) {
		defer utils.DefaultGoroutineRecover(c.log, `KAFKA消息读取管道`)
		for msg := range oc {
			ch <- msg.Value
			c.MarkOffset(msg, "") // mark message as processed
		}
	}(c, ch, c.Consumer.Messages())
	return ch
}

// 将消息输出绑定到指定管道上,此方法内会进行反序列化,输出的消息类型可以是指定对象类型
func (c *Consumer) BindJSONChan(channel interface{}) {
	go func(c *Consumer, channel interface{}) {
		defer utils.DefaultGoroutineRecover(c.log, `KAFKA消息输出绑定`)
		chVal := reflect.ValueOf(channel)
		if chVal.Kind() != reflect.Chan {
			return
		}
		argType := chVal.Type().Elem()
		for {
			select {
			case msg := <-c.Messages():
				var oPtr reflect.Value
				if nil != msg && nil != msg.Value && len(msg.Value) > 0 && string(msg.Value) != "" {
					if argType.Kind() != reflect.Ptr {
						oPtr = reflect.New(argType)
					} else {
						oPtr = reflect.New(argType.Elem())
					}
					_ = jsonutil.Unmarshal(msg.Value, oPtr.Interface())
					if argType.Kind() != reflect.Ptr {
						oPtr = reflect.Indirect(oPtr)
					}
				}
				chVal.Send(oPtr)
				c.MarkOffset(msg, "") // mark message as processed
			}
		}
	}(c, channel)
}