move streams to own dir

This commit is contained in:
Dmitriy Ryajov 2019-09-01 11:31:24 -06:00
parent 85b0b5d1da
commit cbf0f4f186
8 changed files with 105 additions and 65 deletions

View File

@ -8,7 +8,7 @@
## those terms.
import chronos
import peerinfo, multiaddress, stream, peerinfo, varint, vbuffer
import peerinfo, multiaddress, stream/lpstream, peerinfo, varint, vbuffer
const DefaultReadSize: uint = 64*1024

View File

@ -8,7 +8,7 @@
## those terms.
import chronos
import connection, transport, stream,
import connection, transport,
peerinfo, multiaddress
type

View File

@ -1,51 +0,0 @@
## Nim-LibP2P
## Copyright (c) 2018 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import chronos
type LPStream* = ref object of RootObj
closed*: bool
method read*(s: LPStream, n = -1): Future[seq[byte]]
{.base, async, gcsafe.} =
discard
method readExactly*(s: LPStream, pbytes: pointer, nbytes: int): Future[void]
{.base, async, gcsafe.} =
discard
method readLine*(s: LPStream, limit = 0, sep = "\r\n"): Future[string]
{.base, async, gcsafe.} =
discard
method readOnce*(s: LPStream, pbytes: pointer, nbytes: int): Future[int]
{.base, async, gcsafe.} =
discard
method readUntil*(s: LPStream,
pbytes: pointer, nbytes: int,
sep: seq[byte]): Future[int]
{.base, async, gcsafe.} =
discard
method write*(s: LPStream, pbytes: pointer, nbytes: int)
{.base, async, gcsafe.} =
discard
method write*(s: LPStream, msg: string, msglen = -1)
{.base, async, gcsafe.} =
discard
method write*(s: LPStream, msg: seq[byte], msglen = -1)
{.base, async, gcsafe.} =
discard
method close*(s: LPStream)
{.base, async, gcsafe.} =
discard

View File

@ -8,7 +8,7 @@
## those terms.
import chronos
import stream
import lpstream
type ChronosStream* = ref object of LPStream
reader: AsyncStreamReader

View File

@ -0,0 +1,82 @@
## Nim-LibP2P
## Copyright (c) 2018 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import chronos
type
LPStream* = ref object of RootObj
closed*: bool
LPStreamError* = object of CatchableError
LPStreamIncompleteError* = object of LPStreamError
LPStreamIncorrectError* = object of Defect
LPStreamLimitError* = object of LPStreamError
LPStreamReadError* = object of LPStreamError
par*: ref Exception
LPStreamWriteError* = object of LPStreamError
par*: ref Exception
proc newAsyncStreamReadError*(p: ref Exception): ref Exception {.inline.} =
var w = newException(AsyncStreamReadError, "Read stream failed")
w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg
w.par = p
result = w
proc newLPStreamWriteError*(p: ref Exception): ref Exception {.inline.} =
var w = newException(LPStreamWriteError, "Write stream failed")
w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg
w.par = p
result = w
proc newLPStreamIncompleteError*(): ref Exception {.inline.} =
result = newException(LPStreamIncompleteError, "Incomplete data received")
proc newLPStreamLimitError*(): ref Exception {.inline.} =
result = newException(LPStreamLimitError, "Buffer limit reached")
proc newLPStreamIncorrectError*(m: string): ref Exception {.inline.} =
result = newException(LPStreamIncorrectError, m)
method read*(s: LPStream, n = -1): Future[seq[byte]]
{.base, async, gcsafe.} =
assert(false, "not implemented!")
method readExactly*(s: LPStream, pbytes: pointer, nbytes: int): Future[void]
{.base, async, gcsafe.} =
assert(false, "not implemented!")
method readLine*(s: LPStream, limit = 0, sep = "\r\n"): Future[string]
{.base, async, gcsafe.} =
assert(false, "not implemented!")
method readOnce*(s: LPStream, pbytes: pointer, nbytes: int): Future[int]
{.base, async, gcsafe.} =
assert(false, "not implemented!")
method readUntil*(s: LPStream,
pbytes: pointer, nbytes: int,
sep: seq[byte]): Future[int]
{.base, async, gcsafe.} =
assert(false, "not implemented!")
method write*(s: LPStream, pbytes: pointer, nbytes: int)
{.base, async, gcsafe.} =
assert(false, "not implemented!")
method write*(s: LPStream, msg: string, msglen = -1)
{.base, async, gcsafe.} =
assert(false, "not implemented!")
method write*(s: LPStream, msg: seq[byte], msglen = -1)
{.base, async, gcsafe.} =
assert(false, "not implemented!")
method close*(s: LPStream)
{.base, async, gcsafe.} =
assert(false, "not implemented!")

