mirror of
https://github.com/status-im/nim-libp2p.git
synced 2025-02-02 16:14:15 +00:00
fix(gossipsub): pubsubpeer is created with wrong gossipsub version (#1116)
This commit is contained in:
parent
4618f4c68f
commit
7498258f7c
@ -862,8 +862,13 @@ method initPubSub*(g: GossipSub) {.raises: [InitializationError].} =
|
|||||||
# init gossip stuff
|
# init gossip stuff
|
||||||
g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength)
|
g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength)
|
||||||
|
|
||||||
method getOrCreatePeer*(g: GossipSub, peerId: PeerId, protos: seq[string]): PubSubPeer =
|
method getOrCreatePeer*(
|
||||||
let peer = procCall PubSub(g).getOrCreatePeer(peerId, protos)
|
g: GossipSub,
|
||||||
|
peerId: PeerId,
|
||||||
|
protosToDial: seq[string],
|
||||||
|
protoNegotiated: string = "",
|
||||||
|
): PubSubPeer =
|
||||||
|
let peer = procCall PubSub(g).getOrCreatePeer(peerId, protosToDial, protoNegotiated)
|
||||||
g.parameters.overheadRateLimit.withValue(overheadRateLimit):
|
g.parameters.overheadRateLimit.withValue(overheadRateLimit):
|
||||||
peer.overheadRateLimitOpt =
|
peer.overheadRateLimitOpt =
|
||||||
Opt.some(TokenBucket.new(overheadRateLimit.bytes, overheadRateLimit.interval))
|
Opt.some(TokenBucket.new(overheadRateLimit.bytes, overheadRateLimit.interval))
|
||||||
|
@ -348,19 +348,22 @@ method onPubSubPeerEvent*(
|
|||||||
discard
|
discard
|
||||||
|
|
||||||
method getOrCreatePeer*(
|
method getOrCreatePeer*(
|
||||||
p: PubSub, peerId: PeerId, protos: seq[string]
|
p: PubSub, peerId: PeerId, protosToDial: seq[string], protoNegotiated: string = ""
|
||||||
): PubSubPeer {.base, gcsafe.} =
|
): PubSubPeer {.base, gcsafe.} =
|
||||||
p.peers.withValue(peerId, peer):
|
p.peers.withValue(peerId, peer):
|
||||||
|
if peer[].codec == "":
|
||||||
|
peer[].codec = protoNegotiated
|
||||||
return peer[]
|
return peer[]
|
||||||
|
|
||||||
proc getConn(): Future[Connection] {.async.} =
|
proc getConn(): Future[Connection] {.async.} =
|
||||||
return await p.switch.dial(peerId, protos)
|
return await p.switch.dial(peerId, protosToDial)
|
||||||
|
|
||||||
proc onEvent(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} =
|
proc onEvent(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} =
|
||||||
p.onPubSubPeerEvent(peer, event)
|
p.onPubSubPeerEvent(peer, event)
|
||||||
|
|
||||||
# create new pubsub peer
|
# create new pubsub peer
|
||||||
let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize)
|
let pubSubPeer =
|
||||||
|
PubSubPeer.new(peerId, getConn, onEvent, protoNegotiated, p.maxMessageSize)
|
||||||
debug "created new pubsub peer", peerId
|
debug "created new pubsub peer", peerId
|
||||||
|
|
||||||
p.peers[peerId] = pubSubPeer
|
p.peers[peerId] = pubSubPeer
|
||||||
@ -425,7 +428,7 @@ method handleConn*(p: PubSub, conn: Connection, proto: string) {.base, async.} =
|
|||||||
# call pubsub rpc handler
|
# call pubsub rpc handler
|
||||||
p.rpcHandler(peer, data)
|
p.rpcHandler(peer, data)
|
||||||
|
|
||||||
let peer = p.getOrCreatePeer(conn.peerId, @[proto])
|
let peer = p.getOrCreatePeer(conn.peerId, @[], proto)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
peer.handler = handler
|
peer.handler = handler
|
||||||
|
@ -1038,3 +1038,39 @@ suite "GossipSub":
|
|||||||
check currentRateLimitHits() == rateLimitHits + 2
|
check currentRateLimitHits() == rateLimitHits + 2
|
||||||
|
|
||||||
await stopNodes(nodes)
|
await stopNodes(nodes)
|
||||||
|
|
||||||
|
asyncTest "Peer must send right gosspipsub version":
|
||||||
|
func dumbMsgIdProvider(m: Message): Result[MessageId, ValidationResult] =
|
||||||
|
ok(newSeq[byte](10))
|
||||||
|
let node0 = generateNodes(1, gossip = true, msgIdProvider = dumbMsgIdProvider)[0]
|
||||||
|
let node1 = generateNodes(
|
||||||
|
1,
|
||||||
|
gossip = true,
|
||||||
|
msgIdProvider = dumbMsgIdProvider,
|
||||||
|
gossipSubVersion = GossipSubCodec_10,
|
||||||
|
)[0]
|
||||||
|
|
||||||
|
let nodesFut = await allFinished(node0.switch.start(), node1.switch.start())
|
||||||
|
|
||||||
|
await node0.switch.connect(
|
||||||
|
node1.switch.peerInfo.peerId, node1.switch.peerInfo.addrs
|
||||||
|
)
|
||||||
|
|
||||||
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
|
discard
|
||||||
|
|
||||||
|
node0.subscribe("foobar", handler)
|
||||||
|
node1.subscribe("foobar", handler)
|
||||||
|
await waitSubGraph(@[node0, node1], "foobar")
|
||||||
|
|
||||||
|
var gossip0: GossipSub = GossipSub(node0)
|
||||||
|
var gossip1: GossipSub = GossipSub(node1)
|
||||||
|
|
||||||
|
checkUntilTimeout:
|
||||||
|
gossip0.mesh.getOrDefault("foobar").toSeq[0].codec == GossipSubCodec_10
|
||||||
|
checkUntilTimeout:
|
||||||
|
gossip1.mesh.getOrDefault("foobar").toSeq[0].codec == GossipSubCodec_10
|
||||||
|
|
||||||
|
await allFuturesThrowing(node0.switch.stop(), node1.switch.stop())
|
||||||
|
|
||||||
|
await allFuturesThrowing(nodesFut.concat())
|
||||||
|
@ -70,6 +70,7 @@ proc generateNodes*(
|
|||||||
enablePX: bool = false,
|
enablePX: bool = false,
|
||||||
overheadRateLimit: Opt[tuple[bytes: int, interval: Duration]] =
|
overheadRateLimit: Opt[tuple[bytes: int, interval: Duration]] =
|
||||||
Opt.none(tuple[bytes: int, interval: Duration]),
|
Opt.none(tuple[bytes: int, interval: Duration]),
|
||||||
|
gossipSubVersion: string = "",
|
||||||
): seq[PubSub] =
|
): seq[PubSub] =
|
||||||
for i in 0 ..< num:
|
for i in 0 ..< num:
|
||||||
let switch = newStandardSwitch(
|
let switch = newStandardSwitch(
|
||||||
@ -100,6 +101,8 @@ proc generateNodes*(
|
|||||||
g.topicParams.mgetOrPut("foobar", TopicParams.init()).topicWeight = 1.0
|
g.topicParams.mgetOrPut("foobar", TopicParams.init()).topicWeight = 1.0
|
||||||
g.topicParams.mgetOrPut("foo", TopicParams.init()).topicWeight = 1.0
|
g.topicParams.mgetOrPut("foo", TopicParams.init()).topicWeight = 1.0
|
||||||
g.topicParams.mgetOrPut("bar", TopicParams.init()).topicWeight = 1.0
|
g.topicParams.mgetOrPut("bar", TopicParams.init()).topicWeight = 1.0
|
||||||
|
if gossipSubVersion != "":
|
||||||
|
g.codecs = @[gossipSubVersion]
|
||||||
g.PubSub
|
g.PubSub
|
||||||
else:
|
else:
|
||||||
FloodSub.init(
|
FloodSub.init(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user