diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index ef0b612..f0f677b 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -95,6 +95,7 @@ proc newStreamInternal*(m: Mplex, result.peerInfo = m.connection.peerInfo result.observedAddr = m.connection.observedAddr + result.transportDir = m.connection.transportDir trace "Creating new channel", m, channel = result, id, initiator, name diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index d810c19..2e431c9 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -173,7 +173,15 @@ proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = # add peer backoff if prune.backoff > 0: let - backoff = Moment.fromNow((prune.backoff + BackoffSlackTime).int64.seconds) + # avoid overflows and follow params + # worst case if the remote thinks we are wrong we get penalized + # but we won't end up with ghost peers + backoffSeconds = clamp( + prune.backoff + BackoffSlackTime, + 0'u64, + g.parameters.pruneBackoff.seconds.uint64 + BackoffSlackTime + ) + backoff = Moment.fromNow(backoffSeconds.int64.seconds) current = g.backingOff.getOrDefault(topic).getOrDefault(peer.peerId) if backoff > current: g.backingOff diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index ac13675..f8ee577 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -274,11 +274,21 @@ proc getOrCreatePeer*( proc getConn(): Future[Connection] = p.switch.dial(peer, protos) + proc dropConn(peer: PubSubPeer) = + proc dropConnAsync(peer: PubsubPeer) {.async.} = + try: + await p.switch.disconnect(peer.peerId) + except CancelledError: + raise + except CatchableError as exc: + trace "Failed to close connection", peer, error = exc.name, msg = exc.msg + asyncSpawn dropConnAsync(peer) + proc onEvent(peer: PubsubPeer, event: PubsubPeerEvent) {.gcsafe.} = p.onPubSubPeerEvent(peer, event) # create new pubsub peer - let pubSubPeer = newPubSubPeer(peer, getConn, onEvent, protos[0]) + let pubSubPeer = newPubSubPeer(peer, getConn, dropConn, onEvent, protos[0]) debug "created new pubsub peer", peer p.peers[peer] = pubSubPeer diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 19c9b38..e52bc82 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -41,10 +41,12 @@ type kind*: PubSubPeerEventKind GetConn* = proc(): Future[Connection] {.gcsafe.} + DropConn* = proc(peer: PubsubPeer) {.gcsafe.} # have to pass peer as it's unknown during init OnEvent* = proc(peer: PubSubPeer, event: PubsubPeerEvent) {.gcsafe.} PubSubPeer* = ref object of RootObj getConn*: GetConn # callback to establish a new send connection + dropConn*: DropConn # Function pointer to use to drop connections onEvent*: OnEvent # Connectivity updates for peer codec*: string # the protocol that this peer joined from sendConn*: Connection # cached send connection @@ -83,7 +85,11 @@ proc hasObservers(p: PubSubPeer): bool = p.observers != nil and anyIt(p.observers[], it != nil) func outbound*(p: PubSubPeer): bool = - if p.connected and p.sendConn.dir == Direction.Out: + # gossipsub 1.1 spec requires us to know if the transport is outgoing + # in order to give priotity to connections we make + # https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#outbound-mesh-quotas + # This behaviour is presrcibed to counter sybil attacks and ensures that a coordinated inbound attack can never fully take over the mesh + if not p.sendConn.isNil and p.sendConn.transportDir == Direction.Out: true else: false @@ -169,18 +175,18 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} = finally: if p.sendConn != nil: trace "Removing send connection", p, conn = p.sendConn - await p.sendConn.close() - - try: - if p.onEvent != nil: - p.onEvent(p, PubsubPeerEvent(kind: PubSubPeerEventKind.Disconnected)) - except CancelledError as exc: - debug "Errors during diconnection events", error = exc.msg - - # clean up at the end p.sendConn = nil - # don't cleanup p.address else we leak some gossip stat table + + try: + if p.onEvent != nil: + p.onEvent(p, PubsubPeerEvent(kind: PubSubPeerEventKind.Disconnected)) + except CancelledError: + raise + except CatchableError as exc: + debug "Errors during diconnection events", error = exc.msg + + # don't cleanup p.address else we leak some gossip stat table proc connectImpl(p: PubSubPeer) {.async.} = try: @@ -189,9 +195,13 @@ proc connectImpl(p: PubSubPeer) {.async.} = # issue so we try to get a new on while true: await connectOnce(p) - + except CancelledError: + raise except CatchableError as exc: debug "Could not establish send connection", msg = exc.msg + finally: + # drop the connection, else we end up with ghost peers + if p.dropConn != nil: p.dropConn(p) proc connect*(p: PubSubPeer) = asyncSpawn connectImpl(p) @@ -255,10 +265,12 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) = proc newPubSubPeer*(peerId: PeerID, getConn: GetConn, + dropConn: DropConn, onEvent: OnEvent, codec: string): PubSubPeer = PubSubPeer( getConn: getConn, + dropConn: dropConn, onEvent: onEvent, codec: codec, peerId: peerId, diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index 1a47b00..0dc6bc1 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -76,6 +76,10 @@ proc handleConn(s: Secure, conn: Connection, initiator: bool): Future[Connection] {.async.} = var sconn = await s.handshake(conn, initiator) + # mark connection bottom level transport direction + # this is the safest place to do this + # we require this information in for example gossipsub + sconn.transportDir = if initiator: Direction.Out else: Direction.In proc cleanup() {.async.} = try: diff --git a/libp2p/stream/connection.nim b/libp2p/stream/connection.nim index 27a287d..ba888c9 100644 --- a/libp2p/stream/connection.nim +++ b/libp2p/stream/connection.nim @@ -33,6 +33,7 @@ type peerInfo*: PeerInfo observedAddr*: Multiaddress upgraded*: Future[void] + transportDir*: Direction # The bottom level transport (generally the socket) direction proc timeoutMonitor(s: Connection) {.async, gcsafe.} diff --git a/libp2p/switch.nim b/libp2p/switch.nim index faf7a15..97ededb 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -136,6 +136,10 @@ proc dialAndUpgrade(s: Switch, # make sure to assign the peer to the connection dialed.peerInfo = PeerInfo.init(peerId, addrs) + # also keep track of the connection's bottom unsafe transport direction + # required by gossipsub scoring + dialed.transportDir = Direction.Out + libp2p_successful_dials.inc() let conn = try: @@ -334,6 +338,11 @@ proc accept(s: Switch, transport: Transport) {.async.} = # noraises upgrades.release() continue + # set the direction of this bottom level transport + # in order to be able to consume this information in gossipsub if required + # gossipsub gives priority to connections we make + conn.transportDir = Direction.In + debug "Accepted an incoming connection", conn asyncSpawn upgradeMonitor(conn, upgrades) asyncSpawn s.upgrade.upgradeIncoming(conn) diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index b5bc5ee..81779d1 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -22,7 +22,10 @@ proc getPubSubPeer(p: TestGossipSub, peerId: PeerID): auto = proc getConn(): Future[Connection] = p.switch.dial(peerId, GossipSubCodec) - newPubSubPeer(peerId, getConn, nil, GossipSubCodec) + proc dropConn(peer: PubSubPeer) = + discard # we don't care about it here yet + + newPubSubPeer(peerId, getConn, dropConn, nil, GossipSubCodec) proc randomPeerInfo(): PeerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) @@ -591,7 +594,7 @@ suite "GossipSub internal": gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() for i in 0..<6: let conn = newBufferStream(noop) - conn.dir = Direction.In + conn.transportDir = Direction.In conns &= conn let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) conn.peerInfo = peerInfo @@ -605,7 +608,7 @@ suite "GossipSub internal": for i in 0..<7: let conn = newBufferStream(noop) - conn.dir = Direction.Out + conn.transportDir = Direction.Out conns &= conn let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) conn.peerInfo = peerInfo @@ -623,7 +626,7 @@ suite "GossipSub internal": check gossipSub.mesh[topic].len > gossipSub.parameters.dLow var outbound = 0 for peer in gossipSub.mesh[topic]: - if peer.sendConn.dir == Direction.Out: + if peer.sendConn.transportDir == Direction.Out: inc outbound # ensure we give priority and keep at least dOut outbound peers check outbound >= gossipSub.parameters.dOut