From aa6756dfe049dd3eba215938308ec956335c7e9e Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Sun, 28 Jun 2020 17:56:38 +0200 Subject: [PATCH] allow message id provider to be specified (#243) * don't send public key in message when not signing (information leak) * don't run rebalance if there are peers in gossip (see #242) * don't crash randomly on bad peer id from remote --- libp2p/protocols/pubsub/floodsub.nim | 15 +++--- libp2p/protocols/pubsub/gossipsub.nim | 41 +++++++++------- libp2p/protocols/pubsub/mcache.nim | 14 +++--- libp2p/protocols/pubsub/pubsub.nim | 15 ++++-- libp2p/protocols/pubsub/pubsubpeer.nim | 2 +- libp2p/protocols/pubsub/rpc/message.nim | 62 ++++++++++-------------- libp2p/protocols/pubsub/rpc/messages.nim | 7 +-- libp2p/protocols/pubsub/rpc/protobuf.nim | 14 ++++-- libp2p/standard_setup.nim | 14 ++++-- tests/pubsub/testgossipinternal.nim | 17 ++++--- tests/pubsub/testmcache.nim | 39 +++++++-------- tests/pubsub/testmessage.nim | 33 +++---------- 12 files changed, 137 insertions(+), 136 deletions(-) diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 7f68e6d..91f31c1 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -13,7 +13,6 @@ import pubsub, pubsubpeer, timedcache, rpc/[messages, message], - ../../crypto/crypto, ../../stream/connection, ../../peer, ../../peerinfo, @@ -65,8 +64,11 @@ method rpcHandler*(f: FloodSub, if m.messages.len > 0: # if there are any messages var toSendPeers: HashSet[string] = initHashSet[string]() for msg in m.messages: # for every message - if msg.msgId notin f.seen: - f.seen.put(msg.msgId) # add the message to the seen cache + let msgId = f.msgIdProvider(msg) + logScope: msgId + + if msgId notin f.seen: + f.seen.put(msgId) # add the message to the seen cache if f.verifySignature and not msg.verify(peer.peerInfo): trace "dropping message due to failed signature verification" @@ -81,10 +83,9 @@ method rpcHandler*(f: FloodSub, toSendPeers.incl(f.floodsub[t]) # get all the peers interested in this topic if t in f.topics: # check that we're subscribed to it for h in f.topics[t].handler: - trace "calling handler for message", msg = msg.msgId, - topicId = t, + trace "calling handler for message", topicId = t, localPeer = f.peerInfo.id, - fromPeer = msg.fromPeerId().pretty + fromPeer = msg.fromPeer.pretty await h(t, msg.data) # trigger user provided handler # forward the message to all peers interested in it @@ -129,7 +130,7 @@ method publish*(f: FloodSub, return trace "publishing on topic", name = topic - let msg = newMessage(f.peerInfo, data, topic, f.sign) + let msg = Message.init(f.peerInfo, data, topic, f.sign) var sent: seq[Future[void]] # start the future but do not wait yet for p in f.floodsub.getOrDefault(topic): diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 85bff0f..028a6ce 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -15,7 +15,6 @@ import pubsub, mcache, timedcache, rpc/[messages, message], - ../../crypto/crypto, ../protocol, ../../peerinfo, ../../stream/connection, @@ -361,12 +360,16 @@ method rpcHandler*(g: GossipSub, if m.messages.len > 0: # if there are any messages var toSendPeers: HashSet[string] for msg in m.messages: # for every message - trace "processing message with id", msg = msg.msgId - if msg.msgId in g.seen: - trace "message already processed, skipping", msg = msg.msgId + let msgId = g.msgIdProvider(msg) + logScope: msgId + + if msgId in g.seen: + trace "message already processed, skipping" continue - g.seen.put(msg.msgId) # add the message to the seen cache + trace "processing message" + + g.seen.put(msgId) # add the message to the seen cache if g.verifySignature and not msg.verify(peer.peerInfo): trace "dropping message due to failed signature verification" @@ -377,8 +380,8 @@ method rpcHandler*(g: GossipSub, continue # this shouldn't happen - if g.peerInfo.peerId == msg.fromPeerId(): - trace "skipping messages from self", msg = msg.msgId + if g.peerInfo.peerId == msg.fromPeer: + trace "skipping messages from self" continue for t in msg.topicIDs: # for every topic in the message @@ -391,10 +394,9 @@ method rpcHandler*(g: GossipSub, if t in g.topics: # if we're subscribed to the topic for h in g.topics[t].handler: - trace "calling handler for message", msg = msg.msgId, - topicId = t, + trace "calling handler for message", topicId = t, localPeer = g.peerInfo.id, - fromPeer = msg.fromPeerId().pretty + fromPeer = msg.fromPeer.pretty await h(t, msg.data) # trigger user provided handler # forward the message to all peers interested in it @@ -409,7 +411,7 @@ method rpcHandler*(g: GossipSub, let msgs = m.messages.filterIt( # don't forward to message originator - id != it.fromPeerId() + id != it.fromPeer ) var sent: seq[Future[void]] @@ -460,9 +462,9 @@ method publish*(g: GossipSub, trace "about to publish message on topic", name = topic, data = data.shortLog - var peers: HashSet[string] # TODO: we probably don't need to try multiple times if data.len > 0 and topic.len > 0: + var peers = g.mesh.getOrDefault(topic) for _ in 0..<5: # try to get peers up to 5 times if peers.len > 0: break @@ -480,7 +482,10 @@ method publish*(g: GossipSub, # wait a second between tries await sleepAsync(1.seconds) - let msg = newMessage(g.peerInfo, data, topic, g.sign) + let + msg = Message.init(g.peerInfo, data, topic, g.sign) + msgId = g.msgIdProvider(msg) + trace "created new message", msg var sent: seq[Future[void]] for p in peers: @@ -488,8 +493,8 @@ method publish*(g: GossipSub, continue trace "publishing on topic", name = topic - if msg.msgId notin g.mcache: - g.mcache.put(msg) + if msgId notin g.mcache: + g.mcache.put(msgId, msg) if p in g.peers: sent.add(g.peers[p].send(@[RPCMsg(messages: @[msg])])) @@ -499,10 +504,10 @@ method publish*(g: GossipSub, method start*(g: GossipSub) {.async.} = debug "gossipsub start" - + ## start pubsub ## start long running/repeating procedures - + # interlock start to to avoid overlapping to stops await g.heartbeatLock.acquire() @@ -514,7 +519,7 @@ method start*(g: GossipSub) {.async.} = method stop*(g: GossipSub) {.async.} = debug "gossipsub stop" - + ## stop pubsub ## stop long running tasks diff --git a/libp2p/protocols/pubsub/mcache.nim b/libp2p/protocols/pubsub/mcache.nim index 06157c9..82231f5 100644 --- a/libp2p/protocols/pubsub/mcache.nim +++ b/libp2p/protocols/pubsub/mcache.nim @@ -9,7 +9,7 @@ import chronos, chronicles import tables, options, sets, sequtils -import rpc/[messages, message], timedcache +import rpc/[messages], timedcache type CacheEntry* = object @@ -30,17 +30,17 @@ proc get*(c: MCache, mid: string): Option[Message] = proc contains*(c: MCache, mid: string): bool = c.get(mid).isSome -proc put*(c: MCache, msg: Message) = +proc put*(c: MCache, msgId: string, msg: Message) = proc handler(key: string, val: Message) {.gcsafe.} = ## make sure we remove the message from history ## to keep things consisten c.history.applyIt( - it.filterIt(it.mid != msg.msgId) + it.filterIt(it.mid != msgId) ) - if msg.msgId notin c.msgs: - c.msgs.put(msg.msgId, msg, handler = handler) - c.history[0].add(CacheEntry(mid: msg.msgId, msg: msg)) + if msgId notin c.msgs: + c.msgs.put(msgId, msg, handler = handler) + c.history[0].add(CacheEntry(mid: msgId, msg: msg)) proc window*(c: MCache, topic: string): HashSet[string] = result = initHashSet[string]() @@ -56,7 +56,7 @@ proc window*(c: MCache, topic: string): HashSet[string] = for entry in slot: for t in entry.msg.topicIDs: if t == topic: - result.incl(entry.msg.msgId) + result.incl(entry.mid) break proc shift*(c: MCache) = diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index c893892..8a0bbab 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -10,9 +10,10 @@ import tables, sequtils, sets import chronos, chronicles import pubsubpeer, - rpc/messages, + rpc/[message, messages], ../protocol, ../../stream/connection, + ../../peer, ../../peerinfo import metrics @@ -38,6 +39,9 @@ type TopicPair* = tuple[topic: string, handler: TopicHandler] + MsgIdProvider* = + proc(m: Message): string {.noSideEffect, raises: [Defect], nimcall, gcsafe.} + Topic* = object name*: string handler*: seq[TopicHandler] @@ -52,6 +56,7 @@ type cleanupLock: AsyncLock validators*: Table[string, HashSet[ValidatorHandler]] observers: ref seq[PubSubObserver] # ref as in smart_ptr + msgIdProvider*: MsgIdProvider # Turn message into message id (not nil) proc sendSubs*(p: PubSub, peer: PubSubPeer, @@ -244,6 +249,8 @@ method publish*(p: PubSub, method initPubSub*(p: PubSub) {.base.} = ## perform pubsub initialization p.observers = new(seq[PubSubObserver]) + if p.msgIdProvider == nil: + p.msgIdProvider = defaultMsgIdProvider method start*(p: PubSub) {.async, base.} = ## start pubsub @@ -292,12 +299,14 @@ proc newPubSub*(P: typedesc[PubSub], peerInfo: PeerInfo, triggerSelf: bool = false, verifySignature: bool = true, - sign: bool = true): P = + sign: bool = true, + msgIdProvider: MsgIdProvider = defaultMsgIdProvider): P = result = P(peerInfo: peerInfo, triggerSelf: triggerSelf, verifySignature: verifySignature, sign: sign, - cleanupLock: newAsyncLock()) + cleanupLock: newAsyncLock(), + msgIdProvider: msgIdProvider) result.initPubSub() proc addObserver*(p: PubSub; observer: PubSubObserver) = p.observers[] &= observer diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 681c511..3fb4377 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -154,7 +154,7 @@ proc sendMsg*(p: PubSubPeer, topic: string, data: seq[byte], sign: bool): Future[void] {.gcsafe.} = - p.send(@[RPCMsg(messages: @[newMessage(p.peerInfo, data, topic, sign)])]) + p.send(@[RPCMsg(messages: @[Message.init(p.peerInfo, data, topic, sign)])]) proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async.} = for topic in topics: diff --git a/libp2p/protocols/pubsub/rpc/message.nim b/libp2p/protocols/pubsub/rpc/message.nim index 6e0a594..d7a35ae 100644 --- a/libp2p/protocols/pubsub/rpc/message.nim +++ b/libp2p/protocols/pubsub/rpc/message.nim @@ -7,9 +7,12 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +{.push raises: [Defect].} + import options import chronicles, stew/byteutils import metrics +import chronicles import nimcrypto/sysrand import messages, protobuf, ../../../peer, @@ -20,33 +23,18 @@ import messages, protobuf, logScope: topics = "pubsubmessage" -const PubSubPrefix = "libp2p-pubsub:" +const PubSubPrefix = toBytes("libp2p-pubsub:") declareCounter(libp2p_pubsub_sig_verify_success, "pubsub successfully validated messages") declareCounter(libp2p_pubsub_sig_verify_failure, "pubsub failed validated messages") -proc msgIdProvider(m: Message): string = - ## default msg id provider - crypto.toHex(m.seqno) & PeerID.init(m.fromPeer).pretty +func defaultMsgIdProvider*(m: Message): string = + byteutils.toHex(m.seqno) & m.fromPeer.pretty -template msgId*(m: Message): string = - ## calls the ``msgIdProvider`` from - ## the instantiation scope - ## - mixin msgIdProvider - m.msgIdProvider() - -proc fromPeerId*(m: Message): PeerId = - PeerID.init(m.fromPeer) - -proc sign*(msg: Message, p: PeerInfo): Message {.gcsafe.} = +proc sign*(msg: Message, p: PeerInfo): seq[byte] {.gcsafe, raises: [ResultError[CryptoError], Defect].} = var buff = initProtoBuffer() encodeMessage(msg, buff) - if buff.buffer.len > 0: - result = msg - result.signature = p.privateKey. - sign(PubSubPrefix.toBytes() & buff.buffer).tryGet(). - getBytes() + p.privateKey.sign(PubSubPrefix & buff.buffer).tryGet().getBytes() proc verify*(m: Message, p: PeerInfo): bool = if m.signature.len > 0 and m.key.len > 0: @@ -61,27 +49,29 @@ proc verify*(m: Message, p: PeerInfo): bool = var key: PublicKey if remote.init(m.signature) and key.init(m.key): trace "verifying signature", remoteSignature = remote - result = remote.verify(PubSubPrefix.toBytes() & buff.buffer, key) - + result = remote.verify(PubSubPrefix & buff.buffer, key) + if result: libp2p_pubsub_sig_verify_success.inc() else: libp2p_pubsub_sig_verify_failure.inc() -proc newMessage*(p: PeerInfo, - data: seq[byte], - topic: string, - sign: bool = true): Message {.gcsafe.} = +proc init*( + T: type Message, + p: PeerInfo, + data: seq[byte], + topic: string, + sign: bool = true): Message {.gcsafe, raises: [CatchableError, Defect].} = var seqno: seq[byte] = newSeq[byte](8) - if randomBytes(addr seqno[0], 8) > 0: - if p.publicKey.isSome: - var key: seq[byte] = p.publicKey.get().getBytes().tryGet() + if randomBytes(addr seqno[0], 8) <= 0: + raise (ref CatchableError)(msg: "Cannot get randomness for message") - result = Message(fromPeer: p.peerId.getBytes(), - data: data, - seqno: seqno, - topicIDs: @[topic]) - if sign: - result = result.sign(p) + result = Message( + fromPeer: p.peerId, + data: data, + seqno: seqno, + topicIDs: @[topic]) - result.key = key + if sign and p.publicKey.isSome: + result.signature = sign(result, p) + result.key = p.publicKey.get().getBytes().tryGet() diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index da7ac1a..afb6a5f 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -9,6 +9,7 @@ import options, sequtils import ../../../utility +import ../../../peer type SubOpts* = object @@ -16,7 +17,7 @@ type topic*: string Message* = object - fromPeer*: seq[byte] + fromPeer*: PeerId data*: seq[byte] seqno*: seq[byte] topicIDs*: seq[string] @@ -75,10 +76,10 @@ func shortLog*(c: ControlMessage): auto = graft: mapIt(c.graft, it.shortLog), prune: mapIt(c.prune, it.shortLog) ) - + func shortLog*(msg: Message): auto = ( - fromPeer: msg.fromPeer.shortLog, + fromPeer: msg.fromPeer, data: msg.data.shortLog, seqno: msg.seqno.shortLog, topicIDs: $msg.topicIDs, diff --git a/libp2p/protocols/pubsub/rpc/protobuf.nim b/libp2p/protocols/pubsub/rpc/protobuf.nim index 78a422c..5562324 100644 --- a/libp2p/protocols/pubsub/rpc/protobuf.nim +++ b/libp2p/protocols/pubsub/rpc/protobuf.nim @@ -10,6 +10,7 @@ import options import chronicles import messages, + ../../../peer, ../../../utility, ../../../protobuf/minprotobuf @@ -162,7 +163,7 @@ proc decodeSubs*(pb: var ProtoBuffer): seq[SubOpts] {.gcsafe.} = trace "got subscriptions", subscriptions = result proc encodeMessage*(msg: Message, pb: var ProtoBuffer) {.gcsafe.} = - pb.write(initProtoField(1, msg.fromPeer)) + pb.write(initProtoField(1, msg.fromPeer.getBytes())) pb.write(initProtoField(2, msg.data)) pb.write(initProtoField(3, msg.seqno)) @@ -181,9 +182,16 @@ proc decodeMessages*(pb: var ProtoBuffer): seq[Message] {.gcsafe.} = # TODO: which of this fields are really optional? while true: var msg: Message - if pb.getBytes(1, msg.fromPeer) < 0: + var fromPeer: seq[byte] + if pb.getBytes(1, fromPeer) < 0: break - trace "read message field", fromPeer = msg.fromPeer.shortLog + try: + msg.fromPeer = PeerID.init(fromPeer) + except CatchableError as err: + debug "Invalid fromPeer in message", msg = err.msg + break + + trace "read message field", fromPeer = msg.fromPeer.pretty if pb.getBytes(2, msg.data) < 0: break diff --git a/libp2p/standard_setup.nim b/libp2p/standard_setup.nim index 17fcb87..991378a 100644 --- a/libp2p/standard_setup.nim +++ b/libp2p/standard_setup.nim @@ -9,7 +9,8 @@ import crypto/crypto, transports/[transport, tcptransport], muxers/[muxer, mplex/mplex, mplex/types], protocols/[identify, secure/secure], - protocols/pubsub/[pubsub, gossipsub, floodsub] + protocols/pubsub/[pubsub, gossipsub, floodsub], + protocols/pubsub/rpc/message import protocols/secure/noise, @@ -30,11 +31,12 @@ proc newStandardSwitch*(privKey = none(PrivateKey), secureManagers: openarray[SecureProtocol] = [ # array cos order matters SecureProtocol.Secio, - SecureProtocol.Noise, + SecureProtocol.Noise, ], verifySignature = libp2p_pubsub_verify, sign = libp2p_pubsub_sign, - transportFlags: set[ServerFlags] = {}): Switch = + transportFlags: set[ServerFlags] = {}, + msgIdProvider: MsgIdProvider = defaultMsgIdProvider): Switch = proc createMplex(conn: Connection): Muxer = newMplex(conn) @@ -60,13 +62,15 @@ proc newStandardSwitch*(privKey = none(PrivateKey), peerInfo = peerInfo, triggerSelf = triggerSelf, verifySignature = verifySignature, - sign = sign).PubSub + sign = sign, + msgIdProvider = msgIdProvider).PubSub else: newPubSub(FloodSub, peerInfo = peerInfo, triggerSelf = triggerSelf, verifySignature = verifySignature, - sign = sign).PubSub + sign = sign, + msgIdProvider = msgIdProvider).PubSub newSwitch( peerInfo, diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 82f785c..603dfaa 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -5,6 +5,7 @@ include ../../libp2p/protocols/pubsub/gossipsub import unittest import stew/byteutils import ../../libp2p/errors +import ../../libp2p/crypto/crypto import ../../libp2p/stream/bufferstream import ../helpers @@ -229,8 +230,8 @@ suite "GossipSub internal": conns &= conn let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA).get()) conn.peerInfo = peerInfo - let msg = newMessage(peerInfo, ("HELLO" & $i).toBytes(), topic, false) - gossipSub.mcache.put(msg) + let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, false) + gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) check gossipSub.fanout[topic].len == 15 check gossipSub.mesh[topic].len == 15 @@ -279,8 +280,8 @@ suite "GossipSub internal": conns &= conn let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA).get()) conn.peerInfo = peerInfo - let msg = newMessage(peerInfo, ("HELLO" & $i).toBytes(), topic, false) - gossipSub.mcache.put(msg) + let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, false) + gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) let peers = gossipSub.getGossipPeers() check peers.len == GossipSubD @@ -322,8 +323,8 @@ suite "GossipSub internal": conns &= conn let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA).get()) conn.peerInfo = peerInfo - let msg = newMessage(peerInfo, ("HELLO" & $i).toBytes(), topic, false) - gossipSub.mcache.put(msg) + let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, false) + gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) let peers = gossipSub.getGossipPeers() check peers.len == GossipSubD @@ -365,8 +366,8 @@ suite "GossipSub internal": conns &= conn let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA).get()) conn.peerInfo = peerInfo - let msg = newMessage(peerInfo, ("bar" & $i).toBytes(), topic, false) - gossipSub.mcache.put(msg) + let msg = Message.init(peerInfo, ("bar" & $i).toBytes(), topic, false) + gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) let peers = gossipSub.getGossipPeers() check peers.len == 0 diff --git a/tests/pubsub/testmcache.nim b/tests/pubsub/testmcache.nim index fca0011..118be8b 100644 --- a/tests/pubsub/testmcache.nim +++ b/tests/pubsub/testmcache.nim @@ -11,25 +11,26 @@ import ../../libp2p/[peer, suite "MCache": test "put/get": var mCache = newMCache(3, 5) - var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()).data, + var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()), seqno: "12345".toBytes()) - mCache.put(msg) - check mCache.get(msg.msgId).isSome and mCache.get(msg.msgId).get() == msg + let msgId = defaultMsgIdProvider(msg) + mCache.put(msgId, msg) + check mCache.get(msgId).isSome and mCache.get(msgId).get() == msg test "window": var mCache = newMCache(3, 5) for i in 0..<3: - var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()).data, + var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()), seqno: "12345".toBytes(), topicIDs: @["foo"]) - mCache.put(msg) + mCache.put(defaultMsgIdProvider(msg), msg) for i in 0..<5: - var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()).data, + var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()), seqno: "12345".toBytes(), topicIDs: @["bar"]) - mCache.put(msg) + mCache.put(defaultMsgIdProvider(msg), msg) var mids = mCache.window("foo") check mids.len == 3 @@ -41,28 +42,28 @@ suite "MCache": var mCache = newMCache(1, 5) for i in 0..<3: - var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()).data, + var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()), seqno: "12345".toBytes(), topicIDs: @["foo"]) - mCache.put(msg) + mCache.put(defaultMsgIdProvider(msg), msg) mCache.shift() check mCache.window("foo").len == 0 for i in 0..<3: - var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()).data, + var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()), seqno: "12345".toBytes(), topicIDs: @["bar"]) - mCache.put(msg) + mCache.put(defaultMsgIdProvider(msg), msg) mCache.shift() check mCache.window("bar").len == 0 for i in 0..<3: - var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()).data, + var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()), seqno: "12345".toBytes(), topicIDs: @["baz"]) - mCache.put(msg) + mCache.put(defaultMsgIdProvider(msg), msg) mCache.shift() check mCache.window("baz").len == 0 @@ -71,22 +72,22 @@ suite "MCache": var mCache = newMCache(1, 5) for i in 0..<3: - var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()).data, + var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()), seqno: "12345".toBytes(), topicIDs: @["foo"]) - mCache.put(msg) + mCache.put(defaultMsgIdProvider(msg), msg) for i in 0..<3: - var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()).data, + var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()), seqno: "12345".toBytes(), topicIDs: @["bar"]) - mCache.put(msg) + mCache.put(defaultMsgIdProvider(msg), msg) for i in 0..<3: - var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()).data, + var msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()), seqno: "12345".toBytes(), topicIDs: @["baz"]) - mCache.put(msg) + mCache.put(defaultMsgIdProvider(msg), msg) mCache.shift() check mCache.window("foo").len == 0 diff --git a/tests/pubsub/testmessage.nim b/tests/pubsub/testmessage.nim index d0d48b4..77128ef 100644 --- a/tests/pubsub/testmessage.nim +++ b/tests/pubsub/testmessage.nim @@ -1,33 +1,14 @@ import unittest -import nimcrypto/sha2, - stew/[base64, byteutils] -import ../../libp2p/[peer, + +import ../../libp2p/[peer, peerinfo, crypto/crypto, protocols/pubsub/rpc/message, protocols/pubsub/rpc/messages] suite "Message": - test "default message id": - let msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()).data, - seqno: ("12345").toBytes()) - - check msg.msgId == byteutils.toHex(msg.seqno) & PeerID.init(msg.fromPeer).pretty - - test "sha256 message id": - let msg = Message(fromPeer: PeerID.init(PrivateKey.random(ECDSA).get()).data, - seqno: ("12345").toBytes(), - data: ("12345").toBytes()) - - proc msgIdProvider(m: Message): string = - Base64Url.encode( - sha256. - digest(m.data). - data. - toOpenArray(0, sha256.sizeDigest() - 1)) - - check msg.msgId == Base64Url.encode( - sha256. - digest(msg.data). - data. - toOpenArray(0, sha256.sizeDigest() - 1)) + test "signature": + let + peer = PeerInfo.init(PrivateKey.random(ECDSA).get()) + msg = Message.init(peer, @[], "topic", sign = true) + check verify(msg, peer)