Don't drop connection due to inactivity; Fix a race during gossip initialization

This commit is contained in:
Zahary Karadjov 2020-08-13 18:08:49 +03:00
parent d1f1e1b31e
commit fb3e6e3c69
No known key found for this signature in database
GPG Key ID: C8936F8A3073D609
9 changed files with 74 additions and 25 deletions

View File

@ -153,10 +153,13 @@ proc selectMuxer*(c: ConnManager, conn: Connection): Muxer =
##
if isNil(conn):
trace "No connection in selectMuxer"
return
if conn in c.muxed:
return c.muxed[conn].muxer
else:
trace "No entry in c.muxed"
proc storeConn*(c: ConnManager, conn: Connection) =
## store a connection
@ -221,6 +224,8 @@ proc getMuxedStream*(c: ConnManager,
let muxer = c.selectMuxer(c.selectConn(peerId, dir))
if not(isNil(muxer)):
return await muxer.newStream()
else:
trace "No muxer, no stream", peerId
proc getMuxedStream*(c: ConnManager,
peerId: PeerID): Future[Connection] {.async, gcsafe.} =
@ -230,6 +235,8 @@ proc getMuxedStream*(c: ConnManager,
let muxer = c.selectMuxer(c.selectConn(peerId))
if not(isNil(muxer)):
return await muxer.newStream()
else:
trace "No muxer, no stream", peerId
proc getMuxedStream*(c: ConnManager,
conn: Connection): Future[Connection] {.async, gcsafe.} =

View File

@ -28,12 +28,14 @@ type
# user provider proc that returns a constructed Muxer
MuxerConstructor* = proc(conn: Connection): Muxer {.gcsafe, closure.}
MuxerRegistrator* = proc(muxer: Muxer) {.gcsafe, closure.}
# this wraps a creator proc that knows how to make muxers
MuxerProvider* = ref object of LPProtocol
newMuxer*: MuxerConstructor
streamHandler*: StreamHandler # triggered every time there is a new stream, called for any muxer instance
muxerHandler*: MuxerHandler # triggered every time there is a new muxed connection created
registrator*: MuxerRegistrator
# muxer interface
method newStream*(m: Muxer, name: string = "", lazy: bool = false):
@ -41,10 +43,13 @@ method newStream*(m: Muxer, name: string = "", lazy: bool = false):
method close*(m: Muxer) {.base, async, gcsafe.} = discard
method handle*(m: Muxer): Future[void] {.base, async, gcsafe.} = discard
proc newMuxerProvider*(creator: MuxerConstructor, codec: string): MuxerProvider {.gcsafe.} =
proc newMuxerProvider*(creator: MuxerConstructor,
codec: string,
registrator: MuxerRegistrator): MuxerProvider {.gcsafe.} =
new result
result.newMuxer = creator
result.codec = codec
result.registrator = registrator
result.init()
method init(c: MuxerProvider) =
@ -65,6 +70,7 @@ method init(c: MuxerProvider) =
futs &= c.muxerHandler(muxer)
checkFutures(await allFinished(futs))
except CancelledError as exc:
raise exc
except CatchableError as exc:

View File

@ -16,7 +16,8 @@ import pubsub,
rpc/[messages, message],
../../stream/connection,
../../peerid,
../../peerinfo
../../peerinfo,
../../connmanager
logScope:
topics = "floodsub"
@ -115,6 +116,8 @@ method init*(f: FloodSub) =
## e.g. ``/floodsub/1.0.0``, etc...
##
# trace "Incoming FloodSub connection"
# f.switch.connManager.storeIncoming(conn)
await f.handleConn(conn, proto)
f.handler = handler

View File

@ -21,7 +21,9 @@ import pubsub,
../../stream/connection,
../../peerid,
../../errors,
../../utility
../../utility,
../../connmanager,
../../switch
logScope:
topics = "gossipsub"
@ -77,6 +79,11 @@ method init*(g: GossipSub) =
## e.g. ``/floodsub/1.0.0``, etc...
##
# trace "Incoming Gossip connection"
# let muxer = g.switch.connManager.selectMuxer(conn)
# if muxer.isNil:
# await g.switch.mux(conn)
# g.switch.connManager.storeIncoming(conn)
await g.handleConn(conn, proto)
g.handler = handler
@ -537,14 +544,18 @@ method publish*(g: GossipSub,
if msgId notin g.mcache:
g.mcache.put(msgId, msg)
let published = await g.broadcast(toSeq(peers), RPCMsg(messages: @[msg]), timeout)
when defined(libp2p_expensive_metrics):
if published > 0:
libp2p_pubsub_messages_published.inc(labelValues = [topic])
if peers.len > 0:
let published = await g.broadcast(toSeq(peers), RPCMsg(messages: @[msg]), timeout)
when defined(libp2p_expensive_metrics):
if published > 0:
libp2p_pubsub_messages_published.inc(labelValues = [topic])
trace "published message to peers", peers = published,
msg = msg.shortLog()
return published
debug "published message to peers", peers = published,
msg = msg.shortLog()
return published
else:
debug "No peers for gossip message", topic, msg
return 0
method start*(g: GossipSub) {.async.} =
trace "gossipsub start"

View File

@ -16,7 +16,8 @@ import pubsubpeer,
../../stream/connection,
../../peerid,
../../peerinfo,
../../errors
../../errors,
../../connmanager
export PubSubPeer
export PubSubObserver
@ -171,6 +172,9 @@ method handleConn*(p: PubSub,
await conn.close()
return
# trace "Incoming PubSub connection"
# p.switch.connManager.storeIncoming(conn)
proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
# call pubsub rpc handler
await p.rpcHandler(peer, msgs)

View File

@ -159,7 +159,7 @@ proc send*(
if not p.connected:
try:
await p.sendLock.acquire()
trace "no send connection, dialing peer"
trace "no send connection, dialing peer", codec = p.codec
# get a send connection if there is none
p.sendConn = await p.switch.dial(
p.peerId, p.codec)
@ -176,7 +176,7 @@ proc send*(
trace "sending encoded msgs to peer"
await p.sendConn.writeLp(encoded).wait(timeout)
p.sentRpcCache.put(digest)
trace "sent pubsub message to remote"
debug "sent pubsub message to remote"
when defined(libp2p_expensive_metrics):
for x in mm.messages:
@ -185,7 +185,7 @@ proc send*(
libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t])
except CatchableError as exc:
trace "unable to send to remote", exc = exc.msg
debug "unable to send to remote", exc = exc.msg
if not(isNil(p.sendConn)):
await p.sendConn.close()
p.sendConn = nil

View File

@ -1,6 +1,6 @@
import
options, tables, chronos, bearssl,
switch, peerid, peerinfo, stream/connection, multiaddress,
switch, peerid, peerinfo, stream/connection, multiaddress, connmanager,
crypto/crypto, transports/[transport, tcptransport],
muxers/[muxer, mplex/mplex, mplex/types],
protocols/[identify, secure/secure]
@ -37,10 +37,15 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
if rng == nil: # newRng could fail
raise (ref CatchableError)(msg: "Cannot initialize RNG")
let connManager = ConnManager.init()
proc registerMuxer(muxer: Muxer) {.gcsafe.} =
connManager.storeMuxer(muxer)
let
seckey = privKey.get(otherwise = PrivateKey.random(rng[]).tryGet())
peerInfo = PeerInfo.init(seckey, [address])
mplexProvider = newMuxerProvider(createMplex, MplexCodec)
mplexProvider = newMuxerProvider(createMplex, MplexCodec, registerMuxer)
transports = @[Transport(TcpTransport.init(transportFlags))]
muxers = {MplexCodec: mplexProvider}.toTable
identify = newIdentify(peerInfo)
@ -58,6 +63,7 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
peerInfo,
transports,
identify,
connManager,
muxers,
secureManagers = secureManagerInstances)

View File

@ -124,6 +124,11 @@ proc timeoutMonitor(s: Connection) {.async, gcsafe.} =
s.activity = false
continue
if true:
trace "Ignoring long inactivity"
s.activity = false
continue
break
# reset channel on innactivity timeout

View File

@ -68,7 +68,7 @@ type
Switch* = ref object of RootObj
peerInfo*: PeerInfo
connManager: ConnManager
connManager*: ConnManager
transports*: seq[Transport]
protocols*: seq[LPProtocol]
muxers*: Table[string, MuxerProvider]
@ -156,7 +156,7 @@ proc identify(s: Switch, conn: Connection) {.async, gcsafe.} =
trace "identify: identified remote peer", peer = $conn.peerInfo
proc mux(s: Switch, conn: Connection) {.async, gcsafe.} =
proc mux*(s: Switch, conn: Connection) {.async, gcsafe.} =
## mux incoming connection
trace "muxing connection", peer = $conn
@ -465,15 +465,27 @@ proc stop*(s: Switch) {.async.} =
trace "switch stopped"
proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
trace "Entering muxer handler"
# store incoming connection
s.connManager.storeIncoming(muxer.connection)
# store muxer and muxed connection
s.connManager.storeMuxer(muxer)
var stream = await muxer.newStream()
trace "Got muxer stream"
defer:
if not(isNil(stream)):
await stream.close()
try:
trace "Getting identity in muxer"
# once we got a muxed connection, attempt to
# identify it
await s.identify(stream)
trace "Got identity"
if isNil(stream.peerInfo):
await muxer.close()
return
@ -483,12 +495,6 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
peerId = peerInfo.peerId
muxer.connection.peerInfo = peerInfo
# store incoming connection
s.connManager.storeIncoming(muxer.connection)
# store muxer and muxed connection
s.connManager.storeMuxer(muxer)
trace "got new muxer", peer = shortLog(peerInfo)
muxer.connection.closeEvent.wait()
@ -510,6 +516,7 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
proc newSwitch*(peerInfo: PeerInfo,
transports: seq[Transport],
identity: Identify,
connManager: ConnManager,
muxers: Table[string, MuxerProvider],
secureManagers: openarray[Secure] = []): Switch =
if secureManagers.len == 0:
@ -519,7 +526,7 @@ proc newSwitch*(peerInfo: PeerInfo,
peerInfo: peerInfo,
ms: newMultistream(),
transports: transports,
connManager: ConnManager.init(),
connManager: connManager,
identity: identity,
muxers: muxers,
secureManagers: @secureManagers,