From fcda0f6ce1cc2573e9129dbd7b37ccde8e657008 Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni Date: Mon, 13 Jul 2020 22:32:38 +0900 Subject: [PATCH] PubSubPeer tables refactor (#263) * refactor peer tables * tests fixing * override PubSubPeer equality * fix pubsubpeer comparison --- libp2p/protocols/pubsub/floodsub.nim | 23 +++-- libp2p/protocols/pubsub/gossipsub.nim | 114 +++++++++++-------------- libp2p/protocols/pubsub/pubsub.nim | 24 ++++-- libp2p/protocols/pubsub/pubsubpeer.nim | 29 +++++++ tests/pubsub/testfloodsub.nim | 2 +- tests/pubsub/testgossipinternal.nim | 110 ++++++++++++------------ tests/pubsub/testgossipsub.nim | 16 ++-- 7 files changed, 175 insertions(+), 143 deletions(-) diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index c83c2664d..cf43b70f4 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -26,7 +26,7 @@ const FloodSubCodec* = "/floodsub/1.0.0" type FloodSub* = ref object of PubSub - floodsub*: Table[string, HashSet[string]] # topic to remote peer map + floodsub*: PeerTable # topic to remote peer map seen*: TimedCache[string] # list of messages forwarded to peers method subscribeTopic*(f: FloodSub, @@ -35,23 +35,28 @@ method subscribeTopic*(f: FloodSub, peerId: string) {.gcsafe, async.} = await procCall PubSub(f).subscribeTopic(topic, subscribe, peerId) + let peer = f.peers.getOrDefault(peerId) + if peer == nil: + debug "subscribeTopic on a nil peer!" + return + if topic notin f.floodsub: - f.floodsub[topic] = initHashSet[string]() + f.floodsub[topic] = initHashSet[PubSubPeer]() if subscribe: - trace "adding subscription for topic", peer = peerId, name = topic + trace "adding subscription for topic", peer = peer.id, name = topic # subscribe the peer to the topic - f.floodsub[topic].incl(peerId) + f.floodsub[topic].incl(peer) else: - trace "removing subscription for topic", peer = peerId, name = topic + trace "removing subscription for topic", peer = peer.id, name = topic # unsubscribe the peer from the topic - f.floodsub[topic].excl(peerId) + f.floodsub[topic].excl(peer) method handleDisconnect*(f: FloodSub, peer: PubSubPeer) = ## handle peer disconnects for t in toSeq(f.floodsub.keys): if t in f.floodsub: - f.floodsub[t].excl(peer.id) + f.floodsub[t].excl(peer) procCall PubSub(f).handleDisconnect(peer) @@ -62,7 +67,7 @@ method rpcHandler*(f: FloodSub, for m in rpcMsgs: # for all RPC messages if m.messages.len > 0: # if there are any messages - var toSendPeers: HashSet[string] = initHashSet[string]() + var toSendPeers = initHashSet[PubSubPeer]() for msg in m.messages: # for every message let msgId = f.msgIdProvider(msg) logScope: msgId @@ -158,6 +163,6 @@ method initPubSub*(f: FloodSub) = procCall PubSub(f).initPubSub() f.peers = initTable[string, PubSubPeer]() f.topics = initTable[string, Topic]() - f.floodsub = initTable[string, HashSet[string]]() + f.floodsub = initTable[string, HashSet[PubSubPeer]]() f.seen = newTimedCache[string](2.minutes) f.init() diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 32240bc75..0ec1795a6 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -45,9 +45,9 @@ const GossipSubFanoutTTL* = 1.minutes type GossipSub* = ref object of FloodSub - mesh*: Table[string, HashSet[string]] # peers that we send messages to when we are subscribed to the topic - fanout*: Table[string, HashSet[string]] # peers that we send messages to when we're not subscribed to the topic - gossipsub*: Table[string, HashSet[string]] # peers that are subscribed to a topic + mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic + fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic + gossipsub*: PeerTable # peers that are subscribed to a topic lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics gossip*: Table[string, seq[ControlIHave]] # pending gossip control*: Table[string, ControlMessage] # pending control messages @@ -68,23 +68,20 @@ declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub, "gossipsub peers per topic in gossipsub", labels = ["topic"]) -func addPeer( - table: var Table[string, HashSet[string]], topic: string, - peerId: string): bool = +func addPeer(table: var PeerTable, topic: string, peer: PubSubPeer): bool = # returns true if the peer was added, false if it was already in the collection - not table.mgetOrPut(topic, initHashSet[string]()).containsOrIncl(peerId) + not table.mgetOrPut(topic, initHashSet[PubSubPeer]()).containsOrIncl(peer) -func removePeer( - table: var Table[string, HashSet[string]], topic, peerId: string) = +func removePeer(table: var PeerTable, topic: string, peer: PubSubPeer) = table.withValue(topic, peers): - peers[].excl(peerId) + peers[].excl(peer) if peers[].len == 0: table.del(topic) -func hasPeer(table: Table[string, HashSet[string]], topic, peerId: string): bool = - (topic in table) and (peerId in table[topic]) +func hasPeer(table: PeerTable, topic: string, peer: PubSubPeer): bool = + (topic in table) and (peer in table[topic]) -func peers(table: Table[string, HashSet[string]], topic: string): int = +func peers(table: PeerTable, topic: string): int = if topic in table: table[topic].len else: @@ -112,8 +109,8 @@ proc replenishFanout(g: GossipSub, topic: string) = if g.fanout.peers(topic) < GossipSubDLo: trace "replenishing fanout", peers = g.fanout.peers(topic) if topic in g.gossipsub: - for peerId in g.gossipsub[topic]: - if g.fanout.addPeer(topic, peerId): + for peer in g.gossipsub[topic]: + if g.fanout.addPeer(topic, peer): if g.fanout.peers(topic) == GossipSubD: break @@ -133,8 +130,8 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = trace "replenishing mesh", topic, peers = g.mesh.peers(topic) # replenish the mesh if we're below GossipSubDlo var newPeers = toSeq( - g.gossipsub.getOrDefault(topic, initHashSet[string]()) - - g.mesh.getOrDefault(topic, initHashSet[string]()) + g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) - + g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()) ) logScope: @@ -146,19 +143,11 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = trace "getting peers", topic, peers = newPeers.len - for id in newPeers: - if g.mesh.peers(topic) >= GossipSubD: - break - - let p = g.peers.getOrDefault(id) - if p != nil: - # send a graft message to the peer - grafts.add p - discard g.mesh.addPeer(topic, id) - trace "got peer", peer = id - else: - # Peer should have been removed from mesh also! - warn "Unknown peer in mesh", peer = id + for peer in newPeers: + # send a graft message to the peer + grafts.add peer + discard g.mesh.addPeer(topic, peer) + trace "got peer", peer = peer.id if g.mesh.peers(topic) > GossipSubDhi: # prune peers if we've gone over @@ -166,17 +155,14 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = shuffle(mesh) trace "about to prune mesh", mesh = mesh.len - for id in mesh: + for peer in mesh: if g.mesh.peers(topic) <= GossipSubD: break trace "pruning peers", peers = g.mesh.peers(topic) # send a graft message to the peer - g.mesh.removePeer(topic, id) - - let p = g.peers.getOrDefault(id) - if p != nil: - prunes.add(p) + g.mesh.removePeer(topic, peer) + prunes.add(peer) libp2p_gossipsub_peers_per_topic_gossipsub .set(g.gossipsub.peers(topic).int64, labelValues = [topic]) @@ -236,18 +222,18 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} = trace "topic not in gossip array, skipping", topicID = topic continue - for id in allPeers: + for peer in allPeers: if result.len >= GossipSubD: trace "got gossip peers", peers = result.len break - if id in gossipPeers: + if peer in gossipPeers: continue - if id notin result: - result[id] = controlMsg + if peer.id notin result: + result[peer.id] = controlMsg - result[id].ihave.add(ihave) + result[peer.id].ihave.add(ihave) proc heartbeat(g: GossipSub) {.async.} = while g.heartbeatRunning: @@ -282,19 +268,19 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) = ## handle peer disconnects procCall FloodSub(g).handleDisconnect(peer) for t in toSeq(g.gossipsub.keys): - g.gossipsub.removePeer(t, peer.id) + g.gossipsub.removePeer(t, peer) libp2p_gossipsub_peers_per_topic_gossipsub .set(g.gossipsub.peers(t).int64, labelValues = [t]) for t in toSeq(g.mesh.keys): - g.mesh.removePeer(t, peer.id) + g.mesh.removePeer(t, peer) libp2p_gossipsub_peers_per_topic_mesh .set(g.mesh.peers(t).int64, labelValues = [t]) for t in toSeq(g.fanout.keys): - g.fanout.removePeer(t, peer.id) + g.fanout.removePeer(t, peer) libp2p_gossipsub_peers_per_topic_fanout .set(g.fanout.peers(t).int64, labelValues = [t]) @@ -310,16 +296,21 @@ method subscribeTopic*(g: GossipSub, peerId: string) {.gcsafe, async.} = await procCall FloodSub(g).subscribeTopic(topic, subscribe, peerId) + let peer = g.peers.getOrDefault(peerId) + if peer == nil: + debug "subscribeTopic on a nil peer!" + return + if subscribe: trace "adding subscription for topic", peer = peerId, name = topic # subscribe remote peer to the topic - discard g.gossipsub.addPeer(topic, peerId) + discard g.gossipsub.addPeer(topic, peer) else: trace "removing subscription for topic", peer = peerId, name = topic # unsubscribe remote peer from the topic - g.gossipsub.removePeer(topic, peerId) - g.mesh.removePeer(topic, peerId) - g.fanout.removePeer(topic, peerId) + g.gossipsub.removePeer(topic, peer) + g.mesh.removePeer(topic, peer) + g.fanout.removePeer(topic, peer) libp2p_gossipsub_peers_per_topic_mesh .set(g.mesh.peers(topic).int64, labelValues = [topic]) @@ -338,10 +329,9 @@ method subscribeTopic*(g: GossipSub, proc handleGraft(g: GossipSub, peer: PubSubPeer, grafts: seq[ControlGraft]): seq[ControlPrune] = - let peerId = peer.id for graft in grafts: let topic = graft.topicID - trace "processing graft message", topic, peerId + trace "processing graft message", topic, peer # If they send us a graft before they send us a subscribe, what should # we do? For now, we add them to mesh but don't add them to gossipsub. @@ -351,10 +341,10 @@ proc handleGraft(g: GossipSub, # In the spec, there's no mention of DHi here, but implicitly, a # peer will be removed from the mesh on next rebalance, so we don't want # this peer to push someone else out - if g.mesh.addPeer(topic, peerId): - g.fanout.removePeer(topic, peer.id) + if g.mesh.addPeer(topic, peer): + g.fanout.removePeer(topic, peer) else: - trace "Peer already in mesh", topic, peerId + trace "Peer already in mesh", topic, peer else: result.add(ControlPrune(topicID: topic)) else: @@ -368,7 +358,7 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = trace "processing prune message", peer = peer.id, topicID = prune.topicID - g.mesh.removePeer(prune.topicID, peer.id) + g.mesh.removePeer(prune.topicID, peer) libp2p_gossipsub_peers_per_topic_mesh .set(g.mesh.peers(prune.topicID).int64, labelValues = [prune.topicID]) @@ -403,7 +393,7 @@ method rpcHandler*(g: GossipSub, for m in rpcMsgs: # for all RPC messages if m.messages.len > 0: # if there are any messages - var toSendPeers: HashSet[string] + var toSendPeers: HashSet[PubSubPeer] for msg in m.messages: # for every message let msgId = g.msgIdProvider(msg) logScope: msgId @@ -485,10 +475,8 @@ method unsubscribe*(g: GossipSub, let peers = g.mesh.getOrDefault(topic) g.mesh.del(topic) - for id in peers: - let p = g.peers.getOrDefault(id) - if p != nil: - await p.sendPrune(@[topic]) + for peer in peers: + await peer.sendPrune(@[topic]) method publish*(g: GossipSub, topic: string, @@ -497,7 +485,7 @@ method publish*(g: GossipSub, discard await procCall PubSub(g).publish(topic, data) trace "about to publish message on topic", name = topic, data = data.shortLog - var peers: HashSet[string] + var peers: HashSet[PubSubPeer] if topic.len <= 0: # data could be 0/empty return 0 @@ -578,9 +566,9 @@ method initPubSub*(g: GossipSub) = randomize() g.mcache = newMCache(GossipSubHistoryGossip, GossipSubHistoryLength) - g.mesh = initTable[string, HashSet[string]]() # meshes - topic to peer - g.fanout = initTable[string, HashSet[string]]() # fanout - topic to peer - g.gossipsub = initTable[string, HashSet[string]]()# topic to peer map of all gossipsub peers + g.mesh = initTable[string, HashSet[PubSubPeer]]() # meshes - topic to peer + g.fanout = initTable[string, HashSet[PubSubPeer]]() # fanout - topic to peer + g.gossipsub = initTable[string, HashSet[PubSubPeer]]()# topic to peer map of all gossipsub peers g.lastFanoutPubSub = initTable[string, Moment]() # last publish time for fanout topics g.gossip = initTable[string, seq[ControlIHave]]() # pending gossip g.control = initTable[string, ControlMessage]() # pending control messages diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 1a97ffab1..daa05d165 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -30,6 +30,8 @@ declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messag declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"]) type + PeerTable* = Table[string, HashSet[PubSubPeer]] + SendRes = tuple[published: seq[string], failed: seq[string]] # keep private TopicHandler* = proc(topic: string, @@ -59,6 +61,16 @@ type observers: ref seq[PubSubObserver] # ref as in smart_ptr msgIdProvider*: MsgIdProvider # Turn message into message id (not nil) +proc hasPeerID*(t: PeerTable, topic, peerId: string): bool = + # unefficient but used only in tests! + let peers = t.getOrDefault(topic) + if peers.len == 0: + false + else: + let ps = toSeq(peers) + ps.any do (peer: PubSubPeer) -> bool: + peer.id == peerId + method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} = ## handle peer disconnects ## @@ -243,20 +255,16 @@ method subscribe*(p: PubSub, libp2p_pubsub_topics.inc() proc sendHelper*(p: PubSub, - sendPeers: HashSet[string], + sendPeers: HashSet[PubSubPeer], msgs: seq[Message]): Future[SendRes] {.async.} = var sent: seq[tuple[id: string, fut: Future[void]]] for sendPeer in sendPeers: # avoid sending to self - if sendPeer == p.peerInfo.id: + if sendPeer.peerInfo == p.peerInfo: continue - let peer = p.peers.getOrDefault(sendPeer) - if isNil(peer): - continue - - trace "sending messages to peer", peer = peer.id, msgs - sent.add((id: peer.id, fut: peer.send(@[RPCMsg(messages: msgs)]))) + trace "sending messages to peer", peer = sendPeer.id, msgs + sent.add((id: sendPeer.id, fut: sendPeer.send(@[RPCMsg(messages: msgs)]))) var published: seq[string] var failed: seq[string] diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index db1312e83..f5fcd1719 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -43,6 +43,35 @@ type RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} +func hash*(p: PubSubPeer): Hash = + # int is either 32/64, so intptr basically, pubsubpeer is a ref + cast[pointer](p).hash + +func `==`*(a, b: PubSubPeer): bool = + # override equiality to support both nil and peerInfo comparisons + # this in the future will allow us to recycle refs + let + aptr = cast[pointer](a) + bptr = cast[pointer](b) + if aptr == nil: + if bptr == nil: + true + else: + false + elif bptr == nil: + false + else: + if a.peerInfo == nil: + if b.peerInfo == nil: + true + else: + false + else: + if b.peerInfo == nil: + false + else: + a.peerInfo.id == b.peerInfo.id + proc id*(p: PubSubPeer): string = p.peerInfo.id proc connected*(p: PubSubPeer): bool = diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index f6798976f..199921053 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -29,7 +29,7 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} = var ceil = 15 let fsub = cast[FloodSub](sender.pubSub.get()) while not fsub.floodsub.hasKey(key) or - not fsub.floodsub[key].contains(receiver.peerInfo.id): + not fsub.floodsub.hasPeerID(key, receiver.peerInfo.id): await sleepAsync(100.millis) dec ceil doAssert(ceil > 0, "waitSub timeout!") diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 2fb13d765..be0127873 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -29,18 +29,19 @@ suite "GossipSub internal": let gossipSub = newPubSub(TestGossipSub, randomPeerInfo()) let topic = "foobar" - gossipSub.mesh[topic] = initHashSet[string]() + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() var conns = newSeq[Connection]() - gossipSub.gossipsub[topic] = initHashSet[string]() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() for i in 0..<15: let conn = newBufferStream(noop) conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) - gossipSub.peers[peerInfo.id].conn = conn - gossipSub.mesh[topic].incl(peerInfo.id) + let peer = newPubSubPeer(peerInfo, GossipSubCodec) + peer.conn = conn + gossipSub.peers[peerInfo.id] = peer + gossipSub.mesh[topic].incl(peer) check gossipSub.peers.len == 15 await gossipSub.rebalanceMesh(topic) @@ -58,19 +59,20 @@ suite "GossipSub internal": let gossipSub = newPubSub(TestGossipSub, randomPeerInfo()) let topic = "foobar" - gossipSub.mesh[topic] = initHashSet[string]() + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() gossipSub.topics[topic] = Topic() # has to be in topics to rebalance - gossipSub.gossipsub[topic] = initHashSet[string]() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() var conns = newSeq[Connection]() for i in 0..<15: let conn = newBufferStream(noop) conns &= conn let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) - gossipSub.peers[peerInfo.id].conn = conn - gossipSub.mesh[topic].incl(peerInfo.id) + let peer = newPubSubPeer(peerInfo, GossipSubCodec) + peer.conn = conn + gossipSub.peers[peerInfo.id] = peer + gossipSub.mesh[topic].incl(peer) check gossipSub.mesh[topic].len == 15 await gossipSub.rebalanceMesh(topic) @@ -91,7 +93,7 @@ suite "GossipSub internal": discard let topic = "foobar" - gossipSub.gossipsub[topic] = initHashSet[string]() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() var conns = newSeq[Connection]() for i in 0..<15: @@ -99,9 +101,9 @@ suite "GossipSub internal": conns &= conn var peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) - gossipSub.peers[peerInfo.id].handler = handler - gossipSub.gossipsub[topic].incl(peerInfo.id) + let peer = newPubSubPeer(peerInfo, GossipSubCodec) + peer.handler = handler + gossipSub.gossipsub[topic].incl(peer) check gossipSub.gossipsub[topic].len == 15 gossipSub.replenishFanout(topic) @@ -122,7 +124,7 @@ suite "GossipSub internal": discard let topic = "foobar" - gossipSub.fanout[topic] = initHashSet[string]() + gossipSub.fanout[topic] = initHashSet[PubSubPeer]() gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(1.millis) await sleepAsync(5.millis) # allow the topic to expire @@ -132,9 +134,9 @@ suite "GossipSub internal": conns &= conn let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) - gossipSub.peers[peerInfo.id].handler = handler - gossipSub.fanout[topic].incl(peerInfo.id) + let peer = newPubSubPeer(peerInfo, GossipSubCodec) + peer.handler = handler + gossipSub.fanout[topic].incl(peer) check gossipSub.fanout[topic].len == GossipSubD @@ -157,8 +159,8 @@ suite "GossipSub internal": let topic1 = "foobar1" let topic2 = "foobar2" - gossipSub.fanout[topic1] = initHashSet[string]() - gossipSub.fanout[topic2] = initHashSet[string]() + gossipSub.fanout[topic1] = initHashSet[PubSubPeer]() + gossipSub.fanout[topic2] = initHashSet[PubSubPeer]() gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(1.millis) gossipSub.lastFanoutPubSub[topic2] = Moment.fromNow(1.minutes) await sleepAsync(5.millis) # allow the topic to expire @@ -169,10 +171,10 @@ suite "GossipSub internal": conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) - gossipSub.peers[peerInfo.id].handler = handler - gossipSub.fanout[topic1].incl(peerInfo.id) - gossipSub.fanout[topic2].incl(peerInfo.id) + let peer = newPubSubPeer(peerInfo, GossipSubCodec) + peer.handler = handler + gossipSub.fanout[topic1].incl(peer) + gossipSub.fanout[topic2].incl(peer) check gossipSub.fanout[topic1].len == GossipSubD check gossipSub.fanout[topic2].len == GossipSubD @@ -196,9 +198,9 @@ suite "GossipSub internal": discard let topic = "foobar" - gossipSub.mesh[topic] = initHashSet[string]() - gossipSub.fanout[topic] = initHashSet[string]() - gossipSub.gossipsub[topic] = initHashSet[string]() + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.fanout[topic] = initHashSet[PubSubPeer]() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() var conns = newSeq[Connection]() # generate mesh and fanout peers @@ -207,12 +209,12 @@ suite "GossipSub internal": conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) - gossipSub.peers[peerInfo.id].handler = handler + let peer = newPubSubPeer(peerInfo, GossipSubCodec) + peer.handler = handler if i mod 2 == 0: - gossipSub.fanout[topic].incl(peerInfo.id) + gossipSub.fanout[topic].incl(peer) else: - gossipSub.mesh[topic].incl(peerInfo.id) + gossipSub.mesh[topic].incl(peer) # generate gossipsub (free standing) peers for i in 0..<15: @@ -220,9 +222,9 @@ suite "GossipSub internal": conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) - gossipSub.peers[peerInfo.id].handler = handler - gossipSub.gossipsub[topic].incl(peerInfo.id) + let peer = newPubSubPeer(peerInfo, GossipSubCodec) + peer.handler = handler + gossipSub.gossipsub[topic].incl(peer) # generate messages for i in 0..5: @@ -240,8 +242,8 @@ suite "GossipSub internal": let peers = gossipSub.getGossipPeers() check peers.len == GossipSubD for p in peers.keys: - check p notin gossipSub.fanout[topic] - check p notin gossipSub.mesh[topic] + check not gossipSub.fanout.hasPeerID(topic, p) + check not gossipSub.mesh.hasPeerID(topic, p) await allFuturesThrowing(conns.mapIt(it.close())) @@ -258,20 +260,20 @@ suite "GossipSub internal": discard let topic = "foobar" - gossipSub.fanout[topic] = initHashSet[string]() - gossipSub.gossipsub[topic] = initHashSet[string]() + gossipSub.fanout[topic] = initHashSet[PubSubPeer]() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() var conns = newSeq[Connection]() for i in 0..<30: let conn = newBufferStream(noop) conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) - gossipSub.peers[peerInfo.id].handler = handler + let peer = newPubSubPeer(peerInfo, GossipSubCodec) + peer.handler = handler if i mod 2 == 0: - gossipSub.fanout[topic].incl(peerInfo.id) + gossipSub.fanout[topic].incl(peer) else: - gossipSub.gossipsub[topic].incl(peerInfo.id) + gossipSub.gossipsub[topic].incl(peer) # generate messages for i in 0..5: @@ -300,20 +302,20 @@ suite "GossipSub internal": discard let topic = "foobar" - gossipSub.mesh[topic] = initHashSet[string]() - gossipSub.gossipsub[topic] = initHashSet[string]() + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() var conns = newSeq[Connection]() for i in 0..<30: let conn = newBufferStream(noop) conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) - gossipSub.peers[peerInfo.id].handler = handler + let peer = newPubSubPeer(peerInfo, GossipSubCodec) + peer.handler = handler if i mod 2 == 0: - gossipSub.mesh[topic].incl(peerInfo.id) + gossipSub.mesh[topic].incl(peer) else: - gossipSub.gossipsub[topic].incl(peerInfo.id) + gossipSub.gossipsub[topic].incl(peer) # generate messages for i in 0..5: @@ -342,20 +344,20 @@ suite "GossipSub internal": discard let topic = "foobar" - gossipSub.mesh[topic] = initHashSet[string]() - gossipSub.fanout[topic] = initHashSet[string]() + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.fanout[topic] = initHashSet[PubSubPeer]() var conns = newSeq[Connection]() for i in 0..<30: let conn = newBufferStream(noop) conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) - gossipSub.peers[peerInfo.id].handler = handler + let peer = newPubSubPeer(peerInfo, GossipSubCodec) + peer.handler = handler if i mod 2 == 0: - gossipSub.mesh[topic].incl(peerInfo.id) + gossipSub.mesh[topic].incl(peer) else: - gossipSub.fanout[topic].incl(peerInfo.id) + gossipSub.fanout[topic].incl(peer) # generate messages for i in 0..5: diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 757429500..3bcfe6cf6 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -32,11 +32,11 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} = var ceil = 15 let fsub = GossipSub(sender.pubSub.get()) while (not fsub.gossipsub.hasKey(key) or - not fsub.gossipsub[key].contains(receiver.peerInfo.id)) and + not fsub.gossipsub.hasPeerID(key, receiver.peerInfo.id)) and (not fsub.mesh.hasKey(key) or - not fsub.mesh[key].contains(receiver.peerInfo.id)) and + not fsub.mesh.hasPeerID(key, receiver.peerInfo.id)) and (not fsub.fanout.hasKey(key) or - not fsub.fanout[key].contains(receiver.peerInfo.id)): + not fsub.fanout.hasPeerID(key , receiver.peerInfo.id)): trace "waitSub sleeping..." await sleepAsync(1.seconds) dec ceil @@ -192,7 +192,7 @@ suite "GossipSub": check: "foobar" in gossip2.topics "foobar" in gossip1.gossipsub - gossip2.peerInfo.id in gossip1.gossipsub["foobar"] + gossip1.gossipsub.hasPeerID("foobar", gossip2.peerInfo.id) await allFuturesThrowing(nodes.mapIt(it.stop())) await allFuturesThrowing(awaitters) @@ -236,11 +236,11 @@ suite "GossipSub": "foobar" in gossip1.gossipsub "foobar" in gossip2.gossipsub - gossip2.peerInfo.id in gossip1.gossipsub["foobar"] or - gossip2.peerInfo.id in gossip1.mesh["foobar"] + gossip1.gossipsub.hasPeerID("foobar", gossip2.peerInfo.id) or + gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.id) - gossip1.peerInfo.id in gossip2.gossipsub["foobar"] or - gossip1.peerInfo.id in gossip2.mesh["foobar"] + gossip2.gossipsub.hasPeerID("foobar", gossip1.peerInfo.id) or + gossip2.mesh.hasPeerID("foobar", gossip1.peerInfo.id) await allFuturesThrowing(nodes.mapIt(it.stop())) await allFuturesThrowing(awaitters)