Pubsub lifetime (#284)

* lifecycle hooks

* tests

* move trace after closed check

* restore 1 second heartbeat

* await close event

* fix tests

* print direction string

* more trace logging

* add pubsub monitor

* add log scope

* adjust idle timeout

* add exc.msg to trace
This commit is contained in:
Dmitriy Ryajov 2020-07-27 13:33:51 -06:00 committed by GitHub
parent ed0df74bbd
commit f7fdf31365
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 147 additions and 54 deletions

View File

@ -174,12 +174,12 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} =
peer = $s.conn.peerInfo
# stack = getStackTrace()
trace "resetting channel"
if s.closedLocal and s.isEof:
trace "channel already closed or reset"
return
trace "resetting channel"
# we asyncCheck here because the other end
# might be dead already - reset is always
# optimistic
@ -227,7 +227,7 @@ method close*(s: LPChannel) {.async, gcsafe.} =
await s.reset()
raise exc
except CatchableError as exc:
trace "exception closing channel"
trace "exception closing channel", exc = exc.msg
await s.reset()
trace "lpchannel closed local"
@ -244,6 +244,13 @@ proc timeoutMonitor(s: LPChannel) {.async.} =
## be reset
##
logScope:
id = s.id
initiator = s.initiator
name = s.name
oid = $s.oid
peer = $s.conn.peerInfo
try:
while true:
await sleepAsync(s.timeout)

View File

@ -39,7 +39,7 @@ const GossipSubHistoryGossip* = 3
# heartbeat interval
const GossipSubHeartbeatInitialDelay* = 100.millis
const GossipSubHeartbeatInterval* = 5.seconds # TODO: per the spec it should be 1 second
const GossipSubHeartbeatInterval* = 1.seconds
# fanout ttl
const GossipSubFanoutTTL* = 1.minutes

View File

