diff --git a/go.mod b/go.mod index 69d1bd72..5429047a 100644 --- a/go.mod +++ b/go.mod @@ -34,12 +34,12 @@ require ( github.com/garyburd/redigo v1.6.0 // indirect github.com/go-openapi/jsonreference v0.19.3 // indirect github.com/go-openapi/spec v0.19.4 // indirect - github.com/go-redis/redis v6.15.7+incompatible + github.com/go-redis/redis v6.14.0+incompatible + github.com/go-redis/redis/v7 v7.2.0 // indirect github.com/go-sql-driver/mysql v1.5.0 github.com/go-testfixtures/testfixtures/v3 v3.1.1 github.com/go-xorm/core v0.6.2 // indirect github.com/go-xorm/xorm v0.7.9 // indirect - github.com/golang/protobuf v1.3.2 // indirect github.com/gordonklaus/ineffassign v0.0.0-20200309095847-7953dde2c7bf github.com/imdario/mergo v0.3.9 github.com/jgautheron/goconst v0.0.0-20200227150835-cda7ea3bf591 diff --git a/go.sum b/go.sum index 21a7da50..6423e456 100644 --- a/go.sum +++ b/go.sum @@ -117,11 +117,13 @@ github.com/go-openapi/swag v0.17.0/go.mod h1:AByQ+nYG6gQg71GINrmuDXCPWdL640yX49/ github.com/go-openapi/swag v0.19.2/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= github.com/go-openapi/swag v0.19.5 h1:lTz6Ys4CmqqCQmZPBlbQENR1/GucA2bzYTE12Pw4tFY= github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= +github.com/go-redis/redis v6.14.0+incompatible h1:AMPZkM7PbsJbilelrJUAyC4xQbGROTOLSuDd7fnMXCI= github.com/go-redis/redis v6.14.0+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-redis/redis v6.15.2+incompatible h1:9SpNVG76gr6InJGxoZ6IuuxaCOQwDAhzyXg+Bs+0Sb4= github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-redis/redis v6.15.7+incompatible h1:3skhDh95XQMpnqeqNftPkQD9jL9e5e36z/1SUm6dy1U= github.com/go-redis/redis v6.15.7+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/go-redis/redis/v7 v7.2.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= @@ -281,9 +283,11 @@ github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FW github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 h1:lDH9UUVJtmYCjyT0CI4q8xvlXPxeZ0gYCVvWbmPlp88= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= @@ -460,6 +464,7 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -492,6 +497,7 @@ golang.org/x/sys v0.0.0-20190616124812-15dcb6c0061f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb h1:fgwFCsaw9buMuxNd6+DQfAuSFqbNiQZpcgJQAgJsK6k= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -549,6 +555,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/d4l3k/messagediff.v1 v1.2.1 h1:70AthpjunwzUiarMHyED52mj9UwtAnE89l1Gmrt3EU0= gopkg.in/d4l3k/messagediff.v1 v1.2.1/go.mod h1:EUzikiKadqXWcD1AzJLagx0j/BeeWGtn++04Xniyg44= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= diff --git a/vendor/github.com/go-redis/redis/.travis.yml b/vendor/github.com/go-redis/redis/.travis.yml index 06d7897b..39ffc2be 100644 --- a/vendor/github.com/go-redis/redis/.travis.yml +++ b/vendor/github.com/go-redis/redis/.travis.yml @@ -5,10 +5,10 @@ services: - redis-server go: + - 1.7.x + - 1.8.x - 1.9.x - 1.10.x - - 1.11.x - - 1.12.x - tip matrix: diff --git a/vendor/github.com/go-redis/redis/CHANGELOG.md b/vendor/github.com/go-redis/redis/CHANGELOG.md index 19645661..7c40d5e3 100644 --- a/vendor/github.com/go-redis/redis/CHANGELOG.md +++ b/vendor/github.com/go-redis/redis/CHANGELOG.md @@ -1,9 +1,5 @@ # Changelog -## Unreleased - -- Cluster and Ring pipelines process commands for each node in its own goroutine. - ## 6.14 - Added Options.MinIdleConns. diff --git a/vendor/github.com/go-redis/redis/Makefile b/vendor/github.com/go-redis/redis/Makefile index fa3b4e00..1fbdac91 100644 --- a/vendor/github.com/go-redis/redis/Makefile +++ b/vendor/github.com/go-redis/redis/Makefile @@ -3,8 +3,6 @@ all: testdeps go test ./... -short -race env GOOS=linux GOARCH=386 go test ./... go vet - go get github.com/gordonklaus/ineffassign - ineffassign . testdeps: testdata/redis/src/redis-server @@ -15,7 +13,7 @@ bench: testdeps testdata/redis: mkdir -p $@ - wget -qO- https://github.com/antirez/redis/archive/5.0.tar.gz | tar xvz --strip-components=1 -C $@ + wget -qO- https://github.com/antirez/redis/archive/unstable.tar.gz | tar xvz --strip-components=1 -C $@ testdata/redis/src/redis-server: testdata/redis sed -i.bak 's/libjemalloc.a/libjemalloc.a -lrt/g' $ 0 { go c.reaper(opt.IdleCheckFrequency) } @@ -681,17 +727,17 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { return c } -func (c *ClusterClient) init() { - c.cmdable.setProcessor(c.Process) -} - -// ReloadState reloads cluster state. If available it calls ClusterSlots func +// ReloadState reloads cluster state. It calls ClusterSlots func // to get cluster slots information. func (c *ClusterClient) ReloadState() error { _, err := c.state.Reload() return err } +func (c *ClusterClient) init() { + c.cmdable.setProcessor(c.Process) +} + func (c *ClusterClient) Context() context.Context { if c.ctx != nil { return c.ctx @@ -703,12 +749,12 @@ func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient { if ctx == nil { panic("nil context") } - c2 := c.clone() + c2 := c.copy() c2.ctx = ctx return c2 } -func (c *ClusterClient) clone() *ClusterClient { +func (c *ClusterClient) copy() *ClusterClient { cp := *c cp.init() return &cp @@ -772,11 +818,6 @@ func cmdSlot(cmd Cmder, pos int) int { } func (c *ClusterClient) cmdSlot(cmd Cmder) int { - args := cmd.Args() - if args[0] == "cluster" && args[1] == "getkeysinslot" { - return args[2].(int) - } - cmdInfo := c.cmdInfo(cmd.Name()) return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo)) } @@ -788,9 +829,9 @@ func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) { } cmdInfo := c.cmdInfo(cmd.Name()) - slot := c.cmdSlot(cmd) + slot := cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo)) - if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly { + if cmdInfo != nil && cmdInfo.ReadOnly && c.opt.ReadOnly { if c.opt.RouteByLatency { node, err := state.slotClosestNode(slot) return slot, node, err @@ -849,12 +890,15 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { if err == nil { break } - if err != Nil { + + if internal.IsRetryableError(err, true) { c.state.LazyReload() + continue } moved, ask, addr := internal.IsMovedError(err) if moved || ask { + c.state.LazyReload() node, err = c.nodes.GetOrCreate(addr) if err != nil { return err @@ -862,7 +906,7 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { continue } - if err == pool.ErrClosed || internal.IsReadOnlyError(err) { + if err == pool.ErrClosed { node, err = c.slotMasterNode(slot) if err != nil { return err @@ -870,10 +914,6 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { continue } - if internal.IsRetryableError(err, true) { - continue - } - return err } @@ -938,34 +978,16 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error { if err == nil { break } - if err != Nil { - c.state.LazyReload() - } - // If slave is loading - pick another node. + // If slave is loading - read from master. if c.opt.ReadOnly && internal.IsLoadingError(err) { node.MarkAsLoading() - node = nil - continue - } - - var moved bool - var addr string - moved, ask, addr = internal.IsMovedError(err) - if moved || ask { - node, err = c.nodes.GetOrCreate(addr) - if err != nil { - break - } - continue - } - - if err == pool.ErrClosed || internal.IsReadOnlyError(err) { - node = nil continue } if internal.IsRetryableError(err, true) { + c.state.LazyReload() + // First retry the same node. if attempt == 0 { continue @@ -979,6 +1001,24 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error { continue } + var moved bool + var addr string + moved, ask, addr = internal.IsMovedError(err) + if moved || ask { + c.state.LazyReload() + + node, err = c.nodes.GetOrCreate(addr) + if err != nil { + break + } + continue + } + + if err == pool.ErrClosed { + node = nil + continue + } + break } @@ -1198,12 +1238,10 @@ func (c *ClusterClient) WrapProcessPipeline( fn func(oldProcess func([]Cmder) error) func([]Cmder) error, ) { c.processPipeline = fn(c.processPipeline) - c.processTxPipeline = fn(c.processTxPipeline) } func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { - cmdsMap := newCmdsMap() - err := c.mapCmdsByNode(cmds, cmdsMap) + cmdsMap, err := c.mapCmdsByNode(cmds) if err != nil { setCmdsErr(cmds, err) return err @@ -1214,31 +1252,28 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { time.Sleep(c.retryBackoff(attempt)) } - failedCmds := newCmdsMap() - var wg sync.WaitGroup + failedCmds := make(map[*clusterNode][]Cmder) - for node, cmds := range cmdsMap.m { - wg.Add(1) - go func(node *clusterNode, cmds []Cmder) { - defer wg.Done() - - cn, err := node.Client.getConn() - if err != nil { - if err == pool.ErrClosed { - c.mapCmdsByNode(cmds, failedCmds) - } else { - setCmdsErr(cmds, err) - } - return + for node, cmds := range cmdsMap { + cn, err := node.Client.getConn() + if err != nil { + if err == pool.ErrClosed { + c.remapCmds(cmds, failedCmds) + } else { + setCmdsErr(cmds, err) } + continue + } - err = c.pipelineProcessCmds(node, cn, cmds, failedCmds) - node.Client.releaseConnStrict(cn, err) - }(node, cmds) + err = c.pipelineProcessCmds(node, cn, cmds, failedCmds) + if err == nil || internal.IsRedisError(err) { + node.Client.connPool.Put(cn) + } else { + node.Client.connPool.Remove(cn) + } } - wg.Wait() - if len(failedCmds.m) == 0 { + if len(failedCmds) == 0 { break } cmdsMap = failedCmds @@ -1247,24 +1282,14 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { return cmdsFirstErr(cmds) } -type cmdsMap struct { - mu sync.Mutex - m map[*clusterNode][]Cmder -} - -func newCmdsMap() *cmdsMap { - return &cmdsMap{ - m: make(map[*clusterNode][]Cmder), - } -} - -func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error { +func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) { state, err := c.state.Get() if err != nil { setCmdsErr(cmds, err) - return err + return nil, err } + cmdsMap := make(map[*clusterNode][]Cmder) cmdsAreReadOnly := c.cmdsAreReadOnly(cmds) for _, cmd := range cmds { var node *clusterNode @@ -1276,13 +1301,11 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error { node, err = state.slotMasterNode(slot) } if err != nil { - return err + return nil, err } - cmdsMap.mu.Lock() - cmdsMap.m[node] = append(cmdsMap.m[node], cmd) - cmdsMap.mu.Unlock() + cmdsMap[node] = append(cmdsMap[node], cmd) } - return nil + return cmdsMap, nil } func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool { @@ -1295,30 +1318,39 @@ func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool { return true } +func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cmder) { + remappedCmds, err := c.mapCmdsByNode(cmds) + if err != nil { + setCmdsErr(cmds, err) + return + } + + for node, cmds := range remappedCmds { + failedCmds[node] = cmds + } +} + func (c *ClusterClient) pipelineProcessCmds( - node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap, + node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, ) error { err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { return writeCmd(wr, cmds...) }) if err != nil { setCmdsErr(cmds, err) - failedCmds.mu.Lock() - failedCmds.m[node] = cmds - failedCmds.mu.Unlock() + failedCmds[node] = cmds return err } err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error { - return c.pipelineReadCmds(node, rd, cmds, failedCmds) + return c.pipelineReadCmds(rd, cmds, failedCmds) }) return err } func (c *ClusterClient) pipelineReadCmds( - node *clusterNode, rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap, + rd *proto.Reader, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, ) error { - var firstErr error for _, cmd := range cmds { err := cmd.readReply(rd) if err == nil { @@ -1333,18 +1365,13 @@ func (c *ClusterClient) pipelineReadCmds( continue } - failedCmds.mu.Lock() - failedCmds.m[node] = append(failedCmds.m[node], cmd) - failedCmds.mu.Unlock() - if firstErr == nil { - firstErr = err - } + return err } - return firstErr + return nil } func (c *ClusterClient) checkMovedErr( - cmd Cmder, err error, failedCmds *cmdsMap, + cmd Cmder, err error, failedCmds map[*clusterNode][]Cmder, ) bool { moved, ask, addr := internal.IsMovedError(err) @@ -1356,9 +1383,7 @@ func (c *ClusterClient) checkMovedErr( return false } - failedCmds.mu.Lock() - failedCmds.m[node] = append(failedCmds.m[node], cmd) - failedCmds.mu.Unlock() + failedCmds[node] = append(failedCmds[node], cmd) return true } @@ -1368,9 +1393,7 @@ func (c *ClusterClient) checkMovedErr( return false } - failedCmds.mu.Lock() - failedCmds.m[node] = append(failedCmds.m[node], NewCmd("ASKING"), cmd) - failedCmds.mu.Unlock() + failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd) return true } @@ -1410,34 +1433,31 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error { time.Sleep(c.retryBackoff(attempt)) } - failedCmds := newCmdsMap() - var wg sync.WaitGroup + failedCmds := make(map[*clusterNode][]Cmder) for node, cmds := range cmdsMap { - wg.Add(1) - go func(node *clusterNode, cmds []Cmder) { - defer wg.Done() - - cn, err := node.Client.getConn() - if err != nil { - if err == pool.ErrClosed { - c.mapCmdsByNode(cmds, failedCmds) - } else { - setCmdsErr(cmds, err) - } - return + cn, err := node.Client.getConn() + if err != nil { + if err == pool.ErrClosed { + c.remapCmds(cmds, failedCmds) + } else { + setCmdsErr(cmds, err) } + continue + } - err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds) - node.Client.releaseConnStrict(cn, err) - }(node, cmds) + err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds) + if err == nil || internal.IsRedisError(err) { + node.Client.connPool.Put(cn) + } else { + node.Client.connPool.Remove(cn) + } } - wg.Wait() - if len(failedCmds.m) == 0 { + if len(failedCmds) == 0 { break } - cmdsMap = failedCmds.m + cmdsMap = failedCmds } } @@ -1454,16 +1474,14 @@ func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { } func (c *ClusterClient) txPipelineProcessCmds( - node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap, + node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, ) error { err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { return txPipelineWriteMulti(wr, cmds) }) if err != nil { setCmdsErr(cmds, err) - failedCmds.mu.Lock() - failedCmds.m[node] = cmds - failedCmds.mu.Unlock() + failedCmds[node] = cmds return err } @@ -1479,7 +1497,7 @@ func (c *ClusterClient) txPipelineProcessCmds( } func (c *ClusterClient) txPipelineReadQueued( - rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap, + rd *proto.Reader, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, ) error { // Parse queued replies. var statusCmd StatusCmd @@ -1528,51 +1546,40 @@ func (c *ClusterClient) txPipelineReadQueued( return nil } -func (c *ClusterClient) pubSub() *PubSub { +func (c *ClusterClient) pubSub(channels []string) *PubSub { var node *clusterNode pubsub := &PubSub{ opt: c.opt.clientOptions(), newConn: func(channels []string) (*pool.Conn, error) { - if node != nil { - panic("node != nil") - } + if node == nil { + var slot int + if len(channels) > 0 { + slot = hashtag.Slot(channels[0]) + } else { + slot = -1 + } - var err error - if len(channels) > 0 { - slot := hashtag.Slot(channels[0]) - node, err = c.slotMasterNode(slot) - } else { - node, err = c.nodes.Random() + masterNode, err := c.slotMasterNode(slot) + if err != nil { + return nil, err + } + node = masterNode } - if err != nil { - return nil, err - } - - cn, err := node.Client.newConn() - if err != nil { - node = nil - - return nil, err - } - - return cn, nil + return node.Client.newConn() }, closeConn: func(cn *pool.Conn) error { - err := node.Client.connPool.CloseConn(cn) - node = nil - return err + return node.Client.connPool.CloseConn(cn) }, } pubsub.init() - return pubsub } // Subscribe subscribes the client to the specified channels. // Channels can be omitted to create empty subscription. func (c *ClusterClient) Subscribe(channels ...string) *PubSub { - pubsub := c.pubSub() + pubsub := c.pubSub(channels) if len(channels) > 0 { _ = pubsub.Subscribe(channels...) } @@ -1582,7 +1589,7 @@ func (c *ClusterClient) Subscribe(channels ...string) *PubSub { // PSubscribe subscribes the client to the given patterns. // Patterns can be omitted to create empty subscription. func (c *ClusterClient) PSubscribe(channels ...string) *PubSub { - pubsub := c.pubSub() + pubsub := c.pubSub(channels) if len(channels) > 0 { _ = pubsub.PSubscribe(channels...) } diff --git a/vendor/github.com/go-redis/redis/command.go b/vendor/github.com/go-redis/redis/command.go index c70973d3..ca44d7c8 100644 --- a/vendor/github.com/go-redis/redis/command.go +++ b/vendor/github.com/go-redis/redis/command.go @@ -183,7 +183,7 @@ func (cmd *Cmd) Int() (int, error) { case string: return strconv.Atoi(val) default: - err := fmt.Errorf("redis: unexpected type=%T for Int", val) + err := fmt.Errorf("redis: unexpected type=%T for Int64", val) return 0, err } } @@ -218,25 +218,6 @@ func (cmd *Cmd) Uint64() (uint64, error) { } } -func (cmd *Cmd) Float32() (float32, error) { - if cmd.err != nil { - return 0, cmd.err - } - switch val := cmd.val.(type) { - case int64: - return float32(val), nil - case string: - f, err := strconv.ParseFloat(val, 32) - if err != nil { - return 0, err - } - return float32(f), nil - default: - err := fmt.Errorf("redis: unexpected type=%T for Float32", val) - return 0, err - } -} - func (cmd *Cmd) Float64() (float64, error) { if cmd.err != nil { return 0, cmd.err @@ -604,17 +585,6 @@ func (cmd *StringCmd) Uint64() (uint64, error) { return strconv.ParseUint(cmd.Val(), 10, 64) } -func (cmd *StringCmd) Float32() (float32, error) { - if cmd.err != nil { - return 0, cmd.err - } - f, err := strconv.ParseFloat(cmd.Val(), 32) - if err != nil { - return 0, err - } - return float32(f), nil -} - func (cmd *StringCmd) Float64() (float64, error) { if cmd.err != nil { return 0, cmd.err @@ -717,12 +687,12 @@ func (cmd *StringSliceCmd) readReply(rd *proto.Reader) error { func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) { ss := make([]string, 0, n) for i := int64(0); i < n; i++ { - switch s, err := rd.ReadString(); { - case err == Nil: + s, err := rd.ReadString() + if err == Nil { ss = append(ss, "") - case err != nil: + } else if err != nil { return nil, err - default: + } else { ss = append(ss, s) } } @@ -999,20 +969,14 @@ func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) { return nil, err } - var values map[string]interface{} - v, err := rd.ReadArrayReply(stringInterfaceMapParser) if err != nil { - if err != proto.Nil { - return nil, err - } - } else { - values = v.(map[string]interface{}) + return nil, err } msgs = append(msgs, XMessage{ ID: id, - Values: values, + Values: v.(map[string]interface{}), }) return nil, nil }) @@ -1373,68 +1337,6 @@ func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) { //------------------------------------------------------------------------------ -type ZWithKeyCmd struct { - baseCmd - - val ZWithKey -} - -var _ Cmder = (*ZWithKeyCmd)(nil) - -func NewZWithKeyCmd(args ...interface{}) *ZWithKeyCmd { - return &ZWithKeyCmd{ - baseCmd: baseCmd{_args: args}, - } -} - -func (cmd *ZWithKeyCmd) Val() ZWithKey { - return cmd.val -} - -func (cmd *ZWithKeyCmd) Result() (ZWithKey, error) { - return cmd.Val(), cmd.Err() -} - -func (cmd *ZWithKeyCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *ZWithKeyCmd) readReply(rd *proto.Reader) error { - var v interface{} - v, cmd.err = rd.ReadArrayReply(zWithKeyParser) - if cmd.err != nil { - return cmd.err - } - cmd.val = v.(ZWithKey) - return nil -} - -// Implements proto.MultiBulkParse -func zWithKeyParser(rd *proto.Reader, n int64) (interface{}, error) { - if n != 3 { - return nil, fmt.Errorf("got %d elements, expected 3", n) - } - - var z ZWithKey - var err error - - z.Key, err = rd.ReadString() - if err != nil { - return nil, err - } - z.Member, err = rd.ReadString() - if err != nil { - return nil, err - } - z.Score, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - return z, nil -} - -//------------------------------------------------------------------------------ - type ScanCmd struct { baseCmd diff --git a/vendor/github.com/go-redis/redis/commands.go b/vendor/github.com/go-redis/redis/commands.go index 653e4abe..b259e3a8 100644 --- a/vendor/github.com/go-redis/redis/commands.go +++ b/vendor/github.com/go-redis/redis/commands.go @@ -8,6 +8,13 @@ import ( "github.com/go-redis/redis/internal" ) +func readTimeout(timeout time.Duration) time.Duration { + if timeout == 0 { + return 0 + } + return timeout + 10*time.Second +} + func usePrecise(dur time.Duration) bool { return dur < time.Second || dur%time.Second != 0 } @@ -166,7 +173,6 @@ type Cmdable interface { SUnion(keys ...string) *StringSliceCmd SUnionStore(destination string, keys ...string) *IntCmd XAdd(a *XAddArgs) *StringCmd - XDel(stream string, ids ...string) *IntCmd XLen(stream string) *IntCmd XRange(stream, start, stop string) *XMessageSliceCmd XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd @@ -175,7 +181,6 @@ type Cmdable interface { XRead(a *XReadArgs) *XStreamSliceCmd XReadStreams(streams ...string) *XStreamSliceCmd XGroupCreate(stream, group, start string) *StatusCmd - XGroupCreateMkStream(stream, group, start string) *StatusCmd XGroupSetID(stream, group, start string) *StatusCmd XGroupDestroy(stream, group string) *IntCmd XGroupDelConsumer(stream, group, consumer string) *IntCmd @@ -187,8 +192,6 @@ type Cmdable interface { XClaimJustID(a *XClaimArgs) *StringSliceCmd XTrim(key string, maxLen int64) *IntCmd XTrimApprox(key string, maxLen int64) *IntCmd - BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd - BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd ZAdd(key string, members ...Z) *IntCmd ZAddNX(key string, members ...Z) *IntCmd ZAddXX(key string, members ...Z) *IntCmd @@ -203,8 +206,6 @@ type Cmdable interface { ZLexCount(key, min, max string) *IntCmd ZIncrBy(key string, increment float64, member string) *FloatCmd ZInterStore(destination string, store ZStore, keys ...string) *IntCmd - ZPopMax(key string, count ...int64) *ZSliceCmd - ZPopMin(key string, count ...int64) *ZSliceCmd ZRange(key string, start, stop int64) *StringSliceCmd ZRangeWithScores(key string, start, stop int64) *ZSliceCmd ZRangeByScore(key string, opt ZRangeBy) *StringSliceCmd @@ -232,7 +233,6 @@ type Cmdable interface { ClientKillByFilter(keys ...string) *IntCmd ClientList() *StringCmd ClientPause(dur time.Duration) *BoolCmd - ClientID() *IntCmd ConfigGet(parameter string) *SliceCmd ConfigResetStat() *StatusCmd ConfigSet(parameter, value string) *StatusCmd @@ -270,7 +270,6 @@ type Cmdable interface { ClusterResetHard() *StatusCmd ClusterInfo() *StringCmd ClusterKeySlot(key string) *IntCmd - ClusterGetKeysInSlot(slot int, count int) *StringSliceCmd ClusterCountFailureReports(nodeID string) *IntCmd ClusterCountKeysInSlot(slot int) *IntCmd ClusterDelSlots(slots ...int) *StatusCmd @@ -1343,16 +1342,6 @@ func (c *cmdable) XAdd(a *XAddArgs) *StringCmd { return cmd } -func (c *cmdable) XDel(stream string, ids ...string) *IntCmd { - args := []interface{}{"xdel", stream} - for _, id := range ids { - args = append(args, id) - } - cmd := NewIntCmd(args...) - c.process(cmd) - return cmd -} - func (c *cmdable) XLen(stream string) *IntCmd { cmd := NewIntCmd("xlen", stream) c.process(cmd) @@ -1406,9 +1395,6 @@ func (c *cmdable) XRead(a *XReadArgs) *XStreamSliceCmd { } cmd := NewXStreamSliceCmd(args...) - if a.Block >= 0 { - cmd.setReadTimeout(a.Block) - } c.process(cmd) return cmd } @@ -1426,12 +1412,6 @@ func (c *cmdable) XGroupCreate(stream, group, start string) *StatusCmd { return cmd } -func (c *cmdable) XGroupCreateMkStream(stream, group, start string) *StatusCmd { - cmd := NewStatusCmd("xgroup", "create", stream, group, start, "mkstream") - c.process(cmd) - return cmd -} - func (c *cmdable) XGroupSetID(stream, group, start string) *StatusCmd { cmd := NewStatusCmd("xgroup", "setid", stream, group, start) c.process(cmd) @@ -1453,11 +1433,9 @@ func (c *cmdable) XGroupDelConsumer(stream, group, consumer string) *IntCmd { type XReadGroupArgs struct { Group string Consumer string - // List of streams and ids. - Streams []string - Count int64 - Block time.Duration - NoAck bool + Streams []string + Count int64 + Block time.Duration } func (c *cmdable) XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd { @@ -1469,18 +1447,12 @@ func (c *cmdable) XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd { if a.Block >= 0 { args = append(args, "block", int64(a.Block/time.Millisecond)) } - if a.NoAck { - args = append(args, "noack") - } args = append(args, "streams") for _, s := range a.Streams { args = append(args, s) } cmd := NewXStreamSliceCmd(args...) - if a.Block >= 0 { - cmd.setReadTimeout(a.Block) - } c.process(cmd) return cmd } @@ -1577,12 +1549,6 @@ type Z struct { Member interface{} } -// ZWithKey represents sorted set member including the name of the key where it was popped. -type ZWithKey struct { - Z - Key string -} - // ZStore is used as an arg to ZInterStore and ZUnionStore. type ZStore struct { Weights []float64 @@ -1590,34 +1556,6 @@ type ZStore struct { Aggregate string } -// Redis `BZPOPMAX key [key ...] timeout` command. -func (c *cmdable) BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd { - args := make([]interface{}, 1+len(keys)+1) - args[0] = "bzpopmax" - for i, key := range keys { - args[1+i] = key - } - args[len(args)-1] = formatSec(timeout) - cmd := NewZWithKeyCmd(args...) - cmd.setReadTimeout(timeout) - c.process(cmd) - return cmd -} - -// Redis `BZPOPMIN key [key ...] timeout` command. -func (c *cmdable) BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd { - args := make([]interface{}, 1+len(keys)+1) - args[0] = "bzpopmin" - for i, key := range keys { - args[1+i] = key - } - args[len(args)-1] = formatSec(timeout) - cmd := NewZWithKeyCmd(args...) - cmd.setReadTimeout(timeout) - c.process(cmd) - return cmd -} - func (c *cmdable) zAdd(a []interface{}, n int, members ...Z) *IntCmd { for i, m := range members { a[n+2*i] = m.Score @@ -1756,46 +1694,6 @@ func (c *cmdable) ZInterStore(destination string, store ZStore, keys ...string) return cmd } -func (c *cmdable) ZPopMax(key string, count ...int64) *ZSliceCmd { - args := []interface{}{ - "zpopmax", - key, - } - - switch len(count) { - case 0: - break - case 1: - args = append(args, count[0]) - default: - panic("too many arguments") - } - - cmd := NewZSliceCmd(args...) - c.process(cmd) - return cmd -} - -func (c *cmdable) ZPopMin(key string, count ...int64) *ZSliceCmd { - args := []interface{}{ - "zpopmin", - key, - } - - switch len(count) { - case 0: - break - case 1: - args = append(args, count[0]) - default: - panic("too many arguments") - } - - cmd := NewZSliceCmd(args...) - c.process(cmd) - return cmd -} - func (c *cmdable) zRange(key string, start, stop int64, withScores bool) *StringSliceCmd { args := []interface{}{ "zrange", @@ -2071,24 +1969,6 @@ func (c *cmdable) ClientPause(dur time.Duration) *BoolCmd { return cmd } -func (c *cmdable) ClientID() *IntCmd { - cmd := NewIntCmd("client", "id") - c.process(cmd) - return cmd -} - -func (c *cmdable) ClientUnblock(id int64) *IntCmd { - cmd := NewIntCmd("client", "unblock", id) - c.process(cmd) - return cmd -} - -func (c *cmdable) ClientUnblockWithError(id int64) *IntCmd { - cmd := NewIntCmd("client", "unblock", id, "error") - c.process(cmd) - return cmd -} - // ClientSetName assigns a name to the connection. func (c *statefulCmdable) ClientSetName(name string) *BoolCmd { cmd := NewBoolCmd("client", "setname", name) @@ -2404,12 +2284,6 @@ func (c *cmdable) ClusterKeySlot(key string) *IntCmd { return cmd } -func (c *cmdable) ClusterGetKeysInSlot(slot int, count int) *StringSliceCmd { - cmd := NewStringSliceCmd("cluster", "getkeysinslot", slot, count) - c.process(cmd) - return cmd -} - func (c *cmdable) ClusterCountFailureReports(nodeID string) *IntCmd { cmd := NewIntCmd("cluster", "count-failure-reports", nodeID) c.process(cmd) diff --git a/vendor/github.com/go-redis/redis/internal/error.go b/vendor/github.com/go-redis/redis/internal/error.go index 34f6bd4d..bda97baa 100644 --- a/vendor/github.com/go-redis/redis/internal/error.go +++ b/vendor/github.com/go-redis/redis/internal/error.go @@ -9,9 +9,6 @@ import ( ) func IsRetryableError(err error, retryTimeout bool) bool { - if err == nil { - return false - } if err == io.EOF { return true } @@ -47,8 +44,7 @@ func IsBadConn(err error, allowTimeout bool) bool { return false } if IsRedisError(err) { - // #790 - return IsReadOnlyError(err) + return strings.HasPrefix(err.Error(), "READONLY ") } if allowTimeout { if netErr, ok := err.(net.Error); ok && netErr.Timeout() { @@ -83,7 +79,3 @@ func IsMovedError(err error) (moved bool, ask bool, addr string) { func IsLoadingError(err error) bool { return strings.HasPrefix(err.Error(), "LOADING ") } - -func IsReadOnlyError(err error) bool { - return strings.HasPrefix(err.Error(), "READONLY ") -} diff --git a/vendor/github.com/go-redis/redis/internal/pool/conn.go b/vendor/github.com/go-redis/redis/internal/pool/conn.go index ac48113b..1095bfe5 100644 --- a/vendor/github.com/go-redis/redis/internal/pool/conn.go +++ b/vendor/github.com/go-redis/redis/internal/pool/conn.go @@ -17,16 +17,14 @@ type Conn struct { rdLocked bool wr *proto.Writer - Inited bool - pooled bool - createdAt time.Time - usedAt atomic.Value + InitedAt time.Time + pooled bool + usedAt atomic.Value } func NewConn(netConn net.Conn) *Conn { cn := &Conn{ - netConn: netConn, - createdAt: time.Now(), + netConn: netConn, } cn.rd = proto.NewReader(netConn) cn.wr = proto.NewWriter(netConn) diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool.go b/vendor/github.com/go-redis/redis/internal/pool/pool.go index cd4c8d69..9cecee8a 100644 --- a/vendor/github.com/go-redis/redis/internal/pool/pool.go +++ b/vendor/github.com/go-redis/redis/internal/pool/pool.go @@ -38,7 +38,7 @@ type Pooler interface { Get() (*Conn, error) Put(*Conn) - Remove(*Conn, error) + Remove(*Conn) Len() int IdleLen() int @@ -289,7 +289,7 @@ func (p *ConnPool) popIdle() *Conn { func (p *ConnPool) Put(cn *Conn) { if !cn.pooled { - p.Remove(cn, nil) + p.Remove(cn) return } @@ -300,7 +300,7 @@ func (p *ConnPool) Put(cn *Conn) { p.freeTurn() } -func (p *ConnPool) Remove(cn *Conn, reason error) { +func (p *ConnPool) Remove(cn *Conn) { p.removeConn(cn) p.freeTurn() _ = p.closeConn(cn) @@ -468,7 +468,7 @@ func (p *ConnPool) isStaleConn(cn *Conn) bool { if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout { return true } - if p.opt.MaxConnAge > 0 && now.Sub(cn.createdAt) >= p.opt.MaxConnAge { + if p.opt.MaxConnAge > 0 && now.Sub(cn.InitedAt) >= p.opt.MaxConnAge { return true } diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool_single.go b/vendor/github.com/go-redis/redis/internal/pool/pool_single.go index cd0289b6..b35b78af 100644 --- a/vendor/github.com/go-redis/redis/internal/pool/pool_single.go +++ b/vendor/github.com/go-redis/redis/internal/pool/pool_single.go @@ -1,203 +1,53 @@ package pool -import ( - "fmt" - "sync/atomic" -) - -const ( - stateDefault = 0 - stateInited = 1 - stateClosed = 2 -) - -type BadConnError struct { - wrapped error -} - -var _ error = (*BadConnError)(nil) - -func (e BadConnError) Error() string { - return "pg: Conn is in a bad state" -} - -func (e BadConnError) Unwrap() error { - return e.wrapped -} - type SingleConnPool struct { - pool Pooler - level int32 // atomic - - state uint32 // atomic - ch chan *Conn - - _badConnError atomic.Value + cn *Conn } var _ Pooler = (*SingleConnPool)(nil) -func NewSingleConnPool(pool Pooler) *SingleConnPool { - p, ok := pool.(*SingleConnPool) - if !ok { - p = &SingleConnPool{ - pool: pool, - ch: make(chan *Conn, 1), - } - } - atomic.AddInt32(&p.level, 1) - return p -} - -func (p *SingleConnPool) SetConn(cn *Conn) { - if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) { - p.ch <- cn - } else { - panic("not reached") +func NewSingleConnPool(cn *Conn) *SingleConnPool { + return &SingleConnPool{ + cn: cn, } } func (p *SingleConnPool) NewConn() (*Conn, error) { - return p.pool.NewConn() + panic("not implemented") } -func (p *SingleConnPool) CloseConn(cn *Conn) error { - return p.pool.CloseConn(cn) +func (p *SingleConnPool) CloseConn(*Conn) error { + panic("not implemented") } func (p *SingleConnPool) Get() (*Conn, error) { - // In worst case this races with Close which is not a very common operation. - for i := 0; i < 1000; i++ { - switch atomic.LoadUint32(&p.state) { - case stateDefault: - cn, err := p.pool.Get() - if err != nil { - return nil, err - } - if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) { - return cn, nil - } - p.pool.Remove(cn, ErrClosed) - case stateInited: - if err := p.badConnError(); err != nil { - return nil, err - } - cn, ok := <-p.ch - if !ok { - return nil, ErrClosed - } - return cn, nil - case stateClosed: - return nil, ErrClosed - default: - panic("not reached") - } - } - return nil, fmt.Errorf("pg: SingleConnPool.Get: infinite loop") + return p.cn, nil } func (p *SingleConnPool) Put(cn *Conn) { - defer func() { - if recover() != nil { - p.freeConn(cn) - } - }() - p.ch <- cn -} - -func (p *SingleConnPool) freeConn(cn *Conn) { - if err := p.badConnError(); err != nil { - p.pool.Remove(cn, err) - } else { - p.pool.Put(cn) + if p.cn != cn { + panic("p.cn != cn") } } -func (p *SingleConnPool) Remove(cn *Conn, reason error) { - defer func() { - if recover() != nil { - p.pool.Remove(cn, ErrClosed) - } - }() - p._badConnError.Store(BadConnError{wrapped: reason}) - p.ch <- cn +func (p *SingleConnPool) Remove(cn *Conn) { + if p.cn != cn { + panic("p.cn != cn") + } } func (p *SingleConnPool) Len() int { - switch atomic.LoadUint32(&p.state) { - case stateDefault: - return 0 - case stateInited: - return 1 - case stateClosed: - return 0 - default: - panic("not reached") - } + return 1 } func (p *SingleConnPool) IdleLen() int { - return len(p.ch) + return 0 } func (p *SingleConnPool) Stats() *Stats { - return &Stats{} + return nil } func (p *SingleConnPool) Close() error { - level := atomic.AddInt32(&p.level, -1) - if level > 0 { - return nil - } - - for i := 0; i < 1000; i++ { - state := atomic.LoadUint32(&p.state) - if state == stateClosed { - return ErrClosed - } - if atomic.CompareAndSwapUint32(&p.state, state, stateClosed) { - close(p.ch) - cn, ok := <-p.ch - if ok { - p.freeConn(cn) - } - return nil - } - } - - return fmt.Errorf("pg: SingleConnPool.Close: infinite loop") -} - -func (p *SingleConnPool) Reset() error { - if p.badConnError() == nil { - return nil - } - - select { - case cn, ok := <-p.ch: - if !ok { - return ErrClosed - } - p.pool.Remove(cn, ErrClosed) - p._badConnError.Store(BadConnError{wrapped: nil}) - default: - return fmt.Errorf("pg: SingleConnPool does not have a Conn") - } - - if !atomic.CompareAndSwapUint32(&p.state, stateInited, stateDefault) { - state := atomic.LoadUint32(&p.state) - return fmt.Errorf("pg: invalid SingleConnPool state: %d", state) - } - - return nil -} - -func (p *SingleConnPool) badConnError() error { - if v := p._badConnError.Load(); v != nil { - err := v.(BadConnError) - if err.wrapped != nil { - return err - } - } return nil } diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go b/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go index 3e8f5036..91bd9133 100644 --- a/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go +++ b/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go @@ -55,13 +55,13 @@ func (p *StickyConnPool) putUpstream() { func (p *StickyConnPool) Put(cn *Conn) {} -func (p *StickyConnPool) removeUpstream(reason error) { - p.pool.Remove(p.cn, reason) +func (p *StickyConnPool) removeUpstream() { + p.pool.Remove(p.cn) p.cn = nil } -func (p *StickyConnPool) Remove(cn *Conn, reason error) { - p.removeUpstream(reason) +func (p *StickyConnPool) Remove(cn *Conn) { + p.removeUpstream() } func (p *StickyConnPool) Len() int { @@ -101,7 +101,7 @@ func (p *StickyConnPool) Close() error { if p.reusable { p.putUpstream() } else { - p.removeUpstream(ErrClosed) + p.removeUpstream() } } diff --git a/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go b/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go new file mode 100644 index 00000000..3b174172 --- /dev/null +++ b/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go @@ -0,0 +1,64 @@ +/* +Copyright 2013 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package singleflight provides a duplicate function call suppression +// mechanism. +package singleflight + +import "sync" + +// call is an in-flight or completed Do call +type call struct { + wg sync.WaitGroup + val interface{} + err error +} + +// Group represents a class of work and forms a namespace in which +// units of work can be executed with duplicate suppression. +type Group struct { + mu sync.Mutex // protects m + m map[string]*call // lazily initialized +} + +// Do executes and returns the results of the given function, making +// sure that only one execution is in-flight for a given key at a +// time. If a duplicate comes in, the duplicate caller waits for the +// original to complete and receives the same results. +func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) { + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + g.mu.Unlock() + c.wg.Wait() + return c.val, c.err + } + c := new(call) + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + c.val, c.err = fn() + c.wg.Done() + + g.mu.Lock() + delete(g.m, key) + g.mu.Unlock() + + return c.val, c.err +} diff --git a/vendor/github.com/go-redis/redis/internal/util.go b/vendor/github.com/go-redis/redis/internal/util.go index 80a60038..ffd2353e 100644 --- a/vendor/github.com/go-redis/redis/internal/util.go +++ b/vendor/github.com/go-redis/redis/internal/util.go @@ -27,13 +27,3 @@ func isLower(s string) bool { } return true } - -func Unwrap(err error) error { - u, ok := err.(interface { - Unwrap() error - }) - if !ok { - return nil - } - return u.Unwrap() -} diff --git a/vendor/github.com/go-redis/redis/options.go b/vendor/github.com/go-redis/redis/options.go index b6fabf3f..2b5bcb58 100644 --- a/vendor/github.com/go-redis/redis/options.go +++ b/vendor/github.com/go-redis/redis/options.go @@ -14,17 +14,6 @@ import ( "github.com/go-redis/redis/internal/pool" ) -// Limiter is the interface of a rate limiter or a circuit breaker. -type Limiter interface { - // Allow returns a nil if operation is allowed or an error otherwise. - // If operation is allowed client must report the result of operation - // whether is a success or a failure. - Allow() error - // ReportResult reports the result of previously allowed operation. - // nil indicates a success, non-nil error indicates a failure. - ReportResult(result error) -} - type Options struct { // The network type, either tcp or unix. // Default is tcp. @@ -59,7 +48,7 @@ type Options struct { // Default is 5 seconds. DialTimeout time.Duration // Timeout for socket reads. If reached, commands will fail - // with a timeout instead of blocking. Use value -1 for no timeout and 0 for default. + // with a timeout instead of blocking. // Default is 3 seconds. ReadTimeout time.Duration // Timeout for socket writes. If reached, commands will fail @@ -101,9 +90,6 @@ func (opt *Options) init() { if opt.Network == "" { opt.Network = "tcp" } - if opt.Addr == "" { - opt.Addr = "localhost:6379" - } if opt.Dialer == nil { opt.Dialer = func() (net.Conn, error) { netDialer := &net.Dialer{ diff --git a/vendor/github.com/go-redis/redis/pipeline.go b/vendor/github.com/go-redis/redis/pipeline.go index 2714ceb8..ba852283 100644 --- a/vendor/github.com/go-redis/redis/pipeline.go +++ b/vendor/github.com/go-redis/redis/pipeline.go @@ -8,22 +8,8 @@ import ( type pipelineExecer func([]Cmder) error -// Pipeliner is an mechanism to realise Redis Pipeline technique. -// -// Pipelining is a technique to extremely speed up processing by packing -// operations to batches, send them at once to Redis and read a replies in a -// singe step. -// See https://redis.io/topics/pipelining -// -// Pay attention, that Pipeline is not a transaction, so you can get unexpected -// results in case of big pipelines and small read/write timeouts. -// Redis client has retransmission logic in case of timeouts, pipeline -// can be retransmitted and commands can be executed more then once. -// To avoid this: it is good idea to use reasonable bigger read/write timeouts -// depends of your batch size and/or use TxPipeline. type Pipeliner interface { StatefulCmdable - Do(args ...interface{}) *Cmd Process(cmd Cmder) error Close() error Discard() error @@ -45,12 +31,6 @@ type Pipeline struct { closed bool } -func (c *Pipeline) Do(args ...interface{}) *Cmd { - cmd := NewCmd(args...) - _ = c.Process(cmd) - return cmd -} - // Process queues the cmd for later execution. func (c *Pipeline) Process(cmd Cmder) error { c.mu.Lock() diff --git a/vendor/github.com/go-redis/redis/pubsub.go b/vendor/github.com/go-redis/redis/pubsub.go index 03b01566..b08f34ad 100644 --- a/vendor/github.com/go-redis/redis/pubsub.go +++ b/vendor/github.com/go-redis/redis/pubsub.go @@ -1,9 +1,7 @@ package redis import ( - "errors" "fmt" - "strings" "sync" "time" @@ -12,9 +10,7 @@ import ( "github.com/go-redis/redis/internal/proto" ) -var errPingTimeout = errors.New("redis: ping timeout") - -// PubSub implements Pub/Sub commands as described in +// PubSub implements Pub/Sub commands bas described in // http://redis.io/topics/pubsub. Message receiving is NOT safe // for concurrent use by multiple goroutines. // @@ -30,9 +26,8 @@ type PubSub struct { cn *pool.Conn channels map[string]struct{} patterns map[string]struct{} - - closed bool - exit chan struct{} + closed bool + exit chan struct{} cmd *Cmd @@ -41,12 +36,6 @@ type PubSub struct { ping chan struct{} } -func (c *PubSub) String() string { - channels := mapKeys(c.channels) - channels = append(channels, mapKeys(c.patterns)...) - return fmt.Sprintf("PubSub(%s)", strings.Join(channels, ", ")) -} - func (c *PubSub) init() { c.exit = make(chan struct{}) } @@ -62,6 +51,7 @@ func (c *PubSub) _conn(newChannels []string) (*pool.Conn, error) { if c.closed { return nil, pool.ErrClosed } + if c.cn != nil { return c.cn, nil } @@ -397,39 +387,16 @@ func (c *PubSub) ReceiveMessage() (*Message, error) { // It periodically sends Ping messages to test connection health. // The channel is closed with PubSub. Receive* APIs can not be used // after channel is created. -// -// If the Go channel is full for 30 seconds the message is dropped. func (c *PubSub) Channel() <-chan *Message { - return c.channel(100) -} - -// ChannelSize is like Channel, but creates a Go channel -// with specified buffer size. -func (c *PubSub) ChannelSize(size int) <-chan *Message { - return c.channel(size) -} - -func (c *PubSub) channel(size int) <-chan *Message { - c.chOnce.Do(func() { - c.initChannel(size) - }) - if cap(c.ch) != size { - err := fmt.Errorf("redis: PubSub.Channel is called with different buffer size") - panic(err) - } + c.chOnce.Do(c.initChannel) return c.ch } -func (c *PubSub) initChannel(size int) { - const timeout = 30 * time.Second - - c.ch = make(chan *Message, size) - c.ping = make(chan struct{}, 1) +func (c *PubSub) initChannel() { + c.ch = make(chan *Message, 100) + c.ping = make(chan struct{}, 10) go func() { - timer := time.NewTimer(timeout) - timer.Stop() - var errCount int for { msg, err := c.Receive() @@ -444,7 +411,6 @@ func (c *PubSub) initChannel(size int) { errCount++ continue } - errCount = 0 // Any message is as good as a ping. @@ -459,28 +425,21 @@ func (c *PubSub) initChannel(size int) { case *Pong: // Ignore. case *Message: - timer.Reset(timeout) - select { - case c.ch <- msg: - if !timer.Stop() { - <-timer.C - } - case <-timer.C: - internal.Logf( - "redis: %s channel is full for %s (message is dropped)", - c, timeout) - } + c.ch <- msg default: - internal.Logf("redis: unknown message type: %T", msg) + internal.Logf("redis: unknown message: %T", msg) } } }() go func() { + const timeout = 5 * time.Second + timer := time.NewTimer(timeout) timer.Stop() healthy := true + var pingErr error for { timer.Reset(timeout) select { @@ -490,13 +449,10 @@ func (c *PubSub) initChannel(size int) { <-timer.C } case <-timer.C: - pingErr := c.Ping() + pingErr = c.Ping() if healthy { healthy = false } else { - if pingErr == nil { - pingErr = errPingTimeout - } c.mu.Lock() c._reconnect(pingErr) c.mu.Unlock() diff --git a/vendor/github.com/go-redis/redis/redis.go b/vendor/github.com/go-redis/redis/redis.go index 2a6013c3..3e72bf06 100644 --- a/vendor/github.com/go-redis/redis/redis.go +++ b/vendor/github.com/go-redis/redis/redis.go @@ -26,7 +26,6 @@ func SetLogger(logger *log.Logger) { type baseClient struct { opt *Options connPool pool.Pooler - limiter Limiter process func(Cmder) error processPipeline func([]Cmder) error @@ -51,80 +50,45 @@ func (c *baseClient) newConn() (*pool.Conn, error) { return nil, err } - err = c.initConn(cn) - if err != nil { - _ = c.connPool.CloseConn(cn) - return nil, err + if cn.InitedAt.IsZero() { + if err := c.initConn(cn); err != nil { + _ = c.connPool.CloseConn(cn) + return nil, err + } } return cn, nil } func (c *baseClient) getConn() (*pool.Conn, error) { - if c.limiter != nil { - err := c.limiter.Allow() - if err != nil { - return nil, err - } - } - - cn, err := c._getConn() - if err != nil { - if c.limiter != nil { - c.limiter.ReportResult(err) - } - return nil, err - } - return cn, nil -} - -func (c *baseClient) _getConn() (*pool.Conn, error) { cn, err := c.connPool.Get() if err != nil { return nil, err } - err = c.initConn(cn) - if err != nil { - c.connPool.Remove(cn, err) - if err := internal.Unwrap(err); err != nil { + if cn.InitedAt.IsZero() { + err := c.initConn(cn) + if err != nil { + c.connPool.Remove(cn) return nil, err } - return nil, err } return cn, nil } -func (c *baseClient) releaseConn(cn *pool.Conn, err error) { - if c.limiter != nil { - c.limiter.ReportResult(err) - } - +func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool { if internal.IsBadConn(err, false) { - c.connPool.Remove(cn, err) - } else { - c.connPool.Put(cn) - } -} - -func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) { - if c.limiter != nil { - c.limiter.ReportResult(err) + c.connPool.Remove(cn) + return false } - if err == nil || internal.IsRedisError(err) { - c.connPool.Put(cn) - } else { - c.connPool.Remove(cn, err) - } + c.connPool.Put(cn) + return true } func (c *baseClient) initConn(cn *pool.Conn) error { - if cn.Inited { - return nil - } - cn.Inited = true + cn.InitedAt = time.Now() if c.opt.Password == "" && c.opt.DB == 0 && @@ -162,7 +126,7 @@ func (c *baseClient) initConn(cn *pool.Conn) error { // Do creates a Cmd from the args and processes the cmd. func (c *baseClient) Do(args ...interface{}) *Cmd { cmd := NewCmd(args...) - _ = c.Process(cmd) + c.Process(cmd) return cmd } @@ -204,7 +168,9 @@ func (c *baseClient) defaultProcess(cmd Cmder) error { return err } - err = cn.WithReader(c.cmdTimeout(cmd), cmd.readReply) + err = cn.WithReader(c.cmdTimeout(cmd), func(rd *proto.Reader) error { + return cmd.readReply(rd) + }) c.releaseConn(cn, err) if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) { continue @@ -222,11 +188,7 @@ func (c *baseClient) retryBackoff(attempt int) time.Duration { func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration { if timeout := cmd.readTimeout(); timeout != nil { - t := *timeout - if t == 0 { - return 0 - } - return t + 10*time.Second + return readTimeout(*timeout) } return c.opt.ReadTimeout } @@ -238,7 +200,7 @@ func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration { func (c *baseClient) Close() error { var firstErr error if c.onClose != nil { - if err := c.onClose(); err != nil { + if err := c.onClose(); err != nil && firstErr == nil { firstErr = err } } @@ -282,7 +244,12 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) e } canRetry, err := p(cn, cmds) - c.releaseConnStrict(cn, err) + + if err == nil || internal.IsRedisError(err) { + c.connPool.Put(cn) + break + } + c.connPool.Remove(cn) if !canRetry || !internal.IsRetryableError(err, true) { break @@ -352,7 +319,7 @@ func txPipelineReadQueued(rd *proto.Reader, cmds []Cmder) error { return err } - for range cmds { + for _ = range cmds { err = statusCmd.readReply(rd) if err != nil && !internal.IsRedisError(err) { return err @@ -424,12 +391,12 @@ func (c *Client) WithContext(ctx context.Context) *Client { if ctx == nil { panic("nil context") } - c2 := c.clone() + c2 := c.copy() c2.ctx = ctx return c2 } -func (c *Client) clone() *Client { +func (c *Client) copy() *Client { cp := *c cp.init() return &cp @@ -440,11 +407,6 @@ func (c *Client) Options() *Options { return c.opt } -func (c *Client) SetLimiter(l Limiter) *Client { - c.limiter = l - return c -} - type PoolStats pool.Stats // PoolStats returns connection pool stats. @@ -493,30 +455,6 @@ func (c *Client) pubSub() *PubSub { // Subscribe subscribes the client to the specified channels. // Channels can be omitted to create empty subscription. -// Note that this method does not wait on a response from Redis, so the -// subscription may not be active immediately. To force the connection to wait, -// you may call the Receive() method on the returned *PubSub like so: -// -// sub := client.Subscribe(queryResp) -// iface, err := sub.Receive() -// if err != nil { -// // handle error -// } -// -// // Should be *Subscription, but others are possible if other actions have been -// // taken on sub since it was created. -// switch iface.(type) { -// case *Subscription: -// // subscribe succeeded -// case *Message: -// // received first message -// case *Pong: -// // pong received -// default: -// // handle error -// } -// -// ch := sub.Channel() func (c *Client) Subscribe(channels ...string) *PubSub { pubsub := c.pubSub() if len(channels) > 0 { @@ -544,12 +482,10 @@ type Conn struct { } func newConn(opt *Options, cn *pool.Conn) *Conn { - connPool := pool.NewSingleConnPool(nil) - connPool.SetConn(cn) c := Conn{ baseClient: baseClient{ opt: opt, - connPool: connPool, + connPool: pool.NewSingleConnPool(cn), }, } c.baseClient.init() diff --git a/vendor/github.com/go-redis/redis/ring.go b/vendor/github.com/go-redis/redis/ring.go index 5956b71a..3ded2806 100644 --- a/vendor/github.com/go-redis/redis/ring.go +++ b/vendor/github.com/go-redis/redis/ring.go @@ -273,13 +273,9 @@ func (c *ringShards) Heartbeat(frequency time.Duration) { // rebalance removes dead shards from the Ring. func (c *ringShards) rebalance() { - c.mu.RLock() - shards := c.shards - c.mu.RUnlock() - hash := newConsistentHash(c.opt) var shardsNum int - for name, shard := range shards { + for name, shard := range c.shards { if shard.IsUp() { hash.Add(name) shardsNum++ @@ -323,12 +319,12 @@ func (c *ringShards) Close() error { //------------------------------------------------------------------------------ -// Ring is a Redis client that uses consistent hashing to distribute +// Ring is a Redis client that uses constistent hashing to distribute // keys across multiple Redis servers (shards). It's safe for // concurrent use by multiple goroutines. // // Ring monitors the state of each shard and removes dead shards from -// the ring. When a shard comes online it is added back to the ring. This +// the ring. When shard comes online it is added back to the ring. This // gives you maximum availability and partition tolerance, but no // consistency between different shards or even clients. Each client // uses shards that are available to the client and does not do any @@ -346,7 +342,6 @@ type Ring struct { shards *ringShards cmdsInfoCache *cmdsInfoCache - process func(Cmder) error processPipeline func([]Cmder) error } @@ -359,10 +354,8 @@ func NewRing(opt *RingOptions) *Ring { } ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo) - ring.process = ring.defaultProcess ring.processPipeline = ring.defaultProcessPipeline - - ring.init() + ring.cmdable.setProcessor(ring.Process) for name, addr := range opt.Addrs { clopt := opt.clientOptions() @@ -375,10 +368,6 @@ func NewRing(opt *RingOptions) *Ring { return ring } -func (c *Ring) init() { - c.cmdable.setProcessor(c.Process) -} - func (c *Ring) Context() context.Context { if c.ctx != nil { return c.ctx @@ -390,15 +379,13 @@ func (c *Ring) WithContext(ctx context.Context) *Ring { if ctx == nil { panic("nil context") } - c2 := c.clone() + c2 := c.copy() c2.ctx = ctx return c2 } -func (c *Ring) clone() *Ring { +func (c *Ring) copy() *Ring { cp := *c - cp.init() - return &cp } @@ -539,34 +526,19 @@ func (c *Ring) Do(args ...interface{}) *Cmd { func (c *Ring) WrapProcess( fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error, ) { - c.process = fn(c.process) + c.ForEachShard(func(c *Client) error { + c.WrapProcess(fn) + return nil + }) } func (c *Ring) Process(cmd Cmder) error { - return c.process(cmd) -} - -func (c *Ring) defaultProcess(cmd Cmder) error { - for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { - if attempt > 0 { - time.Sleep(c.retryBackoff(attempt)) - } - - shard, err := c.cmdShard(cmd) - if err != nil { - cmd.setErr(err) - return err - } - - err = shard.Client.Process(cmd) - if err == nil { - return nil - } - if !internal.IsRetryableError(err, cmd.readTimeout() == nil) { - return err - } + shard, err := c.cmdShard(cmd) + if err != nil { + cmd.setErr(err) + return err } - return cmd.Err() + return shard.Client.Process(cmd) } func (c *Ring) Pipeline() Pipeliner { @@ -603,42 +575,36 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error { time.Sleep(c.retryBackoff(attempt)) } - var mu sync.Mutex var failedCmdsMap map[string][]Cmder - var wg sync.WaitGroup for hash, cmds := range cmdsMap { - wg.Add(1) - go func(hash string, cmds []Cmder) { - defer wg.Done() + shard, err := c.shards.GetByHash(hash) + if err != nil { + setCmdsErr(cmds, err) + continue + } - shard, err := c.shards.GetByHash(hash) - if err != nil { - setCmdsErr(cmds, err) - return + cn, err := shard.Client.getConn() + if err != nil { + setCmdsErr(cmds, err) + continue + } + + canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds) + if err == nil || internal.IsRedisError(err) { + shard.Client.connPool.Put(cn) + continue + } + shard.Client.connPool.Remove(cn) + + if canRetry && internal.IsRetryableError(err, true) { + if failedCmdsMap == nil { + failedCmdsMap = make(map[string][]Cmder) } - - cn, err := shard.Client.getConn() - if err != nil { - setCmdsErr(cmds, err) - return - } - - canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds) - shard.Client.releaseConnStrict(cn, err) - - if canRetry && internal.IsRetryableError(err, true) { - mu.Lock() - if failedCmdsMap == nil { - failedCmdsMap = make(map[string][]Cmder) - } - failedCmdsMap[hash] = cmds - mu.Unlock() - } - }(hash, cmds) + failedCmdsMap[hash] = cmds + } } - wg.Wait() if len(failedCmdsMap) == 0 { break } @@ -664,39 +630,6 @@ func (c *Ring) Close() error { return c.shards.Close() } -func (c *Ring) Watch(fn func(*Tx) error, keys ...string) error { - if len(keys) == 0 { - return fmt.Errorf("redis: Watch requires at least one key") - } - - var shards []*ringShard - for _, key := range keys { - if key != "" { - shard, err := c.shards.GetByKey(hashtag.Key(key)) - if err != nil { - return err - } - - shards = append(shards, shard) - } - } - - if len(shards) == 0 { - return fmt.Errorf("redis: Watch requires at least one shard") - } - - if len(shards) > 1 { - for _, shard := range shards[1:] { - if shard.Client != shards[0].Client { - err := fmt.Errorf("redis: Watch requires all keys to be in the same shard") - return err - } - } - } - - return shards[0].Client.Watch(fn, keys...) -} - func newConsistentHash(opt *RingOptions) *consistenthash.Map { return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash)) } diff --git a/vendor/github.com/go-redis/redis/sentinel.go b/vendor/github.com/go-redis/redis/sentinel.go index 503bbe7b..c5f71493 100644 --- a/vendor/github.com/go-redis/redis/sentinel.go +++ b/vendor/github.com/go-redis/redis/sentinel.go @@ -90,7 +90,9 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client { opt: opt, connPool: failover.Pool(), - onClose: failover.Close, + onClose: func() error { + return failover.Close() + }, }, } c.baseClient.init() @@ -117,7 +119,7 @@ func NewSentinelClient(opt *Options) *SentinelClient { return c } -func (c *SentinelClient) pubSub() *PubSub { +func (c *SentinelClient) PubSub() *PubSub { pubsub := &PubSub{ opt: c.opt, @@ -130,67 +132,14 @@ func (c *SentinelClient) pubSub() *PubSub { return pubsub } -// Subscribe subscribes the client to the specified channels. -// Channels can be omitted to create empty subscription. -func (c *SentinelClient) Subscribe(channels ...string) *PubSub { - pubsub := c.pubSub() - if len(channels) > 0 { - _ = pubsub.Subscribe(channels...) - } - return pubsub -} - -// PSubscribe subscribes the client to the given patterns. -// Patterns can be omitted to create empty subscription. -func (c *SentinelClient) PSubscribe(channels ...string) *PubSub { - pubsub := c.pubSub() - if len(channels) > 0 { - _ = pubsub.PSubscribe(channels...) - } - return pubsub -} - func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd { - cmd := NewStringSliceCmd("sentinel", "get-master-addr-by-name", name) + cmd := NewStringSliceCmd("SENTINEL", "get-master-addr-by-name", name) c.Process(cmd) return cmd } func (c *SentinelClient) Sentinels(name string) *SliceCmd { - cmd := NewSliceCmd("sentinel", "sentinels", name) - c.Process(cmd) - return cmd -} - -// Failover forces a failover as if the master was not reachable, and without -// asking for agreement to other Sentinels. -func (c *SentinelClient) Failover(name string) *StatusCmd { - cmd := NewStatusCmd("sentinel", "failover", name) - c.Process(cmd) - return cmd -} - -// Reset resets all the masters with matching name. The pattern argument is a -// glob-style pattern. The reset process clears any previous state in a master -// (including a failover in progress), and removes every slave and sentinel -// already discovered and associated with the master. -func (c *SentinelClient) Reset(pattern string) *IntCmd { - cmd := NewIntCmd("sentinel", "reset", pattern) - c.Process(cmd) - return cmd -} - -// FlushConfig forces Sentinel to rewrite its configuration on disk, including -// the current Sentinel state. -func (c *SentinelClient) FlushConfig() *StatusCmd { - cmd := NewStatusCmd("sentinel", "flushconfig") - c.Process(cmd) - return cmd -} - -// Master shows the state and info of the specified master. -func (c *SentinelClient) Master(name string) *StringStringMapCmd { - cmd := NewStringStringMapCmd("sentinel", "master", name) + cmd := NewSliceCmd("SENTINEL", "sentinels", name) c.Process(cmd) return cmd } @@ -207,92 +156,79 @@ type sentinelFailover struct { masterName string _masterAddr string sentinel *SentinelClient - pubsub *PubSub } -func (c *sentinelFailover) Close() error { - c.mu.Lock() - defer c.mu.Unlock() - if c.sentinel != nil { - return c.closeSentinel() - } - return nil +func (d *sentinelFailover) Close() error { + return d.resetSentinel() } -func (c *sentinelFailover) Pool() *pool.ConnPool { - c.poolOnce.Do(func() { - c.opt.Dialer = c.dial - c.pool = newConnPool(c.opt) +func (d *sentinelFailover) Pool() *pool.ConnPool { + d.poolOnce.Do(func() { + d.opt.Dialer = d.dial + d.pool = newConnPool(d.opt) }) - return c.pool + return d.pool } -func (c *sentinelFailover) dial() (net.Conn, error) { - addr, err := c.MasterAddr() +func (d *sentinelFailover) dial() (net.Conn, error) { + addr, err := d.MasterAddr() if err != nil { return nil, err } - return net.DialTimeout("tcp", addr, c.opt.DialTimeout) + return net.DialTimeout("tcp", addr, d.opt.DialTimeout) } -func (c *sentinelFailover) MasterAddr() (string, error) { - addr, err := c.masterAddr() +func (d *sentinelFailover) MasterAddr() (string, error) { + d.mu.Lock() + defer d.mu.Unlock() + + addr, err := d.masterAddr() if err != nil { return "", err } - c.switchMaster(addr) + d._switchMaster(addr) + return addr, nil } -func (c *sentinelFailover) masterAddr() (string, error) { - c.mu.RLock() - addr := c.getMasterAddr() - c.mu.RUnlock() - if addr != "" { - return addr, nil +func (d *sentinelFailover) masterAddr() (string, error) { + // Try last working sentinel. + if d.sentinel != nil { + addr, err := d.sentinel.GetMasterAddrByName(d.masterName).Result() + if err == nil { + addr := net.JoinHostPort(addr[0], addr[1]) + return addr, nil + } + + internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s", + d.masterName, err) + d._resetSentinel() } - c.mu.Lock() - defer c.mu.Unlock() - - addr = c.getMasterAddr() - if addr != "" { - return addr, nil - } - - if c.sentinel != nil { - c.closeSentinel() - } - - for i, sentinelAddr := range c.sentinelAddrs { + for i, sentinelAddr := range d.sentinelAddrs { sentinel := NewSentinelClient(&Options{ Addr: sentinelAddr, - MaxRetries: c.opt.MaxRetries, + DialTimeout: d.opt.DialTimeout, + ReadTimeout: d.opt.ReadTimeout, + WriteTimeout: d.opt.WriteTimeout, - DialTimeout: c.opt.DialTimeout, - ReadTimeout: c.opt.ReadTimeout, - WriteTimeout: c.opt.WriteTimeout, - - PoolSize: c.opt.PoolSize, - PoolTimeout: c.opt.PoolTimeout, - IdleTimeout: c.opt.IdleTimeout, - IdleCheckFrequency: c.opt.IdleCheckFrequency, - - TLSConfig: c.opt.TLSConfig, + PoolSize: d.opt.PoolSize, + PoolTimeout: d.opt.PoolTimeout, + IdleTimeout: d.opt.IdleTimeout, }) - masterAddr, err := sentinel.GetMasterAddrByName(c.masterName).Result() + masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result() if err != nil { internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s", - c.masterName, err) - _ = sentinel.Close() + d.masterName, err) + sentinel.Close() continue } // Push working sentinel to the top. - c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0] - c.setSentinel(sentinel) + d.sentinelAddrs[0], d.sentinelAddrs[i] = d.sentinelAddrs[i], d.sentinelAddrs[0] + d.setSentinel(sentinel) addr := net.JoinHostPort(masterAddr[0], masterAddr[1]) return addr, nil @@ -301,34 +237,17 @@ func (c *sentinelFailover) masterAddr() (string, error) { return "", errors.New("redis: all sentinels are unreachable") } -func (c *sentinelFailover) getMasterAddr() string { - sentinel := c.sentinel - - if sentinel == nil { - return "" - } - - addr, err := sentinel.GetMasterAddrByName(c.masterName).Result() - if err != nil { - internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s", - c.masterName, err) - return "" - } - - return net.JoinHostPort(addr[0], addr[1]) +func (c *sentinelFailover) switchMaster(addr string) { + c.mu.Lock() + c._switchMaster(addr) + c.mu.Unlock() } -func (c *sentinelFailover) switchMaster(addr string) { - c.mu.RLock() - masterAddr := c._masterAddr - c.mu.RUnlock() - if masterAddr == addr { +func (c *sentinelFailover) _switchMaster(addr string) { + if c._masterAddr == addr { return } - c.mu.Lock() - defer c.mu.Unlock() - internal.Logf("sentinel: new master=%q addr=%q", c.masterName, addr) _ = c.Pool().Filter(func(cn *pool.Conn) bool { @@ -337,36 +256,32 @@ func (c *sentinelFailover) switchMaster(addr string) { c._masterAddr = addr } -func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) { - c.discoverSentinels(sentinel) - c.sentinel = sentinel - - c.pubsub = sentinel.Subscribe("+switch-master") - go c.listen(c.pubsub) +func (d *sentinelFailover) setSentinel(sentinel *SentinelClient) { + d.discoverSentinels(sentinel) + d.sentinel = sentinel + go d.listen(sentinel) } -func (c *sentinelFailover) closeSentinel() error { - var firstErr error - - err := c.pubsub.Close() - if err != nil && firstErr == err { - firstErr = err +func (d *sentinelFailover) resetSentinel() error { + var err error + d.mu.Lock() + if d.sentinel != nil { + err = d._resetSentinel() } - c.pubsub = nil - - err = c.sentinel.Close() - if err != nil && firstErr == err { - firstErr = err - } - c.sentinel = nil - - return firstErr + d.mu.Unlock() + return err } -func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) { - sentinels, err := sentinel.Sentinels(c.masterName).Result() +func (d *sentinelFailover) _resetSentinel() error { + err := d.sentinel.Close() + d.sentinel = nil + return err +} + +func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) { + sentinels, err := sentinel.Sentinels(d.masterName).Result() if err != nil { - internal.Logf("sentinel: Sentinels master=%q failed: %s", c.masterName, err) + internal.Logf("sentinel: Sentinels master=%q failed: %s", d.masterName, err) return } for _, sentinel := range sentinels { @@ -375,32 +290,49 @@ func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) { key := vals[i].(string) if key == "name" { sentinelAddr := vals[i+1].(string) - if !contains(c.sentinelAddrs, sentinelAddr) { - internal.Logf("sentinel: discovered new sentinel=%q for master=%q", - sentinelAddr, c.masterName) - c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr) + if !contains(d.sentinelAddrs, sentinelAddr) { + internal.Logf( + "sentinel: discovered new sentinel=%q for master=%q", + sentinelAddr, d.masterName, + ) + d.sentinelAddrs = append(d.sentinelAddrs, sentinelAddr) } } } } } -func (c *sentinelFailover) listen(pubsub *PubSub) { - ch := pubsub.Channel() +func (d *sentinelFailover) listen(sentinel *SentinelClient) { + pubsub := sentinel.PubSub() + defer pubsub.Close() + + err := pubsub.Subscribe("+switch-master") + if err != nil { + internal.Logf("sentinel: Subscribe failed: %s", err) + d.resetSentinel() + return + } + for { - msg, ok := <-ch - if !ok { - break + msg, err := pubsub.ReceiveMessage() + if err != nil { + if err == pool.ErrClosed { + d.resetSentinel() + return + } + internal.Logf("sentinel: ReceiveMessage failed: %s", err) + continue } - if msg.Channel == "+switch-master" { + switch msg.Channel { + case "+switch-master": parts := strings.Split(msg.Payload, " ") - if parts[0] != c.masterName { + if parts[0] != d.masterName { internal.Logf("sentinel: ignore addr for master=%q", parts[0]) continue } addr := net.JoinHostPort(parts[3], parts[4]) - c.switchMaster(addr) + d.switchMaster(addr) } } } diff --git a/vendor/github.com/go-redis/redis/tx.go b/vendor/github.com/go-redis/redis/tx.go index fb3e6331..6a7da99d 100644 --- a/vendor/github.com/go-redis/redis/tx.go +++ b/vendor/github.com/go-redis/redis/tx.go @@ -29,10 +29,10 @@ func (c *Client) newTx() *Tx { return &tx } -// Watch prepares a transaction and marks the keys to be watched +// Watch prepares a transcaction and marks the keys to be watched // for conditional execution if there are any keys. // -// The transaction is automatically closed when fn exits. +// The transaction is automatically closed when the fn exits. func (c *Client) Watch(fn func(*Tx) error, keys ...string) error { tx := c.newTx() if len(keys) > 0 { diff --git a/vendor/github.com/go-redis/redis/universal.go b/vendor/github.com/go-redis/redis/universal.go index 03bfa0fa..a6075624 100644 --- a/vendor/github.com/go-redis/redis/universal.go +++ b/vendor/github.com/go-redis/redis/universal.go @@ -155,7 +155,6 @@ type UniversalClient interface { Watch(fn func(*Tx) error, keys ...string) error Process(cmd Cmder) error WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) - WrapProcessPipeline(fn func(oldProcess func([]Cmder) error) func([]Cmder) error) Subscribe(channels ...string) *PubSub PSubscribe(channels ...string) *PubSub Close() error diff --git a/vendor/modules.txt b/vendor/modules.txt index ff8082d7..762d0201 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -60,13 +60,14 @@ github.com/go-openapi/jsonreference github.com/go-openapi/spec # github.com/go-openapi/swag v0.19.5 github.com/go-openapi/swag -# github.com/go-redis/redis v6.15.7+incompatible +# github.com/go-redis/redis v6.14.0+incompatible github.com/go-redis/redis github.com/go-redis/redis/internal github.com/go-redis/redis/internal/consistenthash github.com/go-redis/redis/internal/hashtag github.com/go-redis/redis/internal/pool github.com/go-redis/redis/internal/proto +github.com/go-redis/redis/internal/singleflight github.com/go-redis/redis/internal/util # github.com/go-sql-driver/mysql v1.5.0 github.com/go-sql-driver/mysql