Merge branch 'master' into feature/ceremony-files

This commit is contained in:
benbierens 2024-05-02 08:42:03 +02:00
commit 616b237a7c
No known key found for this signature in database
GPG Key ID: 877D2C2E09A22F3A
10 changed files with 35 additions and 52 deletions

View File

@ -68,13 +68,12 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
try: try:
await b.discoveryQueue.put(cid) await b.discoveryQueue.put(cid)
except CatchableError as exc: except CatchableError as exc:
trace "Exception in discovery loop", exc = exc.msg warn "Exception in discovery loop", exc = exc.msg
logScope: logScope:
sleep = b.discoveryLoopSleep sleep = b.discoveryLoopSleep
wanted = b.pendingBlocks.len wanted = b.pendingBlocks.len
trace "About to sleep discovery loop"
await sleepAsync(b.discoveryLoopSleep) await sleepAsync(b.discoveryLoopSleep)
proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} = proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} =
@ -87,10 +86,9 @@ proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} =
await sleepAsync(50.millis) await sleepAsync(50.millis)
trace "Iterating blocks finished." trace "Iterating blocks finished."
trace "About to sleep advertise loop", sleep = b.advertiseLoopSleep
await sleepAsync(b.advertiseLoopSleep) await sleepAsync(b.advertiseLoopSleep)
trace "Exiting advertise task loop" info "Exiting advertise task loop"
proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} = proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} =
## Run advertise tasks ## Run advertise tasks
@ -102,7 +100,6 @@ proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} =
cid = await b.advertiseQueue.get() cid = await b.advertiseQueue.get()
if cid in b.inFlightAdvReqs: if cid in b.inFlightAdvReqs:
trace "Advertise request already in progress", cid
continue continue
try: try:
@ -111,17 +108,15 @@ proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} =
b.inFlightAdvReqs[cid] = request b.inFlightAdvReqs[cid] = request
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
trace "Advertising block", cid, inflight = b.inFlightAdvReqs.len
await request await request
finally: finally:
b.inFlightAdvReqs.del(cid) b.inFlightAdvReqs.del(cid)
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
trace "Advertised block", cid, inflight = b.inFlightAdvReqs.len
except CatchableError as exc: except CatchableError as exc:
trace "Exception in advertise task runner", exc = exc.msg warn "Exception in advertise task runner", exc = exc.msg
trace "Exiting advertise task runner" info "Exiting advertise task runner"
proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} = proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
## Run discovery tasks ## Run discovery tasks
@ -166,9 +161,9 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
b.inFlightDiscReqs.del(cid) b.inFlightDiscReqs.del(cid)
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
except CatchableError as exc: except CatchableError as exc:
trace "Exception in discovery task runner", exc = exc.msg warn "Exception in discovery task runner", exc = exc.msg
trace "Exiting discovery task runner" info "Exiting discovery task runner"
proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} = proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
for cid in cids: for cid in cids:
@ -183,10 +178,9 @@ proc queueProvideBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
for cid in cids: for cid in cids:
if cid notin b.advertiseQueue: if cid notin b.advertiseQueue:
try: try:
trace "Queueing provide block", cid, queue = b.discoveryQueue.len
b.advertiseQueue.putNoWait(cid) b.advertiseQueue.putNoWait(cid)
except CatchableError as exc: except CatchableError as exc:
trace "Exception queueing discovery request", exc = exc.msg warn "Exception queueing discovery request", exc = exc.msg
proc start*(b: DiscoveryEngine) {.async.} = proc start*(b: DiscoveryEngine) {.async.} =
## Start the discengine task ## Start the discengine task

View File

