From 010408aadb89781dc3cc3b2c741c3b5fe78c73ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Oskar=20Thor=C3=A9n?= Date: Wed, 18 Nov 2020 20:45:51 +0800 Subject: [PATCH] Minimal accounting state PoC (#278) * Add basic WakuSwap type and init * Mount swap protocol and keep accounting state * Flags off by default --- examples/v2/chat2.nim | 5 +++++ waku/v2/node/wakunode2.nim | 14 +++++++++++--- waku/v2/protocol/waku_store.nim | 4 ++-- waku/v2/protocol/waku_swap.nim | 31 ++++++++++++++++++++++++++++--- waku/v2/waku_types.nim | 11 +++++++++++ 5 files changed, 57 insertions(+), 8 deletions(-) diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index 3ff99b762..5cec2152d 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -213,6 +213,11 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = echo &"{payload}" info "Hit store handler" + # TODO Use same flag as wakunode + # To be fixed here: https://github.com/status-im/nim-waku/issues/271 + if false: + node.mountSwap() + await node.query(HistoryQuery(topics: @[DefaultContentTopic]), storeHandler) if conf.filternode != "": diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 8fe5c143d..d1041336d 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -22,6 +22,8 @@ logScope: # Default clientId const clientId* = "Nimbus Waku v2 node" +# TODO Toggle +# To be fixed here: https://github.com/status-im/nim-waku/issues/271 const SWAPAccountingEnabled* = false # key and crypto modules different @@ -225,7 +227,7 @@ proc query*(node: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.as if SWAPAccountingEnabled: debug "Using SWAPAccounting query" - await node.wakuStore.queryWithAccounting(query, handler, accountFor) + await node.wakuStore.queryWithAccounting(query, handler, node.wakuSwap) # TODO Extend with more relevant info: topics, peers, memory usage, online time, etc proc info*(node: WakuNode): WakuInfo = @@ -257,6 +259,13 @@ proc mountStore*(node: WakuNode, store: MessageStore = nil) = node.switch.mount(node.wakuStore) node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription()) +proc mountSwap*(node: WakuNode) = + info "mounting swap" + node.wakuSwap = WakuSwap.init(node.switch, node.rng) + node.switch.mount(node.wakuSwap) + # NYI - Do we need this? + #node.subscriptions.subscribe(WakuSwapCodec, node.wakuSwap.subscription()) + proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string]()) {.async, gcsafe.} = let wakuRelay = WakuRelay.init( switch = node.switch, @@ -390,8 +399,7 @@ when isMainModule: # TODO Move to conf if SWAPAccountingEnabled: info "SWAP Accounting enabled" - # TODO Mount SWAP protocol - # TODO Enable account module + mountSwap(node) if conf.store: var store: MessageStore diff --git a/waku/v2/protocol/waku_store.nim b/waku/v2/protocol/waku_store.nim index c2b4a7ffe..5b4026abb 100644 --- a/waku/v2/protocol/waku_store.nim +++ b/waku/v2/protocol/waku_store.nim @@ -356,7 +356,7 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn # NOTE: Experimental, maybe incorporate as part of query call proc queryWithAccounting*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, - accountFor: AccountUpdateFunc) {.async, gcsafe.} = + wakuSwap: WakuSwap) {.async, gcsafe.} = # @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service. # Ideally depending on the query and our set of peers we take a subset of ideal peers. # This will require us to check for various factors such as: @@ -381,6 +381,6 @@ proc queryWithAccounting*(w: WakuStore, query: HistoryQuery, handler: QueryHandl # if SWAPAccountingEnabled: let peerId = peer.peerInfo.peerId let messages = response.value.response.messages - accountFor(peerId, messages.len) + wakuSwap.accountFor(peerId, messages.len) handler(response.value.response) diff --git a/waku/v2/protocol/waku_swap.nim b/waku/v2/protocol/waku_swap.nim index 550054a8d..a0f64063c 100644 --- a/waku/v2/protocol/waku_swap.nim +++ b/waku/v2/protocol/waku_swap.nim @@ -76,8 +76,33 @@ proc init*(T: type Cheque, buffer: seq[byte]): ProtoResult[T] = # Accounting # -proc accountFor*(peerId: PeerId, n: int) {.gcsafe.} = - info "Accounting for", peerId, n +proc init*(wakuSwap: WakuSwap) = + info "wakuSwap init 1" + proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = + info "NYI swap handle incoming connection" + + proc accountFor(peerId: PeerId, n: int) {.gcsafe, closure.} = + info "Accounting for", peerId, n + info "Accounting test", text = wakuSwap.text + # Nicer way to write this? + if wakuSwap.accounting.hasKey(peerId): + wakuSwap.accounting[peerId] += n + else: + wakuSwap.accounting[peerId] = n + info "Accounting state", accounting = wakuSwap.accounting[peerId] + + wakuSwap.handler = handle + wakuSwap.codec = WakuSwapCodec + wakuSwap.accountFor = accountFor + + +proc init*(T: type WakuSwap, switch: Switch, rng: ref BrHmacDrbgContext): T = + info "wakuSwap init 2" + new result + result.rng = rng + result.switch = switch + result.accounting = initTable[PeerId, int]() + result.text = "test" + result.init() # TODO End to end communication -# TODO Better state management (STDOUT for now) diff --git a/waku/v2/waku_types.nim b/waku/v2/waku_types.nim index 3bfb60160..7f71e95e3 100644 --- a/waku/v2/waku_types.nim +++ b/waku/v2/waku_types.nim @@ -134,12 +134,23 @@ type # @TODO MAYBE MORE INFO? Filters* = Table[string, Filter] + AccountHandler* = proc (peerId: PeerId, amount: int) {.gcsafe, closure.} + + WakuSwap* = ref object of LPProtocol + switch*: Switch + rng*: ref BrHmacDrbgContext + #peers*: seq[PeerInfo] + text*: string + accounting*: Table[PeerId, int] + accountFor*: AccountHandler + # NOTE based on Eth2Node in NBC eth2_network.nim WakuNode* = ref object of RootObj switch*: Switch wakuRelay*: WakuRelay wakuStore*: WakuStore wakuFilter*: WakuFilter + wakuSwap*: WakuSwap peerInfo*: PeerInfo libp2pTransportLoops*: seq[Future[void]] # TODO Revist messages field indexing as well as if this should be Message or WakuMessage