feat: allow msgIdProvider to fail (#688)
* feat: allow msgIdProvider to fail Closes: #642. Changes the return type of the msgIdProvider to `Result[MessageID, string]` so that message id generation can fail. String error type was chosen as this `msgIdProvider` mainly because the failed message id generation drops the message and logs the error provided. Because `msgIdProvider` can be externally provided by library consumers, an enum didn’t make sense and a object seemed to be overkill. Exceptions could have been used as well, however, in this case, Result ergonomics were warranted and prevented wrapping quite a large block of code in try/except. The `defaultMsgIdProvider` function previously allowed message id generation to fail silently for use in the tests: when seqno or source peerid were not valid, the message id generated was based on a hash of the message data and topic ids. The silent failing was moved to the `defaultMsgIdProvider` used only in the tests so that it could not fail silently in applications. Unit tests were added for the `defaultMsgIdProvider`. * Change MsgIdProvider error type to ValidationResult
This commit is contained in:
parent
9a7e3bda3c
commit
3b718baa97
|
@ -0,0 +1,6 @@
|
|||
# this module will be further extended in PR
|
||||
# https://github.com/status-im/nim-libp2p/pull/107/
|
||||
|
||||
type
|
||||
ValidationResult* {.pure.} = enum
|
||||
Accept, Reject, Ignore
|
|
@ -96,7 +96,14 @@ method rpcHandler*(f: FloodSub,
|
|||
f.handleSubscribe(peer, sub.topic, sub.subscribe)
|
||||
|
||||
for msg in rpcMsg.messages: # for every message
|
||||
let msgId = f.msgIdProvider(msg)
|
||||
let msgIdResult = f.msgIdProvider(msg)
|
||||
if msgIdResult.isErr:
|
||||
debug "Dropping message due to failed message id generation",
|
||||
error = msgIdResult.error
|
||||
# TODO: descore peers due to error during message validation (malicious?)
|
||||
continue
|
||||
|
||||
let msgId = msgIdResult.get
|
||||
|
||||
if f.addSeen(msgId):
|
||||
trace "Dropping already-seen message", msgId, peer
|
||||
|
@ -184,7 +191,14 @@ method publish*(f: FloodSub,
|
|||
Message.init(none(PeerInfo), data, topic, none(uint64), false)
|
||||
else:
|
||||
Message.init(some(f.peerInfo), data, topic, some(f.msgSeqno), f.sign)
|
||||
msgId = f.msgIdProvider(msg)
|
||||
msgIdResult = f.msgIdProvider(msg)
|
||||
|
||||
if msgIdResult.isErr:
|
||||
trace "Error generating message id, skipping publish",
|
||||
error = msgIdResult.error
|
||||
return 0
|
||||
|
||||
let msgId = msgIdResult.get
|
||||
|
||||
trace "Created new message",
|
||||
msg = shortLog(msg), peers = peers.len, topic, msgId
|
||||
|
|
|
@ -362,8 +362,16 @@ method rpcHandler*(g: GossipSub,
|
|||
|
||||
for i in 0..<rpcMsg.messages.len(): # for every message
|
||||
template msg: untyped = rpcMsg.messages[i]
|
||||
let msgIdResult = g.msgIdProvider(msg)
|
||||
|
||||
if msgIdResult.isErr:
|
||||
debug "Dropping message due to failed message id generation",
|
||||
error = msgIdResult.error
|
||||
# TODO: descore peers due to error during message validation (malicious?)
|
||||
continue
|
||||
|
||||
let
|
||||
msgId = g.msgIdProvider(msg)
|
||||
msgId = msgIdResult.get
|
||||
msgIdSalted = msgId & g.seenSalt
|
||||
|
||||
# addSeen adds salt to msgId to avoid
|
||||
|
@ -505,7 +513,15 @@ method publish*(g: GossipSub,
|
|||
Message.init(none(PeerInfo), data, topic, none(uint64), false)
|
||||
else:
|
||||
Message.init(some(g.peerInfo), data, topic, some(g.msgSeqno), g.sign)
|
||||
msgId = g.msgIdProvider(msg)
|
||||
msgIdResult = g.msgIdProvider(msg)
|
||||
|
||||
if msgIdResult.isErr:
|
||||
trace "Error generating message id, skipping publish",
|
||||
error = msgIdResult.error
|
||||
libp2p_gossipsub_failed_publish.inc()
|
||||
return 0
|
||||
|
||||
let msgId = msgIdResult.get
|
||||
|
||||
logScope: msgId = shortLog(msgId)
|
||||
|
||||
|
|
|
@ -11,7 +11,8 @@
|
|||
|
||||
import std/[tables, sequtils, sets, strutils]
|
||||
import chronos, chronicles, metrics, bearssl
|
||||
import ./pubsubpeer,
|
||||
import ./errors as pubsub_errors,
|
||||
./pubsubpeer,
|
||||
./rpc/[message, messages, protobuf],
|
||||
../../switch,
|
||||
../protocol,
|
||||
|
@ -76,16 +77,13 @@ type
|
|||
TopicHandler* = proc(topic: string,
|
||||
data: seq[byte]): Future[void] {.gcsafe, raises: [Defect].}
|
||||
|
||||
ValidationResult* {.pure.} = enum
|
||||
Accept, Reject, Ignore
|
||||
|
||||
ValidatorHandler* = proc(topic: string,
|
||||
message: Message): Future[ValidationResult] {.gcsafe, raises: [Defect].}
|
||||
|
||||
TopicPair* = tuple[topic: string, handler: TopicHandler]
|
||||
|
||||
MsgIdProvider* =
|
||||
proc(m: Message): MessageID {.noSideEffect, raises: [Defect], gcsafe.}
|
||||
proc(m: Message): Result[MessageID, ValidationResult] {.noSideEffect, raises: [Defect], gcsafe.}
|
||||
|
||||
SubscriptionValidator* =
|
||||
proc(topic: string): bool {.raises: [Defect], gcsafe.}
|
||||
|
|
|
@ -16,9 +16,10 @@ import ./messages,
|
|||
../../../peerid,
|
||||
../../../peerinfo,
|
||||
../../../crypto/crypto,
|
||||
../../../protobuf/minprotobuf
|
||||
../../../protobuf/minprotobuf,
|
||||
../../../protocols/pubsub/errors
|
||||
|
||||
export messages
|
||||
export errors, messages
|
||||
|
||||
logScope:
|
||||
topics = "pubsubmessage"
|
||||
|
@ -28,16 +29,12 @@ const PubSubPrefix = toBytes("libp2p-pubsub:")
|
|||
declareCounter(libp2p_pubsub_sig_verify_success, "pubsub successfully validated messages")
|
||||
declareCounter(libp2p_pubsub_sig_verify_failure, "pubsub failed validated messages")
|
||||
|
||||
func defaultMsgIdProvider*(m: Message): MessageID =
|
||||
let mid =
|
||||
if m.seqno.len > 0 and m.fromPeer.data.len > 0:
|
||||
byteutils.toHex(m.seqno) & $m.fromPeer
|
||||
else:
|
||||
# This part is irrelevant because it's not standard,
|
||||
# We use it exclusively for testing basically and users should
|
||||
# implement their own logic in the case they use anonymization
|
||||
$m.data.hash & $m.topicIDs.hash
|
||||
mid.toBytes()
|
||||
func defaultMsgIdProvider*(m: Message): Result[MessageID, ValidationResult] =
|
||||
if m.seqno.len > 0 and m.fromPeer.data.len > 0:
|
||||
let mid = byteutils.toHex(m.seqno) & $m.fromPeer
|
||||
ok mid.toBytes()
|
||||
else:
|
||||
err ValidationResult.Reject
|
||||
|
||||
proc sign*(msg: Message, privateKey: PrivateKey): CryptoResult[seq[byte]] =
|
||||
ok((? privateKey.sign(PubSubPrefix & encodeMessage(msg, false))).getBytes())
|
||||
|
|
|
@ -20,6 +20,7 @@ import utils,
|
|||
protocols/pubsub/floodsub,
|
||||
protocols/pubsub/rpc/messages,
|
||||
protocols/pubsub/peertable]
|
||||
import ../../libp2p/protocols/pubsub/errors as pubsub_errors
|
||||
|
||||
import ../helpers
|
||||
|
||||
|
|
|
@ -39,6 +39,8 @@ proc randomPeerId(): PeerId =
|
|||
except CatchableError as exc:
|
||||
raise newException(Defect, exc.msg)
|
||||
|
||||
const MsgIdFail = "msg id gen failure"
|
||||
|
||||
suite "GossipSub internal":
|
||||
teardown:
|
||||
checkTrackers()
|
||||
|
@ -308,7 +310,7 @@ suite "GossipSub internal":
|
|||
conn.peerId = peerId
|
||||
inc seqno
|
||||
let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
|
||||
gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
|
||||
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdFail), msg)
|
||||
|
||||
check gossipSub.fanout[topic].len == 15
|
||||
check gossipSub.mesh[topic].len == 15
|
||||
|
@ -355,7 +357,7 @@ suite "GossipSub internal":
|
|||
conn.peerId = peerId
|
||||
inc seqno
|
||||
let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
|
||||
gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
|
||||
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdFail), msg)
|
||||
|
||||
let peers = gossipSub.getGossipPeers()
|
||||
check peers.len == gossipSub.parameters.d
|
||||
|
@ -396,7 +398,7 @@ suite "GossipSub internal":
|
|||
conn.peerId = peerId
|
||||
inc seqno
|
||||
let msg = Message.init(peerId, ("HELLO" & $i).toBytes(), topic, some(seqno))
|
||||
gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
|
||||
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdFail), msg)
|
||||
|
||||
let peers = gossipSub.getGossipPeers()
|
||||
check peers.len == gossipSub.parameters.d
|
||||
|
@ -437,7 +439,7 @@ suite "GossipSub internal":
|
|||
conn.peerId = peerId
|
||||
inc seqno
|
||||
let msg = Message.init(peerId, ("bar" & $i).toBytes(), topic, some(seqno))
|
||||
gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
|
||||
gossipSub.mcache.put(gossipSub.msgIdProvider(msg).expect(MsgIdFail), msg)
|
||||
|
||||
let peers = gossipSub.getGossipPeers()
|
||||
check peers.len == 0
|
||||
|
|
|
@ -24,6 +24,7 @@ import utils, ../../libp2p/[errors,
|
|||
protocols/pubsub/peertable,
|
||||
protocols/pubsub/timedcache,
|
||||
protocols/pubsub/rpc/messages]
|
||||
import ../../libp2p/protocols/pubsub/errors as pubsub_errors
|
||||
import ../helpers
|
||||
|
||||
proc `$`(peer: PubSubPeer): string = shortLog(peer)
|
||||
|
|
|
@ -5,19 +5,21 @@ import stew/byteutils
|
|||
import ../../libp2p/[peerid,
|
||||
crypto/crypto,
|
||||
protocols/pubsub/mcache,
|
||||
protocols/pubsub/rpc/message,
|
||||
protocols/pubsub/rpc/messages]
|
||||
import ./utils
|
||||
|
||||
var rng = newRng()
|
||||
|
||||
proc randomPeerId(): PeerId =
|
||||
PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).get()
|
||||
|
||||
const MsgIdGenFail = "msg id gen failure"
|
||||
|
||||
suite "MCache":
|
||||
test "put/get":
|
||||
var mCache = MCache.init(3, 5)
|
||||
var msg = Message(fromPeer: randomPeerId(), seqno: "12345".toBytes())
|
||||
let msgId = defaultMsgIdProvider(msg)
|
||||
let msgId = defaultMsgIdProvider(msg).expect(MsgIdGenFail)
|
||||
mCache.put(msgId, msg)
|
||||
check mCache.get(msgId).isSome and mCache.get(msgId).get() == msg
|
||||
|
||||
|
@ -28,13 +30,13 @@ suite "MCache":
|
|||
var msg = Message(fromPeer: randomPeerId(),
|
||||
seqno: "12345".toBytes(),
|
||||
topicIDs: @["foo"])
|
||||
mCache.put(defaultMsgIdProvider(msg), msg)
|
||||
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg)
|
||||
|
||||
for i in 0..<5:
|
||||
var msg = Message(fromPeer: randomPeerId(),
|
||||
seqno: "12345".toBytes(),
|
||||
topicIDs: @["bar"])
|
||||
mCache.put(defaultMsgIdProvider(msg), msg)
|
||||
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg)
|
||||
|
||||
var mids = mCache.window("foo")
|
||||
check mids.len == 3
|
||||
|
@ -49,7 +51,7 @@ suite "MCache":
|
|||
var msg = Message(fromPeer: randomPeerId(),
|
||||
seqno: "12345".toBytes(),
|
||||
topicIDs: @["foo"])
|
||||
mCache.put(defaultMsgIdProvider(msg), msg)
|
||||
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg)
|
||||
|
||||
mCache.shift()
|
||||
check mCache.window("foo").len == 0
|
||||
|
@ -58,7 +60,7 @@ suite "MCache":
|
|||
var msg = Message(fromPeer: randomPeerId(),
|
||||
seqno: "12345".toBytes(),
|
||||
topicIDs: @["bar"])
|
||||
mCache.put(defaultMsgIdProvider(msg), msg)
|
||||
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg)
|
||||
|
||||
mCache.shift()
|
||||
check mCache.window("bar").len == 0
|
||||
|
@ -67,7 +69,7 @@ suite "MCache":
|
|||
var msg = Message(fromPeer: randomPeerId(),
|
||||
seqno: "12345".toBytes(),
|
||||
topicIDs: @["baz"])
|
||||
mCache.put(defaultMsgIdProvider(msg), msg)
|
||||
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg)
|
||||
|
||||
mCache.shift()
|
||||
check mCache.window("baz").len == 0
|
||||
|
@ -79,19 +81,19 @@ suite "MCache":
|
|||
var msg = Message(fromPeer: randomPeerId(),
|
||||
seqno: "12345".toBytes(),
|
||||
topicIDs: @["foo"])
|
||||
mCache.put(defaultMsgIdProvider(msg), msg)
|
||||
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg)
|
||||
|
||||
for i in 0..<3:
|
||||
var msg = Message(fromPeer: randomPeerId(),
|
||||
seqno: "12345".toBytes(),
|
||||
topicIDs: @["bar"])
|
||||
mCache.put(defaultMsgIdProvider(msg), msg)
|
||||
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg)
|
||||
|
||||
for i in 0..<3:
|
||||
var msg = Message(fromPeer: randomPeerId(),
|
||||
seqno: "12345".toBytes(),
|
||||
topicIDs: @["baz"])
|
||||
mCache.put(defaultMsgIdProvider(msg), msg)
|
||||
mCache.put(defaultMsgIdProvider(msg).expect(MsgIdGenFail), msg)
|
||||
|
||||
mCache.shift()
|
||||
check mCache.window("foo").len == 0
|
||||
|
|
|
@ -3,8 +3,10 @@ import unittest2
|
|||
{.used.}
|
||||
|
||||
import options
|
||||
import stew/byteutils
|
||||
import ../../libp2p/[peerid, peerinfo,
|
||||
crypto/crypto,
|
||||
protocols/pubsub/errors,
|
||||
protocols/pubsub/rpc/message,
|
||||
protocols/pubsub/rpc/messages]
|
||||
|
||||
|
@ -18,3 +20,56 @@ suite "Message":
|
|||
msg = Message.init(some(peer), @[], "topic", some(seqno), sign = true)
|
||||
|
||||
check verify(msg)
|
||||
|
||||
test "defaultMsgIdProvider success":
|
||||
let
|
||||
seqno = 11'u64
|
||||
pkHex =
|
||||
"""08011240B9EA7F0357B5C1247E4FCB5AD09C46818ECB07318CA84711875F4C6C
|
||||
E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F
|
||||
E731065A"""
|
||||
seckey = PrivateKey.init(fromHex(stripSpaces(pkHex)))
|
||||
.expect("invalid private key bytes")
|
||||
peer = PeerInfo.new(seckey)
|
||||
msg = Message.init(some(peer), @[], "topic", some(seqno), sign = true)
|
||||
msgIdResult = msg.defaultMsgIdProvider()
|
||||
|
||||
check:
|
||||
msgIdResult.isOk
|
||||
string.fromBytes(msgIdResult.get) ==
|
||||
"000000000000000b12D3KooWGyLzSt9g4U9TdHYDvVWAs5Ht4WrocgoyqPxxvnqAL8qw"
|
||||
|
||||
test "defaultMsgIdProvider error - no source peer id":
|
||||
let
|
||||
seqno = 11'u64
|
||||
pkHex =
|
||||
"""08011240B9EA7F0357B5C1247E4FCB5AD09C46818ECB07318CA84711875F4C6C
|
||||
E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F
|
||||
E731065A"""
|
||||
seckey = PrivateKey.init(fromHex(stripSpaces(pkHex)))
|
||||
.expect("invalid private key bytes")
|
||||
peer = PeerInfo.new(seckey)
|
||||
|
||||
var msg = Message.init(peer.some, @[], "topic", some(seqno), sign = true)
|
||||
msg.fromPeer = PeerId()
|
||||
let msgIdResult = msg.defaultMsgIdProvider()
|
||||
|
||||
check:
|
||||
msgIdResult.isErr
|
||||
msgIdResult.error == ValidationResult.Reject
|
||||
|
||||
test "defaultMsgIdProvider error - no source seqno":
|
||||
let
|
||||
pkHex =
|
||||
"""08011240B9EA7F0357B5C1247E4FCB5AD09C46818ECB07318CA84711875F4C6C
|
||||
E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F
|
||||
E731065A"""
|
||||
seckey = PrivateKey.init(fromHex(stripSpaces(pkHex)))
|
||||
.expect("invalid private key bytes")
|
||||
peer = PeerInfo.new(seckey)
|
||||
msg = Message.init(some(peer), @[], "topic", uint64.none, sign = true)
|
||||
msgIdResult = msg.defaultMsgIdProvider()
|
||||
|
||||
check:
|
||||
msgIdResult.isErr
|
||||
msgIdResult.error == ValidationResult.Reject
|
||||
|
|
|
@ -4,24 +4,37 @@ const
|
|||
libp2p_pubsub_verify {.booldefine.} = true
|
||||
libp2p_pubsub_anonymize {.booldefine.} = false
|
||||
|
||||
import random, tables
|
||||
import chronos
|
||||
import hashes, random, tables
|
||||
import chronos, stew/[byteutils, results]
|
||||
import ../../libp2p/[builders,
|
||||
protocols/pubsub/errors,
|
||||
protocols/pubsub/pubsub,
|
||||
protocols/pubsub/gossipsub,
|
||||
protocols/pubsub/floodsub,
|
||||
protocols/pubsub/rpc/messages,
|
||||
protocols/secure/secure]
|
||||
|
||||
export builders
|
||||
|
||||
randomize()
|
||||
|
||||
func defaultMsgIdProvider*(m: Message): Result[MessageID, ValidationResult] =
|
||||
let mid =
|
||||
if m.seqno.len > 0 and m.fromPeer.data.len > 0:
|
||||
byteutils.toHex(m.seqno) & $m.fromPeer
|
||||
else:
|
||||
# This part is irrelevant because it's not standard,
|
||||
# We use it exclusively for testing basically and users should
|
||||
# implement their own logic in the case they use anonymization
|
||||
$m.data.hash & $m.topicIDs.hash
|
||||
ok mid.toBytes()
|
||||
|
||||
proc generateNodes*(
|
||||
num: Natural,
|
||||
secureManagers: openArray[SecureProtocol] = [
|
||||
SecureProtocol.Noise
|
||||
],
|
||||
msgIdProvider: MsgIdProvider = nil,
|
||||
msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
|
||||
gossip: bool = false,
|
||||
triggerSelf: bool = false,
|
||||
verifySignature: bool = libp2p_pubsub_verify,
|
||||
|
|
Loading…
Reference in New Issue