Minimal accounting state PoC (#278)

* Add basic WakuSwap type and init

* Mount swap protocol and keep accounting state

* Flags off by default
This commit is contained in:
Oskar Thorén 2020-11-18 20:45:51 +08:00 committed by GitHub
parent e68fe50bc1
commit 010408aadb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 57 additions and 8 deletions

View File

@ -213,6 +213,11 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
echo &"{payload}" echo &"{payload}"
info "Hit store handler" info "Hit store handler"
# TODO Use same flag as wakunode
# To be fixed here: https://github.com/status-im/nim-waku/issues/271
if false:
node.mountSwap()
await node.query(HistoryQuery(topics: @[DefaultContentTopic]), storeHandler) await node.query(HistoryQuery(topics: @[DefaultContentTopic]), storeHandler)
if conf.filternode != "": if conf.filternode != "":

View File

@ -22,6 +22,8 @@ logScope:
# Default clientId # Default clientId
const clientId* = "Nimbus Waku v2 node" const clientId* = "Nimbus Waku v2 node"
# TODO Toggle
# To be fixed here: https://github.com/status-im/nim-waku/issues/271
const SWAPAccountingEnabled* = false const SWAPAccountingEnabled* = false
# key and crypto modules different # key and crypto modules different
@ -225,7 +227,7 @@ proc query*(node: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.as
if SWAPAccountingEnabled: if SWAPAccountingEnabled:
debug "Using SWAPAccounting query" debug "Using SWAPAccounting query"
await node.wakuStore.queryWithAccounting(query, handler, accountFor) await node.wakuStore.queryWithAccounting(query, handler, node.wakuSwap)
# TODO Extend with more relevant info: topics, peers, memory usage, online time, etc # TODO Extend with more relevant info: topics, peers, memory usage, online time, etc
proc info*(node: WakuNode): WakuInfo = proc info*(node: WakuNode): WakuInfo =
@ -257,6 +259,13 @@ proc mountStore*(node: WakuNode, store: MessageStore = nil) =
node.switch.mount(node.wakuStore) node.switch.mount(node.wakuStore)
node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription()) node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription())
proc mountSwap*(node: WakuNode) =
info "mounting swap"
node.wakuSwap = WakuSwap.init(node.switch, node.rng)
node.switch.mount(node.wakuSwap)
# NYI - Do we need this?
#node.subscriptions.subscribe(WakuSwapCodec, node.wakuSwap.subscription())
proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string]()) {.async, gcsafe.} = proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string]()) {.async, gcsafe.} =
let wakuRelay = WakuRelay.init( let wakuRelay = WakuRelay.init(
switch = node.switch, switch = node.switch,
@ -390,8 +399,7 @@ when isMainModule:
# TODO Move to conf # TODO Move to conf
if SWAPAccountingEnabled: if SWAPAccountingEnabled:
info "SWAP Accounting enabled" info "SWAP Accounting enabled"
# TODO Mount SWAP protocol mountSwap(node)
# TODO Enable account module
if conf.store: if conf.store:
var store: MessageStore var store: MessageStore

View File

@ -356,7 +356,7 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn
# NOTE: Experimental, maybe incorporate as part of query call # NOTE: Experimental, maybe incorporate as part of query call
proc queryWithAccounting*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, proc queryWithAccounting*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc,
accountFor: AccountUpdateFunc) {.async, gcsafe.} = wakuSwap: WakuSwap) {.async, gcsafe.} =
# @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service. # @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service.
# Ideally depending on the query and our set of peers we take a subset of ideal peers. # Ideally depending on the query and our set of peers we take a subset of ideal peers.
# This will require us to check for various factors such as: # This will require us to check for various factors such as:
@ -381,6 +381,6 @@ proc queryWithAccounting*(w: WakuStore, query: HistoryQuery, handler: QueryHandl
# if SWAPAccountingEnabled: # if SWAPAccountingEnabled:
let peerId = peer.peerInfo.peerId let peerId = peer.peerInfo.peerId
let messages = response.value.response.messages let messages = response.value.response.messages
accountFor(peerId, messages.len) wakuSwap.accountFor(peerId, messages.len)
handler(response.value.response) handler(response.value.response)

View File

@ -76,8 +76,33 @@ proc init*(T: type Cheque, buffer: seq[byte]): ProtoResult[T] =
# Accounting # Accounting
# #
proc accountFor*(peerId: PeerId, n: int) {.gcsafe.} = proc init*(wakuSwap: WakuSwap) =
info "Accounting for", peerId, n info "wakuSwap init 1"
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
info "NYI swap handle incoming connection"
proc accountFor(peerId: PeerId, n: int) {.gcsafe, closure.} =
info "Accounting for", peerId, n
info "Accounting test", text = wakuSwap.text
# Nicer way to write this?
if wakuSwap.accounting.hasKey(peerId):
wakuSwap.accounting[peerId] += n
else:
wakuSwap.accounting[peerId] = n
info "Accounting state", accounting = wakuSwap.accounting[peerId]
wakuSwap.handler = handle
wakuSwap.codec = WakuSwapCodec
wakuSwap.accountFor = accountFor
proc init*(T: type WakuSwap, switch: Switch, rng: ref BrHmacDrbgContext): T =
info "wakuSwap init 2"
new result
result.rng = rng
result.switch = switch
result.accounting = initTable[PeerId, int]()
result.text = "test"
result.init()
# TODO End to end communication # TODO End to end communication
# TODO Better state management (STDOUT for now)

View File

@ -134,12 +134,23 @@ type
# @TODO MAYBE MORE INFO? # @TODO MAYBE MORE INFO?
Filters* = Table[string, Filter] Filters* = Table[string, Filter]
AccountHandler* = proc (peerId: PeerId, amount: int) {.gcsafe, closure.}
WakuSwap* = ref object of LPProtocol
switch*: Switch
rng*: ref BrHmacDrbgContext
#peers*: seq[PeerInfo]
text*: string
accounting*: Table[PeerId, int]
accountFor*: AccountHandler
# NOTE based on Eth2Node in NBC eth2_network.nim # NOTE based on Eth2Node in NBC eth2_network.nim
WakuNode* = ref object of RootObj WakuNode* = ref object of RootObj
switch*: Switch switch*: Switch
wakuRelay*: WakuRelay wakuRelay*: WakuRelay
wakuStore*: WakuStore wakuStore*: WakuStore
wakuFilter*: WakuFilter wakuFilter*: WakuFilter
wakuSwap*: WakuSwap
peerInfo*: PeerInfo peerInfo*: PeerInfo
libp2pTransportLoops*: seq[Future[void]] libp2pTransportLoops*: seq[Future[void]]
# TODO Revist messages field indexing as well as if this should be Message or WakuMessage # TODO Revist messages field indexing as well as if this should be Message or WakuMessage