fix(issue-1052): Single topic for RPC Message (#1061)
parent a2027003cd
commit 458b0885dd
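This commit replaces the per-message list of topic IDs in the pubsub RPC `Message` with a single `topic` field, and updates FloodSub, GossipSub (control handling, relaying, scoring, the message cache, metrics), the protobuf codec, and the tests accordingly. A minimal sketch of the shape change, using toy types rather than the real libp2p definitions (illustrative only, assumptions: the toy `MessageBefore`/`MessageAfter` names and fields merely mirror the rename visible in the diff below):

type
  MessageBefore = object   # pre-#1061 shape: one message could carry several topics
    data: seq[byte]
    topicIds: seq[string]

  MessageAfter = object    # post-#1061 shape: exactly one topic per message
    data: seq[byte]
    topic: string

when isMainModule:
  # the single topic corresponds to what was previously the first (and in practice only) entry
  let older = MessageBefore(data: @[1'u8, 2, 3], topicIds: @["foobar"])
  let newer = MessageAfter(data: @[1'u8, 2, 3], topic: "foobar")
  doAssert newer.topic == older.topicIds[0]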
@@ -148,12 +148,15 @@ method rpcHandler*(f: FloodSub,
 discard

 var toSendPeers = initHashSet[PubSubPeer]()
-for t in msg.topicIds: # for every topic in the message
+let topic = msg.topic
-if t notin f.topics:
+if topic notin f.topics:
+debug "Dropping message due to topic not in floodsub topics", topic, msgId, peer
 continue
-f.floodsub.withValue(t, peers): toSendPeers.incl(peers[])

-await handleData(f, t, msg.data)
+f.floodsub.withValue(topic, peers):
+toSendPeers.incl(peers[])

+await handleData(f, topic, msg.data)

 # In theory, if topics are the same in all messages, we could batch - we'd
 # also have to be careful to only include validated messages
@@ -276,7 +276,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
 var respControl: ControlMessage
 g.handleIDontWant(peer, control.idontwant)
 let iwant = g.handleIHave(peer, control.ihave)
-if iwant.messageIds.len > 0:
+if iwant.messageIDs.len > 0:
 respControl.iwant.add(iwant)
 respControl.prune.add(g.handleGraft(peer, control.graft))
 let messages = g.handleIWant(peer, control.iwant)
@@ -292,8 +292,8 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =

 if isPruneNotEmpty:
 for prune in respControl.prune:
-if g.knownTopics.contains(prune.topicId):
+if g.knownTopics.contains(prune.topicID):
-libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId])
+libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicID])
 else:
 libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])

@@ -304,7 +304,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =

 if messages.len > 0:
 for smsg in messages:
-for topic in smsg.topicIds:
+let topic = smsg.topic
 if g.knownTopics.contains(topic):
 libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
 else:
@@ -344,18 +344,18 @@ proc validateAndRelay(g: GossipSub,
 # store in cache only after validation
 g.mcache.put(msgId, msg)

-g.rewardDelivered(peer, msg.topicIds, true)
+let topic = msg.topic
+g.rewardDelivered(peer, topic, true)

 var toSendPeers = HashSet[PubSubPeer]()
-for t in msg.topicIds: # for every topic in the message
-if t notin g.topics:
-continue
+if topic notin g.topics:
+return

-g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
+g.floodsub.withValue(topic, peers): toSendPeers.incl(peers[])
-g.mesh.withValue(t, peers): toSendPeers.incl(peers[])
+g.mesh.withValue(topic, peers): toSendPeers.incl(peers[])

 # add direct peers
-toSendPeers.incl(g.subscribedDirectPeers.getOrDefault(t))
+toSendPeers.incl(g.subscribedDirectPeers.getOrDefault(topic))

 # Don't send it to source peer, or peers that
 # sent it during validation
@@ -366,7 +366,7 @@ proc validateAndRelay(g: GossipSub,
 # bigger than the messageId
 if msg.data.len > msgId.len * 10:
 g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
-idontwant: @[ControlIWant(messageIds: @[msgId])]
+idontwant: @[ControlIWant(messageIDs: @[msgId])]
 ))), isHighPriority = true)

 for peer in toSendPeers:
@@ -383,8 +383,6 @@ proc validateAndRelay(g: GossipSub,
 # also have to be careful to only include validated messages
 g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
 trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
-for topic in msg.topicIds:
-if topic notin g.topics: continue

 if g.knownTopics.contains(topic):
 libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = [topic])
@@ -396,7 +394,7 @@ proc validateAndRelay(g: GossipSub,
 info "validateAndRelay failed", msg=exc.msg

 proc dataAndTopicsIdSize(msgs: seq[Message]): int =
-msgs.mapIt(it.data.len + it.topicIds.mapIt(it.len).foldl(a + b, 0)).foldl(a + b, 0)
+msgs.mapIt(it.data.len + it.topic.len).foldl(a + b, 0)

 proc messageOverhead(g: GossipSub, msg: RPCMsg, msgSize: int): int =
 # In this way we count even ignored fields by protobuf
@@ -433,8 +431,7 @@ method rpcHandler*(g: GossipSub,

 when defined(libp2p_expensive_metrics):
 for m in rpcMsg.messages:
-for t in m.topicIds:
-libp2p_pubsub_received_messages.inc(labelValues = [$peer.peerId, t])
+libp2p_pubsub_received_messages.inc(labelValues = [$peer.peerId, m.topic])

 trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
 await rateLimit(g, peer, g.messageOverhead(rpcMsg, msgSize))
@@ -470,6 +467,7 @@ method rpcHandler*(g: GossipSub,
 let
 msgId = msgIdResult.get
 msgIdSalted = msgId & g.seenSalt
+topic = msg.topic

 # addSeen adds salt to msgId to avoid
 # remote attacking the hash function
@@ -484,7 +482,7 @@ method rpcHandler*(g: GossipSub,

 if not alreadyReceived:
 let delay = Moment.now() - g.firstSeen(msgId)
-g.rewardDelivered(peer, msg.topicIds, false, delay)
+g.rewardDelivered(peer, topic, false, delay)

 libp2p_gossipsub_duplicate.inc()

@@ -494,7 +492,7 @@ method rpcHandler*(g: GossipSub,
 libp2p_gossipsub_received.inc()

 # avoid processing messages we are not interested in
-if msg.topicIds.allIt(it notin g.topics):
+if topic notin g.topics:
 debug "Dropping message of topic without subscription", msgId = shortLog(msgId), peer
 continue
@@ -103,8 +103,8 @@ proc handleGraft*(g: GossipSub,
 grafts: seq[ControlGraft]): seq[ControlPrune] = # {.raises: [Defect].} TODO chronicles exception on windows
 var prunes: seq[ControlPrune]
 for graft in grafts:
-let topic = graft.topicId
+let topic = graft.topicID
-trace "peer grafted topic", peer, topic
+trace "peer grafted topicID", peer, topic

 # It is an error to GRAFT on a direct peer
 if peer.peerId in g.parameters.directPeers:
@@ -207,9 +207,9 @@ proc getPeers(prune: ControlPrune, peer: PubSubPeer): seq[(PeerId, Option[PeerRe

 proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.raises: [].} =
 for prune in prunes:
-let topic = prune.topicId
+let topic = prune.topicID

-trace "peer pruned topic", peer, topic
+trace "peer pruned topicID", peer, topic

 # add peer backoff
 if prune.backoff > 0:
@@ -248,26 +248,26 @@ proc handleIHave*(g: GossipSub,
 else:
 for ihave in ihaves:
 trace "peer sent ihave",
-peer, topic = ihave.topicId, msgs = ihave.messageIds
+peer, topicID = ihave.topicID, msgs = ihave.messageIDs
-if ihave.topicId in g.topics:
+if ihave.topicID in g.topics:
-for msgId in ihave.messageIds:
+for msgId in ihave.messageIDs:
 if not g.hasSeen(msgId):
 if peer.iHaveBudget <= 0:
 break
-elif msgId notin res.messageIds:
+elif msgId notin res.messageIDs:
-res.messageIds.add(msgId)
+res.messageIDs.add(msgId)
 dec peer.iHaveBudget
 trace "requested message via ihave", messageID=msgId
 # shuffling res.messageIDs before sending it out to increase the likelihood
 # of getting an answer if the peer truncates the list due to internal size restrictions.
-g.rng.shuffle(res.messageIds)
+g.rng.shuffle(res.messageIDs)
 return res

 proc handleIDontWant*(g: GossipSub,
 peer: PubSubPeer,
 iDontWants: seq[ControlIWant]) =
 for dontWant in iDontWants:
-for messageId in dontWant.messageIds:
+for messageId in dontWant.messageIDs:
 if peer.heDontWants[^1].len > 1000: break
 if messageId.len > 100: continue
 peer.heDontWants[^1].incl(messageId)
@@ -282,7 +282,7 @@ proc handleIWant*(g: GossipSub,
 trace "iwant: ignoring low score peer", peer, score = peer.score
 else:
 for iwant in iwants:
-for mid in iwant.messageIds:
+for mid in iwant.messageIDs:
 trace "peer sent iwant", peer, messageID = mid
 # canAskIWant will only return true once for a specific message
 if not peer.canAskIWant(mid):
@@ -579,7 +579,7 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises:
 trace "getting gossip peers (iHave)", ntopics=topics.len
 for topic in topics:
 if topic notin g.gossipsub:
-trace "topic not in gossip array, skipping", topicID = topic
+trace "topic not in gossip array, skipping", topic = topic
 continue

 let mids = g.mcache.window(topic)
@@ -621,7 +621,7 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises:
 g.rng.shuffle(allPeers)
 allPeers.setLen(target)

-let msgIdsAsSet = ihave.messageIds.toHashSet()
+let msgIdsAsSet = ihave.messageIDs.toHashSet()

 for peer in allPeers:
 control.mgetOrPut(peer, ControlMessage()).ihave.add(ihave)
@@ -687,8 +687,8 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
 for peer, control in peers:
 # only ihave from here
 for ihave in control.ihave:
-if g.knownTopics.contains(ihave.topicId):
+if g.knownTopics.contains(ihave.topicID):
-libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicId])
+libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicID])
 else:
 libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"])
 g.send(peer, RPCMsg(control: some(control)), isHighPriority = true)
@@ -250,37 +250,36 @@ proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) {.async
 await g.disconnectPeer(peer)
 raise newException(PeerRateLimitError, "Peer disconnected because it's above rate limit.")

+let topic = msg.topic
+if topic notin g.topics:
+return

-for tt in msg.topicIds:
-let t = tt
-if t notin g.topics:
-continue

-let tt = t
 # update stats
 g.withPeerStats(peer.peerId) do(stats: var PeerStats):
-stats.topicInfos.mgetOrPut(tt, TopicInfo()).invalidMessageDeliveries += 1
+stats.topicInfos.mgetOrPut(topic, TopicInfo()).invalidMessageDeliveries += 1

 proc addCapped*[T](stat: var T, diff, cap: T) =
 stat += min(diff, cap - stat)

 proc rewardDelivered*(
-g: GossipSub, peer: PubSubPeer, topics: openArray[string], first: bool, delay = ZeroDuration) =
-for tt in topics:
-let t = tt
-if t notin g.topics:
-continue
+g: GossipSub,
+peer: PubSubPeer,
+topic: string,
+first: bool,
+delay = ZeroDuration,
+) =
+if topic notin g.topics:
+return

-let tt = t
-let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init())
+let topicParams = g.topicParams.mgetOrPut(topic, TopicParams.init())
 # if in mesh add more delivery score

 if delay > topicParams.meshMessageDeliveriesWindow:
 # Too old
-continue
+return

 g.withPeerStats(peer.peerId) do (stats: var PeerStats):
-stats.topicInfos.withValue(tt, tstats):
+stats.topicInfos.withValue(topic, tstats):
 if first:
 tstats[].firstMessageDeliveries.addCapped(
 1, topicParams.firstMessageDeliveriesCap)
@@ -288,5 +287,5 @@ proc rewardDelivered*(
 if tstats[].inMesh:
 tstats[].meshMessageDeliveries.addCapped(
 1, topicParams.meshMessageDeliveriesCap)
-do: # make sure we don't loose this information
+do: # make sure we don't lose this information
-stats.topicInfos[tt] = TopicInfo(meshMessageDeliveries: 1)
+stats.topicInfos[topic] = TopicInfo(meshMessageDeliveries: 1)
@@ -17,7 +17,7 @@ export sets, tables, messages, options
 type
 CacheEntry* = object
 mid*: MessageId
-topicIds*: seq[string]
+topic*: string

 MCache* = object of RootObj
 msgs*: Table[MessageId, Message]
@@ -37,7 +37,7 @@ func contains*(c: MCache, mid: MessageId): bool =
 func put*(c: var MCache, msgId: MessageId, msg: Message) =
 if not c.msgs.hasKeyOrPut(msgId, msg):
 # Only add cache entry if the message was not already in the cache
-c.history[0].add(CacheEntry(mid: msgId, topicIds: msg.topicIds))
+c.history[0].add(CacheEntry(mid: msgId, topic: msg.topic))

 func window*(c: MCache, topic: string): HashSet[MessageId] =
 let
@@ -45,10 +45,8 @@ func window*(c: MCache, topic: string): HashSet[MessageId] =

 for i in 0..<len:
 for entry in c.history[i]:
-for t in entry.topicIds:
-if t == topic:
+if entry.topic == topic:
 result.incl(entry.mid)
-break

 func shift*(c: var MCache) =
 for entry in c.history.pop():
@@ -181,7 +181,7 @@ proc broadcast*(
 libp2p_pubsub_broadcast_unsubscriptions.inc(npeers, labelValues = ["generic"])

 for smsg in msg.messages:
-for topic in smsg.topicIds:
+let topic = smsg.topic
 if p.knownTopics.contains(topic):
 libp2p_pubsub_broadcast_messages.inc(npeers, labelValues = [topic])
 else:
@@ -191,18 +191,18 @@ proc broadcast*(
 libp2p_pubsub_broadcast_iwant.inc(npeers * control.iwant.len.int64)

 for ihave in control.ihave:
-if p.knownTopics.contains(ihave.topicId):
+if p.knownTopics.contains(ihave.topicID):
-libp2p_pubsub_broadcast_ihave.inc(npeers, labelValues = [ihave.topicId])
+libp2p_pubsub_broadcast_ihave.inc(npeers, labelValues = [ihave.topicID])
 else:
 libp2p_pubsub_broadcast_ihave.inc(npeers, labelValues = ["generic"])
 for graft in control.graft:
-if p.knownTopics.contains(graft.topicId):
+if p.knownTopics.contains(graft.topicID):
-libp2p_pubsub_broadcast_graft.inc(npeers, labelValues = [graft.topicId])
+libp2p_pubsub_broadcast_graft.inc(npeers, labelValues = [graft.topicID])
 else:
 libp2p_pubsub_broadcast_graft.inc(npeers, labelValues = ["generic"])
 for prune in control.prune:
-if p.knownTopics.contains(prune.topicId):
+if p.knownTopics.contains(prune.topicID):
-libp2p_pubsub_broadcast_prune.inc(npeers, labelValues = [prune.topicId])
+libp2p_pubsub_broadcast_prune.inc(npeers, labelValues = [prune.topicID])
 else:
 libp2p_pubsub_broadcast_prune.inc(npeers, labelValues = ["generic"])

@@ -252,9 +252,7 @@ proc updateMetrics*(p: PubSub, rpcMsg: RPCMsg) =
 libp2p_pubsub_received_unsubscriptions.inc(labelValues = ["generic"])

 for i in 0..<rpcMsg.messages.len():
-template smsg: untyped = rpcMsg.messages[i]
-for j in 0..<smsg.topicIds.len():
-template topic: untyped = smsg.topicIds[j]
+let topic = rpcMsg.messages[i].topic
 if p.knownTopics.contains(topic):
 libp2p_pubsub_received_messages.inc(labelValues = [topic])
 else:
@@ -263,18 +261,18 @@ proc updateMetrics*(p: PubSub, rpcMsg: RPCMsg) =
 rpcMsg.control.withValue(control):
 libp2p_pubsub_received_iwant.inc(control.iwant.len.int64)
 for ihave in control.ihave:
-if p.knownTopics.contains(ihave.topicId):
+if p.knownTopics.contains(ihave.topicID):
-libp2p_pubsub_received_ihave.inc(labelValues = [ihave.topicId])
+libp2p_pubsub_received_ihave.inc(labelValues = [ihave.topicID])
 else:
 libp2p_pubsub_received_ihave.inc(labelValues = ["generic"])
 for graft in control.graft:
-if p.knownTopics.contains(graft.topicId):
+if p.knownTopics.contains(graft.topicID):
-libp2p_pubsub_received_graft.inc(labelValues = [graft.topicId])
+libp2p_pubsub_received_graft.inc(labelValues = [graft.topicID])
 else:
 libp2p_pubsub_received_graft.inc(labelValues = ["generic"])
 for prune in control.prune:
-if p.knownTopics.contains(prune.topicId):
+if p.knownTopics.contains(prune.topicID):
-libp2p_pubsub_received_prune.inc(labelValues = [prune.topicId])
+libp2p_pubsub_received_prune.inc(labelValues = [prune.topicID])
 else:
 libp2p_pubsub_received_prune.inc(labelValues = ["generic"])

@@ -517,7 +515,7 @@ method addValidator*(p: PubSub,
 ## will be sent to `hook`. `hook` can return either `Accept`,
 ## `Ignore` or `Reject` (which can descore the peer)
 for t in topic:
-trace "adding validator for topic", topicId = t
+trace "adding validator for topic", topic = t
 p.validators.mgetOrPut(t, HashSet[ValidatorHandler]()).incl(hook)

 method removeValidator*(p: PubSub,
@@ -532,11 +530,11 @@ method removeValidator*(p: PubSub,
 method validate*(p: PubSub, message: Message): Future[ValidationResult] {.async, base.} =
 var pending: seq[Future[ValidationResult]]
 trace "about to validate message"
-for topic in message.topicIds:
+let topic = message.topic
-trace "looking for validators on topic", topicId = topic,
-registered = toSeq(p.validators.keys)
+trace "looking for validators on topic",
+topic = topic, registered = toSeq(p.validators.keys)
 if topic in p.validators:
-trace "running validators for topic", topicId = topic
+trace "running validators for topic", topic = topic
 for validator in p.validators[topic]:
 pending.add(validator(topic, message))
@@ -247,9 +247,8 @@ proc hasSendConn*(p: PubSubPeer): bool =
 template sendMetrics(msg: RPCMsg): untyped =
 when defined(libp2p_expensive_metrics):
 for x in msg.messages:
-for t in x.topicIds:
 # metrics
-libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])
+libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, x.topic])

 proc clearSendPriorityQueue(p: PubSubPeer) =
 if p.rpcmessagequeue.sendPriorityQueue.len == 0:
@@ -63,7 +63,7 @@ proc init*(
 seqno: Option[uint64],
 sign: bool = true): Message
 {.gcsafe, raises: [LPError].} =
-var msg = Message(data: data, topicIDs: @[topic])
+var msg = Message(data: data, topic: topic)

 # order matters, we want to include seqno in the signature
 seqno.withValue(seqn):
@@ -87,7 +87,7 @@ proc init*(
 topic: string,
 seqno: Option[uint64]): Message
 {.gcsafe, raises: [LPError].} =
-var msg = Message(data: data, topicIDs: @[topic])
+var msg = Message(data: data, topic: topic)
 msg.fromPeer = peerId

 seqno.withValue(seqn):
@@ -41,7 +41,7 @@ type
 fromPeer*: PeerId
 data*: seq[byte]
 seqno*: seq[byte]
-topicIds*: seq[string]
+topic*: string
 signature*: seq[byte]
 key*: seq[byte]

@@ -53,17 +53,17 @@ type
 idontwant*: seq[ControlIWant]

 ControlIHave* = object
-topicId*: string
+topicID*: string
-messageIds*: seq[MessageId]
+messageIDs*: seq[MessageId]

 ControlIWant* = object
-messageIds*: seq[MessageId]
+messageIDs*: seq[MessageId]

 ControlGraft* = object
-topicId*: string
+topicID*: string

 ControlPrune* = object
-topicId*: string
+topicID*: string
 peers*: seq[PeerInfoMsg]
 backoff*: uint64

@@ -81,23 +81,23 @@ func withSubs*(

 func shortLog*(s: ControlIHave): auto =
 (
-topicId: s.topicId.shortLog,
+topic: s.topicID.shortLog,
-messageIds: mapIt(s.messageIds, it.shortLog)
+messageIDs: mapIt(s.messageIDs, it.shortLog)
 )

 func shortLog*(s: ControlIWant): auto =
 (
-messageIds: mapIt(s.messageIds, it.shortLog)
+messageIDs: mapIt(s.messageIDs, it.shortLog)
 )

 func shortLog*(s: ControlGraft): auto =
 (
-topicId: s.topicId.shortLog
+topic: s.topicID.shortLog
 )

 func shortLog*(s: ControlPrune): auto =
 (
-topicId: s.topicId.shortLog
+topic: s.topicID.shortLog
 )

 func shortLog*(c: ControlMessage): auto =
@@ -113,7 +113,7 @@ func shortLog*(msg: Message): auto =
 fromPeer: msg.fromPeer.shortLog,
 data: msg.data.shortLog,
 seqno: msg.seqno.shortLog,
-topicIds: $msg.topicIds,
+topic: msg.topic,
 signature: msg.signature.shortLog,
 key: msg.key.shortLog
 )
@@ -133,35 +133,35 @@ static: expectedFields(SubOpts, @["subscribe", "topic"])
 proc byteSize(subOpts: SubOpts): int =
 1 + subOpts.topic.len # 1 byte for the bool

-static: expectedFields(Message, @["fromPeer", "data", "seqno", "topicIds", "signature", "key"])
+static: expectedFields(Message, @["fromPeer", "data", "seqno", "topic", "signature", "key"])
 proc byteSize*(msg: Message): int =
-msg.fromPeer.len + msg.data.len + msg.seqno.len +
-msg.signature.len + msg.key.len + msg.topicIds.foldl(a + b.len, 0)
+msg.fromPeer.len + msg.data.len + msg.seqno.len + msg.signature.len + msg.key.len +
+msg.topic.len

 proc byteSize*(msgs: seq[Message]): int =
 msgs.foldl(a + b.byteSize, 0)

-static: expectedFields(ControlIHave, @["topicId", "messageIds"])
+static: expectedFields(ControlIHave, @["topicID", "messageIDs"])
 proc byteSize(controlIHave: ControlIHave): int =
-controlIHave.topicId.len + controlIHave.messageIds.foldl(a + b.len, 0)
+controlIHave.topicID.len + controlIHave.messageIDs.foldl(a + b.len, 0)

 proc byteSize*(ihaves: seq[ControlIHave]): int =
 ihaves.foldl(a + b.byteSize, 0)

-static: expectedFields(ControlIWant, @["messageIds"])
+static: expectedFields(ControlIWant, @["messageIDs"])
 proc byteSize(controlIWant: ControlIWant): int =
-controlIWant.messageIds.foldl(a + b.len, 0)
+controlIWant.messageIDs.foldl(a + b.len, 0)

 proc byteSize*(iwants: seq[ControlIWant]): int =
 iwants.foldl(a + b.byteSize, 0)

-static: expectedFields(ControlGraft, @["topicId"])
+static: expectedFields(ControlGraft, @["topicID"])
 proc byteSize(controlGraft: ControlGraft): int =
-controlGraft.topicId.len
+controlGraft.topicID.len

-static: expectedFields(ControlPrune, @["topicId", "peers", "backoff"])
+static: expectedFields(ControlPrune, @["topicID", "peers", "backoff"])
 proc byteSize(controlPrune: ControlPrune): int =
-controlPrune.topicId.len + controlPrune.peers.foldl(a + b.byteSize, 0) + 8 # 8 bytes for uint64
+controlPrune.topicID.len + controlPrune.peers.foldl(a + b.byteSize, 0) + 8 # 8 bytes for uint64

 static: expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant"])
 proc byteSize(control: ControlMessage): int =
@@ -29,7 +29,7 @@ when defined(libp2p_protobuf_metrics):

 proc write*(pb: var ProtoBuffer, field: int, graft: ControlGraft) =
 var ipb = initProtoBuffer()
-ipb.write(1, graft.topicId)
+ipb.write(1, graft.topicID)
 ipb.finish()
 pb.write(field, ipb)

@@ -45,7 +45,7 @@ proc write*(pb: var ProtoBuffer, field: int, infoMsg: PeerInfoMsg) =

 proc write*(pb: var ProtoBuffer, field: int, prune: ControlPrune) =
 var ipb = initProtoBuffer()
-ipb.write(1, prune.topicId)
+ipb.write(1, prune.topicID)
 for peer in prune.peers:
 ipb.write(2, peer)
 ipb.write(3, prune.backoff)
@@ -57,8 +57,8 @@ proc write*(pb: var ProtoBuffer, field: int, prune: ControlPrune) =

 proc write*(pb: var ProtoBuffer, field: int, ihave: ControlIHave) =
 var ipb = initProtoBuffer()
-ipb.write(1, ihave.topicId)
+ipb.write(1, ihave.topicID)
-for mid in ihave.messageIds:
+for mid in ihave.messageIDs:
 ipb.write(2, mid)
 ipb.finish()
 pb.write(field, ipb)
@@ -68,7 +68,7 @@ proc write*(pb: var ProtoBuffer, field: int, ihave: ControlIHave) =

 proc write*(pb: var ProtoBuffer, field: int, iwant: ControlIWant) =
 var ipb = initProtoBuffer()
-for mid in iwant.messageIds:
+for mid in iwant.messageIDs:
 ipb.write(1, mid)
 if len(ipb.buffer) > 0:
 ipb.finish()
@@ -110,8 +110,7 @@ proc encodeMessage*(msg: Message, anonymize: bool): seq[byte] =
 pb.write(2, msg.data)
 if len(msg.seqno) > 0 and not anonymize:
 pb.write(3, msg.seqno)
-for topic in msg.topicIds:
-pb.write(4, topic)
+pb.write(4, msg.topic)
 if len(msg.signature) > 0 and not anonymize:
 pb.write(5, msg.signature)
 if len(msg.key) > 0 and not anonymize:
@@ -133,10 +132,10 @@ proc decodeGraft*(pb: ProtoBuffer): ProtoResult[ControlGraft] {.

 trace "decodeGraft: decoding message"
 var control = ControlGraft()
-if ? pb.getField(1, control.topicId):
+if ? pb.getField(1, control.topicID):
-trace "decodeGraft: read topicId", topic_id = control.topicId
+trace "decodeGraft: read topicID", topicID = control.topicID
 else:
-trace "decodeGraft: topicId is missing"
+trace "decodeGraft: topicID is missing"
 ok(control)

 proc decodePeerInfoMsg*(pb: ProtoBuffer): ProtoResult[PeerInfoMsg] {.
@@ -160,10 +159,10 @@ proc decodePrune*(pb: ProtoBuffer): ProtoResult[ControlPrune] {.

 trace "decodePrune: decoding message"
 var control = ControlPrune()
-if ? pb.getField(1, control.topicId):
+if ? pb.getField(1, control.topicID):
-trace "decodePrune: read topicId", topic_id = control.topicId
+trace "decodePrune: read topicID", topic = control.topicID
 else:
-trace "decodePrune: topicId is missing"
+trace "decodePrune: topicID is missing"
 var bpeers: seq[seq[byte]]
 if ? pb.getRepeatedField(2, bpeers):
 for bpeer in bpeers:
@@ -179,12 +178,12 @@ proc decodeIHave*(pb: ProtoBuffer): ProtoResult[ControlIHave] {.

 trace "decodeIHave: decoding message"
 var control = ControlIHave()
-if ? pb.getField(1, control.topicId):
+if ? pb.getField(1, control.topicID):
-trace "decodeIHave: read topicId", topic_id = control.topicId
+trace "decodeIHave: read topicID", topic = control.topicID
 else:
-trace "decodeIHave: topicId is missing"
+trace "decodeIHave: topicID is missing"
-if ? pb.getRepeatedField(2, control.messageIds):
+if ? pb.getRepeatedField(2, control.messageIDs):
-trace "decodeIHave: read messageIDs", message_ids = control.messageIds
+trace "decodeIHave: read messageIDs", message_ids = control.messageIDs
 else:
 trace "decodeIHave: no messageIDs"
 ok(control)
@@ -195,8 +194,8 @@ proc decodeIWant*(pb: ProtoBuffer): ProtoResult[ControlIWant] {.inline.} =

 trace "decodeIWant: decoding message"
 var control = ControlIWant()
-if ? pb.getRepeatedField(1, control.messageIds):
+if ? pb.getRepeatedField(1, control.messageIDs):
-trace "decodeIWant: read messageIDs", message_ids = control.messageIds
+trace "decodeIWant: read messageIDs", message_ids = control.messageIDs
 else:
 trace "decodeIWant: no messageIDs"
 ok(control)
@@ -286,10 +285,11 @@ proc decodeMessage*(pb: ProtoBuffer): ProtoResult[Message] {.inline.} =
 trace "decodeMessage: read seqno", seqno = msg.seqno
 else:
 trace "decodeMessage: seqno is missing"
-if ? pb.getRepeatedField(4, msg.topicIds):
+if ?pb.getField(4, msg.topic):
-trace "decodeMessage: read topics", topic_ids = msg.topicIds
+trace "decodeMessage: read topic", topic = msg.topic
 else:
-trace "decodeMessage: topics are missing"
+trace "decodeMessage: topic is required"
+return err(ProtoError.RequiredFieldMissing)
 if ? pb.getField(5, msg.signature):
 trace "decodeMessage: read signature", signature = msg.signature.shortLog()
 else:
@@ -692,7 +692,7 @@ suite "GossipSub internal":
 )
 peer.iHaveBudget = 0
 let iwants = gossipSub.handleIHave(peer, @[msg])
-check: iwants.messageIds.len == 0
+check: iwants.messageIDs.len == 0

 block:
 # given duplicate ihave should generate only one iwant
@@ -707,7 +707,7 @@ suite "GossipSub internal":
 messageIDs: @[id, id, id]
 )
 let iwants = gossipSub.handleIHave(peer, @[msg])
-check: iwants.messageIds.len == 1
+check: iwants.messageIDs.len == 1

 block:
 # given duplicate iwant should generate only one message
@@ -790,7 +790,7 @@ suite "GossipSub internal":
 let (iwantMessageIds, sentMessages) = createMessages(gossip0, gossip1, messageSize, messageSize)

 gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
-ihave: @[ControlIHave(topicId: "foobar", messageIds: iwantMessageIds)]
+ihave: @[ControlIHave(topicID: "foobar", messageIDs: iwantMessageIds)]
 ))), isHighPriority = false)

 checkUntilTimeout: receivedMessages[] == sentMessages
@@ -807,7 +807,7 @@ suite "GossipSub internal":
 let (bigIWantMessageIds, sentMessages) = createMessages(gossip0, gossip1, messageSize, messageSize)

 gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
-ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
+ihave: @[ControlIHave(topicID: "foobar", messageIDs: bigIWantMessageIds)]
 ))), isHighPriority = false)

 await sleepAsync(300.milliseconds)
@@ -824,7 +824,7 @@ suite "GossipSub internal":
 let (bigIWantMessageIds, sentMessages) = createMessages(gossip0, gossip1, size1, size2)

 gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
-ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
+ihave: @[ControlIHave(topicID: "foobar", messageIDs: bigIWantMessageIds)]
 ))), isHighPriority = false)

 checkUntilTimeout: receivedMessages[] == sentMessages
@@ -842,7 +842,7 @@ suite "GossipSub internal":
 let (bigIWantMessageIds, sentMessages) = createMessages(gossip0, gossip1, size1, size2)

 gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
-ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
+ihave: @[ControlIHave(topicID: "foobar", messageIDs: bigIWantMessageIds)]
 ))), isHighPriority = false)

 var smallestSet: HashSet[seq[byte]]
@@ -911,7 +911,7 @@ suite "GossipSub":
 check: gossip3.mesh.peers("foobar") == 1

 gossip3.broadcast(gossip3.mesh["foobar"], RPCMsg(control: some(ControlMessage(
-idontwant: @[ControlIWant(messageIds: @[newSeq[byte](10)])]
+idontwant: @[ControlIWant(messageIDs: @[newSeq[byte](10)])]
 ))), isHighPriority = true)
 checkUntilTimeout: gossip2.mesh.getOrDefault("foobar").anyIt(it.heDontWants[^1].len == 1)

@@ -970,8 +970,9 @@ suite "GossipSub":

 gossip0.broadcast(
 gossip0.mesh["foobar"],
-RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](10))]),
+RPCMsg(messages: @[Message(topic: "foobar", data: newSeq[byte](10))]),
-isHighPriority = true)
+isHighPriority = true,
+)
 await sleepAsync(300.millis)

 check currentRateLimitHits() == rateLimitHits
@@ -981,8 +982,9 @@ suite "GossipSub":
 gossip1.parameters.disconnectPeerAboveRateLimit = true
 gossip0.broadcast(
 gossip0.mesh["foobar"],
-RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](12))]),
+RPCMsg(messages: @[Message(topic: "foobar", data: newSeq[byte](12))]),
-isHighPriority = true)
+isHighPriority = true,
+)
 await sleepAsync(300.millis)

 check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true
@@ -1053,7 +1055,7 @@ suite "GossipSub":
 gossip0.addValidator(topic, execValidator)
 gossip1.addValidator(topic, execValidator)

-let msg = RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](40))])
+let msg = RPCMsg(messages: @[Message(topic: topic, data: newSeq[byte](40))])

 gossip0.broadcast(gossip0.mesh[topic], msg, isHighPriority = true)
 await sleepAsync(300.millis)
@@ -1065,8 +1067,9 @@ suite "GossipSub":
 gossip1.parameters.disconnectPeerAboveRateLimit = true
 gossip0.broadcast(
 gossip0.mesh[topic],
-RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))]),
+RPCMsg(messages: @[Message(topic: topic, data: newSeq[byte](35))]),
-isHighPriority = true)
+isHighPriority = true,
+)

 checkUntilTimeout gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
 check currentRateLimitHits() == rateLimitHits + 2
@@ -27,48 +27,48 @@ suite "MCache":
 var mCache = MCache.init(3, 5)

 for i in 0..<3:
-var msg = Message(fromPeer: randomPeerId(),
-seqno: "12345".toBytes(),
-topicIDs: @["foo"])
+var
+msg =
+Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topic: "foo")
 mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

 for i in 0..<5:
-var msg = Message(fromPeer: randomPeerId(),
-seqno: "12345".toBytes(),
-topicIDs: @["bar"])
+var
+msg =
+Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topic: "bar")
 mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

 var mids = mCache.window("foo")
 check mids.len == 3

 var id = toSeq(mids)[0]
-check mCache.get(id).get().topicIds[0] == "foo"
+check mCache.get(id).get().topic == "foo"

 test "shift - shift 1 window at a time":
 var mCache = MCache.init(1, 5)

 for i in 0..<3:
-var msg = Message(fromPeer: randomPeerId(),
-seqno: "12345".toBytes(),
-topicIDs: @["foo"])
+var
+msg =
+Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topic: "foo")
 mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

 mCache.shift()
 check mCache.window("foo").len == 0

 for i in 0..<3:
-var msg = Message(fromPeer: randomPeerId(),
-seqno: "12345".toBytes(),
-topicIDs: @["bar"])
+var
+msg =
+Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topic: "bar")
 mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

 mCache.shift()
 check mCache.window("bar").len == 0

 for i in 0..<3:
-var msg = Message(fromPeer: randomPeerId(),
-seqno: "12345".toBytes(),
-topicIDs: @["baz"])
+var
+msg =
+Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topic: "baz")
 mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

 mCache.shift()
@@ -78,21 +78,21 @@ suite "MCache":
 var mCache = MCache.init(1, 5)

 for i in 0..<3:
-var msg = Message(fromPeer: randomPeerId(),
-seqno: "12345".toBytes(),
-topicIDs: @["foo"])
+var
+msg =
+Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topic: "foo")
 mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

 for i in 0..<3:
-var msg = Message(fromPeer: randomPeerId(),
-seqno: "12345".toBytes(),
-topicIDs: @["bar"])
+var
+msg =
+Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topic: "bar")
 mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

 for i in 0..<3:
-var msg = Message(fromPeer: randomPeerId(),
-seqno: "12345".toBytes(),
-topicIDs: @["baz"])
+var
+msg =
+Message(fromPeer: randomPeerId(), seqno: "12345".toBytes(), topic: "baz")
 mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenSuccess), msg)

 mCache.shift()
@@ -75,13 +75,16 @@ suite "Message":
 msgIdResult.error == ValidationResult.Reject

 test "byteSize for RPCMsg":
-var msg = Message(
+var
+msg =
+Message(
 fromPeer: PeerId(data: @['a'.byte, 'b'.byte]), # 2 bytes
 data: @[1'u8, 2, 3], # 3 bytes
 seqno: @[4'u8, 5], # 2 bytes
 signature: @['c'.byte, 'd'.byte], # 2 bytes
 key: @[6'u8, 7], # 2 bytes
-topicIds: @["abc", "defgh"] # 3 + 5 = 8 bytes
+topic: "abcde" # 5 bytes
+,
 )

 var peerInfo = PeerInfoMsg(
@@ -90,20 +93,20 @@ suite "Message":
 )

 var controlIHave = ControlIHave(
-topicId: "ijk", # 3 bytes
+topicID: "ijk", # 3 bytes
-messageIds: @[ @['l'.byte], @['m'.byte, 'n'.byte] ] # 1 + 2 = 3 bytes
+messageIDs: @[ @['l'.byte], @['m'.byte, 'n'.byte] ] # 1 + 2 = 3 bytes
 )

 var controlIWant = ControlIWant(
-messageIds: @[ @['o'.byte, 'p'.byte], @['q'.byte] ] # 2 + 1 = 3 bytes
+messageIDs: @[ @['o'.byte, 'p'.byte], @['q'.byte] ] # 2 + 1 = 3 bytes
 )

 var controlGraft = ControlGraft(
-topicId: "rst" # 3 bytes
+topicID: "rst" # 3 bytes
 )

 var controlPrune = ControlPrune(
-topicId: "uvw", # 3 bytes
+topicID: "uvw", # 3 bytes
 peers: @[peerInfo, peerInfo], # (1 + 2) * 2 = 6 bytes
 backoff: 12345678 # 8 bytes for uint64
 )
@@ -118,10 +121,10 @@ suite "Message":

 var rpcMsg = RPCMsg(
 subscriptions: @[SubOpts(subscribe: true, topic: "a".repeat(12)), SubOpts(subscribe: false, topic: "b".repeat(14))], # 1 + 12 + 1 + 14 = 28 bytes
-messages: @[msg, msg], # 19 * 2 = 38 bytes
+messages: @[msg, msg], # 16 * 2 = 32 bytes
 ping: @[1'u8, 2], # 2 bytes
 pong: @[3'u8, 4], # 2 bytes
 control: some(control) # 12 + 3 + 3 + 17 + 3 = 38 bytes
 )

-check byteSize(rpcMsg) == 28 + 38 + 2 + 2 + 38 # Total: 108 bytes
+check byteSize(rpcMsg) == 28 + 32 + 2 + 2 + 38 # Total: 102 bytes
@@ -43,14 +43,15 @@ proc randomPeerId*(): PeerId =
 raise newException(Defect, exc.msg)

 func defaultMsgIdProvider*(m: Message): Result[MessageId, ValidationResult] =
-let mid =
+let
+mid =
 if m.seqno.len > 0 and m.fromPeer.data.len > 0:
 byteutils.toHex(m.seqno) & $m.fromPeer
 else:
 # This part is irrelevant because it's not standard,
 # We use it exclusively for testing basically and users should
 # implement their own logic in the case they use anonymization
-$m.data.hash & $m.topicIds.hash
+$m.data.hash & $m.topic.hash
 ok mid.toBytes()

 proc generateNodes*(