refactor(networking): peermanager refactor and cleanups (#1539)

* refactor(networking): use addServicePeer where needed + add metrics
This commit is contained in:
Alvaro Revuelta 2023-02-27 18:24:31 +01:00 committed by GitHub
parent c254115578
commit 700dbbb7fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 106 additions and 138 deletions

View File

@ -24,6 +24,7 @@ import libp2p/[switch, # manage transports, a single entry poi
nameresolving/dnsresolver]# define DNS resolution
import
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_lightpush,
../../waku/v2/protocol/waku_lightpush/rpc,
../../waku/v2/protocol/waku_filter,
../../waku/v2/protocol/waku_store,
@ -489,7 +490,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
echo "Connecting to storenode: " & $(storenode.get())
node.mountStoreClient()
node.setStorePeer(storenode.get())
node.peerManager.addServicePeer(storenode.get(), WakuStoreCodec)
proc storeHandler(response: HistoryResponse) {.gcsafe.} =
for msg in response.messages:
@ -509,13 +510,13 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
await mountLightPush(node)
node.mountLightPushClient()
node.setLightPushPeer(conf.lightpushnode)
node.peerManager.addServicePeer(parseRemotePeerInfo(conf.lightpushnode), WakuLightpushCodec)
if conf.filternode != "":
await node.mountFilter()
await node.mountFilterClient()
node.setFilterPeer(parseRemotePeerInfo(conf.filternode))
node.peerManager.addServicePeer(parseRemotePeerInfo(conf.filternode), WakuFilterCodec)
proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe.} =
trace "Hit filter handler", contentTopic=msg.contentTopic

View File

@ -16,6 +16,10 @@ import
libp2p/errors,
../../../waku/v2/protocol/waku_message,
../../../waku/v2/node/waku_node,
../../../waku/v2/utils/peers,
../../../waku/v2/node/peer_manager,
../../waku/v2/protocol/waku_filter,
../../waku/v2/protocol/waku_store,
# Chat 2 imports
../chat2/chat2,
# Common cli config
@ -281,10 +285,12 @@ when isMainModule:
waitFor connectToNodes(bridge.nodev2, conf.staticnodes)
if conf.storenode != "":
setStorePeer(bridge.nodev2, conf.storenode)
let storePeer = parseRemotePeerInfo(conf.storenode)
bridge.nodev2.peerManager.addServicePeer(storePeer, WakuStoreCodec)
if conf.filternode != "":
setFilterPeer(bridge.nodev2, conf.filternode)
let filterPeer = parseRemotePeerInfo(conf.filternode)
bridge.nodev2.peerManager.addServicePeer(filterPeer, WakuFilterCodec)
if conf.rpc:
let ta = initTAddress(conf.rpcAddress,

View File

@ -26,6 +26,8 @@ import
libp2p/nameresolving/nameresolver,
../../waku/v2/utils/time,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_filter,
../../waku/v2/node/message_cache,
../../waku/v2/node/waku_node,
../../waku/v2/node/peer_manager,
@ -428,11 +430,13 @@ when isMainModule:
if conf.storenode != "":
mountStoreClient(bridge.nodev2)
setStorePeer(bridge.nodev2, conf.storenode)
let storeNode = parseRemotePeerInfo(conf.storenode)
bridge.nodev2.peerManager.addServicePeer(storeNode, WakuStoreCodec)
if conf.filternode != "":
waitFor mountFilterClient(bridge.nodev2)
setFilterPeer(bridge.nodev2, conf.filternode)
let filterNode = parseRemotePeerInfo(conf.filternode)
bridge.nodev2.peerManager.addServicePeer(filterNode, WakuFilterCodec)
if conf.rpc:
let ta = initTAddress(conf.rpcAddress,

View File

@ -96,10 +96,9 @@ procSuite "Peer Manager":
await node.mountSwap()
node.mountStoreClient()
node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo())
node.setStorePeer(storePeer.toRemotePeerInfo())
node.setFilterPeer(filterPeer.toRemotePeerInfo())
node.peerManager.addServicePeer(swapPeer.toRemotePeerInfo(), WakuSwapCodec)
node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec)
node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuFilterCodec)
# Check peers were successfully added to peer manager
check:
@ -127,7 +126,7 @@ procSuite "Peer Manager":
await allFutures(nodes.mapIt(it.mountRelay()))
# Test default connectedness for new peers
nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec)
nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo())
check:
# No information about node2's connectedness
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == NotConnected
@ -160,7 +159,7 @@ procSuite "Peer Manager":
await nodes[0].start()
await nodes[0].mountRelay()
nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec)
nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo())
# Set a low backoff to speed up test: 2, 4, 8, 16
nodes[0].peerManager.initialBackoffInSec = 2
@ -236,7 +235,8 @@ procSuite "Peer Manager":
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected
await node3.mountRelay() # This should trigger a reconnect
await node3.mountRelay()
await node3.peerManager.connectToRelayPeers()
check:
# Reconnected to node2 after "restart"
@ -313,12 +313,12 @@ procSuite "Peer Manager":
let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
# Add all peers (but self) to node 0
nodes[0].peerManager.addPeer(peerInfos[1], WakuRelayCodec)
nodes[0].peerManager.addPeer(peerInfos[2], WakuRelayCodec)
nodes[0].peerManager.addPeer(peerInfos[3], WakuRelayCodec)
nodes[0].peerManager.addPeer(peerInfos[1])
nodes[0].peerManager.addPeer(peerInfos[2])
nodes[0].peerManager.addPeer(peerInfos[3])
# Attempt to connect to all known peers supporting a given protocol
await nodes[0].peerManager.reconnectPeers(WakuRelayCodec, protocolMatcher(WakuRelayCodec))
# Connect to relay peers
await nodes[0].peerManager.connectToRelayPeers()
check:
# Peerstore track all three peers
@ -512,7 +512,7 @@ procSuite "Peer Manager":
# Create 15 peers and add them to the peerstore
let peers = toSeq(1..15).mapIt(parseRemotePeerInfo("/ip4/0.0.0.0/tcp/0/p2p/" & $PeerId.random().get()))
for p in peers: pm.addPeer(p, "")
for p in peers: pm.addPeer(p)
# Check that we have 15 peers in the peerstore
check:

