Rework pubsub (#322)

* move pubsub of off switch, pass switch into pubsub

* use join on lpstreams

* properly cleanup up failed peers

* fix tests

* fix peertable hasPeerId

* fix tests

* rework sending, remove helpers from pubsubpeer, unify in broadcast

* further split broadcast into send

* use send where appropriate

* use formatIt

* improve trace

Co-authored-by: Giovanni Petrantoni <giovanni@fragcolor.xyz>
This commit is contained in:
Dmitriy Ryajov 2020-08-11 18:05:49 -06:00 committed by GitHub
parent 59b290fcc7
commit b76b3e0e9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 801 additions and 899 deletions

View File

@ -121,7 +121,7 @@ proc onClose(c: ConnManager, conn: Connection) {.async.} =
## triggers the connections resource cleanup ## triggers the connections resource cleanup
## ##
await conn.closeEvent.wait() await conn.join()
trace "triggering connection cleanup" trace "triggering connection cleanup"
await c.cleanupConn(conn) await c.cleanupConn(conn)

View File

@ -96,7 +96,7 @@ proc newStreamInternal*(m: Mplex,
proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} = proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
## remove the local channel from the internal tables ## remove the local channel from the internal tables
## ##
await chann.closeEvent.wait() await chann.join()
if not isNil(chann): if not isNil(chann):
m.getChannelList(chann.initiator).del(chann.id) m.getChannelList(chann.initiator).del(chann.id)
trace "cleaned up channel", id = chann.id trace "cleaned up channel", id = chann.id

View File

@ -31,14 +31,9 @@ type
method subscribeTopic*(f: FloodSub, method subscribeTopic*(f: FloodSub,
topic: string, topic: string,
subscribe: bool, subscribe: bool,
peerId: string) {.gcsafe, async.} = peerId: PeerID) {.gcsafe, async.} =
await procCall PubSub(f).subscribeTopic(topic, subscribe, peerId) await procCall PubSub(f).subscribeTopic(topic, subscribe, peerId)
let peer = f.peers.getOrDefault(peerId) let peer = f.peers.getOrDefault(peerId)
if peer == nil:
debug "subscribeTopic on a nil peer!", peer = peerId
return
if topic notin f.floodsub: if topic notin f.floodsub:
f.floodsub[topic] = initHashSet[PubSubPeer]() f.floodsub[topic] = initHashSet[PubSubPeer]()
@ -51,16 +46,20 @@ method subscribeTopic*(f: FloodSub,
# unsubscribe the peer from the topic # unsubscribe the peer from the topic
f.floodsub[topic].excl(peer) f.floodsub[topic].excl(peer)
method handleDisconnect*(f: FloodSub, peer: PubSubPeer) = method unsubscribePeer*(f: FloodSub, peer: PeerID) =
## handle peer disconnects ## handle peer disconnects
## ##
procCall PubSub(f).handleDisconnect(peer) trace "unsubscribing floodsub peer", peer = $peer
let pubSubPeer = f.peers.getOrDefault(peer)
if pubSubPeer.isNil:
return
if not(isNil(peer)) and peer.peerInfo notin f.conns:
for t in toSeq(f.floodsub.keys): for t in toSeq(f.floodsub.keys):
if t in f.floodsub: if t in f.floodsub:
f.floodsub[t].excl(peer) f.floodsub[t].excl(pubSubPeer)
procCall PubSub(f).unsubscribePeer(peer)
method rpcHandler*(f: FloodSub, method rpcHandler*(f: FloodSub,
peer: PubSubPeer, peer: PubSubPeer,
@ -77,7 +76,7 @@ method rpcHandler*(f: FloodSub,
if msgId notin f.seen: if msgId notin f.seen:
f.seen.put(msgId) # add the message to the seen cache f.seen.put(msgId) # add the message to the seen cache
if f.verifySignature and not msg.verify(peer.peerInfo): if f.verifySignature and not msg.verify(peer.peerId):
trace "dropping message due to failed signature verification" trace "dropping message due to failed signature verification"
continue continue
@ -102,7 +101,10 @@ method rpcHandler*(f: FloodSub,
trace "exception in message handler", exc = exc.msg trace "exception in message handler", exc = exc.msg
# forward the message to all peers interested in it # forward the message to all peers interested in it
let published = await f.publishHelper(toSendPeers, m.messages, DefaultSendTimeout) let published = await f.broadcast(
toSeq(toSendPeers),
RPCMsg(messages: m.messages),
DefaultSendTimeout)
trace "forwared message to peers", peers = published trace "forwared message to peers", peers = published
@ -118,11 +120,6 @@ method init*(f: FloodSub) =
f.handler = handler f.handler = handler
f.codec = FloodSubCodec f.codec = FloodSubCodec
method subscribePeer*(p: FloodSub,
conn: Connection) =
procCall PubSub(p).subscribePeer(conn)
asyncCheck p.handleConn(conn, FloodSubCodec)
method publish*(f: FloodSub, method publish*(f: FloodSub,
topic: string, topic: string,
data: seq[byte], data: seq[byte],
@ -143,7 +140,10 @@ method publish*(f: FloodSub,
let msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign) let msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign)
# start the future but do not wait yet # start the future but do not wait yet
let published = await f.publishHelper(f.floodsub.getOrDefault(topic), @[msg], timeout) let published = await f.broadcast(
toSeq(f.floodsub.getOrDefault(topic)),
RPCMsg(messages: @[msg]),
timeout)
when defined(libp2p_expensive_metrics): when defined(libp2p_expensive_metrics):
libp2p_pubsub_messages_published.inc(labelValues = [topic]) libp2p_pubsub_messages_published.inc(labelValues = [topic])
@ -167,8 +167,6 @@ method unsubscribeAll*(f: FloodSub, topic: string) {.async.} =
method initPubSub*(f: FloodSub) = method initPubSub*(f: FloodSub) =
procCall PubSub(f).initPubSub() procCall PubSub(f).initPubSub()
f.peers = initTable[string, PubSubPeer]()
f.topics = initTable[string, Topic]()
f.floodsub = initTable[string, HashSet[PubSubPeer]]() f.floodsub = initTable[string, HashSet[PubSubPeer]]()
f.seen = newTimedCache[string](2.minutes) f.seen = newTimedCache[string](2.minutes)
f.init() f.init()

View File

@ -155,10 +155,10 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
.set(g.mesh.peers(topic).int64, labelValues = [topic]) .set(g.mesh.peers(topic).int64, labelValues = [topic])
# Send changes to peers after table updates to avoid stale state # Send changes to peers after table updates to avoid stale state
for p in grafts: let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))
await p.sendGraft(@[topic]) let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
for p in prunes: discard await g.broadcast(grafts, graft, DefaultSendTimeout)
await p.sendPrune(@[topic]) discard await g.broadcast(prunes, prune, DefaultSendTimeout)
trace "mesh balanced, got peers", peers = g.mesh.peers(topic) trace "mesh balanced, got peers", peers = g.mesh.peers(topic)
@ -177,7 +177,7 @@ proc dropFanoutPeers(g: GossipSub) =
libp2p_gossipsub_peers_per_topic_fanout libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(topic).int64, labelValues = [topic]) .set(g.fanout.peers(topic).int64, labelValues = [topic])
proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} = proc getGossipPeers(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe.} =
## gossip iHave messages to peers ## gossip iHave messages to peers
## ##
@ -209,10 +209,10 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
if peer in gossipPeers: if peer in gossipPeers:
continue continue
if peer.id notin result: if peer notin result:
result[peer.id] = controlMsg result[peer] = controlMsg
result[peer.id].ihave.add(ihave) result[peer].ihave.add(ihave)
proc heartbeat(g: GossipSub) {.async.} = proc heartbeat(g: GossipSub) {.async.} =
while g.heartbeatRunning: while g.heartbeatRunning:
@ -231,8 +231,11 @@ proc heartbeat(g: GossipSub) {.async.} =
let peers = g.getGossipPeers() let peers = g.getGossipPeers()
var sent: seq[Future[void]] var sent: seq[Future[void]]
for peer, control in peers: for peer, control in peers:
g.peers.withValue(peer, pubsubPeer) do: g.peers.withValue(peer.peerId, pubsubPeer) do:
sent &= pubsubPeer[].send(RPCMsg(control: some(control))) sent &= g.send(
pubsubPeer[],
RPCMsg(control: some(control)),
DefaultSendTimeout)
checkFutures(await allFinished(sent)) checkFutures(await allFinished(sent))
g.mcache.shift() # shift the cache g.mcache.shift() # shift the cache
@ -243,47 +246,46 @@ proc heartbeat(g: GossipSub) {.async.} =
await sleepAsync(GossipSubHeartbeatInterval) await sleepAsync(GossipSubHeartbeatInterval)
method handleDisconnect*(g: GossipSub, peer: PubSubPeer) = method unsubscribePeer*(g: GossipSub, peer: PeerID) =
## handle peer disconnects ## handle peer disconnects
## ##
procCall FloodSub(g).handleDisconnect(peer) trace "unsubscribing gossipsub peer", peer = $peer
let pubSubPeer = g.peers.getOrDefault(peer)
if pubSubPeer.isNil:
return
if not(isNil(peer)) and peer.peerInfo notin g.conns:
for t in toSeq(g.gossipsub.keys): for t in toSeq(g.gossipsub.keys):
g.gossipsub.removePeer(t, peer) g.gossipsub.removePeer(t, pubSubPeer)
when defined(libp2p_expensive_metrics): when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_gossipsub libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.peers(t).int64, labelValues = [t]) .set(g.gossipsub.peers(t).int64, labelValues = [t])
for t in toSeq(g.mesh.keys): for t in toSeq(g.mesh.keys):
g.mesh.removePeer(t, peer) g.mesh.removePeer(t, pubSubPeer)
when defined(libp2p_expensive_metrics): when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_mesh libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(t).int64, labelValues = [t]) .set(g.mesh.peers(t).int64, labelValues = [t])
for t in toSeq(g.fanout.keys): for t in toSeq(g.fanout.keys):
g.fanout.removePeer(t, peer) g.fanout.removePeer(t, pubSubPeer)
when defined(libp2p_expensive_metrics): when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_fanout libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(t).int64, labelValues = [t]) .set(g.fanout.peers(t).int64, labelValues = [t])
method subscribePeer*(p: GossipSub, procCall FloodSub(g).unsubscribePeer(peer)
conn: Connection) =
procCall PubSub(p).subscribePeer(conn)
asyncCheck p.handleConn(conn, GossipSubCodec)
method subscribeTopic*(g: GossipSub, method subscribeTopic*(g: GossipSub,
topic: string, topic: string,
subscribe: bool, subscribe: bool,
peerId: string) {.gcsafe, async.} = peerId: PeerID) {.gcsafe, async.} =
await procCall FloodSub(g).subscribeTopic(topic, subscribe, peerId) await procCall FloodSub(g).subscribeTopic(topic, subscribe, peerId)
logScope: logScope:
peer = peerId peer = $peerId
topic topic
let peer = g.peers.getOrDefault(peerId) let peer = g.peers.getOrDefault(peerId)
@ -404,7 +406,7 @@ method rpcHandler*(g: GossipSub,
g.seen.put(msgId) # add the message to the seen cache g.seen.put(msgId) # add the message to the seen cache
if g.verifySignature and not msg.verify(peer.peerInfo): if g.verifySignature and not msg.verify(peer.peerId):
trace "dropping message due to failed signature verification" trace "dropping message due to failed signature verification"
continue continue
@ -437,7 +439,10 @@ method rpcHandler*(g: GossipSub,
trace "exception in message handler", exc = exc.msg trace "exception in message handler", exc = exc.msg
# forward the message to all peers interested in it # forward the message to all peers interested in it
let published = await g.publishHelper(toSendPeers, m.messages, DefaultSendTimeout) let published = await g.broadcast(
toSeq(toSendPeers),
RPCMsg(messages: m.messages),
DefaultSendTimeout)
trace "forwared message to peers", peers = published trace "forwared message to peers", peers = published
@ -454,8 +459,10 @@ method rpcHandler*(g: GossipSub,
respControl.ihave.len > 0: respControl.ihave.len > 0:
try: try:
info "sending control message", msg = respControl info "sending control message", msg = respControl
await peer.send( await g.send(
RPCMsg(control: some(respControl), messages: messages)) peer,
RPCMsg(control: some(respControl), messages: messages),
DefaultSendTimeout)
except CancelledError as exc: except CancelledError as exc:
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:
@ -478,10 +485,8 @@ method unsubscribe*(g: GossipSub,
let peers = g.mesh.getOrDefault(topic) let peers = g.mesh.getOrDefault(topic)
g.mesh.del(topic) g.mesh.del(topic)
var pending = newSeq[Future[void]]() let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
for peer in peers: discard await g.broadcast(toSeq(peers), prune, DefaultSendTimeout)
pending.add(peer.sendPrune(@[topic]))
checkFutures(await allFinished(pending))
method unsubscribeAll*(g: GossipSub, topic: string) {.async.} = method unsubscribeAll*(g: GossipSub, topic: string) {.async.} =
await procCall PubSub(g).unsubscribeAll(topic) await procCall PubSub(g).unsubscribeAll(topic)
@ -490,10 +495,8 @@ method unsubscribeAll*(g: GossipSub, topic: string) {.async.} =
let peers = g.mesh.getOrDefault(topic) let peers = g.mesh.getOrDefault(topic)
g.mesh.del(topic) g.mesh.del(topic)
var pending = newSeq[Future[void]]() let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
for peer in peers: discard await g.broadcast(toSeq(peers), prune, DefaultSendTimeout)
pending.add(peer.sendPrune(@[topic]))
checkFutures(await allFinished(pending))
method publish*(g: GossipSub, method publish*(g: GossipSub,
topic: string, topic: string,
@ -534,7 +537,7 @@ method publish*(g: GossipSub,
if msgId notin g.mcache: if msgId notin g.mcache:
g.mcache.put(msgId, msg) g.mcache.put(msgId, msg)
let published = await g.publishHelper(peers, @[msg], timeout) let published = await g.broadcast(toSeq(peers), RPCMsg(messages: @[msg]), timeout)
when defined(libp2p_expensive_metrics): when defined(libp2p_expensive_metrics):
if published > 0: if published > 0:
libp2p_pubsub_messages_published.inc(labelValues = [topic]) libp2p_pubsub_messages_published.inc(labelValues = [topic])

View File

@ -13,10 +13,10 @@ import pubsubpeer, ../../peerid
type type
PeerTable* = Table[string, HashSet[PubSubPeer]] # topic string to peer map PeerTable* = Table[string, HashSet[PubSubPeer]] # topic string to peer map
proc hasPeerID*(t: PeerTable, topic, peerId: string): bool = proc hasPeerID*(t: PeerTable, topic: string, peerId: PeerID): bool =
let peers = toSeq(t.getOrDefault(topic)) let peers = toSeq(t.getOrDefault(topic))
peers.any do (peer: PubSubPeer) -> bool: peers.any do (peer: PubSubPeer) -> bool:
peer.id == peerId peer.peerId == peerId
func addPeer*(table: var PeerTable, topic: string, peer: PubSubPeer): bool = func addPeer*(table: var PeerTable, topic: string, peer: PubSubPeer): bool =
# returns true if the peer was added, # returns true if the peer was added,

View File

@ -11,6 +11,7 @@ import std/[tables, sequtils, sets]
import chronos, chronicles, metrics import chronos, chronicles, metrics
import pubsubpeer, import pubsubpeer,
rpc/[message, messages], rpc/[message, messages],
../../switch,
../protocol, ../protocol,
../../stream/connection, ../../stream/connection,
../../peerid, ../../peerid,
@ -47,10 +48,10 @@ type
handler*: seq[TopicHandler] handler*: seq[TopicHandler]
PubSub* = ref object of LPProtocol PubSub* = ref object of LPProtocol
switch*: Switch # the switch used to dial/connect to peers
peerInfo*: PeerInfo # this peer's info peerInfo*: PeerInfo # this peer's info
topics*: Table[string, Topic] # local topics topics*: Table[string, Topic] # local topics
peers*: Table[string, PubSubPeer] # peerid to peer map peers*: Table[PeerID, PubSubPeer] # peerid to peer map
conns*: Table[PeerInfo, HashSet[Connection]] # peers connections
triggerSelf*: bool # trigger own local handler on publish triggerSelf*: bool # trigger own local handler on publish
verifySignature*: bool # enable signature verification verifySignature*: bool # enable signature verification
sign*: bool # enable message signing sign*: bool # enable message signing
@ -59,49 +60,65 @@ type
observers: ref seq[PubSubObserver] # ref as in smart_ptr observers: ref seq[PubSubObserver] # ref as in smart_ptr
msgIdProvider*: MsgIdProvider # Turn message into message id (not nil) msgIdProvider*: MsgIdProvider # Turn message into message id (not nil)
msgSeqno*: uint64 msgSeqno*: uint64
lifetimeFut*: Future[void] # pubsub liftime future
method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} = method unsubscribePeer*(p: PubSub, peerId: PeerID) {.base.} =
## handle peer disconnects ## handle peer disconnects
## ##
if not(isNil(peer)) and peer.peerInfo notin p.conns: trace "unsubscribing pubsub peer", peer = $peerId
trace "deleting peer", peer = peer.id if peerId in p.peers:
peer.onConnect.fire() # Make sure all pending sends are unblocked p.peers.del(peerId)
p.peers.del(peer.id)
trace "peer disconnected", peer = peer.id
# metrics
libp2p_pubsub_peers.set(p.peers.len.int64) libp2p_pubsub_peers.set(p.peers.len.int64)
proc onConnClose(p: PubSub, conn: Connection) {.async.} = proc send*(
p: PubSub,
peer: PubSubPeer,
msg: RPCMsg,
timeout: Duration) {.async.} =
## send to remote peer
##
trace "sending pubsub message to peer", peer = $peer, msg = msg
try: try:
let peer = conn.peerInfo await peer.send(msg, timeout)
await conn.closeEvent.wait()
if peer in p.conns:
p.conns[peer].excl(conn)
if p.conns[peer].len <= 0:
p.conns.del(peer)
if peer.id in p.peers:
p.handleDisconnect(p.peers[peer.id])
except CancelledError as exc: except CancelledError as exc:
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:
trace "exception in onConnClose handler", exc = exc.msg trace "exception sending pubsub message to peer", peer = $peer, msg = msg
p.unsubscribePeer(peer.peerId)
raise exc
proc broadcast*(
p: PubSub,
sendPeers: seq[PubSubPeer],
msg: RPCMsg,
timeout: Duration): Future[int] {.async.} =
## send messages and cleanup failed peers
##
trace "broadcasting messages to peers", peers = sendPeers.len, message = msg
let sent = await allFinished(
sendPeers.mapIt( p.send(it, msg, timeout) ))
return sent.filterIt( it.finished and it.error.isNil ).len
trace "messages broadcasted to peers", peers = sent.len
proc sendSubs*(p: PubSub, proc sendSubs*(p: PubSub,
peer: PubSubPeer, peer: PubSubPeer,
topics: seq[string], topics: seq[string],
subscribe: bool) {.async.} = subscribe: bool) {.async.} =
## send subscriptions to remote peer ## send subscriptions to remote peer
asyncCheck peer.sendSubOpts(topics, subscribe) await p.send(
peer,
RPCMsg(
subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))),
DefaultSendTimeout)
method subscribeTopic*(p: PubSub, method subscribeTopic*(p: PubSub,
topic: string, topic: string,
subscribe: bool, subscribe: bool,
peerId: string) {.base, async.} = peerId: PeerID) {.base, async.} =
# called when remote peer subscribes to a topic # called when remote peer subscribes to a topic
discard discard
@ -116,24 +133,24 @@ method rpcHandler*(p: PubSub,
if m.subscriptions.len > 0: # if there are any subscriptions if m.subscriptions.len > 0: # if there are any subscriptions
for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic
trace "about to subscribe to topic", topicId = s.topic trace "about to subscribe to topic", topicId = s.topic
await p.subscribeTopic(s.topic, s.subscribe, peer.id) await p.subscribeTopic(s.topic, s.subscribe, peer.peerId)
proc getOrCreatePeer(p: PubSub, proc getOrCreatePeer*(
peerInfo: PeerInfo, p: PubSub,
peer: PeerID,
proto: string): PubSubPeer = proto: string): PubSubPeer =
if peerInfo.id in p.peers: if peer in p.peers:
return p.peers[peerInfo.id] return p.peers[peer]
# create new pubsub peer # create new pubsub peer
let peer = newPubSubPeer(peerInfo, proto) let pubSubPeer = newPubSubPeer(peer, p.switch, proto)
trace "created new pubsub peer", peerId = peer.id trace "created new pubsub peer", peerId = $peer
p.peers[peer.id] = peer p.peers[peer] = pubSubPeer
peer.observers = p.observers pubSubPeer.observers = p.observers
libp2p_pubsub_peers.set(p.peers.len.int64) libp2p_pubsub_peers.set(p.peers.len.int64)
return pubSubPeer
return peer
method handleConn*(p: PubSub, method handleConn*(p: PubSub,
conn: Connection, conn: Connection,
@ -154,19 +171,11 @@ method handleConn*(p: PubSub,
await conn.close() await conn.close()
return return
# track connection
p.conns.mgetOrPut(conn.peerInfo,
initHashSet[Connection]())
.incl(conn)
asyncCheck p.onConnClose(conn)
proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
# call pubsub rpc handler # call pubsub rpc handler
await p.rpcHandler(peer, msgs) await p.rpcHandler(peer, msgs)
let peer = p.getOrCreatePeer(conn.peerInfo, proto) let peer = p.getOrCreatePeer(conn.peerInfo.peerId, proto)
if p.topics.len > 0: if p.topics.len > 0:
await p.sendSubs(peer, toSeq(p.topics.keys), true) await p.sendSubs(peer, toSeq(p.topics.keys), true)
@ -181,32 +190,16 @@ method handleConn*(p: PubSub,
finally: finally:
await conn.close() await conn.close()
method subscribePeer*(p: PubSub, conn: Connection) {.base.} = method subscribePeer*(p: PubSub, peer: PeerID) {.base.} =
if not(isNil(conn)): ## subscribe to remote peer to receive/send pubsub
trace "subscribing to peer", peerId = conn.peerInfo.id ## messages
##
# track connection let pubsubPeer = p.getOrCreatePeer(peer, p.codec)
p.conns.mgetOrPut(conn.peerInfo, if p.topics.len > 0:
initHashSet[Connection]()) asyncCheck p.sendSubs(pubsubPeer, toSeq(p.topics.keys), true)
.incl(conn)
asyncCheck p.onConnClose(conn) pubsubPeer.subscribed = true
let peer = p.getOrCreatePeer(conn.peerInfo, p.codec)
if not peer.connected:
peer.conn = conn
method unsubscribePeer*(p: PubSub, peerInfo: PeerInfo) {.base, async.} =
if peerInfo.id in p.peers:
let peer = p.peers[peerInfo.id]
trace "unsubscribing from peer", peerId = $peerInfo
if not(isNil(peer)) and not(isNil(peer.conn)):
await peer.conn.close()
proc connected*(p: PubSub, peerId: PeerID): bool =
p.peers.withValue($peerId, peer):
return peer[] != nil and peer[].connected
method unsubscribe*(p: PubSub, method unsubscribe*(p: PubSub,
topics: seq[TopicPair]) {.base, async.} = topics: seq[TopicPair]) {.base, async.} =
@ -261,40 +254,6 @@ method subscribe*(p: PubSub,
# metrics # metrics
libp2p_pubsub_topics.set(p.topics.len.int64) libp2p_pubsub_topics.set(p.topics.len.int64)
proc publishHelper*(p: PubSub,
sendPeers: HashSet[PubSubPeer],
msgs: seq[Message],
timeout: Duration): Future[int] {.async.} =
# send messages and cleanup failed peers
var sent: seq[tuple[id: string, fut: Future[void]]]
for sendPeer in sendPeers:
# avoid sending to self
if sendPeer.peerInfo == p.peerInfo:
continue
trace "sending messages to peer", peer = sendPeer.id, msgs
sent.add((id: sendPeer.id, fut: sendPeer.send(RPCMsg(messages: msgs), timeout)))
var published: seq[string]
var failed: seq[string]
let futs = await allFinished(sent.mapIt(it.fut))
for s in futs:
let f = sent.filterIt(it.fut == s)
if f.len > 0:
if s.failed:
trace "sending messages to peer failed", peer = f[0].id
failed.add(f[0].id)
else:
trace "sending messages to peer succeeded", peer = f[0].id
published.add(f[0].id)
for f in failed:
let peer = p.peers.getOrDefault(f)
if not(isNil(peer)) and not(isNil(peer.conn)):
await peer.conn.close()
return published.len
method publish*(p: PubSub, method publish*(p: PubSub,
topic: string, topic: string,
data: seq[byte], data: seq[byte],
@ -364,16 +323,20 @@ method validate*(p: PubSub, message: Message): Future[bool] {.async, base.} =
else: else:
libp2p_pubsub_validation_failure.inc() libp2p_pubsub_validation_failure.inc()
proc newPubSub*(P: typedesc[PubSub], proc init*(
peerInfo: PeerInfo, P: typedesc[PubSub],
switch: Switch,
triggerSelf: bool = false, triggerSelf: bool = false,
verifySignature: bool = true, verifySignature: bool = true,
sign: bool = true, sign: bool = true,
msgIdProvider: MsgIdProvider = defaultMsgIdProvider): P = msgIdProvider: MsgIdProvider = defaultMsgIdProvider): P =
result = P(peerInfo: peerInfo, result = P(switch: switch,
peerInfo: switch.peerInfo,
triggerSelf: triggerSelf, triggerSelf: triggerSelf,
verifySignature: verifySignature, verifySignature: verifySignature,
sign: sign, sign: sign,
peers: initTable[PeerID, PubSubPeer](),
topics: initTable[string, Topic](),
cleanupLock: newAsyncLock(), cleanupLock: newAsyncLock(),
msgIdProvider: msgIdProvider) msgIdProvider: msgIdProvider)
result.initPubSub() result.initPubSub()
@ -385,6 +348,3 @@ proc removeObserver*(p: PubSub; observer: PubSubObserver) =
let idx = p.observers[].find(observer) let idx = p.observers[].find(observer)
if idx != -1: if idx != -1:
p.observers[].del(idx) p.observers[].del(idx)
proc connected*(p: PubSub, peerInfo: PeerInfo): bool {.deprecated: "Use PeerID version".} =
peerInfo != nil and connected(p, peerInfo.peerId)

View File

@ -11,6 +11,7 @@ import std/[hashes, options, sequtils, strutils, tables]
import chronos, chronicles, nimcrypto/sha2, metrics import chronos, chronicles, nimcrypto/sha2, metrics
import rpc/[messages, message, protobuf], import rpc/[messages, message, protobuf],
timedcache, timedcache,
../../switch,
../../peerid, ../../peerid,
../../peerinfo, ../../peerinfo,
../../stream/connection, ../../stream/connection,
@ -28,7 +29,6 @@ when defined(libp2p_expensive_metrics):
declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"]) declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])
const const
DefaultReadTimeout* = 1.minutes
DefaultSendTimeout* = 10.seconds DefaultSendTimeout* = 10.seconds
type type
@ -37,14 +37,16 @@ type
onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].} onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
PubSubPeer* = ref object of RootObj PubSubPeer* = ref object of RootObj
proto*: string # the protocol that this peer joined from switch*: Switch # switch instance to dial peers
codec*: string # the protocol that this peer joined from
sendConn: Connection sendConn: Connection
peerInfo*: PeerInfo peerId*: PeerID
handler*: RPCHandler handler*: RPCHandler
sentRpcCache: TimedCache[string] # cache for already sent messages sentRpcCache: TimedCache[string] # cache for already sent messages
recvdRpcCache: TimedCache[string] # cache for already received messages recvdRpcCache: TimedCache[string] # cache for already received messages
onConnect*: AsyncEvent
observers*: ref seq[PubSubObserver] # ref as in smart_ptr observers*: ref seq[PubSubObserver] # ref as in smart_ptr
subscribed*: bool # are we subscribed to this peer
sendLock*: AsyncLock # send connection lock
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
@ -52,19 +54,13 @@ func hash*(p: PubSubPeer): Hash =
# int is either 32/64, so intptr basically, pubsubpeer is a ref # int is either 32/64, so intptr basically, pubsubpeer is a ref
cast[pointer](p).hash cast[pointer](p).hash
proc id*(p: PubSubPeer): string = p.peerInfo.id proc id*(p: PubSubPeer): string =
doAssert(not p.isNil, "nil pubsubpeer")
p.peerId.pretty
proc connected*(p: PubSubPeer): bool = proc connected*(p: PubSubPeer): bool =
not(isNil(p.sendConn)) not p.sendConn.isNil and not
(p.sendConn.closed or p.sendConn.atEof)
proc `conn=`*(p: PubSubPeer, conn: Connection) =
if not(isNil(conn)):
trace "attaching send connection for peer", peer = p.id
p.sendConn = conn
p.onConnect.fire()
proc conn*(p: PubSubPeer): Connection =
p.sendConn
proc recvObservers(p: PubSubPeer, msg: var RPCMsg) = proc recvObservers(p: PubSubPeer, msg: var RPCMsg) =
# trigger hooks # trigger hooks
@ -83,12 +79,13 @@ proc sendObservers(p: PubSubPeer, msg: var RPCMsg) =
proc handle*(p: PubSubPeer, conn: Connection) {.async.} = proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
logScope: logScope:
peer = p.id peer = p.id
debug "starting pubsub read loop for peer", closed = conn.closed debug "starting pubsub read loop for peer", closed = conn.closed
try: try:
try: try:
while not conn.atEof: while not conn.atEof:
trace "waiting for data", closed = conn.closed trace "waiting for data", closed = conn.closed
let data = await conn.readLp(64 * 1024).wait(DefaultReadTimeout) let data = await conn.readLp(64 * 1024)
let digest = $(sha256.digest(data)) let digest = $(sha256.digest(data))
trace "read data from peer", data = data.shortLog trace "read data from peer", data = data.shortLog
if digest in p.recvdRpcCache: if digest in p.recvdRpcCache:
@ -124,12 +121,14 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:
trace "Exception occurred in PubSubPeer.handle", exc = exc.msg trace "Exception occurred in PubSubPeer.handle", exc = exc.msg
raise exc
proc send*( proc send*(
p: PubSubPeer, p: PubSubPeer,
msg: RPCMsg, msg: RPCMsg,
timeout: Duration = DefaultSendTimeout) {.async.} = timeout: Duration = DefaultSendTimeout) {.async.} =
doAssert(not isNil(p), "pubsubpeer nil!")
logScope: logScope:
peer = p.id peer = p.id
rpcMsg = shortLog(msg) rpcMsg = shortLog(msg)
@ -155,20 +154,27 @@ proc send*(
libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id]) libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id])
return return
proc sendToRemote() {.async.} = try:
logScope:
peer = p.id
rpcMsg = shortLog(msg)
trace "about to send message" trace "about to send message"
if not p.connected:
try:
await p.sendLock.acquire()
trace "no send connection, dialing peer"
# get a send connection if there is none
p.sendConn = await p.switch.dial(
p.peerId, p.codec)
if not p.onConnect.isSet: if not p.connected:
await p.onConnect.wait() raise newException(CatchableError, "unable to get send pubsub stream")
# install a reader on the send connection
asyncCheck p.handle(p.sendConn)
finally:
if p.sendLock.locked:
p.sendLock.release()
if p.connected: # this can happen if the remote disconnected
trace "sending encoded msgs to peer" trace "sending encoded msgs to peer"
await p.sendConn.writeLp(encoded).wait(timeout)
await p.sendConn.writeLp(encoded)
p.sentRpcCache.put(digest) p.sentRpcCache.put(digest)
trace "sent pubsub message to remote" trace "sent pubsub message to remote"
@ -178,67 +184,24 @@ proc send*(
# metrics # metrics
libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t]) libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t])
let sendFut = sendToRemote()
try:
await sendFut.wait(timeout)
except CatchableError as exc: except CatchableError as exc:
trace "unable to send to remote", exc = exc.msg trace "unable to send to remote", exc = exc.msg
if not sendFut.finished:
sendFut.cancel()
if not(isNil(p.sendConn)): if not(isNil(p.sendConn)):
await p.sendConn.close() await p.sendConn.close()
p.sendConn = nil p.sendConn = nil
p.onConnect.clear()
raise exc raise exc
proc sendSubOpts*(p: PubSubPeer, topics: seq[string], subscribe: bool) {.async.} =
trace "sending subscriptions", peer = p.id, subscribe, topicIDs = topics
try:
await p.send(RPCMsg(
subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))),
# the long timeout is mostly for cases where
# the connection is flaky at the beggingin
timeout = 3.minutes)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception sending subscriptions", exc = exc.msg
proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async.} =
trace "sending graft to peer", peer = p.id, topicIDs = topics
try:
await p.send(RPCMsg(control: some(
ControlMessage(graft: topics.mapIt(ControlGraft(topicID: it))))),
timeout = 1.minutes)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception sending grafts", exc = exc.msg
proc sendPrune*(p: PubSubPeer, topics: seq[string]) {.async.} =
trace "sending prune to peer", peer = p.id, topicIDs = topics
try:
await p.send(RPCMsg(control: some(
ControlMessage(prune: topics.mapIt(ControlPrune(topicID: it))))),
timeout = 1.minutes)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception sending prunes", exc = exc.msg
proc `$`*(p: PubSubPeer): string = proc `$`*(p: PubSubPeer): string =
p.id p.id
proc newPubSubPeer*(peerInfo: PeerInfo, proc newPubSubPeer*(peerId: PeerID,
proto: string): PubSubPeer = switch: Switch,
codec: string): PubSubPeer =
new result new result
result.proto = proto result.switch = switch
result.peerInfo = peerInfo result.codec = codec
result.peerId = peerId
result.sentRpcCache = newTimedCache[string](2.minutes) result.sentRpcCache = newTimedCache[string](2.minutes)
result.recvdRpcCache = newTimedCache[string](2.minutes) result.recvdRpcCache = newTimedCache[string](2.minutes)
result.onConnect = newAsyncEvent() result.sendLock = newAsyncLock()

View File

@ -10,7 +10,8 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import chronicles, metrics, stew/[byteutils, endians2] import chronicles, metrics, stew/[byteutils, endians2]
import ./messages, ./protobuf, import ./messages,
./protobuf,
../../../peerid, ../../../peerid,
../../../peerinfo, ../../../peerinfo,
../../../crypto/crypto, ../../../crypto/crypto,
@ -32,7 +33,7 @@ func defaultMsgIdProvider*(m: Message): string =
proc sign*(msg: Message, p: PeerInfo): CryptoResult[seq[byte]] = proc sign*(msg: Message, p: PeerInfo): CryptoResult[seq[byte]] =
ok((? p.privateKey.sign(PubSubPrefix & encodeMessage(msg))).getBytes()) ok((? p.privateKey.sign(PubSubPrefix & encodeMessage(msg))).getBytes())
proc verify*(m: Message, p: PeerInfo): bool = proc verify*(m: Message, p: PeerID): bool =
if m.signature.len > 0 and m.key.len > 0: if m.signature.len > 0 and m.key.len > 0:
var msg = m var msg = m
msg.signature = @[] msg.signature = @[]
@ -51,17 +52,17 @@ proc verify*(m: Message, p: PeerInfo): bool =
proc init*( proc init*(
T: type Message, T: type Message,
p: PeerInfo, peer: PeerInfo,
data: seq[byte], data: seq[byte],
topic: string, topic: string,
seqno: uint64, seqno: uint64,
sign: bool = true): Message {.gcsafe, raises: [CatchableError, Defect].} = sign: bool = true): Message {.gcsafe, raises: [CatchableError, Defect].} =
result = Message( result = Message(
fromPeer: p.peerId, fromPeer: peer.peerId,
data: data, data: data,
seqno: @(seqno.toBytesBE), # unefficient, fine for now seqno: @(seqno.toBytesBE), # unefficient, fine for now
topicIDs: @[topic]) topicIDs: @[topic])
if sign and p.publicKey.isSome: if sign and peer.publicKey.isSome:
result.signature = sign(result, p).tryGet() result.signature = sign(result, peer).tryGet()
result.key = p.publicKey.get().getBytes().tryGet() result.key = peer.publicKey.get().getBytes().tryGet()

View File

@ -62,7 +62,7 @@ proc handleConn*(s: Secure,
initiator: bool): Future[Connection] {.async, gcsafe.} = initiator: bool): Future[Connection] {.async, gcsafe.} =
var sconn = await s.handshake(conn, initiator) var sconn = await s.handshake(conn, initiator)
if not isNil(sconn): if not isNil(sconn):
conn.closeEvent.wait() conn.join()
.addCallback do(udata: pointer = nil): .addCallback do(udata: pointer = nil):
asyncCheck sconn.close() asyncCheck sconn.close()

