d28f005552
Fix limit for databases other than sqlite go mod tidy && go mod vendor Remove unneeded break statements Make everything work with the new xorm version Fix xorm logging Fix lint Fix redis init Fix using id field Fix database init for testing Change default database log level Add xorm logger Use const for postgres go mod tidy Merge branch 'master' into update/xorm # Conflicts: # go.mod # go.sum # vendor/modules.txt go mod vendor Fix loading fixtures for postgres Go mod vendor1 Update xorm to version 1 Co-authored-by: kolaente <k@knt.li> Reviewed-on: https://kolaente.dev/vikunja/api/pulls/323
854 lines
17 KiB
Go
854 lines
17 KiB
Go
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
|
|
// All rights reserved.
|
|
//
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file.
|
|
|
|
package leveldb
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb/errors"
|
|
"github.com/syndtr/goleveldb/leveldb/opt"
|
|
"github.com/syndtr/goleveldb/leveldb/storage"
|
|
)
|
|
|
|
var (
|
|
errCompactionTransactExiting = errors.New("leveldb: compaction transact exiting")
|
|
)
|
|
|
|
type cStat struct {
|
|
duration time.Duration
|
|
read int64
|
|
write int64
|
|
}
|
|
|
|
func (p *cStat) add(n *cStatStaging) {
|
|
p.duration += n.duration
|
|
p.read += n.read
|
|
p.write += n.write
|
|
}
|
|
|
|
func (p *cStat) get() (duration time.Duration, read, write int64) {
|
|
return p.duration, p.read, p.write
|
|
}
|
|
|
|
type cStatStaging struct {
|
|
start time.Time
|
|
duration time.Duration
|
|
on bool
|
|
read int64
|
|
write int64
|
|
}
|
|
|
|
func (p *cStatStaging) startTimer() {
|
|
if !p.on {
|
|
p.start = time.Now()
|
|
p.on = true
|
|
}
|
|
}
|
|
|
|
func (p *cStatStaging) stopTimer() {
|
|
if p.on {
|
|
p.duration += time.Since(p.start)
|
|
p.on = false
|
|
}
|
|
}
|
|
|
|
type cStats struct {
|
|
lk sync.Mutex
|
|
stats []cStat
|
|
}
|
|
|
|
func (p *cStats) addStat(level int, n *cStatStaging) {
|
|
p.lk.Lock()
|
|
if level >= len(p.stats) {
|
|
newStats := make([]cStat, level+1)
|
|
copy(newStats, p.stats)
|
|
p.stats = newStats
|
|
}
|
|
p.stats[level].add(n)
|
|
p.lk.Unlock()
|
|
}
|
|
|
|
func (p *cStats) getStat(level int) (duration time.Duration, read, write int64) {
|
|
p.lk.Lock()
|
|
defer p.lk.Unlock()
|
|
if level < len(p.stats) {
|
|
return p.stats[level].get()
|
|
}
|
|
return
|
|
}
|
|
|
|
func (db *DB) compactionError() {
|
|
var err error
|
|
noerr:
|
|
// No error.
|
|
for {
|
|
select {
|
|
case err = <-db.compErrSetC:
|
|
switch {
|
|
case err == nil:
|
|
case err == ErrReadOnly, errors.IsCorrupted(err):
|
|
goto hasperr
|
|
default:
|
|
goto haserr
|
|
}
|
|
case <-db.closeC:
|
|
return
|
|
}
|
|
}
|
|
haserr:
|
|
// Transient error.
|
|
for {
|
|
select {
|
|
case db.compErrC <- err:
|
|
case err = <-db.compErrSetC:
|
|
switch {
|
|
case err == nil:
|
|
goto noerr
|
|
case err == ErrReadOnly, errors.IsCorrupted(err):
|
|
goto hasperr
|
|
default:
|
|
}
|
|
case <-db.closeC:
|
|
return
|
|
}
|
|
}
|
|
hasperr:
|
|
// Persistent error.
|
|
for {
|
|
select {
|
|
case db.compErrC <- err:
|
|
case db.compPerErrC <- err:
|
|
case db.writeLockC <- struct{}{}:
|
|
// Hold write lock, so that write won't pass-through.
|
|
db.compWriteLocking = true
|
|
case <-db.closeC:
|
|
if db.compWriteLocking {
|
|
// We should release the lock or Close will hang.
|
|
<-db.writeLockC
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
type compactionTransactCounter int
|
|
|
|
func (cnt *compactionTransactCounter) incr() {
|
|
*cnt++
|
|
}
|
|
|
|
type compactionTransactInterface interface {
|
|
run(cnt *compactionTransactCounter) error
|
|
revert() error
|
|
}
|
|
|
|
func (db *DB) compactionTransact(name string, t compactionTransactInterface) {
|
|
defer func() {
|
|
if x := recover(); x != nil {
|
|
if x == errCompactionTransactExiting {
|
|
if err := t.revert(); err != nil {
|
|
db.logf("%s revert error %q", name, err)
|
|
}
|
|
}
|
|
panic(x)
|
|
}
|
|
}()
|
|
|
|
const (
|
|
backoffMin = 1 * time.Second
|
|
backoffMax = 8 * time.Second
|
|
backoffMul = 2 * time.Second
|
|
)
|
|
var (
|
|
backoff = backoffMin
|
|
backoffT = time.NewTimer(backoff)
|
|
lastCnt = compactionTransactCounter(0)
|
|
|
|
disableBackoff = db.s.o.GetDisableCompactionBackoff()
|
|
)
|
|
for n := 0; ; n++ {
|
|
// Check whether the DB is closed.
|
|
if db.isClosed() {
|
|
db.logf("%s exiting", name)
|
|
db.compactionExitTransact()
|
|
} else if n > 0 {
|
|
db.logf("%s retrying N·%d", name, n)
|
|
}
|
|
|
|
// Execute.
|
|
cnt := compactionTransactCounter(0)
|
|
err := t.run(&cnt)
|
|
if err != nil {
|
|
db.logf("%s error I·%d %q", name, cnt, err)
|
|
}
|
|
|
|
// Set compaction error status.
|
|
select {
|
|
case db.compErrSetC <- err:
|
|
case perr := <-db.compPerErrC:
|
|
if err != nil {
|
|
db.logf("%s exiting (persistent error %q)", name, perr)
|
|
db.compactionExitTransact()
|
|
}
|
|
case <-db.closeC:
|
|
db.logf("%s exiting", name)
|
|
db.compactionExitTransact()
|
|
}
|
|
if err == nil {
|
|
return
|
|
}
|
|
if errors.IsCorrupted(err) {
|
|
db.logf("%s exiting (corruption detected)", name)
|
|
db.compactionExitTransact()
|
|
}
|
|
|
|
if !disableBackoff {
|
|
// Reset backoff duration if counter is advancing.
|
|
if cnt > lastCnt {
|
|
backoff = backoffMin
|
|
lastCnt = cnt
|
|
}
|
|
|
|
// Backoff.
|
|
backoffT.Reset(backoff)
|
|
if backoff < backoffMax {
|
|
backoff *= backoffMul
|
|
if backoff > backoffMax {
|
|
backoff = backoffMax
|
|
}
|
|
}
|
|
select {
|
|
case <-backoffT.C:
|
|
case <-db.closeC:
|
|
db.logf("%s exiting", name)
|
|
db.compactionExitTransact()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
type compactionTransactFunc struct {
|
|
runFunc func(cnt *compactionTransactCounter) error
|
|
revertFunc func() error
|
|
}
|
|
|
|
func (t *compactionTransactFunc) run(cnt *compactionTransactCounter) error {
|
|
return t.runFunc(cnt)
|
|
}
|
|
|
|
func (t *compactionTransactFunc) revert() error {
|
|
if t.revertFunc != nil {
|
|
return t.revertFunc()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) compactionTransactFunc(name string, run func(cnt *compactionTransactCounter) error, revert func() error) {
|
|
db.compactionTransact(name, &compactionTransactFunc{run, revert})
|
|
}
|
|
|
|
func (db *DB) compactionExitTransact() {
|
|
panic(errCompactionTransactExiting)
|
|
}
|
|
|
|
func (db *DB) compactionCommit(name string, rec *sessionRecord) {
|
|
db.compCommitLk.Lock()
|
|
defer db.compCommitLk.Unlock() // Defer is necessary.
|
|
db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error {
|
|
return db.s.commit(rec)
|
|
}, nil)
|
|
}
|
|
|
|
func (db *DB) memCompaction() {
|
|
mdb := db.getFrozenMem()
|
|
if mdb == nil {
|
|
return
|
|
}
|
|
defer mdb.decref()
|
|
|
|
db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(mdb.Size()))
|
|
|
|
// Don't compact empty memdb.
|
|
if mdb.Len() == 0 {
|
|
db.logf("memdb@flush skipping")
|
|
// drop frozen memdb
|
|
db.dropFrozenMem()
|
|
return
|
|
}
|
|
|
|
// Pause table compaction.
|
|
resumeC := make(chan struct{})
|
|
select {
|
|
case db.tcompPauseC <- (chan<- struct{})(resumeC):
|
|
case <-db.compPerErrC:
|
|
close(resumeC)
|
|
resumeC = nil
|
|
case <-db.closeC:
|
|
db.compactionExitTransact()
|
|
}
|
|
|
|
var (
|
|
rec = &sessionRecord{}
|
|
stats = &cStatStaging{}
|
|
flushLevel int
|
|
)
|
|
|
|
// Generate tables.
|
|
db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) {
|
|
stats.startTimer()
|
|
flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel)
|
|
stats.stopTimer()
|
|
return
|
|
}, func() error {
|
|
for _, r := range rec.addedTables {
|
|
db.logf("memdb@flush revert @%d", r.num)
|
|
if err := db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: r.num}); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
|
|
rec.setJournalNum(db.journalFd.Num)
|
|
rec.setSeqNum(db.frozenSeq)
|
|
|
|
// Commit.
|
|
stats.startTimer()
|
|
db.compactionCommit("memdb", rec)
|
|
stats.stopTimer()
|
|
|
|
db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration)
|
|
|
|
for _, r := range rec.addedTables {
|
|
stats.write += r.size
|
|
}
|
|
db.compStats.addStat(flushLevel, stats)
|
|
|
|
// Drop frozen memdb.
|
|
db.dropFrozenMem()
|
|
|
|
// Resume table compaction.
|
|
if resumeC != nil {
|
|
select {
|
|
case <-resumeC:
|
|
close(resumeC)
|
|
case <-db.closeC:
|
|
db.compactionExitTransact()
|
|
}
|
|
}
|
|
|
|
// Trigger table compaction.
|
|
db.compTrigger(db.tcompCmdC)
|
|
}
|
|
|
|
type tableCompactionBuilder struct {
|
|
db *DB
|
|
s *session
|
|
c *compaction
|
|
rec *sessionRecord
|
|
stat0, stat1 *cStatStaging
|
|
|
|
snapHasLastUkey bool
|
|
snapLastUkey []byte
|
|
snapLastSeq uint64
|
|
snapIter int
|
|
snapKerrCnt int
|
|
snapDropCnt int
|
|
|
|
kerrCnt int
|
|
dropCnt int
|
|
|
|
minSeq uint64
|
|
strict bool
|
|
tableSize int
|
|
|
|
tw *tWriter
|
|
}
|
|
|
|
func (b *tableCompactionBuilder) appendKV(key, value []byte) error {
|
|
// Create new table if not already.
|
|
if b.tw == nil {
|
|
// Check for pause event.
|
|
if b.db != nil {
|
|
select {
|
|
case ch := <-b.db.tcompPauseC:
|
|
b.db.pauseCompaction(ch)
|
|
case <-b.db.closeC:
|
|
b.db.compactionExitTransact()
|
|
default:
|
|
}
|
|
}
|
|
|
|
// Create new table.
|
|
var err error
|
|
b.tw, err = b.s.tops.create()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Write key/value into table.
|
|
return b.tw.append(key, value)
|
|
}
|
|
|
|
func (b *tableCompactionBuilder) needFlush() bool {
|
|
return b.tw.tw.BytesLen() >= b.tableSize
|
|
}
|
|
|
|
func (b *tableCompactionBuilder) flush() error {
|
|
t, err := b.tw.finish()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
b.rec.addTableFile(b.c.sourceLevel+1, t)
|
|
b.stat1.write += t.size
|
|
b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.sourceLevel+1, t.fd.Num, b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax)
|
|
b.tw = nil
|
|
return nil
|
|
}
|
|
|
|
func (b *tableCompactionBuilder) cleanup() {
|
|
if b.tw != nil {
|
|
b.tw.drop()
|
|
b.tw = nil
|
|
}
|
|
}
|
|
|
|
func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) error {
|
|
snapResumed := b.snapIter > 0
|
|
hasLastUkey := b.snapHasLastUkey // The key might has zero length, so this is necessary.
|
|
lastUkey := append([]byte{}, b.snapLastUkey...)
|
|
lastSeq := b.snapLastSeq
|
|
b.kerrCnt = b.snapKerrCnt
|
|
b.dropCnt = b.snapDropCnt
|
|
// Restore compaction state.
|
|
b.c.restore()
|
|
|
|
defer b.cleanup()
|
|
|
|
b.stat1.startTimer()
|
|
defer b.stat1.stopTimer()
|
|
|
|
iter := b.c.newIterator()
|
|
defer iter.Release()
|
|
for i := 0; iter.Next(); i++ {
|
|
// Incr transact counter.
|
|
cnt.incr()
|
|
|
|
// Skip until last state.
|
|
if i < b.snapIter {
|
|
continue
|
|
}
|
|
|
|
resumed := false
|
|
if snapResumed {
|
|
resumed = true
|
|
snapResumed = false
|
|
}
|
|
|
|
ikey := iter.Key()
|
|
ukey, seq, kt, kerr := parseInternalKey(ikey)
|
|
|
|
if kerr == nil {
|
|
shouldStop := !resumed && b.c.shouldStopBefore(ikey)
|
|
|
|
if !hasLastUkey || b.s.icmp.uCompare(lastUkey, ukey) != 0 {
|
|
// First occurrence of this user key.
|
|
|
|
// Only rotate tables if ukey doesn't hop across.
|
|
if b.tw != nil && (shouldStop || b.needFlush()) {
|
|
if err := b.flush(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Creates snapshot of the state.
|
|
b.c.save()
|
|
b.snapHasLastUkey = hasLastUkey
|
|
b.snapLastUkey = append(b.snapLastUkey[:0], lastUkey...)
|
|
b.snapLastSeq = lastSeq
|
|
b.snapIter = i
|
|
b.snapKerrCnt = b.kerrCnt
|
|
b.snapDropCnt = b.dropCnt
|
|
}
|
|
|
|
hasLastUkey = true
|
|
lastUkey = append(lastUkey[:0], ukey...)
|
|
lastSeq = keyMaxSeq
|
|
}
|
|
|
|
switch {
|
|
case lastSeq <= b.minSeq:
|
|
// Dropped because newer entry for same user key exist
|
|
fallthrough // (A)
|
|
case kt == keyTypeDel && seq <= b.minSeq && b.c.baseLevelForKey(lastUkey):
|
|
// For this user key:
|
|
// (1) there is no data in higher levels
|
|
// (2) data in lower levels will have larger seq numbers
|
|
// (3) data in layers that are being compacted here and have
|
|
// smaller seq numbers will be dropped in the next
|
|
// few iterations of this loop (by rule (A) above).
|
|
// Therefore this deletion marker is obsolete and can be dropped.
|
|
lastSeq = seq
|
|
b.dropCnt++
|
|
continue
|
|
default:
|
|
lastSeq = seq
|
|
}
|
|
} else {
|
|
if b.strict {
|
|
return kerr
|
|
}
|
|
|
|
// Don't drop corrupted keys.
|
|
hasLastUkey = false
|
|
lastUkey = lastUkey[:0]
|
|
lastSeq = keyMaxSeq
|
|
b.kerrCnt++
|
|
}
|
|
|
|
if err := b.appendKV(ikey, iter.Value()); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err := iter.Error(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Finish last table.
|
|
if b.tw != nil && !b.tw.empty() {
|
|
return b.flush()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (b *tableCompactionBuilder) revert() error {
|
|
for _, at := range b.rec.addedTables {
|
|
b.s.logf("table@build revert @%d", at.num)
|
|
if err := b.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: at.num}); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
|
|
defer c.release()
|
|
|
|
rec := &sessionRecord{}
|
|
rec.addCompPtr(c.sourceLevel, c.imax)
|
|
|
|
if !noTrivial && c.trivial() {
|
|
t := c.levels[0][0]
|
|
db.logf("table@move L%d@%d -> L%d", c.sourceLevel, t.fd.Num, c.sourceLevel+1)
|
|
rec.delTable(c.sourceLevel, t.fd.Num)
|
|
rec.addTableFile(c.sourceLevel+1, t)
|
|
db.compactionCommit("table-move", rec)
|
|
return
|
|
}
|
|
|
|
var stats [2]cStatStaging
|
|
for i, tables := range c.levels {
|
|
for _, t := range tables {
|
|
stats[i].read += t.size
|
|
// Insert deleted tables into record
|
|
rec.delTable(c.sourceLevel+i, t.fd.Num)
|
|
}
|
|
}
|
|
sourceSize := int(stats[0].read + stats[1].read)
|
|
minSeq := db.minSeq()
|
|
db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.sourceLevel, len(c.levels[0]), c.sourceLevel+1, len(c.levels[1]), shortenb(sourceSize), minSeq)
|
|
|
|
b := &tableCompactionBuilder{
|
|
db: db,
|
|
s: db.s,
|
|
c: c,
|
|
rec: rec,
|
|
stat1: &stats[1],
|
|
minSeq: minSeq,
|
|
strict: db.s.o.GetStrict(opt.StrictCompaction),
|
|
tableSize: db.s.o.GetCompactionTableSize(c.sourceLevel + 1),
|
|
}
|
|
db.compactionTransact("table@build", b)
|
|
|
|
// Commit.
|
|
stats[1].startTimer()
|
|
db.compactionCommit("table", rec)
|
|
stats[1].stopTimer()
|
|
|
|
resultSize := int(stats[1].write)
|
|
db.logf("table@compaction committed F%s S%s Ke·%d D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), b.kerrCnt, b.dropCnt, stats[1].duration)
|
|
|
|
// Save compaction stats
|
|
for i := range stats {
|
|
db.compStats.addStat(c.sourceLevel+1, &stats[i])
|
|
}
|
|
}
|
|
|
|
func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error {
|
|
db.logf("table@compaction range L%d %q:%q", level, umin, umax)
|
|
if level >= 0 {
|
|
if c := db.s.getCompactionRange(level, umin, umax, true); c != nil {
|
|
db.tableCompaction(c, true)
|
|
}
|
|
} else {
|
|
// Retry until nothing to compact.
|
|
for {
|
|
compacted := false
|
|
|
|
// Scan for maximum level with overlapped tables.
|
|
v := db.s.version()
|
|
m := 1
|
|
for i := m; i < len(v.levels); i++ {
|
|
tables := v.levels[i]
|
|
if tables.overlaps(db.s.icmp, umin, umax, false) {
|
|
m = i
|
|
}
|
|
}
|
|
v.release()
|
|
|
|
for level := 0; level < m; level++ {
|
|
if c := db.s.getCompactionRange(level, umin, umax, false); c != nil {
|
|
db.tableCompaction(c, true)
|
|
compacted = true
|
|
}
|
|
}
|
|
|
|
if !compacted {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) tableAutoCompaction() {
|
|
if c := db.s.pickCompaction(); c != nil {
|
|
db.tableCompaction(c, false)
|
|
}
|
|
}
|
|
|
|
func (db *DB) tableNeedCompaction() bool {
|
|
v := db.s.version()
|
|
defer v.release()
|
|
return v.needCompaction()
|
|
}
|
|
|
|
// resumeWrite returns an indicator whether we should resume write operation if enough level0 files are compacted.
|
|
func (db *DB) resumeWrite() bool {
|
|
v := db.s.version()
|
|
defer v.release()
|
|
if v.tLen(0) < db.s.o.GetWriteL0PauseTrigger() {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (db *DB) pauseCompaction(ch chan<- struct{}) {
|
|
select {
|
|
case ch <- struct{}{}:
|
|
case <-db.closeC:
|
|
db.compactionExitTransact()
|
|
}
|
|
}
|
|
|
|
type cCmd interface {
|
|
ack(err error)
|
|
}
|
|
|
|
type cAuto struct {
|
|
// Note for table compaction, an non-empty ackC represents it's a compaction waiting command.
|
|
ackC chan<- error
|
|
}
|
|
|
|
func (r cAuto) ack(err error) {
|
|
if r.ackC != nil {
|
|
defer func() {
|
|
recover()
|
|
}()
|
|
r.ackC <- err
|
|
}
|
|
}
|
|
|
|
type cRange struct {
|
|
level int
|
|
min, max []byte
|
|
ackC chan<- error
|
|
}
|
|
|
|
func (r cRange) ack(err error) {
|
|
if r.ackC != nil {
|
|
defer func() {
|
|
recover()
|
|
}()
|
|
r.ackC <- err
|
|
}
|
|
}
|
|
|
|
// This will trigger auto compaction but will not wait for it.
|
|
func (db *DB) compTrigger(compC chan<- cCmd) {
|
|
select {
|
|
case compC <- cAuto{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// This will trigger auto compaction and/or wait for all compaction to be done.
|
|
func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) {
|
|
ch := make(chan error)
|
|
defer close(ch)
|
|
// Send cmd.
|
|
select {
|
|
case compC <- cAuto{ch}:
|
|
case err = <-db.compErrC:
|
|
return
|
|
case <-db.closeC:
|
|
return ErrClosed
|
|
}
|
|
// Wait cmd.
|
|
select {
|
|
case err = <-ch:
|
|
case err = <-db.compErrC:
|
|
case <-db.closeC:
|
|
return ErrClosed
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Send range compaction request.
|
|
func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (err error) {
|
|
ch := make(chan error)
|
|
defer close(ch)
|
|
// Send cmd.
|
|
select {
|
|
case compC <- cRange{level, min, max, ch}:
|
|
case err := <-db.compErrC:
|
|
return err
|
|
case <-db.closeC:
|
|
return ErrClosed
|
|
}
|
|
// Wait cmd.
|
|
select {
|
|
case err = <-ch:
|
|
case err = <-db.compErrC:
|
|
case <-db.closeC:
|
|
return ErrClosed
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (db *DB) mCompaction() {
|
|
var x cCmd
|
|
|
|
defer func() {
|
|
if x := recover(); x != nil {
|
|
if x != errCompactionTransactExiting {
|
|
panic(x)
|
|
}
|
|
}
|
|
if x != nil {
|
|
x.ack(ErrClosed)
|
|
}
|
|
db.closeW.Done()
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case x = <-db.mcompCmdC:
|
|
switch x.(type) {
|
|
case cAuto:
|
|
db.memCompaction()
|
|
x.ack(nil)
|
|
x = nil
|
|
default:
|
|
panic("leveldb: unknown command")
|
|
}
|
|
case <-db.closeC:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (db *DB) tCompaction() {
|
|
var (
|
|
x cCmd
|
|
waitQ []cCmd
|
|
)
|
|
|
|
defer func() {
|
|
if x := recover(); x != nil {
|
|
if x != errCompactionTransactExiting {
|
|
panic(x)
|
|
}
|
|
}
|
|
for i := range waitQ {
|
|
waitQ[i].ack(ErrClosed)
|
|
waitQ[i] = nil
|
|
}
|
|
if x != nil {
|
|
x.ack(ErrClosed)
|
|
}
|
|
db.closeW.Done()
|
|
}()
|
|
|
|
for {
|
|
if db.tableNeedCompaction() {
|
|
select {
|
|
case x = <-db.tcompCmdC:
|
|
case ch := <-db.tcompPauseC:
|
|
db.pauseCompaction(ch)
|
|
continue
|
|
case <-db.closeC:
|
|
return
|
|
default:
|
|
}
|
|
// Resume write operation as soon as possible.
|
|
if len(waitQ) > 0 && db.resumeWrite() {
|
|
for i := range waitQ {
|
|
waitQ[i].ack(nil)
|
|
waitQ[i] = nil
|
|
}
|
|
waitQ = waitQ[:0]
|
|
}
|
|
} else {
|
|
for i := range waitQ {
|
|
waitQ[i].ack(nil)
|
|
waitQ[i] = nil
|
|
}
|
|
waitQ = waitQ[:0]
|
|
select {
|
|
case x = <-db.tcompCmdC:
|
|
case ch := <-db.tcompPauseC:
|
|
db.pauseCompaction(ch)
|
|
continue
|
|
case <-db.closeC:
|
|
return
|
|
}
|
|
}
|
|
if x != nil {
|
|
switch cmd := x.(type) {
|
|
case cAuto:
|
|
if cmd.ackC != nil {
|
|
// Check the write pause state before caching it.
|
|
if db.resumeWrite() {
|
|
x.ack(nil)
|
|
} else {
|
|
waitQ = append(waitQ, x)
|
|
}
|
|
}
|
|
case cRange:
|
|
x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max))
|
|
default:
|
|
panic("leveldb: unknown command")
|
|
}
|
|
x = nil
|
|
}
|
|
db.tableAutoCompaction()
|
|
}
|
|
}
|