// Copyright (c) 2012, Suryandaru Triandana // All rights reserved. // // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. package leveldb import ( "fmt" "io" "os" "sync" "github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/journal" "github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/storage" ) // ErrManifestCorrupted records manifest corruption. This error will be // wrapped with errors.ErrCorrupted. type ErrManifestCorrupted struct { Field string Reason string } func (e *ErrManifestCorrupted) Error() string { return fmt.Sprintf("leveldb: manifest corrupted (field '%s'): %s", e.Field, e.Reason) } func newErrManifestCorrupted(fd storage.FileDesc, field, reason string) error { return errors.NewErrCorrupted(fd, &ErrManifestCorrupted{field, reason}) } // session represent a persistent database session. type session struct { // Need 64-bit alignment. stNextFileNum int64 // current unused file number stJournalNum int64 // current journal file number; need external synchronization stPrevJournalNum int64 // prev journal file number; no longer used; for compatibility with older version of leveldb stTempFileNum int64 stSeqNum uint64 // last mem compacted seq; need external synchronization stor *iStorage storLock storage.Locker o *cachedOptions icmp *iComparer tops *tOps manifest *journal.Writer manifestWriter storage.Writer manifestFd storage.FileDesc stCompPtrs []internalKey // compaction pointers; need external synchronization stVersion *version // current version ntVersionID int64 // next version id to assign refCh chan *vTask relCh chan *vTask deltaCh chan *vDelta abandon chan int64 closeC chan struct{} closeW sync.WaitGroup vmu sync.Mutex // Testing fields fileRefCh chan chan map[int64]int // channel used to pass current reference stat } // Creates new initialized session instance. func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) { if stor == nil { return nil, os.ErrInvalid } storLock, err := stor.Lock() if err != nil { return } s = &session{ stor: newIStorage(stor), storLock: storLock, refCh: make(chan *vTask), relCh: make(chan *vTask), deltaCh: make(chan *vDelta), abandon: make(chan int64), fileRefCh: make(chan chan map[int64]int), closeC: make(chan struct{}), } s.setOptions(o) s.tops = newTableOps(s) s.closeW.Add(1) go s.refLoop() s.setVersion(nil, newVersion(s)) s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed") return } // Close session. func (s *session) close() { s.tops.close() if s.manifest != nil { s.manifest.Close() } if s.manifestWriter != nil { s.manifestWriter.Close() } s.manifest = nil s.manifestWriter = nil s.setVersion(nil, &version{s: s, closing: true, id: s.ntVersionID}) // Close all background goroutines close(s.closeC) s.closeW.Wait() } // Release session lock. func (s *session) release() { s.storLock.Unlock() } // Create a new database session; need external synchronization. func (s *session) create() error { // create manifest return s.newManifest(nil, nil) } // Recover a database session; need external synchronization. func (s *session) recover() (err error) { defer func() { if os.IsNotExist(err) { // Don't return os.ErrNotExist if the underlying storage contains // other files that belong to LevelDB. So the DB won't get trashed. if fds, _ := s.stor.List(storage.TypeAll); len(fds) > 0 { err = &errors.ErrCorrupted{Err: errors.New("database entry point either missing or corrupted")} } } }() fd, err := s.stor.GetMeta() if err != nil { return } reader, err := s.stor.Open(fd) if err != nil { return } defer reader.Close() var ( // Options. strict = s.o.GetStrict(opt.StrictManifest) jr = journal.NewReader(reader, dropper{s, fd}, strict, true) rec = &sessionRecord{} staging = s.stVersion.newStaging() ) for { var r io.Reader r, err = jr.Next() if err != nil { if err == io.EOF { err = nil break } return errors.SetFd(err, fd) } err = rec.decode(r) if err == nil { // save compact pointers for _, r := range rec.compPtrs { s.setCompPtr(r.level, r.ikey) } // commit record to version staging staging.commit(rec) } else { err = errors.SetFd(err, fd) if strict || !errors.IsCorrupted(err) { return } s.logf("manifest error: %v (skipped)", errors.SetFd(err, fd)) } rec.resetCompPtrs() rec.resetAddedTables() rec.resetDeletedTables() } switch { case !rec.has(recComparer): return newErrManifestCorrupted(fd, "comparer", "missing") case rec.comparer != s.icmp.uName(): return newErrManifestCorrupted(fd, "comparer", fmt.Sprintf("mismatch: want '%s', got '%s'", s.icmp.uName(), rec.comparer)) case !rec.has(recNextFileNum): return newErrManifestCorrupted(fd, "next-file-num", "missing") case !rec.has(recJournalNum): return newErrManifestCorrupted(fd, "journal-file-num", "missing") case !rec.has(recSeqNum): return newErrManifestCorrupted(fd, "seq-num", "missing") } s.manifestFd = fd s.setVersion(rec, staging.finish(false)) s.setNextFileNum(rec.nextFileNum) s.recordCommited(rec) return nil } // Commit session; need external synchronization. func (s *session) commit(r *sessionRecord, trivial bool) (err error) { v := s.version() defer v.release() // spawn new version based on current version nv := v.spawn(r, trivial) // abandon useless version id to prevent blocking version processing loop. defer func() { if err != nil { s.abandon <- nv.id s.logf("commit@abandon useless vid D%d", nv.id) } }() if s.manifest == nil || s.manifest.Size() >= s.o.GetMaxManifestFileSize() { // manifest journal writer not yet created, create one err = s.newManifest(r, nv) } else { err = s.flushManifest(r) } // finally, apply new version if no error rise if err == nil { s.setVersion(r, nv) } return }