diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 4982723d1..eb46b1f96 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -51,6 +51,8 @@ type mesh*: Table[string, HashSet[string]] # meshes - topic to peer fanout*: Table[string, HashSet[string]] # fanout - topic to peer gossipsub*: Table[string, HashSet[string]] # topic to peer map of all gossipsub peers + explicit*: Table[string, HashSet[string]] # # topic to peer map of all explicit peers + explicitPeers*: HashSet[string] # explicit (always connected/forward) peers lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics gossip*: Table[string, seq[ControlIHave]] # pending gossip control*: Table[string, ControlMessage] # pending control messages @@ -70,6 +72,9 @@ method init*(g: GossipSub) = ## e.g. ``/floodsub/1.0.0``, etc... ## + if conn.peerInfo.maintain: + g.explicitPeers.incl(conn.peerInfo.id) + await g.handleConn(conn, proto) g.handler = handler @@ -288,10 +293,14 @@ method subscribeTopic*(g: GossipSub, trace "adding subscription for topic", peer = peerId, name = topic # subscribe remote peer to the topic g.gossipsub[topic].incl(peerId) + if peerId in g.explicit: + g.explicit[topic].incl(peerId) else: trace "removing subscription for topic", peer = peerId, name = topic # unsubscribe remote peer from the topic g.gossipsub[topic].excl(peerId) + if peerId in g.explicit: + g.explicit[topic].excl(peerId) libp2p_gossipsub_peers_per_topic_gossipsub .set(g.gossipsub.getOrDefault(topic).len.int64, labelValues = [topic]) @@ -375,7 +384,8 @@ method rpcHandler*(g: GossipSub, trace "dropping message due to failed signature verification" continue - if not (await g.validate(msg)): + # explicit peers skip validation! + if not peer.peerInfo.maintain and not (await g.validate(msg)): trace "dropping message due to failed validation" continue @@ -392,6 +402,9 @@ method rpcHandler*(g: GossipSub, if t in g.mesh: toSendPeers.incl(g.mesh[t]) # get all mesh peers for topic + if t in g.explicit: + toSendPeers.incl(g.explicit[t]) # always forward to explicit peers + if t in g.topics: # if we're subscribed to the topic for h in g.topics[t].handler: trace "calling handler for message", msg = msg.msgId, diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 848ceff69..7d34ea08c 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -415,8 +415,9 @@ proc maintainPeer(s: Switch, peerInfo: PeerInfo) {.async.} = tryAndWarn "explicit peer maintain": var conn = s.connections.getOrDefault(peerInfo.id) if conn.isNil or conn.closed: - # attempt redial in this case - discard + # attempt re-connect in this case + trace "explicit peering, trying to re-connect", peer=peerInfo + await s.connect(peerInfo) await sleepAsync(5.minutes) # spec recommended