gossip MessageID as seq[byte] (#391)
* gossip MessageID as seq[byte] * combina hashes in defaultMsgIdProvider * wip * fix defaultMsgIdProvider
This commit is contained in:
parent
27b9bf436e
commit
462da1f7a8
|
@ -27,7 +27,7 @@ const FloodSubCodec* = "/floodsub/1.0.0"
|
||||||
type
|
type
|
||||||
FloodSub* = ref object of PubSub
|
FloodSub* = ref object of PubSub
|
||||||
floodsub*: PeerTable # topic to remote peer map
|
floodsub*: PeerTable # topic to remote peer map
|
||||||
seen*: TimedCache[string] # list of messages forwarded to peers
|
seen*: TimedCache[MessageID] # list of messages forwarded to peers
|
||||||
|
|
||||||
method subscribeTopic*(f: FloodSub,
|
method subscribeTopic*(f: FloodSub,
|
||||||
topic: string,
|
topic: string,
|
||||||
|
@ -183,5 +183,5 @@ method unsubscribeAll*(f: FloodSub, topic: string) {.async.} =
|
||||||
method initPubSub*(f: FloodSub) =
|
method initPubSub*(f: FloodSub) =
|
||||||
procCall PubSub(f).initPubSub()
|
procCall PubSub(f).initPubSub()
|
||||||
f.floodsub = initTable[string, HashSet[PubSubPeer]]()
|
f.floodsub = initTable[string, HashSet[PubSubPeer]]()
|
||||||
f.seen = TimedCache[string].init(2.minutes)
|
f.seen = TimedCache[MessageID].init(2.minutes)
|
||||||
f.init()
|
f.init()
|
||||||
|
|
|
@ -14,30 +14,30 @@ export sets, tables, messages, options
|
||||||
|
|
||||||
type
|
type
|
||||||
CacheEntry* = object
|
CacheEntry* = object
|
||||||
mid*: string
|
mid*: MessageID
|
||||||
topicIDs*: seq[string]
|
topicIDs*: seq[string]
|
||||||
|
|
||||||
MCache* = object of RootObj
|
MCache* = object of RootObj
|
||||||
msgs*: Table[string, Message]
|
msgs*: Table[MessageID, Message]
|
||||||
history*: seq[seq[CacheEntry]]
|
history*: seq[seq[CacheEntry]]
|
||||||
historySize*: Natural
|
historySize*: Natural
|
||||||
windowSize*: Natural
|
windowSize*: Natural
|
||||||
|
|
||||||
func get*(c: MCache, mid: string): Option[Message] =
|
func get*(c: MCache, mid: MessageID): Option[Message] =
|
||||||
result = none(Message)
|
result = none(Message)
|
||||||
if mid in c.msgs:
|
if mid in c.msgs:
|
||||||
result = some(c.msgs[mid])
|
result = some(c.msgs[mid])
|
||||||
|
|
||||||
func contains*(c: MCache, mid: string): bool =
|
func contains*(c: MCache, mid: MessageID): bool =
|
||||||
c.get(mid).isSome
|
c.get(mid).isSome
|
||||||
|
|
||||||
func put*(c: var MCache, msgId: string, msg: Message) =
|
func put*(c: var MCache, msgId: MessageID, msg: Message) =
|
||||||
if msgId notin c.msgs:
|
if msgId notin c.msgs:
|
||||||
c.msgs[msgId] = msg
|
c.msgs[msgId] = msg
|
||||||
c.history[0].add(CacheEntry(mid: msgId, topicIDs: msg.topicIDs))
|
c.history[0].add(CacheEntry(mid: msgId, topicIDs: msg.topicIDs))
|
||||||
|
|
||||||
func window*(c: MCache, topic: string): HashSet[string] =
|
func window*(c: MCache, topic: string): HashSet[MessageID] =
|
||||||
result = initHashSet[string]()
|
result = initHashSet[MessageID]()
|
||||||
|
|
||||||
let
|
let
|
||||||
len = min(c.windowSize, c.history.len)
|
len = min(c.windowSize, c.history.len)
|
||||||
|
|
|
@ -50,7 +50,7 @@ type
|
||||||
TopicPair* = tuple[topic: string, handler: TopicHandler]
|
TopicPair* = tuple[topic: string, handler: TopicHandler]
|
||||||
|
|
||||||
MsgIdProvider* =
|
MsgIdProvider* =
|
||||||
proc(m: Message): string {.noSideEffect, raises: [Defect], nimcall, gcsafe.}
|
proc(m: Message): MessageID {.noSideEffect, raises: [Defect], nimcall, gcsafe.}
|
||||||
|
|
||||||
Topic* = object
|
Topic* = object
|
||||||
# make this a variant type if one day we have different Params structs
|
# make this a variant type if one day we have different Params structs
|
||||||
|
|
|
@ -28,11 +28,16 @@ const PubSubPrefix = toBytes("libp2p-pubsub:")
|
||||||
declareCounter(libp2p_pubsub_sig_verify_success, "pubsub successfully validated messages")
|
declareCounter(libp2p_pubsub_sig_verify_success, "pubsub successfully validated messages")
|
||||||
declareCounter(libp2p_pubsub_sig_verify_failure, "pubsub failed validated messages")
|
declareCounter(libp2p_pubsub_sig_verify_failure, "pubsub failed validated messages")
|
||||||
|
|
||||||
func defaultMsgIdProvider*(m: Message): string =
|
func defaultMsgIdProvider*(m: Message): MessageID =
|
||||||
|
let mid =
|
||||||
if m.seqno.len > 0 and m.fromPeer.data.len > 0:
|
if m.seqno.len > 0 and m.fromPeer.data.len > 0:
|
||||||
byteutils.toHex(m.seqno) & $m.fromPeer
|
byteutils.toHex(m.seqno) & $m.fromPeer
|
||||||
else:
|
else:
|
||||||
|
# This part is irrelevant because it's not standard,
|
||||||
|
# We use it exclusively for testing basically and users should
|
||||||
|
# implement their own logic in the case they use anonymization
|
||||||
$m.data.hash & $m.topicIDs.hash
|
$m.data.hash & $m.topicIDs.hash
|
||||||
|
mid.toBytes()
|
||||||
|
|
||||||
proc sign*(msg: Message, privateKey: PrivateKey): CryptoResult[seq[byte]] =
|
proc sign*(msg: Message, privateKey: PrivateKey): CryptoResult[seq[byte]] =
|
||||||
ok((? privateKey.sign(PubSubPrefix & encodeMessage(msg, false))).getBytes())
|
ok((? privateKey.sign(PubSubPrefix & encodeMessage(msg, false))).getBytes())
|
||||||
|
|
|
@ -22,6 +22,8 @@ type
|
||||||
subscribe*: bool
|
subscribe*: bool
|
||||||
topic*: string
|
topic*: string
|
||||||
|
|
||||||
|
MessageID* = seq[byte]
|
||||||
|
|
||||||
Message* = object
|
Message* = object
|
||||||
fromPeer*: PeerId
|
fromPeer*: PeerId
|
||||||
data*: seq[byte]
|
data*: seq[byte]
|
||||||
|
@ -38,10 +40,10 @@ type
|
||||||
|
|
||||||
ControlIHave* = object
|
ControlIHave* = object
|
||||||
topicID*: string
|
topicID*: string
|
||||||
messageIDs*: seq[string]
|
messageIDs*: seq[MessageID]
|
||||||
|
|
||||||
ControlIWant* = object
|
ControlIWant* = object
|
||||||
messageIDs*: seq[string]
|
messageIDs*: seq[MessageID]
|
||||||
|
|
||||||
ControlGraft* = object
|
ControlGraft* = object
|
||||||
topicID*: string
|
topicID*: string
|
||||||
|
|
Loading…
Reference in New Issue