View File

@ -1,16 +1,9 @@
# compile time options here
const
libp2p_pubsub_sign {.booldefine.} = true
libp2p_pubsub_verify {.booldefine.} = true
import import
options, tables, chronos, bearssl, options, tables, chronos, bearssl,
switch, peerid, peerinfo, stream/connection, multiaddress, switch, peerid, peerinfo, stream/connection, multiaddress,
crypto/crypto, transports/[transport, tcptransport], crypto/crypto, transports/[transport, tcptransport],
muxers/[muxer, mplex/mplex, mplex/types], muxers/[muxer, mplex/mplex, mplex/types],
protocols/[identify, secure/secure], protocols/[identify, secure/secure]
protocols/pubsub/[pubsub, gossipsub, floodsub],
protocols/pubsub/rpc/message
import import
protocols/secure/noise, protocols/secure/noise,
@ -26,17 +19,12 @@ type
proc newStandardSwitch*(privKey = none(PrivateKey), proc newStandardSwitch*(privKey = none(PrivateKey),
address = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(), address = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
triggerSelf = false,
gossip = false,
secureManagers: openarray[SecureProtocol] = [ secureManagers: openarray[SecureProtocol] = [
# array cos order matters # array cos order matters
SecureProtocol.Secio, SecureProtocol.Secio,
SecureProtocol.Noise, SecureProtocol.Noise,
], ],
verifySignature = libp2p_pubsub_verify,
sign = libp2p_pubsub_sign,
transportFlags: set[ServerFlags] = {}, transportFlags: set[ServerFlags] = {},
msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
rng = newRng(), rng = newRng(),
inTimeout: Duration = 5.minutes, inTimeout: Duration = 5.minutes,
outTimeout: Duration = 5.minutes): Switch = outTimeout: Duration = 5.minutes): Switch =
@ -66,25 +54,11 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
of SecureProtocol.Secio: of SecureProtocol.Secio:
secureManagerInstances &= newSecio(rng, seckey).Secure secureManagerInstances &= newSecio(rng, seckey).Secure
let pubSub = if gossip: let switch = newSwitch(
newPubSub(GossipSub,
peerInfo = peerInfo,
triggerSelf = triggerSelf,
verifySignature = verifySignature,
sign = sign,
msgIdProvider = msgIdProvider).PubSub
else:
newPubSub(FloodSub,
peerInfo = peerInfo,
triggerSelf = triggerSelf,
verifySignature = verifySignature,
sign = sign,
msgIdProvider = msgIdProvider).PubSub
newSwitch(
peerInfo, peerInfo,
transports, transports,
identify, identify,
muxers, muxers,
secureManagers = secureManagerInstances, secureManagers = secureManagerInstances)
pubSub = some(pubSub))
return switch

