39 lines
1.1 KiB
Nim
39 lines
1.1 KiB
Nim
import pkg/chronos
|
|
import pkg/protobuf_serialization
|
|
import pkg/libp2p/stream/connection
|
|
import ./messages
|
|
|
|
export messages
|
|
|
|
const
  ## Limit handed to `readLp` when a single message is read from the
  ## underlying stream (8 * 1024 * 1024; presumably a size in bytes --
  ## confirm against the libp2p `readLp` signature).
  MaxMessageSize = 8 * 1024 * 1024
|
|
|
|
type
  BitswapStream* = ref object
    ## Couples a byte stream with a queue of decoded `Message` values.
    ## Messages are read from and written to `bytestream`; decoded
    ## incoming messages are handed over through `messages`.
    bytestream: LpStream ## stream used with `readLp` / `writeLp` / `close`
    messages: AsyncQueue[Message] ## filled by `readOnce`, drained by `read`
|
|
|
|
proc new*(t: type BitswapStream, bytestream: LpStream): BitswapStream =
  ## Builds a `BitswapStream` on top of `bytestream`, together with a fresh
  ## message queue.
  # The argument `1` is presumably the queue's maximum size -- confirm
  # against the chronos `newAsyncQueue` documentation.
  let inbox = newAsyncQueue[Message](1)
  result = BitswapStream(bytestream: bytestream, messages: inbox)
|
|
|
|
proc readOnce(stream: BitswapStream) {.async.} =
  ## Handles exactly one incoming message: reads it from the byte stream with
  ## `readLp` (limited by `MaxMessageSize`), decodes it as a protobuf
  ## `Message`, and puts the result on the stream's message queue.
  let frame = await readLp(stream.bytestream, MaxMessageSize)
  let decoded = Protobuf.decode(frame, Message)
  await put(stream.messages, decoded)
|
|
|
|
proc readLoop*(stream: BitswapStream) {.async.} =
  ## Keeps calling `readOnce` until an `LPStreamEOFError` is raised, at which
  ## point the loop ends and this proc completes normally. Any other
  ## exception raised by `readOnce` propagates to the caller.
  try:
    while true:
      await readOnce(stream)
  except LPStreamEOFError:
    # EOF is the expected way for the loop to finish.
    discard
|
|
|
|
proc write*(stream: BitswapStream, message: Message) {.async.} =
  ## Protobuf-encodes `message` and sends the encoded form over the
  ## underlying byte stream using `writeLp`.
  await writeLp(stream.bytestream, Protobuf.encode(message))
|
|
|
|
proc read*(stream: BitswapStream): Future[Message] {.async.} =
  ## Takes the next decoded message from the stream's queue and returns it,
  ## awaiting the queue's `get` until it completes.
  let next = await get(stream.messages)
  return next
|
|
|
|
proc close*(stream: BitswapStream) {.async.} =
  ## Closes the underlying byte stream and awaits its completion.
  await close(stream.bytestream)
|