add msg hash information in logs

This commit is contained in:
Ivan Folgueira Bande 2024-02-28 10:05:29 +01:00
parent e3c967ad19
commit a34522f2ab
No known key found for this signature in database
GPG Key ID: 3C117481F89E24A7
2 changed files with 54 additions and 8 deletions

View File

@ -305,6 +305,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
proc validateAndRelay(g: GossipSub, proc validateAndRelay(g: GossipSub,
msg: Message, msg: Message,
msgId, msgIdSalted: MessageId, msgId, msgIdSalted: MessageId,
msg_hash: string,
peer: PubSubPeer) {.async.} = peer: PubSubPeer) {.async.} =
try: try:
let validation = await g.validate(msg) let validation = await g.validate(msg)
@ -368,7 +369,20 @@ proc validateAndRelay(g: GossipSub,
# In theory, if topics are the same in all messages, we could batch - we'd # In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages # also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg])) g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
if toSendPeers.len > 0:
info "forwarded message to peers", msg_hash = msg_hash,
msg_id = shortLog(msgId),
sender_peer_id = peer.peerId,
pubsub_topics = msg.topicIds,
num_peers = toSendPeers.len,
target_peer_ids = toSeq(toSendPeers.mapIt(shortLog(it.peerId)))
else:
info "no peers to forward message", msg_hash = msg_hash,
msg_id = shortLog(msgId),
sender_peer_id = peer.peerId,
pubsub_topics = msg.topicIds
for topic in msg.topicIds: for topic in msg.topicIds:
if topic notin g.topics: continue if topic notin g.topics: continue
@ -461,6 +475,18 @@ method rpcHandler*(g: GossipSub,
msgId = msgIdResult.get msgId = msgIdResult.get
msgIdSalted = msgId & g.seenSalt msgIdSalted = msgId & g.seenSalt
if msg.topicIds.len == 0:
debug "Dropping message due to message without topics", msg_id = msgId
continue
let msg_hash = g.msgHashProvider(msg.topicIds[0], msg.data).valueOr:
debug "Dropping message due to failed message hash generation", msg_id = msgId
continue
info "received msg", msg_hash = msg_hash,
msg_id = shortLog(msgId),
sender_peer_id = peer.peerId
# addSeen adds salt to msgId to avoid # addSeen adds salt to msgId to avoid
# remote attacking the hash function # remote attacking the hash function
if g.addSeen(msgId): if g.addSeen(msgId):
@ -509,7 +535,7 @@ method rpcHandler*(g: GossipSub,
# (eg, pop everything you put in it) # (eg, pop everything you put in it)
g.validationSeen[msgIdSalted] = initHashSet[PubSubPeer]() g.validationSeen[msgIdSalted] = initHashSet[PubSubPeer]()
asyncSpawn g.validateAndRelay(msg, msgId, msgIdSalted, peer) asyncSpawn g.validateAndRelay(msg, msgId, msgIdSalted, msg_hash, peer)
if rpcMsg.control.isSome(): if rpcMsg.control.isSome():
g.handleControl(peer, rpcMsg.control.unsafeGet()) g.handleControl(peer, rpcMsg.control.unsafeGet())
@ -630,13 +656,20 @@ method publish*(g: GossipSub,
else: else:
inc g.msgSeqno inc g.msgSeqno
Message.init(some(g.peerInfo), data, topic, some(g.msgSeqno), g.sign) Message.init(some(g.peerInfo), data, topic, some(g.msgSeqno), g.sign)
msgId = g.msgIdProvider(msg).valueOr: msg_id = g.msgIdProvider(msg).valueOr:
trace "Error generating message id, skipping publish", trace "Error generating message id, skipping publish",
error = error error = error
libp2p_gossipsub_failed_publish.inc() libp2p_gossipsub_failed_publish.inc()
return 0 return 0
msg_hash = g.msgHashProvider(topic, data).valueOr:
trace "Error generating message hash, skipping publish",
error = error
libp2p_gossipsub_failed_publish.inc()
return 0
logScope: msgId = shortLog(msgId) logScope:
msg_id = shortLog(msg_id)
msg_hash = msg_hash
trace "Created new message", msg = shortLog(msg), peers = peers.len trace "Created new message", msg = shortLog(msg), peers = peers.len
@ -647,6 +680,9 @@ method publish*(g: GossipSub,
g.mcache.put(msgId, msg) g.mcache.put(msgId, msg)
info "publish message to peers", num_peers = peers.len,
target_peer_ids = toSeq(peers.mapIt(shortLog(it.peerId)))
g.broadcast(peers, RPCMsg(messages: @[msg])) g.broadcast(peers, RPCMsg(messages: @[msg]))
if g.knownTopics.contains(topic): if g.knownTopics.contains(topic):

View File

@ -84,7 +84,7 @@ type
InitializationError* = object of LPError InitializationError* = object of LPError
TopicHandler* {.public.} = proc(topic: string, TopicHandler* {.public.} = proc(topic: string,
data: seq[byte]): Future[void] {.gcsafe, raises: [].} data: seq[byte]): Future[void] {.gcsafe, raises: [].}
ValidatorHandler* {.public.} = proc(topic: string, ValidatorHandler* {.public.} = proc(topic: string,
message: Message): Future[ValidationResult] {.gcsafe, raises: [].} message: Message): Future[ValidationResult] {.gcsafe, raises: [].}
@ -100,6 +100,12 @@ type
## we have to store it, which may be an attack vector. ## we have to store it, which may be an attack vector.
## This callback can be used to reject topic we're not interested in ## This callback can be used to reject topic we're not interested in
MsgHashProvider* {.public.} =
proc(topic: string, messageData: seq[byte]):
Result[string, string] {.noSideEffect, raises: [], gcsafe.}
## Computes the message hash based on its topic and message data, and then returns it
## in hex format
PubSub* {.public.} = ref object of LPProtocol PubSub* {.public.} = ref object of LPProtocol
switch*: Switch # the switch used to dial/connect to peers switch*: Switch # the switch used to dial/connect to peers
peerInfo*: PeerInfo # this peer's info peerInfo*: PeerInfo # this peer's info
@ -128,6 +134,7 @@ type
rng*: ref HmacDrbgContext rng*: ref HmacDrbgContext
knownTopics*: HashSet[string] knownTopics*: HashSet[string]
msgHashProvider*: MsgHashProvider
method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} = method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =
## handle peer disconnects ## handle peer disconnects
@ -555,7 +562,8 @@ proc init*[PubParams: object | bool](
subscriptionValidator: SubscriptionValidator = nil, subscriptionValidator: SubscriptionValidator = nil,
maxMessageSize: int = 1024 * 1024, maxMessageSize: int = 1024 * 1024,
rng: ref HmacDrbgContext = newRng(), rng: ref HmacDrbgContext = newRng(),
parameters: PubParams = false): P parameters: PubParams = false,
msgHashProvider: MsgHashProvider = nil): P
{.raises: [InitializationError], public.} = {.raises: [InitializationError], public.} =
let pubsub = let pubsub =
when PubParams is bool: when PubParams is bool:
@ -569,7 +577,8 @@ proc init*[PubParams: object | bool](
subscriptionValidator: subscriptionValidator, subscriptionValidator: subscriptionValidator,
maxMessageSize: maxMessageSize, maxMessageSize: maxMessageSize,
rng: rng, rng: rng,
topicsHigh: int.high) topicsHigh: int.high,
msgHashProvider: msgHashProvider)
else: else:
P(switch: switch, P(switch: switch,
peerInfo: switch.peerInfo, peerInfo: switch.peerInfo,
@ -582,7 +591,8 @@ proc init*[PubParams: object | bool](
parameters: parameters, parameters: parameters,
maxMessageSize: maxMessageSize, maxMessageSize: maxMessageSize,
rng: rng, rng: rng,
topicsHigh: int.high) topicsHigh: int.high,
msgHashProvider: msgHashProvider)
proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} = proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} =
if event.kind == PeerEventKind.Joined: if event.kind == PeerEventKind.Joined: