diff --git a/.travis.yml b/.travis.yml index c307be257..37789bd55 100644 --- a/.travis.yml +++ b/.travis.yml @@ -31,7 +31,7 @@ install: # install and build go-libp2p-daemon - curl -O -L -s -S https://raw.githubusercontent.com/status-im/nimbus-build-system/master/scripts/build_p2pd.sh - - bash build_p2pd.sh p2pdCache HEAD + - bash build_p2pd.sh p2pdCache v0.2.1 script: - nimble install -y --depsOnly diff --git a/README.md b/README.md index 785347832..20293370a 100644 --- a/README.md +++ b/README.md @@ -73,21 +73,21 @@ This stack reflects the minimal requirements for the upcoming Eth2 implementatio To run it, add nim-libp2p to your project's nimble file and spawn a node as follows: ```nim -import tables, options -import chronos, chronicles -import ../libp2p/[switch, +import tables +import chronos +import ../libp2p/[switch, multistream, - protocols/identify, + protocols/identify, connection, - transports/transport, + transports/transport, transports/tcptransport, - multiaddress, + multiaddress, peerinfo, - crypto/crypto, + crypto/crypto, peer, - protocols/protocol, + protocols/protocol, muxers/muxer, - muxers/mplex/mplex, + muxers/mplex/mplex, muxers/mplex/types, protocols/secure/secio, protocols/secure/secure] @@ -99,9 +99,8 @@ type method init(p: TestProto) {.gcsafe.} = # handle incoming connections in closure - proc handle(conn: Connection, proto: string) {.async, gcsafe.} = - let msg = cast[string](await conn.readLp()) - echo "Got from remote - ", cast[string](msg) + proc handle(conn: Connection, proto: string) {.async, gcsafe.} = + echo "Got from remote - ", cast[string](await conn.readLp()) await conn.writeLp("Hello!") await conn.close() @@ -118,9 +117,9 @@ proc createSwitch(ma: MultiAddress): (Switch, PeerInfo) = let identify = newIdentify(peerInfo) # create the identify proto proc createMplex(conn: Connection): Muxer = - # helper proc to create multiplexers, + # helper proc to create multiplexers, # use this to perform any custom setup up, - # such as adjusting timeout or anything else + # such as adjusting timeout or anything else # that the muxer requires result = newMplex(conn) @@ -130,17 +129,17 @@ proc createSwitch(ma: MultiAddress): (Switch, PeerInfo) = let secureManagers = {SecioCodec: Secure(newSecio(seckey))}.toTable() # setup the secio and any other secure provider # create the switch - let switch = newSwitch(peerInfo, - transports, - identify, - muxers, + let switch = newSwitch(peerInfo, + transports, + identify, + muxers, secureManagers) result = (switch, peerInfo) proc main() {.async, gcsafe.} = let ma1: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let ma2: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") - + var peerInfo1, peerInfo2: PeerInfo var switch1, switch2: Switch (switch1, peerInfo1) = createSwitch(ma1) # create node 1 @@ -155,9 +154,9 @@ proc main() {.async, gcsafe.} = var switch2Fut = await switch2.start() # start second node let conn = await switch2.dial(switch1.peerInfo, TestCodec) # dial the first node - await conn.writeLp("Hello!") # writeLp send a lenght prefixed buffer over the wire - let msg = cast[string](await conn.readLp()) # readLp reads lenght prefixed bytes and returns a buffer without the prefix - echo "Remote responded with - ", cast[string](msg) + await conn.writeLp("Hello!") # writeLp send a length prefixed buffer over the wire + # readLp reads length prefixed bytes and returns a buffer without the prefix + echo "Remote responded with - ", cast[string](await conn.readLp()) await allFutures(switch1.stop(), switch2.stop()) # close 
connections and shutdown all transports await allFutures(switch1Fut & switch2Fut) # wait for all transports to shutdown diff --git a/examples/directchat.nim b/examples/directchat.nim index eb59f3b59..0a3ac5a90 100644 --- a/examples/directchat.nim +++ b/examples/directchat.nim @@ -48,9 +48,10 @@ type started: bool proc id (p: ChatProto): string = - result = "unknown" - if p.conn.peerInfo.peerId.isSome: - result = $p.conn.peerInfo.peerId.get() + if not isNil(p.conn.peerInfo): + $p.conn.peerInfo.peerId + else: + "unknown" # forward declaration proc readWriteLoop(p: ChatProto) {.async, gcsafe.} @@ -66,9 +67,8 @@ proc dialPeer(p: ChatProto, address: string) {.async, gcsafe.} = if parts.len == 11 and parts[^2] notin ["ipfs", "p2p"]: quit("invalid or incompelete peerId") - var remotePeer: PeerInfo - remotePeer.peerId = some(PeerID.init(parts[^1])) - remotePeer.addrs.add(MultiAddress.init(address)) + var remotePeer = PeerInfo.init(parts[^1], + @[MultiAddress.init(address)]) echo &"dialing peer: {address}" p.conn = await p.switch.dial(remotePeer, ChatCodec) @@ -165,8 +165,7 @@ proc serveThread(customData: CustomData) {.async.} = var transp = fromPipe(customData.consoleFd) let seckey = PrivateKey.random(RSA) - var peerInfo: PeerInfo - peerInfo.peerId = some(PeerID.init(seckey)) + var peerInfo = PeerInfo.init(seckey) var localAddress = DefaultAddr while true: echo &"Type an address to bind to or Enter to use the default {DefaultAddr}" @@ -202,7 +201,7 @@ proc serveThread(customData: CustomData) {.async.} = var libp2pFuts = await switch.start() chatProto.started = true - let id = peerInfo.peerId.get().pretty + let id = peerInfo.peerId.pretty echo "PeerID: " & id echo "listening on: " for a in peerInfo.addrs: diff --git a/libp2p/connection.nim b/libp2p/connection.nim index d2a1fe339..dd359613b 100644 --- a/libp2p/connection.nim +++ b/libp2p/connection.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import chronos, chronicles, options +import chronos, chronicles import peerinfo, multiaddress, stream/lpstream, @@ -19,7 +19,7 @@ const DefaultReadSize*: uint = 64 * 1024 type Connection* = ref object of LPStream - peerInfo*: Option[PeerInfo] + peerInfo*: PeerInfo stream*: LPStream observedAddrs*: Multiaddress @@ -39,19 +39,17 @@ proc newConnection*(stream: LPStream): Connection = let this = result if not isNil(result.stream.closeEvent): result.stream.closeEvent.wait(). 
- addCallback( - proc (udata: pointer) = - if not this.closed: - trace "closing this connection because wrapped stream closed" - asyncCheck this.close() - ) + addCallback do (udata: pointer): + if not this.closed: + trace "closing this connection because wrapped stream closed" + asyncCheck this.close() method read*(s: Connection, n = -1): Future[seq[byte]] {.gcsafe.} = s.stream.read(n) method readExactly*(s: Connection, pbytes: pointer, - nbytes: int): + nbytes: int): Future[void] {.gcsafe.} = s.stream.readExactly(pbytes, nbytes) @@ -70,7 +68,7 @@ method readOnce*(s: Connection, method readUntil*(s: Connection, pbytes: pointer, nbytes: int, - sep: seq[byte]): + sep: seq[byte]): Future[int] {.gcsafe.} = s.stream.readUntil(pbytes, nbytes, sep) @@ -86,13 +84,13 @@ method write*(s: Connection, Future[void] {.gcsafe.} = s.stream.write(msg, msglen) -method write*(s: Connection, - msg: seq[byte], - msglen = -1): +method write*(s: Connection, + msg: seq[byte], + msglen = -1): Future[void] {.gcsafe.} = s.stream.write(msg, msglen) -method closed*(s: Connection): bool = +method closed*(s: Connection): bool = if isNil(s.stream): return false diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim index c7ad10f00..808b12e8d 100644 --- a/libp2p/muxers/mplex/coder.nim +++ b/libp2p/muxers/mplex/coder.nim @@ -43,7 +43,7 @@ proc readMplexVarint(conn: Connection): Future[Option[uint]] {.async, gcsafe.} = except LPStreamIncompleteError as exc: trace "unable to read varint", exc = exc.msg -proc readMsg*(conn: Connection): Future[Option[Msg]] {.async, gcsafe.} = +proc readMsg*(conn: Connection): Future[Option[Msg]] {.async, gcsafe.} = let headerVarint = await conn.readMplexVarint() if headerVarint.isNone: return @@ -61,7 +61,7 @@ proc readMsg*(conn: Connection): Future[Option[Msg]] {.async, gcsafe.} = proc writeMsg*(conn: Connection, id: uint, - msgType: MessageType, + msgType: MessageType, data: seq[byte] = @[]) {.async, gcsafe.} = ## write lenght prefixed var buf = initVBuffer() @@ -75,6 +75,6 @@ proc writeMsg*(conn: Connection, proc writeMsg*(conn: Connection, id: uint, - msgType: MessageType, + msgType: MessageType, data: string) {.async, gcsafe.} = result = conn.writeMsg(id, msgType, cast[seq[byte]](data)) diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index ca012fee8..8ca054273 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -18,7 +18,7 @@ import types, logScope: topic = "MplexChannel" -const DefaultChannelSize* = DefaultBufferSize * 64 # 64kb +const DefaultChannelSize* = DefaultBufferSize * 64 # 64kb type LPChannel* = ref object of BufferStream @@ -39,7 +39,7 @@ proc newChannel*(id: uint, conn: Connection, initiator: bool, name: string = "", - size: int = DefaultChannelSize): LPChannel = + size: int = DefaultChannelSize): LPChannel = new result result.id = id result.name = name @@ -51,7 +51,7 @@ proc newChannel*(id: uint, result.asyncLock = newAsyncLock() let chan = result - proc writeHandler(data: seq[byte]): Future[void] {.async.} = + proc writeHandler(data: seq[byte]): Future[void] {.async.} = # writes should happen in sequence await chan.asyncLock.acquire() trace "sending data ", data = data.toHex(), @@ -66,11 +66,11 @@ proc newChannel*(id: uint, proc closeMessage(s: LPChannel) {.async.} = await s.conn.writeMsg(s.id, s.closeCode) # write header -proc closedByRemote*(s: LPChannel) {.async.} = +proc closedByRemote*(s: LPChannel) {.async.} = s.closedRemote = true proc cleanUp*(s: LPChannel): Future[void] = - 
# method which calls the underlying buffer's `close` + # method which calls the underlying buffer's `close` # method used instead of `close` since it's overloaded to # simulate half-closed streams result = procCall close(BufferStream(s)) @@ -97,16 +97,16 @@ method closed*(s: LPChannel): bool = proc pushTo*(s: LPChannel, data: seq[byte]): Future[void] = if s.closedRemote or s.isReset: - raise newLPStreamClosedError() - trace "pushing data to channel", data = data.toHex(), - id = s.id, + raise newLPStreamEOFError() + trace "pushing data to channel", data = data.toHex(), + id = s.id, initiator = s.initiator result = procCall pushTo(BufferStream(s), data) method read*(s: LPChannel, n = -1): Future[seq[byte]] = if s.closed or s.isReset: - raise newLPStreamClosedError() + raise newLPStreamEOFError() result = procCall read(BufferStream(s), n) @@ -115,7 +115,7 @@ method readExactly*(s: LPChannel, nbytes: int): Future[void] = if s.closed or s.isReset: - raise newLPStreamClosedError() + raise newLPStreamEOFError() result = procCall readExactly(BufferStream(s), pbytes, nbytes) method readLine*(s: LPChannel, @@ -123,38 +123,38 @@ method readLine*(s: LPChannel, sep = "\r\n"): Future[string] = if s.closed or s.isReset: - raise newLPStreamClosedError() + raise newLPStreamEOFError() result = procCall readLine(BufferStream(s), limit, sep) -method readOnce*(s: LPChannel, - pbytes: pointer, - nbytes: int): +method readOnce*(s: LPChannel, + pbytes: pointer, + nbytes: int): Future[int] = if s.closed or s.isReset: - raise newLPStreamClosedError() + raise newLPStreamEOFError() result = procCall readOnce(BufferStream(s), pbytes, nbytes) method readUntil*(s: LPChannel, pbytes: pointer, nbytes: int, - sep: seq[byte]): + sep: seq[byte]): Future[int] = if s.closed or s.isReset: - raise newLPStreamClosedError() + raise newLPStreamEOFError() result = procCall readOnce(BufferStream(s), pbytes, nbytes) method write*(s: LPChannel, - pbytes: pointer, + pbytes: pointer, nbytes: int): Future[void] = if s.closedLocal or s.isReset: - raise newLPStreamClosedError() + raise newLPStreamEOFError() result = procCall write(BufferStream(s), pbytes, nbytes) method write*(s: LPChannel, msg: string, msglen = -1) {.async.} = if s.closedLocal or s.isReset: - raise newLPStreamClosedError() + raise newLPStreamEOFError() result = procCall write(BufferStream(s), msg, msglen) method write*(s: LPChannel, msg: seq[byte], msglen = -1) {.async.} = if s.closedLocal or s.isReset: - raise newLPStreamClosedError() + raise newLPStreamEOFError() result = procCall write(BufferStream(s), msg, msglen) diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 73300455f..6277a9de0 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -16,8 +16,8 @@ import chronos, chronicles import ../muxer, ../../connection, ../../stream/lpstream, - coder, - types, + coder, + types, lpchannel logScope: @@ -42,7 +42,7 @@ proc newStreamInternal*(m: Mplex, initiator: bool = true, chanId: uint = 0, name: string = ""): - Future[LPChannel] {.async, gcsafe.} = + Future[LPChannel] {.async, gcsafe.} = ## create new channel/stream let id = if initiator: m.currentId.inc(); m.currentId else: chanId trace "creating new channel", channelId = id, initiator = initiator @@ -50,7 +50,7 @@ proc newStreamInternal*(m: Mplex, m.getChannelList(initiator)[id] = result proc cleanupChann(m: Mplex, chann: LPChannel, initiator: bool) {.async, inline.} = - ## call the channel's `close` to signal the + ## call the channel's `close` to signal the ## 
remote that the channel is closing if not isNil(chann) and not chann.closed: await chann.close() @@ -58,7 +58,7 @@ proc cleanupChann(m: Mplex, chann: LPChannel, initiator: bool) {.async, inline.} m.getChannelList(initiator).del(chann.id) trace "cleaned up channel", id = chann.id -method handle*(m: Mplex) {.async, gcsafe.} = +method handle*(m: Mplex) {.async, gcsafe.} = trace "starting mplex main loop" try: while not m.connection.closed: @@ -100,21 +100,21 @@ method handle*(m: Mplex) {.async, gcsafe.} = continue of MessageType.MsgIn, MessageType.MsgOut: - trace "pushing data to channel", id = id, - initiator = initiator, - msgType = msgType + trace "pushing data to channel", id = id, + initiator = initiator, + msgType = msgType - await channel.pushTo(data) + await channel.pushTo(data) of MessageType.CloseIn, MessageType.CloseOut: - trace "closing channel", id = id, - initiator = initiator, + trace "closing channel", id = id, + initiator = initiator, msgType = msgType await channel.closedByRemote() # m.getChannelList(initiator).del(id) of MessageType.ResetIn, MessageType.ResetOut: - trace "resetting channel", id = id, - initiator = initiator, + trace "resetting channel", id = id, + initiator = initiator, msgType = msgType await channel.resetByRemote() @@ -126,7 +126,7 @@ method handle*(m: Mplex) {.async, gcsafe.} = trace "stopping mplex main loop" await m.connection.close() -proc newMplex*(conn: Connection, +proc newMplex*(conn: Connection, maxChanns: uint = MaxChannels): Mplex = new result result.connection = conn @@ -135,11 +135,9 @@ proc newMplex*(conn: Connection, result.local = initTable[uint, LPChannel]() let m = result - conn.closeEvent.wait().addCallback( - proc(udata: pointer) = - trace "connection closed, cleaning up mplex" - asyncCheck m.close() - ) + conn.closeEvent.wait().addCallback do (udata: pointer): + trace "connection closed, cleaning up mplex" + asyncCheck m.close() method newStream*(m: Mplex, name: string = ""): Future[Connection] {.async, gcsafe.} = let channel = await m.newStreamInternal() @@ -148,7 +146,7 @@ method newStream*(m: Mplex, name: string = ""): Future[Connection] {.async, gcsa result = newConnection(channel) result.peerInfo = m.connection.peerInfo -method close*(m: Mplex) {.async, gcsafe.} = +method close*(m: Mplex) {.async, gcsafe.} = trace "closing mplex muxer" await allFutures(@[allFutures(toSeq(m.remote.values).mapIt(it.reset())), allFutures(toSeq(m.local.values).mapIt(it.reset()))]) diff --git a/libp2p/muxers/muxer.nim b/libp2p/muxers/muxer.nim index 9662ac80b..331f55930 100644 --- a/libp2p/muxers/muxer.nim +++ b/libp2p/muxers/muxer.nim @@ -36,7 +36,7 @@ method newStream*(m: Muxer, name: string = ""): Future[Connection] {.base, async method close*(m: Muxer) {.base, async, gcsafe.} = discard method handle*(m: Muxer): Future[void] {.base, async, gcsafe.} = discard -proc newMuxerProvider*(creator: MuxerConstructor, codec: string): MuxerProvider {.gcsafe.} = +proc newMuxerProvider*(creator: MuxerConstructor, codec: string): MuxerProvider {.gcsafe.} = new result result.newMuxer = creator result.codec = codec diff --git a/libp2p/peer.nim b/libp2p/peer.nim index 0b96bd817..0d70a1522 100644 --- a/libp2p/peer.nim +++ b/libp2p/peer.nim @@ -8,7 +8,7 @@ ## those terms. ## This module implementes API for libp2p peer. 
-import hashes, options +import hashes import nimcrypto/utils import crypto/crypto, multicodec, multihash, base58, vbuffer import protobuf/minprotobuf diff --git a/libp2p/peerinfo.nim b/libp2p/peerinfo.nim index 9b54213ea..c4b8fba31 100644 --- a/libp2p/peerinfo.nim +++ b/libp2p/peerinfo.nim @@ -23,8 +23,6 @@ type HasPrivate, HasPublic - InvalidPublicKeyException* = object of Exception - PeerInfo* = ref object of RootObj peerId*: PeerID addrs*: seq[MultiAddress] @@ -35,10 +33,6 @@ type of HasPublic: key: Option[PublicKey] -proc newInvalidPublicKeyException(): ref Exception = - newException(InvalidPublicKeyException, - "attempting to assign an invalid public key") - proc init*(p: typedesc[PeerInfo], key: PrivateKey, addrs: seq[MultiAddress] = @[], @@ -60,6 +54,16 @@ proc init*(p: typedesc[PeerInfo], addrs: addrs, protocols: protocols) +proc init*(p: typedesc[PeerInfo], + peerId: string, + addrs: seq[MultiAddress] = @[], + protocols: seq[string] = @[]): PeerInfo {.inline.} = + + PeerInfo(keyType: HasPublic, + peerId: PeerID.init(peerId), + addrs: addrs, + protocols: protocols) + proc init*(p: typedesc[PeerInfo], key: PublicKey, addrs: seq[MultiAddress] = @[], @@ -82,12 +86,6 @@ proc publicKey*(p: PeerInfo): Option[PublicKey] {.inline.} = else: result = some(p.privateKey.getKey()) -proc `publicKey=`*(p: PeerInfo, key: PublicKey) = - if not (PeerID.init(key) == p.peerId): - raise newInvalidPublicKeyException() - - p.key = some(key) - proc id*(p: PeerInfo): string {.inline.} = p.peerId.pretty diff --git a/libp2p/protocols/identify.nim b/libp2p/protocols/identify.nim index e5ee570fc..43e10833d 100644 --- a/libp2p/protocols/identify.nim +++ b/libp2p/protocols/identify.nim @@ -9,11 +9,11 @@ import options import chronos, chronicles -import ../protobuf/minprotobuf, +import ../protobuf/minprotobuf, ../peerinfo, ../connection, - ../peer, - ../crypto/crypto, + ../peer, + ../crypto/crypto, ../multiaddress, ../protocols/protocol @@ -43,7 +43,7 @@ type Identify* = ref object of LPProtocol peerInfo*: PeerInfo -proc encodeMsg*(peerInfo: PeerInfo, observedAddrs: Multiaddress): ProtoBuffer = +proc encodeMsg*(peerInfo: PeerInfo, observedAddrs: Multiaddress): ProtoBuffer = result = initProtoBuffer() result.write(initProtoField(1, peerInfo.publicKey.get().getBytes())) @@ -63,7 +63,7 @@ proc encodeMsg*(peerInfo: PeerInfo, observedAddrs: Multiaddress): ProtoBuffer = result.write(initProtoField(6, agentVersion)) result.finish() -proc decodeMsg*(buf: seq[byte]): IdentifyInfo = +proc decodeMsg*(buf: seq[byte]): IdentifyInfo = var pb = initProtoBuffer(buf) result.pubKey = none(PublicKey) @@ -87,7 +87,7 @@ proc decodeMsg*(buf: seq[byte]): IdentifyInfo = trace "read proto from message", proto = proto result.protos.add(proto) proto = "" - + var observableAddr = newSeq[byte]() if pb.getBytes(4, observableAddr) > 0: # attempt to read the observed addr var ma = MultiAddress.init(observableAddr) @@ -109,7 +109,7 @@ proc newIdentify*(peerInfo: PeerInfo): Identify = result.peerInfo = peerInfo result.init() -method init*(p: Identify) = +method init*(p: Identify) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = trace "handling identify request" var pb = encodeMsg(p.peerInfo, await conn.getObservedAddrs()) @@ -119,9 +119,9 @@ method init*(p: Identify) = p.handler = handle p.codec = IdentifyCodec -proc identify*(p: Identify, - conn: Connection, - remotePeerInfo: Option[PeerInfo]): Future[IdentifyInfo] {.async, gcsafe.} = +proc identify*(p: Identify, + conn: Connection, + remotePeerInfo: PeerInfo): 
Future[IdentifyInfo] {.async, gcsafe.} = var message = await conn.readLp() if len(message) == 0: trace "identify: Invalid or empty message received!" @@ -130,13 +130,13 @@ proc identify*(p: Identify, result = decodeMsg(message) - if remotePeerInfo.isSome and result.pubKey.isSome: + if not isNil(remotePeerInfo) and result.pubKey.isSome: let peer = PeerID.init(result.pubKey.get()) # do a string comaprison of the ids, - # because that is the only thing we + # because that is the only thing we # have in most cases - if peer != remotePeerInfo.get().peerId: + if peer != remotePeerInfo.peerId: trace "Peer ids don't match", remote = peer.pretty(), local = remotePeerInfo.get().id diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 4f4b813b9..3abb8d65d 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -26,27 +26,27 @@ const FloodSubCodec* = "/floodsub/1.0.0" type FloodSub* = ref object of PubSub floodsub*: Table[string, HashSet[string]] # topic to remote peer map - seen*: TimedCache[string] # list of messages forwarded to peers + seen*: TimedCache[string] # list of messages forwarded to peers method subscribeTopic*(f: FloodSub, topic: string, subscribe: bool, peerId: string) {.gcsafe.} = - procCall PubSub(f).subscribeTopic(topic, subscribe, peerId) + procCall PubSub(f).subscribeTopic(topic, subscribe, peerId) - if topic notin f.floodsub: - f.floodsub[topic] = initHashSet[string]() + if topic notin f.floodsub: + f.floodsub[topic] = initHashSet[string]() - if subscribe: - trace "adding subscription for topic", peer = peerId, name = topic - # subscribe the peer to the topic - f.floodsub[topic].incl(peerId) - else: - trace "removing subscription for topic", peer = peerId, name = topic - # unsubscribe the peer from the topic - f.floodsub[topic].excl(peerId) + if subscribe: + trace "adding subscription for topic", peer = peerId, name = topic + # subscribe the peer to the topic + f.floodsub[topic].incl(peerId) + else: + trace "removing subscription for topic", peer = peerId, name = topic + # unsubscribe the peer from the topic + f.floodsub[topic].excl(peerId) -method handleDisconnect*(f: FloodSub, peer: PubSubPeer) {.async, gcsafe.} = +method handleDisconnect*(f: FloodSub, peer: PubSubPeer) {.async, gcsafe.} = ## handle peer disconnects for t in f.floodsub.keys: f.floodsub[t].excl(peer.id) @@ -78,7 +78,7 @@ method rpcHandler*(f: FloodSub, if p in f.peers and f.peers[p].id != peer.id: await f.peers[p].send(@[RPCMsg(messages: m.messages)]) -method init(f: FloodSub) = +method init(f: FloodSub) = proc handler(conn: Connection, proto: string) {.async, gcsafe.} = ## main protocol handler that gets triggered on every ## connection for a protocol string @@ -109,8 +109,8 @@ method publish*(f: FloodSub, trace "publishing message", name = topic, peer = p, data = data await f.peers[p].send(@[RPCMsg(messages: @[msg])]) -method unsubscribe*(f: FloodSub, - topics: seq[TopicPair]) {.async, gcsafe.} = +method unsubscribe*(f: FloodSub, + topics: seq[TopicPair]) {.async, gcsafe.} = await procCall PubSub(f).unsubscribe(topics) for p in f.peers.values: diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 04acb49ef..532a3914f 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -27,7 +27,7 @@ logScope: const GossipSubCodec* = "/meshsub/1.0.0" # overlay parameters -const GossipSubD* = 6 +const GossipSubD* = 6 const GossipSubDlo* = 4 const GossipSubDhi* = 
12 @@ -37,25 +37,26 @@ const GossipSubHistoryGossip* = 3 # heartbeat interval const GossipSubHeartbeatInitialDelay* = 100.millis -const GossipSubHeartbeatInterval* = 1.seconds +const GossipSubHeartbeatInterval* = 1.seconds # fanout ttl const GossipSubFanoutTTL* = 60.seconds type GossipSub* = ref object of FloodSub - mesh*: Table[string, HashSet[string]] # meshes - topic to peer - fanout*: Table[string, HashSet[string]] # fanout - topic to peer + mesh*: Table[string, HashSet[string]] # meshes - topic to peer + fanout*: Table[string, HashSet[string]] # fanout - topic to peer gossipsub*: Table[string, HashSet[string]] # topic to peer map of all gossipsub peers - lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics + lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics gossip*: Table[string, seq[ControlIHave]] # pending gossip - control*: Table[string, ControlMessage] # pending control messages - mcache*: MCache # messages cache - heartbeatCancel*: Future[void] # cancelation future for heartbeat interval + control*: Table[string, ControlMessage] # pending control messages + mcache*: MCache # messages cache + heartbeatCancel*: Future[void] # cancelation future for heartbeat interval heartbeatLock: AsyncLock # TODO: This belong in chronos, temporary left here until chronos is updated -proc addInterval(every: Duration, cb: CallbackFunc, udata: pointer = nil): Future[void] = +proc addInterval(every: Duration, cb: CallbackFunc, + udata: pointer = nil): Future[void] = ## Arrange the callback ``cb`` to be called on every ``Duration`` window var retFuture = newFuture[void]("chronos.addInterval(Duration)") @@ -71,7 +72,7 @@ proc addInterval(every: Duration, cb: CallbackFunc, udata: pointer = nil): Futur scheduleNext() return retFuture -method init(g: GossipSub) = +method init(g: GossipSub) = proc handler(conn: Connection, proto: string) {.async, gcsafe.} = ## main protocol handler that gets triggered on every ## connection for a protocol string @@ -83,7 +84,7 @@ method init(g: GossipSub) = g.handler = handler g.codec = GossipSubCodec -method handleDisconnect(g: GossipSub, peer: PubSubPeer) {.async, gcsafe.} = +method handleDisconnect(g: GossipSub, peer: PubSubPeer) {.async, gcsafe.} = ## handle peer disconnects await procCall FloodSub(g).handleDisconnect(peer) for t in g.gossipsub.keys: @@ -99,26 +100,26 @@ method subscribeTopic*(g: GossipSub, topic: string, subscribe: bool, peerId: string) {.gcsafe.} = - procCall PubSub(g).subscribeTopic(topic, subscribe, peerId) + procCall PubSub(g).subscribeTopic(topic, subscribe, peerId) - if topic notin g.gossipsub: - g.gossipsub[topic] = initHashSet[string]() + if topic notin g.gossipsub: + g.gossipsub[topic] = initHashSet[string]() - if subscribe: - trace "adding subscription for topic", peer = peerId, name = topic - # subscribe the peer to the topic - g.gossipsub[topic].incl(peerId) - else: - trace "removing subscription for topic", peer = peerId, name = topic - # unsubscribe the peer from the topic - g.gossipsub[topic].excl(peerId) + if subscribe: + trace "adding subscription for topic", peer = peerId, name = topic + # subscribe the peer to the topic + g.gossipsub[topic].incl(peerId) + else: + trace "removing subscription for topic", peer = peerId, name = topic + # unsubscribe the peer from the topic + g.gossipsub[topic].excl(peerId) -proc handleGraft(g: GossipSub, +proc handleGraft(g: GossipSub, peer: PubSubPeer, grafts: seq[ControlGraft], respControl: var ControlMessage) = for graft in grafts: - trace 
"processing graft message", peer = peer.id, + trace "processing graft message", peer = peer.id, topicID = graft.topicID if graft.topicID in g.topics: @@ -131,13 +132,14 @@ proc handleGraft(g: GossipSub, proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = for prune in prunes: - trace "processing prune message", peer = peer.id, + trace "processing prune message", peer = peer.id, topicID = prune.topicID if prune.topicID in g.mesh: g.mesh[prune.topicID].excl(peer.id) -proc handleIHave(g: GossipSub, peer: PubSubPeer, ihaves: seq[ControlIHave]): ControlIWant = +proc handleIHave(g: GossipSub, peer: PubSubPeer, ihaves: seq[ + ControlIHave]): ControlIWant = for ihave in ihaves: trace "processing ihave message", peer = peer.id, topicID = ihave.topicID @@ -147,7 +149,8 @@ proc handleIHave(g: GossipSub, peer: PubSubPeer, ihaves: seq[ControlIHave]): Con if m notin g.seen: result.messageIDs.add(m) -proc handleIWant(g: GossipSub, peer: PubSubPeer, iwants: seq[ControlIWant]): seq[Message] = +proc handleIWant(g: GossipSub, peer: PubSubPeer, iwants: seq[ + ControlIWant]): seq[Message] = for iwant in iwants: for mid in iwant.messageIDs: trace "processing iwant message", peer = peer.id, @@ -158,7 +161,7 @@ proc handleIWant(g: GossipSub, peer: PubSubPeer, iwants: seq[ControlIWant]): seq method rpcHandler(g: GossipSub, peer: PubSubPeer, - rpcMsgs: seq[RPCMsg]) {.async, gcsafe.} = + rpcMsgs: seq[RPCMsg]) {.async, gcsafe.} = await procCall PubSub(g).rpcHandler(peer, rpcMsgs) trace "processing RPC message", peer = peer.id, msg = rpcMsgs @@ -203,13 +206,13 @@ method rpcHandler(g: GossipSub, for p in toSendPeers: if p in g.peers and g.peers[p].peerInfo.peerId != peer.peerInfo.peerId: - let id = g.peers[p].peerInfo.peerId - let msgs = m.messages.filterIt( - # don't forward to message originator - id != it.fromPeerId() - ) - if msgs.len > 0: - await g.peers[p].send(@[RPCMsg(messages: msgs)]) + let id = g.peers[p].peerInfo.peerId + let msgs = m.messages.filterIt( + # don't forward to message originator + id != it.fromPeerId() + ) + if msgs.len > 0: + await g.peers[p].send(@[RPCMsg(messages: msgs)]) var respControl: ControlMessage if m.control.isSome: @@ -224,9 +227,10 @@ method rpcHandler(g: GossipSub, if respControl.graft.len > 0 or respControl.prune.len > 0 or respControl.ihave.len > 0 or respControl.iwant.len > 0: - await peer.send(@[RPCMsg(control: some(respControl), messages: messages)]) + await peer.send(@[RPCMsg(control: some(respControl), + messages: messages)]) -proc replenishFanout(g: GossipSub, topic: string) {.async, gcsafe.} = +proc replenishFanout(g: GossipSub, topic: string) {.async, gcsafe.} = ## get fanout peers for a topic trace "about to replenish fanout" if topic notin g.fanout: @@ -242,7 +246,7 @@ proc replenishFanout(g: GossipSub, topic: string) {.async, gcsafe.} = trace "fanout replenished with peers", peers = g.fanout[topic].len -proc rebalanceMesh(g: GossipSub, topic: string) {.async, gcsafe.} = +proc rebalanceMesh(g: GossipSub, topic: string) {.async, gcsafe.} = trace "about to rebalance mesh" # create a mesh topic that we're subscribing to if topic notin g.mesh: @@ -284,8 +288,8 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async, gcsafe.} = trace "mesh balanced, got peers", peers = g.mesh[topic].len -proc dropFanoutPeers(g: GossipSub) {.async, gcsafe.} = - # drop peers that we haven't published to in +proc dropFanoutPeers(g: GossipSub) {.async, gcsafe.} = + # drop peers that we haven't published to in # GossipSubFanoutTTL seconds for topic in 
g.lastFanoutPubSub.keys: if Moment.now > g.lastFanoutPubSub[topic]: @@ -330,7 +334,7 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} = result[id] = ControlMessage() result[id].ihave.add(ihave) -proc heartbeat(g: GossipSub) {.async, gcsafe.} = +proc heartbeat(g: GossipSub) {.async, gcsafe.} = trace "running heartbeat" await g.heartbeatLock.acquire() @@ -354,7 +358,7 @@ method subscribe*(g: GossipSub, asyncCheck g.rebalanceMesh(topic) method unsubscribe*(g: GossipSub, - topics: seq[TopicPair]) {.async, gcsafe.} = + topics: seq[TopicPair]) {.async, gcsafe.} = await procCall PubSub(g).unsubscribe(topics) for pair in topics: @@ -400,7 +404,7 @@ method start*(g: GossipSub) {.async.} = # setup the heartbeat interval g.heartbeatCancel = addInterval(GossipSubHeartbeatInterval, proc (arg: pointer = nil) {.gcsafe, locks: 0.} = - asyncCheck g.heartbeat) + asyncCheck g.heartbeat) method stop*(g: GossipSub) {.async.} = ## stopt pubsub @@ -428,10 +432,10 @@ method initPubSub(g: GossipSub) = ## Unit tests when isMainModule and not defined(release): - ## Test internal (private) methods for gossip, - ## mesh and fanout maintenance. - ## Usually I wouldn't test private behaviour, - ## but the maintenance methods are quite involved, + ## Test internal (private) methods for gossip, + ## mesh and fanout maintenance. + ## Usually I wouldn't test private behaviour, + ## but the maintenance methods are quite involved, ## hence these tests are here. ## @@ -444,18 +448,18 @@ when isMainModule and not defined(release): suite "GossipSub": test "`rebalanceMesh` Degree Lo": proc testRun(): Future[bool] {.async.} = - let gossipSub = newPubSub(TestGossipSub, + let gossipSub = newPubSub(TestGossipSub, PeerInfo.init(PrivateKey.random(RSA))) let topic = "foobar" gossipSub.mesh[topic] = initHashSet[string]() - proc writeHandler(data: seq[byte]) {.async, gcsafe.} = + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard for i in 0..<15: let conn = newConnection(newBufferStream(writeHandler)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) - conn.peerInfo = some(peerInfo) + conn.peerInfo = peerInfo gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id].conn = conn gossipSub.mesh[topic].incl(peerInfo.id) @@ -471,18 +475,18 @@ when isMainModule and not defined(release): test "`rebalanceMesh` Degree Hi": proc testRun(): Future[bool] {.async.} = - let gossipSub = newPubSub(TestGossipSub, + let gossipSub = newPubSub(TestGossipSub, PeerInfo.init(PrivateKey.random(RSA))) let topic = "foobar" gossipSub.gossipsub[topic] = initHashSet[string]() - proc writeHandler(data: seq[byte]) {.async, gcsafe.} = + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard for i in 0..<15: let conn = newConnection(newBufferStream(writeHandler)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) - conn.peerInfo = some(peerInfo) + conn.peerInfo = peerInfo gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id].conn = conn gossipSub.gossipsub[topic].incl(peerInfo.id) @@ -498,21 +502,21 @@ when isMainModule and not defined(release): test "`replenishFanout` Degree Lo": proc testRun(): Future[bool] {.async.} = - let gossipSub = newPubSub(TestGossipSub, + let gossipSub = newPubSub(TestGossipSub, PeerInfo.init(PrivateKey.random(RSA))) - proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = + proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = discard let topic = "foobar" 
gossipSub.gossipsub[topic] = initHashSet[string]() - proc writeHandler(data: seq[byte]) {.async, gcsafe.} = + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard for i in 0..<15: let conn = newConnection(newBufferStream(writeHandler)) var peerInfo = PeerInfo.init(PrivateKey.random(RSA)) - conn.peerInfo = some(peerInfo) + conn.peerInfo = peerInfo gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id].handler = handler gossipSub.gossipsub[topic].incl(peerInfo.id) @@ -528,22 +532,22 @@ when isMainModule and not defined(release): test "`dropFanoutPeers` drop expired fanout topics": proc testRun(): Future[bool] {.async.} = - let gossipSub = newPubSub(TestGossipSub, + let gossipSub = newPubSub(TestGossipSub, PeerInfo.init(PrivateKey.random(RSA))) - proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = + proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = discard let topic = "foobar" gossipSub.fanout[topic] = initHashSet[string]() gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(100.millis) - proc writeHandler(data: seq[byte]) {.async, gcsafe.} = + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard for i in 0..<6: let conn = newConnection(newBufferStream(writeHandler)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) - conn.peerInfo = some(peerInfo) + conn.peerInfo = peerInfo gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id].handler = handler gossipSub.fanout[topic].incl(peerInfo.id) @@ -561,10 +565,10 @@ when isMainModule and not defined(release): test "`dropFanoutPeers` leave unexpired fanout topics": proc testRun(): Future[bool] {.async.} = - let gossipSub = newPubSub(TestGossipSub, + let gossipSub = newPubSub(TestGossipSub, PeerInfo.init(PrivateKey.random(RSA))) - proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = + proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = discard let topic1 = "foobar1" @@ -574,13 +578,13 @@ when isMainModule and not defined(release): gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(100.millis) gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(500.millis) - proc writeHandler(data: seq[byte]) {.async, gcsafe.} = + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard for i in 0..<6: let conn = newConnection(newBufferStream(writeHandler)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) - conn.peerInfo = some(peerInfo) + conn.peerInfo = peerInfo gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id].handler = handler gossipSub.fanout[topic1].incl(peerInfo.id) @@ -601,13 +605,13 @@ when isMainModule and not defined(release): test "`getGossipPeers` - should gather up to degree D non intersecting peers": proc testRun(): Future[bool] {.async.} = - let gossipSub = newPubSub(TestGossipSub, + let gossipSub = newPubSub(TestGossipSub, PeerInfo.init(PrivateKey.random(RSA))) - proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = + proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = discard - proc writeHandler(data: seq[byte]) {.async, gcsafe.} = + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let topic = "foobar" @@ -617,7 +621,7 @@ when isMainModule and not defined(release): for i in 0..<30: let conn = newConnection(newBufferStream(writeHandler)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) - conn.peerInfo = some(peerInfo) + conn.peerInfo = peerInfo 
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id].handler = handler if i mod 2 == 0: @@ -628,7 +632,7 @@ when isMainModule and not defined(release): for i in 0..<15: let conn = newConnection(newBufferStream(writeHandler)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) - conn.peerInfo = some(peerInfo) + conn.peerInfo = peerInfo gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id].handler = handler gossipSub.gossipsub[topic].incl(peerInfo.id) @@ -650,10 +654,10 @@ when isMainModule and not defined(release): test "`getGossipPeers` - should not crash on missing topics in mesh": proc testRun(): Future[bool] {.async.} = - let gossipSub = newPubSub(TestGossipSub, + let gossipSub = newPubSub(TestGossipSub, PeerInfo.init(PrivateKey.random(RSA))) - - proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = + + proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = discard proc writeHandler(data: seq[byte]) {.async, gcsafe.} = @@ -665,7 +669,7 @@ when isMainModule and not defined(release): for i in 0..<30: let conn = newConnection(newBufferStream(writeHandler)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) - conn.peerInfo = some(peerInfo) + conn.peerInfo = peerInfo gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id].handler = handler if i mod 2 == 0: @@ -682,10 +686,10 @@ when isMainModule and not defined(release): test "`getGossipPeers` - should not crash on missing topics in gossip": proc testRun(): Future[bool] {.async.} = - let gossipSub = newPubSub(TestGossipSub, + let gossipSub = newPubSub(TestGossipSub, PeerInfo.init(PrivateKey.random(RSA))) - proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = + proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = discard proc writeHandler(data: seq[byte]) {.async, gcsafe.} = @@ -697,7 +701,7 @@ when isMainModule and not defined(release): for i in 0..<30: let conn = newConnection(newBufferStream(writeHandler)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) - conn.peerInfo = some(peerInfo) + conn.peerInfo = peerInfo gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id].handler = handler if i mod 2 == 0: @@ -714,10 +718,10 @@ when isMainModule and not defined(release): test "`getGossipPeers` - should not crash on missing topics in gossip": proc testRun(): Future[bool] {.async.} = - let gossipSub = newPubSub(TestGossipSub, + let gossipSub = newPubSub(TestGossipSub, PeerInfo.init(PrivateKey.random(RSA))) - proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = + proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async, gcsafe.} = discard proc writeHandler(data: seq[byte]) {.async, gcsafe.} = @@ -729,7 +733,7 @@ when isMainModule and not defined(release): for i in 0..<30: let conn = newConnection(newBufferStream(writeHandler)) let peerInfo = PeerInfo.init(PrivateKey.random(RSA)) - conn.peerInfo = some(peerInfo) + conn.peerInfo = peerInfo gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id].handler = handler if i mod 2 == 0: diff --git a/libp2p/protocols/pubsub/mcache.nim b/libp2p/protocols/pubsub/mcache.nim index a7e92176d..31b18d1cf 100644 --- a/libp2p/protocols/pubsub/mcache.nim +++ b/libp2p/protocols/pubsub/mcache.nim @@ -22,7 +22,7 @@ type historySize*: Natural windowSize*: Natural -proc put*(c: MCache, msg: Message) = +proc put*(c: 
MCache, msg: Message) = proc handler(key: string, val: Message) {.gcsafe.} = ## make sure we remove the message from history ## to keep things consisten @@ -38,13 +38,13 @@ proc get*(c: MCache, mid: string): Option[Message] = if mid in c.msgs: result = some(c.msgs[mid]) -proc window*(c: MCache, topic: string): HashSet[string] = +proc window*(c: MCache, topic: string): HashSet[string] = result = initHashSet[string]() - let len = - if c.windowSize > c.history.len: - c.history.len - else: + let len = + if c.windowSize > c.history.len: + c.history.len + else: c.windowSize if c.history.len > 0: diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 26c1db312..c591dff79 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -32,10 +32,10 @@ type handler*: seq[TopicHandler] PubSub* = ref object of LPProtocol - peerInfo*: PeerInfo # this peer's info - topics*: Table[string, Topic] # local topics + peerInfo*: PeerInfo # this peer's info + topics*: Table[string, Topic] # local topics peers*: Table[string, PubSubPeer] # peerid to peer map - triggerSelf*: bool # trigger own local handler on publish + triggerSelf*: bool # trigger own local handler on publish cleanupLock: AsyncLock proc sendSubs*(p: PubSub, @@ -49,8 +49,8 @@ proc sendSubs*(p: PubSub, var msg: RPCMsg for t in topics: - trace "sending topic", peer = peer.id, - subscribe = subscribe, + trace "sending topic", peer = peer.id, + subscribe = subscribe, topicName = t msg.subscriptions.add(SubOpts(topic: t, subscribe: subscribe)) @@ -62,7 +62,7 @@ method rpcHandler*(p: PubSub, ## handle rpc messages discard -method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.async, base, gcsafe.} = +method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.async, base, gcsafe.} = ## handle peer disconnects if peer.id in p.peers: p.peers.del(peer.id) @@ -71,7 +71,7 @@ proc cleanUpHelper(p: PubSub, peer: PubSubPeer) {.async.} = await p.cleanupLock.acquire() if peer.refs == 0: await p.handleDisconnect(peer) - + peer.refs.dec() # decrement refcount p.cleanupLock.release() @@ -102,20 +102,20 @@ method handleConn*(p: PubSub, ## that we're interested in ## - if conn.peerInfo.isNone: + if isNil(conn.peerInfo): trace "no valid PeerId for peer" await conn.close() return proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async, gcsafe.} = - # call floodsub rpc handler - await p.rpcHandler(peer, msgs) + # call floodsub rpc handler + await p.rpcHandler(peer, msgs) - let peer = p.getPeer(conn.peerInfo.get(), proto) + let peer = p.getPeer(conn.peerInfo, proto) let topics = toSeq(p.topics.keys) if topics.len > 0: await p.sendSubs(peer, topics, true) - + peer.handler = handler await peer.handle(conn) # spawn peer read loop trace "pubsub peer handler ended, cleaning up" @@ -123,23 +123,21 @@ method handleConn*(p: PubSub, method subscribeToPeer*(p: PubSub, conn: Connection) {.base, async, gcsafe.} = - var peer = p.getPeer(conn.peerInfo.get(), p.codec) - trace "setting connection for peer", peerId = conn.peerInfo.get().id + var peer = p.getPeer(conn.peerInfo, p.codec) + trace "setting connection for peer", peerId = conn.peerInfo.id if not peer.isConnected: peer.conn = conn # handle connection close conn.closeEvent.wait() - .addCallback( - proc(udata: pointer = nil) {.gcsafe.} = - trace "connection closed, cleaning up peer", - peer = conn.peerInfo.get().id + .addCallback do (udata: pointer = nil): + trace "connection closed, cleaning up peer", + peer = conn.peerInfo.id - asyncCheck p.cleanUpHelper(peer) - ) 
+ asyncCheck p.cleanUpHelper(peer) method unsubscribe*(p: PubSub, - topics: seq[TopicPair]) {.base, async, gcsafe.} = + topics: seq[TopicPair]) {.base, async, gcsafe.} = ## unsubscribe from a list of ``topic`` strings for t in topics: for i, h in p.topics[t.topic].handler: @@ -155,18 +153,18 @@ method unsubscribe*(p: PubSub, method subscribeTopic*(p: PubSub, topic: string, subscribe: bool, - peerId: string) {.base, gcsafe.} = + peerId: string) {.base, gcsafe.} = discard method subscribe*(p: PubSub, topic: string, - handler: TopicHandler) {.base, async, gcsafe.} = + handler: TopicHandler) {.base, async, gcsafe.} = ## subscribe to a topic ## ## ``topic`` - a string topic to subscribe to ## - ## ``handler`` - is a user provided proc - ## that will be triggered + ## ``handler`` - is a user provided proc + ## that will be triggered ## on every received message ## if topic notin p.topics: @@ -180,14 +178,14 @@ method subscribe*(p: PubSub, method publish*(p: PubSub, topic: string, - data: seq[byte]) {.base, async, gcsafe.} = + data: seq[byte]) {.base, async, gcsafe.} = ## publish to a ``topic`` if p.triggerSelf and topic in p.topics: for h in p.topics[topic].handler: await h(topic, data) method initPubSub*(p: PubSub) {.base.} = - ## perform pubsub initializaion + ## perform pubsub initializaion discard method start*(p: PubSub) {.async, base.} = diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index ef4dfe797..1a5c440a6 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -9,7 +9,7 @@ import options, hashes, strutils, tables, hashes import chronos, chronicles -import rpc/[messages, message, protobuf], +import rpc/[messages, message, protobuf], timedcache, ../../peer, ../../peerinfo, @@ -79,23 +79,21 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async, gcsafe.} = if $encodedHex.hash in p.sentRpcCache: trace "message already sent to peer, skipping", peer = p.id continue - + proc sendToRemote() {.async.} = trace "sending encoded msgs to peer", peer = p.id, encoded = encodedHex await p.sendConn.writeLp(encoded.buffer) p.sentRpcCache.put($encodedHex.hash) - # if no connection has been set, - # queue messages untill a connection + # if no connection has been set, + # queue messages untill a connection # becomes available if p.isConnected: await sendToRemote() return - p.onConnect.wait().addCallback( - proc(udata: pointer) = + p.onConnect.wait().addCallback do (udata: pointer): asyncCheck sendToRemote() - ) trace "enqueued message to send at a later time" except CatchableError as exc: @@ -112,7 +110,7 @@ proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async, gcsafe.} = trace "sending graft msg to peer", peer = p.id, topicID = topic await p.send(@[RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))]) -proc sendPrune*(p: PubSubPeer, topics: seq[string]) {.async, gcsafe.} = +proc sendPrune*(p: PubSubPeer, topics: seq[string]) {.async, gcsafe.} = for topic in topics: trace "sending prune msg to peer", peer = p.id, topicID = topic await p.send(@[RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))]) diff --git a/libp2p/protocols/pubsub/rpc/protobuf.nim b/libp2p/protocols/pubsub/rpc/protobuf.nim index 37710be06..ed414f64d 100644 --- a/libp2p/protocols/pubsub/rpc/protobuf.nim +++ b/libp2p/protocols/pubsub/rpc/protobuf.nim @@ -14,10 +14,10 @@ import messages, ../../../crypto/crypto, ../../../peer -proc encodeGraft*(graft: ControlGraft, pb: var ProtoBuffer) 
{.gcsafe.} = +proc encodeGraft*(graft: ControlGraft, pb: var ProtoBuffer) {.gcsafe.} = pb.write(initProtoField(1, graft.topicID)) -proc decodeGraft*(pb: var ProtoBuffer): seq[ControlGraft] {.gcsafe.} = +proc decodeGraft*(pb: var ProtoBuffer): seq[ControlGraft] {.gcsafe.} = trace "decoding graft msg", buffer = pb.buffer.toHex() while true: var topic: string @@ -28,10 +28,10 @@ proc decodeGraft*(pb: var ProtoBuffer): seq[ControlGraft] {.gcsafe.} = trace "read topic field from graft msg", topicID = topic result.add(ControlGraft(topicID: topic)) -proc encodePrune*(prune: ControlPrune, pb: var ProtoBuffer) {.gcsafe.} = +proc encodePrune*(prune: ControlPrune, pb: var ProtoBuffer) {.gcsafe.} = pb.write(initProtoField(1, prune.topicID)) -proc decodePrune*(pb: var ProtoBuffer): seq[ControlPrune] {.gcsafe.} = +proc decodePrune*(pb: var ProtoBuffer): seq[ControlPrune] {.gcsafe.} = trace "decoding prune msg" while true: var topic: string @@ -41,12 +41,12 @@ proc decodePrune*(pb: var ProtoBuffer): seq[ControlPrune] {.gcsafe.} = result.add(ControlPrune(topicID: topic)) -proc encodeIHave*(ihave: ControlIHave, pb: var ProtoBuffer) {.gcsafe.} = +proc encodeIHave*(ihave: ControlIHave, pb: var ProtoBuffer) {.gcsafe.} = pb.write(initProtoField(1, ihave.topicID)) for mid in ihave.messageIDs: pb.write(initProtoField(2, mid)) -proc decodeIHave*(pb: var ProtoBuffer): seq[ControlIHave] {.gcsafe.} = +proc decodeIHave*(pb: var ProtoBuffer): seq[ControlIHave] {.gcsafe.} = trace "decoding ihave msg" while true: @@ -67,11 +67,11 @@ proc decodeIHave*(pb: var ProtoBuffer): seq[ControlIHave] {.gcsafe.} = result.add(control) -proc encodeIWant*(iwant: ControlIWant, pb: var ProtoBuffer) {.gcsafe.} = +proc encodeIWant*(iwant: ControlIWant, pb: var ProtoBuffer) {.gcsafe.} = for mid in iwant.messageIDs: pb.write(initProtoField(1, mid)) -proc decodeIWant*(pb: var ProtoBuffer): seq[ControlIWant] {.gcsafe.} = +proc decodeIWant*(pb: var ProtoBuffer): seq[ControlIWant] {.gcsafe.} = trace "decoding ihave msg" while pb.enterSubMessage() > 0: @@ -82,7 +82,7 @@ proc decodeIWant*(pb: var ProtoBuffer): seq[ControlIWant] {.gcsafe.} = iWant.messageIDs.add(mid) result.add(iWant) -proc encodeControl*(control: ControlMessage, pb: var ProtoBuffer) {.gcsafe.} = +proc encodeControl*(control: ControlMessage, pb: var ProtoBuffer) {.gcsafe.} = if control.ihave.len > 0: var ihave = initProtoBuffer() for h in control.ihave: @@ -96,7 +96,7 @@ proc encodeControl*(control: ControlMessage, pb: var ProtoBuffer) {.gcsafe.} = var iwant = initProtoBuffer() for w in control.iwant: w.encodeIWant(iwant) - + # write messages to protobuf iwant.finish() pb.write(initProtoField(2, iwant)) @@ -105,7 +105,7 @@ proc encodeControl*(control: ControlMessage, pb: var ProtoBuffer) {.gcsafe.} = var graft = initProtoBuffer() for g in control.graft: g.encodeGraft(graft) - + # write messages to protobuf graft.finish() pb.write(initProtoField(3, graft)) @@ -114,12 +114,12 @@ proc encodeControl*(control: ControlMessage, pb: var ProtoBuffer) {.gcsafe.} = var prune = initProtoBuffer() for p in control.prune: p.encodePrune(prune) - + # write messages to protobuf prune.finish() pb.write(initProtoField(4, prune)) -proc decodeControl*(pb: var ProtoBuffer): Option[ControlMessage] {.gcsafe.} = +proc decodeControl*(pb: var ProtoBuffer): Option[ControlMessage] {.gcsafe.} = trace "decoding control submessage" var control: ControlMessage while true: @@ -137,17 +137,17 @@ proc decodeControl*(pb: var ProtoBuffer): Option[ControlMessage] {.gcsafe.} = control.graft = pb.decodeGraft() of 4: 
control.prune = pb.decodePrune() - else: + else: raise newException(CatchableError, "message type not recognized") - + if result.isNone: result = some(control) -proc encodeSubs*(subs: SubOpts, pb: var ProtoBuffer) {.gcsafe.} = +proc encodeSubs*(subs: SubOpts, pb: var ProtoBuffer) {.gcsafe.} = pb.write(initProtoField(1, subs.subscribe)) pb.write(initProtoField(2, subs.topic)) -proc decodeSubs*(pb: var ProtoBuffer): seq[SubOpts] {.gcsafe.} = +proc decodeSubs*(pb: var ProtoBuffer): seq[SubOpts] {.gcsafe.} = while true: var subOpt: SubOpts var subscr: int @@ -163,7 +163,7 @@ proc decodeSubs*(pb: var ProtoBuffer): seq[SubOpts] {.gcsafe.} = trace "got subscriptions", subscriptions = result -proc encodeMessage*(msg: Message, pb: var ProtoBuffer) {.gcsafe.} = +proc encodeMessage*(msg: Message, pb: var ProtoBuffer) {.gcsafe.} = pb.write(initProtoField(1, msg.fromPeer)) pb.write(initProtoField(2, msg.data)) pb.write(initProtoField(3, msg.seqno)) @@ -173,13 +173,13 @@ proc encodeMessage*(msg: Message, pb: var ProtoBuffer) {.gcsafe.} = if msg.signature.len > 0: pb.write(initProtoField(5, msg.signature)) - + if msg.key.len > 0: pb.write(initProtoField(6, msg.key)) pb.finish() -proc decodeMessages*(pb: var ProtoBuffer): seq[Message] {.gcsafe.} = +proc decodeMessages*(pb: var ProtoBuffer): seq[Message] {.gcsafe.} = # TODO: which of this fields are really optional? while true: var msg: Message @@ -202,7 +202,7 @@ proc decodeMessages*(pb: var ProtoBuffer): seq[Message] {.gcsafe.} = msg.topicIDs.add(topic) trace "read message field", topicName = topic topic = "" - + discard pb.getBytes(5, msg.signature) trace "read message field", signature = msg.signature @@ -211,7 +211,7 @@ proc decodeMessages*(pb: var ProtoBuffer): seq[Message] {.gcsafe.} = result.add(msg) -proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} = +proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} = result = initProtoBuffer() trace "encoding msg: ", msg = msg @@ -244,7 +244,7 @@ proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} = if result.buffer.len > 0: result.finish() -proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} = +proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} = var pb = initProtoBuffer(msg) result.subscriptions = newSeq[SubOpts]() @@ -262,5 +262,5 @@ proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} = result.messages = pb.decodeMessages() of 3: result.control = pb.decodeControl() - else: + else: raise newException(CatchableError, "message type not recognized") diff --git a/libp2p/protocols/secure/secio.nim b/libp2p/protocols/secure/secio.nim index b635c827f..c0a73185b 100644 --- a/libp2p/protocols/secure/secio.nim +++ b/libp2p/protocols/secure/secio.nim @@ -249,7 +249,7 @@ proc newSecureConnection*(conn: Connection, result.readerCoder.init(cipher, secrets.keyOpenArray(i1), secrets.ivOpenArray(i1)) - result.peerInfo = some(PeerInfo.init(remotePubKey)) + result.peerInfo = PeerInfo.init(remotePubKey) proc transactMessage(conn: Connection, msg: seq[byte]): Future[seq[byte]] {.async.} = @@ -296,8 +296,11 @@ proc handshake*(s: Secio, conn: Connection): Future[SecureConnection] {.async.} if randomBytes(localNonce) != SecioNonceSize: raise newException(CatchableError, "Could not generate random data") - var request = createProposal(localNonce, localBytesPubkey, SecioExchanges, - SecioCiphers, SecioHashes) + var request = createProposal(localNonce, + localBytesPubkey, + SecioExchanges, + SecioCiphers, + SecioHashes) localPeerId = PeerID.init(s.localPublicKey) @@ -415,8 +418,8 @@ proc readLoop(sconn: 
SecureConnection, stream: BufferStream) {.async.} = let msg = await sconn.readMessage() if msg.len > 0: await stream.pushTo(msg) - - # tight loop, give a chance for other + + # tight loop, give a chance for other # stuff to run as well await sleepAsync(1.millis) except CatchableError as exc: @@ -434,12 +437,12 @@ proc handleConn(s: Secio, conn: Connection): Future[Connection] {.async, gcsafe. asyncCheck readLoop(sconn, stream) var secured = newConnection(stream) secured.closeEvent.wait() - .addCallback(proc(udata: pointer) = + .addCallback do (udata: pointer): trace "wrapped connection closed, closing upstream" - if not sconn.closed: + if not isNil(sconn) and not sconn.closed: asyncCheck sconn.close() - ) - secured.peerInfo = some(PeerInfo.init(sconn.peerInfo.get().publicKey.get())) + + secured.peerInfo = PeerInfo.init(sconn.peerInfo.publicKey.get()) result = secured method init(s: Secio) {.gcsafe.} = diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index 8ae124180..75b124102 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -19,7 +19,7 @@ type ChronosStream* = ref object of LPStream server: StreamServer client: StreamTransport -proc newChronosStream*(server: StreamServer, +proc newChronosStream*(server: StreamServer, client: StreamTransport): ChronosStream = new result result.server = server @@ -28,9 +28,9 @@ proc newChronosStream*(server: StreamServer, result.writer = newAsyncStreamWriter(client) result.closeEvent = newAsyncEvent() -method read*(s: ChronosStream, n = -1): Future[seq[byte]] {.async.} = +method read*(s: ChronosStream, n = -1): Future[seq[byte]] {.async.} = if s.reader.atEof: - raise newLPStreamClosedError() + raise newLPStreamEOFError() try: result = await s.reader.read(n) @@ -39,11 +39,11 @@ method read*(s: ChronosStream, n = -1): Future[seq[byte]] {.async.} = except AsyncStreamIncorrectError as exc: raise newLPStreamIncorrectError(exc.msg) -method readExactly*(s: ChronosStream, - pbytes: pointer, +method readExactly*(s: ChronosStream, + pbytes: pointer, nbytes: int): Future[void] {.async.} = if s.reader.atEof: - raise newLPStreamClosedError() + raise newLPStreamEOFError() try: await s.reader.readExactly(pbytes, nbytes) @@ -56,7 +56,7 @@ method readExactly*(s: ChronosStream, method readLine*(s: ChronosStream, limit = 0, sep = "\r\n"): Future[string] {.async.} = if s.reader.atEof: - raise newLPStreamClosedError() + raise newLPStreamEOFError() try: result = await s.reader.readLine(limit, sep) @@ -67,7 +67,7 @@ method readLine*(s: ChronosStream, limit = 0, sep = "\r\n"): Future[string] {.as method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} = if s.reader.atEof: - raise newLPStreamClosedError() + raise newLPStreamEOFError() try: result = await s.reader.readOnce(pbytes, nbytes) @@ -76,12 +76,12 @@ method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {. 
diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim
index 8ae124180..75b124102 100644
--- a/libp2p/stream/chronosstream.nim
+++ b/libp2p/stream/chronosstream.nim
@@ -19,7 +19,7 @@ type ChronosStream* = ref object of LPStream
     server: StreamServer
     client: StreamTransport

-proc newChronosStream*(server: StreamServer, 
+proc newChronosStream*(server: StreamServer,
                        client: StreamTransport): ChronosStream =
   new result
   result.server = server
@@ -28,9 +28,9 @@ proc newChronosStream*(server: StreamServer,
   result.writer = newAsyncStreamWriter(client)
   result.closeEvent = newAsyncEvent()

-method read*(s: ChronosStream, n = -1): Future[seq[byte]] {.async.} = 
+method read*(s: ChronosStream, n = -1): Future[seq[byte]] {.async.} =
   if s.reader.atEof:
-    raise newLPStreamClosedError()
+    raise newLPStreamEOFError()

   try:
     result = await s.reader.read(n)
@@ -39,11 +39,11 @@ method read*(s: ChronosStream, n = -1): Future[seq[byte]] {.async.} =
   except AsyncStreamIncorrectError as exc:
     raise newLPStreamIncorrectError(exc.msg)

-method readExactly*(s: ChronosStream, 
-                    pbytes: pointer, 
+method readExactly*(s: ChronosStream,
+                    pbytes: pointer,
                     nbytes: int): Future[void] {.async.} =
   if s.reader.atEof:
-    raise newLPStreamClosedError()
+    raise newLPStreamEOFError()

   try:
     await s.reader.readExactly(pbytes, nbytes)
@@ -56,7 +56,7 @@ method readExactly*(s: ChronosStream,

 method readLine*(s: ChronosStream, limit = 0, sep = "\r\n"): Future[string] {.async.} =
   if s.reader.atEof:
-    raise newLPStreamClosedError()
+    raise newLPStreamEOFError()

   try:
     result = await s.reader.readLine(limit, sep)
@@ -67,7 +67,7 @@ method readLine*(s: ChronosStream, limit = 0, sep = "\r\n"): Future[string] {.as

 method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} =
   if s.reader.atEof:
-    raise newLPStreamClosedError()
+    raise newLPStreamEOFError()

   try:
     result = await s.reader.readOnce(pbytes, nbytes)
@@ -76,12 +76,12 @@ method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.
   except AsyncStreamIncorrectError as exc:
     raise newLPStreamIncorrectError(exc.msg)

-method readUntil*(s: ChronosStream, 
-                  pbytes: pointer, 
-                  nbytes: int, 
+method readUntil*(s: ChronosStream,
+                  pbytes: pointer,
+                  nbytes: int,
                   sep: seq[byte]): Future[int] {.async.} =
   if s.reader.atEof:
-    raise newLPStreamClosedError()
+    raise newLPStreamEOFError()

   try:
     result = await s.reader.readUntil(pbytes, nbytes, sep)
@@ -96,7 +96,7 @@ method readUntil*(s: ChronosStream,

 method write*(s: ChronosStream, pbytes: pointer, nbytes: int) {.async.} =
   if s.writer.atEof:
-    raise newLPStreamClosedError()
+    raise newLPStreamEOFError()

   try:
     await s.writer.write(pbytes, nbytes)
@@ -109,7 +109,7 @@ method write*(s: ChronosStream, pbytes: pointer, nbytes: int) {.async.} =

 method write*(s: ChronosStream, msg: string, msglen = -1) {.async.} =
   if s.writer.atEof:
-    raise newLPStreamClosedError()
+    raise newLPStreamEOFError()

   try:
     await s.writer.write(msg, msglen)
@@ -122,7 +122,7 @@ method write*(s: ChronosStream, msg: string, msglen = -1) {.async.} =

 method write*(s: ChronosStream, msg: seq[byte], msglen = -1) {.async.} =
   if s.writer.atEof:
-    raise newLPStreamClosedError()
+    raise newLPStreamEOFError()

   try:
     await s.writer.write(msg, msglen)
@@ -133,7 +133,7 @@ method write*(s: ChronosStream, msg: seq[byte], msglen = -1) {.async.} =
   except AsyncStreamIncorrectError as exc:
     raise newLPStreamIncorrectError(exc.msg)

-method closed*(s: ChronosStream): bool {.inline.} = 
+method closed*(s: ChronosStream): bool {.inline.} =
   # TODO: we might only need to check for reader's EOF
   result = s.reader.atEof()
diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim
index 3574a0699..bdc0e3aa8 100644
--- a/libp2p/stream/lpstream.nim
+++ b/libp2p/stream/lpstream.nim
@@ -9,7 +9,7 @@

 import chronos

-type 
+type
   LPStream* = ref object of RootObj
     isClosed*: bool
     closeEvent*: AsyncEvent
@@ -22,7 +22,7 @@ type
     par*: ref Exception
   LPStreamWriteError* = object of LPStreamError
     par*: ref Exception
-  LPStreamClosedError* = object of LPStreamError
+  LPStreamEOFError* = object of LPStreamError

 proc newLPStreamReadError*(p: ref Exception): ref Exception {.inline.} =
   var w = newException(LPStreamReadError, "Read stream failed")
@@ -45,10 +45,10 @@ proc newLPStreamLimitError*(): ref Exception {.inline.} =
 proc newLPStreamIncorrectError*(m: string): ref Exception {.inline.} =
   result = newException(LPStreamIncorrectError, m)

-proc newLPStreamClosedError*(): ref Exception {.inline.} =
-  result = newException(LPStreamClosedError, "Stream closed!")
+proc newLPStreamEOFError*(): ref Exception {.inline.} =
+  result = newException(LPStreamEOFError, "Stream EOF!")

-method closed*(s: LPStream): bool {.base, inline.} = 
+method closed*(s: LPStream): bool {.base, inline.} =
   s.isClosed

 method read*(s: LPStream, n = -1): Future[seq[byte]]
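Reviewer note: `LPStreamClosedError` becomes `LPStreamEOFError` across all stream implementations, so downstream `except` clauses must be renamed as well. A caller-side sketch, using a hypothetical `readAll` helper:

```nim
import chronos
import libp2p/stream/lpstream

proc readAll(s: LPStream): Future[seq[byte]] {.async.} =
  try:
    result = await s.read()
  except LPStreamEOFError: # previously LPStreamClosedError
    result = @[]           # treat EOF as "no more data"
```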
diff --git a/libp2p/switch.nim b/libp2p/switch.nim
index 515d00796..cfbebd6f3 100644
--- a/libp2p/switch.nim
+++ b/libp2p/switch.nim
@@ -65,13 +65,17 @@ proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =

 proc identify(s: Switch, conn: Connection): Future[PeerInfo] {.async, gcsafe.} =
   ## identify the connection
-  if conn.peerInfo.isSome:
-    result = conn.peerInfo.get()
+  if not isNil(conn.peerInfo):
+    result = conn.peerInfo

   try:
     if (await s.ms.select(conn, s.identity.codec)):
       let info = await s.identity.identify(conn, conn.peerInfo)

+      if info.pubKey.isNone and isNil(result):
+        raise newException(CatchableError,
+          "no public key provided and no existing peer identity found")
+
       if info.pubKey.isSome:
         result = PeerInfo.init(info.pubKey.get())
         trace "identify: identified remote peer", peer = result.id
@@ -112,58 +116,58 @@ proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} =
   # add muxer handler cleanup proc
   handlerFut.addCallback do (udata: pointer = nil):
     trace "muxer handler completed for peer",
-      peer = conn.peerInfo.get().id
+      peer = conn.peerInfo.id

   # do identify first, so that we have a
   # PeerInfo in case we didn't before
-  conn.peerInfo = some((await s.identify(stream)))
+  conn.peerInfo = await s.identify(stream)
   await stream.close() # close identify stream

   trace "connection's peerInfo", peerInfo = conn.peerInfo

   # store it in muxed connections if we have a peer for it
-  if conn.peerInfo.isSome:
-    trace "adding muxer for peer", peer = conn.peerInfo.get().id
-    s.muxed[conn.peerInfo.get().id] = muxer
+  if not isNil(conn.peerInfo):
+    trace "adding muxer for peer", peer = conn.peerInfo.id
+    s.muxed[conn.peerInfo.id] = muxer

 proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} =
-  # if conn.peerInfo.peerId.isSome:
-  let id = conn.peerInfo.get().id
-  trace "cleaning up connection for peer", peerId = id
-  if id in s.muxed:
-    await s.muxed[id].close()
-    s.muxed.del(id)
+  if not isNil(conn.peerInfo):
+    let id = conn.peerInfo.id
+    trace "cleaning up connection for peer", peerId = id
+    if id in s.muxed:
+      await s.muxed[id].close()
+      s.muxed.del(id)

-  if id in s.connections:
-    await s.connections[id].close()
-    s.connections.del(id)
+    if id in s.connections:
+      await s.connections[id].close()
+      s.connections.del(id)

 proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} =
   let conn = s.connections.getOrDefault(peer.id)
-  if conn != nil:
+  if not isNil(conn):
     await s.cleanupConn(conn)

-proc getMuxedStream(s: Switch, peerInfo: PeerInfo): Future[Option[Connection]] {.async, gcsafe.} =
+proc getMuxedStream(s: Switch, peerInfo: PeerInfo): Future[Connection] {.async, gcsafe.} =
   # if there is a muxer for the connection
   # use it instead to create a muxed stream
   if peerInfo.id in s.muxed:
     trace "connection is muxed, setting up a stream"
     let muxer = s.muxed[peerInfo.id]
     let conn = await muxer.newStream()
-    result = some(conn)
+    result = conn

 proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
   trace "handling connection", conn = conn
   result = conn

   # don't mux/secure twice
-  if conn.peerInfo.get().id in s.muxed:
+  if conn.peerInfo.id in s.muxed:
     return

   result = await s.secure(result) # secure the connection
   await s.mux(result) # mux it if possible
-  s.connections[conn.peerInfo.get().id] = result
+  s.connections[conn.peerInfo.id] = result

 proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
   trace "upgrading incoming connection"
@@ -206,7 +210,7 @@ proc dial*(s: Switch,
       trace "Dialing address", address = $a
       result = await t.dial(a)
       # make sure to assign the peer to the connection
-      result.peerInfo = some peer
+      result.peerInfo = peer
       result = await s.upgradeOutgoing(result)
       result.closeEvent.wait().addCallback do (udata: pointer):
        asyncCheck s.cleanupConn(result)
@@ -214,11 +218,14 @@
   else:
     trace "Reusing existing connection"

+  if isNil(result):
+    raise newException(CatchableError, "unable to establish outgoing link!")
+
   if proto.len > 0 and not result.closed:
     let stream = await s.getMuxedStream(peer)
-    if stream.isSome:
+    if not isNil(stream):
       trace "Connection is muxed, return muxed stream"
-      result = stream.get()
+      result = stream

     trace "Attempting to select remote", proto = proto
     if not await s.ms.select(result, proto):
@@ -324,7 +331,7 @@ proc newSwitch*(peerInfo: PeerInfo,
     val.muxerHandler = proc(muxer: Muxer) {.async, gcsafe.} =
       trace "got new muxer"
       let stream = await muxer.newStream()
-      muxer.connection.peerInfo = some((await s.identify(stream)))
+      muxer.connection.peerInfo = await s.identify(stream)
       await stream.close()

   for k in secureManagers.keys:
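Reviewer note: `Connection.peerInfo` moves from `Option[PeerInfo]` to a plain, nilable `PeerInfo` throughout the switch, so every `isSome`/`get()` pair becomes an `isNil` guard. A minimal before/after sketch (imports assumed from this repo's modules):

```nim
import libp2p/[connection, peerinfo]

proc logPeer(conn: Connection) =
  # before: if conn.peerInfo.isSome: echo conn.peerInfo.get().id
  # after: peerInfo is a ref that stays nil until identify fills it in
  if not isNil(conn.peerInfo):
    echo conn.peerInfo.id
```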
diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim
index c507ddbbf..5ded830f3 100644
--- a/libp2p/transports/tcptransport.nim
+++ b/libp2p/transports/tcptransport.nim
@@ -30,7 +30,7 @@ proc connHandler*(t: Transport,
   let conn: Connection = newConnection(newChronosStream(server, client))
   conn.observedAddrs = MultiAddress.init(client.remoteAddress)
   if not initiator:
-    let handlerFut = if t.handler == nil: nil else: t.handler(conn)
+    let handlerFut = if isNil(t.handler): nil else: t.handler(conn)
     let connHolder: ConnHolder = ConnHolder(connection: conn,
                                             connFuture: handlerFut)
     t.connections.add(connHolder)
@@ -77,6 +77,6 @@ method dial*(t: TcpTransport,
   let client: StreamTransport = await connect(address)
   result = await t.connHandler(t.server, client, true)

-method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} = 
+method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} =
   if procCall Transport(t).handles(address):
     result = address.protocols.filterIt( it == multiCodec("tcp") ).len > 0
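Reviewer note: `t.handler == nil` becomes `isNil(t.handler)`; for proc-typed fields `isNil` is the safe emptiness check, since closure values do not generally support `==` comparisons. A standalone illustration with a hypothetical handler type:

```nim
type Handler = proc (x: int) {.gcsafe.} # hypothetical closure type

var h: Handler
doAssert isNil(h)  # default-initialized proc fields are nil

h = proc (x: int) {.gcsafe.} = echo x
doAssert not isNil(h)
```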
diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim
index 3c980b06c..e8be14ee2 100644
--- a/tests/pubsub/testgossipsub.nim
+++ b/tests/pubsub/testgossipsub.nim
@@ -25,7 +25,7 @@ proc createGossipSub(): GossipSub =
 suite "GossipSub":
   test "should add remote peer topic subscriptions":
     proc testRun(): Future[bool] {.async.} =
-      proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = 
+      proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
         discard

       let gossip1 = createGossipSub()
@@ -33,11 +33,11 @@ suite "GossipSub":

       var buf1 = newBufferStream()
       var conn1 = newConnection(buf1)
-      conn1.peerInfo = some(gossip1.peerInfo)
+      conn1.peerInfo = gossip1.peerInfo

       var buf2 = newBufferStream()
       var conn2 = newConnection(buf2)
-      conn2.peerInfo = some(gossip2.peerInfo)
+      conn2.peerInfo = gossip2.peerInfo

       buf1 = buf1 | buf2 | buf1

@@ -50,7 +50,7 @@ suite "GossipSub":
       check:
         "foobar" in gossip2.gossipsub
         gossip1.peerInfo.id in gossip2.gossipsub["foobar"]
-      
+
       result = true

     check:
@@ -92,7 +92,7 @@ suite "GossipSub":

   test "should add remote peer topic subscriptions if both peers are subscribed":
     proc testRun(): Future[bool] {.async.} =
-      proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = 
+      proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
        discard

       let gossip1 = createGossipSub()
@@ -100,12 +100,12 @@ suite "GossipSub":

       var buf1 = newBufferStream()
       var conn1 = newConnection(buf1)
-      conn1.peerInfo = some(gossip1.peerInfo)
+      conn1.peerInfo = gossip1.peerInfo

       var buf2 = newBufferStream()
       var conn2 = newConnection(buf2)
-      conn2.peerInfo = some(gossip2.peerInfo)
-      
+      conn2.peerInfo = gossip2.peerInfo
+
       buf1 = buf1 | buf2 | buf1

       await gossip1.subscribeToPeer(conn2)
@@ -181,7 +181,7 @@ suite "GossipSub":
   # test "send over fanout A -> B":
   #   proc testRun(): Future[bool] {.async.} =
   #     var handlerFut = newFuture[bool]()
-  #     proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = 
+  #     proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
   #       check:
   #         topic == "foobar"
   #         cast[string](data) == "Hello!"
@@ -221,7 +221,7 @@ suite "GossipSub":

   test "e2e - send over fanout A -> B":
     proc testRun(): Future[bool] {.async.} =
       var passed: bool
-      proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = 
+      proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
         check topic == "foobar"
         passed = true
@@ -255,7 +255,7 @@ suite "GossipSub":
   # test "send over mesh A -> B":
   #   proc testRun(): Future[bool] {.async.} =
   #     var passed: bool
-  #     proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = 
+  #     proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
   #       check:
   #         topic == "foobar"
   #         cast[string](data) == "Hello!"
@@ -294,7 +294,7 @@ suite "GossipSub":
   # test "e2e - send over mesh A -> B":
   #   proc testRun(): Future[bool] {.async.} =
   #     var passed: bool
-  #     proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = 
+  #     proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
   #       check topic == "foobar"
   #       passed = true
@@ -320,7 +320,7 @@ suite "GossipSub":
   #     check:
   #       waitFor(testRun()) == true

-  # test "with multiple peers": 
+  # test "with multiple peers":
   #   proc testRun(): Future[bool] {.async.} =
   #     var nodes: seq[GossipSub]
   #     for i in 0..<10:
@@ -361,8 +361,8 @@ suite "GossipSub":

   #     awaitters.add(dialer.start())

-  #     await nodes[0].publish("foobar", 
-  #                            cast[seq[byte]]("from node " & 
+  #     await nodes[0].publish("foobar",
+  #                            cast[seq[byte]]("from node " &
   #                            nodes[1].peerInfo.peerId.get().pretty))

   #     await sleepAsync(1000.millis)
@@ -404,8 +404,8 @@ suite "GossipSub":
       await subscribeNodes(nodes)
       await sleepAsync(10.millis)

-      await nodes[0].publish("foobar", 
-                             cast[seq[byte]]("from node " & 
+      await nodes[0].publish("foobar",
+                             cast[seq[byte]]("from node " &
                              nodes[1].peerInfo.id))

      await sleepAsync(1000.millis)
diff --git a/tests/testidentify.nim b/tests/testidentify.nim
index f2344e1be..8d8577793 100644
--- a/tests/testidentify.nim
+++ b/tests/testidentify.nim
@@ -18,9 +18,9 @@ suite "Identify":
     proc testHandle(): Future[bool] {.async.} =
       let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
       let remoteSecKey = PrivateKey.random(RSA)
-      let remotePeerInfo = PeerInfo.init(remoteSecKey, 
-                                         @[ma], 
-                                         @["/test/proto1/1.0.0", 
+      let remotePeerInfo = PeerInfo.init(remoteSecKey,
+                                         @[ma],
+                                         @["/test/proto1/1.0.0",
                                            "/test/proto2/1.0.0"])
       var serverFut: Future[void]
       let identifyProto1 = newIdentify(remotePeerInfo)
@@ -40,7 +40,7 @@ suite "Identify":
       var peerInfo = PeerInfo.init(PrivateKey.random(RSA), @[ma])
       let identifyProto2 = newIdentify(peerInfo)
       discard await msDial.select(conn, IdentifyCodec)
-      let id = await identifyProto2.identify(conn, some(remotePeerInfo))
+      let id = await identifyProto2.identify(conn, remotePeerInfo)

       check id.pubKey.get() == remoteSecKey.getKey()
       check id.addrs[0] == ma
@@ -77,7 +77,7 @@ suite "Identify":
       var localPeerInfo = PeerInfo.init(PrivateKey.random(RSA), @[ma])
       let identifyProto2 = newIdentify(localPeerInfo)
       discard await msDial.select(conn, IdentifyCodec)
-      discard await identifyProto2.identify(conn, some(PeerInfo.init(PrivateKey.random(RSA))))
+      discard await identifyProto2.identify(conn, PeerInfo.init(PrivateKey.random(RSA)))
       await conn.close()

     expect IdentityNoMatchError:
diff --git a/tests/testinterop.nim b/tests/testinterop.nim
index a1cd7dc9f..76086f7d1 100644
--- a/tests/testinterop.nim
+++ b/tests/testinterop.nim
@@ -59,10 +59,12 @@ proc readLp*(s: StreamTransport): Future[seq[byte]] {.async, gcsafe.} =
     result.setLen(size)
     if size > 0.uint:
       await s.readExactly(addr result[0], int(size))
-  except LPStreamIncompleteError, LPStreamReadError:
-    trace "remote connection ended unexpectedly", exc = getCurrentExceptionMsg()
+  except LPStreamIncompleteError as exc:
+    trace "remote connection ended unexpectedly", exc = exc.msg
+  except LPStreamReadError as exc:
+    trace "unable to read from remote connection", exc = exc.msg

-proc createNode*(privKey: Option[PrivateKey] = none(PrivateKey), 
+proc createNode*(privKey: Option[PrivateKey] = none(PrivateKey),
                  address: string = "/ip4/127.0.0.1/tcp/0",
                  triggerSelf: bool = false,
                  gossip: bool = false): Switch =
@@ -77,7 +79,7 @@ proc createNode*(privKey: Option[PrivateKey] = none(PrivateKey),
   let muxers = [(MplexCodec, mplexProvider)].toTable()
   let identify = newIdentify(peerInfo)
   let secureManagers = [(SecioCodec, Secure(newSecio(seckey.get())))].toTable()
-  
+
   var pubSub: Option[PubSub]
   if gossip:
     pubSub = some(PubSub(newPubSub(GossipSub, peerInfo, triggerSelf)))
@@ -116,10 +118,10 @@ proc testPubSubDaemonPublish(gossip: bool = false): Future[bool] {.async.} =
                                       daemonPeer.addresses))
   await sleepAsync(1.seconds)
   await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
-  
+
   proc pubsubHandler(api: DaemonAPI,
                      ticket: PubsubTicket,
-                     message: PubSubMessage): Future[bool] {.async.} = 
+                     message: PubSubMessage): Future[bool] {.async.} =
     result = true # don't cancel subscription

   asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
@@ -153,10 +155,10 @@ proc testPubSubNodePublish(gossip: bool = false): Future[bool] {.async.} =

   await sleepAsync(1.seconds)
   await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
-  
+
   proc pubsubHandler(api: DaemonAPI,
                      ticket: PubsubTicket,
-                     message: PubSubMessage): Future[bool] {.async.} = 
+                     message: PubSubMessage): Future[bool] {.async.} =
     let smsg = cast[string](message.data)
     check smsg == pubsubData
     handlerFuture.complete(true)
@@ -185,7 +187,7 @@ suite "Interop":
       proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} =
         check cast[string](await stream.transp.readLp()) == "test 1"
         asyncDiscard stream.transp.writeLp("test 2")
-        
+
         await sleepAsync(10.millis)
         check cast[string](await stream.transp.readLp()) == "test 3"
         asyncDiscard stream.transp.writeLp("test 4")
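Reviewer note (testinterop.nim): the single `except LPStreamIncompleteError, LPStreamReadError:` branch is split in two so that each branch can bind the exception with `as exc` and log a message specific to that failure, instead of falling back to `getCurrentExceptionMsg()`. The general pattern:

```nim
try:
  raise newException(IOError, "boom")
except IOError as exc:  # one exception type per branch when binding `as exc`
  echo "read failed: ", exc.msg
except OSError as exc:
  echo "os-level failure: ", exc.msg
```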
diff --git a/tests/testmplex.nim b/tests/testmplex.nim
index 90a4a777a..d0c0fd452 100644
--- a/tests/testmplex.nim
+++ b/tests/testmplex.nim
@@ -268,7 +268,7 @@ suite "Mplex":
         await chann.close()
         await chann.write("Hello")

-    expect LPStreamClosedError:
+    expect LPStreamEOFError:
      waitFor(testClosedForWrite())

   test "half closed - channel should close for read by remote":
@@ -281,7 +281,7 @@ suite "Mplex":
        discard await chann.read() # this should work, since there is data in the buffer
        discard await chann.read() # this should throw

-    expect LPStreamClosedError:
+    expect LPStreamEOFError:
      waitFor(testClosedForRead())

   test "reset - channel should fail reading":
@@ -291,7 +291,7 @@ suite "Mplex":
        await chann.reset()
        asyncDiscard chann.read()

-    expect LPStreamClosedError:
+    expect LPStreamEOFError:
      waitFor(testResetRead())

   test "reset - channel should fail writing":
@@ -301,7 +301,7 @@ suite "Mplex":
        await chann.reset()
        await chann.write(cast[seq[byte]]("Hello!"))

-    expect LPStreamClosedError:
+    expect LPStreamEOFError:
      waitFor(testResetWrite())

   test "should not allow pushing data to channel when remote end closed":
@@ -311,5 +311,5 @@ suite "Mplex":
        await chann.closedByRemote()
        await chann.pushTo(@[byte(1)])

-    expect LPStreamClosedError:
+    expect LPStreamEOFError:
      waitFor(testResetWrite())
diff --git a/tests/testpeerinfo.nim b/tests/testpeerinfo.nim
index bf48139ea..7c2e3c719 100644
--- a/tests/testpeerinfo.nim
+++ b/tests/testpeerinfo.nim
@@ -30,24 +30,24 @@ suite "PeerInfo":
     check peerId == peerInfo.peerId
     check seckey.getKey == peerInfo.publicKey.get()

-  test "Should return none on missing public key":
+  test "Should init from CIDv0 string":
+    var peerInfo = PeerInfo.init("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N")
+
+    check:
+      PeerID.init("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N") == peerInfo.peerId
+
+  # TODO: CIDv1 handling is missing from PeerID
+  # https://github.com/status-im/nim-libp2p/issues/53
+  # test "Should init from CIDv1 string":
+  #   var peerInfo = PeerInfo.init("bafzbeie5745rpv2m6tjyuugywy4d5ewrqgqqhfnf445he3omzpjbx5xqxe")
+
+  #   check:
+  #     PeerID.init("bafzbeie5745rpv2m6tjyuugywy4d5ewrqgqqhfnf445he3omzpjbx5xqxe") == peerInfo.peerId
+
+  test "Should return none if pubkey is missing from id":
     let peerInfo = PeerInfo.init(PeerID.init(PrivateKey.random(RSA)))
     check peerInfo.publicKey.isNone

-  test "Should allow assigning public key":
-    let key = PrivateKey.random(RSA)
-
-    let peerInfo = PeerInfo.init(PeerID.init(key))
-    peerInfo.publicKey = key.getKey()
-    check peerInfo.publicKey.get() == key.getKey()
-
-  test "Should throw on invalid public key assignement":
-    proc throwsOnInvalidPubKey() =
-      let validKey = PrivateKey.random(RSA)
-      let invalidKey = PrivateKey.random(RSA)
-
-      let peerInfo = PeerInfo.init(PeerID.init(validKey))
-      peerInfo.publicKey = invalidKey.getKey()
-
-    expect InvalidPublicKeyException:
-      throwsOnInvalidPubKey()
+  test "Should return some if pubkey is present in id":
+    let peerInfo = PeerInfo.init(PeerID.init(PrivateKey.random(Ed25519)))
+    check peerInfo.publicKey.isSome
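Reviewer note: the reworked PeerInfo tests pin down the new `PeerInfo.init` behavior: a CIDv0 base58 string yields a valid peer id, RSA-based ids only carry a hash of the key (so `publicKey.isNone`), while Ed25519 ids are small enough to embed it. A usage sketch mirroring the tests above (imports assumed from this repo's modules):

```nim
import libp2p/[peerinfo, peer, crypto/crypto]

# Peer id parsed from a CIDv0 string (same id as in the test above)
let info = PeerInfo.init("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N")
echo info.peerId

# Ed25519 ids inline the public key; RSA ids do not
assert PeerInfo.init(PeerID.init(PrivateKey.random(Ed25519))).publicKey.isSome
assert PeerInfo.init(PeerID.init(PrivateKey.random(RSA))).publicKey.isNone
```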