Cleanup rpc handler (#261)

* more cleanup

* fix tests

* merging master

* remove `withLock` as it conflicts with stdlib

* wip

* more fanout ttl

Co-authored-by: Giovanni Petrantoni <giovanni@fragcolor.xyz>
This commit is contained in:
Dmitriy Ryajov 2020-07-09 17:54:16 -06:00 committed by GitHub
parent 4c815d75e7
commit bec9a0658f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 30 deletions

View File

@ -199,13 +199,12 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
proc dropFanoutPeers(g: GossipSub) =
# drop peers that we haven't published to in
# GossipSubFanoutTTL seconds
var dropping = newSeq[string]()
let now = Moment.now()
for topic, val in g.lastFanoutPubSub:
for topic in toSeq(g.lastFanoutPubSub.keys):
let val = g.lastFanoutPubSub[topic]
if now > val:
dropping.add(topic)
g.fanout.del(topic)
g.lastFanoutPubSub.del(topic)
trace "dropping fanout topic", topic
libp2p_gossipsub_peers_per_topic_fanout
@ -338,8 +337,7 @@ method subscribeTopic*(g: GossipSub,
proc handleGraft(g: GossipSub,
peer: PubSubPeer,
grafts: seq[ControlGraft],
respControl: var ControlMessage) =
grafts: seq[ControlGraft]): seq[ControlPrune] =
let peerId = peer.id
for graft in grafts:
let topic = graft.topicID
@ -358,9 +356,9 @@ proc handleGraft(g: GossipSub,
else:
trace "Peer already in mesh", topic, peerId
else:
respControl.prune.add(ControlPrune(topicID: topic))
result.add(ControlPrune(topicID: topic))
else:
respControl.prune.add(ControlPrune(topicID: topic))
result.add(ControlPrune(topicID: topic))
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(topic).int64, labelValues = [topic])
@ -459,18 +457,17 @@ method rpcHandler*(g: GossipSub,
var respControl: ControlMessage
if m.control.isSome:
var control: ControlMessage = m.control.get()
let iWant: ControlIWant = g.handleIHave(peer, control.ihave)
if iWant.messageIDs.len > 0:
respControl.iwant.add(iWant)
let messages: seq[Message] = g.handleIWant(peer, control.iwant)
g.handleGraft(peer, control.graft, respControl)
let control = m.control.get()
g.handlePrune(peer, control.prune)
respControl.iwant.add(g.handleIHave(peer, control.ihave))
respControl.prune.add(g.handleGraft(peer, control.graft))
if respControl.graft.len > 0 or respControl.prune.len > 0 or
respControl.ihave.len > 0 or respControl.iwant.len > 0:
await peer.send(@[RPCMsg(control: some(respControl), messages: messages)])
await peer.send(
@[RPCMsg(control: some(respControl),
messages: g.handleIWant(peer, control.iwant))])
method subscribe*(g: GossipSub,
topic: string,

View File

@ -88,8 +88,9 @@ proc encodeControl*(control: ControlMessage, pb: var ProtoBuffer) {.gcsafe.} =
h.encodeIHave(ihave)
# write messages to protobuf
ihave.finish()
pb.write(initProtoField(1, ihave))
if ihave.buffer.len > 0:
ihave.finish()
pb.write(initProtoField(1, ihave))
if control.iwant.len > 0:
var iwant = initProtoBuffer()
@ -97,8 +98,9 @@ proc encodeControl*(control: ControlMessage, pb: var ProtoBuffer) {.gcsafe.} =
w.encodeIWant(iwant)
# write messages to protobuf
iwant.finish()
pb.write(initProtoField(2, iwant))
if iwant.buffer.len > 0:
iwant.finish()
pb.write(initProtoField(2, iwant))
if control.graft.len > 0:
var graft = initProtoBuffer()
@ -106,8 +108,9 @@ proc encodeControl*(control: ControlMessage, pb: var ProtoBuffer) {.gcsafe.} =
g.encodeGraft(graft)
# write messages to protobuf
graft.finish()
pb.write(initProtoField(3, graft))
if graft.buffer.len > 0:
graft.finish()
pb.write(initProtoField(3, graft))
if control.prune.len > 0:
var prune = initProtoBuffer()
@ -115,8 +118,9 @@ proc encodeControl*(control: ControlMessage, pb: var ProtoBuffer) {.gcsafe.} =
p.encodePrune(prune)
# write messages to protobuf
prune.finish()
pb.write(initProtoField(4, prune))
if prune.buffer.len > 0:
prune.finish()
pb.write(initProtoField(4, prune))
proc decodeControl*(pb: var ProtoBuffer): Option[ControlMessage] {.gcsafe.} =
trace "decoding control submessage"
@ -225,9 +229,11 @@ proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} =
for s in msg.subscriptions:
var subs = initProtoBuffer()
encodeSubs(s, subs)
# write subscriptions to protobuf
subs.finish()
result.write(initProtoField(1, subs))
if subs.buffer.len > 0:
subs.finish()
result.write(initProtoField(1, subs))
if msg.messages.len > 0:
var messages = initProtoBuffer()
@ -235,16 +241,18 @@ proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} =
encodeMessage(m, messages)
# write messages to protobuf
messages.finish()
result.write(initProtoField(2, messages))
if messages.buffer.len > 0:
messages.finish()
result.write(initProtoField(2, messages))
if msg.control.isSome:
var control = initProtoBuffer()
msg.control.get.encodeControl(control)
# write messages to protobuf
control.finish()
result.write(initProtoField(3, control))
if control.buffer.len > 0:
control.finish()
result.write(initProtoField(3, control))
if result.buffer.len > 0:
result.finish()