Update module go-redis/redis to v7 (#277)

Update module go-redis/redis to v7

Reviewed-on: https://kolaente.dev/vikunja/api/pulls/277
This commit is contained in:
renovate 2020-04-09 04:35:36 +00:00 committed by konrad
parent 4ca3f714ea
commit 15d718fb1a
25 changed files with 500 additions and 1099 deletions

4
go.mod
View file

@ -34,12 +34,12 @@ require (
github.com/garyburd/redigo v1.6.0 // indirect
github.com/go-openapi/jsonreference v0.19.3 // indirect
github.com/go-openapi/spec v0.19.4 // indirect
github.com/go-redis/redis v6.15.7+incompatible
github.com/go-redis/redis v6.14.0+incompatible
github.com/go-redis/redis/v7 v7.2.0 // indirect
github.com/go-sql-driver/mysql v1.5.0
github.com/go-testfixtures/testfixtures/v3 v3.1.1
github.com/go-xorm/core v0.6.2 // indirect
github.com/go-xorm/xorm v0.7.9 // indirect
github.com/golang/protobuf v1.3.2 // indirect
github.com/gordonklaus/ineffassign v0.0.0-20200309095847-7953dde2c7bf
github.com/imdario/mergo v0.3.9
github.com/jgautheron/goconst v0.0.0-20200227150835-cda7ea3bf591

7
go.sum
View file

@ -117,11 +117,13 @@ github.com/go-openapi/swag v0.17.0/go.mod h1:AByQ+nYG6gQg71GINrmuDXCPWdL640yX49/
github.com/go-openapi/swag v0.19.2/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
github.com/go-openapi/swag v0.19.5 h1:lTz6Ys4CmqqCQmZPBlbQENR1/GucA2bzYTE12Pw4tFY=
github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
github.com/go-redis/redis v6.14.0+incompatible h1:AMPZkM7PbsJbilelrJUAyC4xQbGROTOLSuDd7fnMXCI=
github.com/go-redis/redis v6.14.0+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis v6.15.2+incompatible h1:9SpNVG76gr6InJGxoZ6IuuxaCOQwDAhzyXg+Bs+0Sb4=
github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis v6.15.7+incompatible h1:3skhDh95XQMpnqeqNftPkQD9jL9e5e36z/1SUm6dy1U=
github.com/go-redis/redis v6.15.7+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis/v7 v7.2.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA=
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
@ -281,9 +283,11 @@ github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FW
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 h1:lDH9UUVJtmYCjyT0CI4q8xvlXPxeZ0gYCVvWbmPlp88=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw=
@ -460,6 +464,7 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@ -492,6 +497,7 @@ golang.org/x/sys v0.0.0-20190616124812-15dcb6c0061f/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb h1:fgwFCsaw9buMuxNd6+DQfAuSFqbNiQZpcgJQAgJsK6k=
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -549,6 +555,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/d4l3k/messagediff.v1 v1.2.1 h1:70AthpjunwzUiarMHyED52mj9UwtAnE89l1Gmrt3EU0=
gopkg.in/d4l3k/messagediff.v1 v1.2.1/go.mod h1:EUzikiKadqXWcD1AzJLagx0j/BeeWGtn++04Xniyg44=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=

View file

@ -5,10 +5,10 @@ services:
- redis-server
go:
- 1.7.x
- 1.8.x
- 1.9.x
- 1.10.x
- 1.11.x
- 1.12.x
- tip
matrix:

View file

@ -1,9 +1,5 @@
# Changelog
## Unreleased
- Cluster and Ring pipelines process commands for each node in its own goroutine.
## 6.14
- Added Options.MinIdleConns.

View file

@ -3,8 +3,6 @@ all: testdeps
go test ./... -short -race
env GOOS=linux GOARCH=386 go test ./...
go vet
go get github.com/gordonklaus/ineffassign
ineffassign .
testdeps: testdata/redis/src/redis-server
@ -15,7 +13,7 @@ bench: testdeps
testdata/redis:
mkdir -p $@
wget -qO- https://github.com/antirez/redis/archive/5.0.tar.gz | tar xvz --strip-components=1 -C $@
wget -qO- https://github.com/antirez/redis/archive/unstable.tar.gz | tar xvz --strip-components=1 -C $@
testdata/redis/src/redis-server: testdata/redis
sed -i.bak 's/libjemalloc.a/libjemalloc.a -lrt/g' $</src/Makefile

View file

@ -9,7 +9,7 @@ Supports:
- Redis 3 commands except QUIT, MONITOR, SLOWLOG and SYNC.
- Automatic connection pooling with [circuit breaker](https://en.wikipedia.org/wiki/Circuit_breaker_design_pattern) support.
- [Pub/Sub](https://godoc.org/github.com/go-redis/redis#PubSub).
- [Transactions](https://godoc.org/github.com/go-redis/redis#example-Client-TxPipeline).
- [Transactions](https://godoc.org/github.com/go-redis/redis#Multi).
- [Pipeline](https://godoc.org/github.com/go-redis/redis#example-Client-Pipeline) and [TxPipeline](https://godoc.org/github.com/go-redis/redis#example-Client-TxPipeline).
- [Scripting](https://godoc.org/github.com/go-redis/redis#Script).
- [Timeouts](https://godoc.org/github.com/go-redis/redis#Options).
@ -143,4 +143,4 @@ BenchmarkRedisClusterPing-4 100000 11535 ns/op 117 B/op
- [Golang PostgreSQL ORM](https://github.com/go-pg/pg)
- [Golang msgpack](https://github.com/vmihailenco/msgpack)
- [Golang message task queue](https://github.com/vmihailenco/taskq)
- [Golang message task queue](https://github.com/go-msgqueue/msgqueue)

View file

@ -3,6 +3,7 @@ package redis
import (
"context"
"crypto/tls"
"errors"
"fmt"
"math"
"math/rand"
@ -17,6 +18,7 @@ import (
"github.com/go-redis/redis/internal/hashtag"
"github.com/go-redis/redis/internal/pool"
"github.com/go-redis/redis/internal/proto"
"github.com/go-redis/redis/internal/singleflight"
)
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
@ -48,9 +50,6 @@ type ClusterOptions struct {
// and Cluster.ReloadState to manually trigger state reloading.
ClusterSlots func() ([]ClusterSlot, error)
// Optional hook that is called when a new node is created.
OnNewNode func(*Client)
// Following options are copied from Options struct.
OnConnect func(*Conn) error
@ -83,7 +82,7 @@ func (opt *ClusterOptions) init() {
opt.MaxRedirects = 8
}
if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil {
if opt.RouteByLatency || opt.RouteRandomly {
opt.ReadOnly = true
}
@ -167,10 +166,6 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
go node.updateLatency()
}
if clOpt.OnNewNode != nil {
clOpt.OnNewNode(node.Client)
}
return &node
}
@ -242,6 +237,8 @@ type clusterNodes struct {
clusterAddrs []string
closed bool
nodeCreateGroup singleflight.Group
_generation uint32 // atomic
}
@ -344,6 +341,11 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
return node, nil
}
v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) {
node := newClusterNode(c.opt, addr)
return node, nil
})
c.mu.Lock()
defer c.mu.Unlock()
@ -353,13 +355,15 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
node, ok := c.allNodes[addr]
if ok {
_ = v.(*clusterNode).Close()
return node, err
}
node = newClusterNode(c.opt, addr)
node = v.(*clusterNode)
c.allAddrs = appendIfNotExists(c.allAddrs, addr)
c.clusterAddrs = append(c.clusterAddrs, addr)
if err == nil {
c.clusterAddrs = append(c.clusterAddrs, addr)
}
c.allNodes[addr] = node
return node, err
@ -529,12 +533,10 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
n := rand.Intn(len(nodes)-1) + 1
slave = nodes[n]
if !slave.Loading() {
return slave, nil
break
}
}
// All slaves are loading - use master.
return nodes[0], nil
return slave, nil
}
}
@ -578,12 +580,23 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode {
return nil
}
func (c *clusterState) IsConsistent() bool {
if c.nodes.opt.ClusterSlots != nil {
return true
}
return len(c.Masters) <= len(c.Slaves)
}
//------------------------------------------------------------------------------
type clusterStateHolder struct {
load func() (*clusterState, error)
state atomic.Value
state atomic.Value
firstErrMu sync.RWMutex
firstErr error
reloading uint32 // atomic
}
@ -594,8 +607,24 @@ func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder
}
func (c *clusterStateHolder) Reload() (*clusterState, error) {
state, err := c.reload()
if err != nil {
return nil, err
}
if !state.IsConsistent() {
time.AfterFunc(time.Second, c.LazyReload)
}
return state, nil
}
func (c *clusterStateHolder) reload() (*clusterState, error) {
state, err := c.load()
if err != nil {
c.firstErrMu.Lock()
if c.firstErr == nil {
c.firstErr = err
}
c.firstErrMu.Unlock()
return nil, err
}
c.state.Store(state)
@ -609,11 +638,16 @@ func (c *clusterStateHolder) LazyReload() {
go func() {
defer atomic.StoreUint32(&c.reloading, 0)
_, err := c.Reload()
if err != nil {
return
for {
state, err := c.reload()
if err != nil {
return
}
time.Sleep(100 * time.Millisecond)
if state.IsConsistent() {
return
}
}
time.Sleep(100 * time.Millisecond)
}()
}
@ -626,7 +660,15 @@ func (c *clusterStateHolder) Get() (*clusterState, error) {
}
return state, nil
}
return c.Reload()
c.firstErrMu.RLock()
err := c.firstErr
c.firstErrMu.RUnlock()
if err != nil {
return nil, err
}
return nil, errors.New("redis: cluster has no state")
}
func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) {
@ -674,6 +716,10 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
c.processTxPipeline = c.defaultProcessTxPipeline
c.init()
_, _ = c.state.Reload()
_, _ = c.cmdsInfoCache.Get()
if opt.IdleCheckFrequency > 0 {
go c.reaper(opt.IdleCheckFrequency)
}
@ -681,17 +727,17 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
return c
}
func (c *ClusterClient) init() {
c.cmdable.setProcessor(c.Process)
}
// ReloadState reloads cluster state. If available it calls ClusterSlots func
// ReloadState reloads cluster state. It calls ClusterSlots func
// to get cluster slots information.
func (c *ClusterClient) ReloadState() error {
_, err := c.state.Reload()
return err
}
func (c *ClusterClient) init() {
c.cmdable.setProcessor(c.Process)
}
func (c *ClusterClient) Context() context.Context {
if c.ctx != nil {
return c.ctx
@ -703,12 +749,12 @@ func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient {
if ctx == nil {
panic("nil context")
}
c2 := c.clone()
c2 := c.copy()
c2.ctx = ctx
return c2
}
func (c *ClusterClient) clone() *ClusterClient {
func (c *ClusterClient) copy() *ClusterClient {
cp := *c
cp.init()
return &cp
@ -772,11 +818,6 @@ func cmdSlot(cmd Cmder, pos int) int {
}
func (c *ClusterClient) cmdSlot(cmd Cmder) int {
args := cmd.Args()
if args[0] == "cluster" && args[1] == "getkeysinslot" {
return args[2].(int)
}
cmdInfo := c.cmdInfo(cmd.Name())
return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
}
@ -788,9 +829,9 @@ func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) {
}
cmdInfo := c.cmdInfo(cmd.Name())
slot := c.cmdSlot(cmd)
slot := cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
if cmdInfo != nil && cmdInfo.ReadOnly && c.opt.ReadOnly {
if c.opt.RouteByLatency {
node, err := state.slotClosestNode(slot)
return slot, node, err
@ -849,12 +890,15 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
if err == nil {
break
}
if err != Nil {
if internal.IsRetryableError(err, true) {
c.state.LazyReload()
continue
}
moved, ask, addr := internal.IsMovedError(err)
if moved || ask {
c.state.LazyReload()
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
return err
@ -862,7 +906,7 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
continue
}
if err == pool.ErrClosed || internal.IsReadOnlyError(err) {
if err == pool.ErrClosed {
node, err = c.slotMasterNode(slot)
if err != nil {
return err
@ -870,10 +914,6 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
continue
}
if internal.IsRetryableError(err, true) {
continue
}
return err
}
@ -938,34 +978,16 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
if err == nil {
break
}
if err != Nil {
c.state.LazyReload()
}
// If slave is loading - pick another node.
// If slave is loading - read from master.
if c.opt.ReadOnly && internal.IsLoadingError(err) {
node.MarkAsLoading()
node = nil
continue
}
var moved bool
var addr string
moved, ask, addr = internal.IsMovedError(err)
if moved || ask {
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
break
}
continue
}
if err == pool.ErrClosed || internal.IsReadOnlyError(err) {
node = nil
continue
}
if internal.IsRetryableError(err, true) {
c.state.LazyReload()
// First retry the same node.
if attempt == 0 {
continue
@ -979,6 +1001,24 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
continue
}
var moved bool
var addr string
moved, ask, addr = internal.IsMovedError(err)
if moved || ask {
c.state.LazyReload()
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
break
}
continue
}
if err == pool.ErrClosed {
node = nil
continue
}
break
}
@ -1198,12 +1238,10 @@ func (c *ClusterClient) WrapProcessPipeline(
fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
) {
c.processPipeline = fn(c.processPipeline)
c.processTxPipeline = fn(c.processTxPipeline)
}
func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
cmdsMap := newCmdsMap()
err := c.mapCmdsByNode(cmds, cmdsMap)
cmdsMap, err := c.mapCmdsByNode(cmds)
if err != nil {
setCmdsErr(cmds, err)
return err
@ -1214,31 +1252,28 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
time.Sleep(c.retryBackoff(attempt))
}
failedCmds := newCmdsMap()
var wg sync.WaitGroup
failedCmds := make(map[*clusterNode][]Cmder)
for node, cmds := range cmdsMap.m {
wg.Add(1)
go func(node *clusterNode, cmds []Cmder) {
defer wg.Done()
cn, err := node.Client.getConn()
if err != nil {
if err == pool.ErrClosed {
c.mapCmdsByNode(cmds, failedCmds)
} else {
setCmdsErr(cmds, err)
}
return
for node, cmds := range cmdsMap {
cn, err := node.Client.getConn()
if err != nil {
if err == pool.ErrClosed {
c.remapCmds(cmds, failedCmds)
} else {
setCmdsErr(cmds, err)
}
continue
}
err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
node.Client.releaseConnStrict(cn, err)
}(node, cmds)
err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
if err == nil || internal.IsRedisError(err) {
node.Client.connPool.Put(cn)
} else {
node.Client.connPool.Remove(cn)
}
}
wg.Wait()
if len(failedCmds.m) == 0 {
if len(failedCmds) == 0 {
break
}
cmdsMap = failedCmds
@ -1247,24 +1282,14 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
return cmdsFirstErr(cmds)
}
type cmdsMap struct {
mu sync.Mutex
m map[*clusterNode][]Cmder
}
func newCmdsMap() *cmdsMap {
return &cmdsMap{
m: make(map[*clusterNode][]Cmder),
}
}
func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error {
func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) {
state, err := c.state.Get()
if err != nil {
setCmdsErr(cmds, err)
return err
return nil, err
}
cmdsMap := make(map[*clusterNode][]Cmder)
cmdsAreReadOnly := c.cmdsAreReadOnly(cmds)
for _, cmd := range cmds {
var node *clusterNode
@ -1276,13 +1301,11 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error {
node, err = state.slotMasterNode(slot)
}
if err != nil {
return err
return nil, err
}
cmdsMap.mu.Lock()
cmdsMap.m[node] = append(cmdsMap.m[node], cmd)
cmdsMap.mu.Unlock()
cmdsMap[node] = append(cmdsMap[node], cmd)
}
return nil
return cmdsMap, nil
}
func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
@ -1295,30 +1318,39 @@ func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
return true
}
func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cmder) {
remappedCmds, err := c.mapCmdsByNode(cmds)
if err != nil {
setCmdsErr(cmds, err)
return
}
for node, cmds := range remappedCmds {
failedCmds[node] = cmds
}
}
func (c *ClusterClient) pipelineProcessCmds(
node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmd(wr, cmds...)
})
if err != nil {
setCmdsErr(cmds, err)
failedCmds.mu.Lock()
failedCmds.m[node] = cmds
failedCmds.mu.Unlock()
failedCmds[node] = cmds
return err
}
err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
return c.pipelineReadCmds(node, rd, cmds, failedCmds)
return c.pipelineReadCmds(rd, cmds, failedCmds)
})
return err
}
func (c *ClusterClient) pipelineReadCmds(
node *clusterNode, rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
rd *proto.Reader, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
var firstErr error
for _, cmd := range cmds {
err := cmd.readReply(rd)
if err == nil {
@ -1333,18 +1365,13 @@ func (c *ClusterClient) pipelineReadCmds(
continue
}
failedCmds.mu.Lock()
failedCmds.m[node] = append(failedCmds.m[node], cmd)
failedCmds.mu.Unlock()
if firstErr == nil {
firstErr = err
}
return err
}
return firstErr
return nil
}
func (c *ClusterClient) checkMovedErr(
cmd Cmder, err error, failedCmds *cmdsMap,
cmd Cmder, err error, failedCmds map[*clusterNode][]Cmder,
) bool {
moved, ask, addr := internal.IsMovedError(err)
@ -1356,9 +1383,7 @@ func (c *ClusterClient) checkMovedErr(
return false
}
failedCmds.mu.Lock()
failedCmds.m[node] = append(failedCmds.m[node], cmd)
failedCmds.mu.Unlock()
failedCmds[node] = append(failedCmds[node], cmd)
return true
}
@ -1368,9 +1393,7 @@ func (c *ClusterClient) checkMovedErr(
return false
}
failedCmds.mu.Lock()
failedCmds.m[node] = append(failedCmds.m[node], NewCmd("ASKING"), cmd)
failedCmds.mu.Unlock()
failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd)
return true
}
@ -1410,34 +1433,31 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
time.Sleep(c.retryBackoff(attempt))
}
failedCmds := newCmdsMap()
var wg sync.WaitGroup
failedCmds := make(map[*clusterNode][]Cmder)
for node, cmds := range cmdsMap {
wg.Add(1)
go func(node *clusterNode, cmds []Cmder) {
defer wg.Done()
cn, err := node.Client.getConn()
if err != nil {
if err == pool.ErrClosed {
c.mapCmdsByNode(cmds, failedCmds)
} else {
setCmdsErr(cmds, err)
}
return
cn, err := node.Client.getConn()
if err != nil {
if err == pool.ErrClosed {
c.remapCmds(cmds, failedCmds)
} else {
setCmdsErr(cmds, err)
}
continue
}
err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
node.Client.releaseConnStrict(cn, err)
}(node, cmds)
err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
if err == nil || internal.IsRedisError(err) {
node.Client.connPool.Put(cn)
} else {
node.Client.connPool.Remove(cn)
}
}
wg.Wait()
if len(failedCmds.m) == 0 {
if len(failedCmds) == 0 {
break
}
cmdsMap = failedCmds.m
cmdsMap = failedCmds
}
}
@ -1454,16 +1474,14 @@ func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
}
func (c *ClusterClient) txPipelineProcessCmds(
node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
return txPipelineWriteMulti(wr, cmds)
})
if err != nil {
setCmdsErr(cmds, err)
failedCmds.mu.Lock()
failedCmds.m[node] = cmds
failedCmds.mu.Unlock()
failedCmds[node] = cmds
return err
}
@ -1479,7 +1497,7 @@ func (c *ClusterClient) txPipelineProcessCmds(
}
func (c *ClusterClient) txPipelineReadQueued(
rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
rd *proto.Reader, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
// Parse queued replies.
var statusCmd StatusCmd
@ -1528,51 +1546,40 @@ func (c *ClusterClient) txPipelineReadQueued(
return nil
}
func (c *ClusterClient) pubSub() *PubSub {
func (c *ClusterClient) pubSub(channels []string) *PubSub {
var node *clusterNode
pubsub := &PubSub{
opt: c.opt.clientOptions(),
newConn: func(channels []string) (*pool.Conn, error) {
if node != nil {
panic("node != nil")
}
if node == nil {
var slot int
if len(channels) > 0 {
slot = hashtag.Slot(channels[0])
} else {
slot = -1
}
var err error
if len(channels) > 0 {
slot := hashtag.Slot(channels[0])
node, err = c.slotMasterNode(slot)
} else {
node, err = c.nodes.Random()
masterNode, err := c.slotMasterNode(slot)
if err != nil {
return nil, err
}
node = masterNode
}
if err != nil {
return nil, err
}
cn, err := node.Client.newConn()
if err != nil {
node = nil
return nil, err
}
return cn, nil
return node.Client.newConn()
},
closeConn: func(cn *pool.Conn) error {
err := node.Client.connPool.CloseConn(cn)
node = nil
return err
return node.Client.connPool.CloseConn(cn)
},
}
pubsub.init()
return pubsub
}
// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create empty subscription.
func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
pubsub := c.pubSub()
pubsub := c.pubSub(channels)
if len(channels) > 0 {
_ = pubsub.Subscribe(channels...)
}
@ -1582,7 +1589,7 @@ func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create empty subscription.
func (c *ClusterClient) PSubscribe(channels ...string) *PubSub {
pubsub := c.pubSub()
pubsub := c.pubSub(channels)
if len(channels) > 0 {
_ = pubsub.PSubscribe(channels...)
}

View file

@ -183,7 +183,7 @@ func (cmd *Cmd) Int() (int, error) {
case string:
return strconv.Atoi(val)
default:
err := fmt.Errorf("redis: unexpected type=%T for Int", val)
err := fmt.Errorf("redis: unexpected type=%T for Int64", val)
return 0, err
}
}
@ -218,25 +218,6 @@ func (cmd *Cmd) Uint64() (uint64, error) {
}
}
func (cmd *Cmd) Float32() (float32, error) {
if cmd.err != nil {
return 0, cmd.err
}
switch val := cmd.val.(type) {
case int64:
return float32(val), nil
case string:
f, err := strconv.ParseFloat(val, 32)
if err != nil {
return 0, err
}
return float32(f), nil
default:
err := fmt.Errorf("redis: unexpected type=%T for Float32", val)
return 0, err
}
}
func (cmd *Cmd) Float64() (float64, error) {
if cmd.err != nil {
return 0, cmd.err
@ -604,17 +585,6 @@ func (cmd *StringCmd) Uint64() (uint64, error) {
return strconv.ParseUint(cmd.Val(), 10, 64)
}
func (cmd *StringCmd) Float32() (float32, error) {
if cmd.err != nil {
return 0, cmd.err
}
f, err := strconv.ParseFloat(cmd.Val(), 32)
if err != nil {
return 0, err
}
return float32(f), nil
}
func (cmd *StringCmd) Float64() (float64, error) {
if cmd.err != nil {
return 0, cmd.err
@ -717,12 +687,12 @@ func (cmd *StringSliceCmd) readReply(rd *proto.Reader) error {
func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
ss := make([]string, 0, n)
for i := int64(0); i < n; i++ {
switch s, err := rd.ReadString(); {
case err == Nil:
s, err := rd.ReadString()
if err == Nil {
ss = append(ss, "")
case err != nil:
} else if err != nil {
return nil, err
default:
} else {
ss = append(ss, s)
}
}
@ -999,20 +969,14 @@ func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
return nil, err
}
var values map[string]interface{}
v, err := rd.ReadArrayReply(stringInterfaceMapParser)
if err != nil {
if err != proto.Nil {
return nil, err
}
} else {
values = v.(map[string]interface{})
return nil, err
}
msgs = append(msgs, XMessage{
ID: id,
Values: values,
Values: v.(map[string]interface{}),
})
return nil, nil
})
@ -1373,68 +1337,6 @@ func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
//------------------------------------------------------------------------------
type ZWithKeyCmd struct {
baseCmd
val ZWithKey
}
var _ Cmder = (*ZWithKeyCmd)(nil)
func NewZWithKeyCmd(args ...interface{}) *ZWithKeyCmd {
return &ZWithKeyCmd{
baseCmd: baseCmd{_args: args},
}
}
func (cmd *ZWithKeyCmd) Val() ZWithKey {
return cmd.val
}
func (cmd *ZWithKeyCmd) Result() (ZWithKey, error) {
return cmd.Val(), cmd.Err()
}
func (cmd *ZWithKeyCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *ZWithKeyCmd) readReply(rd *proto.Reader) error {
var v interface{}
v, cmd.err = rd.ReadArrayReply(zWithKeyParser)
if cmd.err != nil {
return cmd.err
}
cmd.val = v.(ZWithKey)
return nil
}
// Implements proto.MultiBulkParse
func zWithKeyParser(rd *proto.Reader, n int64) (interface{}, error) {
if n != 3 {
return nil, fmt.Errorf("got %d elements, expected 3", n)
}
var z ZWithKey
var err error
z.Key, err = rd.ReadString()
if err != nil {
return nil, err
}
z.Member, err = rd.ReadString()
if err != nil {
return nil, err
}
z.Score, err = rd.ReadFloatReply()
if err != nil {
return nil, err
}
return z, nil
}
//------------------------------------------------------------------------------
type ScanCmd struct {
baseCmd

View file

@ -8,6 +8,13 @@ import (
"github.com/go-redis/redis/internal"
)
func readTimeout(timeout time.Duration) time.Duration {
if timeout == 0 {
return 0
}
return timeout + 10*time.Second
}
func usePrecise(dur time.Duration) bool {
return dur < time.Second || dur%time.Second != 0
}
@ -166,7 +173,6 @@ type Cmdable interface {
SUnion(keys ...string) *StringSliceCmd
SUnionStore(destination string, keys ...string) *IntCmd
XAdd(a *XAddArgs) *StringCmd
XDel(stream string, ids ...string) *IntCmd
XLen(stream string) *IntCmd
XRange(stream, start, stop string) *XMessageSliceCmd
XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd
@ -175,7 +181,6 @@ type Cmdable interface {
XRead(a *XReadArgs) *XStreamSliceCmd
XReadStreams(streams ...string) *XStreamSliceCmd
XGroupCreate(stream, group, start string) *StatusCmd
XGroupCreateMkStream(stream, group, start string) *StatusCmd
XGroupSetID(stream, group, start string) *StatusCmd
XGroupDestroy(stream, group string) *IntCmd
XGroupDelConsumer(stream, group, consumer string) *IntCmd
@ -187,8 +192,6 @@ type Cmdable interface {
XClaimJustID(a *XClaimArgs) *StringSliceCmd
XTrim(key string, maxLen int64) *IntCmd
XTrimApprox(key string, maxLen int64) *IntCmd
BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd
BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd
ZAdd(key string, members ...Z) *IntCmd
ZAddNX(key string, members ...Z) *IntCmd
ZAddXX(key string, members ...Z) *IntCmd
@ -203,8 +206,6 @@ type Cmdable interface {
ZLexCount(key, min, max string) *IntCmd
ZIncrBy(key string, increment float64, member string) *FloatCmd
ZInterStore(destination string, store ZStore, keys ...string) *IntCmd
ZPopMax(key string, count ...int64) *ZSliceCmd
ZPopMin(key string, count ...int64) *ZSliceCmd
ZRange(key string, start, stop int64) *StringSliceCmd
ZRangeWithScores(key string, start, stop int64) *ZSliceCmd
ZRangeByScore(key string, opt ZRangeBy) *StringSliceCmd
@ -232,7 +233,6 @@ type Cmdable interface {
ClientKillByFilter(keys ...string) *IntCmd
ClientList() *StringCmd
ClientPause(dur time.Duration) *BoolCmd
ClientID() *IntCmd
ConfigGet(parameter string) *SliceCmd
ConfigResetStat() *StatusCmd
ConfigSet(parameter, value string) *StatusCmd
@ -270,7 +270,6 @@ type Cmdable interface {
ClusterResetHard() *StatusCmd
ClusterInfo() *StringCmd
ClusterKeySlot(key string) *IntCmd
ClusterGetKeysInSlot(slot int, count int) *StringSliceCmd
ClusterCountFailureReports(nodeID string) *IntCmd
ClusterCountKeysInSlot(slot int) *IntCmd
ClusterDelSlots(slots ...int) *StatusCmd
@ -1343,16 +1342,6 @@ func (c *cmdable) XAdd(a *XAddArgs) *StringCmd {
return cmd
}
func (c *cmdable) XDel(stream string, ids ...string) *IntCmd {
args := []interface{}{"xdel", stream}
for _, id := range ids {
args = append(args, id)
}
cmd := NewIntCmd(args...)
c.process(cmd)
return cmd
}
func (c *cmdable) XLen(stream string) *IntCmd {
cmd := NewIntCmd("xlen", stream)
c.process(cmd)
@ -1406,9 +1395,6 @@ func (c *cmdable) XRead(a *XReadArgs) *XStreamSliceCmd {
}
cmd := NewXStreamSliceCmd(args...)
if a.Block >= 0 {
cmd.setReadTimeout(a.Block)
}
c.process(cmd)
return cmd
}
@ -1426,12 +1412,6 @@ func (c *cmdable) XGroupCreate(stream, group, start string) *StatusCmd {
return cmd
}
func (c *cmdable) XGroupCreateMkStream(stream, group, start string) *StatusCmd {
cmd := NewStatusCmd("xgroup", "create", stream, group, start, "mkstream")
c.process(cmd)
return cmd
}
func (c *cmdable) XGroupSetID(stream, group, start string) *StatusCmd {
cmd := NewStatusCmd("xgroup", "setid", stream, group, start)
c.process(cmd)
@ -1453,11 +1433,9 @@ func (c *cmdable) XGroupDelConsumer(stream, group, consumer string) *IntCmd {
type XReadGroupArgs struct {
Group string
Consumer string
// List of streams and ids.
Streams []string
Count int64
Block time.Duration
NoAck bool
Streams []string
Count int64
Block time.Duration
}
func (c *cmdable) XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd {
@ -1469,18 +1447,12 @@ func (c *cmdable) XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd {
if a.Block >= 0 {
args = append(args, "block", int64(a.Block/time.Millisecond))
}
if a.NoAck {
args = append(args, "noack")
}
args = append(args, "streams")
for _, s := range a.Streams {
args = append(args, s)
}
cmd := NewXStreamSliceCmd(args...)
if a.Block >= 0 {
cmd.setReadTimeout(a.Block)
}
c.process(cmd)
return cmd
}
@ -1577,12 +1549,6 @@ type Z struct {
Member interface{}
}
// ZWithKey represents sorted set member including the name of the key where it was popped.
type ZWithKey struct {
Z
Key string
}
// ZStore is used as an arg to ZInterStore and ZUnionStore.
type ZStore struct {
Weights []float64
@ -1590,34 +1556,6 @@ type ZStore struct {
Aggregate string
}
// Redis `BZPOPMAX key [key ...] timeout` command.
func (c *cmdable) BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd {
args := make([]interface{}, 1+len(keys)+1)
args[0] = "bzpopmax"
for i, key := range keys {
args[1+i] = key
}
args[len(args)-1] = formatSec(timeout)
cmd := NewZWithKeyCmd(args...)
cmd.setReadTimeout(timeout)
c.process(cmd)
return cmd
}
// Redis `BZPOPMIN key [key ...] timeout` command.
func (c *cmdable) BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd {
args := make([]interface{}, 1+len(keys)+1)
args[0] = "bzpopmin"
for i, key := range keys {
args[1+i] = key
}
args[len(args)-1] = formatSec(timeout)
cmd := NewZWithKeyCmd(args...)
cmd.setReadTimeout(timeout)
c.process(cmd)
return cmd
}
func (c *cmdable) zAdd(a []interface{}, n int, members ...Z) *IntCmd {
for i, m := range members {
a[n+2*i] = m.Score
@ -1756,46 +1694,6 @@ func (c *cmdable) ZInterStore(destination string, store ZStore, keys ...string)
return cmd
}
func (c *cmdable) ZPopMax(key string, count ...int64) *ZSliceCmd {
args := []interface{}{
"zpopmax",
key,
}
switch len(count) {
case 0:
break
case 1:
args = append(args, count[0])
default:
panic("too many arguments")
}
cmd := NewZSliceCmd(args...)
c.process(cmd)
return cmd
}
func (c *cmdable) ZPopMin(key string, count ...int64) *ZSliceCmd {
args := []interface{}{
"zpopmin",
key,
}
switch len(count) {
case 0:
break
case 1:
args = append(args, count[0])
default:
panic("too many arguments")
}
cmd := NewZSliceCmd(args...)
c.process(cmd)
return cmd
}
func (c *cmdable) zRange(key string, start, stop int64, withScores bool) *StringSliceCmd {
args := []interface{}{
"zrange",
@ -2071,24 +1969,6 @@ func (c *cmdable) ClientPause(dur time.Duration) *BoolCmd {
return cmd
}
func (c *cmdable) ClientID() *IntCmd {
cmd := NewIntCmd("client", "id")
c.process(cmd)
return cmd
}
func (c *cmdable) ClientUnblock(id int64) *IntCmd {
cmd := NewIntCmd("client", "unblock", id)
c.process(cmd)
return cmd
}
func (c *cmdable) ClientUnblockWithError(id int64) *IntCmd {
cmd := NewIntCmd("client", "unblock", id, "error")
c.process(cmd)
return cmd
}
// ClientSetName assigns a name to the connection.
func (c *statefulCmdable) ClientSetName(name string) *BoolCmd {
cmd := NewBoolCmd("client", "setname", name)
@ -2404,12 +2284,6 @@ func (c *cmdable) ClusterKeySlot(key string) *IntCmd {
return cmd
}
func (c *cmdable) ClusterGetKeysInSlot(slot int, count int) *StringSliceCmd {
cmd := NewStringSliceCmd("cluster", "getkeysinslot", slot, count)
c.process(cmd)
return cmd
}
func (c *cmdable) ClusterCountFailureReports(nodeID string) *IntCmd {
cmd := NewIntCmd("cluster", "count-failure-reports", nodeID)
c.process(cmd)

View file

@ -9,9 +9,6 @@ import (
)
func IsRetryableError(err error, retryTimeout bool) bool {
if err == nil {
return false
}
if err == io.EOF {
return true
}
@ -47,8 +44,7 @@ func IsBadConn(err error, allowTimeout bool) bool {
return false
}
if IsRedisError(err) {
// #790
return IsReadOnlyError(err)
return strings.HasPrefix(err.Error(), "READONLY ")
}
if allowTimeout {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
@ -83,7 +79,3 @@ func IsMovedError(err error) (moved bool, ask bool, addr string) {
func IsLoadingError(err error) bool {
return strings.HasPrefix(err.Error(), "LOADING ")
}
func IsReadOnlyError(err error) bool {
return strings.HasPrefix(err.Error(), "READONLY ")
}

View file

@ -17,16 +17,14 @@ type Conn struct {
rdLocked bool
wr *proto.Writer
Inited bool
pooled bool
createdAt time.Time
usedAt atomic.Value
InitedAt time.Time
pooled bool
usedAt atomic.Value
}
func NewConn(netConn net.Conn) *Conn {
cn := &Conn{
netConn: netConn,
createdAt: time.Now(),
netConn: netConn,
}
cn.rd = proto.NewReader(netConn)
cn.wr = proto.NewWriter(netConn)

View file

@ -38,7 +38,7 @@ type Pooler interface {
Get() (*Conn, error)
Put(*Conn)
Remove(*Conn, error)
Remove(*Conn)
Len() int
IdleLen() int
@ -289,7 +289,7 @@ func (p *ConnPool) popIdle() *Conn {
func (p *ConnPool) Put(cn *Conn) {
if !cn.pooled {
p.Remove(cn, nil)
p.Remove(cn)
return
}
@ -300,7 +300,7 @@ func (p *ConnPool) Put(cn *Conn) {
p.freeTurn()
}
func (p *ConnPool) Remove(cn *Conn, reason error) {
func (p *ConnPool) Remove(cn *Conn) {
p.removeConn(cn)
p.freeTurn()
_ = p.closeConn(cn)
@ -468,7 +468,7 @@ func (p *ConnPool) isStaleConn(cn *Conn) bool {
if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout {
return true
}
if p.opt.MaxConnAge > 0 && now.Sub(cn.createdAt) >= p.opt.MaxConnAge {
if p.opt.MaxConnAge > 0 && now.Sub(cn.InitedAt) >= p.opt.MaxConnAge {
return true
}

View file

@ -1,203 +1,53 @@
package pool
import (
"fmt"
"sync/atomic"
)
const (
stateDefault = 0
stateInited = 1
stateClosed = 2
)
type BadConnError struct {
wrapped error
}
var _ error = (*BadConnError)(nil)
func (e BadConnError) Error() string {
return "pg: Conn is in a bad state"
}
func (e BadConnError) Unwrap() error {
return e.wrapped
}
type SingleConnPool struct {
pool Pooler
level int32 // atomic
state uint32 // atomic
ch chan *Conn
_badConnError atomic.Value
cn *Conn
}
var _ Pooler = (*SingleConnPool)(nil)
func NewSingleConnPool(pool Pooler) *SingleConnPool {
p, ok := pool.(*SingleConnPool)
if !ok {
p = &SingleConnPool{
pool: pool,
ch: make(chan *Conn, 1),
}
}
atomic.AddInt32(&p.level, 1)
return p
}
func (p *SingleConnPool) SetConn(cn *Conn) {
if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
p.ch <- cn
} else {
panic("not reached")
func NewSingleConnPool(cn *Conn) *SingleConnPool {
return &SingleConnPool{
cn: cn,
}
}
func (p *SingleConnPool) NewConn() (*Conn, error) {
return p.pool.NewConn()
panic("not implemented")
}
func (p *SingleConnPool) CloseConn(cn *Conn) error {
return p.pool.CloseConn(cn)
func (p *SingleConnPool) CloseConn(*Conn) error {
panic("not implemented")
}
func (p *SingleConnPool) Get() (*Conn, error) {
// In worst case this races with Close which is not a very common operation.
for i := 0; i < 1000; i++ {
switch atomic.LoadUint32(&p.state) {
case stateDefault:
cn, err := p.pool.Get()
if err != nil {
return nil, err
}
if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
return cn, nil
}
p.pool.Remove(cn, ErrClosed)
case stateInited:
if err := p.badConnError(); err != nil {
return nil, err
}
cn, ok := <-p.ch
if !ok {
return nil, ErrClosed
}
return cn, nil
case stateClosed:
return nil, ErrClosed
default:
panic("not reached")
}
}
return nil, fmt.Errorf("pg: SingleConnPool.Get: infinite loop")
return p.cn, nil
}
func (p *SingleConnPool) Put(cn *Conn) {
defer func() {
if recover() != nil {
p.freeConn(cn)
}
}()
p.ch <- cn
}
func (p *SingleConnPool) freeConn(cn *Conn) {
if err := p.badConnError(); err != nil {
p.pool.Remove(cn, err)
} else {
p.pool.Put(cn)
if p.cn != cn {
panic("p.cn != cn")
}
}
func (p *SingleConnPool) Remove(cn *Conn, reason error) {
defer func() {
if recover() != nil {
p.pool.Remove(cn, ErrClosed)
}
}()
p._badConnError.Store(BadConnError{wrapped: reason})
p.ch <- cn
func (p *SingleConnPool) Remove(cn *Conn) {
if p.cn != cn {
panic("p.cn != cn")
}
}
func (p *SingleConnPool) Len() int {
switch atomic.LoadUint32(&p.state) {
case stateDefault:
return 0
case stateInited:
return 1
case stateClosed:
return 0
default:
panic("not reached")
}
return 1
}
func (p *SingleConnPool) IdleLen() int {
return len(p.ch)
return 0
}
func (p *SingleConnPool) Stats() *Stats {
return &Stats{}
return nil
}
func (p *SingleConnPool) Close() error {
level := atomic.AddInt32(&p.level, -1)
if level > 0 {
return nil
}
for i := 0; i < 1000; i++ {
state := atomic.LoadUint32(&p.state)
if state == stateClosed {
return ErrClosed
}
if atomic.CompareAndSwapUint32(&p.state, state, stateClosed) {
close(p.ch)
cn, ok := <-p.ch
if ok {
p.freeConn(cn)
}
return nil
}
}
return fmt.Errorf("pg: SingleConnPool.Close: infinite loop")
}
func (p *SingleConnPool) Reset() error {
if p.badConnError() == nil {
return nil
}
select {
case cn, ok := <-p.ch:
if !ok {
return ErrClosed
}
p.pool.Remove(cn, ErrClosed)
p._badConnError.Store(BadConnError{wrapped: nil})
default:
return fmt.Errorf("pg: SingleConnPool does not have a Conn")
}
if !atomic.CompareAndSwapUint32(&p.state, stateInited, stateDefault) {
state := atomic.LoadUint32(&p.state)
return fmt.Errorf("pg: invalid SingleConnPool state: %d", state)
}
return nil
}
func (p *SingleConnPool) badConnError() error {
if v := p._badConnError.Load(); v != nil {
err := v.(BadConnError)
if err.wrapped != nil {
return err
}
}
return nil
}

View file

@ -55,13 +55,13 @@ func (p *StickyConnPool) putUpstream() {
func (p *StickyConnPool) Put(cn *Conn) {}
func (p *StickyConnPool) removeUpstream(reason error) {
p.pool.Remove(p.cn, reason)
func (p *StickyConnPool) removeUpstream() {
p.pool.Remove(p.cn)
p.cn = nil
}
func (p *StickyConnPool) Remove(cn *Conn, reason error) {
p.removeUpstream(reason)
func (p *StickyConnPool) Remove(cn *Conn) {
p.removeUpstream()
}
func (p *StickyConnPool) Len() int {
@ -101,7 +101,7 @@ func (p *StickyConnPool) Close() error {
if p.reusable {
p.putUpstream()
} else {
p.removeUpstream(ErrClosed)
p.removeUpstream()
}
}

View file

@ -0,0 +1,64 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package singleflight provides a duplicate function call suppression
// mechanism.
package singleflight
import "sync"
// call is an in-flight or completed Do call
type call struct {
wg sync.WaitGroup
val interface{}
err error
}
// Group represents a class of work and forms a namespace in which
// units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
return c.val, c.err
}

View file

@ -27,13 +27,3 @@ func isLower(s string) bool {
}
return true
}
func Unwrap(err error) error {
u, ok := err.(interface {
Unwrap() error
})
if !ok {
return nil
}
return u.Unwrap()
}

View file

@ -14,17 +14,6 @@ import (
"github.com/go-redis/redis/internal/pool"
)
// Limiter is the interface of a rate limiter or a circuit breaker.
type Limiter interface {
// Allow returns a nil if operation is allowed or an error otherwise.
// If operation is allowed client must report the result of operation
// whether is a success or a failure.
Allow() error
// ReportResult reports the result of previously allowed operation.
// nil indicates a success, non-nil error indicates a failure.
ReportResult(result error)
}
type Options struct {
// The network type, either tcp or unix.
// Default is tcp.
@ -59,7 +48,7 @@ type Options struct {
// Default is 5 seconds.
DialTimeout time.Duration
// Timeout for socket reads. If reached, commands will fail
// with a timeout instead of blocking. Use value -1 for no timeout and 0 for default.
// with a timeout instead of blocking.
// Default is 3 seconds.
ReadTimeout time.Duration
// Timeout for socket writes. If reached, commands will fail
@ -101,9 +90,6 @@ func (opt *Options) init() {
if opt.Network == "" {
opt.Network = "tcp"
}
if opt.Addr == "" {
opt.Addr = "localhost:6379"
}
if opt.Dialer == nil {
opt.Dialer = func() (net.Conn, error) {
netDialer := &net.Dialer{

View file

@ -8,22 +8,8 @@ import (
type pipelineExecer func([]Cmder) error
// Pipeliner is an mechanism to realise Redis Pipeline technique.
//
// Pipelining is a technique to extremely speed up processing by packing
// operations to batches, send them at once to Redis and read a replies in a
// singe step.
// See https://redis.io/topics/pipelining
//
// Pay attention, that Pipeline is not a transaction, so you can get unexpected
// results in case of big pipelines and small read/write timeouts.
// Redis client has retransmission logic in case of timeouts, pipeline
// can be retransmitted and commands can be executed more then once.
// To avoid this: it is good idea to use reasonable bigger read/write timeouts
// depends of your batch size and/or use TxPipeline.
type Pipeliner interface {
StatefulCmdable
Do(args ...interface{}) *Cmd
Process(cmd Cmder) error
Close() error
Discard() error
@ -45,12 +31,6 @@ type Pipeline struct {
closed bool
}
func (c *Pipeline) Do(args ...interface{}) *Cmd {
cmd := NewCmd(args...)
_ = c.Process(cmd)
return cmd
}
// Process queues the cmd for later execution.
func (c *Pipeline) Process(cmd Cmder) error {
c.mu.Lock()

View file

@ -1,9 +1,7 @@
package redis
import (
"errors"
"fmt"
"strings"
"sync"
"time"
@ -12,9 +10,7 @@ import (
"github.com/go-redis/redis/internal/proto"
)
var errPingTimeout = errors.New("redis: ping timeout")
// PubSub implements Pub/Sub commands as described in
// PubSub implements Pub/Sub commands bas described in
// http://redis.io/topics/pubsub. Message receiving is NOT safe
// for concurrent use by multiple goroutines.
//
@ -30,9 +26,8 @@ type PubSub struct {
cn *pool.Conn
channels map[string]struct{}
patterns map[string]struct{}
closed bool
exit chan struct{}
closed bool
exit chan struct{}
cmd *Cmd
@ -41,12 +36,6 @@ type PubSub struct {
ping chan struct{}
}
func (c *PubSub) String() string {
channels := mapKeys(c.channels)
channels = append(channels, mapKeys(c.patterns)...)
return fmt.Sprintf("PubSub(%s)", strings.Join(channels, ", "))
}
func (c *PubSub) init() {
c.exit = make(chan struct{})
}
@ -62,6 +51,7 @@ func (c *PubSub) _conn(newChannels []string) (*pool.Conn, error) {
if c.closed {
return nil, pool.ErrClosed
}
if c.cn != nil {
return c.cn, nil
}
@ -397,39 +387,16 @@ func (c *PubSub) ReceiveMessage() (*Message, error) {
// It periodically sends Ping messages to test connection health.
// The channel is closed with PubSub. Receive* APIs can not be used
// after channel is created.
//
// If the Go channel is full for 30 seconds the message is dropped.
func (c *PubSub) Channel() <-chan *Message {
return c.channel(100)
}
// ChannelSize is like Channel, but creates a Go channel
// with specified buffer size.
func (c *PubSub) ChannelSize(size int) <-chan *Message {
return c.channel(size)
}
func (c *PubSub) channel(size int) <-chan *Message {
c.chOnce.Do(func() {
c.initChannel(size)
})
if cap(c.ch) != size {
err := fmt.Errorf("redis: PubSub.Channel is called with different buffer size")
panic(err)
}
c.chOnce.Do(c.initChannel)
return c.ch
}
func (c *PubSub) initChannel(size int) {
const timeout = 30 * time.Second
c.ch = make(chan *Message, size)
c.ping = make(chan struct{}, 1)
func (c *PubSub) initChannel() {
c.ch = make(chan *Message, 100)
c.ping = make(chan struct{}, 10)
go func() {
timer := time.NewTimer(timeout)
timer.Stop()
var errCount int
for {
msg, err := c.Receive()
@ -444,7 +411,6 @@ func (c *PubSub) initChannel(size int) {
errCount++
continue
}
errCount = 0
// Any message is as good as a ping.
@ -459,28 +425,21 @@ func (c *PubSub) initChannel(size int) {
case *Pong:
// Ignore.
case *Message:
timer.Reset(timeout)
select {
case c.ch <- msg:
if !timer.Stop() {
<-timer.C
}
case <-timer.C:
internal.Logf(
"redis: %s channel is full for %s (message is dropped)",
c, timeout)
}
c.ch <- msg
default:
internal.Logf("redis: unknown message type: %T", msg)
internal.Logf("redis: unknown message: %T", msg)
}
}
}()
go func() {
const timeout = 5 * time.Second
timer := time.NewTimer(timeout)
timer.Stop()
healthy := true
var pingErr error
for {
timer.Reset(timeout)
select {
@ -490,13 +449,10 @@ func (c *PubSub) initChannel(size int) {
<-timer.C
}
case <-timer.C:
pingErr := c.Ping()
pingErr = c.Ping()
if healthy {
healthy = false
} else {
if pingErr == nil {
pingErr = errPingTimeout
}
c.mu.Lock()
c._reconnect(pingErr)
c.mu.Unlock()

View file

@ -26,7 +26,6 @@ func SetLogger(logger *log.Logger) {
type baseClient struct {
opt *Options
connPool pool.Pooler
limiter Limiter
process func(Cmder) error
processPipeline func([]Cmder) error
@ -51,80 +50,45 @@ func (c *baseClient) newConn() (*pool.Conn, error) {
return nil, err
}
err = c.initConn(cn)
if err != nil {
_ = c.connPool.CloseConn(cn)
return nil, err
if cn.InitedAt.IsZero() {
if err := c.initConn(cn); err != nil {
_ = c.connPool.CloseConn(cn)
return nil, err
}
}
return cn, nil
}
func (c *baseClient) getConn() (*pool.Conn, error) {
if c.limiter != nil {
err := c.limiter.Allow()
if err != nil {
return nil, err
}
}
cn, err := c._getConn()
if err != nil {
if c.limiter != nil {
c.limiter.ReportResult(err)
}
return nil, err
}
return cn, nil
}
func (c *baseClient) _getConn() (*pool.Conn, error) {
cn, err := c.connPool.Get()
if err != nil {
return nil, err
}
err = c.initConn(cn)
if err != nil {
c.connPool.Remove(cn, err)
if err := internal.Unwrap(err); err != nil {
if cn.InitedAt.IsZero() {
err := c.initConn(cn)
if err != nil {
c.connPool.Remove(cn)
return nil, err
}
return nil, err
}
return cn, nil
}
func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
if c.limiter != nil {
c.limiter.ReportResult(err)
}
func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool {
if internal.IsBadConn(err, false) {
c.connPool.Remove(cn, err)
} else {
c.connPool.Put(cn)
}
}
func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) {
if c.limiter != nil {
c.limiter.ReportResult(err)
c.connPool.Remove(cn)
return false
}
if err == nil || internal.IsRedisError(err) {
c.connPool.Put(cn)
} else {
c.connPool.Remove(cn, err)
}
c.connPool.Put(cn)
return true
}
func (c *baseClient) initConn(cn *pool.Conn) error {
if cn.Inited {
return nil
}
cn.Inited = true
cn.InitedAt = time.Now()
if c.opt.Password == "" &&
c.opt.DB == 0 &&
@ -162,7 +126,7 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
// Do creates a Cmd from the args and processes the cmd.
func (c *baseClient) Do(args ...interface{}) *Cmd {
cmd := NewCmd(args...)
_ = c.Process(cmd)
c.Process(cmd)
return cmd
}
@ -204,7 +168,9 @@ func (c *baseClient) defaultProcess(cmd Cmder) error {
return err
}
err = cn.WithReader(c.cmdTimeout(cmd), cmd.readReply)
err = cn.WithReader(c.cmdTimeout(cmd), func(rd *proto.Reader) error {
return cmd.readReply(rd)
})
c.releaseConn(cn, err)
if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) {
continue
@ -222,11 +188,7 @@ func (c *baseClient) retryBackoff(attempt int) time.Duration {
func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
if timeout := cmd.readTimeout(); timeout != nil {
t := *timeout
if t == 0 {
return 0
}
return t + 10*time.Second
return readTimeout(*timeout)
}
return c.opt.ReadTimeout
}
@ -238,7 +200,7 @@ func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
func (c *baseClient) Close() error {
var firstErr error
if c.onClose != nil {
if err := c.onClose(); err != nil {
if err := c.onClose(); err != nil && firstErr == nil {
firstErr = err
}
}
@ -282,7 +244,12 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) e
}
canRetry, err := p(cn, cmds)
c.releaseConnStrict(cn, err)
if err == nil || internal.IsRedisError(err) {
c.connPool.Put(cn)
break
}
c.connPool.Remove(cn)
if !canRetry || !internal.IsRetryableError(err, true) {
break
@ -352,7 +319,7 @@ func txPipelineReadQueued(rd *proto.Reader, cmds []Cmder) error {
return err
}
for range cmds {
for _ = range cmds {
err = statusCmd.readReply(rd)
if err != nil && !internal.IsRedisError(err) {
return err
@ -424,12 +391,12 @@ func (c *Client) WithContext(ctx context.Context) *Client {
if ctx == nil {
panic("nil context")
}
c2 := c.clone()
c2 := c.copy()
c2.ctx = ctx
return c2
}
func (c *Client) clone() *Client {
func (c *Client) copy() *Client {
cp := *c
cp.init()
return &cp
@ -440,11 +407,6 @@ func (c *Client) Options() *Options {
return c.opt
}
func (c *Client) SetLimiter(l Limiter) *Client {
c.limiter = l
return c
}
type PoolStats pool.Stats
// PoolStats returns connection pool stats.
@ -493,30 +455,6 @@ func (c *Client) pubSub() *PubSub {
// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create empty subscription.
// Note that this method does not wait on a response from Redis, so the
// subscription may not be active immediately. To force the connection to wait,
// you may call the Receive() method on the returned *PubSub like so:
//
// sub := client.Subscribe(queryResp)
// iface, err := sub.Receive()
// if err != nil {
// // handle error
// }
//
// // Should be *Subscription, but others are possible if other actions have been
// // taken on sub since it was created.
// switch iface.(type) {
// case *Subscription:
// // subscribe succeeded
// case *Message:
// // received first message
// case *Pong:
// // pong received
// default:
// // handle error
// }
//
// ch := sub.Channel()
func (c *Client) Subscribe(channels ...string) *PubSub {
pubsub := c.pubSub()
if len(channels) > 0 {
@ -544,12 +482,10 @@ type Conn struct {
}
func newConn(opt *Options, cn *pool.Conn) *Conn {
connPool := pool.NewSingleConnPool(nil)
connPool.SetConn(cn)
c := Conn{
baseClient: baseClient{
opt: opt,
connPool: connPool,
connPool: pool.NewSingleConnPool(cn),
},
}
c.baseClient.init()

View file

@ -273,13 +273,9 @@ func (c *ringShards) Heartbeat(frequency time.Duration) {
// rebalance removes dead shards from the Ring.
func (c *ringShards) rebalance() {
c.mu.RLock()
shards := c.shards
c.mu.RUnlock()
hash := newConsistentHash(c.opt)
var shardsNum int
for name, shard := range shards {
for name, shard := range c.shards {
if shard.IsUp() {
hash.Add(name)
shardsNum++
@ -323,12 +319,12 @@ func (c *ringShards) Close() error {
//------------------------------------------------------------------------------
// Ring is a Redis client that uses consistent hashing to distribute
// Ring is a Redis client that uses constistent hashing to distribute
// keys across multiple Redis servers (shards). It's safe for
// concurrent use by multiple goroutines.
//
// Ring monitors the state of each shard and removes dead shards from
// the ring. When a shard comes online it is added back to the ring. This
// the ring. When shard comes online it is added back to the ring. This
// gives you maximum availability and partition tolerance, but no
// consistency between different shards or even clients. Each client
// uses shards that are available to the client and does not do any
@ -346,7 +342,6 @@ type Ring struct {
shards *ringShards
cmdsInfoCache *cmdsInfoCache
process func(Cmder) error
processPipeline func([]Cmder) error
}
@ -359,10 +354,8 @@ func NewRing(opt *RingOptions) *Ring {
}
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
ring.process = ring.defaultProcess
ring.processPipeline = ring.defaultProcessPipeline
ring.init()
ring.cmdable.setProcessor(ring.Process)
for name, addr := range opt.Addrs {
clopt := opt.clientOptions()
@ -375,10 +368,6 @@ func NewRing(opt *RingOptions) *Ring {
return ring
}
func (c *Ring) init() {
c.cmdable.setProcessor(c.Process)
}
func (c *Ring) Context() context.Context {
if c.ctx != nil {
return c.ctx
@ -390,15 +379,13 @@ func (c *Ring) WithContext(ctx context.Context) *Ring {
if ctx == nil {
panic("nil context")
}
c2 := c.clone()
c2 := c.copy()
c2.ctx = ctx
return c2
}
func (c *Ring) clone() *Ring {
func (c *Ring) copy() *Ring {
cp := *c
cp.init()
return &cp
}
@ -539,34 +526,19 @@ func (c *Ring) Do(args ...interface{}) *Cmd {
func (c *Ring) WrapProcess(
fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
) {
c.process = fn(c.process)
c.ForEachShard(func(c *Client) error {
c.WrapProcess(fn)
return nil
})
}
func (c *Ring) Process(cmd Cmder) error {
return c.process(cmd)
}
func (c *Ring) defaultProcess(cmd Cmder) error {
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
if attempt > 0 {
time.Sleep(c.retryBackoff(attempt))
}
shard, err := c.cmdShard(cmd)
if err != nil {
cmd.setErr(err)
return err
}
err = shard.Client.Process(cmd)
if err == nil {
return nil
}
if !internal.IsRetryableError(err, cmd.readTimeout() == nil) {
return err
}
shard, err := c.cmdShard(cmd)
if err != nil {
cmd.setErr(err)
return err
}
return cmd.Err()
return shard.Client.Process(cmd)
}
func (c *Ring) Pipeline() Pipeliner {
@ -603,42 +575,36 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
time.Sleep(c.retryBackoff(attempt))
}
var mu sync.Mutex
var failedCmdsMap map[string][]Cmder
var wg sync.WaitGroup
for hash, cmds := range cmdsMap {
wg.Add(1)
go func(hash string, cmds []Cmder) {
defer wg.Done()
shard, err := c.shards.GetByHash(hash)
if err != nil {
setCmdsErr(cmds, err)
continue
}
shard, err := c.shards.GetByHash(hash)
if err != nil {
setCmdsErr(cmds, err)
return
cn, err := shard.Client.getConn()
if err != nil {
setCmdsErr(cmds, err)
continue
}
canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
if err == nil || internal.IsRedisError(err) {
shard.Client.connPool.Put(cn)
continue
}
shard.Client.connPool.Remove(cn)
if canRetry && internal.IsRetryableError(err, true) {
if failedCmdsMap == nil {
failedCmdsMap = make(map[string][]Cmder)
}
cn, err := shard.Client.getConn()
if err != nil {
setCmdsErr(cmds, err)
return
}
canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
shard.Client.releaseConnStrict(cn, err)
if canRetry && internal.IsRetryableError(err, true) {
mu.Lock()
if failedCmdsMap == nil {
failedCmdsMap = make(map[string][]Cmder)
}
failedCmdsMap[hash] = cmds
mu.Unlock()
}
}(hash, cmds)
failedCmdsMap[hash] = cmds
}
}
wg.Wait()
if len(failedCmdsMap) == 0 {
break
}
@ -664,39 +630,6 @@ func (c *Ring) Close() error {
return c.shards.Close()
}
func (c *Ring) Watch(fn func(*Tx) error, keys ...string) error {
if len(keys) == 0 {
return fmt.Errorf("redis: Watch requires at least one key")
}
var shards []*ringShard
for _, key := range keys {
if key != "" {
shard, err := c.shards.GetByKey(hashtag.Key(key))
if err != nil {
return err
}
shards = append(shards, shard)
}
}
if len(shards) == 0 {
return fmt.Errorf("redis: Watch requires at least one shard")
}
if len(shards) > 1 {
for _, shard := range shards[1:] {
if shard.Client != shards[0].Client {
err := fmt.Errorf("redis: Watch requires all keys to be in the same shard")
return err
}
}
}
return shards[0].Client.Watch(fn, keys...)
}
func newConsistentHash(opt *RingOptions) *consistenthash.Map {
return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash))
}

View file

@ -90,7 +90,9 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
opt: opt,
connPool: failover.Pool(),
onClose: failover.Close,
onClose: func() error {
return failover.Close()
},
},
}
c.baseClient.init()
@ -117,7 +119,7 @@ func NewSentinelClient(opt *Options) *SentinelClient {
return c
}
func (c *SentinelClient) pubSub() *PubSub {
func (c *SentinelClient) PubSub() *PubSub {
pubsub := &PubSub{
opt: c.opt,
@ -130,67 +132,14 @@ func (c *SentinelClient) pubSub() *PubSub {
return pubsub
}
// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create empty subscription.
func (c *SentinelClient) Subscribe(channels ...string) *PubSub {
pubsub := c.pubSub()
if len(channels) > 0 {
_ = pubsub.Subscribe(channels...)
}
return pubsub
}
// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create empty subscription.
func (c *SentinelClient) PSubscribe(channels ...string) *PubSub {
pubsub := c.pubSub()
if len(channels) > 0 {
_ = pubsub.PSubscribe(channels...)
}
return pubsub
}
func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
cmd := NewStringSliceCmd("sentinel", "get-master-addr-by-name", name)
cmd := NewStringSliceCmd("SENTINEL", "get-master-addr-by-name", name)
c.Process(cmd)
return cmd
}
func (c *SentinelClient) Sentinels(name string) *SliceCmd {
cmd := NewSliceCmd("sentinel", "sentinels", name)
c.Process(cmd)
return cmd
}
// Failover forces a failover as if the master was not reachable, and without
// asking for agreement to other Sentinels.
func (c *SentinelClient) Failover(name string) *StatusCmd {
cmd := NewStatusCmd("sentinel", "failover", name)
c.Process(cmd)
return cmd
}
// Reset resets all the masters with matching name. The pattern argument is a
// glob-style pattern. The reset process clears any previous state in a master
// (including a failover in progress), and removes every slave and sentinel
// already discovered and associated with the master.
func (c *SentinelClient) Reset(pattern string) *IntCmd {
cmd := NewIntCmd("sentinel", "reset", pattern)
c.Process(cmd)
return cmd
}
// FlushConfig forces Sentinel to rewrite its configuration on disk, including
// the current Sentinel state.
func (c *SentinelClient) FlushConfig() *StatusCmd {
cmd := NewStatusCmd("sentinel", "flushconfig")
c.Process(cmd)
return cmd
}
// Master shows the state and info of the specified master.
func (c *SentinelClient) Master(name string) *StringStringMapCmd {
cmd := NewStringStringMapCmd("sentinel", "master", name)
cmd := NewSliceCmd("SENTINEL", "sentinels", name)
c.Process(cmd)
return cmd
}
@ -207,92 +156,79 @@ type sentinelFailover struct {
masterName string
_masterAddr string
sentinel *SentinelClient
pubsub *PubSub
}
func (c *sentinelFailover) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.sentinel != nil {
return c.closeSentinel()
}
return nil
func (d *sentinelFailover) Close() error {
return d.resetSentinel()
}
func (c *sentinelFailover) Pool() *pool.ConnPool {
c.poolOnce.Do(func() {
c.opt.Dialer = c.dial
c.pool = newConnPool(c.opt)
func (d *sentinelFailover) Pool() *pool.ConnPool {
d.poolOnce.Do(func() {
d.opt.Dialer = d.dial
d.pool = newConnPool(d.opt)
})
return c.pool
return d.pool
}
func (c *sentinelFailover) dial() (net.Conn, error) {
addr, err := c.MasterAddr()
func (d *sentinelFailover) dial() (net.Conn, error) {
addr, err := d.MasterAddr()
if err != nil {
return nil, err
}
return net.DialTimeout("tcp", addr, c.opt.DialTimeout)
return net.DialTimeout("tcp", addr, d.opt.DialTimeout)
}
func (c *sentinelFailover) MasterAddr() (string, error) {
addr, err := c.masterAddr()
func (d *sentinelFailover) MasterAddr() (string, error) {
d.mu.Lock()
defer d.mu.Unlock()
addr, err := d.masterAddr()
if err != nil {
return "", err
}
c.switchMaster(addr)
d._switchMaster(addr)
return addr, nil
}
func (c *sentinelFailover) masterAddr() (string, error) {
c.mu.RLock()
addr := c.getMasterAddr()
c.mu.RUnlock()
if addr != "" {
return addr, nil
func (d *sentinelFailover) masterAddr() (string, error) {
// Try last working sentinel.
if d.sentinel != nil {
addr, err := d.sentinel.GetMasterAddrByName(d.masterName).Result()
if err == nil {
addr := net.JoinHostPort(addr[0], addr[1])
return addr, nil
}
internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
d.masterName, err)
d._resetSentinel()
}
c.mu.Lock()
defer c.mu.Unlock()
addr = c.getMasterAddr()
if addr != "" {
return addr, nil
}
if c.sentinel != nil {
c.closeSentinel()
}
for i, sentinelAddr := range c.sentinelAddrs {
for i, sentinelAddr := range d.sentinelAddrs {
sentinel := NewSentinelClient(&Options{
Addr: sentinelAddr,
MaxRetries: c.opt.MaxRetries,
DialTimeout: d.opt.DialTimeout,
ReadTimeout: d.opt.ReadTimeout,
WriteTimeout: d.opt.WriteTimeout,
DialTimeout: c.opt.DialTimeout,
ReadTimeout: c.opt.ReadTimeout,
WriteTimeout: c.opt.WriteTimeout,
PoolSize: c.opt.PoolSize,
PoolTimeout: c.opt.PoolTimeout,
IdleTimeout: c.opt.IdleTimeout,
IdleCheckFrequency: c.opt.IdleCheckFrequency,
TLSConfig: c.opt.TLSConfig,
PoolSize: d.opt.PoolSize,
PoolTimeout: d.opt.PoolTimeout,
IdleTimeout: d.opt.IdleTimeout,
})
masterAddr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result()
if err != nil {
internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s",
c.masterName, err)
_ = sentinel.Close()
d.masterName, err)
sentinel.Close()
continue
}
// Push working sentinel to the top.
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
c.setSentinel(sentinel)
d.sentinelAddrs[0], d.sentinelAddrs[i] = d.sentinelAddrs[i], d.sentinelAddrs[0]
d.setSentinel(sentinel)
addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
return addr, nil
@ -301,34 +237,17 @@ func (c *sentinelFailover) masterAddr() (string, error) {
return "", errors.New("redis: all sentinels are unreachable")
}
func (c *sentinelFailover) getMasterAddr() string {
sentinel := c.sentinel
if sentinel == nil {
return ""
}
addr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
if err != nil {
internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
c.masterName, err)
return ""
}
return net.JoinHostPort(addr[0], addr[1])
func (c *sentinelFailover) switchMaster(addr string) {
c.mu.Lock()
c._switchMaster(addr)
c.mu.Unlock()
}
func (c *sentinelFailover) switchMaster(addr string) {
c.mu.RLock()
masterAddr := c._masterAddr
c.mu.RUnlock()
if masterAddr == addr {
func (c *sentinelFailover) _switchMaster(addr string) {
if c._masterAddr == addr {
return
}
c.mu.Lock()
defer c.mu.Unlock()
internal.Logf("sentinel: new master=%q addr=%q",
c.masterName, addr)
_ = c.Pool().Filter(func(cn *pool.Conn) bool {
@ -337,36 +256,32 @@ func (c *sentinelFailover) switchMaster(addr string) {
c._masterAddr = addr
}
func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) {
c.discoverSentinels(sentinel)
c.sentinel = sentinel
c.pubsub = sentinel.Subscribe("+switch-master")
go c.listen(c.pubsub)
func (d *sentinelFailover) setSentinel(sentinel *SentinelClient) {
d.discoverSentinels(sentinel)
d.sentinel = sentinel
go d.listen(sentinel)
}
func (c *sentinelFailover) closeSentinel() error {
var firstErr error
err := c.pubsub.Close()
if err != nil && firstErr == err {
firstErr = err
func (d *sentinelFailover) resetSentinel() error {
var err error
d.mu.Lock()
if d.sentinel != nil {
err = d._resetSentinel()
}
c.pubsub = nil
err = c.sentinel.Close()
if err != nil && firstErr == err {
firstErr = err
}
c.sentinel = nil
return firstErr
d.mu.Unlock()
return err
}
func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
sentinels, err := sentinel.Sentinels(c.masterName).Result()
func (d *sentinelFailover) _resetSentinel() error {
err := d.sentinel.Close()
d.sentinel = nil
return err
}
func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
sentinels, err := sentinel.Sentinels(d.masterName).Result()
if err != nil {
internal.Logf("sentinel: Sentinels master=%q failed: %s", c.masterName, err)
internal.Logf("sentinel: Sentinels master=%q failed: %s", d.masterName, err)
return
}
for _, sentinel := range sentinels {
@ -375,32 +290,49 @@ func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
key := vals[i].(string)
if key == "name" {
sentinelAddr := vals[i+1].(string)
if !contains(c.sentinelAddrs, sentinelAddr) {
internal.Logf("sentinel: discovered new sentinel=%q for master=%q",
sentinelAddr, c.masterName)
c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
if !contains(d.sentinelAddrs, sentinelAddr) {
internal.Logf(
"sentinel: discovered new sentinel=%q for master=%q",
sentinelAddr, d.masterName,
)
d.sentinelAddrs = append(d.sentinelAddrs, sentinelAddr)
}
}
}
}
}
func (c *sentinelFailover) listen(pubsub *PubSub) {
ch := pubsub.Channel()
func (d *sentinelFailover) listen(sentinel *SentinelClient) {
pubsub := sentinel.PubSub()
defer pubsub.Close()
err := pubsub.Subscribe("+switch-master")
if err != nil {
internal.Logf("sentinel: Subscribe failed: %s", err)
d.resetSentinel()
return
}
for {
msg, ok := <-ch
if !ok {
break
msg, err := pubsub.ReceiveMessage()
if err != nil {
if err == pool.ErrClosed {
d.resetSentinel()
return
}
internal.Logf("sentinel: ReceiveMessage failed: %s", err)
continue
}
if msg.Channel == "+switch-master" {
switch msg.Channel {
case "+switch-master":
parts := strings.Split(msg.Payload, " ")
if parts[0] != c.masterName {
if parts[0] != d.masterName {
internal.Logf("sentinel: ignore addr for master=%q", parts[0])
continue
}
addr := net.JoinHostPort(parts[3], parts[4])
c.switchMaster(addr)
d.switchMaster(addr)
}
}
}

View file

@ -29,10 +29,10 @@ func (c *Client) newTx() *Tx {
return &tx
}
// Watch prepares a transaction and marks the keys to be watched
// Watch prepares a transcaction and marks the keys to be watched
// for conditional execution if there are any keys.
//
// The transaction is automatically closed when fn exits.
// The transaction is automatically closed when the fn exits.
func (c *Client) Watch(fn func(*Tx) error, keys ...string) error {
tx := c.newTx()
if len(keys) > 0 {

View file

@ -155,7 +155,6 @@ type UniversalClient interface {
Watch(fn func(*Tx) error, keys ...string) error
Process(cmd Cmder) error
WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error)
WrapProcessPipeline(fn func(oldProcess func([]Cmder) error) func([]Cmder) error)
Subscribe(channels ...string) *PubSub
PSubscribe(channels ...string) *PubSub
Close() error

3
vendor/modules.txt vendored
View file

@ -60,13 +60,14 @@ github.com/go-openapi/jsonreference
github.com/go-openapi/spec
# github.com/go-openapi/swag v0.19.5
github.com/go-openapi/swag
# github.com/go-redis/redis v6.15.7+incompatible
# github.com/go-redis/redis v6.14.0+incompatible
github.com/go-redis/redis
github.com/go-redis/redis/internal
github.com/go-redis/redis/internal/consistenthash
github.com/go-redis/redis/internal/hashtag
github.com/go-redis/redis/internal/pool
github.com/go-redis/redis/internal/proto
github.com/go-redis/redis/internal/singleflight
github.com/go-redis/redis/internal/util
# github.com/go-sql-driver/mysql v1.5.0
github.com/go-sql-driver/mysql