Update module go-redis/redis to v6.15.7 (#234)
Update module go-redis/redis to v6.15.7 Reviewed-on: https://kolaente.dev/vikunja/api/pulls/234
This commit is contained in:
parent
fd7dd47d5e
commit
026d3dc80d
18 changed files with 413 additions and 89 deletions
2
go.mod
2
go.mod
|
@ -35,7 +35,7 @@ require (
|
||||||
github.com/garyburd/redigo v1.6.0 // indirect
|
github.com/garyburd/redigo v1.6.0 // indirect
|
||||||
github.com/go-openapi/jsonreference v0.19.3 // indirect
|
github.com/go-openapi/jsonreference v0.19.3 // indirect
|
||||||
github.com/go-openapi/spec v0.19.4 // indirect
|
github.com/go-openapi/spec v0.19.4 // indirect
|
||||||
github.com/go-redis/redis v6.15.2+incompatible
|
github.com/go-redis/redis v6.15.7+incompatible
|
||||||
github.com/go-sql-driver/mysql v1.5.0
|
github.com/go-sql-driver/mysql v1.5.0
|
||||||
github.com/go-testfixtures/testfixtures/v3 v3.1.1
|
github.com/go-testfixtures/testfixtures/v3 v3.1.1
|
||||||
github.com/go-xorm/core v0.6.2 // indirect
|
github.com/go-xorm/core v0.6.2 // indirect
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -111,6 +111,8 @@ github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh
|
||||||
github.com/go-redis/redis v6.14.0+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
|
github.com/go-redis/redis v6.14.0+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
|
||||||
github.com/go-redis/redis v6.15.2+incompatible h1:9SpNVG76gr6InJGxoZ6IuuxaCOQwDAhzyXg+Bs+0Sb4=
|
github.com/go-redis/redis v6.15.2+incompatible h1:9SpNVG76gr6InJGxoZ6IuuxaCOQwDAhzyXg+Bs+0Sb4=
|
||||||
github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
|
github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
|
||||||
|
github.com/go-redis/redis v6.15.7+incompatible h1:3skhDh95XQMpnqeqNftPkQD9jL9e5e36z/1SUm6dy1U=
|
||||||
|
github.com/go-redis/redis v6.15.7+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
|
||||||
github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA=
|
github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA=
|
||||||
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
|
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
|
||||||
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
|
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
|
||||||
|
|
1
vendor/github.com/go-redis/redis/.travis.yml
generated
vendored
1
vendor/github.com/go-redis/redis/.travis.yml
generated
vendored
|
@ -8,6 +8,7 @@ go:
|
||||||
- 1.9.x
|
- 1.9.x
|
||||||
- 1.10.x
|
- 1.10.x
|
||||||
- 1.11.x
|
- 1.11.x
|
||||||
|
- 1.12.x
|
||||||
- tip
|
- tip
|
||||||
|
|
||||||
matrix:
|
matrix:
|
||||||
|
|
4
vendor/github.com/go-redis/redis/README.md
generated
vendored
4
vendor/github.com/go-redis/redis/README.md
generated
vendored
|
@ -9,7 +9,7 @@ Supports:
|
||||||
- Redis 3 commands except QUIT, MONITOR, SLOWLOG and SYNC.
|
- Redis 3 commands except QUIT, MONITOR, SLOWLOG and SYNC.
|
||||||
- Automatic connection pooling with [circuit breaker](https://en.wikipedia.org/wiki/Circuit_breaker_design_pattern) support.
|
- Automatic connection pooling with [circuit breaker](https://en.wikipedia.org/wiki/Circuit_breaker_design_pattern) support.
|
||||||
- [Pub/Sub](https://godoc.org/github.com/go-redis/redis#PubSub).
|
- [Pub/Sub](https://godoc.org/github.com/go-redis/redis#PubSub).
|
||||||
- [Transactions](https://godoc.org/github.com/go-redis/redis#Multi).
|
- [Transactions](https://godoc.org/github.com/go-redis/redis#example-Client-TxPipeline).
|
||||||
- [Pipeline](https://godoc.org/github.com/go-redis/redis#example-Client-Pipeline) and [TxPipeline](https://godoc.org/github.com/go-redis/redis#example-Client-TxPipeline).
|
- [Pipeline](https://godoc.org/github.com/go-redis/redis#example-Client-Pipeline) and [TxPipeline](https://godoc.org/github.com/go-redis/redis#example-Client-TxPipeline).
|
||||||
- [Scripting](https://godoc.org/github.com/go-redis/redis#Script).
|
- [Scripting](https://godoc.org/github.com/go-redis/redis#Script).
|
||||||
- [Timeouts](https://godoc.org/github.com/go-redis/redis#Options).
|
- [Timeouts](https://godoc.org/github.com/go-redis/redis#Options).
|
||||||
|
@ -143,4 +143,4 @@ BenchmarkRedisClusterPing-4 100000 11535 ns/op 117 B/op
|
||||||
|
|
||||||
- [Golang PostgreSQL ORM](https://github.com/go-pg/pg)
|
- [Golang PostgreSQL ORM](https://github.com/go-pg/pg)
|
||||||
- [Golang msgpack](https://github.com/vmihailenco/msgpack)
|
- [Golang msgpack](https://github.com/vmihailenco/msgpack)
|
||||||
- [Golang message task queue](https://github.com/go-msgqueue/msgqueue)
|
- [Golang message task queue](https://github.com/vmihailenco/taskq)
|
||||||
|
|
14
vendor/github.com/go-redis/redis/cluster.go
generated
vendored
14
vendor/github.com/go-redis/redis/cluster.go
generated
vendored
|
@ -703,12 +703,12 @@ func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient {
|
||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
panic("nil context")
|
panic("nil context")
|
||||||
}
|
}
|
||||||
c2 := c.copy()
|
c2 := c.clone()
|
||||||
c2.ctx = ctx
|
c2.ctx = ctx
|
||||||
return c2
|
return c2
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClusterClient) copy() *ClusterClient {
|
func (c *ClusterClient) clone() *ClusterClient {
|
||||||
cp := *c
|
cp := *c
|
||||||
cp.init()
|
cp.init()
|
||||||
return &cp
|
return &cp
|
||||||
|
@ -1198,6 +1198,7 @@ func (c *ClusterClient) WrapProcessPipeline(
|
||||||
fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
|
fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
|
||||||
) {
|
) {
|
||||||
c.processPipeline = fn(c.processPipeline)
|
c.processPipeline = fn(c.processPipeline)
|
||||||
|
c.processTxPipeline = fn(c.processTxPipeline)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
|
func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
|
||||||
|
@ -1537,16 +1538,21 @@ func (c *ClusterClient) pubSub() *PubSub {
|
||||||
panic("node != nil")
|
panic("node != nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
slot := hashtag.Slot(channels[0])
|
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
if len(channels) > 0 {
|
||||||
|
slot := hashtag.Slot(channels[0])
|
||||||
node, err = c.slotMasterNode(slot)
|
node, err = c.slotMasterNode(slot)
|
||||||
|
} else {
|
||||||
|
node, err = c.nodes.Random()
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cn, err := node.Client.newConn()
|
cn, err := node.Client.newConn()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
node = nil
|
||||||
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
46
vendor/github.com/go-redis/redis/command.go
generated
vendored
46
vendor/github.com/go-redis/redis/command.go
generated
vendored
|
@ -218,6 +218,25 @@ func (cmd *Cmd) Uint64() (uint64, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cmd *Cmd) Float32() (float32, error) {
|
||||||
|
if cmd.err != nil {
|
||||||
|
return 0, cmd.err
|
||||||
|
}
|
||||||
|
switch val := cmd.val.(type) {
|
||||||
|
case int64:
|
||||||
|
return float32(val), nil
|
||||||
|
case string:
|
||||||
|
f, err := strconv.ParseFloat(val, 32)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return float32(f), nil
|
||||||
|
default:
|
||||||
|
err := fmt.Errorf("redis: unexpected type=%T for Float32", val)
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (cmd *Cmd) Float64() (float64, error) {
|
func (cmd *Cmd) Float64() (float64, error) {
|
||||||
if cmd.err != nil {
|
if cmd.err != nil {
|
||||||
return 0, cmd.err
|
return 0, cmd.err
|
||||||
|
@ -585,6 +604,17 @@ func (cmd *StringCmd) Uint64() (uint64, error) {
|
||||||
return strconv.ParseUint(cmd.Val(), 10, 64)
|
return strconv.ParseUint(cmd.Val(), 10, 64)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cmd *StringCmd) Float32() (float32, error) {
|
||||||
|
if cmd.err != nil {
|
||||||
|
return 0, cmd.err
|
||||||
|
}
|
||||||
|
f, err := strconv.ParseFloat(cmd.Val(), 32)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return float32(f), nil
|
||||||
|
}
|
||||||
|
|
||||||
func (cmd *StringCmd) Float64() (float64, error) {
|
func (cmd *StringCmd) Float64() (float64, error) {
|
||||||
if cmd.err != nil {
|
if cmd.err != nil {
|
||||||
return 0, cmd.err
|
return 0, cmd.err
|
||||||
|
@ -687,12 +717,12 @@ func (cmd *StringSliceCmd) readReply(rd *proto.Reader) error {
|
||||||
func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
ss := make([]string, 0, n)
|
ss := make([]string, 0, n)
|
||||||
for i := int64(0); i < n; i++ {
|
for i := int64(0); i < n; i++ {
|
||||||
s, err := rd.ReadString()
|
switch s, err := rd.ReadString(); {
|
||||||
if err == Nil {
|
case err == Nil:
|
||||||
ss = append(ss, "")
|
ss = append(ss, "")
|
||||||
} else if err != nil {
|
case err != nil:
|
||||||
return nil, err
|
return nil, err
|
||||||
} else {
|
default:
|
||||||
ss = append(ss, s)
|
ss = append(ss, s)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -969,14 +999,20 @@ func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var values map[string]interface{}
|
||||||
|
|
||||||
v, err := rd.ReadArrayReply(stringInterfaceMapParser)
|
v, err := rd.ReadArrayReply(stringInterfaceMapParser)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if err != proto.Nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
values = v.(map[string]interface{})
|
||||||
|
}
|
||||||
|
|
||||||
msgs = append(msgs, XMessage{
|
msgs = append(msgs, XMessage{
|
||||||
ID: id,
|
ID: id,
|
||||||
Values: v.(map[string]interface{}),
|
Values: values,
|
||||||
})
|
})
|
||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
|
|
4
vendor/github.com/go-redis/redis/internal/pool/conn.go
generated
vendored
4
vendor/github.com/go-redis/redis/internal/pool/conn.go
generated
vendored
|
@ -17,14 +17,16 @@ type Conn struct {
|
||||||
rdLocked bool
|
rdLocked bool
|
||||||
wr *proto.Writer
|
wr *proto.Writer
|
||||||
|
|
||||||
InitedAt time.Time
|
Inited bool
|
||||||
pooled bool
|
pooled bool
|
||||||
|
createdAt time.Time
|
||||||
usedAt atomic.Value
|
usedAt atomic.Value
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConn(netConn net.Conn) *Conn {
|
func NewConn(netConn net.Conn) *Conn {
|
||||||
cn := &Conn{
|
cn := &Conn{
|
||||||
netConn: netConn,
|
netConn: netConn,
|
||||||
|
createdAt: time.Now(),
|
||||||
}
|
}
|
||||||
cn.rd = proto.NewReader(netConn)
|
cn.rd = proto.NewReader(netConn)
|
||||||
cn.wr = proto.NewWriter(netConn)
|
cn.wr = proto.NewWriter(netConn)
|
||||||
|
|
8
vendor/github.com/go-redis/redis/internal/pool/pool.go
generated
vendored
8
vendor/github.com/go-redis/redis/internal/pool/pool.go
generated
vendored
|
@ -38,7 +38,7 @@ type Pooler interface {
|
||||||
|
|
||||||
Get() (*Conn, error)
|
Get() (*Conn, error)
|
||||||
Put(*Conn)
|
Put(*Conn)
|
||||||
Remove(*Conn)
|
Remove(*Conn, error)
|
||||||
|
|
||||||
Len() int
|
Len() int
|
||||||
IdleLen() int
|
IdleLen() int
|
||||||
|
@ -289,7 +289,7 @@ func (p *ConnPool) popIdle() *Conn {
|
||||||
|
|
||||||
func (p *ConnPool) Put(cn *Conn) {
|
func (p *ConnPool) Put(cn *Conn) {
|
||||||
if !cn.pooled {
|
if !cn.pooled {
|
||||||
p.Remove(cn)
|
p.Remove(cn, nil)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -300,7 +300,7 @@ func (p *ConnPool) Put(cn *Conn) {
|
||||||
p.freeTurn()
|
p.freeTurn()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnPool) Remove(cn *Conn) {
|
func (p *ConnPool) Remove(cn *Conn, reason error) {
|
||||||
p.removeConn(cn)
|
p.removeConn(cn)
|
||||||
p.freeTurn()
|
p.freeTurn()
|
||||||
_ = p.closeConn(cn)
|
_ = p.closeConn(cn)
|
||||||
|
@ -468,7 +468,7 @@ func (p *ConnPool) isStaleConn(cn *Conn) bool {
|
||||||
if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout {
|
if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if p.opt.MaxConnAge > 0 && now.Sub(cn.InitedAt) >= p.opt.MaxConnAge {
|
if p.opt.MaxConnAge > 0 && now.Sub(cn.createdAt) >= p.opt.MaxConnAge {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
180
vendor/github.com/go-redis/redis/internal/pool/pool_single.go
generated
vendored
180
vendor/github.com/go-redis/redis/internal/pool/pool_single.go
generated
vendored
|
@ -1,53 +1,203 @@
|
||||||
package pool
|
package pool
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync/atomic"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
stateDefault = 0
|
||||||
|
stateInited = 1
|
||||||
|
stateClosed = 2
|
||||||
|
)
|
||||||
|
|
||||||
|
type BadConnError struct {
|
||||||
|
wrapped error
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ error = (*BadConnError)(nil)
|
||||||
|
|
||||||
|
func (e BadConnError) Error() string {
|
||||||
|
return "pg: Conn is in a bad state"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e BadConnError) Unwrap() error {
|
||||||
|
return e.wrapped
|
||||||
|
}
|
||||||
|
|
||||||
type SingleConnPool struct {
|
type SingleConnPool struct {
|
||||||
cn *Conn
|
pool Pooler
|
||||||
|
level int32 // atomic
|
||||||
|
|
||||||
|
state uint32 // atomic
|
||||||
|
ch chan *Conn
|
||||||
|
|
||||||
|
_badConnError atomic.Value
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ Pooler = (*SingleConnPool)(nil)
|
var _ Pooler = (*SingleConnPool)(nil)
|
||||||
|
|
||||||
func NewSingleConnPool(cn *Conn) *SingleConnPool {
|
func NewSingleConnPool(pool Pooler) *SingleConnPool {
|
||||||
return &SingleConnPool{
|
p, ok := pool.(*SingleConnPool)
|
||||||
cn: cn,
|
if !ok {
|
||||||
|
p = &SingleConnPool{
|
||||||
|
pool: pool,
|
||||||
|
ch: make(chan *Conn, 1),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
atomic.AddInt32(&p.level, 1)
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *SingleConnPool) SetConn(cn *Conn) {
|
||||||
|
if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
|
||||||
|
p.ch <- cn
|
||||||
|
} else {
|
||||||
|
panic("not reached")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SingleConnPool) NewConn() (*Conn, error) {
|
func (p *SingleConnPool) NewConn() (*Conn, error) {
|
||||||
panic("not implemented")
|
return p.pool.NewConn()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SingleConnPool) CloseConn(*Conn) error {
|
func (p *SingleConnPool) CloseConn(cn *Conn) error {
|
||||||
panic("not implemented")
|
return p.pool.CloseConn(cn)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SingleConnPool) Get() (*Conn, error) {
|
func (p *SingleConnPool) Get() (*Conn, error) {
|
||||||
return p.cn, nil
|
// In worst case this races with Close which is not a very common operation.
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
switch atomic.LoadUint32(&p.state) {
|
||||||
|
case stateDefault:
|
||||||
|
cn, err := p.pool.Get()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
|
||||||
|
return cn, nil
|
||||||
|
}
|
||||||
|
p.pool.Remove(cn, ErrClosed)
|
||||||
|
case stateInited:
|
||||||
|
if err := p.badConnError(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
cn, ok := <-p.ch
|
||||||
|
if !ok {
|
||||||
|
return nil, ErrClosed
|
||||||
|
}
|
||||||
|
return cn, nil
|
||||||
|
case stateClosed:
|
||||||
|
return nil, ErrClosed
|
||||||
|
default:
|
||||||
|
panic("not reached")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("pg: SingleConnPool.Get: infinite loop")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SingleConnPool) Put(cn *Conn) {
|
func (p *SingleConnPool) Put(cn *Conn) {
|
||||||
if p.cn != cn {
|
defer func() {
|
||||||
panic("p.cn != cn")
|
if recover() != nil {
|
||||||
|
p.freeConn(cn)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
p.ch <- cn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *SingleConnPool) freeConn(cn *Conn) {
|
||||||
|
if err := p.badConnError(); err != nil {
|
||||||
|
p.pool.Remove(cn, err)
|
||||||
|
} else {
|
||||||
|
p.pool.Put(cn)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SingleConnPool) Remove(cn *Conn) {
|
func (p *SingleConnPool) Remove(cn *Conn, reason error) {
|
||||||
if p.cn != cn {
|
defer func() {
|
||||||
panic("p.cn != cn")
|
if recover() != nil {
|
||||||
|
p.pool.Remove(cn, ErrClosed)
|
||||||
}
|
}
|
||||||
|
}()
|
||||||
|
p._badConnError.Store(BadConnError{wrapped: reason})
|
||||||
|
p.ch <- cn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SingleConnPool) Len() int {
|
func (p *SingleConnPool) Len() int {
|
||||||
|
switch atomic.LoadUint32(&p.state) {
|
||||||
|
case stateDefault:
|
||||||
|
return 0
|
||||||
|
case stateInited:
|
||||||
return 1
|
return 1
|
||||||
|
case stateClosed:
|
||||||
|
return 0
|
||||||
|
default:
|
||||||
|
panic("not reached")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SingleConnPool) IdleLen() int {
|
func (p *SingleConnPool) IdleLen() int {
|
||||||
return 0
|
return len(p.ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SingleConnPool) Stats() *Stats {
|
func (p *SingleConnPool) Stats() *Stats {
|
||||||
return nil
|
return &Stats{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SingleConnPool) Close() error {
|
func (p *SingleConnPool) Close() error {
|
||||||
|
level := atomic.AddInt32(&p.level, -1)
|
||||||
|
if level > 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
state := atomic.LoadUint32(&p.state)
|
||||||
|
if state == stateClosed {
|
||||||
|
return ErrClosed
|
||||||
|
}
|
||||||
|
if atomic.CompareAndSwapUint32(&p.state, state, stateClosed) {
|
||||||
|
close(p.ch)
|
||||||
|
cn, ok := <-p.ch
|
||||||
|
if ok {
|
||||||
|
p.freeConn(cn)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("pg: SingleConnPool.Close: infinite loop")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *SingleConnPool) Reset() error {
|
||||||
|
if p.badConnError() == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case cn, ok := <-p.ch:
|
||||||
|
if !ok {
|
||||||
|
return ErrClosed
|
||||||
|
}
|
||||||
|
p.pool.Remove(cn, ErrClosed)
|
||||||
|
p._badConnError.Store(BadConnError{wrapped: nil})
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("pg: SingleConnPool does not have a Conn")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !atomic.CompareAndSwapUint32(&p.state, stateInited, stateDefault) {
|
||||||
|
state := atomic.LoadUint32(&p.state)
|
||||||
|
return fmt.Errorf("pg: invalid SingleConnPool state: %d", state)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *SingleConnPool) badConnError() error {
|
||||||
|
if v := p._badConnError.Load(); v != nil {
|
||||||
|
err := v.(BadConnError)
|
||||||
|
if err.wrapped != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
10
vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go
generated
vendored
10
vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go
generated
vendored
|
@ -55,13 +55,13 @@ func (p *StickyConnPool) putUpstream() {
|
||||||
|
|
||||||
func (p *StickyConnPool) Put(cn *Conn) {}
|
func (p *StickyConnPool) Put(cn *Conn) {}
|
||||||
|
|
||||||
func (p *StickyConnPool) removeUpstream() {
|
func (p *StickyConnPool) removeUpstream(reason error) {
|
||||||
p.pool.Remove(p.cn)
|
p.pool.Remove(p.cn, reason)
|
||||||
p.cn = nil
|
p.cn = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *StickyConnPool) Remove(cn *Conn) {
|
func (p *StickyConnPool) Remove(cn *Conn, reason error) {
|
||||||
p.removeUpstream()
|
p.removeUpstream(reason)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *StickyConnPool) Len() int {
|
func (p *StickyConnPool) Len() int {
|
||||||
|
@ -101,7 +101,7 @@ func (p *StickyConnPool) Close() error {
|
||||||
if p.reusable {
|
if p.reusable {
|
||||||
p.putUpstream()
|
p.putUpstream()
|
||||||
} else {
|
} else {
|
||||||
p.removeUpstream()
|
p.removeUpstream(ErrClosed)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
10
vendor/github.com/go-redis/redis/internal/util.go
generated
vendored
10
vendor/github.com/go-redis/redis/internal/util.go
generated
vendored
|
@ -27,3 +27,13 @@ func isLower(s string) bool {
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Unwrap(err error) error {
|
||||||
|
u, ok := err.(interface {
|
||||||
|
Unwrap() error
|
||||||
|
})
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return u.Unwrap()
|
||||||
|
}
|
||||||
|
|
13
vendor/github.com/go-redis/redis/pipeline.go
generated
vendored
13
vendor/github.com/go-redis/redis/pipeline.go
generated
vendored
|
@ -8,6 +8,19 @@ import (
|
||||||
|
|
||||||
type pipelineExecer func([]Cmder) error
|
type pipelineExecer func([]Cmder) error
|
||||||
|
|
||||||
|
// Pipeliner is an mechanism to realise Redis Pipeline technique.
|
||||||
|
//
|
||||||
|
// Pipelining is a technique to extremely speed up processing by packing
|
||||||
|
// operations to batches, send them at once to Redis and read a replies in a
|
||||||
|
// singe step.
|
||||||
|
// See https://redis.io/topics/pipelining
|
||||||
|
//
|
||||||
|
// Pay attention, that Pipeline is not a transaction, so you can get unexpected
|
||||||
|
// results in case of big pipelines and small read/write timeouts.
|
||||||
|
// Redis client has retransmission logic in case of timeouts, pipeline
|
||||||
|
// can be retransmitted and commands can be executed more then once.
|
||||||
|
// To avoid this: it is good idea to use reasonable bigger read/write timeouts
|
||||||
|
// depends of your batch size and/or use TxPipeline.
|
||||||
type Pipeliner interface {
|
type Pipeliner interface {
|
||||||
StatefulCmdable
|
StatefulCmdable
|
||||||
Do(args ...interface{}) *Cmd
|
Do(args ...interface{}) *Cmd
|
||||||
|
|
58
vendor/github.com/go-redis/redis/pubsub.go
generated
vendored
58
vendor/github.com/go-redis/redis/pubsub.go
generated
vendored
|
@ -3,6 +3,7 @@ package redis
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -13,7 +14,7 @@ import (
|
||||||
|
|
||||||
var errPingTimeout = errors.New("redis: ping timeout")
|
var errPingTimeout = errors.New("redis: ping timeout")
|
||||||
|
|
||||||
// PubSub implements Pub/Sub commands bas described in
|
// PubSub implements Pub/Sub commands as described in
|
||||||
// http://redis.io/topics/pubsub. Message receiving is NOT safe
|
// http://redis.io/topics/pubsub. Message receiving is NOT safe
|
||||||
// for concurrent use by multiple goroutines.
|
// for concurrent use by multiple goroutines.
|
||||||
//
|
//
|
||||||
|
@ -29,6 +30,7 @@ type PubSub struct {
|
||||||
cn *pool.Conn
|
cn *pool.Conn
|
||||||
channels map[string]struct{}
|
channels map[string]struct{}
|
||||||
patterns map[string]struct{}
|
patterns map[string]struct{}
|
||||||
|
|
||||||
closed bool
|
closed bool
|
||||||
exit chan struct{}
|
exit chan struct{}
|
||||||
|
|
||||||
|
@ -39,6 +41,12 @@ type PubSub struct {
|
||||||
ping chan struct{}
|
ping chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *PubSub) String() string {
|
||||||
|
channels := mapKeys(c.channels)
|
||||||
|
channels = append(channels, mapKeys(c.patterns)...)
|
||||||
|
return fmt.Sprintf("PubSub(%s)", strings.Join(channels, ", "))
|
||||||
|
}
|
||||||
|
|
||||||
func (c *PubSub) init() {
|
func (c *PubSub) init() {
|
||||||
c.exit = make(chan struct{})
|
c.exit = make(chan struct{})
|
||||||
}
|
}
|
||||||
|
@ -389,16 +397,39 @@ func (c *PubSub) ReceiveMessage() (*Message, error) {
|
||||||
// It periodically sends Ping messages to test connection health.
|
// It periodically sends Ping messages to test connection health.
|
||||||
// The channel is closed with PubSub. Receive* APIs can not be used
|
// The channel is closed with PubSub. Receive* APIs can not be used
|
||||||
// after channel is created.
|
// after channel is created.
|
||||||
|
//
|
||||||
|
// If the Go channel is full for 30 seconds the message is dropped.
|
||||||
func (c *PubSub) Channel() <-chan *Message {
|
func (c *PubSub) Channel() <-chan *Message {
|
||||||
c.chOnce.Do(c.initChannel)
|
return c.channel(100)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ChannelSize is like Channel, but creates a Go channel
|
||||||
|
// with specified buffer size.
|
||||||
|
func (c *PubSub) ChannelSize(size int) <-chan *Message {
|
||||||
|
return c.channel(size)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *PubSub) channel(size int) <-chan *Message {
|
||||||
|
c.chOnce.Do(func() {
|
||||||
|
c.initChannel(size)
|
||||||
|
})
|
||||||
|
if cap(c.ch) != size {
|
||||||
|
err := fmt.Errorf("redis: PubSub.Channel is called with different buffer size")
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
return c.ch
|
return c.ch
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *PubSub) initChannel() {
|
func (c *PubSub) initChannel(size int) {
|
||||||
c.ch = make(chan *Message, 100)
|
const timeout = 30 * time.Second
|
||||||
c.ping = make(chan struct{}, 10)
|
|
||||||
|
c.ch = make(chan *Message, size)
|
||||||
|
c.ping = make(chan struct{}, 1)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
timer := time.NewTimer(timeout)
|
||||||
|
timer.Stop()
|
||||||
|
|
||||||
var errCount int
|
var errCount int
|
||||||
for {
|
for {
|
||||||
msg, err := c.Receive()
|
msg, err := c.Receive()
|
||||||
|
@ -413,6 +444,7 @@ func (c *PubSub) initChannel() {
|
||||||
errCount++
|
errCount++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
errCount = 0
|
errCount = 0
|
||||||
|
|
||||||
// Any message is as good as a ping.
|
// Any message is as good as a ping.
|
||||||
|
@ -427,16 +459,24 @@ func (c *PubSub) initChannel() {
|
||||||
case *Pong:
|
case *Pong:
|
||||||
// Ignore.
|
// Ignore.
|
||||||
case *Message:
|
case *Message:
|
||||||
c.ch <- msg
|
timer.Reset(timeout)
|
||||||
|
select {
|
||||||
|
case c.ch <- msg:
|
||||||
|
if !timer.Stop() {
|
||||||
|
<-timer.C
|
||||||
|
}
|
||||||
|
case <-timer.C:
|
||||||
|
internal.Logf(
|
||||||
|
"redis: %s channel is full for %s (message is dropped)",
|
||||||
|
c, timeout)
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
internal.Logf("redis: unknown message: %T", msg)
|
internal.Logf("redis: unknown message type: %T", msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
const timeout = 5 * time.Second
|
|
||||||
|
|
||||||
timer := time.NewTimer(timeout)
|
timer := time.NewTimer(timeout)
|
||||||
timer.Stop()
|
timer.Stop()
|
||||||
|
|
||||||
|
|
31
vendor/github.com/go-redis/redis/redis.go
generated
vendored
31
vendor/github.com/go-redis/redis/redis.go
generated
vendored
|
@ -51,12 +51,11 @@ func (c *baseClient) newConn() (*pool.Conn, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if cn.InitedAt.IsZero() {
|
err = c.initConn(cn)
|
||||||
if err := c.initConn(cn); err != nil {
|
if err != nil {
|
||||||
_ = c.connPool.CloseConn(cn)
|
_ = c.connPool.CloseConn(cn)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return cn, nil
|
return cn, nil
|
||||||
}
|
}
|
||||||
|
@ -85,12 +84,13 @@ func (c *baseClient) _getConn() (*pool.Conn, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if cn.InitedAt.IsZero() {
|
err = c.initConn(cn)
|
||||||
err := c.initConn(cn)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.connPool.Remove(cn)
|
c.connPool.Remove(cn, err)
|
||||||
|
if err := internal.Unwrap(err); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return cn, nil
|
return cn, nil
|
||||||
|
@ -102,7 +102,7 @@ func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if internal.IsBadConn(err, false) {
|
if internal.IsBadConn(err, false) {
|
||||||
c.connPool.Remove(cn)
|
c.connPool.Remove(cn, err)
|
||||||
} else {
|
} else {
|
||||||
c.connPool.Put(cn)
|
c.connPool.Put(cn)
|
||||||
}
|
}
|
||||||
|
@ -116,12 +116,15 @@ func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) {
|
||||||
if err == nil || internal.IsRedisError(err) {
|
if err == nil || internal.IsRedisError(err) {
|
||||||
c.connPool.Put(cn)
|
c.connPool.Put(cn)
|
||||||
} else {
|
} else {
|
||||||
c.connPool.Remove(cn)
|
c.connPool.Remove(cn, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *baseClient) initConn(cn *pool.Conn) error {
|
func (c *baseClient) initConn(cn *pool.Conn) error {
|
||||||
cn.InitedAt = time.Now()
|
if cn.Inited {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
cn.Inited = true
|
||||||
|
|
||||||
if c.opt.Password == "" &&
|
if c.opt.Password == "" &&
|
||||||
c.opt.DB == 0 &&
|
c.opt.DB == 0 &&
|
||||||
|
@ -201,9 +204,7 @@ func (c *baseClient) defaultProcess(cmd Cmder) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = cn.WithReader(c.cmdTimeout(cmd), func(rd *proto.Reader) error {
|
err = cn.WithReader(c.cmdTimeout(cmd), cmd.readReply)
|
||||||
return cmd.readReply(rd)
|
|
||||||
})
|
|
||||||
c.releaseConn(cn, err)
|
c.releaseConn(cn, err)
|
||||||
if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) {
|
if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) {
|
||||||
continue
|
continue
|
||||||
|
@ -237,7 +238,7 @@ func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
|
||||||
func (c *baseClient) Close() error {
|
func (c *baseClient) Close() error {
|
||||||
var firstErr error
|
var firstErr error
|
||||||
if c.onClose != nil {
|
if c.onClose != nil {
|
||||||
if err := c.onClose(); err != nil && firstErr == nil {
|
if err := c.onClose(); err != nil {
|
||||||
firstErr = err
|
firstErr = err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -543,10 +544,12 @@ type Conn struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func newConn(opt *Options, cn *pool.Conn) *Conn {
|
func newConn(opt *Options, cn *pool.Conn) *Conn {
|
||||||
|
connPool := pool.NewSingleConnPool(nil)
|
||||||
|
connPool.SetConn(cn)
|
||||||
c := Conn{
|
c := Conn{
|
||||||
baseClient: baseClient{
|
baseClient: baseClient{
|
||||||
opt: opt,
|
opt: opt,
|
||||||
connPool: pool.NewSingleConnPool(cn),
|
connPool: connPool,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
c.baseClient.init()
|
c.baseClient.init()
|
||||||
|
|
52
vendor/github.com/go-redis/redis/ring.go
generated
vendored
52
vendor/github.com/go-redis/redis/ring.go
generated
vendored
|
@ -273,9 +273,13 @@ func (c *ringShards) Heartbeat(frequency time.Duration) {
|
||||||
|
|
||||||
// rebalance removes dead shards from the Ring.
|
// rebalance removes dead shards from the Ring.
|
||||||
func (c *ringShards) rebalance() {
|
func (c *ringShards) rebalance() {
|
||||||
|
c.mu.RLock()
|
||||||
|
shards := c.shards
|
||||||
|
c.mu.RUnlock()
|
||||||
|
|
||||||
hash := newConsistentHash(c.opt)
|
hash := newConsistentHash(c.opt)
|
||||||
var shardsNum int
|
var shardsNum int
|
||||||
for name, shard := range c.shards {
|
for name, shard := range shards {
|
||||||
if shard.IsUp() {
|
if shard.IsUp() {
|
||||||
hash.Add(name)
|
hash.Add(name)
|
||||||
shardsNum++
|
shardsNum++
|
||||||
|
@ -357,7 +361,8 @@ func NewRing(opt *RingOptions) *Ring {
|
||||||
|
|
||||||
ring.process = ring.defaultProcess
|
ring.process = ring.defaultProcess
|
||||||
ring.processPipeline = ring.defaultProcessPipeline
|
ring.processPipeline = ring.defaultProcessPipeline
|
||||||
ring.cmdable.setProcessor(ring.Process)
|
|
||||||
|
ring.init()
|
||||||
|
|
||||||
for name, addr := range opt.Addrs {
|
for name, addr := range opt.Addrs {
|
||||||
clopt := opt.clientOptions()
|
clopt := opt.clientOptions()
|
||||||
|
@ -370,6 +375,10 @@ func NewRing(opt *RingOptions) *Ring {
|
||||||
return ring
|
return ring
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Ring) init() {
|
||||||
|
c.cmdable.setProcessor(c.Process)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Ring) Context() context.Context {
|
func (c *Ring) Context() context.Context {
|
||||||
if c.ctx != nil {
|
if c.ctx != nil {
|
||||||
return c.ctx
|
return c.ctx
|
||||||
|
@ -381,13 +390,15 @@ func (c *Ring) WithContext(ctx context.Context) *Ring {
|
||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
panic("nil context")
|
panic("nil context")
|
||||||
}
|
}
|
||||||
c2 := c.copy()
|
c2 := c.clone()
|
||||||
c2.ctx = ctx
|
c2.ctx = ctx
|
||||||
return c2
|
return c2
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Ring) copy() *Ring {
|
func (c *Ring) clone() *Ring {
|
||||||
cp := *c
|
cp := *c
|
||||||
|
cp.init()
|
||||||
|
|
||||||
return &cp
|
return &cp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -653,6 +664,39 @@ func (c *Ring) Close() error {
|
||||||
return c.shards.Close()
|
return c.shards.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Ring) Watch(fn func(*Tx) error, keys ...string) error {
|
||||||
|
if len(keys) == 0 {
|
||||||
|
return fmt.Errorf("redis: Watch requires at least one key")
|
||||||
|
}
|
||||||
|
|
||||||
|
var shards []*ringShard
|
||||||
|
for _, key := range keys {
|
||||||
|
if key != "" {
|
||||||
|
shard, err := c.shards.GetByKey(hashtag.Key(key))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
shards = append(shards, shard)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(shards) == 0 {
|
||||||
|
return fmt.Errorf("redis: Watch requires at least one shard")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(shards) > 1 {
|
||||||
|
for _, shard := range shards[1:] {
|
||||||
|
if shard.Client != shards[0].Client {
|
||||||
|
err := fmt.Errorf("redis: Watch requires all keys to be in the same shard")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return shards[0].Client.Watch(fn, keys...)
|
||||||
|
}
|
||||||
|
|
||||||
func newConsistentHash(opt *RingOptions) *consistenthash.Map {
|
func newConsistentHash(opt *RingOptions) *consistenthash.Map {
|
||||||
return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash))
|
return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash))
|
||||||
}
|
}
|
||||||
|
|
40
vendor/github.com/go-redis/redis/sentinel.go
generated
vendored
40
vendor/github.com/go-redis/redis/sentinel.go
generated
vendored
|
@ -90,9 +90,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
|
||||||
opt: opt,
|
opt: opt,
|
||||||
connPool: failover.Pool(),
|
connPool: failover.Pool(),
|
||||||
|
|
||||||
onClose: func() error {
|
onClose: failover.Close,
|
||||||
return failover.Close()
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
c.baseClient.init()
|
c.baseClient.init()
|
||||||
|
@ -182,6 +180,21 @@ func (c *SentinelClient) Reset(pattern string) *IntCmd {
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FlushConfig forces Sentinel to rewrite its configuration on disk, including
|
||||||
|
// the current Sentinel state.
|
||||||
|
func (c *SentinelClient) FlushConfig() *StatusCmd {
|
||||||
|
cmd := NewStatusCmd("sentinel", "flushconfig")
|
||||||
|
c.Process(cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
// Master shows the state and info of the specified master.
|
||||||
|
func (c *SentinelClient) Master(name string) *StringStringMapCmd {
|
||||||
|
cmd := NewStringStringMapCmd("sentinel", "master", name)
|
||||||
|
c.Process(cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
type sentinelFailover struct {
|
type sentinelFailover struct {
|
||||||
sentinelAddrs []string
|
sentinelAddrs []string
|
||||||
|
|
||||||
|
@ -232,7 +245,9 @@ func (c *sentinelFailover) MasterAddr() (string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *sentinelFailover) masterAddr() (string, error) {
|
func (c *sentinelFailover) masterAddr() (string, error) {
|
||||||
|
c.mu.RLock()
|
||||||
addr := c.getMasterAddr()
|
addr := c.getMasterAddr()
|
||||||
|
c.mu.RUnlock()
|
||||||
if addr != "" {
|
if addr != "" {
|
||||||
return addr, nil
|
return addr, nil
|
||||||
}
|
}
|
||||||
|
@ -240,6 +255,15 @@ func (c *sentinelFailover) masterAddr() (string, error) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
addr = c.getMasterAddr()
|
||||||
|
if addr != "" {
|
||||||
|
return addr, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.sentinel != nil {
|
||||||
|
c.closeSentinel()
|
||||||
|
}
|
||||||
|
|
||||||
for i, sentinelAddr := range c.sentinelAddrs {
|
for i, sentinelAddr := range c.sentinelAddrs {
|
||||||
sentinel := NewSentinelClient(&Options{
|
sentinel := NewSentinelClient(&Options{
|
||||||
Addr: sentinelAddr,
|
Addr: sentinelAddr,
|
||||||
|
@ -278,9 +302,7 @@ func (c *sentinelFailover) masterAddr() (string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *sentinelFailover) getMasterAddr() string {
|
func (c *sentinelFailover) getMasterAddr() string {
|
||||||
c.mu.RLock()
|
|
||||||
sentinel := c.sentinel
|
sentinel := c.sentinel
|
||||||
c.mu.RUnlock()
|
|
||||||
|
|
||||||
if sentinel == nil {
|
if sentinel == nil {
|
||||||
return ""
|
return ""
|
||||||
|
@ -290,11 +312,6 @@ func (c *sentinelFailover) getMasterAddr() string {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
|
internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
|
||||||
c.masterName, err)
|
c.masterName, err)
|
||||||
c.mu.Lock()
|
|
||||||
if c.sentinel == sentinel {
|
|
||||||
c.closeSentinel()
|
|
||||||
}
|
|
||||||
c.mu.Unlock()
|
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -376,8 +393,7 @@ func (c *sentinelFailover) listen(pubsub *PubSub) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
switch msg.Channel {
|
if msg.Channel == "+switch-master" {
|
||||||
case "+switch-master":
|
|
||||||
parts := strings.Split(msg.Payload, " ")
|
parts := strings.Split(msg.Payload, " ")
|
||||||
if parts[0] != c.masterName {
|
if parts[0] != c.masterName {
|
||||||
internal.Logf("sentinel: ignore addr for master=%q", parts[0])
|
internal.Logf("sentinel: ignore addr for master=%q", parts[0])
|
||||||
|
|
1
vendor/github.com/go-redis/redis/universal.go
generated
vendored
1
vendor/github.com/go-redis/redis/universal.go
generated
vendored
|
@ -155,6 +155,7 @@ type UniversalClient interface {
|
||||||
Watch(fn func(*Tx) error, keys ...string) error
|
Watch(fn func(*Tx) error, keys ...string) error
|
||||||
Process(cmd Cmder) error
|
Process(cmd Cmder) error
|
||||||
WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error)
|
WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error)
|
||||||
|
WrapProcessPipeline(fn func(oldProcess func([]Cmder) error) func([]Cmder) error)
|
||||||
Subscribe(channels ...string) *PubSub
|
Subscribe(channels ...string) *PubSub
|
||||||
PSubscribe(channels ...string) *PubSub
|
PSubscribe(channels ...string) *PubSub
|
||||||
Close() error
|
Close() error
|
||||||
|
|
2
vendor/modules.txt
vendored
2
vendor/modules.txt
vendored
|
@ -60,7 +60,7 @@ github.com/go-openapi/jsonreference
|
||||||
github.com/go-openapi/spec
|
github.com/go-openapi/spec
|
||||||
# github.com/go-openapi/swag v0.19.5
|
# github.com/go-openapi/swag v0.19.5
|
||||||
github.com/go-openapi/swag
|
github.com/go-openapi/swag
|
||||||
# github.com/go-redis/redis v6.15.2+incompatible
|
# github.com/go-redis/redis v6.15.7+incompatible
|
||||||
github.com/go-redis/redis
|
github.com/go-redis/redis
|
||||||
github.com/go-redis/redis/internal
|
github.com/go-redis/redis/internal
|
||||||
github.com/go-redis/redis/internal/consistenthash
|
github.com/go-redis/redis/internal/consistenthash
|
||||||
|
|
Loading…
Add table
Reference in a new issue