More gossip cleanup (#257)

* more cleanup

* correct pubsub peer count

* close the stream first

* handle cancellation

* fix tests

* fix fanout ttl

* merging master

* remove `withLock` as it conflicts with stdlib

* fix trace build

Co-authored-by: Giovanni Petrantoni <giovanni@fragcolor.xyz>
Dmitriy Ryajov, 2020-07-09 14:21:47 -06:00, committed by GitHub
commit 4c815d75e7
parent c720e042fc
8 changed files with 42 additions and 42 deletions

View File

@@ -38,7 +38,7 @@ const GossipSubHistoryGossip* = 3

 # heartbeat interval
 const GossipSubHeartbeatInitialDelay* = 100.millis
-const GossipSubHeartbeatInterval* = 1.seconds
+const GossipSubHeartbeatInterval* = 5.seconds # TODO: per the spec it should be 1 second

 # fanout ttl
 const GossipSubFanoutTTL* = 1.minutes
@@ -144,7 +144,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
     shuffle(newPeers)
-    trace "getting peers", topic, peers = peerIds.len
+    trace "getting peers", topic, peers = newPeers.len

     for id in newPeers:
       if g.mesh.peers(topic) >= GossipSubD:
@@ -208,9 +208,6 @@ proc dropFanoutPeers(g: GossipSub) =
       g.fanout.del(topic)
       trace "dropping fanout topic", topic

-  for topic in dropping:
-    g.lastFanoutPubSub.del(topic)
-
   libp2p_gossipsub_peers_per_topic_fanout
     .set(g.fanout.peers(topic).int64, labelValues = [topic])
@@ -245,10 +242,6 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
         trace "got gossip peers", peers = result.len
         break

-      if allPeers.len == 0:
-        trace "no peers for topic, skipping", topicID = topic
-        break
-
       if id in gossipPeers:
         continue
@@ -284,7 +277,7 @@ proc heartbeat(g: GossipSub) {.async.} =
     except CatchableError as exc:
       trace "exception ocurred in gossipsub heartbeat", exc = exc.msg

-    await sleepAsync(5.seconds)
+    await sleepAsync(GossipSubHeartbeatInterval)

 method handleDisconnect*(g: GossipSub, peer: PubSubPeer) =
   ## handle peer disconnects
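
The sleep now references the tuned constant instead of a bare `5.seconds`, so the interval and its TODO live in exactly one place. A minimal sketch of the loop shape, assuming only chronos (the tick body and bounded count are stand-ins):

import chronos

const HeartbeatInterval = 5.seconds  # stand-in for GossipSubHeartbeatInterval

proc heartbeat() {.async.} =
  for _ in 0 ..< 3:                  # bounded so the sketch terminates
    try:
      echo "heartbeat tick"          # mesh maintenance would run here
    except CatchableError as exc:
      echo "heartbeat failed: ", exc.msg
    # sleep on the named constant; a literal here silently diverges
    # as soon as someone retunes the constant above
    await sleepAsync(HeartbeatInterval)

when isMainModule:
  waitFor heartbeat()
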
@@ -308,7 +301,7 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) =
       .set(g.fanout.peers(t).int64, labelValues = [t])

 method subscribePeer*(p: GossipSub,
                       conn: Connection) =
   procCall PubSub(p).subscribePeer(conn)
   asyncCheck p.handleConn(conn, GossipSubCodec)
@@ -316,7 +309,7 @@ method subscribeTopic*(g: GossipSub,
                        topic: string,
                        subscribe: bool,
                        peerId: string) {.gcsafe, async.} =
-  await procCall PubSub(g).subscribeTopic(topic, subscribe, peerId)
+  await procCall FloodSub(g).subscribeTopic(topic, subscribe, peerId)

   if subscribe:
     trace "adding subscription for topic", peer = peerId, name = topic
@@ -521,6 +514,13 @@ method publish*(g: GossipSub,
       g.replenishFanout(topic)
       peers = g.fanout.getOrDefault(topic)

+      # even if we couldn't publish,
+      # we still attempted to publish
+      # on the topic, so it makes sense
+      # to update the last topic publish
+      # time
+      g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL)
+
     let
       msg = Message.init(g.peerInfo, data, topic, g.sign)
       msgId = g.msgIdProvider(msg)
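
Together with the `dropFanoutPeers` hunk above, the TTL bookkeeping becomes: every publish attempt arms a deadline with `Moment.fromNow`, and the periodic sweep compares it against `Moment.now()`. A standalone sketch of that scheme, assuming chronos and the stdlib (names and the simplified deletion are stand-ins):

import std/[tables, sequtils]
import chronos

const FanoutTTL = 1.minutes          # stand-in for GossipSubFanoutTTL

var lastFanoutPubSub = initTable[string, Moment]()

proc touchFanout(topic: string) =
  # run on every publish attempt, successful or not, so the
  # fanout entry stays alive while the topic is actively used
  lastFanoutPubSub[topic] = Moment.fromNow(FanoutTTL)

proc dropExpiredFanout() =
  let now = Moment.now()
  for topic in toSeq(lastFanoutPubSub.keys):
    if now > lastFanoutPubSub[topic]:
      lastFanoutPubSub.del(topic)    # expired: forget the topic

when isMainModule:
  touchFanout("foobar")
  dropExpiredFanout()
  echo lastFanoutPubSub.len          # 1: deadline is a minute away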

View File

@@ -57,7 +57,7 @@ type
     cleanupLock: AsyncLock
     validators*: Table[string, HashSet[ValidatorHandler]]
     observers: ref seq[PubSubObserver] # ref as in smart_ptr
     msgIdProvider*: MsgIdProvider # Turn message into message id (not nil)

 method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} =
   ## handle peer disconnects
@@ -65,10 +65,10 @@ method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} =
   if not isNil(peer.peerInfo) and peer.id in p.peers:
     trace "deleting peer", peer = peer.id
     p.peers.del(peer.id)
+    trace "peer disconnected", peer = peer.id

     # metrics
     libp2p_pubsub_peers.set(p.peers.len.int64)

-  trace "peer disconnected", peer = peer.id

 proc sendSubs*(p: PubSub,
                peer: PubSubPeer,
@@ -120,9 +120,9 @@ method rpcHandler*(p: PubSub,
       trace "about to subscribe to topic", topicId = s.topic
       await p.subscribeTopic(s.topic, s.subscribe, peer.id)

-proc getPeer(p: PubSub,
-             peerInfo: PeerInfo,
-             proto: string): PubSubPeer =
+proc getOrCreatePeer(p: PubSub,
+                     peerInfo: PeerInfo,
+                     proto: string): PubSubPeer =
   if peerInfo.id in p.peers:
     return p.peers[peerInfo.id]
@@ -132,7 +132,6 @@ proc getPeer(p: PubSub,
   p.peers[peer.id] = peer
   peer.observers = p.observers

-  libp2p_pubsub_peers.set(p.peers.len.int64)
   return peer

 method handleConn*(p: PubSub,
@@ -158,7 +157,7 @@ method handleConn*(p: PubSub,
     # call pubsub rpc handler
     await p.rpcHandler(peer, msgs)

-  let peer = p.getPeer(conn.peerInfo, proto)
+  let peer = p.getOrCreatePeer(conn.peerInfo, proto)
   let topics = toSeq(p.topics.keys)
   if topics.len > 0:
     await p.sendSubs(peer, topics, true)
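
The rename from `getPeer` to `getOrCreatePeer` is the theme of this file: the helper mutates the peer table on a miss, and the old name hid that side effect from call sites like the one above. A minimal sketch of the get-or-create shape (simplified types):

import std/tables

type PubSubPeerish = ref object
  id: string

var peers = initTable[string, PubSubPeerish]()

proc getOrCreatePeer(id: string): PubSubPeerish =
  # a hit returns the existing peer; a miss *registers* a new one,
  # which is exactly the side effect the new name advertises
  if id in peers:
    return peers[id]
  result = PubSubPeerish(id: id)
  peers[id] = result

when isMainModule:
  discard getOrCreatePeer("A")
  discard getOrCreatePeer("A")
  echo peers.len  # 1: the second call reused the entry
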
@@ -177,23 +176,27 @@ method handleConn*(p: PubSub,
 method subscribePeer*(p: PubSub, conn: Connection) {.base.} =
   if not(isNil(conn)):
-    let peer = p.getPeer(conn.peerInfo, p.codec)
+    let peer = p.getOrCreatePeer(conn.peerInfo, p.codec)
     trace "subscribing to peer", peerId = conn.peerInfo.id
     if not peer.connected:
       peer.conn = conn

 method unsubscribePeer*(p: PubSub, peerInfo: PeerInfo) {.base, async.} =
-  let peer = p.getPeer(peerInfo, p.codec)
-  trace "unsubscribing from peer", peerId = $peerInfo
-  if not(isNil(peer.conn)):
-    await peer.conn.close()
-
-  p.handleDisconnect(peer)
-
-proc connected*(p: PubSub, peer: PeerInfo): bool =
-  let peer = p.getPeer(peer, p.codec)
-  if not(isNil(peer)):
-    return peer.connected
+  if peerInfo.id in p.peers:
+    let peer = p.peers[peerInfo.id]
+
+    trace "unsubscribing from peer", peerId = $peerInfo
+    if not(isNil(peer.conn)):
+      await peer.conn.close()
+
+    p.handleDisconnect(peer)
+
+proc connected*(p: PubSub, peerInfo: PeerInfo): bool =
+  if peerInfo.id in p.peers:
+    let peer = p.peers[peerInfo.id]
+    if not(isNil(peer)):
+      return peer.connected

 method unsubscribe*(p: PubSub,
                     topics: seq[TopicPair]) {.base, async.} =
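
`unsubscribePeer` and `connected` now guard on table membership precisely because the old code went through the creating helper: merely asking about an unknown peer minted one. A sketch of the lookup-only discipline (self-contained toy table, same shape as the previous sketch):

import std/tables

type PubSubPeerish = ref object
  connected: bool

var peers = initTable[string, PubSubPeerish]()

proc isConnected(id: string): bool =
  # pure query: membership check first, never create-on-miss,
  # so probing an unknown peer cannot grow the table
  if id in peers:
    return peers[id].connected

when isMainModule:
  echo isConnected("nobody")  # false
  echo peers.len              # still 0
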
@@ -205,6 +208,11 @@ method unsubscribe*(p: PubSub,
         if h == t.handler:
           p.topics[t.topic].handler.del(i)

+      # make sure we delete the topic if
+      # no more handlers are left
+      if p.topics[t.topic].handler.len <= 0:
+        p.topics.del(t.topic)
+
 method unsubscribe*(p: PubSub,
                     topic: string,
                     handler: TopicHandler): Future[void] {.base.} =
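
The added check keeps the `topics` table from accumulating entries whose handler lists are empty. A standalone sketch of the delete-when-empty rule (plain closures standing in for `TopicHandler`):

import std/tables

type Handler = proc () {.closure.}

var topics = initTable[string, seq[Handler]]()

proc unsubscribe(topic: string, handler: Handler) =
  if topic notin topics:
    return
  for i, h in topics[topic]:
    if h == handler:
      topics[topic].del(i)
      break
  # drop the topic entirely once the last handler is gone,
  # mirroring the new cleanup above
  if topics[topic].len == 0:
    topics.del(topic)

when isMainModule:
  let h: Handler = proc () = echo "hi"
  topics["foobar"] = @[h]
  unsubscribe("foobar", h)
  echo topics.len  # 0: the empty topic entry was removed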

View File

@@ -36,7 +36,6 @@ type
     sendConn: Connection
     peerInfo*: PeerInfo
     handler*: RPCHandler
-    topics*: seq[string]
     sentRpcCache: TimedCache[string] # cache for already sent messages
     recvdRpcCache: TimedCache[string] # cache for already received messages
     onConnect*: AsyncEvent

View File

@@ -42,11 +42,11 @@ method initStream*(s: SecureConn) =
   procCall Connection(s).initStream()

 method close*(s: SecureConn) {.async.} =
-  await procCall Connection(s).close()
-
   if not(isNil(s.stream)):
     await s.stream.close()

+  await procCall Connection(s).close()
+
 method readMessage*(c: SecureConn): Future[seq[byte]] {.async, base.} =
   doAssert(false, "Not implemented!")
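
Per the "close the stream first" note in the commit message, the override now tears down the wrapped stream before running the base close. A toy version of the ordering (chronos async methods, fake resources):

import chronos

type
  Conn = ref object of RootObj
  SecureConnish = ref object of Conn
    stream: Conn

method close(c: Conn) {.async, base.} =
  echo "base connection closed"

method close(s: SecureConnish) {.async.} =
  # inner resource first, base bookkeeping last; the old order
  # marked the connection closed while its stream was still live
  if not isNil(s.stream):
    await s.stream.close()
  await procCall Conn(s).close()

when isMainModule:
  waitFor close(SecureConnish(stream: Conn()))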

View File

@@ -75,15 +75,10 @@ method close*(s: ChronosStream) {.async.} =
     if not s.isClosed:
       trace "shutting down chronos stream", address = $s.client.remoteAddress(),
                                             oid = s.oid
-      # TODO: the sequence here matters
-      # don't move it after the connections
-      # close bellow
       if not s.client.closed():
         await s.client.closeWait()

       await procCall Connection(s).close()
   except CancelledError as exc:
     raise exc
   except CatchableError as exc:
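
The surrounding try keeps close best-effort while still honouring cancellation, which is worth calling out on its own: re-raising `CancelledError` lets a caller's cancellation complete, while any other error is merely logged. A small sketch of that pattern (hypothetical `guardedClose` wrapper, chronos only):

import chronos

proc guardedClose(work: Future[void]) {.async.} =
  try:
    await work
  except CancelledError as exc:
    # cancellation must propagate, otherwise whoever cancelled
    # this future would hang waiting for it to finish
    raise exc
  except CatchableError as exc:
    # close is best-effort: anything else is logged and swallowed
    echo "error while closing: ", exc.msg

when isMainModule:
  waitFor guardedClose(sleepAsync(10.millis))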

View File

@@ -302,12 +302,11 @@ proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} =
       conn.peerInfo.close()
   finally:
     await conn.close()
-    libp2p_peers.set(s.connections.len.int64)

     if lock.locked():
       lock.release()

+  libp2p_peers.set(s.connections.len.int64)
+
 proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} =
   let connections = s.connections.getOrDefault(peer.id)
   for connHolder in connections:
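
Moving the gauge update below the lock release keeps metrics strictly outside the critical section. A sketch with a chronos `AsyncLock` (a bare counter stands in for the connection table):

import chronos

var
  lock = newAsyncLock()
  connections = 1

proc cleanupConn() {.async.} =
  await lock.acquire()
  try:
    dec connections            # mutate shared state under the lock
  finally:
    if lock.locked():
      lock.release()
  # gauge update after release: a slow or throwing metrics call
  # can no longer extend or leak the critical section
  echo "libp2p_peers gauge: ", connections

when isMainModule:
  waitFor cleanupConn()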

View File

@@ -32,6 +32,7 @@ suite "GossipSub internal":
       gossipSub.mesh[topic] = initHashSet[string]()

       var conns = newSeq[Connection]()
+      gossipSub.gossipsub[topic] = initHashSet[string]()
       for i in 0..<15:
         let conn = newBufferStream(noop)
         conns &= conn
@@ -60,6 +61,7 @@ suite "GossipSub internal":
       gossipSub.mesh[topic] = initHashSet[string]()
       gossipSub.topics[topic] = Topic() # has to be in topics to rebalance
+      gossipSub.gossipsub[topic] = initHashSet[string]()

       var conns = newSeq[Connection]()
       for i in 0..<15:
         let conn = newBufferStream(noop)
@@ -99,7 +101,6 @@ suite "GossipSub internal":
         conn.peerInfo = peerInfo
         gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
         gossipSub.peers[peerInfo.id].handler = handler
-        gossipSub.peers[peerInfo.id].topics &= topic
         gossipSub.gossipsub[topic].incl(peerInfo.id)

       check gossipSub.gossipsub[topic].len == 15
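
Both fixture changes seed `gossipsub[topic]` before the code under test runs, because the mesh logic indexes that table directly. A tiny illustration of why the seed matters (stdlib only):

import std/[tables, sets]

var gossipsub = initTable[string, HashSet[string]]()
let topic = "foobar"

# without this seed, `gossipsub[topic].incl(...)` below raises
# KeyError, which is what the updated tests guard against
gossipsub[topic] = initHashSet[string]()
gossipsub[topic].incl("peer-1")
echo gossipsub[topic].len  # 1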

View File

@@ -86,7 +86,6 @@ suite "GossipSub":
       nodes[1].addValidator("foobar", validator)
       tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1

       result = (await validatorFut) and (await handlerFut)
-
       await allFuturesThrowing(
         nodes[0].stop(),
@@ -142,7 +141,6 @@ suite "GossipSub":
       awaiters.add((await nodes[1].start()))

       await subscribeNodes(nodes)
-      await nodes[1].subscribe("foo", handler)
       await nodes[1].subscribe("bar", handler)