fix: don't write to the blockmap unless we're at batch boundaries

gmega 2026-01-09 13:55:40 -03:00
parent 7b23545c27
commit 9abcc1f2f5
2 changed files with 32 additions and 19 deletions
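
In short: block writes are fsynced in batches by a background worker, but the blockmap advertises which blocks the node holds. Updating it before a batch boundary has actually synced can claim blocks that a crash would lose. The fix buffers blockmap updates in blockmapPending and flushes them only when putBlock reports a durable sync. A minimal, self-contained sketch of that pattern (hypothetical names, not this repo's types):

    # Sketch only: defer visibility updates to sync boundaries.
    type Blockmap = object
      marked: seq[int]    # indices we advertise as present
      pending: seq[int]   # written, but not yet durably synced

    proc putBlock(bm: var Blockmap, index: int, synced: bool) =
      if synced:
        # Batch boundary: everything written so far is durable,
        # so it is now safe to advertise the pending blocks.
        for i in bm.pending & index:
          bm.marked.add(i)
        bm.pending = @[]
      else:
        # Written but not fsynced yet: hold it back.
        bm.pending.add(index)

    when isMainModule:
      var bm: Blockmap
      bm.putBlock(0, synced = false)
      bm.putBlock(1, synced = false)
      bm.putBlock(2, synced = true)   # boundary flushes 0, 1 and 2
      doAssert bm.marked == @[0, 1, 2]
      doAssert bm.pending.len == 0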


@@ -76,6 +76,7 @@ type
     db: LevelDb
     blockmapBackend: BlockmapBackend
     blockmapSeq: seq[byte]
+    blockmapPending: seq[int]
     blockmapFile: FileBlockmap
     merkleBackend: MerkleBackend
     merkleReader: MerkleReader
@@ -148,13 +149,6 @@ proc markBlock(dataset: Dataset, index: int, value: bool): BResult[void] {.inlin
     ?dataset.blockmapFile.clear(index.uint64)
   ok()
 
-proc flushBlockmap(dataset: Dataset) =
-  case dataset.blockmapBackend
-  of bmLevelDb:
-    discard
-  of bmFile:
-    dataset.blockmapFile.flush()
-
 proc close*(dataset: Dataset) =
   case dataset.blockmapBackend
   of bmLevelDb:
@@ -946,7 +940,7 @@ proc saveBlockmap(dataset: Dataset): BResult[void] =
     except LevelDbException as e:
       err(databaseError(e.msg))
   of bmFile:
-    dataset.flushBlockmap()
+    dataset.blockmapFile.flush()
     ok()
 
 proc putBlock*(dataset: Dataset, b: blk.Block, index: int, proof: MerkleProof): Future[BResult[void]] {.async.} =
@@ -959,7 +953,7 @@ proc putBlock*(dataset: Dataset, b: blk.Block, index: int, proof: MerkleProof): Future[BResult[void]] {.async.} =
   if proof.leafCount != uint64(dataset.blockCount):
     return err(invalidProofError())
 
-  discard ?await dataset.repo.putBlock(b)
+  let (_, synced) = ?await dataset.repo.putBlock(b)
 
   let key = datasetBlockKey(dataset.treeId, index)
@@ -975,9 +969,17 @@ proc putBlock*(dataset: Dataset, b: blk.Block, index: int, proof: MerkleProof): Future[BResult[void]] {.async.} =
   try:
     dataset.db.put(key, cast[string](blockRefBytes))
-    ?dataset.markBlock(index, true)
-    ?dataset.saveBlockmap()
+    # Only update the blockmap when blocks are synced (or
+    # consistency is disabled); otherwise we may report
+    # blocks we don't have.
+    if synced or dataset.repo.syncBatchSize == 0:
+      for i in dataset.blockmapPending & index:
+        ?dataset.markBlock(i, true)
+      ?dataset.saveBlockmap()
+      dataset.blockmapPending = @[]
+    else:
+      dataset.blockmapPending.add(index)
     return ok()
   except LevelDbException as e:


@@ -2,7 +2,7 @@ import std/[os, locks, atomics, strutils, times, options, tables]
 when defined(posix):
   import std/posix
 import chronos
-import chronos/asyncsync
+import chronos/[asyncsync, threadsync]
 import leveldbstatic as leveldb
 import ./errors
@@ -24,6 +24,7 @@ type
   SyncWorker* = ref object
     mutex: Lock
     cond: Cond
+    synced: ThreadSignalPtr
     running: Atomic[bool]
     thread: Thread[pointer]
     blocksDir: string
@@ -69,6 +70,7 @@ proc syncWorkerLoop(workerPtr: pointer) {.thread, nimcall.} =
       release(worker.mutex)
       doSync(worker.blocksDir)
       acquire(worker.mutex)
+      discard worker.synced.fireSync()
     release(worker.mutex)
     doSync(worker.blocksDir)
     break
@@ -80,8 +82,10 @@ proc newSyncWorker*(blocksDir: string): SyncWorker =
   result.running.store(true)
   createThread(result.thread, syncWorkerLoop, cast[pointer](result))
 
-proc triggerSync*(worker: SyncWorker) =
+proc triggerSync*(worker: SyncWorker): Future[void].Raising(
+    [AsyncError, CancelledError]) =
   signal(worker.cond)
+  return worker.synced.wait()
 
 proc stopSyncWorker*(worker: SyncWorker) =
   worker.running.store(false)
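
The handshake behind the new triggerSync is chronos' ThreadSignalPtr, a primitive for waking an async event loop from a plain thread: the caller signals the worker's condition variable, then awaits the signal the worker fires once doSync completes. A minimal sketch of that fire/wait pattern, assuming chronos with its threadsync module (names below are illustrative, not this repo's):

    import chronos, chronos/threadsync
    import results

    proc worker(sig: ThreadSignalPtr) {.thread.} =
      # Stand-in for the blocking work, e.g. doSync(blocksDir).
      discard sig.fireSync()        # wake any async waiter

    proc main() {.async.} =
      let sig = ThreadSignalPtr.new().expect("signal allocation")
      var th: Thread[ThreadSignalPtr]
      createThread(th, worker, sig)
      await sig.wait()              # suspends until fireSync() lands
      joinThread(th)
      discard sig.close()

    waitFor main()

Note the FIXME further down is real: if the worker dies before calling fireSync, the wait() never completes, so production code would want a timeout or an error path.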
@@ -172,6 +176,9 @@ proc decreaseUsed*(store: RepoStore, size: uint64) {.inline.} =
 proc quota*(store: RepoStore): uint64 {.inline.} =
   store.quota
 
+proc syncBatchSize*(store: RepoStore): int {.inline.} =
+  store.syncBatchSize
+
 proc wouldExceedQuota*(store: RepoStore, size: uint64): bool {.inline.} =
   if store.quota == 0:
     return false
@@ -216,7 +223,7 @@ proc incrementRefCount(store: RepoStore, cidStr: string): BResult[void] =
   except Exception as e:
     err(databaseError(e.msg))
 
-proc putBlock*(store: RepoStore, b: blk.Block): Future[BResult[bool]] {.async.} =
+proc putBlock*(store: RepoStore, b: blk.Block): Future[BResult[(bool, bool)]] {.async.} =
   let cidStr = $b.cid
   let blockPath = store.getBlockPath(b.cid)
   let blockSize = b.data.len.uint64
@@ -224,7 +231,7 @@ proc putBlock*(store: RepoStore, b: blk.Block): Future[BResult[bool]] {.async.} =
   let hasIt = ?store.hasBlock(b.cid)
   if hasIt:
     ?store.incrementRefCount(cidStr)
-    return ok(false)
+    return ok((false, false))
 
   let cl = await store.acquireCidLock(cidStr)
   defer: store.releaseCidLock(cl, cidStr)
@@ -232,11 +239,13 @@ proc putBlock*(store: RepoStore, b: blk.Block): Future[BResult[bool]] {.async.} =
   let hasIt2 = ?store.hasBlock(b.cid)
   if hasIt2:
     ?store.incrementRefCount(cidStr)
-    return ok(false)
+    return ok((false, false))
 
   let fileExisted = fileExists(blockPath)
-  var newUsed: uint64
+  var
+    newUsed: uint64
+    synced = false
 
   if fileExisted:
     newUsed = store.used.load()
   else:
@@ -262,6 +271,7 @@ proc putBlock*(store: RepoStore, b: blk.Block): Future[BResult[bool]] {.async.} =
       let syncResult = syncAndCloseFile(fileResult.value)
       if syncResult.isErr:
         return err(syncResult.error)
+      synced = true
     else:
       let writeResult = writeBlockToFile(blockPath, b.data, ioBuffered)
@@ -269,7 +279,8 @@ proc putBlock*(store: RepoStore, b: blk.Block): Future[BResult[bool]] {.async.} =
       if writeResult.isErr:
         return err(writeResult.error)
       let count = store.writeCount.fetchAdd(1) + 1
       if count mod store.syncBatchSize == 0:
-        store.syncWorker.triggerSync()
+        await store.syncWorker.triggerSync() # FIXME: this might error out, and then we'd wait forever
+        synced = true
 
   newUsed = store.used.fetchAdd(blockSize) + blockSize
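
To make the boundary arithmetic concrete: with syncBatchSize = 4, only every fourth buffered write wakes the worker and returns synced = true; the dataset layer holds back blockmap updates for the writes in between. A toy model (plain Nim, no repo types):

    const syncBatchSize = 4
    var writeCount = 0              # mimics store.writeCount
    for blockIdx in 0 ..< 10:
      inc writeCount                # fetchAdd(1) + 1 in the real code
      let synced = writeCount mod syncBatchSize == 0
      echo blockIdx, ": synced = ", synced
    # The 4th and 8th writes (blockIdx 3 and 7) hit a boundary; 8 and 9
    # stay pending until a later boundary or an explicit flush.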
@@ -280,7 +291,7 @@ proc putBlock*(store: RepoStore, b: blk.Block): Future[BResult[bool]] {.async.} =
   try:
     store.db.put(blockInfoKey(cidStr), cast[string](infoBytes))
     store.db.put(UsedKey, cast[string](usedBytes))
-    ok(not fileExisted)
+    ok((not fileExisted, synced))
   except LevelDbException as e:
     err(databaseError(e.msg))
   except Exception as e: