diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index a0fa44c64..3d8f70bc4 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -136,7 +136,8 @@ method publish*(f: FloodSub, return trace "publishing on topic", name = topic - let msg = Message.init(f.peerInfo, data, topic, f.sign) + inc f.msgSeqno + let msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign) # start the future but do not wait yet let (published, failed) = await f.sendHelper(f.floodsub.getOrDefault(topic), @[msg]) for p in failed: diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 708a19b4e..5fc2c4b50 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -506,8 +506,9 @@ method publish*(g: GossipSub, # time g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL) + inc g.msgSeqno let - msg = Message.init(g.peerInfo, data, topic, g.sign) + msg = Message.init(g.peerInfo, data, topic, g.msgSeqno, g.sign) msgId = g.msgIdProvider(msg) trace "publishing on topic", diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index d9679fa85..66027e2d2 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -60,6 +60,7 @@ type validators*: Table[string, HashSet[ValidatorHandler]] observers: ref seq[PubSubObserver] # ref as in smart_ptr msgIdProvider*: MsgIdProvider # Turn message into message id (not nil) + msgSeqno*: uint64 proc hasPeerID*(t: PeerTable, topic, peerId: string): bool = # unefficient but used only in tests! diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index f5fcd1719..ca72a3224 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -187,8 +187,9 @@ proc sendMsg*(p: PubSubPeer, peerId: PeerID, topic: string, data: seq[byte], + seqno: uint64, sign: bool): Future[void] {.gcsafe.} = - p.send(@[RPCMsg(messages: @[Message.init(p.peerInfo, data, topic, sign)])]) + p.send(@[RPCMsg(messages: @[Message.init(p.peerInfo, data, topic, seqno, sign)])]) proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async.} = try: diff --git a/libp2p/protocols/pubsub/rpc/message.nim b/libp2p/protocols/pubsub/rpc/message.nim index 9ff941853..0eecd548c 100644 --- a/libp2p/protocols/pubsub/rpc/message.nim +++ b/libp2p/protocols/pubsub/rpc/message.nim @@ -19,6 +19,7 @@ import messages, protobuf, ../../../peerinfo, ../../../crypto/crypto, ../../../protobuf/minprotobuf +import stew/endians2 logScope: topics = "pubsubmessage" @@ -56,15 +57,12 @@ proc init*( p: PeerInfo, data: seq[byte], topic: string, + seqno: uint64, sign: bool = true): Message {.gcsafe, raises: [CatchableError, Defect].} = - var seqno: seq[byte] = newSeq[byte](8) - if randomBytes(addr seqno[0], 8) <= 0: - raise (ref CatchableError)(msg: "Cannot get randomness for message") - result = Message( fromPeer: p.peerId, data: data, - seqno: seqno, + seqno: @(seqno.toBytesBE), # unefficient, fine for now topicIDs: @[topic]) if sign and p.publicKey.isSome: diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index be0127873..7ba6cfac8 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -227,12 +227,14 @@ suite "GossipSub internal": gossipSub.gossipsub[topic].incl(peer) # generate messages + var seqno = 0'u64 for i in 0..5: let conn = newBufferStream(noop) conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, false) + inc seqno + let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, seqno, false) gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) check gossipSub.fanout[topic].len == 15 @@ -276,12 +278,14 @@ suite "GossipSub internal": gossipSub.gossipsub[topic].incl(peer) # generate messages + var seqno = 0'u64 for i in 0..5: let conn = newBufferStream(noop) conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, false) + inc seqno + let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, seqno, false) gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) let peers = gossipSub.getGossipPeers() @@ -318,12 +322,14 @@ suite "GossipSub internal": gossipSub.gossipsub[topic].incl(peer) # generate messages + var seqno = 0'u64 for i in 0..5: let conn = newBufferStream(noop) conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, false) + inc seqno + let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, seqno, false) gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) let peers = gossipSub.getGossipPeers() @@ -360,12 +366,14 @@ suite "GossipSub internal": gossipSub.fanout[topic].incl(peer) # generate messages + var seqno = 0'u64 for i in 0..5: let conn = newBufferStream(noop) conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - let msg = Message.init(peerInfo, ("bar" & $i).toBytes(), topic, false) + inc seqno + let msg = Message.init(peerInfo, ("bar" & $i).toBytes(), topic, seqno, false) gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) let peers = gossipSub.getGossipPeers() diff --git a/tests/pubsub/testmessage.nim b/tests/pubsub/testmessage.nim index 48acffcef..571a0566c 100644 --- a/tests/pubsub/testmessage.nim +++ b/tests/pubsub/testmessage.nim @@ -11,8 +11,9 @@ let rng = newRng() suite "Message": test "signature": + var seqno = 11'u64 let peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) - msg = Message.init(peer, @[], "topic", sign = true) + msg = Message.init(peer, @[], "topic", seqno, sign = true) check verify(msg, peer)