* fix: don't allow replacing pubkey

* fix: several small improvements

* removing pubkey setter

* improve error handling

* remove the use of Option[T] if not needed

* don't use optional

* fix-ci: temporarily pin p2pd to a working tag

* fix example to comply with latest changes

* bumping p2pd again to a higher version
This commit is contained in:
Dmitriy Ryajov 2019-12-10 14:50:35 -06:00 committed by GitHub
parent 77e866d29a
commit 293a219dbe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 393 additions and 389 deletions

View File

@ -31,7 +31,7 @@ install:
# install and build go-libp2p-daemon # install and build go-libp2p-daemon
- curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_p2pd.sh - curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_p2pd.sh
- bash build_p2pd.sh p2pdCache HEAD - bash build_p2pd.sh p2pdCache v0.2.1
script: script:
- nimble install -y --depsOnly - nimble install -y --depsOnly

View File

@ -73,21 +73,21 @@ This stack reflects the minimal requirements for the upcoming Eth2 implementatio
To run it, add nim-libp2p to your project's nimble file and spawn a node as follows: To run it, add nim-libp2p to your project's nimble file and spawn a node as follows:
```nim ```nim
import tables, options import tables
import chronos, chronicles import chronos
import ../libp2p/[switch, import ../libp2p/[switch,
multistream, multistream,
protocols/identify, protocols/identify,
connection, connection,
transports/transport, transports/transport,
transports/tcptransport, transports/tcptransport,
multiaddress, multiaddress,
peerinfo, peerinfo,
crypto/crypto, crypto/crypto,
peer, peer,
protocols/protocol, protocols/protocol,
muxers/muxer, muxers/muxer,
muxers/mplex/mplex, muxers/mplex/mplex,
muxers/mplex/types, muxers/mplex/types,
protocols/secure/secio, protocols/secure/secio,
protocols/secure/secure] protocols/secure/secure]
@ -99,9 +99,8 @@ type
method init(p: TestProto) {.gcsafe.} = method init(p: TestProto) {.gcsafe.} =
# handle incoming connections in closure # handle incoming connections in closure
proc handle(conn: Connection, proto: string) {.async, gcsafe.} = proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
let msg = cast[string](await conn.readLp()) echo "Got from remote - ", cast[string](await conn.readLp())
echo "Got from remote - ", cast[string](msg)
await conn.writeLp("Hello!") await conn.writeLp("Hello!")
await conn.close() await conn.close()
@ -118,9 +117,9 @@ proc createSwitch(ma: MultiAddress): (Switch, PeerInfo) =
let identify = newIdentify(peerInfo) # create the identify proto let identify = newIdentify(peerInfo) # create the identify proto
proc createMplex(conn: Connection): Muxer = proc createMplex(conn: Connection): Muxer =
# helper proc to create multiplexers, # helper proc to create multiplexers,
# use this to perform any custom setup up, # use this to perform any custom setup up,
# such as adjusting timeout or anything else # such as adjusting timeout or anything else
# that the muxer requires # that the muxer requires
result = newMplex(conn) result = newMplex(conn)
@ -130,17 +129,17 @@ proc createSwitch(ma: MultiAddress): (Switch, PeerInfo) =
let secureManagers = {SecioCodec: Secure(newSecio(seckey))}.toTable() # setup the secio and any other secure provider let secureManagers = {SecioCodec: Secure(newSecio(seckey))}.toTable() # setup the secio and any other secure provider
# create the switch # create the switch
let switch = newSwitch(peerInfo, let switch = newSwitch(peerInfo,
transports, transports,
identify, identify,
muxers, muxers,
secureManagers) secureManagers)
result = (switch, peerInfo) result = (switch, peerInfo)
proc main() {.async, gcsafe.} = proc main() {.async, gcsafe.} =
let ma1: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let ma1: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
let ma2: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let ma2: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
var peerInfo1, peerInfo2: PeerInfo var peerInfo1, peerInfo2: PeerInfo
var switch1, switch2: Switch var switch1, switch2: Switch
(switch1, peerInfo1) = createSwitch(ma1) # create node 1 (switch1, peerInfo1) = createSwitch(ma1) # create node 1
@ -155,9 +154,9 @@ proc main() {.async, gcsafe.} =
var switch2Fut = await switch2.start() # start second node var switch2Fut = await switch2.start() # start second node
let conn = await switch2.dial(switch1.peerInfo, TestCodec) # dial the first node let conn = await switch2.dial(switch1.peerInfo, TestCodec) # dial the first node
await conn.writeLp("Hello!") # writeLp send a lenght prefixed buffer over the wire await conn.writeLp("Hello!") # writeLp send a length prefixed buffer over the wire
let msg = cast[string](await conn.readLp()) # readLp reads lenght prefixed bytes and returns a buffer without the prefix # readLp reads length prefixed bytes and returns a buffer without the prefix
echo "Remote responded with - ", cast[string](msg) echo "Remote responded with - ", cast[string](await conn.readLp())
await allFutures(switch1.stop(), switch2.stop()) # close connections and shutdown all transports await allFutures(switch1.stop(), switch2.stop()) # close connections and shutdown all transports
await allFutures(switch1Fut & switch2Fut) # wait for all transports to shutdown await allFutures(switch1Fut & switch2Fut) # wait for all transports to shutdown

View File

