Add peer lifetime feature for PeerInfo. (#77)

* Add peer lifetime feature for PeerInfo.
Refactor peerinfo to use openarrays instead of sequences.
Fix tests and examples to use arrays instead of sequences.

* Add access to lifetime Future[T] itself.
This commit is contained in:
Eugene Kabanov 2020-02-11 19:53:39 +02:00 committed by GitHub
parent 23712ecf3b
commit 540e79a430
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 80 additions and 44 deletions

View File

@ -68,7 +68,7 @@ proc dialPeer(p: ChatProto, address: string) {.async, gcsafe.} =
quit("invalid or incompelete peerId") quit("invalid or incompelete peerId")
var remotePeer = PeerInfo.init(parts[^1], var remotePeer = PeerInfo.init(parts[^1],
@[MultiAddress.init(address)]) [MultiAddress.init(address)])
echo &"dialing peer: {address}" echo &"dialing peer: {address}"
p.conn = await p.switch.dial(remotePeer, ChatCodec) p.conn = await p.switch.dial(remotePeer, ChatCodec)

View File

@ -8,6 +8,7 @@
## those terms. ## those terms.
import options import options
import chronos
import peer, multiaddress, crypto/crypto import peer, multiaddress, crypto/crypto
## A peer can be constructed in one of tree ways: ## A peer can be constructed in one of tree ways:
@ -27,53 +28,70 @@ type
peerId*: PeerID peerId*: PeerID
addrs*: seq[MultiAddress] addrs*: seq[MultiAddress]
protocols*: seq[string] protocols*: seq[string]
lifefut: Future[void]
case keyType*: KeyType: case keyType*: KeyType:
of HasPrivate: of HasPrivate:
privateKey*: PrivateKey privateKey*: PrivateKey
of HasPublic: of HasPublic:
key: Option[PublicKey] key: Option[PublicKey]
proc init*(p: typedesc[PeerInfo], template postInit(peerinfo: PeerInfo,
key: PrivateKey, addrs: openarray[MultiAddress],
addrs: seq[MultiAddress] = @[], protocols: openarray[string]) =
protocols: seq[string] = @[]): PeerInfo {.inline.} = if len(addrs) > 0:
peerinfo.addrs = @addrs
if len(protocols) > 0:
peerinfo.protocols = @protocols
peerinfo.lifefut = newFuture[void]("libp2p.peerinfo.lifetime")
result = PeerInfo(keyType: HasPrivate, proc init*(p: typedesc[PeerInfo], key: PrivateKey,
peerId: PeerID.init(key), addrs: openarray[MultiAddress] = [],
privateKey: key, protocols: openarray[string] = []): PeerInfo {.inline.} =
addrs: addrs, result = PeerInfo(keyType: HasPrivate, peerId: PeerID.init(key),
protocols: protocols) privateKey: key)
result.postInit(addrs, protocols)
proc init*(p: typedesc[PeerInfo], proc init*(p: typedesc[PeerInfo], peerId: PeerID,
peerId: PeerID, addrs: openarray[MultiAddress] = [],
addrs: seq[MultiAddress] = @[], protocols: openarray[string] = []): PeerInfo {.inline.} =
protocols: seq[string] = @[]): PeerInfo {.inline.} = result = PeerInfo(keyType: HasPublic, peerId: peerId)
result.postInit(addrs, protocols)
PeerInfo(keyType: HasPublic, proc init*(p: typedesc[PeerInfo], peerId: string,
peerId: peerId, addrs: openarray[MultiAddress] = [],
addrs: addrs, protocols: openarray[string] = []): PeerInfo {.inline.} =
protocols: protocols) result = PeerInfo(keyType: HasPublic, peerId: PeerID.init(peerId))
result.postInit(addrs, protocols)
proc init*(p: typedesc[PeerInfo], proc init*(p: typedesc[PeerInfo], key: PublicKey,
peerId: string, addrs: openarray[MultiAddress] = [],
addrs: seq[MultiAddress] = @[], protocols: openarray[string] = []): PeerInfo {.inline.} =
protocols: seq[string] = @[]): PeerInfo {.inline.} = result = PeerInfo(keyType: HasPublic, peerId: PeerID.init(key),
key: some(key))
result.postInit(addrs, protocols)
PeerInfo(keyType: HasPublic, proc close*(p: PeerInfo) {.inline.} =
peerId: PeerID.init(peerId), p.lifefut.complete()
addrs: addrs,
protocols: protocols)
proc init*(p: typedesc[PeerInfo], proc join*(p: PeerInfo): Future[void] {.inline.} =
key: PublicKey, var retFuture = newFuture[void]()
addrs: seq[MultiAddress] = @[], proc continuation(udata: pointer) {.gcsafe.} =
protocols: seq[string] = @[]): PeerInfo {.inline.} = if not(retFuture.finished()):
retFuture.complete()
proc cancellation(udata: pointer) {.gcsafe.} =
p.lifefut.removeCallback(continuation)
if p.lifefut.finished:
retFuture.complete()
else:
p.lifefut.addCallback(continuation)
retFuture.cancelCallback = cancellation
return retFuture
PeerInfo(keyType: HasPublic, proc isClosed*(p: PeerInfo): bool {.inline.} =
peerId: PeerID.init(key), result = p.lifefut.finished()
key: some(key),
addrs: addrs, proc lifeFuture*(p: PeerInfo): Future[void] {.inline.} =
protocols: protocols) result = p.lifefut
proc publicKey*(p: PeerInfo): Option[PublicKey] {.inline.} = proc publicKey*(p: PeerInfo): Option[PublicKey] {.inline.} =
if p.keyType == HasPublic: if p.keyType == HasPublic:
@ -87,7 +105,7 @@ proc publicKey*(p: PeerInfo): Option[PublicKey] {.inline.} =
result = some(p.privateKey.getKey()) result = some(p.privateKey.getKey())
proc id*(p: PeerInfo): string {.inline.} = proc id*(p: PeerInfo): string {.inline.} =
p.peerId.pretty result = p.peerId.pretty()
proc `$`*(p: PeerInfo): string = proc `$`*(p: PeerInfo): string =
result.add("PeerID: ") result.add("PeerID: ")

View File

@ -18,7 +18,7 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
let let
seckey = privKey.get(otherwise = PrivateKey.random(ECDSA)) seckey = privKey.get(otherwise = PrivateKey.random(ECDSA))
peerInfo = PeerInfo.init(seckey, @[address]) peerInfo = PeerInfo.init(seckey, [address])
mplexProvider = newMuxerProvider(createMplex, MplexCodec) mplexProvider = newMuxerProvider(createMplex, MplexCodec)
transports = @[Transport(newTransport(TcpTransport))] transports = @[Transport(newTransport(TcpTransport))]
muxers = {MplexCodec: mplexProvider}.toTable muxers = {MplexCodec: mplexProvider}.toTable

View File

@ -144,6 +144,10 @@ proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} =
await s.connections[id].close() await s.connections[id].close()
s.connections.del(id) s.connections.del(id)
# TODO: Investigate cleanupConn() always called twice for one peer.
if not(conn.peerInfo.isClosed()):
conn.peerInfo.close()
proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} = proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} =
let conn = s.connections.getOrDefault(peer.id) let conn = s.connections.getOrDefault(peer.id)
if not isNil(conn): if not isNil(conn):

