feat: added length prefixed read/write methods

This commit is contained in:
Dmitriy Ryajov 2019-08-23 16:16:46 -06:00
parent cb4ff262b0
commit f23106029f
1 changed files with 31 additions and 2 deletions

View File

@ -8,9 +8,9 @@
## those terms. ## those terms.
import chronos import chronos
import peerinfo, multiaddress, readerwriter, peerinfo import peerinfo, multiaddress, readerwriter, peerinfo, varint, vbuffer
const DefaultReadSize = 1024 const DefaultReadSize: uint = 64*1024
type type
Connection* = ref object of ReadWrite Connection* = ref object of ReadWrite
@ -49,6 +49,35 @@ method write*(s: Connection, msg: seq[byte], msglen = -1) {.async.} =
method close*(s: Connection) {.async.} = method close*(s: Connection) {.async.} =
result = s.stream.close() result = s.stream.close()
proc readLp*(s: Connection): Future[seq[byte]] {.async.} =
## read lenght prefixed msg
var
size: uint
length: int
res: VarintStatus
var buffer = newSeq[byte](10)
try:
for i in 0..<len(buffer):
await s.readExactly(addr buffer[i], 1)
res = LP.getUVarint(buffer.toOpenArray(0, i), length, size)
if res == VarintStatus.Success:
break
if res != VarintStatus.Success or size > DefaultReadSize:
buffer.setLen(0)
buffer.setLen(size)
await s.readExactly(addr buffer[0], int(size))
except TransportIncompleteError:
buffer.setLen(0)
result = buffer
proc writeLp*(s: Connection, msg: string | seq[byte]) {.async.} =
## write lenght prefixed
var buf = initVBuffer()
buf.writeSeq(msg)
buf.finish()
result = s.write(buf.buffer)
method getPeerInfo* (c: Connection): Future[PeerInfo] {.base, async.} = method getPeerInfo* (c: Connection): Future[PeerInfo] {.base, async.} =
## get up to date peer info ## get up to date peer info
## TODO: implement PeerInfo refresh over identify ## TODO: implement PeerInfo refresh over identify