diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim
index 8809d6fe7..4a7d1a13b 100644
--- a/libp2p/protocols/pubsub/floodsub.nim
+++ b/libp2p/protocols/pubsub/floodsub.nim
@@ -148,12 +148,15 @@ method rpcHandler*(f: FloodSub,
       discard

     var toSendPeers = initHashSet[PubSubPeer]()
-    for t in msg.topicIds: # for every topic in the message
-      if t notin f.topics:
-        continue
-      f.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
+    let topic = msg.topic
+    if topic notin f.topics:
+      debug "Dropping message due to topic not in floodsub topics", topic, msgId, peer
+      continue

-      await handleData(f, t, msg.data)
+    f.floodsub.withValue(topic, peers):
+      toSendPeers.incl(peers[])
+
+    await handleData(f, topic, msg.data)

     # In theory, if topics are the same in all messages, we could batch - we'd
     # also have to be careful to only include validated messages
diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim
index d93b8b9b5..0382279a8 100644
--- a/libp2p/protocols/pubsub/gossipsub.nim
+++ b/libp2p/protocols/pubsub/gossipsub.nim
@@ -276,7 +276,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
     var respControl: ControlMessage
     g.handleIDontWant(peer, control.idontwant)
     let iwant = g.handleIHave(peer, control.ihave)
-    if iwant.messageIds.len > 0:
+    if iwant.messageIDs.len > 0:
       respControl.iwant.add(iwant)
     respControl.prune.add(g.handleGraft(peer, control.graft))
     let messages = g.handleIWant(peer, control.iwant)
@@ -292,8 +292,8 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
       if isPruneNotEmpty:
         for prune in respControl.prune:
-          if g.knownTopics.contains(prune.topicId):
-            libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId])
+          if g.knownTopics.contains(prune.topicID):
+            libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicID])
           else:
             libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
@@ -304,11 +304,11 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
     if messages.len > 0:
       for smsg in messages:
-        for topic in smsg.topicIds:
-          if g.knownTopics.contains(topic):
-            libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
-          else:
-            libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])
+        let topic = smsg.topic
+        if g.knownTopics.contains(topic):
+          libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
+        else:
+          libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])

       # iwant replies have lower priority
       trace "sending iwant reply messages", peer
@@ -344,18 +344,18 @@ proc validateAndRelay(g: GossipSub,
     # store in cache only after validation
     g.mcache.put(msgId, msg)

-    g.rewardDelivered(peer, msg.topicIds, true)
+    let topic = msg.topic
+    g.rewardDelivered(peer, topic, true)

     var toSendPeers = HashSet[PubSubPeer]()
-    for t in msg.topicIds: # for every topic in the message
-      if t notin g.topics:
-        continue
+    if topic notin g.topics:
+      return

-      g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
-      g.mesh.withValue(t, peers): toSendPeers.incl(peers[])
+    g.floodsub.withValue(topic, peers): toSendPeers.incl(peers[])
+    g.mesh.withValue(topic, peers): toSendPeers.incl(peers[])

-      # add direct peers
-      toSendPeers.incl(g.subscribedDirectPeers.getOrDefault(t))
+    # add direct peers
+    toSendPeers.incl(g.subscribedDirectPeers.getOrDefault(topic))

     # Don't send it to source peer, or peers that
     # sent it during validation
@@ -366,7 +366,7 @@ proc validateAndRelay(g: GossipSub,
     # bigger than the messageId
     if msg.data.len > msgId.len * 10:
       g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
-        idontwant: @[ControlIWant(messageIds: @[msgId])]
+        idontwant: @[ControlIWant(messageIDs: @[msgId])]
       ))), isHighPriority = true)

     for peer in toSendPeers:
@@ -383,20 +383,18 @@ proc validateAndRelay(g: GossipSub,
     # also have to be careful to only include validated messages
     g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
     trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
-    for topic in msg.topicIds:
-      if topic notin g.topics: continue
-      if g.knownTopics.contains(topic):
-        libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = [topic])
-      else:
-        libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = ["generic"])
+    if g.knownTopics.contains(topic):
+      libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = [topic])
+    else:
+      libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = ["generic"])

-      await handleData(g, topic, msg.data)
+    await handleData(g, topic, msg.data)

   except CatchableError as exc:
     info "validateAndRelay failed", msg=exc.msg

 proc dataAndTopicsIdSize(msgs: seq[Message]): int =
-  msgs.mapIt(it.data.len + it.topicIds.mapIt(it.len).foldl(a + b, 0)).foldl(a + b, 0)
+  msgs.mapIt(it.data.len + it.topic.len).foldl(a + b, 0)

 proc messageOverhead(g: GossipSub, msg: RPCMsg, msgSize: int): int =
   # In this way we count even ignored fields by protobuf
@@ -433,8 +431,7 @@ method rpcHandler*(g: GossipSub,
   when defined(libp2p_expensive_metrics):
     for m in rpcMsg.messages:
-      for t in m.topicIds:
-        libp2p_pubsub_received_messages.inc(labelValues = [$peer.peerId, t])
+      libp2p_pubsub_received_messages.inc(labelValues = [$peer.peerId, m.topic])

   trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
   await rateLimit(g, peer, g.messageOverhead(rpcMsg, msgSize))
@@ -470,6 +467,7 @@ method rpcHandler*(g: GossipSub,
       let
         msgId = msgIdResult.get
         msgIdSalted = msgId & g.seenSalt
+        topic = msg.topic

       # addSeen adds salt to msgId to avoid
       # remote attacking the hash function
@@ -484,7 +482,7 @@ method rpcHandler*(g: GossipSub,
         if not alreadyReceived:
           let delay = Moment.now() - g.firstSeen(msgId)
-          g.rewardDelivered(peer, msg.topicIds, false, delay)
+          g.rewardDelivered(peer, topic, false, delay)

           libp2p_gossipsub_duplicate.inc()
@@ -494,7 +492,7 @@ method rpcHandler*(g: GossipSub,
       libp2p_gossipsub_received.inc()

       # avoid processing messages we are not interested in
-      if msg.topicIds.allIt(it notin g.topics):
+      if topic notin g.topics:
         debug "Dropping message of topic without subscription", msgId = shortLog(msgId), peer
         continue
diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim
index b626cb1da..302695abf 100644
--- a/libp2p/protocols/pubsub/gossipsub/behavior.nim
+++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim
@@ -103,8 +103,8 @@ proc handleGraft*(g: GossipSub,
   grafts: seq[ControlGraft]): seq[ControlPrune] = # {.raises: [Defect].} TODO chronicles exception on windows
   var prunes: seq[ControlPrune]
   for graft in grafts:
-    let topic = graft.topicId
-    trace "peer grafted topic", peer, topic
+    let topic = graft.topicID
+    trace "peer grafted topicID", peer, topic

     # It is an error to GRAFT on a direct peer
     if peer.peerId in g.parameters.directPeers:
@@ -207,9 +207,9 @@ proc getPeers(prune: ControlPrune, peer: PubSubPeer): seq[(PeerId, Option[PeerRe
 proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.raises: [].} =
   for prune in prunes:
-    let topic = prune.topicId
+    let topic = prune.topicID

-    trace "peer pruned topic", peer, topic
+    trace "peer pruned topicID", peer, topic

     # add peer backoff
     if prune.backoff > 0:
@@ -248,26 +248,26 @@ proc handleIHave*(g: GossipSub,
   else:
     for ihave in ihaves:
       trace "peer sent ihave",
-        peer, topic = ihave.topicId, msgs = ihave.messageIds
-      if ihave.topicId in g.topics:
-        for msgId in ihave.messageIds:
+        peer, topicID = ihave.topicID, msgs = ihave.messageIDs
+      if ihave.topicID in g.topics:
+        for msgId in ihave.messageIDs:
           if not g.hasSeen(msgId):
             if peer.iHaveBudget <= 0:
               break
-            elif msgId notin res.messageIds:
-              res.messageIds.add(msgId)
+            elif msgId notin res.messageIDs:
+              res.messageIDs.add(msgId)
               dec peer.iHaveBudget
               trace "requested message via ihave", messageID=msgId
   # shuffling res.messageIDs before sending it out to increase the likelihood
   # of getting an answer if the peer truncates the list due to internal size restrictions.
-  g.rng.shuffle(res.messageIds)
+  g.rng.shuffle(res.messageIDs)
   return res

 proc handleIDontWant*(g: GossipSub, peer: PubSubPeer, iDontWants: seq[ControlIWant]) =
   for dontWant in iDontWants:
-    for messageId in dontWant.messageIds:
+    for messageId in dontWant.messageIDs:
       if peer.heDontWants[^1].len > 1000: break
       if messageId.len > 100: continue
       peer.heDontWants[^1].incl(messageId)
@@ -282,7 +282,7 @@ proc handleIWant*(g: GossipSub,
     trace "iwant: ignoring low score peer", peer, score = peer.score
   else:
     for iwant in iwants:
-      for mid in iwant.messageIds:
+      for mid in iwant.messageIDs:
         trace "peer sent iwant", peer, messageID = mid
         # canAskIWant will only return true once for a specific message
         if not peer.canAskIWant(mid):
@@ -579,7 +579,7 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises:
   trace "getting gossip peers (iHave)", ntopics=topics.len
   for topic in topics:
     if topic notin g.gossipsub:
-      trace "topic not in gossip array, skipping", topicID = topic
+      trace "topic not in gossip array, skipping", topic = topic
       continue

     let mids = g.mcache.window(topic)
@@ -621,7 +621,7 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises:
     g.rng.shuffle(allPeers)
     allPeers.setLen(target)

-    let msgIdsAsSet = ihave.messageIds.toHashSet()
+    let msgIdsAsSet = ihave.messageIDs.toHashSet()

     for peer in allPeers:
       control.mgetOrPut(peer, ControlMessage()).ihave.add(ihave)
@@ -687,8 +687,8 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
   for peer, control in peers:
     # only ihave from here
     for ihave in control.ihave:
-      if g.knownTopics.contains(ihave.topicId):
-        libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicId])
+      if g.knownTopics.contains(ihave.topicID):
+        libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicID])
       else:
         libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"])
     g.send(peer, RPCMsg(control: some(control)), isHighPriority = true)
diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim
index ee4f34da0..7cac93d93 100644
--- a/libp2p/protocols/pubsub/gossipsub/scoring.nim
+++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim
@@ -250,43 +250,42 @@ proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) {.async
       await g.disconnectPeer(peer)
       raise newException(PeerRateLimitError, "Peer disconnected because it's above rate limit.")

+  let topic = msg.topic
+  if topic notin g.topics:
+    return

-  for tt in msg.topicIds:
-    let t = tt
-    if t notin g.topics:
-      continue
-
-    let tt = t
-    # update stats
-    g.withPeerStats(peer.peerId) do (stats: var PeerStats):
-      stats.topicInfos.mgetOrPut(tt, TopicInfo()).invalidMessageDeliveries += 1
+  # update stats
+  g.withPeerStats(peer.peerId) do(stats: var PeerStats):
+    stats.topicInfos.mgetOrPut(topic, TopicInfo()).invalidMessageDeliveries += 1

 proc addCapped*[T](stat: var T, diff, cap: T) =
   stat += min(diff, cap - stat)

 proc rewardDelivered*(
-    g: GossipSub, peer: PubSubPeer, topics: openArray[string], first: bool, delay = ZeroDuration) =
-  for tt in topics:
-    let t = tt
-    if t notin g.topics:
-      continue
+    g: GossipSub,
+    peer: PubSubPeer,
+    topic: string,
+    first: bool,
+    delay = ZeroDuration,
+) =
+  if topic notin g.topics:
+    return

-    let tt = t
-    let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init())
-    # if in mesh add more delivery score
+  let topicParams = g.topicParams.mgetOrPut(topic, TopicParams.init())
+  # if in mesh add more delivery score

-    if delay > topicParams.meshMessageDeliveriesWindow:
-      # Too old
-      continue
+  if delay > topicParams.meshMessageDeliveriesWindow:
+    # Too old
+    return

-    g.withPeerStats(peer.peerId) do (stats: var PeerStats):
-      stats.topicInfos.withValue(tt, tstats):
-        if first:
-          tstats[].firstMessageDeliveries.addCapped(
-            1, topicParams.firstMessageDeliveriesCap)
+  g.withPeerStats(peer.peerId) do (stats: var PeerStats):
+    stats.topicInfos.withValue(topic, tstats):
+      if first:
+        tstats[].firstMessageDeliveries.addCapped(
+          1, topicParams.firstMessageDeliveriesCap)

-        if tstats[].inMesh:
-          tstats[].meshMessageDeliveries.addCapped(
-            1, topicParams.meshMessageDeliveriesCap)
-      do: # make sure we don't loose this information
-        stats.topicInfos[tt] = TopicInfo(meshMessageDeliveries: 1)
+      if tstats[].inMesh:
+        tstats[].meshMessageDeliveries.addCapped(
+          1, topicParams.meshMessageDeliveriesCap)
+    do: # make sure we don't lose this information
+      stats.topicInfos[topic] = TopicInfo(meshMessageDeliveries: 1)
diff --git a/libp2p/protocols/pubsub/mcache.nim b/libp2p/protocols/pubsub/mcache.nim
index d6ea6871a..da0d92fb5 100644
--- a/libp2p/protocols/pubsub/mcache.nim
+++ b/libp2p/protocols/pubsub/mcache.nim
@@ -17,7 +17,7 @@ export sets, tables, messages, options
 type
   CacheEntry* = object
     mid*: MessageId
-    topicIds*: seq[string]
+    topic*: string

   MCache* = object of RootObj
     msgs*: Table[MessageId, Message]
@@ -37,7 +37,7 @@ func contains*(c: MCache, mid: MessageId): bool =
 func put*(c: var MCache, msgId: MessageId, msg: Message) =
   if not c.msgs.hasKeyOrPut(msgId, msg):
     # Only add cache entry if the message was not already in the cache
-    c.history[0].add(CacheEntry(mid: msgId, topicIds: msg.topicIds))
+    c.history[0].add(CacheEntry(mid: msgId, topic: msg.topic))

 func window*(c: MCache, topic: string): HashSet[MessageId] =
   let
@@ -45,10 +45,8 @@ func window*(c: MCache, topic: string): HashSet[MessageId] =
   for i in 0.. 0: ipb.finish()
@@ -110,8 +110,7 @@ proc encodeMessage*(msg: Message, anonymize: bool): seq[byte] =
   pb.write(2, msg.data)
   if len(msg.seqno) > 0 and not anonymize:
     pb.write(3, msg.seqno)
-  for topic in msg.topicIds:
-    pb.write(4, topic)
+  pb.write(4, msg.topic)
   if len(msg.signature) > 0 and not anonymize:
     pb.write(5, msg.signature)
   if len(msg.key) > 0 and not anonymize:
@@ -133,10 +132,10 @@ proc decodeGraft*(pb: ProtoBuffer): ProtoResult[ControlGraft] {.
   trace "decodeGraft: decoding message"
   var control = ControlGraft()
-  if ? pb.getField(1, control.topicId):
-    trace "decodeGraft: read topicId", topic_id = control.topicId
+  if ? pb.getField(1, control.topicID):
+    trace "decodeGraft: read topicID", topicID = control.topicID
   else:
-    trace "decodeGraft: topicId is missing"
+    trace "decodeGraft: topicID is missing"
   ok(control)

 proc decodePeerInfoMsg*(pb: ProtoBuffer): ProtoResult[PeerInfoMsg] {.
@@ -160,10 +159,10 @@ proc decodePrune*(pb: ProtoBuffer): ProtoResult[ControlPrune] {.
   trace "decodePrune: decoding message"
   var control = ControlPrune()
-  if ? pb.getField(1, control.topicId):
-    trace "decodePrune: read topicId", topic_id = control.topicId
+  if ? pb.getField(1, control.topicID):
+    trace "decodePrune: read topicID", topic = control.topicID
   else:
-    trace "decodePrune: topicId is missing"
+    trace "decodePrune: topicID is missing"
   var bpeers: seq[seq[byte]]
   if ? pb.getRepeatedField(2, bpeers):
     for bpeer in bpeers:
@@ -179,12 +178,12 @@ proc decodeIHave*(pb: ProtoBuffer): ProtoResult[ControlIHave] {.
   trace "decodeIHave: decoding message"
   var control = ControlIHave()
-  if ? pb.getField(1, control.topicId):
-    trace "decodeIHave: read topicId", topic_id = control.topicId
+  if ? pb.getField(1, control.topicID):
+    trace "decodeIHave: read topicID", topic = control.topicID
   else:
-    trace "decodeIHave: topicId is missing"
-  if ? pb.getRepeatedField(2, control.messageIds):
-    trace "decodeIHave: read messageIDs", message_ids = control.messageIds
+    trace "decodeIHave: topicID is missing"
+  if ? pb.getRepeatedField(2, control.messageIDs):
+    trace "decodeIHave: read messageIDs", message_ids = control.messageIDs
   else:
     trace "decodeIHave: no messageIDs"
   ok(control)
@@ -195,8 +194,8 @@ proc decodeIWant*(pb: ProtoBuffer): ProtoResult[ControlIWant] {.inline.} =
   trace "decodeIWant: decoding message"
   var control = ControlIWant()
-  if ? pb.getRepeatedField(1, control.messageIds):
-    trace "decodeIWant: read messageIDs", message_ids = control.messageIds
+  if ? pb.getRepeatedField(1, control.messageIDs):
+    trace "decodeIWant: read messageIDs", message_ids = control.messageIDs
   else:
     trace "decodeIWant: no messageIDs"
   ok(control)
@@ -286,10 +285,11 @@ proc decodeMessage*(pb: ProtoBuffer): ProtoResult[Message] {.inline.} =
     trace "decodeMessage: read seqno", seqno = msg.seqno
   else:
     trace "decodeMessage: seqno is missing"
-  if ? pb.getRepeatedField(4, msg.topicIds):
-    trace "decodeMessage: read topics", topic_ids = msg.topicIds
+  if ?pb.getField(4, msg.topic):
+    trace "decodeMessage: read topic", topic = msg.topic
   else:
-    trace "decodeMessage: topics are missing"
+    trace "decodeMessage: topic is required"
+    return err(ProtoError.RequiredFieldMissing)
   if ? pb.getField(5, msg.signature):
     trace "decodeMessage: read signature", signature = msg.signature.shortLog()
   else:
diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim
index 971029832..96032b5fd 100644
--- a/tests/pubsub/testgossipinternal.nim
+++ b/tests/pubsub/testgossipinternal.nim
@@ -692,7 +692,7 @@ suite "GossipSub internal":
         )
         peer.iHaveBudget = 0
         let iwants = gossipSub.handleIHave(peer, @[msg])
-        check: iwants.messageIds.len == 0
+        check: iwants.messageIDs.len == 0

       block:
         # given duplicate ihave should generate only one iwant
@@ -707,7 +707,7 @@ suite "GossipSub internal":
           messageIDs: @[id, id, id]
         )
         let iwants = gossipSub.handleIHave(peer, @[msg])
-        check: iwants.messageIds.len == 1
+        check: iwants.messageIDs.len == 1

       block:
         # given duplicate iwant should generate only one message
@@ -790,7 +790,7 @@ suite "GossipSub internal":
      let (iwantMessageIds, sentMessages) = createMessages(gossip0, gossip1, messageSize, messageSize)

      gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
-        ihave: @[ControlIHave(topicId: "foobar", messageIds: iwantMessageIds)]
+        ihave: @[ControlIHave(topicID: "foobar", messageIDs: iwantMessageIds)]
      ))), isHighPriority = false)

      checkUntilTimeout: receivedMessages[] == sentMessages
@@ -807,7 +807,7 @@ suite "GossipSub internal":
      let (bigIWantMessageIds, sentMessages) = createMessages(gossip0, gossip1, messageSize, messageSize)

      gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
-        ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
+        ihave: @[ControlIHave(topicID: "foobar", messageIDs: bigIWantMessageIds)]
      ))), isHighPriority = false)

      await sleepAsync(300.milliseconds)
@@ -824,7 +824,7 @@ suite "GossipSub internal":
      let (bigIWantMessageIds, sentMessages) = createMessages(gossip0, gossip1, size1, size2)

      gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
-        ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
+        ihave: @[ControlIHave(topicID: "foobar", messageIDs: bigIWantMessageIds)]
      ))), isHighPriority = false)

      checkUntilTimeout: receivedMessages[] == sentMessages
@@ -842,7 +842,7 @@ suite "GossipSub internal":
      let (bigIWantMessageIds, sentMessages) = createMessages(gossip0, gossip1, size1, size2)

      gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
-        ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
+        ihave: @[ControlIHave(topicID: "foobar", messageIDs: bigIWantMessageIds)]
      ))), isHighPriority = false)

      var smallestSet: HashSet[seq[byte]]
diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim
index a843ed51d..514e64dab 100644
--- a/tests/pubsub/testgossipsub.nim
+++ b/tests/pubsub/testgossipsub.nim
@@ -911,7 +911,7 @@ suite "GossipSub":
     check: gossip3.mesh.peers("foobar") == 1

     gossip3.broadcast(gossip3.mesh["foobar"], RPCMsg(control: some(ControlMessage(
-      idontwant: @[ControlIWant(messageIds: @[newSeq[byte](10)])]
+      idontwant: @[ControlIWant(messageIDs: @[newSeq[byte](10)])]
     ))), isHighPriority = true)

     checkUntilTimeout: gossip2.mesh.getOrDefault("foobar").anyIt(it.heDontWants[^1].len == 1)
@@ -970,8 +970,9 @@ suite "GossipSub":
     gossip0.broadcast(
       gossip0.mesh["foobar"],
-      RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](10))]),
-      isHighPriority = true)
+      RPCMsg(messages: @[Message(topic: "foobar", data: newSeq[byte](10))]),
+      isHighPriority = true,
+    )
     await sleepAsync(300.millis)
     check currentRateLimitHits() == rateLimitHits
@@ -981,8 +982,9 @@ suite "GossipSub":
     gossip1.parameters.disconnectPeerAboveRateLimit = true
     gossip0.broadcast(
       gossip0.mesh["foobar"],
-      RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](12))]),
-      isHighPriority = true)
+      RPCMsg(messages: @[Message(topic: "foobar", data: newSeq[byte](12))]),
+      isHighPriority = true,
+    )
     await sleepAsync(300.millis)

     check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true
@@ -1053,7 +1055,7 @@ suite "GossipSub":
     gossip0.addValidator(topic, execValidator)
     gossip1.addValidator(topic, execValidator)

-    let msg = RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](40))])
+    let msg = RPCMsg(messages: @[Message(topic: topic, data: newSeq[byte](40))])
     gossip0.broadcast(gossip0.mesh[topic], msg, isHighPriority = true)
     await sleepAsync(300.millis)
@@ -1065,8 +1067,9 @@ suite "GossipSub":
     gossip1.parameters.disconnectPeerAboveRateLimit = true
     gossip0.broadcast(
       gossip0.mesh[topic],
-      RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))]),
-      isHighPriority = true)
+      RPCMsg(messages: @[Message(topic: topic, data: newSeq[byte](35))]),
+      isHighPriority = true,
+    )

     checkUntilTimeout gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
     check currentRateLimitHits() == rateLimitHits + 2
diff --git a/tests/pubsub/testmcache.nim b/tests/pubsub/testmcache.nim
index 6c1222778..ca022de1f 100644
--- a/tests/pubsub/testmcache.nim
+++ b/tests/pubsub/testmcache.nim
@@ -27,48 +27,48 @@ suite "MCache":
    var mCache = MCache.init(3, 5)

    for i in 0..<3:
-      var msg = Message(fromPeer: randomPeerId(),
-                        seqno: "12345".toBytes(),
-                        topicIDs: @["foo"])
+      var
+        msg =
+          Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topic: "foo")
      mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

    for i in 0..<5:
-      var msg = Message(fromPeer: randomPeerId(),
-                        seqno: "12345".toBytes(),
-                        topicIDs: @["bar"])
+      var
+        msg =
+          Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topic: "bar")
      mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

    var mids = mCache.window("foo")
    check mids.len == 3

    var id = toSeq(mids)[0]
-    check mCache.get(id).get().topicIds[0] == "foo"
+    check mCache.get(id).get().topic == "foo"

  test "shift - shift 1 window at a time":
    var mCache = MCache.init(1, 5)

    for i in 0..<3:
-      var msg = Message(fromPeer: randomPeerId(),
-                        seqno: "12345".toBytes(),
-                        topicIDs: @["foo"])
+      var
+        msg =
+          Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topic: "foo")
      mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

    mCache.shift()
    check mCache.window("foo").len == 0

    for i in 0..<3:
-      var msg = Message(fromPeer: randomPeerId(),
-                        seqno: "12345".toBytes(),
-                        topicIDs: @["bar"])
+      var
+        msg =
+          Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topic: "bar")
      mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

    mCache.shift()
    check mCache.window("bar").len == 0

    for i in 0..<3:
-      var msg = Message(fromPeer: randomPeerId(),
-                        seqno: "12345".toBytes(),
-                        topicIDs: @["baz"])
+      var
+        msg =
+          Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topic: "baz")
      mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

    mCache.shift()
@@ -78,21 +78,21 @@ suite "MCache":
    var mCache = MCache.init(1, 5)

    for i in 0..<3:
-      var msg = Message(fromPeer: randomPeerId(),
-                        seqno: "12345".toBytes(),
-                        topicIDs: @["foo"])
+      var
+        msg =
+          Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topic: "foo")
      mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

    for i in 0..<3:
-      var msg = Message(fromPeer: randomPeerId(),
-                        seqno: "12345".toBytes(),
-                        topicIDs: @["bar"])
+      var
+        msg =
+          Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topic: "bar")
      mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

    for i in 0..<3:
-      var msg = Message(fromPeer: randomPeerId(),
-                        seqno: "12345".toBytes(),
-                        topicIDs: @["baz"])
+      var
+        msg =
+          Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topic: "baz")
      mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

    mCache.shift()
diff --git a/tests/pubsub/testmessage.nim b/tests/pubsub/testmessage.nim
index 589920b40..ccff641b1 100644
--- a/tests/pubsub/testmessage.nim
+++ b/tests/pubsub/testmessage.nim
@@ -75,14 +75,17 @@ suite "Message":
      msgIdResult.error == ValidationResult.Reject

  test "byteSize for RPCMsg":
-    var msg = Message(
-      fromPeer: PeerId(data: @['a'.byte, 'b'.byte]), # 2 bytes
-      data: @[1'u8, 2, 3], # 3 bytes
-      seqno: @[4'u8, 5], # 2 bytes
-      signature: @['c'.byte, 'd'.byte], # 2 bytes
-      key: @[6'u8, 7], # 2 bytes
-      topicIds: @["abc", "defgh"] # 3 + 5 = 8 bytes
-    )
+    var
+      msg =
+        Message(
+          fromPeer: PeerId(data: @['a'.byte, 'b'.byte]), # 2 bytes
+          data: @[1'u8, 2, 3], # 3 bytes
+          seqno: @[4'u8, 5], # 2 bytes
+          signature: @['c'.byte, 'd'.byte], # 2 bytes
+          key: @[6'u8, 7], # 2 bytes
+          topic: "abcde" # 5 bytes
+          ,
+        )

    var peerInfo = PeerInfoMsg(
      peerId: PeerId(data: @['e'.byte]), # 1 byte
    )
@@ -90,20 +93,20 @@ suite "Message":
    var controlIHave = ControlIHave(
-      topicId: "ijk", # 3 bytes
-      messageIds: @[ @['l'.byte], @['m'.byte, 'n'.byte] ] # 1 + 2 = 3 bytes
+      topicID: "ijk", # 3 bytes
+      messageIDs: @[ @['l'.byte], @['m'.byte, 'n'.byte] ] # 1 + 2 = 3 bytes
    )

    var controlIWant = ControlIWant(
-      messageIds: @[ @['o'.byte, 'p'.byte], @['q'.byte] ] # 2 + 1 = 3 bytes
+      messageIDs: @[ @['o'.byte, 'p'.byte], @['q'.byte] ] # 2 + 1 = 3 bytes
    )

    var controlGraft = ControlGraft(
-      topicId: "rst" # 3 bytes
+      topicID: "rst" # 3 bytes
    )

    var controlPrune = ControlPrune(
-      topicId: "uvw", # 3 bytes
+      topicID: "uvw", # 3 bytes
      peers: @[peerInfo, peerInfo], # (1 + 2) * 2 = 6 bytes
      backoff: 12345678 # 8 bytes for uint64
    )
@@ -118,10 +121,10 @@ suite "Message":
    var rpcMsg = RPCMsg(
      subscriptions: @[SubOpts(subscribe: true, topic: "a".repeat(12)),
                       SubOpts(subscribe: false, topic: "b".repeat(14))], # 1 + 12 + 1 + 14 = 28 bytes
-      messages: @[msg, msg], # 19 * 2 = 38 bytes
+      messages: @[msg, msg], # 16 * 2 = 32 bytes
      ping: @[1'u8, 2], # 2 bytes
      pong: @[3'u8, 4], # 2 bytes
      control: some(control) # 12 + 3 + 3 + 17 + 3 = 38 bytes
    )

-    check byteSize(rpcMsg) == 28 + 38 + 2 + 2 + 38 # Total: 108 bytes
+    check byteSize(rpcMsg) == 28 + 32 + 2 + 2 + 38 # Total: 102 bytes
diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim
index d77653741..94bbd4081 100644
--- a/tests/pubsub/utils.nim
+++ b/tests/pubsub/utils.nim
@@ -43,14 +43,15 @@ proc randomPeerId*(): PeerId =
    raise newException(Defect, exc.msg)

 func defaultMsgIdProvider*(m: Message): Result[MessageId, ValidationResult] =
-  let mid =
-    if m.seqno.len > 0 and m.fromPeer.data.len > 0:
-      byteutils.toHex(m.seqno) & $m.fromPeer
-    else:
-      # This part is irrelevant because it's not standard,
-      # We use it exclusively for testing basically and users should
-      # implement their own logic in the case they use anonymization
-      $m.data.hash & $m.topicIds.hash
+  let
+    mid =
+      if m.seqno.len > 0 and m.fromPeer.data.len > 0:
+        byteutils.toHex(m.seqno) & $m.fromPeer
+      else:
+        # This part is irrelevant because it's not standard,
+        # We use it exclusively for testing basically and users should
+        # implement their own logic in the case they use anonymization
+        $m.data.hash & $m.topic.hash
   ok mid.toBytes()

 proc generateNodes*(
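
Example (illustrative, not part of the patch): after this change a Message carries a single topic string instead of the repeated topicIds field, and the control structures use the renamed topicID/messageIDs fields. The following Nim sketch shows how calling code constructs these objects under the new field names; the type and field names are taken from the diff above, while the import path and literal values are assumptions made only for illustration.

  import std/options
  # Assumed import path for the pubsub RPC types (messages.nim defines Message, RPCMsg, Control*).
  import libp2p/protocols/pubsub/rpc/messages

  # A message now carries exactly one topic; per decodeMessage above, a message
  # without a topic is rejected with ProtoError.RequiredFieldMissing.
  let msg = Message(topic: "foobar", data: @[1'u8, 2, 3])

  # Control messages use the renamed fields topicID / messageIDs.
  let ihave = ControlIHave(topicID: "foobar", messageIDs: @[@[0'u8, 1, 2]])
  let iwant = ControlIWant(messageIDs: @[@[0'u8, 1, 2]])

  let rpc = RPCMsg(
    messages: @[msg],
    control: some(ControlMessage(ihave: @[ihave], iwant: @[iwant])),
  )

Code that previously looped over msg.topicIds now reads the single msg.topic, which is what the rpcHandler and validateAndRelay hunks above do.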