PubSubPeer tables refactor (#263)

* refactor peer tables

* tests fixing

* override PubSubPeer equality

* fix pubsubpeer comparison
This commit is contained in:
Giovanni Petrantoni 2020-07-13 22:32:38 +09:00 committed by GitHub
parent efb952f18b
commit fcda0f6ce1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 175 additions and 143 deletions

View File

@ -26,7 +26,7 @@ const FloodSubCodec* = "/floodsub/1.0.0"
type type
FloodSub* = ref object of PubSub FloodSub* = ref object of PubSub
floodsub*: Table[string, HashSet[string]] # topic to remote peer map floodsub*: PeerTable # topic to remote peer map
seen*: TimedCache[string] # list of messages forwarded to peers seen*: TimedCache[string] # list of messages forwarded to peers
method subscribeTopic*(f: FloodSub, method subscribeTopic*(f: FloodSub,
@ -35,23 +35,28 @@ method subscribeTopic*(f: FloodSub,
peerId: string) {.gcsafe, async.} = peerId: string) {.gcsafe, async.} =
await procCall PubSub(f).subscribeTopic(topic, subscribe, peerId) await procCall PubSub(f).subscribeTopic(topic, subscribe, peerId)
let peer = f.peers.getOrDefault(peerId)
if peer == nil:
debug "subscribeTopic on a nil peer!"
return
if topic notin f.floodsub: if topic notin f.floodsub:
f.floodsub[topic] = initHashSet[string]() f.floodsub[topic] = initHashSet[PubSubPeer]()
if subscribe: if subscribe:
trace "adding subscription for topic", peer = peerId, name = topic trace "adding subscription for topic", peer = peer.id, name = topic
# subscribe the peer to the topic # subscribe the peer to the topic
f.floodsub[topic].incl(peerId) f.floodsub[topic].incl(peer)
else: else:
trace "removing subscription for topic", peer = peerId, name = topic trace "removing subscription for topic", peer = peer.id, name = topic
# unsubscribe the peer from the topic # unsubscribe the peer from the topic
f.floodsub[topic].excl(peerId) f.floodsub[topic].excl(peer)
method handleDisconnect*(f: FloodSub, peer: PubSubPeer) = method handleDisconnect*(f: FloodSub, peer: PubSubPeer) =
## handle peer disconnects ## handle peer disconnects
for t in toSeq(f.floodsub.keys): for t in toSeq(f.floodsub.keys):
if t in f.floodsub: if t in f.floodsub:
f.floodsub[t].excl(peer.id) f.floodsub[t].excl(peer)
procCall PubSub(f).handleDisconnect(peer) procCall PubSub(f).handleDisconnect(peer)
@ -62,7 +67,7 @@ method rpcHandler*(f: FloodSub,
for m in rpcMsgs: # for all RPC messages for m in rpcMsgs: # for all RPC messages
if m.messages.len > 0: # if there are any messages if m.messages.len > 0: # if there are any messages
var toSendPeers: HashSet[string] = initHashSet[string]() var toSendPeers = initHashSet[PubSubPeer]()
for msg in m.messages: # for every message for msg in m.messages: # for every message
let msgId = f.msgIdProvider(msg) let msgId = f.msgIdProvider(msg)
logScope: msgId logScope: msgId
@ -158,6 +163,6 @@ method initPubSub*(f: FloodSub) =
procCall PubSub(f).initPubSub() procCall PubSub(f).initPubSub()
f.peers = initTable[string, PubSubPeer]() f.peers = initTable[string, PubSubPeer]()
f.topics = initTable[string, Topic]() f.topics = initTable[string, Topic]()
f.floodsub = initTable[string, HashSet[string]]() f.floodsub = initTable[string, HashSet[PubSubPeer]]()
f.seen = newTimedCache[string](2.minutes) f.seen = newTimedCache[string](2.minutes)
f.init() f.init()

View File

@ -45,9 +45,9 @@ const GossipSubFanoutTTL* = 1.minutes
type type
GossipSub* = ref object of FloodSub GossipSub* = ref object of FloodSub
mesh*: Table[string, HashSet[string]] # peers that we send messages to when we are subscribed to the topic mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic
fanout*: Table[string, HashSet[string]] # peers that we send messages to when we're not subscribed to the topic fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic
gossipsub*: Table[string, HashSet[string]] # peers that are subscribed to a topic gossipsub*: PeerTable # peers that are subscribed to a topic
lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics
gossip*: Table[string, seq[ControlIHave]] # pending gossip gossip*: Table[string, seq[ControlIHave]] # pending gossip
control*: Table[string, ControlMessage] # pending control messages control*: Table[string, ControlMessage] # pending control messages
@ -68,23 +68,20 @@ declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub,
"gossipsub peers per topic in gossipsub", "gossipsub peers per topic in gossipsub",
labels = ["topic"]) labels = ["topic"])
func addPeer( func addPeer(table: var PeerTable, topic: string, peer: PubSubPeer): bool =
table: var Table[string, HashSet[string]], topic: string,
peerId: string): bool =
# returns true if the peer was added, false if it was already in the collection # returns true if the peer was added, false if it was already in the collection
not table.mgetOrPut(topic, initHashSet[string]()).containsOrIncl(peerId) not table.mgetOrPut(topic, initHashSet[PubSubPeer]()).containsOrIncl(peer)
func removePeer( func removePeer(table: var PeerTable, topic: string, peer: PubSubPeer) =
table: var Table[string, HashSet[string]], topic, peerId: string) =
table.withValue(topic, peers): table.withValue(topic, peers):
peers[].excl(peerId) peers[].excl(peer)
if peers[].len == 0: if peers[].len == 0:
table.del(topic) table.del(topic)
func hasPeer(table: Table[string, HashSet[string]], topic, peerId: string): bool = func hasPeer(table: PeerTable, topic: string, peer: PubSubPeer): bool =
(topic in table) and (peerId in table[topic]) (topic in table) and (peer in table[topic])
func peers(table: Table[string, HashSet[string]], topic: string): int = func peers(table: PeerTable, topic: string): int =
if topic in table: if topic in table:
table[topic].len table[topic].len
else: else:
@ -112,8 +109,8 @@ proc replenishFanout(g: GossipSub, topic: string) =
if g.fanout.peers(topic) < GossipSubDLo: if g.fanout.peers(topic) < GossipSubDLo:
trace "replenishing fanout", peers = g.fanout.peers(topic) trace "replenishing fanout", peers = g.fanout.peers(topic)
if topic in g.gossipsub: if topic in g.gossipsub:
for peerId in g.gossipsub[topic]: for peer in g.gossipsub[topic]:
if g.fanout.addPeer(topic, peerId): if g.fanout.addPeer(topic, peer):
if g.fanout.peers(topic) == GossipSubD: if g.fanout.peers(topic) == GossipSubD:
break break
@ -133,8 +130,8 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
trace "replenishing mesh", topic, peers = g.mesh.peers(topic) trace "replenishing mesh", topic, peers = g.mesh.peers(topic)
# replenish the mesh if we're below GossipSubDlo # replenish the mesh if we're below GossipSubDlo
var newPeers = toSeq( var newPeers = toSeq(
g.gossipsub.getOrDefault(topic, initHashSet[string]()) - g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) -
g.mesh.getOrDefault(topic, initHashSet[string]()) g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
) )
logScope: logScope:
@ -146,19 +143,11 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
trace "getting peers", topic, peers = newPeers.len trace "getting peers", topic, peers = newPeers.len
for id in newPeers: for peer in newPeers:
if g.mesh.peers(topic) >= GossipSubD: # send a graft message to the peer
break grafts.add peer
discard g.mesh.addPeer(topic, peer)
let p = g.peers.getOrDefault(id) trace "got peer", peer = peer.id
if p != nil:
# send a graft message to the peer
grafts.add p
discard g.mesh.addPeer(topic, id)
trace "got peer", peer = id
else:
# Peer should have been removed from mesh also!
warn "Unknown peer in mesh", peer = id
if g.mesh.peers(topic) > GossipSubDhi: if g.mesh.peers(topic) > GossipSubDhi:
# prune peers if we've gone over # prune peers if we've gone over
@ -166,17 +155,14 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
shuffle(mesh) shuffle(mesh)
trace "about to prune mesh", mesh = mesh.len trace "about to prune mesh", mesh = mesh.len
for id in mesh: for peer in mesh:
if g.mesh.peers(topic) <= GossipSubD: if g.mesh.peers(topic) <= GossipSubD:
break break
trace "pruning peers", peers = g.mesh.peers(topic) trace "pruning peers", peers = g.mesh.peers(topic)
# send a graft message to the peer # send a graft message to the peer
g.mesh.removePeer(topic, id) g.mesh.removePeer(topic, peer)
prunes.add(peer)
let p = g.peers.getOrDefault(id)
if p != nil:
prunes.add(p)
libp2p_gossipsub_peers_per_topic_gossipsub libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.peers(topic).int64, labelValues = [topic]) .set(g.gossipsub.peers(topic).int64, labelValues = [topic])
@ -236,18 +222,18 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
trace "topic not in gossip array, skipping", topicID = topic trace "topic not in gossip array, skipping", topicID = topic
continue continue
for id in allPeers: for peer in allPeers:
if result.len >= GossipSubD: if result.len >= GossipSubD:
trace "got gossip peers", peers = result.len trace "got gossip peers", peers = result.len
break break
if id in gossipPeers: if peer in gossipPeers:
continue continue
if id notin result: if peer.id notin result:
result[id] = controlMsg result[peer.id] = controlMsg
result[id].ihave.add(ihave) result[peer.id].ihave.add(ihave)
proc heartbeat(g: GossipSub) {.async.} = proc heartbeat(g: GossipSub) {.async.} =
while g.heartbeatRunning: while g.heartbeatRunning:
@ -282,19 +268,19 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) =
## handle peer disconnects ## handle peer disconnects
procCall FloodSub(g).handleDisconnect(peer) procCall FloodSub(g).handleDisconnect(peer)
for t in toSeq(g.gossipsub.keys): for t in toSeq(g.gossipsub.keys):
g.gossipsub.removePeer(t, peer.id) g.gossipsub.removePeer(t, peer)
libp2p_gossipsub_peers_per_topic_gossipsub libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.peers(t).int64, labelValues = [t]) .set(g.gossipsub.peers(t).int64, labelValues = [t])
for t in toSeq(g.mesh.keys): for t in toSeq(g.mesh.keys):
g.mesh.removePeer(t, peer.id) g.mesh.removePeer(t, peer)
libp2p_gossipsub_peers_per_topic_mesh libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(t).int64, labelValues = [t]) .set(g.mesh.peers(t).int64, labelValues = [t])
for t in toSeq(g.fanout.keys): for t in toSeq(g.fanout.keys):
g.fanout.removePeer(t, peer.id) g.fanout.removePeer(t, peer)
libp2p_gossipsub_peers_per_topic_fanout libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(t).int64, labelValues = [t]) .set(g.fanout.peers(t).int64, labelValues = [t])
@ -310,16 +296,21 @@ method subscribeTopic*(g: GossipSub,
peerId: string) {.gcsafe, async.} = peerId: string) {.gcsafe, async.} =
await procCall FloodSub(g).subscribeTopic(topic, subscribe, peerId) await procCall FloodSub(g).subscribeTopic(topic, subscribe, peerId)
let peer = g.peers.getOrDefault(peerId)
if peer == nil:
debug "subscribeTopic on a nil peer!"
return
if subscribe: if subscribe:
trace "adding subscription for topic", peer = peerId, name = topic trace "adding subscription for topic", peer = peerId, name = topic
# subscribe remote peer to the topic # subscribe remote peer to the topic
discard g.gossipsub.addPeer(topic, peerId) discard g.gossipsub.addPeer(topic, peer)
else: else:
trace "removing subscription for topic", peer = peerId, name = topic trace "removing subscription for topic", peer = peerId, name = topic
# unsubscribe remote peer from the topic # unsubscribe remote peer from the topic
g.gossipsub.removePeer(topic, peerId) g.gossipsub.removePeer(topic, peer)
g.mesh.removePeer(topic, peerId) g.mesh.removePeer(topic, peer)
g.fanout.removePeer(topic, peerId) g.fanout.removePeer(topic, peer)
libp2p_gossipsub_peers_per_topic_mesh libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(topic).int64, labelValues = [topic]) .set(g.mesh.peers(topic).int64, labelValues = [topic])
@ -338,10 +329,9 @@ method subscribeTopic*(g: GossipSub,
proc handleGraft(g: GossipSub, proc handleGraft(g: GossipSub,
peer: PubSubPeer, peer: PubSubPeer,
grafts: seq[ControlGraft]): seq[ControlPrune] = grafts: seq[ControlGraft]): seq[ControlPrune] =
let peerId = peer.id
for graft in grafts: for graft in grafts:
let topic = graft.topicID let topic = graft.topicID
trace "processing graft message", topic, peerId trace "processing graft message", topic, peer
# If they send us a graft before they send us a subscribe, what should # If they send us a graft before they send us a subscribe, what should
# we do? For now, we add them to mesh but don't add them to gossipsub. # we do? For now, we add them to mesh but don't add them to gossipsub.
@ -351,10 +341,10 @@ proc handleGraft(g: GossipSub,
# In the spec, there's no mention of DHi here, but implicitly, a # In the spec, there's no mention of DHi here, but implicitly, a
# peer will be removed from the mesh on next rebalance, so we don't want # peer will be removed from the mesh on next rebalance, so we don't want
# this peer to push someone else out # this peer to push someone else out
if g.mesh.addPeer(topic, peerId): if g.mesh.addPeer(topic, peer):
g.fanout.removePeer(topic, peer.id) g.fanout.removePeer(topic, peer)
else: else:
trace "Peer already in mesh", topic, peerId trace "Peer already in mesh", topic, peer
else: else:
result.add(ControlPrune(topicID: topic)) result.add(ControlPrune(topicID: topic))
else: else:
@ -368,7 +358,7 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
trace "processing prune message", peer = peer.id, trace "processing prune message", peer = peer.id,
topicID = prune.topicID topicID = prune.topicID
g.mesh.removePeer(prune.topicID, peer.id) g.mesh.removePeer(prune.topicID, peer)
libp2p_gossipsub_peers_per_topic_mesh libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(prune.topicID).int64, labelValues = [prune.topicID]) .set(g.mesh.peers(prune.topicID).int64, labelValues = [prune.topicID])
@ -403,7 +393,7 @@ method rpcHandler*(g: GossipSub,
for m in rpcMsgs: # for all RPC messages for m in rpcMsgs: # for all RPC messages
if m.messages.len > 0: # if there are any messages if m.messages.len > 0: # if there are any messages
var toSendPeers: HashSet[string] var toSendPeers: HashSet[PubSubPeer]
for msg in m.messages: # for every message for msg in m.messages: # for every message
let msgId = g.msgIdProvider(msg) let msgId = g.msgIdProvider(msg)
logScope: msgId logScope: msgId
@ -485,10 +475,8 @@ method unsubscribe*(g: GossipSub,
let peers = g.mesh.getOrDefault(topic) let peers = g.mesh.getOrDefault(topic)
g.mesh.del(topic) g.mesh.del(topic)
for id in peers: for peer in peers:
let p = g.peers.getOrDefault(id) await peer.sendPrune(@[topic])
if p != nil:
await p.sendPrune(@[topic])
method publish*(g: GossipSub, method publish*(g: GossipSub,
topic: string, topic: string,
@ -497,7 +485,7 @@ method publish*(g: GossipSub,
discard await procCall PubSub(g).publish(topic, data) discard await procCall PubSub(g).publish(topic, data)
trace "about to publish message on topic", name = topic, trace "about to publish message on topic", name = topic,
data = data.shortLog data = data.shortLog
var peers: HashSet[string] var peers: HashSet[PubSubPeer]
if topic.len <= 0: # data could be 0/empty if topic.len <= 0: # data could be 0/empty
return 0 return 0
@ -578,9 +566,9 @@ method initPubSub*(g: GossipSub) =
randomize() randomize()
g.mcache = newMCache(GossipSubHistoryGossip, GossipSubHistoryLength) g.mcache = newMCache(GossipSubHistoryGossip, GossipSubHistoryLength)
g.mesh = initTable[string, HashSet[string]]() # meshes - topic to peer g.mesh = initTable[string, HashSet[PubSubPeer]]() # meshes - topic to peer
g.fanout = initTable[string, HashSet[string]]() # fanout - topic to peer g.fanout = initTable[string, HashSet[PubSubPeer]]() # fanout - topic to peer
g.gossipsub = initTable[string, HashSet[string]]()# topic to peer map of all gossipsub peers g.gossipsub = initTable[string, HashSet[PubSubPeer]]()# topic to peer map of all gossipsub peers
g.lastFanoutPubSub = initTable[string, Moment]() # last publish time for fanout topics g.lastFanoutPubSub = initTable[string, Moment]() # last publish time for fanout topics
g.gossip = initTable[string, seq[ControlIHave]]() # pending gossip g.gossip = initTable[string, seq[ControlIHave]]() # pending gossip
g.control = initTable[string, ControlMessage]() # pending control messages g.control = initTable[string, ControlMessage]() # pending control messages

View File

@ -30,6 +30,8 @@ declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messag
declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"]) declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"])
type type
PeerTable* = Table[string, HashSet[PubSubPeer]]
SendRes = tuple[published: seq[string], failed: seq[string]] # keep private SendRes = tuple[published: seq[string], failed: seq[string]] # keep private
TopicHandler* = proc(topic: string, TopicHandler* = proc(topic: string,
@ -59,6 +61,16 @@ type
observers: ref seq[PubSubObserver] # ref as in smart_ptr observers: ref seq[PubSubObserver] # ref as in smart_ptr
msgIdProvider*: MsgIdProvider # Turn message into message id (not nil) msgIdProvider*: MsgIdProvider # Turn message into message id (not nil)
proc hasPeerID*(t: PeerTable, topic, peerId: string): bool =
# unefficient but used only in tests!
let peers = t.getOrDefault(topic)
if peers.len == 0:
false
else:
let ps = toSeq(peers)
ps.any do (peer: PubSubPeer) -> bool:
peer.id == peerId
method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} = method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} =
## handle peer disconnects ## handle peer disconnects
## ##
@ -243,20 +255,16 @@ method subscribe*(p: PubSub,
libp2p_pubsub_topics.inc() libp2p_pubsub_topics.inc()
proc sendHelper*(p: PubSub, proc sendHelper*(p: PubSub,
sendPeers: HashSet[string], sendPeers: HashSet[PubSubPeer],
msgs: seq[Message]): Future[SendRes] {.async.} = msgs: seq[Message]): Future[SendRes] {.async.} =
var sent: seq[tuple[id: string, fut: Future[void]]] var sent: seq[tuple[id: string, fut: Future[void]]]
for sendPeer in sendPeers: for sendPeer in sendPeers:
# avoid sending to self # avoid sending to self
if sendPeer == p.peerInfo.id: if sendPeer.peerInfo == p.peerInfo:
continue continue
let peer = p.peers.getOrDefault(sendPeer) trace "sending messages to peer", peer = sendPeer.id, msgs
if isNil(peer): sent.add((id: sendPeer.id, fut: sendPeer.send(@[RPCMsg(messages: msgs)])))
continue
trace "sending messages to peer", peer = peer.id, msgs
sent.add((id: peer.id, fut: peer.send(@[RPCMsg(messages: msgs)])))
var published: seq[string] var published: seq[string]
var failed: seq[string] var failed: seq[string]

View File

@ -43,6 +43,35 @@ type
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
func hash*(p: PubSubPeer): Hash =
# int is either 32/64, so intptr basically, pubsubpeer is a ref
cast[pointer](p).hash
func `==`*(a, b: PubSubPeer): bool =
# override equiality to support both nil and peerInfo comparisons
# this in the future will allow us to recycle refs
let
aptr = cast[pointer](a)
bptr = cast[pointer](b)
if aptr == nil:
if bptr == nil:
true
else:
false
elif bptr == nil:
false
else:
if a.peerInfo == nil:
if b.peerInfo == nil:
true
else:
false
else:
if b.peerInfo == nil:
false
else:
a.peerInfo.id == b.peerInfo.id
proc id*(p: PubSubPeer): string = p.peerInfo.id proc id*(p: PubSubPeer): string = p.peerInfo.id
proc connected*(p: PubSubPeer): bool = proc connected*(p: PubSubPeer): bool =

View File

@ -29,7 +29,7 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
var ceil = 15 var ceil = 15
let fsub = cast[FloodSub](sender.pubSub.get()) let fsub = cast[FloodSub](sender.pubSub.get())
while not fsub.floodsub.hasKey(key) or while not fsub.floodsub.hasKey(key) or
not fsub.floodsub[key].contains(receiver.peerInfo.id): not fsub.floodsub.hasPeerID(key, receiver.peerInfo.id):
await sleepAsync(100.millis) await sleepAsync(100.millis)
dec ceil dec ceil
doAssert(ceil > 0, "waitSub timeout!") doAssert(ceil > 0, "waitSub timeout!")

View File

@ -29,18 +29,19 @@ suite "GossipSub internal":
let gossipSub = newPubSub(TestGossipSub, randomPeerInfo()) let gossipSub = newPubSub(TestGossipSub, randomPeerInfo())
let topic = "foobar" let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[string]() gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
var conns = newSeq[Connection]() var conns = newSeq[Connection]()
gossipSub.gossipsub[topic] = initHashSet[string]() gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
for i in 0..<15: for i in 0..<15:
let conn = newBufferStream(noop) let conn = newBufferStream(noop)
conns &= conn conns &= conn
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) let peer = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].conn = conn peer.conn = conn
gossipSub.mesh[topic].incl(peerInfo.id) gossipSub.peers[peerInfo.id] = peer
gossipSub.mesh[topic].incl(peer)
check gossipSub.peers.len == 15 check gossipSub.peers.len == 15
await gossipSub.rebalanceMesh(topic) await gossipSub.rebalanceMesh(topic)
@ -58,19 +59,20 @@ suite "GossipSub internal":
let gossipSub = newPubSub(TestGossipSub, randomPeerInfo()) let gossipSub = newPubSub(TestGossipSub, randomPeerInfo())
let topic = "foobar" let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[string]() gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.topics[topic] = Topic() # has to be in topics to rebalance gossipSub.topics[topic] = Topic() # has to be in topics to rebalance
gossipSub.gossipsub[topic] = initHashSet[string]() gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
var conns = newSeq[Connection]() var conns = newSeq[Connection]()
for i in 0..<15: for i in 0..<15:
let conn = newBufferStream(noop) let conn = newBufferStream(noop)
conns &= conn conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) let peer = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].conn = conn peer.conn = conn
gossipSub.mesh[topic].incl(peerInfo.id) gossipSub.peers[peerInfo.id] = peer
gossipSub.mesh[topic].incl(peer)
check gossipSub.mesh[topic].len == 15 check gossipSub.mesh[topic].len == 15
await gossipSub.rebalanceMesh(topic) await gossipSub.rebalanceMesh(topic)
@ -91,7 +93,7 @@ suite "GossipSub internal":
discard discard
let topic = "foobar" let topic = "foobar"
gossipSub.gossipsub[topic] = initHashSet[string]() gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
var conns = newSeq[Connection]() var conns = newSeq[Connection]()
for i in 0..<15: for i in 0..<15:
@ -99,9 +101,9 @@ suite "GossipSub internal":
conns &= conn conns &= conn
var peerInfo = randomPeerInfo() var peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) let peer = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler peer.handler = handler
gossipSub.gossipsub[topic].incl(peerInfo.id) gossipSub.gossipsub[topic].incl(peer)
check gossipSub.gossipsub[topic].len == 15 check gossipSub.gossipsub[topic].len == 15
gossipSub.replenishFanout(topic) gossipSub.replenishFanout(topic)
@ -122,7 +124,7 @@ suite "GossipSub internal":
discard discard
let topic = "foobar" let topic = "foobar"
gossipSub.fanout[topic] = initHashSet[string]() gossipSub.fanout[topic] = initHashSet[PubSubPeer]()
gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(1.millis) gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(1.millis)
await sleepAsync(5.millis) # allow the topic to expire await sleepAsync(5.millis) # allow the topic to expire
@ -132,9 +134,9 @@ suite "GossipSub internal":
conns &= conn conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) let peer = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler peer.handler = handler
gossipSub.fanout[topic].incl(peerInfo.id) gossipSub.fanout[topic].incl(peer)
check gossipSub.fanout[topic].len == GossipSubD check gossipSub.fanout[topic].len == GossipSubD
@ -157,8 +159,8 @@ suite "GossipSub internal":
let topic1 = "foobar1" let topic1 = "foobar1"
let topic2 = "foobar2" let topic2 = "foobar2"
gossipSub.fanout[topic1] = initHashSet[string]() gossipSub.fanout[topic1] = initHashSet[PubSubPeer]()
gossipSub.fanout[topic2] = initHashSet[string]() gossipSub.fanout[topic2] = initHashSet[PubSubPeer]()
gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(1.millis) gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(1.millis)
gossipSub.lastFanoutPubSub[topic2] = Moment.fromNow(1.minutes) gossipSub.lastFanoutPubSub[topic2] = Moment.fromNow(1.minutes)
await sleepAsync(5.millis) # allow the topic to expire await sleepAsync(5.millis) # allow the topic to expire
@ -169,10 +171,10 @@ suite "GossipSub internal":
conns &= conn conns &= conn
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) let peer = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler peer.handler = handler
gossipSub.fanout[topic1].incl(peerInfo.id) gossipSub.fanout[topic1].incl(peer)
gossipSub.fanout[topic2].incl(peerInfo.id) gossipSub.fanout[topic2].incl(peer)
check gossipSub.fanout[topic1].len == GossipSubD check gossipSub.fanout[topic1].len == GossipSubD
check gossipSub.fanout[topic2].len == GossipSubD check gossipSub.fanout[topic2].len == GossipSubD
@ -196,9 +198,9 @@ suite "GossipSub internal":
discard discard
let topic = "foobar" let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[string]() gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.fanout[topic] = initHashSet[string]() gossipSub.fanout[topic] = initHashSet[PubSubPeer]()
gossipSub.gossipsub[topic] = initHashSet[string]() gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
var conns = newSeq[Connection]() var conns = newSeq[Connection]()
# generate mesh and fanout peers # generate mesh and fanout peers
@ -207,12 +209,12 @@ suite "GossipSub internal":
conns &= conn conns &= conn
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) let peer = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler peer.handler = handler
if i mod 2 == 0: if i mod 2 == 0:
gossipSub.fanout[topic].incl(peerInfo.id) gossipSub.fanout[topic].incl(peer)
else: else:
gossipSub.mesh[topic].incl(peerInfo.id) gossipSub.mesh[topic].incl(peer)
# generate gossipsub (free standing) peers # generate gossipsub (free standing) peers
for i in 0..<15: for i in 0..<15:
@ -220,9 +222,9 @@ suite "GossipSub internal":
conns &= conn conns &= conn
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) let peer = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler peer.handler = handler
gossipSub.gossipsub[topic].incl(peerInfo.id) gossipSub.gossipsub[topic].incl(peer)
# generate messages # generate messages
for i in 0..5: for i in 0..5:
@ -240,8 +242,8 @@ suite "GossipSub internal":
let peers = gossipSub.getGossipPeers() let peers = gossipSub.getGossipPeers()
check peers.len == GossipSubD check peers.len == GossipSubD
for p in peers.keys: for p in peers.keys:
check p notin gossipSub.fanout[topic] check not gossipSub.fanout.hasPeerID(topic, p)
check p notin gossipSub.mesh[topic] check not gossipSub.mesh.hasPeerID(topic, p)
await allFuturesThrowing(conns.mapIt(it.close())) await allFuturesThrowing(conns.mapIt(it.close()))
@ -258,20 +260,20 @@ suite "GossipSub internal":
discard discard
let topic = "foobar" let topic = "foobar"
gossipSub.fanout[topic] = initHashSet[string]() gossipSub.fanout[topic] = initHashSet[PubSubPeer]()
gossipSub.gossipsub[topic] = initHashSet[string]() gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
var conns = newSeq[Connection]() var conns = newSeq[Connection]()
for i in 0..<30: for i in 0..<30:
let conn = newBufferStream(noop) let conn = newBufferStream(noop)
conns &= conn conns &= conn
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) let peer = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler peer.handler = handler
if i mod 2 == 0: if i mod 2 == 0:
gossipSub.fanout[topic].incl(peerInfo.id) gossipSub.fanout[topic].incl(peer)
else: else:
gossipSub.gossipsub[topic].incl(peerInfo.id) gossipSub.gossipsub[topic].incl(peer)
# generate messages # generate messages
for i in 0..5: for i in 0..5:
@ -300,20 +302,20 @@ suite "GossipSub internal":
discard discard
let topic = "foobar" let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[string]() gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.gossipsub[topic] = initHashSet[string]() gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
var conns = newSeq[Connection]() var conns = newSeq[Connection]()
for i in 0..<30: for i in 0..<30:
let conn = newBufferStream(noop) let conn = newBufferStream(noop)
conns &= conn conns &= conn
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) let peer = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler peer.handler = handler
if i mod 2 == 0: if i mod 2 == 0:
gossipSub.mesh[topic].incl(peerInfo.id) gossipSub.mesh[topic].incl(peer)
else: else:
gossipSub.gossipsub[topic].incl(peerInfo.id) gossipSub.gossipsub[topic].incl(peer)
# generate messages # generate messages
for i in 0..5: for i in 0..5:
@ -342,20 +344,20 @@ suite "GossipSub internal":
discard discard
let topic = "foobar" let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[string]() gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.fanout[topic] = initHashSet[string]() gossipSub.fanout[topic] = initHashSet[PubSubPeer]()
var conns = newSeq[Connection]() var conns = newSeq[Connection]()
for i in 0..<30: for i in 0..<30:
let conn = newBufferStream(noop) let conn = newBufferStream(noop)
conns &= conn conns &= conn
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) let peer = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler peer.handler = handler
if i mod 2 == 0: if i mod 2 == 0:
gossipSub.mesh[topic].incl(peerInfo.id) gossipSub.mesh[topic].incl(peer)
else: else:
gossipSub.fanout[topic].incl(peerInfo.id) gossipSub.fanout[topic].incl(peer)
# generate messages # generate messages
for i in 0..5: for i in 0..5:

