reencode gossipsub messages with anonymization (#378)
This helps protect against clients sending more data than they should and thus getting penalized on topics that require anonymity
This commit is contained in:
parent
17e00e642a
commit
8ecef46738
|
@ -83,7 +83,7 @@ proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) =
|
|||
##
|
||||
|
||||
trace "sending pubsub message to peer", peer, msg = shortLog(msg)
|
||||
peer.send(msg)
|
||||
peer.send(msg, p.anonymize)
|
||||
|
||||
proc broadcast*(
|
||||
p: PubSub,
|
||||
|
|
|
@ -193,7 +193,7 @@ proc sendImpl(conn: Connection, encoded: seq[byte]) {.async.} =
|
|||
|
||||
await conn.close() # This will clean up the send connection
|
||||
|
||||
proc send*(p: PubSubPeer, msg: RPCMsg) =
|
||||
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) =
|
||||
doAssert(not isNil(p), "pubsubpeer nil!")
|
||||
|
||||
let conn = p.sendConn
|
||||
|
@ -203,15 +203,20 @@ proc send*(p: PubSubPeer, msg: RPCMsg) =
|
|||
|
||||
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
|
||||
|
||||
# When sending messages, we take care to re-encode them with the right
|
||||
# anonymization flag to ensure that we're not penalized for sending invalid
|
||||
# or malicious data on the wire - in particular, re-encoding protects against
|
||||
# some forms of valid but redundantly encoded protobufs with unknown or
|
||||
# duplicated fields
|
||||
let encoded = if p.hasObservers():
|
||||
# trigger send hooks
|
||||
var mm = msg # hooks can modify the message
|
||||
p.sendObservers(mm)
|
||||
encodeRpcMsg(mm)
|
||||
encodeRpcMsg(mm, anonymize)
|
||||
else:
|
||||
# If there are no send hooks, we redundantly re-encode the message to
|
||||
# protobuf for every peer - this could easily be improved!
|
||||
encodeRpcMsg(msg)
|
||||
encodeRpcMsg(msg, anonymize)
|
||||
|
||||
if encoded.len <= 0:
|
||||
debug "empty message, skipping", p, msg
|
||||
|
|
|
@ -31,7 +31,7 @@ func defaultMsgIdProvider*(m: Message): string =
|
|||
byteutils.toHex(m.seqno) & $m.fromPeer
|
||||
|
||||
proc sign*(msg: Message, privateKey: PrivateKey): CryptoResult[seq[byte]] =
|
||||
ok((? privateKey.sign(PubSubPrefix & encodeMessage(msg))).getBytes())
|
||||
ok((? privateKey.sign(PubSubPrefix & encodeMessage(msg, false))).getBytes())
|
||||
|
||||
proc verify*(m: Message): bool =
|
||||
if m.signature.len > 0 and m.key.len > 0:
|
||||
|
@ -43,7 +43,7 @@ proc verify*(m: Message): bool =
|
|||
var key: PublicKey
|
||||
if remote.init(m.signature) and key.init(m.key):
|
||||
trace "verifying signature", remoteSignature = remote
|
||||
result = remote.verify(PubSubPrefix & encodeMessage(msg), key)
|
||||
result = remote.verify(PubSubPrefix & encodeMessage(msg, false), key)
|
||||
|
||||
if result:
|
||||
libp2p_pubsub_sig_verify_success.inc()
|
||||
|
@ -60,7 +60,7 @@ proc init*(
|
|||
var msg = Message(data: data, topicIDs: @[topic])
|
||||
|
||||
# order matters, we want to include seqno in the signature
|
||||
if seqno.isSome:
|
||||
if seqno.isSome:
|
||||
msg.seqno = @(seqno.get().toBytesBE())
|
||||
|
||||
if peer.isSome:
|
||||
|
|
|
@ -78,24 +78,24 @@ proc write*(pb: var ProtoBuffer, field: int, subs: SubOpts) =
|
|||
ipb.finish()
|
||||
pb.write(field, ipb)
|
||||
|
||||
proc encodeMessage*(msg: Message): seq[byte] =
|
||||
proc encodeMessage*(msg: Message, anonymize: bool): seq[byte] =
|
||||
var pb = initProtoBuffer()
|
||||
if len(msg.fromPeer) > 0:
|
||||
if len(msg.fromPeer) > 0 and not anonymize:
|
||||
pb.write(1, msg.fromPeer)
|
||||
pb.write(2, msg.data)
|
||||
if len(msg.seqno) > 0:
|
||||
if len(msg.seqno) > 0 and not anonymize:
|
||||
pb.write(3, msg.seqno)
|
||||
for topic in msg.topicIDs:
|
||||
pb.write(4, topic)
|
||||
if len(msg.signature) > 0:
|
||||
if len(msg.signature) > 0 and not anonymize:
|
||||
pb.write(5, msg.signature)
|
||||
if len(msg.key) > 0:
|
||||
if len(msg.key) > 0 and not anonymize:
|
||||
pb.write(6, msg.key)
|
||||
pb.finish()
|
||||
pb.buffer
|
||||
|
||||
proc write*(pb: var ProtoBuffer, field: int, msg: Message) =
|
||||
pb.write(field, encodeMessage(msg))
|
||||
proc write*(pb: var ProtoBuffer, field: int, msg: Message, anonymize: bool) =
|
||||
pb.write(field, encodeMessage(msg, anonymize))
|
||||
|
||||
proc decodeGraft*(pb: ProtoBuffer): ProtoResult[ControlGraft] {.
|
||||
inline.} =
|
||||
|
@ -242,13 +242,13 @@ proc decodeMessages*(pb: ProtoBuffer): ProtoResult[seq[Message]] {.inline.} =
|
|||
trace "decodeMessages: no messages found"
|
||||
ok(msgs)
|
||||
|
||||
proc encodeRpcMsg*(msg: RPCMsg): seq[byte] =
|
||||
proc encodeRpcMsg*(msg: RPCMsg, anonymize: bool): seq[byte] =
|
||||
trace "encodeRpcMsg: encoding message", msg = msg.shortLog()
|
||||
var pb = initProtoBuffer()
|
||||
for item in msg.subscriptions:
|
||||
pb.write(1, item)
|
||||
for item in msg.messages:
|
||||
pb.write(2, item)
|
||||
pb.write(2, item, anonymize)
|
||||
if msg.control.isSome():
|
||||
pb.write(3, msg.control.get())
|
||||
if len(pb.buffer) > 0:
|
||||
|
|
Loading…
Reference in New Issue