@ -262,8 +262,6 @@ proc blockPresenceHandler*(
not b.peers.anyIt( cid in it.peerHaveCids )) not b.peers.anyIt( cid in it.peerHaveCids ))
proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
trace "Schedule a task for new blocks", items = blocksDelivery.len
let let
cids = blocksDelivery.mapIt( it.blk.cid ) cids = blocksDelivery.mapIt( it.blk.cid )
@ -277,7 +275,7 @@ proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.asyn
if b.scheduleTask(p): if b.scheduleTask(p):
trace "Task scheduled for peer", peer = p.id trace "Task scheduled for peer", peer = p.id
else: else:
trace "Unable to schedule task for peer", peer = p.id warn "Unable to schedule task for peer", peer = p.id
break # do next peer break # do next peer
@ -293,7 +291,7 @@ proc cancelBlocks(b: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} =
.filterIt(it.failed) .filterIt(it.failed)
if failed.len > 0: if failed.len > 0:
trace "Failed to send block request cancellations to peers", peers = failed.len warn "Failed to send block request cancellations to peers", peers = failed.len
proc getAnnouceCids(blocksDelivery: seq[BlockDelivery]): seq[Cid] = proc getAnnouceCids(blocksDelivery: seq[BlockDelivery]): seq[Cid] =
var cids = initHashSet[Cid]() var cids = initHashSet[Cid]()
@ -309,8 +307,6 @@ proc getAnnouceCids(blocksDelivery: seq[BlockDelivery]): seq[Cid] =
return cids.toSeq return cids.toSeq
proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
trace "Resolving blocks", blocks = blocksDelivery.len
b.pendingBlocks.resolve(blocksDelivery) b.pendingBlocks.resolve(blocksDelivery)
await b.scheduleTasks(blocksDelivery) await b.scheduleTasks(blocksDelivery)
let announceCids = getAnnouceCids(blocksDelivery) let announceCids = getAnnouceCids(blocksDelivery)
@ -618,7 +614,7 @@ proc blockexcTaskRunner(b: BlockExcEngine) {.async.} =
trace "Got new task from queue", peerId = peerCtx.id trace "Got new task from queue", peerId = peerCtx.id
await b.taskHandler(peerCtx) await b.taskHandler(peerCtx)
trace "Exiting blockexc task runner" info "Exiting blockexc task runner"
proc new*( proc new*(
T: type BlockExcEngine, T: type BlockExcEngine,

View File

@ -102,8 +102,6 @@ proc resolve*(
trace "Block retrieval time", retrievalDurationUs, address = bd.address trace "Block retrieval time", retrievalDurationUs, address = bd.address
else: else:
trace "Block handle already finished", address = bd.address trace "Block handle already finished", address = bd.address
do:
warn "Attempting to resolve block that's not currently a pending block", address = bd.address
proc setInFlight*( proc setInFlight*(
p: PendingBlocksManager, p: PendingBlocksManager,

View File

@ -147,9 +147,6 @@ proc sendWantCancellations*(
addresses: seq[BlockAddress]): Future[void] {.async.} = addresses: seq[BlockAddress]): Future[void] {.async.} =
## Informs a remote peer that we're no longer interested in a set of blocks ## Informs a remote peer that we're no longer interested in a set of blocks
## ##
trace "Sending block request cancellation to peer", addrs = addresses.len, peer = id
await b.sendWantList(id = id, addresses = addresses, cancel = true) await b.sendWantList(id = id, addresses = addresses, cancel = true)
proc handleBlocksDelivery( proc handleBlocksDelivery(

View File

@ -84,16 +84,13 @@ method find*(
method provide*(d: Discovery, cid: Cid) {.async, base.} = method provide*(d: Discovery, cid: Cid) {.async, base.} =
## Provide a bock Cid ## Provide a bock Cid
## ##
trace "Providing block", cid
let let
nodes = await d.protocol.addProvider( nodes = await d.protocol.addProvider(
cid.toNodeId(), d.providerRecord.get) cid.toNodeId(), d.providerRecord.get)
if nodes.len <= 0: if nodes.len <= 0:
trace "Couldn't provide to any nodes!" warn "Couldn't provide to any nodes!"
trace "Provided to nodes", nodes = nodes.len
method find*( method find*(
d: Discovery, d: Discovery,

View File

@ -66,7 +66,7 @@ proc encode*(manifest: Manifest): ?!seq[byte] =
var header = initProtoBuffer() var header = initProtoBuffer()
header.write(1, manifest.treeCid.data.buffer) header.write(1, manifest.treeCid.data.buffer)
header.write(2, manifest.blockSize.uint32) header.write(2, manifest.blockSize.uint32)
header.write(3, manifest.datasetSize.uint32) header.write(3, manifest.datasetSize.uint64)
header.write(4, manifest.codec.uint32) header.write(4, manifest.codec.uint32)
header.write(5, manifest.hcodec.uint32) header.write(5, manifest.hcodec.uint32)
header.write(6, manifest.version.uint32) header.write(6, manifest.version.uint32)
@ -75,7 +75,7 @@ proc encode*(manifest: Manifest): ?!seq[byte] =
erasureInfo.write(1, manifest.ecK.uint32) erasureInfo.write(1, manifest.ecK.uint32)
erasureInfo.write(2, manifest.ecM.uint32) erasureInfo.write(2, manifest.ecM.uint32)
erasureInfo.write(3, manifest.originalTreeCid.data.buffer) erasureInfo.write(3, manifest.originalTreeCid.data.buffer)
erasureInfo.write(4, manifest.originalDatasetSize.uint32) erasureInfo.write(4, manifest.originalDatasetSize.uint64)
erasureInfo.write(5, manifest.protectedStrategy.uint32) erasureInfo.write(5, manifest.protectedStrategy.uint32)
if manifest.verifiable: if manifest.verifiable:
@ -106,12 +106,12 @@ proc decode*(_: type Manifest, data: openArray[byte]): ?!Manifest =
pbVerificationInfo: ProtoBuffer pbVerificationInfo: ProtoBuffer
treeCidBuf: seq[byte] treeCidBuf: seq[byte]
originalTreeCid: seq[byte] originalTreeCid: seq[byte]
datasetSize: uint32 datasetSize: uint64
codec: uint32 codec: uint32
hcodec: uint32 hcodec: uint32
version: uint32 version: uint32
blockSize: uint32 blockSize: uint32
originalDatasetSize: uint32 originalDatasetSize: uint64
ecK, ecM: uint32 ecK, ecM: uint32
protectedStrategy: uint32 protectedStrategy: uint32
verifyRoot: seq[byte] verifyRoot: seq[byte]

View File

@ -294,7 +294,7 @@ proc store*(
## Save stream contents as dataset with given blockSize ## Save stream contents as dataset with given blockSize
## to nodes's BlockStore, and return Cid of its manifest ## to nodes's BlockStore, and return Cid of its manifest
## ##
trace "Storing data" info "Storing data"
let let
hcodec = Sha256HashCodec hcodec = Sha256HashCodec
@ -308,8 +308,6 @@ proc store*(
let chunk = await chunker.getBytes(); let chunk = await chunker.getBytes();
chunk.len > 0): chunk.len > 0):
trace "Got data from stream", len = chunk.len
without mhash =? MultiHash.digest($hcodec, chunk).mapFailure, err: without mhash =? MultiHash.digest($hcodec, chunk).mapFailure, err:
return failure(err) return failure(err)
@ -322,7 +320,7 @@ proc store*(
cids.add(cid) cids.add(cid)
if err =? (await self.networkStore.putBlock(blk)).errorOption: if err =? (await self.networkStore.putBlock(blk)).errorOption:
trace "Unable to store block", cid = blk.cid, err = err.msg error "Unable to store block", cid = blk.cid, err = err.msg
return failure(&"Unable to store block {blk.cid}") return failure(&"Unable to store block {blk.cid}")
except CancelledError as exc: except CancelledError as exc:
raise exc raise exc
@ -353,7 +351,7 @@ proc store*(
codec = dataCodec) codec = dataCodec)
without manifestBlk =? await self.storeManifest(manifest), err: without manifestBlk =? await self.storeManifest(manifest), err:
trace "Unable to store manifest" error "Unable to store manifest"
return failure(err) return failure(err)
info "Stored data", manifestCid = manifestBlk.cid, info "Stored data", manifestCid = manifestBlk.cid,
@ -361,7 +359,6 @@ proc store*(
blocks = manifest.blocksCount, blocks = manifest.blocksCount,
datasetSize = manifest.datasetSize datasetSize = manifest.datasetSize
# Announce manifest
await self.discovery.provide(manifestBlk.cid) await self.discovery.provide(manifestBlk.cid)
await self.discovery.provide(treeCid) await self.discovery.provide(treeCid)

View File

@ -69,9 +69,6 @@ method putBlock*(
ttl = Duration.none): Future[?!void] {.async.} = ttl = Duration.none): Future[?!void] {.async.} =
## Store block locally and notify the network ## Store block locally and notify the network
## ##
trace "Putting block into network store", cid = blk.cid
let res = await self.localStore.putBlock(blk, ttl) let res = await self.localStore.putBlock(blk, ttl)
if res.isErr: if res.isErr:
return res return res

View File

@ -119,7 +119,7 @@ method putCidAndProof*(
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err: without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err) return failure(err)
trace "Storing block cid and proof with key", key trace "Storing block cid and proof", blockCid, key
let value = (blockCid, proof).encode() let value = (blockCid, proof).encode()
@ -313,7 +313,7 @@ method putBlock*(
return success() return success()
without key =? makePrefixKey(self.postFixLen, blk.cid), err: without key =? makePrefixKey(self.postFixLen, blk.cid), err:
trace "Error getting key from provider", err = err.msg warn "Error getting key from provider", err = err.msg
return failure(err) return failure(err)
if await key in self.repoDs: if await key in self.repoDs:
@ -325,8 +325,6 @@ method putBlock*(
return failure( return failure(
newException(QuotaUsedError, "Cannot store block, quota used!")) newException(QuotaUsedError, "Cannot store block, quota used!"))
trace "Storing block with key", key
var var
batch: seq[BatchEntry] batch: seq[BatchEntry]
@ -334,22 +332,21 @@ method putBlock*(
used = self.quotaUsedBytes + blk.data.len.uint used = self.quotaUsedBytes + blk.data.len.uint
if err =? (await self.repoDs.put(key, blk.data)).errorOption: if err =? (await self.repoDs.put(key, blk.data)).errorOption:
trace "Error storing block", err = err.msg error "Error storing block", err = err.msg
return failure(err) return failure(err)
trace "Updating quota", used
batch.add((QuotaUsedKey, @(used.uint64.toBytesBE))) batch.add((QuotaUsedKey, @(used.uint64.toBytesBE)))
without blockExpEntry =? self.getBlockExpirationEntry(blk.cid, ttl), err: without blockExpEntry =? self.getBlockExpirationEntry(blk.cid, ttl), err:
trace "Unable to create block expiration metadata key", err = err.msg warn "Unable to create block expiration metadata key", err = err.msg
return failure(err) return failure(err)
batch.add(blockExpEntry) batch.add(blockExpEntry)
if err =? (await self.metaDs.put(batch)).errorOption: if err =? (await self.metaDs.put(batch)).errorOption:
trace "Error updating quota bytes", err = err.msg error "Error updating quota bytes", err = err.msg
if err =? (await self.repoDs.delete(key)).errorOption: if err =? (await self.repoDs.delete(key)).errorOption:
trace "Error deleting block after failed quota update", err = err.msg error "Error deleting block after failed quota update", err = err.msg
return failure(err) return failure(err)
return failure(err) return failure(err)
@ -357,7 +354,7 @@ method putBlock*(
self.quotaUsedBytes = used self.quotaUsedBytes = used
inc self.totalBlocks inc self.totalBlocks
if isErr (await self.persistTotalBlocksCount()): if isErr (await self.persistTotalBlocksCount()):
trace "Unable to update block total metadata" warn "Unable to update block total metadata"
return failure("Unable to update block total metadata") return failure("Unable to update block total metadata")
self.updateMetrics() self.updateMetrics()

View File

@ -57,6 +57,16 @@ checksuite "Manifest":
check: check:
encodeDecode(manifest) == manifest encodeDecode(manifest) == manifest
test "Should encode/decode large manifest":
let large = Manifest.new(
treeCid = Cid.example,
blockSize = (64 * 1024).NBytes,
datasetSize = (5 * 1024).MiBs
)
check:
encodeDecode(large) == large
test "Should encode/decode to/from protected manifest": test "Should encode/decode to/from protected manifest":
check: check:
encodeDecode(protectedManifest) == protectedManifest encodeDecode(protectedManifest) == protectedManifest