Add handler for pricing messages
This commit is contained in:
parent
364d3a9277
commit
5f9215a371
|
@ -17,10 +17,12 @@ import pkg/nitro
|
||||||
|
|
||||||
import ../blocktype as bt
|
import ../blocktype as bt
|
||||||
import ./protobuf/bitswap as pb
|
import ./protobuf/bitswap as pb
|
||||||
|
import ./protobuf/payments
|
||||||
import ./networkpeer
|
import ./networkpeer
|
||||||
|
|
||||||
export pb, networkpeer
|
export pb, networkpeer
|
||||||
export nitro
|
export nitro
|
||||||
|
export payments
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "dagger bitswap network"
|
topics = "dagger bitswap network"
|
||||||
|
@ -31,11 +33,13 @@ type
|
||||||
WantListHandler* = proc(peer: PeerID, wantList: WantList) {.gcsafe.}
|
WantListHandler* = proc(peer: PeerID, wantList: WantList) {.gcsafe.}
|
||||||
BlocksHandler* = proc(peer: PeerID, blocks: seq[bt.Block]) {.gcsafe.}
|
BlocksHandler* = proc(peer: PeerID, blocks: seq[bt.Block]) {.gcsafe.}
|
||||||
BlockPresenceHandler* = proc(peer: PeerID, precense: seq[BlockPresence]) {.gcsafe.}
|
BlockPresenceHandler* = proc(peer: PeerID, precense: seq[BlockPresence]) {.gcsafe.}
|
||||||
|
PricingHandler* = proc(peer: PeerID, pricing: Pricing) {.gcsafe.}
|
||||||
|
|
||||||
BitswapHandlers* = object
|
BitswapHandlers* = object
|
||||||
onWantList*: WantListHandler
|
onWantList*: WantListHandler
|
||||||
onBlocks*: BlocksHandler
|
onBlocks*: BlocksHandler
|
||||||
onPresence*: BlockPresenceHandler
|
onPresence*: BlockPresenceHandler
|
||||||
|
onPricing*: PricingHandler
|
||||||
|
|
||||||
WantListBroadcaster* = proc(
|
WantListBroadcaster* = proc(
|
||||||
id: PeerID,
|
id: PeerID,
|
||||||
|
@ -195,6 +199,13 @@ proc broadcastBlockPresence*(
|
||||||
trace "Sending presence to peer", peer = id
|
trace "Sending presence to peer", peer = id
|
||||||
asyncSpawn b.peers[id].send(Message(blockPresences: presence))
|
asyncSpawn b.peers[id].send(Message(blockPresences: presence))
|
||||||
|
|
||||||
|
proc handlePricing(network: BitswapNetwork,
|
||||||
|
peer: NetworkPeer,
|
||||||
|
pricing: Pricing) =
|
||||||
|
if network.handlers.onPricing.isNil:
|
||||||
|
return
|
||||||
|
network.handlers.onPricing(peer.id, pricing)
|
||||||
|
|
||||||
proc rpcHandler(b: BitswapNetwork, peer: NetworkPeer, msg: Message) {.async.} =
|
proc rpcHandler(b: BitswapNetwork, peer: NetworkPeer, msg: Message) {.async.} =
|
||||||
try:
|
try:
|
||||||
if msg.wantlist.entries.len > 0:
|
if msg.wantlist.entries.len > 0:
|
||||||
|
@ -209,6 +220,9 @@ proc rpcHandler(b: BitswapNetwork, peer: NetworkPeer, msg: Message) {.async.} =
|
||||||
if msg.blockPresences.len > 0:
|
if msg.blockPresences.len > 0:
|
||||||
b.handleBlockPresence(peer, msg.blockPresences)
|
b.handleBlockPresence(peer, msg.blockPresences)
|
||||||
|
|
||||||
|
if pricing =? Pricing.init(msg.pricing):
|
||||||
|
b.handlePricing(peer, pricing)
|
||||||
|
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "Exception in bitswap rpc handler", exc = exc.msg
|
trace "Exception in bitswap rpc handler", exc = exc.msg
|
||||||
|
|
||||||
|
|
|
@ -9,11 +9,13 @@ import pkg/protobuf_serialization
|
||||||
|
|
||||||
import pkg/dagger/stores/memorystore
|
import pkg/dagger/stores/memorystore
|
||||||
import pkg/dagger/bitswap/network
|
import pkg/dagger/bitswap/network
|
||||||
|
import pkg/dagger/bitswap/protobuf/payments
|
||||||
import pkg/dagger/p2p/rng
|
import pkg/dagger/p2p/rng
|
||||||
import pkg/dagger/chunker
|
import pkg/dagger/chunker
|
||||||
import pkg/dagger/blocktype as bt
|
import pkg/dagger/blocktype as bt
|
||||||
|
|
||||||
import ../helpers
|
import ../helpers
|
||||||
|
import ../examples
|
||||||
|
|
||||||
suite "Bitswap network":
|
suite "Bitswap network":
|
||||||
let
|
let
|
||||||
|
@ -102,6 +104,20 @@ suite "Bitswap network":
|
||||||
|
|
||||||
await done.wait(500.millis)
|
await done.wait(500.millis)
|
||||||
|
|
||||||
|
test "handles pricing messages":
|
||||||
|
let pricing = Pricing.example
|
||||||
|
|
||||||
|
proc handlePricing(peer: PeerID, received: Pricing) =
|
||||||
|
check received == pricing
|
||||||
|
done.complete()
|
||||||
|
|
||||||
|
network.handlers.onPricing = handlePricing
|
||||||
|
|
||||||
|
let message = Message(pricing: PricingMessage.init(pricing))
|
||||||
|
await buffer.pushData(lenPrefix(Protobuf.encode(message)))
|
||||||
|
|
||||||
|
await done.wait(100.millis)
|
||||||
|
|
||||||
suite "Bitswap Network - e2e":
|
suite "Bitswap Network - e2e":
|
||||||
let
|
let
|
||||||
chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
|
chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
import std/random
|
import std/random
|
||||||
import pkg/nitro
|
import pkg/nitro
|
||||||
|
import pkg/dagger/bitswap/protobuf/payments
|
||||||
|
|
||||||
proc example*(_: type EthAddress): EthAddress =
|
proc example*(_: type EthAddress): EthAddress =
|
||||||
EthPrivateKey.random().toPublicKey.toAddress
|
EthPrivateKey.random().toPublicKey.toAddress
|
||||||
|
@ -21,3 +22,10 @@ proc example*(_: type SignedState): SignedState =
|
||||||
let nonce = UInt48.example
|
let nonce = UInt48.example
|
||||||
let channel = wallet.openLedgerChannel(hub, chainId, nonce, asset, amount).get
|
let channel = wallet.openLedgerChannel(hub, chainId, nonce, asset, amount).get
|
||||||
wallet.pay(channel, asset, receiver, amount).get
|
wallet.pay(channel, asset, receiver, amount).get
|
||||||
|
|
||||||
|
proc example*(_: type Pricing): Pricing =
|
||||||
|
Pricing(
|
||||||
|
address: EthAddress.example,
|
||||||
|
asset: EthAddress.example,
|
||||||
|
price: UInt256.example()
|
||||||
|
)
|
||||||
|
|
Loading…
Reference in New Issue