View File

@ -45,7 +45,7 @@ template withExceptions(body: untyped) =
raise exc raise exc
except TransportIncompleteError: except TransportIncompleteError:
# for all intents and purposes this is an EOF # for all intents and purposes this is an EOF
raise newLPStreamEOFError() raise newLPStreamIncompleteError()
except TransportLimitError: except TransportLimitError:
raise newLPStreamLimitError() raise newLPStreamLimitError()
except TransportUseClosedError: except TransportUseClosedError:

View File

@ -115,7 +115,11 @@ proc readExactly*(s: LPStream,
read += await s.readOnce(addr pbuffer[read], nbytes - read) read += await s.readOnce(addr pbuffer[read], nbytes - read)
if read < nbytes: if read < nbytes:
trace "incomplete data received", read if s.atEof:
trace "couldn't read all bytes, stream EOF", expected = nbytes, read
raise newLPStreamEOFError()
else:
trace "couldn't read all bytes, incomplete data", expected = nbytes, read
raise newLPStreamIncompleteError() raise newLPStreamIncompleteError()
proc readLine*(s: LPStream, proc readLine*(s: LPStream,

View File

@ -25,12 +25,14 @@ import stream/connection,
protocols/secure/secure, protocols/secure/secure,
peerinfo, peerinfo,
protocols/identify, protocols/identify,
protocols/pubsub/pubsub,
muxers/muxer, muxers/muxer,
connmanager, connmanager,
peerid, peerid,
errors errors
chronicles.formatIt(PeerInfo): $it
chronicles.formatIt(PeerID): $it
logScope: logScope:
topics = "switch" topics = "switch"
@ -44,9 +46,6 @@ declareCounter(libp2p_dialed_peers, "dialed peers")
declareCounter(libp2p_failed_dials, "failed dials") declareCounter(libp2p_failed_dials, "failed dials")
declareCounter(libp2p_failed_upgrade, "peers failed upgrade") declareCounter(libp2p_failed_upgrade, "peers failed upgrade")
const
MaxPubsubReconnectAttempts* = 10
type type
NoPubSubException* = object of CatchableError NoPubSubException* = object of CatchableError
@ -77,13 +76,8 @@ type
identity*: Identify identity*: Identify
streamHandler*: StreamHandler streamHandler*: StreamHandler
secureManagers*: seq[Secure] secureManagers*: seq[Secure]
pubSub*: Option[PubSub]
dialLock: Table[PeerID, AsyncLock] dialLock: Table[PeerID, AsyncLock]
ConnEvents: Table[ConnEventKind, HashSet[ConnEventHandler]] ConnEvents: Table[ConnEventKind, HashSet[ConnEventHandler]]
pubsubMonitors: Table[PeerId, Future[void]]
proc newNoPubSubException(): ref NoPubSubException {.inline.} =
result = newException(NoPubSubException, "no pubsub provided!")
proc addConnEventHandler*(s: Switch, proc addConnEventHandler*(s: Switch,
handler: ConnEventHandler, kind: ConnEventKind) = handler: ConnEventHandler, kind: ConnEventKind) =
@ -110,23 +104,6 @@ proc triggerConnEvent(s: Switch, peerId: PeerID, event: ConnEvent) {.async, gcsa
warn "exception in trigger ConnEvents", exc = exc.msg warn "exception in trigger ConnEvents", exc = exc.msg
proc disconnect*(s: Switch, peerId: PeerID) {.async, gcsafe.} proc disconnect*(s: Switch, peerId: PeerID) {.async, gcsafe.}
proc subscribePeer*(s: Switch, peerId: PeerID) {.async, gcsafe.}
proc subscribePeerInternal(s: Switch, peerId: PeerID) {.async, gcsafe.}
proc cleanupPubSubPeer(s: Switch, conn: Connection) {.async.} =
try:
await conn.closeEvent.wait()
trace "about to cleanup pubsub peer"
if s.pubSub.isSome:
let fut = s.pubsubMonitors.getOrDefault(conn.peerInfo.peerId)
if not(isNil(fut)) and not(fut.finished):
fut.cancel()
await s.pubSub.get().unsubscribePeer(conn.peerInfo)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception cleaning pubsub peer", exc = exc.msg
proc isConnected*(s: Switch, peerId: PeerID): bool = proc isConnected*(s: Switch, peerId: PeerID): bool =
## returns true if the peer has one or more ## returns true if the peer has one or more
@ -294,7 +271,8 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
proc internalConnect(s: Switch, proc internalConnect(s: Switch,
peerId: PeerID, peerId: PeerID,
addrs: seq[MultiAddress]): Future[Connection] {.async.} = addrs: seq[MultiAddress]): Future[Connection] {.async.} =
logScope: peer = peerId logScope:
peer = peerId
if s.peerInfo.peerId == peerId: if s.peerInfo.peerId == peerId:
raise newException(CatchableError, "can't dial self!") raise newException(CatchableError, "can't dial self!")
@ -352,12 +330,12 @@ proc internalConnect(s: Switch,
libp2p_failed_upgrade.inc() libp2p_failed_upgrade.inc()
raise exc raise exc
doAssert not isNil(upgraded), "checked in upgradeOutgoing" doAssert not isNil(upgraded), "connection died after upgradeOutgoing"
s.connManager.storeOutgoing(upgraded) s.connManager.storeOutgoing(upgraded)
conn = upgraded conn = upgraded
trace "dial successful", trace "dial successful",
oid = $conn.oid, oid = $upgraded.oid,
peerInfo = shortLog(upgraded.peerInfo) peerInfo = shortLog(upgraded.peerInfo)
break break
finally: finally:
@ -380,14 +358,31 @@ proc internalConnect(s: Switch,
# unworthy and disconnects it # unworthy and disconnects it
raise newException(CatchableError, "Connection closed during handshake") raise newException(CatchableError, "Connection closed during handshake")
asyncCheck s.cleanupPubSubPeer(conn)
asyncCheck s.subscribePeer(peerId)
return conn return conn
proc connect*(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]) {.async.} = proc connect*(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]) {.async.} =
discard await s.internalConnect(peerId, addrs) discard await s.internalConnect(peerId, addrs)
proc negotiateStream(s: Switch, stream: Connection, proto: string): Future[Connection] {.async.} =
trace "Attempting to select remote", proto = proto,
streamOid = $stream.oid,
oid = $stream.oid
if not await s.ms.select(stream, proto):
await stream.close()
raise newException(CatchableError, "Unable to select sub-protocol" & proto)
return stream
proc dial*(s: Switch,
peerId: PeerID,
proto: string): Future[Connection] {.async.} =
let stream = await s.connmanager.getMuxedStream(peerId)
if stream.isNil:
raise newException(CatchableError, "Couldn't get muxed stream")
return await s.negotiateStream(stream, proto)
proc dial*(s: Switch, proc dial*(s: Switch,
peerId: PeerID, peerId: PeerID,
addrs: seq[MultiAddress], addrs: seq[MultiAddress],
@ -408,14 +403,7 @@ proc dial*(s: Switch,
await conn.close() await conn.close()
raise newException(CatchableError, "Couldn't get muxed stream") raise newException(CatchableError, "Couldn't get muxed stream")
trace "Attempting to select remote", proto = proto, return await s.negotiateStream(stream, proto)
streamOid = $stream.oid,
oid = $conn.oid
if not await s.ms.select(stream, proto):
await stream.close()
raise newException(CatchableError, "Unable to select sub-protocol" & proto)
return stream
except CancelledError as exc: except CancelledError as exc:
trace "dial canceled" trace "dial canceled"
await cleanup() await cleanup()
@ -457,21 +445,12 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
s.peerInfo.addrs[i] = t.ma # update peer's address s.peerInfo.addrs[i] = t.ma # update peer's address
startFuts.add(server) startFuts.add(server)
if s.pubSub.isSome:
await s.pubSub.get().start()
debug "started libp2p node", peer = $s.peerInfo, addrs = s.peerInfo.addrs debug "started libp2p node", peer = $s.peerInfo, addrs = s.peerInfo.addrs
result = startFuts # listen for incoming connections result = startFuts # listen for incoming connections
proc stop*(s: Switch) {.async.} = proc stop*(s: Switch) {.async.} =
trace "stopping switch" trace "stopping switch"
# we want to report errors but we do not want to fail
# or crash here, cos we need to clean possibly MANY items
# and any following conn/transport won't be cleaned up
if s.pubSub.isSome:
await s.pubSub.get().stop()
# close and cleanup all connections # close and cleanup all connections
await s.connManager.close() await s.connManager.close()
@ -485,139 +464,6 @@ proc stop*(s: Switch) {.async.} =
trace "switch stopped" trace "switch stopped"
proc subscribePeerInternal(s: Switch, peerId: PeerID) {.async, gcsafe.} =
## Subscribe to pub sub peer
##
if s.pubSub.isSome and not s.pubSub.get().connected(peerId):
trace "about to subscribe to pubsub peer", peer = peerId
var stream: Connection
try:
stream = await s.connManager.getMuxedStream(peerId)
if isNil(stream):
trace "unable to subscribe to peer", peer = peerId
return
if not await s.ms.select(stream, s.pubSub.get().codec):
if not(isNil(stream)):
trace "couldn't select pubsub", codec = s.pubSub.get().codec
await stream.close()
return
s.pubSub.get().subscribePeer(stream)
await stream.closeEvent.wait()
except CancelledError as exc:
if not(isNil(stream)):
await stream.close()
raise exc
except CatchableError as exc:
trace "exception in subscribe to peer", peer = peerId,
exc = exc.msg
if not(isNil(stream)):
await stream.close()
proc pubsubMonitor(s: Switch, peerId: PeerID) {.async.} =
## while peer connected maintain a
## pubsub connection as well
##
while s.isConnected(peerId):
try:
trace "subscribing to pubsub peer", peer = peerId
await s.subscribePeerInternal(peerId)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in pubsub monitor", peer = peerId, exc = exc.msg
finally:
trace "sleeping before trying pubsub peer", peer = peerId
await sleepAsync(1.seconds) # allow the peer to cooldown
trace "exiting pubsub monitor", peer = peerId
proc subscribePeer*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} =
## Waits until ``server`` is not closed.
##
var retFuture = newFuture[void]("stream.transport.server.join")
let pubsubFut = s.pubsubMonitors.mgetOrPut(
peerId, s.pubsubMonitor(peerId))
proc continuation(udata: pointer) {.gcsafe.} =
retFuture.complete()
proc cancel(udata: pointer) {.gcsafe.} =
pubsubFut.removeCallback(continuation, cast[pointer](retFuture))
if not(pubsubFut.finished()):
pubsubFut.addCallback(continuation, cast[pointer](retFuture))
retFuture.cancelCallback = cancel
else:
retFuture.complete()
return retFuture
proc subscribe*(s: Switch, topic: string,
handler: TopicHandler) {.async.} =
## subscribe to a pubsub topic
##
if s.pubSub.isNone:
raise newNoPubSubException()
await s.pubSub.get().subscribe(topic, handler)
proc unsubscribe*(s: Switch, topics: seq[TopicPair]) {.async.} =
## unsubscribe from topics
##
if s.pubSub.isNone:
raise newNoPubSubException()
await s.pubSub.get().unsubscribe(topics)
proc unsubscribeAll*(s: Switch, topic: string) {.async.} =
## unsubscribe from topics
if s.pubSub.isNone:
raise newNoPubSubException()
await s.pubSub.get().unsubscribeAll(topic)
proc publish*(s: Switch,
topic: string,
data: seq[byte],
timeout: Duration = InfiniteDuration): Future[int] {.async.} =
## pubslish to pubsub topic
##
if s.pubSub.isNone:
raise newNoPubSubException()
return await s.pubSub.get().publish(topic, data, timeout)
proc addValidator*(s: Switch,
topics: varargs[string],
hook: ValidatorHandler) =
## add validator
##
if s.pubSub.isNone:
raise newNoPubSubException()
s.pubSub.get().addValidator(topics, hook)
proc removeValidator*(s: Switch,
topics: varargs[string],
hook: ValidatorHandler) =
## pubslish to pubsub topic
##
if s.pubSub.isNone:
raise newNoPubSubException()
s.pubSub.get().removeValidator(topics, hook)
proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
var stream = await muxer.newStream() var stream = await muxer.newStream()
defer: defer:
@ -653,10 +499,6 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
asyncCheck s.triggerConnEvent( asyncCheck s.triggerConnEvent(
peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: true)) peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: true))
# try establishing a pubsub connection
asyncCheck s.cleanupPubSubPeer(muxer.connection)
asyncCheck s.subscribePeer(peerId)
except CancelledError as exc: except CancelledError as exc:
await muxer.close() await muxer.close()
raise exc raise exc
@ -669,8 +511,7 @@ proc newSwitch*(peerInfo: PeerInfo,
transports: seq[Transport], transports: seq[Transport],
identity: Identify, identity: Identify,
muxers: Table[string, MuxerProvider], muxers: Table[string, MuxerProvider],
secureManagers: openarray[Secure] = [], secureManagers: openarray[Secure] = []): Switch =
pubSub: Option[PubSub] = none(PubSub)): Switch =
if secureManagers.len == 0: if secureManagers.len == 0:
raise (ref CatchableError)(msg: "Provide at least one secure manager") raise (ref CatchableError)(msg: "Provide at least one secure manager")
@ -703,24 +544,21 @@ proc newSwitch*(peerInfo: PeerInfo,
val.muxerHandler = proc(muxer: Muxer): Future[void] = val.muxerHandler = proc(muxer: Muxer): Future[void] =
s.muxerHandler(muxer) s.muxerHandler(muxer)
if pubSub.isSome: proc isConnected*(s: Switch, peerInfo: PeerInfo): bool
result.pubSub = pubSub {.deprecated: "Use PeerID version".} =
result.mount(pubSub.get())
proc isConnected*(s: Switch, peerInfo: PeerInfo): bool {.deprecated: "Use PeerID version".} =
not isNil(peerInfo) and isConnected(s, peerInfo.peerId) not isNil(peerInfo) and isConnected(s, peerInfo.peerId)
proc disconnect*(s: Switch, peerInfo: PeerInfo): Future[void] {.deprecated: "Use PeerID version", gcsafe.} = proc disconnect*(s: Switch, peerInfo: PeerInfo): Future[void]
{.deprecated: "Use PeerID version", gcsafe.} =
disconnect(s, peerInfo.peerId) disconnect(s, peerInfo.peerId)
proc connect*(s: Switch, peerInfo: PeerInfo): Future[void] {.deprecated: "Use PeerID version".} = proc connect*(s: Switch, peerInfo: PeerInfo): Future[void]
{.deprecated: "Use PeerID version".} =
connect(s, peerInfo.peerId, peerInfo.addrs) connect(s, peerInfo.peerId, peerInfo.addrs)
proc dial*(s: Switch, proc dial*(s: Switch,
peerInfo: PeerInfo, peerInfo: PeerInfo,
proto: string): proto: string):
Future[Connection] {.deprecated: "Use PeerID version".} = Future[Connection]
{.deprecated: "Use PeerID version".} =
dial(s, peerInfo.peerId, peerInfo.addrs, proto) dial(s, peerInfo.peerId, peerInfo.addrs, proto)
proc subscribePeer*(s: Switch, peerInfo: PeerInfo): Future[void] {.deprecated: "Use PeerID version", gcsafe.} =
subscribePeer(s, peerInfo.peerId)

View File

@ -29,9 +29,9 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
# turn things deterministic # turn things deterministic
# this is for testing purposes only # this is for testing purposes only
var ceil = 15 var ceil = 15
let fsub = cast[FloodSub](sender.pubSub.get()) let fsub = cast[FloodSub](sender)
while not fsub.floodsub.hasKey(key) or while not fsub.floodsub.hasKey(key) or
not fsub.floodsub.hasPeerID(key, receiver.peerInfo.id): not fsub.floodsub.hasPeerID(key, receiver.peerInfo.peerId):
await sleepAsync(100.millis) await sleepAsync(100.millis)
dec ceil dec ceil
doAssert(ceil > 0, "waitSub timeout!") doAssert(ceil > 0, "waitSub timeout!")
@ -43,7 +43,7 @@ suite "FloodSub":
check tracker.isLeaked() == false check tracker.isLeaked() == false
test "FloodSub basic publish/subscribe A -> B": test "FloodSub basic publish/subscribe A -> B":
proc runTests(): Future[bool] {.async.} = proc runTests() {.async.} =
var completionFut = newFuture[bool]() var completionFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar" check topic == "foobar"
@ -51,19 +51,32 @@ suite "FloodSub":
let let
nodes = generateNodes(2) nodes = generateNodes(2)
# start switches
nodesFut = await allFinished( nodesFut = await allFinished(
nodes[0].start(), nodes[0].switch.start(),
nodes[1].start() nodes[1].switch.start(),
) )
let subscribes = await subscribeNodes(nodes) # start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar") await waitSub(nodes[0], nodes[1], "foobar")
check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0 check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0
check (await completionFut.wait(5.seconds)) == true
result = await completionFut.wait(5.seconds) await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing( await allFuturesThrowing(
nodes[0].stop(), nodes[0].stop(),
@ -71,53 +84,80 @@ suite "FloodSub":
) )
await allFuturesThrowing(nodesFut.concat()) await allFuturesThrowing(nodesFut.concat())
await allFuturesThrowing(subscribes)
check: waitFor(runTests())
waitFor(runTests()) == true
test "FloodSub basic publish/subscribe B -> A": test "FloodSub basic publish/subscribe B -> A":
proc runTests(): Future[bool] {.async.} = proc runTests() {.async.} =
var completionFut = newFuture[bool]() var completionFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar" check topic == "foobar"
completionFut.complete(true) completionFut.complete(true)
var nodes = generateNodes(2) let
var awaiters: seq[Future[void]] nodes = generateNodes(2)
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
let subscribes = await subscribeNodes(nodes) # start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsubcon
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
await nodes[0].subscribe("foobar", handler) await nodes[0].subscribe("foobar", handler)
await waitSub(nodes[1], nodes[0], "foobar") await waitSub(nodes[1], nodes[0], "foobar")
check (await nodes[1].publish("foobar", "Hello!".toBytes())) > 0 check (await nodes[1].publish("foobar", "Hello!".toBytes())) > 0
result = await completionFut.wait(5.seconds) check (await completionFut.wait(5.seconds)) == true
await allFuturesThrowing(nodes[0].stop(), nodes[1].stop()) await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(subscribes) await allFuturesThrowing(
await allFuturesThrowing(awaiters) nodes[0].stop(),
nodes[1].stop()
)
check: await allFuturesThrowing(nodesFut)
waitFor(runTests()) == true
waitFor(runTests())
test "FloodSub validation should succeed": test "FloodSub validation should succeed":
proc runTests(): Future[bool] {.async.} = proc runTests() {.async.} =
var handlerFut = newFuture[bool]() var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar" check topic == "foobar"
handlerFut.complete(true) handlerFut.complete(true)
var nodes = generateNodes(2) let
var awaiters: seq[Future[void]] nodes = generateNodes(2)
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start())) # start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsubcon
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
let subscribes = await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar") await waitSub(nodes[0], nodes[1], "foobar")
@ -131,30 +171,44 @@ suite "FloodSub":
nodes[1].addValidator("foobar", validator) nodes[1].addValidator("foobar", validator)
check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0 check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0
check (await handlerFut) == true check (await handlerFut) == true
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing( await allFuturesThrowing(
nodes[0].stop(), nodes[0].stop(),
nodes[1].stop()) nodes[1].stop()
)
await allFuturesThrowing(subscribes) await allFuturesThrowing(nodesFut)
await allFuturesThrowing(awaiters)
result = true
check: waitFor(runTests())
waitFor(runTests()) == true
test "FloodSub validation should fail": test "FloodSub validation should fail":
proc runTests(): Future[bool] {.async.} = proc runTests() {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check false # if we get here, it should fail check false # if we get here, it should fail
var nodes = generateNodes(2) let
var awaiters: seq[Future[void]] nodes = generateNodes(2)
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
let subscribes = await subscribeNodes(nodes) # start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsubcon
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar") await waitSub(nodes[0], nodes[1], "foobar")
@ -168,30 +222,44 @@ suite "FloodSub":
discard await nodes[0].publish("foobar", "Hello!".toBytes()) discard await nodes[0].publish("foobar", "Hello!".toBytes())
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing( await allFuturesThrowing(
nodes[0].stop(), nodes[0].stop(),
nodes[1].stop()) nodes[1].stop()
)
await allFuturesThrowing(subscribes) await allFuturesThrowing(nodesFut)
await allFuturesThrowing(awaiters)
result = true
check: waitFor(runTests())
waitFor(runTests()) == true
test "FloodSub validation one fails and one succeeds": test "FloodSub validation one fails and one succeeds":
proc runTests(): Future[bool] {.async.} = proc runTests() {.async.} =
var handlerFut = newFuture[bool]() var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foo" check topic == "foo"
handlerFut.complete(true) handlerFut.complete(true)
var nodes = generateNodes(2) let
var awaiters: seq[Future[void]] nodes = generateNodes(2)
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
let subscribes = await subscribeNodes(nodes) # start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsubcon
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
await nodes[1].subscribe("foo", handler) await nodes[1].subscribe("foo", handler)
await waitSub(nodes[0], nodes[1], "foo") await waitSub(nodes[0], nodes[1], "foo")
await nodes[1].subscribe("bar", handler) await nodes[1].subscribe("bar", handler)
@ -210,57 +278,21 @@ suite "FloodSub":
check (await nodes[0].publish("bar", "Hello!".toBytes())) > 0 check (await nodes[0].publish("bar", "Hello!".toBytes())) > 0
await allFuturesThrowing( await allFuturesThrowing(
nodes[0].stop(), nodes[0].switch.stop(),
nodes[1].stop()) nodes[1].switch.stop()
)
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaiters)
result = true
check:
waitFor(runTests()) == true
test "FloodSub publish should fail on timeout":
proc runTests(): Future[bool] {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
discard
var nodes = generateNodes(2)
var awaiters: seq[Future[void]]
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
let subscribes = await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
let pubsub = nodes[0].pubSub.get()
let peer = pubsub.peers[nodes[1].peerInfo.id]
peer.conn = Connection(newBufferStream(
proc (data: seq[byte]) {.async, gcsafe.} =
await sleepAsync(10.seconds)
,size = 0))
let in10millis = Moment.fromNow(10.millis)
let sent = await nodes[0].publish("foobar", "Hello!".toBytes(), 10.millis)
check Moment.now() >= in10millis
check sent == 0
await allFuturesThrowing( await allFuturesThrowing(
nodes[0].stop(), nodes[0].stop(),
nodes[1].stop()) nodes[1].stop()
)
await allFuturesThrowing(subscribes) await allFuturesThrowing(nodesFut)
await allFuturesThrowing(awaiters)
result = true
check: waitFor(runTests())
waitFor(runTests()) == true
test "FloodSub multiple peers, no self trigger": test "FloodSub multiple peers, no self trigger":
proc runTests(): Future[bool] {.async.} = proc runTests() {.async.} =
var runs = 10 var runs = 10
var futs = newSeq[(Future[void], TopicHandler, ref int)](runs) var futs = newSeq[(Future[void], TopicHandler, ref int)](runs)
@ -279,15 +311,12 @@ suite "FloodSub":
counter counter
) )
var nodes: seq[Switch] = newSeq[Switch]() let
for i in 0..<runs: nodes = generateNodes(runs, triggerSelf = false)
nodes.add newStandardSwitch(secureManagers = [SecureProtocol.Noise]) nodesFut = nodes.mapIt(it.switch.start())
var awaitters: seq[Future[void]] await allFuturesThrowing(nodes.mapIt(it.start()))
for i in 0..<runs: await subscribeNodes(nodes)
awaitters.add(await nodes[i].start())
let subscribes = await subscribeNodes(nodes)
for i in 0..<runs: for i in 0..<runs:
await nodes[i].subscribe("foobar", futs[i][1]) await nodes[i].subscribe("foobar", futs[i][1])
@ -305,17 +334,18 @@ suite "FloodSub":
await allFuturesThrowing(pubs) await allFuturesThrowing(pubs)
await allFuturesThrowing(futs.mapIt(it[0])) await allFuturesThrowing(futs.mapIt(it[0]))
await allFuturesThrowing(nodes.mapIt(it.stop())) await allFuturesThrowing(
nodes.mapIt(
allFutures(
it.stop(),
it.switch.stop())))
await allFuturesThrowing(subscribes) await allFuturesThrowing(nodesFut)
await allFuturesThrowing(awaitters)
result = true waitFor(runTests())
check:
waitFor(runTests()) == true
test "FloodSub multiple peers, with self trigger": test "FloodSub multiple peers, with self trigger":
proc runTests(): Future[bool] {.async.} = proc runTests() {.async.} =
var runs = 10 var runs = 10
var futs = newSeq[(Future[void], TopicHandler, ref int)](runs) var futs = newSeq[(Future[void], TopicHandler, ref int)](runs)
@ -329,21 +359,17 @@ suite "FloodSub":
(proc(topic: string, data: seq[byte]) {.async, gcsafe.} = (proc(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar" check topic == "foobar"
inc counter[] inc counter[]
if counter[] == runs: if counter[] == runs - 1:
fut.complete()), fut.complete()),
counter counter
) )
var nodes: seq[Switch] = newSeq[Switch]() let
for i in 0..<runs: nodes = generateNodes(runs, triggerSelf = true)
nodes.add newStandardSwitch(triggerSelf = true, secureManagers = [SecureProtocol.Secio]) nodesFut = nodes.mapIt(it.switch.start())
await allFuturesThrowing(nodes.mapIt(it.start()))
var awaitters: seq[Future[void]] await subscribeNodes(nodes)
for i in 0..<runs:
awaitters.add(await nodes[i].start())
let subscribes = await subscribeNodes(nodes)
for i in 0..<runs: for i in 0..<runs:
await nodes[i].subscribe("foobar", futs[i][1]) await nodes[i].subscribe("foobar", futs[i][1])
@ -361,12 +387,12 @@ suite "FloodSub":
await allFuturesThrowing(pubs) await allFuturesThrowing(pubs)
await allFuturesThrowing(futs.mapIt(it[0])) await allFuturesThrowing(futs.mapIt(it[0]))
await allFuturesThrowing(nodes.mapIt(it.stop())) await allFuturesThrowing(
nodes.mapIt(
allFutures(
it.stop(),
it.switch.stop())))
await allFuturesThrowing(subscribes) await allFuturesThrowing(nodesFut)
await allFuturesThrowing(awaitters)
result = true waitFor(runTests())
check:
waitFor(runTests()) == true