@ -48,9 +48,10 @@ type
started: bool started: bool
proc id (p: ChatProto): string = proc id (p: ChatProto): string =
result = "unknown" if not isNil(p.conn.peerInfo):
if p.conn.peerInfo.peerId.isSome: $p.conn.peerInfo.peerId
result = $p.conn.peerInfo.peerId.get() else:
"unknown"
# forward declaration # forward declaration
proc readWriteLoop(p: ChatProto) {.async, gcsafe.} proc readWriteLoop(p: ChatProto) {.async, gcsafe.}
@ -66,9 +67,8 @@ proc dialPeer(p: ChatProto, address: string) {.async, gcsafe.} =
if parts.len == 11 and parts[^2] notin ["ipfs", "p2p"]: if parts.len == 11 and parts[^2] notin ["ipfs", "p2p"]:
quit("invalid or incompelete peerId") quit("invalid or incompelete peerId")
var remotePeer: PeerInfo var remotePeer = PeerInfo.init(parts[^1],
remotePeer.peerId = some(PeerID.init(parts[^1])) @[MultiAddress.init(address)])
remotePeer.addrs.add(MultiAddress.init(address))
echo &"dialing peer: {address}" echo &"dialing peer: {address}"
p.conn = await p.switch.dial(remotePeer, ChatCodec) p.conn = await p.switch.dial(remotePeer, ChatCodec)
@ -165,8 +165,7 @@ proc serveThread(customData: CustomData) {.async.} =
var transp = fromPipe(customData.consoleFd) var transp = fromPipe(customData.consoleFd)
let seckey = PrivateKey.random(RSA) let seckey = PrivateKey.random(RSA)
var peerInfo: PeerInfo var peerInfo = PeerInfo.init(seckey)
peerInfo.peerId = some(PeerID.init(seckey))
var localAddress = DefaultAddr var localAddress = DefaultAddr
while true: while true:
echo &"Type an address to bind to or Enter to use the default {DefaultAddr}" echo &"Type an address to bind to or Enter to use the default {DefaultAddr}"
@ -202,7 +201,7 @@ proc serveThread(customData: CustomData) {.async.} =
var libp2pFuts = await switch.start() var libp2pFuts = await switch.start()
chatProto.started = true chatProto.started = true
let id = peerInfo.peerId.get().pretty let id = peerInfo.peerId.pretty
echo "PeerID: " & id echo "PeerID: " & id
echo "listening on: " echo "listening on: "
for a in peerInfo.addrs: for a in peerInfo.addrs:

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import chronos, chronicles, options import chronos, chronicles
import peerinfo, import peerinfo,
multiaddress, multiaddress,
stream/lpstream, stream/lpstream,
@ -19,7 +19,7 @@ const DefaultReadSize*: uint = 64 * 1024
type type
Connection* = ref object of LPStream Connection* = ref object of LPStream
peerInfo*: Option[PeerInfo] peerInfo*: PeerInfo
stream*: LPStream stream*: LPStream
observedAddrs*: Multiaddress observedAddrs*: Multiaddress
@ -39,19 +39,17 @@ proc newConnection*(stream: LPStream): Connection =
let this = result let this = result
if not isNil(result.stream.closeEvent): if not isNil(result.stream.closeEvent):
result.stream.closeEvent.wait(). result.stream.closeEvent.wait().
addCallback( addCallback do (udata: pointer):
proc (udata: pointer) = if not this.closed:
if not this.closed: trace "closing this connection because wrapped stream closed"
trace "closing this connection because wrapped stream closed" asyncCheck this.close()
asyncCheck this.close()
)
method read*(s: Connection, n = -1): Future[seq[byte]] {.gcsafe.} = method read*(s: Connection, n = -1): Future[seq[byte]] {.gcsafe.} =
s.stream.read(n) s.stream.read(n)
method readExactly*(s: Connection, method readExactly*(s: Connection,
pbytes: pointer, pbytes: pointer,
nbytes: int): nbytes: int):
Future[void] {.gcsafe.} = Future[void] {.gcsafe.} =
s.stream.readExactly(pbytes, nbytes) s.stream.readExactly(pbytes, nbytes)
@ -70,7 +68,7 @@ method readOnce*(s: Connection,
method readUntil*(s: Connection, method readUntil*(s: Connection,
pbytes: pointer, pbytes: pointer,
nbytes: int, nbytes: int,
sep: seq[byte]): sep: seq[byte]):
Future[int] {.gcsafe.} = Future[int] {.gcsafe.} =
s.stream.readUntil(pbytes, nbytes, sep) s.stream.readUntil(pbytes, nbytes, sep)
@ -86,13 +84,13 @@ method write*(s: Connection,
Future[void] {.gcsafe.} = Future[void] {.gcsafe.} =
s.stream.write(msg, msglen) s.stream.write(msg, msglen)
method write*(s: Connection, method write*(s: Connection,
msg: seq[byte], msg: seq[byte],
msglen = -1): msglen = -1):
Future[void] {.gcsafe.} = Future[void] {.gcsafe.} =
s.stream.write(msg, msglen) s.stream.write(msg, msglen)
method closed*(s: Connection): bool = method closed*(s: Connection): bool =
if isNil(s.stream): if isNil(s.stream):
return false return false

View File

@ -43,7 +43,7 @@ proc readMplexVarint(conn: Connection): Future[Option[uint]] {.async, gcsafe.} =
except LPStreamIncompleteError as exc: except LPStreamIncompleteError as exc:
trace "unable to read varint", exc = exc.msg trace "unable to read varint", exc = exc.msg
proc readMsg*(conn: Connection): Future[Option[Msg]] {.async, gcsafe.} = proc readMsg*(conn: Connection): Future[Option[Msg]] {.async, gcsafe.} =
let headerVarint = await conn.readMplexVarint() let headerVarint = await conn.readMplexVarint()
if headerVarint.isNone: if headerVarint.isNone:
return return
@ -61,7 +61,7 @@ proc readMsg*(conn: Connection): Future[Option[Msg]] {.async, gcsafe.} =
proc writeMsg*(conn: Connection, proc writeMsg*(conn: Connection,
id: uint, id: uint,
msgType: MessageType, msgType: MessageType,
data: seq[byte] = @[]) {.async, gcsafe.} = data: seq[byte] = @[]) {.async, gcsafe.} =
## write lenght prefixed ## write lenght prefixed
var buf = initVBuffer() var buf = initVBuffer()
@ -75,6 +75,6 @@ proc writeMsg*(conn: Connection,
proc writeMsg*(conn: Connection, proc writeMsg*(conn: Connection,
id: uint, id: uint,
msgType: MessageType, msgType: MessageType,
data: string) {.async, gcsafe.} = data: string) {.async, gcsafe.} =
result = conn.writeMsg(id, msgType, cast[seq[byte]](data)) result = conn.writeMsg(id, msgType, cast[seq[byte]](data))

View File

@ -18,7 +18,7 @@ import types,
logScope: logScope:
topic = "MplexChannel" topic = "MplexChannel"
const DefaultChannelSize* = DefaultBufferSize * 64 # 64kb const DefaultChannelSize* = DefaultBufferSize * 64 # 64kb
type type
LPChannel* = ref object of BufferStream LPChannel* = ref object of BufferStream
@ -39,7 +39,7 @@ proc newChannel*(id: uint,
conn: Connection, conn: Connection,
initiator: bool, initiator: bool,
name: string = "", name: string = "",
size: int = DefaultChannelSize): LPChannel = size: int = DefaultChannelSize): LPChannel =
new result new result
result.id = id result.id = id
result.name = name result.name = name
@ -51,7 +51,7 @@ proc newChannel*(id: uint,
result.asyncLock = newAsyncLock() result.asyncLock = newAsyncLock()
let chan = result let chan = result
proc writeHandler(data: seq[byte]): Future[void] {.async.} = proc writeHandler(data: seq[byte]): Future[void] {.async.} =
# writes should happen in sequence # writes should happen in sequence
await chan.asyncLock.acquire() await chan.asyncLock.acquire()
trace "sending data ", data = data.toHex(), trace "sending data ", data = data.toHex(),
@ -66,11 +66,11 @@ proc newChannel*(id: uint,
proc closeMessage(s: LPChannel) {.async.} = proc closeMessage(s: LPChannel) {.async.} =
await s.conn.writeMsg(s.id, s.closeCode) # write header await s.conn.writeMsg(s.id, s.closeCode) # write header
proc closedByRemote*(s: LPChannel) {.async.} = proc closedByRemote*(s: LPChannel) {.async.} =
s.closedRemote = true s.closedRemote = true
proc cleanUp*(s: LPChannel): Future[void] = proc cleanUp*(s: LPChannel): Future[void] =
# method which calls the underlying buffer's `close` # method which calls the underlying buffer's `close`
# method used instead of `close` since it's overloaded to # method used instead of `close` since it's overloaded to
# simulate half-closed streams # simulate half-closed streams
result = procCall close(BufferStream(s)) result = procCall close(BufferStream(s))
@ -97,16 +97,16 @@ method closed*(s: LPChannel): bool =
proc pushTo*(s: LPChannel, data: seq[byte]): Future[void] = proc pushTo*(s: LPChannel, data: seq[byte]): Future[void] =
if s.closedRemote or s.isReset: if s.closedRemote or s.isReset:
raise newLPStreamClosedError() raise newLPStreamEOFError()
trace "pushing data to channel", data = data.toHex(), trace "pushing data to channel", data = data.toHex(),
id = s.id, id = s.id,
initiator = s.initiator initiator = s.initiator
result = procCall pushTo(BufferStream(s), data) result = procCall pushTo(BufferStream(s), data)
method read*(s: LPChannel, n = -1): Future[seq[byte]] = method read*(s: LPChannel, n = -1): Future[seq[byte]] =
if s.closed or s.isReset: if s.closed or s.isReset:
raise newLPStreamClosedError() raise newLPStreamEOFError()
result = procCall read(BufferStream(s), n) result = procCall read(BufferStream(s), n)
@ -115,7 +115,7 @@ method readExactly*(s: LPChannel,
nbytes: int): nbytes: int):
Future[void] = Future[void] =
if s.closed or s.isReset: if s.closed or s.isReset:
raise newLPStreamClosedError() raise newLPStreamEOFError()
result = procCall readExactly(BufferStream(s), pbytes, nbytes) result = procCall readExactly(BufferStream(s), pbytes, nbytes)
method readLine*(s: LPChannel, method readLine*(s: LPChannel,
@ -123,38 +123,38 @@ method readLine*(s: LPChannel,
sep = "\r\n"): sep = "\r\n"):
Future[string] = Future[string] =
if s.closed or s.isReset: if s.closed or s.isReset:
raise newLPStreamClosedError() raise newLPStreamEOFError()
result = procCall readLine(BufferStream(s), limit, sep) result = procCall readLine(BufferStream(s), limit, sep)
method readOnce*(s: LPChannel, method readOnce*(s: LPChannel,
pbytes: pointer, pbytes: pointer,
nbytes: int): nbytes: int):
Future[int] = Future[int] =
if s.closed or s.isReset: if s.closed or s.isReset:
raise newLPStreamClosedError() raise newLPStreamEOFError()
result = procCall readOnce(BufferStream(s), pbytes, nbytes) result = procCall readOnce(BufferStream(s), pbytes, nbytes)
method readUntil*(s: LPChannel, method readUntil*(s: LPChannel,
pbytes: pointer, nbytes: int, pbytes: pointer, nbytes: int,
sep: seq[byte]): sep: seq[byte]):
Future[int] = Future[int] =
if s.closed or s.isReset: if s.closed or s.isReset:
raise newLPStreamClosedError() raise newLPStreamEOFError()
result = procCall readOnce(BufferStream(s), pbytes, nbytes) result = procCall readOnce(BufferStream(s), pbytes, nbytes)
method write*(s: LPChannel, method write*(s: LPChannel,
pbytes: pointer, pbytes: pointer,
nbytes: int): Future[void] = nbytes: int): Future[void] =
if s.closedLocal or s.isReset: if s.closedLocal or s.isReset:
raise newLPStreamClosedError() raise newLPStreamEOFError()
result = procCall write(BufferStream(s), pbytes, nbytes) result = procCall write(BufferStream(s), pbytes, nbytes)
method write*(s: LPChannel, msg: string, msglen = -1) {.async.} = method write*(s: LPChannel, msg: string, msglen = -1) {.async.} =
if s.closedLocal or s.isReset: if s.closedLocal or s.isReset:
raise newLPStreamClosedError() raise newLPStreamEOFError()
result = procCall write(BufferStream(s), msg, msglen) result = procCall write(BufferStream(s), msg, msglen)
method write*(s: LPChannel, msg: seq[byte], msglen = -1) {.async.} = method write*(s: LPChannel, msg: seq[byte], msglen = -1) {.async.} =
if s.closedLocal or s.isReset: if s.closedLocal or s.isReset:
raise newLPStreamClosedError() raise newLPStreamEOFError()
result = procCall write(BufferStream(s), msg, msglen) result = procCall write(BufferStream(s), msg, msglen)

View File

@ -16,8 +16,8 @@ import chronos, chronicles
import ../muxer, import ../muxer,
../../connection, ../../connection,
../../stream/lpstream, ../../stream/lpstream,
coder, coder,
types, types,
lpchannel lpchannel
logScope: logScope:
@ -42,7 +42,7 @@ proc newStreamInternal*(m: Mplex,
initiator: bool = true, initiator: bool = true,
chanId: uint = 0, chanId: uint = 0,
name: string = ""): name: string = ""):
Future[LPChannel] {.async, gcsafe.} = Future[LPChannel] {.async, gcsafe.} =
## create new channel/stream ## create new channel/stream
let id = if initiator: m.currentId.inc(); m.currentId else: chanId let id = if initiator: m.currentId.inc(); m.currentId else: chanId
trace "creating new channel", channelId = id, initiator = initiator trace "creating new channel", channelId = id, initiator = initiator
@ -50,7 +50,7 @@ proc newStreamInternal*(m: Mplex,
m.getChannelList(initiator)[id] = result m.getChannelList(initiator)[id] = result
proc cleanupChann(m: Mplex, chann: LPChannel, initiator: bool) {.async, inline.} = proc cleanupChann(m: Mplex, chann: LPChannel, initiator: bool) {.async, inline.} =
## call the channel's `close` to signal the ## call the channel's `close` to signal the
## remote that the channel is closing ## remote that the channel is closing
if not isNil(chann) and not chann.closed: if not isNil(chann) and not chann.closed:
await chann.close() await chann.close()
@ -58,7 +58,7 @@ proc cleanupChann(m: Mplex, chann: LPChannel, initiator: bool) {.async, inline.}
m.getChannelList(initiator).del(chann.id) m.getChannelList(initiator).del(chann.id)
trace "cleaned up channel", id = chann.id trace "cleaned up channel", id = chann.id
method handle*(m: Mplex) {.async, gcsafe.} = method handle*(m: Mplex) {.async, gcsafe.} =
trace "starting mplex main loop" trace "starting mplex main loop"
try: try:
while not m.connection.closed: while not m.connection.closed:
@ -100,21 +100,21 @@ method handle*(m: Mplex) {.async, gcsafe.} =
continue continue
of MessageType.MsgIn, MessageType.MsgOut: of MessageType.MsgIn, MessageType.MsgOut:
trace "pushing data to channel", id = id, trace "pushing data to channel", id = id,
initiator = initiator, initiator = initiator,
msgType = msgType msgType = msgType
await channel.pushTo(data) await channel.pushTo(data)
of MessageType.CloseIn, MessageType.CloseOut: of MessageType.CloseIn, MessageType.CloseOut:
trace "closing channel", id = id, trace "closing channel", id = id,
initiator = initiator, initiator = initiator,
msgType = msgType msgType = msgType
await channel.closedByRemote() await channel.closedByRemote()
# m.getChannelList(initiator).del(id) # m.getChannelList(initiator).del(id)
of MessageType.ResetIn, MessageType.ResetOut: of MessageType.ResetIn, MessageType.ResetOut:
trace "resetting channel", id = id, trace "resetting channel", id = id,
initiator = initiator, initiator = initiator,
msgType = msgType msgType = msgType
await channel.resetByRemote() await channel.resetByRemote()
@ -126,7 +126,7 @@ method handle*(m: Mplex) {.async, gcsafe.} =
trace "stopping mplex main loop" trace "stopping mplex main loop"
await m.connection.close() await m.connection.close()
proc newMplex*(conn: Connection, proc newMplex*(conn: Connection,
maxChanns: uint = MaxChannels): Mplex = maxChanns: uint = MaxChannels): Mplex =
new result new result
result.connection = conn result.connection = conn
@ -135,11 +135,9 @@ proc newMplex*(conn: Connection,
result.local = initTable[uint, LPChannel]() result.local = initTable[uint, LPChannel]()
let m = result let m = result
conn.closeEvent.wait().addCallback( conn.closeEvent.wait().addCallback do (udata: pointer):
proc(udata: pointer) = trace "connection closed, cleaning up mplex"
trace "connection closed, cleaning up mplex" asyncCheck m.close()
asyncCheck m.close()
)
method newStream*(m: Mplex, name: string = ""): Future[Connection] {.async, gcsafe.} = method newStream*(m: Mplex, name: string = ""): Future[Connection] {.async, gcsafe.} =
let channel = await m.newStreamInternal() let channel = await m.newStreamInternal()
@ -148,7 +146,7 @@ method newStream*(m: Mplex, name: string = ""): Future[Connection] {.async, gcsa
result = newConnection(channel) result = newConnection(channel)
result.peerInfo = m.connection.peerInfo result.peerInfo = m.connection.peerInfo
method close*(m: Mplex) {.async, gcsafe.} = method close*(m: Mplex) {.async, gcsafe.} =
trace "closing mplex muxer" trace "closing mplex muxer"
await allFutures(@[allFutures(toSeq(m.remote.values).mapIt(it.reset())), await allFutures(@[allFutures(toSeq(m.remote.values).mapIt(it.reset())),
allFutures(toSeq(m.local.values).mapIt(it.reset()))]) allFutures(toSeq(m.local.values).mapIt(it.reset()))])

View File

@ -36,7 +36,7 @@ method newStream*(m: Muxer, name: string = ""): Future[Connection] {.base, async
method close*(m: Muxer) {.base, async, gcsafe.} = discard method close*(m: Muxer) {.base, async, gcsafe.} = discard
method handle*(m: Muxer): Future[void] {.base, async, gcsafe.} = discard method handle*(m: Muxer): Future[void] {.base, async, gcsafe.} = discard
proc newMuxerProvider*(creator: MuxerConstructor, codec: string): MuxerProvider {.gcsafe.} = proc newMuxerProvider*(creator: MuxerConstructor, codec: string): MuxerProvider {.gcsafe.} =
new result new result
result.newMuxer = creator result.newMuxer = creator
result.codec = codec result.codec = codec

View File

@ -8,7 +8,7 @@
## those terms. ## those terms.
## This module implementes API for libp2p peer. ## This module implementes API for libp2p peer.
import hashes, options import hashes
import nimcrypto/utils import nimcrypto/utils
import crypto/crypto, multicodec, multihash, base58, vbuffer import crypto/crypto, multicodec, multihash, base58, vbuffer
import protobuf/minprotobuf import protobuf/minprotobuf

View File

@ -23,8 +23,6 @@ type
HasPrivate, HasPrivate,
HasPublic HasPublic
InvalidPublicKeyException* = object of Exception
PeerInfo* = ref object of RootObj PeerInfo* = ref object of RootObj
peerId*: PeerID peerId*: PeerID
addrs*: seq[MultiAddress] addrs*: seq[MultiAddress]
@ -35,10 +33,6 @@ type
of HasPublic: of HasPublic:
key: Option[PublicKey] key: Option[PublicKey]
proc newInvalidPublicKeyException(): ref Exception =
newException(InvalidPublicKeyException,
"attempting to assign an invalid public key")
proc init*(p: typedesc[PeerInfo], proc init*(p: typedesc[PeerInfo],
key: PrivateKey, key: PrivateKey,
addrs: seq[MultiAddress] = @[], addrs: seq[MultiAddress] = @[],
@ -60,6 +54,16 @@ proc init*(p: typedesc[PeerInfo],
addrs: addrs, addrs: addrs,
protocols: protocols) protocols: protocols)
proc init*(p: typedesc[PeerInfo],
peerId: string,
addrs: seq[MultiAddress] = @[],
protocols: seq[string] = @[]): PeerInfo {.inline.} =
PeerInfo(keyType: HasPublic,
peerId: PeerID.init(peerId),
addrs: addrs,
protocols: protocols)
proc init*(p: typedesc[PeerInfo], proc init*(p: typedesc[PeerInfo],
key: PublicKey, key: PublicKey,
addrs: seq[MultiAddress] = @[], addrs: seq[MultiAddress] = @[],
@ -82,12 +86,6 @@ proc publicKey*(p: PeerInfo): Option[PublicKey] {.inline.} =
else: else:
result = some(p.privateKey.getKey()) result = some(p.privateKey.getKey())
proc `publicKey=`*(p: PeerInfo, key: PublicKey) =
if not (PeerID.init(key) == p.peerId):
raise newInvalidPublicKeyException()
p.key = some(key)
proc id*(p: PeerInfo): string {.inline.} = proc id*(p: PeerInfo): string {.inline.} =
p.peerId.pretty p.peerId.pretty

View File

@ -9,11 +9,11 @@
import options import options
import chronos, chronicles import chronos, chronicles
import ../protobuf/minprotobuf, import ../protobuf/minprotobuf,
../peerinfo, ../peerinfo,
../connection, ../connection,
../peer, ../peer,
../crypto/crypto, ../crypto/crypto,
../multiaddress, ../multiaddress,
../protocols/protocol ../protocols/protocol
@ -43,7 +43,7 @@ type
Identify* = ref object of LPProtocol Identify* = ref object of LPProtocol
peerInfo*: PeerInfo peerInfo*: PeerInfo
proc encodeMsg*(peerInfo: PeerInfo, observedAddrs: Multiaddress): ProtoBuffer = proc encodeMsg*(peerInfo: PeerInfo, observedAddrs: Multiaddress): ProtoBuffer =
result = initProtoBuffer() result = initProtoBuffer()
result.write(initProtoField(1, peerInfo.publicKey.get().getBytes())) result.write(initProtoField(1, peerInfo.publicKey.get().getBytes()))
@ -63,7 +63,7 @@ proc encodeMsg*(peerInfo: PeerInfo, observedAddrs: Multiaddress): ProtoBuffer =
result.write(initProtoField(6, agentVersion)) result.write(initProtoField(6, agentVersion))
result.finish() result.finish()
proc decodeMsg*(buf: seq[byte]): IdentifyInfo = proc decodeMsg*(buf: seq[byte]): IdentifyInfo =
var pb = initProtoBuffer(buf) var pb = initProtoBuffer(buf)
result.pubKey = none(PublicKey) result.pubKey = none(PublicKey)
@ -87,7 +87,7 @@ proc decodeMsg*(buf: seq[byte]): IdentifyInfo =
trace "read proto from message", proto = proto trace "read proto from message", proto = proto
result.protos.add(proto) result.protos.add(proto)
proto = "" proto = ""
var observableAddr = newSeq[byte]() var observableAddr = newSeq[byte]()
if pb.getBytes(4, observableAddr) > 0: # attempt to read the observed addr if pb.getBytes(4, observableAddr) > 0: # attempt to read the observed addr
var ma = MultiAddress.init(observableAddr) var ma = MultiAddress.init(observableAddr)
@ -109,7 +109,7 @@ proc newIdentify*(peerInfo: PeerInfo): Identify =
result.peerInfo = peerInfo result.peerInfo = peerInfo
result.init() result.init()
method init*(p: Identify) = method init*(p: Identify) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
trace "handling identify request" trace "handling identify request"
var pb = encodeMsg(p.peerInfo, await conn.getObservedAddrs()) var pb = encodeMsg(p.peerInfo, await conn.getObservedAddrs())
@ -119,9 +119,9 @@ method init*(p: Identify) =
p.handler = handle p.handler = handle
p.codec = IdentifyCodec p.codec = IdentifyCodec
proc identify*(p: Identify, proc identify*(p: Identify,
conn: Connection, conn: Connection,
remotePeerInfo: Option[PeerInfo]): Future[IdentifyInfo] {.async, gcsafe.} = remotePeerInfo: PeerInfo): Future[IdentifyInfo] {.async, gcsafe.} =
var message = await conn.readLp() var message = await conn.readLp()
if len(message) == 0: if len(message) == 0:
trace "identify: Invalid or empty message received!" trace "identify: Invalid or empty message received!"
@ -130,13 +130,13 @@ proc identify*(p: Identify,
result = decodeMsg(message) result = decodeMsg(message)
if remotePeerInfo.isSome and result.pubKey.isSome: if not isNil(remotePeerInfo) and result.pubKey.isSome:
let peer = PeerID.init(result.pubKey.get()) let peer = PeerID.init(result.pubKey.get())
# do a string comaprison of the ids, # do a string comaprison of the ids,
# because that is the only thing we # because that is the only thing we
# have in most cases # have in most cases
if peer != remotePeerInfo.get().peerId: if peer != remotePeerInfo.peerId:
trace "Peer ids don't match", trace "Peer ids don't match",
remote = peer.pretty(), remote = peer.pretty(),
local = remotePeerInfo.get().id local = remotePeerInfo.get().id

View File

@ -26,27 +26,27 @@ const FloodSubCodec* = "/floodsub/1.0.0"
type type
FloodSub* = ref object of PubSub FloodSub* = ref object of PubSub
floodsub*: Table[string, HashSet[string]] # topic to remote peer map floodsub*: Table[string, HashSet[string]] # topic to remote peer map
seen*: TimedCache[string] # list of messages forwarded to peers seen*: TimedCache[string] # list of messages forwarded to peers
method subscribeTopic*(f: FloodSub, method subscribeTopic*(f: FloodSub,
topic: string, topic: string,
subscribe: bool, subscribe: bool,
peerId: string) {.gcsafe.} = peerId: string) {.gcsafe.} =
procCall PubSub(f).subscribeTopic(topic, subscribe, peerId) procCall PubSub(f).subscribeTopic(topic, subscribe, peerId)
if topic notin f.floodsub: if topic notin f.floodsub:
f.floodsub[topic] = initHashSet[string]() f.floodsub[topic] = initHashSet[string]()
if subscribe: if subscribe:
trace "adding subscription for topic", peer = peerId, name = topic trace "adding subscription for topic", peer = peerId, name = topic
# subscribe the peer to the topic # subscribe the peer to the topic
f.floodsub[topic].incl(peerId) f.floodsub[topic].incl(peerId)
else: else:
trace "removing subscription for topic", peer = peerId, name = topic trace "removing subscription for topic", peer = peerId, name = topic
# unsubscribe the peer from the topic # unsubscribe the peer from the topic
f.floodsub[topic].excl(peerId) f.floodsub[topic].excl(peerId)
method handleDisconnect*(f: FloodSub, peer: PubSubPeer) {.async, gcsafe.} = method handleDisconnect*(f: FloodSub, peer: PubSubPeer) {.async, gcsafe.} =
## handle peer disconnects ## handle peer disconnects
for t in f.floodsub.keys: for t in f.floodsub.keys:
f.floodsub[t].excl(peer.id) f.floodsub[t].excl(peer.id)
@ -78,7 +78,7 @@ method rpcHandler*(f: FloodSub,
if p in f.peers and f.peers[p].id != peer.id: if p in f.peers and f.peers[p].id != peer.id:
await f.peers[p].send(@[RPCMsg(messages: m.messages)]) await f.peers[p].send(@[RPCMsg(messages: m.messages)])
method init(f: FloodSub) = method init(f: FloodSub) =
proc handler(conn: Connection, proto: string) {.async, gcsafe.} = proc handler(conn: Connection, proto: string) {.async, gcsafe.} =
## main protocol handler that gets triggered on every ## main protocol handler that gets triggered on every
## connection for a protocol string ## connection for a protocol string
@ -109,8 +109,8 @@ method publish*(f: FloodSub,
trace "publishing message", name = topic, peer = p, data = data trace "publishing message", name = topic, peer = p, data = data
await f.peers[p].send(@[RPCMsg(messages: @[msg])]) await f.peers[p].send(@[RPCMsg(messages: @[msg])])
method unsubscribe*(f: FloodSub, method unsubscribe*(f: FloodSub,
topics: seq[TopicPair]) {.async, gcsafe.} = topics: seq[TopicPair]) {.async, gcsafe.} =
await procCall PubSub(f).unsubscribe(topics) await procCall PubSub(f).unsubscribe(topics)
for p in f.peers.values: for p in f.peers.values:

View File

@ -27,7 +27,7 @@ logScope:
const GossipSubCodec* = "/meshsub/1.0.0" const GossipSubCodec* = "/meshsub/1.0.0"
# overlay parameters # overlay parameters
const GossipSubD* = 6 const GossipSubD* = 6
const GossipSubDlo* = 4 const GossipSubDlo* = 4
const GossipSubDhi* = 12 const GossipSubDhi* = 12
@ -37,25 +37,26 @@ const GossipSubHistoryGossip* = 3
# heartbeat interval # heartbeat interval
const GossipSubHeartbeatInitialDelay* = 100.millis const GossipSubHeartbeatInitialDelay* = 100.millis
const GossipSubHeartbeatInterval* = 1.seconds const GossipSubHeartbeatInterval* = 1.seconds
# fanout ttl # fanout ttl
const GossipSubFanoutTTL* = 60.seconds const GossipSubFanoutTTL* = 60.seconds
type type
GossipSub* = ref object of FloodSub GossipSub* = ref object of FloodSub
mesh*: Table[string, HashSet[string]] # meshes - topic to peer mesh*: Table[string, HashSet[string]] # meshes - topic to peer
fanout*: Table[string, HashSet[string]] # fanout - topic to peer fanout*: Table[string, HashSet[string]] # fanout - topic to peer
gossipsub*: Table[string, HashSet[string]] # topic to peer map of all gossipsub peers gossipsub*: Table[string, HashSet[string]] # topic to peer map of all gossipsub peers
lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics
gossip*: Table[string, seq[ControlIHave]] # pending gossip gossip*: Table[string, seq[ControlIHave]] # pending gossip
control*: Table[string, ControlMessage] # pending control messages control*: Table[string, ControlMessage] # pending control messages
mcache*: MCache # messages cache mcache*: MCache # messages cache
heartbeatCancel*: Future[void] # cancelation future for heartbeat interval heartbeatCancel*: Future[void] # cancelation future for heartbeat interval
heartbeatLock: AsyncLock heartbeatLock: AsyncLock
# TODO: This belong in chronos, temporary left here until chronos is updated # TODO: This belong in chronos, temporary left here until chronos is updated
proc addInterval(every: Duration, cb: CallbackFunc, udata: pointer = nil): Future[void] = proc addInterval(every: Duration, cb: CallbackFunc,
udata: pointer = nil): Future[void] =
## Arrange the callback ``cb`` to be called on every ``Duration`` window ## Arrange the callback ``cb`` to be called on every ``Duration`` window
var retFuture = newFuture[void]("chronos.addInterval(Duration)") var retFuture = newFuture[void]("chronos.addInterval(Duration)")
@ -71,7 +72,7 @@ proc addInterval(every: Duration, cb: CallbackFunc, udata: pointer = nil): Futur
scheduleNext() scheduleNext()
return retFuture return retFuture
method init(g: GossipSub) = method init(g: GossipSub) =
proc handler(conn: Connection, proto: string) {.async, gcsafe.} = proc handler(conn: Connection, proto: string) {.async, gcsafe.} =
## main protocol handler that gets triggered on every ## main protocol handler that gets triggered on every
## connection for a protocol string ## connection for a protocol string
@ -83,7 +84,7 @@ method init(g: GossipSub) =
g.handler = handler g.handler = handler
g.codec = GossipSubCodec g.codec = GossipSubCodec
method handleDisconnect(g: GossipSub, peer: PubSubPeer) {.async, gcsafe.} = method handleDisconnect(g: GossipSub, peer: PubSubPeer) {.async, gcsafe.} =
## handle peer disconnects ## handle peer disconnects
await procCall FloodSub(g).handleDisconnect(peer) await procCall FloodSub(g).handleDisconnect(peer)
for t in g.gossipsub.keys: for t in g.gossipsub.keys:
@ -99,26 +100,26 @@ method subscribeTopic*(g: GossipSub,
topic: string, topic: string,
subscribe: bool, subscribe: bool,
peerId: string) {.gcsafe.} = peerId: string) {.gcsafe.} =
procCall PubSub(g).subscribeTopic(topic, subscribe, peerId) procCall PubSub(g).subscribeTopic(topic, subscribe, peerId)
if topic notin g.gossipsub: if topic notin g.gossipsub:
g.gossipsub[topic] = initHashSet[string]() g.gossipsub[topic] = initHashSet[string]()
if subscribe: if subscribe:
trace "adding subscription for topic", peer = peerId, name = topic trace "adding subscription for topic", peer = peerId, name = topic
# subscribe the peer to the topic # subscribe the peer to the topic
g.gossipsub[topic].incl(peerId) g.gossipsub[topic].incl(peerId)
else: else:
trace "removing subscription for topic", peer = peerId, name = topic trace "removing subscription for topic", peer = peerId, name = topic
# unsubscribe the peer from the topic # unsubscribe the peer from the topic
g.gossipsub[topic].excl(peerId) g.gossipsub[topic].excl(peerId)
proc handleGraft(g: GossipSub, proc handleGraft(g: GossipSub,
peer: PubSubPeer, peer: PubSubPeer,
grafts: seq[ControlGraft], grafts: seq[ControlGraft],
respControl: var ControlMessage) = respControl: var ControlMessage) =
for graft in grafts: for graft in grafts:
trace "processing graft message", peer = peer.id, trace "processing graft message", peer = peer.id,
topicID = graft.topicID topicID = graft.topicID
if graft.topicID in g.topics: if graft.topicID in g.topics:
@ -131,13 +132,14 @@ proc handleGraft(g: GossipSub,
proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
for prune in prunes: for prune in prunes:
trace "processing prune message", peer = peer.id, trace "processing prune message", peer = peer.id,
topicID = prune.topicID topicID = prune.topicID
if prune.topicID in g.mesh: if prune.topicID in g.mesh:
g.mesh[prune.topicID].excl(peer.id) g.mesh[prune.topicID].excl(peer.id)
proc handleIHave(g: GossipSub, peer: PubSubPeer, ihaves: seq[ControlIHave]): ControlIWant = proc handleIHave(g: GossipSub, peer: PubSubPeer, ihaves: seq[
ControlIHave]): ControlIWant =
for ihave in ihaves: for ihave in ihaves:
trace "processing ihave message", peer = peer.id, trace "processing ihave message", peer = peer.id,
topicID = ihave.topicID topicID = ihave.topicID
@ -147,7 +149,8 @@ proc handleIHave(g: GossipSub, peer: PubSubPeer, ihaves: seq[ControlIHave]): Con
if m notin g.seen: if m notin g.seen:
result.messageIDs.add(m) result.messageIDs.add(m)
proc handleIWant(g: GossipSub, peer: PubSubPeer, iwants: seq[ControlIWant]): seq[Message] = proc handleIWant(g: GossipSub, peer: PubSubPeer, iwants: seq[
ControlIWant]): seq[Message] =
for iwant in iwants: for iwant in iwants:
for mid in iwant.messageIDs: for mid in iwant.messageIDs:
trace "processing iwant message", peer = peer.id, trace "processing iwant message", peer = peer.id,
@ -158,7 +161,7 @@ proc handleIWant(g: GossipSub, peer: PubSubPeer, iwants: seq[ControlIWant]): seq
method rpcHandler(g: GossipSub, method rpcHandler(g: GossipSub,
peer: PubSubPeer, peer: PubSubPeer,
rpcMsgs: seq[RPCMsg]) {.async, gcsafe.} = rpcMsgs: seq[RPCMsg]) {.async, gcsafe.} =
await procCall PubSub(g).rpcHandler(peer, rpcMsgs) await procCall PubSub(g).rpcHandler(peer, rpcMsgs)
trace "processing RPC message", peer = peer.id, msg = rpcMsgs trace "processing RPC message", peer = peer.id, msg = rpcMsgs
@ -203,13 +206,13 @@ method rpcHandler(g: GossipSub,
for p in toSendPeers: for p in toSendPeers:
if p in g.peers and if p in g.peers and
g.peers[p].peerInfo.peerId != peer.peerInfo.peerId: g.peers[p].peerInfo.peerId != peer.peerInfo.peerId:
let id = g.peers[p].peerInfo.peerId let id = g.peers[p].peerInfo.peerId
let msgs = m.messages.filterIt( let msgs = m.messages.filterIt(
# don't forward to message originator # don't forward to message originator
id != it.fromPeerId() id != it.fromPeerId()
) )
if msgs.len > 0: if msgs.len > 0:
await g.peers[p].send(@[RPCMsg(messages: msgs)]) await g.peers[p].send(@[RPCMsg(messages: msgs)])
var respControl: ControlMessage var respControl: ControlMessage
if m.control.isSome: if m.control.isSome:
@ -224,9 +227,10 @@ method rpcHandler(g: GossipSub,
if respControl.graft.len > 0 or respControl.prune.len > 0 or if respControl.graft.len > 0 or respControl.prune.len > 0 or
respControl.ihave.len > 0 or respControl.iwant.len > 0: respControl.ihave.len > 0 or respControl.iwant.len > 0:
await peer.send(@[RPCMsg(control: some(respControl), messages: messages)]) await peer.send(@[RPCMsg(control: some(respControl),
messages: messages)])
proc replenishFanout(g: GossipSub, topic: string) {.async, gcsafe.} = proc replenishFanout(g: GossipSub, topic: string) {.async, gcsafe.} =
## get fanout peers for a topic ## get fanout peers for a topic
trace "about to replenish fanout" trace "about to replenish fanout"
if topic notin g.fanout: if topic notin g.fanout:
@ -242,7 +246,7 @@ proc replenishFanout(g: GossipSub, topic: string) {.async, gcsafe.} =
trace "fanout replenished with peers", peers = g.fanout[topic].len trace "fanout replenished with peers", peers = g.fanout[topic].len
proc rebalanceMesh(g: GossipSub, topic: string) {.async, gcsafe.} = proc rebalanceMesh(g: GossipSub, topic: string) {.async, gcsafe.} =
trace "about to rebalance mesh" trace "about to rebalance mesh"
# create a mesh topic that we're subscribing to # create a mesh topic that we're subscribing to
if topic notin g.mesh: if topic notin g.mesh:
@ -284,8 +288,8 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async, gcsafe.} =
trace "mesh balanced, got peers", peers = g.mesh[topic].len trace "mesh balanced, got peers", peers = g.mesh[topic].len
proc dropFanoutPeers(g: GossipSub) {.async, gcsafe.} = proc dropFanoutPeers(g: GossipSub) {.async, gcsafe.} =
# drop peers that we haven't published to in # drop peers that we haven't published to in
# GossipSubFanoutTTL seconds # GossipSubFanoutTTL seconds
for topic in g.lastFanoutPubSub.keys: for topic in g.lastFanoutPubSub.keys:
if Moment.now > g.lastFanoutPubSub[topic]: if Moment.now > g.lastFanoutPubSub[topic]:
@ -330,7 +334,7 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
result[id] = ControlMessage() result[id] = ControlMessage()
result[id].ihave.add(ihave) result[id].ihave.add(ihave)
proc heartbeat(g: GossipSub) {.async, gcsafe.} = proc heartbeat(g: GossipSub) {.async, gcsafe.} =
trace "running heartbeat" trace "running heartbeat"
await g.heartbeatLock.acquire() await g.heartbeatLock.acquire()
@ -354,7 +358,7 @@ method subscribe*(g: GossipSub,
asyncCheck g.rebalanceMesh(topic) asyncCheck g.rebalanceMesh(topic)
method unsubscribe*(g: GossipSub, method unsubscribe*(g: GossipSub,
topics: seq[TopicPair]) {.async, gcsafe.} = topics: seq[TopicPair]) {.async, gcsafe.} =
await procCall PubSub(g).unsubscribe(topics) await procCall PubSub(g).unsubscribe(topics)
for pair in topics: for pair in topics:
@ -400,7 +404,7 @@ method start*(g: GossipSub) {.async.} =
# setup the heartbeat interval # setup the heartbeat interval
g.heartbeatCancel = addInterval(GossipSubHeartbeatInterval, g.heartbeatCancel = addInterval(GossipSubHeartbeatInterval,
proc (arg: pointer = nil) {.gcsafe, locks: 0.} = proc (arg: pointer = nil) {.gcsafe, locks: 0.} =
asyncCheck g.heartbeat) asyncCheck g.heartbeat)
method stop*(g: GossipSub) {.async.} = method stop*(g: GossipSub) {.async.} =
## stopt pubsub ## stopt pubsub
@ -428,10 +432,10 @@ method initPubSub(g: GossipSub) =
## Unit tests ## Unit tests
when isMainModule and not defined(release): when isMainModule and not defined(release):
## Test internal (private) methods for gossip, ## Test internal (private) methods for gossip,
## mesh and fanout maintenance. ## mesh and fanout maintenance.
## Usually I wouldn't test private behaviour, ## Usually I wouldn't test private behaviour,
## but the maintenance methods are quite involved, ## but the maintenance methods are quite involved,
## hence these tests are here. ## hence these tests are here.
## ##
@ -444,18 +448,18 @@ when isMainModule and not defined(release):
suite "GossipSub": suite "GossipSub":
test "`rebalanceMesh` Degree Lo": test "`rebalanceMesh` Degree Lo":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA))) PeerInfo.init(PrivateKey.random(RSA)))
let topic = "foobar" let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[string]() gossipSub.mesh[topic] = initHashSet[string]()
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
discard discard
for i in 0..<15: for i in 0..<15:
let conn = newConnection(newBufferStream(writeHandler)) let conn = newConnection(newBufferStream(writeHandler))
let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
conn.peerInfo = some(peerInfo) conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].conn = conn gossipSub.peers[peerInfo.id].conn = conn
gossipSub.mesh[topic].incl(peerInfo.id) gossipSub.mesh[topic].incl(peerInfo.id)
@ -471,18 +475,18 @@ when isMainModule and not defined(release):
test "`rebalanceMesh` Degree Hi": test "`rebalanceMesh` Degree Hi":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA))) PeerInfo.init(PrivateKey.random(RSA)))
let topic = "foobar" let topic = "foobar"
gossipSub.gossipsub[topic] = initHashSet[string]() gossipSub.gossipsub[topic] = initHashSet[string]()
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
discard discard
for i in 0..<15: for i in 0..<15:
let conn = newConnection(newBufferStream(writeHandler)) let conn = newConnection(newBufferStream(writeHandler))
let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
conn.peerInfo = some(peerInfo) conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].conn = conn gossipSub.peers[peerInfo.id].conn = conn
gossipSub.gossipsub[topic].incl(peerInfo.id) gossipSub.gossipsub[topic].incl(peerInfo.id)
@ -498,21 +502,21 @@ when isMainModule and not defined(release):
test "`replenishFanout` Degree Lo": test "`replenishFanout` Degree Lo":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA))) PeerInfo.init(PrivateKey.random(RSA)))
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} =
discard discard
let topic = "foobar" let topic = "foobar"
gossipSub.gossipsub[topic] = initHashSet[string]() gossipSub.gossipsub[topic] = initHashSet[string]()
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
discard discard
for i in 0..<15: for i in 0..<15:
let conn = newConnection(newBufferStream(writeHandler)) let conn = newConnection(newBufferStream(writeHandler))
var peerInfo = PeerInfo.init(PrivateKey.random(RSA)) var peerInfo = PeerInfo.init(PrivateKey.random(RSA))
conn.peerInfo = some(peerInfo) conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler gossipSub.peers[peerInfo.id].handler = handler
gossipSub.gossipsub[topic].incl(peerInfo.id) gossipSub.gossipsub[topic].incl(peerInfo.id)
@ -528,22 +532,22 @@ when isMainModule and not defined(release):
test "`dropFanoutPeers` drop expired fanout topics": test "`dropFanoutPeers` drop expired fanout topics":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA))) PeerInfo.init(PrivateKey.random(RSA)))
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} =
discard discard
let topic = "foobar" let topic = "foobar"
gossipSub.fanout[topic] = initHashSet[string]() gossipSub.fanout[topic] = initHashSet[string]()
gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(100.millis) gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(100.millis)
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
discard discard
for i in 0..<6: for i in 0..<6:
let conn = newConnection(newBufferStream(writeHandler)) let conn = newConnection(newBufferStream(writeHandler))
let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
conn.peerInfo = some(peerInfo) conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler gossipSub.peers[peerInfo.id].handler = handler
gossipSub.fanout[topic].incl(peerInfo.id) gossipSub.fanout[topic].incl(peerInfo.id)
@ -561,10 +565,10 @@ when isMainModule and not defined(release):
test "`dropFanoutPeers` leave unexpired fanout topics": test "`dropFanoutPeers` leave unexpired fanout topics":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA))) PeerInfo.init(PrivateKey.random(RSA)))
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} =
discard discard
let topic1 = "foobar1" let topic1 = "foobar1"
@ -574,13 +578,13 @@ when isMainModule and not defined(release):
gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(100.millis) gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(100.millis)
gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(500.millis) gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(500.millis)
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
discard discard
for i in 0..<6: for i in 0..<6:
let conn = newConnection(newBufferStream(writeHandler)) let conn = newConnection(newBufferStream(writeHandler))
let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
conn.peerInfo = some(peerInfo) conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler gossipSub.peers[peerInfo.id].handler = handler
gossipSub.fanout[topic1].incl(peerInfo.id) gossipSub.fanout[topic1].incl(peerInfo.id)
@ -601,13 +605,13 @@ when isMainModule and not defined(release):
test "`getGossipPeers` - should gather up to degree D non intersecting peers": test "`getGossipPeers` - should gather up to degree D non intersecting peers":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA))) PeerInfo.init(PrivateKey.random(RSA)))
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} =
discard discard
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
discard discard
let topic = "foobar" let topic = "foobar"
@ -617,7 +621,7 @@ when isMainModule and not defined(release):
for i in 0..<30: for i in 0..<30:
let conn = newConnection(newBufferStream(writeHandler)) let conn = newConnection(newBufferStream(writeHandler))
let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
conn.peerInfo = some(peerInfo) conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler gossipSub.peers[peerInfo.id].handler = handler
if i mod 2 == 0: if i mod 2 == 0:
@ -628,7 +632,7 @@ when isMainModule and not defined(release):
for i in 0..<15: for i in 0..<15:
let conn = newConnection(newBufferStream(writeHandler)) let conn = newConnection(newBufferStream(writeHandler))
let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
conn.peerInfo = some(peerInfo) conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler gossipSub.peers[peerInfo.id].handler = handler
gossipSub.gossipsub[topic].incl(peerInfo.id) gossipSub.gossipsub[topic].incl(peerInfo.id)
@ -650,10 +654,10 @@ when isMainModule and not defined(release):
test "`getGossipPeers` - should not crash on missing topics in mesh": test "`getGossipPeers` - should not crash on missing topics in mesh":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA))) PeerInfo.init(PrivateKey.random(RSA)))
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} =
discard discard
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
@ -665,7 +669,7 @@ when isMainModule and not defined(release):
for i in 0..<30: for i in 0..<30:
let conn = newConnection(newBufferStream(writeHandler)) let conn = newConnection(newBufferStream(writeHandler))
let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
conn.peerInfo = some(peerInfo) conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler gossipSub.peers[peerInfo.id].handler = handler
if i mod 2 == 0: if i mod 2 == 0:
@ -682,10 +686,10 @@ when isMainModule and not defined(release):
test "`getGossipPeers` - should not crash on missing topics in gossip": test "`getGossipPeers` - should not crash on missing topics in gossip":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA))) PeerInfo.init(PrivateKey.random(RSA)))
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} =
discard discard
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
@ -697,7 +701,7 @@ when isMainModule and not defined(release):
for i in 0..<30: for i in 0..<30:
let conn = newConnection(newBufferStream(writeHandler)) let conn = newConnection(newBufferStream(writeHandler))
let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
conn.peerInfo = some(peerInfo) conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler gossipSub.peers[peerInfo.id].handler = handler
if i mod 2 == 0: if i mod 2 == 0:
@ -714,10 +718,10 @@ when isMainModule and not defined(release):
test "`getGossipPeers` - should not crash on missing topics in gossip": test "`getGossipPeers` - should not crash on missing topics in gossip":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, let gossipSub = newPubSub(TestGossipSub,
PeerInfo.init(PrivateKey.random(RSA))) PeerInfo.init(PrivateKey.random(RSA)))
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} =
discard discard
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
@ -729,7 +733,7 @@ when isMainModule and not defined(release):
for i in 0..<30: for i in 0..<30:
let conn = newConnection(newBufferStream(writeHandler)) let conn = newConnection(newBufferStream(writeHandler))
let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
conn.peerInfo = some(peerInfo) conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id].handler = handler gossipSub.peers[peerInfo.id].handler = handler
if i mod 2 == 0: if i mod 2 == 0:

View File

@ -22,7 +22,7 @@ type
historySize*: Natural historySize*: Natural
windowSize*: Natural windowSize*: Natural
proc put*(c: MCache, msg: Message) = proc put*(c: MCache, msg: Message) =
proc handler(key: string, val: Message) {.gcsafe.} = proc handler(key: string, val: Message) {.gcsafe.} =
## make sure we remove the message from history ## make sure we remove the message from history
## to keep things consisten ## to keep things consisten
@ -38,13 +38,13 @@ proc get*(c: MCache, mid: string): Option[Message] =
if mid in c.msgs: if mid in c.msgs:
result = some(c.msgs[mid]) result = some(c.msgs[mid])
proc window*(c: MCache, topic: string): HashSet[string] = proc window*(c: MCache, topic: string): HashSet[string] =
result = initHashSet[string]() result = initHashSet[string]()
let len = let len =
if c.windowSize > c.history.len: if c.windowSize > c.history.len:
c.history.len c.history.len
else: else:
c.windowSize c.windowSize
if c.history.len > 0: if c.history.len > 0:

View File

@ -32,10 +32,10 @@ type
handler*: seq[TopicHandler] handler*: seq[TopicHandler]
PubSub* = ref object of LPProtocol PubSub* = ref object of LPProtocol
peerInfo*: PeerInfo # this peer's info peerInfo*: PeerInfo # this peer's info
topics*: Table[string, Topic] # local topics topics*: Table[string, Topic] # local topics
peers*: Table[string, PubSubPeer] # peerid to peer map peers*: Table[string, PubSubPeer] # peerid to peer map
triggerSelf*: bool # trigger own local handler on publish triggerSelf*: bool # trigger own local handler on publish
cleanupLock: AsyncLock cleanupLock: AsyncLock
proc sendSubs*(p: PubSub, proc sendSubs*(p: PubSub,
@ -49,8 +49,8 @@ proc sendSubs*(p: PubSub,
var msg: RPCMsg var msg: RPCMsg
for t in topics: for t in topics:
trace "sending topic", peer = peer.id, trace "sending topic", peer = peer.id,
subscribe = subscribe, subscribe = subscribe,
topicName = t topicName = t
msg.subscriptions.add(SubOpts(topic: t, subscribe: subscribe)) msg.subscriptions.add(SubOpts(topic: t, subscribe: subscribe))
@ -62,7 +62,7 @@ method rpcHandler*(p: PubSub,
## handle rpc messages ## handle rpc messages
discard discard
method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.async, base, gcsafe.} = method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.async, base, gcsafe.} =
## handle peer disconnects ## handle peer disconnects
if peer.id in p.peers: if peer.id in p.peers:
p.peers.del(peer.id) p.peers.del(peer.id)
@ -71,7 +71,7 @@ proc cleanUpHelper(p: PubSub, peer: PubSubPeer) {.async.} =
await p.cleanupLock.acquire() await p.cleanupLock.acquire()
if peer.refs == 0: if peer.refs == 0:
await p.handleDisconnect(peer) await p.handleDisconnect(peer)
peer.refs.dec() # decrement refcount peer.refs.dec() # decrement refcount
p.cleanupLock.release() p.cleanupLock.release()
@ -102,20 +102,20 @@ method handleConn*(p: PubSub,
## that we're interested in ## that we're interested in
## ##
if conn.peerInfo.isNone: if isNil(conn.peerInfo):
trace "no valid PeerId for peer" trace "no valid PeerId for peer"
await conn.close() await conn.close()
return return
proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async, gcsafe.} = proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async, gcsafe.} =
# call floodsub rpc handler # call floodsub rpc handler
await p.rpcHandler(peer, msgs) await p.rpcHandler(peer, msgs)
let peer = p.getPeer(conn.peerInfo.get(), proto) let peer = p.getPeer(conn.peerInfo, proto)
let topics = toSeq(p.topics.keys) let topics = toSeq(p.topics.keys)
if topics.len > 0: if topics.len > 0:
await p.sendSubs(peer, topics, true) await p.sendSubs(peer, topics, true)
peer.handler = handler peer.handler = handler
await peer.handle(conn) # spawn peer read loop await peer.handle(conn) # spawn peer read loop
trace "pubsub peer handler ended, cleaning up" trace "pubsub peer handler ended, cleaning up"
@ -123,23 +123,21 @@ method handleConn*(p: PubSub,
method subscribeToPeer*(p: PubSub, method subscribeToPeer*(p: PubSub,
conn: Connection) {.base, async, gcsafe.} = conn: Connection) {.base, async, gcsafe.} =
var peer = p.getPeer(conn.peerInfo.get(), p.codec) var peer = p.getPeer(conn.peerInfo, p.codec)
trace "setting connection for peer", peerId = conn.peerInfo.get().id trace "setting connection for peer", peerId = conn.peerInfo.id
if not peer.isConnected: if not peer.isConnected:
peer.conn = conn peer.conn = conn
# handle connection close # handle connection close
conn.closeEvent.wait() conn.closeEvent.wait()
.addCallback( .addCallback do (udata: pointer = nil):
proc(udata: pointer = nil) {.gcsafe.} = trace "connection closed, cleaning up peer",
trace "connection closed, cleaning up peer", peer = conn.peerInfo.id
peer = conn.peerInfo.get().id
asyncCheck p.cleanUpHelper(peer) asyncCheck p.cleanUpHelper(peer)
)
method unsubscribe*(p: PubSub, method unsubscribe*(p: PubSub,
topics: seq[TopicPair]) {.base, async, gcsafe.} = topics: seq[TopicPair]) {.base, async, gcsafe.} =
## unsubscribe from a list of ``topic`` strings ## unsubscribe from a list of ``topic`` strings
for t in topics: for t in topics:
for i, h in p.topics[t.topic].handler: for i, h in p.topics[t.topic].handler:
@ -155,18 +153,18 @@ method unsubscribe*(p: PubSub,
method subscribeTopic*(p: PubSub, method subscribeTopic*(p: PubSub,
topic: string, topic: string,
subscribe: bool, subscribe: bool,
peerId: string) {.base, gcsafe.} = peerId: string) {.base, gcsafe.} =
discard discard
method subscribe*(p: PubSub, method subscribe*(p: PubSub,
topic: string, topic: string,
handler: TopicHandler) {.base, async, gcsafe.} = handler: TopicHandler) {.base, async, gcsafe.} =
## subscribe to a topic ## subscribe to a topic
## ##
## ``topic`` - a string topic to subscribe to ## ``topic`` - a string topic to subscribe to
## ##
## ``handler`` - is a user provided proc ## ``handler`` - is a user provided proc
## that will be triggered ## that will be triggered
## on every received message ## on every received message
## ##
if topic notin p.topics: if topic notin p.topics:
@ -180,14 +178,14 @@ method subscribe*(p: PubSub,
method publish*(p: PubSub, method publish*(p: PubSub,
topic: string, topic: string,
data: seq[byte]) {.base, async, gcsafe.} = data: seq[byte]) {.base, async, gcsafe.} =
## publish to a ``topic`` ## publish to a ``topic``
if p.triggerSelf and topic in p.topics: if p.triggerSelf and topic in p.topics:
for h in p.topics[topic].handler: for h in p.topics[topic].handler:
await h(topic, data) await h(topic, data)
method initPubSub*(p: PubSub) {.base.} = method initPubSub*(p: PubSub) {.base.} =
## perform pubsub initializaion ## perform pubsub initializaion
discard discard
method start*(p: PubSub) {.async, base.} = method start*(p: PubSub) {.async, base.} =

View File

@ -9,7 +9,7 @@
import options, hashes, strutils, tables, hashes import options, hashes, strutils, tables, hashes
import chronos, chronicles import chronos, chronicles
import rpc/[messages, message, protobuf], import rpc/[messages, message, protobuf],
timedcache, timedcache,
../../peer, ../../peer,
../../peerinfo, ../../peerinfo,
@ -79,23 +79,21 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async, gcsafe.} =
if $encodedHex.hash in p.sentRpcCache: if $encodedHex.hash in p.sentRpcCache:
trace "message already sent to peer, skipping", peer = p.id trace "message already sent to peer, skipping", peer = p.id
continue continue
proc sendToRemote() {.async.} = proc sendToRemote() {.async.} =
trace "sending encoded msgs to peer", peer = p.id, encoded = encodedHex trace "sending encoded msgs to peer", peer = p.id, encoded = encodedHex
await p.sendConn.writeLp(encoded.buffer) await p.sendConn.writeLp(encoded.buffer)
p.sentRpcCache.put($encodedHex.hash) p.sentRpcCache.put($encodedHex.hash)
# if no connection has been set, # if no connection has been set,
# queue messages untill a connection # queue messages untill a connection
# becomes available # becomes available
if p.isConnected: if p.isConnected:
await sendToRemote() await sendToRemote()
return return
p.onConnect.wait().addCallback( p.onConnect.wait().addCallback do (udata: pointer):
proc(udata: pointer) =
asyncCheck sendToRemote() asyncCheck sendToRemote()
)
trace "enqueued message to send at a later time" trace "enqueued message to send at a later time"
except CatchableError as exc: except CatchableError as exc:
@ -112,7 +110,7 @@ proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async, gcsafe.} =
trace "sending graft msg to peer", peer = p.id, topicID = topic trace "sending graft msg to peer", peer = p.id, topicID = topic
await p.send(@[RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))]) await p.send(@[RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))])
proc sendPrune*(p: PubSubPeer, topics: seq[string]) {.async, gcsafe.} = proc sendPrune*(p: PubSubPeer, topics: seq[string]) {.async, gcsafe.} =
for topic in topics: for topic in topics:
trace "sending prune msg to peer", peer = p.id, topicID = topic trace "sending prune msg to peer", peer = p.id, topicID = topic
await p.send(@[RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))]) await p.send(@[RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))])