View File

@ -9,7 +9,8 @@
import tables, sequtils
import chronos
import connection, transport, stream,
import connection, transport,
stream/lpstream,
multistream, protocol,
peerinfo, multiaddress

View File

@ -10,7 +10,7 @@
import chronos
import transport, wire, connection,
multiaddress, connection,
multicodec, chronosstream
multicodec, stream/chronosstream
type TcpTransport* = ref object of Transport
server*: StreamServer

View File

@ -1,7 +1,7 @@
import unittest, strutils, sequtils, sugar
import chronos
import ../libp2p/connection, ../libp2p/multistream,
../libp2p/stream, ../libp2p/connection,
../libp2p/stream/lpstream, ../libp2p/connection,
../libp2p/multiaddress, ../libp2p/transport,
../libp2p/tcptransport, ../libp2p/protocol,
../libp2p/crypto/crypto, ../libp2p/peerinfo,
@ -14,7 +14,7 @@ type
method readExactly*(s: TestSelectStream,
pbytes: pointer,
nbytes: int): Future[void] {.async.} =
nbytes: int): Future[void] {.async, gcsafe.} =
case s.step:
of 1:
var buf = newSeq[byte](1)
@ -38,6 +38,9 @@ method readExactly*(s: TestSelectStream,
cstring("\0x3na\n"),
"\0x3na\n".len())
method write*(s: TestSelectStream, msg: string, msglen = -1)
{.async, gcsafe.} = discard
proc newTestSelectStream(): TestSelectStream =
new result
result.step = 1
@ -52,7 +55,7 @@ type
method readExactly*(s: TestLsStream,
pbytes: pointer,
nbytes: int): Future[void] {.async.} =
nbytes: int): Future[void] {.async, gcsafe.} =
case s.step:
of 1:
var buf = newSeq[byte](1)
@ -72,14 +75,17 @@ method readExactly*(s: TestLsStream,
var buf = "ls\n"
copyMem(pbytes, addr buf[0], buf.len())
else:
copyMem(pbytes,
cstring("\0x3na\n"),
"\0x3na\n".len())
copyMem(pbytes, cstring("\0x3na\n"), "\0x3na\n".len())
method write*(s: TestLsStream, msg: seq[byte], msglen = -1) {.async.} =
method write*(s: TestLsStream, msg: seq[byte], msglen = -1) {.async, gcsafe.} =
if s.step == 4:
await s.ls(msg)
method write*(s: TestLsStream, msg: string, msglen = -1)
{.async, gcsafe.} = discard
method close*(s: TestLsStream) {.async, gcsafe.} = discard
proc newTestLsStream(ls: LsHandler): TestLsStream =
new result
result.ls = ls
@ -95,7 +101,7 @@ type
method readExactly*(s: TestNaStream,
pbytes: pointer,
nbytes: int): Future[void] {.async.} =
nbytes: int): Future[void] {.async, gcsafe.} =
case s.step:
of 1:
var buf = newSeq[byte](1)
@ -119,10 +125,12 @@ method readExactly*(s: TestNaStream,
cstring("\0x3na\n"),
"\0x3na\n".len())
method write*(s: TestNaStream, msg: string, msglen = -1) {.async.} =
method write*(s: TestNaStream, msg: string, msglen = -1) {.async, gcsafe.} =
if s.step == 4:
await s.na(msg)
method close*(s: TestNaStream) {.async, gcsafe.} = discard
proc newTestNaStream(na: NaHandler): TestNaStream =
new result
result.na = na