extract public and private keys fields from peerid (#44)

* extract public and private keys fields from peerid

* allow assigning a public key

* cleaned up TODOs

* make pubsub prefix a const

* public key should be an `Option`
This commit is contained in:
Dmitriy Ryajov 2019-12-07 10:36:39 -06:00 committed by GitHub
parent e623e70e7b
commit 5f6fcc3d90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 324 additions and 300 deletions

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import chronos, chronicles import chronos, chronicles, options
import peerinfo, import peerinfo,
multiaddress, multiaddress,
stream/lpstream, stream/lpstream,
@ -19,7 +19,7 @@ const DefaultReadSize*: uint = 64 * 1024
type type
Connection* = ref object of LPStream Connection* = ref object of LPStream
peerInfo*: PeerInfo peerInfo*: Option[PeerInfo]
stream*: LPStream stream*: LPStream
observedAddrs*: Multiaddress observedAddrs*: Multiaddress

View File

@ -21,17 +21,9 @@ const
type type
PeerID* = object PeerID* = object
data*: seq[byte] data*: seq[byte]
privateKey*: Option[PrivateKey]
publicKey: Option[PublicKey]
PeerIDError* = object of CatchableError PeerIDError* = object of CatchableError
proc publicKey*(pid: PeerID): Option[PublicKey] {.inline.} =
if pid.publicKey.isSome and len(pid.publicKey.get().getBytes()) > 0:
result = pid.publicKey
elif pid.privateKey.isSome and len(pid.privateKey.get().getBytes()) > 0:
result = some(pid.privateKey.get().getKey())
proc pretty*(pid: PeerID): string {.inline.} = proc pretty*(pid: PeerID): string {.inline.} =
## Return base58 encoded ``pid`` representation. ## Return base58 encoded ``pid`` representation.
result = Base58.encode(pid.data) result = Base58.encode(pid.data)
@ -170,12 +162,10 @@ proc init*(t: typedesc[PeerID], pubkey: PublicKey): PeerID =
else: else:
mh = MultiHash.digest("sha2-256", pubraw) mh = MultiHash.digest("sha2-256", pubraw)
result.data = mh.data.buffer result.data = mh.data.buffer
result.publicKey = some(pubkey)
proc init*(t: typedesc[PeerID], seckey: PrivateKey): PeerID {.inline.} = proc init*(t: typedesc[PeerID], seckey: PrivateKey): PeerID {.inline.} =
## Create new peer id from private key ``seckey``. ## Create new peer id from private key ``seckey``.
result = PeerID.init(seckey.getKey()) result = PeerID.init(seckey.getKey())
result.privateKey = some(seckey)
proc match*(pid: PeerID, pubkey: PublicKey): bool {.inline.} = proc match*(pid: PeerID, pubkey: PublicKey): bool {.inline.} =
## Returns ``true`` if ``pid`` matches public key ``pubkey``. ## Returns ``true`` if ``pid`` matches public key ``pubkey``.

View File

@ -8,22 +8,92 @@
## those terms. ## those terms.
import options import options
import peer, multiaddress import peer, multiaddress, crypto/crypto
type ## A peer can be constructed in one of tree ways:
PeerInfo* = object of RootObj ## 1) A local peer with a private key
peerId*: Option[PeerID] ## 2) A remote peer with a PeerID and it's public key stored
## in the ``id`` itself
## 3) A remote peer with a standalone public key, that isn't
## encoded in the ``id``
##
type
KeyType* = enum
HasPrivate,
HasPublic
InvalidPublicKeyException* = object of Exception
PeerInfo* = ref object of RootObj
peerId*: PeerID
addrs*: seq[MultiAddress] addrs*: seq[MultiAddress]
protocols*: seq[string] protocols*: seq[string]
case keyType*: KeyType:
of HasPrivate:
privateKey*: PrivateKey
of HasPublic:
key: Option[PublicKey]
proc id*(p: PeerInfo): string = proc newInvalidPublicKeyException(): ref Exception =
if p.peerId.isSome: newException(InvalidPublicKeyException,
result = p.peerId.get().pretty "attempting to assign an invalid public key")
proc init*(p: typedesc[PeerInfo],
key: PrivateKey,
addrs: seq[MultiAddress] = @[],
protocols: seq[string] = @[]): PeerInfo {.inline.} =
result = PeerInfo(keyType: HasPrivate,
peerId: PeerID.init(key),
privateKey: key,
addrs: addrs,
protocols: protocols)
proc init*(p: typedesc[PeerInfo],
peerId: PeerID,
addrs: seq[MultiAddress] = @[],
protocols: seq[string] = @[]): PeerInfo {.inline.} =
PeerInfo(keyType: HasPublic,
peerId: peerId,
addrs: addrs,
protocols: protocols)
proc init*(p: typedesc[PeerInfo],
key: PublicKey,
addrs: seq[MultiAddress] = @[],
protocols: seq[string] = @[]): PeerInfo {.inline.} =
PeerInfo(keyType: HasPublic,
peerId: PeerID.init(key),
key: some(key),
addrs: addrs,
protocols: protocols)
proc publicKey*(p: PeerInfo): Option[PublicKey] {.inline.} =
if p.keyType == HasPublic:
if p.peerId.hasPublicKey():
var pubKey: PublicKey
if p.peerId.extractPublicKey(pubKey):
result = some(pubKey)
elif p.key.isSome:
result = p.key
else:
result = some(p.privateKey.getKey())
proc `publicKey=`*(p: PeerInfo, key: PublicKey) =
if not (PeerID.init(key) == p.peerId):
raise newInvalidPublicKeyException()
p.key = some(key)
proc id*(p: PeerInfo): string {.inline.} =
p.peerId.pretty
proc `$`*(p: PeerInfo): string = proc `$`*(p: PeerInfo): string =
if p.peerId.isSome: result.add("PeerID: ")
result.add("PeerID: ") result.add(p.id & "\n")
result.add(p.id & "\n")
if p.addrs.len > 0: if p.addrs.len > 0:
result.add("Peer Addrs: ") result.add("Peer Addrs: ")

View File

@ -46,8 +46,7 @@ type
proc encodeMsg*(peerInfo: PeerInfo, observedAddrs: Multiaddress): ProtoBuffer = proc encodeMsg*(peerInfo: PeerInfo, observedAddrs: Multiaddress): ProtoBuffer =
result = initProtoBuffer() result = initProtoBuffer()
if peerInfo.peerId.isSome: result.write(initProtoField(1, peerInfo.publicKey.get().getBytes()))
result.write(initProtoField(1, peerInfo.peerId.get().publicKey.get().getBytes()))
for ma in peerInfo.addrs: for ma in peerInfo.addrs:
result.write(initProtoField(2, ma.data.buffer)) result.write(initProtoField(2, ma.data.buffer))
@ -122,7 +121,7 @@ method init*(p: Identify) =
proc identify*(p: Identify, proc identify*(p: Identify,
conn: Connection, conn: Connection,
remotePeerInfo: PeerInfo): Future[IdentifyInfo] {.async, gcsafe.} = remotePeerInfo: Option[PeerInfo]): Future[IdentifyInfo] {.async, gcsafe.} =
var message = await conn.readLp() var message = await conn.readLp()
if len(message) == 0: if len(message) == 0:
trace "identify: Invalid or empty message received!" trace "identify: Invalid or empty message received!"
@ -131,15 +130,16 @@ proc identify*(p: Identify,
result = decodeMsg(message) result = decodeMsg(message)
if remotePeerInfo.peerId.isSome and result.pubKey.isSome: if remotePeerInfo.isSome and result.pubKey.isSome:
let peer = PeerID.init(result.pubKey.get()) let peer = PeerID.init(result.pubKey.get())
# do a string comaprison of the ids, # do a string comaprison of the ids,
# because that is the only thing we have in most cases # because that is the only thing we
if peer != remotePeerInfo.peerId.get(): # have in most cases
if peer != remotePeerInfo.get().peerId:
trace "Peer ids don't match", trace "Peer ids don't match",
remote = peer.pretty(), remote = peer.pretty(),
local = remotePeerInfo.id local = remotePeerInfo.get().id
raise newException(IdentityNoMatchError, raise newException(IdentityNoMatchError,
"Peer ids don't match") "Peer ids don't match")

View File

@ -104,7 +104,7 @@ method publish*(f: FloodSub,
return return
trace "publishing on topic", name = topic trace "publishing on topic", name = topic
let msg = newMessage(f.peerInfo.peerId.get(), data, topic) let msg = newMessage(f.peerInfo, data, topic)
for p in f.floodsub[topic]: for p in f.floodsub[topic]:
trace "publishing message", name = topic, peer = p, data = data trace "publishing message", name = topic, peer = p, data = data
await f.peers[p].send(@[RPCMsg(messages: @[msg])]) await f.peers[p].send(@[RPCMsg(messages: @[msg])])

View File

@ -179,7 +179,7 @@ method rpcHandler(g: GossipSub,
g.seen.put(msg.msgId) # add the message to the seen cache g.seen.put(msg.msgId) # add the message to the seen cache
# this shouldn't happen # this shouldn't happen
if g.peerInfo.peerId.get() == msg.fromPeerId(): if g.peerInfo.peerId == msg.fromPeerId():
trace "skipping messages from self", msg = msg.msgId trace "skipping messages from self", msg = msg.msgId
continue continue
@ -195,7 +195,7 @@ method rpcHandler(g: GossipSub,
for h in g.topics[t].handler: for h in g.topics[t].handler:
trace "calling handler for message", msg = msg.msgId, trace "calling handler for message", msg = msg.msgId,
topicId = t, topicId = t,
localPeer = g.peerInfo.peerId.get().pretty, localPeer = g.peerInfo.id,
fromPeer = msg.fromPeerId().pretty fromPeer = msg.fromPeerId().pretty
await h(t, msg.data) # trigger user provided handler await h(t, msg.data) # trigger user provided handler
@ -203,7 +203,7 @@ method rpcHandler(g: GossipSub,
for p in toSendPeers: for p in toSendPeers:
if p in g.peers and if p in g.peers and
g.peers[p].peerInfo.peerId != peer.peerInfo.peerId: g.peers[p].peerInfo.peerId != peer.peerInfo.peerId:
let id = g.peers[p].peerInfo.peerId.get() let id = g.peers[p].peerInfo.peerId
let msgs = m.messages.filterIt( let msgs = m.messages.filterIt(
# don't forward to message originator # don't forward to message originator
id != it.fromPeerId() id != it.fromPeerId()
@ -384,9 +384,9 @@ method publish*(g: GossipSub,
# set the fanout expiery time # set the fanout expiery time
g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL) g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL)
let msg = newMessage(g.peerInfo.peerId.get(), data, topic) let msg = newMessage(g.peerInfo, data, topic)
for p in peers: for p in peers:
if p == g.peerInfo.peerId.get().pretty: if p == g.peerInfo.id:
continue continue
trace "publishing on topic", name = topic trace "publishing on topic", name = topic
@ -444,11 +444,8 @@ when isMainModule and not defined(release):
suite "GossipSub": suite "GossipSub":
test "`rebalanceMesh` Degree Lo": test "`rebalanceMesh` Degree Lo":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
var peerInfo: PeerInfo let gossipSub = newPubSub(TestGossipSub,
var seckey = some(PrivateKey.random(RSA)) PeerInfo.init(PrivateKey.random(RSA)))
peerInfo.peerId = some(PeerID.init(seckey.get()))
let gossipSub = newPubSub(TestGossipSub, peerInfo)
let topic = "foobar" let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[string]() gossipSub.mesh[topic] = initHashSet[string]()
@ -457,11 +454,11 @@ when isMainModule and not defined(release):
for i in 0..<15: for i in 0..<15:
let conn = newConnection(newBufferStream(writeHandler)) let conn = newConnection(newBufferStream(writeHandler))
let peerId = PeerID.init(PrivateKey.random(RSA)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
conn.peerInfo.peerId = some(peerId) conn.peerInfo = some(peerInfo)
gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerId.pretty].conn = conn gossipSub.peers[peerInfo.id].conn = conn
gossipSub.mesh[topic].incl(peerId.pretty) gossipSub.mesh[topic].incl(peerInfo.id)
check gossipSub.peers.len == 15 check gossipSub.peers.len == 15
await gossipSub.rebalanceMesh(topic) await gossipSub.rebalanceMesh(topic)
@ -474,11 +471,8 @@ when isMainModule and not defined(release):
test "`rebalanceMesh` Degree Hi": test "`rebalanceMesh` Degree Hi":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
var peerInfo: PeerInfo let gossipSub = newPubSub(TestGossipSub,
var seckey = some(PrivateKey.random(RSA)) PeerInfo.init(PrivateKey.random(RSA)))
peerInfo.peerId = some(PeerID.init(seckey.get()))
let gossipSub = newPubSub(TestGossipSub, peerInfo)
let topic = "foobar" let topic = "foobar"
gossipSub.gossipsub[topic] = initHashSet[string]() gossipSub.gossipsub[topic] = initHashSet[string]()
@ -487,11 +481,11 @@ when isMainModule and not defined(release):
for i in 0..<15: for i in 0..<15:
let conn = newConnection(newBufferStream(writeHandler)) let conn = newConnection(newBufferStream(writeHandler))
let peerId = PeerID.init(PrivateKey.random(RSA)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
conn.peerInfo.peerId = some(peerId) conn.peerInfo = some(peerInfo)
gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerId.pretty].conn = conn gossipSub.peers[peerInfo.id].conn = conn
gossipSub.gossipsub[topic].incl(peerId.pretty) gossipSub.gossipsub[topic].incl(peerInfo.id)
check gossipSub.gossipsub[topic].len == 15 check gossipSub.gossipsub[topic].len == 15
await gossipSub.rebalanceMesh(topic) await gossipSub.rebalanceMesh(topic)
@ -504,11 +498,8 @@ when isMainModule and not defined(release):
test "`replenishFanout` Degree Lo": test "`replenishFanout` Degree Lo":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
var peerInfo: PeerInfo let gossipSub = newPubSub(TestGossipSub,
var seckey = some(PrivateKey.random(RSA)) PeerInfo.init(PrivateKey.random(RSA)))
peerInfo.peerId = some(PeerID.init(seckey.get()))
let gossipSub = newPubSub(TestGossipSub, peerInfo)
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} =
discard discard
@ -520,11 +511,11 @@ when isMainModule and not defined(release):
for i in 0..<15: for i in 0..<15:
let conn = newConnection(newBufferStream(writeHandler)) let conn = newConnection(newBufferStream(writeHandler))
let peerId = PeerID.init(PrivateKey.random(RSA)) var peerInfo = PeerInfo.init(PrivateKey.random(RSA))
conn.peerInfo.peerId = some(peerId) conn.peerInfo = some(peerInfo)
gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerId.pretty].handler = handler gossipSub.peers[peerInfo.id].handler = handler
gossipSub.gossipsub[topic].incl(peerId.pretty) gossipSub.gossipsub[topic].incl(peerInfo.id)
check gossipSub.gossipsub[topic].len == 15 check gossipSub.gossipsub[topic].len == 15
await gossipSub.replenishFanout(topic) await gossipSub.replenishFanout(topic)
@ -537,11 +528,8 @@ when isMainModule and not defined(release):
test "`dropFanoutPeers` drop expired fanout topics": test "`dropFanoutPeers` drop expired fanout topics":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
var peerInfo: PeerInfo let gossipSub = newPubSub(TestGossipSub,
var seckey = some(PrivateKey.random(RSA)) PeerInfo.init(PrivateKey.random(RSA)))
peerInfo.peerId = some(PeerID.init(seckey.get()))
let gossipSub = newPubSub(TestGossipSub, peerInfo)
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} =
discard discard
@ -554,11 +542,11 @@ when isMainModule and not defined(release):
for i in 0..<6: for i in 0..<6:
let conn = newConnection(newBufferStream(writeHandler)) let conn = newConnection(newBufferStream(writeHandler))
let peerId = PeerID.init(PrivateKey.random(RSA)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
conn.peerInfo.peerId = some(peerId) conn.peerInfo = some(peerInfo)
gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerId.pretty].handler = handler gossipSub.peers[peerInfo.id].handler = handler
gossipSub.fanout[topic].incl(peerId.pretty) gossipSub.fanout[topic].incl(peerInfo.id)
check gossipSub.fanout[topic].len == GossipSubD check gossipSub.fanout[topic].len == GossipSubD
@ -573,11 +561,8 @@ when isMainModule and not defined(release):
test "`dropFanoutPeers` leave unexpired fanout topics": test "`dropFanoutPeers` leave unexpired fanout topics":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
var peerInfo: PeerInfo let gossipSub = newPubSub(TestGossipSub,
var seckey = some(PrivateKey.random(RSA)) PeerInfo.init(PrivateKey.random(RSA)))
peerInfo.peerId = some(PeerID.init(seckey.get()))
let gossipSub = newPubSub(TestGossipSub, peerInfo)
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} =
discard discard
@ -594,12 +579,12 @@ when isMainModule and not defined(release):
for i in 0..<6: for i in 0..<6:
let conn = newConnection(newBufferStream(writeHandler)) let conn = newConnection(newBufferStream(writeHandler))
let peerId = PeerID.init(PrivateKey.random(RSA)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
conn.peerInfo.peerId = some(peerId) conn.peerInfo = some(peerInfo)
gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerId.pretty].handler = handler gossipSub.peers[peerInfo.id].handler = handler
gossipSub.fanout[topic1].incl(peerId.pretty) gossipSub.fanout[topic1].incl(peerInfo.id)
gossipSub.fanout[topic2].incl(peerId.pretty) gossipSub.fanout[topic2].incl(peerInfo.id)
check gossipSub.fanout[topic1].len == GossipSubD check gossipSub.fanout[topic1].len == GossipSubD
check gossipSub.fanout[topic2].len == GossipSubD check gossipSub.fanout[topic2].len == GossipSubD
@ -616,11 +601,8 @@ when isMainModule and not defined(release):
test "`getGossipPeers` - should gather up to degree D non intersecting peers": test "`getGossipPeers` - should gather up to degree D non intersecting peers":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
var peerInfo: PeerInfo let gossipSub = newPubSub(TestGossipSub,
var seckey = some(PrivateKey.random(RSA)) PeerInfo.init(PrivateKey.random(RSA)))
peerInfo.peerId = some(PeerID.init(seckey.get()))
let gossipSub = newPubSub(TestGossipSub, peerInfo)
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} =
discard discard
@ -634,22 +616,22 @@ when isMainModule and not defined(release):
gossipSub.gossipsub[topic] = initHashSet[string]() gossipSub.gossipsub[topic] = initHashSet[string]()
for i in 0..<30: for i in 0..<30:
let conn = newConnection(newBufferStream(writeHandler)) let conn = newConnection(newBufferStream(writeHandler))
let peerId = PeerID.init(PrivateKey.random(RSA)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
conn.peerInfo.peerId = some(peerId) conn.peerInfo = some(peerInfo)
gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerId.pretty].handler = handler gossipSub.peers[peerInfo.id].handler = handler
if i mod 2 == 0: if i mod 2 == 0:
gossipSub.fanout[topic].incl(peerId.pretty) gossipSub.fanout[topic].incl(peerInfo.id)
else: else:
gossipSub.mesh[topic].incl(peerId.pretty) gossipSub.mesh[topic].incl(peerInfo.id)
for i in 0..<15: for i in 0..<15:
let conn = newConnection(newBufferStream(writeHandler)) let conn = newConnection(newBufferStream(writeHandler))
let peerId = PeerID.init(PrivateKey.random(RSA)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
conn.peerInfo.peerId = some(peerId) conn.peerInfo = some(peerInfo)
gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerId.pretty].handler = handler gossipSub.peers[peerInfo.id].handler = handler
gossipSub.gossipsub[topic].incl(peerId.pretty) gossipSub.gossipsub[topic].incl(peerInfo.id)
check gossipSub.fanout[topic].len == 15 check gossipSub.fanout[topic].len == 15
check gossipSub.fanout[topic].len == 15 check gossipSub.fanout[topic].len == 15
@ -668,12 +650,9 @@ when isMainModule and not defined(release):
test "`getGossipPeers` - should not crash on missing topics in mesh": test "`getGossipPeers` - should not crash on missing topics in mesh":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
var peerInfo: PeerInfo let gossipSub = newPubSub(TestGossipSub,
var seckey = some(PrivateKey.random(RSA)) PeerInfo.init(PrivateKey.random(RSA)))
peerInfo.peerId = some(PeerID.init(seckey.get()))
let gossipSub = newPubSub(TestGossipSub, peerInfo)
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} =
discard discard
@ -685,14 +664,14 @@ when isMainModule and not defined(release):
gossipSub.gossipsub[topic] = initHashSet[string]() gossipSub.gossipsub[topic] = initHashSet[string]()
for i in 0..<30: for i in 0..<30:
let conn = newConnection(newBufferStream(writeHandler)) let conn = newConnection(newBufferStream(writeHandler))
let peerId = PeerID.init(PrivateKey.random(RSA)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
conn.peerInfo.peerId = some(peerId) conn.peerInfo = some(peerInfo)
gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerId.pretty].handler = handler gossipSub.peers[peerInfo.id].handler = handler
if i mod 2 == 0: if i mod 2 == 0:
gossipSub.fanout[topic].incl(peerId.pretty) gossipSub.fanout[topic].incl(peerInfo.id)
else: else:
gossipSub.gossipsub[topic].incl(peerId.pretty) gossipSub.gossipsub[topic].incl(peerInfo.id)
let peers = gossipSub.getGossipPeers() let peers = gossipSub.getGossipPeers()
check peers.len == GossipSubD check peers.len == GossipSubD
@ -703,11 +682,8 @@ when isMainModule and not defined(release):
test "`getGossipPeers` - should not crash on missing topics in gossip": test "`getGossipPeers` - should not crash on missing topics in gossip":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
var peerInfo: PeerInfo let gossipSub = newPubSub(TestGossipSub,
var seckey = some(PrivateKey.random(RSA)) PeerInfo.init(PrivateKey.random(RSA)))
peerInfo.peerId = some(PeerID.init(seckey.get()))
let gossipSub = newPubSub(TestGossipSub, peerInfo)
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} =
discard discard
@ -720,14 +696,14 @@ when isMainModule and not defined(release):
gossipSub.gossipsub[topic] = initHashSet[string]() gossipSub.gossipsub[topic] = initHashSet[string]()
for i in 0..<30: for i in 0..<30:
let conn = newConnection(newBufferStream(writeHandler)) let conn = newConnection(newBufferStream(writeHandler))
let peerId = PeerID.init(PrivateKey.random(RSA)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
conn.peerInfo.peerId = some(peerId) conn.peerInfo = some(peerInfo)
gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerId.pretty].handler = handler gossipSub.peers[peerInfo.id].handler = handler
if i mod 2 == 0: if i mod 2 == 0:
gossipSub.mesh[topic].incl(peerId.pretty) gossipSub.mesh[topic].incl(peerInfo.id)
else: else:
gossipSub.gossipsub[topic].incl(peerId.pretty) gossipSub.gossipsub[topic].incl(peerInfo.id)
let peers = gossipSub.getGossipPeers() let peers = gossipSub.getGossipPeers()
check peers.len == GossipSubD check peers.len == GossipSubD
@ -738,11 +714,8 @@ when isMainModule and not defined(release):
test "`getGossipPeers` - should not crash on missing topics in gossip": test "`getGossipPeers` - should not crash on missing topics in gossip":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
var peerInfo: PeerInfo let gossipSub = newPubSub(TestGossipSub,
var seckey = some(PrivateKey.random(RSA)) PeerInfo.init(PrivateKey.random(RSA)))
peerInfo.peerId = some(PeerID.init(seckey.get()))
let gossipSub = newPubSub(TestGossipSub, peerInfo)
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} =
discard discard
@ -755,14 +728,14 @@ when isMainModule and not defined(release):
gossipSub.fanout[topic] = initHashSet[string]() gossipSub.fanout[topic] = initHashSet[string]()
for i in 0..<30: for i in 0..<30:
let conn = newConnection(newBufferStream(writeHandler)) let conn = newConnection(newBufferStream(writeHandler))
let peerId = PeerID.init(PrivateKey.random(RSA)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
conn.peerInfo.peerId = some(peerId) conn.peerInfo = some(peerInfo)
gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerId.pretty].handler = handler gossipSub.peers[peerInfo.id].handler = handler
if i mod 2 == 0: if i mod 2 == 0:
gossipSub.mesh[topic].incl(peerId.pretty) gossipSub.mesh[topic].incl(peerInfo.id)
else: else:
gossipSub.fanout[topic].incl(peerId.pretty) gossipSub.fanout[topic].incl(peerInfo.id)
let peers = gossipSub.getGossipPeers() let peers = gossipSub.getGossipPeers()
check peers.len == 0 check peers.len == 0

