diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 11c4802..de46567 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -72,10 +72,19 @@ method rpcHandler*(f: FloodSub, trace "Dropping already-seen message", msgId, peer continue - if f.verifySignature and not msg.verify(peer.peerId): + if (msg.signature.len > 0 or f.verifySignature) and not msg.verify(): + # always validate if signature is present or required debug "Dropping message due to failed signature verification", msgId, peer continue + if msg.seqno.len > 0 and msg.seqno.len != 8: + # if we have seqno should be 8 bytes long + debug "Dropping message due to invalid seqno length", msgId, peer + continue + + # g.anonymize needs no evaluation when receiving messages + # as we have a "lax" policy and allow signed messages + if not (await f.validate(msg)): trace "Dropping message due to failed validation", msgId, peer continue @@ -129,7 +138,11 @@ method publish*(f: FloodSub, inc f.msgSeqno let - msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign) + msg = + if f.anonymize: + Message.init(none(PeerInfo), data, topic, none(uint64), false) + else: + Message.init(some(f.peerInfo), data, topic, some(f.msgSeqno), f.sign) msgId = f.msgIdProvider(msg) trace "Created new message", diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 77c335d..af2a40f 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -1046,11 +1046,21 @@ method rpcHandler*(g: GossipSub, g.mcache.put(msgId, msg) - if g.verifySignature and not msg.verify(peer.peerId): + if (msg.signature.len > 0 or g.verifySignature) and not msg.verify(): + # always validate if signature is present or required debug "Dropping message due to failed signature verification", msgId, peer g.punishPeer(peer, msg) continue + if msg.seqno.len > 0 and msg.seqno.len != 8: + # if we have seqno should be 8 bytes long + debug "Dropping message due to invalid seqno length", msgId, peer + g.punishPeer(peer, msg) + continue + + # g.anonymize needs no evaluation when receiving messages + # as we have a "lax" policy and allow signed messages + if not (await g.validate(msg)): debug "Dropping message due to failed validation", msgId, peer g.punishPeer(peer, msg) @@ -1197,7 +1207,11 @@ method publish*(g: GossipSub, inc g.msgSeqno let - msg = Message.init(g.peerInfo, data, topic, g.msgSeqno, g.sign) + msg = + if g.anonymize: + Message.init(none(PeerInfo), data, topic, none(uint64), false) + else: + Message.init(some(g.peerInfo), data, topic, some(g.msgSeqno), g.sign) msgId = g.msgIdProvider(msg) logScope: msgId diff --git a/libp2p/protocols/pubsub/gossipsub10.nim b/libp2p/protocols/pubsub/gossipsub10.nim index a4ac681..4ce0091 100644 --- a/libp2p/protocols/pubsub/gossipsub10.nim +++ b/libp2p/protocols/pubsub/gossipsub10.nim @@ -451,10 +451,19 @@ method rpcHandler*(g: GossipSub, g.mcache.put(msgId, msg) - if g.verifySignature and not msg.verify(peer.peerId): + if (msg.signature.len > 0 or g.verifySignature) and not msg.verify(): + # always validate if signature is present or required debug "Dropping message due to failed signature verification", msgId, peer continue + if msg.seqno.len > 0 and msg.seqno.len != 8: + # if we have seqno should be 8 bytes long + debug "Dropping message due to invalid seqno length", msgId, peer + continue + + # g.anonymize needs no evaluation when receiving messages + # as we have a "lax" policy and allow signed messages + if not (await g.validate(msg)): trace "Dropping message due to failed validation", msgId, peer continue @@ -556,7 +565,11 @@ method publish*(g: GossipSub, inc g.msgSeqno let - msg = Message.init(g.peerInfo, data, topic, g.msgSeqno, g.sign) + msg = + if g.anonymize: + Message.init(none(PeerInfo), data, topic, none(uint64), false) + else: + Message.init(some(g.peerInfo), data, topic, some(g.msgSeqno), g.sign) msgId = g.msgIdProvider(msg) logScope: msgId diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index bd1a2d9..c840c85 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -67,6 +67,7 @@ type observers: ref seq[PubSubObserver] # ref as in smart_ptr msgIdProvider*: MsgIdProvider # Turn message into message id (not nil) msgSeqno*: uint64 + anonymize*: bool # if we omit fromPeer and seqno from RPC messages we send method unsubscribePeer*(p: PubSub, peerId: PeerID) {.base.} = ## handle peer disconnects @@ -329,6 +330,7 @@ proc init*[PubParams: object | bool]( P: typedesc[PubSub], switch: Switch, triggerSelf: bool = false, + anonymize: bool = false, verifySignature: bool = true, sign: bool = true, msgIdProvider: MsgIdProvider = defaultMsgIdProvider, @@ -338,6 +340,7 @@ proc init*[PubParams: object | bool]( P(switch: switch, peerInfo: switch.peerInfo, triggerSelf: triggerSelf, + anonymize: anonymize, verifySignature: verifySignature, sign: sign, peers: initTable[PeerID, PubSubPeer](), @@ -347,6 +350,7 @@ proc init*[PubParams: object | bool]( P(switch: switch, peerInfo: switch.peerInfo, triggerSelf: triggerSelf, + anonymize: anonymize, verifySignature: verifySignature, sign: sign, peers: initTable[PeerID, PubSubPeer](), diff --git a/libp2p/protocols/pubsub/rpc/message.nim b/libp2p/protocols/pubsub/rpc/message.nim index 090bd37..ec9158c 100644 --- a/libp2p/protocols/pubsub/rpc/message.nim +++ b/libp2p/protocols/pubsub/rpc/message.nim @@ -33,7 +33,7 @@ func defaultMsgIdProvider*(m: Message): string = proc sign*(msg: Message, privateKey: PrivateKey): CryptoResult[seq[byte]] = ok((? privateKey.sign(PubSubPrefix & encodeMessage(msg))).getBytes()) -proc verify*(m: Message, p: PeerID): bool = +proc verify*(m: Message): bool = if m.signature.len > 0 and m.key.len > 0: var msg = m msg.signature = @[] @@ -52,20 +52,26 @@ proc verify*(m: Message, p: PeerID): bool = proc init*( T: type Message, - peer: PeerInfo, + peer: Option[PeerInfo], data: seq[byte], topic: string, - seqno: uint64, + seqno: Option[uint64], sign: bool = true): Message {.gcsafe, raises: [CatchableError, Defect].} = - result = Message( - fromPeer: peer.peerId, - data: data, - seqno: @(seqno.toBytesBE), # unefficient, fine for now - topicIDs: @[topic]) + var msg = Message(data: data, topicIDs: @[topic]) - if sign: - if peer.keyType != KeyType.HasPrivate: - raise (ref CatchableError)(msg: "Cannot sign message without private key") + # order matters, we want to include seqno in the signature + if seqno.isSome: + msg.seqno = @(seqno.get().toBytesBE()) - result.signature = sign(result, peer.privateKey).tryGet() - result.key = peer.privateKey.getKey().tryGet().getBytes().tryGet() + if peer.isSome: + let peer = peer.get() + msg.fromPeer = peer.peerId + if sign: + if peer.keyType != KeyType.HasPrivate: + raise (ref CatchableError)(msg: "Cannot sign message without private key") + msg.signature = sign(msg, peer.privateKey).tryGet() + msg.key = peer.privateKey.getKey().tryGet().getBytes().tryGet() + elif sign: + raise (ref CatchableError)(msg: "Cannot sign message without peer info") + + msg diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 5de2aa7..2cfecb7 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -2,6 +2,7 @@ include ../../libp2p/protocols/pubsub/gossipsub {.used.} +import options import unittest, bearssl import stew/byteutils import ../../libp2p/standard_setup @@ -267,7 +268,7 @@ suite "GossipSub internal": let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo inc seqno - let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, seqno, false) + let msg = Message.init(some(peerInfo), ("HELLO" & $i).toBytes(), topic, some(seqno), false) gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) check gossipSub.fanout[topic].len == 15 @@ -321,7 +322,7 @@ suite "GossipSub internal": let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo inc seqno - let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, seqno, false) + let msg = Message.init(some(peerInfo), ("HELLO" & $i).toBytes(), topic, some(seqno), false) gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) let peers = gossipSub.getGossipPeers() @@ -369,7 +370,7 @@ suite "GossipSub internal": let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo inc seqno - let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, seqno, false) + let msg = Message.init(some(peerInfo), ("HELLO" & $i).toBytes(), topic, some(seqno), false) gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) let peers = gossipSub.getGossipPeers() @@ -417,7 +418,7 @@ suite "GossipSub internal": let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo inc seqno - let msg = Message.init(peerInfo, ("bar" & $i).toBytes(), topic, seqno, false) + let msg = Message.init(some(peerInfo), ("bar" & $i).toBytes(), topic, some(seqno), false) gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) let peers = gossipSub.getGossipPeers() diff --git a/tests/pubsub/testgossipinternal10.nim b/tests/pubsub/testgossipinternal10.nim index 6d13e5e..88052fb 100644 --- a/tests/pubsub/testgossipinternal10.nim +++ b/tests/pubsub/testgossipinternal10.nim @@ -2,6 +2,7 @@ include ../../libp2p/protocols/pubsub/gossipsub10 {.used.} +import options import unittest, bearssl import stew/byteutils import ../../libp2p/standard_setup @@ -244,7 +245,7 @@ suite "GossipSub internal": let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo inc seqno - let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, seqno, false) + let msg = Message.init(some(peerInfo), ("HELLO" & $i).toBytes(), topic, some(seqno), false) gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) check gossipSub.fanout[topic].len == 15 @@ -296,7 +297,7 @@ suite "GossipSub internal": let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo inc seqno - let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, seqno, false) + let msg = Message.init(some(peerInfo), ("HELLO" & $i).toBytes(), topic, some(seqno), false) gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) let peers = gossipSub.getGossipPeers() @@ -341,7 +342,7 @@ suite "GossipSub internal": let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo inc seqno - let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, seqno, false) + let msg = Message.init(some(peerInfo), ("HELLO" & $i).toBytes(), topic, some(seqno), false) gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) let peers = gossipSub.getGossipPeers() @@ -386,7 +387,7 @@ suite "GossipSub internal": let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo inc seqno - let msg = Message.init(peerInfo, ("bar" & $i).toBytes(), topic, seqno, false) + let msg = Message.init(some(peerInfo), ("bar" & $i).toBytes(), topic, some(seqno), false) gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) let peers = gossipSub.getGossipPeers() diff --git a/tests/pubsub/testmessage.nim b/tests/pubsub/testmessage.nim index 8a70dd2..de11e8e 100644 --- a/tests/pubsub/testmessage.nim +++ b/tests/pubsub/testmessage.nim @@ -2,6 +2,7 @@ import unittest {.used.} +import options import ../../libp2p/[peerid, peerinfo, crypto/crypto, protocols/pubsub/rpc/message, @@ -14,6 +15,6 @@ suite "Message": var seqno = 11'u64 let peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) - msg = Message.init(peer, @[], "topic", seqno, sign = true) + msg = Message.init(some(peer), @[], "topic", some(seqno), sign = true) - check verify(msg, peer.peerId) + check verify(msg)