pubsub timeouts tuning (#295)

* add finegrained timeouts to pubsub

* use 10 millis timeout in tests

* finalization

* revert timeouts

* use `atEof` for reads

* adjust timeouts and use atEof for reads

* use atEof for reads

* set isEof flag

* no backoff for pubsub streams

* temp timer increase, make macos finalize

* don't call `subscribePeer` in libp2p anymore

* more traces

* leak tests

* lower timeouts

* handle exceptions in control message

* don't use `cancelAndWait`

* handle exceptions in helpers

* wip

* don't send empty messages

* check for leaks properly

* don't use cancelAndWait

* don't await subscription sends

* remove subscribePeer calls from switch

* trying without the hooks again
Dmitriy Ryajov 2020-08-02 23:20:11 -06:00 committed by GitHub
parent 909d9652f6
commit 980764774e
11 changed files with 205 additions and 87 deletions


@@ -123,7 +123,7 @@ proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.asy
   trace "handle: starting multistream handling", handshaked = active
   var handshaked = active
   try:
-    while not conn.closed:
+    while not conn.atEof:
       var ms = string.fromBytes(await conn.readLp(1024))
       validateSuffix(ms)


@@ -82,7 +82,7 @@ template withEOFExceptions(body: untyped): untyped =
 proc cleanupTimer(s: LPChannel) {.async.} =
   ## cleanup timers
   if not isNil(s.timerTaskFut) and not s.timerTaskFut.finished:
-    await s.timerTaskFut.cancelAndWait()
+    s.timerTaskFut.cancel()

 proc closeMessage(s: LPChannel) {.async.} =
   logScope:
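The hunk above swaps chronos' `cancelAndWait()` for a plain `cancel()` during channel cleanup. A minimal sketch of the difference, assuming only the chronos primitives already used in this diff (the `worker` and cleanup procs below are illustrative, not library or nim-libp2p code):

import chronos

proc worker() {.async.} =
  await sleepAsync(10.minutes)

proc cleanupBlocking(fut: Future[void]) {.async.} =
  # cancelAndWait suspends the caller until the cancelled future has settled
  if not fut.finished:
    await fut.cancelAndWait()

proc cleanupFireAndForget(fut: Future[void]) =
  # cancel() only requests cancellation and returns immediately
  if not fut.finished:
    fut.cancel()

when isMainModule:
  let fut = worker()
  cleanupFireAndForget(fut)
  # give the event loop one tick so the cancellation is delivered
  waitFor sleepAsync(10.millis)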


@@ -117,7 +117,7 @@ method handle*(m: Mplex) {.async, gcsafe.} =
       trace "stopping mplex main loop", oid = $m.oid
       await m.close()

-    while not m.connection.closed:
+    while not m.connection.atEof:
       trace "waiting for data", oid = $m.oid
       let (id, msgType, data) = await m.connection.readMsg()
       trace "read message from connection", id = id,
@@ -169,7 +169,10 @@ method handle*(m: Mplex) {.async, gcsafe.} =
         trace "pushing data to channel"

         if data.len > MaxMsgSize:
+          warn "attempting to send a packet larger than allowed", allowed = MaxMsgSize,
+                                                                  sending = data.len
           raise newLPStreamLimitError()
         await channel.pushTo(data)
       of MessageType.CloseIn, MessageType.CloseOut:


@@ -97,7 +97,7 @@ method rpcHandler*(f: FloodSub,
         trace "exception in message handler", exc = exc.msg

   # forward the message to all peers interested in it
-  let published = await f.publishHelper(toSendPeers, m.messages)
+  let published = await f.publishHelper(toSendPeers, m.messages, DefaultSendTimeout)

   trace "forwared message to peers", peers = published
@@ -120,9 +120,10 @@ method subscribePeer*(p: FloodSub,
 method publish*(f: FloodSub,
                 topic: string,
-                data: seq[byte]): Future[int] {.async.} =
+                data: seq[byte],
+                timeout: Duration = InfiniteDuration): Future[int] {.async.} =
   # base returns always 0
-  discard await procCall PubSub(f).publish(topic, data)
+  discard await procCall PubSub(f).publish(topic, data, timeout)

   if data.len <= 0 or topic.len <= 0:
     trace "topic or data missing, skipping publish"
@@ -137,7 +138,7 @@ method publish*(f: FloodSub,
   let msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign)

   # start the future but do not wait yet
-  let published = await f.publishHelper(f.floodsub.getOrDefault(topic), @[msg])
+  let published = await f.publishHelper(f.floodsub.getOrDefault(topic), @[msg], timeout)

   libp2p_pubsub_messages_published.inc(labelValues = [topic])


@@ -189,16 +189,14 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
     let gossipPeers = mesh + fanout
     let mids = g.mcache.window(topic)
-    if mids.len <= 0:
+    if not mids.len > 0:
       continue

-    let ihave = ControlIHave(topicID: topic,
-                             messageIDs: toSeq(mids))
-
     if topic notin g.gossipsub:
       trace "topic not in gossip array, skipping", topicID = topic
       continue

+    let ihave = ControlIHave(topicID: topic, messageIDs: toSeq(mids))
     for peer in allPeers:
       if result.len >= GossipSubD:
         trace "got gossip peers", peers = result.len
@@ -422,7 +420,7 @@ method rpcHandler*(g: GossipSub,
         trace "exception in message handler", exc = exc.msg

   # forward the message to all peers interested in it
-  let published = await g.publishHelper(toSendPeers, m.messages)
+  let published = await g.publishHelper(toSendPeers, m.messages, DefaultSendTimeout)

   trace "forwared message to peers", peers = published
@@ -436,9 +434,15 @@
       let messages = g.handleIWant(peer, control.iwant)

       if respControl.graft.len > 0 or respControl.prune.len > 0 or
-        respControl.ihave.len > 0 or respControl.iwant.len > 0:
+        respControl.ihave.len > 0:
+        try:
+          info "sending control message", msg = respControl
           await peer.send(
             RPCMsg(control: some(respControl), messages: messages))
+        except CancelledError as exc:
+          raise exc
+        except CatchableError as exc:
+          trace "exception forwarding control messages", exc = exc.msg

 method subscribe*(g: GossipSub,
                   topic: string,
@@ -476,9 +480,10 @@ method unsubscribeAll*(g: GossipSub, topic: string) {.async.} =
 method publish*(g: GossipSub,
                 topic: string,
-                data: seq[byte]): Future[int] {.async.} =
+                data: seq[byte],
+                timeout: Duration = InfiniteDuration): Future[int] {.async.} =
   # base returns always 0
-  discard await procCall PubSub(g).publish(topic, data)
+  discard await procCall PubSub(g).publish(topic, data, timeout)

   trace "publishing message on topic", topic, data = data.shortLog
   var peers: HashSet[PubSubPeer]
@@ -512,7 +517,7 @@ method publish*(g: GossipSub,
     if msgId notin g.mcache:
       g.mcache.put(msgId, msg)

-  let published = await g.publishHelper(peers, @[msg])
+  let published = await g.publishHelper(peers, @[msg], timeout)
   if published > 0:
     libp2p_pubsub_messages_published.inc(labelValues = [topic])
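The control-message change above wraps the send in its own exception handler so a failing peer only produces a trace instead of aborting the whole `rpcHandler`, while cancellation keeps propagating. A hedged sketch of that shape, where `sendControl` is a hypothetical stand-in for `peer.send(...)`:

import chronos

proc sendControl() {.async.} =
  # placeholder for the real network write of the control RPC
  await sleepAsync(1.millis)

proc forwardControl() {.async.} =
  try:
    await sendControl()
  except CancelledError as exc:
    # cancellation must keep unwinding the handler
    raise exc
  except CatchableError as exc:
    # any other failure is isolated to this peer and only logged
    echo "exception forwarding control messages: ", exc.msg

when isMainModule:
  waitFor forwardControl()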


@@ -30,8 +30,6 @@ declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messag
 declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"])

 type
-  SendRes = tuple[published: seq[string], failed: seq[string]] # keep private
-
   TopicHandler* = proc(topic: string,
                        data: seq[byte]): Future[void] {.gcsafe.}
@@ -66,6 +64,7 @@ method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} =
   ##
   if not(isNil(peer)) and peer.peerInfo notin p.conns:
     trace "deleting peer", peer = peer.id
+    peer.onConnect.fire() # Make sure all pending sends are unblocked
     p.peers.del(peer.id)
     trace "peer disconnected", peer = peer.id
@@ -95,24 +94,7 @@ proc sendSubs*(p: PubSub,
                topics: seq[string],
                subscribe: bool) {.async.} =
   ## send subscriptions to remote peer
-  try:
-    # wait for a connection before publishing
-    # this happens when
-    if not peer.onConnect.isSet:
-      trace "awaiting send connection"
-      await peer.onConnect.wait()
-
-    await peer.sendSubOpts(topics, subscribe)
-  except CancelledError as exc:
-    if not(isNil(peer)) and not(isNil(peer.conn)):
-      await peer.conn.close()
-
-    raise exc
-  except CatchableError as exc:
-    trace "unable to send subscriptions", exc = exc.msg
-    if not(isNil(peer)) and not(isNil(peer.conn)):
-      await peer.conn.close()
+  asyncCheck peer.sendSubOpts(topics, subscribe)

 method subscribeTopic*(p: PubSub,
                        topic: string,
@@ -147,7 +129,6 @@ proc getOrCreatePeer(p: PubSub,
   p.peers[peer.id] = peer
   peer.observers = p.observers

-  # metrics
   libp2p_pubsub_peers.set(p.peers.len.int64)
   return peer
@@ -281,9 +262,11 @@ method subscribe*(p: PubSub,
   # metrics
   libp2p_pubsub_topics.set(p.topics.len.int64)

-proc sendHelper*(p: PubSub,
-                 sendPeers: HashSet[PubSubPeer],
-                 msgs: seq[Message]): Future[SendRes] {.async.} =
+proc publishHelper*(p: PubSub,
+                    sendPeers: HashSet[PubSubPeer],
+                    msgs: seq[Message],
+                    timeout: Duration): Future[int] {.async.} =
+  # send messages and cleanup failed peers
   var sent: seq[tuple[id: string, fut: Future[void]]]
   for sendPeer in sendPeers:
     # avoid sending to self
@@ -291,7 +274,7 @@ proc sendHelper*(p: PubSub,
       continue

     trace "sending messages to peer", peer = sendPeer.id, msgs
-    sent.add((id: sendPeer.id, fut: sendPeer.send(RPCMsg(messages: msgs))))
+    sent.add((id: sendPeer.id, fut: sendPeer.send(RPCMsg(messages: msgs), timeout)))

   var published: seq[string]
   var failed: seq[string]
@@ -306,13 +289,6 @@
       trace "sending messages to peer succeeded", peer = f[0].id
       published.add(f[0].id)

-  return (published, failed)
-
-proc publishHelper*(p: PubSub,
-                    sendPeers: HashSet[PubSubPeer],
-                    msgs: seq[Message]): Future[int] {.async.} =
-  # send messages and cleanup failed peers
-  let (published, failed) = await p.sendHelper(sendPeers, msgs)
   for f in failed:
     let peer = p.peers.getOrDefault(f)
     if not(isNil(peer)) and not(isNil(peer.conn)):
@@ -322,7 +298,8 @@ proc publishHelper*(p: PubSub,
 method publish*(p: PubSub,
                 topic: string,
-                data: seq[byte]): Future[int] {.base, async.} =
+                data: seq[byte],
+                timeout: Duration = InfiniteDuration): Future[int] {.base, async.} =
   ## publish to a ``topic``
   if p.triggerSelf and topic in p.topics:
     for h in p.topics[topic].handler:
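The merged `publishHelper` above fans out one timeout-bounded send per peer, then counts successes and remembers failures for cleanup. A simplified sketch of that fan-out, with `sendToPeer` and plain string ids standing in for real `PubSubPeer` sends:

import chronos

proc sendToPeer(id: string, timeout: Duration) {.async.} =
  # placeholder for a timeout-bounded `peer.send(..., timeout)`
  await sleepAsync(1.millis)

proc fanOut(peers: seq[string], timeout: Duration): Future[int] {.async.} =
  # start one send per peer up front so they run concurrently
  var sent: seq[tuple[id: string, fut: Future[void]]]
  for id in peers:
    sent.add((id: id, fut: sendToPeer(id, timeout)))

  var published, failed: seq[string]
  for s in sent:
    try:
      await s.fut
      published.add(s.id)
    except CatchableError:
      failed.add(s.id) # the real helper disconnects these peers

  echo "failed sends: ", failed.len
  return published.len

when isMainModule:
  let count = waitFor fanOut(@["peerA", "peerB"], 10.millis)
  echo count, " sends succeeded"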


@@ -26,6 +26,10 @@ declareCounter(libp2p_pubsub_received_messages, "number of messages received", l
 declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"])
 declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])

+const
+  DefaultReadTimeout* = 1.minutes
+  DefaultSendTimeout* = 10.seconds
+
 type
   PubSubObserver* = ref object
     onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
@@ -81,9 +85,9 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
   debug "starting pubsub read loop for peer", closed = conn.closed
   try:
     try:
-      while not conn.closed:
+      while not conn.atEof:
         trace "waiting for data", closed = conn.closed
-        let data = await conn.readLp(64 * 1024)
+        let data = await conn.readLp(64 * 1024).wait(DefaultReadTimeout)
         let digest = $(sha256.digest(data))
         trace "read data from peer", data = data.shortLog
         if digest in p.recvdRpcCache:
@@ -119,7 +123,10 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
     trace "Exception occurred in PubSubPeer.handle", exc = exc.msg
     raise exc

-proc send*(p: PubSubPeer, msg: RPCMsg) {.async.} =
+proc send*(
+  p: PubSubPeer,
+  msg: RPCMsg,
+  timeout: Duration = DefaultSendTimeout) {.async.} =
   logScope:
     peer = p.id
     msg = shortLog(msg)
@@ -132,7 +139,7 @@ proc send*(p: PubSubPeer, msg: RPCMsg) {.async.} =
   let encoded = encodeRpcMsg(mm)
   if encoded.len <= 0:
-    trace "empty message, skipping"
+    info "empty message, skipping"
     return

   logScope:
@@ -144,21 +151,36 @@ proc send*(p: PubSubPeer, msg: RPCMsg) {.async.} =
     libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id])
     return

-  try:
+  proc sendToRemote() {.async.} =
+    logScope:
+      peer = p.id
+      msg = shortLog(msg)
+
     trace "about to send message"
+    if not p.onConnect.isSet:
+      await p.onConnect.wait()
+
     if p.connected: # this can happen if the remote disconnected
       trace "sending encoded msgs to peer"
       await p.sendConn.writeLp(encoded)
       p.sentRpcCache.put(digest)
+      trace "sent pubsub message to remote"

       for x in mm.messages:
         for t in x.topicIDs:
           # metrics
           libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t])

+  let sendFut = sendToRemote()
+  try:
+    await sendFut.wait(timeout)
   except CatchableError as exc:
     trace "unable to send to remote", exc = exc.msg
+    if not sendFut.finished:
+      sendFut.cancel()
+
     if not(isNil(p.sendConn)):
       await p.sendConn.close()
       p.sendConn = nil
@@ -166,21 +188,41 @@ proc send*(p: PubSubPeer, msg: RPCMsg) {.async.} =
     raise exc

-proc sendSubOpts*(p: PubSubPeer, topics: seq[string], subscribe: bool): Future[void] =
+proc sendSubOpts*(p: PubSubPeer, topics: seq[string], subscribe: bool) {.async.} =
   trace "sending subscriptions", peer = p.id, subscribe, topicIDs = topics
-  p.send(RPCMsg(
-    subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))))
+  try:
+    await p.send(RPCMsg(
+      subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))),
+      # the long timeout is mostly for cases where
+      # the connection is flaky at the beggingin
+      timeout = 3.minutes)
+  except CancelledError as exc:
+    raise exc
+  except CatchableError as exc:
+    trace "exception sending subscriptions", exc = exc.msg

-proc sendGraft*(p: PubSubPeer, topics: seq[string]): Future[void] =
+proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async.} =
   trace "sending graft to peer", peer = p.id, topicIDs = topics
-  p.send(RPCMsg(control: some(
-    ControlMessage(graft: topics.mapIt(ControlGraft(topicID: it))))))
+  try:
+    await p.send(RPCMsg(control: some(
+      ControlMessage(graft: topics.mapIt(ControlGraft(topicID: it))))))
+  except CancelledError as exc:
+    raise exc
+  except CatchableError as exc:
+    trace "exception sending grafts", exc = exc.msg

-proc sendPrune*(p: PubSubPeer, topics: seq[string]): Future[void] =
+proc sendPrune*(p: PubSubPeer, topics: seq[string]) {.async.} =
   trace "sending prune to peer", peer = p.id, topicIDs = topics
-  p.send(RPCMsg(control: some(
-    ControlMessage(prune: topics.mapIt(ControlPrune(topicID: it))))))
+  try:
+    await p.send(RPCMsg(control: some(
+      ControlMessage(prune: topics.mapIt(ControlPrune(topicID: it))))))
+  except CancelledError as exc:
+    raise exc
+  except CatchableError as exc:
+    trace "exception sending prunes", exc = exc.msg

 proc `$`*(p: PubSubPeer): string =
   p.id
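The core of the new send path above is bounding the write with chronos' `wait(timeout)` and cancelling the underlying future if the deadline fires. A minimal sketch of just that pattern, with `writeToPeer` as a hypothetical placeholder for the wire write:

import chronos

proc writeToPeer() {.async.} =
  # pretend the remote is stalled and the write never completes in time
  await sleepAsync(10.seconds)

proc boundedSend(timeout: Duration) {.async.} =
  let sendFut = writeToPeer()
  try:
    # bound the in-flight write; wait() raises if the timeout expires first
    await sendFut.wait(timeout)
  except CatchableError as exc:
    echo "unable to send to remote: ", exc.msg
    if not sendFut.finished:
      sendFut.cancel()

when isMainModule:
  waitFor boundedSend(10.millis)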


@@ -97,14 +97,16 @@ proc triggerHooks(s: Switch, peer: PeerInfo, cycle: Lifecycle) {.async, gcsafe.}
 proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.}
 proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.}

+proc subscribePeerInternal(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.}
+
 proc cleanupPubSubPeer(s: Switch, conn: Connection) {.async.} =
   try:
     await conn.closeEvent.wait()
+    trace "about to cleanup pubsub peer"
     if s.pubSub.isSome:
       let fut = s.pubsubMonitors.getOrDefault(conn.peerInfo.peerId)
       if not(isNil(fut)) and not(fut.finished):
-        await fut.cancelAndWait()
+        fut.cancel()

       await s.pubSub.get().unsubscribePeer(conn.peerInfo)
   except CancelledError as exc:
@@ -235,7 +237,7 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g
   return sconn

 proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
-  trace "upgrading incoming connection", conn = $conn, oid = conn.oid
+  trace "upgrading incoming connection", conn = $conn, oid = $conn.oid
   let ms = newMultistream()

   # secure incoming connections
@@ -244,7 +246,7 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
       {.async, gcsafe, closure.} =
     var sconn: Connection
-    trace "Securing connection", oid = conn.oid
+    trace "Securing connection", oid = $conn.oid
     let secure = s.secureManagers.filterIt(it.codec == proto)[0]

     try:
@@ -356,8 +358,8 @@ proc internalConnect(s: Switch,
   trace "dial successful", oid = $conn.oid,
                            peer = $conn.peerInfo

-  await s.subscribePeer(peer)
   asyncCheck s.cleanupPubSubPeer(conn)
+  asyncCheck s.subscribePeer(conn.peerInfo)

   trace "got connection", oid = $conn.oid,
                           direction = $conn.dir,
@@ -391,7 +393,7 @@ proc dial*(s: Switch,
       oid = $conn.oid
     if not await s.ms.select(stream, proto):
       await stream.close()
-      raise newException(CatchableError, "Unable to select sub-protocol " & proto)
+      raise newException(CatchableError, "Unable to select sub-protocol" & proto)

     return stream
   except CancelledError as exc:
@@ -445,7 +447,7 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
   if s.pubSub.isSome:
     await s.pubSub.get().start()

-  info "started libp2p node", peer = $s.peerInfo, addrs = s.peerInfo.addrs
+  debug "started libp2p node", peer = $s.peerInfo, addrs = s.peerInfo.addrs
   result = startFuts # listen for incoming connections

 proc stop*(s: Switch) {.async.} =
@@ -472,7 +474,9 @@ proc stop*(s: Switch) {.async.} =
 proc subscribePeerInternal(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
   ## Subscribe to pub sub peer
-  if s.pubSub.isSome and not(s.pubSub.get().connected(peerInfo)):
+  ##
+
+  if s.pubSub.isSome and not s.pubSub.get().connected(peerInfo):
     trace "about to subscribe to pubsub peer", peer = peerInfo.shortLog()
     var stream: Connection
     try:
@@ -483,6 +487,7 @@ proc subscribePeerInternal(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
       if not await s.ms.select(stream, s.pubSub.get().codec):
         if not(isNil(stream)):
+          trace "couldn't select pubsub", codec = s.pubSub.get().codec
           await stream.close()

         return
@@ -504,11 +509,7 @@ proc pubsubMonitor(s: Switch, peer: PeerInfo) {.async.} =
   ## pubsub connection as well
   ##

-  var tries = 0
-  var backoffFactor = 5 # up to ~10 mins
-  var backoff = 1.seconds
-  while s.isConnected(peer) and
-    tries < MaxPubsubReconnectAttempts:
+  while s.isConnected(peer):
     try:
       debug "subscribing to pubsub peer", peer = $peer
       await s.subscribePeerInternal(peer)
@@ -517,10 +518,8 @@ proc pubsubMonitor(s: Switch, peer: PeerInfo) {.async.} =
     except CatchableError as exc:
       trace "exception in pubsub monitor", peer = $peer, exc = exc.msg
     finally:
-      debug "awaiting backoff period before reconnecting", peer = $peer, backoff, tries
-      await sleepAsync(backoff) # allow the peer to cooldown
-      backoff = backoff * backoffFactor
-      tries.inc()
+      debug "sleeping before trying pubsub peer", peer = $peer
+      await sleepAsync(1.seconds) # allow the peer to cooldown

   trace "exiting pubsub monitor", peer = $peer
@@ -528,6 +527,8 @@ proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
   if peerInfo.peerId notin s.pubsubMonitors:
     s.pubsubMonitors[peerInfo.peerId] = s.pubsubMonitor(peerInfo)

+  result = s.pubsubMonitors.getOrDefault(peerInfo.peerId)
+
 proc subscribe*(s: Switch, topic: string,
                 handler: TopicHandler) {.async.} =
   ## subscribe to a pubsub topic
@@ -554,14 +555,17 @@ proc unsubscribeAll*(s: Switch, topic: string) {.async.} =
   await s.pubSub.get().unsubscribeAll(topic)

-proc publish*(s: Switch, topic: string, data: seq[byte]): Future[int] {.async.} =
+proc publish*(s: Switch,
+              topic: string,
+              data: seq[byte],
+              timeout: Duration = InfiniteDuration): Future[int] {.async.} =
   ## pubslish to pubsub topic
   ##

   if s.pubSub.isNone:
     raise newNoPubSubException()

-  return await s.pubSub.get().publish(topic, data)
+  return await s.pubSub.get().publish(topic, data, timeout)

 proc addValidator*(s: Switch,
                    topics: varargs[string],
@@ -611,8 +615,8 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
     asyncCheck s.triggerHooks(muxer.connection.peerInfo, Lifecycle.Upgraded)

     # try establishing a pubsub connection
-    await s.subscribePeer(muxer.connection.peerInfo)
     asyncCheck s.cleanupPubSubPeer(muxer.connection)
+    asyncCheck s.subscribePeer(muxer.connection.peerInfo)

   except CancelledError as exc:
     await muxer.close()
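The monitor loop above drops the bounded retry count and exponential backoff in favour of retrying for as long as the peer stays connected, with a fixed one-second pause between attempts. A rough sketch of that loop shape, where `isConnected` and `trySubscribe` are illustrative placeholders for `s.isConnected(peer)` and `subscribePeerInternal`:

import chronos

var attempts = 0

proc isConnected(): bool =
  # placeholder predicate; stop after a few rounds so the sketch terminates
  attempts < 3

proc trySubscribe() {.async.} =
  # placeholder for re-establishing the pubsub stream
  inc attempts
  await sleepAsync(1.millis)

proc monitor() {.async.} =
  while isConnected():
    try:
      await trySubscribe()
    except CancelledError as exc:
      raise exc
    except CatchableError as exc:
      echo "exception in pubsub monitor: ", exc.msg
    finally:
      # fixed pause instead of the old exponential backoff
      await sleepAsync(1.seconds)

when isMainModule:
  waitFor monitor()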


@@ -15,7 +15,9 @@ import utils,
        ../../libp2p/[errors,
                      switch,
                      stream/connection,
+                     stream/bufferstream,
                      crypto/crypto,
+                     protocols/pubsub/pubsubpeer,
                      protocols/pubsub/pubsub,
                      protocols/pubsub/floodsub,
                      protocols/pubsub/rpc/messages,
@@ -218,6 +220,45 @@ suite "FloodSub":
     check:
       waitFor(runTests()) == true

+  test "FloodSub publish should fail on timeout":
+    proc runTests(): Future[bool] {.async.} =
+      proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
+        discard
+
+      var nodes = generateNodes(2)
+      var awaiters: seq[Future[void]]
+      awaiters.add((await nodes[0].start()))
+      awaiters.add((await nodes[1].start()))
+
+      let subscribes = await subscribeNodes(nodes)
+      await nodes[1].subscribe("foobar", handler)
+      await waitSub(nodes[0], nodes[1], "foobar")
+
+      let pubsub = nodes[0].pubSub.get()
+      let peer = pubsub.peers[nodes[1].peerInfo.id]
+
+      peer.conn = Connection(newBufferStream(
+        proc (data: seq[byte]) {.async, gcsafe.} =
+          await sleepAsync(10.seconds)
+        ,size = 0))
+
+      let in10millis = Moment.fromNow(10.millis)
+      let sent = await nodes[0].publish("foobar", "Hello!".toBytes(), 10.millis)
+
+      check Moment.now() >= in10millis
+      check sent == 0
+
+      await allFuturesThrowing(
+        nodes[0].stop(),
+        nodes[1].stop())
+
+      await allFuturesThrowing(subscribes)
+      await allFuturesThrowing(awaiters)
+      result = true
+
+    check:
+      waitFor(runTests()) == true
+
   test "FloodSub multiple peers, no self trigger":
     proc runTests(): Future[bool] {.async.} =
       var runs = 10


@@ -16,8 +16,10 @@ import utils, ../../libp2p/[errors,
                             peerid,
                             peerinfo,
                             stream/connection,
+                            stream/bufferstream,
                             crypto/crypto,
                             protocols/pubsub/pubsub,
+                            protocols/pubsub/pubsubpeer,
                             protocols/pubsub/gossipsub,
                             protocols/pubsub/peertable,
                             protocols/pubsub/rpc/messages]
@@ -200,6 +202,45 @@ suite "GossipSub":
     check:
       waitFor(runTests()) == true

+  test "GossipSub publish should fail on timeout":
+    proc runTests(): Future[bool] {.async.} =
+      proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
+        discard
+
+      var nodes = generateNodes(2, gossip = true)
+      var awaiters: seq[Future[void]]
+      awaiters.add((await nodes[0].start()))
+      awaiters.add((await nodes[1].start()))
+
+      let subscribes = await subscribeNodes(nodes)
+      await nodes[1].subscribe("foobar", handler)
+      await waitSub(nodes[0], nodes[1], "foobar")
+
+      let pubsub = nodes[0].pubSub.get()
+      let peer = pubsub.peers[nodes[1].peerInfo.id]
+
+      peer.conn = Connection(newBufferStream(
+        proc (data: seq[byte]) {.async, gcsafe.} =
+          await sleepAsync(10.seconds)
+        , size = 0))
+
+      let in10millis = Moment.fromNow(10.millis)
+      let sent = await nodes[0].publish("foobar", "Hello!".toBytes(), 10.millis)
+
+      check Moment.now() >= in10millis
+      check sent == 0
+
+      await allFuturesThrowing(
+        nodes[0].stop(),
+        nodes[1].stop())
+
+      await allFuturesThrowing(subscribes)
+      await allFuturesThrowing(awaiters)
+      result = true
+
+    check:
+      waitFor(runTests()) == true
+
   test "e2e - GossipSub should add remote peer topic subscriptions":
     proc testBasicGossipSub(): Future[bool] {.async.} =
       proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =


@@ -54,6 +54,7 @@ method write*(s: TestSelectStream, msg: seq[byte]) {.async, gcsafe.} = discard
 method close(s: TestSelectStream) {.async, gcsafe.} =
   s.isClosed = true
+  s.isEof = true

 proc newTestSelectStream(): TestSelectStream =
   new result
@@ -104,6 +105,7 @@ method write*(s: TestLsStream, msg: seq[byte]) {.async, gcsafe.} =
 method close(s: TestLsStream) {.async, gcsafe.} =
   s.isClosed = true
+  s.isEof = true

 proc newTestLsStream(ls: LsHandler): TestLsStream {.gcsafe.} =
   new result
@@ -157,6 +159,7 @@ method write*(s: TestNaStream, msg: seq[byte]) {.async, gcsafe.} =
 method close(s: TestNaStream) {.async, gcsafe.} =
   s.isClosed = true
+  s.isEof = true

 proc newTestNaStream(na: NaHandler): TestNaStream =
   new result
@@ -234,6 +237,7 @@ suite "Multistream select":
       let conn = newTestNaStream(testNaHandler)

       proc testNaHandler(msg: string): Future[void] {.async, gcsafe.} =
+        echo msg
         check msg == Na
         await conn.close()