@ -242,17 +242,19 @@ method unsubscribe*(p: PubSub,
if p.topics[t.topic].handler.len <= 0:
p.topics.del(t.topic)
# metrics
libp2p_pubsub_topics.dec()
libp2p_pubsub_topics.set(p.topics.len.int64)
proc unsubscribe*(p: PubSub,
topic: string,
handler: TopicHandler): Future[void] =
## unsubscribe from a ``topic`` string
##
p.unsubscribe(@[(topic, handler)])
method unsubscribeAll*(p: PubSub, topic: string) {.base, async.} =
libp2p_pubsub_topics.dec()
p.topics.del(topic)
libp2p_pubsub_topics.set(p.topics.len.int64)
method subscribe*(p: PubSub,
topic: string,
@ -278,7 +280,7 @@ method subscribe*(p: PubSub,
checkFutures(await allFinished(sent))
# metrics
libp2p_pubsub_topics.inc()
libp2p_pubsub_topics.set(p.topics.len.int64)
proc sendHelper*(p: PubSub,
sendPeers: HashSet[PubSubPeer],

View File

@ -38,8 +38,8 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
transportFlags: set[ServerFlags] = {},
msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
rng = newRng(),
inTimeout: Duration = 1.minutes,
outTimeout: Duration = 1.minutes): Switch =
inTimeout: Duration = 5.minutes,
outTimeout: Duration = 5.minutes): Switch =
proc createMplex(conn: Connection): Muxer =
Mplex.init(
conn,

View File

@ -44,6 +44,9 @@ declareCounter(libp2p_dialed_peers, "dialed peers")
declareCounter(libp2p_failed_dials, "failed dials")
declareCounter(libp2p_failed_upgrade, "peers failed upgrade")
const
MaxPubsubReconnectAttempts* = 10
type
NoPubSubException* = object of CatchableError
@ -67,6 +70,7 @@ type
pubSub*: Option[PubSub]
dialLock: Table[string, AsyncLock]
hooks: Table[Lifecycle, HashSet[Hook]]
pubsubMonitors: Table[PeerId, Future[void]]
proc newNoPubSubException(): ref NoPubSubException {.inline.} =
result = newException(NoPubSubException, "no pubsub provided!")
@ -95,9 +99,18 @@ proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.}
proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.}
proc cleanupPubSubPeer(s: Switch, conn: Connection) {.async.} =
try:
await conn.closeEvent.wait()
if s.pubSub.isSome:
let fut = s.pubsubMonitors.getOrDefault(conn.peerInfo.peerId)
if not(isNil(fut)) and not(fut.finished):
await fut.cancelAndWait()
await s.pubSub.get().unsubscribePeer(conn.peerInfo)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception cleaning pubsub peer", exc = exc.msg
proc isConnected*(s: Switch, peer: PeerInfo): bool =
## returns true if the peer has one or more
@ -322,7 +335,9 @@ proc internalConnect(s: Switch,
continue
break
else:
trace "Reusing existing connection", oid = $conn.oid, direction = conn.dir
trace "Reusing existing connection", oid = $conn.oid,
direction = $conn.dir,
peer = $conn.peerInfo
finally:
if lock.locked():
lock.release()
@ -344,6 +359,9 @@ proc internalConnect(s: Switch,
await s.subscribePeer(peer)
asyncCheck s.cleanupPubSubPeer(conn)
trace "got connection", oid = $conn.oid,
direction = $conn.dir,
peer = $conn.peerInfo
return conn
proc connect*(s: Switch, peer: PeerInfo) {.async.} =
@ -379,7 +397,7 @@ proc dial*(s: Switch,
await cleanup()
raise exc
except CatchableError as exc:
trace "error dialing"
trace "error dialing", exc = exc.msg
await cleanup()
raise exc
@ -399,7 +417,6 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} =
try:
conn.closeEvent.wait()
.addCallback do(udata: pointer):
asyncCheck s.triggerHooks(
@ -451,7 +468,7 @@ proc stop*(s: Switch) {.async.} =
trace "switch stopped"
proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
proc subscribePeerInternal(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
## Subscribe to pub sub peer
if s.pubSub.isSome and not(s.pubSub.get().connected(peerInfo)):
trace "about to subscribe to pubsub peer", peer = peerInfo.shortLog()
@ -468,7 +485,7 @@ proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
return
s.pubSub.get().subscribePeer(stream)
await stream.closeEvent.wait()
except CancelledError as exc:
if not(isNil(stream)):
await stream.close()
@ -480,6 +497,35 @@ proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
if not(isNil(stream)):
await stream.close()
proc pubsubMonitor(switch: Switch, peer: PeerInfo) {.async.} =
## while peer connected maintain a
## pubsub connection as well
##
var tries = 0
var backoffFactor = 5 # up to ~10 mins
var backoff = 1.seconds
while switch.isConnected(peer) and
tries < MaxPubsubReconnectAttempts:
try:
debug "subscribing to pubsub peer", peer = $peer
await switch.subscribePeerInternal(peer)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in pubsub monitor", peer = $peer, exc = exc.msg
finally:
debug "awaiting backoff period before reconnecting", peer = $peer, backoff, tries
await sleepAsync(backoff) # allow the peer to cooldown
backoff = backoff * backoffFactor
tries.inc()
trace "exiting pubsub monitor", peer = $peer
proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
if peerInfo.peerId notin s.pubsubMonitors:
s.pubsubMonitors[peerInfo.peerId] = s.pubsubMonitor(peerInfo)
proc subscribe*(s: Switch, topic: string,
handler: TopicHandler) {.async.} =
## subscribe to a pubsub topic

View File

@ -54,7 +54,7 @@ suite "FloodSub":
nodes[1].start()
)
await subscribeNodes(nodes)
let subscribes = await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
@ -69,6 +69,7 @@ suite "FloodSub":
)
await allFuturesThrowing(nodesFut.concat())
await allFuturesThrowing(subscribes)
check:
waitFor(runTests()) == true
@ -85,7 +86,7 @@ suite "FloodSub":
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
await subscribeNodes(nodes)
let subscribes = await subscribeNodes(nodes)
await nodes[0].subscribe("foobar", handler)
await waitSub(nodes[1], nodes[0], "foobar")
@ -95,6 +96,8 @@ suite "FloodSub":
result = await completionFut.wait(5.seconds)
await allFuturesThrowing(nodes[0].stop(), nodes[1].stop())
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaiters)
check:
@ -112,7 +115,7 @@ suite "FloodSub":
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
await subscribeNodes(nodes)
let subscribes = await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
@ -131,6 +134,8 @@ suite "FloodSub":
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop())
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaiters)
result = true
@ -147,7 +152,7 @@ suite "FloodSub":
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
await subscribeNodes(nodes)
let subscribes = await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
@ -164,6 +169,8 @@ suite "FloodSub":
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop())
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaiters)
result = true
@ -182,7 +189,7 @@ suite "FloodSub":
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
await subscribeNodes(nodes)
let subscribes = await subscribeNodes(nodes)
await nodes[1].subscribe("foo", handler)
await waitSub(nodes[0], nodes[1], "foo")
await nodes[1].subscribe("bar", handler)
@ -203,6 +210,8 @@ suite "FloodSub":
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop())
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaiters)
result = true
@ -237,7 +246,7 @@ suite "FloodSub":
for i in 0..<runs:
awaitters.add(await nodes[i].start())
await subscribeNodes(nodes)
let subscribes = await subscribeNodes(nodes)
for i in 0..<runs:
await nodes[i].subscribe("foobar", futs[i][1])
@ -256,6 +265,8 @@ suite "FloodSub":
await allFuturesThrowing(futs.mapIt(it[0]))
await allFuturesThrowing(nodes.mapIt(it.stop()))
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaitters)
result = true
@ -291,7 +302,7 @@ suite "FloodSub":
for i in 0..<runs:
awaitters.add(await nodes[i].start())
await subscribeNodes(nodes)
let subscribes = await subscribeNodes(nodes)
for i in 0..<runs:
await nodes[i].subscribe("foobar", futs[i][1])
@ -310,6 +321,8 @@ suite "FloodSub":
await allFuturesThrowing(futs.mapIt(it[0]))
await allFuturesThrowing(nodes.mapIt(it.stop()))
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaitters)
result = true

