492 lines
11 KiB
Go
Raw Normal View History

// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package leveldb
import (
"fmt"
"sync/atomic"
"time"
"github.com/syndtr/goleveldb/leveldb/journal"
"github.com/syndtr/goleveldb/leveldb/storage"
)
// Logging.
type dropper struct {
s *session
fd storage.FileDesc
}
func (d dropper) Drop(err error) {
if e, ok := err.(*journal.ErrCorrupted); ok {
d.s.logf("journal@drop %s-%d S·%s %q", d.fd.Type, d.fd.Num, shortenb(int64(e.Size)), e.Reason)
} else {
d.s.logf("journal@drop %s-%d %q", d.fd.Type, d.fd.Num, err)
}
}
func (s *session) log(v ...interface{}) { s.stor.Log(fmt.Sprint(v...)) }
func (s *session) logf(format string, v ...interface{}) { s.stor.Log(fmt.Sprintf(format, v...)) }
// File utils.
func (s *session) newTemp() storage.FileDesc {
num := atomic.AddInt64(&s.stTempFileNum, 1) - 1
2019-06-09 09:24:20 +02:00
return storage.FileDesc{Type: storage.TypeTemp, Num: num}
}
// Session state.
const (
// maxCachedNumber represents the maximum number of version tasks
// that can be cached in the ref loop.
maxCachedNumber = 256
// maxCachedTime represents the maximum time for ref loop to cache
// a version task.
maxCachedTime = 5 * time.Minute
)
// vDelta indicates the change information between the next version
// and the currently specified version
type vDelta struct {
vid int64
added []int64
deleted []int64
}
// vTask defines a version task for either reference or release.
type vTask struct {
vid int64
files []tFiles
created time.Time
}
func (s *session) refLoop() {
var (
fileRef = make(map[int64]int) // Table file reference counter
ref = make(map[int64]*vTask) // Current referencing version store
deltas = make(map[int64]*vDelta)
referenced = make(map[int64]struct{})
released = make(map[int64]*vDelta) // Released version that waiting for processing
abandoned = make(map[int64]struct{}) // Abandoned version id
next, last int64
)
// addFileRef adds file reference counter with specified file number and
// reference value
addFileRef := func(fnum int64, ref int) int {
ref += fileRef[fnum]
if ref > 0 {
fileRef[fnum] = ref
} else if ref == 0 {
delete(fileRef, fnum)
} else {
panic(fmt.Sprintf("negative ref: %v", fnum))
}
return ref
}
// skipAbandoned skips useless abandoned version id.
skipAbandoned := func() bool {
if _, exist := abandoned[next]; exist {
delete(abandoned, next)
return true
}
return false
}
// applyDelta applies version change to current file reference.
applyDelta := func(d *vDelta) {
for _, t := range d.added {
addFileRef(t, 1)
}
for _, t := range d.deleted {
if addFileRef(t, -1) == 0 {
s.tops.remove(storage.FileDesc{Type: storage.TypeTable, Num: t})
}
}
}
timer := time.NewTimer(0)
<-timer.C // discard the initial tick
defer timer.Stop()
// processTasks processes version tasks in strict order.
//
// If we want to use delta to reduce the cost of file references and dereferences,
// we must strictly follow the id of the version, otherwise some files that are
// being referenced will be deleted.
//
// In addition, some db operations (such as iterators) may cause a version to be
// referenced for a long time. In order to prevent such operations from blocking
// the entire processing queue, we will properly convert some of the version tasks
// into full file references and releases.
processTasks := func() {
timer.Reset(maxCachedTime)
// Make sure we don't cache too many version tasks.
for {
// Skip any abandoned version number to prevent blocking processing.
if skipAbandoned() {
next++
continue
}
// Don't bother the version that has been released.
if _, exist := released[next]; exist {
break
}
// Ensure the specified version has been referenced.
if _, exist := ref[next]; !exist {
break
}
if last-next < maxCachedNumber && time.Since(ref[next].created) < maxCachedTime {
break
}
// Convert version task into full file references and releases mode.
// Reference version(i+1) first and wait version(i) to release.
// FileRef(i+1) = FileRef(i) + Delta(i)
for _, tt := range ref[next].files {
for _, t := range tt {
addFileRef(t.fd.Num, 1)
}
}
// Note, if some compactions take a long time, even more than 5 minutes,
// we may miss the corresponding delta information here.
// Fortunately it will not affect the correctness of the file reference,
// and we can apply the delta once we receive it.
if d := deltas[next]; d != nil {
applyDelta(d)
}
referenced[next] = struct{}{}
delete(ref, next)
delete(deltas, next)
next++
}
// Use delta information to process all released versions.
for {
if skipAbandoned() {
next++
continue
}
if d, exist := released[next]; exist {
if d != nil {
applyDelta(d)
}
delete(released, next)
next++
continue
}
return
}
}
for {
processTasks()
select {
case t := <-s.refCh:
if _, exist := ref[t.vid]; exist {
panic("duplicate reference request")
}
ref[t.vid] = t
if t.vid > last {
last = t.vid
}
case d := <-s.deltaCh:
if _, exist := ref[d.vid]; !exist {
if _, exist2 := referenced[d.vid]; !exist2 {
panic("invalid release request")
}
// The reference opt is already expired, apply
// delta here.
applyDelta(d)
continue
}
deltas[d.vid] = d
case t := <-s.relCh:
if _, exist := referenced[t.vid]; exist {
for _, tt := range t.files {
for _, t := range tt {
if addFileRef(t.fd.Num, -1) == 0 {
s.tops.remove(t.fd)
}
}
}
delete(referenced, t.vid)
continue
}
if _, exist := ref[t.vid]; !exist {
panic("invalid release request")
}
released[t.vid] = deltas[t.vid]
delete(deltas, t.vid)
delete(ref, t.vid)
case id := <-s.abandon:
if id >= next {
abandoned[id] = struct{}{}
}
case <-timer.C:
case r := <-s.fileRefCh:
ref := make(map[int64]int)
for f, c := range fileRef {
ref[f] = c
}
r <- ref
case <-s.closeC:
s.closeW.Done()
return
}
}
}
// Get current version. This will incr version ref, must call
// version.release (exactly once) after use.
func (s *session) version() *version {
s.vmu.Lock()
defer s.vmu.Unlock()
s.stVersion.incref()
return s.stVersion
}
func (s *session) tLen(level int) int {
s.vmu.Lock()
defer s.vmu.Unlock()
return s.stVersion.tLen(level)
}
// Set current version to v.
func (s *session) setVersion(r *sessionRecord, v *version) {
s.vmu.Lock()
defer s.vmu.Unlock()
// Hold by session. It is important to call this first before releasing
// current version, otherwise the still used files might get released.
v.incref()
if s.stVersion != nil {
if r != nil {
var (
added = make([]int64, 0, len(r.addedTables))
deleted = make([]int64, 0, len(r.deletedTables))
)
for _, t := range r.addedTables {
added = append(added, t.num)
}
for _, t := range r.deletedTables {
deleted = append(deleted, t.num)
}
select {
case s.deltaCh <- &vDelta{vid: s.stVersion.id, added: added, deleted: deleted}:
case <-v.s.closeC:
s.log("reference loop already exist")
}
}
// Release current version.
s.stVersion.releaseNB()
}
s.stVersion = v
}
// Get current unused file number.
func (s *session) nextFileNum() int64 {
return atomic.LoadInt64(&s.stNextFileNum)
}
// Set current unused file number to num.
func (s *session) setNextFileNum(num int64) {
atomic.StoreInt64(&s.stNextFileNum, num)
}
// Mark file number as used.
func (s *session) markFileNum(num int64) {
nextFileNum := num + 1
for {
old, x := atomic.LoadInt64(&s.stNextFileNum), nextFileNum
if old > x {
x = old
}
if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
break
}
}
}
// Allocate a file number.
func (s *session) allocFileNum() int64 {
return atomic.AddInt64(&s.stNextFileNum, 1) - 1
}
// Reuse given file number.
func (s *session) reuseFileNum(num int64) {
for {
old, x := atomic.LoadInt64(&s.stNextFileNum), num
if old != x+1 {
x = old
}
if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
break
}
}
}
// Set compaction ptr at given level; need external synchronization.
func (s *session) setCompPtr(level int, ik internalKey) {
if level >= len(s.stCompPtrs) {
newCompPtrs := make([]internalKey, level+1)
copy(newCompPtrs, s.stCompPtrs)
s.stCompPtrs = newCompPtrs
}
s.stCompPtrs[level] = append(internalKey{}, ik...)
}
// Get compaction ptr at given level; need external synchronization.
func (s *session) getCompPtr(level int) internalKey {
if level >= len(s.stCompPtrs) {
return nil
}
return s.stCompPtrs[level]
}
// Manifest related utils.
// Fill given session record obj with current states; need external
// synchronization.
func (s *session) fillRecord(r *sessionRecord, snapshot bool) {
r.setNextFileNum(s.nextFileNum())
if snapshot {
if !r.has(recJournalNum) {
r.setJournalNum(s.stJournalNum)
}
if !r.has(recSeqNum) {
r.setSeqNum(s.stSeqNum)
}
for level, ik := range s.stCompPtrs {
if ik != nil {
r.addCompPtr(level, ik)
}
}
r.setComparer(s.icmp.uName())
}
}
// Mark if record has been committed, this will update session state;
// need external synchronization.
func (s *session) recordCommited(rec *sessionRecord) {
if rec.has(recJournalNum) {
s.stJournalNum = rec.journalNum
}
if rec.has(recPrevJournalNum) {
s.stPrevJournalNum = rec.prevJournalNum
}
if rec.has(recSeqNum) {
s.stSeqNum = rec.seqNum
}
for _, r := range rec.compPtrs {
s.setCompPtr(r.level, r.ikey)
}
}
// Create a new manifest file; need external synchronization.
func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
2019-06-09 09:24:20 +02:00
fd := storage.FileDesc{Type: storage.TypeManifest, Num: s.allocFileNum()}
writer, err := s.stor.Create(fd)
if err != nil {
return
}
jw := journal.NewWriter(writer)
if v == nil {
v = s.version()
defer v.release()
}
if rec == nil {
rec = &sessionRecord{}
}
s.fillRecord(rec, true)
v.fillRecord(rec)
defer func() {
if err == nil {
s.recordCommited(rec)
if s.manifest != nil {
s.manifest.Close()
}
if s.manifestWriter != nil {
s.manifestWriter.Close()
}
if !s.manifestFd.Zero() {
err = s.stor.Remove(s.manifestFd)
}
s.manifestFd = fd
s.manifestWriter = writer
s.manifest = jw
} else {
writer.Close()
if rerr := s.stor.Remove(fd); err != nil {
err = fmt.Errorf("newManifest error: %v, cleanup error (%v)", err, rerr)
}
s.reuseFileNum(fd.Num)
}
}()
w, err := jw.Next()
if err != nil {
return
}
err = rec.encode(w)
if err != nil {
return
}
err = jw.Flush()
if err != nil {
return
}
if !s.o.GetNoSync() {
err = writer.Sync()
if err != nil {
return
}
}
err = s.stor.SetMeta(fd)
return
}
// Flush record to disk.
func (s *session) flushManifest(rec *sessionRecord) (err error) {
s.fillRecord(rec, false)
w, err := s.manifest.Next()
if err != nil {
return
}
err = rec.encode(w)
if err != nil {
return
}
err = s.manifest.Flush()
if err != nil {
return
}
if !s.o.GetNoSync() {
err = s.manifestWriter.Sync()
if err != nil {
return
}
}
s.recordCommited(rec)
return
}