View File

@ -167,8 +167,12 @@ procSuite "Waku v2 JSON-RPC API - Admin":
filterPeer = PeerInfo.new(generateEcdsaKey(), @[locationAddr])
storePeer = PeerInfo.new(generateEcdsaKey(), @[locationAddr])
node.setStorePeer(storePeer.toRemotePeerInfo())
node.setFilterPeer(filterPeer.toRemotePeerInfo())
node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuFilterCodec)
node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec)
# Mock that we connected in the past so Identify populated this
node.peerManager.peerStore[ProtoBook][filterPeer.peerId] = @[WakuFilterCodec]
node.peerManager.peerStore[ProtoBook][storePeer.peerId] = @[WakuStoreCodec]
let response = await client.get_waku_v2_admin_v1_peers()

View File

@ -14,6 +14,7 @@ import
../../../waku/v2/node/jsonrpc/filter/handlers as filter_api,
../../../waku/v2/node/jsonrpc/filter/client as filter_api_client,
../../../waku/v2/protocol/waku_message,
../../../waku/v2/protocol/waku_filter,
../../../waku/v2/protocol/waku_filter/rpc,
../../../waku/v2/protocol/waku_filter/client,
../../../waku/v2/utils/peers,
@ -40,7 +41,7 @@ procSuite "Waku v2 JSON-RPC API - Filter":
await node1.mountFilter()
await node2.mountFilterClient()
node2.setFilterPeer(node1.peerInfo.toRemotePeerInfo())
node2.peerManager.addServicePeer(node1.peerInfo.toRemotePeerInfo(), WakuFilterCodec)
# RPC server setup
let

View File

