Merge branch 'master' into feature/ceremony-files

This commit is contained in:
Ben 2024-03-18 09:06:19 +01:00
commit 4379461b5e
No known key found for this signature in database
GPG Key ID: 541B9D8C9F1426A1
4 changed files with 106 additions and 47 deletions

View File

@ -73,6 +73,7 @@ type
peersPerRequest: int # Max number of peers to request from
wallet*: WalletRef # Nitro wallet for micropayments
pricing*: ?Pricing # Optional bandwidth pricing
blockFetchTimeout*: Duration # Timeout for fetching blocks over the network
discovery*: DiscoveryEngine
Pricing* = object
@ -173,12 +174,10 @@ proc monitorBlockHandle(
proc requestBlock*(
b: BlockExcEngine,
address: BlockAddress,
timeout = DefaultBlockTimeout): Future[?!Block] {.async.} =
let blockFuture = b.pendingBlocks.getWantHandle(address, timeout)
): Future[?!Block] {.async.} =
let blockFuture = b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout)
if b.pendingBlocks.isInFlight(address):
success await blockFuture
else:
if not b.pendingBlocks.isInFlight(address):
let peers = b.peers.selectCheapest(address)
if peers.len == 0:
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
@ -199,12 +198,17 @@ proc requestBlock*(
await b.sendWantHave(address, @[peer], toSeq(b.peers))
codex_block_exchange_want_have_lists_sent.inc()
# Don't let timeouts bubble up. We can't be too broad here or we break
# cancellations.
try:
success await blockFuture
except AsyncTimeoutError as err:
failure err
proc requestBlock*(
b: BlockExcEngine,
cid: Cid,
timeout = DefaultBlockTimeout): Future[?!Block] =
cid: Cid
): Future[?!Block] =
b.requestBlock(BlockAddress.init(cid))
proc blockPresenceHandler*(
@ -614,7 +618,8 @@ proc new*(
peerStore: PeerCtxStore,
pendingBlocks: PendingBlocksManager,
concurrentTasks = DefaultConcurrentTasks,
peersPerRequest = DefaultMaxPeersPerRequest
peersPerRequest = DefaultMaxPeersPerRequest,
blockFetchTimeout = DefaultBlockTimeout,
): BlockExcEngine =
## Create new block exchange engine instance
##
@ -629,7 +634,8 @@ proc new*(
wallet: wallet,
concurrentTasks: concurrentTasks,
taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize),
discovery: discovery)
discovery: discovery,
blockFetchTimeout: blockFetchTimeout)
proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} =
if event.kind == PeerEventKind.Joined:

View File

