some light cleanup for pub/gossip sub (#273)

* move peer table out to its own file

* move peer table

* cleanup `==` and add one to peerinfo

* add peertable

* missed equality check
This commit is contained in:
Dmitriy Ryajov 2020-07-15 13:18:55 -06:00 committed by GitHub
parent b832668768
commit f35b8999b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 99 additions and 68 deletions

View File

@ -133,3 +133,19 @@ proc publicKey*(p: PeerInfo): Option[PublicKey] {.inline.} =
result = p.key result = p.key
else: else:
result = some(p.privateKey.getKey().tryGet()) result = some(p.privateKey.getKey().tryGet())
func `==`*(a, b: PeerInfo): bool =
# override equiality to support both nil and peerInfo comparisons
# this in the future will allow us to recycle refs
let
aptr = cast[pointer](a)
bptr = cast[pointer](b)
if isNil(aptr) and isNil(bptr):
return true
if isNil(aptr) or isNil(bptr):
return false
if aptr == bptr and a.peerId == b.peerId:
return true

View File

@ -12,6 +12,7 @@ import chronos, chronicles, metrics
import pubsub, import pubsub,
pubsubpeer, pubsubpeer,
timedcache, timedcache,
peertable,
rpc/[messages, message], rpc/[messages, message],
../../stream/connection, ../../stream/connection,
../../peerid, ../../peerid,

View File

@ -12,6 +12,7 @@ import chronos, chronicles, metrics
import pubsub, import pubsub,
floodsub, floodsub,
pubsubpeer, pubsubpeer,
peertable,
mcache, mcache,
timedcache, timedcache,
rpc/[messages, message], rpc/[messages, message],
@ -68,28 +69,6 @@ declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub,
"gossipsub peers per topic in gossipsub", "gossipsub peers per topic in gossipsub",
labels = ["topic"]) labels = ["topic"])
func addPeer(table: var PeerTable, topic: string, peer: PubSubPeer): bool =
# returns true if the peer was added, false if it was already in the collection
not table.mgetOrPut(topic, initHashSet[PubSubPeer]()).containsOrIncl(peer)
func removePeer(table: var PeerTable, topic: string, peer: PubSubPeer) =
table.withValue(topic, peers):
peers[].excl(peer)
if peers[].len == 0:
table.del(topic)
func hasPeer(table: PeerTable, topic: string, peer: PubSubPeer): bool =
(topic in table) and (peer in table[topic])
func peers(table: PeerTable, topic: string): int =
if topic in table:
table[topic].len
else:
0
func getPeers(table: Table[string, HashSet[string]], topic: string): HashSet[string] =
table.getOrDefault(topic, initHashSet[string]())
method init*(g: GossipSub) = method init*(g: GossipSub) =
proc handler(conn: Connection, proto: string) {.async.} = proc handler(conn: Connection, proto: string) {.async.} =
## main protocol handler that gets triggered on every ## main protocol handler that gets triggered on every
@ -147,7 +126,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
# send a graft message to the peer # send a graft message to the peer
grafts.add peer grafts.add peer
discard g.mesh.addPeer(topic, peer) discard g.mesh.addPeer(topic, peer)
trace "got peer", peer = peer.id trace "got peer", peer = $peer
if g.mesh.peers(topic) > GossipSubDhi: if g.mesh.peers(topic) > GossipSubDhi:
# prune peers if we've gone over # prune peers if we've gone over
@ -331,7 +310,7 @@ proc handleGraft(g: GossipSub,
grafts: seq[ControlGraft]): seq[ControlPrune] = grafts: seq[ControlGraft]): seq[ControlPrune] =
for graft in grafts: for graft in grafts:
let topic = graft.topicID let topic = graft.topicID
trace "processing graft message", topic, peer = peer.id trace "processing graft message", topic, peer = $peer
# If they send us a graft before they send us a subscribe, what should # If they send us a graft before they send us a subscribe, what should
# we do? For now, we add them to mesh but don't add them to gossipsub. # we do? For now, we add them to mesh but don't add them to gossipsub.
@ -344,7 +323,7 @@ proc handleGraft(g: GossipSub,
if g.mesh.addPeer(topic, peer): if g.mesh.addPeer(topic, peer):
g.fanout.removePeer(topic, peer) g.fanout.removePeer(topic, peer)
else: else:
trace "Peer already in mesh", topic, peer = peer.id trace "Peer already in mesh", topic, peer = $peer
else: else:
result.add(ControlPrune(topicID: topic)) result.add(ControlPrune(topicID: topic))
else: else:
@ -355,7 +334,7 @@ proc handleGraft(g: GossipSub,
proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
for prune in prunes: for prune in prunes:
trace "processing prune message", peer = peer.id, trace "processing prune message", peer = $peer,
topicID = prune.topicID topicID = prune.topicID
g.mesh.removePeer(prune.topicID, peer) g.mesh.removePeer(prune.topicID, peer)
@ -366,7 +345,7 @@ proc handleIHave(g: GossipSub,
peer: PubSubPeer, peer: PubSubPeer,
ihaves: seq[ControlIHave]): ControlIWant = ihaves: seq[ControlIHave]): ControlIWant =
for ihave in ihaves: for ihave in ihaves:
trace "processing ihave message", peer = peer.id, trace "processing ihave message", peer = $peer,
topicID = ihave.topicID, topicID = ihave.topicID,
msgs = ihave.messageIDs msgs = ihave.messageIDs
@ -380,7 +359,7 @@ proc handleIWant(g: GossipSub,
iwants: seq[ControlIWant]): seq[Message] = iwants: seq[ControlIWant]): seq[Message] =
for iwant in iwants: for iwant in iwants:
for mid in iwant.messageIDs: for mid in iwant.messageIDs:
trace "processing iwant message", peer = peer.id, trace "processing iwant message", peer = $peer,
messageID = mid messageID = mid
let msg = g.mcache.get(mid) let msg = g.mcache.get(mid)
if msg.isSome: if msg.isSome:
@ -452,12 +431,13 @@ method rpcHandler*(g: GossipSub,
respControl.iwant.add(g.handleIHave(peer, control.ihave)) respControl.iwant.add(g.handleIHave(peer, control.ihave))
respControl.prune.add(g.handleGraft(peer, control.graft)) respControl.prune.add(g.handleGraft(peer, control.graft))
let messages = g.handleIWant(peer, control.iwant)
if respControl.graft.len > 0 or respControl.prune.len > 0 or if respControl.graft.len > 0 or respControl.prune.len > 0 or
respControl.ihave.len > 0 or respControl.iwant.len > 0: respControl.ihave.len > 0 or respControl.iwant.len > 0:
await peer.send( await peer.send(
@[RPCMsg(control: some(respControl), @[RPCMsg(control: some(respControl),
messages: g.handleIWant(peer, control.iwant))]) messages: messages)])
method subscribe*(g: GossipSub, method subscribe*(g: GossipSub,
topic: string, topic: string,
@ -511,8 +491,9 @@ method publish*(g: GossipSub,
msg = Message.init(g.peerInfo, data, topic, g.msgSeqno, g.sign) msg = Message.init(g.peerInfo, data, topic, g.msgSeqno, g.sign)
msgId = g.msgIdProvider(msg) msgId = g.msgIdProvider(msg)
trace "publishing on topic", trace "created new message", msg
topic, peers = peers.len, msg = msg.shortLog()
trace "publishing on topic", topic, peers = peers.len
if msgId notin g.mcache: if msgId notin g.mcache:
g.mcache.put(msgId, msg) g.mcache.put(msgId, msg)

View File

@ -0,0 +1,42 @@
## Nim-LibP2P
## Copyright (c) 2019 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import tables, sequtils, sets
import pubsubpeer
type
PeerTable* = Table[string, HashSet[PubSubPeer]]
proc hasPeerID*(t: PeerTable, topic, peerId: string): bool =
# unefficient but used only in tests!
let peers = toSeq(t.getOrDefault(topic))
peers.any do (peer: PubSubPeer) -> bool:
peer.id == peerId
func addPeer*(table: var PeerTable, topic: string, peer: PubSubPeer): bool =
# returns true if the peer was added, false if it was already in the collection
not table.mgetOrPut(topic, initHashSet[PubSubPeer]()).containsOrIncl(peer)
func removePeer*(table: var PeerTable, topic: string, peer: PubSubPeer) =
table.withValue(topic, peers):
peers[].excl(peer)
if peers[].len == 0:
table.del(topic)
func hasPeer*(table: PeerTable, topic: string, peer: PubSubPeer): bool =
(topic in table) and (peer in table[topic])
func peers*(table: PeerTable, topic: string): int =
if topic in table:
table[topic].len
else:
0
func getPeers*(table: Table[string, HashSet[string]], topic: string): HashSet[string] =
table.getOrDefault(topic, initHashSet[string]())

View File

@ -30,8 +30,6 @@ declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messag
declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"]) declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"])
type type
PeerTable* = Table[string, HashSet[PubSubPeer]]
SendRes = tuple[published: seq[string], failed: seq[string]] # keep private SendRes = tuple[published: seq[string], failed: seq[string]] # keep private
TopicHandler* = proc(topic: string, TopicHandler* = proc(topic: string,
@ -62,20 +60,11 @@ type
msgIdProvider*: MsgIdProvider # Turn message into message id (not nil) msgIdProvider*: MsgIdProvider # Turn message into message id (not nil)
msgSeqno*: uint64 msgSeqno*: uint64
proc hasPeerID*(t: PeerTable, topic, peerId: string): bool =
# unefficient but used only in tests!
let peers = t.getOrDefault(topic)
if peers.len == 0:
false
else:
let ps = toSeq(peers)
ps.any do (peer: PubSubPeer) -> bool:
peer.id == peerId
method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} = method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} =
## handle peer disconnects ## handle peer disconnects
## ##
if not isNil(peer.peerInfo) and peer.id in p.peers: if not isNil(peer.peerInfo) and
peer.id in p.peers and not peer.inUse():
trace "deleting peer", peer = peer.id trace "deleting peer", peer = peer.id
p.peers.del(peer.id) p.peers.del(peer.id)
trace "peer disconnected", peer = peer.id trace "peer disconnected", peer = peer.id

View File

@ -40,6 +40,7 @@ type
recvdRpcCache: TimedCache[string] # cache for already received messages recvdRpcCache: TimedCache[string] # cache for already received messages
onConnect*: AsyncEvent onConnect*: AsyncEvent
observers*: ref seq[PubSubObserver] # ref as in smart_ptr observers*: ref seq[PubSubObserver] # ref as in smart_ptr
refs: int # how many active connections this peer has
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
@ -53,27 +54,21 @@ func `==`*(a, b: PubSubPeer): bool =
let let
aptr = cast[pointer](a) aptr = cast[pointer](a)
bptr = cast[pointer](b) bptr = cast[pointer](b)
if aptr == nil:
if bptr == nil: if isNil(aptr) and isNil(bptr):
true return true
else:
false if isNil(aptr) or isNil(bptr):
elif bptr == nil: return false
false
else: if aptr == bptr and a.peerInfo == b.peerInfo:
if a.peerInfo == nil: return true
if b.peerInfo == nil:
true
else:
false
else:
if b.peerInfo == nil:
false
else:
a.peerInfo.id == b.peerInfo.id
proc id*(p: PubSubPeer): string = p.peerInfo.id proc id*(p: PubSubPeer): string = p.peerInfo.id
proc inUse*(p: PubSubPeer): bool =
p.refs > 0
proc connected*(p: PubSubPeer): bool = proc connected*(p: PubSubPeer): bool =
not(isNil(p.sendConn)) not(isNil(p.sendConn))
@ -82,6 +77,7 @@ proc `conn=`*(p: PubSubPeer, conn: Connection) =
trace "attaching send connection for peer", peer = p.id trace "attaching send connection for peer", peer = p.id
p.sendConn = conn p.sendConn = conn
p.onConnect.fire() p.onConnect.fire()
p.refs.inc()
proc conn*(p: PubSubPeer): Connection = proc conn*(p: PubSubPeer): Connection =
p.sendConn p.sendConn
@ -104,6 +100,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
trace "handling pubsub rpc", peer = p.id, closed = conn.closed trace "handling pubsub rpc", peer = p.id, closed = conn.closed
try: try:
try: try:
p.refs.inc()
while not conn.closed: while not conn.closed:
trace "waiting for data", peer = p.id, closed = conn.closed trace "waiting for data", peer = p.id, closed = conn.closed
let data = await conn.readLp(64 * 1024) let data = await conn.readLp(64 * 1024)
@ -141,6 +138,8 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
except CatchableError as exc: except CatchableError as exc:
trace "Exception occurred in PubSubPeer.handle", exc = exc.msg trace "Exception occurred in PubSubPeer.handle", exc = exc.msg
raise exc raise exc
finally:
p.refs.dec()
proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
logScope: logScope:
@ -187,6 +186,7 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
p.sendConn = nil p.sendConn = nil
p.onConnect.clear() p.onConnect.clear()
p.refs.dec()
raise exc raise exc
proc sendMsg*(p: PubSubPeer, proc sendMsg*(p: PubSubPeer,

View File

@ -18,7 +18,8 @@ import utils,
crypto/crypto, crypto/crypto,
protocols/pubsub/pubsub, protocols/pubsub/pubsub,
protocols/pubsub/floodsub, protocols/pubsub/floodsub,
protocols/pubsub/rpc/messages] protocols/pubsub/rpc/messages,
protocols/pubsub/peertable]
import ../helpers import ../helpers

View File

@ -19,6 +19,7 @@ import utils, ../../libp2p/[errors,
crypto/crypto, crypto/crypto,
protocols/pubsub/pubsub, protocols/pubsub/pubsub,
protocols/pubsub/gossipsub, protocols/pubsub/gossipsub,
protocols/pubsub/peertable,
protocols/pubsub/rpc/messages] protocols/pubsub/rpc/messages]
import ../helpers import ../helpers