View File

@ -4,6 +4,7 @@ include ../../libp2p/protocols/pubsub/gossipsub
import unittest, bearssl import unittest, bearssl
import stew/byteutils import stew/byteutils
import ../../libp2p/standard_setup
import ../../libp2p/errors import ../../libp2p/errors
import ../../libp2p/crypto/crypto import ../../libp2p/crypto/crypto
import ../../libp2p/stream/bufferstream import ../../libp2p/stream/bufferstream
@ -26,7 +27,7 @@ suite "GossipSub internal":
test "`rebalanceMesh` Degree Lo": test "`rebalanceMesh` Degree Lo":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, randomPeerInfo()) let gossipSub = TestGossipSub.init(newStandardSwitch())
let topic = "foobar" let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[PubSubPeer]() gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
@ -38,9 +39,8 @@ suite "GossipSub internal":
conns &= conn conns &= conn
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo, GossipSubCodec) let peer = newPubSubPeer(peerInfo.peerId, gossipSub.switch, GossipSubCodec)
peer.conn = conn gossipSub.peers[peerInfo.peerId] = peer
gossipSub.peers[peerInfo.id] = peer
gossipSub.mesh[topic].incl(peer) gossipSub.mesh[topic].incl(peer)
check gossipSub.peers.len == 15 check gossipSub.peers.len == 15
@ -48,7 +48,7 @@ suite "GossipSub internal":
check gossipSub.mesh[topic].len == GossipSubD check gossipSub.mesh[topic].len == GossipSubD
await allFuturesThrowing(conns.mapIt(it.close())) await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true result = true
check: check:
@ -56,7 +56,7 @@ suite "GossipSub internal":
test "`rebalanceMesh` Degree Hi": test "`rebalanceMesh` Degree Hi":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, randomPeerInfo()) let gossipSub = TestGossipSub.init(newStandardSwitch())
let topic = "foobar" let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[PubSubPeer]() gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
@ -69,9 +69,8 @@ suite "GossipSub internal":
conns &= conn conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo, GossipSubCodec) let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
peer.conn = conn gossipSub.peers[peerInfo.peerId] = peer
gossipSub.peers[peerInfo.id] = peer
gossipSub.mesh[topic].incl(peer) gossipSub.mesh[topic].incl(peer)
check gossipSub.mesh[topic].len == 15 check gossipSub.mesh[topic].len == 15
@ -79,6 +78,7 @@ suite "GossipSub internal":
check gossipSub.mesh[topic].len == GossipSubD check gossipSub.mesh[topic].len == GossipSubD
await allFuturesThrowing(conns.mapIt(it.close())) await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true result = true
@ -87,7 +87,7 @@ suite "GossipSub internal":
test "`replenishFanout` Degree Lo": test "`replenishFanout` Degree Lo":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, randomPeerInfo()) let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard discard
@ -101,7 +101,7 @@ suite "GossipSub internal":
conns &= conn conns &= conn
var peerInfo = randomPeerInfo() var peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo, GossipSubCodec) let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
peer.handler = handler peer.handler = handler
gossipSub.gossipsub[topic].incl(peer) gossipSub.gossipsub[topic].incl(peer)
@ -110,6 +110,7 @@ suite "GossipSub internal":
check gossipSub.fanout[topic].len == GossipSubD check gossipSub.fanout[topic].len == GossipSubD
await allFuturesThrowing(conns.mapIt(it.close())) await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true result = true
@ -118,7 +119,7 @@ suite "GossipSub internal":
test "`dropFanoutPeers` drop expired fanout topics": test "`dropFanoutPeers` drop expired fanout topics":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, randomPeerInfo()) let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard discard
@ -134,7 +135,7 @@ suite "GossipSub internal":
conns &= conn conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo, GossipSubCodec) let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
peer.handler = handler peer.handler = handler
gossipSub.fanout[topic].incl(peer) gossipSub.fanout[topic].incl(peer)
@ -144,6 +145,7 @@ suite "GossipSub internal":
check topic notin gossipSub.fanout check topic notin gossipSub.fanout
await allFuturesThrowing(conns.mapIt(it.close())) await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true result = true
@ -152,7 +154,7 @@ suite "GossipSub internal":
test "`dropFanoutPeers` leave unexpired fanout topics": test "`dropFanoutPeers` leave unexpired fanout topics":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, randomPeerInfo()) let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard discard
@ -171,7 +173,7 @@ suite "GossipSub internal":
conns &= conn conns &= conn
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo, GossipSubCodec) let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
peer.handler = handler peer.handler = handler
gossipSub.fanout[topic1].incl(peer) gossipSub.fanout[topic1].incl(peer)
gossipSub.fanout[topic2].incl(peer) gossipSub.fanout[topic2].incl(peer)
@ -184,6 +186,7 @@ suite "GossipSub internal":
check topic2 in gossipSub.fanout check topic2 in gossipSub.fanout
await allFuturesThrowing(conns.mapIt(it.close())) await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true result = true
@ -192,7 +195,7 @@ suite "GossipSub internal":
test "`getGossipPeers` - should gather up to degree D non intersecting peers": test "`getGossipPeers` - should gather up to degree D non intersecting peers":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, randomPeerInfo()) let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard discard
@ -209,7 +212,7 @@ suite "GossipSub internal":
conns &= conn conns &= conn
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo, GossipSubCodec) let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
peer.handler = handler peer.handler = handler
if i mod 2 == 0: if i mod 2 == 0:
gossipSub.fanout[topic].incl(peer) gossipSub.fanout[topic].incl(peer)
@ -222,7 +225,7 @@ suite "GossipSub internal":
conns &= conn conns &= conn
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo, GossipSubCodec) let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
peer.handler = handler peer.handler = handler
gossipSub.gossipsub[topic].incl(peer) gossipSub.gossipsub[topic].incl(peer)
@ -244,10 +247,11 @@ suite "GossipSub internal":
let peers = gossipSub.getGossipPeers() let peers = gossipSub.getGossipPeers()
check peers.len == GossipSubD check peers.len == GossipSubD
for p in peers.keys: for p in peers.keys:
check not gossipSub.fanout.hasPeerID(topic, p) check not gossipSub.fanout.hasPeerID(topic, p.peerId)
check not gossipSub.mesh.hasPeerID(topic, p) check not gossipSub.mesh.hasPeerID(topic, p.peerId)
await allFuturesThrowing(conns.mapIt(it.close())) await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true result = true
@ -256,7 +260,7 @@ suite "GossipSub internal":
test "`getGossipPeers` - should not crash on missing topics in mesh": test "`getGossipPeers` - should not crash on missing topics in mesh":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, randomPeerInfo()) let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard discard
@ -270,7 +274,7 @@ suite "GossipSub internal":
conns &= conn conns &= conn
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo, GossipSubCodec) let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
peer.handler = handler peer.handler = handler
if i mod 2 == 0: if i mod 2 == 0:
gossipSub.fanout[topic].incl(peer) gossipSub.fanout[topic].incl(peer)
@ -292,6 +296,7 @@ suite "GossipSub internal":
check peers.len == GossipSubD check peers.len == GossipSubD
await allFuturesThrowing(conns.mapIt(it.close())) await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true result = true
@ -300,7 +305,7 @@ suite "GossipSub internal":
test "`getGossipPeers` - should not crash on missing topics in fanout": test "`getGossipPeers` - should not crash on missing topics in fanout":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, randomPeerInfo()) let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard discard
@ -314,7 +319,7 @@ suite "GossipSub internal":
conns &= conn conns &= conn
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo, GossipSubCodec) let peer = newPubSubPeer(peerInfo.peerId, gossipSub.switch, GossipSubCodec)
peer.handler = handler peer.handler = handler
if i mod 2 == 0: if i mod 2 == 0:
gossipSub.mesh[topic].incl(peer) gossipSub.mesh[topic].incl(peer)
@ -336,6 +341,7 @@ suite "GossipSub internal":
check peers.len == GossipSubD check peers.len == GossipSubD
await allFuturesThrowing(conns.mapIt(it.close())) await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true result = true
@ -344,7 +350,7 @@ suite "GossipSub internal":
test "`getGossipPeers` - should not crash on missing topics in gossip": test "`getGossipPeers` - should not crash on missing topics in gossip":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, randomPeerInfo()) let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard discard
@ -358,7 +364,7 @@ suite "GossipSub internal":
conns &= conn conns &= conn
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo, GossipSubCodec) let peer = newPubSubPeer(peerInfo.peerId, gossipSub.switch, GossipSubCodec)
peer.handler = handler peer.handler = handler
if i mod 2 == 0: if i mod 2 == 0:
gossipSub.mesh[topic].incl(peer) gossipSub.mesh[topic].incl(peer)
@ -380,6 +386,7 @@ suite "GossipSub internal":
check peers.len == 0 check peers.len == 0
await allFuturesThrowing(conns.mapIt(it.close())) await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true result = true