View File

@ -14,10 +14,10 @@ import messages,
../../../crypto/crypto, ../../../crypto/crypto,
../../../peer ../../../peer
proc encodeGraft*(graft: ControlGraft, pb: var ProtoBuffer) {.gcsafe.} = proc encodeGraft*(graft: ControlGraft, pb: var ProtoBuffer) {.gcsafe.} =
pb.write(initProtoField(1, graft.topicID)) pb.write(initProtoField(1, graft.topicID))
proc decodeGraft*(pb: var ProtoBuffer): seq[ControlGraft] {.gcsafe.} = proc decodeGraft*(pb: var ProtoBuffer): seq[ControlGraft] {.gcsafe.} =
trace "decoding graft msg", buffer = pb.buffer.toHex() trace "decoding graft msg", buffer = pb.buffer.toHex()
while true: while true:
var topic: string var topic: string
@ -28,10 +28,10 @@ proc decodeGraft*(pb: var ProtoBuffer): seq[ControlGraft] {.gcsafe.} =
trace "read topic field from graft msg", topicID = topic trace "read topic field from graft msg", topicID = topic
result.add(ControlGraft(topicID: topic)) result.add(ControlGraft(topicID: topic))
proc encodePrune*(prune: ControlPrune, pb: var ProtoBuffer) {.gcsafe.} = proc encodePrune*(prune: ControlPrune, pb: var ProtoBuffer) {.gcsafe.} =
pb.write(initProtoField(1, prune.topicID)) pb.write(initProtoField(1, prune.topicID))
proc decodePrune*(pb: var ProtoBuffer): seq[ControlPrune] {.gcsafe.} = proc decodePrune*(pb: var ProtoBuffer): seq[ControlPrune] {.gcsafe.} =
trace "decoding prune msg" trace "decoding prune msg"
while true: while true:
var topic: string var topic: string
@ -41,12 +41,12 @@ proc decodePrune*(pb: var ProtoBuffer): seq[ControlPrune] {.gcsafe.} =
result.add(ControlPrune(topicID: topic)) result.add(ControlPrune(topicID: topic))
proc encodeIHave*(ihave: ControlIHave, pb: var ProtoBuffer) {.gcsafe.} = proc encodeIHave*(ihave: ControlIHave, pb: var ProtoBuffer) {.gcsafe.} =
pb.write(initProtoField(1, ihave.topicID)) pb.write(initProtoField(1, ihave.topicID))
for mid in ihave.messageIDs: for mid in ihave.messageIDs:
pb.write(initProtoField(2, mid)) pb.write(initProtoField(2, mid))
proc decodeIHave*(pb: var ProtoBuffer): seq[ControlIHave] {.gcsafe.} = proc decodeIHave*(pb: var ProtoBuffer): seq[ControlIHave] {.gcsafe.} =
trace "decoding ihave msg" trace "decoding ihave msg"
while true: while true:
@ -67,11 +67,11 @@ proc decodeIHave*(pb: var ProtoBuffer): seq[ControlIHave] {.gcsafe.} =
result.add(control) result.add(control)
proc encodeIWant*(iwant: ControlIWant, pb: var ProtoBuffer) {.gcsafe.} = proc encodeIWant*(iwant: ControlIWant, pb: var ProtoBuffer) {.gcsafe.} =
for mid in iwant.messageIDs: for mid in iwant.messageIDs:
pb.write(initProtoField(1, mid)) pb.write(initProtoField(1, mid))
proc decodeIWant*(pb: var ProtoBuffer): seq[ControlIWant] {.gcsafe.} = proc decodeIWant*(pb: var ProtoBuffer): seq[ControlIWant] {.gcsafe.} =
trace "decoding ihave msg" trace "decoding ihave msg"
while pb.enterSubMessage() > 0: while pb.enterSubMessage() > 0:
@ -82,7 +82,7 @@ proc decodeIWant*(pb: var ProtoBuffer): seq[ControlIWant] {.gcsafe.} =
iWant.messageIDs.add(mid) iWant.messageIDs.add(mid)
result.add(iWant) result.add(iWant)
proc encodeControl*(control: ControlMessage, pb: var ProtoBuffer) {.gcsafe.} = proc encodeControl*(control: ControlMessage, pb: var ProtoBuffer) {.gcsafe.} =
if control.ihave.len > 0: if control.ihave.len > 0:
var ihave = initProtoBuffer() var ihave = initProtoBuffer()
for h in control.ihave: for h in control.ihave:
@ -96,7 +96,7 @@ proc encodeControl*(control: ControlMessage, pb: var ProtoBuffer) {.gcsafe.} =
var iwant = initProtoBuffer() var iwant = initProtoBuffer()
for w in control.iwant: for w in control.iwant:
w.encodeIWant(iwant) w.encodeIWant(iwant)
# write messages to protobuf # write messages to protobuf
iwant.finish() iwant.finish()
pb.write(initProtoField(2, iwant)) pb.write(initProtoField(2, iwant))
@ -105,7 +105,7 @@ proc encodeControl*(control: ControlMessage, pb: var ProtoBuffer) {.gcsafe.} =
var graft = initProtoBuffer() var graft = initProtoBuffer()
for g in control.graft: for g in control.graft:
g.encodeGraft(graft) g.encodeGraft(graft)
# write messages to protobuf # write messages to protobuf
graft.finish() graft.finish()
pb.write(initProtoField(3, graft)) pb.write(initProtoField(3, graft))
@ -114,12 +114,12 @@ proc encodeControl*(control: ControlMessage, pb: var ProtoBuffer) {.gcsafe.} =
var prune = initProtoBuffer() var prune = initProtoBuffer()
for p in control.prune: for p in control.prune:
p.encodePrune(prune) p.encodePrune(prune)
# write messages to protobuf # write messages to protobuf
prune.finish() prune.finish()
pb.write(initProtoField(4, prune)) pb.write(initProtoField(4, prune))
proc decodeControl*(pb: var ProtoBuffer): Option[ControlMessage] {.gcsafe.} = proc decodeControl*(pb: var ProtoBuffer): Option[ControlMessage] {.gcsafe.} =
trace "decoding control submessage" trace "decoding control submessage"
var control: ControlMessage var control: ControlMessage
while true: while true:
@ -137,17 +137,17 @@ proc decodeControl*(pb: var ProtoBuffer): Option[ControlMessage] {.gcsafe.} =
control.graft = pb.decodeGraft() control.graft = pb.decodeGraft()
of 4: of 4:
control.prune = pb.decodePrune() control.prune = pb.decodePrune()
else: else:
raise newException(CatchableError, "message type not recognized") raise newException(CatchableError, "message type not recognized")
if result.isNone: if result.isNone:
result = some(control) result = some(control)
proc encodeSubs*(subs: SubOpts, pb: var ProtoBuffer) {.gcsafe.} = proc encodeSubs*(subs: SubOpts, pb: var ProtoBuffer) {.gcsafe.} =
pb.write(initProtoField(1, subs.subscribe)) pb.write(initProtoField(1, subs.subscribe))
pb.write(initProtoField(2, subs.topic)) pb.write(initProtoField(2, subs.topic))
proc decodeSubs*(pb: var ProtoBuffer): seq[SubOpts] {.gcsafe.} = proc decodeSubs*(pb: var ProtoBuffer): seq[SubOpts] {.gcsafe.} =
while true: while true:
var subOpt: SubOpts var subOpt: SubOpts
var subscr: int var subscr: int
@ -163,7 +163,7 @@ proc decodeSubs*(pb: var ProtoBuffer): seq[SubOpts] {.gcsafe.} =
trace "got subscriptions", subscriptions = result trace "got subscriptions", subscriptions = result
proc encodeMessage*(msg: Message, pb: var ProtoBuffer) {.gcsafe.} = proc encodeMessage*(msg: Message, pb: var ProtoBuffer) {.gcsafe.} =
pb.write(initProtoField(1, msg.fromPeer)) pb.write(initProtoField(1, msg.fromPeer))
pb.write(initProtoField(2, msg.data)) pb.write(initProtoField(2, msg.data))
pb.write(initProtoField(3, msg.seqno)) pb.write(initProtoField(3, msg.seqno))
@ -173,13 +173,13 @@ proc encodeMessage*(msg: Message, pb: var ProtoBuffer) {.gcsafe.} =
if msg.signature.len > 0: if msg.signature.len > 0:
pb.write(initProtoField(5, msg.signature)) pb.write(initProtoField(5, msg.signature))
if msg.key.len > 0: if msg.key.len > 0:
pb.write(initProtoField(6, msg.key)) pb.write(initProtoField(6, msg.key))
pb.finish() pb.finish()
proc decodeMessages*(pb: var ProtoBuffer): seq[Message] {.gcsafe.} = proc decodeMessages*(pb: var ProtoBuffer): seq[Message] {.gcsafe.} =
# TODO: which of this fields are really optional? # TODO: which of this fields are really optional?
while true: while true:
var msg: Message var msg: Message
@ -202,7 +202,7 @@ proc decodeMessages*(pb: var ProtoBuffer): seq[Message] {.gcsafe.} =
msg.topicIDs.add(topic) msg.topicIDs.add(topic)
trace "read message field", topicName = topic trace "read message field", topicName = topic
topic = "" topic = ""
discard pb.getBytes(5, msg.signature) discard pb.getBytes(5, msg.signature)
trace "read message field", signature = msg.signature trace "read message field", signature = msg.signature
@ -211,7 +211,7 @@ proc decodeMessages*(pb: var ProtoBuffer): seq[Message] {.gcsafe.} =
result.add(msg) result.add(msg)
proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} = proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} =
result = initProtoBuffer() result = initProtoBuffer()
trace "encoding msg: ", msg = msg trace "encoding msg: ", msg = msg
@ -244,7 +244,7 @@ proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} =
if result.buffer.len > 0: if result.buffer.len > 0:
result.finish() result.finish()
proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} = proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} =
var pb = initProtoBuffer(msg) var pb = initProtoBuffer(msg)
result.subscriptions = newSeq[SubOpts]() result.subscriptions = newSeq[SubOpts]()
@ -262,5 +262,5 @@ proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} =
result.messages = pb.decodeMessages() result.messages = pb.decodeMessages()
of 3: of 3:
result.control = pb.decodeControl() result.control = pb.decodeControl()
else: else:
raise newException(CatchableError, "message type not recognized") raise newException(CatchableError, "message type not recognized")