View File

@ -19,9 +19,9 @@ suite "Identify":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
let remoteSecKey = PrivateKey.random(RSA) let remoteSecKey = PrivateKey.random(RSA)
let remotePeerInfo = PeerInfo.init(remoteSecKey, let remotePeerInfo = PeerInfo.init(remoteSecKey,
@[ma], [ma],
@["/test/proto1/1.0.0", ["/test/proto1/1.0.0",
"/test/proto2/1.0.0"]) "/test/proto2/1.0.0"])
var serverFut: Future[void] var serverFut: Future[void]
let identifyProto1 = newIdentify(remotePeerInfo) let identifyProto1 = newIdentify(remotePeerInfo)
let msListen = newMultistream() let msListen = newMultistream()
@ -37,7 +37,7 @@ suite "Identify":
let transport2: TcpTransport = newTransport(TcpTransport) let transport2: TcpTransport = newTransport(TcpTransport)
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
var peerInfo = PeerInfo.init(PrivateKey.random(RSA), @[ma]) var peerInfo = PeerInfo.init(PrivateKey.random(RSA), [ma])
let identifyProto2 = newIdentify(peerInfo) let identifyProto2 = newIdentify(peerInfo)
discard await msDial.select(conn, IdentifyCodec) discard await msDial.select(conn, IdentifyCodec)
let id = await identifyProto2.identify(conn, remotePeerInfo) let id = await identifyProto2.identify(conn, remotePeerInfo)
@ -59,7 +59,7 @@ suite "Identify":
test "handle failed identify": test "handle failed identify":
proc testHandleError() {.async.} = proc testHandleError() {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
var remotePeerInfo = PeerInfo.init(PrivateKey.random(RSA), @[ma]) var remotePeerInfo = PeerInfo.init(PrivateKey.random(RSA), [ma])
let identifyProto1 = newIdentify(remotePeerInfo) let identifyProto1 = newIdentify(remotePeerInfo)
let msListen = newMultistream() let msListen = newMultistream()
@ -74,7 +74,7 @@ suite "Identify":
let transport2: TcpTransport = newTransport(TcpTransport) let transport2: TcpTransport = newTransport(TcpTransport)
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
var localPeerInfo = PeerInfo.init(PrivateKey.random(RSA), @[ma]) var localPeerInfo = PeerInfo.init(PrivateKey.random(RSA), [ma])
let identifyProto2 = newIdentify(localPeerInfo) let identifyProto2 = newIdentify(localPeerInfo)
discard await msDial.select(conn, IdentifyCodec) discard await msDial.select(conn, IdentifyCodec)
discard await identifyProto2.identify(conn, PeerInfo.init(PrivateKey.random(RSA))) discard await identifyProto2.identify(conn, PeerInfo.init(PrivateKey.random(RSA)))

View File

@ -72,7 +72,7 @@ proc createNode*(privKey: Option[PrivateKey] = none(PrivateKey),
if privKey.isNone: if privKey.isNone:
seckey = some(PrivateKey.random(RSA)) seckey = some(PrivateKey.random(RSA))
var peerInfo = NativePeerInfo.init(seckey.get(), @[Multiaddress.init(address)]) var peerInfo = NativePeerInfo.init(seckey.get(), [Multiaddress.init(address)])
proc createMplex(conn: Connection): Muxer = newMplex(conn) proc createMplex(conn: Connection): Muxer = newMplex(conn)
let mplexProvider = newMuxerProvider(createMplex, MplexCodec) let mplexProvider = newMuxerProvider(createMplex, MplexCodec)
let transports = @[Transport(newTransport(TcpTransport))] let transports = @[Transport(newTransport(TcpTransport))]

View File

@ -1,5 +1,6 @@
import unittest, options import unittest, options
import chronos
import ../libp2p/crypto/crypto, import ../libp2p/crypto/crypto,
../libp2p/peerinfo, ../libp2p/peerinfo,
../libp2p/peer ../libp2p/peer
@ -51,3 +52,16 @@ suite "PeerInfo":
test "Should return some if pubkey is present in id": test "Should return some if pubkey is present in id":
let peerInfo = PeerInfo.init(PeerID.init(PrivateKey.random(Ed25519))) let peerInfo = PeerInfo.init(PeerID.init(PrivateKey.random(Ed25519)))
check peerInfo.publicKey.isSome check peerInfo.publicKey.isSome
test "join() and isClosed() test":
proc testJoin(): Future[bool] {.async, gcsafe.} =
let peerInfo = PeerInfo.init(PeerID.init(PrivateKey.random(Ed25519)))
check peerInfo.isClosed() == false
var joinFut = peerInfo.join()
check joinFut.finished() == false
peerInfo.close()
await wait(joinFut, 100.milliseconds)
check peerInfo.isClosed() == true
check (joinFut.finished() == true) and (joinFut.cancelled() == false)
result = true
check waitFor(testJoin()) == true