diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index d37b9266e..e889fc096 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -305,6 +305,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = proc validateAndRelay(g: GossipSub, msg: Message, msgId, msgIdSalted: MessageId, + msg_hash: string, peer: PubSubPeer) {.async.} = try: let validation = await g.validate(msg) @@ -368,7 +369,20 @@ proc validateAndRelay(g: GossipSub, # In theory, if topics are the same in all messages, we could batch - we'd # also have to be careful to only include validated messages g.broadcast(toSendPeers, RPCMsg(messages: @[msg])) - trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer + + if toSendPeers.len > 0: + info "forwarded message to peers", msg_hash = msg_hash, + msg_id = shortLog(msgId), + sender_peer_id = peer.peerId, + pubsub_topics = msg.topicIds, + num_peers = toSendPeers.len, + target_peer_ids = toSeq(toSendPeers.mapIt(shortLog(it.peerId))) + else: + info "no peers to forward message", msg_hash = msg_hash, + msg_id = shortLog(msgId), + sender_peer_id = peer.peerId, + pubsub_topics = msg.topicIds + for topic in msg.topicIds: if topic notin g.topics: continue @@ -461,6 +475,18 @@ method rpcHandler*(g: GossipSub, msgId = msgIdResult.get msgIdSalted = msgId & g.seenSalt + if msg.topicIds.len == 0: + debug "Dropping message due to message without topics", msg_id = msgId + continue + + let msg_hash = g.msgHashProvider(msg.topicIds[0], msg.data).valueOr: + debug "Dropping message due to failed message hash generation", msg_id = msgId + continue + + info "received msg", msg_hash = msg_hash, + msg_id = shortLog(msgId), + sender_peer_id = peer.peerId + # addSeen adds salt to msgId to avoid # remote attacking the hash function if g.addSeen(msgId): @@ -509,7 +535,7 @@ method rpcHandler*(g: GossipSub, # (eg, pop everything you put in it) g.validationSeen[msgIdSalted] = initHashSet[PubSubPeer]() - asyncSpawn g.validateAndRelay(msg, msgId, msgIdSalted, peer) + asyncSpawn g.validateAndRelay(msg, msgId, msgIdSalted, msg_hash, peer) if rpcMsg.control.isSome(): g.handleControl(peer, rpcMsg.control.unsafeGet()) @@ -630,13 +656,20 @@ method publish*(g: GossipSub, else: inc g.msgSeqno Message.init(some(g.peerInfo), data, topic, some(g.msgSeqno), g.sign) - msgId = g.msgIdProvider(msg).valueOr: + msg_id = g.msgIdProvider(msg).valueOr: trace "Error generating message id, skipping publish", error = error libp2p_gossipsub_failed_publish.inc() return 0 + msg_hash = g.msgHashProvider(topic, data).valueOr: + trace "Error generating message hash, skipping publish", + error = error + libp2p_gossipsub_failed_publish.inc() + return 0 - logScope: msgId = shortLog(msgId) + logScope: + msg_id = shortLog(msg_id) + msg_hash = msg_hash trace "Created new message", msg = shortLog(msg), peers = peers.len @@ -647,6 +680,9 @@ method publish*(g: GossipSub, g.mcache.put(msgId, msg) + info "publish message to peers", num_peers = peers.len, + target_peer_ids = toSeq(peers.mapIt(shortLog(it.peerId))) + g.broadcast(peers, RPCMsg(messages: @[msg])) if g.knownTopics.contains(topic): diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index ef4d68002..0bfeef014 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -84,7 +84,7 @@ type InitializationError* = object of LPError TopicHandler* {.public.} = proc(topic: string, - data: seq[byte]): Future[void] {.gcsafe, raises: [].} + data: seq[byte]): Future[void] {.gcsafe, raises: [].} ValidatorHandler* {.public.} = proc(topic: string, message: Message): Future[ValidationResult] {.gcsafe, raises: [].} @@ -100,6 +100,12 @@ type ## we have to store it, which may be an attack vector. ## This callback can be used to reject topic we're not interested in + MsgHashProvider* {.public.} = + proc(topic: string, messageData: seq[byte]): + Result[string, string] {.noSideEffect, raises: [], gcsafe.} + ## Computes the message hash based on its topic and message data, and then returns it + ## in hex format + PubSub* {.public.} = ref object of LPProtocol switch*: Switch # the switch used to dial/connect to peers peerInfo*: PeerInfo # this peer's info @@ -128,6 +134,7 @@ type rng*: ref HmacDrbgContext knownTopics*: HashSet[string] + msgHashProvider*: MsgHashProvider method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} = ## handle peer disconnects @@ -555,7 +562,8 @@ proc init*[PubParams: object | bool]( subscriptionValidator: SubscriptionValidator = nil, maxMessageSize: int = 1024 * 1024, rng: ref HmacDrbgContext = newRng(), - parameters: PubParams = false): P + parameters: PubParams = false, + msgHashProvider: MsgHashProvider = nil): P {.raises: [InitializationError], public.} = let pubsub = when PubParams is bool: @@ -569,7 +577,8 @@ proc init*[PubParams: object | bool]( subscriptionValidator: subscriptionValidator, maxMessageSize: maxMessageSize, rng: rng, - topicsHigh: int.high) + topicsHigh: int.high, + msgHashProvider: msgHashProvider) else: P(switch: switch, peerInfo: switch.peerInfo, @@ -582,7 +591,8 @@ proc init*[PubParams: object | bool]( parameters: parameters, maxMessageSize: maxMessageSize, rng: rng, - topicsHigh: int.high) + topicsHigh: int.high, + msgHashProvider: msgHashProvider) proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} = if event.kind == PeerEventKind.Joined: