mirror of
https://github.com/vacp2p/nim-libp2p-experimental.git
synced 2025-01-28 11:05:09 +00:00
b99d2039a8
* allow multiple codecs per protocol (without breaking things) * add 1.1 protocol to gossip * explicit peering part 1 * explicit peering part 2 * explicit peering part 3 * PeerInfo and ControlPrune protocols * fix encodePrune * validated always, even explicit peers * prune by score (score is stub still) * add a way to pass parameters to gossip * standard setup fixes * take into account explicit direct peers in publish * add floodPublish logic * small fixes, publish still half broken * make sure to waitsub in sparse test * use var semantics to optimize table access * wip... lvalues don't work properly sadly... * big publish refactor, replenish and balance * fix internal tests * use g.peers for fanout (todo: don't include flood peers) * exclude non gossip from fanout * internal test fixes * fix flood tests * fix test's trypublish * test interop fixes * make sure to not remove peers from gossip table * restore old replenishFanout * cleanups * restore utility module import * restore trace vs debug in gossip * improve fanout replenish behavior further * triage publish nil peers (issue is on master too but just hidden behind a if/in) * getGossipPeers fixes * remove topics from pubsubpeer (was unused) * simplify rebalanceMesh (following spec) and make it finally reach D_high * better diagnostics * merge new pubsubpeer, copy 1.1 to new module * fix up merge * conditional enable gossip11 module * add back topics in peers, re-enable flood publish * add more heartbeat locking to prevent races * actually lock the heartbeat * minor fixes * with sugar * merge 1.0 * remove assertion in publish * fix multistream 1.1 multi proto * Fix merge oops * wip * fix gossip 11 upstream * gossipsub11 -> gossipsub * support interop testing * tests fixing * fix directchat build * control prune updates (pb) * wip parameters * gossip internal tests fixes * parameters wip * finishup with params * cleanups/wip * small sugar * grafted and pruned procs * wip updateScores * wip * fix logging issue * pubsubpeer, chronicles explicit override * fix internal gossip tests * wip * tables troubleshooting * score wip * score wip * fixes * fix test utils generateNodes * don't delete while iterating in score update * fix grafted defect * add a handleConnect in subscribeTopic * pruning improvements * wip * score fixes * post merge - builds gossip tests * further merge fixes * rebalance improvements and opportunistic grafting * fix test for now * restore explicit peering * implement peer exchange graft message * add an hard cap to PX * backoff time management * IWANT cap/budget * Adaptive gossip dissemination * outbound mesh quota, internal tests fixing * oversub prune score based, finish outbound quota * finishup with score and ihave budget * use go daemon 0.3.0 * import fixes * byScore cleanup score sorting * remove pointless scaling in `/` Duration operator * revert using libp2p org for daemon * interop fixes * fixes and cleanup * remove heartbeat assertion, minor debug fixes * logging improvements and cleaning up * (to revert) add some traces * add explicit topic to gossip rpcs * pubsub merge fixes and type fix in switch * Revert "(to revert) add some traces" This reverts commit 4663eaab6cc336c81cee50bc54025cf0b7bcbd99. 
* cleanup some now irrelevant todo * shuffle peers anyway as score might be disabled * add missing shuffle * old merge fix * more merge fixes * debug improvements * re-enable gossip internal tests * add gossip10 fallback (dormant but tested) * split gossipsub internal tests into 1.0 and 1.1 Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
472 lines
14 KiB
Nim
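# Interop tests: the native nim-libp2p stack is exercised against the
# go-libp2p daemon, driven through daemon/daemonapi (an external daemon
# process is spawned for each test).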
import options, tables
import unittest
import chronos, chronicles, stew/byteutils
import helpers
import ../libp2p/[daemon/daemonapi,
                  protobuf/minprotobuf,
                  vbuffer,
                  multiaddress,
                  multicodec,
                  cid,
                  varint,
                  multihash,
                  standard_setup,
                  peerid,
                  peerinfo,
                  switch,
                  stream/connection,
                  muxers/muxer,
                  crypto/crypto,
                  muxers/mplex/mplex,
                  muxers/muxer,
                  protocols/protocol,
                  protocols/identify,
                  transports/transport,
                  transports/tcptransport,
                  protocols/secure/secure,
                  protocols/pubsub/pubsub,
                  protocols/pubsub/floodsub,
                  protocols/pubsub/gossipsub]

type
  # TODO: Unify both PeerInfo structs
  NativePeerInfo = peerinfo.PeerInfo
  DaemonPeerInfo = daemonapi.PeerInfo

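# Helpers that speak the same unsigned-varint length-prefixed framing over the
# daemon's raw StreamTransport that nim-libp2p connections use natively.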
proc writeLp*(s: StreamTransport, msg: string | seq[byte]): Future[int] {.gcsafe.} =
  ## write length prefixed
  var buf = initVBuffer()
  buf.writeSeq(msg)
  buf.finish()
  result = s.write(buf.buffer)

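# Reads one byte at a time until the length varint decodes, then reads exactly
# that many payload bytes.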
proc readLp*(s: StreamTransport): Future[seq[byte]] {.async, gcsafe.} =
  ## read length prefixed msg
  var
    size: uint
    length: int
    res: VarintResult[void]
  result = newSeq[byte](10)

  for i in 0..<len(result):
    await s.readExactly(addr result[i], 1)
    res = LP.getUVarint(result.toOpenArray(0, i), length, size)
    if res.isOk():
      break
  res.expect("Valid varint")
  result.setLen(size)
  if size > 0.uint:
    await s.readExactly(addr result[0], int(size))

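# Daemon side publishes on `testTopic`; the native node subscribes and must
# receive the message at least `count` times before the publisher loop stops.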
proc testPubSubDaemonPublish(gossip: bool = false,
                             count: int = 1): Future[bool] {.async.} =
  var pubsubData = "TEST MESSAGE"
  var testTopic = "test-topic"
  var msgData = pubsubData.toBytes()

  var flags = {PSFloodSub}
  if gossip:
    flags = {PSGossipSub}

  let daemonNode = await newDaemonApi(flags)
  let daemonPeer = await daemonNode.identity()
  let nativeNode = newStandardSwitch(
    secureManagers = [SecureProtocol.Noise],
    outTimeout = 5.minutes)

  let pubsub = if gossip:
      GossipSub.init(
        switch = nativeNode).PubSub
    else:
      FloodSub.init(
        switch = nativeNode).PubSub

  nativeNode.mount(pubsub)

  let awaiters = nativeNode.start()
  await pubsub.start()
  let nativePeer = nativeNode.peerInfo

  var finished = false
  var times = 0
  proc nativeHandler(topic: string, data: seq[byte]) {.async.} =
    let smsg = string.fromBytes(data)
    check smsg == pubsubData
    times.inc()
    if times >= count and not finished:
      finished = true

  let peer = NativePeerInfo.init(
    daemonPeer.peer,
    daemonPeer.addresses)
  await nativeNode.connect(peer.peerId, peer.addrs)

  await sleepAsync(1.seconds)
  await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)

  proc pubsubHandler(api: DaemonAPI,
                     ticket: PubsubTicket,
                     message: PubSubMessage): Future[bool] {.async.} =
    result = true # don't cancel subscription

  asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
  await pubsub.subscribe(testTopic, nativeHandler)
  await sleepAsync(5.seconds)

  proc publisher() {.async.} =
    while not finished:
      await daemonNode.pubsubPublish(testTopic, msgData)
      await sleepAsync(500.millis)

  await wait(publisher(), 5.minutes) # should be plenty of time

  result = true
  await nativeNode.stop()
  await pubsub.stop()
  await allFutures(awaiters)
  await daemonNode.close()

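# Mirror of the above: the native node publishes on `testTopic` and the daemon
# side must receive the message at least `count` times.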
proc testPubSubNodePublish(gossip: bool = false,
                           count: int = 1): Future[bool] {.async.} =
  var pubsubData = "TEST MESSAGE"
  var testTopic = "test-topic"
  var msgData = pubsubData.toBytes()

  var flags = {PSFloodSub}
  if gossip:
    flags = {PSGossipSub}

  let daemonNode = await newDaemonApi(flags)
  let daemonPeer = await daemonNode.identity()
  let nativeNode = newStandardSwitch(
    secureManagers = [SecureProtocol.Noise],
    outTimeout = 5.minutes)

  let pubsub = if gossip:
      GossipSub.init(
        switch = nativeNode).PubSub
    else:
      FloodSub.init(
        switch = nativeNode).PubSub

  nativeNode.mount(pubsub)

  let awaiters = nativeNode.start()
  await pubsub.start()
  let nativePeer = nativeNode.peerInfo

  let peer = NativePeerInfo.init(
    daemonPeer.peer,
    daemonPeer.addresses)
  await nativeNode.connect(peer)

  await sleepAsync(1.seconds)
  await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)

  var times = 0
  var finished = false
  proc pubsubHandler(api: DaemonAPI,
                     ticket: PubsubTicket,
                     message: PubSubMessage): Future[bool] {.async.} =
    let smsg = string.fromBytes(message.data)
    check smsg == pubsubData
    times.inc()
    if times >= count and not finished:
      finished = true
    result = true # don't cancel subscription

  discard await daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
  proc nativeHandler(topic: string, data: seq[byte]) {.async.} = discard
  await pubsub.subscribe(testTopic, nativeHandler)
  await sleepAsync(5.seconds)

  proc publisher() {.async.} =
    while not finished:
      discard await pubsub.publish(testTopic, msgData)
      await sleepAsync(500.millis)

  await wait(publisher(), 5.minutes) # should be plenty of time

  result = finished
  await nativeNode.stop()
  await pubsub.stop()
  await allFutures(awaiters)
  await daemonNode.close()

suite "Interop":
|
|
# TODO: chronos transports are leaking,
|
|
# but those are tracked for both the daemon
|
|
# and libp2p, so not sure which one it is,
|
|
# need to investigate more
|
|
# teardown:
|
|
# for tracker in testTrackers():
|
|
# # echo tracker.dump()
|
|
# # check tracker.isLeaked() == false
|
|
|
|
# TODO: this test is failing sometimes on windows
|
|
# For some reason we receive EOF before test 4 sometimes
|
|
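  # Native node dials the daemon on /test-stream and both sides alternate
  # length-prefixed writes and reads (the daemon side uses the raw-transport
  # helpers defined above).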
test "native -> daemon multiple reads and writes":
|
|
proc runTests(): Future[bool] {.async.} =
|
|
var protos = @["/test-stream"]
|
|
|
|
let nativeNode = newStandardSwitch(
|
|
secureManagers = [SecureProtocol.Noise],
|
|
outTimeout = 5.minutes)
|
|
|
|
let awaiters = await nativeNode.start()
|
|
let daemonNode = await newDaemonApi()
|
|
let daemonPeer = await daemonNode.identity()
|
|
|
|
var testFuture = newFuture[void]("test.future")
|
|
proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} =
|
|
check string.fromBytes(await stream.transp.readLp()) == "test 1"
|
|
asyncDiscard stream.transp.writeLp("test 2")
|
|
|
|
await sleepAsync(10.millis)
|
|
check string.fromBytes(await stream.transp.readLp()) == "test 3"
|
|
asyncDiscard stream.transp.writeLp("test 4")
|
|
testFuture.complete()
|
|
await stream.close()
|
|
|
|
await daemonNode.addHandler(protos, daemonHandler)
|
|
let conn = await nativeNode.dial(NativePeerInfo.init(daemonPeer.peer,
|
|
daemonPeer.addresses),
|
|
protos[0])
|
|
await conn.writeLp("test 1")
|
|
check "test 2" == string.fromBytes((await conn.readLp(1024)))
|
|
await sleepAsync(10.millis)
|
|
|
|
await conn.writeLp("test 3")
|
|
check "test 4" == string.fromBytes((await conn.readLp(1024)))
|
|
|
|
await wait(testFuture, 10.secs)
|
|
await conn.close()
|
|
|
|
await daemonNode.close()
|
|
await nativeNode.stop()
|
|
await allFutures(awaiters)
|
|
|
|
await sleepAsync(1.seconds)
|
|
result = true
|
|
|
|
check:
|
|
waitFor(runTests()) == true
|
|
|
|
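  # Single round trip: the native node writes a length-prefixed line and the
  # daemon handler checks it, reading with readLine() (see the comments below).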
test "native -> daemon connection":
|
|
proc runTests(): Future[bool] {.async.} =
|
|
var protos = @["/test-stream"]
|
|
var test = "TEST STRING"
|
|
# We are preparing expect string, which should be prefixed with varint
|
|
# length and do not have `\r\n` suffix, because we going to use
|
|
# readLine().
|
|
var buffer = initVBuffer()
|
|
buffer.writeSeq(test & "\r\n")
|
|
buffer.finish()
|
|
var expect = newString(len(buffer) - 2)
|
|
copyMem(addr expect[0], addr buffer.buffer[0], len(expect))
|
|
|
|
let nativeNode = newStandardSwitch(
|
|
secureManagers = [SecureProtocol.Noise],
|
|
outTimeout = 5.minutes)
|
|
|
|
let awaiters = await nativeNode.start()
|
|
|
|
let daemonNode = await newDaemonApi()
|
|
let daemonPeer = await daemonNode.identity()
|
|
|
|
var testFuture = newFuture[string]("test.future")
|
|
proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} =
|
|
# We should perform `readLp()` instead of `readLine()`. `readLine()`
|
|
# here reads actually length prefixed string.
|
|
var line = await stream.transp.readLine()
|
|
check line == expect
|
|
testFuture.complete(line)
|
|
await stream.close()
|
|
|
|
await daemonNode.addHandler(protos, daemonHandler)
|
|
let conn = await nativeNode.dial(NativePeerInfo.init(daemonPeer.peer,
|
|
daemonPeer.addresses),
|
|
protos[0])
|
|
await conn.writeLp(test & "\r\n")
|
|
result = expect == (await wait(testFuture, 10.secs))
|
|
|
|
await conn.close()
|
|
await nativeNode.stop()
|
|
await allFutures(awaiters)
|
|
await daemonNode.close()
|
|
|
|
check:
|
|
waitFor(runTests()) == true
|
|
|
|
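  # Opposite direction: the daemon opens a stream to a custom LPProtocol
  # mounted on the native switch and sends one length-prefixed message.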
test "daemon -> native connection":
|
|
proc runTests(): Future[bool] {.async.} =
|
|
var protos = @["/test-stream"]
|
|
var test = "TEST STRING"
|
|
|
|
var testFuture = newFuture[string]("test.future")
|
|
proc nativeHandler(conn: Connection, proto: string) {.async.} =
|
|
var line = string.fromBytes(await conn.readLp(1024))
|
|
check line == test
|
|
testFuture.complete(line)
|
|
await conn.close()
|
|
|
|
# custom proto
|
|
var proto = new LPProtocol
|
|
proto.handler = nativeHandler
|
|
proto.codec = protos[0] # codec
|
|
|
|
let nativeNode = newStandardSwitch(
|
|
secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes)
|
|
|
|
nativeNode.mount(proto)
|
|
|
|
let awaiters = await nativeNode.start()
|
|
let nativePeer = nativeNode.peerInfo
|
|
|
|
let daemonNode = await newDaemonApi()
|
|
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
|
|
var stream = await daemonNode.openStream(nativePeer.peerId, protos)
|
|
discard await stream.transp.writeLp(test)
|
|
|
|
result = test == (await wait(testFuture, 10.secs))
|
|
|
|
await stream.close()
|
|
await nativeNode.stop()
|
|
await allFutures(awaiters)
|
|
await daemonNode.close()
|
|
await sleepAsync(1.seconds)
|
|
|
|
check:
|
|
waitFor(runTests()) == true
|
|
|
|
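  # Same direction as above, but with two request/response round trips.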
test "daemon -> multiple reads and writes":
|
|
proc runTests(): Future[bool] {.async.} =
|
|
var protos = @["/test-stream"]
|
|
|
|
var testFuture = newFuture[void]("test.future")
|
|
proc nativeHandler(conn: Connection, proto: string) {.async.} =
|
|
check "test 1" == string.fromBytes(await conn.readLp(1024))
|
|
await conn.writeLp("test 2".toBytes())
|
|
|
|
check "test 3" == string.fromBytes(await conn.readLp(1024))
|
|
await conn.writeLp("test 4".toBytes())
|
|
|
|
testFuture.complete()
|
|
await conn.close()
|
|
|
|
# custom proto
|
|
var proto = new LPProtocol
|
|
proto.handler = nativeHandler
|
|
proto.codec = protos[0] # codec
|
|
|
|
let nativeNode = newStandardSwitch(
|
|
secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes)
|
|
|
|
nativeNode.mount(proto)
|
|
|
|
let awaiters = await nativeNode.start()
|
|
let nativePeer = nativeNode.peerInfo
|
|
|
|
let daemonNode = await newDaemonApi()
|
|
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
|
|
var stream = await daemonNode.openStream(nativePeer.peerId, protos)
|
|
|
|
asyncDiscard stream.transp.writeLp("test 1")
|
|
check "test 2" == string.fromBytes(await stream.transp.readLp())
|
|
|
|
asyncDiscard stream.transp.writeLp("test 3")
|
|
check "test 4" == string.fromBytes(await stream.transp.readLp())
|
|
|
|
await wait(testFuture, 10.secs)
|
|
|
|
result = true
|
|
await stream.close()
|
|
await nativeNode.stop()
|
|
await allFutures(awaiters)
|
|
await daemonNode.close()
|
|
|
|
check:
|
|
waitFor(runTests()) == true
|
|
|
|
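  # Ten echo round trips over a single stream, counted on both sides.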
test "read write multiple":
|
|
proc runTests(): Future[bool] {.async.} =
|
|
var protos = @["/test-stream"]
|
|
var test = "TEST STRING"
|
|
|
|
var count = 0
|
|
var testFuture = newFuture[int]("test.future")
|
|
proc nativeHandler(conn: Connection, proto: string) {.async.} =
|
|
while count < 10:
|
|
var line = string.fromBytes(await conn.readLp(1024))
|
|
check line == test
|
|
await conn.writeLp(test.toBytes())
|
|
count.inc()
|
|
|
|
testFuture.complete(count)
|
|
await conn.close()
|
|
|
|
# custom proto
|
|
var proto = new LPProtocol
|
|
proto.handler = nativeHandler
|
|
proto.codec = protos[0] # codec
|
|
|
|
let nativeNode = newStandardSwitch(
|
|
secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes)
|
|
|
|
nativeNode.mount(proto)
|
|
|
|
let awaiters = await nativeNode.start()
|
|
let nativePeer = nativeNode.peerInfo
|
|
|
|
let daemonNode = await newDaemonApi()
|
|
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
|
|
var stream = await daemonNode.openStream(nativePeer.peerId, protos)
|
|
|
|
var count2 = 0
|
|
while count2 < 10:
|
|
discard await stream.transp.writeLp(test)
|
|
let line = await stream.transp.readLp()
|
|
check test == string.fromBytes(line)
|
|
inc(count2)
|
|
|
|
result = 10 == (await wait(testFuture, 1.minutes))
|
|
await stream.close()
|
|
await nativeNode.stop()
|
|
await allFutures(awaiters)
|
|
await daemonNode.close()
|
|
|
|
check:
|
|
waitFor(runTests()) == true
|
|
|
|
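  # PubSub interop matrix: floodsub/gossipsub x publisher (daemon or native
  # node) x message count (one or many), driven by the two helpers above.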
test "floodsub: daemon publish one":
|
|
check:
|
|
waitFor(testPubSubDaemonPublish()) == true
|
|
|
|
test "floodsub: daemon publish many":
|
|
check:
|
|
waitFor(testPubSubDaemonPublish(count = 10)) == true
|
|
|
|
test "gossipsub: daemon publish one":
|
|
check:
|
|
waitFor(testPubSubDaemonPublish(gossip = true)) == true
|
|
|
|
test "gossipsub: daemon publish many":
|
|
check:
|
|
waitFor(testPubSubDaemonPublish(gossip = true, count = 10)) == true
|
|
|
|
test "floodsub: node publish one":
|
|
check:
|
|
waitFor(testPubSubNodePublish()) == true
|
|
|
|
test "floodsub: node publish many":
|
|
check:
|
|
waitFor(testPubSubNodePublish(count = 10)) == true
|
|
|
|
test "gossipsub: node publish one":
|
|
check:
|
|
waitFor(testPubSubNodePublish(gossip = true)) == true
|
|
|
|
test "gossipsub: node publish many":
|
|
check:
|
|
waitFor(testPubSubNodePublish(gossip = true, count = 10)) == true
|