From fb3e6e3c697a4e917d64c5eac536e3d06a759cf4 Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Thu, 13 Aug 2020 18:08:49 +0300 Subject: [PATCH] Don't drop connection due to inactivity; Fix a race during gossip initialization --- libp2p/connmanager.nim | 7 +++++++ libp2p/muxers/muxer.nim | 8 +++++++- libp2p/protocols/pubsub/floodsub.nim | 5 ++++- libp2p/protocols/pubsub/gossipsub.nim | 27 ++++++++++++++++++-------- libp2p/protocols/pubsub/pubsub.nim | 6 +++++- libp2p/protocols/pubsub/pubsubpeer.nim | 6 +++--- libp2p/standard_setup.nim | 10 ++++++++-- libp2p/stream/connection.nim | 5 +++++ libp2p/switch.nim | 25 +++++++++++++++--------- 9 files changed, 74 insertions(+), 25 deletions(-) diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index 3edd2ff00..377976811 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -153,10 +153,13 @@ proc selectMuxer*(c: ConnManager, conn: Connection): Muxer = ## if isNil(conn): + trace "No connection in selectMuxer" return if conn in c.muxed: return c.muxed[conn].muxer + else: + trace "No entry in c.muxed" proc storeConn*(c: ConnManager, conn: Connection) = ## store a connection @@ -221,6 +224,8 @@ proc getMuxedStream*(c: ConnManager, let muxer = c.selectMuxer(c.selectConn(peerId, dir)) if not(isNil(muxer)): return await muxer.newStream() + else: + trace "No muxer, no stream", peerId proc getMuxedStream*(c: ConnManager, peerId: PeerID): Future[Connection] {.async, gcsafe.} = @@ -230,6 +235,8 @@ proc getMuxedStream*(c: ConnManager, let muxer = c.selectMuxer(c.selectConn(peerId)) if not(isNil(muxer)): return await muxer.newStream() + else: + trace "No muxer, no stream", peerId proc getMuxedStream*(c: ConnManager, conn: Connection): Future[Connection] {.async, gcsafe.} = diff --git a/libp2p/muxers/muxer.nim b/libp2p/muxers/muxer.nim index baa76a22d..301bd879e 100644 --- a/libp2p/muxers/muxer.nim +++ b/libp2p/muxers/muxer.nim @@ -28,12 +28,14 @@ type # user provider proc that returns a constructed Muxer MuxerConstructor* = proc(conn: Connection): Muxer {.gcsafe, closure.} + MuxerRegistrator* = proc(muxer: Muxer) {.gcsafe, closure.} # this wraps a creator proc that knows how to make muxers MuxerProvider* = ref object of LPProtocol newMuxer*: MuxerConstructor streamHandler*: StreamHandler # triggered every time there is a new stream, called for any muxer instance muxerHandler*: MuxerHandler # triggered every time there is a new muxed connection created + registrator*: MuxerRegistrator # muxer interface method newStream*(m: Muxer, name: string = "", lazy: bool = false): @@ -41,10 +43,13 @@ method newStream*(m: Muxer, name: string = "", lazy: bool = false): method close*(m: Muxer) {.base, async, gcsafe.} = discard method handle*(m: Muxer): Future[void] {.base, async, gcsafe.} = discard -proc newMuxerProvider*(creator: MuxerConstructor, codec: string): MuxerProvider {.gcsafe.} = +proc newMuxerProvider*(creator: MuxerConstructor, + codec: string, + registrator: MuxerRegistrator): MuxerProvider {.gcsafe.} = new result result.newMuxer = creator result.codec = codec + result.registrator = registrator result.init() method init(c: MuxerProvider) = @@ -65,6 +70,7 @@ method init(c: MuxerProvider) = futs &= c.muxerHandler(muxer) checkFutures(await allFinished(futs)) + except CancelledError as exc: raise exc except CatchableError as exc: diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 3ec69adde..ee110f8d1 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -16,7 +16,8 @@ import pubsub, rpc/[messages, message], ../../stream/connection, ../../peerid, - ../../peerinfo + ../../peerinfo, + ../../connmanager logScope: topics = "floodsub" @@ -115,6 +116,8 @@ method init*(f: FloodSub) = ## e.g. ``/floodsub/1.0.0``, etc... ## + # trace "Incoming FloodSub connection" + # f.switch.connManager.storeIncoming(conn) await f.handleConn(conn, proto) f.handler = handler diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 613c95710..43638d10a 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -21,7 +21,9 @@ import pubsub, ../../stream/connection, ../../peerid, ../../errors, - ../../utility + ../../utility, + ../../connmanager, + ../../switch logScope: topics = "gossipsub" @@ -77,6 +79,11 @@ method init*(g: GossipSub) = ## e.g. ``/floodsub/1.0.0``, etc... ## + # trace "Incoming Gossip connection" + # let muxer = g.switch.connManager.selectMuxer(conn) + # if muxer.isNil: + # await g.switch.mux(conn) + # g.switch.connManager.storeIncoming(conn) await g.handleConn(conn, proto) g.handler = handler @@ -537,14 +544,18 @@ method publish*(g: GossipSub, if msgId notin g.mcache: g.mcache.put(msgId, msg) - let published = await g.broadcast(toSeq(peers), RPCMsg(messages: @[msg]), timeout) - when defined(libp2p_expensive_metrics): - if published > 0: - libp2p_pubsub_messages_published.inc(labelValues = [topic]) + if peers.len > 0: + let published = await g.broadcast(toSeq(peers), RPCMsg(messages: @[msg]), timeout) + when defined(libp2p_expensive_metrics): + if published > 0: + libp2p_pubsub_messages_published.inc(labelValues = [topic]) - trace "published message to peers", peers = published, - msg = msg.shortLog() - return published + debug "published message to peers", peers = published, + msg = msg.shortLog() + return published + else: + debug "No peers for gossip message", topic, msg + return 0 method start*(g: GossipSub) {.async.} = trace "gossipsub start" diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index a3a9dfeca..2a6057ddb 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -16,7 +16,8 @@ import pubsubpeer, ../../stream/connection, ../../peerid, ../../peerinfo, - ../../errors + ../../errors, + ../../connmanager export PubSubPeer export PubSubObserver @@ -171,6 +172,9 @@ method handleConn*(p: PubSub, await conn.close() return + # trace "Incoming PubSub connection" + # p.switch.connManager.storeIncoming(conn) + proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = # call pubsub rpc handler await p.rpcHandler(peer, msgs) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index ac47e811c..bfe33e676 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -159,7 +159,7 @@ proc send*( if not p.connected: try: await p.sendLock.acquire() - trace "no send connection, dialing peer" + trace "no send connection, dialing peer", codec = p.codec # get a send connection if there is none p.sendConn = await p.switch.dial( p.peerId, p.codec) @@ -176,7 +176,7 @@ proc send*( trace "sending encoded msgs to peer" await p.sendConn.writeLp(encoded).wait(timeout) p.sentRpcCache.put(digest) - trace "sent pubsub message to remote" + debug "sent pubsub message to remote" when defined(libp2p_expensive_metrics): for x in mm.messages: @@ -185,7 +185,7 @@ proc send*( libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t]) except CatchableError as exc: - trace "unable to send to remote", exc = exc.msg + debug "unable to send to remote", exc = exc.msg if not(isNil(p.sendConn)): await p.sendConn.close() p.sendConn = nil diff --git a/libp2p/standard_setup.nim b/libp2p/standard_setup.nim index d9d51ff21..b22905d8e 100644 --- a/libp2p/standard_setup.nim +++ b/libp2p/standard_setup.nim @@ -1,6 +1,6 @@ import options, tables, chronos, bearssl, - switch, peerid, peerinfo, stream/connection, multiaddress, + switch, peerid, peerinfo, stream/connection, multiaddress, connmanager, crypto/crypto, transports/[transport, tcptransport], muxers/[muxer, mplex/mplex, mplex/types], protocols/[identify, secure/secure] @@ -37,10 +37,15 @@ proc newStandardSwitch*(privKey = none(PrivateKey), if rng == nil: # newRng could fail raise (ref CatchableError)(msg: "Cannot initialize RNG") + let connManager = ConnManager.init() + + proc registerMuxer(muxer: Muxer) {.gcsafe.} = + connManager.storeMuxer(muxer) + let seckey = privKey.get(otherwise = PrivateKey.random(rng[]).tryGet()) peerInfo = PeerInfo.init(seckey, [address]) - mplexProvider = newMuxerProvider(createMplex, MplexCodec) + mplexProvider = newMuxerProvider(createMplex, MplexCodec, registerMuxer) transports = @[Transport(TcpTransport.init(transportFlags))] muxers = {MplexCodec: mplexProvider}.toTable identify = newIdentify(peerInfo) @@ -58,6 +63,7 @@ proc newStandardSwitch*(privKey = none(PrivateKey), peerInfo, transports, identify, + connManager, muxers, secureManagers = secureManagerInstances) diff --git a/libp2p/stream/connection.nim b/libp2p/stream/connection.nim index 95e0e8105..9a8998d18 100644 --- a/libp2p/stream/connection.nim +++ b/libp2p/stream/connection.nim @@ -124,6 +124,11 @@ proc timeoutMonitor(s: Connection) {.async, gcsafe.} = s.activity = false continue + if true: + trace "Ignoring long inactivity" + s.activity = false + continue + break # reset channel on innactivity timeout diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 859a240f6..4a0071aa6 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -68,7 +68,7 @@ type Switch* = ref object of RootObj peerInfo*: PeerInfo - connManager: ConnManager + connManager*: ConnManager transports*: seq[Transport] protocols*: seq[LPProtocol] muxers*: Table[string, MuxerProvider] @@ -156,7 +156,7 @@ proc identify(s: Switch, conn: Connection) {.async, gcsafe.} = trace "identify: identified remote peer", peer = $conn.peerInfo -proc mux(s: Switch, conn: Connection) {.async, gcsafe.} = +proc mux*(s: Switch, conn: Connection) {.async, gcsafe.} = ## mux incoming connection trace "muxing connection", peer = $conn @@ -465,15 +465,27 @@ proc stop*(s: Switch) {.async.} = trace "switch stopped" proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = + trace "Entering muxer handler" + + # store incoming connection + s.connManager.storeIncoming(muxer.connection) + + # store muxer and muxed connection + s.connManager.storeMuxer(muxer) + var stream = await muxer.newStream() + trace "Got muxer stream" defer: if not(isNil(stream)): await stream.close() try: + trace "Getting identity in muxer" # once we got a muxed connection, attempt to # identify it await s.identify(stream) + trace "Got identity" + if isNil(stream.peerInfo): await muxer.close() return @@ -483,12 +495,6 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = peerId = peerInfo.peerId muxer.connection.peerInfo = peerInfo - # store incoming connection - s.connManager.storeIncoming(muxer.connection) - - # store muxer and muxed connection - s.connManager.storeMuxer(muxer) - trace "got new muxer", peer = shortLog(peerInfo) muxer.connection.closeEvent.wait() @@ -510,6 +516,7 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = proc newSwitch*(peerInfo: PeerInfo, transports: seq[Transport], identity: Identify, + connManager: ConnManager, muxers: Table[string, MuxerProvider], secureManagers: openarray[Secure] = []): Switch = if secureManagers.len == 0: @@ -519,7 +526,7 @@ proc newSwitch*(peerInfo: PeerInfo, peerInfo: peerInfo, ms: newMultistream(), transports: transports, - connManager: ConnManager.init(), + connManager: connManager, identity: identity, muxers: muxers, secureManagers: @secureManagers,