Gossipsub wip (#502)

* Remove unused connections in pubsubpeer, also removed wrong usages, add a disconnect bad peers parameter

* handle exceptions in disconnectPeer

* small fix

* use the proper disconnection procedure for gossip peers

* fixes, more metrics add test about disconnection

* hot fix possible null pointers in switch

* silly isnil sugar

* Fix and test gossip directPeer connections
This commit is contained in:
Giovanni Petrantoni 2021-01-15 13:48:03 +09:00 committed by GitHub
parent 3878a95b23
commit 240ec84ffb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 176 additions and 36 deletions

View File

@ -21,7 +21,7 @@ import ./pubsub,
../../peerinfo, ../../peerinfo,
../../peerid, ../../peerid,
../../utility, ../../utility,
../../crypto/curve25519 ../../switch
import stew/results import stew/results
export results export results
@ -147,7 +147,9 @@ type
behaviourPenaltyWeight*: float64 behaviourPenaltyWeight*: float64
behaviourPenaltyDecay*: float64 behaviourPenaltyDecay*: float64
directPeers*: seq[PeerId] directPeers*: Table[PeerId, seq[MultiAddress]]
disconnectBadPeers*: bool
GossipSub* = ref object of FloodSub GossipSub* = ref object of FloodSub
mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic
@ -197,9 +199,8 @@ declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub,
declareCounter(libp2p_gossipsub_failed_publish, "number of failed publish") declareCounter(libp2p_gossipsub_failed_publish, "number of failed publish")
declareGauge(libp2p_gossipsub_cache_window_size, "the number of messages in the cache") declareGauge(libp2p_gossipsub_cache_window_size, "the number of messages in the cache")
when defined(libp2p_agents_metrics): declareGauge(libp2p_gossipsub_peers_scores, "the scores of the peers in gossipsub", labels = ["agent"])
declareGauge(libp2p_gossipsub_peers_scores, "the scores of the peers in gossipsub", labels = ["agent"]) declareCounter(libp2p_gossipsub_bad_score_disconnection, "the number of peers disconnected by gossipsub", labels = ["agent"])
declareGauge(libp2p_gossipsub_under_dlow_topics, "number of topics below dlow") declareGauge(libp2p_gossipsub_under_dlow_topics, "number of topics below dlow")
declareGauge(libp2p_gossipsub_under_dout_topics, "number of topics below dout") declareGauge(libp2p_gossipsub_under_dout_topics, "number of topics below dout")
declareGauge(libp2p_gossipsub_under_dhigh_above_dlow_topics, "number of topics below dhigh but above dlow") declareGauge(libp2p_gossipsub_under_dhigh_above_dlow_topics, "number of topics below dhigh but above dlow")
@ -236,6 +237,7 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
ipColocationFactorThreshold: 1.0, ipColocationFactorThreshold: 1.0,
behaviourPenaltyWeight: -1.0, behaviourPenaltyWeight: -1.0,
behaviourPenaltyDecay: 0.999, behaviourPenaltyDecay: 0.999,
disconnectBadPeers: false
) )
proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] = proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
@ -714,24 +716,56 @@ func `/`(a, b: Duration): float64 =
fb = float64(b.nanoseconds) fb = float64(b.nanoseconds)
fa / fb fa / fb
proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} =
when defined(libp2p_agents_metrics):
let agent =
block:
if peer.shortAgent.len > 0:
peer.shortAgent
else:
if peer.sendConn != nil:
let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].toLowerAscii()
if KnownLibP2PAgentsSeq.contains(shortAgent):
peer.shortAgent = shortAgent
else:
peer.shortAgent = "unknown"
peer.shortAgent
else:
"unknown"
libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [agent])
else:
libp2p_gossipsub_bad_score_disconnection.inc(labelValues = ["unknown"])
if peer.sendConn != nil:
try:
await g.switch.disconnect(peer.peerId)
except CancelledError:
raise
except CatchableError as exc:
trace "Failed to close connection", peer, error = exc.name, msg = exc.msg
proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 = proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 =
if peer.connections.len == 0: if peer.sendConn == nil:
trace "colocationFactor, no connections", peer trace "colocationFactor, no connection", peer
0.0 0.0
else: else:
let let
address = peer.connections[0].observedAddr address = peer.sendConn.observedAddr
ipPeers = g.peersInIP.getOrDefault(address)
g.peersInIP.mgetOrPut(address, initHashSet[PubSubPeer]()).incl(peer)
if address notin g.peersInIP:
g.peersInIP[address] = initHashSet[PubSubPeer]()
g.peersInIP[address].incl(peer)
let
ipPeers = g.peersInIP[address]
len = ipPeers.len.float64 len = ipPeers.len.float64
if len > g.parameters.ipColocationFactorThreshold: if len > g.parameters.ipColocationFactorThreshold:
trace "colocationFactor over threshold", peer, address, len trace "colocationFactor over threshold", peer, address, len
let over = len - g.parameters.ipColocationFactorThreshold let over = len - g.parameters.ipColocationFactorThreshold
over * over over * over
else: else:
# lazy update peersInIP
if address notin g.peersInIP:
g.peersInIP[address] = initHashSet[PubSubPeer]()
g.peersInIP[address].incl(peer)
0.0 0.0
proc updateScores(g: GossipSub) = # avoid async proc updateScores(g: GossipSub) = # avoid async
@ -837,18 +871,18 @@ proc updateScores(g: GossipSub) = # avoid async
assert(g.peerStats[peer.peerId].score == peer.score) # nim sanity check assert(g.peerStats[peer.peerId].score == peer.score) # nim sanity check
trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted
if g.parameters.disconnectBadPeers and stats.score < g.parameters.graylistThreshold:
debug "disconnecting bad score peer", peer, score = peer.score
asyncSpawn g.disconnectPeer(peer)
when defined(libp2p_agents_metrics): when defined(libp2p_agents_metrics):
let agent = let agent =
block: block:
if peer.shortAgent.len > 0: if peer.shortAgent.len > 0:
peer.shortAgent peer.shortAgent
else: else:
let connections = peer.connections.filterIt( if peer.sendConn != nil:
not isNil(it.peerInfo) and let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].toLowerAscii()
it.peerInfo.agentVersion.len > 0
)
if connections.len > 0:
let shortAgent = connections[0].peerInfo.agentVersion.split("/")[0].toLowerAscii()
if KnownLibP2PAgentsSeq.contains(shortAgent): if KnownLibP2PAgentsSeq.contains(shortAgent):
peer.shortAgent = shortAgent peer.shortAgent = shortAgent
else: else:
@ -857,6 +891,8 @@ proc updateScores(g: GossipSub) = # avoid async
else: else:
"unknown" "unknown"
libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent]) libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent])
else:
libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = ["unknown"])
for peer in evicting: for peer in evicting:
g.peerStats.del(peer) g.peerStats.del(peer)
@ -954,9 +990,11 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =
return return
# remove from peer IPs collection too # remove from peer IPs collection too
if pubSubPeer.connections.len > 0: if pubSubPeer.sendConn != nil:
g.peersInIP.withValue(pubSubPeer.connections[0].observedAddr, s): g.peersInIP.withValue(pubSubPeer.sendConn.observedAddr, s):
s[].excl(pubSubPeer) s[].excl(pubSubPeer)
if s[].len == 0:
g.peersInIP.del(pubSubPeer.sendConn.observedAddr)
for t in toSeq(g.gossipsub.keys): for t in toSeq(g.gossipsub.keys):
g.gossipsub.removePeer(t, pubSubPeer) g.gossipsub.removePeer(t, pubSubPeer)
@ -1472,13 +1510,20 @@ method publish*(g: GossipSub,
proc maintainDirectPeers(g: GossipSub) {.async.} = proc maintainDirectPeers(g: GossipSub) {.async.} =
while g.heartbeatRunning: while g.heartbeatRunning:
for id in g.parameters.directPeers: for id, addrs in g.parameters.directPeers:
let peer = g.peers.getOrDefault(id) let peer = g.peers.getOrDefault(id)
if peer == nil: if isNil(peer):
# this creates a new peer and assigns the current switch to it trace "Attempting to dial a direct peer", peer = id
# as a result the next time we try to Send we will as well try to open a connection try:
# see pubsubpeer.nim send and such # dial, internally connection will be stored
discard g.getOrCreatePeer(id, g.codecs) let _ = await g.switch.dial(id, addrs, g.codecs)
# populate the peer after it's connected
discard g.getOrCreatePeer(id, g.codecs)
except CancelledError:
trace "Direct peer dial canceled"
raise
except CatchableError as exc:
debug "Direct peer error dialing", msg = exc.msg
await sleepAsync(1.minutes) await sleepAsync(1.minutes)

View File

@ -48,7 +48,6 @@ type
onEvent*: OnEvent # Connectivity updates for peer onEvent*: OnEvent # Connectivity updates for peer
codec*: string # the protocol that this peer joined from codec*: string # the protocol that this peer joined from
sendConn*: Connection # cached send connection sendConn*: Connection # cached send connection
connections*: seq[Connection] # connections to this peer
peerId*: PeerID peerId*: PeerID
handler*: RPCHandler handler*: RPCHandler
observers*: ref seq[PubSubObserver] # ref as in smart_ptr observers*: ref seq[PubSubObserver] # ref as in smart_ptr
@ -59,7 +58,7 @@ type
outbound*: bool # if this is an outbound connection outbound*: bool # if this is an outbound connection
appScore*: float64 # application specific score appScore*: float64 # application specific score
behaviourPenalty*: float64 # the eventual penalty score behaviourPenalty*: float64 # the eventual penalty score
when defined(libp2p_agents_metrics): when defined(libp2p_agents_metrics):
shortAgent*: string shortAgent*: string

View File

@ -57,7 +57,7 @@ type
Switch* = ref object of RootObj Switch* = ref object of RootObj
peerInfo*: PeerInfo peerInfo*: PeerInfo
connManager: ConnManager connManager*: ConnManager
transports*: seq[Transport] transports*: seq[Transport]
protocols*: seq[LPProtocol] protocols*: seq[LPProtocol]
muxers*: Table[string, MuxerProvider] muxers*: Table[string, MuxerProvider]
@ -244,7 +244,9 @@ proc upgradeIncoming(s: Switch, incomingConn: Connection) {.async, gcsafe.} = #
await ms.handle(cconn) await ms.handle(cconn)
except CatchableError as exc: except CatchableError as exc:
debug "Exception in secure handler during incoming upgrade", msg = exc.msg, conn debug "Exception in secure handler during incoming upgrade", msg = exc.msg, conn
if not cconn.upgraded.finished: if not isNil(cconn) and
not isNil(cconn.upgraded) and
not(cconn.upgraded.finished):
cconn.upgraded.fail(exc) cconn.upgraded.fail(exc)
finally: finally:
if not isNil(cconn): if not isNil(cconn):
@ -263,10 +265,13 @@ proc upgradeIncoming(s: Switch, incomingConn: Connection) {.async, gcsafe.} = #
await ms.handle(incomingConn, active = true) await ms.handle(incomingConn, active = true)
except CatchableError as exc: except CatchableError as exc:
debug "Exception upgrading incoming", exc = exc.msg debug "Exception upgrading incoming", exc = exc.msg
if not incomingConn.upgraded.finished: if not isNil(incomingConn) and
not isNil(incomingConn.upgraded) and
not(incomingConn.upgraded.finished):
incomingConn.upgraded.fail(exc) incomingConn.upgraded.fail(exc)
finally: finally:
await incomingConn.close() if not isNil(incomingConn):
await incomingConn.close()
proc dialAndUpgrade(s: Switch, proc dialAndUpgrade(s: Switch,
peerId: PeerID, peerId: PeerID,

View File

@ -9,6 +9,7 @@ import ../../libp2p/standard_setup
import ../../libp2p/errors import ../../libp2p/errors
import ../../libp2p/crypto/crypto import ../../libp2p/crypto/crypto
import ../../libp2p/stream/bufferstream import ../../libp2p/stream/bufferstream
import ../../libp2p/switch
import ../helpers import ../helpers
@ -422,9 +423,6 @@ suite "GossipSub internal":
check false check false
let topic = "foobar" let topic = "foobar"
# gossipSub.topicParams[topic] = TopicParams.init()
# gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
# gossipSub.fanout[topic] = initHashSet[PubSubPeer]()
var conns = newSeq[Connection]() var conns = newSeq[Connection]()
for i in 0..<30: for i in 0..<30:
let conn = newBufferStream(noop) let conn = newBufferStream(noop)
@ -451,3 +449,39 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close())) await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop() await gossipSub.switch.stop()
asyncTest "Disconnect bad peers":
let gossipSub = TestGossipSub.init(newStandardSwitch())
gossipSub.parameters.disconnectBadPeers = true
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
check false
let topic = "foobar"
var conns = newSeq[Connection]()
for i in 0..<30:
let conn = newBufferStream(noop)
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
gossipSub.onNewPeer(peer)
peer.sendConn = conn
peer.handler = handler
peer.score = gossipSub.parameters.graylistThreshold - 1
gossipSub.gossipsub.mgetOrPut(topic, initHashSet[PubSubPeer]()).incl(peer)
gossipSub.peers[peerInfo.peerId] = peer
gossipSub.switch.connManager.storeIncoming(conn)
gossipSub.updateScores()
await sleepAsync(100.millis)
check:
# test our disconnect mechanics
gossipSub.gossipsub.peers(topic) == 0
# also ensure we cleanup properly the peersInIP table
gossipSub.peersInIP.len == 0
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()

View File

@ -716,3 +716,60 @@ suite "GossipSub":
) )
await allFuturesThrowing(nodesFut.concat()) await allFuturesThrowing(nodesFut.concat())
asyncTest "GossipSub test directPeers":
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
handlerFut.complete(true)
let
nodes = generateNodes(2, gossip = true)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
var gossip = GossipSub(nodes[0])
gossip.parameters.directPeers[nodes[1].switch.peerInfo.peerId] = nodes[1].switch.peerInfo.addrs
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
let invalidDetected = newFuture[void]()
gossip.subscriptionValidator =
proc(topic: string): bool =
if topic == "foobar":
try:
invalidDetected.complete()
except:
raise newException(Defect, "Exception during subscriptionValidator")
false
else:
true
# DO NOT SUBSCRIBE, CONNECTION SHOULD HAPPEN
### await subscribeNodes(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
await invalidDetected.wait(10.seconds)
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)
await allFuturesThrowing(nodesFut.concat())