View File

@ -72,7 +72,7 @@ suite "GossipSub":
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
await subscribeNodes(nodes)
let subscribes = await subscribeNodes(nodes)
await nodes[0].subscribe("foobar", handler)
await nodes[1].subscribe("foobar", handler)
@ -99,6 +99,8 @@ suite "GossipSub":
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop())
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaiters)
check:
@ -114,7 +116,7 @@ suite "GossipSub":
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
await subscribeNodes(nodes)
let subscribes = await subscribeNodes(nodes)
await nodes[0].subscribe("foobar", handler)
await nodes[1].subscribe("foobar", handler)
@ -140,6 +142,8 @@ suite "GossipSub":
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop())
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaiters)
check:
@ -157,7 +161,7 @@ suite "GossipSub":
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
await subscribeNodes(nodes)
let subscribes = await subscribeNodes(nodes)
await nodes[1].subscribe("foo", handler)
await nodes[1].subscribe("bar", handler)
@ -189,6 +193,8 @@ suite "GossipSub":
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop())
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaiters)
result = true
check:
@ -208,7 +214,7 @@ suite "GossipSub":
for node in nodes:
awaitters.add(await node.start())
await subscribeNodes(nodes)
let subscribes = await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler)
await sleepAsync(10.seconds)
@ -221,6 +227,8 @@ suite "GossipSub":
gossip1.gossipsub.hasPeerID("foobar", gossip2.peerInfo.id)
await allFuturesThrowing(nodes.mapIt(it.stop()))
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaitters)
result = true
@ -241,7 +249,7 @@ suite "GossipSub":
for node in nodes:
awaitters.add(await node.start())
await subscribeNodes(nodes)
let subscribes = await subscribeNodes(nodes)
await nodes[0].subscribe("foobar", handler)
await nodes[1].subscribe("foobar", handler)
@ -249,6 +257,7 @@ suite "GossipSub":
var subs: seq[Future[void]]
subs &= waitSub(nodes[1], nodes[0], "foobar")
subs &= waitSub(nodes[0], nodes[1], "foobar")
await allFuturesThrowing(subs)
let
@ -269,6 +278,8 @@ suite "GossipSub":
gossip2.mesh.hasPeerID("foobar", gossip1.peerInfo.id)
await allFuturesThrowing(nodes.mapIt(it.stop()))
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaitters)
result = true
@ -288,7 +299,7 @@ suite "GossipSub":
wait.add(await nodes[0].start())
wait.add(await nodes[1].start())
await subscribeNodes(nodes)
let subscribes = await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
@ -320,6 +331,8 @@ suite "GossipSub":
await nodes[0].stop()
await nodes[1].stop()
await allFuturesThrowing(subscribes)
await allFuturesThrowing(wait)
check observed == 2
@ -340,7 +353,7 @@ suite "GossipSub":
wait.add(await nodes[0].start())
wait.add(await nodes[1].start())
await subscribeNodes(nodes)
let subscribes = await subscribeNodes(nodes)
await nodes[0].subscribe("foobar", handler)
await nodes[1].subscribe("foobar", handler)
@ -363,6 +376,8 @@ suite "GossipSub":
await nodes[0].stop()
await nodes[1].stop()
await allFuturesThrowing(subscribes)
await allFuturesThrowing(wait)
check:
@ -380,7 +395,7 @@ suite "GossipSub":
secureManagers = [SecureProtocol.Noise])
awaitters.add((await nodes[i].start()))
await subscribeRandom(nodes)
let subscribes = await subscribeRandom(nodes)
var seen: Table[string, int]
var subs: seq[Future[void]]
@ -419,6 +434,8 @@ suite "GossipSub":
gossip.mesh["foobar"].len > 0
await allFuturesThrowing(nodes.mapIt(it.stop()))
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaitters)
result = true
@ -437,7 +454,7 @@ suite "GossipSub":
secureManagers = [SecureProtocol.Secio])
awaitters.add((await nodes[i].start()))
await subscribeSparseNodes(nodes)
let subscribes = await subscribeSparseNodes(nodes, 1)
var seen: Table[string, int]
var subs: seq[Future[void]]
@ -477,6 +494,8 @@ suite "GossipSub":
gossip.mesh["foobar"].len > 0
await allFuturesThrowing(nodes.mapIt(it.stop()))
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaitters)
result = true