View File

@ -33,13 +33,13 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
# this is for testing purposes only # this is for testing purposes only
# peers can be inside `mesh` and `fanout`, not just `gossipsub` # peers can be inside `mesh` and `fanout`, not just `gossipsub`
var ceil = 15 var ceil = 15
let fsub = GossipSub(sender.pubSub.get()) let fsub = GossipSub(sender)
while (not fsub.gossipsub.hasKey(key) or while (not fsub.gossipsub.hasKey(key) or
not fsub.gossipsub.hasPeerID(key, receiver.peerInfo.id)) and not fsub.gossipsub.hasPeerID(key, receiver.peerInfo.peerId)) and
(not fsub.mesh.hasKey(key) or (not fsub.mesh.hasKey(key) or
not fsub.mesh.hasPeerID(key, receiver.peerInfo.id)) and not fsub.mesh.hasPeerID(key, receiver.peerInfo.peerId)) and
(not fsub.fanout.hasKey(key) or (not fsub.fanout.hasKey(key) or
not fsub.fanout.hasPeerID(key , receiver.peerInfo.id)): not fsub.fanout.hasPeerID(key , receiver.peerInfo.peerId)):
trace "waitSub sleeping..." trace "waitSub sleeping..."
await sleepAsync(1.seconds) await sleepAsync(1.seconds)
dec ceil dec ceil
@ -63,18 +63,29 @@ suite "GossipSub":
check tracker.isLeaked() == false check tracker.isLeaked() == false
test "GossipSub validation should succeed": test "GossipSub validation should succeed":
proc runTests(): Future[bool] {.async.} = proc runTests() {.async.} =
var handlerFut = newFuture[bool]() var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar" check topic == "foobar"
handlerFut.complete(true) handlerFut.complete(true)
var nodes = generateNodes(2, true) let
var awaiters: seq[Future[void]] nodes = generateNodes(2, gossip = true)
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
let subscribes = await subscribeNodes(nodes) # start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
await nodes[0].subscribe("foobar", handler) await nodes[0].subscribe("foobar", handler)
await nodes[1].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler)
@ -90,35 +101,44 @@ suite "GossipSub":
nodes[1].addValidator("foobar", validator) nodes[1].addValidator("foobar", validator)
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
result = (await validatorFut) and (await handlerFut) check (await validatorFut) and (await handlerFut)
let gossip1 = GossipSub(nodes[0].pubSub.get()) await allFuturesThrowing(
let gossip2 = GossipSub(nodes[1].pubSub.get()) nodes[0].switch.stop(),
check: nodes[1].switch.stop()
gossip1.mesh["foobar"].len == 1 and "foobar" notin gossip1.fanout )
gossip2.mesh["foobar"].len == 1 and "foobar" notin gossip2.fanout
await allFuturesThrowing( await allFuturesThrowing(
nodes[0].stop(), nodes[0].stop(),
nodes[1].stop()) nodes[1].stop()
)
await allFuturesThrowing(subscribes) await allFuturesThrowing(nodesFut.concat())
await allFuturesThrowing(awaiters)
check: waitFor(runTests())
waitFor(runTests()) == true
test "GossipSub validation should fail": test "GossipSub validation should fail":
proc runTests(): Future[bool] {.async.} = proc runTests() {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check false # if we get here, it should fail check false # if we get here, it should fail
var nodes = generateNodes(2, true) let
var awaiters: seq[Future[void]] nodes = generateNodes(2, gossip = true)
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
let subscribes = await subscribeNodes(nodes) # start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
await nodes[0].subscribe("foobar", handler) await nodes[0].subscribe("foobar", handler)
await nodes[1].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler)
@ -133,37 +153,54 @@ suite "GossipSub":
nodes[1].addValidator("foobar", validator) nodes[1].addValidator("foobar", validator)
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
result = await validatorFut check (await validatorFut) == true
let gossip1 = GossipSub(nodes[0])
let gossip2 = GossipSub(nodes[1])
let gossip1 = GossipSub(nodes[0].pubSub.get())
let gossip2 = GossipSub(nodes[1].pubSub.get())
check: check:
gossip1.mesh["foobar"].len == 1 and "foobar" notin gossip1.fanout gossip1.mesh["foobar"].len == 1 and "foobar" notin gossip1.fanout
gossip2.mesh["foobar"].len == 1 and "foobar" notin gossip2.fanout gossip2.mesh["foobar"].len == 1 and "foobar" notin gossip2.fanout
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing( await allFuturesThrowing(
nodes[0].stop(), nodes[0].stop(),
nodes[1].stop()) nodes[1].stop()
)
await allFuturesThrowing(subscribes) await allFuturesThrowing(nodesFut.concat())
await allFuturesThrowing(awaiters)
check: waitFor(runTests())
waitFor(runTests()) == true
test "GossipSub validation one fails and one succeeds": test "GossipSub validation one fails and one succeeds":
proc runTests(): Future[bool] {.async.} = proc runTests() {.async.} =
var handlerFut = newFuture[bool]() var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foo" check topic == "foo"
handlerFut.complete(true) handlerFut.complete(true)
var nodes = generateNodes(2, true) let
var awaiters: seq[Future[void]] nodes = generateNodes(2, gossip = true)
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start())) # start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
let subscribes = await subscribeNodes(nodes)
await nodes[1].subscribe("foo", handler) await nodes[1].subscribe("foo", handler)
await nodes[1].subscribe("bar", handler) await nodes[1].subscribe("bar", handler)
@ -182,10 +219,11 @@ suite "GossipSub":
tryPublish await nodes[0].publish("foo", "Hello!".toBytes()), 1 tryPublish await nodes[0].publish("foo", "Hello!".toBytes()), 1
tryPublish await nodes[0].publish("bar", "Hello!".toBytes()), 1 tryPublish await nodes[0].publish("bar", "Hello!".toBytes()), 1
result = ((await passed) and (await failed) and (await handlerFut)) check ((await passed) and (await failed) and (await handlerFut))
let gossip1 = GossipSub(nodes[0])
let gossip2 = GossipSub(nodes[1])
let gossip1 = GossipSub(nodes[0].pubSub.get())
let gossip2 = GossipSub(nodes[1].pubSub.get())
check: check:
"foo" notin gossip1.mesh and gossip1.fanout["foo"].len == 1 "foo" notin gossip1.mesh and gossip1.fanout["foo"].len == 1
"foo" notin gossip2.mesh and "foo" notin gossip2.fanout "foo" notin gossip2.mesh and "foo" notin gossip2.fanout
@ -193,104 +231,95 @@ suite "GossipSub":
"bar" notin gossip2.mesh and "bar" notin gossip2.fanout "bar" notin gossip2.mesh and "bar" notin gossip2.fanout
await allFuturesThrowing( await allFuturesThrowing(
nodes[0].stop(), nodes[0].switch.stop(),
nodes[1].stop()) nodes[1].switch.stop()
)
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaiters)
result = true
check:
waitFor(runTests()) == true
test "GossipSub publish should fail on timeout":
proc runTests(): Future[bool] {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
discard
var nodes = generateNodes(2, gossip = true)
var awaiters: seq[Future[void]]
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
let subscribes = await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
let pubsub = nodes[0].pubSub.get()
let peer = pubsub.peers[nodes[1].peerInfo.id]
peer.conn = Connection(newBufferStream(
proc (data: seq[byte]) {.async, gcsafe.} =
await sleepAsync(10.seconds)
, size = 0))
let in10millis = Moment.fromNow(10.millis)
let sent = await nodes[0].publish("foobar", "Hello!".toBytes(), 10.millis)
check Moment.now() >= in10millis
check sent == 0
await allFuturesThrowing( await allFuturesThrowing(
nodes[0].stop(), nodes[0].stop(),
nodes[1].stop()) nodes[1].stop()
)
await allFuturesThrowing(subscribes) await allFuturesThrowing(nodesFut.concat())
await allFuturesThrowing(awaiters)
result = true
check: waitFor(runTests())
waitFor(runTests()) == true
test "e2e - GossipSub should add remote peer topic subscriptions": test "e2e - GossipSub should add remote peer topic subscriptions":
proc testBasicGossipSub(): Future[bool] {.async.} = proc testBasicGossipSub() {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
discard discard
var nodes: seq[Switch] = newSeq[Switch]() let
for i in 0..<2: nodes = generateNodes(
nodes.add newStandardSwitch(gossip = true, 2,
gossip = true,
secureManagers = [SecureProtocol.Noise]) secureManagers = [SecureProtocol.Noise])
var awaitters: seq[Future[void]] # start switches
for node in nodes: nodesFut = await allFinished(
awaitters.add(await node.start()) nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
let subscribes = await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler)
await sleepAsync(10.seconds) await sleepAsync(10.seconds)
let gossip1 = GossipSub(nodes[0].pubSub.get()) let gossip1 = GossipSub(nodes[0])
let gossip2 = GossipSub(nodes[1].pubSub.get()) let gossip2 = GossipSub(nodes[1])
check: check:
"foobar" in gossip2.topics "foobar" in gossip2.topics
"foobar" in gossip1.gossipsub "foobar" in gossip1.gossipsub
gossip1.gossipsub.hasPeerID("foobar", gossip2.peerInfo.id) gossip1.gossipsub.hasPeerID("foobar", gossip2.peerInfo.peerId)
await allFuturesThrowing(nodes.mapIt(it.stop())) await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(subscribes) await allFuturesThrowing(
await allFuturesThrowing(awaitters) nodes[0].stop(),
nodes[1].stop()
)
result = true await allFuturesThrowing(nodesFut.concat())
check: waitFor(testBasicGossipSub())
waitFor(testBasicGossipSub()) == true
test "e2e - GossipSub should add remote peer topic subscriptions if both peers are subscribed": test "e2e - GossipSub should add remote peer topic subscriptions if both peers are subscribed":
proc testBasicGossipSub(): Future[bool] {.async.} = proc testBasicGossipSub() {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
discard discard
var nodes: seq[Switch] = newSeq[Switch]() let
for i in 0..<2: nodes = generateNodes(
nodes.add newStandardSwitch(gossip = true, secureManagers = [SecureProtocol.Secio]) 2,
gossip = true,
secureManagers = [SecureProtocol.Secio])
var awaitters: seq[Future[void]] # start switches
for node in nodes: nodesFut = await allFinished(
awaitters.add(await node.start()) nodes[0].switch.start(),
nodes[1].switch.start(),
)
let subscribes = await subscribeNodes(nodes) # start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
await nodes[0].subscribe("foobar", handler) await nodes[0].subscribe("foobar", handler)
await nodes[1].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler)
@ -302,8 +331,8 @@ suite "GossipSub":
await allFuturesThrowing(subs) await allFuturesThrowing(subs)
let let
gossip1 = GossipSub(nodes[0].pubSub.get()) gossip1 = GossipSub(nodes[0])
gossip2 = GossipSub(nodes[1].pubSub.get()) gossip2 = GossipSub(nodes[1])
check: check:
"foobar" in gossip1.topics "foobar" in gossip1.topics
@ -312,35 +341,53 @@ suite "GossipSub":
"foobar" in gossip1.gossipsub "foobar" in gossip1.gossipsub
"foobar" in gossip2.gossipsub "foobar" in gossip2.gossipsub
gossip1.gossipsub.hasPeerID("foobar", gossip2.peerInfo.id) or gossip1.gossipsub.hasPeerID("foobar", gossip2.peerInfo.peerId) or
gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.id) gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.peerId)
gossip2.gossipsub.hasPeerID("foobar", gossip1.peerInfo.id) or gossip2.gossipsub.hasPeerID("foobar", gossip1.peerInfo.peerId) or
gossip2.mesh.hasPeerID("foobar", gossip1.peerInfo.id) gossip2.mesh.hasPeerID("foobar", gossip1.peerInfo.peerId)
await allFuturesThrowing(nodes.mapIt(it.stop())) await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(subscribes) await allFuturesThrowing(
await allFuturesThrowing(awaitters) nodes[0].stop(),
nodes[1].stop()
)
result = true await allFuturesThrowing(nodesFut.concat())
check: waitFor(testBasicGossipSub())
waitFor(testBasicGossipSub()) == true
test "e2e - GossipSub send over fanout A -> B": test "e2e - GossipSub send over fanout A -> B":
proc runTests(): Future[bool] {.async.} = proc runTests() {.async.} =
var passed = newFuture[void]() var passed = newFuture[void]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar" check topic == "foobar"
passed.complete() passed.complete()
var nodes = generateNodes(2, true) let
var wait = newSeq[Future[void]]() nodes = generateNodes(
wait.add(await nodes[0].start()) 2,
wait.add(await nodes[1].start()) gossip = true,
secureManagers = [SecureProtocol.Secio])
let subscribes = await subscribeNodes(nodes) # start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar") await waitSub(nodes[0], nodes[1], "foobar")
@ -353,18 +400,19 @@ suite "GossipSub":
obs2 = PubSubObserver(onSend: proc(peer: PubSubPeer; msgs: var RPCMsg) = obs2 = PubSubObserver(onSend: proc(peer: PubSubPeer; msgs: var RPCMsg) =
inc observed inc observed
) )
nodes[1].pubsub.get().addObserver(obs1)
nodes[0].pubsub.get().addObserver(obs2) # nodes[1].addObserver(obs1)
# nodes[0].addObserver(obs2)
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
var gossip1: GossipSub = GossipSub(nodes[0].pubSub.get()) var gossip1: GossipSub = GossipSub(nodes[0])
var gossip2: GossipSub = GossipSub(nodes[1].pubSub.get()) var gossip2: GossipSub = GossipSub(nodes[1])
check: check:
"foobar" in gossip1.gossipsub "foobar" in gossip1.gossipsub
gossip1.fanout.hasPeerID("foobar", gossip2.peerInfo.id) gossip1.fanout.hasPeerID("foobar", gossip2.peerInfo.peerId)
not gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.id) not gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.peerId)
await passed.wait(2.seconds) await passed.wait(2.seconds)
@ -373,14 +421,20 @@ suite "GossipSub":
await nodes[0].stop() await nodes[0].stop()
await nodes[1].stop() await nodes[1].stop()
await allFuturesThrowing(subscribes) await allFuturesThrowing(
await allFuturesThrowing(wait) nodes[0].switch.stop(),
nodes[1].switch.stop()
)
check observed == 2 await allFuturesThrowing(
result = true nodes[0].stop(),
nodes[1].stop()
)
check: await allFuturesThrowing(nodesFut.concat())
waitFor(runTests()) == true # check observed == 2
waitFor(runTests())
test "e2e - GossipSub send over mesh A -> B": test "e2e - GossipSub send over mesh A -> B":
proc runTests(): Future[bool] {.async.} = proc runTests(): Future[bool] {.async.} =
@ -389,12 +443,26 @@ suite "GossipSub":
check topic == "foobar" check topic == "foobar"
passed.complete(true) passed.complete(true)
var nodes = generateNodes(2, true) let
var wait: seq[Future[void]] nodes = generateNodes(
wait.add(await nodes[0].start()) 2,
wait.add(await nodes[1].start()) gossip = true,
secureManagers = [SecureProtocol.Secio])
let subscribes = await subscribeNodes(nodes) # start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
await nodes[0].subscribe("foobar", handler) await nodes[0].subscribe("foobar", handler)
await nodes[1].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler)
@ -404,39 +472,42 @@ suite "GossipSub":
result = await passed result = await passed
var gossip1: GossipSub = GossipSub(nodes[0].pubSub.get()) var gossip1: GossipSub = GossipSub(nodes[0])
var gossip2: GossipSub = GossipSub(nodes[1].pubSub.get()) var gossip2: GossipSub = GossipSub(nodes[1])
check: check:
"foobar" in gossip1.gossipsub "foobar" in gossip1.gossipsub
"foobar" in gossip2.gossipsub "foobar" in gossip2.gossipsub
gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.id) gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.peerId)
not gossip1.fanout.hasPeerID("foobar", gossip2.peerInfo.id) not gossip1.fanout.hasPeerID("foobar", gossip2.peerInfo.peerId)
gossip2.mesh.hasPeerID("foobar", gossip1.peerInfo.id) gossip2.mesh.hasPeerID("foobar", gossip1.peerInfo.peerId)
not gossip2.fanout.hasPeerID("foobar", gossip1.peerInfo.id) not gossip2.fanout.hasPeerID("foobar", gossip1.peerInfo.peerId)
await nodes[0].stop() await allFuturesThrowing(
await nodes[1].stop() nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(subscribes) await allFuturesThrowing(
await allFuturesThrowing(wait) nodes[0].stop(),
nodes[1].stop()
)
await allFuturesThrowing(nodesFut.concat())
check: check:
waitFor(runTests()) == true waitFor(runTests()) == true
test "e2e - GossipSub with multiple peers": test "e2e - GossipSub with multiple peers":
proc runTests(): Future[bool] {.async.} = proc runTests() {.async.} =
var nodes: seq[Switch] = newSeq[Switch]()
var awaitters: seq[Future[void]]
var runs = 10 var runs = 10
for i in 0..<runs: let
nodes.add newStandardSwitch(triggerSelf = true, nodes = generateNodes(runs, gossip = true, triggerSelf = true)
gossip = true, nodesFut = nodes.mapIt(it.switch.start())
secureManagers = [SecureProtocol.Noise])
awaitters.add((await nodes[i].start()))
let subscribes = await subscribeRandom(nodes) await allFuturesThrowing(nodes.mapIt(it.start()))
await subscribeNodes(nodes)
var seen: Table[string, int] var seen: Table[string, int]
var subs: seq[Future[void]] var subs: seq[Future[void]]
@ -468,34 +539,33 @@ suite "GossipSub":
check: v >= 1 check: v >= 1
for node in nodes: for node in nodes:
var gossip: GossipSub = GossipSub(node.pubSub.get()) var gossip = GossipSub(node)
check: check:
"foobar" in gossip.gossipsub "foobar" in gossip.gossipsub
gossip.fanout.len == 0 gossip.fanout.len == 0
gossip.mesh["foobar"].len > 0 gossip.mesh["foobar"].len > 0
await allFuturesThrowing(nodes.mapIt(it.stop())) await allFuturesThrowing(
nodes.mapIt(
allFutures(
it.stop(),
it.switch.stop())))
await allFuturesThrowing(subscribes) await allFuturesThrowing(nodesFut)
await allFuturesThrowing(awaitters)
result = true
check: waitFor(runTests())
waitFor(runTests()) == true
test "e2e - GossipSub with multiple peers (sparse)": test "e2e - GossipSub with multiple peers (sparse)":
proc runTests(): Future[bool] {.async.} = proc runTests() {.async.} =
var nodes: seq[Switch] = newSeq[Switch]()
var awaitters: seq[Future[void]]
var runs = 10 var runs = 10
for i in 0..<runs: let
nodes.add newStandardSwitch(triggerSelf = true, nodes = generateNodes(runs, gossip = true, triggerSelf = true)
gossip = true, nodesFut = nodes.mapIt(it.switch.start())
secureManagers = [SecureProtocol.Secio])
awaitters.add((await nodes[i].start()))
let subscribes = await subscribeSparseNodes(nodes, 1) await allFuturesThrowing(nodes.mapIt(it.start()))
await subscribeNodes(nodes)
var seen: Table[string, int] var seen: Table[string, int]
var subs: seq[Future[void]] var subs: seq[Future[void]]
@ -528,17 +598,18 @@ suite "GossipSub":
check: v >= 1 check: v >= 1
for node in nodes: for node in nodes:
var gossip: GossipSub = GossipSub(node.pubSub.get()) var gossip = GossipSub(node)
check: check:
"foobar" in gossip.gossipsub "foobar" in gossip.gossipsub
gossip.fanout.len == 0 gossip.fanout.len == 0
gossip.mesh["foobar"].len > 0 gossip.mesh["foobar"].len > 0
await allFuturesThrowing(nodes.mapIt(it.stop())) await allFuturesThrowing(
nodes.mapIt(
allFutures(
it.stop(),
it.switch.stop())))
await allFuturesThrowing(subscribes) await allFuturesThrowing(nodesFut)
await allFuturesThrowing(awaitters)
result = true
check: waitFor(runTests())
waitFor(runTests()) == true

View File

@ -16,4 +16,4 @@ suite "Message":
peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
msg = Message.init(peer, @[], "topic", seqno, sign = true) msg = Message.init(peer, @[], "topic", seqno, sign = true)
check verify(msg, peer) check verify(msg, peer.peerId)

View File

@ -1,22 +1,61 @@
# compile time options here
const
libp2p_pubsub_sign {.booldefine.} = true
libp2p_pubsub_verify {.booldefine.} = true
import random import random
import chronos import chronos
import ../../libp2p/standard_setup import ../../libp2p/[standard_setup,
protocols/pubsub/pubsub,
protocols/pubsub/floodsub,
protocols/pubsub/gossipsub,
protocols/secure/secure]
export standard_setup export standard_setup
randomize() randomize()
proc generateNodes*(num: Natural, gossip: bool = false): seq[Switch] = proc generateNodes*(
for i in 0..<num: num: Natural,
result.add(newStandardSwitch(gossip = gossip)) secureManagers: openarray[SecureProtocol] = [
# array cos order matters
SecureProtocol.Secio,
SecureProtocol.Noise,
],
msgIdProvider: MsgIdProvider = nil,
gossip: bool = false,
triggerSelf: bool = false,
verifySignature: bool = libp2p_pubsub_verify,
sign: bool = libp2p_pubsub_sign): seq[PubSub] =
proc subscribeNodes*(nodes: seq[Switch]): Future[seq[Future[void]]] {.async.} = for i in 0..<num:
let switch = newStandardSwitch(secureManagers = secureManagers)
let pubsub = if gossip:
GossipSub.init(
switch = switch,
triggerSelf = triggerSelf,
verifySignature = verifySignature,
sign = sign,
msgIdProvider = msgIdProvider).PubSub
else:
FloodSub.init(
switch = switch,
triggerSelf = triggerSelf,
verifySignature = verifySignature,
sign = sign,
msgIdProvider = msgIdProvider).PubSub
switch.mount(pubsub)
result.add(pubsub)
proc subscribeNodes*(nodes: seq[PubSub]) {.async.} =
for dialer in nodes: for dialer in nodes:
for node in nodes: for node in nodes:
if dialer.peerInfo.peerId != node.peerInfo.peerId: if dialer.switch.peerInfo.peerId != node.switch.peerInfo.peerId:
await dialer.connect(node.peerInfo) await dialer.switch.connect(node.peerInfo.peerId, node.peerInfo.addrs)
result.add(dialer.subscribePeer(node.peerInfo)) dialer.subscribePeer(node.peerInfo.peerId)
proc subscribeSparseNodes*(nodes: seq[Switch], degree: int = 2): Future[seq[Future[void]]] {.async.} = proc subscribeSparseNodes*(nodes: seq[PubSub], degree: int = 2) {.async.} =
if nodes.len < degree: if nodes.len < degree:
raise (ref CatchableError)(msg: "nodes count needs to be greater or equal to degree!") raise (ref CatchableError)(msg: "nodes count needs to be greater or equal to degree!")
@ -25,17 +64,17 @@ proc subscribeSparseNodes*(nodes: seq[Switch], degree: int = 2): Future[seq[Futu
continue continue
for node in nodes: for node in nodes:
if dialer.peerInfo.peerId != node.peerInfo.peerId: if dialer.switch.peerInfo.peerId != node.peerInfo.peerId:
await dialer.connect(node.peerInfo) await dialer.switch.connect(node.peerInfo.peerId, node.peerInfo.addrs)
result.add(dialer.subscribePeer(node.peerInfo)) dialer.subscribePeer(node.peerInfo.peerId)
proc subscribeRandom*(nodes: seq[Switch]): Future[seq[Future[void]]] {.async.} = proc subscribeRandom*(nodes: seq[PubSub]) {.async.} =
for dialer in nodes: for dialer in nodes:
var dialed: seq[string] var dialed: seq[PeerID]
while dialed.len < nodes.len - 1: while dialed.len < nodes.len - 1:
let node = sample(nodes) let node = sample(nodes)
if node.peerInfo.id notin dialed: if node.peerInfo.peerId notin dialed:
if dialer.peerInfo.id != node.peerInfo.id: if dialer.peerInfo.peerId != node.peerInfo.peerId:
await dialer.connect(node.peerInfo) await dialer.switch.connect(node.peerInfo.peerId, node.peerInfo.addrs)
result.add(dialer.subscribePeer(node.peerInfo)) dialer.subscribePeer(node.peerInfo.peerId)
dialed.add(node.peerInfo.id) dialed.add(node.peerInfo.peerId)

View File

@ -72,11 +72,20 @@ proc testPubSubDaemonPublish(gossip: bool = false,
let daemonNode = await newDaemonApi(flags) let daemonNode = await newDaemonApi(flags)
let daemonPeer = await daemonNode.identity() let daemonPeer = await daemonNode.identity()
let nativeNode = newStandardSwitch( let nativeNode = newStandardSwitch(
gossip = gossip,
secureManagers = [SecureProtocol.Noise], secureManagers = [SecureProtocol.Noise],
outTimeout = 5.minutes) outTimeout = 5.minutes)
let pubsub = if gossip:
GossipSub.init(
switch = nativeNode).PubSub
else:
FloodSub.init(
switch = nativeNode).PubSub
nativeNode.mount(pubsub)
let awaiters = nativeNode.start() let awaiters = nativeNode.start()
await pubsub.start()
let nativePeer = nativeNode.peerInfo let nativePeer = nativeNode.peerInfo
var finished = false var finished = false
@ -91,8 +100,8 @@ proc testPubSubDaemonPublish(gossip: bool = false,
let peer = NativePeerInfo.init( let peer = NativePeerInfo.init(
daemonPeer.peer, daemonPeer.peer,
daemonPeer.addresses) daemonPeer.addresses)
await nativeNode.connect(peer) await nativeNode.connect(peer.peerId, peer.addrs)
let subscribeHanle = nativeNode.subscribePeer(peer) pubsub.subscribePeer(peer.peerId)
await sleepAsync(1.seconds) await sleepAsync(1.seconds)
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
@ -103,7 +112,7 @@ proc testPubSubDaemonPublish(gossip: bool = false,
result = true # don't cancel subscription result = true # don't cancel subscription
asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler) asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
await nativeNode.subscribe(testTopic, nativeHandler) await pubsub.subscribe(testTopic, nativeHandler)
await sleepAsync(5.seconds) await sleepAsync(5.seconds)
proc publisher() {.async.} = proc publisher() {.async.} =
@ -115,9 +124,9 @@ proc testPubSubDaemonPublish(gossip: bool = false,
result = true result = true
await nativeNode.stop() await nativeNode.stop()
await pubsub.stop()
await allFutures(awaiters) await allFutures(awaiters)
await daemonNode.close() await daemonNode.close()
await subscribeHanle
proc testPubSubNodePublish(gossip: bool = false, proc testPubSubNodePublish(gossip: bool = false,
count: int = 1): Future[bool] {.async.} = count: int = 1): Future[bool] {.async.} =
@ -132,18 +141,27 @@ proc testPubSubNodePublish(gossip: bool = false,
let daemonNode = await newDaemonApi(flags) let daemonNode = await newDaemonApi(flags)
let daemonPeer = await daemonNode.identity() let daemonPeer = await daemonNode.identity()
let nativeNode = newStandardSwitch( let nativeNode = newStandardSwitch(
gossip = gossip,
secureManagers = [SecureProtocol.Secio], secureManagers = [SecureProtocol.Secio],
outTimeout = 5.minutes) outTimeout = 5.minutes)
let pubsub = if gossip:
GossipSub.init(
switch = nativeNode).PubSub
else:
FloodSub.init(
switch = nativeNode).PubSub
nativeNode.mount(pubsub)
let awaiters = nativeNode.start() let awaiters = nativeNode.start()
await pubsub.start()
let nativePeer = nativeNode.peerInfo let nativePeer = nativeNode.peerInfo
let peer = NativePeerInfo.init( let peer = NativePeerInfo.init(
daemonPeer.peer, daemonPeer.peer,
daemonPeer.addresses) daemonPeer.addresses)
await nativeNode.connect(peer) await nativeNode.connect(peer)
let subscribeHandle = nativeNode.subscribePeer(peer) pubsub.subscribePeer(peer.peerId)
await sleepAsync(1.seconds) await sleepAsync(1.seconds)
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
@ -162,21 +180,21 @@ proc testPubSubNodePublish(gossip: bool = false,
discard await daemonNode.pubsubSubscribe(testTopic, pubsubHandler) discard await daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
proc nativeHandler(topic: string, data: seq[byte]) {.async.} = discard proc nativeHandler(topic: string, data: seq[byte]) {.async.} = discard
await nativeNode.subscribe(testTopic, nativeHandler) await pubsub.subscribe(testTopic, nativeHandler)
await sleepAsync(5.seconds) await sleepAsync(5.seconds)
proc publisher() {.async.} = proc publisher() {.async.} =
while not finished: while not finished:
discard await nativeNode.publish(testTopic, msgData) discard await pubsub.publish(testTopic, msgData)
await sleepAsync(500.millis) await sleepAsync(500.millis)
await wait(publisher(), 5.minutes) # should be plenty of time await wait(publisher(), 5.minutes) # should be plenty of time
result = finished result = finished
await nativeNode.stop() await nativeNode.stop()
await pubsub.stop()
await allFutures(awaiters) await allFutures(awaiters)
await daemonNode.close() await daemonNode.close()
await subscribeHandle
suite "Interop": suite "Interop":
# TODO: chronos transports are leaking, # TODO: chronos transports are leaking,

View File

@ -118,7 +118,7 @@ suite "Switch":
# plus 4 for the pubsub streams # plus 4 for the pubsub streams
check (BufferStreamTracker(bufferTracker).opened == check (BufferStreamTracker(bufferTracker).opened ==
(BufferStreamTracker(bufferTracker).closed + 4.uint64)) (BufferStreamTracker(bufferTracker).closed))
var connTracker = getTracker(ConnectionTrackerName) var connTracker = getTracker(ConnectionTrackerName)
# echo connTracker.dump() # echo connTracker.dump()
@ -127,7 +127,7 @@ suite "Switch":
# and the pubsub streams that won't clean up until # and the pubsub streams that won't clean up until
# `disconnect()` or `stop()` # `disconnect()` or `stop()`
check (ConnectionTracker(connTracker).opened == check (ConnectionTracker(connTracker).opened ==
(ConnectionTracker(connTracker).closed + 8.uint64)) (ConnectionTracker(connTracker).closed + 4.uint64))
await allFuturesThrowing( await allFuturesThrowing(
done.wait(5.seconds), done.wait(5.seconds),