diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index b086eda21..b5a53463a 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -70,7 +70,7 @@ method handle*(m: Mplex) {.async, gcsafe.} = if MessageType(msgType) != MessageType.New: let channels = m.getChannelList(initiator) if not channels.contains(id): - # debug "handle: Channel with id and msg type ", id = id, msg = msgType + debug "handle: Channel with id and msg type ", id = id, msg = msgType continue channel = channels[id] @@ -82,15 +82,13 @@ method handle*(m: Mplex) {.async, gcsafe.} = if not isNil(m.streamHandler): let stream = newConnection(channel) stream.peerInfo = m.connection.peerInfo - let handlerFut = m.streamHandler(newConnection(stream)) + let handlerFut = m.streamHandler(stream) - # TODO: don't use a closure? # channel cleanup routine proc cleanUpChan(udata: pointer) {.gcsafe.} = if handlerFut.finished: channel.close().addCallback( proc(udata: pointer) = - # TODO: is waitFor() OK here? channel.cleanUp() .addCallback(proc(udata: pointer) = debug "handle: cleaned up channel ", id = id)) diff --git a/libp2p/muxers/muxer.nim b/libp2p/muxers/muxer.nim index 630633253..5b77405d7 100644 --- a/libp2p/muxers/muxer.nim +++ b/libp2p/muxers/muxer.nim @@ -7,12 +7,16 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import chronos +import chronos, chronicles import ../protocols/protocol, ../connection +logScope: + topic = "Muxer" + type StreamHandler* = proc(conn: Connection): Future[void] {.gcsafe.} + MuxerHandler* = proc(muxer: Muxer): Future[void] {.gcsafe.} Muxer* = ref object of RootObj streamHandler*: StreamHandler @@ -23,6 +27,7 @@ type MuxerProvider* = ref object of LPProtocol newMuxer*: MuxerCreator streamHandler*: StreamHandler + muxerHandler*: MuxerHandler method newStream*(m: Muxer, name: string = ""): Future[Connection] {.base, async, gcsafe.} = discard method close*(m: Muxer) {.base, async, gcsafe.} = discard @@ -37,13 +42,32 @@ proc newMuxerProvider*(creator: MuxerCreator, codec: string): MuxerProvider {.gc result.init() method `=streamHandler`*(m: MuxerProvider, handler: StreamHandler) {.base, gcsafe.} = + ## new stream (channels) handler + ## + ## triggered every time there is a new + ## stream (channel) oppened over a muxed + ## connection + ## m.streamHandler = handler +method `=muxerHandler`*(m: MuxerProvider, handler: MuxerHandler) {.base, gcsafe.} = + ## new muxer (muxed connections) handler + ## + ## triggered every time there is a new muxed + ## connection created + ## + m.muxerHandler = handler + method init(c: MuxerProvider) = proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = let muxer = c.newMuxer(conn) + if not isNil(c.muxerHandler): + debug "CALLING MUXER HANDLER" + await c.muxerHandler(muxer) + if not isNil(c.streamHandler): muxer.streamHandler = c.streamHandler - await muxer.handle() + + await muxer.handle() c.handler = handler