Browse Source

redis pipeline

marion 4 years ago
parent
commit
df8e718bd6

+ 54 - 0
utils/redis/redigo_adapter.go

@@ -1,6 +1,7 @@
 package redis
 
 import (
+	"fmt"
 	"github.com/gomodule/redigo/redis"
 	"github.com/spf13/viper"
 	"strings"
@@ -70,3 +71,56 @@ func (a *RedigoAdapter) Do(commandName string, args ...interface{}) (interface{}
 		return conn.Do(commandName, args...)
 	}
 }
+
+// 返回命令管道操作对象
+func (a *RedigoAdapter) Pipeline() IRedisPipeline {
+	return &RedigoPipeline{
+		conn: a.pool.Get(),
+	}
+}
+
+// Redigo命令管道操作对象
+type RedigoPipeline struct {
+	conn     redis.Conn
+	commands []*redisCmd
+}
+
+// 向管道中添加命令
+func (p *RedigoPipeline) Send(commandName string, args ...interface{}) IRedisPipeline {
+	p.commands = append(p.commands, &redisCmd{commandName: commandName, args: args})
+	return p
+}
+
+// 执行管道中的命令
+func (p *RedigoPipeline) Execute() ([]interface{}, error) {
+	defer func(conn redis.Conn) {
+		_ = conn.Close()
+	}(p.conn)
+
+	if nil == p.commands || len(p.commands) == 0 {
+		return nil, fmt.Errorf("no commands in pipeline")
+	}
+
+	var err error
+	for i := 0; i < len(p.commands); i++ {
+		cmd := p.commands[i]
+		err = p.conn.Send(cmd.commandName, cmd.args...)
+		if nil != err {
+			return nil, err
+		}
+	}
+	err = p.conn.Flush()
+	if nil != err {
+		return nil, err
+	}
+
+	var replies []interface{}
+	for i := 0; i < len(p.commands); i++ {
+		reply, err := p.conn.Receive()
+		if nil != err {
+			return nil, err
+		}
+		replies = append(replies, reply)
+	}
+	return replies, nil
+}

+ 12 - 1
utils/redis/redis_conn.go

@@ -27,7 +27,18 @@ func (err ErrRedis) Error() string {
 // 抽象的Redis操作适配器接口定义
 type IRedisAdapter interface {
 	Close() error
-	Do(commandName string, args ...interface{}) (reply interface{}, err error)
+	Do(commandName string, args ...interface{}) (interface{}, error)
+	Pipeline() IRedisPipeline
+}
+
+type IRedisPipeline interface {
+	Send(commandName string, args ...interface{}) IRedisPipeline
+	Execute() ([]interface{}, error)
+}
+
+type redisCmd struct {
+	commandName string
+	args        []interface{}
 }
 
 // Redis连接池适配器

+ 39 - 0
utils/redis/redis_go_cluster_adapter.go

@@ -1,6 +1,7 @@
 package redis
 
 import (
+	"fmt"
 	redisc "github.com/chasex/redis-go-cluster"
 	"github.com/spf13/viper"
 	"time"
@@ -40,3 +41,41 @@ func (a *GoRedisClusterAdapter) Close() error {
 func (a *GoRedisClusterAdapter) Do(commandName string, args ...interface{}) (interface{}, error) {
 	return a.cluster.Do(commandName, args...)
 }
+
+// 返回命令管道操作对象
+func (a *GoRedisClusterAdapter) Pipeline() IRedisPipeline {
+	return &GoRedisClusterPipeline{
+		cluster: a.cluster,
+	}
+}
+
+// Redigo命令管道操作对象
+type GoRedisClusterPipeline struct {
+	cluster  *redisc.Cluster
+	commands []*redisCmd
+}
+
+// 向管道中添加命令
+func (p *GoRedisClusterPipeline) Send(commandName string, args ...interface{}) IRedisPipeline {
+	p.commands = append(p.commands, &redisCmd{commandName: commandName, args: args})
+	return p
+}
+
+// 执行管道中的命令
+func (p *GoRedisClusterPipeline) Execute() ([]interface{}, error) {
+	if nil == p.commands || len(p.commands) == 0 {
+		return nil, fmt.Errorf("no commands in pipeline")
+	}
+
+	batch := p.cluster.NewBatch()
+
+	var err error
+	for i := 0; i < len(p.commands); i++ {
+		cmd := p.commands[i]
+		err = batch.Put(cmd.commandName, cmd.args...)
+		if nil != err {
+			return nil, err
+		}
+	}
+	return p.cluster.RunBatch(batch)
+}

+ 7 - 0
utils/redis/redisc_adapter.go

@@ -46,3 +46,10 @@ func (a *RediscAdapter) Do(commandName string, args ...interface{}) (interface{}
 		return conn.Do(commandName, args...)
 	}
 }
+
+// 返回命令管道操作对象
+func (a *RediscAdapter) Pipeline() IRedisPipeline {
+	return &RedigoPipeline{ // redisc库兼容redigo库,返回RedigoPipeline即可
+		conn: a.cluster.Get(),
+	}
+}

+ 7 - 0
utils/redis/sentinel_adapter.go

@@ -82,3 +82,10 @@ func (a *SentinelAdapter) Do(commandName string, args ...interface{}) (interface
 		return conn.Do(commandName, args...)
 	}
 }
+
+// 返回命令管道操作对象
+func (a *SentinelAdapter) Pipeline() IRedisPipeline {
+	return &RedigoPipeline{ // sentinel库兼容redigo库,返回RedigoPipeline即可
+		conn: a.pool.Get(),
+	}
+}