@ -16,6 +16,7 @@ import
../../../waku/v2/protocol/waku_message,
../../../waku/v2/protocol/waku_archive,
../../../waku/v2/protocol/waku_archive/driver/queue_driver,
../../../waku/v2/protocol/waku_store,
../../../waku/v2/protocol/waku_store/rpc,
../../../waku/v2/utils/peers,
../../../waku/v2/utils/time,
@ -66,7 +67,7 @@ procSuite "Waku v2 JSON-RPC API - Store":
var listenSwitch = newStandardSwitch(some(key))
await listenSwitch.start()
node.setStorePeer(listenSwitch.peerInfo.toRemotePeerInfo())
node.peerManager.addServicePeer(listenSwitch.peerInfo.toRemotePeerInfo(), WakuStoreCodec)
listenSwitch.mount(node.wakuRelay)
listenSwitch.mount(node.wakuStore)

View File

@ -5,7 +5,7 @@ else:
import
std/[options, sets, sequtils, times],
std/[options, sets, sequtils, times, strutils],
chronos,
chronicles,
metrics,
@ -22,8 +22,9 @@ declareCounter waku_peers_dials, "Number of peer dials", ["outcome"]
# TODO: Populate from PeerStore.Source when ready
declarePublicCounter waku_node_conns_initiated, "Number of connections initiated", ["source"]
declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"]
declarePublicGauge waku_connected_peers, "Number of connected peers per direction: inbound|outbound", ["direction"]
declarePublicGauge waku_connected_peers, "Number of connected peers per direction", ["direction"]
declarePublicGauge waku_peer_store_size, "Number of peers managed by the peer store"
declarePublicGauge waku_service_peers, "Service peer protocol and multiaddress ", labels = ["protocol", "peerId"]
logScope:
topics = "waku node peer_manager"
@ -61,6 +62,16 @@ type
serviceSlots*: Table[string, RemotePeerInfo]
started: bool
proc protocolMatcher*(codec: string): Matcher =
  ## Builds a matcher predicate for the given `codec`.
  ##
  ## The returned closure accepts any protocol id that starts with `codec`,
  ## so versioned postfixes also match: for `/vac/waku/filter/2.0.0` it
  ## accepts `/vac/waku/filter/2.0.0`, `/vac/waku/filter/2.0.0-beta3`,
  ## `/vac/waku/filter/2.0.0-actualnonsense`, etc.
  proc matchesCodec(proto: string): bool {.gcsafe.} =
    proto.startsWith(codec)

  matchesCodec
