d28f005552
Fix limit for databases other than sqlite go mod tidy && go mod vendor Remove unneeded break statements Make everything work with the new xorm version Fix xorm logging Fix lint Fix redis init Fix using id field Fix database init for testing Change default database log level Add xorm logger Use const for postgres go mod tidy Merge branch 'master' into update/xorm # Conflicts: # go.mod # go.sum # vendor/modules.txt go mod vendor Fix loading fixtures for postgres Go mod vendor1 Update xorm to version 1 Co-authored-by: kolaente <k@knt.li> Reviewed-on: https://kolaente.dev/vikunja/api/pulls/323
464 lines
11 KiB
Go
464 lines
11 KiB
Go
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
|
|
// All rights reserved.
|
|
//
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file.
|
|
|
|
package leveldb
|
|
|
|
import (
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb/memdb"
|
|
"github.com/syndtr/goleveldb/leveldb/opt"
|
|
"github.com/syndtr/goleveldb/leveldb/util"
|
|
)
|
|
|
|
func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
|
|
wr, err := db.journal.Next()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := writeBatchesWithHeader(wr, batches, seq); err != nil {
|
|
return err
|
|
}
|
|
if err := db.journal.Flush(); err != nil {
|
|
return err
|
|
}
|
|
if sync {
|
|
return db.journalWriter.Sync()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
|
|
retryLimit := 3
|
|
retry:
|
|
// Wait for pending memdb compaction.
|
|
err = db.compTriggerWait(db.mcompCmdC)
|
|
if err != nil {
|
|
return
|
|
}
|
|
retryLimit--
|
|
|
|
// Create new memdb and journal.
|
|
mem, err = db.newMem(n)
|
|
if err != nil {
|
|
if err == errHasFrozenMem {
|
|
if retryLimit <= 0 {
|
|
panic("BUG: still has frozen memdb")
|
|
}
|
|
goto retry
|
|
}
|
|
return
|
|
}
|
|
|
|
// Schedule memdb compaction.
|
|
if wait {
|
|
err = db.compTriggerWait(db.mcompCmdC)
|
|
} else {
|
|
db.compTrigger(db.mcompCmdC)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
|
|
delayed := false
|
|
slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger()
|
|
pauseTrigger := db.s.o.GetWriteL0PauseTrigger()
|
|
flush := func() (retry bool) {
|
|
mdb = db.getEffectiveMem()
|
|
if mdb == nil {
|
|
err = ErrClosed
|
|
return false
|
|
}
|
|
defer func() {
|
|
if retry {
|
|
mdb.decref()
|
|
mdb = nil
|
|
}
|
|
}()
|
|
tLen := db.s.tLen(0)
|
|
mdbFree = mdb.Free()
|
|
switch {
|
|
case tLen >= slowdownTrigger && !delayed:
|
|
delayed = true
|
|
time.Sleep(time.Millisecond)
|
|
case mdbFree >= n:
|
|
return false
|
|
case tLen >= pauseTrigger:
|
|
delayed = true
|
|
// Set the write paused flag explicitly.
|
|
atomic.StoreInt32(&db.inWritePaused, 1)
|
|
err = db.compTriggerWait(db.tcompCmdC)
|
|
// Unset the write paused flag.
|
|
atomic.StoreInt32(&db.inWritePaused, 0)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
default:
|
|
// Allow memdb to grow if it has no entry.
|
|
if mdb.Len() == 0 {
|
|
mdbFree = n
|
|
} else {
|
|
mdb.decref()
|
|
mdb, err = db.rotateMem(n, false)
|
|
if err == nil {
|
|
mdbFree = mdb.Free()
|
|
} else {
|
|
mdbFree = 0
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
start := time.Now()
|
|
for flush() {
|
|
}
|
|
if delayed {
|
|
db.writeDelay += time.Since(start)
|
|
db.writeDelayN++
|
|
} else if db.writeDelayN > 0 {
|
|
db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
|
|
atomic.AddInt32(&db.cWriteDelayN, int32(db.writeDelayN))
|
|
atomic.AddInt64(&db.cWriteDelay, int64(db.writeDelay))
|
|
db.writeDelay = 0
|
|
db.writeDelayN = 0
|
|
}
|
|
return
|
|
}
|
|
|
|
type writeMerge struct {
|
|
sync bool
|
|
batch *Batch
|
|
keyType keyType
|
|
key, value []byte
|
|
}
|
|
|
|
func (db *DB) unlockWrite(overflow bool, merged int, err error) {
|
|
for i := 0; i < merged; i++ {
|
|
db.writeAckC <- err
|
|
}
|
|
if overflow {
|
|
// Pass lock to the next write (that failed to merge).
|
|
db.writeMergedC <- false
|
|
} else {
|
|
// Release lock.
|
|
<-db.writeLockC
|
|
}
|
|
}
|
|
|
|
// ourBatch is batch that we can modify.
|
|
func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
|
|
// Try to flush memdb. This method would also trying to throttle writes
|
|
// if it is too fast and compaction cannot catch-up.
|
|
mdb, mdbFree, err := db.flush(batch.internalLen)
|
|
if err != nil {
|
|
db.unlockWrite(false, 0, err)
|
|
return err
|
|
}
|
|
defer mdb.decref()
|
|
|
|
var (
|
|
overflow bool
|
|
merged int
|
|
batches = []*Batch{batch}
|
|
)
|
|
|
|
if merge {
|
|
// Merge limit.
|
|
var mergeLimit int
|
|
if batch.internalLen > 128<<10 {
|
|
mergeLimit = (1 << 20) - batch.internalLen
|
|
} else {
|
|
mergeLimit = 128 << 10
|
|
}
|
|
mergeCap := mdbFree - batch.internalLen
|
|
if mergeLimit > mergeCap {
|
|
mergeLimit = mergeCap
|
|
}
|
|
|
|
merge:
|
|
for mergeLimit > 0 {
|
|
select {
|
|
case incoming := <-db.writeMergeC:
|
|
if incoming.batch != nil {
|
|
// Merge batch.
|
|
if incoming.batch.internalLen > mergeLimit {
|
|
overflow = true
|
|
break merge
|
|
}
|
|
batches = append(batches, incoming.batch)
|
|
mergeLimit -= incoming.batch.internalLen
|
|
} else {
|
|
// Merge put.
|
|
internalLen := len(incoming.key) + len(incoming.value) + 8
|
|
if internalLen > mergeLimit {
|
|
overflow = true
|
|
break merge
|
|
}
|
|
if ourBatch == nil {
|
|
ourBatch = db.batchPool.Get().(*Batch)
|
|
ourBatch.Reset()
|
|
batches = append(batches, ourBatch)
|
|
}
|
|
// We can use same batch since concurrent write doesn't
|
|
// guarantee write order.
|
|
ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
|
|
mergeLimit -= internalLen
|
|
}
|
|
sync = sync || incoming.sync
|
|
merged++
|
|
db.writeMergedC <- true
|
|
|
|
default:
|
|
break merge
|
|
}
|
|
}
|
|
}
|
|
|
|
// Release ourBatch if any.
|
|
if ourBatch != nil {
|
|
defer db.batchPool.Put(ourBatch)
|
|
}
|
|
|
|
// Seq number.
|
|
seq := db.seq + 1
|
|
|
|
// Write journal.
|
|
if err := db.writeJournal(batches, seq, sync); err != nil {
|
|
db.unlockWrite(overflow, merged, err)
|
|
return err
|
|
}
|
|
|
|
// Put batches.
|
|
for _, batch := range batches {
|
|
if err := batch.putMem(seq, mdb.DB); err != nil {
|
|
panic(err)
|
|
}
|
|
seq += uint64(batch.Len())
|
|
}
|
|
|
|
// Incr seq number.
|
|
db.addSeq(uint64(batchesLen(batches)))
|
|
|
|
// Rotate memdb if it's reach the threshold.
|
|
if batch.internalLen >= mdbFree {
|
|
db.rotateMem(0, false)
|
|
}
|
|
|
|
db.unlockWrite(overflow, merged, nil)
|
|
return nil
|
|
}
|
|
|
|
// Write apply the given batch to the DB. The batch records will be applied
|
|
// sequentially. Write might be used concurrently, when used concurrently and
|
|
// batch is small enough, write will try to merge the batches. Set NoWriteMerge
|
|
// option to true to disable write merge.
|
|
//
|
|
// It is safe to modify the contents of the arguments after Write returns but
|
|
// not before. Write will not modify content of the batch.
|
|
func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error {
|
|
if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 {
|
|
return err
|
|
}
|
|
|
|
// If the batch size is larger than write buffer, it may justified to write
|
|
// using transaction instead. Using transaction the batch will be written
|
|
// into tables directly, skipping the journaling.
|
|
if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
|
|
tr, err := db.OpenTransaction()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := tr.Write(batch, wo); err != nil {
|
|
tr.Discard()
|
|
return err
|
|
}
|
|
return tr.Commit()
|
|
}
|
|
|
|
merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
|
|
sync := wo.GetSync() && !db.s.o.GetNoSync()
|
|
|
|
// Acquire write lock.
|
|
if merge {
|
|
select {
|
|
case db.writeMergeC <- writeMerge{sync: sync, batch: batch}:
|
|
if <-db.writeMergedC {
|
|
// Write is merged.
|
|
return <-db.writeAckC
|
|
}
|
|
// Write is not merged, the write lock is handed to us. Continue.
|
|
case db.writeLockC <- struct{}{}:
|
|
// Write lock acquired.
|
|
case err := <-db.compPerErrC:
|
|
// Compaction error.
|
|
return err
|
|
case <-db.closeC:
|
|
// Closed
|
|
return ErrClosed
|
|
}
|
|
} else {
|
|
select {
|
|
case db.writeLockC <- struct{}{}:
|
|
// Write lock acquired.
|
|
case err := <-db.compPerErrC:
|
|
// Compaction error.
|
|
return err
|
|
case <-db.closeC:
|
|
// Closed
|
|
return ErrClosed
|
|
}
|
|
}
|
|
|
|
return db.writeLocked(batch, nil, merge, sync)
|
|
}
|
|
|
|
func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
|
|
if err := db.ok(); err != nil {
|
|
return err
|
|
}
|
|
|
|
merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
|
|
sync := wo.GetSync() && !db.s.o.GetNoSync()
|
|
|
|
// Acquire write lock.
|
|
if merge {
|
|
select {
|
|
case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
|
|
if <-db.writeMergedC {
|
|
// Write is merged.
|
|
return <-db.writeAckC
|
|
}
|
|
// Write is not merged, the write lock is handed to us. Continue.
|
|
case db.writeLockC <- struct{}{}:
|
|
// Write lock acquired.
|
|
case err := <-db.compPerErrC:
|
|
// Compaction error.
|
|
return err
|
|
case <-db.closeC:
|
|
// Closed
|
|
return ErrClosed
|
|
}
|
|
} else {
|
|
select {
|
|
case db.writeLockC <- struct{}{}:
|
|
// Write lock acquired.
|
|
case err := <-db.compPerErrC:
|
|
// Compaction error.
|
|
return err
|
|
case <-db.closeC:
|
|
// Closed
|
|
return ErrClosed
|
|
}
|
|
}
|
|
|
|
batch := db.batchPool.Get().(*Batch)
|
|
batch.Reset()
|
|
batch.appendRec(kt, key, value)
|
|
return db.writeLocked(batch, batch, merge, sync)
|
|
}
|
|
|
|
// Put sets the value for the given key. It overwrites any previous value
|
|
// for that key; a DB is not a multi-map. Write merge also applies for Put, see
|
|
// Write.
|
|
//
|
|
// It is safe to modify the contents of the arguments after Put returns but not
|
|
// before.
|
|
func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
|
|
return db.putRec(keyTypeVal, key, value, wo)
|
|
}
|
|
|
|
// Delete deletes the value for the given key. Delete will not returns error if
|
|
// key doesn't exist. Write merge also applies for Delete, see Write.
|
|
//
|
|
// It is safe to modify the contents of the arguments after Delete returns but
|
|
// not before.
|
|
func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
|
|
return db.putRec(keyTypeDel, key, nil, wo)
|
|
}
|
|
|
|
func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
|
|
iter := mem.NewIterator(nil)
|
|
defer iter.Release()
|
|
return (max == nil || (iter.First() && icmp.uCompare(max, internalKey(iter.Key()).ukey()) >= 0)) &&
|
|
(min == nil || (iter.Last() && icmp.uCompare(min, internalKey(iter.Key()).ukey()) <= 0))
|
|
}
|
|
|
|
// CompactRange compacts the underlying DB for the given key range.
|
|
// In particular, deleted and overwritten versions are discarded,
|
|
// and the data is rearranged to reduce the cost of operations
|
|
// needed to access the data. This operation should typically only
|
|
// be invoked by users who understand the underlying implementation.
|
|
//
|
|
// A nil Range.Start is treated as a key before all keys in the DB.
|
|
// And a nil Range.Limit is treated as a key after all keys in the DB.
|
|
// Therefore if both is nil then it will compact entire DB.
|
|
func (db *DB) CompactRange(r util.Range) error {
|
|
if err := db.ok(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Lock writer.
|
|
select {
|
|
case db.writeLockC <- struct{}{}:
|
|
case err := <-db.compPerErrC:
|
|
return err
|
|
case <-db.closeC:
|
|
return ErrClosed
|
|
}
|
|
|
|
// Check for overlaps in memdb.
|
|
mdb := db.getEffectiveMem()
|
|
if mdb == nil {
|
|
return ErrClosed
|
|
}
|
|
defer mdb.decref()
|
|
if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
|
|
// Memdb compaction.
|
|
if _, err := db.rotateMem(0, false); err != nil {
|
|
<-db.writeLockC
|
|
return err
|
|
}
|
|
<-db.writeLockC
|
|
if err := db.compTriggerWait(db.mcompCmdC); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
<-db.writeLockC
|
|
}
|
|
|
|
// Table compaction.
|
|
return db.compTriggerRange(db.tcompCmdC, -1, r.Start, r.Limit)
|
|
}
|
|
|
|
// SetReadOnly makes DB read-only. It will stay read-only until reopened.
|
|
func (db *DB) SetReadOnly() error {
|
|
if err := db.ok(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Lock writer.
|
|
select {
|
|
case db.writeLockC <- struct{}{}:
|
|
db.compWriteLocking = true
|
|
case err := <-db.compPerErrC:
|
|
return err
|
|
case <-db.closeC:
|
|
return ErrClosed
|
|
}
|
|
|
|
// Set compaction read-only.
|
|
select {
|
|
case db.compErrSetC <- ErrReadOnly:
|
|
case perr := <-db.compPerErrC:
|
|
return perr
|
|
case <-db.closeC:
|
|
return ErrClosed
|
|
}
|
|
|
|
return nil
|
|
}
|