From 5b7e8d99bd3ec4410f32870495d559bf5d172254 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 20 Aug 2019 10:18:15 -0600 Subject: [PATCH] initial commit for native libp2p implementation --- libp2p/connection.nim | 45 ++++++++++++++++++++++++++++++ libp2p/peerinfo.nim | 15 ++++++++++ libp2p/tcptransport.nim | 62 +++++++++++++++++++++++++++++++++++++++++ libp2p/transport.nim | 54 +++++++++++++++++++++++++++++++++++ 4 files changed, 176 insertions(+) create mode 100644 libp2p/connection.nim create mode 100644 libp2p/peerinfo.nim create mode 100644 libp2p/tcptransport.nim create mode 100644 libp2p/transport.nim diff --git a/libp2p/connection.nim b/libp2p/connection.nim new file mode 100644 index 000000000..740fa7bc0 --- /dev/null +++ b/libp2p/connection.nim @@ -0,0 +1,45 @@ +## Nim-LibP2P +## Copyright (c) 2018 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import chronos, peerinfo, multiaddress + +const DefaultReadSize = 1024 + +type + Connection* = ref object of RootObj + reader: AsyncStreamReader + writter: AsyncStreamWriter + +proc newConnection*(reader: AsyncStreamReader, writter: AsyncStreamWriter): Connection = + ## create a new Connection for the specified async stream reader/writter + new result + result.reader = reader + result.writter = writter + +method read* (c: Connection, size: int = DefaultReadSize): Future[seq[byte]] {.base, async.} = + ## read DefaultReadSize (1024) bytes or `size` bytes if specified + result = await c.reader.read(size) + +method write* (c: Connection, data: pointer, size: int): Future[void] {.base, async.} = + ## write bytes pointed to by `data` up to `size` size + result = c.writter.write(data, size) + +method close* (c: Connection): Future[void] {.base, async.} = + ## close connection + ## TODO: figure out how to correctly close the streams and underlying resource + discard + +method getPeerInfo* (c: Connection): Future[PeerInfo] {.base, async.} = + ## get up to date peer info + ## TODO: implement PeerInfo refresh over identify + discard + +method getObservedAddrs(c: Connection): Future[seq[MultiAddress]] {.base, async.} = + ## get resolved multiaddresses for the connection + discard diff --git a/libp2p/peerinfo.nim b/libp2p/peerinfo.nim new file mode 100644 index 000000000..59746de94 --- /dev/null +++ b/libp2p/peerinfo.nim @@ -0,0 +1,15 @@ +## Nim-LibP2P +## Copyright (c) 2018 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import peer, multiaddress + +type PeerInfo* = ref object of RootRef + peerId*: PeerID + addrs*: seq[MultiAddress] + protocols*: seq[string] diff --git a/libp2p/tcptransport.nim b/libp2p/tcptransport.nim new file mode 100644 index 000000000..6e062f96e --- /dev/null +++ b/libp2p/tcptransport.nim @@ -0,0 +1,62 @@ +## Nim-LibP2P +## Copyright (c) 2018 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import chronos +import transport, wire, connection, multiaddress, connection, multicodec + +type TcpTransport* = ref object of Transport + fd*: AsyncFD + server*: StreamServer + +proc connHandler(server: StreamServer, + client: StreamTransport): Future[Connection] {.gcsafe, async.} = + let t: TcpTransport = cast[TcpTransport](server.udata) + let conn: Connection = newConnection(newAsyncStreamReader(client), + newAsyncStreamWriter(client)) + let connHolder: ConnHolder = ConnHolder(connection: conn, + connFuture: t.handler(conn)) + t.connections.add(connHolder) + result = conn + +proc connCb(server: StreamServer, + client: StreamTransport) {.gcsafe, async.} = + discard connHandler(server, client) + +method init*(t: TcpTransport) = + t.multicodec = multiCodec("tcp") + +method close*(t: TcpTransport): Future[void] {.async.} = + ## start the transport + result = t.server.closeWait() + +method listen*(t: TcpTransport): Future[void] {.async.} = + let listenFuture: Future[void] = newFuture[void]() + result = listenFuture + + ## listen on the transport + discard createStreamServer(t.ma, + connCb, + {}, + t, + asyncInvalidSocket, + 100, + DefaultStreamBufferSize, + nil, + proc (server: StreamServer, + fd: AsyncFD): StreamTransport {.gcsafe.} = + t.server = server + t.fd = fd + listenFuture.complete() + ) + +method dial*(t: TcpTransport, + address: MultiAddress): Future[Connection] {.async.} = + ## dial a peer + let client: StreamTransport = await connect(address) + result = await connHandler(t.server, client) diff --git a/libp2p/transport.nim b/libp2p/transport.nim new file mode 100644 index 000000000..94d8cc8cc --- /dev/null +++ b/libp2p/transport.nim @@ -0,0 +1,54 @@ +## Nim-LibP2P +## Copyright (c) 2018 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import chronos +import peerinfo, connection, multiaddress, multicodec + +type + ConnHandler* = proc (conn: Connection): Future[void] {.gcsafe.} + + ConnHolder* = object + connection*: Connection + connFuture*: Future[void] + + Transport* = ref object of RootObj + ma*: Multiaddress + connections*: seq[ConnHolder] + handler*: ConnHandler + multicodec*: MultiCodec + +method init*(t: Transport) {.base.} = + ## perform protocol initialization + discard + +proc new*(t: typedesc[Transport], + ma: MultiAddress, + handler: ConnHandler): t = + new result + result.ma = ma + result.handler = handler + result.init() + +method close*(t: Transport) {.base, async.} = + ## start the transport + discard + +method listen*(t: Transport) {.base, async.} = + ## stop the transport + discard + +method dial*(t: Transport, address: MultiAddress): Future[Connection] {.base, async.} = + ## dial a peer + discard + +method supports(t: Transport, address: MultiAddress): bool {.base.} = + ## check if transport supportes the multiaddress + # TODO: this should implement generic logic that would use the multicodec + # declared in the multicodec field and set by each individual transport + result = true