####################
# Helper functions #
####################
@ -244,7 +255,7 @@ proc new*(T: type PeerManager,
# Manager interface #
#####################
proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo) =
# Adds peer to manager for the specified protocol
if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
@ -260,14 +271,11 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
# Peer already managed
return
debug "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs, proto = proto
trace "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs
pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs
pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey
# TODO: Remove this once service slots is ready
pm.peerStore[ProtoBook][remotePeerInfo.peerId] = pm.peerStore[ProtoBook][remotePeerInfo.peerId] & proto
# Add peer to storage. Entry will subsequently be updated with connectedness information
if not pm.storage.isNil:
pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected)
@ -279,23 +287,23 @@ proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: str
return
info "Adding peer to service slots", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], service = proto
waku_service_peers.set(1, labelValues = [$proto, $remotePeerInfo.addrs[0]])
# Set peer for service slot
pm.serviceSlots[proto] = remotePeerInfo
# TODO: Remove proto once fully refactored
pm.addPeer(remotePeerInfo, proto)
pm.addPeer(remotePeerInfo)
proc reconnectPeers*(pm: PeerManager,
proto: string,
protocolMatcher: Matcher,
backoff: chronos.Duration = chronos.seconds(0)) {.async.} =
## Reconnect to peers registered for this protocol. This will update connectedness.
## Especially useful to resume connections from persistent storage after a restart.
debug "Reconnecting peers", proto=proto
for storedInfo in pm.peerStore.peers(protocolMatcher):
# Proto is not persisted, we need to iterate over all peers.
for storedInfo in pm.peerStore.peers(protocolMatcher(proto)):
# Check that the peer can be connected
if storedInfo.connectedness == CannotConnect:
debug "Not reconnecting to unreachable or non-existing peer", peerId=storedInfo.peerId
@ -332,10 +340,11 @@ proc dialPeer*(pm: PeerManager,
# Dial a given peer and add it to the list of known peers
# TODO: check peer validity and score before continuing. Limit number of peers to be managed.
# First add dialed peer info to peer store, if it does not exist yet...
# First add dialed peer info to peer store, if it does not exist yet..
# TODO: nim libp2p peerstore already adds them
if not pm.peerStore.hasPeer(remotePeerInfo.peerId, proto):
trace "Adding newly dialed peer to manager", peerId= $remotePeerInfo.peerId, address= $remotePeerInfo.addrs[0], proto= proto
pm.addPeer(remotePeerInfo, proto)
pm.addPeer(remotePeerInfo)
return await pm.dialPeer(remotePeerInfo.peerId,remotePeerInfo.addrs, proto, dialTimeout, source)
@ -380,38 +389,32 @@ proc connectToNodes*(pm: PeerManager,
# later.
await sleepAsync(chronos.seconds(5))
# Ensures a healthy amount of connected relay peers
proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
debug "Starting relay connectivity loop"
while pm.started:
proc connectToRelayPeers*(pm: PeerManager) {.async.} =
let maxConnections = pm.switch.connManager.inSema.size
let numInPeers = pm.switch.connectedPeers(lpstream.Direction.In).len
let numOutPeers = pm.switch.connectedPeers(lpstream.Direction.Out).len
let numConPeers = numInPeers + numOutPeers
let maxConnections = pm.switch.connManager.inSema.size
let numInPeers = pm.switch.connectedPeers(lpstream.Direction.In).len
let numOutPeers = pm.switch.connectedPeers(lpstream.Direction.Out).len
let numConPeers = numInPeers + numOutPeers
# TODO: Enforce a given in/out peers ratio
# TODO: Enforce a given in/out peers ratio
# Leave some room for service peers
if numConPeers >= (maxConnections - 5):
return
# Leave some room for service peers
if numConPeers >= (maxConnections - 5):
await sleepAsync(ConnectivityLoopInterval)
continue
# TODO: Track only relay connections (nwaku/issues/1566)
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.peerStore.canBeConnected(it.peerId,
pm.initialBackoffInSec,
pm.backoffFactor))
let numPeersToConnect = min(min(maxConnections - numConPeers, outsideBackoffPeers.len), MaxParalelDials)
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.peerStore.canBeConnected(it.peerId,
pm.initialBackoffInSec,
pm.backoffFactor))
let numPeersToConnect = min(min(maxConnections - numConPeers, outsideBackoffPeers.len), MaxParalelDials)
info "Relay peer connections",
connectedPeers = numConPeers,
targetConnectedPeers = maxConnections,
notConnectedPeers = notConnectedPeers.len,
outsideBackoffPeers = outsideBackoffPeers.len
info "Relay connectivity loop",
connectedPeers = numConPeers,
targetConnectedPeers = maxConnections,
notConnectedPeers = notConnectedPeers.len,
outsideBackoffPeers = outsideBackoffPeers.len
await pm.connectToNodes(outsideBackoffPeers[0..<numPeersToConnect], WakuRelayCodec)
await sleepAsync(ConnectivityLoopInterval)
await pm.connectToNodes(outsideBackoffPeers[0..<numPeersToConnect], WakuRelayCodec)
proc prunePeerStore*(pm: PeerManager) =
let numPeers = toSeq(pm.peerStore[AddressBook].book.keys).len
@ -447,13 +450,6 @@ proc prunePeerStore*(pm: PeerManager) =
capacity = capacity,
pruned = pruned
proc prunePeerStoreLoop(pm: PeerManager) {.async.} =
while pm.started:
pm.prunePeerStore()
await sleepAsync(PrunePeerStoreInterval)
proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
debug "Selecting peer from peerstore", protocol=proto
@ -481,6 +477,20 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
debug "No peer found for protocol", protocol=proto
return none(RemotePeerInfo)
# Prunes peers from peerstore to remove old/stale ones
proc prunePeerStoreLoop(pm: PeerManager) {.async.} =
## Runs `prunePeerStore` every `PrunePeerStoreInterval` for as long as the
## manager is started; exits once `pm.started` is cleared.
debug "Starting prune peerstore loop"
while pm.started:
pm.prunePeerStore()
await sleepAsync(PrunePeerStoreInterval)
# Ensures a healthy amount of connected relay peers
proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
## Periodically calls `connectToRelayPeers` (every `ConnectivityLoopInterval`)
## while the manager is started, topping up relay connections as needed.
debug "Starting relay connectivity loop"
while pm.started:
await pm.connectToRelayPeers()
await sleepAsync(ConnectivityLoopInterval)
proc start*(pm: PeerManager) =
pm.started = true
asyncSpawn pm.relayConnectivityLoop()