View File

@ -102,7 +102,7 @@ method handleConn*(p: PubSub,
## that we're interested in ## that we're interested in
## ##
if conn.peerInfo.peerId.isNone: if conn.peerInfo.isNone:
trace "no valid PeerId for peer" trace "no valid PeerId for peer"
await conn.close() await conn.close()
return return
@ -111,7 +111,7 @@ method handleConn*(p: PubSub,
# call floodsub rpc handler # call floodsub rpc handler
await p.rpcHandler(peer, msgs) await p.rpcHandler(peer, msgs)
let peer = p.getPeer(conn.peerInfo, proto) let peer = p.getPeer(conn.peerInfo.get(), proto)
let topics = toSeq(p.topics.keys) let topics = toSeq(p.topics.keys)
if topics.len > 0: if topics.len > 0:
await p.sendSubs(peer, topics, true) await p.sendSubs(peer, topics, true)
@ -123,8 +123,8 @@ method handleConn*(p: PubSub,
method subscribeToPeer*(p: PubSub, method subscribeToPeer*(p: PubSub,
conn: Connection) {.base, async, gcsafe.} = conn: Connection) {.base, async, gcsafe.} =
var peer = p.getPeer(conn.peerInfo, p.codec) var peer = p.getPeer(conn.peerInfo.get(), p.codec)
trace "setting connection for peer", peerId = conn.peerInfo.id trace "setting connection for peer", peerId = conn.peerInfo.get().id
if not peer.isConnected: if not peer.isConnected:
peer.conn = conn peer.conn = conn
@ -133,9 +133,8 @@ method subscribeToPeer*(p: PubSub,
.addCallback( .addCallback(
proc(udata: pointer = nil) {.gcsafe.} = proc(udata: pointer = nil) {.gcsafe.} =
trace "connection closed, cleaning up peer", trace "connection closed, cleaning up peer",
peer = conn.peerInfo.id peer = conn.peerInfo.get().id
# TODO: figureout how to handle properly without dicarding
asyncCheck p.cleanUpHelper(peer) asyncCheck p.cleanUpHelper(peer)
) )

View File

@ -62,7 +62,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async, gcsafe.} =
await p.handler(p, @[msg]) await p.handler(p, @[msg])
p.recvdRpcCache.put($hexData.hash) p.recvdRpcCache.put($hexData.hash)
except CatchableError as exc: except CatchableError as exc:
error "an exception occured while processing pubsub rpc requests", exc = exc.msg error "exception occured", exc = exc.msg
finally: finally:
trace "exiting pubsub peer read loop", peer = p.id trace "exiting pubsub peer read loop", peer = p.id
@ -105,7 +105,7 @@ proc sendMsg*(p: PubSubPeer,
peerId: PeerID, peerId: PeerID,
topic: string, topic: string,
data: seq[byte]): Future[void] {.gcsafe.} = data: seq[byte]): Future[void] {.gcsafe.} =
p.send(@[RPCMsg(messages: @[newMessage(p.peerInfo.peerId.get(), data, topic)])]) p.send(@[RPCMsg(messages: @[newMessage(p.peerInfo, data, topic)])])
proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async, gcsafe.} = proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async, gcsafe.} =
for topic in topics: for topic in topics:
@ -124,4 +124,4 @@ proc newPubSubPeer*(peerInfo: PeerInfo,
result.peerInfo = peerInfo result.peerInfo = peerInfo
result.sentRpcCache = newTimedCache[string](2.minutes) result.sentRpcCache = newTimedCache[string](2.minutes)
result.recvdRpcCache = newTimedCache[string](2.minutes) result.recvdRpcCache = newTimedCache[string](2.minutes)
result.onConnect = newAsyncEvent() result.onConnect = newAsyncEvent()

View File

@ -12,33 +12,32 @@ import chronicles
import nimcrypto/sysrand import nimcrypto/sysrand
import messages, protobuf, import messages, protobuf,
../../../peer, ../../../peer,
../../../peerinfo,
../../../crypto/crypto, ../../../crypto/crypto,
../../../protobuf/minprotobuf ../../../protobuf/minprotobuf
logScope: logScope:
topic = "PubSubMessage" topic = "PubSubMessage"
const PubSubPrefix = "libp2p-pubsub:"
proc msgId*(m: Message): string = proc msgId*(m: Message): string =
m.seqno.toHex() & PeerID.init(m.fromPeer).pretty m.seqno.toHex() & PeerID.init(m.fromPeer).pretty
proc fromPeerId*(m: Message): PeerId = proc fromPeerId*(m: Message): PeerId =
PeerID.init(m.fromPeer) PeerID.init(m.fromPeer)
proc sign*(peerId: PeerID, msg: Message): Message {.gcsafe.} = proc sign*(p: PeerInfo, msg: Message): Message {.gcsafe.} =
var buff = initProtoBuffer() var buff = initProtoBuffer()
encodeMessage(msg, buff) encodeMessage(msg, buff)
# NOTE: leave as is, moving out would imply making this .threadsafe., etc... let prefix = cast[seq[byte]](PubSubPrefix)
let prefix = cast[seq[byte]]("libp2p-pubsub:")
if buff.buffer.len > 0: if buff.buffer.len > 0:
result = msg result = msg
if peerId.privateKey.isSome: result.signature = p.privateKey.
result.signature = peerId. sign(prefix & buff.buffer).
privateKey. getBytes()
get().
sign(prefix & buff.buffer).
getBytes()
proc verify*(peerId: PeerID, m: Message): bool = proc verify*(p: PeerInfo, m: Message): bool =
if m.signature.len > 0 and m.key.len > 0: if m.signature.len > 0 and m.key.len > 0:
var msg = m var msg = m
msg.signature = @[] msg.signature = @[]
@ -52,22 +51,19 @@ proc verify*(peerId: PeerID, m: Message): bool =
if remote.init(m.signature) and key.init(m.key): if remote.init(m.signature) and key.init(m.key):
result = remote.verify(buff.buffer, key) result = remote.verify(buff.buffer, key)
proc newMessage*(peerId: PeerID, proc newMessage*(p: PeerInfo,
data: seq[byte], data: seq[byte],
name: string, name: string,
sign: bool = true): Message {.gcsafe.} = sign: bool = true): Message {.gcsafe.} =
var seqno: seq[byte] = newSeq[byte](20) var seqno: seq[byte] = newSeq[byte](20)
if randomBytes(addr seqno[0], 20) > 0: if p.publicKey.isSome and randomBytes(addr seqno[0], 20) > 0:
var key: seq[byte] = @[] var key: seq[byte] = p.publicKey.get().getBytes()
if peerId.publicKey.isSome: result = Message(fromPeer: p.peerId.getBytes(),
key = peerId.publicKey.get().getBytes()
result = Message(fromPeer: peerId.getBytes(),
data: data, data: data,
seqno: seqno, seqno: seqno,
topicIDs: @[name]) topicIDs: @[name])
if sign: if sign:
result = sign(peerId, result) result = p.sign(result)
result.key = key result.key = key

View File

@ -40,7 +40,6 @@ proc put*[V](t: TimedCache[V],
trace "adding entry to timed cache", key = key trace "adding entry to timed cache", key = key
t.cache[key] = TimedEntry[V](val: val, handler: handler) t.cache[key] = TimedEntry[V](val: val, handler: handler)
# TODO: addTimer with param Duration is missing from chronos, needs to be added
addTimer( addTimer(
timeout, timeout,
proc (arg: pointer = nil) {.gcsafe.} = proc (arg: pointer = nil) {.gcsafe.} =

View File

@ -11,6 +11,7 @@ import chronos, chronicles
import nimcrypto/[sysrand, hmac, sha2, sha, hash, rijndael, twofish, bcmode] import nimcrypto/[sysrand, hmac, sha2, sha, hash, rijndael, twofish, bcmode]
import secure, import secure,
../../connection, ../../connection,
../../peerinfo,
../../stream/lpstream, ../../stream/lpstream,
../../crypto/crypto, ../../crypto/crypto,
../../crypto/ecnist, ../../crypto/ecnist,
@ -223,7 +224,7 @@ proc newSecureConnection*(conn: Connection,
cipher: string, cipher: string,
secrets: Secret, secrets: Secret,
order: int, order: int,
peerId: PeerID): SecureConnection = remotePubKey: PublicKey): SecureConnection =
## Create new secure connection, using specified hash algorithm ``hash``, ## Create new secure connection, using specified hash algorithm ``hash``,
## cipher algorithm ``cipher``, stretched keys ``secrets`` and order ## cipher algorithm ``cipher``, stretched keys ``secrets`` and order
## ``order``. ## ``order``.
@ -248,7 +249,7 @@ proc newSecureConnection*(conn: Connection,
result.readerCoder.init(cipher, secrets.keyOpenArray(i1), result.readerCoder.init(cipher, secrets.keyOpenArray(i1),
secrets.ivOpenArray(i1)) secrets.ivOpenArray(i1))
result.peerInfo.peerId = some(peerId) result.peerInfo = some(PeerInfo.init(remotePubKey))
proc transactMessage(conn: Connection, proc transactMessage(conn: Connection,
msg: seq[byte]): Future[seq[byte]] {.async.} = msg: seq[byte]): Future[seq[byte]] {.async.} =
@ -396,7 +397,7 @@ proc handshake*(s: Secio, conn: Connection): Future[SecureConnection] {.async.}
# Perform Nonce exchange over encrypted channel. # Perform Nonce exchange over encrypted channel.
result = newSecureConnection(conn, hash, cipher, keys, order, remotePeerId) result = newSecureConnection(conn, hash, cipher, keys, order, remotePubkey)
await result.writeMessage(remoteNonce) await result.writeMessage(remoteNonce)
var res = await result.readMessage() var res = await result.readMessage()
@ -438,7 +439,7 @@ proc handleConn(s: Secio, conn: Connection): Future[Connection] {.async, gcsafe.
if not sconn.closed: if not sconn.closed:
asyncCheck sconn.close() asyncCheck sconn.close()
) )
secured.peerInfo.peerId = sconn.peerInfo.peerId secured.peerInfo = some(PeerInfo.init(sconn.peerInfo.get().publicKey.get()))
result = secured result = secured
method init(s: Secio) {.gcsafe.} = method init(s: Secio) {.gcsafe.} =

View File

@ -65,13 +65,15 @@ proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
proc identify(s: Switch, conn: Connection): Future[PeerInfo] {.async, gcsafe.} = proc identify(s: Switch, conn: Connection): Future[PeerInfo] {.async, gcsafe.} =
## identify the connection ## identify the connection
result = conn.peerInfo if conn.peerInfo.isSome:
result = conn.peerInfo.get()
try: try:
if (await s.ms.select(conn, s.identity.codec)): if (await s.ms.select(conn, s.identity.codec)):
let info = await s.identity.identify(conn, conn.peerInfo) let info = await s.identity.identify(conn, conn.peerInfo)
if info.pubKey.isSome: if info.pubKey.isSome:
result.peerId = some(PeerID.init(info.pubKey.get())) # we might not have a peerId at all result = PeerInfo.init(info.pubKey.get())
trace "identify: identified remote peer", peer = result.id trace "identify: identified remote peer", peer = result.id
if info.addrs.len > 0: if info.addrs.len > 0:
@ -111,37 +113,33 @@ proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} =
handlerFut.addCallback( handlerFut.addCallback(
proc(udata: pointer = nil) {.gcsafe.} = proc(udata: pointer = nil) {.gcsafe.} =
trace "muxer handler completed for peer", trace "muxer handler completed for peer",
peer = conn.peerInfo.id peer = conn.peerInfo.get().id
) )
# do identify first, so that we have a # do identify first, so that we have a
# PeerInfo in case we didn't before # PeerInfo in case we didn't before
conn.peerInfo = await s.identify(stream) conn.peerInfo = some((await s.identify(stream)))
await stream.close() # close identify stream await stream.close() # close identify stream
trace "connection's peerInfo", peerInfo = conn.peerInfo trace "connection's peerInfo", peerInfo = conn.peerInfo
# store it in muxed connections if we have a peer for it # store it in muxed connections if we have a peer for it
# TODO: We should make sure that this are cleaned up properly if conn.peerInfo.isSome:
# on exit even if there is no peer for it. This shouldn't trace "adding muxer for peer", peer = conn.peerInfo.get().id
# happen once secio is in place, but still something to keep s.muxed[conn.peerInfo.get().id] = muxer
# in mind
if conn.peerInfo.peerId.isSome:
trace "adding muxer for peer", peer = conn.peerInfo.id
s.muxed[conn.peerInfo.id] = muxer
proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} = proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} =
if conn.peerInfo.peerId.isSome: # if conn.peerInfo.peerId.isSome:
let id = conn.peerInfo.id let id = conn.peerInfo.get().id
trace "cleaning up connection for peer", peerId = id trace "cleaning up connection for peer", peerId = id
if id in s.muxed: if id in s.muxed:
await s.muxed[id].close() await s.muxed[id].close()
s.muxed.del(id) s.muxed.del(id)
if id in s.connections: if id in s.connections:
await s.connections[id].close() await s.connections[id].close()
s.connections.del(id) s.connections.del(id)
proc getMuxedStream(s: Switch, peerInfo: PeerInfo): Future[Option[Connection]] {.async, gcsafe.} = proc getMuxedStream(s: Switch, peerInfo: PeerInfo): Future[Option[Connection]] {.async, gcsafe.} =
# if there is a muxer for the connection # if there is a muxer for the connection
@ -157,13 +155,12 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g
result = conn result = conn
# don't mux/secure twise # don't mux/secure twise
if conn.peerInfo.peerId.isSome and if conn.peerInfo.get().id in s.muxed:
conn.peerInfo.id in s.muxed:
return return
result = await s.secure(result) # secure the connection result = await s.secure(result) # secure the connection
await s.mux(result) # mux it if possible await s.mux(result) # mux it if possible
s.connections[conn.peerInfo.id] = result s.connections[conn.peerInfo.get().id] = result
proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
trace "upgrading incoming connection" trace "upgrading incoming connection"
@ -205,7 +202,7 @@ proc dial*(s: Switch,
trace "dialing address", address = $a trace "dialing address", address = $a
result = await t.dial(a) result = await t.dial(a)
# make sure to assign the peer to the connection # make sure to assign the peer to the connection
result.peerInfo = peer result.peerInfo = some(peer)
result = await s.upgradeOutgoing(result) result = await s.upgradeOutgoing(result)
result.closeEvent.wait().addCallback( result.closeEvent.wait().addCallback(
proc(udata: pointer) = proc(udata: pointer) =
@ -325,7 +322,7 @@ proc newSwitch*(peerInfo: PeerInfo,
val.muxerHandler = proc(muxer: Muxer) {.async, gcsafe.} = val.muxerHandler = proc(muxer: Muxer) {.async, gcsafe.} =
trace "got new muxer" trace "got new muxer"
let stream = await muxer.newStream() let stream = await muxer.newStream()
muxer.connection.peerInfo = await s.identify(stream) muxer.connection.peerInfo = some((await s.identify(stream)))
await stream.close() await stream.close()
for k in secureManagers.keys: for k in secureManagers.keys:

View File

@ -19,10 +19,7 @@ import utils, ../../libp2p/[switch,
protocols/pubsub/gossipsub] protocols/pubsub/gossipsub]
proc createGossipSub(): GossipSub = proc createGossipSub(): GossipSub =
var peerInfo: PeerInfo var peerInfo = PeerInfo.init(PrivateKey.random(RSA))
var seckey = some(PrivateKey.random(RSA))
peerInfo.peerId = some(PeerID.init(seckey.get()))
result = newPubSub(GossipSub, peerInfo) result = newPubSub(GossipSub, peerInfo)
suite "GossipSub": suite "GossipSub":
@ -36,11 +33,11 @@ suite "GossipSub":
var buf1 = newBufferStream() var buf1 = newBufferStream()
var conn1 = newConnection(buf1) var conn1 = newConnection(buf1)
conn1.peerInfo = gossip1.peerInfo conn1.peerInfo = some(gossip1.peerInfo)
var buf2 = newBufferStream() var buf2 = newBufferStream()
var conn2 = newConnection(buf2) var conn2 = newConnection(buf2)
conn2.peerInfo = gossip2.peerInfo conn2.peerInfo = some(gossip2.peerInfo)
buf1 = buf1 | buf2 | buf1 buf1 = buf1 | buf2 | buf1
@ -52,7 +49,7 @@ suite "GossipSub":
check: check:
"foobar" in gossip2.gossipsub "foobar" in gossip2.gossipsub
gossip1.peerInfo.peerId.get().pretty in gossip2.gossipsub["foobar"] gossip1.peerInfo.id in gossip2.gossipsub["foobar"]
result = true result = true
@ -83,7 +80,7 @@ suite "GossipSub":
check: check:
"foobar" in gossip2.topics "foobar" in gossip2.topics
"foobar" in gossip1.gossipsub "foobar" in gossip1.gossipsub
gossip2.peerInfo.peerId.get().pretty in gossip1.gossipsub["foobar"] gossip2.peerInfo.id in gossip1.gossipsub["foobar"]
await allFutures(nodes.mapIt(it.stop())) await allFutures(nodes.mapIt(it.stop()))
await allFutures(awaitters) await allFutures(awaitters)
@ -103,11 +100,11 @@ suite "GossipSub":
var buf1 = newBufferStream() var buf1 = newBufferStream()
var conn1 = newConnection(buf1) var conn1 = newConnection(buf1)
conn1.peerInfo = gossip1.peerInfo conn1.peerInfo = some(gossip1.peerInfo)
var buf2 = newBufferStream() var buf2 = newBufferStream()
var conn2 = newConnection(buf2) var conn2 = newConnection(buf2)
conn2.peerInfo = gossip2.peerInfo conn2.peerInfo = some(gossip2.peerInfo)
buf1 = buf1 | buf2 | buf1 buf1 = buf1 | buf2 | buf1
@ -131,8 +128,8 @@ suite "GossipSub":
# TODO: in a real setting, we would be checking for the peerId from # TODO: in a real setting, we would be checking for the peerId from
# gossip1 in gossip2 and vice versa, but since we're doing some mockery # gossip1 in gossip2 and vice versa, but since we're doing some mockery
# with connection piping and such, this is fine - do not change! # with connection piping and such, this is fine - do not change!
gossip1.peerInfo.peerId.get().pretty in gossip1.gossipsub["foobar"] gossip1.peerInfo.id in gossip1.gossipsub["foobar"]
gossip2.peerInfo.peerId.get().pretty in gossip2.gossipsub["foobar"] gossip2.peerInfo.id in gossip2.gossipsub["foobar"]
result = true result = true
@ -170,8 +167,8 @@ suite "GossipSub":
"foobar" in gossip1.gossipsub "foobar" in gossip1.gossipsub
"foobar" in gossip2.gossipsub "foobar" in gossip2.gossipsub
gossip1.peerInfo.peerId.get().pretty in gossip2.gossipsub["foobar"] gossip1.peerInfo.id in gossip2.gossipsub["foobar"]
gossip2.peerInfo.peerId.get().pretty in gossip1.gossipsub["foobar"] gossip2.peerInfo.id in gossip1.gossipsub["foobar"]
await allFutures(nodes.mapIt(it.stop())) await allFutures(nodes.mapIt(it.stop()))
await allFutures(awaitters) await allFutures(awaitters)
@ -396,9 +393,9 @@ suite "GossipSub":
closureScope: closureScope:
var dialerNode = dialer var dialerNode = dialer
handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} = handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} =
if dialerNode.peerInfo.peerId.get().pretty notin seen: if dialerNode.peerInfo.id notin seen:
seen[dialerNode.peerInfo.peerId.get().pretty] = 0 seen[dialerNode.peerInfo.id] = 0
seen[dialerNode.peerInfo.peerId.get().pretty].inc seen[dialerNode.peerInfo.id].inc
check topic == "foobar" check topic == "foobar"
await dialer.subscribe("foobar", handler) await dialer.subscribe("foobar", handler)
@ -409,7 +406,7 @@ suite "GossipSub":
await nodes[0].publish("foobar", await nodes[0].publish("foobar",
cast[seq[byte]]("from node " & cast[seq[byte]]("from node " &
nodes[1].peerInfo.peerId.get().pretty)) nodes[1].peerInfo.id))
await sleepAsync(1000.millis) await sleepAsync(1000.millis)
await allFutures(nodes.mapIt(it.stop())) await allFutures(nodes.mapIt(it.stop()))

View File

@ -25,14 +25,11 @@ proc createNode*(privKey: Option[PrivateKey] = none(PrivateKey),
address: string = "/ip4/127.0.0.1/tcp/0", address: string = "/ip4/127.0.0.1/tcp/0",
triggerSelf: bool = false, triggerSelf: bool = false,
gossip: bool = false): Switch = gossip: bool = false): Switch =
var peerInfo: PeerInfo
var seckey = privKey var seckey = privKey
if privKey.isNone: if privKey.isNone:
seckey = some(PrivateKey.random(RSA)) seckey = some(PrivateKey.random(RSA))
peerInfo.peerId = some(PeerID.init(seckey.get())) var peerInfo = PeerInfo.init(seckey.get(), @[Multiaddress.init(address)])
peerInfo.addrs.add(Multiaddress.init(address))
let mplexProvider = newMuxerProvider(createMplex, MplexCodec) let mplexProvider = newMuxerProvider(createMplex, MplexCodec)
let transports = @[Transport(newTransport(TcpTransport))] let transports = @[Transport(newTransport(TcpTransport))]
let muxers = [(MplexCodec, mplexProvider)].toTable() let muxers = [(MplexCodec, mplexProvider)].toTable()

