From 5f6fcc3d90a6559cb7f827746abc53dbe4ca9d6f Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Sat, 7 Dec 2019 10:36:39 -0600 Subject: [PATCH] extract public and private keys fields from peerid (#44) * extract public and private keys fields from peerid * allow assigning a public key * cleaned up TODOs * make pubsub prefix a const * public key should be an `Option` --- libp2p/connection.nim | 4 +- libp2p/peer.nim | 10 -- libp2p/peerinfo.nim | 90 ++++++++++-- libp2p/protocols/identify.nim | 14 +- libp2p/protocols/pubsub/floodsub.nim | 2 +- libp2p/protocols/pubsub/gossipsub.nim | 185 ++++++++++-------------- libp2p/protocols/pubsub/pubsub.nim | 11 +- libp2p/protocols/pubsub/pubsubpeer.nim | 6 +- libp2p/protocols/pubsub/rpc/message.nim | 32 ++-- libp2p/protocols/pubsub/timedcache.nim | 1 - libp2p/protocols/secure/secio.nim | 9 +- libp2p/switch.nim | 47 +++--- tests/pubsub/testgossipsub.nim | 33 ++--- tests/pubsub/utils.nim | 5 +- tests/testidentify.nim | 48 ++---- tests/testinterop.nim | 42 +++--- tests/testmultistream.nim | 26 +--- tests/testnative.nim | 1 + tests/testpeerinfo.nim | 53 +++++++ tests/testswitch.nim | 5 +- 20 files changed, 324 insertions(+), 300 deletions(-) create mode 100644 tests/testpeerinfo.nim diff --git a/libp2p/connection.nim b/libp2p/connection.nim index af32809..d2a1fe3 100644 --- a/libp2p/connection.nim +++ b/libp2p/connection.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import chronos, chronicles +import chronos, chronicles, options import peerinfo, multiaddress, stream/lpstream, @@ -19,7 +19,7 @@ const DefaultReadSize*: uint = 64 * 1024 type Connection* = ref object of LPStream - peerInfo*: PeerInfo + peerInfo*: Option[PeerInfo] stream*: LPStream observedAddrs*: Multiaddress diff --git a/libp2p/peer.nim b/libp2p/peer.nim index bf1a246..0b96bd8 100644 --- a/libp2p/peer.nim +++ b/libp2p/peer.nim @@ -21,17 +21,9 @@ const type PeerID* = object data*: seq[byte] - privateKey*: Option[PrivateKey] - publicKey: Option[PublicKey] PeerIDError* = object of CatchableError -proc publicKey*(pid: PeerID): Option[PublicKey] {.inline.} = - if pid.publicKey.isSome and len(pid.publicKey.get().getBytes()) > 0: - result = pid.publicKey - elif pid.privateKey.isSome and len(pid.privateKey.get().getBytes()) > 0: - result = some(pid.privateKey.get().getKey()) - proc pretty*(pid: PeerID): string {.inline.} = ## Return base58 encoded ``pid`` representation. result = Base58.encode(pid.data) @@ -170,12 +162,10 @@ proc init*(t: typedesc[PeerID], pubkey: PublicKey): PeerID = else: mh = MultiHash.digest("sha2-256", pubraw) result.data = mh.data.buffer - result.publicKey = some(pubkey) proc init*(t: typedesc[PeerID], seckey: PrivateKey): PeerID {.inline.} = ## Create new peer id from private key ``seckey``. result = PeerID.init(seckey.getKey()) - result.privateKey = some(seckey) proc match*(pid: PeerID, pubkey: PublicKey): bool {.inline.} = ## Returns ``true`` if ``pid`` matches public key ``pubkey``. diff --git a/libp2p/peerinfo.nim b/libp2p/peerinfo.nim index 3d21fc0..9b54213 100644 --- a/libp2p/peerinfo.nim +++ b/libp2p/peerinfo.nim @@ -8,22 +8,92 @@ ## those terms. import options -import peer, multiaddress +import peer, multiaddress, crypto/crypto -type - PeerInfo* = object of RootObj - peerId*: Option[PeerID] +## A peer can be constructed in one of tree ways: +## 1) A local peer with a private key +## 2) A remote peer with a PeerID and it's public key stored +## in the ``id`` itself +## 3) A remote peer with a standalone public key, that isn't +## encoded in the ``id`` +## + +type + KeyType* = enum + HasPrivate, + HasPublic + + InvalidPublicKeyException* = object of Exception + + PeerInfo* = ref object of RootObj + peerId*: PeerID addrs*: seq[MultiAddress] protocols*: seq[string] + case keyType*: KeyType: + of HasPrivate: + privateKey*: PrivateKey + of HasPublic: + key: Option[PublicKey] -proc id*(p: PeerInfo): string = - if p.peerId.isSome: - result = p.peerId.get().pretty +proc newInvalidPublicKeyException(): ref Exception = + newException(InvalidPublicKeyException, + "attempting to assign an invalid public key") + +proc init*(p: typedesc[PeerInfo], + key: PrivateKey, + addrs: seq[MultiAddress] = @[], + protocols: seq[string] = @[]): PeerInfo {.inline.} = + + result = PeerInfo(keyType: HasPrivate, + peerId: PeerID.init(key), + privateKey: key, + addrs: addrs, + protocols: protocols) + +proc init*(p: typedesc[PeerInfo], + peerId: PeerID, + addrs: seq[MultiAddress] = @[], + protocols: seq[string] = @[]): PeerInfo {.inline.} = + + PeerInfo(keyType: HasPublic, + peerId: peerId, + addrs: addrs, + protocols: protocols) + +proc init*(p: typedesc[PeerInfo], + key: PublicKey, + addrs: seq[MultiAddress] = @[], + protocols: seq[string] = @[]): PeerInfo {.inline.} = + + PeerInfo(keyType: HasPublic, + peerId: PeerID.init(key), + key: some(key), + addrs: addrs, + protocols: protocols) + +proc publicKey*(p: PeerInfo): Option[PublicKey] {.inline.} = + if p.keyType == HasPublic: + if p.peerId.hasPublicKey(): + var pubKey: PublicKey + if p.peerId.extractPublicKey(pubKey): + result = some(pubKey) + elif p.key.isSome: + result = p.key + else: + result = some(p.privateKey.getKey()) + +proc `publicKey=`*(p: PeerInfo, key: PublicKey) = + if not (PeerID.init(key) == p.peerId): + raise newInvalidPublicKeyException() + + p.key = some(key) + +proc id*(p: PeerInfo): string {.inline.} = + p.peerId.pretty proc `$`*(p: PeerInfo): string = - if p.peerId.isSome: - result.add("PeerID: ") - result.add(p.id & "\n") + result.add("PeerID: ") + result.add(p.id & "\n") if p.addrs.len > 0: result.add("Peer Addrs: ") diff --git a/libp2p/protocols/identify.nim b/libp2p/protocols/identify.nim index e060edf..e5ee570 100644 --- a/libp2p/protocols/identify.nim +++ b/libp2p/protocols/identify.nim @@ -46,8 +46,7 @@ type proc encodeMsg*(peerInfo: PeerInfo, observedAddrs: Multiaddress): ProtoBuffer = result = initProtoBuffer() - if peerInfo.peerId.isSome: - result.write(initProtoField(1, peerInfo.peerId.get().publicKey.get().getBytes())) + result.write(initProtoField(1, peerInfo.publicKey.get().getBytes())) for ma in peerInfo.addrs: result.write(initProtoField(2, ma.data.buffer)) @@ -122,7 +121,7 @@ method init*(p: Identify) = proc identify*(p: Identify, conn: Connection, - remotePeerInfo: PeerInfo): Future[IdentifyInfo] {.async, gcsafe.} = + remotePeerInfo: Option[PeerInfo]): Future[IdentifyInfo] {.async, gcsafe.} = var message = await conn.readLp() if len(message) == 0: trace "identify: Invalid or empty message received!" @@ -131,15 +130,16 @@ proc identify*(p: Identify, result = decodeMsg(message) - if remotePeerInfo.peerId.isSome and result.pubKey.isSome: + if remotePeerInfo.isSome and result.pubKey.isSome: let peer = PeerID.init(result.pubKey.get()) # do a string comaprison of the ids, - # because that is the only thing we have in most cases - if peer != remotePeerInfo.peerId.get(): + # because that is the only thing we + # have in most cases + if peer != remotePeerInfo.get().peerId: trace "Peer ids don't match", remote = peer.pretty(), - local = remotePeerInfo.id + local = remotePeerInfo.get().id raise newException(IdentityNoMatchError, "Peer ids don't match") diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 89c7292..4f4b813 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -104,7 +104,7 @@ method publish*(f: FloodSub, return trace "publishing on topic", name = topic - let msg = newMessage(f.peerInfo.peerId.get(), data, topic) + let msg = newMessage(f.peerInfo, data, topic) for p in f.floodsub[topic]: trace "publishing message", name = topic, peer = p, data = data await f.peers[p].send(@[RPCMsg(messages: @[msg])]) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index c2c0369..04acb49 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -179,7 +179,7 @@ method rpcHandler(g: GossipSub, g.seen.put(msg.msgId) # add the message to the seen cache # this shouldn't happen - if g.peerInfo.peerId.get() == msg.fromPeerId(): + if g.peerInfo.peerId == msg.fromPeerId(): trace "skipping messages from self", msg = msg.msgId continue @@ -195,7 +195,7 @@ method rpcHandler(g: GossipSub, for h in g.topics[t].handler: trace "calling handler for message", msg = msg.msgId, topicId = t, - localPeer = g.peerInfo.peerId.get().pretty, + localPeer = g.peerInfo.id, fromPeer = msg.fromPeerId().pretty await h(t, msg.data) # trigger user provided handler @@ -203,7 +203,7 @@ method rpcHandler(g: GossipSub, for p in toSendPeers: if p in g.peers and g.peers[p].peerInfo.peerId != peer.peerInfo.peerId: - let id = g.peers[p].peerInfo.peerId.get() + let id = g.peers[p].peerInfo.peerId let msgs = m.messages.filterIt( # don't forward to message originator id != it.fromPeerId() @@ -384,9 +384,9 @@ method publish*(g: GossipSub, # set the fanout expiery time g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL) - let msg = newMessage(g.peerInfo.peerId.get(), data, topic) + let msg = newMessage(g.peerInfo, data, topic) for p in peers: - if p == g.peerInfo.peerId.get().pretty: + if p == g.peerInfo.id: continue trace "publishing on topic", name = topic @@ -444,11 +444,8 @@ when isMainModule and not defined(release): suite "GossipSub": test "`rebalanceMesh` Degree Lo": proc testRun(): Future[bool] {.async.} = - var peerInfo: PeerInfo - var seckey = some(PrivateKey.random(RSA)) - - peerInfo.peerId = some(PeerID.init(seckey.get())) - let gossipSub = newPubSub(TestGossipSub, peerInfo) + let gossipSub = newPubSub(TestGossipSub, + PeerInfo.init(PrivateKey.random(RSA))) let topic = "foobar" gossipSub.mesh[topic] = initHashSet[string]() @@ -457,11 +454,11 @@ when isMainModule and not defined(release): for i in 0..<15: let conn = newConnection(newBufferStream(writeHandler)) - let peerId = PeerID.init(PrivateKey.random(RSA)) - conn.peerInfo.peerId = some(peerId) - gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) - gossipSub.peers[peerId.pretty].conn = conn - gossipSub.mesh[topic].incl(peerId.pretty) + let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) + conn.peerInfo = some(peerInfo) + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id].conn = conn + gossipSub.mesh[topic].incl(peerInfo.id) check gossipSub.peers.len == 15 await gossipSub.rebalanceMesh(topic) @@ -474,11 +471,8 @@ when isMainModule and not defined(release): test "`rebalanceMesh` Degree Hi": proc testRun(): Future[bool] {.async.} = - var peerInfo: PeerInfo - var seckey = some(PrivateKey.random(RSA)) - - peerInfo.peerId = some(PeerID.init(seckey.get())) - let gossipSub = newPubSub(TestGossipSub, peerInfo) + let gossipSub = newPubSub(TestGossipSub, + PeerInfo.init(PrivateKey.random(RSA))) let topic = "foobar" gossipSub.gossipsub[topic] = initHashSet[string]() @@ -487,11 +481,11 @@ when isMainModule and not defined(release): for i in 0..<15: let conn = newConnection(newBufferStream(writeHandler)) - let peerId = PeerID.init(PrivateKey.random(RSA)) - conn.peerInfo.peerId = some(peerId) - gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) - gossipSub.peers[peerId.pretty].conn = conn - gossipSub.gossipsub[topic].incl(peerId.pretty) + let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) + conn.peerInfo = some(peerInfo) + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id].conn = conn + gossipSub.gossipsub[topic].incl(peerInfo.id) check gossipSub.gossipsub[topic].len == 15 await gossipSub.rebalanceMesh(topic) @@ -504,11 +498,8 @@ when isMainModule and not defined(release): test "`replenishFanout` Degree Lo": proc testRun(): Future[bool] {.async.} = - var peerInfo: PeerInfo - var seckey = some(PrivateKey.random(RSA)) - - peerInfo.peerId = some(PeerID.init(seckey.get())) - let gossipSub = newPubSub(TestGossipSub, peerInfo) + let gossipSub = newPubSub(TestGossipSub, + PeerInfo.init(PrivateKey.random(RSA))) proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = discard @@ -520,11 +511,11 @@ when isMainModule and not defined(release): for i in 0..<15: let conn = newConnection(newBufferStream(writeHandler)) - let peerId = PeerID.init(PrivateKey.random(RSA)) - conn.peerInfo.peerId = some(peerId) - gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) - gossipSub.peers[peerId.pretty].handler = handler - gossipSub.gossipsub[topic].incl(peerId.pretty) + var peerInfo = PeerInfo.init(PrivateKey.random(RSA)) + conn.peerInfo = some(peerInfo) + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id].handler = handler + gossipSub.gossipsub[topic].incl(peerInfo.id) check gossipSub.gossipsub[topic].len == 15 await gossipSub.replenishFanout(topic) @@ -537,11 +528,8 @@ when isMainModule and not defined(release): test "`dropFanoutPeers` drop expired fanout topics": proc testRun(): Future[bool] {.async.} = - var peerInfo: PeerInfo - var seckey = some(PrivateKey.random(RSA)) - - peerInfo.peerId = some(PeerID.init(seckey.get())) - let gossipSub = newPubSub(TestGossipSub, peerInfo) + let gossipSub = newPubSub(TestGossipSub, + PeerInfo.init(PrivateKey.random(RSA))) proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = discard @@ -554,11 +542,11 @@ when isMainModule and not defined(release): for i in 0..<6: let conn = newConnection(newBufferStream(writeHandler)) - let peerId = PeerID.init(PrivateKey.random(RSA)) - conn.peerInfo.peerId = some(peerId) - gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) - gossipSub.peers[peerId.pretty].handler = handler - gossipSub.fanout[topic].incl(peerId.pretty) + let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) + conn.peerInfo = some(peerInfo) + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id].handler = handler + gossipSub.fanout[topic].incl(peerInfo.id) check gossipSub.fanout[topic].len == GossipSubD @@ -573,11 +561,8 @@ when isMainModule and not defined(release): test "`dropFanoutPeers` leave unexpired fanout topics": proc testRun(): Future[bool] {.async.} = - var peerInfo: PeerInfo - var seckey = some(PrivateKey.random(RSA)) - - peerInfo.peerId = some(PeerID.init(seckey.get())) - let gossipSub = newPubSub(TestGossipSub, peerInfo) + let gossipSub = newPubSub(TestGossipSub, + PeerInfo.init(PrivateKey.random(RSA))) proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = discard @@ -594,12 +579,12 @@ when isMainModule and not defined(release): for i in 0..<6: let conn = newConnection(newBufferStream(writeHandler)) - let peerId = PeerID.init(PrivateKey.random(RSA)) - conn.peerInfo.peerId = some(peerId) - gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) - gossipSub.peers[peerId.pretty].handler = handler - gossipSub.fanout[topic1].incl(peerId.pretty) - gossipSub.fanout[topic2].incl(peerId.pretty) + let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) + conn.peerInfo = some(peerInfo) + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id].handler = handler + gossipSub.fanout[topic1].incl(peerInfo.id) + gossipSub.fanout[topic2].incl(peerInfo.id) check gossipSub.fanout[topic1].len == GossipSubD check gossipSub.fanout[topic2].len == GossipSubD @@ -616,11 +601,8 @@ when isMainModule and not defined(release): test "`getGossipPeers` - should gather up to degree D non intersecting peers": proc testRun(): Future[bool] {.async.} = - var peerInfo: PeerInfo - var seckey = some(PrivateKey.random(RSA)) - - peerInfo.peerId = some(PeerID.init(seckey.get())) - let gossipSub = newPubSub(TestGossipSub, peerInfo) + let gossipSub = newPubSub(TestGossipSub, + PeerInfo.init(PrivateKey.random(RSA))) proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = discard @@ -634,22 +616,22 @@ when isMainModule and not defined(release): gossipSub.gossipsub[topic] = initHashSet[string]() for i in 0..<30: let conn = newConnection(newBufferStream(writeHandler)) - let peerId = PeerID.init(PrivateKey.random(RSA)) - conn.peerInfo.peerId = some(peerId) - gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) - gossipSub.peers[peerId.pretty].handler = handler + let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) + conn.peerInfo = some(peerInfo) + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id].handler = handler if i mod 2 == 0: - gossipSub.fanout[topic].incl(peerId.pretty) + gossipSub.fanout[topic].incl(peerInfo.id) else: - gossipSub.mesh[topic].incl(peerId.pretty) + gossipSub.mesh[topic].incl(peerInfo.id) for i in 0..<15: let conn = newConnection(newBufferStream(writeHandler)) - let peerId = PeerID.init(PrivateKey.random(RSA)) - conn.peerInfo.peerId = some(peerId) - gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) - gossipSub.peers[peerId.pretty].handler = handler - gossipSub.gossipsub[topic].incl(peerId.pretty) + let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) + conn.peerInfo = some(peerInfo) + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id].handler = handler + gossipSub.gossipsub[topic].incl(peerInfo.id) check gossipSub.fanout[topic].len == 15 check gossipSub.fanout[topic].len == 15 @@ -668,12 +650,9 @@ when isMainModule and not defined(release): test "`getGossipPeers` - should not crash on missing topics in mesh": proc testRun(): Future[bool] {.async.} = - var peerInfo: PeerInfo - var seckey = some(PrivateKey.random(RSA)) - - peerInfo.peerId = some(PeerID.init(seckey.get())) - let gossipSub = newPubSub(TestGossipSub, peerInfo) - + let gossipSub = newPubSub(TestGossipSub, + PeerInfo.init(PrivateKey.random(RSA))) + proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = discard @@ -685,14 +664,14 @@ when isMainModule and not defined(release): gossipSub.gossipsub[topic] = initHashSet[string]() for i in 0..<30: let conn = newConnection(newBufferStream(writeHandler)) - let peerId = PeerID.init(PrivateKey.random(RSA)) - conn.peerInfo.peerId = some(peerId) - gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) - gossipSub.peers[peerId.pretty].handler = handler + let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) + conn.peerInfo = some(peerInfo) + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id].handler = handler if i mod 2 == 0: - gossipSub.fanout[topic].incl(peerId.pretty) + gossipSub.fanout[topic].incl(peerInfo.id) else: - gossipSub.gossipsub[topic].incl(peerId.pretty) + gossipSub.gossipsub[topic].incl(peerInfo.id) let peers = gossipSub.getGossipPeers() check peers.len == GossipSubD @@ -703,11 +682,8 @@ when isMainModule and not defined(release): test "`getGossipPeers` - should not crash on missing topics in gossip": proc testRun(): Future[bool] {.async.} = - var peerInfo: PeerInfo - var seckey = some(PrivateKey.random(RSA)) - - peerInfo.peerId = some(PeerID.init(seckey.get())) - let gossipSub = newPubSub(TestGossipSub, peerInfo) + let gossipSub = newPubSub(TestGossipSub, + PeerInfo.init(PrivateKey.random(RSA))) proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = discard @@ -720,14 +696,14 @@ when isMainModule and not defined(release): gossipSub.gossipsub[topic] = initHashSet[string]() for i in 0..<30: let conn = newConnection(newBufferStream(writeHandler)) - let peerId = PeerID.init(PrivateKey.random(RSA)) - conn.peerInfo.peerId = some(peerId) - gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) - gossipSub.peers[peerId.pretty].handler = handler + let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) + conn.peerInfo = some(peerInfo) + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id].handler = handler if i mod 2 == 0: - gossipSub.mesh[topic].incl(peerId.pretty) + gossipSub.mesh[topic].incl(peerInfo.id) else: - gossipSub.gossipsub[topic].incl(peerId.pretty) + gossipSub.gossipsub[topic].incl(peerInfo.id) let peers = gossipSub.getGossipPeers() check peers.len == GossipSubD @@ -738,11 +714,8 @@ when isMainModule and not defined(release): test "`getGossipPeers` - should not crash on missing topics in gossip": proc testRun(): Future[bool] {.async.} = - var peerInfo: PeerInfo - var seckey = some(PrivateKey.random(RSA)) - - peerInfo.peerId = some(PeerID.init(seckey.get())) - let gossipSub = newPubSub(TestGossipSub, peerInfo) + let gossipSub = newPubSub(TestGossipSub, + PeerInfo.init(PrivateKey.random(RSA))) proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = discard @@ -755,14 +728,14 @@ when isMainModule and not defined(release): gossipSub.fanout[topic] = initHashSet[string]() for i in 0..<30: let conn = newConnection(newBufferStream(writeHandler)) - let peerId = PeerID.init(PrivateKey.random(RSA)) - conn.peerInfo.peerId = some(peerId) - gossipSub.peers[peerId.pretty] = newPubSubPeer(conn.peerInfo, GossipSubCodec) - gossipSub.peers[peerId.pretty].handler = handler + let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) + conn.peerInfo = some(peerInfo) + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id].handler = handler if i mod 2 == 0: - gossipSub.mesh[topic].incl(peerId.pretty) + gossipSub.mesh[topic].incl(peerInfo.id) else: - gossipSub.fanout[topic].incl(peerId.pretty) + gossipSub.fanout[topic].incl(peerInfo.id) let peers = gossipSub.getGossipPeers() check peers.len == 0 diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 022d30e..26c1db3 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -102,7 +102,7 @@ method handleConn*(p: PubSub, ## that we're interested in ## - if conn.peerInfo.peerId.isNone: + if conn.peerInfo.isNone: trace "no valid PeerId for peer" await conn.close() return @@ -111,7 +111,7 @@ method handleConn*(p: PubSub, # call floodsub rpc handler await p.rpcHandler(peer, msgs) - let peer = p.getPeer(conn.peerInfo, proto) + let peer = p.getPeer(conn.peerInfo.get(), proto) let topics = toSeq(p.topics.keys) if topics.len > 0: await p.sendSubs(peer, topics, true) @@ -123,8 +123,8 @@ method handleConn*(p: PubSub, method subscribeToPeer*(p: PubSub, conn: Connection) {.base, async, gcsafe.} = - var peer = p.getPeer(conn.peerInfo, p.codec) - trace "setting connection for peer", peerId = conn.peerInfo.id + var peer = p.getPeer(conn.peerInfo.get(), p.codec) + trace "setting connection for peer", peerId = conn.peerInfo.get().id if not peer.isConnected: peer.conn = conn @@ -133,9 +133,8 @@ method subscribeToPeer*(p: PubSub, .addCallback( proc(udata: pointer = nil) {.gcsafe.} = trace "connection closed, cleaning up peer", - peer = conn.peerInfo.id + peer = conn.peerInfo.get().id - # TODO: figureout how to handle properly without dicarding asyncCheck p.cleanUpHelper(peer) ) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 5ea7b77..ef4dfe7 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -62,7 +62,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async, gcsafe.} = await p.handler(p, @[msg]) p.recvdRpcCache.put($hexData.hash) except CatchableError as exc: - error "an exception occured while processing pubsub rpc requests", exc = exc.msg + error "exception occured", exc = exc.msg finally: trace "exiting pubsub peer read loop", peer = p.id @@ -105,7 +105,7 @@ proc sendMsg*(p: PubSubPeer, peerId: PeerID, topic: string, data: seq[byte]): Future[void] {.gcsafe.} = - p.send(@[RPCMsg(messages: @[newMessage(p.peerInfo.peerId.get(), data, topic)])]) + p.send(@[RPCMsg(messages: @[newMessage(p.peerInfo, data, topic)])]) proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async, gcsafe.} = for topic in topics: @@ -124,4 +124,4 @@ proc newPubSubPeer*(peerInfo: PeerInfo, result.peerInfo = peerInfo result.sentRpcCache = newTimedCache[string](2.minutes) result.recvdRpcCache = newTimedCache[string](2.minutes) - result.onConnect = newAsyncEvent() \ No newline at end of file + result.onConnect = newAsyncEvent() diff --git a/libp2p/protocols/pubsub/rpc/message.nim b/libp2p/protocols/pubsub/rpc/message.nim index 55eae6a..dab7cf3 100644 --- a/libp2p/protocols/pubsub/rpc/message.nim +++ b/libp2p/protocols/pubsub/rpc/message.nim @@ -12,33 +12,32 @@ import chronicles import nimcrypto/sysrand import messages, protobuf, ../../../peer, + ../../../peerinfo, ../../../crypto/crypto, ../../../protobuf/minprotobuf logScope: topic = "PubSubMessage" +const PubSubPrefix = "libp2p-pubsub:" + proc msgId*(m: Message): string = m.seqno.toHex() & PeerID.init(m.fromPeer).pretty proc fromPeerId*(m: Message): PeerId = PeerID.init(m.fromPeer) -proc sign*(peerId: PeerID, msg: Message): Message {.gcsafe.} = +proc sign*(p: PeerInfo, msg: Message): Message {.gcsafe.} = var buff = initProtoBuffer() encodeMessage(msg, buff) - # NOTE: leave as is, moving out would imply making this .threadsafe., etc... - let prefix = cast[seq[byte]]("libp2p-pubsub:") + let prefix = cast[seq[byte]](PubSubPrefix) if buff.buffer.len > 0: result = msg - if peerId.privateKey.isSome: - result.signature = peerId. - privateKey. - get(). - sign(prefix & buff.buffer). - getBytes() + result.signature = p.privateKey. + sign(prefix & buff.buffer). + getBytes() -proc verify*(peerId: PeerID, m: Message): bool = +proc verify*(p: PeerInfo, m: Message): bool = if m.signature.len > 0 and m.key.len > 0: var msg = m msg.signature = @[] @@ -52,22 +51,19 @@ proc verify*(peerId: PeerID, m: Message): bool = if remote.init(m.signature) and key.init(m.key): result = remote.verify(buff.buffer, key) -proc newMessage*(peerId: PeerID, +proc newMessage*(p: PeerInfo, data: seq[byte], name: string, sign: bool = true): Message {.gcsafe.} = var seqno: seq[byte] = newSeq[byte](20) - if randomBytes(addr seqno[0], 20) > 0: - var key: seq[byte] = @[] + if p.publicKey.isSome and randomBytes(addr seqno[0], 20) > 0: + var key: seq[byte] = p.publicKey.get().getBytes() - if peerId.publicKey.isSome: - key = peerId.publicKey.get().getBytes() - - result = Message(fromPeer: peerId.getBytes(), + result = Message(fromPeer: p.peerId.getBytes(), data: data, seqno: seqno, topicIDs: @[name]) if sign: - result = sign(peerId, result) + result = p.sign(result) result.key = key diff --git a/libp2p/protocols/pubsub/timedcache.nim b/libp2p/protocols/pubsub/timedcache.nim index a0bb090..c7f78c1 100644 --- a/libp2p/protocols/pubsub/timedcache.nim +++ b/libp2p/protocols/pubsub/timedcache.nim @@ -40,7 +40,6 @@ proc put*[V](t: TimedCache[V], trace "adding entry to timed cache", key = key t.cache[key] = TimedEntry[V](val: val, handler: handler) - # TODO: addTimer with param Duration is missing from chronos, needs to be added addTimer( timeout, proc (arg: pointer = nil) {.gcsafe.} = diff --git a/libp2p/protocols/secure/secio.nim b/libp2p/protocols/secure/secio.nim index 59ff745..b635c82 100644 --- a/libp2p/protocols/secure/secio.nim +++ b/libp2p/protocols/secure/secio.nim @@ -11,6 +11,7 @@ import chronos, chronicles import nimcrypto/[sysrand, hmac, sha2, sha, hash, rijndael, twofish, bcmode] import secure, ../../connection, + ../../peerinfo, ../../stream/lpstream, ../../crypto/crypto, ../../crypto/ecnist, @@ -223,7 +224,7 @@ proc newSecureConnection*(conn: Connection, cipher: string, secrets: Secret, order: int, - peerId: PeerID): SecureConnection = + remotePubKey: PublicKey): SecureConnection = ## Create new secure connection, using specified hash algorithm ``hash``, ## cipher algorithm ``cipher``, stretched keys ``secrets`` and order ## ``order``. @@ -248,7 +249,7 @@ proc newSecureConnection*(conn: Connection, result.readerCoder.init(cipher, secrets.keyOpenArray(i1), secrets.ivOpenArray(i1)) - result.peerInfo.peerId = some(peerId) + result.peerInfo = some(PeerInfo.init(remotePubKey)) proc transactMessage(conn: Connection, msg: seq[byte]): Future[seq[byte]] {.async.} = @@ -396,7 +397,7 @@ proc handshake*(s: Secio, conn: Connection): Future[SecureConnection] {.async.} # Perform Nonce exchange over encrypted channel. - result = newSecureConnection(conn, hash, cipher, keys, order, remotePeerId) + result = newSecureConnection(conn, hash, cipher, keys, order, remotePubkey) await result.writeMessage(remoteNonce) var res = await result.readMessage() @@ -438,7 +439,7 @@ proc handleConn(s: Secio, conn: Connection): Future[Connection] {.async, gcsafe. if not sconn.closed: asyncCheck sconn.close() ) - secured.peerInfo.peerId = sconn.peerInfo.peerId + secured.peerInfo = some(PeerInfo.init(sconn.peerInfo.get().publicKey.get())) result = secured method init(s: Secio) {.gcsafe.} = diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 3e33664..d575ad9 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -65,13 +65,15 @@ proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = proc identify(s: Switch, conn: Connection): Future[PeerInfo] {.async, gcsafe.} = ## identify the connection - result = conn.peerInfo + if conn.peerInfo.isSome: + result = conn.peerInfo.get() + try: if (await s.ms.select(conn, s.identity.codec)): let info = await s.identity.identify(conn, conn.peerInfo) if info.pubKey.isSome: - result.peerId = some(PeerID.init(info.pubKey.get())) # we might not have a peerId at all + result = PeerInfo.init(info.pubKey.get()) trace "identify: identified remote peer", peer = result.id if info.addrs.len > 0: @@ -111,37 +113,33 @@ proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} = handlerFut.addCallback( proc(udata: pointer = nil) {.gcsafe.} = trace "muxer handler completed for peer", - peer = conn.peerInfo.id + peer = conn.peerInfo.get().id ) # do identify first, so that we have a # PeerInfo in case we didn't before - conn.peerInfo = await s.identify(stream) + conn.peerInfo = some((await s.identify(stream))) await stream.close() # close identify stream trace "connection's peerInfo", peerInfo = conn.peerInfo # store it in muxed connections if we have a peer for it - # TODO: We should make sure that this are cleaned up properly - # on exit even if there is no peer for it. This shouldn't - # happen once secio is in place, but still something to keep - # in mind - if conn.peerInfo.peerId.isSome: - trace "adding muxer for peer", peer = conn.peerInfo.id - s.muxed[conn.peerInfo.id] = muxer + if conn.peerInfo.isSome: + trace "adding muxer for peer", peer = conn.peerInfo.get().id + s.muxed[conn.peerInfo.get().id] = muxer proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} = - if conn.peerInfo.peerId.isSome: - let id = conn.peerInfo.id - trace "cleaning up connection for peer", peerId = id - if id in s.muxed: - await s.muxed[id].close() - s.muxed.del(id) + # if conn.peerInfo.peerId.isSome: + let id = conn.peerInfo.get().id + trace "cleaning up connection for peer", peerId = id + if id in s.muxed: + await s.muxed[id].close() + s.muxed.del(id) - if id in s.connections: - await s.connections[id].close() - s.connections.del(id) + if id in s.connections: + await s.connections[id].close() + s.connections.del(id) proc getMuxedStream(s: Switch, peerInfo: PeerInfo): Future[Option[Connection]] {.async, gcsafe.} = # if there is a muxer for the connection @@ -157,13 +155,12 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g result = conn # don't mux/secure twise - if conn.peerInfo.peerId.isSome and - conn.peerInfo.id in s.muxed: + if conn.peerInfo.get().id in s.muxed: return result = await s.secure(result) # secure the connection await s.mux(result) # mux it if possible - s.connections[conn.peerInfo.id] = result + s.connections[conn.peerInfo.get().id] = result proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = trace "upgrading incoming connection" @@ -205,7 +202,7 @@ proc dial*(s: Switch, trace "dialing address", address = $a result = await t.dial(a) # make sure to assign the peer to the connection - result.peerInfo = peer + result.peerInfo = some(peer) result = await s.upgradeOutgoing(result) result.closeEvent.wait().addCallback( proc(udata: pointer) = @@ -325,7 +322,7 @@ proc newSwitch*(peerInfo: PeerInfo, val.muxerHandler = proc(muxer: Muxer) {.async, gcsafe.} = trace "got new muxer" let stream = await muxer.newStream() - muxer.connection.peerInfo = await s.identify(stream) + muxer.connection.peerInfo = some((await s.identify(stream))) await stream.close() for k in secureManagers.keys: diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index d9d146b..aee9130 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -19,10 +19,7 @@ import utils, ../../libp2p/[switch, protocols/pubsub/gossipsub] proc createGossipSub(): GossipSub = - var peerInfo: PeerInfo - var seckey = some(PrivateKey.random(RSA)) - - peerInfo.peerId = some(PeerID.init(seckey.get())) + var peerInfo = PeerInfo.init(PrivateKey.random(RSA)) result = newPubSub(GossipSub, peerInfo) suite "GossipSub": @@ -36,11 +33,11 @@ suite "GossipSub": var buf1 = newBufferStream() var conn1 = newConnection(buf1) - conn1.peerInfo = gossip1.peerInfo + conn1.peerInfo = some(gossip1.peerInfo) var buf2 = newBufferStream() var conn2 = newConnection(buf2) - conn2.peerInfo = gossip2.peerInfo + conn2.peerInfo = some(gossip2.peerInfo) buf1 = buf1 | buf2 | buf1 @@ -52,7 +49,7 @@ suite "GossipSub": check: "foobar" in gossip2.gossipsub - gossip1.peerInfo.peerId.get().pretty in gossip2.gossipsub["foobar"] + gossip1.peerInfo.id in gossip2.gossipsub["foobar"] result = true @@ -83,7 +80,7 @@ suite "GossipSub": check: "foobar" in gossip2.topics "foobar" in gossip1.gossipsub - gossip2.peerInfo.peerId.get().pretty in gossip1.gossipsub["foobar"] + gossip2.peerInfo.id in gossip1.gossipsub["foobar"] await allFutures(nodes.mapIt(it.stop())) await allFutures(awaitters) @@ -103,11 +100,11 @@ suite "GossipSub": var buf1 = newBufferStream() var conn1 = newConnection(buf1) - conn1.peerInfo = gossip1.peerInfo + conn1.peerInfo = some(gossip1.peerInfo) var buf2 = newBufferStream() var conn2 = newConnection(buf2) - conn2.peerInfo = gossip2.peerInfo + conn2.peerInfo = some(gossip2.peerInfo) buf1 = buf1 | buf2 | buf1 @@ -131,8 +128,8 @@ suite "GossipSub": # TODO: in a real setting, we would be checking for the peerId from # gossip1 in gossip2 and vice versa, but since we're doing some mockery # with connection piping and such, this is fine - do not change! - gossip1.peerInfo.peerId.get().pretty in gossip1.gossipsub["foobar"] - gossip2.peerInfo.peerId.get().pretty in gossip2.gossipsub["foobar"] + gossip1.peerInfo.id in gossip1.gossipsub["foobar"] + gossip2.peerInfo.id in gossip2.gossipsub["foobar"] result = true @@ -170,8 +167,8 @@ suite "GossipSub": "foobar" in gossip1.gossipsub "foobar" in gossip2.gossipsub - gossip1.peerInfo.peerId.get().pretty in gossip2.gossipsub["foobar"] - gossip2.peerInfo.peerId.get().pretty in gossip1.gossipsub["foobar"] + gossip1.peerInfo.id in gossip2.gossipsub["foobar"] + gossip2.peerInfo.id in gossip1.gossipsub["foobar"] await allFutures(nodes.mapIt(it.stop())) await allFutures(awaitters) @@ -396,9 +393,9 @@ suite "GossipSub": closureScope: var dialerNode = dialer handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} = - if dialerNode.peerInfo.peerId.get().pretty notin seen: - seen[dialerNode.peerInfo.peerId.get().pretty] = 0 - seen[dialerNode.peerInfo.peerId.get().pretty].inc + if dialerNode.peerInfo.id notin seen: + seen[dialerNode.peerInfo.id] = 0 + seen[dialerNode.peerInfo.id].inc check topic == "foobar" await dialer.subscribe("foobar", handler) @@ -409,7 +406,7 @@ suite "GossipSub": await nodes[0].publish("foobar", cast[seq[byte]]("from node " & - nodes[1].peerInfo.peerId.get().pretty)) + nodes[1].peerInfo.id)) await sleepAsync(1000.millis) await allFutures(nodes.mapIt(it.stop())) diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index 951e07b..3b7ef9f 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -25,14 +25,11 @@ proc createNode*(privKey: Option[PrivateKey] = none(PrivateKey), address: string = "/ip4/127.0.0.1/tcp/0", triggerSelf: bool = false, gossip: bool = false): Switch = - var peerInfo: PeerInfo var seckey = privKey if privKey.isNone: seckey = some(PrivateKey.random(RSA)) - peerInfo.peerId = some(PeerID.init(seckey.get())) - peerInfo.addrs.add(Multiaddress.init(address)) - + var peerInfo = PeerInfo.init(seckey.get(), @[Multiaddress.init(address)]) let mplexProvider = newMuxerProvider(createMplex, MplexCodec) let transports = @[Transport(newTransport(TcpTransport))] let muxers = [(MplexCodec, mplexProvider)].toTable() diff --git a/tests/testidentify.nim b/tests/testidentify.nim index 9c5c085..f2344e1 100644 --- a/tests/testidentify.nim +++ b/tests/testidentify.nim @@ -17,15 +17,12 @@ suite "Identify": test "handle identify message": proc testHandle(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") - - let remoteSeckey = PrivateKey.random(RSA) - var remotePeerInfo: PeerInfo + let remoteSecKey = PrivateKey.random(RSA) + let remotePeerInfo = PeerInfo.init(remoteSecKey, + @[ma], + @["/test/proto1/1.0.0", + "/test/proto2/1.0.0"]) var serverFut: Future[void] - remotePeerInfo.peerId = some(PeerID.init(remoteSeckey)) - remotePeerInfo.addrs.add(ma) - remotePeerInfo.protocols.add("/test/proto1/1.0.0") - remotePeerInfo.protocols.add("/test/proto2/1.0.0") - let identifyProto1 = newIdentify(remotePeerInfo) let msListen = newMultistream() @@ -40,23 +37,18 @@ suite "Identify": let transport2: TcpTransport = newTransport(TcpTransport) let conn = await transport2.dial(transport1.ma) - let seckey = PrivateKey.random(RSA) - var peerInfo: PeerInfo - peerInfo.peerId = some(PeerID.init(seckey)) - peerInfo.addrs.add(ma) - + var peerInfo = PeerInfo.init(PrivateKey.random(RSA), @[ma]) let identifyProto2 = newIdentify(peerInfo) - let res = await msDial.select(conn, IdentifyCodec) - let id = await identifyProto2.identify(conn, remotePeerInfo) + discard await msDial.select(conn, IdentifyCodec) + let id = await identifyProto2.identify(conn, some(remotePeerInfo)) - check id.pubKey.get() == remoteSeckey.getKey() + check id.pubKey.get() == remoteSecKey.getKey() check id.addrs[0] == ma check id.protoVersion.get() == ProtoVersion # check id.agentVersion.get() == AgentVersion check id.protos == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"] await conn.close() - await transport1.close() await serverFut result = true @@ -67,12 +59,7 @@ suite "Identify": test "handle failed identify": proc testHandleError() {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") - - let remoteSeckey = PrivateKey.random(RSA) - var remotePeerInfo: PeerInfo - remotePeerInfo.peerId = some(PeerID.init(remoteSeckey)) - remotePeerInfo.addrs.add(ma) - + var remotePeerInfo = PeerInfo.init(PrivateKey.random(RSA), @[ma]) let identifyProto1 = newIdentify(remotePeerInfo) let msListen = newMultistream() @@ -87,19 +74,10 @@ suite "Identify": let transport2: TcpTransport = newTransport(TcpTransport) let conn = await transport2.dial(transport1.ma) - let seckey = PrivateKey.random(RSA) - var localPeerInfo: PeerInfo - localPeerInfo.peerId = some(PeerID.init(seckey)) - localPeerInfo.addrs.add(ma) - + var localPeerInfo = PeerInfo.init(PrivateKey.random(RSA), @[ma]) let identifyProto2 = newIdentify(localPeerInfo) - let res = await msDial.select(conn, IdentifyCodec) - - let wrongSec = PrivateKey.random(RSA) - var wrongRemotePeer: PeerInfo - wrongRemotePeer.peerId = some(PeerID.init(wrongSec)) - - let id = await identifyProto2.identify(conn, wrongRemotePeer) + discard await msDial.select(conn, IdentifyCodec) + discard await identifyProto2.identify(conn, some(PeerInfo.init(PrivateKey.random(RSA)))) await conn.close() expect IdentityNoMatchError: diff --git a/tests/testinterop.nim b/tests/testinterop.nim index baa3257..a1cd7dc 100644 --- a/tests/testinterop.nim +++ b/tests/testinterop.nim @@ -66,14 +66,11 @@ proc createNode*(privKey: Option[PrivateKey] = none(PrivateKey), address: string = "/ip4/127.0.0.1/tcp/0", triggerSelf: bool = false, gossip: bool = false): Switch = - var peerInfo: NativePeerInfo var seckey = privKey if privKey.isNone: seckey = some(PrivateKey.random(RSA)) - peerInfo.peerId = some(PeerID.init(seckey.get())) - peerInfo.addrs.add(Multiaddress.init(address)) - + var peerInfo = NativePeerInfo.init(seckey.get(), @[Multiaddress.init(address)]) proc createMplex(conn: Connection): Muxer = newMplex(conn) let mplexProvider = newMuxerProvider(createMplex, MplexCodec) let transports = @[Transport(newTransport(TcpTransport))] @@ -115,11 +112,10 @@ proc testPubSubDaemonPublish(gossip: bool = false): Future[bool] {.async.} = check smsg == pubsubData handlerFuture.complete(true) - await nativeNode.subscribeToPeer(NativePeerInfo(peerId: some(daemonPeer.peer), - addrs: daemonPeer.addresses)) - + await nativeNode.subscribeToPeer(NativePeerInfo.init(daemonPeer.peer, + daemonPeer.addresses)) await sleepAsync(1.seconds) - await daemonNode.connect(nativePeer.peerId.get(), nativePeer.addrs) + await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) proc pubsubHandler(api: DaemonAPI, ticket: PubsubTicket, @@ -152,11 +148,11 @@ proc testPubSubNodePublish(gossip: bool = false): Future[bool] {.async.} = let nativePeer = nativeNode.peerInfo var handlerFuture = newFuture[bool]() - await nativeNode.subscribeToPeer(NativePeerInfo(peerId: some(daemonPeer.peer), - addrs: daemonPeer.addresses)) + await nativeNode.subscribeToPeer(NativePeerInfo.init(daemonPeer.peer, + daemonPeer.addresses)) await sleepAsync(1.seconds) - await daemonNode.connect(nativePeer.peerId.get(), nativePeer.addrs) + await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) proc pubsubHandler(api: DaemonAPI, ticket: PubsubTicket, @@ -196,9 +192,9 @@ suite "Interop": testFuture.complete() await daemonNode.addHandler(protos, daemonHandler) - let conn = await nativeNode.dial(NativePeerInfo(peerId: some(daemonPeer.peer), - addrs: daemonPeer.addresses), - protos[0]) + let conn = await nativeNode.dial(NativePeerInfo.init(daemonPeer.peer, + daemonPeer.addresses), + protos[0]) await conn.writeLp("test 1") check "test 2" == cast[string]((await conn.readLp())) await sleepAsync(10.millis) @@ -233,9 +229,9 @@ suite "Interop": testFuture.complete(line) await daemonNode.addHandler(protos, daemonHandler) - let conn = await nativeNode.dial(NativePeerInfo(peerId: some(daemonPeer.peer), - addrs: daemonPeer.addresses), - protos[0]) + let conn = await nativeNode.dial(NativePeerInfo.init(daemonPeer.peer, + daemonPeer.addresses), + protos[0]) await conn.writeLp(test & "\r\n") result = test == (await wait(testFuture, 10.secs)) await nativeNode.stop() @@ -269,8 +265,8 @@ suite "Interop": let nativePeer = nativeNode.peerInfo let daemonNode = await newDaemonApi() - await daemonNode.connect(nativePeer.peerId.get(), nativePeer.addrs) - var stream = await daemonNode.openStream(nativePeer.peerId.get(), protos) + await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) + var stream = await daemonNode.openStream(nativePeer.peerId, protos) discard await stream.transp.writeLp(test) result = test == (await wait(testFuture, 10.secs)) @@ -308,8 +304,8 @@ suite "Interop": let nativePeer = nativeNode.peerInfo let daemonNode = await newDaemonApi() - await daemonNode.connect(nativePeer.peerId.get(), nativePeer.addrs) - var stream = await daemonNode.openStream(nativePeer.peerId.get(), protos) + await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) + var stream = await daemonNode.openStream(nativePeer.peerId, protos) asyncDiscard stream.transp.writeLp("test 1") check "test 2" == cast[string](await stream.transp.readLp()) @@ -356,8 +352,8 @@ suite "Interop": let nativePeer = nativeNode.peerInfo let daemonNode = await newDaemonApi() - await daemonNode.connect(nativePeer.peerId.get(), nativePeer.addrs) - var stream = await daemonNode.openStream(nativePeer.peerId.get(), protos) + await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) + var stream = await daemonNode.openStream(nativePeer.peerId, protos) while count < 10: discard await stream.transp.writeLp(test) diff --git a/tests/testmultistream.nim b/tests/testmultistream.nim index 1c59466..77bdc96 100644 --- a/tests/testmultistream.nim +++ b/tests/testmultistream.nim @@ -168,9 +168,6 @@ suite "Multistream select": let ms = newMultistream() let conn = newConnection(newTestSelectStream()) - let seckey = PrivateKey.random(RSA) - var peerInfo: PeerInfo - peerInfo.peerId = some(PeerID.init(seckey)) var protocol: LPProtocol = new LPProtocol proc testHandler(conn: Connection, proto: string): @@ -197,13 +194,9 @@ suite "Multistream select": check strProto == "\x26/test/proto1/1.0.0\n/test/proto2/1.0.0\n" await conn.close() - let seckey = PrivateKey.random(RSA) - var peerInfo: PeerInfo - peerInfo.peerId = some(PeerID.init(seckey)) + proc testHandler(conn: Connection, proto: string): Future[void] + {.async, gcsafe.} = discard var protocol: LPProtocol = new LPProtocol - proc testHandler(conn: Connection, - proto: string): - Future[void] {.async, gcsafe.} = discard protocol.handler = testHandler ms.addHandler("/test/proto1/1.0.0", protocol) ms.addHandler("/test/proto2/1.0.0", protocol) @@ -224,9 +217,6 @@ suite "Multistream select": check cast[string](msg) == Na await conn.close() - let seckey = PrivateKey.random(RSA) - var peerInfo: PeerInfo - peerInfo.peerId = some(PeerID.init(seckey)) var protocol: LPProtocol = new LPProtocol proc testHandler(conn: Connection, proto: string): @@ -244,9 +234,6 @@ suite "Multistream select": proc endToEnd(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") - let seckey = PrivateKey.random(RSA) - var peerInfo: PeerInfo - peerInfo.peerId = some(PeerID.init(seckey)) var protocol: LPProtocol = new LPProtocol proc testHandler(conn: Connection, proto: string): @@ -283,9 +270,6 @@ suite "Multistream select": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let msListen = newMultistream() - let seckey = PrivateKey.random(RSA) - var peerInfo: PeerInfo - peerInfo.peerId = some(PeerID.init(seckey)) var protocol: LPProtocol = new LPProtocol protocol.handler = proc(conn: Connection, proto: string) {.async, gcsafe.} = await conn.close() @@ -317,9 +301,6 @@ suite "Multistream select": proc endToEnd(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") - let seckey = PrivateKey.random(RSA) - var peerInfo: PeerInfo - peerInfo.peerId = some(PeerID.init(seckey)) var protocol: LPProtocol = new LPProtocol proc testHandler(conn: Connection, proto: string): @@ -356,9 +337,6 @@ suite "Multistream select": proc endToEnd(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") - let seckey = PrivateKey.random(RSA) - var peerInfo: PeerInfo - peerInfo.peerId = some(PeerID.init(seckey)) var protocol: LPProtocol = new LPProtocol proc testHandler(conn: Connection, proto: string): diff --git a/tests/testnative.nim b/tests/testnative.nim index 552f155..503774d 100644 --- a/tests/testnative.nim +++ b/tests/testnative.nim @@ -7,6 +7,7 @@ import testtransport, testbufferstream, testidentify, testswitch, + testpeerinfo, pubsub/testpubsub, # TODO: placing this before pubsub tests, # breaks some flood and gossip tests - no idea why diff --git a/tests/testpeerinfo.nim b/tests/testpeerinfo.nim new file mode 100644 index 0000000..bf48139 --- /dev/null +++ b/tests/testpeerinfo.nim @@ -0,0 +1,53 @@ + +import unittest, options +import ../libp2p/crypto/crypto, + ../libp2p/peerinfo, + ../libp2p/peer + +suite "PeerInfo": + test "Should init with private key": + let seckey = PrivateKey.random(RSA) + var peerInfo = PeerInfo.init(seckey) + var peerId = PeerID.init(seckey) + + check peerId == peerInfo.peerId + check seckey == peerInfo.privateKey + check seckey.getKey == peerInfo.publicKey.get() + + test "Should init with public key": + let seckey = PrivateKey.random(RSA) + var peerInfo = PeerInfo.init(seckey.getKey()) + var peerId = PeerID.init(seckey.getKey()) + + check peerId == peerInfo.peerId + check seckey.getKey == peerInfo.publicKey.get() + + test "Should init from PeerId with public key": + let seckey = PrivateKey.random(Ed25519) + var peerInfo = PeerInfo.init(PeerID.init(seckey.getKey())) + var peerId = PeerID.init(seckey.getKey()) + + check peerId == peerInfo.peerId + check seckey.getKey == peerInfo.publicKey.get() + + test "Should return none on missing public key": + let peerInfo = PeerInfo.init(PeerID.init(PrivateKey.random(RSA))) + check peerInfo.publicKey.isNone + + test "Should allow assigning public key": + let key = PrivateKey.random(RSA) + + let peerInfo = PeerInfo.init(PeerID.init(key)) + peerInfo.publicKey = key.getKey() + check peerInfo.publicKey.get() == key.getKey() + + test "Should throw on invalid public key assignement": + proc throwsOnInvalidPubKey() = + let validKey = PrivateKey.random(RSA) + let invalidKey = PrivateKey.random(RSA) + + let peerInfo = PeerInfo.init(PeerID.init(validKey)) + peerInfo.publicKey = invalidKey.getKey() + + expect InvalidPublicKeyException: + throwsOnInvalidPubKey() diff --git a/tests/testswitch.nim b/tests/testswitch.nim index 0f3d0b1..156a2f5 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -38,8 +38,7 @@ suite "Switch": test "e2e use switch": proc createSwitch(ma: MultiAddress): (Switch, PeerInfo) {.gcsafe.}= let seckey = PrivateKey.random(RSA) - var peerInfo: PeerInfo - peerInfo.peerId = some(PeerID.init(seckey)) + var peerInfo: PeerInfo = PeerInfo.init(PrivateKey.random(RSA)) peerInfo.addrs.add(ma) let identify = newIdentify(peerInfo) @@ -49,7 +48,7 @@ suite "Switch": let mplexProvider = newMuxerProvider(createMplex, MplexCodec) let transports = @[Transport(newTransport(TcpTransport))] let muxers = [(MplexCodec, mplexProvider)].toTable() - let secureManagers = [(SecioCodec, Secure(newSecio(seckey)))].toTable() + let secureManagers = [(SecioCodec, Secure(newSecio(peerInfo.privateKey)))].toTable() let switch = newSwitch(peerInfo, transports, identify,