View File

@ -104,17 +104,6 @@ type
announcedAddresses* : seq[MultiAddress]
started*: bool # Indicates that node has started listening
# NOTE(review): in this diff this copy is being removed from waku_node and
# relocated to peer_manager; body is identical to the relocated version.
proc protocolMatcher*(codec: string): Matcher =
## Returns a protocol matcher function for the provided codec
proc match(proto: string): bool {.gcsafe.} =
## Matches a proto with any postfix to the provided codec.
## E.g. if the codec is `/vac/waku/filter/2.0.0` it matches the protos:
## `/vac/waku/filter/2.0.0`, `/vac/waku/filter/2.0.0-beta3`, `/vac/waku/filter/2.0.0-actualnonsense`
return proto.startsWith(codec)
return match
template ip4TcpEndPoint(address, port): MultiAddress =
MultiAddress.init(address, tcpProtocol, port)
@ -516,7 +505,6 @@ proc startRelay*(node: WakuNode) {.async.} =
let backoffPeriod = node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime)
await node.peerManager.reconnectPeers(WakuRelayCodec,
protocolMatcher(WakuRelayCodec),
backoffPeriod)
# Start the WakuRelay protocol
@ -633,22 +621,6 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics:
error "failed filter unsubscription", error=unsubRes.error
waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
# TODO: Move to application module (e.g., wakunode2.nim)
proc setFilterPeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError],
    deprecated: "Use the explicit destination peer procedures".} =
  ## Registers `peer` as the node's filter service peer and increments the
  ## `waku_filter_peers` metric. Accepts an already-parsed `RemotePeerInfo`
  ## or a multiaddress string; parsing a malformed string raises
  ## `ValueError`/`LPError`. No-op (with an error log) when the filter
  ## client has not been mounted.
  if node.wakuFilterClient.isNil():
    error "could not set peer, waku filter client is nil"
    return

  info "setting filter client peer", peer=peer  # fixed typo: was "seting"

  # Parse only when the caller handed us the string form.
  let remotePeer = when peer is string: parseRemotePeerInfo(peer)
                   else: peer

  node.peerManager.addPeer(remotePeer, WakuFilterCodec)
  waku_filter_peers.inc()
# TODO: Move to application module (e.g., wakunode2.nim)
proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic], handler: FilterPushHandler) {.async, gcsafe,
deprecated: "Use the explicit destination peer procedure. Use 'node.filterSubscribe()' instead.".} =
@ -808,21 +780,6 @@ proc query*(node: WakuNode, query: HistoryQuery, peer: RemotePeerInfo): Future[W
return ok(response)
# TODO: Move to application module (e.g., wakunode2.nim)
proc setStorePeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError],
deprecated: "Use 'node.query()' with peer destination instead".} =
## Registers `peer` as the node's store service peer and increments the
## `waku_store_peers` metric. Accepts a `RemotePeerInfo` or a multiaddress
## string (parsing a malformed string raises ValueError/LPError). No-op
## with an error log when the store client has not been mounted.
if node.wakuStoreClient.isNil():
error "could not set peer, waku store client is nil"
return
info "set store peer", peer=peer
# Parse only when the caller handed us the string form.
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
else: peer
node.peerManager.addPeer(remotePeer, WakuStoreCodec)
waku_store_peers.inc()
# TODO: Move to application module (e.g., wakunode2.nim)
proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe,
deprecated: "Use 'node.query()' with peer destination instead".} =
@ -901,17 +858,6 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMe
return await node.wakuLightpushClient.publish(pubsubTopic, message, peer)
# TODO: Move to application module (e.g., wakunode2.nim)
proc setLightPushPeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError],
    deprecated: "Use 'node.lightpushPublish()' instead".} =
  ## Registers `peer` as the node's lightpush service peer and increments the
  ## `waku_lightpush_peers` metric. Accepts an already-parsed
  ## `RemotePeerInfo` or a multiaddress string; parsing a malformed string
  ## raises `ValueError`/`LPError`.
  debug "setting lightpush client peer", peer=peer  # fixed typo: was "seting"

  # Parse only when the caller handed us the string form.
  let remotePeer = when peer is string: parseRemotePeerInfo(peer)
                   else: peer

  node.peerManager.addPeer(remotePeer, WakuLightPushCodec)
  waku_lightpush_peers.inc()
