From 0fbc0ec4b2b4da5d3ec57c204b83df2b65370d41 Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni Date: Wed, 8 Jul 2020 11:52:44 +0900 Subject: [PATCH] fix multistream 1.1 multi proto --- libp2p/multistream.nim | 27 ++++++++++--------------- libp2p/protocols/pubsub/gossipsub11.nim | 4 ++-- libp2p/switch.nim | 4 ++-- tests/pubsub/testgossipinternal.nim | 20 +++++++++--------- 4 files changed, 25 insertions(+), 30 deletions(-) diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim index 4d4006838..a53425612 100644 --- a/libp2p/multistream.nim +++ b/libp2p/multistream.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import strutils +import strutils, tables import chronos, chronicles, stew/byteutils import stream/connection, vbuffer, @@ -29,7 +29,7 @@ type Matcher* = proc (proto: string): bool {.gcsafe.} HandlerHolder* = object - proto*: string + protos*: seq[string] protocol*: LPProtocol match*: Matcher @@ -133,15 +133,16 @@ proc handle*(m: MultistreamSelect, conn: Connection) {.async, gcsafe.} = trace "handle: listing protos" var protos = "" for h in m.handlers: - protos &= (h.proto & "\n") + for proto in h.protos: + protos &= (proto & "\n") await conn.writeLp(protos) of Codec: await conn.write(m.codec) else: for h in m.handlers: - if (not isNil(h.match) and h.match(ms)) or ms == h.proto: + if (not isNil(h.match) and h.match(ms)) or h.protos.contains(ms): trace "found handler for", protocol = ms - await conn.writeLp((h.proto & "\n")) + await conn.writeLp((ms & "\n")) await h.protocol.handler(conn, ms) return debug "no handlers for ", protocol = ms @@ -156,17 +157,11 @@ proc handle*(m: MultistreamSelect, conn: Connection) {.async, gcsafe.} = trace "leaving multistream loop" proc addHandler*[T: LPProtocol](m: MultistreamSelect, - codec: string, + codecs: seq[string], protocol: T, matcher: Matcher = nil) = - ## register a protocol - # TODO: This is a bug in chronicles, - # it break if I uncomment this line. - # Which is almost the same as the - # one on the next override of addHandler - # - # trace "registering protocol", codec = codec - m.handlers.add(HandlerHolder(proto: codec, + trace "registering protocol", protos = codecs + m.handlers.add(HandlerHolder(protos: codecs, protocol: protocol, match: matcher)) @@ -176,11 +171,11 @@ proc addHandler*[T: LPProtoHandler](m: MultistreamSelect, matcher: Matcher = nil) = ## helper to allow registering pure handlers - trace "registering proto handler", codec = codec + trace "registering proto handler", proto = codec let protocol = new LPProtocol protocol.codec = codec protocol.handler = handler - m.handlers.add(HandlerHolder(proto: codec, + m.handlers.add(HandlerHolder(protos: @[codec], protocol: protocol, match: matcher)) diff --git a/libp2p/protocols/pubsub/gossipsub11.nim b/libp2p/protocols/pubsub/gossipsub11.nim index cb4d410b7..13865e31f 100644 --- a/libp2p/protocols/pubsub/gossipsub11.nim +++ b/libp2p/protocols/pubsub/gossipsub11.nim @@ -102,8 +102,8 @@ method init*(g: GossipSub) = await g.handleConn(conn, proto) g.handler = handler - g.codecs &= GossipSubCodec g.codecs &= GossipSubCodec_11 + g.codecs &= GossipSubCodec proc replenishFanout(g: GossipSub, topic: string) = ## get fanout peers for a topic @@ -304,7 +304,7 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) {.async.} = method subscribeToPeer*(p: GossipSub, conn: Connection) {.async.} = await procCall PubSub(p).subscribeToPeer(conn) - asyncCheck p.handleConn(conn, GossipSubCodec) + asyncCheck p.handleConn(conn, GossipSubCodec_11) method subscribeTopic*(g: GossipSub, topic: string, diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 188a79f70..ef0f40d55 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -357,7 +357,7 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = # add the muxer for muxer in s.muxers.values: - ms.addHandler(muxer.codec, muxer) + ms.addHandler(muxer.codecs, muxer) # handle subsequent requests await ms.handle(sconn) @@ -476,7 +476,7 @@ proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} = raise newException(CatchableError, "Protocol has to define a codec string") - s.ms.addHandler(proto.codec, proto) + s.ms.addHandler(proto.codecs, proto) proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = trace "starting switch for peer", peerInfo = shortLog(s.peerInfo) diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index ac33778f9..e8a382707 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -40,7 +40,7 @@ suite "GossipSub internal": conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec_11) gossipSub.peers[peerInfo.id].conn = conn gossipSub.mesh[topic].incl(peerInfo.id) @@ -69,7 +69,7 @@ suite "GossipSub internal": conns &= conn let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec_11) gossipSub.peers[peerInfo.id].conn = conn gossipSub.gossipsub[topic].incl(peerInfo.id) @@ -100,7 +100,7 @@ suite "GossipSub internal": conns &= conn var peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec_11) gossipSub.peers[peerInfo.id].handler = handler gossipSub.gossipsub[topic].incl(peerInfo.id) @@ -133,7 +133,7 @@ suite "GossipSub internal": conns &= conn let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec_11) gossipSub.peers[peerInfo.id].handler = handler gossipSub.fanout[topic].incl(peerInfo.id) @@ -170,7 +170,7 @@ suite "GossipSub internal": conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec_11) gossipSub.peers[peerInfo.id].handler = handler gossipSub.fanout[topic1].incl(peerInfo.id) gossipSub.fanout[topic2].incl(peerInfo.id) @@ -208,7 +208,7 @@ suite "GossipSub internal": conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec_11) gossipSub.peers[peerInfo.id].handler = handler if i mod 2 == 0: gossipSub.fanout[topic].incl(peerInfo.id) @@ -221,7 +221,7 @@ suite "GossipSub internal": conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec_11) gossipSub.peers[peerInfo.id].handler = handler gossipSub.gossipsub[topic].incl(peerInfo.id) @@ -267,7 +267,7 @@ suite "GossipSub internal": conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec_11) gossipSub.peers[peerInfo.id].handler = handler if i mod 2 == 0: gossipSub.fanout[topic].incl(peerInfo.id) @@ -309,7 +309,7 @@ suite "GossipSub internal": conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec_11) gossipSub.peers[peerInfo.id].handler = handler if i mod 2 == 0: gossipSub.mesh[topic].incl(peerInfo.id) @@ -351,7 +351,7 @@ suite "GossipSub internal": conns &= conn let peerInfo = randomPeerInfo() conn.peerInfo = peerInfo - gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) + gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec_11) gossipSub.peers[peerInfo.id].handler = handler if i mod 2 == 0: gossipSub.mesh[topic].incl(peerInfo.id)