View File

@ -249,7 +249,7 @@ proc newSecureConnection*(conn: Connection,
result.readerCoder.init(cipher, secrets.keyOpenArray(i1), result.readerCoder.init(cipher, secrets.keyOpenArray(i1),
secrets.ivOpenArray(i1)) secrets.ivOpenArray(i1))
result.peerInfo = some(PeerInfo.init(remotePubKey)) result.peerInfo = PeerInfo.init(remotePubKey)
proc transactMessage(conn: Connection, proc transactMessage(conn: Connection,
msg: seq[byte]): Future[seq[byte]] {.async.} = msg: seq[byte]): Future[seq[byte]] {.async.} =
@ -296,8 +296,11 @@ proc handshake*(s: Secio, conn: Connection): Future[SecureConnection] {.async.}
if randomBytes(localNonce) != SecioNonceSize: if randomBytes(localNonce) != SecioNonceSize:
raise newException(CatchableError, "Could not generate random data") raise newException(CatchableError, "Could not generate random data")
var request = createProposal(localNonce, localBytesPubkey, SecioExchanges, var request = createProposal(localNonce,
SecioCiphers, SecioHashes) localBytesPubkey,
SecioExchanges,
SecioCiphers,
SecioHashes)
localPeerId = PeerID.init(s.localPublicKey) localPeerId = PeerID.init(s.localPublicKey)
@ -415,8 +418,8 @@ proc readLoop(sconn: SecureConnection, stream: BufferStream) {.async.} =
let msg = await sconn.readMessage() let msg = await sconn.readMessage()
if msg.len > 0: if msg.len > 0:
await stream.pushTo(msg) await stream.pushTo(msg)
# tight loop, give a chance for other # tight loop, give a chance for other
# stuff to run as well # stuff to run as well
await sleepAsync(1.millis) await sleepAsync(1.millis)
except CatchableError as exc: except CatchableError as exc:
@ -434,12 +437,12 @@ proc handleConn(s: Secio, conn: Connection): Future[Connection] {.async, gcsafe.
asyncCheck readLoop(sconn, stream) asyncCheck readLoop(sconn, stream)
var secured = newConnection(stream) var secured = newConnection(stream)
secured.closeEvent.wait() secured.closeEvent.wait()
.addCallback(proc(udata: pointer) = .addCallback do (udata: pointer):
trace "wrapped connection closed, closing upstream" trace "wrapped connection closed, closing upstream"
if not sconn.closed: if not isNil(sconn) and not sconn.closed:
asyncCheck sconn.close() asyncCheck sconn.close()
)
secured.peerInfo = some(PeerInfo.init(sconn.peerInfo.get().publicKey.get())) secured.peerInfo = PeerInfo.init(sconn.peerInfo.publicKey.get())
result = secured result = secured
method init(s: Secio) {.gcsafe.} = method init(s: Secio) {.gcsafe.} =

View File

@ -19,7 +19,7 @@ type ChronosStream* = ref object of LPStream
server: StreamServer server: StreamServer
client: StreamTransport client: StreamTransport
proc newChronosStream*(server: StreamServer, proc newChronosStream*(server: StreamServer,
client: StreamTransport): ChronosStream = client: StreamTransport): ChronosStream =
new result new result
result.server = server result.server = server
@ -28,9 +28,9 @@ proc newChronosStream*(server: StreamServer,
result.writer = newAsyncStreamWriter(client) result.writer = newAsyncStreamWriter(client)
result.closeEvent = newAsyncEvent() result.closeEvent = newAsyncEvent()
method read*(s: ChronosStream, n = -1): Future[seq[byte]] {.async.} = method read*(s: ChronosStream, n = -1): Future[seq[byte]] {.async.} =
if s.reader.atEof: if s.reader.atEof:
raise newLPStreamClosedError() raise newLPStreamEOFError()
try: try:
result = await s.reader.read(n) result = await s.reader.read(n)
@ -39,11 +39,11 @@ method read*(s: ChronosStream, n = -1): Future[seq[byte]] {.async.} =
except AsyncStreamIncorrectError as exc: except AsyncStreamIncorrectError as exc:
raise newLPStreamIncorrectError(exc.msg) raise newLPStreamIncorrectError(exc.msg)
method readExactly*(s: ChronosStream, method readExactly*(s: ChronosStream,
pbytes: pointer, pbytes: pointer,
nbytes: int): Future[void] {.async.} = nbytes: int): Future[void] {.async.} =
if s.reader.atEof: if s.reader.atEof:
raise newLPStreamClosedError() raise newLPStreamEOFError()
try: try:
await s.reader.readExactly(pbytes, nbytes) await s.reader.readExactly(pbytes, nbytes)
@ -56,7 +56,7 @@ method readExactly*(s: ChronosStream,
method readLine*(s: ChronosStream, limit = 0, sep = "\r\n"): Future[string] {.async.} = method readLine*(s: ChronosStream, limit = 0, sep = "\r\n"): Future[string] {.async.} =
if s.reader.atEof: if s.reader.atEof:
raise newLPStreamClosedError() raise newLPStreamEOFError()
try: try:
result = await s.reader.readLine(limit, sep) result = await s.reader.readLine(limit, sep)
@ -67,7 +67,7 @@ method readLine*(s: ChronosStream, limit = 0, sep = "\r\n"): Future[string] {.as
method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} = method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} =
if s.reader.atEof: if s.reader.atEof:
raise newLPStreamClosedError() raise newLPStreamEOFError()
try: try:
result = await s.reader.readOnce(pbytes, nbytes) result = await s.reader.readOnce(pbytes, nbytes)
@ -76,12 +76,12 @@ method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.
except AsyncStreamIncorrectError as exc: except AsyncStreamIncorrectError as exc:
raise newLPStreamIncorrectError(exc.msg) raise newLPStreamIncorrectError(exc.msg)
method readUntil*(s: ChronosStream, method readUntil*(s: ChronosStream,
pbytes: pointer, pbytes: pointer,
nbytes: int, nbytes: int,
sep: seq[byte]): Future[int] {.async.} = sep: seq[byte]): Future[int] {.async.} =
if s.reader.atEof: if s.reader.atEof:
raise newLPStreamClosedError() raise newLPStreamEOFError()
try: try:
result = await s.reader.readUntil(pbytes, nbytes, sep) result = await s.reader.readUntil(pbytes, nbytes, sep)
@ -96,7 +96,7 @@ method readUntil*(s: ChronosStream,
method write*(s: ChronosStream, pbytes: pointer, nbytes: int) {.async.} = method write*(s: ChronosStream, pbytes: pointer, nbytes: int) {.async.} =
if s.writer.atEof: if s.writer.atEof:
raise newLPStreamClosedError() raise newLPStreamEOFError()
try: try:
await s.writer.write(pbytes, nbytes) await s.writer.write(pbytes, nbytes)
@ -109,7 +109,7 @@ method write*(s: ChronosStream, pbytes: pointer, nbytes: int) {.async.} =
method write*(s: ChronosStream, msg: string, msglen = -1) {.async.} = method write*(s: ChronosStream, msg: string, msglen = -1) {.async.} =
if s.writer.atEof: if s.writer.atEof:
raise newLPStreamClosedError() raise newLPStreamEOFError()
try: try:
await s.writer.write(msg, msglen) await s.writer.write(msg, msglen)
@ -122,7 +122,7 @@ method write*(s: ChronosStream, msg: string, msglen = -1) {.async.} =
method write*(s: ChronosStream, msg: seq[byte], msglen = -1) {.async.} = method write*(s: ChronosStream, msg: seq[byte], msglen = -1) {.async.} =
if s.writer.atEof: if s.writer.atEof:
raise newLPStreamClosedError() raise newLPStreamEOFError()
try: try:
await s.writer.write(msg, msglen) await s.writer.write(msg, msglen)
@ -133,7 +133,7 @@ method write*(s: ChronosStream, msg: seq[byte], msglen = -1) {.async.} =
except AsyncStreamIncorrectError as exc: except AsyncStreamIncorrectError as exc:
raise newLPStreamIncorrectError(exc.msg) raise newLPStreamIncorrectError(exc.msg)
method closed*(s: ChronosStream): bool {.inline.} = method closed*(s: ChronosStream): bool {.inline.} =
# TODO: we might only need to check for reader's EOF # TODO: we might only need to check for reader's EOF
result = s.reader.atEof() result = s.reader.atEof()

View File

@ -9,7 +9,7 @@
import chronos import chronos
type type
LPStream* = ref object of RootObj LPStream* = ref object of RootObj
isClosed*: bool isClosed*: bool
closeEvent*: AsyncEvent closeEvent*: AsyncEvent
@ -22,7 +22,7 @@ type
par*: ref Exception par*: ref Exception
LPStreamWriteError* = object of LPStreamError LPStreamWriteError* = object of LPStreamError
par*: ref Exception par*: ref Exception
LPStreamClosedError* = object of LPStreamError LPStreamEOFError* = object of LPStreamError
proc newLPStreamReadError*(p: ref Exception): ref Exception {.inline.} = proc newLPStreamReadError*(p: ref Exception): ref Exception {.inline.} =
var w = newException(LPStreamReadError, "Read stream failed") var w = newException(LPStreamReadError, "Read stream failed")
@ -45,10 +45,10 @@ proc newLPStreamLimitError*(): ref Exception {.inline.} =
proc newLPStreamIncorrectError*(m: string): ref Exception {.inline.} = proc newLPStreamIncorrectError*(m: string): ref Exception {.inline.} =
result = newException(LPStreamIncorrectError, m) result = newException(LPStreamIncorrectError, m)
proc newLPStreamClosedError*(): ref Exception {.inline.} = proc newLPStreamEOFError*(): ref Exception {.inline.} =
result = newException(LPStreamClosedError, "Stream closed!") result = newException(LPStreamEOFError, "Stream EOF!")
method closed*(s: LPStream): bool {.base, inline.} = method closed*(s: LPStream): bool {.base, inline.} =
s.isClosed s.isClosed
method read*(s: LPStream, n = -1): Future[seq[byte]] method read*(s: LPStream, n = -1): Future[seq[byte]]

View File

@ -65,13 +65,17 @@ proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
proc identify(s: Switch, conn: Connection): Future[PeerInfo] {.async, gcsafe.} = proc identify(s: Switch, conn: Connection): Future[PeerInfo] {.async, gcsafe.} =
## identify the connection ## identify the connection
if conn.peerInfo.isSome: if not isNil(conn.peerInfo):
result = conn.peerInfo.get() result = conn.peerInfo
try: try:
if (await s.ms.select(conn, s.identity.codec)): if (await s.ms.select(conn, s.identity.codec)):
let info = await s.identity.identify(conn, conn.peerInfo) let info = await s.identity.identify(conn, conn.peerInfo)
if info.pubKey.isNone and isNil(result):
raise newException(CatchableError,
"no public key provided and no existing peer identity found")
if info.pubKey.isSome: if info.pubKey.isSome:
result = PeerInfo.init(info.pubKey.get()) result = PeerInfo.init(info.pubKey.get())
trace "identify: identified remote peer", peer = result.id trace "identify: identified remote peer", peer = result.id
@ -112,58 +116,58 @@ proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} =
# add muxer handler cleanup proc # add muxer handler cleanup proc
handlerFut.addCallback do (udata: pointer = nil): handlerFut.addCallback do (udata: pointer = nil):
trace "muxer handler completed for peer", trace "muxer handler completed for peer",
peer = conn.peerInfo.get().id peer = conn.peerInfo.id
# do identify first, so that we have a # do identify first, so that we have a
# PeerInfo in case we didn't before # PeerInfo in case we didn't before
conn.peerInfo = some((await s.identify(stream))) conn.peerInfo = await s.identify(stream)
await stream.close() # close identify stream await stream.close() # close identify stream
trace "connection's peerInfo", peerInfo = conn.peerInfo trace "connection's peerInfo", peerInfo = conn.peerInfo
# store it in muxed connections if we have a peer for it # store it in muxed connections if we have a peer for it
if conn.peerInfo.isSome: if not isNil(conn.peerInfo):
trace "adding muxer for peer", peer = conn.peerInfo.get().id trace "adding muxer for peer", peer = conn.peerInfo.id
s.muxed[conn.peerInfo.get().id] = muxer s.muxed[conn.peerInfo.id] = muxer
proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} = proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} =
# if conn.peerInfo.peerId.isSome: if not isNil(conn.peerInfo):
let id = conn.peerInfo.get().id let id = conn.peerInfo.id
trace "cleaning up connection for peer", peerId = id trace "cleaning up connection for peer", peerId = id
if id in s.muxed: if id in s.muxed:
await s.muxed[id].close() await s.muxed[id].close()
s.muxed.del(id) s.muxed.del(id)
if id in s.connections: if id in s.connections:
await s.connections[id].close() await s.connections[id].close()
s.connections.del(id) s.connections.del(id)
proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} = proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} =
let conn = s.connections.getOrDefault(peer.id) let conn = s.connections.getOrDefault(peer.id)
if conn != nil: if not isNil(conn):
await s.cleanupConn(conn) await s.cleanupConn(conn)
proc getMuxedStream(s: Switch, peerInfo: PeerInfo): Future[Option[Connection]] {.async, gcsafe.} = proc getMuxedStream(s: Switch, peerInfo: PeerInfo): Future[Connection] {.async, gcsafe.} =
# if there is a muxer for the connection # if there is a muxer for the connection
# use it instead to create a muxed stream # use it instead to create a muxed stream
if peerInfo.id in s.muxed: if peerInfo.id in s.muxed:
trace "connection is muxed, setting up a stream" trace "connection is muxed, setting up a stream"
let muxer = s.muxed[peerInfo.id] let muxer = s.muxed[peerInfo.id]
let conn = await muxer.newStream() let conn = await muxer.newStream()
result = some(conn) result = conn
proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
trace "handling connection", conn = conn trace "handling connection", conn = conn
result = conn result = conn
# don't mux/secure twise # don't mux/secure twise
if conn.peerInfo.get().id in s.muxed: if conn.peerInfo.id in s.muxed:
return return
result = await s.secure(result) # secure the connection result = await s.secure(result) # secure the connection
await s.mux(result) # mux it if possible await s.mux(result) # mux it if possible
s.connections[conn.peerInfo.get().id] = result s.connections[conn.peerInfo.id] = result
proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
trace "upgrading incoming connection" trace "upgrading incoming connection"
@ -206,7 +210,7 @@ proc dial*(s: Switch,
trace "Dialing address", address = $a trace "Dialing address", address = $a
result = await t.dial(a) result = await t.dial(a)
# make sure to assign the peer to the connection # make sure to assign the peer to the connection
result.peerInfo = some peer result.peerInfo = peer
result = await s.upgradeOutgoing(result) result = await s.upgradeOutgoing(result)
result.closeEvent.wait().addCallback do (udata: pointer): result.closeEvent.wait().addCallback do (udata: pointer):
asyncCheck s.cleanupConn(result) asyncCheck s.cleanupConn(result)
@ -214,11 +218,14 @@ proc dial*(s: Switch,
else: else:
trace "Reusing existing connection" trace "Reusing existing connection"
if isNil(result):
raise newException(CatchableError, "unable to establish outgoing link!")
if proto.len > 0 and not result.closed: if proto.len > 0 and not result.closed:
let stream = await s.getMuxedStream(peer) let stream = await s.getMuxedStream(peer)
if stream.isSome: if not isNil(stream):
trace "Connection is muxed, return muxed stream" trace "Connection is muxed, return muxed stream"
result = stream.get() result = stream
trace "Attempting to select remote", proto = proto trace "Attempting to select remote", proto = proto
if not await s.ms.select(result, proto): if not await s.ms.select(result, proto):
@ -324,7 +331,7 @@ proc newSwitch*(peerInfo: PeerInfo,
val.muxerHandler = proc(muxer: Muxer) {.async, gcsafe.} = val.muxerHandler = proc(muxer: Muxer) {.async, gcsafe.} =
trace "got new muxer" trace "got new muxer"
let stream = await muxer.newStream() let stream = await muxer.newStream()
muxer.connection.peerInfo = some((await s.identify(stream))) muxer.connection.peerInfo = await s.identify(stream)
await stream.close() await stream.close()
for k in secureManagers.keys: for k in secureManagers.keys:

View File

@ -30,7 +30,7 @@ proc connHandler*(t: Transport,
let conn: Connection = newConnection(newChronosStream(server, client)) let conn: Connection = newConnection(newChronosStream(server, client))
conn.observedAddrs = MultiAddress.init(client.remoteAddress) conn.observedAddrs = MultiAddress.init(client.remoteAddress)
if not initiator: if not initiator:
let handlerFut = if t.handler == nil: nil else: t.handler(conn) let handlerFut = if isNil(t.handler): nil else: t.handler(conn)
let connHolder: ConnHolder = ConnHolder(connection: conn, let connHolder: ConnHolder = ConnHolder(connection: conn,
connFuture: handlerFut) connFuture: handlerFut)
t.connections.add(connHolder) t.connections.add(connHolder)
@ -77,6 +77,6 @@ method dial*(t: TcpTransport,
let client: StreamTransport = await connect(address) let client: StreamTransport = await connect(address)
result = await t.connHandler(t.server, client, true) result = await t.connHandler(t.server, client, true)
method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} = method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} =
if procCall Transport(t).handles(address): if procCall Transport(t).handles(address):
result = address.protocols.filterIt( it == multiCodec("tcp") ).len > 0 result = address.protocols.filterIt( it == multiCodec("tcp") ).len > 0

View File

@ -25,7 +25,7 @@ proc createGossipSub(): GossipSub =
suite "GossipSub": suite "GossipSub":
test "should add remote peer topic subscriptions": test "should add remote peer topic subscriptions":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
discard discard
let gossip1 = createGossipSub() let gossip1 = createGossipSub()
@ -33,11 +33,11 @@ suite "GossipSub":
var buf1 = newBufferStream() var buf1 = newBufferStream()
var conn1 = newConnection(buf1) var conn1 = newConnection(buf1)
conn1.peerInfo = some(gossip1.peerInfo) conn1.peerInfo = gossip1.peerInfo
var buf2 = newBufferStream() var buf2 = newBufferStream()
var conn2 = newConnection(buf2) var conn2 = newConnection(buf2)
conn2.peerInfo = some(gossip2.peerInfo) conn2.peerInfo = gossip2.peerInfo
buf1 = buf1 | buf2 | buf1 buf1 = buf1 | buf2 | buf1
@ -50,7 +50,7 @@ suite "GossipSub":
check: check:
"foobar" in gossip2.gossipsub "foobar" in gossip2.gossipsub
gossip1.peerInfo.id in gossip2.gossipsub["foobar"] gossip1.peerInfo.id in gossip2.gossipsub["foobar"]
result = true result = true
check: check:
@ -92,7 +92,7 @@ suite "GossipSub":
test "should add remote peer topic subscriptions if both peers are subscribed": test "should add remote peer topic subscriptions if both peers are subscribed":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
discard discard
let gossip1 = createGossipSub() let gossip1 = createGossipSub()
@ -100,12 +100,12 @@ suite "GossipSub":
var buf1 = newBufferStream() var buf1 = newBufferStream()
var conn1 = newConnection(buf1) var conn1 = newConnection(buf1)
conn1.peerInfo = some(gossip1.peerInfo) conn1.peerInfo = gossip1.peerInfo
var buf2 = newBufferStream() var buf2 = newBufferStream()
var conn2 = newConnection(buf2) var conn2 = newConnection(buf2)
conn2.peerInfo = some(gossip2.peerInfo) conn2.peerInfo = gossip2.peerInfo
buf1 = buf1 | buf2 | buf1 buf1 = buf1 | buf2 | buf1
await gossip1.subscribeToPeer(conn2) await gossip1.subscribeToPeer(conn2)
@ -181,7 +181,7 @@ suite "GossipSub":
# test "send over fanout A -> B": # test "send over fanout A -> B":
# proc testRun(): Future[bool] {.async.} = # proc testRun(): Future[bool] {.async.} =
# var handlerFut = newFuture[bool]() # var handlerFut = newFuture[bool]()
# proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = # proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
# check: # check:
# topic == "foobar" # topic == "foobar"
# cast[string](data) == "Hello!" # cast[string](data) == "Hello!"
@ -221,7 +221,7 @@ suite "GossipSub":
test "e2e - send over fanout A -> B": test "e2e - send over fanout A -> B":
proc testRun(): Future[bool] {.async.} = proc testRun(): Future[bool] {.async.} =
var passed: bool var passed: bool
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar" check topic == "foobar"
passed = true passed = true
@ -255,7 +255,7 @@ suite "GossipSub":
# test "send over mesh A -> B": # test "send over mesh A -> B":
# proc testRun(): Future[bool] {.async.} = # proc testRun(): Future[bool] {.async.} =
# var passed: bool # var passed: bool
# proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = # proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
# check: # check:
# topic == "foobar" # topic == "foobar"
# cast[string](data) == "Hello!" # cast[string](data) == "Hello!"
@ -294,7 +294,7 @@ suite "GossipSub":
# test "e2e - send over mesh A -> B": # test "e2e - send over mesh A -> B":
# proc testRun(): Future[bool] {.async.} = # proc testRun(): Future[bool] {.async.} =
# var passed: bool # var passed: bool
# proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = # proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
# check topic == "foobar" # check topic == "foobar"
# passed = true # passed = true
@ -320,7 +320,7 @@ suite "GossipSub":
# check: # check:
# waitFor(testRun()) == true # waitFor(testRun()) == true
# test "with multiple peers": # test "with multiple peers":
# proc testRun(): Future[bool] {.async.} = # proc testRun(): Future[bool] {.async.} =
# var nodes: seq[GossipSub] # var nodes: seq[GossipSub]
# for i in 0..<10: # for i in 0..<10:
@ -361,8 +361,8 @@ suite "GossipSub":
# awaitters.add(dialer.start()) # awaitters.add(dialer.start())
# await nodes[0].publish("foobar", # await nodes[0].publish("foobar",
# cast[seq[byte]]("from node " & # cast[seq[byte]]("from node " &
# nodes[1].peerInfo.peerId.get().pretty)) # nodes[1].peerInfo.peerId.get().pretty))
# await sleepAsync(1000.millis) # await sleepAsync(1000.millis)
@ -404,8 +404,8 @@ suite "GossipSub":
await subscribeNodes(nodes) await subscribeNodes(nodes)
await sleepAsync(10.millis) await sleepAsync(10.millis)
await nodes[0].publish("foobar", await nodes[0].publish("foobar",
cast[seq[byte]]("from node " & cast[seq[byte]]("from node " &
nodes[1].peerInfo.id)) nodes[1].peerInfo.id))
await sleepAsync(1000.millis) await sleepAsync(1000.millis)

View File

@ -18,9 +18,9 @@ suite "Identify":
proc testHandle(): Future[bool] {.async.} = proc testHandle(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
let remoteSecKey = PrivateKey.random(RSA) let remoteSecKey = PrivateKey.random(RSA)
let remotePeerInfo = PeerInfo.init(remoteSecKey, let remotePeerInfo = PeerInfo.init(remoteSecKey,
@[ma], @[ma],
@["/test/proto1/1.0.0", @["/test/proto1/1.0.0",
"/test/proto2/1.0.0"]) "/test/proto2/1.0.0"])
var serverFut: Future[void] var serverFut: Future[void]
let identifyProto1 = newIdentify(remotePeerInfo) let identifyProto1 = newIdentify(remotePeerInfo)
@ -40,7 +40,7 @@ suite "Identify":
var peerInfo = PeerInfo.init(PrivateKey.random(RSA), @[ma]) var peerInfo = PeerInfo.init(PrivateKey.random(RSA), @[ma])
let identifyProto2 = newIdentify(peerInfo) let identifyProto2 = newIdentify(peerInfo)
discard await msDial.select(conn, IdentifyCodec) discard await msDial.select(conn, IdentifyCodec)
let id = await identifyProto2.identify(conn, some(remotePeerInfo)) let id = await identifyProto2.identify(conn, remotePeerInfo)
check id.pubKey.get() == remoteSecKey.getKey() check id.pubKey.get() == remoteSecKey.getKey()
check id.addrs[0] == ma check id.addrs[0] == ma
@ -77,7 +77,7 @@ suite "Identify":
var localPeerInfo = PeerInfo.init(PrivateKey.random(RSA), @[ma]) var localPeerInfo = PeerInfo.init(PrivateKey.random(RSA), @[ma])
let identifyProto2 = newIdentify(localPeerInfo) let identifyProto2 = newIdentify(localPeerInfo)
discard await msDial.select(conn, IdentifyCodec) discard await msDial.select(conn, IdentifyCodec)
discard await identifyProto2.identify(conn, some(PeerInfo.init(PrivateKey.random(RSA)))) discard await identifyProto2.identify(conn, PeerInfo.init(PrivateKey.random(RSA)))
await conn.close() await conn.close()
expect IdentityNoMatchError: expect IdentityNoMatchError:

View File

@ -59,10 +59,12 @@ proc readLp*(s: StreamTransport): Future[seq[byte]] {.async, gcsafe.} =
result.setLen(size) result.setLen(size)
if size > 0.uint: if size > 0.uint:
await s.readExactly(addr result[0], int(size)) await s.readExactly(addr result[0], int(size))
except LPStreamIncompleteError, LPStreamReadError: except LPStreamIncompleteError as exc:
trace "remote connection ended unexpectedly", exc = getCurrentExceptionMsg() trace "remote connection ended unexpectedly", exc = exc.msg
except LPStreamReadError as exc:
trace "unable to read from remote connection", exc = exc.msg
proc createNode*(privKey: Option[PrivateKey] = none(PrivateKey), proc createNode*(privKey: Option[PrivateKey] = none(PrivateKey),
address: string = "/ip4/127.0.0.1/tcp/0", address: string = "/ip4/127.0.0.1/tcp/0",
triggerSelf: bool = false, triggerSelf: bool = false,
gossip: bool = false): Switch = gossip: bool = false): Switch =
@ -77,7 +79,7 @@ proc createNode*(privKey: Option[PrivateKey] = none(PrivateKey),
let muxers = [(MplexCodec, mplexProvider)].toTable() let muxers = [(MplexCodec, mplexProvider)].toTable()
let identify = newIdentify(peerInfo) let identify = newIdentify(peerInfo)
let secureManagers = [(SecioCodec, Secure(newSecio(seckey.get())))].toTable() let secureManagers = [(SecioCodec, Secure(newSecio(seckey.get())))].toTable()
var pubSub: Option[PubSub] var pubSub: Option[PubSub]
if gossip: if gossip:
pubSub = some(PubSub(newPubSub(GossipSub, peerInfo, triggerSelf))) pubSub = some(PubSub(newPubSub(GossipSub, peerInfo, triggerSelf)))
@ -116,10 +118,10 @@ proc testPubSubDaemonPublish(gossip: bool = false): Future[bool] {.async.} =
daemonPeer.addresses)) daemonPeer.addresses))
await sleepAsync(1.seconds) await sleepAsync(1.seconds)
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
proc pubsubHandler(api: DaemonAPI, proc pubsubHandler(api: DaemonAPI,
ticket: PubsubTicket, ticket: PubsubTicket,
message: PubSubMessage): Future[bool] {.async.} = message: PubSubMessage): Future[bool] {.async.} =
result = true # don't cancel subscription result = true # don't cancel subscription
asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler) asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
@ -153,10 +155,10 @@ proc testPubSubNodePublish(gossip: bool = false): Future[bool] {.async.} =
await sleepAsync(1.seconds) await sleepAsync(1.seconds)
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
proc pubsubHandler(api: DaemonAPI, proc pubsubHandler(api: DaemonAPI,
ticket: PubsubTicket, ticket: PubsubTicket,
message: PubSubMessage): Future[bool] {.async.} = message: PubSubMessage): Future[bool] {.async.} =
let smsg = cast[string](message.data) let smsg = cast[string](message.data)
check smsg == pubsubData check smsg == pubsubData
handlerFuture.complete(true) handlerFuture.complete(true)
@ -185,7 +187,7 @@ suite "Interop":
proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} =
check cast[string](await stream.transp.readLp()) == "test 1" check cast[string](await stream.transp.readLp()) == "test 1"
asyncDiscard stream.transp.writeLp("test 2") asyncDiscard stream.transp.writeLp("test 2")
await sleepAsync(10.millis) await sleepAsync(10.millis)
check cast[string](await stream.transp.readLp()) == "test 3" check cast[string](await stream.transp.readLp()) == "test 3"
asyncDiscard stream.transp.writeLp("test 4") asyncDiscard stream.transp.writeLp("test 4")

