d28f005552
Fix limit for databases other than sqlite go mod tidy && go mod vendor Remove unneeded break statements Make everything work with the new xorm version Fix xorm logging Fix lint Fix redis init Fix using id field Fix database init for testing Change default database log level Add xorm logger Use const for postgres go mod tidy Merge branch 'master' into update/xorm # Conflicts: # go.mod # go.sum # vendor/modules.txt go mod vendor Fix loading fixtures for postgres Go mod vendor1 Update xorm to version 1 Co-authored-by: kolaente <k@knt.li> Reviewed-on: https://kolaente.dev/vikunja/api/pulls/323
271 lines
5.6 KiB
Go
271 lines
5.6 KiB
Go
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
|
|
// All rights reserved.
|
|
//
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file.
|
|
|
|
package leveldb
|
|
|
|
import (
|
|
"fmt"
|
|
"sync/atomic"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb/journal"
|
|
"github.com/syndtr/goleveldb/leveldb/storage"
|
|
)
|
|
|
|
// Logging.
|
|
|
|
type dropper struct {
|
|
s *session
|
|
fd storage.FileDesc
|
|
}
|
|
|
|
func (d dropper) Drop(err error) {
|
|
if e, ok := err.(*journal.ErrCorrupted); ok {
|
|
d.s.logf("journal@drop %s-%d S·%s %q", d.fd.Type, d.fd.Num, shortenb(e.Size), e.Reason)
|
|
} else {
|
|
d.s.logf("journal@drop %s-%d %q", d.fd.Type, d.fd.Num, err)
|
|
}
|
|
}
|
|
|
|
// log formats v with fmt.Sprint and hands the resulting string to the
// session storage's Log method.
func (s *session) log(v ...interface{}) {
	msg := fmt.Sprint(v...)
	s.stor.Log(msg)
}
|
|
// logf formats v according to format with fmt.Sprintf and hands the
// resulting string to the session storage's Log method.
func (s *session) logf(format string, v ...interface{}) {
	msg := fmt.Sprintf(format, v...)
	s.stor.Log(msg)
}
|
|
|
|
// File utils.
|
|
|
|
func (s *session) newTemp() storage.FileDesc {
|
|
num := atomic.AddInt64(&s.stTempFileNum, 1) - 1
|
|
return storage.FileDesc{Type: storage.TypeTemp, Num: num}
|
|
}
|
|
|
|
func (s *session) addFileRef(fd storage.FileDesc, ref int) int {
|
|
ref += s.fileRef[fd.Num]
|
|
if ref > 0 {
|
|
s.fileRef[fd.Num] = ref
|
|
} else if ref == 0 {
|
|
delete(s.fileRef, fd.Num)
|
|
} else {
|
|
panic(fmt.Sprintf("negative ref: %v", fd))
|
|
}
|
|
return ref
|
|
}
|
|
|
|
// Session state.
|
|
|
|
// Get current version. This will incr version ref, must call
|
|
// version.release (exactly once) after use.
|
|
func (s *session) version() *version {
|
|
s.vmu.Lock()
|
|
defer s.vmu.Unlock()
|
|
s.stVersion.incref()
|
|
return s.stVersion
|
|
}
|
|
|
|
func (s *session) tLen(level int) int {
|
|
s.vmu.Lock()
|
|
defer s.vmu.Unlock()
|
|
return s.stVersion.tLen(level)
|
|
}
|
|
|
|
// Set current version to v.
|
|
func (s *session) setVersion(v *version) {
|
|
s.vmu.Lock()
|
|
defer s.vmu.Unlock()
|
|
// Hold by session. It is important to call this first before releasing
|
|
// current version, otherwise the still used files might get released.
|
|
v.incref()
|
|
if s.stVersion != nil {
|
|
// Release current version.
|
|
s.stVersion.releaseNB()
|
|
}
|
|
s.stVersion = v
|
|
}
|
|
|
|
// Get current unused file number.
|
|
func (s *session) nextFileNum() int64 {
|
|
return atomic.LoadInt64(&s.stNextFileNum)
|
|
}
|
|
|
|
// Set current unused file number to num.
|
|
func (s *session) setNextFileNum(num int64) {
|
|
atomic.StoreInt64(&s.stNextFileNum, num)
|
|
}
|
|
|
|
// Mark file number as used.
|
|
func (s *session) markFileNum(num int64) {
|
|
nextFileNum := num + 1
|
|
for {
|
|
old, x := s.stNextFileNum, nextFileNum
|
|
if old > x {
|
|
x = old
|
|
}
|
|
if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Allocate a file number.
|
|
func (s *session) allocFileNum() int64 {
|
|
return atomic.AddInt64(&s.stNextFileNum, 1) - 1
|
|
}
|
|
|
|
// Reuse given file number.
|
|
func (s *session) reuseFileNum(num int64) {
|
|
for {
|
|
old, x := s.stNextFileNum, num
|
|
if old != x+1 {
|
|
x = old
|
|
}
|
|
if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Set compaction ptr at given level; need external synchronization.
|
|
func (s *session) setCompPtr(level int, ik internalKey) {
|
|
if level >= len(s.stCompPtrs) {
|
|
newCompPtrs := make([]internalKey, level+1)
|
|
copy(newCompPtrs, s.stCompPtrs)
|
|
s.stCompPtrs = newCompPtrs
|
|
}
|
|
s.stCompPtrs[level] = append(internalKey{}, ik...)
|
|
}
|
|
|
|
// Get compaction ptr at given level; need external synchronization.
|
|
func (s *session) getCompPtr(level int) internalKey {
|
|
if level >= len(s.stCompPtrs) {
|
|
return nil
|
|
}
|
|
return s.stCompPtrs[level]
|
|
}
|
|
|
|
// Manifest related utils.
|
|
|
|
// Fill given session record obj with current states; need external
|
|
// synchronization.
|
|
func (s *session) fillRecord(r *sessionRecord, snapshot bool) {
|
|
r.setNextFileNum(s.nextFileNum())
|
|
|
|
if snapshot {
|
|
if !r.has(recJournalNum) {
|
|
r.setJournalNum(s.stJournalNum)
|
|
}
|
|
|
|
if !r.has(recSeqNum) {
|
|
r.setSeqNum(s.stSeqNum)
|
|
}
|
|
|
|
for level, ik := range s.stCompPtrs {
|
|
if ik != nil {
|
|
r.addCompPtr(level, ik)
|
|
}
|
|
}
|
|
|
|
r.setComparer(s.icmp.uName())
|
|
}
|
|
}
|
|
|
|
// Mark if record has been committed, this will update session state;
|
|
// need external synchronization.
|
|
func (s *session) recordCommited(rec *sessionRecord) {
|
|
if rec.has(recJournalNum) {
|
|
s.stJournalNum = rec.journalNum
|
|
}
|
|
|
|
if rec.has(recPrevJournalNum) {
|
|
s.stPrevJournalNum = rec.prevJournalNum
|
|
}
|
|
|
|
if rec.has(recSeqNum) {
|
|
s.stSeqNum = rec.seqNum
|
|
}
|
|
|
|
for _, r := range rec.compPtrs {
|
|
s.setCompPtr(r.level, internalKey(r.ikey))
|
|
}
|
|
}
|
|
|
|
// Create a new manifest file; need external synchronization.
|
|
func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
|
|
fd := storage.FileDesc{Type: storage.TypeManifest, Num: s.allocFileNum()}
|
|
writer, err := s.stor.Create(fd)
|
|
if err != nil {
|
|
return
|
|
}
|
|
jw := journal.NewWriter(writer)
|
|
|
|
if v == nil {
|
|
v = s.version()
|
|
defer v.release()
|
|
}
|
|
if rec == nil {
|
|
rec = &sessionRecord{}
|
|
}
|
|
s.fillRecord(rec, true)
|
|
v.fillRecord(rec)
|
|
|
|
defer func() {
|
|
if err == nil {
|
|
s.recordCommited(rec)
|
|
if s.manifest != nil {
|
|
s.manifest.Close()
|
|
}
|
|
if s.manifestWriter != nil {
|
|
s.manifestWriter.Close()
|
|
}
|
|
if !s.manifestFd.Zero() {
|
|
s.stor.Remove(s.manifestFd)
|
|
}
|
|
s.manifestFd = fd
|
|
s.manifestWriter = writer
|
|
s.manifest = jw
|
|
} else {
|
|
writer.Close()
|
|
s.stor.Remove(fd)
|
|
s.reuseFileNum(fd.Num)
|
|
}
|
|
}()
|
|
|
|
w, err := jw.Next()
|
|
if err != nil {
|
|
return
|
|
}
|
|
err = rec.encode(w)
|
|
if err != nil {
|
|
return
|
|
}
|
|
err = jw.Flush()
|
|
if err != nil {
|
|
return
|
|
}
|
|
err = s.stor.SetMeta(fd)
|
|
return
|
|
}
|
|
|
|
// Flush record to disk.
|
|
func (s *session) flushManifest(rec *sessionRecord) (err error) {
|
|
s.fillRecord(rec, false)
|
|
w, err := s.manifest.Next()
|
|
if err != nil {
|
|
return
|
|
}
|
|
err = rec.encode(w)
|
|
if err != nil {
|
|
return
|
|
}
|
|
err = s.manifest.Flush()
|
|
if err != nil {
|
|
return
|
|
}
|
|
if !s.o.GetNoSync() {
|
|
err = s.manifestWriter.Sync()
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
s.recordCommited(rec)
|
|
return
|
|
}
|