View File

@ -9,36 +9,33 @@ proc generateNodes*(num: Natural, gossip: bool = false): seq[Switch] =
for i in 0..<num:
result.add(newStandardSwitch(gossip = gossip))
proc subscribeNodes*(nodes: seq[Switch]) {.async.} =
var dials: seq[Future[void]]
proc subscribeNodes*(nodes: seq[Switch]): Future[seq[Future[void]]] {.async.} =
for dialer in nodes:
for node in nodes:
if dialer.peerInfo.peerId != node.peerInfo.peerId:
dials.add(dialer.connect(node.peerInfo))
await allFutures(dials)
await dialer.connect(node.peerInfo)
result.add(dialer.subscribePeer(node.peerInfo))
proc subscribeSparseNodes*(nodes: seq[Switch], degree: int = 2) {.async.} =
proc subscribeSparseNodes*(nodes: seq[Switch], degree: int = 2): Future[seq[Future[void]]] {.async.} =
if nodes.len < degree:
raise (ref CatchableError)(msg: "nodes count needs to be greater or equal to degree!")
var dials: seq[Future[void]]
for i, dialer in nodes:
if (i mod degree) != 0:
continue
for node in nodes:
if dialer.peerInfo.peerId != node.peerInfo.peerId:
dials.add(dialer.connect(node.peerInfo))
await allFutures(dials)
await dialer.connect(node.peerInfo)
result.add(dialer.subscribePeer(node.peerInfo))
proc subscribeRandom*(nodes: seq[Switch]) {.async.} =
var dials: seq[Future[void]]
proc subscribeRandom*(nodes: seq[Switch]): Future[seq[Future[void]]] {.async.} =
for dialer in nodes:
var dialed: seq[string]
while dialed.len < nodes.len - 1:
let node = sample(nodes)
if node.peerInfo.id notin dialed:
if dialer.peerInfo.id != node.peerInfo.id:
dials.add(dialer.connect(node.peerInfo))
dialed &= node.peerInfo.id
await allFutures(dials)
await dialer.connect(node.peerInfo)
result.add(dialer.subscribePeer(node.peerInfo))
dialed.add(node.peerInfo.id)

View File

@ -88,8 +88,12 @@ proc testPubSubDaemonPublish(gossip: bool = false,
if times >= count and not finished:
finished = true
await nativeNode.connect(NativePeerInfo.init(daemonPeer.peer,
daemonPeer.addresses))
let peer = NativePeerInfo.init(
daemonPeer.peer,
daemonPeer.addresses)
await nativeNode.connect(peer)
let subscribeHanle = nativeNode.subscribePeer(peer)
await sleepAsync(1.seconds)
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
@ -113,6 +117,7 @@ proc testPubSubDaemonPublish(gossip: bool = false,
await nativeNode.stop()
await allFutures(awaiters)
await daemonNode.close()
await subscribeHanle
proc testPubSubNodePublish(gossip: bool = false,
count: int = 1): Future[bool] {.async.} =
@ -134,8 +139,11 @@ proc testPubSubNodePublish(gossip: bool = false,
let awaiters = nativeNode.start()
let nativePeer = nativeNode.peerInfo
await nativeNode.connect(NativePeerInfo.init(daemonPeer.peer,
daemonPeer.addresses))
let peer = NativePeerInfo.init(
daemonPeer.peer,
daemonPeer.addresses)
await nativeNode.connect(peer)
let subscribeHandle = nativeNode.subscribePeer(peer)
await sleepAsync(1.seconds)
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
@ -168,6 +176,7 @@ proc testPubSubNodePublish(gossip: bool = false,
await nativeNode.stop()
await allFutures(awaiters)
await daemonNode.close()
await subscribeHandle
suite "Interop":
# TODO: chronos transports are leaking,