From 462da1f7a88cef6fb172c1f587b2f24122d7bbcf Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni <7008900+sinkingsugar@users.noreply.github.com> Date: Wed, 21 Oct 2020 12:26:04 +0900 Subject: [PATCH] gossip MessageID as seq[byte] (#391) * gossip MessageID as seq[byte] * combina hashes in defaultMsgIdProvider * wip * fix defaultMsgIdProvider --- libp2p/protocols/pubsub/floodsub.nim | 4 ++-- libp2p/protocols/pubsub/mcache.nim | 14 +++++++------- libp2p/protocols/pubsub/pubsub.nim | 2 +- libp2p/protocols/pubsub/rpc/message.nim | 15 ++++++++++----- libp2p/protocols/pubsub/rpc/messages.nim | 6 ++++-- 5 files changed, 24 insertions(+), 17 deletions(-) diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index cabcc7cfd..137c7920f 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -27,7 +27,7 @@ const FloodSubCodec* = "/floodsub/1.0.0" type FloodSub* = ref object of PubSub floodsub*: PeerTable # topic to remote peer map - seen*: TimedCache[string] # list of messages forwarded to peers + seen*: TimedCache[MessageID] # list of messages forwarded to peers method subscribeTopic*(f: FloodSub, topic: string, @@ -183,5 +183,5 @@ method unsubscribeAll*(f: FloodSub, topic: string) {.async.} = method initPubSub*(f: FloodSub) = procCall PubSub(f).initPubSub() f.floodsub = initTable[string, HashSet[PubSubPeer]]() - f.seen = TimedCache[string].init(2.minutes) + f.seen = TimedCache[MessageID].init(2.minutes) f.init() diff --git a/libp2p/protocols/pubsub/mcache.nim b/libp2p/protocols/pubsub/mcache.nim index 172e8bc62..4cd39baed 100644 --- a/libp2p/protocols/pubsub/mcache.nim +++ b/libp2p/protocols/pubsub/mcache.nim @@ -14,30 +14,30 @@ export sets, tables, messages, options type CacheEntry* = object - mid*: string + mid*: MessageID topicIDs*: seq[string] MCache* = object of RootObj - msgs*: Table[string, Message] + msgs*: Table[MessageID, Message] history*: seq[seq[CacheEntry]] historySize*: Natural windowSize*: Natural -func get*(c: MCache, mid: string): Option[Message] = +func get*(c: MCache, mid: MessageID): Option[Message] = result = none(Message) if mid in c.msgs: result = some(c.msgs[mid]) -func contains*(c: MCache, mid: string): bool = +func contains*(c: MCache, mid: MessageID): bool = c.get(mid).isSome -func put*(c: var MCache, msgId: string, msg: Message) = +func put*(c: var MCache, msgId: MessageID, msg: Message) = if msgId notin c.msgs: c.msgs[msgId] = msg c.history[0].add(CacheEntry(mid: msgId, topicIDs: msg.topicIDs)) -func window*(c: MCache, topic: string): HashSet[string] = - result = initHashSet[string]() +func window*(c: MCache, topic: string): HashSet[MessageID] = + result = initHashSet[MessageID]() let len = min(c.windowSize, c.history.len) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index c06736fd2..75d432741 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -50,7 +50,7 @@ type TopicPair* = tuple[topic: string, handler: TopicHandler] MsgIdProvider* = - proc(m: Message): string {.noSideEffect, raises: [Defect], nimcall, gcsafe.} + proc(m: Message): MessageID {.noSideEffect, raises: [Defect], nimcall, gcsafe.} Topic* = object # make this a variant type if one day we have different Params structs diff --git a/libp2p/protocols/pubsub/rpc/message.nim b/libp2p/protocols/pubsub/rpc/message.nim index 7cc2d470c..77f6057c8 100644 --- a/libp2p/protocols/pubsub/rpc/message.nim +++ b/libp2p/protocols/pubsub/rpc/message.nim @@ -28,11 +28,16 @@ const PubSubPrefix = toBytes("libp2p-pubsub:") declareCounter(libp2p_pubsub_sig_verify_success, "pubsub successfully validated messages") declareCounter(libp2p_pubsub_sig_verify_failure, "pubsub failed validated messages") -func defaultMsgIdProvider*(m: Message): string = - if m.seqno.len > 0 and m.fromPeer.data.len > 0: - byteutils.toHex(m.seqno) & $m.fromPeer - else: - $m.data.hash & $m.topicIDs.hash +func defaultMsgIdProvider*(m: Message): MessageID = + let mid = + if m.seqno.len > 0 and m.fromPeer.data.len > 0: + byteutils.toHex(m.seqno) & $m.fromPeer + else: + # This part is irrelevant because it's not standard, + # We use it exclusively for testing basically and users should + # implement their own logic in the case they use anonymization + $m.data.hash & $m.topicIDs.hash + mid.toBytes() proc sign*(msg: Message, privateKey: PrivateKey): CryptoResult[seq[byte]] = ok((? privateKey.sign(PubSubPrefix & encodeMessage(msg, false))).getBytes()) diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index 301b8a33b..59c199d77 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -22,6 +22,8 @@ type subscribe*: bool topic*: string + MessageID* = seq[byte] + Message* = object fromPeer*: PeerId data*: seq[byte] @@ -38,10 +40,10 @@ type ControlIHave* = object topicID*: string - messageIDs*: seq[string] + messageIDs*: seq[MessageID] ControlIWant* = object - messageIDs*: seq[string] + messageIDs*: seq[MessageID] ControlGraft* = object topicID*: string