explicit peering part 2
This commit is contained in:
parent
b3aebb18e9
commit
f8292f7086
|
@ -51,6 +51,8 @@ type
|
|||
mesh*: Table[string, HashSet[string]] # meshes - topic to peer
|
||||
fanout*: Table[string, HashSet[string]] # fanout - topic to peer
|
||||
gossipsub*: Table[string, HashSet[string]] # topic to peer map of all gossipsub peers
|
||||
explicit*: Table[string, HashSet[string]] # # topic to peer map of all explicit peers
|
||||
explicitPeers*: HashSet[string] # explicit (always connected/forward) peers
|
||||
lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics
|
||||
gossip*: Table[string, seq[ControlIHave]] # pending gossip
|
||||
control*: Table[string, ControlMessage] # pending control messages
|
||||
|
@ -70,6 +72,9 @@ method init*(g: GossipSub) =
|
|||
## e.g. ``/floodsub/1.0.0``, etc...
|
||||
##
|
||||
|
||||
if conn.peerInfo.maintain:
|
||||
g.explicitPeers.incl(conn.peerInfo.id)
|
||||
|
||||
await g.handleConn(conn, proto)
|
||||
|
||||
g.handler = handler
|
||||
|
@ -288,10 +293,14 @@ method subscribeTopic*(g: GossipSub,
|
|||
trace "adding subscription for topic", peer = peerId, name = topic
|
||||
# subscribe remote peer to the topic
|
||||
g.gossipsub[topic].incl(peerId)
|
||||
if peerId in g.explicit:
|
||||
g.explicit[topic].incl(peerId)
|
||||
else:
|
||||
trace "removing subscription for topic", peer = peerId, name = topic
|
||||
# unsubscribe remote peer from the topic
|
||||
g.gossipsub[topic].excl(peerId)
|
||||
if peerId in g.explicit:
|
||||
g.explicit[topic].excl(peerId)
|
||||
|
||||
libp2p_gossipsub_peers_per_topic_gossipsub
|
||||
.set(g.gossipsub.getOrDefault(topic).len.int64, labelValues = [topic])
|
||||
|
@ -375,7 +384,8 @@ method rpcHandler*(g: GossipSub,
|
|||
trace "dropping message due to failed signature verification"
|
||||
continue
|
||||
|
||||
if not (await g.validate(msg)):
|
||||
# explicit peers skip validation!
|
||||
if not peer.peerInfo.maintain and not (await g.validate(msg)):
|
||||
trace "dropping message due to failed validation"
|
||||
continue
|
||||
|
||||
|
@ -392,6 +402,9 @@ method rpcHandler*(g: GossipSub,
|
|||
if t in g.mesh:
|
||||
toSendPeers.incl(g.mesh[t]) # get all mesh peers for topic
|
||||
|
||||
if t in g.explicit:
|
||||
toSendPeers.incl(g.explicit[t]) # always forward to explicit peers
|
||||
|
||||
if t in g.topics: # if we're subscribed to the topic
|
||||
for h in g.topics[t].handler:
|
||||
trace "calling handler for message", msg = msg.msgId,
|
||||
|
|
|
@ -415,8 +415,9 @@ proc maintainPeer(s: Switch, peerInfo: PeerInfo) {.async.} =
|
|||
tryAndWarn "explicit peer maintain":
|
||||
var conn = s.connections.getOrDefault(peerInfo.id)
|
||||
if conn.isNil or conn.closed:
|
||||
# attempt redial in this case
|
||||
discard
|
||||
# attempt re-connect in this case
|
||||
trace "explicit peering, trying to re-connect", peer=peerInfo
|
||||
await s.connect(peerInfo)
|
||||
|
||||
await sleepAsync(5.minutes) # spec recommended
|
||||
|
||||
|
|
Loading…
Reference in New Issue