misc: pubsub/floodsub and logging

This commit is contained in:
Dmitriy Ryajov 2019-09-11 20:10:38 -06:00
parent 4d9444afe9
commit 8920cd7d60
15 changed files with 183 additions and 95 deletions

View File

@ -47,7 +47,7 @@ type
skkey*: SkPublicKey skkey*: SkPublicKey
of ECDSA: of ECDSA:
eckey*: EcPublicKey eckey*: EcPublicKey
of NoSupport: else:
discard discard
PrivateKey* = object PrivateKey* = object
@ -60,7 +60,7 @@ type
skkey*: SkPrivateKey skkey*: SkPrivateKey
of ECDSA: of ECDSA:
eckey*: EcPrivateKey eckey*: EcPrivateKey
of NoSupport: else:
discard discard
KeyPair* = object KeyPair* = object

View File

@ -75,4 +75,4 @@ proc writeMsg*(conn: Connection,
id: uint, id: uint,
msgType: MessageType, msgType: MessageType,
data: string) {.async, gcsafe.} = data: string) {.async, gcsafe.} =
result = conn.writeMsg(id, msgType, cast[seq[byte]](toSeq(data.items))) result = conn.writeMsg(id, msgType, cast[seq[byte]](data))

View File

@ -28,7 +28,7 @@ import coder, types, channel,
../../stream/lpstream ../../stream/lpstream
logScope: logScope:
topic = "mplex" topic = "Mplex"
type type
Mplex* = ref object of Muxer Mplex* = ref object of Muxer
@ -106,6 +106,8 @@ method handle*(m: Mplex) {.async, gcsafe.} =
await channel.resetByRemote() await channel.resetByRemote()
break break
else: raise newMplexUnknownMsgError() else: raise newMplexUnknownMsgError()
except:
debug "exception occurred", exception = getCurrentExceptionMsg()
finally: finally:
await m.connection.close() await m.connection.close()

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import sequtils, tables, options, sets import sequtils, tables, options, sets, sequtils, strutils
import chronos, chronicles import chronos, chronicles
import pubsub, import pubsub,
pubsubpeer, pubsubpeer,
@ -30,8 +30,10 @@ proc sendSubs(f: FloodSub,
subscribe: bool) subscribe: bool)
{.async, gcsafe.} = {.async, gcsafe.} =
## send subscriptions to remote peer ## send subscriptions to remote peer
debug "sending subscriptions", peer = peer.id, subscribe = subscribe
var msg: RPCMsg var msg: RPCMsg
for t in topics: for t in topics:
debug "sending topic", peer = peer.id, subscribe = subscribe, topicName = t
msg.subscriptions.add(SubOpts(topic: t, subscribe: subscribe)) msg.subscriptions.add(SubOpts(topic: t, subscribe: subscribe))
await peer.send(@[msg]) await peer.send(@[msg])
@ -47,14 +49,21 @@ proc rpcHandler(f: FloodSub,
## or messages forwarded to this peer ## or messages forwarded to this peer
## ##
debug "processing RPC message", peer = peer.id, msg = rpcMsgs
for m in rpcMsgs: # for all RPC messages for m in rpcMsgs: # for all RPC messages
if m.subscriptions.len > 0: # if there are any subscriptions if m.subscriptions.len > 0: # if there are any subscriptions
if peer.peerInfo.peerId.isSome:
debug "no valid PeerId for peer"
return
for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic
let id = peer.conn.peerInfo.get().peerId.pretty let id = peer.id
if s.subscribe: if s.subscribe:
debug "subscribing to topic", peer = id, subscriptions = m.subscriptions, topic = s.topic
# subscribe the peer to the topic # subscribe the peer to the topic
f.peerTopics[s.topic].incl(id) f.peerTopics[s.topic].incl(id)
else: else:
debug "unsubscribing to topic", peer = id, subscriptions = m.subscriptions, topic = s.topic
# unsubscribe the peer to the topic # unsubscribe the peer to the topic
f.peerTopics[s.topic].excl(id) f.peerTopics[s.topic].excl(id)
@ -70,6 +79,7 @@ proc rpcHandler(f: FloodSub,
if f.topics.contains(t): # check that we're subscribed to it if f.topics.contains(t): # check that we're subscribed to it
await f.topics[t].handler(t, msg.data) # trigger user provided handler await f.topics[t].handler(t, msg.data) # trigger user provided handler
# forward the message to all peers interested it
for p in toSendPeers: for p in toSendPeers:
await f.peers[p].send(@[RPCMsg(messages: m.messages)]) await f.peers[p].send(@[RPCMsg(messages: m.messages)])
@ -89,7 +99,11 @@ proc handleConn(f: FloodSub, conn: Connection) {.async, gcsafe.} =
await f.rpcHandler(peer, msgs) await f.rpcHandler(peer, msgs)
var peer = newPubSubPeer(conn, handleRpc) var peer = newPubSubPeer(conn, handleRpc)
f.peers[peer.conn.peerInfo.get().peerId.pretty()] = peer if peer.peerInfo.peerId.isNone:
debug "no valid PeerInfo for peer"
return
f.peers[peer.id] = peer
let topics = toSeq(f.topics.keys) let topics = toSeq(f.topics.keys)
await f.sendSubs(peer, topics, true) await f.sendSubs(peer, topics, true)
asyncCheck peer.handle() asyncCheck peer.handle()
@ -106,16 +120,19 @@ method init(f: FloodSub) =
f.handler = handler f.handler = handler
f.codec = FloodSubCodec f.codec = FloodSubCodec
method subscribePeer*(f: FloodSub, conn: Connection) {.async, gcsafe.} = method subscribeToPeer*(f: FloodSub, conn: Connection) {.async, gcsafe.} =
await f.handleConn(conn) await f.handleConn(conn)
method publish*(f: FloodSub, method publish*(f: FloodSub,
topic: string, topic: string,
data: seq[byte]) data: seq[byte])
{.async, gcsafe.} = {.async, gcsafe.} =
debug "about to publish message on topic", topic = topic, data = data
let msg = makeMessage(f.peerInfo.peerId.get(), data, topic)
if topic in f.peerTopics:
for p in f.peerTopics[topic]: for p in f.peerTopics[topic]:
f.peers[p].send(Message(fromPeer: f.peerInfo.peerId.data, debug "pubslishing message", topic = topic, peer = p, data = data
data: data)) await f.peers[p].send(@[RPCMsg(messages: @[msg])])
method subscribe*(f: FloodSub, method subscribe*(f: FloodSub,
topic: string, topic: string,
@ -133,3 +150,7 @@ method unsubscribe*(f: FloodSub, topics: seq[string]) {.async, gcsafe.} =
proc newFloodSub*(peerInfo: PeerInfo): FloodSub = proc newFloodSub*(peerInfo: PeerInfo): FloodSub =
new result new result
result.peerInfo = peerInfo result.peerInfo = peerInfo
result.peers = initTable[string, PubSubPeer]()
result.topics = initTable[string, Topic]()
result.peerTopics = initTable[string, HashSet[string]]()
result.init()

View File

@ -8,7 +8,7 @@
## those terms. ## those terms.
import tables, sets import tables, sets
import chronos import chronos, chronicles
import pubsubpeer, import pubsubpeer,
../protocol, ../protocol,
../../connection, ../../connection,
@ -16,8 +16,13 @@ import pubsubpeer,
export PubSubPeer export PubSubPeer
logScope:
topic = "PubSub"
type type
TopicHandler* = proc(topic:string, data: seq[byte]): Future[void] {.gcsafe.} TopicHandler* = proc (topic: string,
data: seq[byte]): Future[void] {.closure, gcsafe.}
Topic* = object Topic* = object
name*: string name*: string
handler*: TopicHandler handler*: TopicHandler
@ -28,7 +33,7 @@ type
peers*: Table[string, PubSubPeer] # peerid to peer map peers*: Table[string, PubSubPeer] # peerid to peer map
peerTopics*: Table[string, HashSet[string]] # topic to remote peer map peerTopics*: Table[string, HashSet[string]] # topic to remote peer map
method subscribePeer*(p: PubSub, conn: Connection) {.base, async, gcsafe.} = method subscribeToPeer*(p: PubSub, conn: Connection) {.base, async, gcsafe.} =
## subscribe to a peer to send/receive pubsub messages ## subscribe to a peer to send/receive pubsub messages
discard discard

View File

@ -12,6 +12,7 @@ import chronos, chronicles
import ../../connection, import ../../connection,
../../protobuf/minprotobuf, ../../protobuf/minprotobuf,
../../peerinfo, ../../peerinfo,
../../peer,
rpcmsg rpcmsg
logScope: logScope:
@ -19,28 +20,40 @@ logScope:
type type
PubSubPeer* = ref object of RootObj PubSubPeer* = ref object of RootObj
peerInfo*: PeerInfo
conn*: Connection conn*: Connection
handler*: RPCHandler handler*: RPCHandler
topics*: seq[string] topics*: seq[string]
id*: string # base58 peer id string
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
proc handle*(p: PubSubPeer) {.async, gcsafe.} = proc handle*(p: PubSubPeer) {.async, gcsafe.} =
debug "pubsub rpc", peer = p.id
try: try:
while not p.conn.closed: while not p.conn.closed:
let msg = decodeRpcMsg(await p.conn.readLp()) let data = await p.conn.readLp()
debug "Read data from peer", peer = p.peerInfo, data = data
let msg = decodeRpcMsg(data)
debug "Decoded msg from peer", peer = p.peerInfo, msg = msg
await p.handler(p, @[msg]) await p.handler(p, @[msg])
except: except:
debug "An exception occured while processing pubsub rpc requests", exc = getCurrentExceptionMsg() debug "An exception occured while processing pubsub rpc requests", exc = getCurrentExceptionMsg()
return return
finally: finally:
debug "closing connection to pubsub peer", peer = p.id
await p.conn.close() await p.conn.close()
proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async, gcsafe.} = proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async, gcsafe.} =
for m in msgs: for m in msgs:
await p.conn.writeLp(encodeRpcMsg(m).buffer) debug "sending msgs to peer", peer = p.id, msgs = msgs
let encoded = encodeRpcMsg(m)
if encoded.buffer.len > 0:
await p.conn.writeLp(encoded.buffer)
proc newPubSubPeer*(conn: Connection, handler: RPCHandler): PubSubPeer = proc newPubSubPeer*(conn: Connection, handler: RPCHandler): PubSubPeer =
new result new result
result.handler = handler result.handler = handler
result.conn = conn result.conn = conn
result.peerInfo = conn.peerInfo
result.id = conn.peerInfo.peerId.get().pretty()

View File

@ -8,12 +8,15 @@
## those terms. ## those terms.
import sequtils import sequtils
import chronos, nimcrypto/sysrand import chronos, nimcrypto/sysrand, chronicles
import ../../peerinfo, import ../../peerinfo,
../../peer, ../../peer,
../../crypto/crypto, ../../crypto/crypto,
../../protobuf/minprotobuf ../../protobuf/minprotobuf
logScope:
topic = "RpcMsg"
const SignPrefix = "libp2p-pubsub:" const SignPrefix = "libp2p-pubsub:"
type type
@ -47,12 +50,14 @@ proc encodeMessage(msg: Message, buff: var ProtoBuffer) {.gcsafe.} =
buff.finish() buff.finish()
proc encodeSubs(subs: SubOpts, buff: var ProtoBuffer) {.gcsafe.} = proc encodeSubs(subs: SubOpts, buff: var ProtoBuffer) {.gcsafe.} =
buff.write(initProtoField(1, ord(subs.subscribe))) buff.write(initProtoField(1, subs.subscribe))
buff.write(initProtoField(2, subs.topic)) buff.write(initProtoField(2, subs.topic))
proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} = proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} =
result = initProtoBuffer({WithVarintLength}) result = initProtoBuffer({WithVarintLength})
debug "encoding msg: ", msg = msg
if msg.subscriptions.len > 0:
var subs = initProtoBuffer() var subs = initProtoBuffer()
for s in msg.subscriptions: for s in msg.subscriptions:
encodeSubs(s, subs) encodeSubs(s, subs)
@ -60,6 +65,7 @@ proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} =
subs.finish() subs.finish()
result.write(initProtoField(1, subs)) result.write(initProtoField(1, subs))
if msg.messages.len > 0:
var messages = initProtoBuffer() var messages = initProtoBuffer()
for m in msg.messages: for m in msg.messages:
encodeMessage(m, messages) encodeMessage(m, messages)
@ -115,18 +121,26 @@ proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} =
if pb.getBytes(6, msg.key) < 0: if pb.getBytes(6, msg.key) < 0:
break break
var prefix {.threadvar.}: seq[byte]
proc getPreix(): var seq[byte] =
if prefix.len == 0:
prefix = cast[seq[byte]](SignPrefix)
result = prefix
proc sign*(peerId: PeerID, msg: Message): Message = proc sign*(peerId: PeerID, msg: Message): Message =
var buff = initProtoBuffer() var buff = initProtoBuffer()
var prefix = cast[seq[byte]](toSeq(SignPrefix.items)) # TODO: can we cache this?
encodeMessage(msg, buff) encodeMessage(msg, buff)
if buff.buffer.len > 0: if buff.buffer.len > 0:
result = msg result = msg
result.signature = peerId. result.signature = peerId.
privateKey. privateKey.
sign(prefix & buff.buffer). sign(getPreix() & buff.buffer).
getBytes() getBytes()
proc makeMessage*(peerId: PeerID, data: seq[byte], name: string): Message = proc makeMessage*(peerId: PeerID,
data: seq[byte],
name: string):
Message {.gcsafe.} =
var seqno: seq[byte] = newSeq[byte](20) var seqno: seq[byte] = newSeq[byte](20)
if randomBytes(addr seqno[0], 20) > 0: if randomBytes(addr seqno[0], 20) > 0:
result = Message(fromPeer: peerId.getBytes(), result = Message(fromPeer: peerId.getBytes(),

View File

@ -256,7 +256,7 @@ method write*(s: BufferStream,
## stream. ## stream.
var buf = "" var buf = ""
shallowCopy(buf, if msglen > 0: msg[0..<msglen] else: msg) shallowCopy(buf, if msglen > 0: msg[0..<msglen] else: msg)
result = s.writeHandler(cast[seq[byte]](toSeq(buf.items))) result = s.writeHandler(cast[seq[byte]](buf))
method write*(s: BufferStream, method write*(s: BufferStream,
msg: seq[byte], msg: seq[byte],

View File

@ -7,9 +7,12 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import chronos import chronos, chronicles
import lpstream import lpstream
logScope:
topic = "ChronosStream"
type ChronosStream* = ref object of LPStream type ChronosStream* = ref object of LPStream
reader: AsyncStreamReader reader: AsyncStreamReader
writer: AsyncStreamWriter writer: AsyncStreamWriter
@ -92,6 +95,7 @@ method write*(s: ChronosStream, msg: seq[byte], msglen = -1) {.async, gcsafe.} =
method close*(s: ChronosStream) {.async, gcsafe.} = method close*(s: ChronosStream) {.async, gcsafe.} =
if not s.closed: if not s.closed:
debug "closing connection for", address = $s.client.remoteAddress()
if not s.reader.closed: if not s.reader.closed:
await s.reader.closeWait() await s.reader.closeWait()

View File

@ -22,6 +22,9 @@ import connection,
muxers/muxer, muxers/muxer,
peer peer
logScope:
topic = "Switch"
type type
NoPubSubException = object of CatchableError NoPubSubException = object of CatchableError
@ -55,7 +58,7 @@ proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
var n = await s.secureManagers[0].secure(conn) var n = await s.secureManagers[0].secure(conn)
result = conn result = conn
proc identify(s: Switch, conn: Connection) {.async, gcsafe.} = proc identify*(s: Switch, conn: Connection) {.async, gcsafe.} =
## identify the connection ## identify the connection
# s.peerInfo.protocols = await s.ms.list(conn) # update protos before engagin in identify # s.peerInfo.protocols = await s.ms.list(conn) # update protos before engagin in identify
try: try:
@ -105,7 +108,8 @@ proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} =
# add muxer handler cleanup proc # add muxer handler cleanup proc
handlerFut.addCallback( handlerFut.addCallback(
proc(udata: pointer = nil) {.gcsafe.} = proc(udata: pointer = nil) {.gcsafe.} =
debug "mux: Muxer handler completed for peer ", peer = conn.peerInfo.peerId.get().pretty debug "mux: Muxer handler completed for peer ",
peer = conn.peerInfo.peerId.get().pretty
) )
await s.identify(stream) await s.identify(stream)
await stream.close() # close idenity stream await stream.close() # close idenity stream
@ -204,23 +208,27 @@ proc stop*(s: Switch) {.async.} =
await allFutures(s.transports.mapIt(it.close())) await allFutures(s.transports.mapIt(it.close()))
proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
## Subscribe to pub sub peer
if s.pubSub.isSome: if s.pubSub.isSome:
let conn = await s.dial(peerInfo, s.pubSub.get().codec) let conn = await s.dial(peerInfo, s.pubSub.get().codec)
await s.pubSub.get().subscribeToPeer(conn) await s.pubSub.get().subscribeToPeer(conn)
proc subscribe*(s: Switch, topic: string, handler: TopicHandler): Future[void] {.gcsafe.} = proc subscribe*(s: Switch, topic: string, handler: TopicHandler): Future[void] {.gcsafe.} =
## subscribe to a pubsub topic
if s.pubSub.isNone: if s.pubSub.isNone:
raise newNoPubSubException() raise newNoPubSubException()
result = s.pubSub.get().subscribe(topic, handler) result = s.pubSub.get().subscribe(topic, handler)
proc unsubscribe*(s: Switch, topics: seq[string]): Future[void] {.gcsafe.} = proc unsubscribe*(s: Switch, topics: seq[string]): Future[void] {.gcsafe.} =
## unsubscribe from topics
if s.pubSub.isNone: if s.pubSub.isNone:
raise newNoPubSubException() raise newNoPubSubException()
result = s.pubSub.get().unsubscribe(topics) result = s.pubSub.get().unsubscribe(topics)
proc publish*(s: Switch, topic: string, data: seq[byte]): Future[void] {.gcsafe.} = proc publish*(s: Switch, topic: string, data: seq[byte]): Future[void] {.gcsafe.} =
# pubslish to pubsub topic
if s.pubSub.isNone: if s.pubSub.isNone:
raise newNoPubSubException() raise newNoPubSubException()
@ -243,17 +251,20 @@ proc newSwitch*(peerInfo: PeerInfo,
let s = result # can't capture result let s = result # can't capture result
result.streamHandler = proc(stream: Connection) {.async, gcsafe.} = result.streamHandler = proc(stream: Connection) {.async, gcsafe.} =
# TODO: figure out proper way of handling this. debug "handling connection for", peerInfo = stream.peerInfo
# Perhaps it's ok to discard this Future and handle
# errors elsewere?
await s.ms.handle(stream) # handle incoming connection await s.ms.handle(stream) # handle incoming connection
result.mount(identity) result.mount(identity)
for key, val in muxers: for key, val in muxers:
val.streamHandler = result.streamHandler val.streamHandler = result.streamHandler
val.muxerHandler = proc(muxer: Muxer) {.async, gcsafe.} =
debug "got new muxer"
let stream = await muxer.newStream()
await s.identify(stream)
result.mount(val) result.mount(val)
for s in secureManagers: for s in secureManagers.deduplicate():
debug "adding secure manager ", codec = s.codec debug "adding secure manager ", codec = s.codec
result.secureManagers.add(s) result.secureManagers.add(s)
result.mount(s) result.mount(s)

View File

@ -1,4 +1,4 @@
import tables import tables, options, sequtils
import chronos, chronicles import chronos, chronicles
import ../libp2p/switch, import ../libp2p/switch,
../libp2p/multistream, ../libp2p/multistream,
@ -14,21 +14,23 @@ import ../libp2p/switch,
../libp2p/muxers/mplex/mplex, ../libp2p/muxers/mplex/mplex,
../libp2p/muxers/mplex/types, ../libp2p/muxers/mplex/types,
../libp2p/protocols/secure/secure, ../libp2p/protocols/secure/secure,
../libp2p/protocols/secure/secio ../libp2p/protocols/secure/secio,
../libp2p/protocols/pubsub/pubsub,
../libp2p/protocols/pubsub/floodsub
type type
TestProto = ref object of LPProtocol TestProto = ref object of LPProtocol
switch*: Switch
method init(p: TestProto) {.gcsafe.} = method init(p: TestProto) {.gcsafe.} =
proc handle(stream: Connection, proto: string) {.async, gcsafe.} = proc handle(stream: Connection, proto: string) {.async, gcsafe.} = discard
await stream.writeLp("Hello from handler")
await stream.close()
p.codec = "/test/proto/1.0.0" p.codec = "/test/proto/1.0.0"
p.handler = handle p.handler = handle
proc newTestProto(): TestProto = proc newTestProto(switch: Switch): TestProto =
new result new result
result.switch = switch
result.init() result.init()
proc main() {.async.} = proc main() {.async.} =
@ -36,7 +38,7 @@ proc main() {.async.} =
let seckey = PrivateKey.random(RSA) let seckey = PrivateKey.random(RSA)
var peerInfo: PeerInfo var peerInfo: PeerInfo
peerInfo.peerId = PeerID.init(seckey) peerInfo.peerId = some(PeerID.init(seckey))
peerInfo.addrs.add(Multiaddress.init("/ip4/127.0.0.1/tcp/55055")) peerInfo.addrs.add(Multiaddress.init("/ip4/127.0.0.1/tcp/55055"))
proc createMplex(conn: Connection): Muxer = proc createMplex(conn: Connection): Muxer =
@ -46,20 +48,32 @@ proc main() {.async.} =
let transports = @[Transport(newTransport(TcpTransport))] let transports = @[Transport(newTransport(TcpTransport))]
let muxers = [(MplexCodec, mplexProvider)].toTable() let muxers = [(MplexCodec, mplexProvider)].toTable()
let identify = newIdentify(peerInfo) let identify = newIdentify(peerInfo)
let switch = newSwitch(peerInfo, transports, identify, muxers, @[Secure(newSecIo(seckey.getKey()))]) # let secureManagers = @[Secure(newSecIo(seckey.getKey()))]
let pubSub = some(PubSub(newFloodSub(peerInfo)))
let switch = newSwitch(peerInfo, transports, identify, muxers, pubSub = pubSub)
await switch.start() var libp2pFuts = await switch.start()
echo "Right after start"
for item in libp2pFuts:
echo item.finished
var remotePeer: PeerInfo var remotePeer: PeerInfo
remotePeer.peerId = PeerID.init("QmUA1Ghihi5u3gDwEDxhbu49jU42QPbvHttZFwB6b4K5oC") remotePeer.peerId = some(PeerID.init("QmUA1Ghihi5u3gDwEDxhbu49jU42QPbvHttZFwB6b4K5oC"))
remotePeer.addrs.add(ma) remotePeer.addrs.add(ma)
switch.mount(newTestProto()) switch.mount(newTestProto(switch))
echo "PeerID: " & peerInfo.peerId.pretty echo "PeerID: " & peerInfo.peerId.get().pretty
let conn = await switch.dial(remotePeer, "/test/proto/1.0.0") # let conn = await switch.dial(remotePeer, "/test/proto/1.0.0")
await conn.writeLp("Hello from dialer!") await switch.subscribeToPeer(remotePeer)
let msg = cast[string](await conn.readLp())
echo msg proc handler(topic: string, data: seq[byte]): Future[void] {.closure, gcsafe.} =
await conn.close() debug "IN HANDLER"
await switch.subscribe("mytesttopic", handler)
# let msg = cast[seq[byte]]("hello from nim")
# await switch.publish("mytesttopic", msg)
# debug "published message from test"
# TODO: for some reason the connection closes unless I do a forever loop
await allFutures(libp2pFuts)
waitFor(main()) waitFor(main())

View File

@ -185,7 +185,7 @@ suite "BufferStream":
let buff = newBufferStream(writeHandler, 10) let buff = newBufferStream(writeHandler, 10)
check buff.len == 0 check buff.len == 0
await buff.write(cast[seq[byte]](toSeq("Hello!".items)), 6) await buff.write(cast[seq[byte]]("Hello!"), 6)
result = true result = true
check: check:
@ -223,17 +223,17 @@ suite "BufferStream":
let buff = newBufferStream(writeHandler, 10) let buff = newBufferStream(writeHandler, 10)
check buff.len == 0 check buff.len == 0
await buff.pushTo(cast[seq[byte]](toSeq("Msg 1".items))) await buff.pushTo(cast[seq[byte]]("Msg 1"))
await buff.pushTo(cast[seq[byte]](toSeq("Msg 2".items))) await buff.pushTo(cast[seq[byte]]("Msg 2"))
await buff.pushTo(cast[seq[byte]](toSeq("Msg 3".items))) await buff.pushTo(cast[seq[byte]]("Msg 3"))
check cast[string](await buff.read(5)) == "Msg 1" check cast[string](await buff.read(5)) == "Msg 1"
check cast[string](await buff.read(5)) == "Msg 2" check cast[string](await buff.read(5)) == "Msg 2"
check cast[string](await buff.read(5)) == "Msg 3" check cast[string](await buff.read(5)) == "Msg 3"
await buff.pushTo(cast[seq[byte]](toSeq("Msg 4".items))) await buff.pushTo(cast[seq[byte]]("Msg 4"))
await buff.pushTo(cast[seq[byte]](toSeq("Msg 5".items))) await buff.pushTo(cast[seq[byte]]("Msg 5"))
await buff.pushTo(cast[seq[byte]](toSeq("Msg 6".items))) await buff.pushTo(cast[seq[byte]]("Msg 6"))
check cast[string](await buff.read(5)) == "Msg 4" check cast[string](await buff.read(5)) == "Msg 4"
check cast[string](await buff.read(5)) == "Msg 5" check cast[string](await buff.read(5)) == "Msg 5"

View File

@ -17,22 +17,24 @@ suite "Identify":
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53360") let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53360")
let remoteSeckey = PrivateKey.random(RSA) let remoteSeckey = PrivateKey.random(RSA)
var remotePeerInfo: PeerInfo
var serverFut: Future[void]
var transport: TcpTransport
proc receiver() {.async.} = proc receiver() {.async.} =
var peerInfo: PeerInfo remotePeerInfo.peerId = some(PeerID.init(remoteSeckey))
peerInfo.peerId = PeerID.init(remoteSeckey) remotePeerInfo.addrs.add(ma)
peerInfo.addrs.add(ma) remotePeerInfo.protocols.add("/test/proto1/1.0.0")
peerInfo.protocols.add("/test/proto1/1.0.0") remotePeerInfo.protocols.add("/test/proto2/1.0.0")
peerInfo.protocols.add("/test/proto2/1.0.0")
let identifyProto = newIdentify(peerInfo) let identifyProto = newIdentify(remotePeerInfo)
let msListen = newMultistream() let msListen = newMultistream()
msListen.addHandler(IdentifyCodec, identifyProto) msListen.addHandler(IdentifyCodec, identifyProto)
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
await msListen.handle(conn) await msListen.handle(conn)
let transport: TcpTransport = newTransport(TcpTransport) transport = newTransport(TcpTransport)
await transport.listen(ma, connHandler) serverFut = await transport.listen(ma, connHandler)
proc sender() {.async.} = proc sender() {.async.} =
let msDial = newMultistream() let msDial = newMultistream()
@ -41,22 +43,24 @@ suite "Identify":
let seckey = PrivateKey.random(RSA) let seckey = PrivateKey.random(RSA)
var peerInfo: PeerInfo var peerInfo: PeerInfo
peerInfo.peerId = PeerID.init(seckey) peerInfo.peerId = some(PeerID.init(seckey))
peerInfo.addrs.add(ma) peerInfo.addrs.add(ma)
let identifyProto = newIdentify(peerInfo) let identifyProto = newIdentify(peerInfo)
let res = await msDial.select(conn, IdentifyCodec) let res = await msDial.select(conn, IdentifyCodec)
let id = await identifyProto.identify(conn, remotePeerInfo)
let id = await identifyProto.identify(conn) check id.pubKey.get() == remoteSeckey.getKey()
await conn.close()
check id.pubKey == remoteSeckey.getKey()
check id.addrs[0] == ma check id.addrs[0] == ma
check id.protoVersion == ProtoVersion check id.protoVersion.get() == ProtoVersion
check id.agentVersion == AgentVersion # check id.agentVersion.get() == AgentVersion
check id.protos == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"] check id.protos == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
await allFutures(receiver(), sender()) await conn.close()
await allFutures(sender(), receiver())
await transport.close()
await serverFut
result = true result = true
check: check:
@ -68,7 +72,7 @@ suite "Identify":
let remoteSeckey = PrivateKey.random(RSA) let remoteSeckey = PrivateKey.random(RSA)
var remotePeerInfo: PeerInfo var remotePeerInfo: PeerInfo
remotePeerInfo.peerId = PeerID.init(remoteSeckey) remotePeerInfo.peerId = some(PeerID.init(remoteSeckey))
remotePeerInfo.addrs.add(ma) remotePeerInfo.addrs.add(ma)
let identifyProto1 = newIdentify(remotePeerInfo) let identifyProto1 = newIdentify(remotePeerInfo)
@ -79,7 +83,7 @@ suite "Identify":
await msListen.handle(conn) await msListen.handle(conn)
let transport1: TcpTransport = newTransport(TcpTransport) let transport1: TcpTransport = newTransport(TcpTransport)
await transport1.listen(ma, connHandler) asyncCheck transport1.listen(ma, connHandler)
let msDial = newMultistream() let msDial = newMultistream()
let transport2: TcpTransport = newTransport(TcpTransport) let transport2: TcpTransport = newTransport(TcpTransport)
@ -87,7 +91,7 @@ suite "Identify":
let seckey = PrivateKey.random(RSA) let seckey = PrivateKey.random(RSA)
var localPeerInfo: PeerInfo var localPeerInfo: PeerInfo
localPeerInfo.peerId = PeerID.init(seckey) localPeerInfo.peerId = some(PeerID.init(seckey))
localPeerInfo.addrs.add(ma) localPeerInfo.addrs.add(ma)
let identifyProto2 = newIdentify(localPeerInfo) let identifyProto2 = newIdentify(localPeerInfo)
@ -95,9 +99,9 @@ suite "Identify":
let wrongSec = PrivateKey.random(RSA) let wrongSec = PrivateKey.random(RSA)
var wrongRemotePeer: PeerInfo var wrongRemotePeer: PeerInfo
wrongRemotePeer.peerId = PeerID.init(wrongSec) wrongRemotePeer.peerId = some(PeerID.init(wrongSec))
let id = await identifyProto2.identify(conn, some(wrongRemotePeer)) let id = await identifyProto2.identify(conn, wrongRemotePeer)
await conn.close() await conn.close()
expect IdentityNoMatchError: expect IdentityNoMatchError:

View File

@ -20,7 +20,7 @@ suite "Mplex":
let stream = newBufferStream(encHandler) let stream = newBufferStream(encHandler)
let conn = newConnection(stream) let conn = newConnection(stream)
await conn.writeMsg(0, MessageType.New, cast[seq[byte]](toSeq("stream 1".items))) await conn.writeMsg(0, MessageType.New, cast[seq[byte]]("stream 1"))
result = true result = true
check: check:
@ -33,7 +33,7 @@ suite "Mplex":
let stream = newBufferStream(encHandler) let stream = newBufferStream(encHandler)
let conn = newConnection(stream) let conn = newConnection(stream)
await conn.writeMsg(17, MessageType.New, cast[seq[byte]](toSeq("stream 1".items))) await conn.writeMsg(17, MessageType.New, cast[seq[byte]]("stream 1"))
result = true result = true
check: check:
@ -47,7 +47,7 @@ suite "Mplex":
let stream = newBufferStream(encHandler) let stream = newBufferStream(encHandler)
let conn = newConnection(stream) let conn = newConnection(stream)
await conn.writeMsg(0, MessageType.MsgOut, cast[seq[byte]](toSeq("stream 1".items))) await conn.writeMsg(0, MessageType.MsgOut, cast[seq[byte]]("stream 1"))
result = true result = true
check: check:
@ -61,7 +61,7 @@ suite "Mplex":
let stream = newBufferStream(encHandler) let stream = newBufferStream(encHandler)
let conn = newConnection(stream) let conn = newConnection(stream)
await conn.writeMsg(17, MessageType.MsgOut, cast[seq[byte]](toSeq("stream 1".items))) await conn.writeMsg(17, MessageType.MsgOut, cast[seq[byte]]("stream 1"))
await conn.close() await conn.close()
result = true result = true
@ -289,7 +289,7 @@ suite "Mplex":
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let chann = newChannel(1, newConnection(newBufferStream(writeHandler)), true) let chann = newChannel(1, newConnection(newBufferStream(writeHandler)), true)
await chann.pushTo(cast[seq[byte]](toSeq("Hello!".items))) await chann.pushTo(cast[seq[byte]]("Hello!"))
await chann.close() await chann.close()
let msg = await chann.read() let msg = await chann.read()
asyncDiscard chann.read() asyncDiscard chann.read()

View File

@ -14,7 +14,7 @@ suite "TCP transport":
result = conn.write(cstring("Hello!"), 6) result = conn.write(cstring("Hello!"), 6)
let transport: TcpTransport = newTransport(TcpTransport) let transport: TcpTransport = newTransport(TcpTransport)
await transport.listen(ma, connHandler) asyncCheck await transport.listen(ma, connHandler)
let streamTransport: StreamTransport = await connect(ma) let streamTransport: StreamTransport = await connect(ma)
let msg = await streamTransport.read(6) let msg = await streamTransport.read(6)
await transport.close() await transport.close()
@ -33,7 +33,7 @@ suite "TCP transport":
check cast[string](msg) == "Hello!" check cast[string](msg) == "Hello!"
let transport: TcpTransport = newTransport(TcpTransport) let transport: TcpTransport = newTransport(TcpTransport)
await transport.listen(ma, connHandler) asyncCheck await transport.listen(ma, connHandler)
let streamTransport: StreamTransport = await connect(ma) let streamTransport: StreamTransport = await connect(ma)
let sent = await streamTransport.write("Hello!", 6) let sent = await streamTransport.write("Hello!", 6)
result = sent == 6 result = sent == 6
@ -101,7 +101,7 @@ suite "TCP transport":
result = conn.write(cstring("Hello!"), 6) result = conn.write(cstring("Hello!"), 6)
let transport1: TcpTransport = newTransport(TcpTransport) let transport1: TcpTransport = newTransport(TcpTransport)
await transport1.listen(ma, connHandler) asyncCheck await transport1.listen(ma, connHandler)
let transport2: TcpTransport = newTransport(TcpTransport) let transport2: TcpTransport = newTransport(TcpTransport)
let conn = await transport2.dial(ma) let conn = await transport2.dial(ma)
@ -121,7 +121,7 @@ suite "TCP transport":
check cast[string](msg) == "Hello!" check cast[string](msg) == "Hello!"
let transport1: TcpTransport = newTransport(TcpTransport) let transport1: TcpTransport = newTransport(TcpTransport)
await transport1.listen(ma, connHandler) asyncCheck await transport1.listen(ma, connHandler)
let transport2: TcpTransport = newTransport(TcpTransport) let transport2: TcpTransport = newTransport(TcpTransport)
let conn = await transport2.dial(ma) let conn = await transport2.dial(ma)