From 9abcc1f2f503fe11a61c0383d1010bc814dd793f Mon Sep 17 00:00:00 2001
From: gmega
Date: Fri, 9 Jan 2026 13:55:40 -0300
Subject: [PATCH] fix: don't write to the blockmap unless we're at batch
 boundaries

The repo store fsyncs block files in batches: buffered writes are made
durable by a background sync worker once every syncBatchSize writes.
Marking a block in the blockmap as soon as it is written can therefore
leave us, after a crash, with a blockmap that advertises blocks which
never made it to disk.

Buffer marked indices in blockmapPending instead, and only write them
out (and flush the blockmap) once the repo store reports the write as
synced, i.e. at batch boundaries, or immediately when syncBatchSize == 0
and batched syncing is disabled. To support this, RepoStore.putBlock now
returns a tuple saying whether a new block file was written and whether
that write is already durable, and triggerSync returns a future that
completes once the sync worker fires a ThreadSignalPtr.
---
 blockstore/dataset.nim   | 24 +++++++++++++-----------
 blockstore/repostore.nim | 28 ++++++++++++++++++++--------
 2 files changed, 33 insertions(+), 19 deletions(-)

diff --git a/blockstore/dataset.nim b/blockstore/dataset.nim
index e2d5534..3cf53a0 100644
--- a/blockstore/dataset.nim
+++ b/blockstore/dataset.nim
@@ -76,6 +76,7 @@ type
     db: LevelDb
     blockmapBackend: BlockmapBackend
     blockmapSeq: seq[byte]
+    blockmapPending: seq[int]
     blockmapFile: FileBlockmap
     merkleBackend: MerkleBackend
     merkleReader: MerkleReader
@@ -148,13 +149,6 @@ proc markBlock(dataset: Dataset, index: int, value: bool): BResult[void] {.inlin
     ?dataset.blockmapFile.clear(index.uint64)
   ok()
 
-proc flushBlockmap(dataset: Dataset) =
-  case dataset.blockmapBackend
-  of bmLevelDb:
-    discard
-  of bmFile:
-    dataset.blockmapFile.flush()
-
 proc close*(dataset: Dataset) =
   case dataset.blockmapBackend
   of bmLevelDb:
@@ -946,7 +940,7 @@ proc saveBlockmap(dataset: Dataset): BResult[void] =
     except LevelDbException as e:
       err(databaseError(e.msg))
   of bmFile:
-    dataset.flushBlockmap()
+    dataset.blockmapFile.flush()
   ok()
 
 proc putBlock*(dataset: Dataset, b: blk.Block, index: int, proof: MerkleProof): Future[BResult[void]] {.async.} =
@@ -959,7 +953,7 @@ proc putBlock*(dataset: Dataset, b: blk.Block, index: int, proof: MerkleProof):
   if proof.leafCount != uint64(dataset.blockCount):
     return err(invalidProofError())
 
-  discard ?await dataset.repo.putBlock(b)
+  let (_, synced) = ?await dataset.repo.putBlock(b)
 
   let key = datasetBlockKey(dataset.treeId, index)
 
@@ -975,9 +969,17 @@ proc putBlock*(dataset: Dataset, b: blk.Block, index: int, proof: MerkleProof):
   try:
     dataset.db.put(key, cast[string](blockRefBytes))
-    ?dataset.markBlock(index, true)
+    # Only update the blockmap once blocks are known to be synced (or
+    # consistency is disabled), or we may report blocks we don't
+    # actually have. Until then, buffer the indices.
+    dataset.blockmapPending.add(index)
+
+    if synced or dataset.repo.syncBatchSize == 0:
+      for pending in dataset.blockmapPending:
+        ?dataset.markBlock(pending, true)
 
-    ?dataset.saveBlockmap()
+      ?dataset.saveBlockmap()
+      dataset.blockmapPending = @[]
 
     return ok()
   except LevelDbException as e:
     err(databaseError(e.msg))
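A note for reviewers: the invariant in the dataset.nim hunk above is easier to see in isolation. Below is a minimal, runnable sketch of the same buffering logic with toy stand-in types; only markBlock, saveBlockmap and blockmapPending mirror names from this patch, everything else here is hypothetical.

```nim
# Toy model of the blockmap batching invariant (run with `nim r`).
type ToyDataset = object
  blockmap: set[uint16] # stands in for the persistent blockmap
  pending: seq[int]     # mirrors Dataset.blockmapPending

proc markBlock(d: var ToyDataset, index: int) =
  d.blockmap.incl(uint16(index))

proc saveBlockmap(d: var ToyDataset) =
  discard # a real backend would flush to disk here

proc putBlock(d: var ToyDataset, index: int, synced: bool, batchSize: int) =
  # Same shape as the patched Dataset.putBlock: buffer first, then make
  # the whole batch visible only once a write lands on a sync boundary
  # (or when batching is disabled via batchSize == 0).
  d.pending.add(index)
  if synced or batchSize == 0:
    for p in d.pending:
      d.markBlock(p)
    d.saveBlockmap()
    d.pending = @[]

when isMainModule:
  var d = ToyDataset()
  d.putBlock(0, synced = false, batchSize = 4)
  d.putBlock(1, synced = false, batchSize = 4)
  doAssert d.blockmap.card == 0 # nothing is visible before the sync
  d.putBlock(2, synced = true, batchSize = 4)
  doAssert d.blockmap == {0'u16, 1'u16, 2'u16} # batch appears at once
```

Note that the boundary block itself is appended to the pending list before the flush: it is part of the batch and must become visible together with it, which is why the patched putBlock buffers the index unconditionally.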
diff --git a/blockstore/repostore.nim b/blockstore/repostore.nim
index e4e05c9..7c5dcbe 100644
--- a/blockstore/repostore.nim
+++ b/blockstore/repostore.nim
@@ -2,7 +2,7 @@
 import std/[os, locks, atomics, strutils, times, options, tables]
 when defined(posix):
   import std/posix
 import chronos
-import chronos/asyncsync
+import chronos/[asyncsync, threadsync]
 import leveldbstatic as leveldb
 import ./errors
@@ -24,6 +24,7 @@ type
   SyncWorker* = ref object
     mutex: Lock
     cond: Cond
+    synced: ThreadSignalPtr
     running: Atomic[bool]
     thread: Thread[pointer]
     blocksDir: string
@@ -69,6 +70,7 @@ proc syncWorkerLoop(workerPtr: pointer) {.thread, nimcall.} =
       release(worker.mutex)
       doSync(worker.blocksDir)
       acquire(worker.mutex)
+      discard worker.synced.fireSync()
     release(worker.mutex)
     doSync(worker.blocksDir)
     break
@@ -80,8 +82,11 @@ proc newSyncWorker*(blocksDir: string): SyncWorker =
+  result.synced = ThreadSignalPtr.new().expect("create sync signal")
   result.running.store(true)
   createThread(result.thread, syncWorkerLoop, cast[pointer](result))
 
-proc triggerSync*(worker: SyncWorker) =
+proc triggerSync*(worker: SyncWorker): Future[void].Raising(
+    [AsyncError, CancelledError]) =
   signal(worker.cond)
+  return worker.synced.wait()
 
 proc stopSyncWorker*(worker: SyncWorker) =
   worker.running.store(false)
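A note for reviewers: the handshake introduced above pairs a plain Cond (to wake the sync thread) with a chronos ThreadSignalPtr (so async code can await completion). The reduced model below shows that pattern standalone; it assumes chronos and its results dependency are installed, and it adds a `requested` flag that the patch does not have, purely so this standalone example cannot miss a wakeup.

```nim
# Build with --threads:on. A sketch of the Cond + ThreadSignalPtr
# handshake only, not the real SyncWorker.
import std/[locks, typedthreads]
import chronos, chronos/threadsync
import results

var
  lock: Lock
  cond: Cond
  requested = false
  done: ThreadSignalPtr

proc workerLoop() {.thread.} =
  # Sleep until asked, do the (elided) fsync work, then tell the
  # async side that the batch is durable.
  acquire(lock)
  while not requested:
    wait(cond, lock)
  release(lock)
  discard done.fireSync()

proc main() {.async.} =
  initLock(lock)
  initCond(cond)
  done = ThreadSignalPtr.new().expect("cannot create signal")
  var t: Thread[void]
  createThread(t, workerLoop)
  # triggerSync, reduced to its essence: wake the worker, then await
  # the completion signal fired from the worker thread.
  acquire(lock)
  requested = true
  signal(cond)
  release(lock)
  await done.wait()
  joinThread(t)

waitFor main()
```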
@@ -172,6 +177,9 @@ proc decreaseUsed*(store: RepoStore, size: uint64) {.inline.} =
 proc quota*(store: RepoStore): uint64 {.inline.} =
   store.quota
 
+proc syncBatchSize*(store: RepoStore): int {.inline.} =
+  store.syncBatchSize
+
 proc wouldExceedQuota*(store: RepoStore, size: uint64): bool {.inline.} =
   if store.quota == 0:
     return false
@@ -216,7 +224,7 @@ proc incrementRefCount(store: RepoStore, cidStr: string): BResult[void] =
   except Exception as e:
     err(databaseError(e.msg))
 
-proc putBlock*(store: RepoStore, b: blk.Block): Future[BResult[bool]] {.async.} =
+proc putBlock*(store: RepoStore, b: blk.Block): Future[BResult[(bool, bool)]] {.async.} =
   let cidStr = $b.cid
   let blockPath = store.getBlockPath(b.cid)
   let blockSize = b.data.len.uint64
@@ -224,19 +232,21 @@ proc putBlock*(store: RepoStore, b: blk.Block): Future[BResult[bool]] {.async.}
   let hasIt = ?store.hasBlock(b.cid)
   if hasIt:
     ?store.incrementRefCount(cidStr)
-    return ok(false)
+    return ok((false, false))
 
   let cl = await store.acquireCidLock(cidStr)
   defer: store.releaseCidLock(cl, cidStr)
 
   let hasIt2 = ?store.hasBlock(b.cid)
   if hasIt2:
     ?store.incrementRefCount(cidStr)
-    return ok(false)
+    return ok((false, false))
 
   let fileExisted = fileExists(blockPath)
 
-  var newUsed: uint64
+  var
+    newUsed: uint64
+    synced = false
   if fileExisted:
     newUsed = store.used.load()
   else:
@@ -266,13 +276,15 @@ proc putBlock*(store: RepoStore, b: blk.Block): Future[BResult[bool]] {.async.}
     let syncResult = syncAndCloseFile(fileResult.value)
     if syncResult.isErr:
       return err(syncResult.error)
+    synced = true
   else:
     let writeResult = writeBlockToFile(blockPath, b.data, ioBuffered)
     if writeResult.isErr:
       return err(writeResult.error)
     let count = store.writeCount.fetchAdd(1) + 1
     if count mod store.syncBatchSize == 0:
-      store.syncWorker.triggerSync()
+      await store.syncWorker.triggerSync() # FIXME: might error out and leave us waiting forever
+      synced = true
 
   newUsed = store.used.fetchAdd(blockSize) + blockSize
@@ -280,7 +292,7 @@ proc putBlock*(store: RepoStore, b: blk.Block): Future[BResult[bool]] {.async.}
   try:
     store.db.put(blockInfoKey(cidStr), cast[string](infoBytes))
     store.db.put(UsedKey, cast[string](usedBytes))
-    ok(not fileExisted)
+    ok((not fileExisted, synced))
   except LevelDbException as e:
     err(databaseError(e.msg))
   except Exception as e:
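A note for reviewers on the FIXME above: since triggerSync now awaits the worker's signal, a worker that dies or fails to fire leaves putBlock suspended indefinitely. One possible shape for bounding that wait is sketched below; the helper name and the timeout value are made up, only triggerSync and AsyncError come from this patch.

```nim
# Hypothetical wrapper: never wait on the sync worker forever. On
# timeout or signal failure the batch is reported as not synced, so the
# affected indices simply stay buffered in blockmapPending until the
# next boundary flushes them.
proc triggerSyncBounded(worker: SyncWorker): Future[bool] {.async.} =
  try:
    await worker.triggerSync().wait(10.seconds)
    return true # batch is on disk
  except AsyncError:
    return false # includes AsyncTimeoutError; treat as unsynced
```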