View File

@ -32,11 +32,11 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
var ceil = 15 var ceil = 15
let fsub = GossipSub(sender.pubSub.get()) let fsub = GossipSub(sender.pubSub.get())
while (not fsub.gossipsub.hasKey(key) or while (not fsub.gossipsub.hasKey(key) or
not fsub.gossipsub[key].contains(receiver.peerInfo.id)) and not fsub.gossipsub.hasPeerID(key, receiver.peerInfo.id)) and
(not fsub.mesh.hasKey(key) or (not fsub.mesh.hasKey(key) or
not fsub.mesh[key].contains(receiver.peerInfo.id)) and not fsub.mesh.hasPeerID(key, receiver.peerInfo.id)) and
(not fsub.fanout.hasKey(key) or (not fsub.fanout.hasKey(key) or
not fsub.fanout[key].contains(receiver.peerInfo.id)): not fsub.fanout.hasPeerID(key , receiver.peerInfo.id)):
trace "waitSub sleeping..." trace "waitSub sleeping..."
await sleepAsync(1.seconds) await sleepAsync(1.seconds)
dec ceil dec ceil
@ -192,7 +192,7 @@ suite "GossipSub":
check: check:
"foobar" in gossip2.topics "foobar" in gossip2.topics
"foobar" in gossip1.gossipsub "foobar" in gossip1.gossipsub
gossip2.peerInfo.id in gossip1.gossipsub["foobar"] gossip1.gossipsub.hasPeerID("foobar", gossip2.peerInfo.id)
await allFuturesThrowing(nodes.mapIt(it.stop())) await allFuturesThrowing(nodes.mapIt(it.stop()))
await allFuturesThrowing(awaitters) await allFuturesThrowing(awaitters)
@ -236,11 +236,11 @@ suite "GossipSub":
"foobar" in gossip1.gossipsub "foobar" in gossip1.gossipsub
"foobar" in gossip2.gossipsub "foobar" in gossip2.gossipsub
gossip2.peerInfo.id in gossip1.gossipsub["foobar"] or gossip1.gossipsub.hasPeerID("foobar", gossip2.peerInfo.id) or
gossip2.peerInfo.id in gossip1.mesh["foobar"] gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.id)
gossip1.peerInfo.id in gossip2.gossipsub["foobar"] or gossip2.gossipsub.hasPeerID("foobar", gossip1.peerInfo.id) or
gossip1.peerInfo.id in gossip2.mesh["foobar"] gossip2.mesh.hasPeerID("foobar", gossip1.peerInfo.id)
await allFuturesThrowing(nodes.mapIt(it.stop())) await allFuturesThrowing(nodes.mapIt(it.stop()))
await allFuturesThrowing(awaitters) await allFuturesThrowing(awaitters)