From b6b90f6f544dd858de5f87ee24f10ee8f3dc5130 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Oskar=20Thor=C3=A9n?= Date: Tue, 24 Nov 2020 12:53:42 +0800 Subject: [PATCH] Accounting credit receiving node (#292) * Accounting WIP: Swap access through Store * Fix order bug and comment scenario + typo * WIP * Accounting: Account for receiving store node - Turn accountFor function into credit and debit - Misc formatting * Accounting: Fix bugs related to mount and test * Accounting: Simplify query signature We already have a ref to wakuSwap through wakuStore now. * Resolve rebase issues --- examples/v2/chat2.nim | 6 ++-- tests/v2/test_waku_swap.nim | 11 +++---- waku/v2/node/wakunode2.nim | 27 ++++++++++++----- waku/v2/protocol/waku_store/waku_store.nim | 29 ++++++++++++++----- .../protocol/waku_store/waku_store_types.nim | 4 ++- waku/v2/protocol/waku_swap/waku_swap.nim | 27 ++++++++++++----- .../v2/protocol/waku_swap/waku_swap_types.nim | 6 ++-- 7 files changed, 74 insertions(+), 36 deletions(-) diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index 55607a252..dbe0e7169 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -203,6 +203,9 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId echo &"Listening on\n {listenStr}" + if conf.swap: + node.mountSwap() + if conf.storenode != "": node.mountStore() @@ -214,9 +217,6 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = echo &"{payload}" info "Hit store handler" - if conf.swap: - node.mountSwap() - await node.query(HistoryQuery(topics: @[DefaultContentTopic]), storeHandler) if conf.filternode != "": diff --git a/tests/v2/test_waku_swap.nim b/tests/v2/test_waku_swap.nim index 9ab9ccec9..0c57e6e6c 100644 --- a/tests/v2/test_waku_swap.nim +++ b/tests/v2/test_waku_swap.nim @@ -61,11 +61,11 @@ procSuite "Waku SWAP Accounting": # Start nodes and mount protocols await node1.start() + node1.mountSwap() node1.mountStore() - node1.mountSwap() await node2.start() + node2.mountSwap() node2.mountStore() - node1.mountSwap() await node2.subscriptions.notify("/waku/2/default-waku/proto", message) @@ -81,13 +81,10 @@ procSuite "Waku SWAP Accounting": await node1.query(HistoryQuery(topics: @[contentTopic]), storeHandler) - # TODO Other node accounting field not set - # info "node2", msgs = node2.wakuSwap.accounting # crashes - # node2.wakuSwap.accounting[node1.peerInfo.peerId] = -1 - check: (await completionFut.withTimeout(5.seconds)) == true - # Accounting table updated with one message credit + # Accounting table updated with credit and debit, respectively node1.wakuSwap.accounting[node2.peerInfo.peerId] == 1 + node2.wakuSwap.accounting[node1.peerInfo.peerId] == -1 await node1.stop() await node2.stop() diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index efb649e7d..b1032fff4 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -238,12 +238,14 @@ proc query*(node: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.as ## ## Status: Implemented. + # TODO Once waku swap is less experimental, this can simplified if node.wakuSwap.isNil: debug "Using default query" await node.wakuStore.query(query, handler) else: debug "Using SWAPAccounting query" - await node.wakuStore.queryWithAccounting(query, handler, node.wakuSwap) + # TODO wakuSwap now part of wakuStore object + await node.wakuStore.queryWithAccounting(query, handler) # TODO Extend with more relevant info: topics, peers, memory usage, online time, etc proc info*(node: WakuNode): WakuInfo = @@ -269,12 +271,8 @@ proc mountFilter*(node: WakuNode) = node.switch.mount(node.wakuFilter) node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription()) -proc mountStore*(node: WakuNode, store: MessageStore = nil) = - info "mounting store" - node.wakuStore = WakuStore.init(node.switch, node.rng, store) - node.switch.mount(node.wakuStore) - node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription()) - +# NOTE: If using the swap protocol, it must be mounted before store. This is +# because store is using a reference to the swap protocol. proc mountSwap*(node: WakuNode) = info "mounting swap" node.wakuSwap = WakuSwap.init(node.switch, node.rng) @@ -282,6 +280,19 @@ proc mountSwap*(node: WakuNode) = # NYI - Do we need this? #node.subscriptions.subscribe(WakuSwapCodec, node.wakuSwap.subscription()) +proc mountStore*(node: WakuNode, store: MessageStore = nil) = + info "mounting store" + + if node.wakuSwap.isNil: + debug "mounting store without swap" + node.wakuStore = WakuStore.init(node.switch, node.rng, store) + else: + debug "mounting store with swap" + node.wakuStore = WakuStore.init(node.switch, node.rng, store, node.wakuSwap) + + node.switch.mount(node.wakuStore) + node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription()) + proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string]()) {.async, gcsafe.} = let wakuRelay = WakuRelay.init( switch = node.switch, @@ -430,7 +441,7 @@ when isMainModule: store = res.value mountStore(node, store) - + if conf.filter: mountFilter(node) diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index b6b5416ec..df23b15f0 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -309,6 +309,18 @@ method init*(ws: WakuStore) = let value = res.value let response = ws.findMessages(res.value.query) + + # TODO Do accounting here, response is HistoryResponse + # How do we get node or swap context? + if not ws.wakuSwap.isNil: + info "handle store swap test", text=ws.wakuSwap.text + # NOTE Perform accounting operation + let peerId = conn.peerInfo.peerId + let messages = response.messages + ws.wakuSwap.credit(peerId, messages.len) + else: + info "handle store swap is nil" + await conn.writeLp(HistoryRPC(requestId: value.requestId, response: response).encode().buffer) @@ -325,11 +337,13 @@ method init*(ws: WakuStore) = if res.isErr: warn "failed to load messages from store", err = res.error -proc init*(T: type WakuStore, switch: Switch, rng: ref BrHmacDrbgContext, store: MessageStore = nil): T = +proc init*(T: type WakuStore, switch: Switch, rng: ref BrHmacDrbgContext, + store: MessageStore = nil, wakuSwap: WakuSwap = nil): T = new result result.rng = rng result.switch = switch result.store = store + result.wakuSwap = wakuSwap result.init() # @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY @@ -377,8 +391,7 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn handler(response.value.response) # NOTE: Experimental, maybe incorporate as part of query call -proc queryWithAccounting*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, - wakuSwap: WakuSwap) {.async, gcsafe.} = +proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} = # @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service. # Ideally depending on the query and our set of peers we take a subset of ideal peers. # This will require us to check for various factors such as: @@ -386,10 +399,10 @@ proc queryWithAccounting*(w: WakuStore, query: HistoryQuery, handler: QueryHandl # - latency? # - default store peer? - let peer = w.peers[0] - let conn = await w.switch.dial(peer.peerInfo.peerId, peer.peerInfo.addrs, WakuStoreCodec) + let peer = ws.peers[0] + let conn = await ws.switch.dial(peer.peerInfo.peerId, peer.peerInfo.addrs, WakuStoreCodec) - await conn.writeLP(HistoryRPC(requestId: generateRequestId(w.rng), + await conn.writeLP(HistoryRPC(requestId: generateRequestId(ws.rng), query: query).encode().buffer) var message = await conn.readLp(64*1024) @@ -400,9 +413,9 @@ proc queryWithAccounting*(w: WakuStore, query: HistoryQuery, handler: QueryHandl return # NOTE Perform accounting operation - # if SWAPAccountingEnabled: + # Assumes wakuSwap protocol is mounted let peerId = peer.peerInfo.peerId let messages = response.value.response.messages - wakuSwap.accountFor(peerId, messages.len) + ws.wakuSwap.debit(peerId, messages.len) handler(response.value.response) diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim index 691f486fa..8a4e7741f 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/protocol/waku_store/waku_store_types.nim @@ -4,7 +4,8 @@ import bearssl, stew/[byteutils, endians2], libp2p/[switch, peerinfo], libp2p/protocols/protocol, - ../../waku_types + ../../waku_types, + ../waku_swap/waku_swap_types type QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.} @@ -47,3 +48,4 @@ type peers*: seq[HistoryPeer] messages*: seq[IndexedWakuMessage] store*: MessageStore + wakuSwap*: WakuSwap diff --git a/waku/v2/protocol/waku_swap/waku_swap.nim b/waku/v2/protocol/waku_swap/waku_swap.nim index f4eb67320..78a31a47e 100644 --- a/waku/v2/protocol/waku_swap/waku_swap.nim +++ b/waku/v2/protocol/waku_swap/waku_swap.nim @@ -40,6 +40,8 @@ logScope: const WakuSwapCodec* = "/vac/waku/swap/2.0.0-alpha1" +# Serialization +# ------------------------------------------------------------------------------- proc encode*(handshake: Handshake): ProtoBuffer = result = initProtoBuffer() result.write(1, handshake.beneficiary) @@ -72,19 +74,29 @@ proc init*(T: type Cheque, buffer: seq[byte]): ProtoResult[T] = ok(cheque) - # Accounting +# ------------------------------------------------------------------------------- # +# We credit and debits peers based on what for now is a form of Karma asset. + +# TODO Test for credit/debit operations in succession proc init*(wakuSwap: WakuSwap) = info "wakuSwap init 1" proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = info "NYI swap handle incoming connection" - proc accountFor(peerId: PeerId, n: int) {.gcsafe, closure.} = - info "Accounting for", peerId, n - info "Accounting test", text = wakuSwap.text - # Nicer way to write this? + proc credit(peerId: PeerId, n: int) {.gcsafe, closure.} = + info "Crediting peer for", peerId, n + if wakuSwap.accounting.hasKey(peerId): + wakuSwap.accounting[peerId] -= n + else: + wakuSwap.accounting[peerId] = -n + info "Accounting state", accounting = wakuSwap.accounting[peerId] + + # TODO Debit and credit here for Karma asset + proc debit(peerId: PeerId, n: int) {.gcsafe, closure.} = + info "Debiting peer for", peerId, n if wakuSwap.accounting.hasKey(peerId): wakuSwap.accounting[peerId] += n else: @@ -93,9 +105,10 @@ proc init*(wakuSwap: WakuSwap) = wakuSwap.handler = handle wakuSwap.codec = WakuSwapCodec - wakuSwap.accountFor = accountFor - + wakuSwap.credit = credit + wakuSwap.debit = debit +# TODO Expression return? proc init*(T: type WakuSwap, switch: Switch, rng: ref BrHmacDrbgContext): T = info "wakuSwap init 2" new result diff --git a/waku/v2/protocol/waku_swap/waku_swap_types.nim b/waku/v2/protocol/waku_swap/waku_swap_types.nim index 1e7c76d66..7548d8778 100644 --- a/waku/v2/protocol/waku_swap/waku_swap_types.nim +++ b/waku/v2/protocol/waku_swap/waku_swap_types.nim @@ -17,7 +17,8 @@ type date*: uint32 amount*: uint32 - AccountHandler* = proc (peerId: PeerId, amount: int) {.gcsafe, closure.} + CreditHandler* = proc (peerId: PeerId, amount: int) {.gcsafe, closure.} + DebitHandler* = proc (peerId: PeerId, amount: int) {.gcsafe, closure.} WakuSwap* = ref object of LPProtocol switch*: Switch @@ -25,4 +26,5 @@ type #peers*: seq[PeerInfo] text*: string accounting*: Table[PeerId, int] - accountFor*: AccountHandler + credit*: CreditHandler + debit*: DebitHandler