Announce to DHT only tree and manifest CIDs (#788)
* announce only tree and manifest cids * wip * Adds tests for selecting which CIDs are announced * newline * Review comments by Tomasz
This commit is contained in:
parent
53809129a5
commit
3041f5ff5f
|
@ -17,12 +17,14 @@ import pkg/chronos
|
||||||
import pkg/libp2p/[cid, switch, multihash, multicodec]
|
import pkg/libp2p/[cid, switch, multihash, multicodec]
|
||||||
import pkg/metrics
|
import pkg/metrics
|
||||||
import pkg/stint
|
import pkg/stint
|
||||||
|
import pkg/questionable
|
||||||
|
|
||||||
import ../../stores/blockstore
|
import ../../stores/blockstore
|
||||||
import ../../blocktype
|
import ../../blocktype
|
||||||
import ../../utils
|
import ../../utils
|
||||||
import ../../merkletree
|
import ../../merkletree
|
||||||
import ../../logutils
|
import ../../logutils
|
||||||
|
import ../../manifest
|
||||||
|
|
||||||
import ../protobuf/blockexc
|
import ../protobuf/blockexc
|
||||||
import ../protobuf/presence
|
import ../protobuf/presence
|
||||||
|
@ -293,19 +295,28 @@ proc cancelBlocks(b: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} =
|
||||||
if failed.len > 0:
|
if failed.len > 0:
|
||||||
trace "Failed to send block request cancellations to peers", peers = failed.len
|
trace "Failed to send block request cancellations to peers", peers = failed.len
|
||||||
|
|
||||||
|
proc getAnnouceCids(blocksDelivery: seq[BlockDelivery]): seq[Cid] =
|
||||||
|
var cids = initHashSet[Cid]()
|
||||||
|
for bd in blocksDelivery:
|
||||||
|
if bd.address.leaf:
|
||||||
|
cids.incl(bd.address.treeCid)
|
||||||
|
else:
|
||||||
|
without isM =? bd.address.cid.isManifest, err:
|
||||||
|
warn "Unable to determine if cid is manifest"
|
||||||
|
continue
|
||||||
|
if isM:
|
||||||
|
cids.incl(bd.address.cid)
|
||||||
|
return cids.toSeq
|
||||||
|
|
||||||
proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
|
proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
|
||||||
trace "Resolving blocks", blocks = blocksDelivery.len
|
trace "Resolving blocks", blocks = blocksDelivery.len
|
||||||
|
|
||||||
b.pendingBlocks.resolve(blocksDelivery)
|
b.pendingBlocks.resolve(blocksDelivery)
|
||||||
await b.scheduleTasks(blocksDelivery)
|
await b.scheduleTasks(blocksDelivery)
|
||||||
var cids = initHashSet[Cid]()
|
let announceCids = getAnnouceCids(blocksDelivery)
|
||||||
for bd in blocksDelivery:
|
|
||||||
cids.incl(bd.blk.cid)
|
|
||||||
if bd.address.leaf:
|
|
||||||
cids.incl(bd.address.treeCid)
|
|
||||||
|
|
||||||
await b.cancelBlocks(blocksDelivery.mapIt(it.address))
|
await b.cancelBlocks(blocksDelivery.mapIt(it.address))
|
||||||
b.discovery.queueProvideBlocksReq(cids.toSeq)
|
|
||||||
|
b.discovery.queueProvideBlocksReq(announceCids)
|
||||||
|
|
||||||
proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} =
|
proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} =
|
||||||
await b.resolveBlocks(
|
await b.resolveBlocks(
|
||||||
|
|
|
@ -15,6 +15,7 @@ import pkg/codex/chunker
|
||||||
import pkg/codex/discovery
|
import pkg/codex/discovery
|
||||||
import pkg/codex/blocktype
|
import pkg/codex/blocktype
|
||||||
import pkg/codex/utils/asyncheapqueue
|
import pkg/codex/utils/asyncheapqueue
|
||||||
|
import pkg/codex/manifest
|
||||||
|
|
||||||
import ../../../asynctest
|
import ../../../asynctest
|
||||||
import ../../helpers
|
import ../../helpers
|
||||||
|
@ -389,6 +390,51 @@ asyncchecksuite "NetworkStore engine handlers":
|
||||||
discard await allFinished(pending)
|
discard await allFinished(pending)
|
||||||
await allFuturesThrowing(cancellations.values().toSeq)
|
await allFuturesThrowing(cancellations.values().toSeq)
|
||||||
|
|
||||||
|
test "resolveBlocks should queue manifest CIDs for discovery":
|
||||||
|
engine.network = BlockExcNetwork(
|
||||||
|
request: BlockExcRequest(sendWantCancellations: NopSendWantCancellationsProc))
|
||||||
|
|
||||||
|
let
|
||||||
|
manifest = Manifest.new(
|
||||||
|
treeCid = Cid.example,
|
||||||
|
blockSize = 123.NBytes,
|
||||||
|
datasetSize = 234.NBytes
|
||||||
|
)
|
||||||
|
|
||||||
|
let manifestBlk = Block.new(data = manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
|
||||||
|
let blks = @[manifestBlk]
|
||||||
|
|
||||||
|
await engine.resolveBlocks(blks)
|
||||||
|
|
||||||
|
check:
|
||||||
|
manifestBlk.cid in engine.discovery.advertiseQueue
|
||||||
|
|
||||||
|
test "resolveBlocks should queue tree CIDs for discovery":
|
||||||
|
engine.network = BlockExcNetwork(
|
||||||
|
request: BlockExcRequest(sendWantCancellations: NopSendWantCancellationsProc))
|
||||||
|
|
||||||
|
let
|
||||||
|
tCid = Cid.example
|
||||||
|
delivery = BlockDelivery(blk: Block.example, address: BlockAddress(leaf: true, treeCid: tCid))
|
||||||
|
|
||||||
|
await engine.resolveBlocks(@[delivery])
|
||||||
|
|
||||||
|
check:
|
||||||
|
tCid in engine.discovery.advertiseQueue
|
||||||
|
|
||||||
|
test "resolveBlocks should not queue non-manifest non-tree CIDs for discovery":
|
||||||
|
engine.network = BlockExcNetwork(
|
||||||
|
request: BlockExcRequest(sendWantCancellations: NopSendWantCancellationsProc))
|
||||||
|
|
||||||
|
let
|
||||||
|
blkCid = Cid.example
|
||||||
|
delivery = BlockDelivery(blk: Block.example, address: BlockAddress(leaf: false, cid: blkCid))
|
||||||
|
|
||||||
|
await engine.resolveBlocks(@[delivery])
|
||||||
|
|
||||||
|
check:
|
||||||
|
blkCid notin engine.discovery.advertiseQueue
|
||||||
|
|
||||||
asyncchecksuite "Task Handler":
|
asyncchecksuite "Task Handler":
|
||||||
var
|
var
|
||||||
rng: Rng
|
rng: Rng
|
||||||
|
|
Loading…
Reference in New Issue