feat: added stream abstraction and chronos wrapper

This commit is contained in:
Dmitriy Ryajov 2019-08-22 15:35:47 -06:00
parent 5f8e633c0f
commit 5d07145a0a
13 changed files with 203 additions and 43 deletions

16
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,16 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "lldb-mi",
"request": "launch",
"name": "debug nim",
"target": "${fileDirname}/${fileBasenameNoExtension}",
"cwd": "${workspaceRoot}",
"valuesFormatting": "parseText"
}
]
}

2
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,2 @@
{
}

23
.vscode/tasks.json vendored Normal file
View File

@ -0,0 +1,23 @@
{
// See https://go.microsoft.com/fwlink/?LinkId=733558
// for the documentation about the tasks.json format
"version": "2.0.0",
"tasks": [
{
"label": "nim-build-file",
"type": "shell",
"command": "nimble c --d:debug --linedir:on --debuginfo --debugger:native --verbose ${file}",
"problemMatcher": [],
"group": {
"kind": "build",
"isDefault": true
}
},
{
"label": "nim-clean-file",
"type": "shell",
"command": "rm ${fileBasename}",
"problemMatcher": []
}
]
}

1
chronos Submodule

@ -0,0 +1 @@
Subproject commit 6d7864aa84010271cb8a4a8614c722bae50702cd

59
libp2p/chronosstream.nim Normal file
View File

@ -0,0 +1,59 @@
## Nim-LibP2P
## Copyright (c) 2018 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import chronos
import readerwriter
type ChronosStream* = ref object of ReadWrite
reader: AsyncStreamReader
writer: AsyncStreamWriter
server: StreamServer
client: StreamTransport
proc newChronosStream*(server: StreamServer,
client: StreamTransport): ChronosStream =
new result
result.server = server
result.client = client
result.reader = newAsyncStreamReader(client)
result.writer = newAsyncStreamWriter(client)
method read*(s: ChronosStream, n = -1): Future[seq[byte]] {.async.} =
result = await s.reader.read(n)
method readExactly*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[void] {.async.} =
result = s.readExactly(pbytes, nbytes)
method readLine*(s: ChronosStream, limit = 0, sep = "\r\n"): Future[string] {.async.} =
result = await s.reader.readLine(limit, sep)
method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} =
result = await s.reader.readOnce(pbytes, nbytes)
method readUntil*(s: ChronosStream, pbytes: pointer, nbytes: int, sep: seq[byte]): Future[int] {.async.} =
result = await s.reader.readUntil(pbytes, nbytes, sep)
method write*(s: ChronosStream, pbytes: pointer, nbytes: int) {.async.} =
result = s.writer.write(pbytes, nbytes)
method write*(s: ChronosStream, msg: string, msglen = -1) {.async.} =
result = s.writer.write(msg, msglen)
method write*(s: ChronosStream, msg: seq[byte], msglen = -1) {.async.} =
result = s.writer.write(msg, msglen)
method close*(s: ChronosStream) {.async.} =
await s.reader.closeWait()
await s.writer.finish()
await s.writer.closeWait()
await s.client.closeWait()
s.server.stop()
s.server.close()

View File

@ -7,47 +7,47 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import chronos, peerinfo, multiaddress import chronos
import peerinfo, multiaddress, readerwriter, peerinfo
const DefaultReadSize = 1024 const DefaultReadSize = 1024
type type
Connection* = ref object of RootObj Connection* = ref object of ReadWrite
reader: AsyncStreamReader peerInfo*: PeerInfo
writer: AsyncStreamWriter stream: ReadWrite
server: StreamServer
client: StreamTransport
isOpen*: bool
proc newConnection*(server: StreamServer, proc newConnection*(stream: ReadWrite): Connection =
client: StreamTransport): Connection = ## create a new Connection for the specified async reader/writer
## create a new Connection for the specified async stream reader/writer
new result new result
result.isOpen = false result.stream = stream
result.server = server
result.client = client
result.reader = newAsyncStreamReader(client) method read*(s: Connection, n = -1): Future[seq[byte]] {.async.} =
result.writer = newAsyncStreamWriter(client) result = await s.stream.read(n)
method read* (c: Connection, size: int = DefaultReadSize): Future[seq[byte]] {.base, async, gcsafe.} = method readExactly*(s: Connection, pbytes: pointer, nbytes: int): Future[void] {.async.} =
## read DefaultReadSize (1024) bytes or `size` bytes if specified result = s.stream.readExactly(pbytes, nbytes)
result = await c.reader.read(size)
method write* (c: Connection, data: pointer, size: int): Future[void] {.base, async.} = method readLine*(s: Connection, limit = 0, sep = "\r\n"): Future[string] {.async.} =
## write bytes pointed to by `data` up to `size` size result = await s.stream.readLine(limit, sep)
discard c.writer.write(data, size)
method close* (c: Connection): Future[void] {.base, async.} = method readOnce*(s: Connection, pbytes: pointer, nbytes: int): Future[int] {.async.} =
## close connection result = await s.stream.readOnce(pbytes, nbytes)
await c.reader.closeWait()
await c.writer.finish() method readUntil*(s: Connection, pbytes: pointer, nbytes: int, sep: seq[byte]): Future[int] {.async.} =
await c.writer.closeWait() result = await s.stream.readUntil(pbytes, nbytes, sep)
await c.client.closeWait() method write*(s: Connection, pbytes: pointer, nbytes: int) {.async.} =
c.server.stop() result = s.stream.write(pbytes, nbytes)
c.server.close()
method write*(s: Connection, msg: string, msglen = -1) {.async.} =
result = s.stream.write(msg, msglen)
method write*[T](s: Connection, msg: seq[T], msglen = -1) {.async.} =
result = s.stream.write(msg, msglen)
method close*(s: Connection) {.async.} =
result = s.stream.close()
method getPeerInfo* (c: Connection): Future[PeerInfo] {.base, async.} = method getPeerInfo* (c: Connection): Future[PeerInfo] {.base, async.} =
## get up to date peer info ## get up to date peer info