# TODO: Move to application module (e.g., wakunode2.nim)
proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.async, gcsafe,
deprecated: "Use 'node.lightpushPublish()' instead".} =
@ -977,7 +923,7 @@ proc fetchPeerExchangePeers*(node: Wakunode, amount: uint64) {.async, raises: [D
var record: enr.Record
if enr.fromBytes(record, pi.enr):
# TODO: Add source: PX
node.peerManager.addPeer(record.toRemotePeerInfo().get, WakuRelayCodec)
node.peerManager.addPeer(record.toRemotePeerInfo().get)
validPeers += 1
info "Retrieved peer info via peer exchange protocol", validPeers = validPeers
else:
@ -1062,8 +1008,7 @@ proc runDiscv5Loop(node: WakuNode) {.async.} =
# Add all peers, new ones and already seen (in case their addresses changed)
for peer in discoveredPeers:
# TODO: proto: WakuRelayCodec will be removed from add peer
node.peerManager.addPeer(peer, WakuRelayCodec)
node.peerManager.addPeer(peer)
# Discovery `queryRandom` can have a synchronous fast path for example
# when no peers are in the routing table. Don't run it in continuous loop.

View File

@ -238,7 +238,7 @@ proc init*(wakuSwap: WakuSwap) =
proc credit(peerId: PeerID, n: int)
{.gcsafe, closure, raises: [Defect, KeyError, Exception].} =
info "Crediting peer: ", peer=peerId, amount=n
if wakuSwap.accounting.hasKey(peerId):
wakuSwap.accounting[peerId] -= n
@ -250,7 +250,7 @@ proc init*(wakuSwap: WakuSwap) =
# TODO Debit and credit here for Karma asset
proc debit(peerId: PeerID, n: int)
{.gcsafe, closure, raises: [Defect, KeyError, Exception].} =
info "Debiting peer: ", peer=peerId, amount=n
if wakuSwap.accounting.hasKey(peerId):
wakuSwap.accounting[peerId] += n
@ -258,10 +258,10 @@ proc init*(wakuSwap: WakuSwap) =
wakuSwap.accounting[peerId] = n
info "Accounting state", accounting = wakuSwap.accounting[peerId]
wakuSwap.applyPolicy(peerId)
proc applyPolicy(peerId: PeerID)
{.gcsafe, closure, raises: [Defect, KeyError, Exception].} =
# TODO Separate out depending on if policy is soft (accounting only) mock (send cheque but don't cash/verify) hard (actually send funds over testnet)
#Check if the Disconnect Threshold has been hit. Account Balance nears the disconnectThreshold after a Credit has been done
@ -290,10 +290,10 @@ proc init*(wakuSwap: WakuSwap) =
# TODO Expression return?
proc init*(T: type WakuSwap, peerManager: PeerManager, rng: ref rand.HmacDrbgContext, swapConfig: SwapConfig): T =
info "wakuSwap init 2"
let
let
accounting = initTable[PeerId, int]()
text = "test"
var ws = WakuSwap(rng: rng,
peerManager: peerManager,
accounting: accounting,
@ -303,8 +303,4 @@ proc init*(T: type WakuSwap, peerManager: PeerManager, rng: ref rand.HmacDrbgCon
return ws
proc setPeer*(ws: WakuSwap, peer: RemotePeerInfo) =
## Registers `peer` with the peer manager under the swap codec and bumps
## the `waku_swap_peers_count` metric.
ws.peerManager.addPeer(peer, WakuSwapCodec)
waku_swap_peers_count.inc()
# TODO End to end communication