allow message id provider to be specified (#243)
* don't send the public key in the message when not signing (information leak)
* don't run rebalance if there are already peers in gossip (see #242)
* don't crash on a malformed peer id received from a remote peer
parent 902880ef1f
commit aa6756dfe0
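This commit threads a configurable MsgIdProvider through the pubsub stack (see pubsub.nim below); messages are then deduplicated, cached and gossiped by whatever id it returns. As a usage sketch only, not part of this commit, a caller could plug in the content-addressed provider that the removed testmessage.nim case used to define inline; the import paths, the name `contentMsgId` and whether the effect pragmas of MsgIdProvider are inferred for it are assumptions:

  import nimcrypto/sha2
  import stew/base64
  import libp2p/standard_setup,
         libp2p/protocols/pubsub/rpc/messages

  # Hypothetical provider: identify a message by a digest of its payload
  # rather than by the default seqno-plus-sender id (defaultMsgIdProvider).
  func contentMsgId(m: Message): string =
    Base64Url.encode(
      sha256.digest(m.data).data.toOpenArray(0, sha256.sizeDigest() - 1))

  # newStandardSwitch gains a msgIdProvider parameter in this commit.
  let switch = newStandardSwitch(msgIdProvider = contentMsgId)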
libp2p/protocols/pubsub/floodsub.nim
@@ -13,7 +13,6 @@ import pubsub,
        pubsubpeer,
        timedcache,
        rpc/[messages, message],
-       ../../crypto/crypto,
        ../../stream/connection,
        ../../peer,
        ../../peerinfo,
@@ -65,8 +64,11 @@ method rpcHandler*(f: FloodSub,
     if m.messages.len > 0:                     # if there are any messages
       var toSendPeers: HashSet[string] = initHashSet[string]()
       for msg in m.messages:                   # for every message
-        if msg.msgId notin f.seen:
-          f.seen.put(msg.msgId)                # add the message to the seen cache
+        let msgId = f.msgIdProvider(msg)
+        logScope: msgId
+
+        if msgId notin f.seen:
+          f.seen.put(msgId)                    # add the message to the seen cache
 
           if f.verifySignature and not msg.verify(peer.peerInfo):
             trace "dropping message due to failed signature verification"
@@ -81,10 +83,9 @@ method rpcHandler*(f: FloodSub,
             toSendPeers.incl(f.floodsub[t])    # get all the peers interested in this topic
           if t in f.topics:                    # check that we're subscribed to it
             for h in f.topics[t].handler:
-              trace "calling handler for message", msg = msg.msgId,
-                                                   topicId = t,
+              trace "calling handler for message", topicId = t,
                                                    localPeer = f.peerInfo.id,
-                                                   fromPeer = msg.fromPeerId().pretty
+                                                   fromPeer = msg.fromPeer.pretty
               await h(t, msg.data)             # trigger user provided handler
 
       # forward the message to all peers interested in it
@@ -129,7 +130,7 @@ method publish*(f: FloodSub,
     return
 
   trace "publishing on topic", name = topic
-  let msg = newMessage(f.peerInfo, data, topic, f.sign)
+  let msg = Message.init(f.peerInfo, data, topic, f.sign)
   var sent: seq[Future[void]]
   # start the future but do not wait yet
   for p in f.floodsub.getOrDefault(topic):
libp2p/protocols/pubsub/gossipsub.nim
@@ -15,7 +15,6 @@ import pubsub,
        mcache,
        timedcache,
        rpc/[messages, message],
-       ../../crypto/crypto,
        ../protocol,
        ../../peerinfo,
        ../../stream/connection,
@@ -361,12 +360,16 @@ method rpcHandler*(g: GossipSub,
     if m.messages.len > 0:                     # if there are any messages
       var toSendPeers: HashSet[string]
       for msg in m.messages:                   # for every message
-        trace "processing message with id", msg = msg.msgId
-        if msg.msgId in g.seen:
-          trace "message already processed, skipping", msg = msg.msgId
+        let msgId = g.msgIdProvider(msg)
+        logScope: msgId
+
+        if msgId in g.seen:
+          trace "message already processed, skipping"
           continue
 
-        g.seen.put(msg.msgId)                  # add the message to the seen cache
+        trace "processing message"
+
+        g.seen.put(msgId)                      # add the message to the seen cache
 
         if g.verifySignature and not msg.verify(peer.peerInfo):
           trace "dropping message due to failed signature verification"
@@ -377,8 +380,8 @@ method rpcHandler*(g: GossipSub,
           continue
 
         # this shouldn't happen
-        if g.peerInfo.peerId == msg.fromPeerId():
-          trace "skipping messages from self", msg = msg.msgId
+        if g.peerInfo.peerId == msg.fromPeer:
+          trace "skipping messages from self"
           continue
 
         for t in msg.topicIDs:                 # for every topic in the message
@@ -391,10 +394,9 @@ method rpcHandler*(g: GossipSub,
 
           if t in g.topics:                    # if we're subscribed to the topic
             for h in g.topics[t].handler:
-              trace "calling handler for message", msg = msg.msgId,
-                                                   topicId = t,
+              trace "calling handler for message", topicId = t,
                                                    localPeer = g.peerInfo.id,
-                                                   fromPeer = msg.fromPeerId().pretty
+                                                   fromPeer = msg.fromPeer.pretty
               await h(t, msg.data)             # trigger user provided handler
 
       # forward the message to all peers interested in it
@@ -409,7 +411,7 @@ method rpcHandler*(g: GossipSub,
 
       let msgs = m.messages.filterIt(
         # don't forward to message originator
-        id != it.fromPeerId()
+        id != it.fromPeer
       )
 
       var sent: seq[Future[void]]
@@ -460,9 +462,9 @@ method publish*(g: GossipSub,
   trace "about to publish message on topic", name = topic,
                                              data = data.shortLog
 
-  var peers: HashSet[string]
   # TODO: we probably don't need to try multiple times
   if data.len > 0 and topic.len > 0:
+    var peers = g.mesh.getOrDefault(topic)
     for _ in 0..<5: # try to get peers up to 5 times
       if peers.len > 0:
         break
@@ -480,7 +482,10 @@ method publish*(g: GossipSub,
       # wait a second between tries
       await sleepAsync(1.seconds)
 
-    let msg = newMessage(g.peerInfo, data, topic, g.sign)
+    let
+      msg = Message.init(g.peerInfo, data, topic, g.sign)
+      msgId = g.msgIdProvider(msg)
+
     trace "created new message", msg
     var sent: seq[Future[void]]
     for p in peers:
@@ -488,8 +493,8 @@ method publish*(g: GossipSub,
         continue
 
       trace "publishing on topic", name = topic
-      if msg.msgId notin g.mcache:
-        g.mcache.put(msg)
+      if msgId notin g.mcache:
+        g.mcache.put(msgId, msg)
 
       if p in g.peers:
         sent.add(g.peers[p].send(@[RPCMsg(messages: @[msg])]))
@@ -499,10 +504,10 @@ method publish*(g: GossipSub,
 
 method start*(g: GossipSub) {.async.} =
   debug "gossipsub start"
 
   ## start pubsub
   ## start long running/repeating procedures
 
   # interlock start to to avoid overlapping to stops
   await g.heartbeatLock.acquire()
 
@@ -514,7 +519,7 @@ method start*(g: GossipSub) {.async.} =
 
 method stop*(g: GossipSub) {.async.} =
   debug "gossipsub stop"
 
   ## stop pubsub
   ## stop long running tasks
 
libp2p/protocols/pubsub/mcache.nim
@@ -9,7 +9,7 @@
 
 import chronos, chronicles
 import tables, options, sets, sequtils
-import rpc/[messages, message], timedcache
+import rpc/[messages], timedcache
 
 type
   CacheEntry* = object
@@ -30,17 +30,17 @@ proc get*(c: MCache, mid: string): Option[Message] =
 proc contains*(c: MCache, mid: string): bool =
   c.get(mid).isSome
 
-proc put*(c: MCache, msg: Message) =
+proc put*(c: MCache, msgId: string, msg: Message) =
   proc handler(key: string, val: Message) {.gcsafe.} =
     ## make sure we remove the message from history
     ## to keep things consisten
     c.history.applyIt(
-      it.filterIt(it.mid != msg.msgId)
+      it.filterIt(it.mid != msgId)
     )
 
-  if msg.msgId notin c.msgs:
-    c.msgs.put(msg.msgId, msg, handler = handler)
-    c.history[0].add(CacheEntry(mid: msg.msgId, msg: msg))
+  if msgId notin c.msgs:
+    c.msgs.put(msgId, msg, handler = handler)
+    c.history[0].add(CacheEntry(mid: msgId, msg: msg))
 
 proc window*(c: MCache, topic: string): HashSet[string] =
   result = initHashSet[string]()
@@ -56,7 +56,7 @@ proc window*(c: MCache, topic: string): HashSet[string] =
   for entry in slot:
     for t in entry.msg.topicIDs:
       if t == topic:
-        result.incl(entry.msg.msgId)
+        result.incl(entry.mid)
         break
 
 proc shift*(c: MCache) =
libp2p/protocols/pubsub/pubsub.nim
@@ -10,9 +10,10 @@
 import tables, sequtils, sets
 import chronos, chronicles
 import pubsubpeer,
-       rpc/messages,
+       rpc/[message, messages],
        ../protocol,
        ../../stream/connection,
+       ../../peer,
        ../../peerinfo
 import metrics
 
@@ -38,6 +39,9 @@ type
 
   TopicPair* = tuple[topic: string, handler: TopicHandler]
 
+  MsgIdProvider* =
+    proc(m: Message): string {.noSideEffect, raises: [Defect], nimcall, gcsafe.}
+
   Topic* = object
     name*: string
     handler*: seq[TopicHandler]
@@ -52,6 +56,7 @@ type
     cleanupLock: AsyncLock
     validators*: Table[string, HashSet[ValidatorHandler]]
     observers: ref seq[PubSubObserver] # ref as in smart_ptr
+    msgIdProvider*: MsgIdProvider      # Turn message into message id (not nil)
 
 proc sendSubs*(p: PubSub,
                peer: PubSubPeer,
@@ -244,6 +249,8 @@ method publish*(p: PubSub,
 method initPubSub*(p: PubSub) {.base.} =
   ## perform pubsub initialization
   p.observers = new(seq[PubSubObserver])
+  if p.msgIdProvider == nil:
+    p.msgIdProvider = defaultMsgIdProvider
 
 method start*(p: PubSub) {.async, base.} =
   ## start pubsub
@@ -292,12 +299,14 @@ proc newPubSub*(P: typedesc[PubSub],
                 peerInfo: PeerInfo,
                 triggerSelf: bool = false,
                 verifySignature: bool = true,
-                sign: bool = true): P =
+                sign: bool = true,
+                msgIdProvider: MsgIdProvider = defaultMsgIdProvider): P =
   result = P(peerInfo: peerInfo,
              triggerSelf: triggerSelf,
              verifySignature: verifySignature,
             sign: sign,
-            cleanupLock: newAsyncLock())
+            cleanupLock: newAsyncLock(),
+            msgIdProvider: msgIdProvider)
   result.initPubSub()
 
 proc addObserver*(p: PubSub; observer: PubSubObserver) = p.observers[] &= observer
libp2p/protocols/pubsub/pubsubpeer.nim
@@ -154,7 +154,7 @@ proc sendMsg*(p: PubSubPeer,
               topic: string,
               data: seq[byte],
               sign: bool): Future[void] {.gcsafe.} =
-  p.send(@[RPCMsg(messages: @[newMessage(p.peerInfo, data, topic, sign)])])
+  p.send(@[RPCMsg(messages: @[Message.init(p.peerInfo, data, topic, sign)])])
 
 proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async.} =
   for topic in topics:
libp2p/protocols/pubsub/rpc/message.nim
@@ -7,9 +7,12 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.
 
+{.push raises: [Defect].}
+
 import options
 import chronicles, stew/byteutils
 import metrics
+import chronicles
 import nimcrypto/sysrand
 import messages, protobuf,
        ../../../peer,
@@ -20,33 +23,18 @@ import messages, protobuf,
 logScope:
   topics = "pubsubmessage"
 
-const PubSubPrefix = "libp2p-pubsub:"
+const PubSubPrefix = toBytes("libp2p-pubsub:")
 
 declareCounter(libp2p_pubsub_sig_verify_success, "pubsub successfully validated messages")
 declareCounter(libp2p_pubsub_sig_verify_failure, "pubsub failed validated messages")
 
-proc msgIdProvider(m: Message): string =
-  ## default msg id provider
-  crypto.toHex(m.seqno) & PeerID.init(m.fromPeer).pretty
+func defaultMsgIdProvider*(m: Message): string =
+  byteutils.toHex(m.seqno) & m.fromPeer.pretty
 
-template msgId*(m: Message): string =
-  ## calls the ``msgIdProvider`` from
-  ## the instantiation scope
-  ##
-  mixin msgIdProvider
-  m.msgIdProvider()
-
-proc fromPeerId*(m: Message): PeerId =
-  PeerID.init(m.fromPeer)
-
-proc sign*(msg: Message, p: PeerInfo): Message {.gcsafe.} =
+proc sign*(msg: Message, p: PeerInfo): seq[byte] {.gcsafe, raises: [ResultError[CryptoError], Defect].} =
   var buff = initProtoBuffer()
   encodeMessage(msg, buff)
-  if buff.buffer.len > 0:
-    result = msg
-    result.signature = p.privateKey.
-      sign(PubSubPrefix.toBytes() & buff.buffer).tryGet().
-      getBytes()
+  p.privateKey.sign(PubSubPrefix & buff.buffer).tryGet().getBytes()
 
 proc verify*(m: Message, p: PeerInfo): bool =
   if m.signature.len > 0 and m.key.len > 0:
@@ -61,27 +49,29 @@ proc verify*(m: Message, p: PeerInfo): bool =
     var key: PublicKey
     if remote.init(m.signature) and key.init(m.key):
       trace "verifying signature", remoteSignature = remote
-      result = remote.verify(PubSubPrefix.toBytes() & buff.buffer, key)
+      result = remote.verify(PubSubPrefix & buff.buffer, key)
 
   if result:
     libp2p_pubsub_sig_verify_success.inc()
   else:
     libp2p_pubsub_sig_verify_failure.inc()
 
-proc newMessage*(p: PeerInfo,
-                 data: seq[byte],
-                 topic: string,
-                 sign: bool = true): Message {.gcsafe.} =
+proc init*(
+    T: type Message,
+    p: PeerInfo,
+    data: seq[byte],
+    topic: string,
+    sign: bool = true): Message {.gcsafe, raises: [CatchableError, Defect].} =
   var seqno: seq[byte] = newSeq[byte](8)
-  if randomBytes(addr seqno[0], 8) > 0:
-    if p.publicKey.isSome:
-      var key: seq[byte] = p.publicKey.get().getBytes().tryGet()
+  if randomBytes(addr seqno[0], 8) <= 0:
+    raise (ref CatchableError)(msg: "Cannot get randomness for message")
 
-    result = Message(fromPeer: p.peerId.getBytes(),
-                     data: data,
-                     seqno: seqno,
-                     topicIDs: @[topic])
-    if sign:
-      result = result.sign(p)
-
-    result.key = key
+  result = Message(
+    fromPeer: p.peerId,
+    data: data,
+    seqno: seqno,
+    topicIDs: @[topic])
+
+  if sign and p.publicKey.isSome:
+    result.signature = sign(result, p)
+    result.key = p.publicKey.get().getBytes().tryGet()
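Call sites change accordingly: where code previously asked the message itself for its id (the removed msgId template), it now asks the owning pubsub instance. A minimal sketch of the new pattern, assuming `peerInfo`, `data` and `topic` are in scope and `f` is any PubSub instance:

  let
    msg = Message.init(peerInfo, data, topic, sign = true)
    msgId = f.msgIdProvider(msg)  # defaultMsgIdProvider unless overridden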
libp2p/protocols/pubsub/rpc/messages.nim
@@ -9,6 +9,7 @@
 
 import options, sequtils
 import ../../../utility
+import ../../../peer
 
 type
   SubOpts* = object
@@ -16,7 +17,7 @@ type
     topic*: string
 
   Message* = object
-    fromPeer*: seq[byte]
+    fromPeer*: PeerId
     data*: seq[byte]
     seqno*: seq[byte]
     topicIDs*: seq[string]
@@ -75,10 +76,10 @@ func shortLog*(c: ControlMessage): auto =
     graft: mapIt(c.graft, it.shortLog),
     prune: mapIt(c.prune, it.shortLog)
   )
 
 func shortLog*(msg: Message): auto =
   (
-    fromPeer: msg.fromPeer.shortLog,
+    fromPeer: msg.fromPeer,
     data: msg.data.shortLog,
     seqno: msg.seqno.shortLog,
     topicIDs: $msg.topicIDs,
libp2p/protocols/pubsub/rpc/protobuf.nim
@@ -10,6 +10,7 @@
 import options
 import chronicles
 import messages,
+       ../../../peer,
        ../../../utility,
        ../../../protobuf/minprotobuf
 
@@ -162,7 +163,7 @@ proc decodeSubs*(pb: var ProtoBuffer): seq[SubOpts] {.gcsafe.} =
   trace "got subscriptions", subscriptions = result
 
 proc encodeMessage*(msg: Message, pb: var ProtoBuffer) {.gcsafe.} =
-  pb.write(initProtoField(1, msg.fromPeer))
+  pb.write(initProtoField(1, msg.fromPeer.getBytes()))
   pb.write(initProtoField(2, msg.data))
   pb.write(initProtoField(3, msg.seqno))
 
@@ -181,9 +182,16 @@ proc decodeMessages*(pb: var ProtoBuffer): seq[Message] {.gcsafe.} =
   # TODO: which of this fields are really optional?
   while true:
     var msg: Message
-    if pb.getBytes(1, msg.fromPeer) < 0:
+    var fromPeer: seq[byte]
+    if pb.getBytes(1, fromPeer) < 0:
       break
-    trace "read message field", fromPeer = msg.fromPeer.shortLog
+    try:
+      msg.fromPeer = PeerID.init(fromPeer)
+    except CatchableError as err:
+      debug "Invalid fromPeer in message", msg = err.msg
+      break
+
+    trace "read message field", fromPeer = msg.fromPeer.pretty
 
     if pb.getBytes(2, msg.data) < 0:
       break
libp2p/standard_setup.nim
@@ -9,7 +9,8 @@ import
   crypto/crypto, transports/[transport, tcptransport],
   muxers/[muxer, mplex/mplex, mplex/types],
   protocols/[identify, secure/secure],
-  protocols/pubsub/[pubsub, gossipsub, floodsub]
+  protocols/pubsub/[pubsub, gossipsub, floodsub],
+  protocols/pubsub/rpc/message
 
 import
   protocols/secure/noise,
@@ -30,11 +31,12 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
                         secureManagers: openarray[SecureProtocol] = [
                           # array cos order matters
                           SecureProtocol.Secio,
                           SecureProtocol.Noise,
                         ],
                         verifySignature = libp2p_pubsub_verify,
                         sign = libp2p_pubsub_sign,
-                        transportFlags: set[ServerFlags] = {}): Switch =
+                        transportFlags: set[ServerFlags] = {},
+                        msgIdProvider: MsgIdProvider = defaultMsgIdProvider): Switch =
   proc createMplex(conn: Connection): Muxer =
     newMplex(conn)
 
@@ -60,13 +62,15 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
                 peerInfo = peerInfo,
                 triggerSelf = triggerSelf,
                 verifySignature = verifySignature,
-                sign = sign).PubSub
+                sign = sign,
+                msgIdProvider = msgIdProvider).PubSub
     else:
       newPubSub(FloodSub,
                 peerInfo = peerInfo,
                 triggerSelf = triggerSelf,
                 verifySignature = verifySignature,
-                sign = sign).PubSub
+                sign = sign,
+                msgIdProvider = msgIdProvider).PubSub
 
   newSwitch(
     peerInfo,
tests/pubsub/testgossipinternal.nim
@@ -5,6 +5,7 @@ include ../../libp2p/protocols/pubsub/gossipsub
 import unittest
 import stew/byteutils
 import ../../libp2p/errors
+import ../../libp2p/crypto/crypto
 import ../../libp2p/stream/bufferstream
 
 import ../helpers
@@ -229,8 +230,8 @@ suite "GossipSub internal":
       conns &= conn
       let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA).get())
       conn.peerInfo = peerInfo
-      let msg = newMessage(peerInfo, ("HELLO" & $i).toBytes(), topic, false)
-      gossipSub.mcache.put(msg)
+      let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, false)
+      gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
 
     check gossipSub.fanout[topic].len == 15
     check gossipSub.mesh[topic].len == 15
@@ -279,8 +280,8 @@ suite "GossipSub internal":
       conns &= conn
       let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA).get())
       conn.peerInfo = peerInfo
-      let msg = newMessage(peerInfo, ("HELLO" & $i).toBytes(), topic, false)
-      gossipSub.mcache.put(msg)
+      let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, false)
+      gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
 
     let peers = gossipSub.getGossipPeers()
     check peers.len == GossipSubD
@@ -322,8 +323,8 @@ suite "GossipSub internal":
       conns &= conn
       let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA).get())
       conn.peerInfo = peerInfo
-      let msg = newMessage(peerInfo, ("HELLO" & $i).toBytes(), topic, false)
-      gossipSub.mcache.put(msg)
+      let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, false)
+      gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
 
     let peers = gossipSub.getGossipPeers()
     check peers.len == GossipSubD
@@ -365,8 +366,8 @@ suite "GossipSub internal":
       conns &= conn
       let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA).get())
       conn.peerInfo = peerInfo
-      let msg = newMessage(peerInfo, ("bar" & $i).toBytes(), topic, false)
-      gossipSub.mcache.put(msg)
+      let msg = Message.init(peerInfo, ("bar" & $i).toBytes(), topic, false)
+      gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
 
     let peers = gossipSub.getGossipPeers()
     check peers.len == 0
tests/pubsub/testmcache.nim
@@ -11,25 +11,26 @@ import ../../libp2p/[peer,
 suite "MCache":
   test "put/get":
     var mCache = newMCache(3, 5)
-    var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()).data,
+    var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()),
                       seqno: "12345".toBytes())
-    mCache.put(msg)
-    check mCache.get(msg.msgId).isSome and mCache.get(msg.msgId).get() == msg
+    let msgId = defaultMsgIdProvider(msg)
+    mCache.put(msgId, msg)
+    check mCache.get(msgId).isSome and mCache.get(msgId).get() == msg
 
   test "window":
     var mCache = newMCache(3, 5)
 
     for i in 0..<3:
-      var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()).data,
+      var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()),
                         seqno: "12345".toBytes(),
                         topicIDs: @["foo"])
-      mCache.put(msg)
+      mCache.put(defaultMsgIdProvider(msg), msg)
 
     for i in 0..<5:
-      var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()).data,
+      var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()),
                         seqno: "12345".toBytes(),
                         topicIDs: @["bar"])
-      mCache.put(msg)
+      mCache.put(defaultMsgIdProvider(msg), msg)
 
     var mids = mCache.window("foo")
     check mids.len == 3
@@ -41,28 +42,28 @@ suite "MCache":
     var mCache = newMCache(1, 5)
 
     for i in 0..<3:
-      var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()).data,
+      var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()),
                         seqno: "12345".toBytes(),
                         topicIDs: @["foo"])
-      mCache.put(msg)
+      mCache.put(defaultMsgIdProvider(msg), msg)
 
     mCache.shift()
     check mCache.window("foo").len == 0
 
     for i in 0..<3:
-      var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()).data,
+      var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()),
                         seqno: "12345".toBytes(),
                         topicIDs: @["bar"])
-      mCache.put(msg)
+      mCache.put(defaultMsgIdProvider(msg), msg)
 
     mCache.shift()
     check mCache.window("bar").len == 0
 
     for i in 0..<3:
-      var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()).data,
+      var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()),
                         seqno: "12345".toBytes(),
                         topicIDs: @["baz"])
-      mCache.put(msg)
+      mCache.put(defaultMsgIdProvider(msg), msg)
 
     mCache.shift()
     check mCache.window("baz").len == 0
@@ -71,22 +72,22 @@ suite "MCache":
     var mCache = newMCache(1, 5)
 
     for i in 0..<3:
-      var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()).data,
+      var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()),
                         seqno: "12345".toBytes(),
                         topicIDs: @["foo"])
-      mCache.put(msg)
+      mCache.put(defaultMsgIdProvider(msg), msg)
 
     for i in 0..<3:
-      var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()).data,
+      var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()),
                         seqno: "12345".toBytes(),
                         topicIDs: @["bar"])
-      mCache.put(msg)
+      mCache.put(defaultMsgIdProvider(msg), msg)
 
     for i in 0..<3:
-      var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()).data,
+      var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()),
                         seqno: "12345".toBytes(),
                         topicIDs: @["baz"])
-      mCache.put(msg)
+      mCache.put(defaultMsgIdProvider(msg), msg)
 
     mCache.shift()
     check mCache.window("foo").len == 0
tests/pubsub/testmessage.nim
@@ -1,33 +1,14 @@
 import unittest
-import nimcrypto/sha2,
-       stew/[base64, byteutils]
-import ../../libp2p/[peer,
+import ../../libp2p/[peer, peerinfo,
                      crypto/crypto,
                      protocols/pubsub/rpc/message,
                      protocols/pubsub/rpc/messages]
 
 suite "Message":
-  test "default message id":
-    let msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()).data,
-                      seqno: ("12345").toBytes())
-
-    check msg.msgId == byteutils.toHex(msg.seqno) & PeerID.init(msg.fromPeer).pretty
-
-  test "sha256 message id":
-    let msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()).data,
-                      seqno: ("12345").toBytes(),
-                      data: ("12345").toBytes())
-
-    proc msgIdProvider(m: Message): string =
-      Base64Url.encode(
-        sha256.
-        digest(m.data).
-        data.
-        toOpenArray(0, sha256.sizeDigest() - 1))
-
-    check msg.msgId == Base64Url.encode(
-      sha256.
-      digest(msg.data).
-      data.
-      toOpenArray(0, sha256.sizeDigest() - 1))
+  test "signature":
+    let
+      peer = PeerInfo.init(PrivateKey.random(ECDSA).get())
+      msg = Message.init(peer, @[], "topic", sign = true)
+
+    check verify(msg, peer)