From d46f357495ef68e8cd381b00e131478244d1698f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Oskar=20Thor=C3=A9n?= Date: Mon, 16 Nov 2020 17:55:49 +0800 Subject: [PATCH] Minimal accounting (#264) * Minimal accounting Start of accounting PoC and keeping track of balance with peer - Untested - Flag should be off by default - should be put in a separaet module * Move accountFor to right place * Accounting Use length of messages * Disable flag by default * Move account update func sig --- waku/node/v2/waku_types.nim | 3 +++ waku/node/v2/wakunode2.nim | 15 ++++++++++++++- waku/protocol/v2/waku_store.nim | 31 +++++++++++++++++++++++++++++++ waku/protocol/v2/waku_swap.nim | 10 ++++++++++ 4 files changed, 58 insertions(+), 1 deletion(-) diff --git a/waku/node/v2/waku_types.nim b/waku/node/v2/waku_types.nim index d52820306..3bfb60160 100644 --- a/waku/node/v2/waku_types.nim +++ b/waku/node/v2/waku_types.nim @@ -169,6 +169,9 @@ type date*: uint32 amount*: uint32 + AccountUpdateFunc* = proc(peerId: PeerId, amount: int) {.gcsafe.} + + # Encoding and decoding ------------------------------------------------------- # TODO Move out to to waku_message module # Possibly same with util functions diff --git a/waku/node/v2/wakunode2.nim b/waku/node/v2/wakunode2.nim index 5dc6542a1..023942f50 100644 --- a/waku/node/v2/wakunode2.nim +++ b/waku/node/v2/wakunode2.nim @@ -10,7 +10,7 @@ import libp2p/protocols/pubsub/pubsub, libp2p/peerinfo, libp2p/standard_setup, - ../../protocol/v2/[waku_relay, waku_store, waku_filter, message_notifier], + ../../protocol/v2/[waku_relay, waku_store, waku_filter, waku_swap, message_notifier], ./waku_types, ./message_store export waku_types @@ -21,6 +21,8 @@ logScope: # Default clientId const clientId* = "Nimbus Waku v2 node" +const SWAPAccountingEnabled* = false + # key and crypto modules different type KeyPair* = crypto.KeyPair @@ -196,6 +198,7 @@ proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} = await node.wakuFilter.unsubscribe(request) node.filters.removeContentFilters(request.contentFilters) + proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) = ## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a ## `contentTopic` field for light node functionality. This field may be also @@ -219,6 +222,10 @@ proc query*(node: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.as ## Status: Implemented. await node.wakuStore.query(query, handler) + if SWAPAccountingEnabled: + debug "Using SWAPAccounting query" + await node.wakuStore.queryWithAccounting(query, handler, accountFor) + # TODO Extend with more relevant info: topics, peers, memory usage, online time, etc proc info*(node: WakuNode): WakuInfo = ## Returns information about the Node, such as what multiaddress it can be reached at. @@ -378,6 +385,12 @@ when isMainModule: waitFor node.start() + # TODO Move to conf + if SWAPAccountingEnabled: + info "SWAP Accounting enabled" + # TODO Mount SWAP protocol + # TODO Enable account module + if conf.store: var store: MessageStore diff --git a/waku/protocol/v2/waku_store.nim b/waku/protocol/v2/waku_store.nim index c7d4ef315..f60e55e8e 100644 --- a/waku/protocol/v2/waku_store.nim +++ b/waku/protocol/v2/waku_store.nim @@ -352,3 +352,34 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn return handler(response.value.response) + +# NOTE: Experimental, maybe incorporate as part of query call +proc queryWithAccounting*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, + accountFor: AccountUpdateFunc) {.async, gcsafe.} = + # @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service. + # Ideally depending on the query and our set of peers we take a subset of ideal peers. + # This will require us to check for various factors such as: + # - which topics they track + # - latency? + # - default store peer? + + let peer = w.peers[0] + let conn = await w.switch.dial(peer.peerInfo.peerId, peer.peerInfo.addrs, WakuStoreCodec) + + await conn.writeLP(HistoryRPC(requestId: generateRequestId(w.rng), + query: query).encode().buffer) + + var message = await conn.readLp(64*1024) + let response = HistoryRPC.init(message) + + if response.isErr: + error "failed to decode response" + return + + # NOTE Perform accounting operation + # if SWAPAccountingEnabled: + let peerId = peer.peerInfo.peerId + let messages = response.value.response.messages + accountFor(peerId, messages.len) + + handler(response.value.response) diff --git a/waku/protocol/v2/waku_swap.nim b/waku/protocol/v2/waku_swap.nim index cc6f2b15f..5ee9256b5 100644 --- a/waku/protocol/v2/waku_swap.nim +++ b/waku/protocol/v2/waku_swap.nim @@ -71,3 +71,13 @@ proc init*(T: type Cheque, buffer: seq[byte]): ProtoResult[T] = discard ? pb.getField(3, cheque.amount) ok(cheque) + + +# Accounting +# + +proc accountFor*(peerId: PeerId, n: int) {.gcsafe.} = + info "Accounting for", peerId, n + +# TODO End to end communication +# TODO Better state management (STDOUT for now)