1
libp2p/crypto/BearSSL Submodule

@ -0,0 +1 @@
Subproject commit 2893441f2efd4603ddd6d7f49011bdda096a4a87

View File

@ -0,0 +1,11 @@
## Nim-LibP2P
## Copyright (c) 2018 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
type MultisteamSelect = ref object of RootObj

48
libp2p/readerwriter.nim Normal file
View File

@ -0,0 +1,48 @@
## Nim-LibP2P
## Copyright (c) 2018 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import chronos
type ReadWrite* = ref object of RootObj
method read*(s: ReadWrite, n = -1): Future[seq[byte]]
{.base, async.} =
discard
method readExactly*(s: ReadWrite, pbytes: pointer, nbytes: int): Future[void]
{.base, async.} =
discard
method readLine*(s: ReadWrite, limit = 0, sep = "\r\n"): Future[string]
{.base, async.} =
discard
method readOnce*(s: ReadWrite, pbytes: pointer, nbytes: int): Future[int]
{.base, async.} =
discard
method readUntil*(s: ReadWrite, pbytes: pointer, nbytes: int, sep: seq[byte]): Future[int]
{.base, async.} =
discard
method write*(w: ReadWrite, pbytes: pointer, nbytes: int)
{.base, async.} =
discard
method write*(w: ReadWrite, msg: string, msglen = -1)
{.base, async.} =
discard
method write*(w: ReadWrite, msg: seq[byte], msglen = -1)
{.base, async.} =
discard
method close*(w: ReadWrite)
{.base, async.} =
discard

View File

@ -8,11 +8,21 @@
## those terms. ## those terms.
import chronos import chronos
import transport, wire, connection, multiaddress, connection, multicodec import transport, wire, connection, multiaddress, connection, multicodec, chronosstream
type TcpTransport* = ref object of Transport type TcpTransport* = ref object of Transport
server*: StreamServer server*: StreamServer
method connHandler*(t: Transport,
server: StreamServer,
client: StreamTransport): Future[Connection] {.base, gcsafe, async.} =
let conn: Connection = newConnection(newChronosStream(server, client))
let handlerFut = if t.handler == nil: nil else: t.handler(conn)
let connHolder: ConnHolder = ConnHolder(connection: conn,
connFuture: handlerFut)
t.connections.add(connHolder)
result = conn
proc connCb(server: StreamServer, proc connCb(server: StreamServer,
client: StreamTransport) {.gcsafe, async.} = client: StreamTransport) {.gcsafe, async.} =
let t: Transport = cast[Transport](server.udata) let t: Transport = cast[Transport](server.udata)

BIN
libp2p/transport Executable file

Binary file not shown.

View File

@ -8,7 +8,7 @@
## those terms. ## those terms.
import chronos import chronos
import peerinfo, connection, multiaddress, multicodec import peerinfo, connection, multiaddress, multicodec, readerwriter
type type
ConnHandler* = proc (conn: Connection): Future[void] {.gcsafe.} ConnHandler* = proc (conn: Connection): Future[void] {.gcsafe.}
@ -23,16 +23,6 @@ type
handler*: ConnHandler handler*: ConnHandler
multicodec*: MultiCodec multicodec*: MultiCodec
method connHandler*(t: Transport,
server: StreamServer,
client: StreamTransport): Future[Connection] {.base, gcsafe, async.} =
let conn: Connection = newConnection(server, client)
let handlerFut = if t.handler == nil: nil else: t.handler(conn)
let connHolder: ConnHolder = ConnHolder(connection: conn,
connFuture: handlerFut)
t.connections.add(connHolder)
result = conn
method init*(t: Transport) {.base, error: "not implemented".} = method init*(t: Transport) {.base, error: "not implemented".} =
## perform protocol initialization ## perform protocol initialization
discard discard
@ -45,8 +35,7 @@ method close*(t: Transport) {.base, async.} =
## stop and cleanup the transport ## stop and cleanup the transport
## including all outstanding connections ## including all outstanding connections
for c in t.connections: for c in t.connections:
if c.connection.isOpen: await c.connection.close()
await c.connection.close()
method listen*(t: Transport, ma: MultiAddress, handler: ConnHandler) {.base, async.} = method listen*(t: Transport, ma: MultiAddress, handler: ConnHandler) {.base, async.} =
## listen for incoming connections ## listen for incoming connections

Binary file not shown.