d28f005552
Fix limit for databases other than sqlite go mod tidy && go mod vendor Remove unneeded break statements Make everything work with the new xorm version Fix xorm logging Fix lint Fix redis init Fix using id field Fix database init for testing Change default database log level Add xorm logger Use const for postgres go mod tidy Merge branch 'master' into update/xorm # Conflicts: # go.mod # go.sum # vendor/modules.txt go mod vendor Fix loading fixtures for postgres Go mod vendor1 Update xorm to version 1 Co-authored-by: kolaente <k@knt.li> Reviewed-on: https://kolaente.dev/vikunja/api/pulls/323
329 lines
8.9 KiB
Go
329 lines
8.9 KiB
Go
// Copyright (c) 2016, Suryandaru Triandana <syndtr@gmail.com>
|
|
// All rights reserved.
|
|
//
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file.
|
|
|
|
package leveldb
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb/iterator"
|
|
"github.com/syndtr/goleveldb/leveldb/opt"
|
|
"github.com/syndtr/goleveldb/leveldb/util"
|
|
)
|
|
|
|
var errTransactionDone = errors.New("leveldb: transaction already closed")
|
|
|
|
// Transaction is the transaction handle.
|
|
type Transaction struct {
|
|
db *DB
|
|
lk sync.RWMutex
|
|
seq uint64
|
|
mem *memDB
|
|
tables tFiles
|
|
ikScratch []byte
|
|
rec sessionRecord
|
|
stats cStatStaging
|
|
closed bool
|
|
}
|
|
|
|
// Get gets the value for the given key. It returns ErrNotFound if the
|
|
// DB does not contains the key.
|
|
//
|
|
// The returned slice is its own copy, it is safe to modify the contents
|
|
// of the returned slice.
|
|
// It is safe to modify the contents of the argument after Get returns.
|
|
func (tr *Transaction) Get(key []byte, ro *opt.ReadOptions) ([]byte, error) {
|
|
tr.lk.RLock()
|
|
defer tr.lk.RUnlock()
|
|
if tr.closed {
|
|
return nil, errTransactionDone
|
|
}
|
|
return tr.db.get(tr.mem.DB, tr.tables, key, tr.seq, ro)
|
|
}
|
|
|
|
// Has returns true if the DB does contains the given key.
|
|
//
|
|
// It is safe to modify the contents of the argument after Has returns.
|
|
func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) {
|
|
tr.lk.RLock()
|
|
defer tr.lk.RUnlock()
|
|
if tr.closed {
|
|
return false, errTransactionDone
|
|
}
|
|
return tr.db.has(tr.mem.DB, tr.tables, key, tr.seq, ro)
|
|
}
|
|
|
|
// NewIterator returns an iterator for the latest snapshot of the transaction.
|
|
// The returned iterator is not safe for concurrent use, but it is safe to use
|
|
// multiple iterators concurrently, with each in a dedicated goroutine.
|
|
// It is also safe to use an iterator concurrently while writes to the
|
|
// transaction. The resultant key/value pairs are guaranteed to be consistent.
|
|
//
|
|
// Slice allows slicing the iterator to only contains keys in the given
|
|
// range. A nil Range.Start is treated as a key before all keys in the
|
|
// DB. And a nil Range.Limit is treated as a key after all keys in
|
|
// the DB.
|
|
//
|
|
// WARNING: Any slice returned by interator (e.g. slice returned by calling
|
|
// Iterator.Key() or Iterator.Key() methods), its content should not be modified
|
|
// unless noted otherwise.
|
|
//
|
|
// The iterator must be released after use, by calling Release method.
|
|
//
|
|
// Also read Iterator documentation of the leveldb/iterator package.
|
|
func (tr *Transaction) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
|
|
tr.lk.RLock()
|
|
defer tr.lk.RUnlock()
|
|
if tr.closed {
|
|
return iterator.NewEmptyIterator(errTransactionDone)
|
|
}
|
|
tr.mem.incref()
|
|
return tr.db.newIterator(tr.mem, tr.tables, tr.seq, slice, ro)
|
|
}
|
|
|
|
func (tr *Transaction) flush() error {
|
|
// Flush memdb.
|
|
if tr.mem.Len() != 0 {
|
|
tr.stats.startTimer()
|
|
iter := tr.mem.NewIterator(nil)
|
|
t, n, err := tr.db.s.tops.createFrom(iter)
|
|
iter.Release()
|
|
tr.stats.stopTimer()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if tr.mem.getref() == 1 {
|
|
tr.mem.Reset()
|
|
} else {
|
|
tr.mem.decref()
|
|
tr.mem = tr.db.mpoolGet(0)
|
|
tr.mem.incref()
|
|
}
|
|
tr.tables = append(tr.tables, t)
|
|
tr.rec.addTableFile(0, t)
|
|
tr.stats.write += t.size
|
|
tr.db.logf("transaction@flush created L0@%d N·%d S·%s %q:%q", t.fd.Num, n, shortenb(int(t.size)), t.imin, t.imax)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (tr *Transaction) put(kt keyType, key, value []byte) error {
|
|
tr.ikScratch = makeInternalKey(tr.ikScratch, key, tr.seq+1, kt)
|
|
if tr.mem.Free() < len(tr.ikScratch)+len(value) {
|
|
if err := tr.flush(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if err := tr.mem.Put(tr.ikScratch, value); err != nil {
|
|
return err
|
|
}
|
|
tr.seq++
|
|
return nil
|
|
}
|
|
|
|
// Put sets the value for the given key. It overwrites any previous value
|
|
// for that key; a DB is not a multi-map.
|
|
// Please note that the transaction is not compacted until committed, so if you
|
|
// writes 10 same keys, then those 10 same keys are in the transaction.
|
|
//
|
|
// It is safe to modify the contents of the arguments after Put returns.
|
|
func (tr *Transaction) Put(key, value []byte, wo *opt.WriteOptions) error {
|
|
tr.lk.Lock()
|
|
defer tr.lk.Unlock()
|
|
if tr.closed {
|
|
return errTransactionDone
|
|
}
|
|
return tr.put(keyTypeVal, key, value)
|
|
}
|
|
|
|
// Delete deletes the value for the given key.
|
|
// Please note that the transaction is not compacted until committed, so if you
|
|
// writes 10 same keys, then those 10 same keys are in the transaction.
|
|
//
|
|
// It is safe to modify the contents of the arguments after Delete returns.
|
|
func (tr *Transaction) Delete(key []byte, wo *opt.WriteOptions) error {
|
|
tr.lk.Lock()
|
|
defer tr.lk.Unlock()
|
|
if tr.closed {
|
|
return errTransactionDone
|
|
}
|
|
return tr.put(keyTypeDel, key, nil)
|
|
}
|
|
|
|
// Write apply the given batch to the transaction. The batch will be applied
|
|
// sequentially.
|
|
// Please note that the transaction is not compacted until committed, so if you
|
|
// writes 10 same keys, then those 10 same keys are in the transaction.
|
|
//
|
|
// It is safe to modify the contents of the arguments after Write returns.
|
|
func (tr *Transaction) Write(b *Batch, wo *opt.WriteOptions) error {
|
|
if b == nil || b.Len() == 0 {
|
|
return nil
|
|
}
|
|
|
|
tr.lk.Lock()
|
|
defer tr.lk.Unlock()
|
|
if tr.closed {
|
|
return errTransactionDone
|
|
}
|
|
return b.replayInternal(func(i int, kt keyType, k, v []byte) error {
|
|
return tr.put(kt, k, v)
|
|
})
|
|
}
|
|
|
|
func (tr *Transaction) setDone() {
|
|
tr.closed = true
|
|
tr.db.tr = nil
|
|
tr.mem.decref()
|
|
<-tr.db.writeLockC
|
|
}
|
|
|
|
// Commit commits the transaction. If error is not nil, then the transaction is
|
|
// not committed, it can then either be retried or discarded.
|
|
//
|
|
// Other methods should not be called after transaction has been committed.
|
|
func (tr *Transaction) Commit() error {
|
|
if err := tr.db.ok(); err != nil {
|
|
return err
|
|
}
|
|
|
|
tr.lk.Lock()
|
|
defer tr.lk.Unlock()
|
|
if tr.closed {
|
|
return errTransactionDone
|
|
}
|
|
if err := tr.flush(); err != nil {
|
|
// Return error, lets user decide either to retry or discard
|
|
// transaction.
|
|
return err
|
|
}
|
|
if len(tr.tables) != 0 {
|
|
// Committing transaction.
|
|
tr.rec.setSeqNum(tr.seq)
|
|
tr.db.compCommitLk.Lock()
|
|
tr.stats.startTimer()
|
|
var cerr error
|
|
for retry := 0; retry < 3; retry++ {
|
|
cerr = tr.db.s.commit(&tr.rec)
|
|
if cerr != nil {
|
|
tr.db.logf("transaction@commit error R·%d %q", retry, cerr)
|
|
select {
|
|
case <-time.After(time.Second):
|
|
case <-tr.db.closeC:
|
|
tr.db.logf("transaction@commit exiting")
|
|
tr.db.compCommitLk.Unlock()
|
|
return cerr
|
|
}
|
|
} else {
|
|
// Success. Set db.seq.
|
|
tr.db.setSeq(tr.seq)
|
|
break
|
|
}
|
|
}
|
|
tr.stats.stopTimer()
|
|
if cerr != nil {
|
|
// Return error, lets user decide either to retry or discard
|
|
// transaction.
|
|
return cerr
|
|
}
|
|
|
|
// Update compaction stats. This is safe as long as we hold compCommitLk.
|
|
tr.db.compStats.addStat(0, &tr.stats)
|
|
|
|
// Trigger table auto-compaction.
|
|
tr.db.compTrigger(tr.db.tcompCmdC)
|
|
tr.db.compCommitLk.Unlock()
|
|
|
|
// Additionally, wait compaction when certain threshold reached.
|
|
// Ignore error, returns error only if transaction can't be committed.
|
|
tr.db.waitCompaction()
|
|
}
|
|
// Only mark as done if transaction committed successfully.
|
|
tr.setDone()
|
|
return nil
|
|
}
|
|
|
|
func (tr *Transaction) discard() {
|
|
// Discard transaction.
|
|
for _, t := range tr.tables {
|
|
tr.db.logf("transaction@discard @%d", t.fd.Num)
|
|
if err1 := tr.db.s.stor.Remove(t.fd); err1 == nil {
|
|
tr.db.s.reuseFileNum(t.fd.Num)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Discard discards the transaction.
|
|
//
|
|
// Other methods should not be called after transaction has been discarded.
|
|
func (tr *Transaction) Discard() {
|
|
tr.lk.Lock()
|
|
if !tr.closed {
|
|
tr.discard()
|
|
tr.setDone()
|
|
}
|
|
tr.lk.Unlock()
|
|
}
|
|
|
|
func (db *DB) waitCompaction() error {
|
|
if db.s.tLen(0) >= db.s.o.GetWriteL0PauseTrigger() {
|
|
return db.compTriggerWait(db.tcompCmdC)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// OpenTransaction opens an atomic DB transaction. Only one transaction can be
|
|
// opened at a time. Subsequent call to Write and OpenTransaction will be blocked
|
|
// until in-flight transaction is committed or discarded.
|
|
// The returned transaction handle is safe for concurrent use.
|
|
//
|
|
// Transaction is expensive and can overwhelm compaction, especially if
|
|
// transaction size is small. Use with caution.
|
|
//
|
|
// The transaction must be closed once done, either by committing or discarding
|
|
// the transaction.
|
|
// Closing the DB will discard open transaction.
|
|
func (db *DB) OpenTransaction() (*Transaction, error) {
|
|
if err := db.ok(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// The write happen synchronously.
|
|
select {
|
|
case db.writeLockC <- struct{}{}:
|
|
case err := <-db.compPerErrC:
|
|
return nil, err
|
|
case <-db.closeC:
|
|
return nil, ErrClosed
|
|
}
|
|
|
|
if db.tr != nil {
|
|
panic("leveldb: has open transaction")
|
|
}
|
|
|
|
// Flush current memdb.
|
|
if db.mem != nil && db.mem.Len() != 0 {
|
|
if _, err := db.rotateMem(0, true); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// Wait compaction when certain threshold reached.
|
|
if err := db.waitCompaction(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tr := &Transaction{
|
|
db: db,
|
|
seq: db.seq,
|
|
mem: db.mpoolGet(0),
|
|
}
|
|
tr.mem.incref()
|
|
db.tr = tr
|
|
return tr, nil
|
|
}
|