View File

@ -268,7 +268,7 @@ suite "Mplex":
await chann.close() await chann.close()
await chann.write("Hello") await chann.write("Hello")
expect LPStreamClosedError: expect LPStreamEOFError:
waitFor(testClosedForWrite()) waitFor(testClosedForWrite())
test "half closed - channel should close for read by remote": test "half closed - channel should close for read by remote":
@ -281,7 +281,7 @@ suite "Mplex":
discard await chann.read() # this should work, since there is data in the buffer discard await chann.read() # this should work, since there is data in the buffer
discard await chann.read() # this should throw discard await chann.read() # this should throw
expect LPStreamClosedError: expect LPStreamEOFError:
waitFor(testClosedForRead()) waitFor(testClosedForRead())
test "reset - channel should fail reading": test "reset - channel should fail reading":
@ -291,7 +291,7 @@ suite "Mplex":
await chann.reset() await chann.reset()
asyncDiscard chann.read() asyncDiscard chann.read()
expect LPStreamClosedError: expect LPStreamEOFError:
waitFor(testResetRead()) waitFor(testResetRead())
test "reset - channel should fail writing": test "reset - channel should fail writing":
@ -301,7 +301,7 @@ suite "Mplex":
await chann.reset() await chann.reset()
await chann.write(cast[seq[byte]]("Hello!")) await chann.write(cast[seq[byte]]("Hello!"))
expect LPStreamClosedError: expect LPStreamEOFError:
waitFor(testResetWrite()) waitFor(testResetWrite())
test "should not allow pushing data to channel when remote end closed": test "should not allow pushing data to channel when remote end closed":
@ -311,5 +311,5 @@ suite "Mplex":
await chann.closedByRemote() await chann.closedByRemote()
await chann.pushTo(@[byte(1)]) await chann.pushTo(@[byte(1)])
expect LPStreamClosedError: expect LPStreamEOFError:
waitFor(testResetWrite()) waitFor(testResetWrite())

View File

@ -30,24 +30,24 @@ suite "PeerInfo":
check peerId == peerInfo.peerId check peerId == peerInfo.peerId
check seckey.getKey == peerInfo.publicKey.get() check seckey.getKey == peerInfo.publicKey.get()
test "Should return none on missing public key": test "Should init from CIDv0 string":
var peerInfo = PeerInfo.init("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N")
check:
PeerID.init("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N") == peerInfo.peerId
# TODO: CIDv1 is handling is missing from PeerID
# https://github.com/status-im/nim-libp2p/issues/53
# test "Should init from CIDv1 string":
# var peerInfo = PeerInfo.init("bafzbeie5745rpv2m6tjyuugywy4d5ewrqgqqhfnf445he3omzpjbx5xqxe")
# check:
# PeerID.init("bafzbeie5745rpv2m6tjyuugywy4d5ewrqgqqhfnf445he3omzpjbx5xqxe") == peerInfo.peerId
test "Should return none if pubkey is missing from id":
let peerInfo = PeerInfo.init(PeerID.init(PrivateKey.random(RSA))) let peerInfo = PeerInfo.init(PeerID.init(PrivateKey.random(RSA)))
check peerInfo.publicKey.isNone check peerInfo.publicKey.isNone
test "Should allow assigning public key": test "Should return some if pubkey is present in id":
let key = PrivateKey.random(RSA) let peerInfo = PeerInfo.init(PeerID.init(PrivateKey.random(Ed25519)))
check peerInfo.publicKey.isSome
let peerInfo = PeerInfo.init(PeerID.init(key))
peerInfo.publicKey = key.getKey()
check peerInfo.publicKey.get() == key.getKey()
test "Should throw on invalid public key assignement":
proc throwsOnInvalidPubKey() =
let validKey = PrivateKey.random(RSA)
let invalidKey = PrivateKey.random(RSA)
let peerInfo = PeerInfo.init(PeerID.init(validKey))
peerInfo.publicKey = invalidKey.getKey()
expect InvalidPublicKeyException:
throwsOnInvalidPubKey()