fixes double lookups when block does not exist (#739)
* fixes double lookups when block does not exist * handle timeouts on requestBlock * fix indentation which was causing an integration test to fail
This commit is contained in:
parent
4d069599c9
commit
d33804f700
|
@ -73,6 +73,7 @@ type
|
|||
peersPerRequest: int # Max number of peers to request from
|
||||
wallet*: WalletRef # Nitro wallet for micropayments
|
||||
pricing*: ?Pricing # Optional bandwidth pricing
|
||||
blockFetchTimeout*: Duration # Timeout for fetching blocks over the network
|
||||
discovery*: DiscoveryEngine
|
||||
|
||||
Pricing* = object
|
||||
|
@ -173,12 +174,10 @@ proc monitorBlockHandle(
|
|||
proc requestBlock*(
|
||||
b: BlockExcEngine,
|
||||
address: BlockAddress,
|
||||
timeout = DefaultBlockTimeout): Future[?!Block] {.async.} =
|
||||
let blockFuture = b.pendingBlocks.getWantHandle(address, timeout)
|
||||
): Future[?!Block] {.async.} =
|
||||
let blockFuture = b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout)
|
||||
|
||||
if b.pendingBlocks.isInFlight(address):
|
||||
success await blockFuture
|
||||
else:
|
||||
if not b.pendingBlocks.isInFlight(address):
|
||||
let peers = b.peers.selectCheapest(address)
|
||||
if peers.len == 0:
|
||||
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
|
||||
|
@ -199,12 +198,17 @@ proc requestBlock*(
|
|||
await b.sendWantHave(address, @[peer], toSeq(b.peers))
|
||||
codex_block_exchange_want_have_lists_sent.inc()
|
||||
|
||||
# Don't let timeouts bubble up. We can't be too broad here or we break
|
||||
# cancellations.
|
||||
try:
|
||||
success await blockFuture
|
||||
except AsyncTimeoutError as err:
|
||||
failure err
|
||||
|
||||
proc requestBlock*(
|
||||
b: BlockExcEngine,
|
||||
cid: Cid,
|
||||
timeout = DefaultBlockTimeout): Future[?!Block] =
|
||||
cid: Cid
|
||||
): Future[?!Block] =
|
||||
b.requestBlock(BlockAddress.init(cid))
|
||||
|
||||
proc blockPresenceHandler*(
|
||||
|
@ -614,7 +618,8 @@ proc new*(
|
|||
peerStore: PeerCtxStore,
|
||||
pendingBlocks: PendingBlocksManager,
|
||||
concurrentTasks = DefaultConcurrentTasks,
|
||||
peersPerRequest = DefaultMaxPeersPerRequest
|
||||
peersPerRequest = DefaultMaxPeersPerRequest,
|
||||
blockFetchTimeout = DefaultBlockTimeout,
|
||||
): BlockExcEngine =
|
||||
## Create new block exchange engine instance
|
||||
##
|
||||
|
@ -629,7 +634,8 @@ proc new*(
|
|||
wallet: wallet,
|
||||
concurrentTasks: concurrentTasks,
|
||||
taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize),
|
||||
discovery: discovery)
|
||||
discovery: discovery,
|
||||
blockFetchTimeout: blockFetchTimeout)
|
||||
|
||||
proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} =
|
||||
if event.kind == PeerEventKind.Joined:
|
||||
|
|
|
@ -209,6 +209,62 @@ proc fetchBatched*(
|
|||
let iter = Iter.fromSlice(0..<manifest.blocksCount)
|
||||
self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch)
|
||||
|
||||
proc streamSingleBlock(
|
||||
self: CodexNodeRef,
|
||||
cid: Cid
|
||||
): Future[?!LPstream] {.async.} =
|
||||
## Streams the contents of a single block.
|
||||
##
|
||||
trace "Streaming single block", cid = cid
|
||||
|
||||
let
|
||||
stream = BufferStream.new()
|
||||
|
||||
without blk =? (await self.networkStore.getBlock(BlockAddress.init(cid))), err:
|
||||
return failure(err)
|
||||
|
||||
proc streamOneBlock(): Future[void] {.async.} =
|
||||
try:
|
||||
await stream.pushData(blk.data)
|
||||
except CatchableError as exc:
|
||||
trace "Unable to send block", cid, exc = exc.msg
|
||||
discard
|
||||
finally:
|
||||
await stream.pushEof()
|
||||
|
||||
asyncSpawn streamOneBlock()
|
||||
LPStream(stream).success
|
||||
|
||||
proc streamEntireDataset(
|
||||
self: CodexNodeRef,
|
||||
manifest: Manifest,
|
||||
manifestCid: Cid,
|
||||
): ?!LPStream =
|
||||
## Streams the contents of the entire dataset described by the manifest.
|
||||
##
|
||||
trace "Retrieving blocks from manifest", manifestCid
|
||||
|
||||
if manifest.protected:
|
||||
# Retrieve, decode and save to the local store all EС groups
|
||||
proc erasureJob(): Future[void] {.async.} =
|
||||
try:
|
||||
# Spawn an erasure decoding job
|
||||
let
|
||||
erasure = Erasure.new(
|
||||
self.networkStore,
|
||||
leoEncoderProvider,
|
||||
leoDecoderProvider)
|
||||
without _ =? (await erasure.decode(manifest)), error:
|
||||
trace "Unable to erasure decode manifest", manifestCid, exc = error.msg
|
||||
except CatchableError as exc:
|
||||
trace "Exception decoding manifest", manifestCid, exc = exc.msg
|
||||
|
||||
asyncSpawn erasureJob()
|
||||
|
||||
# Retrieve all blocks of the dataset sequentially from the local store or network
|
||||
trace "Creating store stream for manifest", manifestCid
|
||||
LPStream(StoreStream.new(self.networkStore, manifest, pad = false)).success
|
||||
|
||||
proc retrieve*(
|
||||
self: CodexNodeRef,
|
||||
cid: Cid,
|
||||
|
@ -219,46 +275,13 @@ proc retrieve*(
|
|||
if local and not await (cid in self.networkStore):
|
||||
return failure((ref BlockNotFoundError)(msg: "Block not found in local store"))
|
||||
|
||||
if manifest =? (await self.fetchManifest(cid)):
|
||||
trace "Retrieving blocks from manifest", cid
|
||||
if manifest.protected:
|
||||
# Retrieve, decode and save to the local store all EС groups
|
||||
proc erasureJob(): Future[void] {.async.} =
|
||||
try:
|
||||
# Spawn an erasure decoding job
|
||||
let
|
||||
erasure = Erasure.new(
|
||||
self.networkStore,
|
||||
leoEncoderProvider,
|
||||
leoDecoderProvider)
|
||||
without _ =? (await erasure.decode(manifest)), error:
|
||||
trace "Unable to erasure decode manifest", cid, exc = error.msg
|
||||
except CatchableError as exc:
|
||||
trace "Exception decoding manifest", cid, exc = exc.msg
|
||||
|
||||
asyncSpawn erasureJob()
|
||||
|
||||
# Retrieve all blocks of the dataset sequentially from the local store or network
|
||||
trace "Creating store stream for manifest", cid
|
||||
LPStream(StoreStream.new(self.networkStore, manifest, pad = false)).success
|
||||
else:
|
||||
let
|
||||
stream = BufferStream.new()
|
||||
|
||||
without blk =? (await self.networkStore.getBlock(BlockAddress.init(cid))), err:
|
||||
without manifest =? (await self.fetchManifest(cid)), err:
|
||||
if err of AsyncTimeoutError:
|
||||
return failure(err)
|
||||
|
||||
proc streamOneBlock(): Future[void] {.async.} =
|
||||
try:
|
||||
await stream.pushData(blk.data)
|
||||
except CatchableError as exc:
|
||||
trace "Unable to send block", cid, exc = exc.msg
|
||||
discard
|
||||
finally:
|
||||
await stream.pushEof()
|
||||
return await self.streamSingleBlock(cid)
|
||||
|
||||
asyncSpawn streamOneBlock()
|
||||
LPStream(stream).success()
|
||||
self.streamEntireDataset(manifest, cid)
|
||||
|
||||
proc store*(
|
||||
self: CodexNodeRef,
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
|
||||
import std/tables
|
||||
import std/times
|
||||
|
||||
import pkg/libp2p
|
||||
|
@ -6,10 +6,25 @@ import pkg/chronos
|
|||
|
||||
import pkg/codex/codextypes
|
||||
import pkg/codex/chunker
|
||||
import pkg/codex/stores
|
||||
import pkg/codex/slots
|
||||
|
||||
import ../../asynctest
|
||||
|
||||
type CountingStore* = ref object of NetworkStore
|
||||
lookups*: Table[Cid, int]
|
||||
|
||||
proc new*(T: type CountingStore,
|
||||
engine: BlockExcEngine, localStore: BlockStore): CountingStore =
|
||||
# XXX this works cause NetworkStore.new is trivial
|
||||
result = CountingStore(engine: engine, localStore: localStore)
|
||||
|
||||
method getBlock*(self: CountingStore,
|
||||
address: BlockAddress): Future[?!Block] {.async.} =
|
||||
|
||||
self.lookups.mgetOrPut(address.cid, 0).inc
|
||||
await procCall getBlock(NetworkStore(self), address)
|
||||
|
||||
proc toTimesDuration*(d: chronos.Duration): times.Duration =
|
||||
initDuration(seconds = d.seconds)
|
||||
|
||||
|
|
|
@ -62,6 +62,21 @@ asyncchecksuite "Test Node - Basic":
|
|||
check:
|
||||
fetched == manifest
|
||||
|
||||
test "Should not lookup non-existing blocks twice":
|
||||
# https://github.com/codex-storage/nim-codex/issues/699
|
||||
let
|
||||
cstore = CountingStore.new(engine, localStore)
|
||||
node = CodexNodeRef.new(switch, cstore, engine, blockDiscovery)
|
||||
missingCid = Cid.init(
|
||||
"zDvZRwzmCvtiyubW9AecnxgLnXK8GrBvpQJBDzToxmzDN6Nrc2CZ").get()
|
||||
|
||||
engine.blockFetchTimeout = timer.milliseconds(100)
|
||||
|
||||
discard await node.retrieve(missingCid, local = false)
|
||||
|
||||
let lookupCount = cstore.lookups.getOrDefault(missingCid)
|
||||
check lookupCount == 1
|
||||
|
||||
test "Block Batching":
|
||||
let manifest = await storeDataGetManifest(localStore, chunker)
|
||||
|
||||
|
|
Loading…
Reference in New Issue