Merge branch 'master' into gossip-one-one

Giovanni Petrantoni 2020-08-19 18:41:38 +09:00
commit 3b8e85c792
7 changed files with 133 additions and 95 deletions

View File

@@ -31,9 +31,9 @@ type
 method subscribeTopic*(f: FloodSub,
                        topic: string,
                        subscribe: bool,
-                       peerId: PeerID) {.gcsafe, async.} =
-  await procCall PubSub(f).subscribeTopic(topic, subscribe, peerId)
-  let peer = f.peers.getOrDefault(peerId)
+                       peer: PubsubPeer) {.gcsafe.} =
+  procCall PubSub(f).subscribeTopic(topic, subscribe, peer)
   if topic notin f.floodsub:
     f.floodsub[topic] = initHashSet[PubSubPeer]()
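
Aside (not part of this commit): the signature change above ripples through the whole `method` chain because each override calls its parent implementation via `procCall`; once the base method stops being `async`, every override must drop `await`/`async` with it. A minimal, self-contained sketch of that dispatch pattern, using hypothetical stand-in types rather than the libp2p ones:

type
  FakePubSub = ref object of RootObj      # stand-in for PubSub
  FakeFloodSub = ref object of FakePubSub # stand-in for FloodSub

method subscribeTopic(p: FakePubSub, topic: string, subscribe: bool) {.base.} =
  # base behaviour; in the real code this updates subscription bookkeeping
  echo "base: ", topic, " subscribe=", subscribe

method subscribeTopic(f: FakeFloodSub, topic: string, subscribe: bool) =
  # chain to the parent implementation first (same procCall pattern as above),
  # then layer flood-routing state on top
  procCall FakePubSub(f).subscribeTopic(topic, subscribe)
  echo "floodsub: now tracking ", topic

when isMainModule:
  let f = FakeFloodSub()
  # dynamic dispatch: a base-typed reference still hits the FloodSub override
  FakePubSub(f).subscribeTopic("news", true)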

View File

@@ -857,25 +857,20 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =
 method subscribeTopic*(g: GossipSub,
                        topic: string,
                        subscribe: bool,
-                       peerId: PeerID) {.gcsafe, async.} =
-  await procCall FloodSub(g).subscribeTopic(topic, subscribe, peerId)
+                       peer: PubSubPeer) {.gcsafe.} =
+  procCall FloodSub(g).subscribeTopic(topic, subscribe, peer)
   logScope:
-    peer = $peerId
+    peer = $peer.id
     topic
-  let peer = g.peers.getOrDefault(peerId)
-  if peer == nil:
-    # floodsub method logs a trace line already
-    return
   g.onNewPeer(peer)
   if subscribe:
     trace "peer subscribed to topic"
     # subscribe remote peer to the topic
     discard g.gossipsub.addPeer(topic, peer)
-    if peerId in g.parameters.directPeers:
+    if peer.peerId in g.parameters.directPeers:
       discard g.explicit.addPeer(topic, peer)
   else:
     trace "peer unsubscribed from topic"
@@ -883,7 +878,7 @@ method subscribeTopic*(g: GossipSub,
     g.gossipsub.removePeer(topic, peer)
     g.mesh.removePeer(topic, peer)
     g.fanout.removePeer(topic, peer)
-    if peerId in g.parameters.directPeers:
+    if peer.peerId in g.parameters.directPeers:
       g.explicit.removePeer(topic, peer)
   when defined(libp2p_expensive_metrics):
@@ -898,10 +893,6 @@ method subscribeTopic*(g: GossipSub,
   trace "gossip peers", peers = g.gossipsub.peers(topic), topic
-  # also rebalance current topic if we are subbed to
-  if topic in g.topics:
-    await g.rebalanceMesh(topic)
 proc handleGraft(g: GossipSub,
                  peer: PubSubPeer,
                  grafts: seq[ControlGraft]): seq[ControlPrune] =

View File

@@ -61,7 +61,6 @@ type
     triggerSelf*: bool # trigger own local handler on publish
     verifySignature*: bool # enable signature verification
     sign*: bool # enable message signing
-    cleanupLock: AsyncLock
     validators*: Table[string, HashSet[ValidatorHandler]]
     observers: ref seq[PubSubObserver] # ref as in smart_ptr
     msgIdProvider*: MsgIdProvider # Turn message into message id (not nil)
@@ -127,7 +126,7 @@ proc sendSubs*(p: PubSub,
 method subscribeTopic*(p: PubSub,
                        topic: string,
                        subscribe: bool,
-                       peerId: PeerID) {.base, async.} =
+                       peer: PubSubPeer) {.base.} =
   # called when remote peer subscribes to a topic
   discard
@@ -142,7 +141,7 @@ method rpcHandler*(p: PubSub,
     if m.subscriptions.len > 0: # if there are any subscriptions
       for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic
         trace "about to subscribe to topic", topicId = s.topic
-        await p.subscribeTopic(s.topic, s.subscribe, peer.peerId)
+        p.subscribeTopic(s.topic, s.subscribe, peer)
 method onNewPeer(p: PubSub, peer: PubSubPeer) {.base.} = discard
@@ -354,7 +353,6 @@ proc init*[PubParams: object | bool](
                sign: sign,
                peers: initTable[PeerID, PubSubPeer](),
                topics: initTable[string, Topic](),
-               cleanupLock: newAsyncLock(),
                msgIdProvider: msgIdProvider)
   else:
     result = P(switch: switch,
@@ -364,7 +362,6 @@ proc init*[PubParams: object | bool](
                sign: sign,
                peers: initTable[PeerID, PubSubPeer](),
                topics: initTable[string, Topic](),
-               cleanupLock: newAsyncLock(),
                msgIdProvider: msgIdProvider,
                parameters: parameters)
   result.initPubSub()

View File

@@ -7,7 +7,7 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.
-import std/[hashes, options, sequtils, strutils, tables, hashes, sets]
+import std/[sequtils, strutils, tables, hashes, sets]
 import chronos, chronicles, nimcrypto/sha2, metrics
 import rpc/[messages, message, protobuf],
        timedcache,
@@ -46,7 +46,7 @@ type
     recvdRpcCache: TimedCache[string] # cache for already received messages
     observers*: ref seq[PubSubObserver] # ref as in smart_ptr
     subscribed*: bool # are we subscribed to this peer
-    sendLock*: AsyncLock # send connection lock
+    dialLock: AsyncLock
     score*: float64
     iWantBudget*: int
@@ -126,11 +126,74 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
       debug "exiting pubsub peer read loop"
       await conn.close()
+      if p.sendConn == conn:
+        p.sendConn = nil
   except CancelledError as exc:
     raise exc
   except CatchableError as exc:
     trace "Exception occurred in PubSubPeer.handle", exc = exc.msg
+proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} =
+  # get a cached send connection or create a new one
+  block: # check if there's an existing connection that can be reused
+    let current = p.sendConn
+    if not current.isNil:
+      if not (current.closed() or current.atEof):
+        # The existing send connection looks like it might work - reuse it
+        trace "Reusing existing connection", oid = $current.oid
+        return current
+      # Send connection is set but broken - get rid of it
+      p.sendConn = nil
+      # Careful, p.sendConn might change after here!
+      await current.close() # TODO this might be unnecessary
+  try:
+    # Testing has demonstrated that when we perform concurrent meshsub dials
+    # and later close one of them, other implementations such as rust-libp2p
+    # become deaf to our messages (potentially due to the clean-up associated
+    # with closing connections). To prevent this, we use a lock that ensures
+    # that only a single dial will be performed for each peer.
+    #
+    # Nevertheless, this approach is still quite problematic because the gossip
+    # sends and their respective dials may be started from the mplex read loop.
+    # This may cause the read loop to get stuck which ultimately results in a
+    # deadlock when the other side tries to send us any other message that must
+    # be routed through mplex (it will be stuck on `pushTo`). Such messages
+    # naturally arise in the process of dialing itself.
+    #
+    # See https://github.com/status-im/nim-libp2p/issues/337
+    #
+    # One possible long-term solution is to avoid "blocking" the mplex read
+    # loop by making the gossip send non-blocking through the use of a queue.
+    await p.dialLock.acquire()
+    # Another concurrent dial may have populated p.sendConn
+    if p.sendConn != nil:
+      let current = p.sendConn
+      if not current.isNil:
+        if not (current.closed() or current.atEof):
+          # The existing send connection looks like it might work - reuse it
+          trace "Reusing existing connection", oid = $current.oid
+          return current
+    # Grab a new send connection
+    let newConn = await p.switch.dial(p.peerId, p.codec) # ...and here
+    if newConn.isNil:
+      return nil
+    trace "Caching new send connection", oid = $newConn.oid
+    p.sendConn = newConn
+    asyncCheck p.handle(newConn) # start a read loop on the new connection
+    return newConn
+  finally:
+    if p.dialLock.locked:
+      p.dialLock.release()
 proc send*(
   p: PubSubPeer,
   msg: RPCMsg,
@@ -163,29 +226,19 @@ proc send*(
     libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id])
     return
+  var conn: Connection
   try:
     trace "about to send message"
-    if not p.connected:
-      try:
-        await p.sendLock.acquire()
-        trace "no send connection, dialing peer"
-        # get a send connection if there is none
-        p.sendConn = await p.switch.dial(
-          p.peerId, p.codec)
+    conn = await p.getSendConn()
-        if not p.connected:
-          raise newException(CatchableError, "unable to get send pubsub stream")
+    if conn == nil:
+      debug "Couldn't get send connection, dropping message"
+      return
+    trace "sending encoded msgs to peer", connId = $conn.oid
+    await conn.writeLp(encoded).wait(timeout)
-        # install a reader on the send connection
-        asyncCheck p.handle(p.sendConn)
-      finally:
-        if p.sendLock.locked:
-          p.sendLock.release()
-    trace "sending encoded msgs to peer"
-    await p.sendConn.writeLp(encoded).wait(timeout)
     p.sentRpcCache.put(digest)
-    trace "sent pubsub message to remote"
+    trace "sent pubsub message to remote", connId = $conn.oid
     when defined(libp2p_expensive_metrics):
       for x in mm.messages:
@@ -195,9 +248,10 @@ proc send*(
   except CatchableError as exc:
     trace "unable to send to remote", exc = exc.msg
-    if not(isNil(p.sendConn)):
-      await p.sendConn.close()
-      p.sendConn = nil
+    # Next time sendConn is used, it will have its close flag set and thus
+    # will be recycled
+    if not isNil(conn):
+      await conn.close()
     raise exc
@@ -213,4 +267,4 @@ proc newPubSubPeer*(peerId: PeerID,
   result.peerId = peerId
   result.sentRpcCache = newTimedCache[string](2.minutes)
   result.recvdRpcCache = newTimedCache[string](2.minutes)
-  result.sendLock = newAsyncLock()
+  result.dialLock = newAsyncLock()
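
Aside (not from the commit): the long comment in `getSendConn` above suggests a possible long-term fix, making gossip sends non-blocking via a queue so dials and writes never run on the mplex read loop. A rough sketch of that idea using chronos's `AsyncQueue`; the `QueuedPeer`, `sendWorker`, and `enqueueSend` names are hypothetical, not part of nim-libp2p:

import chronos

type
  QueuedPeer = ref object
    sendQueue: AsyncQueue[seq[byte]] # encoded RPC frames waiting to go out

proc sendWorker(q: QueuedPeer) {.async.} =
  # A dedicated task drains the queue, so dialing and writing happen here
  # instead of on whatever loop produced the message.
  while true:
    let frame = await q.sendQueue.popFirst()
    # dial / write `frame` here, handling failures without touching producers
    discard frame

proc enqueueSend(q: QueuedPeer, frame: seq[byte]) =
  # Producers never block: if the bounded queue is full, drop the frame
  # (gossip can tolerate loss) rather than stall the read loop.
  try:
    q.sendQueue.addLastNoWait(frame)
  except AsyncQueueFullError:
    discard

when isMainModule:
  let q = QueuedPeer(sendQueue: newAsyncQueue[seq[byte]](64))
  asyncCheck q.sendWorker()
  q.enqueueSend(@[1'u8, 2, 3])
  waitFor sleepAsync(10.milliseconds)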

View File

@@ -43,7 +43,7 @@ logScope:
   topics = "bufferstream"
 const
-  DefaultBufferSize* = 1024
+  DefaultBufferSize* = 102400
 const
   BufferStreamTrackerName* = "libp2p.bufferstream"

View File

@@ -156,7 +156,19 @@ proc identify(s: Switch, conn: Connection) {.async, gcsafe.} =
     trace "identify: identified remote peer", peer = $conn.peerInfo
-proc mux(s: Switch, conn: Connection) {.async, gcsafe.} =
+proc identify(s: Switch, muxer: Muxer) {.async, gcsafe.} =
+  # new stream for identify
+  var stream = await muxer.newStream()
+  defer:
+    if not(isNil(stream)):
+      await stream.close() # close identify stream
+  # do identify first, so that we have a
+  # PeerInfo in case we didn't before
+  await s.identify(stream)
+proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} =
   ## mux incoming connection
   trace "muxing connection", peer = $conn
@@ -171,37 +183,24 @@ proc mux(s: Switch, conn: Connection) {.async, gcsafe.} =
   # create new muxer for connection
   let muxer = s.muxers[muxerName].newMuxer(conn)
-  s.connManager.storeMuxer(muxer)
-  trace "found a muxer", name = muxerName, peer = $conn
   # install stream handler
   muxer.streamHandler = s.streamHandler
-  # new stream for identify
-  var stream = await muxer.newStream()
+  s.connManager.storeOutgoing(muxer.connection)
+  s.connManager.storeMuxer(muxer)
-  defer:
-    if not(isNil(stream)):
-      await stream.close() # close identify stream
+  trace "found a muxer", name = muxerName, peer = $conn
-  # call muxer handler, this should
-  # not end until muxer ends
+  # start muxer read loop - the future will complete when loop ends
   let handlerFut = muxer.handle()
-  # do identify first, so that we have a
-  # PeerInfo in case we didn't before
-  await s.identify(stream)
-  if isNil(conn.peerInfo):
-    await muxer.close()
-    raise newException(CatchableError,
-      "unable to identify peer, aborting upgrade")
   # store it in muxed connections if we have a peer for it
   trace "adding muxer for peer", peer = conn.peerInfo.id
   s.connManager.storeMuxer(muxer, handlerFut) # update muxer with handler
+  return muxer
 proc disconnect*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} =
   s.connManager.dropPeer(peerId)
@@ -215,8 +214,19 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g
     raise newException(CatchableError,
       "unable to secure connection, stopping upgrade")
+  if sconn.peerInfo.isNil:
+    raise newException(CatchableError,
+      "current version of nim-libp2p requires that secure protocol negotiates peerid")
   trace "upgrading connection"
-  await s.mux(sconn) # mux it if possible
+  let muxer = await s.mux(sconn) # mux it if possible
+  if muxer == nil:
+    # TODO this might be relaxed in the future
+    raise newException(CatchableError,
+      "a muxer is required for outgoing connections")
+  await s.identify(muxer)
   if isNil(sconn.peerInfo):
     await sconn.close()
     raise newException(CatchableError,
@@ -337,7 +347,6 @@ proc internalConnect(s: Switch,
           doAssert not isNil(upgraded), "connection died after upgradeOutgoing"
-          s.connManager.storeOutgoing(upgraded)
           conn = upgraded
           trace "dial successful",
             oid = $upgraded.oid,
@@ -470,32 +479,21 @@ proc stop*(s: Switch) {.async.} =
   trace "switch stopped"
 proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
-  var stream = await muxer.newStream()
-  defer:
-    if not(isNil(stream)):
-      await stream.close()
+  if muxer.connection.peerInfo.isNil:
+    warn "This version of nim-libp2p requires secure protocol to negotiate peerid"
+    await muxer.close()
+    return
+  # store incoming connection
+  s.connManager.storeIncoming(muxer.connection)
+  # store muxer and muxed connection
+  s.connManager.storeMuxer(muxer)
   try:
-    # once we got a muxed connection, attempt to
-    # identify it
-    await s.identify(stream)
-    if isNil(stream.peerInfo):
-      await muxer.close()
-      return
-    let
-      peerInfo = stream.peerInfo
-      peerId = peerInfo.peerId
-    muxer.connection.peerInfo = peerInfo
-    # store incoming connection
-    s.connManager.storeIncoming(muxer.connection)
-    # store muxer and muxed connection
-    s.connManager.storeMuxer(muxer)
-    trace "got new muxer", peer = shortLog(peerInfo)
+    await s.identify(muxer)
+    let peerId = muxer.connection.peerInfo.peerId
     muxer.connection.closeEvent.wait()
       .addCallback do(udata: pointer):
         asyncCheck s.triggerConnEvent(
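
Aside (not from the commit): the switch changes above reorder the connection upgrade so that identify runs over a stream of the already-stored muxer instead of inside `mux`. A toy sketch of the resulting outgoing order: secure, then mux, then identify, then reject connections without a peer identity. `ToyConn`/`ToyMuxer` and the procs below are hypothetical stand-ins, not the libp2p API:

import chronos, std/options

type
  ToyConn = ref object
    peerId: Option[string] # filled in by the identify step
  ToyMuxer = ref object
    conn: ToyConn

proc secure(c: ToyConn): Future[ToyConn] {.async.} =
  return c # placeholder for the noise/secio handshake

proc mux(c: ToyConn): Future[ToyMuxer] {.async.} =
  return ToyMuxer(conn: c) # placeholder for mplex negotiation; may return nil

proc identify(m: ToyMuxer) {.async.} =
  m.conn.peerId = some("peer-id-placeholder") # identify fills in peer info

proc upgradeOutgoing(c: ToyConn): Future[ToyMuxer] {.async.} =
  let sconn = await secure(c)  # 1. secure the raw connection first
  let muxer = await mux(sconn) # 2. mux the secured connection
  if muxer == nil:
    raise newException(CatchableError, "a muxer is required for outgoing connections")
  await identify(muxer)        # 3. identify over a fresh muxer stream
  if sconn.peerId.isNone:      # 4. refuse connections without a peer identity
    raise newException(CatchableError, "unable to identify connection")
  return muxer

when isMainModule:
  discard waitFor upgradeOutgoing(ToyConn())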

View File

@@ -15,9 +15,7 @@ import utils,
../../libp2p/[errors,
switch,
stream/connection,
stream/bufferstream,
crypto/crypto,
protocols/pubsub/pubsubpeer,
protocols/pubsub/pubsub,
protocols/pubsub/floodsub,
protocols/pubsub/rpc/messages,