View File

@ -17,15 +17,12 @@ suite "Identify":
test "handle identify message": test "handle identify message":
proc testHandle(): Future[bool] {.async.} = proc testHandle(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
let remoteSecKey = PrivateKey.random(RSA)
let remoteSeckey = PrivateKey.random(RSA) let remotePeerInfo = PeerInfo.init(remoteSecKey,
var remotePeerInfo: PeerInfo @[ma],
@["/test/proto1/1.0.0",
"/test/proto2/1.0.0"])
var serverFut: Future[void] var serverFut: Future[void]
remotePeerInfo.peerId = some(PeerID.init(remoteSeckey))
remotePeerInfo.addrs.add(ma)
remotePeerInfo.protocols.add("/test/proto1/1.0.0")
remotePeerInfo.protocols.add("/test/proto2/1.0.0")
let identifyProto1 = newIdentify(remotePeerInfo) let identifyProto1 = newIdentify(remotePeerInfo)
let msListen = newMultistream() let msListen = newMultistream()
@ -40,23 +37,18 @@ suite "Identify":
let transport2: TcpTransport = newTransport(TcpTransport) let transport2: TcpTransport = newTransport(TcpTransport)
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
let seckey = PrivateKey.random(RSA) var peerInfo = PeerInfo.init(PrivateKey.random(RSA), @[ma])
var peerInfo: PeerInfo
peerInfo.peerId = some(PeerID.init(seckey))
peerInfo.addrs.add(ma)
let identifyProto2 = newIdentify(peerInfo) let identifyProto2 = newIdentify(peerInfo)
let res = await msDial.select(conn, IdentifyCodec) discard await msDial.select(conn, IdentifyCodec)
let id = await identifyProto2.identify(conn, remotePeerInfo) let id = await identifyProto2.identify(conn, some(remotePeerInfo))
check id.pubKey.get() == remoteSeckey.getKey() check id.pubKey.get() == remoteSecKey.getKey()
check id.addrs[0] == ma check id.addrs[0] == ma
check id.protoVersion.get() == ProtoVersion check id.protoVersion.get() == ProtoVersion
# check id.agentVersion.get() == AgentVersion # check id.agentVersion.get() == AgentVersion
check id.protos == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"] check id.protos == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
await conn.close() await conn.close()
await transport1.close() await transport1.close()
await serverFut await serverFut
result = true result = true
@ -67,12 +59,7 @@ suite "Identify":
test "handle failed identify": test "handle failed identify":
proc testHandleError() {.async.} = proc testHandleError() {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
var remotePeerInfo = PeerInfo.init(PrivateKey.random(RSA), @[ma])
let remoteSeckey = PrivateKey.random(RSA)
var remotePeerInfo: PeerInfo
remotePeerInfo.peerId = some(PeerID.init(remoteSeckey))
remotePeerInfo.addrs.add(ma)
let identifyProto1 = newIdentify(remotePeerInfo) let identifyProto1 = newIdentify(remotePeerInfo)
let msListen = newMultistream() let msListen = newMultistream()
@ -87,19 +74,10 @@ suite "Identify":
let transport2: TcpTransport = newTransport(TcpTransport) let transport2: TcpTransport = newTransport(TcpTransport)
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
let seckey = PrivateKey.random(RSA) var localPeerInfo = PeerInfo.init(PrivateKey.random(RSA), @[ma])
var localPeerInfo: PeerInfo
localPeerInfo.peerId = some(PeerID.init(seckey))
localPeerInfo.addrs.add(ma)
let identifyProto2 = newIdentify(localPeerInfo) let identifyProto2 = newIdentify(localPeerInfo)
let res = await msDial.select(conn, IdentifyCodec) discard await msDial.select(conn, IdentifyCodec)
discard await identifyProto2.identify(conn, some(PeerInfo.init(PrivateKey.random(RSA))))
let wrongSec = PrivateKey.random(RSA)
var wrongRemotePeer: PeerInfo
wrongRemotePeer.peerId = some(PeerID.init(wrongSec))
let id = await identifyProto2.identify(conn, wrongRemotePeer)
await conn.close() await conn.close()
expect IdentityNoMatchError: expect IdentityNoMatchError:

View File

@ -66,14 +66,11 @@ proc createNode*(privKey: Option[PrivateKey] = none(PrivateKey),
address: string = "/ip4/127.0.0.1/tcp/0", address: string = "/ip4/127.0.0.1/tcp/0",
triggerSelf: bool = false, triggerSelf: bool = false,
gossip: bool = false): Switch = gossip: bool = false): Switch =
var peerInfo: NativePeerInfo
var seckey = privKey var seckey = privKey
if privKey.isNone: if privKey.isNone:
seckey = some(PrivateKey.random(RSA)) seckey = some(PrivateKey.random(RSA))
peerInfo.peerId = some(PeerID.init(seckey.get())) var peerInfo = NativePeerInfo.init(seckey.get(), @[Multiaddress.init(address)])
peerInfo.addrs.add(Multiaddress.init(address))
proc createMplex(conn: Connection): Muxer = newMplex(conn) proc createMplex(conn: Connection): Muxer = newMplex(conn)
let mplexProvider = newMuxerProvider(createMplex, MplexCodec) let mplexProvider = newMuxerProvider(createMplex, MplexCodec)
let transports = @[Transport(newTransport(TcpTransport))] let transports = @[Transport(newTransport(TcpTransport))]
@ -115,11 +112,10 @@ proc testPubSubDaemonPublish(gossip: bool = false): Future[bool] {.async.} =
check smsg == pubsubData check smsg == pubsubData
handlerFuture.complete(true) handlerFuture.complete(true)
await nativeNode.subscribeToPeer(NativePeerInfo(peerId: some(daemonPeer.peer), await nativeNode.subscribeToPeer(NativePeerInfo.init(daemonPeer.peer,
addrs: daemonPeer.addresses)) daemonPeer.addresses))
await sleepAsync(1.seconds) await sleepAsync(1.seconds)
await daemonNode.connect(nativePeer.peerId.get(), nativePeer.addrs) await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
proc pubsubHandler(api: DaemonAPI, proc pubsubHandler(api: DaemonAPI,
ticket: PubsubTicket, ticket: PubsubTicket,
@ -152,11 +148,11 @@ proc testPubSubNodePublish(gossip: bool = false): Future[bool] {.async.} =
let nativePeer = nativeNode.peerInfo let nativePeer = nativeNode.peerInfo
var handlerFuture = newFuture[bool]() var handlerFuture = newFuture[bool]()
await nativeNode.subscribeToPeer(NativePeerInfo(peerId: some(daemonPeer.peer), await nativeNode.subscribeToPeer(NativePeerInfo.init(daemonPeer.peer,
addrs: daemonPeer.addresses)) daemonPeer.addresses))
await sleepAsync(1.seconds) await sleepAsync(1.seconds)
await daemonNode.connect(nativePeer.peerId.get(), nativePeer.addrs) await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
proc pubsubHandler(api: DaemonAPI, proc pubsubHandler(api: DaemonAPI,
ticket: PubsubTicket, ticket: PubsubTicket,
@ -196,9 +192,9 @@ suite "Interop":
testFuture.complete() testFuture.complete()
await daemonNode.addHandler(protos, daemonHandler) await daemonNode.addHandler(protos, daemonHandler)
let conn = await nativeNode.dial(NativePeerInfo(peerId: some(daemonPeer.peer), let conn = await nativeNode.dial(NativePeerInfo.init(daemonPeer.peer,
addrs: daemonPeer.addresses), daemonPeer.addresses),
protos[0]) protos[0])
await conn.writeLp("test 1") await conn.writeLp("test 1")
check "test 2" == cast[string]((await conn.readLp())) check "test 2" == cast[string]((await conn.readLp()))
await sleepAsync(10.millis) await sleepAsync(10.millis)
@ -233,9 +229,9 @@ suite "Interop":
testFuture.complete(line) testFuture.complete(line)
await daemonNode.addHandler(protos, daemonHandler) await daemonNode.addHandler(protos, daemonHandler)
let conn = await nativeNode.dial(NativePeerInfo(peerId: some(daemonPeer.peer), let conn = await nativeNode.dial(NativePeerInfo.init(daemonPeer.peer,
addrs: daemonPeer.addresses), daemonPeer.addresses),
protos[0]) protos[0])
await conn.writeLp(test & "\r\n") await conn.writeLp(test & "\r\n")
result = test == (await wait(testFuture, 10.secs)) result = test == (await wait(testFuture, 10.secs))
await nativeNode.stop() await nativeNode.stop()
@ -269,8 +265,8 @@ suite "Interop":
let nativePeer = nativeNode.peerInfo let nativePeer = nativeNode.peerInfo
let daemonNode = await newDaemonApi() let daemonNode = await newDaemonApi()
await daemonNode.connect(nativePeer.peerId.get(), nativePeer.addrs) await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
var stream = await daemonNode.openStream(nativePeer.peerId.get(), protos) var stream = await daemonNode.openStream(nativePeer.peerId, protos)
discard await stream.transp.writeLp(test) discard await stream.transp.writeLp(test)
result = test == (await wait(testFuture, 10.secs)) result = test == (await wait(testFuture, 10.secs))
@ -308,8 +304,8 @@ suite "Interop":
let nativePeer = nativeNode.peerInfo let nativePeer = nativeNode.peerInfo
let daemonNode = await newDaemonApi() let daemonNode = await newDaemonApi()
await daemonNode.connect(nativePeer.peerId.get(), nativePeer.addrs) await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
var stream = await daemonNode.openStream(nativePeer.peerId.get(), protos) var stream = await daemonNode.openStream(nativePeer.peerId, protos)
asyncDiscard stream.transp.writeLp("test 1") asyncDiscard stream.transp.writeLp("test 1")
check "test 2" == cast[string](await stream.transp.readLp()) check "test 2" == cast[string](await stream.transp.readLp())
@ -356,8 +352,8 @@ suite "Interop":
let nativePeer = nativeNode.peerInfo let nativePeer = nativeNode.peerInfo
let daemonNode = await newDaemonApi() let daemonNode = await newDaemonApi()
await daemonNode.connect(nativePeer.peerId.get(), nativePeer.addrs) await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
var stream = await daemonNode.openStream(nativePeer.peerId.get(), protos) var stream = await daemonNode.openStream(nativePeer.peerId, protos)
while count < 10: while count < 10:
discard await stream.transp.writeLp(test) discard await stream.transp.writeLp(test)

View File

@ -168,9 +168,6 @@ suite "Multistream select":
let ms = newMultistream() let ms = newMultistream()
let conn = newConnection(newTestSelectStream()) let conn = newConnection(newTestSelectStream())
let seckey = PrivateKey.random(RSA)
var peerInfo: PeerInfo
peerInfo.peerId = some(PeerID.init(seckey))
var protocol: LPProtocol = new LPProtocol var protocol: LPProtocol = new LPProtocol
proc testHandler(conn: Connection, proc testHandler(conn: Connection,
proto: string): proto: string):
@ -197,13 +194,9 @@ suite "Multistream select":
check strProto == "\x26/test/proto1/1.0.0\n/test/proto2/1.0.0\n" check strProto == "\x26/test/proto1/1.0.0\n/test/proto2/1.0.0\n"
await conn.close() await conn.close()
let seckey = PrivateKey.random(RSA) proc testHandler(conn: Connection, proto: string): Future[void]
var peerInfo: PeerInfo {.async, gcsafe.} = discard
peerInfo.peerId = some(PeerID.init(seckey))
var protocol: LPProtocol = new LPProtocol var protocol: LPProtocol = new LPProtocol
proc testHandler(conn: Connection,
proto: string):
Future[void] {.async, gcsafe.} = discard
protocol.handler = testHandler protocol.handler = testHandler
ms.addHandler("/test/proto1/1.0.0", protocol) ms.addHandler("/test/proto1/1.0.0", protocol)
ms.addHandler("/test/proto2/1.0.0", protocol) ms.addHandler("/test/proto2/1.0.0", protocol)
@ -224,9 +217,6 @@ suite "Multistream select":
check cast[string](msg) == Na check cast[string](msg) == Na
await conn.close() await conn.close()
let seckey = PrivateKey.random(RSA)
var peerInfo: PeerInfo
peerInfo.peerId = some(PeerID.init(seckey))
var protocol: LPProtocol = new LPProtocol var protocol: LPProtocol = new LPProtocol
proc testHandler(conn: Connection, proc testHandler(conn: Connection,
proto: string): proto: string):
@ -244,9 +234,6 @@ suite "Multistream select":
proc endToEnd(): Future[bool] {.async.} = proc endToEnd(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
let seckey = PrivateKey.random(RSA)
var peerInfo: PeerInfo
peerInfo.peerId = some(PeerID.init(seckey))
var protocol: LPProtocol = new LPProtocol var protocol: LPProtocol = new LPProtocol
proc testHandler(conn: Connection, proc testHandler(conn: Connection,
proto: string): proto: string):
@ -283,9 +270,6 @@ suite "Multistream select":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
let msListen = newMultistream() let msListen = newMultistream()
let seckey = PrivateKey.random(RSA)
var peerInfo: PeerInfo
peerInfo.peerId = some(PeerID.init(seckey))
var protocol: LPProtocol = new LPProtocol var protocol: LPProtocol = new LPProtocol
protocol.handler = proc(conn: Connection, proto: string) {.async, gcsafe.} = protocol.handler = proc(conn: Connection, proto: string) {.async, gcsafe.} =
await conn.close() await conn.close()
@ -317,9 +301,6 @@ suite "Multistream select":
proc endToEnd(): Future[bool] {.async.} = proc endToEnd(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
let seckey = PrivateKey.random(RSA)
var peerInfo: PeerInfo
peerInfo.peerId = some(PeerID.init(seckey))
var protocol: LPProtocol = new LPProtocol var protocol: LPProtocol = new LPProtocol
proc testHandler(conn: Connection, proc testHandler(conn: Connection,
proto: string): proto: string):
@ -356,9 +337,6 @@ suite "Multistream select":
proc endToEnd(): Future[bool] {.async.} = proc endToEnd(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
let seckey = PrivateKey.random(RSA)
var peerInfo: PeerInfo
peerInfo.peerId = some(PeerID.init(seckey))
var protocol: LPProtocol = new LPProtocol var protocol: LPProtocol = new LPProtocol
proc testHandler(conn: Connection, proc testHandler(conn: Connection,
proto: string): proto: string):

View File

@ -7,6 +7,7 @@ import testtransport,
testbufferstream, testbufferstream,
testidentify, testidentify,
testswitch, testswitch,
testpeerinfo,
pubsub/testpubsub, pubsub/testpubsub,
# TODO: placing this before pubsub tests, # TODO: placing this before pubsub tests,
# breaks some flood and gossip tests - no idea why # breaks some flood and gossip tests - no idea why

53
tests/testpeerinfo.nim Normal file
View File

@ -0,0 +1,53 @@
import unittest, options
import ../libp2p/crypto/crypto,
../libp2p/peerinfo,
../libp2p/peer
suite "PeerInfo":
test "Should init with private key":
let seckey = PrivateKey.random(RSA)
var peerInfo = PeerInfo.init(seckey)
var peerId = PeerID.init(seckey)
check peerId == peerInfo.peerId
check seckey == peerInfo.privateKey
check seckey.getKey == peerInfo.publicKey.get()
test "Should init with public key":
let seckey = PrivateKey.random(RSA)
var peerInfo = PeerInfo.init(seckey.getKey())
var peerId = PeerID.init(seckey.getKey())
check peerId == peerInfo.peerId
check seckey.getKey == peerInfo.publicKey.get()
test "Should init from PeerId with public key":
let seckey = PrivateKey.random(Ed25519)
var peerInfo = PeerInfo.init(PeerID.init(seckey.getKey()))
var peerId = PeerID.init(seckey.getKey())
check peerId == peerInfo.peerId
check seckey.getKey == peerInfo.publicKey.get()
test "Should return none on missing public key":
let peerInfo = PeerInfo.init(PeerID.init(PrivateKey.random(RSA)))
check peerInfo.publicKey.isNone
test "Should allow assigning public key":
let key = PrivateKey.random(RSA)
let peerInfo = PeerInfo.init(PeerID.init(key))
peerInfo.publicKey = key.getKey()
check peerInfo.publicKey.get() == key.getKey()
test "Should throw on invalid public key assignement":
proc throwsOnInvalidPubKey() =
let validKey = PrivateKey.random(RSA)
let invalidKey = PrivateKey.random(RSA)
let peerInfo = PeerInfo.init(PeerID.init(validKey))
peerInfo.publicKey = invalidKey.getKey()
expect InvalidPublicKeyException:
throwsOnInvalidPubKey()

View File

@ -38,8 +38,7 @@ suite "Switch":
test "e2e use switch": test "e2e use switch":
proc createSwitch(ma: MultiAddress): (Switch, PeerInfo) {.gcsafe.}= proc createSwitch(ma: MultiAddress): (Switch, PeerInfo) {.gcsafe.}=
let seckey = PrivateKey.random(RSA) let seckey = PrivateKey.random(RSA)
var peerInfo: PeerInfo var peerInfo: PeerInfo = PeerInfo.init(PrivateKey.random(RSA))
peerInfo.peerId = some(PeerID.init(seckey))
peerInfo.addrs.add(ma) peerInfo.addrs.add(ma)
let identify = newIdentify(peerInfo) let identify = newIdentify(peerInfo)
@ -49,7 +48,7 @@ suite "Switch":
let mplexProvider = newMuxerProvider(createMplex, MplexCodec) let mplexProvider = newMuxerProvider(createMplex, MplexCodec)
let transports = @[Transport(newTransport(TcpTransport))] let transports = @[Transport(newTransport(TcpTransport))]
let muxers = [(MplexCodec, mplexProvider)].toTable() let muxers = [(MplexCodec, mplexProvider)].toTable()
let secureManagers = [(SecioCodec, Secure(newSecio(seckey)))].toTable() let secureManagers = [(SecioCodec, Secure(newSecio(peerInfo.privateKey)))].toTable()
let switch = newSwitch(peerInfo, let switch = newSwitch(peerInfo,
transports, transports,
identify, identify,