@ -209,6 +209,62 @@ proc fetchBatched*(
let iter = Iter.fromSlice(0..<manifest.blocksCount)
self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch)
proc streamSingleBlock(
self: CodexNodeRef,
cid: Cid
): Future[?!LPstream] {.async.} =
## Streams the contents of a single block.
##
trace "Streaming single block", cid = cid
let
stream = BufferStream.new()
without blk =? (await self.networkStore.getBlock(BlockAddress.init(cid))), err:
return failure(err)
proc streamOneBlock(): Future[void] {.async.} =
try:
await stream.pushData(blk.data)
except CatchableError as exc:
trace "Unable to send block", cid, exc = exc.msg
discard
finally:
await stream.pushEof()
asyncSpawn streamOneBlock()
LPStream(stream).success
proc streamEntireDataset(
self: CodexNodeRef,
manifest: Manifest,
manifestCid: Cid,
): ?!LPStream =
## Streams the contents of the entire dataset described by the manifest.
##
trace "Retrieving blocks from manifest", manifestCid
if manifest.protected:
# Retrieve, decode and save to the local store all EС groups
proc erasureJob(): Future[void] {.async.} =
try:
# Spawn an erasure decoding job
let
erasure = Erasure.new(
self.networkStore,
leoEncoderProvider,
leoDecoderProvider)
without _ =? (await erasure.decode(manifest)), error:
trace "Unable to erasure decode manifest", manifestCid, exc = error.msg
except CatchableError as exc:
trace "Exception decoding manifest", manifestCid, exc = exc.msg
asyncSpawn erasureJob()
# Retrieve all blocks of the dataset sequentially from the local store or network
trace "Creating store stream for manifest", manifestCid
LPStream(StoreStream.new(self.networkStore, manifest, pad = false)).success
proc retrieve*(
self: CodexNodeRef,
cid: Cid,
@ -219,46 +275,13 @@ proc retrieve*(
if local and not await (cid in self.networkStore):
return failure((ref BlockNotFoundError)(msg: "Block not found in local store"))
if manifest =? (await self.fetchManifest(cid)):
trace "Retrieving blocks from manifest", cid
if manifest.protected:
# Retrieve, decode and save to the local store all EС groups
proc erasureJob(): Future[void] {.async.} =
try:
# Spawn an erasure decoding job
let
erasure = Erasure.new(
self.networkStore,
leoEncoderProvider,
leoDecoderProvider)
without _ =? (await erasure.decode(manifest)), error:
trace "Unable to erasure decode manifest", cid, exc = error.msg
except CatchableError as exc:
trace "Exception decoding manifest", cid, exc = exc.msg
asyncSpawn erasureJob()
# Retrieve all blocks of the dataset sequentially from the local store or network
trace "Creating store stream for manifest", cid
LPStream(StoreStream.new(self.networkStore, manifest, pad = false)).success
else:
let
stream = BufferStream.new()
without blk =? (await self.networkStore.getBlock(BlockAddress.init(cid))), err:
without manifest =? (await self.fetchManifest(cid)), err:
if err of AsyncTimeoutError:
return failure(err)
proc streamOneBlock(): Future[void] {.async.} =
try:
await stream.pushData(blk.data)
except CatchableError as exc:
trace "Unable to send block", cid, exc = exc.msg
discard
finally:
await stream.pushEof()
return await self.streamSingleBlock(cid)
asyncSpawn streamOneBlock()
LPStream(stream).success()
self.streamEntireDataset(manifest, cid)
proc store*(
self: CodexNodeRef,

View File

@ -1,4 +1,4 @@
import std/tables
import std/times
import pkg/libp2p
@ -6,10 +6,25 @@ import pkg/chronos
import pkg/codex/codextypes
import pkg/codex/chunker
import pkg/codex/stores
import pkg/codex/slots
import ../../asynctest
type CountingStore* = ref object of NetworkStore
lookups*: Table[Cid, int]
proc new*(T: type CountingStore,
engine: BlockExcEngine, localStore: BlockStore): CountingStore =
# XXX this works cause NetworkStore.new is trivial
result = CountingStore(engine: engine, localStore: localStore)
method getBlock*(self: CountingStore,
address: BlockAddress): Future[?!Block] {.async.} =
self.lookups.mgetOrPut(address.cid, 0).inc
await procCall getBlock(NetworkStore(self), address)
proc toTimesDuration*(d: chronos.Duration): times.Duration =
initDuration(seconds = d.seconds)

View File

@ -62,6 +62,21 @@ asyncchecksuite "Test Node - Basic":
check:
fetched == manifest
test "Should not lookup non-existing blocks twice":
# https://github.com/codex-storage/nim-codex/issues/699
let
cstore = CountingStore.new(engine, localStore)
node = CodexNodeRef.new(switch, cstore, engine, blockDiscovery)
missingCid = Cid.init(
"zDvZRwzmCvtiyubW9AecnxgLnXK8GrBvpQJBDzToxmzDN6Nrc2CZ").get()
engine.blockFetchTimeout = timer.milliseconds(100)
discard await node.retrieve(missingCid, local = false)
let lookupCount = cstore.lookups.getOrDefault(missingCid)
check lookupCount == 1
test "Block Batching":
let manifest = await storeDataGetManifest(localStore, chunker)