377 lines
12 KiB
Nim
Raw Normal View History

2019-09-09 20:15:52 -06:00
## Nim-LibP2P
2019-09-24 11:48:23 -06:00
## Copyright (c) 2019 Status Research & Development GmbH
2019-09-09 20:15:52 -06:00
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/[tables, sequtils, sets]
import chronos, chronicles, metrics
2019-09-09 20:15:52 -06:00
import pubsubpeer,
rpc/[message, messages],
../../switch,
2019-09-09 20:15:52 -06:00
../protocol,
../../stream/connection,
../../peerid,
../../peerinfo,
../../errors
2019-09-09 20:15:52 -06:00
Gossip one one (#240) * allow multiple codecs per protocol (without breaking things) * add 1.1 protocol to gossip * explicit peering part 1 * explicit peering part 2 * explicit peering part 3 * PeerInfo and ControlPrune protocols * fix encodePrune * validated always, even explicit peers * prune by score (score is stub still) * add a way to pass parameters to gossip * standard setup fixes * take into account explicit direct peers in publish * add floodPublish logic * small fixes, publish still half broken * make sure to waitsub in sparse test * use var semantics to optimize table access * wip... lvalues don't work properly sadly... * big publish refactor, replenish and balance * fix internal tests * use g.peers for fanout (todo: don't include flood peers) * exclude non gossip from fanout * internal test fixes * fix flood tests * fix test's trypublish * test interop fixes * make sure to not remove peers from gossip table * restore old replenishFanout * cleanups * restore utility module import * restore trace vs debug in gossip * improve fanout replenish behavior further * triage publish nil peers (issue is on master too but just hidden behind a if/in) * getGossipPeers fixes * remove topics from pubsubpeer (was unused) * simplify rebalanceMesh (following spec) and make it finally reach D_high * better diagnostics * merge new pubsubpeer, copy 1.1 to new module * fix up merge * conditional enable gossip11 module * add back topics in peers, re-enable flood publish * add more heartbeat locking to prevent races * actually lock the heartbeat * minor fixes * with sugar * merge 1.0 * remove assertion in publish * fix multistream 1.1 multi proto * Fix merge oops * wip * fix gossip 11 upstream * gossipsub11 -> gossipsub * support interop testing * tests fixing * fix directchat build * control prune updates (pb) * wip parameters * gossip internal tests fixes * parameters wip * finishup with params * cleanups/wip * small sugar * grafted and pruned procs * wip updateScores * wip * fix logging issue * pubsubpeer, chronicles explicit override * fix internal gossip tests * wip * tables troubleshooting * score wip * score wip * fixes * fix test utils generateNodes * don't delete while iterating in score update * fix grafted defect * add a handleConnect in subscribeTopic * pruning improvements * wip * score fixes * post merge - builds gossip tests * further merge fixes * rebalance improvements and opportunistic grafting * fix test for now * restore explicit peering * implement peer exchange graft message * add an hard cap to PX * backoff time management * IWANT cap/budget * Adaptive gossip dissemination * outbound mesh quota, internal tests fixing * oversub prune score based, finish outbound quota * finishup with score and ihave budget * use go daemon 0.3.0 * import fixes * byScore cleanup score sorting * remove pointless scaling in `/` Duration operator * revert using libp2p org for daemon * interop fixes * fixes and cleanup * remove heartbeat assertion, minor debug fixes * logging improvements and cleaning up * (to revert) add some traces * add explicit topic to gossip rpcs * pubsub merge fixes and type fix in switch * Revert "(to revert) add some traces" This reverts commit 4663eaab6cc336c81cee50bc54025cf0b7bcbd99. * cleanup some now irrelevant todo * shuffle peers anyway as score might be disabled * add missing shuffle * old merge fix * more merge fixes * debug improvements * re-enable gossip internal tests * add gossip10 fallback (dormant but tested) * split gossipsub internal tests into 1.0 and 1.1 Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
2020-09-21 18:16:29 +09:00
import metrics
import stew/results
export results
2019-09-09 20:15:52 -06:00
export PubSubPeer
export PubSubObserver
Gossip one one (#240) * allow multiple codecs per protocol (without breaking things) * add 1.1 protocol to gossip * explicit peering part 1 * explicit peering part 2 * explicit peering part 3 * PeerInfo and ControlPrune protocols * fix encodePrune * validated always, even explicit peers * prune by score (score is stub still) * add a way to pass parameters to gossip * standard setup fixes * take into account explicit direct peers in publish * add floodPublish logic * small fixes, publish still half broken * make sure to waitsub in sparse test * use var semantics to optimize table access * wip... lvalues don't work properly sadly... * big publish refactor, replenish and balance * fix internal tests * use g.peers for fanout (todo: don't include flood peers) * exclude non gossip from fanout * internal test fixes * fix flood tests * fix test's trypublish * test interop fixes * make sure to not remove peers from gossip table * restore old replenishFanout * cleanups * restore utility module import * restore trace vs debug in gossip * improve fanout replenish behavior further * triage publish nil peers (issue is on master too but just hidden behind a if/in) * getGossipPeers fixes * remove topics from pubsubpeer (was unused) * simplify rebalanceMesh (following spec) and make it finally reach D_high * better diagnostics * merge new pubsubpeer, copy 1.1 to new module * fix up merge * conditional enable gossip11 module * add back topics in peers, re-enable flood publish * add more heartbeat locking to prevent races * actually lock the heartbeat * minor fixes * with sugar * merge 1.0 * remove assertion in publish * fix multistream 1.1 multi proto * Fix merge oops * wip * fix gossip 11 upstream * gossipsub11 -> gossipsub * support interop testing * tests fixing * fix directchat build * control prune updates (pb) * wip parameters * gossip internal tests fixes * parameters wip * finishup with params * cleanups/wip * small sugar * grafted and pruned procs * wip updateScores * wip * fix logging issue * pubsubpeer, chronicles explicit override * fix internal gossip tests * wip * tables troubleshooting * score wip * score wip * fixes * fix test utils generateNodes * don't delete while iterating in score update * fix grafted defect * add a handleConnect in subscribeTopic * pruning improvements * wip * score fixes * post merge - builds gossip tests * further merge fixes * rebalance improvements and opportunistic grafting * fix test for now * restore explicit peering * implement peer exchange graft message * add an hard cap to PX * backoff time management * IWANT cap/budget * Adaptive gossip dissemination * outbound mesh quota, internal tests fixing * oversub prune score based, finish outbound quota * finishup with score and ihave budget * use go daemon 0.3.0 * import fixes * byScore cleanup score sorting * remove pointless scaling in `/` Duration operator * revert using libp2p org for daemon * interop fixes * fixes and cleanup * remove heartbeat assertion, minor debug fixes * logging improvements and cleaning up * (to revert) add some traces * add explicit topic to gossip rpcs * pubsub merge fixes and type fix in switch * Revert "(to revert) add some traces" This reverts commit 4663eaab6cc336c81cee50bc54025cf0b7bcbd99. * cleanup some now irrelevant todo * shuffle peers anyway as score might be disabled * add missing shuffle * old merge fix * more merge fixes * debug improvements * re-enable gossip internal tests * add gossip10 fallback (dormant but tested) * split gossipsub internal tests into 1.0 and 1.1 Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
2020-09-21 18:16:29 +09:00
export protocol
2019-09-09 20:15:52 -06:00
2019-09-11 20:10:38 -06:00
logScope:
topics = "pubsub"
2019-09-11 20:10:38 -06:00
declareGauge(libp2p_pubsub_peers, "pubsub peer instances")
declareGauge(libp2p_pubsub_topics, "pubsub subscribed topics")
declareCounter(libp2p_pubsub_validation_success, "pubsub successfully validated messages")
declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messages")
when defined(libp2p_expensive_metrics):
declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"])
2019-09-09 20:15:52 -06:00
type
TopicHandler* = proc(topic: string,
data: seq[byte]): Future[void] {.gcsafe.}
ValidatorHandler* = proc(topic: string,
message: Message): Future[bool] {.gcsafe, closure.}
TopicPair* = tuple[topic: string, handler: TopicHandler]
2019-09-11 20:10:38 -06:00
MsgIdProvider* =
proc(m: Message): string {.noSideEffect, raises: [Defect], nimcall, gcsafe.}
2019-09-09 20:15:52 -06:00
Topic* = object
Gossip one one (#240) * allow multiple codecs per protocol (without breaking things) * add 1.1 protocol to gossip * explicit peering part 1 * explicit peering part 2 * explicit peering part 3 * PeerInfo and ControlPrune protocols * fix encodePrune * validated always, even explicit peers * prune by score (score is stub still) * add a way to pass parameters to gossip * standard setup fixes * take into account explicit direct peers in publish * add floodPublish logic * small fixes, publish still half broken * make sure to waitsub in sparse test * use var semantics to optimize table access * wip... lvalues don't work properly sadly... * big publish refactor, replenish and balance * fix internal tests * use g.peers for fanout (todo: don't include flood peers) * exclude non gossip from fanout * internal test fixes * fix flood tests * fix test's trypublish * test interop fixes * make sure to not remove peers from gossip table * restore old replenishFanout * cleanups * restore utility module import * restore trace vs debug in gossip * improve fanout replenish behavior further * triage publish nil peers (issue is on master too but just hidden behind a if/in) * getGossipPeers fixes * remove topics from pubsubpeer (was unused) * simplify rebalanceMesh (following spec) and make it finally reach D_high * better diagnostics * merge new pubsubpeer, copy 1.1 to new module * fix up merge * conditional enable gossip11 module * add back topics in peers, re-enable flood publish * add more heartbeat locking to prevent races * actually lock the heartbeat * minor fixes * with sugar * merge 1.0 * remove assertion in publish * fix multistream 1.1 multi proto * Fix merge oops * wip * fix gossip 11 upstream * gossipsub11 -> gossipsub * support interop testing * tests fixing * fix directchat build * control prune updates (pb) * wip parameters * gossip internal tests fixes * parameters wip * finishup with params * cleanups/wip * small sugar * grafted and pruned procs * wip updateScores * wip * fix logging issue * pubsubpeer, chronicles explicit override * fix internal gossip tests * wip * tables troubleshooting * score wip * score wip * fixes * fix test utils generateNodes * don't delete while iterating in score update * fix grafted defect * add a handleConnect in subscribeTopic * pruning improvements * wip * score fixes * post merge - builds gossip tests * further merge fixes * rebalance improvements and opportunistic grafting * fix test for now * restore explicit peering * implement peer exchange graft message * add an hard cap to PX * backoff time management * IWANT cap/budget * Adaptive gossip dissemination * outbound mesh quota, internal tests fixing * oversub prune score based, finish outbound quota * finishup with score and ihave budget * use go daemon 0.3.0 * import fixes * byScore cleanup score sorting * remove pointless scaling in `/` Duration operator * revert using libp2p org for daemon * interop fixes * fixes and cleanup * remove heartbeat assertion, minor debug fixes * logging improvements and cleaning up * (to revert) add some traces * add explicit topic to gossip rpcs * pubsub merge fixes and type fix in switch * Revert "(to revert) add some traces" This reverts commit 4663eaab6cc336c81cee50bc54025cf0b7bcbd99. * cleanup some now irrelevant todo * shuffle peers anyway as score might be disabled * add missing shuffle * old merge fix * more merge fixes * debug improvements * re-enable gossip internal tests * add gossip10 fallback (dormant but tested) * split gossipsub internal tests into 1.0 and 1.1 Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
2020-09-21 18:16:29 +09:00
# make this a variant type if one day we have different Params structs
2019-09-09 20:15:52 -06:00
name*: string
handler*: seq[TopicHandler]
2019-09-09 20:15:52 -06:00
PubSub* = ref object of LPProtocol
switch*: Switch # the switch used to dial/connect to peers
peerInfo*: PeerInfo # this peer's info
topics*: Table[string, Topic] # local topics
peers*: Table[PeerID, PubSubPeer] ##\
## Peers that we are interested to gossip with (but not necessarily
## yet connected to)
triggerSelf*: bool # trigger own local handler on publish
verifySignature*: bool # enable signature verification
sign*: bool # enable message signing
validators*: Table[string, HashSet[ValidatorHandler]]
observers: ref seq[PubSubObserver] # ref as in smart_ptr
msgIdProvider*: MsgIdProvider # Turn message into message id (not nil)
msgSeqno*: uint64
2019-09-09 20:15:52 -06:00
method unsubscribePeer*(p: PubSub, peerId: PeerID) {.base.} =
## handle peer disconnects
##
trace "unsubscribing pubsub peer", peerId
p.peers.del(peerId)
libp2p_pubsub_peers.set(p.peers.len.int64)
proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) =
## Attempt to send `msg` to remote peer
##
trace "sending pubsub message to peer", peer, msg = shortLog(msg)
peer.send(msg)
proc broadcast*(
p: PubSub,
sendPeers: openArray[PubSubPeer],
msg: RPCMsg) = # raises: [Defect]
## Attempt to send `msg` to the given peers
trace "broadcasting messages to peers",
peers = sendPeers.len, msg = shortLog(msg)
for peer in sendPeers:
p.send(peer, msg)
PubSub (Gossip & Flood) Implementation (#36) This adds gossipsub and floodsub, as well as basic interop testing with the go libp2p daemon. * add close event * wip: gossipsub * splitting rpc message * making message handling more consistent * initial gossipsub implementation * feat: nim 1.0 cleanup * wip: gossipsub protobuf * adding encoding/decoding of gossipsub messages * add disconnect handler * add proper gossipsub msg handling * misc: cleanup for nim 1.0 * splitting floodsub and gossipsub tests * feat: add mesh rebalansing * test pubsub * add mesh rebalansing tests * testing mesh maintenance * finishing mcache implementatin * wip: commenting out broken tests * wip: don't run heartbeat for now * switchout debug for trace logging * testing gossip peer selection algorithm * test stream piping * more work around message amplification * get the peerid from message * use timed cache as backing store * allow setting timeout in constructor * several changes to improve performance * more through testing of msg amplification * prevent gc issues * allow piping to self and prevent deadlocks * improove floodsub * allow running hook on cache eviction * prevent race conditions * prevent race conditions and improove tests * use hashes as cache keys * removing useless file * don't create a new seq * re-enable pubsub tests * fix imports * reduce number of runs to speed up tests * break out control message processing * normalize sleeps between steps * implement proper transport filtering * initial interop testing * clean up floodsub publish logic * allow dialing without a protocol * adding multiple reads/writes * use protobuf varint in mplex * don't loose conn's peerInfo * initial interop pubsub tests * don't duplicate connections/peers * bring back interop tests * wip: interop * re-enable interop and daemon tests * add multiple read write tests from handlers * don't cleanup channel prematurely * use correct channel to send/receive msgs * adjust tests with latest changes * include interop tests * remove temp logging output * fix ci * use correct public key serialization * additional tests for pubsub interop
2019-12-05 20:16:18 -06:00
proc sendSubs*(p: PubSub,
peer: PubSubPeer,
topics: seq[string],
subscribe: bool) =
PubSub (Gossip & Flood) Implementation (#36) This adds gossipsub and floodsub, as well as basic interop testing with the go libp2p daemon. * add close event * wip: gossipsub * splitting rpc message * making message handling more consistent * initial gossipsub implementation * feat: nim 1.0 cleanup * wip: gossipsub protobuf * adding encoding/decoding of gossipsub messages * add disconnect handler * add proper gossipsub msg handling * misc: cleanup for nim 1.0 * splitting floodsub and gossipsub tests * feat: add mesh rebalansing * test pubsub * add mesh rebalansing tests * testing mesh maintenance * finishing mcache implementatin * wip: commenting out broken tests * wip: don't run heartbeat for now * switchout debug for trace logging * testing gossip peer selection algorithm * test stream piping * more work around message amplification * get the peerid from message * use timed cache as backing store * allow setting timeout in constructor * several changes to improve performance * more through testing of msg amplification * prevent gc issues * allow piping to self and prevent deadlocks * improove floodsub * allow running hook on cache eviction * prevent race conditions * prevent race conditions and improove tests * use hashes as cache keys * removing useless file * don't create a new seq * re-enable pubsub tests * fix imports * reduce number of runs to speed up tests * break out control message processing * normalize sleeps between steps * implement proper transport filtering * initial interop testing * clean up floodsub publish logic * allow dialing without a protocol * adding multiple reads/writes * use protobuf varint in mplex * don't loose conn's peerInfo * initial interop pubsub tests * don't duplicate connections/peers * bring back interop tests * wip: interop * re-enable interop and daemon tests * add multiple read write tests from handlers * don't cleanup channel prematurely * use correct channel to send/receive msgs * adjust tests with latest changes * include interop tests * remove temp logging output * fix ci * use correct public key serialization * additional tests for pubsub interop
2019-12-05 20:16:18 -06:00
## send subscriptions to remote peer
p.send(peer, RPCMsg.withSubs(topics, subscribe))
PubSub (Gossip & Flood) Implementation (#36) This adds gossipsub and floodsub, as well as basic interop testing with the go libp2p daemon. * add close event * wip: gossipsub * splitting rpc message * making message handling more consistent * initial gossipsub implementation * feat: nim 1.0 cleanup * wip: gossipsub protobuf * adding encoding/decoding of gossipsub messages * add disconnect handler * add proper gossipsub msg handling * misc: cleanup for nim 1.0 * splitting floodsub and gossipsub tests * feat: add mesh rebalansing * test pubsub * add mesh rebalansing tests * testing mesh maintenance * finishing mcache implementatin * wip: commenting out broken tests * wip: don't run heartbeat for now * switchout debug for trace logging * testing gossip peer selection algorithm * test stream piping * more work around message amplification * get the peerid from message * use timed cache as backing store * allow setting timeout in constructor * several changes to improve performance * more through testing of msg amplification * prevent gc issues * allow piping to self and prevent deadlocks * improove floodsub * allow running hook on cache eviction * prevent race conditions * prevent race conditions and improove tests * use hashes as cache keys * removing useless file * don't create a new seq * re-enable pubsub tests * fix imports * reduce number of runs to speed up tests * break out control message processing * normalize sleeps between steps * implement proper transport filtering * initial interop testing * clean up floodsub publish logic * allow dialing without a protocol * adding multiple reads/writes * use protobuf varint in mplex * don't loose conn's peerInfo * initial interop pubsub tests * don't duplicate connections/peers * bring back interop tests * wip: interop * re-enable interop and daemon tests * add multiple read write tests from handlers * don't cleanup channel prematurely * use correct channel to send/receive msgs * adjust tests with latest changes * include interop tests * remove temp logging output * fix ci * use correct public key serialization * additional tests for pubsub interop
2019-12-05 20:16:18 -06:00
method subscribeTopic*(p: PubSub,
topic: string,
subscribe: bool,
peer: PubSubPeer) {.base.} =
# called when remote peer subscribes to a topic
discard
PubSub (Gossip & Flood) Implementation (#36) This adds gossipsub and floodsub, as well as basic interop testing with the go libp2p daemon. * add close event * wip: gossipsub * splitting rpc message * making message handling more consistent * initial gossipsub implementation * feat: nim 1.0 cleanup * wip: gossipsub protobuf * adding encoding/decoding of gossipsub messages * add disconnect handler * add proper gossipsub msg handling * misc: cleanup for nim 1.0 * splitting floodsub and gossipsub tests * feat: add mesh rebalansing * test pubsub * add mesh rebalansing tests * testing mesh maintenance * finishing mcache implementatin * wip: commenting out broken tests * wip: don't run heartbeat for now * switchout debug for trace logging * testing gossip peer selection algorithm * test stream piping * more work around message amplification * get the peerid from message * use timed cache as backing store * allow setting timeout in constructor * several changes to improve performance * more through testing of msg amplification * prevent gc issues * allow piping to self and prevent deadlocks * improove floodsub * allow running hook on cache eviction * prevent race conditions * prevent race conditions and improove tests * use hashes as cache keys * removing useless file * don't create a new seq * re-enable pubsub tests * fix imports * reduce number of runs to speed up tests * break out control message processing * normalize sleeps between steps * implement proper transport filtering * initial interop testing * clean up floodsub publish logic * allow dialing without a protocol * adding multiple reads/writes * use protobuf varint in mplex * don't loose conn's peerInfo * initial interop pubsub tests * don't duplicate connections/peers * bring back interop tests * wip: interop * re-enable interop and daemon tests * add multiple read write tests from handlers * don't cleanup channel prematurely * use correct channel to send/receive msgs * adjust tests with latest changes * include interop tests * remove temp logging output * fix ci * use correct public key serialization * additional tests for pubsub interop
2019-12-05 20:16:18 -06:00
method rpcHandler*(p: PubSub,
peer: PubSubPeer,
rpcMsg: RPCMsg) {.async, base.} =
PubSub (Gossip & Flood) Implementation (#36) This adds gossipsub and floodsub, as well as basic interop testing with the go libp2p daemon. * add close event * wip: gossipsub * splitting rpc message * making message handling more consistent * initial gossipsub implementation * feat: nim 1.0 cleanup * wip: gossipsub protobuf * adding encoding/decoding of gossipsub messages * add disconnect handler * add proper gossipsub msg handling * misc: cleanup for nim 1.0 * splitting floodsub and gossipsub tests * feat: add mesh rebalansing * test pubsub * add mesh rebalansing tests * testing mesh maintenance * finishing mcache implementatin * wip: commenting out broken tests * wip: don't run heartbeat for now * switchout debug for trace logging * testing gossip peer selection algorithm * test stream piping * more work around message amplification * get the peerid from message * use timed cache as backing store * allow setting timeout in constructor * several changes to improve performance * more through testing of msg amplification * prevent gc issues * allow piping to self and prevent deadlocks * improove floodsub * allow running hook on cache eviction * prevent race conditions * prevent race conditions and improove tests * use hashes as cache keys * removing useless file * don't create a new seq * re-enable pubsub tests * fix imports * reduce number of runs to speed up tests * break out control message processing * normalize sleeps between steps * implement proper transport filtering * initial interop testing * clean up floodsub publish logic * allow dialing without a protocol * adding multiple reads/writes * use protobuf varint in mplex * don't loose conn's peerInfo * initial interop pubsub tests * don't duplicate connections/peers * bring back interop tests * wip: interop * re-enable interop and daemon tests * add multiple read write tests from handlers * don't cleanup channel prematurely * use correct channel to send/receive msgs * adjust tests with latest changes * include interop tests * remove temp logging output * fix ci * use correct public key serialization * additional tests for pubsub interop
2019-12-05 20:16:18 -06:00
## handle rpc messages
trace "processing RPC message", msg = rpcMsg.shortLog, peer
for s in rpcMsg.subscriptions: # subscribe/unsubscribe the peer for each topic
trace "about to subscribe to topic", topicId = s.topic, peer
p.subscribeTopic(s.topic, s.subscribe, peer)
Gossip one one (#240) * allow multiple codecs per protocol (without breaking things) * add 1.1 protocol to gossip * explicit peering part 1 * explicit peering part 2 * explicit peering part 3 * PeerInfo and ControlPrune protocols * fix encodePrune * validated always, even explicit peers * prune by score (score is stub still) * add a way to pass parameters to gossip * standard setup fixes * take into account explicit direct peers in publish * add floodPublish logic * small fixes, publish still half broken * make sure to waitsub in sparse test * use var semantics to optimize table access * wip... lvalues don't work properly sadly... * big publish refactor, replenish and balance * fix internal tests * use g.peers for fanout (todo: don't include flood peers) * exclude non gossip from fanout * internal test fixes * fix flood tests * fix test's trypublish * test interop fixes * make sure to not remove peers from gossip table * restore old replenishFanout * cleanups * restore utility module import * restore trace vs debug in gossip * improve fanout replenish behavior further * triage publish nil peers (issue is on master too but just hidden behind a if/in) * getGossipPeers fixes * remove topics from pubsubpeer (was unused) * simplify rebalanceMesh (following spec) and make it finally reach D_high * better diagnostics * merge new pubsubpeer, copy 1.1 to new module * fix up merge * conditional enable gossip11 module * add back topics in peers, re-enable flood publish * add more heartbeat locking to prevent races * actually lock the heartbeat * minor fixes * with sugar * merge 1.0 * remove assertion in publish * fix multistream 1.1 multi proto * Fix merge oops * wip * fix gossip 11 upstream * gossipsub11 -> gossipsub * support interop testing * tests fixing * fix directchat build * control prune updates (pb) * wip parameters * gossip internal tests fixes * parameters wip * finishup with params * cleanups/wip * small sugar * grafted and pruned procs * wip updateScores * wip * fix logging issue * pubsubpeer, chronicles explicit override * fix internal gossip tests * wip * tables troubleshooting * score wip * score wip * fixes * fix test utils generateNodes * don't delete while iterating in score update * fix grafted defect * add a handleConnect in subscribeTopic * pruning improvements * wip * score fixes * post merge - builds gossip tests * further merge fixes * rebalance improvements and opportunistic grafting * fix test for now * restore explicit peering * implement peer exchange graft message * add an hard cap to PX * backoff time management * IWANT cap/budget * Adaptive gossip dissemination * outbound mesh quota, internal tests fixing * oversub prune score based, finish outbound quota * finishup with score and ihave budget * use go daemon 0.3.0 * import fixes * byScore cleanup score sorting * remove pointless scaling in `/` Duration operator * revert using libp2p org for daemon * interop fixes * fixes and cleanup * remove heartbeat assertion, minor debug fixes * logging improvements and cleaning up * (to revert) add some traces * add explicit topic to gossip rpcs * pubsub merge fixes and type fix in switch * Revert "(to revert) add some traces" This reverts commit 4663eaab6cc336c81cee50bc54025cf0b7bcbd99. * cleanup some now irrelevant todo * shuffle peers anyway as score might be disabled * add missing shuffle * old merge fix * more merge fixes * debug improvements * re-enable gossip internal tests * add gossip10 fallback (dormant but tested) * split gossipsub internal tests into 1.0 and 1.1 Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
2020-09-21 18:16:29 +09:00
method onNewPeer(p: PubSub, peer: PubSubPeer) {.base.} = discard
method onPubSubPeerEvent*(p: PubSub, peer: PubsubPeer, event: PubsubPeerEvent) {.base, gcsafe.} =
# Peer event is raised for the send connection in particular
case event.kind
of PubSubPeerEventKind.Connected:
p.sendSubs(peer, toSeq(p.topics.keys), true)
of PubSubPeerEventKind.Disconnected:
discard
proc getOrCreatePeer*(
p: PubSub,
peer: PeerID,
Gossip one one (#240) * allow multiple codecs per protocol (without breaking things) * add 1.1 protocol to gossip * explicit peering part 1 * explicit peering part 2 * explicit peering part 3 * PeerInfo and ControlPrune protocols * fix encodePrune * validated always, even explicit peers * prune by score (score is stub still) * add a way to pass parameters to gossip * standard setup fixes * take into account explicit direct peers in publish * add floodPublish logic * small fixes, publish still half broken * make sure to waitsub in sparse test * use var semantics to optimize table access * wip... lvalues don't work properly sadly... * big publish refactor, replenish and balance * fix internal tests * use g.peers for fanout (todo: don't include flood peers) * exclude non gossip from fanout * internal test fixes * fix flood tests * fix test's trypublish * test interop fixes * make sure to not remove peers from gossip table * restore old replenishFanout * cleanups * restore utility module import * restore trace vs debug in gossip * improve fanout replenish behavior further * triage publish nil peers (issue is on master too but just hidden behind a if/in) * getGossipPeers fixes * remove topics from pubsubpeer (was unused) * simplify rebalanceMesh (following spec) and make it finally reach D_high * better diagnostics * merge new pubsubpeer, copy 1.1 to new module * fix up merge * conditional enable gossip11 module * add back topics in peers, re-enable flood publish * add more heartbeat locking to prevent races * actually lock the heartbeat * minor fixes * with sugar * merge 1.0 * remove assertion in publish * fix multistream 1.1 multi proto * Fix merge oops * wip * fix gossip 11 upstream * gossipsub11 -> gossipsub * support interop testing * tests fixing * fix directchat build * control prune updates (pb) * wip parameters * gossip internal tests fixes * parameters wip * finishup with params * cleanups/wip * small sugar * grafted and pruned procs * wip updateScores * wip * fix logging issue * pubsubpeer, chronicles explicit override * fix internal gossip tests * wip * tables troubleshooting * score wip * score wip * fixes * fix test utils generateNodes * don't delete while iterating in score update * fix grafted defect * add a handleConnect in subscribeTopic * pruning improvements * wip * score fixes * post merge - builds gossip tests * further merge fixes * rebalance improvements and opportunistic grafting * fix test for now * restore explicit peering * implement peer exchange graft message * add an hard cap to PX * backoff time management * IWANT cap/budget * Adaptive gossip dissemination * outbound mesh quota, internal tests fixing * oversub prune score based, finish outbound quota * finishup with score and ihave budget * use go daemon 0.3.0 * import fixes * byScore cleanup score sorting * remove pointless scaling in `/` Duration operator * revert using libp2p org for daemon * interop fixes * fixes and cleanup * remove heartbeat assertion, minor debug fixes * logging improvements and cleaning up * (to revert) add some traces * add explicit topic to gossip rpcs * pubsub merge fixes and type fix in switch * Revert "(to revert) add some traces" This reverts commit 4663eaab6cc336c81cee50bc54025cf0b7bcbd99. * cleanup some now irrelevant todo * shuffle peers anyway as score might be disabled * add missing shuffle * old merge fix * more merge fixes * debug improvements * re-enable gossip internal tests * add gossip10 fallback (dormant but tested) * split gossipsub internal tests into 1.0 and 1.1 Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
2020-09-21 18:16:29 +09:00
protos: seq[string]): PubSubPeer =
if peer in p.peers:
return p.peers[peer]
PubSub (Gossip & Flood) Implementation (#36) This adds gossipsub and floodsub, as well as basic interop testing with the go libp2p daemon. * add close event * wip: gossipsub * splitting rpc message * making message handling more consistent * initial gossipsub implementation * feat: nim 1.0 cleanup * wip: gossipsub protobuf * adding encoding/decoding of gossipsub messages * add disconnect handler * add proper gossipsub msg handling * misc: cleanup for nim 1.0 * splitting floodsub and gossipsub tests * feat: add mesh rebalansing * test pubsub * add mesh rebalansing tests * testing mesh maintenance * finishing mcache implementatin * wip: commenting out broken tests * wip: don't run heartbeat for now * switchout debug for trace logging * testing gossip peer selection algorithm * test stream piping * more work around message amplification * get the peerid from message * use timed cache as backing store * allow setting timeout in constructor * several changes to improve performance * more through testing of msg amplification * prevent gc issues * allow piping to self and prevent deadlocks * improove floodsub * allow running hook on cache eviction * prevent race conditions * prevent race conditions and improove tests * use hashes as cache keys * removing useless file * don't create a new seq * re-enable pubsub tests * fix imports * reduce number of runs to speed up tests * break out control message processing * normalize sleeps between steps * implement proper transport filtering * initial interop testing * clean up floodsub publish logic * allow dialing without a protocol * adding multiple reads/writes * use protobuf varint in mplex * don't loose conn's peerInfo * initial interop pubsub tests * don't duplicate connections/peers * bring back interop tests * wip: interop * re-enable interop and daemon tests * add multiple read write tests from handlers * don't cleanup channel prematurely * use correct channel to send/receive msgs * adjust tests with latest changes * include interop tests * remove temp logging output * fix ci * use correct public key serialization * additional tests for pubsub interop
2019-12-05 20:16:18 -06:00
proc getConn(): Future[Connection] =
p.switch.dial(peer, protos)
proc onEvent(peer: PubsubPeer, event: PubsubPeerEvent) {.gcsafe.} =
p.onPubSubPeerEvent(peer, event)
PubSub (Gossip & Flood) Implementation (#36) This adds gossipsub and floodsub, as well as basic interop testing with the go libp2p daemon. * add close event * wip: gossipsub * splitting rpc message * making message handling more consistent * initial gossipsub implementation * feat: nim 1.0 cleanup * wip: gossipsub protobuf * adding encoding/decoding of gossipsub messages * add disconnect handler * add proper gossipsub msg handling * misc: cleanup for nim 1.0 * splitting floodsub and gossipsub tests * feat: add mesh rebalansing * test pubsub * add mesh rebalansing tests * testing mesh maintenance * finishing mcache implementatin * wip: commenting out broken tests * wip: don't run heartbeat for now * switchout debug for trace logging * testing gossip peer selection algorithm * test stream piping * more work around message amplification * get the peerid from message * use timed cache as backing store * allow setting timeout in constructor * several changes to improve performance * more through testing of msg amplification * prevent gc issues * allow piping to self and prevent deadlocks * improove floodsub * allow running hook on cache eviction * prevent race conditions * prevent race conditions and improove tests * use hashes as cache keys * removing useless file * don't create a new seq * re-enable pubsub tests * fix imports * reduce number of runs to speed up tests * break out control message processing * normalize sleeps between steps * implement proper transport filtering * initial interop testing * clean up floodsub publish logic * allow dialing without a protocol * adding multiple reads/writes * use protobuf varint in mplex * don't loose conn's peerInfo * initial interop pubsub tests * don't duplicate connections/peers * bring back interop tests * wip: interop * re-enable interop and daemon tests * add multiple read write tests from handlers * don't cleanup channel prematurely * use correct channel to send/receive msgs * adjust tests with latest changes * include interop tests * remove temp logging output * fix ci * use correct public key serialization * additional tests for pubsub interop
2019-12-05 20:16:18 -06:00
# create new pubsub peer
let pubSubPeer = newPubSubPeer(peer, getConn, onEvent, protos[0])
trace "created new pubsub peer", peer
PubSub (Gossip & Flood) Implementation (#36) This adds gossipsub and floodsub, as well as basic interop testing with the go libp2p daemon. * add close event * wip: gossipsub * splitting rpc message * making message handling more consistent * initial gossipsub implementation * feat: nim 1.0 cleanup * wip: gossipsub protobuf * adding encoding/decoding of gossipsub messages * add disconnect handler * add proper gossipsub msg handling * misc: cleanup for nim 1.0 * splitting floodsub and gossipsub tests * feat: add mesh rebalansing * test pubsub * add mesh rebalansing tests * testing mesh maintenance * finishing mcache implementatin * wip: commenting out broken tests * wip: don't run heartbeat for now * switchout debug for trace logging * testing gossip peer selection algorithm * test stream piping * more work around message amplification * get the peerid from message * use timed cache as backing store * allow setting timeout in constructor * several changes to improve performance * more through testing of msg amplification * prevent gc issues * allow piping to self and prevent deadlocks * improove floodsub * allow running hook on cache eviction * prevent race conditions * prevent race conditions and improove tests * use hashes as cache keys * removing useless file * don't create a new seq * re-enable pubsub tests * fix imports * reduce number of runs to speed up tests * break out control message processing * normalize sleeps between steps * implement proper transport filtering * initial interop testing * clean up floodsub publish logic * allow dialing without a protocol * adding multiple reads/writes * use protobuf varint in mplex * don't loose conn's peerInfo * initial interop pubsub tests * don't duplicate connections/peers * bring back interop tests * wip: interop * re-enable interop and daemon tests * add multiple read write tests from handlers * don't cleanup channel prematurely * use correct channel to send/receive msgs * adjust tests with latest changes * include interop tests * remove temp logging output * fix ci * use correct public key serialization * additional tests for pubsub interop
2019-12-05 20:16:18 -06:00
p.peers[peer] = pubSubPeer
pubSubPeer.observers = p.observers
2020-07-13 16:15:27 +02:00
Gossip one one (#240) * allow multiple codecs per protocol (without breaking things) * add 1.1 protocol to gossip * explicit peering part 1 * explicit peering part 2 * explicit peering part 3 * PeerInfo and ControlPrune protocols * fix encodePrune * validated always, even explicit peers * prune by score (score is stub still) * add a way to pass parameters to gossip * standard setup fixes * take into account explicit direct peers in publish * add floodPublish logic * small fixes, publish still half broken * make sure to waitsub in sparse test * use var semantics to optimize table access * wip... lvalues don't work properly sadly... * big publish refactor, replenish and balance * fix internal tests * use g.peers for fanout (todo: don't include flood peers) * exclude non gossip from fanout * internal test fixes * fix flood tests * fix test's trypublish * test interop fixes * make sure to not remove peers from gossip table * restore old replenishFanout * cleanups * restore utility module import * restore trace vs debug in gossip * improve fanout replenish behavior further * triage publish nil peers (issue is on master too but just hidden behind a if/in) * getGossipPeers fixes * remove topics from pubsubpeer (was unused) * simplify rebalanceMesh (following spec) and make it finally reach D_high * better diagnostics * merge new pubsubpeer, copy 1.1 to new module * fix up merge * conditional enable gossip11 module * add back topics in peers, re-enable flood publish * add more heartbeat locking to prevent races * actually lock the heartbeat * minor fixes * with sugar * merge 1.0 * remove assertion in publish * fix multistream 1.1 multi proto * Fix merge oops * wip * fix gossip 11 upstream * gossipsub11 -> gossipsub * support interop testing * tests fixing * fix directchat build * control prune updates (pb) * wip parameters * gossip internal tests fixes * parameters wip * finishup with params * cleanups/wip * small sugar * grafted and pruned procs * wip updateScores * wip * fix logging issue * pubsubpeer, chronicles explicit override * fix internal gossip tests * wip * tables troubleshooting * score wip * score wip * fixes * fix test utils generateNodes * don't delete while iterating in score update * fix grafted defect * add a handleConnect in subscribeTopic * pruning improvements * wip * score fixes * post merge - builds gossip tests * further merge fixes * rebalance improvements and opportunistic grafting * fix test for now * restore explicit peering * implement peer exchange graft message * add an hard cap to PX * backoff time management * IWANT cap/budget * Adaptive gossip dissemination * outbound mesh quota, internal tests fixing * oversub prune score based, finish outbound quota * finishup with score and ihave budget * use go daemon 0.3.0 * import fixes * byScore cleanup score sorting * remove pointless scaling in `/` Duration operator * revert using libp2p org for daemon * interop fixes * fixes and cleanup * remove heartbeat assertion, minor debug fixes * logging improvements and cleaning up * (to revert) add some traces * add explicit topic to gossip rpcs * pubsub merge fixes and type fix in switch * Revert "(to revert) add some traces" This reverts commit 4663eaab6cc336c81cee50bc54025cf0b7bcbd99. * cleanup some now irrelevant todo * shuffle peers anyway as score might be disabled * add missing shuffle * old merge fix * more merge fixes * debug improvements * re-enable gossip internal tests * add gossip10 fallback (dormant but tested) * split gossipsub internal tests into 1.0 and 1.1 Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
2020-09-21 18:16:29 +09:00
onNewPeer(p, pubSubPeer)
# metrics
2020-07-13 16:15:27 +02:00
libp2p_pubsub_peers.set(p.peers.len.int64)
pubsubPeer.connect()
return pubSubPeer
PubSub (Gossip & Flood) Implementation (#36) This adds gossipsub and floodsub, as well as basic interop testing with the go libp2p daemon. * add close event * wip: gossipsub * splitting rpc message * making message handling more consistent * initial gossipsub implementation * feat: nim 1.0 cleanup * wip: gossipsub protobuf * adding encoding/decoding of gossipsub messages * add disconnect handler * add proper gossipsub msg handling * misc: cleanup for nim 1.0 * splitting floodsub and gossipsub tests * feat: add mesh rebalansing * test pubsub * add mesh rebalansing tests * testing mesh maintenance * finishing mcache implementatin * wip: commenting out broken tests * wip: don't run heartbeat for now * switchout debug for trace logging * testing gossip peer selection algorithm * test stream piping * more work around message amplification * get the peerid from message * use timed cache as backing store * allow setting timeout in constructor * several changes to improve performance * more through testing of msg amplification * prevent gc issues * allow piping to self and prevent deadlocks * improove floodsub * allow running hook on cache eviction * prevent race conditions * prevent race conditions and improove tests * use hashes as cache keys * removing useless file * don't create a new seq * re-enable pubsub tests * fix imports * reduce number of runs to speed up tests * break out control message processing * normalize sleeps between steps * implement proper transport filtering * initial interop testing * clean up floodsub publish logic * allow dialing without a protocol * adding multiple reads/writes * use protobuf varint in mplex * don't loose conn's peerInfo * initial interop pubsub tests * don't duplicate connections/peers * bring back interop tests * wip: interop * re-enable interop and daemon tests * add multiple read write tests from handlers * don't cleanup channel prematurely * use correct channel to send/receive msgs * adjust tests with latest changes * include interop tests * remove temp logging output * fix ci * use correct public key serialization * additional tests for pubsub interop
2019-12-05 20:16:18 -06:00
proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] {.async.} =
if topic notin p.topics: return # Not subscribed
for h in p.topics[topic].handler:
trace "triggering handler", topicID = topic
try:
await h(topic, data)
except CancelledError as exc:
raise exc
except CatchableError as exc:
# Handlers should never raise exceptions
warn "Error in topic handler", msg = exc.msg
PubSub (Gossip & Flood) Implementation (#36) This adds gossipsub and floodsub, as well as basic interop testing with the go libp2p daemon. * add close event * wip: gossipsub * splitting rpc message * making message handling more consistent * initial gossipsub implementation * feat: nim 1.0 cleanup * wip: gossipsub protobuf * adding encoding/decoding of gossipsub messages * add disconnect handler * add proper gossipsub msg handling * misc: cleanup for nim 1.0 * splitting floodsub and gossipsub tests * feat: add mesh rebalansing * test pubsub * add mesh rebalansing tests * testing mesh maintenance * finishing mcache implementatin * wip: commenting out broken tests * wip: don't run heartbeat for now * switchout debug for trace logging * testing gossip peer selection algorithm * test stream piping * more work around message amplification * get the peerid from message * use timed cache as backing store * allow setting timeout in constructor * several changes to improve performance * more through testing of msg amplification * prevent gc issues * allow piping to self and prevent deadlocks * improove floodsub * allow running hook on cache eviction * prevent race conditions * prevent race conditions and improove tests * use hashes as cache keys * removing useless file * don't create a new seq * re-enable pubsub tests * fix imports * reduce number of runs to speed up tests * break out control message processing * normalize sleeps between steps * implement proper transport filtering * initial interop testing * clean up floodsub publish logic * allow dialing without a protocol * adding multiple reads/writes * use protobuf varint in mplex * don't loose conn's peerInfo * initial interop pubsub tests * don't duplicate connections/peers * bring back interop tests * wip: interop * re-enable interop and daemon tests * add multiple read write tests from handlers * don't cleanup channel prematurely * use correct channel to send/receive msgs * adjust tests with latest changes * include interop tests * remove temp logging output * fix ci * use correct public key serialization * additional tests for pubsub interop
2019-12-05 20:16:18 -06:00
method handleConn*(p: PubSub,
conn: Connection,
proto: string) {.base, async.} =
PubSub (Gossip & Flood) Implementation (#36) This adds gossipsub and floodsub, as well as basic interop testing with the go libp2p daemon. * add close event * wip: gossipsub * splitting rpc message * making message handling more consistent * initial gossipsub implementation * feat: nim 1.0 cleanup * wip: gossipsub protobuf * adding encoding/decoding of gossipsub messages * add disconnect handler * add proper gossipsub msg handling * misc: cleanup for nim 1.0 * splitting floodsub and gossipsub tests * feat: add mesh rebalansing * test pubsub * add mesh rebalansing tests * testing mesh maintenance * finishing mcache implementatin * wip: commenting out broken tests * wip: don't run heartbeat for now * switchout debug for trace logging * testing gossip peer selection algorithm * test stream piping * more work around message amplification * get the peerid from message * use timed cache as backing store * allow setting timeout in constructor * several changes to improve performance * more through testing of msg amplification * prevent gc issues * allow piping to self and prevent deadlocks * improove floodsub * allow running hook on cache eviction * prevent race conditions * prevent race conditions and improove tests * use hashes as cache keys * removing useless file * don't create a new seq * re-enable pubsub tests * fix imports * reduce number of runs to speed up tests * break out control message processing * normalize sleeps between steps * implement proper transport filtering * initial interop testing * clean up floodsub publish logic * allow dialing without a protocol * adding multiple reads/writes * use protobuf varint in mplex * don't loose conn's peerInfo * initial interop pubsub tests * don't duplicate connections/peers * bring back interop tests * wip: interop * re-enable interop and daemon tests * add multiple read write tests from handlers * don't cleanup channel prematurely * use correct channel to send/receive msgs * adjust tests with latest changes * include interop tests * remove temp logging output * fix ci * use correct public key serialization * additional tests for pubsub interop
2019-12-05 20:16:18 -06:00
## handle incoming connections
##
## this proc will:
## 1) register a new PubSubPeer for the connection
## 2) register a handler with the peer;
## this handler gets called on every rpc message
## that the peer receives
## 3) ask the peer to subscribe us to every topic
## that we're interested in
##
if isNil(conn.peerInfo):
trace "no valid PeerId for peer"
await conn.close()
return
2020-05-21 09:01:36 -06:00
proc handler(peer: PubSubPeer, msg: RPCMsg): Future[void] =
# call pubsub rpc handler
p.rpcHandler(peer, msg)
2020-05-21 09:01:36 -06:00
Gossip one one (#240) * allow multiple codecs per protocol (without breaking things) * add 1.1 protocol to gossip * explicit peering part 1 * explicit peering part 2 * explicit peering part 3 * PeerInfo and ControlPrune protocols * fix encodePrune * validated always, even explicit peers * prune by score (score is stub still) * add a way to pass parameters to gossip * standard setup fixes * take into account explicit direct peers in publish * add floodPublish logic * small fixes, publish still half broken * make sure to waitsub in sparse test * use var semantics to optimize table access * wip... lvalues don't work properly sadly... * big publish refactor, replenish and balance * fix internal tests * use g.peers for fanout (todo: don't include flood peers) * exclude non gossip from fanout * internal test fixes * fix flood tests * fix test's trypublish * test interop fixes * make sure to not remove peers from gossip table * restore old replenishFanout * cleanups * restore utility module import * restore trace vs debug in gossip * improve fanout replenish behavior further * triage publish nil peers (issue is on master too but just hidden behind a if/in) * getGossipPeers fixes * remove topics from pubsubpeer (was unused) * simplify rebalanceMesh (following spec) and make it finally reach D_high * better diagnostics * merge new pubsubpeer, copy 1.1 to new module * fix up merge * conditional enable gossip11 module * add back topics in peers, re-enable flood publish * add more heartbeat locking to prevent races * actually lock the heartbeat * minor fixes * with sugar * merge 1.0 * remove assertion in publish * fix multistream 1.1 multi proto * Fix merge oops * wip * fix gossip 11 upstream * gossipsub11 -> gossipsub * support interop testing * tests fixing * fix directchat build * control prune updates (pb) * wip parameters * gossip internal tests fixes * parameters wip * finishup with params * cleanups/wip * small sugar * grafted and pruned procs * wip updateScores * wip * fix logging issue * pubsubpeer, chronicles explicit override * fix internal gossip tests * wip * tables troubleshooting * score wip * score wip * fixes * fix test utils generateNodes * don't delete while iterating in score update * fix grafted defect * add a handleConnect in subscribeTopic * pruning improvements * wip * score fixes * post merge - builds gossip tests * further merge fixes * rebalance improvements and opportunistic grafting * fix test for now * restore explicit peering * implement peer exchange graft message * add an hard cap to PX * backoff time management * IWANT cap/budget * Adaptive gossip dissemination * outbound mesh quota, internal tests fixing * oversub prune score based, finish outbound quota * finishup with score and ihave budget * use go daemon 0.3.0 * import fixes * byScore cleanup score sorting * remove pointless scaling in `/` Duration operator * revert using libp2p org for daemon * interop fixes * fixes and cleanup * remove heartbeat assertion, minor debug fixes * logging improvements and cleaning up * (to revert) add some traces * add explicit topic to gossip rpcs * pubsub merge fixes and type fix in switch * Revert "(to revert) add some traces" This reverts commit 4663eaab6cc336c81cee50bc54025cf0b7bcbd99. * cleanup some now irrelevant todo * shuffle peers anyway as score might be disabled * add missing shuffle * old merge fix * more merge fixes * debug improvements * re-enable gossip internal tests * add gossip10 fallback (dormant but tested) * split gossipsub internal tests into 1.0 and 1.1 Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
2020-09-21 18:16:29 +09:00
let peer = p.getOrCreatePeer(conn.peerInfo.peerId, @[proto])
try:
2020-05-21 09:01:36 -06:00
peer.handler = handler
await peer.handle(conn) # spawn peer read loop
trace "pubsub peer handler ended", conn
except CancelledError as exc:
raise exc
2020-05-21 09:01:36 -06:00
except CatchableError as exc:
trace "exception ocurred in pubsub handle", exc = exc.msg, conn
finally:
await conn.close()
2020-04-07 09:49:43 -06:00
method subscribePeer*(p: PubSub, peer: PeerID) {.base.} =
## subscribe to remote peer to receive/send pubsub
## messages
##
Gossip one one (#240) * allow multiple codecs per protocol (without breaking things) * add 1.1 protocol to gossip * explicit peering part 1 * explicit peering part 2 * explicit peering part 3 * PeerInfo and ControlPrune protocols * fix encodePrune * validated always, even explicit peers * prune by score (score is stub still) * add a way to pass parameters to gossip * standard setup fixes * take into account explicit direct peers in publish * add floodPublish logic * small fixes, publish still half broken * make sure to waitsub in sparse test * use var semantics to optimize table access * wip... lvalues don't work properly sadly... * big publish refactor, replenish and balance * fix internal tests * use g.peers for fanout (todo: don't include flood peers) * exclude non gossip from fanout * internal test fixes * fix flood tests * fix test's trypublish * test interop fixes * make sure to not remove peers from gossip table * restore old replenishFanout * cleanups * restore utility module import * restore trace vs debug in gossip * improve fanout replenish behavior further * triage publish nil peers (issue is on master too but just hidden behind a if/in) * getGossipPeers fixes * remove topics from pubsubpeer (was unused) * simplify rebalanceMesh (following spec) and make it finally reach D_high * better diagnostics * merge new pubsubpeer, copy 1.1 to new module * fix up merge * conditional enable gossip11 module * add back topics in peers, re-enable flood publish * add more heartbeat locking to prevent races * actually lock the heartbeat * minor fixes * with sugar * merge 1.0 * remove assertion in publish * fix multistream 1.1 multi proto * Fix merge oops * wip * fix gossip 11 upstream * gossipsub11 -> gossipsub * support interop testing * tests fixing * fix directchat build * control prune updates (pb) * wip parameters * gossip internal tests fixes * parameters wip * finishup with params * cleanups/wip * small sugar * grafted and pruned procs * wip updateScores * wip * fix logging issue * pubsubpeer, chronicles explicit override * fix internal gossip tests * wip * tables troubleshooting * score wip * score wip * fixes * fix test utils generateNodes * don't delete while iterating in score update * fix grafted defect * add a handleConnect in subscribeTopic * pruning improvements * wip * score fixes * post merge - builds gossip tests * further merge fixes * rebalance improvements and opportunistic grafting * fix test for now * restore explicit peering * implement peer exchange graft message * add an hard cap to PX * backoff time management * IWANT cap/budget * Adaptive gossip dissemination * outbound mesh quota, internal tests fixing * oversub prune score based, finish outbound quota * finishup with score and ihave budget * use go daemon 0.3.0 * import fixes * byScore cleanup score sorting * remove pointless scaling in `/` Duration operator * revert using libp2p org for daemon * interop fixes * fixes and cleanup * remove heartbeat assertion, minor debug fixes * logging improvements and cleaning up * (to revert) add some traces * add explicit topic to gossip rpcs * pubsub merge fixes and type fix in switch * Revert "(to revert) add some traces" This reverts commit 4663eaab6cc336c81cee50bc54025cf0b7bcbd99. * cleanup some now irrelevant todo * shuffle peers anyway as score might be disabled * add missing shuffle * old merge fix * more merge fixes * debug improvements * re-enable gossip internal tests * add gossip10 fallback (dormant but tested) * split gossipsub internal tests into 1.0 and 1.1 Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
2020-09-21 18:16:29 +09:00
let peer = p.getOrCreatePeer(peer, p.codecs)
peer.outbound = true # flag as outbound
PubSub (Gossip & Flood) Implementation (#36) This adds gossipsub and floodsub, as well as basic interop testing with the go libp2p daemon. * add close event * wip: gossipsub * splitting rpc message * making message handling more consistent * initial gossipsub implementation * feat: nim 1.0 cleanup * wip: gossipsub protobuf * adding encoding/decoding of gossipsub messages * add disconnect handler * add proper gossipsub msg handling * misc: cleanup for nim 1.0 * splitting floodsub and gossipsub tests * feat: add mesh rebalansing * test pubsub * add mesh rebalansing tests * testing mesh maintenance * finishing mcache implementatin * wip: commenting out broken tests * wip: don't run heartbeat for now * switchout debug for trace logging * testing gossip peer selection algorithm * test stream piping * more work around message amplification * get the peerid from message * use timed cache as backing store * allow setting timeout in constructor * several changes to improve performance * more through testing of msg amplification * prevent gc issues * allow piping to self and prevent deadlocks * improove floodsub * allow running hook on cache eviction * prevent race conditions * prevent race conditions and improove tests * use hashes as cache keys * removing useless file * don't create a new seq * re-enable pubsub tests * fix imports * reduce number of runs to speed up tests * break out control message processing * normalize sleeps between steps * implement proper transport filtering * initial interop testing * clean up floodsub publish logic * allow dialing without a protocol * adding multiple reads/writes * use protobuf varint in mplex * don't loose conn's peerInfo * initial interop pubsub tests * don't duplicate connections/peers * bring back interop tests * wip: interop * re-enable interop and daemon tests * add multiple read write tests from handlers * don't cleanup channel prematurely * use correct channel to send/receive msgs * adjust tests with latest changes * include interop tests * remove temp logging output * fix ci * use correct public key serialization * additional tests for pubsub interop
2019-12-05 20:16:18 -06:00
method unsubscribe*(p: PubSub,
topics: seq[TopicPair]) {.base, async.} =
2019-09-09 20:15:52 -06:00
## unsubscribe from a list of ``topic`` strings
for t in topics:
p.topics.withValue(t.topic, topic):
topic[].handler.keepIf(proc (x: auto): bool = x != t.handler)
if topic[].handler.len == 0:
# make sure we delete the topic if
# no more handlers are left
p.topics.del(t.topic)
libp2p_pubsub_topics.set(p.topics.len.int64)
proc unsubscribe*(p: PubSub,
topic: string,
handler: TopicHandler): Future[void] =
## unsubscribe from a ``topic`` string
##
p.unsubscribe(@[(topic, handler)])
2019-09-09 20:15:52 -06:00
method unsubscribeAll*(p: PubSub, topic: string) {.base, async.} =
p.topics.del(topic)
libp2p_pubsub_topics.set(p.topics.len.int64)
method subscribe*(p: PubSub,
topic: string,
handler: TopicHandler) {.base, async.} =
2019-09-09 20:15:52 -06:00
## subscribe to a topic
##
## ``topic`` - a string topic to subscribe to
##
## ``handler`` - is a user provided proc
## that will be triggered
2019-09-09 20:15:52 -06:00
## on every received message
##
PubSub (Gossip & Flood) Implementation (#36) This adds gossipsub and floodsub, as well as basic interop testing with the go libp2p daemon. * add close event * wip: gossipsub * splitting rpc message * making message handling more consistent * initial gossipsub implementation * feat: nim 1.0 cleanup * wip: gossipsub protobuf * adding encoding/decoding of gossipsub messages * add disconnect handler * add proper gossipsub msg handling * misc: cleanup for nim 1.0 * splitting floodsub and gossipsub tests * feat: add mesh rebalansing * test pubsub * add mesh rebalansing tests * testing mesh maintenance * finishing mcache implementatin * wip: commenting out broken tests * wip: don't run heartbeat for now * switchout debug for trace logging * testing gossip peer selection algorithm * test stream piping * more work around message amplification * get the peerid from message * use timed cache as backing store * allow setting timeout in constructor * several changes to improve performance * more through testing of msg amplification * prevent gc issues * allow piping to self and prevent deadlocks * improove floodsub * allow running hook on cache eviction * prevent race conditions * prevent race conditions and improove tests * use hashes as cache keys * removing useless file * don't create a new seq * re-enable pubsub tests * fix imports * reduce number of runs to speed up tests * break out control message processing * normalize sleeps between steps * implement proper transport filtering * initial interop testing * clean up floodsub publish logic * allow dialing without a protocol * adding multiple reads/writes * use protobuf varint in mplex * don't loose conn's peerInfo * initial interop pubsub tests * don't duplicate connections/peers * bring back interop tests * wip: interop * re-enable interop and daemon tests * add multiple read write tests from handlers * don't cleanup channel prematurely * use correct channel to send/receive msgs * adjust tests with latest changes * include interop tests * remove temp logging output * fix ci * use correct public key serialization * additional tests for pubsub interop
2019-12-05 20:16:18 -06:00
if topic notin p.topics:
2019-09-28 13:55:35 -06:00
trace "subscribing to topic", name = topic
p.topics[topic] = Topic(name: topic)
2019-09-28 13:55:35 -06:00
p.topics[topic].handler.add(handler)
2019-09-09 20:15:52 -06:00
for _, peer in p.peers:
p.sendSubs(peer, @[topic], true)
PubSub (Gossip & Flood) Implementation (#36) This adds gossipsub and floodsub, as well as basic interop testing with the go libp2p daemon. * add close event * wip: gossipsub * splitting rpc message * making message handling more consistent * initial gossipsub implementation * feat: nim 1.0 cleanup * wip: gossipsub protobuf * adding encoding/decoding of gossipsub messages * add disconnect handler * add proper gossipsub msg handling * misc: cleanup for nim 1.0 * splitting floodsub and gossipsub tests * feat: add mesh rebalansing * test pubsub * add mesh rebalansing tests * testing mesh maintenance * finishing mcache implementatin * wip: commenting out broken tests * wip: don't run heartbeat for now * switchout debug for trace logging * testing gossip peer selection algorithm * test stream piping * more work around message amplification * get the peerid from message * use timed cache as backing store * allow setting timeout in constructor * several changes to improve performance * more through testing of msg amplification * prevent gc issues * allow piping to self and prevent deadlocks * improove floodsub * allow running hook on cache eviction * prevent race conditions * prevent race conditions and improove tests * use hashes as cache keys * removing useless file * don't create a new seq * re-enable pubsub tests * fix imports * reduce number of runs to speed up tests * break out control message processing * normalize sleeps between steps * implement proper transport filtering * initial interop testing * clean up floodsub publish logic * allow dialing without a protocol * adding multiple reads/writes * use protobuf varint in mplex * don't loose conn's peerInfo * initial interop pubsub tests * don't duplicate connections/peers * bring back interop tests * wip: interop * re-enable interop and daemon tests * add multiple read write tests from handlers * don't cleanup channel prematurely * use correct channel to send/receive msgs * adjust tests with latest changes * include interop tests * remove temp logging output * fix ci * use correct public key serialization * additional tests for pubsub interop
2019-12-05 20:16:18 -06:00
# metrics
libp2p_pubsub_topics.set(p.topics.len.int64)
PubSub (Gossip & Flood) Implementation (#36) This adds gossipsub and floodsub, as well as basic interop testing with the go libp2p daemon. * add close event * wip: gossipsub * splitting rpc message * making message handling more consistent * initial gossipsub implementation * feat: nim 1.0 cleanup * wip: gossipsub protobuf * adding encoding/decoding of gossipsub messages * add disconnect handler * add proper gossipsub msg handling * misc: cleanup for nim 1.0 * splitting floodsub and gossipsub tests * feat: add mesh rebalansing * test pubsub * add mesh rebalansing tests * testing mesh maintenance * finishing mcache implementatin * wip: commenting out broken tests * wip: don't run heartbeat for now * switchout debug for trace logging * testing gossip peer selection algorithm * test stream piping * more work around message amplification * get the peerid from message * use timed cache as backing store * allow setting timeout in constructor * several changes to improve performance * more through testing of msg amplification * prevent gc issues * allow piping to self and prevent deadlocks * improove floodsub * allow running hook on cache eviction * prevent race conditions * prevent race conditions and improove tests * use hashes as cache keys * removing useless file * don't create a new seq * re-enable pubsub tests * fix imports * reduce number of runs to speed up tests * break out control message processing * normalize sleeps between steps * implement proper transport filtering * initial interop testing * clean up floodsub publish logic * allow dialing without a protocol * adding multiple reads/writes * use protobuf varint in mplex * don't loose conn's peerInfo * initial interop pubsub tests * don't duplicate connections/peers * bring back interop tests * wip: interop * re-enable interop and daemon tests * add multiple read write tests from handlers * don't cleanup channel prematurely * use correct channel to send/receive msgs * adjust tests with latest changes * include interop tests * remove temp logging output * fix ci * use correct public key serialization * additional tests for pubsub interop
2019-12-05 20:16:18 -06:00
method publish*(p: PubSub,
topic: string,
data: seq[byte]): Future[int] {.base, async.} =
2019-09-09 20:15:52 -06:00
## publish to a ``topic``
## The return value is the number of neighbours that we attempted to send the
## message to, excluding self. Note that this is an optimistic number of
## attempts - the number of peers that actually receive the message might
## be lower.
if p.triggerSelf:
await handleData(p, topic, data)
return 0
PubSub (Gossip & Flood) Implementation (#36) This adds gossipsub and floodsub, as well as basic interop testing with the go libp2p daemon. * add close event * wip: gossipsub * splitting rpc message * making message handling more consistent * initial gossipsub implementation * feat: nim 1.0 cleanup * wip: gossipsub protobuf * adding encoding/decoding of gossipsub messages * add disconnect handler * add proper gossipsub msg handling * misc: cleanup for nim 1.0 * splitting floodsub and gossipsub tests * feat: add mesh rebalansing * test pubsub * add mesh rebalansing tests * testing mesh maintenance * finishing mcache implementatin * wip: commenting out broken tests * wip: don't run heartbeat for now * switchout debug for trace logging * testing gossip peer selection algorithm * test stream piping * more work around message amplification * get the peerid from message * use timed cache as backing store * allow setting timeout in constructor * several changes to improve performance * more through testing of msg amplification * prevent gc issues * allow piping to self and prevent deadlocks * improove floodsub * allow running hook on cache eviction * prevent race conditions * prevent race conditions and improove tests * use hashes as cache keys * removing useless file * don't create a new seq * re-enable pubsub tests * fix imports * reduce number of runs to speed up tests * break out control message processing * normalize sleeps between steps * implement proper transport filtering * initial interop testing * clean up floodsub publish logic * allow dialing without a protocol * adding multiple reads/writes * use protobuf varint in mplex * don't loose conn's peerInfo * initial interop pubsub tests * don't duplicate connections/peers * bring back interop tests * wip: interop * re-enable interop and daemon tests * add multiple read write tests from handlers * don't cleanup channel prematurely * use correct channel to send/receive msgs * adjust tests with latest changes * include interop tests * remove temp logging output * fix ci * use correct public key serialization * additional tests for pubsub interop
2019-12-05 20:16:18 -06:00
method initPubSub*(p: PubSub) {.base.} =
## perform pubsub initialization
p.observers = new(seq[PubSubObserver])
if p.msgIdProvider == nil:
p.msgIdProvider = defaultMsgIdProvider
PubSub (Gossip & Flood) Implementation (#36) This adds gossipsub and floodsub, as well as basic interop testing with the go libp2p daemon. * add close event * wip: gossipsub * splitting rpc message * making message handling more consistent * initial gossipsub implementation * feat: nim 1.0 cleanup * wip: gossipsub protobuf * adding encoding/decoding of gossipsub messages * add disconnect handler * add proper gossipsub msg handling * misc: cleanup for nim 1.0 * splitting floodsub and gossipsub tests * feat: add mesh rebalansing * test pubsub * add mesh rebalansing tests * testing mesh maintenance * finishing mcache implementatin * wip: commenting out broken tests * wip: don't run heartbeat for now * switchout debug for trace logging * testing gossip peer selection algorithm * test stream piping * more work around message amplification * get the peerid from message * use timed cache as backing store * allow setting timeout in constructor * several changes to improve performance * more through testing of msg amplification * prevent gc issues * allow piping to self and prevent deadlocks * improove floodsub * allow running hook on cache eviction * prevent race conditions * prevent race conditions and improove tests * use hashes as cache keys * removing useless file * don't create a new seq * re-enable pubsub tests * fix imports * reduce number of runs to speed up tests * break out control message processing * normalize sleeps between steps * implement proper transport filtering * initial interop testing * clean up floodsub publish logic * allow dialing without a protocol * adding multiple reads/writes * use protobuf varint in mplex * don't loose conn's peerInfo * initial interop pubsub tests * don't duplicate connections/peers * bring back interop tests * wip: interop * re-enable interop and daemon tests * add multiple read write tests from handlers * don't cleanup channel prematurely * use correct channel to send/receive msgs * adjust tests with latest changes * include interop tests * remove temp logging output * fix ci * use correct public key serialization * additional tests for pubsub interop
2019-12-05 20:16:18 -06:00
method start*(p: PubSub) {.async, base.} =
## start pubsub
discard
method stop*(p: PubSub) {.async, base.} =
## stopt pubsub
2019-09-09 20:15:52 -06:00
discard
method addValidator*(p: PubSub,
topic: varargs[string],
hook: ValidatorHandler) {.base.} =
for t in topic:
if t notin p.validators:
p.validators[t] = initHashSet[ValidatorHandler]()
trace "adding validator for topic", topicId = t
p.validators[t].incl(hook)
method removeValidator*(p: PubSub,
topic: varargs[string],
hook: ValidatorHandler) {.base.} =
for t in topic:
if t in p.validators:
p.validators[t].excl(hook)
method validate*(p: PubSub, message: Message): Future[bool] {.async, base.} =
var pending: seq[Future[bool]]
trace "about to validate message"
for topic in message.topicIDs:
trace "looking for validators on topic", topicID = topic,
registered = toSeq(p.validators.keys)
if topic in p.validators:
trace "running validators for topic", topicID = topic
# TODO: add timeout to validator
pending.add(p.validators[topic].mapIt(it(topic, message)))
let futs = await allFinished(pending)
result = futs.allIt(not it.failed and it.read())
if result:
libp2p_pubsub_validation_success.inc()
else:
libp2p_pubsub_validation_failure.inc()
Gossip one one (#240) * allow multiple codecs per protocol (without breaking things) * add 1.1 protocol to gossip * explicit peering part 1 * explicit peering part 2 * explicit peering part 3 * PeerInfo and ControlPrune protocols * fix encodePrune * validated always, even explicit peers * prune by score (score is stub still) * add a way to pass parameters to gossip * standard setup fixes * take into account explicit direct peers in publish * add floodPublish logic * small fixes, publish still half broken * make sure to waitsub in sparse test * use var semantics to optimize table access * wip... lvalues don't work properly sadly... * big publish refactor, replenish and balance * fix internal tests * use g.peers for fanout (todo: don't include flood peers) * exclude non gossip from fanout * internal test fixes * fix flood tests * fix test's trypublish * test interop fixes * make sure to not remove peers from gossip table * restore old replenishFanout * cleanups * restore utility module import * restore trace vs debug in gossip * improve fanout replenish behavior further * triage publish nil peers (issue is on master too but just hidden behind a if/in) * getGossipPeers fixes * remove topics from pubsubpeer (was unused) * simplify rebalanceMesh (following spec) and make it finally reach D_high * better diagnostics * merge new pubsubpeer, copy 1.1 to new module * fix up merge * conditional enable gossip11 module * add back topics in peers, re-enable flood publish * add more heartbeat locking to prevent races * actually lock the heartbeat * minor fixes * with sugar * merge 1.0 * remove assertion in publish * fix multistream 1.1 multi proto * Fix merge oops * wip * fix gossip 11 upstream * gossipsub11 -> gossipsub * support interop testing * tests fixing * fix directchat build * control prune updates (pb) * wip parameters * gossip internal tests fixes * parameters wip * finishup with params * cleanups/wip * small sugar * grafted and pruned procs * wip updateScores * wip * fix logging issue * pubsubpeer, chronicles explicit override * fix internal gossip tests * wip * tables troubleshooting * score wip * score wip * fixes * fix test utils generateNodes * don't delete while iterating in score update * fix grafted defect * add a handleConnect in subscribeTopic * pruning improvements * wip * score fixes * post merge - builds gossip tests * further merge fixes * rebalance improvements and opportunistic grafting * fix test for now * restore explicit peering * implement peer exchange graft message * add an hard cap to PX * backoff time management * IWANT cap/budget * Adaptive gossip dissemination * outbound mesh quota, internal tests fixing * oversub prune score based, finish outbound quota * finishup with score and ihave budget * use go daemon 0.3.0 * import fixes * byScore cleanup score sorting * remove pointless scaling in `/` Duration operator * revert using libp2p org for daemon * interop fixes * fixes and cleanup * remove heartbeat assertion, minor debug fixes * logging improvements and cleaning up * (to revert) add some traces * add explicit topic to gossip rpcs * pubsub merge fixes and type fix in switch * Revert "(to revert) add some traces" This reverts commit 4663eaab6cc336c81cee50bc54025cf0b7bcbd99. * cleanup some now irrelevant todo * shuffle peers anyway as score might be disabled * add missing shuffle * old merge fix * more merge fixes * debug improvements * re-enable gossip internal tests * add gossip10 fallback (dormant but tested) * split gossipsub internal tests into 1.0 and 1.1 Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
2020-09-21 18:16:29 +09:00
proc init*[PubParams: object | bool](
P: typedesc[PubSub],
switch: Switch,
triggerSelf: bool = false,
verifySignature: bool = true,
sign: bool = true,
Gossip one one (#240) * allow multiple codecs per protocol (without breaking things) * add 1.1 protocol to gossip * explicit peering part 1 * explicit peering part 2 * explicit peering part 3 * PeerInfo and ControlPrune protocols * fix encodePrune * validated always, even explicit peers * prune by score (score is stub still) * add a way to pass parameters to gossip * standard setup fixes * take into account explicit direct peers in publish * add floodPublish logic * small fixes, publish still half broken * make sure to waitsub in sparse test * use var semantics to optimize table access * wip... lvalues don't work properly sadly... * big publish refactor, replenish and balance * fix internal tests * use g.peers for fanout (todo: don't include flood peers) * exclude non gossip from fanout * internal test fixes * fix flood tests * fix test's trypublish * test interop fixes * make sure to not remove peers from gossip table * restore old replenishFanout * cleanups * restore utility module import * restore trace vs debug in gossip * improve fanout replenish behavior further * triage publish nil peers (issue is on master too but just hidden behind a if/in) * getGossipPeers fixes * remove topics from pubsubpeer (was unused) * simplify rebalanceMesh (following spec) and make it finally reach D_high * better diagnostics * merge new pubsubpeer, copy 1.1 to new module * fix up merge * conditional enable gossip11 module * add back topics in peers, re-enable flood publish * add more heartbeat locking to prevent races * actually lock the heartbeat * minor fixes * with sugar * merge 1.0 * remove assertion in publish * fix multistream 1.1 multi proto * Fix merge oops * wip * fix gossip 11 upstream * gossipsub11 -> gossipsub * support interop testing * tests fixing * fix directchat build * control prune updates (pb) * wip parameters * gossip internal tests fixes * parameters wip * finishup with params * cleanups/wip * small sugar * grafted and pruned procs * wip updateScores * wip * fix logging issue * pubsubpeer, chronicles explicit override * fix internal gossip tests * wip * tables troubleshooting * score wip * score wip * fixes * fix test utils generateNodes * don't delete while iterating in score update * fix grafted defect * add a handleConnect in subscribeTopic * pruning improvements * wip * score fixes * post merge - builds gossip tests * further merge fixes * rebalance improvements and opportunistic grafting * fix test for now * restore explicit peering * implement peer exchange graft message * add an hard cap to PX * backoff time management * IWANT cap/budget * Adaptive gossip dissemination * outbound mesh quota, internal tests fixing * oversub prune score based, finish outbound quota * finishup with score and ihave budget * use go daemon 0.3.0 * import fixes * byScore cleanup score sorting * remove pointless scaling in `/` Duration operator * revert using libp2p org for daemon * interop fixes * fixes and cleanup * remove heartbeat assertion, minor debug fixes * logging improvements and cleaning up * (to revert) add some traces * add explicit topic to gossip rpcs * pubsub merge fixes and type fix in switch * Revert "(to revert) add some traces" This reverts commit 4663eaab6cc336c81cee50bc54025cf0b7bcbd99. * cleanup some now irrelevant todo * shuffle peers anyway as score might be disabled * add missing shuffle * old merge fix * more merge fixes * debug improvements * re-enable gossip internal tests * add gossip10 fallback (dormant but tested) * split gossipsub internal tests into 1.0 and 1.1 Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
2020-09-21 18:16:29 +09:00
msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
parameters: PubParams = false): P =
let pubsub =
when PubParams is bool:
P(switch: switch,
peerInfo: switch.peerInfo,
triggerSelf: triggerSelf,
verifySignature: verifySignature,
sign: sign,
peers: initTable[PeerID, PubSubPeer](),
topics: initTable[string, Topic](),
msgIdProvider: msgIdProvider)
else:
P(switch: switch,
peerInfo: switch.peerInfo,
triggerSelf: triggerSelf,
verifySignature: verifySignature,
sign: sign,
peers: initTable[PeerID, PubSubPeer](),
topics: initTable[string, Topic](),
msgIdProvider: msgIdProvider,
parameters: parameters)
pubsub.initPubSub()
proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} =
if event == PeerEvent.Joined:
pubsub.subscribePeer(peerId)
else:
pubsub.unsubscribePeer(peerId)
switch.addPeerEventHandler(peerEventHandler, PeerEvent.Joined)
switch.addPeerEventHandler(peerEventHandler, PeerEvent.Left)
pubsub.initPubSub()
return pubsub
Gossip one one (#240) * allow multiple codecs per protocol (without breaking things) * add 1.1 protocol to gossip * explicit peering part 1 * explicit peering part 2 * explicit peering part 3 * PeerInfo and ControlPrune protocols * fix encodePrune * validated always, even explicit peers * prune by score (score is stub still) * add a way to pass parameters to gossip * standard setup fixes * take into account explicit direct peers in publish * add floodPublish logic * small fixes, publish still half broken * make sure to waitsub in sparse test * use var semantics to optimize table access * wip... lvalues don't work properly sadly... * big publish refactor, replenish and balance * fix internal tests * use g.peers for fanout (todo: don't include flood peers) * exclude non gossip from fanout * internal test fixes * fix flood tests * fix test's trypublish * test interop fixes * make sure to not remove peers from gossip table * restore old replenishFanout * cleanups * restore utility module import * restore trace vs debug in gossip * improve fanout replenish behavior further * triage publish nil peers (issue is on master too but just hidden behind a if/in) * getGossipPeers fixes * remove topics from pubsubpeer (was unused) * simplify rebalanceMesh (following spec) and make it finally reach D_high * better diagnostics * merge new pubsubpeer, copy 1.1 to new module * fix up merge * conditional enable gossip11 module * add back topics in peers, re-enable flood publish * add more heartbeat locking to prevent races * actually lock the heartbeat * minor fixes * with sugar * merge 1.0 * remove assertion in publish * fix multistream 1.1 multi proto * Fix merge oops * wip * fix gossip 11 upstream * gossipsub11 -> gossipsub * support interop testing * tests fixing * fix directchat build * control prune updates (pb) * wip parameters * gossip internal tests fixes * parameters wip * finishup with params * cleanups/wip * small sugar * grafted and pruned procs * wip updateScores * wip * fix logging issue * pubsubpeer, chronicles explicit override * fix internal gossip tests * wip * tables troubleshooting * score wip * score wip * fixes * fix test utils generateNodes * don't delete while iterating in score update * fix grafted defect * add a handleConnect in subscribeTopic * pruning improvements * wip * score fixes * post merge - builds gossip tests * further merge fixes * rebalance improvements and opportunistic grafting * fix test for now * restore explicit peering * implement peer exchange graft message * add an hard cap to PX * backoff time management * IWANT cap/budget * Adaptive gossip dissemination * outbound mesh quota, internal tests fixing * oversub prune score based, finish outbound quota * finishup with score and ihave budget * use go daemon 0.3.0 * import fixes * byScore cleanup score sorting * remove pointless scaling in `/` Duration operator * revert using libp2p org for daemon * interop fixes * fixes and cleanup * remove heartbeat assertion, minor debug fixes * logging improvements and cleaning up * (to revert) add some traces * add explicit topic to gossip rpcs * pubsub merge fixes and type fix in switch * Revert "(to revert) add some traces" This reverts commit 4663eaab6cc336c81cee50bc54025cf0b7bcbd99. * cleanup some now irrelevant todo * shuffle peers anyway as score might be disabled * add missing shuffle * old merge fix * more merge fixes * debug improvements * re-enable gossip internal tests * add gossip10 fallback (dormant but tested) * split gossipsub internal tests into 1.0 and 1.1 Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
2020-09-21 18:16:29 +09:00
proc addObserver*(p: PubSub; observer: PubSubObserver) = p.observers[] &= observer
proc removeObserver*(p: PubSub; observer: PubSubObserver) =
let idx = p.observers[].find(observer)
if idx != -1:
p.observers[].del(idx)