Advertise manifests and trees (#797)

* wip update advertise loop to announce only manifests and trees

* fixes e2e tests

* fixes tests for discovery engine

---------

Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
This commit is contained in:
Ben Bierens 2024-05-09 10:03:35 +02:00 committed by GitHub
parent 1a0d2d424e
commit 9379c7c662
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 102 additions and 95 deletions

View File

@ -11,6 +11,7 @@ import std/sequtils
import pkg/chronos
import pkg/libp2p/cid
import pkg/libp2p/multicodec
import pkg/metrics
import pkg/questionable
import pkg/questionable/results
@ -25,6 +26,7 @@ import ../../utils
import ../../discovery
import ../../stores/blockstore
import ../../logutils
import ../../manifest
logScope:
topics = "codex discoveryengine"
@ -76,14 +78,32 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
await sleepAsync(b.discoveryLoopSleep)
proc advertiseBlock(b: DiscoveryEngine, cid: Cid) {.async.} =
without isM =? cid.isManifest, err:
warn "Unable to determine if cid is manifest"
return
if isM:
without blk =? await b.localStore.getBlock(cid), err:
error "Error retrieving manifest block", cid, err = err.msg
return
without manifest =? Manifest.decode(blk), err:
error "Unable to decode as manifest", err = err.msg
return
# announce manifest cid and tree cid
await b.advertiseQueue.put(cid)
await b.advertiseQueue.put(manifest.treeCid)
proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} =
while b.discEngineRunning:
if cids =? await b.localStore.listBlocks(blockType = b.advertiseType):
trace "Begin iterating blocks..."
for c in cids:
if cid =? await c:
await b.advertiseQueue.put(cid)
await sleepAsync(50.millis)
b.advertiseBlock(cid)
await sleepAsync(100.millis)
trace "Iterating blocks finished."
await sleepAsync(b.advertiseLoopSleep)
@ -248,7 +268,7 @@ proc new*(
discoveryLoopSleep = DefaultDiscoveryLoopSleep,
advertiseLoopSleep = DefaultAdvertiseLoopSleep,
minPeersPerBlock = DefaultMinPeersPerBlock,
advertiseType = BlockType.Both
advertiseType = BlockType.Manifest
): DiscoveryEngine =
## Create a discovery engine instance for advertising services
##

View File

