diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim
index baea4d264..819161c0c 100644
--- a/libp2p/protocols/pubsub/floodsub.nim
+++ b/libp2p/protocols/pubsub/floodsub.nim
@@ -15,7 +15,7 @@ import
   ./pubsub,
   ./pubsubpeer,
   ./timedcache,
   ./peertable,
-  ./rpc/[message, messages],
+  ./rpc/[message, messages, protobuf],
   ../../crypto/crypto,
   ../../stream/connection,
   ../../peerid,
@@ -95,7 +95,16 @@ method unsubscribePeer*(f: FloodSub, peer: PeerId) =
 
 method rpcHandler*(f: FloodSub,
                    peer: PubSubPeer,
-                   rpcMsg: RPCMsg) {.async.} =
+                   data: seq[byte]) {.async.} =
+
+  var rpcMsg = decodeRpcMsg(data).valueOr:
+    debug "failed to decode msg from peer", peer, err = error
+    raise newException(CatchableError, "Peer msg couldn't be decoded")
+
+  trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
+  # trigger hooks
+  peer.recvObservers(rpcMsg)
+
   for i in 0..<min(f.topicsHigh, rpcMsg.subscriptions.len):
     template sub: untyped = rpcMsg.subscriptions[i]
     f.handleSubscribe(peer, sub.topic, sub.subscribe)
diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim
--- a/libp2p/protocols/pubsub/gossipsub.nim
+++ b/libp2p/protocols/pubsub/gossipsub.nim
@@ ... @@ method rpcHandler*(g: GossipSub,
   if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
     g.send(peer, RPCMsg(pong: rpcMsg.ping))
     peer.pingBudget.dec
@@ -445,14 +495,14 @@ method rpcHandler*(g: GossipSub,
         # always validate if signature is present or required
         debug "Dropping message due to failed signature verification",
           msgId = shortLog(msgId), peer
-        g.punishInvalidMessage(peer, msg.topicIds)
+        g.punishInvalidMessage(peer, msg)
         continue
 
       if msg.seqno.len > 0 and msg.seqno.len != 8:
         # if we have seqno should be 8 bytes long
         debug "Dropping message due to invalid seqno length",
           msgId = shortLog(msgId), peer
-        g.punishInvalidMessage(peer, msg.topicIds)
+        g.punishInvalidMessage(peer, msg)
         continue
 
       # g.anonymize needs no evaluation when receiving messages
@@ -676,3 +726,13 @@ method initPubSub*(g: GossipSub)
 
   # init gossip stuff
   g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength)
+
+method getOrCreatePeer*(
+  g: GossipSub,
+  peerId: PeerId,
+  protos: seq[string]): PubSubPeer =
+
+  let peer = procCall PubSub(g).getOrCreatePeer(peerId, protos)
+  g.parameters.overheadRateLimit.withValue(overheadRateLimit):
+    peer.overheadRateLimitOpt = Opt.some(TokenBucket.new(overheadRateLimit.bytes, overheadRateLimit.interval))
+  return peer
diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim
index 3785e37d1..f1c1e923f 100644
--- a/libp2p/protocols/pubsub/gossipsub/scoring.nim
+++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim
@@ -11,9 +11,12 @@
 
 import std/[tables, sets]
 import chronos, chronicles, metrics
+import chronos/ratelimit
 import "."/[types]
 import ".."/[pubsubpeer]
+import ../rpc/messages
 import "../../.."/[peerid, multiaddress, switch, utils/heartbeat]
+import ../pubsub
 
 logScope:
   topics = "libp2p gossipsub"
@@ -27,6 +30,7 @@ declareGauge(libp2p_gossipsub_peers_score_invalidMessageDeliveries, "Detailed go
 declareGauge(libp2p_gossipsub_peers_score_appScore, "Detailed gossipsub scoring metric", labels = ["agent"])
 declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"])
 declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"])
+declarePublicCounter(libp2p_gossipsub_peers_rate_limit_disconnections, "The number of peer disconnections by gossipsub because of rate limit", labels = ["agent"])
 
 proc init*(_: type[TopicParams]): TopicParams =
   TopicParams(
@@ -85,27 +89,18 @@ proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 =
 
 {.pop.}
 
-proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} =
-  let agent =
-    when defined(libp2p_agents_metrics):
-      if peer.shortAgent.len > 0:
-        peer.shortAgent
-      else:
-        "unknown"
-    else:
-      "unknown"
-  libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [agent])
-
+proc disconnectPeer*(g: GossipSub, peer: PubSubPeer) {.async.} =
   try:
     await g.switch.disconnect(peer.peerId)
   except CatchableError as exc: # Never cancelled
     trace "Failed to close connection", peer, error = exc.name, msg = exc.msg
 
-proc disconnectBadPeerCheck*(g: GossipSub, peer: PubSubPeer, score: float64) =
+proc disconnectIfBadScorePeer*(g: GossipSub, peer: PubSubPeer, score: float64) =
   if g.parameters.disconnectBadPeers and score < g.parameters.graylistThreshold and
     peer.peerId notin g.parameters.directPeers:
     debug "disconnecting bad score peer", peer, score = peer.score
     asyncSpawn(g.disconnectPeer(peer))
+    libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [peer.getAgent()])
 
 proc updateScores*(g: GossipSub) = # avoid async
   ## https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#the-score-function
@@ -175,14 +170,7 @@ proc updateScores*(g: GossipSub) = # avoid async
       score += topicScore * topicParams.topicWeight
 
       # Score metrics
-      let agent =
-        when defined(libp2p_agents_metrics):
-          if peer.shortAgent.len > 0:
-            peer.shortAgent
-          else:
-            "unknown"
-        else:
-          "unknown"
+      let agent = peer.getAgent()
       libp2p_gossipsub_peers_score_firstMessageDeliveries.inc(info.firstMessageDeliveries, labelValues = [agent])
       libp2p_gossipsub_peers_score_meshMessageDeliveries.inc(info.meshMessageDeliveries, labelValues = [agent])
       libp2p_gossipsub_peers_score_meshFailurePenalty.inc(info.meshFailurePenalty, labelValues = [agent])
@@ -219,14 +207,7 @@ proc updateScores*(g: GossipSub) = # avoid async
     score += colocationFactor * g.parameters.ipColocationFactorWeight
 
     # Score metrics
-    let agent =
-      when defined(libp2p_agents_metrics):
-        if peer.shortAgent.len > 0:
-          peer.shortAgent
-        else:
-          "unknown"
-      else:
-        "unknown"
+    let agent = peer.getAgent()
     libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = [agent])
     libp2p_gossipsub_peers_score_behaviourPenalty.inc(peer.behaviourPenalty, labelValues = [agent])
     libp2p_gossipsub_peers_score_colocationFactor.inc(colocationFactor, labelValues = [agent])
@@ -246,8 +227,7 @@ proc updateScores*(g: GossipSub) = # avoid async
 
     trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted
 
-    g.disconnectBadPeerCheck(peer, stats.score)
-
+    g.disconnectIfBadScorePeer(peer, stats.score)
     libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent])
 
   for peer in evicting:
@@ -260,8 +240,18 @@ proc scoringHeartbeat*(g: GossipSub) {.async.} =
     trace "running scoring heartbeat", instance = cast[int](g)
     g.updateScores()
 
-proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, topics: seq[string]) =
-  for tt in topics:
+proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) =
+  let uselessAppBytesNum = msg.data.len
+  peer.overheadRateLimitOpt.withValue(overheadRateLimit):
+    if not overheadRateLimit.tryConsume(uselessAppBytesNum):
+      debug "Peer sent invalid message and it's above rate limit", peer, uselessAppBytesNum
+      libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # for now we only measure; disconnection below stays disabled until the limit is calibrated
+      # discard g.disconnectPeer(peer)
+      # debug "Peer disconnected", peer, uselessAppBytesNum
+      # raise newException(PeerRateLimitError, "Peer sent invalid message and it's above rate limit")
+
+
+  for tt in msg.topicIds:
     let t = tt
     if t notin g.topics:
       continue
diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim
index ca79b290e..635fc9152 100644
--- a/libp2p/protocols/pubsub/gossipsub/types.nim
+++ b/libp2p/protocols/pubsub/gossipsub/types.nim
@@ -145,6 +145,8 @@ type
     bandwidthEstimatebps*: int # This is currently used only for limting flood publishing. 0 disables flood-limiting completely
     iwantTimeout*: Duration
 
+    overheadRateLimit*: Opt[tuple[bytes: int, interval: Duration]]
+
   BackoffTable* = Table[string, Table[PeerId, Moment]]
   ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]]
diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim
index 02436a9b4..ef4d68002 100644
--- a/libp2p/protocols/pubsub/pubsub.nim
+++ b/libp2p/protocols/pubsub/pubsub.nim
@@ -17,6 +17,7 @@
 
 import std/[tables, sequtils, sets, strutils]
 import chronos, chronicles, metrics
+import chronos/ratelimit
 import ./errors as pubsub_errors,
        ./pubsubpeer,
        ./rpc/[message, messages, protobuf],
@@ -263,7 +264,7 @@ proc updateMetrics*(p: PubSub, rpcMsg: RPCMsg) =
 
 method rpcHandler*(p: PubSub,
                    peer: PubSubPeer,
-                   rpcMsg: RPCMsg): Future[void] {.base, async.} =
+                   data: seq[byte]): Future[void] {.base, async.} =
   ## Handler that must be overridden by concrete implementation
   raiseAssert "Unimplemented"
 
@@ -278,10 +279,11 @@ method onPubSubPeerEvent*(p: PubSub, peer: PubSubPeer, event: PubSubPeerEvent) {
   of PubSubPeerEventKind.Disconnected:
     discard
 
-proc getOrCreatePeer*(
+method getOrCreatePeer*(
   p: PubSub,
   peerId: PeerId,
-  protos: seq[string]): PubSubPeer =
+  protos: seq[string]): PubSubPeer {.base, gcsafe.} =
+
   p.peers.withValue(peerId, peer):
     return peer[]
 
@@ -354,9 +356,9 @@ method handleConn*(p: PubSub,
   ## that we're interested in
   ##
 
-  proc handler(peer: PubSubPeer, msg: RPCMsg): Future[void] =
+  proc handler(peer: PubSubPeer, data: seq[byte]): Future[void] =
     # call pubsub rpc handler
-    p.rpcHandler(peer, msg)
+    p.rpcHandler(peer, data)
 
   let peer = p.getOrCreatePeer(conn.peerId, @[proto])
diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim
index 1dcd28286..464131ddf 100644
--- a/libp2p/protocols/pubsub/pubsubpeer.nim
+++ b/libp2p/protocols/pubsub/pubsubpeer.nim
@@ -12,6 +12,7 @@
 import std/[sequtils, strutils, tables, hashes, options, sets, deques]
 import stew/results
 import chronos, chronicles, nimcrypto/sha2, metrics
+import chronos/ratelimit
 import rpc/[messages, message, protobuf],
        ../../peerid,
        ../../peerinfo,
@@ -32,6 +33,8 @@ when defined(libp2p_expensive_metrics):
   declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])
 
 type
+  PeerRateLimitError* = object of CatchableError
+
   PubSubObserver* = ref object
     onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [].}
     onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [].}
@@ -66,8 +69,9 @@
     maxMessageSize: int
     appScore*: float64 # application specific score
     behaviourPenalty*: float64 # the eventual penalty score
+    overheadRateLimitOpt*: Opt[TokenBucket]
 
-  RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void]
+  RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
     {.gcsafe, raises: [].}
 
 when defined(libp2p_agents_metrics):
@@ -107,7 +111,7 @@ func outbound*(p: PubSubPeer): bool =
   else:
     false
 
-proc recvObservers(p: PubSubPeer, msg: var RPCMsg) =
+proc recvObservers*(p: PubSubPeer, msg: var RPCMsg) =
   # trigger hooks
   if not(isNil(p.observers)) and p.observers[].len > 0:
     for obs in p.observers[]:
@@ -134,26 +138,19 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
           conn, peer = p, closed = conn.closed,
           data = data.shortLog
 
-        var rmsg = decodeRpcMsg(data).valueOr:
-          debug "failed to decode msg from peer",
-            conn, peer = p, closed = conn.closed,
-            err = error
-          break
-        data = newSeq[byte]() # Release memory
-
-        trace "decoded msg from peer",
-          conn, peer = p, closed = conn.closed,
-          msg = rmsg.shortLog
-        # trigger hooks
-        p.recvObservers(rmsg)
-
         when defined(libp2p_expensive_metrics):
           for m in rmsg.messages:
             for t in m.topicIDs:
               # metrics
               libp2p_pubsub_received_messages.inc(labelValues = [$p.peerId, t])
 
-        await p.handler(p, rmsg)
+        await p.handler(p, data)
+        data = newSeq[byte]() # Release memory
+    except PeerRateLimitError as exc:
+      debug "Peer rate limit exceeded, exiting read while", conn, peer = p, error = exc.msg
+    except CatchableError as exc:
+      debug "Exception occurred in PubSubPeer.handle",
+        conn, peer = p, closed = conn.closed, exc = exc.msg
     finally:
       await conn.close()
   except CancelledError:
@@ -307,7 +304,8 @@ proc new*(
     getConn: GetConn,
     onEvent: OnEvent,
     codec: string,
-    maxMessageSize: int): T =
+    maxMessageSize: int,
+    overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket)): T =
 
   result = T(
     getConn: getConn,
@@ -315,7 +313,18 @@ proc new*(
     onEvent: onEvent,
     codec: codec,
     peerId: peerId,
    connectedFut: newFuture[void](),
-    maxMessageSize: maxMessageSize
+    maxMessageSize: maxMessageSize,
+    overheadRateLimitOpt: overheadRateLimitOpt
   )
   result.sentIHaves.addFirst(default(HashSet[MessageId]))
   result.heDontWants.addFirst(default(HashSet[MessageId]))
+
+proc getAgent*(peer: PubSubPeer): string =
+  return
+    when defined(libp2p_agents_metrics):
+      if peer.shortAgent.len > 0:
+        peer.shortAgent
+      else:
+        "unknown"
+    else:
+      "unknown"
diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim
index ce6dd318b..d4cbf85da 100644
--- a/libp2p/protocols/pubsub/rpc/messages.nim
+++ b/libp2p/protocols/pubsub/rpc/messages.nim
@@ -116,3 +116,32 @@ func shortLog*(m: RPCMsg): auto =
     messages: mapIt(m.messages, it.shortLog),
     control: m.control.get(ControlMessage()).shortLog
   )
+
+proc byteSize*(msg: Message): int =
+  var total = 0
+  total += msg.fromPeer.len
+  total += msg.data.len
+  total += msg.seqno.len
+  total += msg.signature.len
+  total += msg.key.len
+  for topicId in msg.topicIds:
+    total += topicId.len
+  return total
+
+proc byteSize*(msgs: seq[Message]): int =
+  msgs.mapIt(byteSize(it)).foldl(a + b, 0)
+
+proc byteSize*(ihave: seq[ControlIHave]): int =
+  var total = 0
+  for item in ihave:
+    total += item.topicId.len
+    for msgId in item.messageIds:
+      total += msgId.len
+  return total
+
+proc byteSize*(iwant: seq[ControlIWant]): int =
+  var total = 0
+  for item in iwant:
+    for msgId in item.messageIds:
+      total += msgId.len
+  return total
diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim
index 909e1d613..a764c8e52 100644
--- a/tests/pubsub/testgossipinternal.nim
+++ b/tests/pubsub/testgossipinternal.nim
@@ -10,6 +10,7 @@
 import ../../libp2p/crypto/crypto
 import ../../libp2p/stream/bufferstream
 import ../../libp2p/switch
 import ../../libp2p/muxers/muxer
+import ../../libp2p/protocols/pubsub/rpc/protobuf
 
 import ../helpers
 
@@ -22,7 +23,7 @@ proc getPubSubPeer(p: TestGossipSub, peerId: PeerId): PubSubPeer =
   proc getConn(): Future[Connection] =
     p.switch.dial(peerId, GossipSubCodec)
 
-  let pubSubPeer = PubSubPeer.new(peerId, getConn, nil, GossipSubCodec, 1024 * 1024)
+  let pubSubPeer = PubSubPeer.new(peerId, getConn, nil, GossipSubCodec, 1024 * 1024, Opt.some(TokenBucket.new(1024, 500.milliseconds)))
   debug "created new pubsub peer", peerId
 
   p.peers[peerId] = pubSubPeer
@@ -170,7 +171,7 @@ suite "GossipSub internal":
   asyncTest "`replenishFanout` Degree Lo":
     let gossipSub = TestGossipSub.init(newStandardSwitch())
 
-    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
+    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
       discard
 
     let topic = "foobar"
@@ -197,7 +198,7 @@ suite "GossipSub internal":
   asyncTest "`dropFanoutPeers` drop expired fanout topics":
     let gossipSub = TestGossipSub.init(newStandardSwitch())
 
-    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
+    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
       discard
 
     let topic = "foobar"
@@ -227,7 +228,7 @@ suite "GossipSub internal":
   asyncTest "`dropFanoutPeers` leave unexpired fanout topics":
     let gossipSub = TestGossipSub.init(newStandardSwitch())
 
-    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
+    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
       discard
 
     let topic1 = "foobar1"
@@ -264,7 +265,7 @@ suite "GossipSub internal":
   asyncTest "`getGossipPeers` - should gather up to degree D non intersecting peers":
     let gossipSub = TestGossipSub.init(newStandardSwitch())
 
-    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
+    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
      discard
 
     let topic = "foobar"
@@ -325,7 +326,7 @@ suite "GossipSub internal":
   asyncTest "`getGossipPeers` - should not crash on missing topics in mesh":
     let gossipSub = TestGossipSub.init(newStandardSwitch())
 
-    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
+    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
       discard
 
     let topic = "foobar"
@@ -365,7 +366,7 @@ suite "GossipSub internal":
   asyncTest "`getGossipPeers` - should not crash on missing topics in fanout":
     let gossipSub = TestGossipSub.init(newStandardSwitch())
 
-    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
+    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
       discard
 
     let topic = "foobar"
@@ -406,7 +407,7 @@ suite "GossipSub internal":
   asyncTest "`getGossipPeers` - should not crash on missing topics in gossip":
     let gossipSub = TestGossipSub.init(newStandardSwitch())
 
-    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
+    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
       discard
 
     let topic = "foobar"
@@ -447,7 +448,7 @@ suite "GossipSub internal":
   asyncTest "Drop messages of topics without subscription":
     let gossipSub = TestGossipSub.init(newStandardSwitch())
 
-    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
+    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
       check false
 
     let topic = "foobar"
@@ -470,7 +471,7 @@ suite "GossipSub internal":
       let peer = gossipSub.getPubSubPeer(peerId)
       inc seqno
      let msg = Message.init(peerId, ("bar" & $i).toBytes(), topic, some(seqno))
-      await gossipSub.rpcHandler(peer, RPCMsg(messages: @[msg]))
+      await gossipSub.rpcHandler(peer, encodeRpcMsg(RPCMsg(messages: @[msg]), false))
 
     check gossipSub.mcache.msgs.len == 0
 
@@ -481,7 +482,7 @@ suite "GossipSub internal":
     let gossipSub = TestGossipSub.init(newStandardSwitch())
     gossipSub.parameters.disconnectBadPeers = true
     gossipSub.parameters.appSpecificWeight = 1.0
-    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
+    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
       check false
 
     let topic = "foobar"
@@ -525,7 +526,7 @@ suite "GossipSub internal":
      conn.peerId = peerId
       let peer = gossipSub.getPubSubPeer(peerId)
 
-      await gossipSub.rpcHandler(peer, lotOfSubs)
+      await gossipSub.rpcHandler(peer, encodeRpcMsg(lotOfSubs, false))
 
     check:
       gossipSub.gossipsub.len == gossipSub.topicsHigh
@@ -656,7 +657,7 @@ suite "GossipSub internal":
   asyncTest "handleIHave/Iwant tests":
     let gossipSub = TestGossipSub.init(newStandardSwitch())
 
-    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
+    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
       check false
     proc handler2(topic: string, data: seq[byte]) {.async.} = discard
 
@@ -733,7 +734,7 @@ suite "GossipSub internal":
 
     var iwantCount = 0
 
-    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
+    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
       check false
     proc handler2(topic: string, data: seq[byte]) {.async.} = discard
 
@@ -779,7 +780,7 @@ suite "GossipSub internal":
         data: actualMessageData
       )]
     )
-    await gossipSub.rpcHandler(firstPeer, rpcMsg)
+    await gossipSub.rpcHandler(firstPeer, encodeRpcMsg(rpcMsg, false))
 
     check:
       not gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes())
@@ -792,7 +793,7 @@ suite "GossipSub internal":
     gossipSub.parameters.iwantTimeout = 10.milliseconds
     await gossipSub.start()
 
-    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = discard
+    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = discard
     proc handler2(topic: string, data: seq[byte]) {.async.} = discard
 
     let topic = "foobar"
diff --git a/tests/pubsub/testmessage.nim b/tests/pubsub/testmessage.nim
index 7bc4b267a..f4f062fa3 100644
--- a/tests/pubsub/testmessage.nim
+++ b/tests/pubsub/testmessage.nim
@@ -73,3 +73,15 @@ suite "Message":
     check:
       msgIdResult.isErr
       msgIdResult.error == ValidationResult.Reject
+
+  test "byteSize for Message":
+    var msg = Message(
+      fromPeer: PeerId(data: @[]), # empty seq[byte]
+      data: @[1'u8, 2, 3],         # 3 bytes
+      seqno: @[1'u8],              # 1 byte
+      signature: @[],              # empty seq[byte]
+      key: @[1'u8],                # 1 byte
+      topicIds: @["abc", "defgh"]  # 3 + 5 = 8 bytes
+    )
+
+    check byteSize(msg) == 3 + 1 + 1 + 8 # total: 13 bytes
\ No newline at end of file
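
Note for reviewers: the only chronos/ratelimit API this patch relies on is TokenBucket.new(budget, interval) and tryConsume(tokens), as used in getOrCreatePeer and punishInvalidMessage above. A minimal standalone sketch of the intended accounting, reusing the 1024-bytes-per-500ms budget from the test helper (illustrative only, not part of the patch):

import chronos
import chronos/ratelimit

# Same bucket the test helper creates: 1024 tokens ("useless" bytes)
# refilled every 500 ms.
let bucket = TokenBucket.new(1024, 500.milliseconds)

# punishInvalidMessage charges msg.data.len tokens per invalid message.
doAssert bucket.tryConsume(1024)     # a fresh bucket starts full
doAssert not bucket.tryConsume(1024) # drained: further invalid traffic now
                                     # bumps the rate-limit counter (and will
                                     # disconnect once that path is enabled)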
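The limit is opt-in: overheadRateLimit defaults to Opt.none, so existing deployments see no behaviour change. A hypothetical wiring of the new parameter (the 2 KiB/s budget is an assumption for illustration; only the field shape and the bucket construction come from this diff):

import chronos
import chronos/ratelimit
import stew/results # provides Opt

# Shape of the new GossipSubParams field (types.nim above):
#   overheadRateLimit*: Opt[tuple[bytes: int, interval: Duration]]
let overheadRateLimit = Opt.some((bytes: 2048, interval: 1.seconds))

# getOrCreatePeer (gossipsub.nim above) turns it into a per-peer bucket:
if overheadRateLimit.isSome:
  let limit = overheadRateLimit.get()
  let bucket = TokenBucket.new(limit.bytes, limit.interval)
  doAssert bucket.tryConsume(limit.bytes) # a fresh bucket starts full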