fix multistream 1.1 multi proto

This commit is contained in:
Giovanni Petrantoni 2020-07-08 11:52:44 +09:00
parent 57e6a95b9c
commit 0fbc0ec4b2
4 changed files with 25 additions and 30 deletions

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import strutils
import strutils, tables
import chronos, chronicles, stew/byteutils
import stream/connection,
vbuffer,
@ -29,7 +29,7 @@ type
Matcher* = proc (proto: string): bool {.gcsafe.}
HandlerHolder* = object
proto*: string
protos*: seq[string]
protocol*: LPProtocol
match*: Matcher
@ -133,15 +133,16 @@ proc handle*(m: MultistreamSelect, conn: Connection) {.async, gcsafe.} =
trace "handle: listing protos"
var protos = ""
for h in m.handlers:
protos &= (h.proto & "\n")
for proto in h.protos:
protos &= (proto & "\n")
await conn.writeLp(protos)
of Codec:
await conn.write(m.codec)
else:
for h in m.handlers:
if (not isNil(h.match) and h.match(ms)) or ms == h.proto:
if (not isNil(h.match) and h.match(ms)) or h.protos.contains(ms):
trace "found handler for", protocol = ms
await conn.writeLp((h.proto & "\n"))
await conn.writeLp((ms & "\n"))
await h.protocol.handler(conn, ms)
return
debug "no handlers for ", protocol = ms
@ -156,17 +157,11 @@ proc handle*(m: MultistreamSelect, conn: Connection) {.async, gcsafe.} =
trace "leaving multistream loop"
proc addHandler*[T: LPProtocol](m: MultistreamSelect,
codec: string,
codecs: seq[string],
protocol: T,
matcher: Matcher = nil) =
## register a protocol
# TODO: This is a bug in chronicles,
# it break if I uncomment this line.
# Which is almost the same as the
# one on the next override of addHandler
#
# trace "registering protocol", codec = codec
m.handlers.add(HandlerHolder(proto: codec,
trace "registering protocol", protos = codecs
m.handlers.add(HandlerHolder(protos: codecs,
protocol: protocol,
match: matcher))
@ -176,11 +171,11 @@ proc addHandler*[T: LPProtoHandler](m: MultistreamSelect,
matcher: Matcher = nil) =
## helper to allow registering pure handlers
trace "registering proto handler", codec = codec
trace "registering proto handler", proto = codec
let protocol = new LPProtocol
protocol.codec = codec
protocol.handler = handler
m.handlers.add(HandlerHolder(proto: codec,
m.handlers.add(HandlerHolder(protos: @[codec],
protocol: protocol,
match: matcher))

View File

@ -102,8 +102,8 @@ method init*(g: GossipSub) =
await g.handleConn(conn, proto)
g.handler = handler
g.codecs &= GossipSubCodec
g.codecs &= GossipSubCodec_11
g.codecs &= GossipSubCodec
proc replenishFanout(g: GossipSub, topic: string) =
## get fanout peers for a topic
@ -304,7 +304,7 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) {.async.} =
method subscribeToPeer*(p: GossipSub,
conn: Connection) {.async.} =
await procCall PubSub(p).subscribeToPeer(conn)
asyncCheck p.handleConn(conn, GossipSubCodec)
asyncCheck p.handleConn(conn, GossipSubCodec_11)
method subscribeTopic*(g: GossipSub,
topic: string,

View File

@ -357,7 +357,7 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
# add the muxer
for muxer in s.muxers.values:
ms.addHandler(muxer.codec, muxer)
ms.addHandler(muxer.codecs, muxer)
# handle subsequent requests
await ms.handle(sconn)
@ -476,7 +476,7 @@ proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} =
raise newException(CatchableError,
"Protocol has to define a codec string")
s.ms.addHandler(proto.codec, proto)
s.ms.addHandler(proto.codecs, proto)
proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
trace "starting switch for peer", peerInfo = shortLog(s.peerInfo)

View File

@ -40,7 +40,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec_11)
gossipSub.peers[peerInfo.id].conn = conn
gossipSub.mesh[topic].incl(peerInfo.id)
@ -69,7 +69,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec_11)
gossipSub.peers[peerInfo.id].conn = conn
gossipSub.gossipsub[topic].incl(peerInfo.id)
@ -100,7 +100,7 @@ suite "GossipSub internal":
conns &= conn
var peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec_11)
gossipSub.peers[peerInfo.id].handler = handler
gossipSub.gossipsub[topic].incl(peerInfo.id)
@ -133,7 +133,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec_11)
gossipSub.peers[peerInfo.id].handler = handler
gossipSub.fanout[topic].incl(peerInfo.id)
@ -170,7 +170,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec_11)
gossipSub.peers[peerInfo.id].handler = handler
gossipSub.fanout[topic1].incl(peerInfo.id)
gossipSub.fanout[topic2].incl(peerInfo.id)
@ -208,7 +208,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec_11)
gossipSub.peers[peerInfo.id].handler = handler
if i mod 2 == 0:
gossipSub.fanout[topic].incl(peerInfo.id)
@ -221,7 +221,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec_11)
gossipSub.peers[peerInfo.id].handler = handler
gossipSub.gossipsub[topic].incl(peerInfo.id)
@ -267,7 +267,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec_11)
gossipSub.peers[peerInfo.id].handler = handler
if i mod 2 == 0:
gossipSub.fanout[topic].incl(peerInfo.id)
@ -309,7 +309,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec_11)
gossipSub.peers[peerInfo.id].handler = handler
if i mod 2 == 0:
gossipSub.mesh[topic].incl(peerInfo.id)
@ -351,7 +351,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec_11)
gossipSub.peers[peerInfo.id].handler = handler
if i mod 2 == 0:
gossipSub.mesh[topic].incl(peerInfo.id)