@ -99,50 +99,33 @@ asyncchecksuite "Block Advertising and Discovery":
await engine.stop()
test "Should advertise both manifests and blocks":
test "Should advertise both manifests and trees":
let
cids = @[manifest.cid.tryGet, manifest.treeCid]
advertised = initTable.collect:
for b in (blocks & manifestBlock): {b.cid: newFuture[void]()}
for cid in cids: {cid: newFuture[void]()}
blockDiscovery
.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} =
if cid in advertised and not advertised[cid].finished():
advertised[cid].complete()
discovery.advertiseType = BlockType.Both
await engine.start() # fire up advertise loop
await engine.start()
await allFuturesThrowing(
allFinished(toSeq(advertised.values)))
await engine.stop()
test "Should advertise local manifests":
test "Should not advertise local blocks":
let
advertised = newFuture[Cid]()
blockCids = blocks.mapIt(it.cid)
blockDiscovery
.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} =
check manifestBlock.cid == cid
advertised.complete(cid)
check:
cid notin blockCids
discovery.advertiseType = BlockType.Manifest
await engine.start() # fire up advertise loop
check (await advertised.wait(10.millis)) == manifestBlock.cid
await engine.stop()
test "Should advertise local blocks":
let
advertised = initTable.collect:
for b in blocks: {b.cid: newFuture[void]()}
blockDiscovery
.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} =
if cid in advertised and not advertised[cid].finished():
advertised[cid].complete()
discovery.advertiseType = BlockType.Block
await engine.start() # fire up advertise loop
await allFuturesThrowing(
allFinished(toSeq(advertised.values)))
await engine.start()
await sleepAsync(3.seconds)
await engine.stop()
test "Should not launch discovery if remote peer has block":
@ -165,7 +148,7 @@ asyncchecksuite "Block Advertising and Discovery":
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] =
check false
await engine.start() # fire up discovery loop
await engine.start()
engine.pendingBlocks.resolve(blocks.mapIt(BlockDelivery(blk: it, address: it.address)))
await allFuturesThrowing(
@ -173,23 +156,33 @@ asyncchecksuite "Block Advertising and Discovery":
await engine.stop()
asyncchecksuite "E2E - Multiple Nodes Discovery":
let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
proc asBlock(m: Manifest): bt.Block =
let mdata = m.encode().tryGet()
bt.Block.new(data = mdata, codec = ManifestCodec).tryGet()
asyncchecksuite "E2E - Multiple Nodes Discovery":
var
switch: seq[Switch]
blockexc: seq[NetworkStore]
blocks: seq[bt.Block]
manifests: seq[Manifest]
mBlocks: seq[bt.Block]
trees: seq[CodexTree]
setup:
for _ in 0..<4:
let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
var blocks = newSeq[bt.Block]()
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
break
blocks.add(bt.Block.new(chunk).tryGet())
let (manifest, tree) = makeManifestAndTree(blocks).tryGet()
manifests.add(manifest)
mBlocks.add(manifest.asBlock())
trees.add(tree)
for _ in 0..<4:
let
s = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
blockDiscovery = MockDiscovery.new()
@ -223,9 +216,12 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
teardown:
switch = @[]
blockexc = @[]
manifests = @[]
mBlocks = @[]
trees = @[]
test "E2E - Should advertise and discover blocks":
# Distribute the blocks amongst 1..3
# Distribute the manifests and trees amongst 1..3
# Ask 0 to download everything without connecting him beforehand
var advertised: Table[Cid, SignedPeerRecord]
@ -242,14 +238,14 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
advertised[cid] = switch[3].peerInfo.signedPeerRecord
discard blocks[0..5].mapIt(blockexc[1].engine.pendingBlocks.getWantHandle(it.address))
await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[0..5].mapIt(BlockDelivery(blk: it, address: it.address)))
discard blockexc[1].engine.pendingBlocks.getWantHandle(mBlocks[0].cid)
await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[0], address: BlockAddress(leaf: false, cid: mBlocks[0].cid))])
discard blocks[4..10].mapIt(blockexc[2].engine.pendingBlocks.getWantHandle(it.address))
await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[4..10].mapIt(BlockDelivery(blk: it, address: it.address)))
discard blockexc[2].engine.pendingBlocks.getWantHandle(mBlocks[1].cid)
await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[1], address: BlockAddress(leaf: false, cid: mBlocks[1].cid))])
discard blocks[10..15].mapIt(blockexc[3].engine.pendingBlocks.getWantHandle(it.address))
await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[10..15].mapIt(BlockDelivery(blk: it, address: it.address)))
discard blockexc[3].engine.pendingBlocks.getWantHandle(mBlocks[2].cid)
await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[2], address: BlockAddress(leaf: false, cid: mBlocks[2].cid))])
MockDiscovery(blockexc[0].engine.discovery.discovery)
.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid):
@ -258,8 +254,8 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
result.add(advertised[cid])
let futs = collect(newSeq):
for b in blocks:
blockexc[0].engine.requestBlock(b.cid)
for m in mBlocks[0..2]:
blockexc[0].engine.requestBlock(m.cid)
await allFuturesThrowing(
switch.mapIt(it.start()) &
@ -273,7 +269,7 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
test "E2E - Should advertise and discover blocks with peers already connected":
# Distribute the blocks amongst 1..3
# Ask 0 to download everything without connecting him beforehand
# Ask 0 to download everything *WITH* connecting him beforehand
var advertised: Table[Cid, SignedPeerRecord]
@ -289,14 +285,14 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
advertised[cid] = switch[3].peerInfo.signedPeerRecord
discard blocks[0..5].mapIt(blockexc[1].engine.pendingBlocks.getWantHandle(it.address))
await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[0..5].mapIt(BlockDelivery(blk: it, address: it.address)))
discard blockexc[1].engine.pendingBlocks.getWantHandle(mBlocks[0].cid)
await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[0], address: BlockAddress(leaf: false, cid: mBlocks[0].cid))])
discard blocks[4..10].mapIt(blockexc[2].engine.pendingBlocks.getWantHandle(it.address))
await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[4..10].mapIt(BlockDelivery(blk: it, address: it.address)))
discard blockexc[2].engine.pendingBlocks.getWantHandle(mBlocks[1].cid)
await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[1], address: BlockAddress(leaf: false, cid: mBlocks[1].cid))])
discard blocks[10..15].mapIt(blockexc[3].engine.pendingBlocks.getWantHandle(it.address))
await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[10..15].mapIt(BlockDelivery(blk: it, address: it.address)))
discard blockexc[3].engine.pendingBlocks.getWantHandle(mBlocks[2].cid)
await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[2], address: BlockAddress(leaf: false, cid: mBlocks[2].cid))])
MockDiscovery(blockexc[0].engine.discovery.discovery)
.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid):
@ -305,7 +301,7 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
return @[advertised[cid]]
let
futs = blocks.mapIt( blockexc[0].engine.requestBlock( it.cid ) )
futs = mBlocks[0..2].mapIt(blockexc[0].engine.requestBlock(it.cid))
await allFuturesThrowing(
switch.mapIt(it.start()) &

View File

@ -10,17 +10,26 @@ import pkg/codex/blockexchange
import pkg/codex/chunker
import pkg/codex/blocktype as bt
import pkg/codex/blockexchange/engine
import pkg/codex/manifest
import pkg/codex/merkletree
import ../../../asynctest
import ../../helpers
import ../../helpers/mockdiscovery
import ../../examples
proc asBlock(m: Manifest): bt.Block =
let mdata = m.encode().tryGet()
bt.Block.new(data = mdata, codec = ManifestCodec).tryGet()
asyncchecksuite "Test Discovery Engine":
let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
var
blocks: seq[bt.Block]
manifest: Manifest
tree: CodexTree
manifestBlock: bt.Block
switch: Switch
peerStore: PeerCtxStore
blockDiscovery: MockDiscovery
@ -35,6 +44,10 @@ asyncchecksuite "Test Discovery Engine":
blocks.add(bt.Block.new(chunk).tryGet())
(manifest, tree) = makeManifestAndTree(blocks).tryGet()
manifestBlock = manifest.asBlock()
blocks.add(manifestBlock)
switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
network = BlockExcNetwork.new(switch)
peerStore = PeerCtxStore.new()
@ -72,8 +85,8 @@ asyncchecksuite "Test Discovery Engine":
pendingBlocks,
discoveryLoopSleep = 100.millis)
haves = collect(initTable):
for b in blocks:
{ b.cid: newFuture[void]() }
for cid in @[manifestBlock.cid, manifest.treeCid]:
{ cid: newFuture[void]() }
blockDiscovery.publishBlockProvideHandler =
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
@ -108,28 +121,6 @@ asyncchecksuite "Test Discovery Engine":
await want.wait(1.seconds)
await discoveryEngine.stop()
test "Should queue advertise request":
var
localStore = CacheStore.new(@[blocks[0]])
discoveryEngine = DiscoveryEngine.new(
localStore,
peerStore,
network,
blockDiscovery,
pendingBlocks,
discoveryLoopSleep = 100.millis)
have = newFuture[void]()
blockDiscovery.publishBlockProvideHandler =
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
check cid == blocks[0].cid
if not have.finished:
have.complete()
await discoveryEngine.start()
await have.wait(1.seconds)
await discoveryEngine.stop()
test "Should not request more than minPeersPerBlock":
var
localStore = CacheStore.new()