Gossipsub peer exchange (#647)

* Signed envelopes and routing records
* Send signed peer record as part of identify (#649)
* Add SPR from identify to new peer book (#657)
* Send & receive gossipsub PX
* Add Signed Payload

Co-authored-by: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com>
This commit is contained in:
Tanguy 2022-03-14 09:39:30 +01:00 committed by GitHub
parent cba3ca3c3e
commit c7504d2446
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 422 additions and 79 deletions

View File

@ -42,6 +42,7 @@ type
rng: ref BrHmacDrbgContext rng: ref BrHmacDrbgContext
maxConnections: int maxConnections: int
maxIn: int maxIn: int
sendSignedPeerRecord: bool
maxOut: int maxOut: int
maxConnsPerPeer: int maxConnsPerPeer: int
protoVersion: string protoVersion: string
@ -77,6 +78,9 @@ proc withAddresses*(b: SwitchBuilder, addresses: seq[MultiAddress]): SwitchBuild
b.addresses = addresses b.addresses = addresses
b b
proc withSignedPeerRecord*(b: SwitchBuilder, sendIt = true): SwitchBuilder =
b.sendSignedPeerRecord = sendIt
b
proc withMplex*(b: SwitchBuilder, inTimeout = 5.minutes, outTimeout = 5.minutes): SwitchBuilder = proc withMplex*(b: SwitchBuilder, inTimeout = 5.minutes, outTimeout = 5.minutes): SwitchBuilder =
proc newMuxer(conn: Connection): Muxer = proc newMuxer(conn: Connection): Muxer =
@ -165,7 +169,7 @@ proc build*(b: SwitchBuilder): Switch
muxers muxers
let let
identify = Identify.new(peerInfo) identify = Identify.new(peerInfo, b.sendSignedPeerRecord)
connManager = ConnManager.new(b.maxConnsPerPeer, b.maxConnections, b.maxIn, b.maxOut) connManager = ConnManager.new(b.maxConnsPerPeer, b.maxConnections, b.maxIn, b.maxOut)
ms = MultistreamSelect.new() ms = MultistreamSelect.new()
muxedUpgrade = MuxedUpgrade.new(identify, muxers, secureManagerInstances, connManager, ms) muxedUpgrade = MuxedUpgrade.new(identify, muxers, secureManagerInstances, connManager, ms)
@ -209,7 +213,8 @@ proc newStandardSwitch*(
maxIn = -1, maxIn = -1,
maxOut = -1, maxOut = -1,
maxConnsPerPeer = MaxConnectionsPerPeer, maxConnsPerPeer = MaxConnectionsPerPeer,
nameResolver: NameResolver = nil): Switch nameResolver: NameResolver = nil,
sendSignedPeerRecord = false): Switch
{.raises: [Defect, LPError].} = {.raises: [Defect, LPError].} =
if SecureProtocol.Secio in secureManagers: if SecureProtocol.Secio in secureManagers:
quit("Secio is deprecated!") # use of secio is unsafe quit("Secio is deprecated!") # use of secio is unsafe
@ -219,6 +224,7 @@ proc newStandardSwitch*(
.new() .new()
.withAddresses(addrs) .withAddresses(addrs)
.withRng(rng) .withRng(rng)
.withSignedPeerRecord(sendSignedPeerRecord)
.withMaxConnections(maxConnections) .withMaxConnections(maxConnections)
.withMaxIn(maxIn) .withMaxIn(maxIn)
.withMaxOut(maxOut) .withMaxOut(maxOut)

View File

@ -11,9 +11,9 @@
import std/[options, sequtils, hashes] import std/[options, sequtils, hashes]
import pkg/[chronos, chronicles, stew/results] import pkg/[chronos, chronicles, stew/results]
import peerid, multiaddress, crypto/crypto, errors import peerid, multiaddress, crypto/crypto, routing_record, errors
export peerid, multiaddress, crypto, errors, results export peerid, multiaddress, crypto, routing_record, errors, results
## Our local peer info ## Our local peer info
@ -28,6 +28,7 @@ type
agentVersion*: string agentVersion*: string
privateKey*: PrivateKey privateKey*: PrivateKey
publicKey*: PublicKey publicKey*: PublicKey
signedPeerRecord*: Option[Envelope]
func shortLog*(p: PeerInfo): auto = func shortLog*(p: PeerInfo): auto =
( (
@ -53,13 +54,25 @@ proc new*(
except CatchableError: except CatchableError:
raise newException(PeerInfoError, "invalid private key") raise newException(PeerInfoError, "invalid private key")
let peerId = PeerID.init(key).tryGet()
let sprRes = SignedPeerRecord.init(
key,
PeerRecord.init(peerId, @addrs)
)
let spr = if sprRes.isOk:
some(sprRes.get().envelope)
else:
none(Envelope)
let peerInfo = PeerInfo( let peerInfo = PeerInfo(
peerId: PeerId.init(key).tryGet(), peerId: peerId,
publicKey: pubkey, publicKey: pubkey,
privateKey: key, privateKey: key,
protoVersion: protoVersion, protoVersion: protoVersion,
agentVersion: agentVersion, agentVersion: agentVersion,
addrs: @addrs, addrs: @addrs,
protocols: @protocols) protocols: @protocols,
signedPeerRecord: spr)
return peerInfo return peerInfo

View File

@ -14,6 +14,7 @@ import
./crypto/crypto, ./crypto/crypto,
./protocols/identify, ./protocols/identify,
./peerid, ./peerinfo, ./peerid, ./peerinfo,
./routing_record,
./multiaddress ./multiaddress
type type
@ -54,6 +55,8 @@ type
agentBook*: PeerBook[string] agentBook*: PeerBook[string]
protoVersionBook*: PeerBook[string] protoVersionBook*: PeerBook[string]
signedPeerRecordBook*: PeerBook[Envelope]
## Constructs a new PeerStore with metadata of type M ## Constructs a new PeerStore with metadata of type M
proc new*(T: type PeerStore): PeerStore = proc new*(T: type PeerStore): PeerStore =
var p: PeerStore var p: PeerStore
@ -160,3 +163,6 @@ proc updatePeerInfo*(
if info.protos.len > 0: if info.protos.len > 0:
peerStore.protoBook.set(info.peerId, info.protos) peerStore.protoBook.set(info.peerId, info.protos)
if info.signedPeerRecord.isSome:
peerStore.signedPeerRecordBook.set(info.peerId, info.signedPeerRecord.get())

View File

@ -9,7 +9,7 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import std/[sequtils, options, strutils] import std/[sequtils, options, strutils, sugar]
import chronos, chronicles import chronos, chronicles
import ../protobuf/minprotobuf, import ../protobuf/minprotobuf,
../peerinfo, ../peerinfo,
@ -44,9 +44,11 @@ type
protoVersion*: Option[string] protoVersion*: Option[string]
agentVersion*: Option[string] agentVersion*: Option[string]
protos*: seq[string] protos*: seq[string]
signedPeerRecord*: Option[Envelope]
Identify* = ref object of LPProtocol Identify* = ref object of LPProtocol
peerInfo*: PeerInfo peerInfo*: PeerInfo
sendSignedPeerRecord*: bool
IdentifyPushHandler* = proc ( IdentifyPushHandler* = proc (
peer: PeerId, peer: PeerId,
@ -57,8 +59,23 @@ type
IdentifyPush* = ref object of LPProtocol IdentifyPush* = ref object of LPProtocol
identifyHandler: IdentifyPushHandler identifyHandler: IdentifyPushHandler
proc encodeMsg*(peerInfo: PeerInfo, observedAddr: MultiAddress): ProtoBuffer chronicles.expandIt(IdentifyInfo):
{.raises: [Defect, IdentifyNoPubKeyError].} = pubkey = ($it.pubkey).shortLog
addresses = it.addrs.map(x => $x).join(",")
protocols = it.protos.map(x => $x).join(",")
observable_address =
if it.observedAddr.isSome(): $it.observedAddr.get()
else: "None"
proto_version = it.protoVersion.get("None")
agent_version = it.agentVersion.get("None")
signedPeerRecord =
# The SPR contains the same data as the identify message
# would be cumbersome to log
if iinfo.signedPeerRecord.isSome(): "Some"
else: "None"
proc encodeMsg(peerInfo: PeerInfo, observedAddr: MultiAddress, sendSpr: bool): ProtoBuffer
{.raises: [Defect].} =
result = initProtoBuffer() result = initProtoBuffer()
let pkey = peerInfo.publicKey let pkey = peerInfo.publicKey
@ -76,6 +93,14 @@ proc encodeMsg*(peerInfo: PeerInfo, observedAddr: MultiAddress): ProtoBuffer
else: else:
peerInfo.agentVersion peerInfo.agentVersion
result.write(6, agentVersion) result.write(6, agentVersion)
## Optionally populate signedPeerRecord field.
## See https://github.com/libp2p/go-libp2p/blob/ddf96ce1cfa9e19564feb9bd3e8269958bbc0aba/p2p/protocol/identify/pb/identify.proto for reference.
if peerInfo.signedPeerRecord.isSome() and sendSpr:
let sprBuff = peerInfo.signedPeerRecord.get().encode()
if sprBuff.isOk():
result.write(8, sprBuff.get())
result.finish() result.finish()
proc decodeMsg*(buf: seq[byte]): Option[IdentifyInfo] = proc decodeMsg*(buf: seq[byte]): Option[IdentifyInfo] =
@ -85,6 +110,7 @@ proc decodeMsg*(buf: seq[byte]): Option[IdentifyInfo] =
oaddr: MultiAddress oaddr: MultiAddress
protoVersion: string protoVersion: string
agentVersion: string agentVersion: string
signedPeerRecord: SignedPeerRecord
var pb = initProtoBuffer(buf) var pb = initProtoBuffer(buf)
@ -95,8 +121,11 @@ proc decodeMsg*(buf: seq[byte]): Option[IdentifyInfo] =
let r5 = pb.getField(5, protoVersion) let r5 = pb.getField(5, protoVersion)
let r6 = pb.getField(6, agentVersion) let r6 = pb.getField(6, agentVersion)
let r8 = pb.getField(8, signedPeerRecord)
let res = r1.isOk() and r2.isOk() and r3.isOk() and let res = r1.isOk() and r2.isOk() and r3.isOk() and
r4.isOk() and r5.isOk() and r6.isOk() r4.isOk() and r5.isOk() and r6.isOk() and
r8.isOk()
if res: if res:
if r1.get(): if r1.get():
@ -107,21 +136,24 @@ proc decodeMsg*(buf: seq[byte]): Option[IdentifyInfo] =
iinfo.protoVersion = some(protoVersion) iinfo.protoVersion = some(protoVersion)
if r6.get(): if r6.get():
iinfo.agentVersion = some(agentVersion) iinfo.agentVersion = some(agentVersion)
debug "decodeMsg: decoded identify", pubkey = ($pubkey).shortLog, if r8.get() and r1.get():
addresses = iinfo.addrs.mapIt($it).join(","), if iinfo.pubkey.get() == signedPeerRecord.envelope.publicKey:
protocols = iinfo.protos.mapIt($it).join(","), iinfo.signedPeerRecord = some(signedPeerRecord.envelope)
observable_address = debug "decodeMsg: decoded identify", iinfo
if iinfo.observedAddr.isSome(): $iinfo.observedAddr.get()
else: "None",
proto_version = iinfo.protoVersion.get("None"),
agent_version = iinfo.agentVersion.get("None")
some(iinfo) some(iinfo)
else: else:
trace "decodeMsg: failed to decode received message" trace "decodeMsg: failed to decode received message"
none[IdentifyInfo]() none[IdentifyInfo]()
proc new*(T: typedesc[Identify], peerInfo: PeerInfo): T = proc new*(
let identify = T(peerInfo: peerInfo) T: typedesc[Identify],
peerInfo: PeerInfo,
sendSignedPeerRecord = false
): T =
let identify = T(
peerInfo: peerInfo,
sendSignedPeerRecord: sendSignedPeerRecord
)
identify.init() identify.init()
identify identify
@ -129,7 +161,7 @@ method init*(p: Identify) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
try: try:
trace "handling identify request", conn trace "handling identify request", conn
var pb = encodeMsg(p.peerInfo, conn.observedAddr) var pb = encodeMsg(p.peerInfo, conn.observedAddr, p.sendSignedPeerRecord)
await conn.writeLp(pb.buffer) await conn.writeLp(pb.buffer)
except CancelledError as exc: except CancelledError as exc:
raise exc raise exc
@ -209,5 +241,5 @@ proc init*(p: IdentifyPush) =
p.codec = IdentifyPushCodec p.codec = IdentifyPushCodec
proc push*(p: IdentifyPush, peerInfo: PeerInfo, conn: Connection) {.async.} = proc push*(p: IdentifyPush, peerInfo: PeerInfo, conn: Connection) {.async.} =
var pb = encodeMsg(peerInfo, conn.observedAddr) var pb = encodeMsg(peerInfo, conn.observedAddr, true)
await conn.writeLp(pb.buffer) await conn.writeLp(pb.buffer)

View File

@ -14,7 +14,7 @@ import chronos, chronicles, metrics
import "."/[types, scoring] import "."/[types, scoring]
import ".."/[pubsubpeer, peertable, timedcache, mcache, floodsub, pubsub] import ".."/[pubsubpeer, peertable, timedcache, mcache, floodsub, pubsub]
import "../rpc"/[messages] import "../rpc"/[messages]
import "../../.."/[peerid, multiaddress, utility, switch] import "../../.."/[peerid, multiaddress, utility, switch, routing_record, signed_envelope]
declareGauge(libp2p_gossipsub_cache_window_size, "the number of messages in the cache") declareGauge(libp2p_gossipsub_cache_window_size, "the number of messages in the cache")
declareGauge(libp2p_gossipsub_peers_per_topic_mesh, "gossipsub peers per topic in mesh", labels = ["topic"]) declareGauge(libp2p_gossipsub_peers_per_topic_mesh, "gossipsub peers per topic in mesh", labels = ["topic"])
@ -83,8 +83,16 @@ proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] {.raises:
x.score >= 0.0 x.score >= 0.0
# by spec, larger then Dhi, but let's put some hard caps # by spec, larger then Dhi, but let's put some hard caps
peers.setLen(min(peers.len, g.parameters.dHigh * 2)) peers.setLen(min(peers.len, g.parameters.dHigh * 2))
let sprBook = g.switch.peerStore.signedPeerRecordBook
peers.map do (x: PubSubPeer) -> PeerInfoMsg: peers.map do (x: PubSubPeer) -> PeerInfoMsg:
PeerInfoMsg(peerId: x.peerId.getBytes()) PeerInfoMsg(
peerId: x.peerId,
signedPeerRecord:
if x.peerId in sprBook:
sprBook.get(x.peerId).encode().get(default(seq[byte]))
else:
default(seq[byte])
)
proc handleGraft*(g: GossipSub, proc handleGraft*(g: GossipSub,
peer: PubSubPeer, peer: PubSubPeer,
@ -165,6 +173,29 @@ proc handleGraft*(g: GossipSub,
return prunes return prunes
proc getPeers(prune: ControlPrune, peer: PubSubPeer): seq[(PeerId, Option[PeerRecord])] =
var routingRecords: seq[(PeerId, Option[PeerRecord])]
for record in prune.peers:
let peerRecord =
if record.signedPeerRecord.len == 0:
none(PeerRecord)
else:
let signedRecord = SignedPeerRecord.decode(record.signedPeerRecord)
if signedRecord.isErr:
trace "peer sent invalid SPR", peer, error=signedRecord.error
none(PeerRecord)
else:
if record.peerID != signedRecord.get().data.peerId:
trace "peer sent envelope with wrong public key", peer
none(PeerRecord)
else:
some(signedRecord.get().data)
routingRecords.add((record.peerId, peerRecord))
routingRecords
proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.raises: [Defect].} = proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.raises: [Defect].} =
for prune in prunes: for prune in prunes:
let topic = prune.topicID let topic = prune.topicID
@ -190,9 +221,12 @@ proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.r
g.pruned(peer, topic, setBackoff = false) g.pruned(peer, topic, setBackoff = false)
g.mesh.removePeer(topic, peer) g.mesh.removePeer(topic, peer)
# TODO peer exchange, we miss ambient peer discovery in libp2p, so we are blocked by that if peer.score > g.parameters.gossipThreshold and prune.peers.len > 0 and
# another option could be to implement signed peer records g.routingRecordsHandler.len > 0:
## if peer.score > g.parameters.gossipThreshold and prunes.peers.len > 0: let routingRecords = prune.getPeers(peer)
for handler in g.routingRecordsHandler:
handler(peer.peerId, topic, routingRecords)
proc handleIHave*(g: GossipSub, proc handleIHave*(g: GossipSub,
peer: PubSubPeer, peer: PubSubPeer,

View File

@ -142,6 +142,13 @@ type
BackoffTable* = Table[string, Table[PeerId, Moment]] BackoffTable* = Table[string, Table[PeerId, Moment]]
ValidationSeenTable* = Table[MessageID, HashSet[PubSubPeer]] ValidationSeenTable* = Table[MessageID, HashSet[PubSubPeer]]
RoutingRecordsPair* = tuple[id: PeerId, record: Option[PeerRecord]]
RoutingRecordsHandler* =
proc(peer: PeerId,
tag: string, # For gossipsub, the topic
peers: seq[RoutingRecordsPair])
{.gcsafe, raises: [Defect].}
GossipSub* = ref object of FloodSub GossipSub* = ref object of FloodSub
mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic
fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic
@ -161,6 +168,7 @@ type
topicParams*: Table[string, TopicParams] topicParams*: Table[string, TopicParams]
directPeersLoop*: Future[void] directPeersLoop*: Future[void]
peersInIP*: Table[MultiAddress, HashSet[PeerId]] peersInIP*: Table[MultiAddress, HashSet[PeerId]]
routingRecordsHandler*: seq[RoutingRecordsHandler] # Callback for peer exchange
heartbeatEvents*: seq[AsyncEvent] heartbeatEvents*: seq[AsyncEvent]

View File

@ -10,14 +10,17 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import options, sequtils import options, sequtils
import ../../../utility import "../../.."/[
import ../../../peerid peerid,
routing_record,
utility
]
export options export options
type type
PeerInfoMsg* = object PeerInfoMsg* = object
peerId*: seq[byte] peerId*: PeerId
signedPeerRecord*: seq[byte] signedPeerRecord*: seq[byte]
SubOpts* = object SubOpts* = object

View File

@ -11,7 +11,7 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import std/[sequtils, times] import std/[sequtils, times, sugar]
import pkg/stew/[results, byteutils] import pkg/stew/[results, byteutils]
import import
multiaddress, multiaddress,
@ -22,11 +22,6 @@ import
export peerid, multiaddress, signed_envelope export peerid, multiaddress, signed_envelope
## Constants relating to signed peer records
const
EnvelopeDomain = multiCodec("libp2p-peer-record") # envelope domain as per RFC0002
EnvelopePayloadType= @[(byte) 0x03, (byte) 0x01] # payload_type for routing records as spec'ed in RFC0003
type type
AddressInfo* = object AddressInfo* = object
address*: MultiAddress address*: MultiAddress
@ -76,8 +71,9 @@ proc encode*(record: PeerRecord): seq[byte] =
proc init*(T: typedesc[PeerRecord], proc init*(T: typedesc[PeerRecord],
peerId: PeerId, peerId: PeerId,
seqNo: uint64, addresses: seq[MultiAddress],
addresses: seq[MultiAddress]): T = seqNo = getTime().toUnix().uint64 # follows the recommended implementation, using unix epoch as seq no.
): T =
PeerRecord( PeerRecord(
peerId: peerId, peerId: peerId,
@ -87,39 +83,13 @@ proc init*(T: typedesc[PeerRecord],
## Functions related to signed peer records ## Functions related to signed peer records
type SignedPeerRecord* = SignedPayload[PeerRecord]
proc init*(T: typedesc[Envelope], proc payloadDomain*(T: typedesc[PeerRecord]): string = $multiCodec("libp2p-peer-record")
privateKey: PrivateKey, proc payloadType*(T: typedesc[PeerRecord]): seq[byte] = @[(byte) 0x03, (byte) 0x01]
peerRecord: PeerRecord): Result[Envelope, CryptoError] =
## Init a signed envelope wrapping a peer record proc checkValid*(spr: SignedPeerRecord): Result[void, EnvelopeError] =
if not spr.data.peerId.match(spr.envelope.publicKey):
let envelope = ? Envelope.init(privateKey, err(EnvelopeInvalidSignature)
EnvelopePayloadType, else:
peerRecord.encode(), ok()
$EnvelopeDomain)
ok(envelope)
proc init*(T: typedesc[Envelope],
peerId: PeerId,
addresses: seq[MultiAddress],
privateKey: PrivateKey): Result[Envelope, CryptoError] =
## Creates a signed peer record for this peer:
## a peer routing record according to https://github.com/libp2p/specs/blob/500a7906dd7dd8f64e0af38de010ef7551fd61b6/RFC/0003-routing-records.md
## in a signed envelope according to https://github.com/libp2p/specs/blob/500a7906dd7dd8f64e0af38de010ef7551fd61b6/RFC/0002-signed-envelopes.md
# First create a peer record from the peer info
let peerRecord = PeerRecord.init(peerId,
getTime().toUnix().uint64, # This currently follows the recommended implementation, using unix epoch as seq no.
addresses)
let envelope = ? Envelope.init(privateKey,
peerRecord)
ok(envelope)
proc getSignedPeerRecord*(pb: ProtoBuffer, field: int,
value: var Envelope): ProtoResult[bool] {.
inline.} =
getField(pb, field, value, $EnvelopeDomain)

View File

@ -11,6 +11,7 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import std/sugar
import pkg/stew/[results, byteutils] import pkg/stew/[results, byteutils]
import multicodec, import multicodec,
crypto/crypto, crypto/crypto,
@ -23,7 +24,8 @@ type
EnvelopeError* = enum EnvelopeError* = enum
EnvelopeInvalidProtobuf, EnvelopeInvalidProtobuf,
EnvelopeFieldMissing, EnvelopeFieldMissing,
EnvelopeInvalidSignature EnvelopeInvalidSignature,
EnvelopeWrongType
Envelope* = object Envelope* = object
publicKey*: PublicKey publicKey*: PublicKey
@ -116,3 +118,52 @@ proc getField*(pb: ProtoBuffer, field: int,
ok(true) ok(true)
else: else:
err(ProtoError.IncorrectBlob) err(ProtoError.IncorrectBlob)
type
SignedPayload*[T] = object
# T needs to have .encode(), .decode(), .payloadType(), .domain()
envelope*: Envelope
data*: T
proc init*[T](_: typedesc[SignedPayload[T]],
privateKey: PrivateKey,
data: T): Result[SignedPayload[T], CryptoError] =
mixin encode
let envelope = ? Envelope.init(privateKey,
T.payloadType(),
data.encode(),
T.payloadDomain)
ok(SignedPayload[T](data: data, envelope: envelope))
proc getField*[T](pb: ProtoBuffer, field: int,
value: var SignedPayload[T]): ProtoResult[bool] {.
inline.} =
if not ? getField(pb, field, value.envelope, T.payloadDomain):
ok(false)
else:
mixin decode
value.data = ? T.decode(value.envelope.payload).mapErr(x => ProtoError.IncorrectBlob)
ok(true)
proc decode*[T](
_: typedesc[SignedPayload[T]],
buffer: seq[byte]
): Result[SignedPayload[T], EnvelopeError] =
let
envelope = ? Envelope.decode(buffer, T.payloadDomain)
data = ? T.decode(envelope.payload).mapErr(x => EnvelopeInvalidProtobuf)
signedPayload = SignedPayload[T](envelope: envelope, data: data)
if envelope.payloadType != T.payloadType:
return err(EnvelopeWrongType)
when compiles(? signedPayload.checkValid()):
? signedPayload.checkValid()
ok(signedPayload)
proc encode*[T](msg: SignedPayload[T]): Result[seq[byte], CryptoError] =
msg.envelope.encode()

View File

@ -932,3 +932,75 @@ suite "GossipSub":
it.switch.stop()))) it.switch.stop())))
await allFuturesThrowing(nodesFut) await allFuturesThrowing(nodesFut)
asyncTest "e2e - GossipSub peer exchange":
# A, B & C are subscribed to something
# B unsubcribe from it, it should send
# PX to A & C
#
# C sent his SPR, not A
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
discard # not used in this test
let
nodes = generateNodes(
2,
gossip = true) &
generateNodes(1, gossip = true, sendSignedPeerRecord = true)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
nodes[2].switch.start(),
)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
nodes[2].start(),
))
var
gossip0 = GossipSub(nodes[0])
gossip1 = GossipSub(nodes[1])
gossip2 = GossipSub(nodes[1])
await subscribeNodes(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
nodes[2].subscribe("foobar", handler)
for x in 0..<3:
for y in 0..<3:
if x != y:
await waitSub(nodes[x], nodes[y], "foobar")
var passed: Future[void] = newFuture[void]()
gossip0.routingRecordsHandler.add(proc(peer: PeerId, tag: string, peers: seq[RoutingRecordsPair]) =
check:
tag == "foobar"
peers.len == 2
peers[0].record.isSome() xor peers[1].record.isSome()
passed.complete()
)
nodes[1].unsubscribe("foobar", handler)
await passed
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop(),
nodes[2].switch.stop()
)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop(),
nodes[2].stop()
)
await allFuturesThrowing(nodesFut.concat())

View File

@ -40,11 +40,12 @@ proc generateNodes*(
verifySignature: bool = libp2p_pubsub_verify, verifySignature: bool = libp2p_pubsub_verify,
anonymize: bool = libp2p_pubsub_anonymize, anonymize: bool = libp2p_pubsub_anonymize,
sign: bool = libp2p_pubsub_sign, sign: bool = libp2p_pubsub_sign,
sendSignedPeerRecord = false,
unsubscribeBackoff = 1.seconds, unsubscribeBackoff = 1.seconds,
maxMessageSize: int = 1024 * 1024): seq[PubSub] = maxMessageSize: int = 1024 * 1024): seq[PubSub] =
for i in 0..<num: for i in 0..<num:
let switch = newStandardSwitch(secureManagers = secureManagers) let switch = newStandardSwitch(secureManagers = secureManagers, sendSignedPeerRecord = sendSignedPeerRecord)
let pubsub = if gossip: let pubsub = if gossip:
let g = GossipSub.init( let g = GossipSub.init(
switch = switch, switch = switch,

View File

@ -77,6 +77,7 @@ suite "Identify":
check id.protoVersion.get() == ProtoVersion check id.protoVersion.get() == ProtoVersion
check id.agentVersion.get() == AgentVersion check id.agentVersion.get() == AgentVersion
check id.protos == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"] check id.protos == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
check id.signedPeerRecord.isNone()
asyncTest "custom agent version": asyncTest "custom agent version":
const customAgentVersion = "MY CUSTOM AGENT STRING" const customAgentVersion = "MY CUSTOM AGENT STRING"
@ -100,6 +101,7 @@ suite "Identify":
check id.protoVersion.get() == ProtoVersion check id.protoVersion.get() == ProtoVersion
check id.agentVersion.get() == customAgentVersion check id.agentVersion.get() == customAgentVersion
check id.protos == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"] check id.protos == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
check id.signedPeerRecord.isNone()
asyncTest "handle failed identify": asyncTest "handle failed identify":
msListen.addHandler(IdentifyCodec, identifyProto1) msListen.addHandler(IdentifyCodec, identifyProto1)
@ -123,6 +125,27 @@ suite "Identify":
discard await msDial.select(conn, IdentifyCodec) discard await msDial.select(conn, IdentifyCodec)
discard await identifyProto2.identify(conn, pi2.peerId) discard await identifyProto2.identify(conn, pi2.peerId)
asyncTest "can send signed peer record":
msListen.addHandler(IdentifyCodec, identifyProto1)
identifyProto1.sendSignedPeerRecord = true
serverFut = transport1.start(ma)
proc acceptHandler(): Future[void] {.async, gcsafe.} =
let c = await transport1.accept()
await msListen.handle(c)
acceptFut = acceptHandler()
conn = await transport2.dial(transport1.addrs[0])
discard await msDial.select(conn, IdentifyCodec)
let id = await identifyProto2.identify(conn, remotePeerInfo.peerId)
check id.pubkey.get() == remoteSecKey.getPublicKey().get()
check id.addrs == ma
check id.protoVersion.get() == ProtoVersion
check id.agentVersion.get() == AgentVersion
check id.protos == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
check id.signedPeerRecord.get() == remotePeerInfo.signedPeerRecord.get()
suite "handle push identify message": suite "handle push identify message":
var var
switch1 {.threadvar.}: Switch switch1 {.threadvar.}: Switch
@ -160,6 +183,10 @@ suite "Identify":
switch1.peerStore.addressBook.get(switch2.peerInfo.peerId) == switch2.peerInfo.addrs.toHashSet() switch1.peerStore.addressBook.get(switch2.peerInfo.peerId) == switch2.peerInfo.addrs.toHashSet()
switch2.peerStore.addressBook.get(switch1.peerInfo.peerId) == switch1.peerInfo.addrs.toHashSet() switch2.peerStore.addressBook.get(switch1.peerInfo.peerId) == switch1.peerInfo.addrs.toHashSet()
#switch1.peerStore.signedPeerRecordBook.get(switch2.peerInfo.peerId) == switch2.peerInfo.signedPeerRecord.get()
#switch2.peerStore.signedPeerRecordBook.get(switch1.peerInfo.peerId) == switch1.peerInfo.signedPeerRecord.get()
# no longer sent by default
proc closeAll() {.async.} = proc closeAll() {.async.} =
await conn.close() await conn.close()

View File

@ -1,10 +1,12 @@
{.used.} {.used.}
import options, bearssl import options, bearssl
import chronos import chronos, stew/byteutils
import ../libp2p/crypto/crypto, import ../libp2p/crypto/crypto,
../libp2p/multicodec,
../libp2p/peerinfo, ../libp2p/peerinfo,
../libp2p/peerid ../libp2p/peerid,
../libp2p/routing_record
import ./helpers import ./helpers
@ -16,3 +18,32 @@ suite "PeerInfo":
check peerId == peerInfo.peerId check peerId == peerInfo.peerId
check seckey.getPublicKey().get() == peerInfo.publicKey check seckey.getPublicKey().get() == peerInfo.publicKey
test "Signed peer record":
const
ExpectedDomain = $multiCodec("libp2p-peer-record")
ExpectedPayloadType = @[(byte) 0x03, (byte) 0x01]
let
seckey = PrivateKey.random(rng[]).tryGet()
peerId = PeerID.init(seckey).get()
multiAddresses = @[MultiAddress.init("/ip4/0.0.0.0/tcp/24").tryGet(), MultiAddress.init("/ip4/0.0.0.0/tcp/25").tryGet()]
peerInfo = PeerInfo.new(seckey, multiAddresses)
let
env = peerInfo.signedPeerRecord.get()
rec = PeerRecord.decode(env.payload()).tryGet()
# Check envelope fields
check:
env.publicKey == peerInfo.publicKey
env.domain == ExpectedDomain
env.payloadType == ExpectedPayloadType
# Check payload (routing record)
check:
rec.peerId == peerId
rec.seqNo > 0
rec.addresses.len == 2
rec.addresses[0].address == multiAddresses[0]
rec.addresses[1].address == multiAddresses[1]

View File

@ -9,7 +9,7 @@ suite "Routing record":
privKey = PrivateKey.random(rng[]).tryGet() privKey = PrivateKey.random(rng[]).tryGet()
peerId = PeerId.init(privKey).tryGet() peerId = PeerId.init(privKey).tryGet()
multiAddresses = @[MultiAddress.init("/ip4/0.0.0.0/tcp/24").tryGet(), MultiAddress.init("/ip4/0.0.0.0/tcp/25").tryGet()] multiAddresses = @[MultiAddress.init("/ip4/0.0.0.0/tcp/24").tryGet(), MultiAddress.init("/ip4/0.0.0.0/tcp/25").tryGet()]
routingRecord = PeerRecord.init(peerId, 42, multiAddresses) routingRecord = PeerRecord.init(peerId, multiAddresses, 42)
buffer = routingRecord.encode() buffer = routingRecord.encode()
@ -36,3 +36,33 @@ suite "Routing record":
$decodedRecord.addresses[0].address == "/ip4/1.2.3.4/tcp/0" $decodedRecord.addresses[0].address == "/ip4/1.2.3.4/tcp/0"
$decodedRecord.addresses[1].address == "/ip4/1.2.3.4/tcp/1" $decodedRecord.addresses[1].address == "/ip4/1.2.3.4/tcp/1"
suite "Signed Routing Record":
test "Encode -> decode test":
let
rng = newRng()
privKey = PrivateKey.random(rng[]).tryGet()
peerId = PeerId.init(privKey).tryGet()
multiAddresses = @[MultiAddress.init("/ip4/0.0.0.0/tcp/24").tryGet(), MultiAddress.init("/ip4/0.0.0.0/tcp/25").tryGet()]
routingRecord = SignedPeerRecord.init(privKey, PeerRecord.init(peerId, multiAddresses, 42)).tryGet()
buffer = routingRecord.envelope.encode().tryGet()
parsedRR = SignedPeerRecord.decode(buffer).tryGet().data
check:
parsedRR.peerId == peerId
parsedRR.seqNo == 42
parsedRR.addresses.len == 2
parsedRR.addresses[0].address == multiAddresses[0]
parsedRR.addresses[1].address == multiAddresses[1]
test "Can't use mismatched public key":
let
rng = newRng()
privKey = PrivateKey.random(rng[]).tryGet()
privKey2 = PrivateKey.random(rng[]).tryGet()
peerId = PeerId.init(privKey).tryGet()
multiAddresses = @[MultiAddress.init("/ip4/0.0.0.0/tcp/24").tryGet(), MultiAddress.init("/ip4/0.0.0.0/tcp/25").tryGet()]
routingRecord = SignedPeerRecord.init(privKey2, PeerRecord.init(peerId, multiAddresses, 42)).tryGet()
buffer = routingRecord.envelope.encode().tryGet()
check SignedPeerRecord.decode(buffer).error == EnvelopeInvalidSignature

View File

@ -3,7 +3,7 @@ import stew/byteutils
import ../libp2p/[signed_envelope] import ../libp2p/[signed_envelope]
suite "Signed envelope": suite "Signed envelope":
test "Encode -> decode test": test "Encode -> decode -> encode -> decode test":
let let
rng = newRng() rng = newRng()
privKey = PrivateKey.random(rng[]).tryGet() privKey = PrivateKey.random(rng[]).tryGet()
@ -12,10 +12,16 @@ suite "Signed envelope":
decodedEnvelope = Envelope.decode(buffer, "domain").tryGet() decodedEnvelope = Envelope.decode(buffer, "domain").tryGet()
wrongDomain = Envelope.decode(buffer, "wdomain") wrongDomain = Envelope.decode(buffer, "wdomain")
reencodedEnvelope = decodedEnvelope.encode().tryGet()
redecodedEnvelope = Envelope.decode(reencodedEnvelope, "domain").tryGet()
check: check:
decodedEnvelope == envelope decodedEnvelope == envelope
wrongDomain.error == EnvelopeInvalidSignature wrongDomain.error == EnvelopeInvalidSignature
reencodedEnvelope == buffer
redecodedEnvelope == envelope
test "Interop decode test": test "Interop decode test":
# from https://github.com/libp2p/go-libp2p-core/blob/b18a4c9c5629870bde2cd85ab3b87a507600d411/record/envelope_test.go#L68 # from https://github.com/libp2p/go-libp2p-core/blob/b18a4c9c5629870bde2cd85ab3b87a507600d411/record/envelope_test.go#L68
let inputData = "0a24080112206f1581709bb7b1ef030d210db18e3b0ba1c776fba65d8cdaad05415142d189f812102f6c69627032702f74657374646174611a0c68656c6c6f20776f726c64212a401178673b51dfa842aad17e465e25d646ad16628916b964c3fb10c711fee87872bdd4e4646f58c277cdff09704913d8be1aec6322de8d3d0bb852120374aece08".hexToSeqByte() let inputData = "0a24080112206f1581709bb7b1ef030d210db18e3b0ba1c776fba65d8cdaad05415142d189f812102f6c69627032702f74657374646174611a0c68656c6c6f20776f726c64212a401178673b51dfa842aad17e465e25d646ad16628916b964c3fb10c711fee87872bdd4e4646f58c277cdff09704913d8be1aec6322de8d3d0bb852120374aece08".hexToSeqByte()
@ -28,3 +34,56 @@ suite "Signed envelope":
# same as above, but payload altered # same as above, but payload altered
let inputData = "0a24080112206f1581709bb7b1ef030d210db18e3b0ba1c776fba65d8cdaad05415142d189f812102f6c69627032702f74657374646174611a0c00006c6c6f20776f726c64212a401178673b51dfa842aad17e465e25d646ad16628916b964c3fb10c711fee87872bdd4e4646f58c277cdff09704913d8be1aec6322de8d3d0bb852120374aece08".hexToSeqByte() let inputData = "0a24080112206f1581709bb7b1ef030d210db18e3b0ba1c776fba65d8cdaad05415142d189f812102f6c69627032702f74657374646174611a0c00006c6c6f20776f726c64212a401178673b51dfa842aad17e465e25d646ad16628916b964c3fb10c711fee87872bdd4e4646f58c277cdff09704913d8be1aec6322de8d3d0bb852120374aece08".hexToSeqByte()
check Envelope.decode(inputData, "libp2p-testing").error == EnvelopeInvalidSignature check Envelope.decode(inputData, "libp2p-testing").error == EnvelopeInvalidSignature
# needs to be exported to work
type
DummyPayload* = object
awesome: byte
SignedDummy = SignedPayload[DummyPayload]
proc decode*(T: typedesc[DummyPayload], buffer: seq[byte]): Result[DummyPayload, cstring] =
ok(DummyPayload(awesome: buffer[0]))
proc encode*(pd: DummyPayload): seq[byte] =
@[pd.awesome]
proc checkValid*(pd: SignedDummy): Result[void, EnvelopeError] =
if pd.data.awesome == 12.byte: ok()
else: err(EnvelopeInvalidSignature)
proc payloadDomain*(T: typedesc[DummyPayload]): string = "dummy"
proc payloadType*(T: typedesc[DummyPayload]): seq[byte] = @[(byte) 0x00, (byte) 0x00]
suite "Signed payload":
test "Simple encode -> decode":
let
rng = newRng()
privKey = PrivateKey.random(rng[]).tryGet()
dummyPayload = DummyPayload(awesome: 12.byte)
signed = SignedDummy.init(privKey, dummyPayload).tryGet()
encoded = signed.encode().tryGet()
decoded = SignedDummy.decode(encoded).tryGet()
check:
dummyPayload.awesome == decoded.data.awesome
decoded.envelope.publicKey == privKey.getPublicKey().tryGet()
test "Invalid payload":
let
rng = newRng()
privKey = PrivateKey.random(rng[]).tryGet()
dummyPayload = DummyPayload(awesome: 30.byte)
signed = SignedDummy.init(privKey, dummyPayload).tryGet()
encoded = signed.encode().tryGet()
check SignedDummy.decode(encoded).error == EnvelopeInvalidSignature
test "Invalid payload type":
let
rng = newRng()
privKey = PrivateKey.random(rng[]).tryGet()
dummyPayload = DummyPayload(awesome: 30.byte)
signed = Envelope.init(privKey, @[55.byte], dummyPayload.encode(), DummyPayload.payloadDomain).tryGet()
encoded = signed.encode().tryGet()
check SignedDummy.decode(encoded).error == EnvelopeWrongType