d28f005552
Fix limit for databases other than sqlite go mod tidy && go mod vendor Remove unneeded break statements Make everything work with the new xorm version Fix xorm logging Fix lint Fix redis init Fix using id field Fix database init for testing Change default database log level Add xorm logger Use const for postgres go mod tidy Merge branch 'master' into update/xorm # Conflicts: # go.mod # go.sum # vendor/modules.txt go mod vendor Fix loading fixtures for postgres Go mod vendor1 Update xorm to version 1 Co-authored-by: kolaente <k@knt.li> Reviewed-on: https://kolaente.dev/vikunja/api/pulls/323
1179 lines
28 KiB
Go
1179 lines
28 KiB
Go
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
|
|
// All rights reserved.
|
|
//
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file.
|
|
|
|
package leveldb
|
|
|
|
import (
|
|
"container/list"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb/errors"
|
|
"github.com/syndtr/goleveldb/leveldb/iterator"
|
|
"github.com/syndtr/goleveldb/leveldb/journal"
|
|
"github.com/syndtr/goleveldb/leveldb/memdb"
|
|
"github.com/syndtr/goleveldb/leveldb/opt"
|
|
"github.com/syndtr/goleveldb/leveldb/storage"
|
|
"github.com/syndtr/goleveldb/leveldb/table"
|
|
"github.com/syndtr/goleveldb/leveldb/util"
|
|
)
|
|
|
|
// DB is a LevelDB database.
|
|
type DB struct {
|
|
// Need 64-bit alignment.
|
|
seq uint64
|
|
|
|
// Stats. Need 64-bit alignment.
|
|
cWriteDelay int64 // The cumulative duration of write delays
|
|
cWriteDelayN int32 // The cumulative number of write delays
|
|
inWritePaused int32 // The indicator whether write operation is paused by compaction
|
|
aliveSnaps, aliveIters int32
|
|
|
|
// Session.
|
|
s *session
|
|
|
|
// MemDB.
|
|
memMu sync.RWMutex
|
|
memPool chan *memdb.DB
|
|
mem, frozenMem *memDB
|
|
journal *journal.Writer
|
|
journalWriter storage.Writer
|
|
journalFd storage.FileDesc
|
|
frozenJournalFd storage.FileDesc
|
|
frozenSeq uint64
|
|
|
|
// Snapshot.
|
|
snapsMu sync.Mutex
|
|
snapsList *list.List
|
|
|
|
// Write.
|
|
batchPool sync.Pool
|
|
writeMergeC chan writeMerge
|
|
writeMergedC chan bool
|
|
writeLockC chan struct{}
|
|
writeAckC chan error
|
|
writeDelay time.Duration
|
|
writeDelayN int
|
|
tr *Transaction
|
|
|
|
// Compaction.
|
|
compCommitLk sync.Mutex
|
|
tcompCmdC chan cCmd
|
|
tcompPauseC chan chan<- struct{}
|
|
mcompCmdC chan cCmd
|
|
compErrC chan error
|
|
compPerErrC chan error
|
|
compErrSetC chan error
|
|
compWriteLocking bool
|
|
compStats cStats
|
|
memdbMaxLevel int // For testing.
|
|
|
|
// Close.
|
|
closeW sync.WaitGroup
|
|
closeC chan struct{}
|
|
closed uint32
|
|
closer io.Closer
|
|
}
|
|
|
|
func openDB(s *session) (*DB, error) {
|
|
s.log("db@open opening")
|
|
start := time.Now()
|
|
db := &DB{
|
|
s: s,
|
|
// Initial sequence
|
|
seq: s.stSeqNum,
|
|
// MemDB
|
|
memPool: make(chan *memdb.DB, 1),
|
|
// Snapshot
|
|
snapsList: list.New(),
|
|
// Write
|
|
batchPool: sync.Pool{New: newBatch},
|
|
writeMergeC: make(chan writeMerge),
|
|
writeMergedC: make(chan bool),
|
|
writeLockC: make(chan struct{}, 1),
|
|
writeAckC: make(chan error),
|
|
// Compaction
|
|
tcompCmdC: make(chan cCmd),
|
|
tcompPauseC: make(chan chan<- struct{}),
|
|
mcompCmdC: make(chan cCmd),
|
|
compErrC: make(chan error),
|
|
compPerErrC: make(chan error),
|
|
compErrSetC: make(chan error),
|
|
// Close
|
|
closeC: make(chan struct{}),
|
|
}
|
|
|
|
// Read-only mode.
|
|
readOnly := s.o.GetReadOnly()
|
|
|
|
if readOnly {
|
|
// Recover journals (read-only mode).
|
|
if err := db.recoverJournalRO(); err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
// Recover journals.
|
|
if err := db.recoverJournal(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Remove any obsolete files.
|
|
if err := db.checkAndCleanFiles(); err != nil {
|
|
// Close journal.
|
|
if db.journal != nil {
|
|
db.journal.Close()
|
|
db.journalWriter.Close()
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
}
|
|
|
|
// Doesn't need to be included in the wait group.
|
|
go db.compactionError()
|
|
go db.mpoolDrain()
|
|
|
|
if readOnly {
|
|
db.SetReadOnly()
|
|
} else {
|
|
db.closeW.Add(2)
|
|
go db.tCompaction()
|
|
go db.mCompaction()
|
|
// go db.jWriter()
|
|
}
|
|
|
|
s.logf("db@open done T·%v", time.Since(start))
|
|
|
|
runtime.SetFinalizer(db, (*DB).Close)
|
|
return db, nil
|
|
}
|
|
|
|
// Open opens or creates a DB for the given storage.
|
|
// The DB will be created if not exist, unless ErrorIfMissing is true.
|
|
// Also, if ErrorIfExist is true and the DB exist Open will returns
|
|
// os.ErrExist error.
|
|
//
|
|
// Open will return an error with type of ErrCorrupted if corruption
|
|
// detected in the DB. Use errors.IsCorrupted to test whether an error is
|
|
// due to corruption. Corrupted DB can be recovered with Recover function.
|
|
//
|
|
// The returned DB instance is safe for concurrent use.
|
|
// The DB must be closed after use, by calling Close method.
|
|
func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
|
|
s, err := newSession(stor, o)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer func() {
|
|
if err != nil {
|
|
s.close()
|
|
s.release()
|
|
}
|
|
}()
|
|
|
|
err = s.recover()
|
|
if err != nil {
|
|
if !os.IsNotExist(err) || s.o.GetErrorIfMissing() || s.o.GetReadOnly() {
|
|
return
|
|
}
|
|
err = s.create()
|
|
if err != nil {
|
|
return
|
|
}
|
|
} else if s.o.GetErrorIfExist() {
|
|
err = os.ErrExist
|
|
return
|
|
}
|
|
|
|
return openDB(s)
|
|
}
|
|
|
|
// OpenFile opens or creates a DB for the given path.
|
|
// The DB will be created if not exist, unless ErrorIfMissing is true.
|
|
// Also, if ErrorIfExist is true and the DB exist OpenFile will returns
|
|
// os.ErrExist error.
|
|
//
|
|
// OpenFile uses standard file-system backed storage implementation as
|
|
// described in the leveldb/storage package.
|
|
//
|
|
// OpenFile will return an error with type of ErrCorrupted if corruption
|
|
// detected in the DB. Use errors.IsCorrupted to test whether an error is
|
|
// due to corruption. Corrupted DB can be recovered with Recover function.
|
|
//
|
|
// The returned DB instance is safe for concurrent use.
|
|
// The DB must be closed after use, by calling Close method.
|
|
func OpenFile(path string, o *opt.Options) (db *DB, err error) {
|
|
stor, err := storage.OpenFile(path, o.GetReadOnly())
|
|
if err != nil {
|
|
return
|
|
}
|
|
db, err = Open(stor, o)
|
|
if err != nil {
|
|
stor.Close()
|
|
} else {
|
|
db.closer = stor
|
|
}
|
|
return
|
|
}
|
|
|
|
// Recover recovers and opens a DB with missing or corrupted manifest files
|
|
// for the given storage. It will ignore any manifest files, valid or not.
|
|
// The DB must already exist or it will returns an error.
|
|
// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
|
|
//
|
|
// The returned DB instance is safe for concurrent use.
|
|
// The DB must be closed after use, by calling Close method.
|
|
func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
|
|
s, err := newSession(stor, o)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer func() {
|
|
if err != nil {
|
|
s.close()
|
|
s.release()
|
|
}
|
|
}()
|
|
|
|
err = recoverTable(s, o)
|
|
if err != nil {
|
|
return
|
|
}
|
|
return openDB(s)
|
|
}
|
|
|
|
// RecoverFile recovers and opens a DB with missing or corrupted manifest files
|
|
// for the given path. It will ignore any manifest files, valid or not.
|
|
// The DB must already exist or it will returns an error.
|
|
// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
|
|
//
|
|
// RecoverFile uses standard file-system backed storage implementation as described
|
|
// in the leveldb/storage package.
|
|
//
|
|
// The returned DB instance is safe for concurrent use.
|
|
// The DB must be closed after use, by calling Close method.
|
|
func RecoverFile(path string, o *opt.Options) (db *DB, err error) {
|
|
stor, err := storage.OpenFile(path, false)
|
|
if err != nil {
|
|
return
|
|
}
|
|
db, err = Recover(stor, o)
|
|
if err != nil {
|
|
stor.Close()
|
|
} else {
|
|
db.closer = stor
|
|
}
|
|
return
|
|
}
|
|
|
|
func recoverTable(s *session, o *opt.Options) error {
|
|
o = dupOptions(o)
|
|
// Mask StrictReader, lets StrictRecovery doing its job.
|
|
o.Strict &= ^opt.StrictReader
|
|
|
|
// Get all tables and sort it by file number.
|
|
fds, err := s.stor.List(storage.TypeTable)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sortFds(fds)
|
|
|
|
var (
|
|
maxSeq uint64
|
|
recoveredKey, goodKey, corruptedKey, corruptedBlock, droppedTable int
|
|
|
|
// We will drop corrupted table.
|
|
strict = o.GetStrict(opt.StrictRecovery)
|
|
noSync = o.GetNoSync()
|
|
|
|
rec = &sessionRecord{}
|
|
bpool = util.NewBufferPool(o.GetBlockSize() + 5)
|
|
)
|
|
buildTable := func(iter iterator.Iterator) (tmpFd storage.FileDesc, size int64, err error) {
|
|
tmpFd = s.newTemp()
|
|
writer, err := s.stor.Create(tmpFd)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer func() {
|
|
writer.Close()
|
|
if err != nil {
|
|
s.stor.Remove(tmpFd)
|
|
tmpFd = storage.FileDesc{}
|
|
}
|
|
}()
|
|
|
|
// Copy entries.
|
|
tw := table.NewWriter(writer, o)
|
|
for iter.Next() {
|
|
key := iter.Key()
|
|
if validInternalKey(key) {
|
|
err = tw.Append(key, iter.Value())
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
err = iter.Error()
|
|
if err != nil && !errors.IsCorrupted(err) {
|
|
return
|
|
}
|
|
err = tw.Close()
|
|
if err != nil {
|
|
return
|
|
}
|
|
if !noSync {
|
|
err = writer.Sync()
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
size = int64(tw.BytesLen())
|
|
return
|
|
}
|
|
recoverTable := func(fd storage.FileDesc) error {
|
|
s.logf("table@recovery recovering @%d", fd.Num)
|
|
reader, err := s.stor.Open(fd)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var closed bool
|
|
defer func() {
|
|
if !closed {
|
|
reader.Close()
|
|
}
|
|
}()
|
|
|
|
// Get file size.
|
|
size, err := reader.Seek(0, 2)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var (
|
|
tSeq uint64
|
|
tgoodKey, tcorruptedKey, tcorruptedBlock int
|
|
imin, imax []byte
|
|
)
|
|
tr, err := table.NewReader(reader, size, fd, nil, bpool, o)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
iter := tr.NewIterator(nil, nil)
|
|
if itererr, ok := iter.(iterator.ErrorCallbackSetter); ok {
|
|
itererr.SetErrorCallback(func(err error) {
|
|
if errors.IsCorrupted(err) {
|
|
s.logf("table@recovery block corruption @%d %q", fd.Num, err)
|
|
tcorruptedBlock++
|
|
}
|
|
})
|
|
}
|
|
|
|
// Scan the table.
|
|
for iter.Next() {
|
|
key := iter.Key()
|
|
_, seq, _, kerr := parseInternalKey(key)
|
|
if kerr != nil {
|
|
tcorruptedKey++
|
|
continue
|
|
}
|
|
tgoodKey++
|
|
if seq > tSeq {
|
|
tSeq = seq
|
|
}
|
|
if imin == nil {
|
|
imin = append([]byte{}, key...)
|
|
}
|
|
imax = append(imax[:0], key...)
|
|
}
|
|
if err := iter.Error(); err != nil && !errors.IsCorrupted(err) {
|
|
iter.Release()
|
|
return err
|
|
}
|
|
iter.Release()
|
|
|
|
goodKey += tgoodKey
|
|
corruptedKey += tcorruptedKey
|
|
corruptedBlock += tcorruptedBlock
|
|
|
|
if strict && (tcorruptedKey > 0 || tcorruptedBlock > 0) {
|
|
droppedTable++
|
|
s.logf("table@recovery dropped @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
|
|
return nil
|
|
}
|
|
|
|
if tgoodKey > 0 {
|
|
if tcorruptedKey > 0 || tcorruptedBlock > 0 {
|
|
// Rebuild the table.
|
|
s.logf("table@recovery rebuilding @%d", fd.Num)
|
|
iter := tr.NewIterator(nil, nil)
|
|
tmpFd, newSize, err := buildTable(iter)
|
|
iter.Release()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
closed = true
|
|
reader.Close()
|
|
if err := s.stor.Rename(tmpFd, fd); err != nil {
|
|
return err
|
|
}
|
|
size = newSize
|
|
}
|
|
if tSeq > maxSeq {
|
|
maxSeq = tSeq
|
|
}
|
|
recoveredKey += tgoodKey
|
|
// Add table to level 0.
|
|
rec.addTable(0, fd.Num, size, imin, imax)
|
|
s.logf("table@recovery recovered @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
|
|
} else {
|
|
droppedTable++
|
|
s.logf("table@recovery unrecoverable @%d Ck·%d Cb·%d S·%d", fd.Num, tcorruptedKey, tcorruptedBlock, size)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Recover all tables.
|
|
if len(fds) > 0 {
|
|
s.logf("table@recovery F·%d", len(fds))
|
|
|
|
// Mark file number as used.
|
|
s.markFileNum(fds[len(fds)-1].Num)
|
|
|
|
for _, fd := range fds {
|
|
if err := recoverTable(fd); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
s.logf("table@recovery recovered F·%d N·%d Gk·%d Ck·%d Q·%d", len(fds), recoveredKey, goodKey, corruptedKey, maxSeq)
|
|
}
|
|
|
|
// Set sequence number.
|
|
rec.setSeqNum(maxSeq)
|
|
|
|
// Create new manifest.
|
|
if err := s.create(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Commit.
|
|
return s.commit(rec)
|
|
}
|
|
|
|
func (db *DB) recoverJournal() error {
|
|
// Get all journals and sort it by file number.
|
|
rawFds, err := db.s.stor.List(storage.TypeJournal)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sortFds(rawFds)
|
|
|
|
// Journals that will be recovered.
|
|
var fds []storage.FileDesc
|
|
for _, fd := range rawFds {
|
|
if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
|
|
fds = append(fds, fd)
|
|
}
|
|
}
|
|
|
|
var (
|
|
ofd storage.FileDesc // Obsolete file.
|
|
rec = &sessionRecord{}
|
|
)
|
|
|
|
// Recover journals.
|
|
if len(fds) > 0 {
|
|
db.logf("journal@recovery F·%d", len(fds))
|
|
|
|
// Mark file number as used.
|
|
db.s.markFileNum(fds[len(fds)-1].Num)
|
|
|
|
var (
|
|
// Options.
|
|
strict = db.s.o.GetStrict(opt.StrictJournal)
|
|
checksum = db.s.o.GetStrict(opt.StrictJournalChecksum)
|
|
writeBuffer = db.s.o.GetWriteBuffer()
|
|
|
|
jr *journal.Reader
|
|
mdb = memdb.New(db.s.icmp, writeBuffer)
|
|
buf = &util.Buffer{}
|
|
batchSeq uint64
|
|
batchLen int
|
|
)
|
|
|
|
for _, fd := range fds {
|
|
db.logf("journal@recovery recovering @%d", fd.Num)
|
|
|
|
fr, err := db.s.stor.Open(fd)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Create or reset journal reader instance.
|
|
if jr == nil {
|
|
jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
|
|
} else {
|
|
jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
|
|
}
|
|
|
|
// Flush memdb and remove obsolete journal file.
|
|
if !ofd.Zero() {
|
|
if mdb.Len() > 0 {
|
|
if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
|
|
fr.Close()
|
|
return err
|
|
}
|
|
}
|
|
|
|
rec.setJournalNum(fd.Num)
|
|
rec.setSeqNum(db.seq)
|
|
if err := db.s.commit(rec); err != nil {
|
|
fr.Close()
|
|
return err
|
|
}
|
|
rec.resetAddedTables()
|
|
|
|
db.s.stor.Remove(ofd)
|
|
ofd = storage.FileDesc{}
|
|
}
|
|
|
|
// Replay journal to memdb.
|
|
mdb.Reset()
|
|
for {
|
|
r, err := jr.Next()
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
|
|
fr.Close()
|
|
return errors.SetFd(err, fd)
|
|
}
|
|
|
|
buf.Reset()
|
|
if _, err := buf.ReadFrom(r); err != nil {
|
|
if err == io.ErrUnexpectedEOF {
|
|
// This is error returned due to corruption, with strict == false.
|
|
continue
|
|
}
|
|
|
|
fr.Close()
|
|
return errors.SetFd(err, fd)
|
|
}
|
|
batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
|
|
if err != nil {
|
|
if !strict && errors.IsCorrupted(err) {
|
|
db.s.logf("journal error: %v (skipped)", err)
|
|
// We won't apply sequence number as it might be corrupted.
|
|
continue
|
|
}
|
|
|
|
fr.Close()
|
|
return errors.SetFd(err, fd)
|
|
}
|
|
|
|
// Save sequence number.
|
|
db.seq = batchSeq + uint64(batchLen)
|
|
|
|
// Flush it if large enough.
|
|
if mdb.Size() >= writeBuffer {
|
|
if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
|
|
fr.Close()
|
|
return err
|
|
}
|
|
|
|
mdb.Reset()
|
|
}
|
|
}
|
|
|
|
fr.Close()
|
|
ofd = fd
|
|
}
|
|
|
|
// Flush the last memdb.
|
|
if mdb.Len() > 0 {
|
|
if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Create a new journal.
|
|
if _, err := db.newMem(0); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Commit.
|
|
rec.setJournalNum(db.journalFd.Num)
|
|
rec.setSeqNum(db.seq)
|
|
if err := db.s.commit(rec); err != nil {
|
|
// Close journal on error.
|
|
if db.journal != nil {
|
|
db.journal.Close()
|
|
db.journalWriter.Close()
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Remove the last obsolete journal file.
|
|
if !ofd.Zero() {
|
|
db.s.stor.Remove(ofd)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) recoverJournalRO() error {
|
|
// Get all journals and sort it by file number.
|
|
rawFds, err := db.s.stor.List(storage.TypeJournal)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sortFds(rawFds)
|
|
|
|
// Journals that will be recovered.
|
|
var fds []storage.FileDesc
|
|
for _, fd := range rawFds {
|
|
if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
|
|
fds = append(fds, fd)
|
|
}
|
|
}
|
|
|
|
var (
|
|
// Options.
|
|
strict = db.s.o.GetStrict(opt.StrictJournal)
|
|
checksum = db.s.o.GetStrict(opt.StrictJournalChecksum)
|
|
writeBuffer = db.s.o.GetWriteBuffer()
|
|
|
|
mdb = memdb.New(db.s.icmp, writeBuffer)
|
|
)
|
|
|
|
// Recover journals.
|
|
if len(fds) > 0 {
|
|
db.logf("journal@recovery RO·Mode F·%d", len(fds))
|
|
|
|
var (
|
|
jr *journal.Reader
|
|
buf = &util.Buffer{}
|
|
batchSeq uint64
|
|
batchLen int
|
|
)
|
|
|
|
for _, fd := range fds {
|
|
db.logf("journal@recovery recovering @%d", fd.Num)
|
|
|
|
fr, err := db.s.stor.Open(fd)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Create or reset journal reader instance.
|
|
if jr == nil {
|
|
jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
|
|
} else {
|
|
jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
|
|
}
|
|
|
|
// Replay journal to memdb.
|
|
for {
|
|
r, err := jr.Next()
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
|
|
fr.Close()
|
|
return errors.SetFd(err, fd)
|
|
}
|
|
|
|
buf.Reset()
|
|
if _, err := buf.ReadFrom(r); err != nil {
|
|
if err == io.ErrUnexpectedEOF {
|
|
// This is error returned due to corruption, with strict == false.
|
|
continue
|
|
}
|
|
|
|
fr.Close()
|
|
return errors.SetFd(err, fd)
|
|
}
|
|
batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
|
|
if err != nil {
|
|
if !strict && errors.IsCorrupted(err) {
|
|
db.s.logf("journal error: %v (skipped)", err)
|
|
// We won't apply sequence number as it might be corrupted.
|
|
continue
|
|
}
|
|
|
|
fr.Close()
|
|
return errors.SetFd(err, fd)
|
|
}
|
|
|
|
// Save sequence number.
|
|
db.seq = batchSeq + uint64(batchLen)
|
|
}
|
|
|
|
fr.Close()
|
|
}
|
|
}
|
|
|
|
// Set memDB.
|
|
db.mem = &memDB{db: db, DB: mdb, ref: 1}
|
|
|
|
return nil
|
|
}
|
|
|
|
func memGet(mdb *memdb.DB, ikey internalKey, icmp *iComparer) (ok bool, mv []byte, err error) {
|
|
mk, mv, err := mdb.Find(ikey)
|
|
if err == nil {
|
|
ukey, _, kt, kerr := parseInternalKey(mk)
|
|
if kerr != nil {
|
|
// Shouldn't have had happen.
|
|
panic(kerr)
|
|
}
|
|
if icmp.uCompare(ukey, ikey.ukey()) == 0 {
|
|
if kt == keyTypeDel {
|
|
return true, nil, ErrNotFound
|
|
}
|
|
return true, mv, nil
|
|
|
|
}
|
|
} else if err != ErrNotFound {
|
|
return true, nil, err
|
|
}
|
|
return
|
|
}
|
|
|
|
func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
|
|
ikey := makeInternalKey(nil, key, seq, keyTypeSeek)
|
|
|
|
if auxm != nil {
|
|
if ok, mv, me := memGet(auxm, ikey, db.s.icmp); ok {
|
|
return append([]byte{}, mv...), me
|
|
}
|
|
}
|
|
|
|
em, fm := db.getMems()
|
|
for _, m := range [...]*memDB{em, fm} {
|
|
if m == nil {
|
|
continue
|
|
}
|
|
defer m.decref()
|
|
|
|
if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok {
|
|
return append([]byte{}, mv...), me
|
|
}
|
|
}
|
|
|
|
v := db.s.version()
|
|
value, cSched, err := v.get(auxt, ikey, ro, false)
|
|
v.release()
|
|
if cSched {
|
|
// Trigger table compaction.
|
|
db.compTrigger(db.tcompCmdC)
|
|
}
|
|
return
|
|
}
|
|
|
|
func nilIfNotFound(err error) error {
|
|
if err == ErrNotFound {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (db *DB) has(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (ret bool, err error) {
|
|
ikey := makeInternalKey(nil, key, seq, keyTypeSeek)
|
|
|
|
if auxm != nil {
|
|
if ok, _, me := memGet(auxm, ikey, db.s.icmp); ok {
|
|
return me == nil, nilIfNotFound(me)
|
|
}
|
|
}
|
|
|
|
em, fm := db.getMems()
|
|
for _, m := range [...]*memDB{em, fm} {
|
|
if m == nil {
|
|
continue
|
|
}
|
|
defer m.decref()
|
|
|
|
if ok, _, me := memGet(m.DB, ikey, db.s.icmp); ok {
|
|
return me == nil, nilIfNotFound(me)
|
|
}
|
|
}
|
|
|
|
v := db.s.version()
|
|
_, cSched, err := v.get(auxt, ikey, ro, true)
|
|
v.release()
|
|
if cSched {
|
|
// Trigger table compaction.
|
|
db.compTrigger(db.tcompCmdC)
|
|
}
|
|
if err == nil {
|
|
ret = true
|
|
} else if err == ErrNotFound {
|
|
err = nil
|
|
}
|
|
return
|
|
}
|
|
|
|
// Get gets the value for the given key. It returns ErrNotFound if the
|
|
// DB does not contains the key.
|
|
//
|
|
// The returned slice is its own copy, it is safe to modify the contents
|
|
// of the returned slice.
|
|
// It is safe to modify the contents of the argument after Get returns.
|
|
func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
|
|
err = db.ok()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
se := db.acquireSnapshot()
|
|
defer db.releaseSnapshot(se)
|
|
return db.get(nil, nil, key, se.seq, ro)
|
|
}
|
|
|
|
// Has returns true if the DB does contains the given key.
|
|
//
|
|
// It is safe to modify the contents of the argument after Has returns.
|
|
func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
|
|
err = db.ok()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
se := db.acquireSnapshot()
|
|
defer db.releaseSnapshot(se)
|
|
return db.has(nil, nil, key, se.seq, ro)
|
|
}
|
|
|
|
// NewIterator returns an iterator for the latest snapshot of the
|
|
// underlying DB.
|
|
// The returned iterator is not safe for concurrent use, but it is safe to use
|
|
// multiple iterators concurrently, with each in a dedicated goroutine.
|
|
// It is also safe to use an iterator concurrently with modifying its
|
|
// underlying DB. The resultant key/value pairs are guaranteed to be
|
|
// consistent.
|
|
//
|
|
// Slice allows slicing the iterator to only contains keys in the given
|
|
// range. A nil Range.Start is treated as a key before all keys in the
|
|
// DB. And a nil Range.Limit is treated as a key after all keys in
|
|
// the DB.
|
|
//
|
|
// WARNING: Any slice returned by interator (e.g. slice returned by calling
|
|
// Iterator.Key() or Iterator.Key() methods), its content should not be modified
|
|
// unless noted otherwise.
|
|
//
|
|
// The iterator must be released after use, by calling Release method.
|
|
//
|
|
// Also read Iterator documentation of the leveldb/iterator package.
|
|
func (db *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
|
|
if err := db.ok(); err != nil {
|
|
return iterator.NewEmptyIterator(err)
|
|
}
|
|
|
|
se := db.acquireSnapshot()
|
|
defer db.releaseSnapshot(se)
|
|
// Iterator holds 'version' lock, 'version' is immutable so snapshot
|
|
// can be released after iterator created.
|
|
return db.newIterator(nil, nil, se.seq, slice, ro)
|
|
}
|
|
|
|
// GetSnapshot returns a latest snapshot of the underlying DB. A snapshot
|
|
// is a frozen snapshot of a DB state at a particular point in time. The
|
|
// content of snapshot are guaranteed to be consistent.
|
|
//
|
|
// The snapshot must be released after use, by calling Release method.
|
|
func (db *DB) GetSnapshot() (*Snapshot, error) {
|
|
if err := db.ok(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return db.newSnapshot(), nil
|
|
}
|
|
|
|
// GetProperty returns value of the given property name.
|
|
//
|
|
// Property names:
|
|
// leveldb.num-files-at-level{n}
|
|
// Returns the number of files at level 'n'.
|
|
// leveldb.stats
|
|
// Returns statistics of the underlying DB.
|
|
// leveldb.iostats
|
|
// Returns statistics of effective disk read and write.
|
|
// leveldb.writedelay
|
|
// Returns cumulative write delay caused by compaction.
|
|
// leveldb.sstables
|
|
// Returns sstables list for each level.
|
|
// leveldb.blockpool
|
|
// Returns block pool stats.
|
|
// leveldb.cachedblock
|
|
// Returns size of cached block.
|
|
// leveldb.openedtables
|
|
// Returns number of opened tables.
|
|
// leveldb.alivesnaps
|
|
// Returns number of alive snapshots.
|
|
// leveldb.aliveiters
|
|
// Returns number of alive iterators.
|
|
func (db *DB) GetProperty(name string) (value string, err error) {
|
|
err = db.ok()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
const prefix = "leveldb."
|
|
if !strings.HasPrefix(name, prefix) {
|
|
return "", ErrNotFound
|
|
}
|
|
p := name[len(prefix):]
|
|
|
|
v := db.s.version()
|
|
defer v.release()
|
|
|
|
numFilesPrefix := "num-files-at-level"
|
|
switch {
|
|
case strings.HasPrefix(p, numFilesPrefix):
|
|
var level uint
|
|
var rest string
|
|
n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest)
|
|
if n != 1 {
|
|
err = ErrNotFound
|
|
} else {
|
|
value = fmt.Sprint(v.tLen(int(level)))
|
|
}
|
|
case p == "stats":
|
|
value = "Compactions\n" +
|
|
" Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)\n" +
|
|
"-------+------------+---------------+---------------+---------------+---------------\n"
|
|
for level, tables := range v.levels {
|
|
duration, read, write := db.compStats.getStat(level)
|
|
if len(tables) == 0 && duration == 0 {
|
|
continue
|
|
}
|
|
value += fmt.Sprintf(" %3d | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
|
|
level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(),
|
|
float64(read)/1048576.0, float64(write)/1048576.0)
|
|
}
|
|
case p == "iostats":
|
|
value = fmt.Sprintf("Read(MB):%.5f Write(MB):%.5f",
|
|
float64(db.s.stor.reads())/1048576.0,
|
|
float64(db.s.stor.writes())/1048576.0)
|
|
case p == "writedelay":
|
|
writeDelayN, writeDelay := atomic.LoadInt32(&db.cWriteDelayN), time.Duration(atomic.LoadInt64(&db.cWriteDelay))
|
|
paused := atomic.LoadInt32(&db.inWritePaused) == 1
|
|
value = fmt.Sprintf("DelayN:%d Delay:%s Paused:%t", writeDelayN, writeDelay, paused)
|
|
case p == "sstables":
|
|
for level, tables := range v.levels {
|
|
value += fmt.Sprintf("--- level %d ---\n", level)
|
|
for _, t := range tables {
|
|
value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.fd.Num, t.size, t.imin, t.imax)
|
|
}
|
|
}
|
|
case p == "blockpool":
|
|
value = fmt.Sprintf("%v", db.s.tops.bpool)
|
|
case p == "cachedblock":
|
|
if db.s.tops.bcache != nil {
|
|
value = fmt.Sprintf("%d", db.s.tops.bcache.Size())
|
|
} else {
|
|
value = "<nil>"
|
|
}
|
|
case p == "openedtables":
|
|
value = fmt.Sprintf("%d", db.s.tops.cache.Size())
|
|
case p == "alivesnaps":
|
|
value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveSnaps))
|
|
case p == "aliveiters":
|
|
value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters))
|
|
default:
|
|
err = ErrNotFound
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// DBStats holds database statistics as filled in by DB.Stats.
//
// The Level* slices are parallel: index i of each slice describes the same
// reported level.
type DBStats struct {
	// Write-delay counters.
	WriteDelayCount    int32
	WriteDelayDuration time.Duration
	WritePaused        bool

	// Numbers of alive snapshots and iterators.
	AliveSnapshots, AliveIterators int32

	// Storage write and read counters.
	IOWrite, IORead uint64

	// Block cache size and number of opened tables.
	BlockCacheSize, OpenedTablesCount int

	// Per-level figures.
	LevelSizes          []int64
	LevelTablesCounts   []int
	LevelRead, LevelWrite []int64
	LevelDurations      []time.Duration
}
|
|
|
|
// Stats populates s with database statistics.
|
|
func (db *DB) Stats(s *DBStats) error {
|
|
err := db.ok()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.IORead = db.s.stor.reads()
|
|
s.IOWrite = db.s.stor.writes()
|
|
s.WriteDelayCount = atomic.LoadInt32(&db.cWriteDelayN)
|
|
s.WriteDelayDuration = time.Duration(atomic.LoadInt64(&db.cWriteDelay))
|
|
s.WritePaused = atomic.LoadInt32(&db.inWritePaused) == 1
|
|
|
|
s.OpenedTablesCount = db.s.tops.cache.Size()
|
|
if db.s.tops.bcache != nil {
|
|
s.BlockCacheSize = db.s.tops.bcache.Size()
|
|
} else {
|
|
s.BlockCacheSize = 0
|
|
}
|
|
|
|
s.AliveIterators = atomic.LoadInt32(&db.aliveIters)
|
|
s.AliveSnapshots = atomic.LoadInt32(&db.aliveSnaps)
|
|
|
|
s.LevelDurations = s.LevelDurations[:0]
|
|
s.LevelRead = s.LevelRead[:0]
|
|
s.LevelWrite = s.LevelWrite[:0]
|
|
s.LevelSizes = s.LevelSizes[:0]
|
|
s.LevelTablesCounts = s.LevelTablesCounts[:0]
|
|
|
|
v := db.s.version()
|
|
defer v.release()
|
|
|
|
for level, tables := range v.levels {
|
|
duration, read, write := db.compStats.getStat(level)
|
|
if len(tables) == 0 && duration == 0 {
|
|
continue
|
|
}
|
|
s.LevelDurations = append(s.LevelDurations, duration)
|
|
s.LevelRead = append(s.LevelRead, read)
|
|
s.LevelWrite = append(s.LevelWrite, write)
|
|
s.LevelSizes = append(s.LevelSizes, tables.size())
|
|
s.LevelTablesCounts = append(s.LevelTablesCounts, len(tables))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// SizeOf calculates approximate sizes of the given key ranges.
|
|
// The length of the returned sizes are equal with the length of the given
|
|
// ranges. The returned sizes measure storage space usage, so if the user
|
|
// data compresses by a factor of ten, the returned sizes will be one-tenth
|
|
// the size of the corresponding user data size.
|
|
// The results may not include the sizes of recently written data.
|
|
func (db *DB) SizeOf(ranges []util.Range) (Sizes, error) {
|
|
if err := db.ok(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
v := db.s.version()
|
|
defer v.release()
|
|
|
|
sizes := make(Sizes, 0, len(ranges))
|
|
for _, r := range ranges {
|
|
imin := makeInternalKey(nil, r.Start, keyMaxSeq, keyTypeSeek)
|
|
imax := makeInternalKey(nil, r.Limit, keyMaxSeq, keyTypeSeek)
|
|
start, err := v.offsetOf(imin)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
limit, err := v.offsetOf(imax)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var size int64
|
|
if limit >= start {
|
|
size = limit - start
|
|
}
|
|
sizes = append(sizes, size)
|
|
}
|
|
|
|
return sizes, nil
|
|
}
|
|
|
|
// Close closes the DB. This will also releases any outstanding snapshot,
|
|
// abort any in-flight compaction and discard open transaction.
|
|
//
|
|
// It is not safe to close a DB until all outstanding iterators are released.
|
|
// It is valid to call Close multiple times. Other methods should not be
|
|
// called after the DB has been closed.
|
|
func (db *DB) Close() error {
|
|
if !db.setClosed() {
|
|
return ErrClosed
|
|
}
|
|
|
|
start := time.Now()
|
|
db.log("db@close closing")
|
|
|
|
// Clear the finalizer.
|
|
runtime.SetFinalizer(db, nil)
|
|
|
|
// Get compaction error.
|
|
var err error
|
|
select {
|
|
case err = <-db.compErrC:
|
|
if err == ErrReadOnly {
|
|
err = nil
|
|
}
|
|
default:
|
|
}
|
|
|
|
// Signal all goroutines.
|
|
close(db.closeC)
|
|
|
|
// Discard open transaction.
|
|
if db.tr != nil {
|
|
db.tr.Discard()
|
|
}
|
|
|
|
// Acquire writer lock.
|
|
db.writeLockC <- struct{}{}
|
|
|
|
// Wait for all gorotines to exit.
|
|
db.closeW.Wait()
|
|
|
|
// Closes journal.
|
|
if db.journal != nil {
|
|
db.journal.Close()
|
|
db.journalWriter.Close()
|
|
db.journal = nil
|
|
db.journalWriter = nil
|
|
}
|
|
|
|
if db.writeDelayN > 0 {
|
|
db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
|
|
}
|
|
|
|
// Close session.
|
|
db.s.close()
|
|
db.logf("db@close done T·%v", time.Since(start))
|
|
db.s.release()
|
|
|
|
if db.closer != nil {
|
|
if err1 := db.closer.Close(); err == nil {
|
|
err = err1
|
|
}
|
|
db.closer = nil
|
|
}
|
|
|
|
// Clear memdbs.
|
|
db.clearMems()
|
|
|
|
return err
|
|
}
|