diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 48dc280..88a2634 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -83,7 +83,7 @@ proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) = ## trace "sending pubsub message to peer", peer, msg = shortLog(msg) - peer.send(msg) + peer.send(msg, p.anonymize) proc broadcast*( p: PubSub, diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index ade6f44..a3ed506 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -193,7 +193,7 @@ proc sendImpl(conn: Connection, encoded: seq[byte]) {.async.} = await conn.close() # This will clean up the send connection -proc send*(p: PubSubPeer, msg: RPCMsg) = +proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) = doAssert(not isNil(p), "pubsubpeer nil!") let conn = p.sendConn @@ -203,15 +203,20 @@ proc send*(p: PubSubPeer, msg: RPCMsg) = trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg) + # When sending messages, we take care to re-encode them with the right + # anonymization flag to ensure that we're not penalized for sending invalid + # or malicious data on the wire - in particular, re-encoding protects against + # some forms of valid but redundantly encoded protobufs with unknown or + # duplicated fields let encoded = if p.hasObservers(): # trigger send hooks var mm = msg # hooks can modify the message p.sendObservers(mm) - encodeRpcMsg(mm) + encodeRpcMsg(mm, anonymize) else: # If there are no send hooks, we redundantly re-encode the message to # protobuf for every peer - this could easily be improved! - encodeRpcMsg(msg) + encodeRpcMsg(msg, anonymize) if encoded.len <= 0: debug "empty message, skipping", p, msg diff --git a/libp2p/protocols/pubsub/rpc/message.nim b/libp2p/protocols/pubsub/rpc/message.nim index ec9158c..e5e4f28 100644 --- a/libp2p/protocols/pubsub/rpc/message.nim +++ b/libp2p/protocols/pubsub/rpc/message.nim @@ -31,7 +31,7 @@ func defaultMsgIdProvider*(m: Message): string = byteutils.toHex(m.seqno) & $m.fromPeer proc sign*(msg: Message, privateKey: PrivateKey): CryptoResult[seq[byte]] = - ok((? privateKey.sign(PubSubPrefix & encodeMessage(msg))).getBytes()) + ok((? privateKey.sign(PubSubPrefix & encodeMessage(msg, false))).getBytes()) proc verify*(m: Message): bool = if m.signature.len > 0 and m.key.len > 0: @@ -43,7 +43,7 @@ proc verify*(m: Message): bool = var key: PublicKey if remote.init(m.signature) and key.init(m.key): trace "verifying signature", remoteSignature = remote - result = remote.verify(PubSubPrefix & encodeMessage(msg), key) + result = remote.verify(PubSubPrefix & encodeMessage(msg, false), key) if result: libp2p_pubsub_sig_verify_success.inc() @@ -60,7 +60,7 @@ proc init*( var msg = Message(data: data, topicIDs: @[topic]) # order matters, we want to include seqno in the signature - if seqno.isSome: + if seqno.isSome: msg.seqno = @(seqno.get().toBytesBE()) if peer.isSome: diff --git a/libp2p/protocols/pubsub/rpc/protobuf.nim b/libp2p/protocols/pubsub/rpc/protobuf.nim index 7a8c074..6bffc6e 100644 --- a/libp2p/protocols/pubsub/rpc/protobuf.nim +++ b/libp2p/protocols/pubsub/rpc/protobuf.nim @@ -78,24 +78,24 @@ proc write*(pb: var ProtoBuffer, field: int, subs: SubOpts) = ipb.finish() pb.write(field, ipb) -proc encodeMessage*(msg: Message): seq[byte] = +proc encodeMessage*(msg: Message, anonymize: bool): seq[byte] = var pb = initProtoBuffer() - if len(msg.fromPeer) > 0: + if len(msg.fromPeer) > 0 and not anonymize: pb.write(1, msg.fromPeer) pb.write(2, msg.data) - if len(msg.seqno) > 0: + if len(msg.seqno) > 0 and not anonymize: pb.write(3, msg.seqno) for topic in msg.topicIDs: pb.write(4, topic) - if len(msg.signature) > 0: + if len(msg.signature) > 0 and not anonymize: pb.write(5, msg.signature) - if len(msg.key) > 0: + if len(msg.key) > 0 and not anonymize: pb.write(6, msg.key) pb.finish() pb.buffer -proc write*(pb: var ProtoBuffer, field: int, msg: Message) = - pb.write(field, encodeMessage(msg)) +proc write*(pb: var ProtoBuffer, field: int, msg: Message, anonymize: bool) = + pb.write(field, encodeMessage(msg, anonymize)) proc decodeGraft*(pb: ProtoBuffer): ProtoResult[ControlGraft] {. inline.} = @@ -242,13 +242,13 @@ proc decodeMessages*(pb: ProtoBuffer): ProtoResult[seq[Message]] {.inline.} = trace "decodeMessages: no messages found" ok(msgs) -proc encodeRpcMsg*(msg: RPCMsg): seq[byte] = +proc encodeRpcMsg*(msg: RPCMsg, anonymize: bool): seq[byte] = trace "encodeRpcMsg: encoding message", msg = msg.shortLog() var pb = initProtoBuffer() for item in msg.subscriptions: pb.write(1, item) for item in msg.messages: - pb.write(2, item) + pb.write(2, item, anonymize) if msg.control.isSome(): pb.write(3, msg.control.get()) if len(pb.buffer) > 0: