diff --git a/README.md b/README.md index 8203960e8..785347832 100644 --- a/README.md +++ b/README.md @@ -126,8 +126,8 @@ proc createSwitch(ma: MultiAddress): (Switch, PeerInfo) = let mplexProvider = newMuxerProvider(createMplex, MplexCodec) # create multiplexer let transports = @[Transport(newTransport(TcpTransport))] # add all transports (tcp only for now, but can be anything in the future) - let muxers = [(MplexCodec, mplexProvider)].toTable() # add all muxers - let secureManagers = [(SecioCodec, Secure(newSecio(seckey)))].toTable() # setup the secio and any other secure provider + let muxers = {MplexCodec: mplexProvider}.toTable() # add all muxers + let secureManagers = {SecioCodec: Secure(newSecio(seckey))}.toTable() # setup the secio and any other secure provider # create the switch let switch = newSwitch(peerInfo, diff --git a/examples/directchat.nim b/examples/directchat.nim index 1aac8b578..eb59f3b59 100644 --- a/examples/directchat.nim +++ b/examples/directchat.nim @@ -74,7 +74,7 @@ proc dialPeer(p: ChatProto, address: string) {.async, gcsafe.} = p.conn = await p.switch.dial(remotePeer, ChatCodec) p.connected = true -proc writeAndPrint(p: ChatProto) {.async, gcsafe.} = +proc writeAndPrint(p: ChatProto) {.async, gcsafe.} = while true: if not p.connected: # echo &"{p.id} ->" @@ -86,7 +86,7 @@ proc writeAndPrint(p: ChatProto) {.async, gcsafe.} = if line.startsWith("/help") or line.startsWith("/?") or not p.started: echo Help continue - + if line.startsWith("/disconnect"): echo "Ending current session" if p.connected and p.conn.closed.not: @@ -100,7 +100,7 @@ proc writeAndPrint(p: ChatProto) {.async, gcsafe.} = if yesno.cmpIgnoreCase("y") == 0: await p.conn.close() p.connected = false - elif yesno.cmpIgnoreCase("n") == 0: + elif yesno.cmpIgnoreCase("n") == 0: continue else: echo "unrecognized response" @@ -131,22 +131,22 @@ proc writeAndPrint(p: ChatProto) {.async, gcsafe.} = # echo getCurrentExceptionMsg() proc readWriteLoop(p: ChatProto) {.async, gcsafe.} = - asyncCheck p.writeAndPrint() - asyncCheck p.readAndPrint() + asyncCheck p.writeAndPrint() + asyncCheck p.readAndPrint() method init(p: ChatProto) {.gcsafe.} = - proc handle(stream: Connection, proto: string) {.async, gcsafe.} = + proc handle(stream: Connection, proto: string) {.async, gcsafe.} = if p.connected and not p.conn.closed: echo "a chat session is already in progress - disconnecting!" await stream.close() - - p.conn = stream - p.connected = true + else: + p.conn = stream + p.connected = true p.codec = ChatCodec p.handler = handle -proc newChatProto(switch: Switch, transp: StreamTransport): ChatProto = +proc newChatProto(switch: Switch, transp: StreamTransport): ChatProto = new result result.switch = switch result.transp = transp @@ -156,7 +156,7 @@ proc threadMain(wfd: AsyncFD) {.thread.} = ## This procedure performs reading from `stdin` and sends data over ## pipe to main thread. var transp = fromPipe(wfd) - + while true: var line = stdin.readLine() discard waitFor transp.write(line & "\r\n") @@ -217,7 +217,7 @@ proc main() {.async.} = var (rfd, wfd) = createAsyncPipe() if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe: raise newException(ValueError, "Could not initialize pipe!") - + data.consoleFd = rfd data.serveFut = serveThread(data) var thread: Thread[AsyncFD] diff --git a/libp2p.nim b/libp2p.nim index 4dba88c86..8e0caadad 100644 --- a/libp2p.nim +++ b/libp2p.nim @@ -6,7 +6,12 @@ ## at your option. ## This file may not be copied, modified, or distributed except according to ## those terms. -import libp2p/daemon/[daemonapi, transpool] -import libp2p/protobuf/minprotobuf -import libp2p/varint -export daemonapi, minprotobuf, varint, transpool + +import + libp2p/daemon/[daemonapi, transpool], + libp2p/protobuf/minprotobuf, + libp2p/varint + +export + daemonapi, transpool, minprotobuf, varint + diff --git a/libp2p/crypto/crypto.nim b/libp2p/crypto/crypto.nim index 9ffdb70fa..ac33f4153 100644 --- a/libp2p/crypto/crypto.nim +++ b/libp2p/crypto/crypto.nim @@ -427,7 +427,7 @@ proc `$`*(sig: Signature): string = ## Get string representation of signature ``sig``. result = toHex(sig.data) -proc sign*(key: PrivateKey, data: openarray[byte]): Signature = +proc sign*(key: PrivateKey, data: openarray[byte]): Signature {.gcsafe.} = ## Sign message ``data`` using private key ``key`` and return generated ## signature in raw binary form. if key.scheme == RSA: diff --git a/libp2p/crypto/ecnist.nim b/libp2p/crypto/ecnist.nim index 27b09ade7..e8faa4ce4 100644 --- a/libp2p/crypto/ecnist.nim +++ b/libp2p/crypto/ecnist.nim @@ -863,7 +863,7 @@ proc getSecret*(pubkey: EcPublicKey, seckey: EcPrivateKey): seq[byte] = copyMem(addr result[0], addr data[0], res) proc sign*[T: byte|char](seckey: EcPrivateKey, - message: openarray[T]): EcSignature = + message: openarray[T]): EcSignature {.gcsafe.} = ## Get ECDSA signature of data ``message`` using private key ``seckey``. doAssert(not isNil(seckey)) var hc: BrHashCompatContext diff --git a/libp2p/crypto/ed25519/ed25519.nim b/libp2p/crypto/ed25519/ed25519.nim index 5371d7965..20c17b7ea 100644 --- a/libp2p/crypto/ed25519/ed25519.nim +++ b/libp2p/crypto/ed25519/ed25519.nim @@ -1836,7 +1836,7 @@ proc clear*(pair: var EdKeyPair) = burnMem(pair.pubkey.data) proc sign*[T: byte|char](key: EdPrivateKey, - message: openarray[T]): EdSignature {.noinit.} = + message: openarray[T]): EdSignature {.gcsafe, noinit.} = ## Create ED25519 signature of data ``message`` using private key ``key``. var ctx: sha512 var r: GeP3 diff --git a/libp2p/crypto/rsa.nim b/libp2p/crypto/rsa.nim index dddb2a62e..46d70e14b 100644 --- a/libp2p/crypto/rsa.nim +++ b/libp2p/crypto/rsa.nim @@ -723,13 +723,13 @@ proc `==`*(a, b: RsaPublicKey): bool = result = r1 and r2 proc sign*[T: byte|char](key: RsaPrivateKey, - message: openarray[T]): RsaSignature = + message: openarray[T]): RsaSignature {.gcsafe.} = ## Get RSA PKCS1.5 signature of data ``message`` using SHA256 and private ## key ``key``. doAssert(not isNil(key)) var hc: BrHashCompatContext var hash: array[32, byte] - var impl = BrRsaPkcs1SignGetDefault() + let impl = BrRsaPkcs1SignGetDefault() result = new RsaSignature result.buffer = newSeq[byte]((key.seck.nBitlen + 7) shr 3) var kv = addr sha256Vtable diff --git a/libp2p/crypto/secp.nim b/libp2p/crypto/secp.nim index 6dc497346..b8344986c 100644 --- a/libp2p/crypto/secp.nim +++ b/libp2p/crypto/secp.nim @@ -339,7 +339,7 @@ proc `$`*(sig: SkSignature): string = discard sig.toBytes(ssig) result = toHex(ssig) -proc sign*[T: byte|char](key: SkPrivateKey, msg: openarray[T]): SkSignature = +proc sign*[T: byte|char](key: SkPrivateKey, msg: openarray[T]): SkSignature {.gcsafe.} = ## Sign message `msg` using private key `key` and return signature object. let ctx = getContext() var hash = sha256.digest(msg) diff --git a/libp2p/protocols/protocol.nim b/libp2p/protocols/protocol.nim index 7c4cbb27d..c31a4131a 100644 --- a/libp2p/protocols/protocol.nim +++ b/libp2p/protocols/protocol.nim @@ -11,8 +11,8 @@ import chronos import ../connection type - LPProtoHandler* = proc (conn: Connection, - proto: string): + LPProtoHandler* = proc (conn: Connection, + proto: string): Future[void] {.gcsafe, closure.} LPProtocol* = ref object of RootObj diff --git a/libp2p/protocols/pubsub/rpc/message.nim b/libp2p/protocols/pubsub/rpc/message.nim index dab7cf32a..9bceb9a49 100644 --- a/libp2p/protocols/pubsub/rpc/message.nim +++ b/libp2p/protocols/pubsub/rpc/message.nim @@ -27,7 +27,7 @@ proc msgId*(m: Message): string = proc fromPeerId*(m: Message): PeerId = PeerID.init(m.fromPeer) -proc sign*(p: PeerInfo, msg: Message): Message {.gcsafe.} = +proc sign*(p: PeerInfo, msg: Message): Message {.gcsafe.} = var buff = initProtoBuffer() encodeMessage(msg, buff) let prefix = cast[seq[byte]](PubSubPrefix) @@ -54,7 +54,7 @@ proc verify*(p: PeerInfo, m: Message): bool = proc newMessage*(p: PeerInfo, data: seq[byte], name: string, - sign: bool = true): Message {.gcsafe.} = + sign: bool = true): Message {.gcsafe.} = var seqno: seq[byte] = newSeq[byte](20) if p.publicKey.isSome and randomBytes(addr seqno[0], 20) > 0: var key: seq[byte] = p.publicKey.get().getBytes() diff --git a/libp2p/standard_setup.nim b/libp2p/standard_setup.nim new file mode 100644 index 000000000..30faa07b4 --- /dev/null +++ b/libp2p/standard_setup.nim @@ -0,0 +1,35 @@ +import + options, tables, + switch, peer, peerinfo, connection, multiaddress, + crypto/crypto, transports/[transport, tcptransport], + muxers/[muxer, mplex/mplex, mplex/types], + protocols/[identify, secure/secure, secure/secio], + protocols/pubsub/[pubsub, gossipsub, floodsub] + +export + switch, peer, peerinfo, connection, multiaddress, crypto + +proc newStandardSwitch*(privKey = none(PrivateKey), + address = MultiAddress.init("/ip4/127.0.0.1/tcp/0"), + triggerSelf = false, gossip = false): Switch = + proc createMplex(conn: Connection): Muxer = + result = newMplex(conn) + + let + seckey = privKey.get(otherwise = PrivateKey.random(RSA)) + peerInfo = PeerInfo.init(seckey, @[address]) + mplexProvider = newMuxerProvider(createMplex, MplexCodec) + transports = @[Transport(newTransport(TcpTransport))] + muxers = {MplexCodec: mplexProvider}.toTable + identify = newIdentify(peerInfo) + secureManagers = {SecioCodec: Secure(newSecio seckey)}.toTable + pubSub = if gossip: PubSub newPubSub(GossipSub, peerInfo, triggerSelf) + else: PubSub newPubSub(FloodSub, peerInfo, triggerSelf) + + result = newSwitch(peerInfo, + transports, + identify, + muxers, + secureManagers = secureManagers, + pubSub = some(pubSub)) + diff --git a/libp2p/switch.nim b/libp2p/switch.nim index d575ad900..3d4c3ce25 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -25,9 +25,9 @@ logScope: topic = "Switch" #TODO: General note - use a finite state machine to manage the different -# steps of connections establishing and upgrading. This makes everything -# more robust and less prone to ordering attacks - i.e. muxing can come if -# and only if the channel has been secured (i.e. if a secure manager has been +# steps of connections establishing and upgrading. This makes everything +# more robust and less prone to ordering attacks - i.e. muxing can come if +# and only if the channel has been secured (i.e. if a secure manager has been # previously provided) type @@ -46,10 +46,10 @@ type secureManagers*: Table[string, Secure] pubSub*: Option[PubSub] -proc newNoPubSubException(): ref Exception {.inline.} = +proc newNoPubSubException(): ref Exception {.inline.} = result = newException(NoPubSubException, "no pubsub provided!") -proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = +proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = ## secure the incoming connection let managers = toSeq(s.secureManagers.keys) @@ -110,11 +110,9 @@ proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} = let handlerFut = muxer.handle() # add muxer handler cleanup proc - handlerFut.addCallback( - proc(udata: pointer = nil) {.gcsafe.} = - trace "muxer handler completed for peer", - peer = conn.peerInfo.get().id - ) + handlerFut.addCallback do (udata: pointer = nil): + trace "muxer handler completed for peer", + peer = conn.peerInfo.get().id # do identify first, so that we have a # PeerInfo in case we didn't before @@ -141,7 +139,12 @@ proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} = await s.connections[id].close() s.connections.del(id) -proc getMuxedStream(s: Switch, peerInfo: PeerInfo): Future[Option[Connection]] {.async, gcsafe.} = +proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} = + let conn = s.connections.getOrDefault(peer.id) + if conn != nil: + await s.cleanupConn(conn) + +proc getMuxedStream(s: Switch, peerInfo: PeerInfo): Future[Option[Connection]] {.async, gcsafe.} = # if there is a muxer for the connection # use it instead to create a muxed stream if peerInfo.id in s.muxed: @@ -162,7 +165,7 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g await s.mux(result) # mux it if possible s.connections[conn.peerInfo.get().id] = result -proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = +proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = trace "upgrading incoming connection" let ms = newMultistream() @@ -189,41 +192,40 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = # handle secured connections await ms.handle(conn) -proc dial*(s: Switch, - peer: PeerInfo, - proto: string = ""): - Future[Connection] {.async.} = +proc dial*(s: Switch, + peer: PeerInfo, + proto: string = ""): + Future[Connection] {.async.} = let id = peer.id - trace "dialing peer", peer = id - for t in s.transports: # for each transport - for a in peer.addrs: # for each address - if t.handles(a): # check if it can dial it - if id notin s.connections: - trace "dialing address", address = $a + trace "Dialing peer", peer = id + result = s.connections.getOrDefault(id) + if result == nil or result.closed: + for t in s.transports: # for each transport + for a in peer.addrs: # for each address + if t.handles(a): # check if it can dial it + trace "Dialing address", address = $a result = await t.dial(a) # make sure to assign the peer to the connection - result.peerInfo = some(peer) - result = await s.upgradeOutgoing(result) - result.closeEvent.wait().addCallback( - proc(udata: pointer) = + result.peerInfo = some peer + result = await s.upgradeOutgoing(result) + result.closeEvent.wait().addCallback do (udata: pointer): asyncCheck s.cleanupConn(result) - ) + break + else: + trace "Reusing existing connection" - if proto.len > 0 and not result.closed: - let stream = await s.getMuxedStream(peer) - if stream.isSome: - trace "connection is muxed, return muxed stream" - result = stream.get() - trace "attempting to select remote", proto = proto + if proto.len > 0 and not result.closed: + let stream = await s.getMuxedStream(peer) + if stream.isSome: + trace "Connection is muxed, return muxed stream" + result = stream.get() + trace "Attempting to select remote", proto = proto - if not (await s.ms.select(result, proto)): - error "unable to select protocol: ", proto = proto - raise newException(CatchableError, - &"unable to select protocol: {proto}") + if not await s.ms.select(result, proto): + error "Unable to select sub-protocol", proto = proto + raise newException(CatchableError, &"unable to select protocol: {proto}") - break # don't dial more than one addr on the same transport - -proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} = +proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} = if isNil(proto.handler): raise newException(CatchableError, "Protocol has to define a handle method or proc") @@ -234,7 +236,7 @@ proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} = s.ms.addHandler(proto.codec, proto) -proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = +proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = trace "starting switch" proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} = @@ -253,13 +255,13 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = var server = await t.listen(a, handle) s.peerInfo.addrs[i] = t.ma # update peer's address startFuts.add(server) - + if s.pubSub.isSome: await s.pubSub.get().start() result = startFuts # listen for incoming connections -proc stop*(s: Switch) {.async.} = +proc stop*(s: Switch) {.async.} = trace "stopping switch" if s.pubSub.isSome: @@ -274,21 +276,21 @@ proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = let conn = await s.dial(peerInfo, s.pubSub.get().codec) await s.pubSub.get().subscribeToPeer(conn) -proc subscribe*(s: Switch, topic: string, handler: TopicHandler): Future[void] {.gcsafe.} = +proc subscribe*(s: Switch, topic: string, handler: TopicHandler): Future[void] {.gcsafe.} = ## subscribe to a pubsub topic if s.pubSub.isNone: raise newNoPubSubException() result = s.pubSub.get().subscribe(topic, handler) -proc unsubscribe*(s: Switch, topics: seq[TopicPair]): Future[void] {.gcsafe.} = +proc unsubscribe*(s: Switch, topics: seq[TopicPair]): Future[void] {.gcsafe.} = ## unsubscribe from topics if s.pubSub.isNone: raise newNoPubSubException() result = s.pubSub.get().unsubscribe(topics) -proc publish*(s: Switch, topic: string, data: seq[byte]): Future[void] {.gcsafe.} = +proc publish*(s: Switch, topic: string, data: seq[byte]): Future[void] {.gcsafe.} = # pubslish to pubsub topic if s.pubSub.isNone: raise newNoPubSubException() @@ -312,7 +314,7 @@ proc newSwitch*(peerInfo: PeerInfo, result.secureManagers = initTable[string, Secure]() let s = result # can't capture result - result.streamHandler = proc(stream: Connection) {.async, gcsafe.} = + result.streamHandler = proc(stream: Connection) {.async, gcsafe.} = trace "handling connection for", peerInfo = stream.peerInfo await s.ms.handle(stream) # handle incoming connection @@ -337,3 +339,4 @@ proc newSwitch*(peerInfo: PeerInfo, if pubSub.isSome: result.pubSub = pubSub result.mount(pubSub.get()) + diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index 7e98d8076..937fc1fff 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -9,7 +9,7 @@ import unittest, sequtils, options import chronos -import utils, +import utils, ../../libp2p/[switch, crypto/crypto] suite "FloodSub": @@ -72,7 +72,7 @@ suite "FloodSub": var nodes: seq[Switch] = newSeq[Switch]() for i in 0..<10: - nodes.add(createNode()) + nodes.add(newStandardSwitch()) var awaitters: seq[Future[void]] for node in nodes: @@ -104,7 +104,7 @@ suite "FloodSub": var nodes: seq[Switch] = newSeq[Switch]() for i in 0..<10: - nodes.add(createNode(none(PrivateKey), "/ip4/127.0.0.1/tcp/0", true)) + nodes.add newStandardSwitch(triggerSelf = true) var awaitters: seq[Future[void]] for node in nodes: @@ -118,7 +118,7 @@ suite "FloodSub": for node in nodes: await node.publish("foobar", cast[seq[byte]]("Hello!")) await sleepAsync(10.millis) - + await sleepAsync(100.millis) await allFutures(nodes.mapIt(it.stop())) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index aee9130c4..3c980b06c 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -63,7 +63,7 @@ suite "GossipSub": var nodes: seq[Switch] = newSeq[Switch]() for i in 0..<2: - nodes.add(createNode(gossip = true)) + nodes.add newStandardSwitch(gossip = true) var awaitters: seq[Future[void]] for node in nodes: @@ -143,7 +143,7 @@ suite "GossipSub": var nodes: seq[Switch] = newSeq[Switch]() for i in 0..<2: - nodes.add(createNode(gossip = true)) + nodes.add newStandardSwitch(gossip = true) var awaitters: seq[Future[void]] for node in nodes: @@ -384,7 +384,7 @@ suite "GossipSub": var awaitters: seq[Future[void]] for i in 0..<10: - nodes.add(createNode(none(PrivateKey), "/ip4/127.0.0.1/tcp/0", true, true)) + nodes.add newStandardSwitch(triggerSelf = true, gossip = true) awaitters.add((await nodes[i].start())) var seen: Table[string, int] diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index 3b7ef9f68..55af70d2f 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -1,57 +1,11 @@ import options, tables import chronos -import ../../libp2p/[switch, - peer, - connection, - multiaddress, - peerinfo, - muxers/muxer, - crypto/crypto, - muxers/mplex/mplex, - muxers/mplex/types, - protocols/identify, - transports/transport, - transports/tcptransport, - protocols/secure/secure, - protocols/secure/secio, - protocols/pubsub/pubsub, - protocols/pubsub/gossipsub, - protocols/pubsub/floodsub] - -proc createMplex(conn: Connection): Muxer = - result = newMplex(conn) - -proc createNode*(privKey: Option[PrivateKey] = none(PrivateKey), - address: string = "/ip4/127.0.0.1/tcp/0", - triggerSelf: bool = false, - gossip: bool = false): Switch = - var seckey = privKey - if privKey.isNone: - seckey = some(PrivateKey.random(RSA)) - - var peerInfo = PeerInfo.init(seckey.get(), @[Multiaddress.init(address)]) - let mplexProvider = newMuxerProvider(createMplex, MplexCodec) - let transports = @[Transport(newTransport(TcpTransport))] - let muxers = [(MplexCodec, mplexProvider)].toTable() - let identify = newIdentify(peerInfo) - let secureManagers = [(SecioCodec, Secure(newSecio(seckey.get())))].toTable() - - var pubSub: Option[PubSub] - if gossip: - pubSub = some(PubSub(newPubSub(GossipSub, peerInfo, triggerSelf))) - else: - pubSub = some(PubSub(newPubSub(FloodSub, peerInfo, triggerSelf))) - - result = newSwitch(peerInfo, - transports, - identify, - muxers, - secureManagers = secureManagers, - pubSub = pubSub) +import ../../libp2p/standard_setup +export standard_setup proc generateNodes*(num: Natural, gossip: bool = false): seq[Switch] = for i in 0..