use split out pubsub

This commit is contained in:
Dmitriy Ryajov 2020-08-08 14:52:02 -06:00 committed by zah
parent 612881b95d
commit 87f983c639
3 changed files with 43 additions and 22 deletions

View File

@ -15,7 +15,7 @@ import
multiaddress, multicodec, crypto/crypto, crypto/secp,
protocols/identify, protocols/protocol],
libp2p/protocols/secure/[secure, secio],
libp2p/protocols/pubsub/[pubsub, floodsub, rpc/message, rpc/messages],
libp2p/protocols/pubsub/[pubsub, floodsub, gossipsub, rpc/message, rpc/messages],
libp2p/transports/tcptransport,
libp2p/stream/lpstream,
eth/[keys, async_utils], eth/p2p/p2p_protocol_dsl,
@ -51,6 +51,7 @@ type
# TODO Is this really needed?
Eth2Node* = ref object of RootObj
switch*: Switch
pubsub*: PubSub
discovery*: Eth2DiscoveryProtocol
wantedPeers*: int
peerPool*: PeerPool[Peer, PeerID]
@ -211,7 +212,7 @@ const
PeerScoreHighLimit* = 1000
## Max value of peer's score
ConcurrentConnections* = 4
ConcurrentConnections* = 10
## Maximum number of active concurrent connection requests.
SeenTableTimeTimeout* =
@ -274,6 +275,7 @@ template libp2pProtocol*(name: string, version: int) {.pragma.}
template `$`*(peer: Peer): string = id(peer.info)
chronicles.formatIt(Peer): $it
chronicles.formatIt(PeerID): $it
template remote*(peer: Peer): untyped =
peer.info.peerId
@ -848,11 +850,11 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} =
case event.kind
of ConnEventKind.Connected:
inc peer.connections
debug "Peer upgraded", peer = peerId, connections = peer.connections
debug "Peer upgraded", peer = $peerId, connections = peer.connections
if peer.connections == 1:
# Libp2p may connect multiple times to the same peer - using different
# transports or both incoming and outgoing. For now, we'll count our
# transports for both incoming and outgoing. For now, we'll count our
# "fist" encounter with the peer as the true connection, leaving the
# other connections be - libp2p limits the number of concurrent
# connections to the same peer, and only one of these connections will be
@ -860,6 +862,8 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} =
# * For peer limits, we might miscount the incoming vs outgoing quota
# * Protocol handshakes are wonky: we'll not necessarily use the newly
# connected transport - instead we'll just pick a random one!
node.pubsub.subscribePeer(peerId)
await performProtocolHandshakes(peer, event.incoming)
# While performing the handshake, the peer might have been disconnected -
@ -880,18 +884,21 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} =
of ConnEventKind.Disconnected:
dec peer.connections
debug "Peer disconnected", peer = peerId, connections = peer.connections
debug "Peer disconnected", peer = $peerId, connections = peer.connections
if peer.connections == 0:
node.pubsub.unsubscribePeer(peerId)
let fut = peer.disconnectedFut
if fut != nil:
peer.disconnectedFut = nil
fut.complete()
proc init*(T: type Eth2Node, conf: BeaconNodeConf, enrForkId: ENRForkID,
switch: Switch, ip: Option[ValidIpAddress], tcpPort, udpPort: Port,
privKey: keys.PrivateKey, rng: ref BrHmacDrbgContext): T =
switch: Switch, pubsub: PubSub, ip: Option[ValidIpAddress],
tcpPort, udpPort: Port, privKey: keys.PrivateKey,
rng: ref BrHmacDrbgContext): T =
new result
result.switch = switch
result.pubsub = pubsub
result.wantedPeers = conf.maxPeers
result.peerPool = newPeerPool[Peer, PeerID](maxPeers = conf.maxPeers)
when not defined(local_testnet):
@ -932,6 +939,7 @@ template publicKey*(node: Eth2Node): keys.PublicKey =
proc startListening*(node: Eth2Node) {.async.} =
node.discovery.open()
node.libp2pTransportLoops = await node.switch.start()
await node.pubsub.start()
proc start*(node: Eth2Node) {.async.} =
for i in 0 ..< ConcurrentConnections:
@ -1189,15 +1197,21 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext, conf: BeaconNodeConf, enrForkId
# that are different from the host address (this is relevant when we
# are running behind a NAT).
var switch = newStandardSwitch(some keys.seckey, hostAddress,
triggerSelf = true, gossip = true,
sign = false, verifySignature = false,
transportFlags = {ServerFlags.ReuseAddr},
msgIdProvider = msgIdProvider,
secureManagers = [
SecureProtocol.Noise, # Only noise in ETH2!
],
rng = rng)
result = Eth2Node.init(conf, enrForkId, switch,
let pubsub = GossipSub.init(
switch = switch,
msgIdProvider = msgIdProvider,
triggerSelf = true, sign = false,
verifySignature = false).PubSub
switch.mount(pubsub)
result = Eth2Node.init(conf, enrForkId, switch, pubsub,
extIp, extTcpPort, extUdpPort,
keys.seckey.asEthKey, rng = rng)
@ -1245,7 +1259,7 @@ proc subscribe*[MsgType](node: Eth2Node,
debug "Gossip msg handler error",
msg = err.msg, len = data.len, topic, msgId = gossipId(data)
await node.switch.subscribe(topic & "_snappy", execMsgHandler)
await node.pubsub.subscribe(topic & "_snappy", execMsgHandler)
proc addValidator*[MsgType](node: Eth2Node,
topic: string,
@ -1262,10 +1276,10 @@ proc addValidator*[MsgType](node: Eth2Node,
msg = err.msg, msgId = gossipId(message.data)
return false
node.switch.addValidator(topic & "_snappy", execValidator)
node.pubsub.addValidator(topic & "_snappy", execValidator)
proc unsubscribe*(node: Eth2Node, topic: string): Future[void] =
node.switch.unsubscribeAll(topic)
node.pubsub.unsubscribeAll(topic)
proc traceMessage(fut: FutureBase, msgId: string) =
fut.addCallback do (arg: pointer):
@ -1280,5 +1294,5 @@ proc broadcast*(node: Eth2Node, topic: string, msg: auto) =
inc nbc_gossip_messages_sent
let
data = snappy.encode(SSZ.encode(msg))
var futSnappy = node.switch.publish(topic & "_snappy", data, 1.minutes)
var futSnappy = node.pubsub.publish(topic & "_snappy", data)
traceMessage(futSnappy, gossipId(data))

View File

@ -9,6 +9,7 @@ import confutils, chronicles, chronos
import libp2p/[switch, standard_setup, multiaddress, multicodec, peerinfo]
import libp2p/crypto/crypto as lcrypto
import libp2p/crypto/secp as lsecp
import libp2p/protocols/pubsub/[pubsub, gossipsub]
import eth/p2p/discoveryv5/enr as enr
import eth/p2p/discoveryv5/[protocol, discovery_db, node]
import eth/keys as ethkeys, eth/trie/db
@ -674,9 +675,14 @@ proc run(conf: InspectorConf) {.async.} =
error "Bind address is incorrect MultiAddress", address = conf.bindAddress
quit(1)
var switch = newStandardSwitch(some(seckey), hostAddress.get(),
triggerSelf = true, gossip = true,
sign = false, verifySignature = false, rng = rng)
let switch = newStandardSwitch(some(seckey), hostAddress.get(), rng = rng)
let pubsub = GossipSub.init(
switch = switch,
triggerSelf = true, sign = false,
verifySignature = false).PubSub
switch.mount(pubsub)
if len(conf.topics) > 0:
for item in conf.topics:
@ -708,17 +714,18 @@ proc run(conf: InspectorConf) {.async.} =
data: seq[byte]): Future[void] {.gcsafe.} =
result = pubsubLogger(conf, switch, resolveQueue, topic, data)
discard switch.start()
discard await switch.start()
await pubsub.start()
var topicFilters = newSeq[string]()
try:
for filter in topics:
for topic in getTopics(forkDigest.get(), filter):
await switch.subscribe(topic, pubsubTrampoline)
await pubsub.subscribe(topic, pubsubTrampoline)
topicFilters.add(topic)
trace "Subscribed to topic", topic = topic
for filter in conf.customTopics:
await switch.subscribe(filter, pubsubTrampoline)
await pubsub.subscribe(filter, pubsubTrampoline)
topicFilters.add(filter)
trace "Subscribed to custom topic", topic = filter
except CatchableError as exc:

2
vendor/nim-libp2p vendored

@ -1 +1 @@
Subproject commit 6ffd5be0598d8c1043c456a2b87bfdb753906c2e
Subproject commit d1f1e1b31ed8d45fe83250298c2c0934bce6a2d1