Replace pricing exchange by account exchange

Rationale: price is no longer set per peer, but per chunk.
Only the Ethereum accounts of the peers needs to be exchanged.
This commit is contained in:
Mark Spanbroek 2021-05-10 16:21:47 +02:00 committed by markspanbroek
parent d4dd15734e
commit 6bd4260911
14 changed files with 84 additions and 105 deletions

View File

@ -157,8 +157,8 @@ proc new*(
blocks: seq[bt.Block]) {.gcsafe.} = blocks: seq[bt.Block]) {.gcsafe.} =
engine.blocksHandler(peer, blocks) engine.blocksHandler(peer, blocks)
proc pricingHandler(peer: PeerId, pricing: Pricing) = proc accountHandler(peer: PeerId, account: Account) =
engine.pricingHandler(peer, pricing) engine.accountHandler(peer, account)
proc paymentHandler(peer: PeerId, payment: SignedState) = proc paymentHandler(peer: PeerId, payment: SignedState) =
engine.paymentHandler(peer, payment) engine.paymentHandler(peer, payment)
@ -167,7 +167,7 @@ proc new*(
onWantList: blockWantListHandler, onWantList: blockWantListHandler,
onBlocks: blocksHandler, onBlocks: blocksHandler,
onPresence: blockPresenceHandler, onPresence: blockPresenceHandler,
onPricing: pricingHandler, onAccount: accountHandler,
onPayment: paymentHandler onPayment: paymentHandler
) )

View File

@ -50,6 +50,10 @@ type
wallet*: WalletRef # nitro wallet for micropayments wallet*: WalletRef # nitro wallet for micropayments
pricing*: ?Pricing # optional bandwidth pricing pricing*: ?Pricing # optional bandwidth pricing
Pricing* = object
address*: EthAddress
price*: UInt256
proc contains*(a: AsyncHeapQueue[Entry], b: Cid): bool = proc contains*(a: AsyncHeapQueue[Entry], b: Cid): bool =
## Convenience method to check for entry prepense ## Convenience method to check for entry prepense
## ##
@ -246,21 +250,20 @@ proc wantListHandler*(
if not b.scheduleTask(peerCtx): if not b.scheduleTask(peerCtx):
trace "Unable to schedule task for peer", peer trace "Unable to schedule task for peer", peer
proc pricingHandler*(engine: BitswapEngine, peer: PeerID, pricing: Pricing) = proc accountHandler*(engine: BitswapEngine, peer: PeerID, account: Account) =
let context = engine.getPeerCtx(peer) let context = engine.getPeerCtx(peer)
if context.isNil: if context.isNil:
return return
context.pricing = pricing.some context.account = account.some
proc paymentHandler*(engine: BitswapEngine, peer: PeerId, payment: SignedState) = proc paymentHandler*(engine: BitswapEngine, peer: PeerId, payment: SignedState) =
without context =? engine.getPeerCtx(peer).option and without context =? engine.getPeerCtx(peer).option and
contextPricing =? context.pricing and account =? context.account:
enginePricing =? engine.pricing:
return return
if channel =? context.paymentChannel: if channel =? context.paymentChannel:
let sender = contextPricing.address let sender = account.address
discard engine.wallet.acceptPayment(channel, Asset, sender, payment) discard engine.wallet.acceptPayment(channel, Asset, sender, payment)
else: else:
context.paymentChannel = engine.wallet.acceptChannel(payment).option context.paymentChannel = engine.wallet.acceptChannel(payment).option
@ -280,8 +283,8 @@ proc setupPeer*(b: BitswapEngine, peer: PeerID) =
if b.wantList.len > 0: if b.wantList.len > 0:
b.request.sendWantList(peer, b.wantList, full = true) b.request.sendWantList(peer, b.wantList, full = true)
if pricing =? b.pricing: if address =? b.pricing.?address:
b.request.sendPricing(peer, pricing) b.request.sendAccount(peer, Account(address: address))
proc dropPeer*(b: BitswapEngine, peer: PeerID) = proc dropPeer*(b: BitswapEngine, peer: PeerID) =
## Cleanup disconnected peer ## Cleanup disconnected peer

View File

@ -21,20 +21,20 @@ func openLedgerChannel*(wallet: WalletRef,
func getOrOpenChannel(wallet: WalletRef, peer: BitswapPeerCtx): ?!ChannelId = func getOrOpenChannel(wallet: WalletRef, peer: BitswapPeerCtx): ?!ChannelId =
if channel =? peer.paymentChannel: if channel =? peer.paymentChannel:
success channel success channel
elif pricing =? peer.pricing: elif account =? peer.account:
let channel = ?wallet.openLedgerChannel(pricing.address, Asset) let channel = ?wallet.openLedgerChannel(account.address, Asset)
peer.paymentChannel = channel.some peer.paymentChannel = channel.some
success channel success channel
else: else:
failure "no pricing set for peer" failure "no account set for peer"
func pay*(wallet: WalletRef, func pay*(wallet: WalletRef,
peer: BitswapPeerCtx, peer: BitswapPeerCtx,
amount: UInt256): ?!SignedState = amount: UInt256): ?!SignedState =
if pricing =? peer.pricing: if account =? peer.account:
let asset = Asset let asset = Asset
let receiver = pricing.address let receiver = account.address
let channel = ?wallet.getOrOpenChannel(peer) let channel = ?wallet.getOrOpenChannel(peer)
wallet.pay(channel, asset, receiver, amount) wallet.pay(channel, asset, receiver, amount)
else: else:
failure "no pricing set for peer" failure "no account set for peer"

View File

@ -31,14 +31,14 @@ type
WantListHandler* = proc(peer: PeerID, wantList: WantList) {.gcsafe.} WantListHandler* = proc(peer: PeerID, wantList: WantList) {.gcsafe.}
BlocksHandler* = proc(peer: PeerID, blocks: seq[bt.Block]) {.gcsafe.} BlocksHandler* = proc(peer: PeerID, blocks: seq[bt.Block]) {.gcsafe.}
BlockPresenceHandler* = proc(peer: PeerID, precense: seq[BlockPresence]) {.gcsafe.} BlockPresenceHandler* = proc(peer: PeerID, precense: seq[BlockPresence]) {.gcsafe.}
PricingHandler* = proc(peer: PeerID, pricing: Pricing) {.gcsafe.} AccountHandler* = proc(peer: PeerID, account: Account) {.gcsafe.}
PaymentHandler* = proc(peer: PeerID, payment: SignedState) {.gcsafe.} PaymentHandler* = proc(peer: PeerID, payment: SignedState) {.gcsafe.}
BitswapHandlers* = object BitswapHandlers* = object
onWantList*: WantListHandler onWantList*: WantListHandler
onBlocks*: BlocksHandler onBlocks*: BlocksHandler
onPresence*: BlockPresenceHandler onPresence*: BlockPresenceHandler
onPricing*: PricingHandler onAccount*: AccountHandler
onPayment*: PaymentHandler onPayment*: PaymentHandler
WantListBroadcaster* = proc( WantListBroadcaster* = proc(
@ -52,14 +52,14 @@ type
BlocksBroadcaster* = proc(peer: PeerID, presence: seq[bt.Block]) {.gcsafe.} BlocksBroadcaster* = proc(peer: PeerID, presence: seq[bt.Block]) {.gcsafe.}
PresenceBroadcaster* = proc(peer: PeerID, presence: seq[BlockPresence]) {.gcsafe.} PresenceBroadcaster* = proc(peer: PeerID, presence: seq[BlockPresence]) {.gcsafe.}
PricingBroadcaster* = proc(peer: PeerID, pricing: Pricing) {.gcsafe.} AccountBroadcaster* = proc(peer: PeerID, account: Account) {.gcsafe.}
PaymentBroadcaster* = proc(peer: PeerID, payment: SignedState) {.gcsafe.} PaymentBroadcaster* = proc(peer: PeerID, payment: SignedState) {.gcsafe.}
BitswapRequest* = object BitswapRequest* = object
sendWantList*: WantListBroadcaster sendWantList*: WantListBroadcaster
sendBlocks*: BlocksBroadcaster sendBlocks*: BlocksBroadcaster
sendPresence*: PresenceBroadcaster sendPresence*: PresenceBroadcaster
sendPricing*: PricingBroadcaster sendAccount*: AccountBroadcaster
sendPayment*: PaymentBroadcaster sendPayment*: PaymentBroadcaster
BitswapNetwork* = ref object of LPProtocol BitswapNetwork* = ref object of LPProtocol
@ -202,20 +202,20 @@ proc broadcastBlockPresence*(
trace "Sending presence to peer", peer = id trace "Sending presence to peer", peer = id
asyncSpawn b.peers[id].send(Message(blockPresences: presence)) asyncSpawn b.peers[id].send(Message(blockPresences: presence))
proc handlePricing(network: BitswapNetwork, proc handleAccount(network: BitswapNetwork,
peer: NetworkPeer, peer: NetworkPeer,
pricing: Pricing) = account: Account) =
if network.handlers.onPricing.isNil: if network.handlers.onAccount.isNil:
return return
network.handlers.onPricing(peer.id, pricing) network.handlers.onAccount(peer.id, account)
proc broadcastPricing*(network: BitswapNetwork, proc broadcastAccount*(network: BitswapNetwork,
id: PeerId, id: PeerId,
pricing: Pricing) = account: Account) =
if id notin network.peers: if id notin network.peers:
return return
let message = Message(pricing: PricingMessage.init(pricing)) let message = Message(account: AccountMessage.init(account))
asyncSpawn network.peers[id].send(message) asyncSpawn network.peers[id].send(message)
proc broadcastPayment*(network: BitswapNetwork, proc broadcastPayment*(network: BitswapNetwork,
@ -248,8 +248,8 @@ proc rpcHandler(b: BitswapNetwork, peer: NetworkPeer, msg: Message) {.async.} =
if msg.blockPresences.len > 0: if msg.blockPresences.len > 0:
b.handleBlockPresence(peer, msg.blockPresences) b.handleBlockPresence(peer, msg.blockPresences)
if pricing =? Pricing.init(msg.pricing): if account =? Account.init(msg.account):
b.handlePricing(peer, pricing) b.handleAccount(peer, account)
if payment =? SignedState.init(msg.payment): if payment =? SignedState.init(msg.payment):
b.handlePayment(peer, payment) b.handlePayment(peer, payment)
@ -347,8 +347,8 @@ proc new*(
proc sendPresence(id: PeerID, presence: seq[BlockPresence]) {.gcsafe.} = proc sendPresence(id: PeerID, presence: seq[BlockPresence]) {.gcsafe.} =
b.broadcastBlockPresence(id, presence) b.broadcastBlockPresence(id, presence)
proc sendPricing(id: PeerID, pricing: Pricing) = proc sendAccount(id: PeerID, account: Account) =
b.broadcastPricing(id, pricing) b.broadcastAccount(id, account)
proc sendPayment(id: PeerID, payment: SignedState) = proc sendPayment(id: PeerID, payment: SignedState) =
b.broadcastPayment(id, payment) b.broadcastPayment(id, payment)
@ -357,7 +357,7 @@ proc new*(
sendWantList: sendWantList, sendWantList: sendWantList,
sendBlocks: sendBlocks, sendBlocks: sendBlocks,
sendPresence: sendPresence, sendPresence: sendPresence,
sendPricing: sendPricing, sendAccount: sendAccount,
sendPayment: sendPayment sendPayment: sendPayment
) )

View File

@ -18,7 +18,7 @@ type
peerWants*: seq[Entry] # remote peers want lists peerWants*: seq[Entry] # remote peers want lists
exchanged*: int # times peer has exchanged with us exchanged*: int # times peer has exchanged with us
lastExchange*: Moment # last time peer has exchanged with us lastExchange*: Moment # last time peer has exchanged with us
pricing*: ?Pricing # optional bandwidth price for this peer account*: ?Account # ethereum account of this peer
paymentChannel*: ?ChannelId # payment channel id paymentChannel*: ?ChannelId # payment channel id
proc peerHave*(context: BitswapPeerCtx): seq[Cid] = proc peerHave*(context: BitswapPeerCtx): seq[Cid] =

View File

@ -17,7 +17,7 @@ import_proto3 "message.proto"
export Message export Message
export Wantlist, WantType, Entry export Wantlist, WantType, Entry
export Block, BlockPresenceType, BlockPresence export Block, BlockPresenceType, BlockPresence
export PricingMessage, StateChannelUpdate export AccountMessage, StateChannelUpdate
proc hash*(e: Entry): Hash = proc hash*(e: Entry): Hash =
hash(e.`block`) hash(e.`block`)

View File

@ -35,12 +35,11 @@ message Message {
message BlockPresence { message BlockPresence {
bytes cid = 1; bytes cid = 1;
BlockPresenceType type = 2; BlockPresenceType type = 2;
bytes price = 3; // Amount of assets to pay per byte (UInt256) bytes price = 3; // Amount of assets to pay for the block (UInt256)
} }
message PricingMessage { message AccountMessage {
bytes address = 1; // Ethereum address to which payments should be made bytes address = 1; // Ethereum address to which payments should be made
bytes price = 3; // Amount of assets to pay per byte (UInt256)
} }
message StateChannelUpdate { message StateChannelUpdate {
@ -52,6 +51,6 @@ message Message {
repeated Block payload = 3; // used to send Blocks in bitswap 1.1.0 repeated Block payload = 3; // used to send Blocks in bitswap 1.1.0
repeated BlockPresence blockPresences = 4; repeated BlockPresence blockPresences = 4;
int32 pendingBytes = 5; int32 pendingBytes = 5;
PricingMessage pricing = 6; AccountMessage account = 6;
StateChannelUpdate payment = 7; StateChannelUpdate payment = 7;
} }

View File

@ -6,7 +6,7 @@ import pkg/questionable
import pkg/upraises import pkg/upraises
import ./bitswap import ./bitswap
export PricingMessage export AccountMessage
export StateChannelUpdate export StateChannelUpdate
export stint export stint
@ -15,15 +15,11 @@ export nitro
push: {.upraises: [].} push: {.upraises: [].}
type type
Pricing* = object Account* = object
address*: EthAddress address*: EthAddress
price*: UInt256
func init*(_: type PricingMessage, pricing: Pricing): PricingMessage = func init*(_: type AccountMessage, account: Account): AccountMessage =
PricingMessage( AccountMessage(address: @(account.address.toArray))
address: @(pricing.address.toArray),
price: @(pricing.price.toBytesBE)
)
func parse(_: type EthAddress, bytes: seq[byte]): ?EthAddress = func parse(_: type EthAddress, bytes: seq[byte]): ?EthAddress =
var address: array[20, byte] var address: array[20, byte]
@ -33,16 +29,10 @@ func parse(_: type EthAddress, bytes: seq[byte]): ?EthAddress =
address[i] = bytes[i] address[i] = bytes[i]
EthAddress(address).some EthAddress(address).some
func parse(_: type UInt256, bytes: seq[byte]): ?UInt256 = func init*(_: type Account, message: AccountMessage): ?Account =
if bytes.len > 32: without address =? EthAddress.parse(message.address):
return UInt256.none return none Account
UInt256.fromBytesBE(bytes).some some Account(address: address)
func init*(_: type Pricing, message: PricingMessage): ?Pricing =
without address =? EthAddress.parse(message.address) and
price =? UInt256.parse(message.price):
return Pricing.none
Pricing(address: address, price: price).some
func init*(_: type StateChannelUpdate, state: SignedState): StateChannelUpdate = func init*(_: type StateChannelUpdate, state: SignedState): StateChannelUpdate =
StateChannelUpdate(update: state.toJson.toBytes) StateChannelUpdate(update: state.toJson.toBytes)

View File

@ -4,6 +4,7 @@ import ../../examples
suite "engine payments": suite "engine payments":
let address = EthAddress.example
let amount = 42.u256 let amount = 42.u256
var wallet: WalletRef var wallet: WalletRef
@ -12,17 +13,16 @@ suite "engine payments":
setup: setup:
wallet = WalletRef.example wallet = WalletRef.example
peer = BitswapPeerCtx.example peer = BitswapPeerCtx.example
peer.pricing = Pricing.example.some peer.account = Account(address: address).some
test "pays for received blocks": test "pays for received blocks":
let payment = !wallet.pay(peer, amount) let payment = !wallet.pay(peer, amount)
let pricing = !peer.pricing
let balances = payment.state.outcome.balances(Asset) let balances = payment.state.outcome.balances(Asset)
let destination = pricing.address.toDestination let destination = address.toDestination
check !balances[destination] == amount check !balances[destination] == amount
test "no payment when no price is set": test "no payment when no account is set":
peer.pricing = Pricing.none peer.account = Account.none
check wallet.pay(peer, amount).isFailure check wallet.pay(peer, amount).isFailure
test "uses same channel for consecutive payments": test "uses same channel for consecutive payments":

View File

@ -4,34 +4,21 @@ import pkg/stew/byteutils
import ../../examples import ../../examples
import ../../../../dagger/bitswap/protobuf/payments import ../../../../dagger/bitswap/protobuf/payments
suite "pricing protobuf messages": suite "account protobuf messages":
let address = EthAddress.example let account = Account(address: EthAddress.example)
let price = UInt256.example let message = AccountMessage.init(account)
let pricing = Pricing(address: address, price: price)
let message = PricingMessage.init(pricing)
test "encodes recipient of payments": test "encodes recipient of payments":
check message.address == @(address.toArray) check message.address == @(account.address.toArray)
test "encodes price per byte":
check message.price == @(price.toBytesBE)
test "decodes recipient of payments": test "decodes recipient of payments":
check Pricing.init(message).?address == address.some check Account.init(message).?address == account.address.some
test "decodes price":
check Pricing.init(message).?price == price.some
test "fails to decode when address has incorrect number of bytes": test "fails to decode when address has incorrect number of bytes":
var incorrect = message var incorrect = message
incorrect.address.del(0) incorrect.address.del(0)
check Pricing.init(incorrect).isNone check Account.init(incorrect).isNone
test "fails to decode when price has too many bytes":
var incorrect = message
incorrect.price = newSeq[byte](33)
check Pricing.init(incorrect).isNone
suite "channel update messages": suite "channel update messages":

View File

@ -103,9 +103,9 @@ suite "Bitswap engine - 2 nodes":
peerCtx2.peerHave.mapIt( $it ).sorted(cmp[string]) == peerCtx2.peerHave.mapIt( $it ).sorted(cmp[string]) ==
bitswap1.engine.wantList.mapIt( $it ).sorted(cmp[string]) bitswap1.engine.wantList.mapIt( $it ).sorted(cmp[string])
test "exchanges pricing on connect": test "exchanges accounts on connect":
check peerCtx1.pricing == pricing1.some check peerCtx1.account.?address == pricing1.address.some
check peerCtx2.pricing == pricing2.some check peerCtx2.account.?address == pricing2.address.some
test "should send want-have for block": test "should send want-have for block":
let blk = bt.Block.new("Block 1".toBytes) let blk = bt.Block.new("Block 1".toBytes)
@ -152,7 +152,6 @@ suite "Bitswap engine - 2 nodes":
test "receives payments for blocks that were sent": test "receives payments for blocks that were sent":
let blocks = await bitswap1.getBlocks(blocks2.mapIt(it.cid)) let blocks = await bitswap1.getBlocks(blocks2.mapIt(it.cid))
let pricing = !bitswap2.engine.pricing
await sleepAsync(100.millis) await sleepAsync(100.millis)
let channel = !peerCtx1.paymentChannel let channel = !peerCtx1.paymentChannel
check wallet2.balance(channel, Asset) > 0 check wallet2.balance(channel, Asset) > 0

View File

@ -57,14 +57,14 @@ suite "Bitswap engine basic":
await done await done
test "sends pricing to new peers": test "sends account to new peers":
let pricing = Pricing.example let pricing = Pricing.example
proc sendPricing(peer: PeerID, toBeSent: Pricing) = proc sendAccount(peer: PeerID, account: Account) =
check toBeSent == pricing check account.address == pricing.address
done.complete() done.complete()
let request = BitswapRequest(sendPricing: sendPricing) let request = BitswapRequest(sendAccount: sendAccount)
let engine = BitswapEngine.new(MemoryStore.new, wallet, request) let engine = BitswapEngine.new(MemoryStore.new, wallet, request)
engine.pricing = pricing.some engine.pricing = pricing.some
@ -153,16 +153,16 @@ suite "Bitswap engine handlers":
check engine.localStore.hasBlock(b.cid) check engine.localStore.hasBlock(b.cid)
test "sends payments for received blocks": test "sends payments for received blocks":
let pricing = Pricing.example let account = Account(address: EthAddress.example)
let peerContext = engine.getPeerCtx(peerId) let peerContext = engine.getPeerCtx(peerId)
peerContext.pricing = pricing.some peerContext.account = account.some
peerContext.peerPrices = blocks.mapIt((it.cid, rand(uint16).u256)).toTable peerContext.peerPrices = blocks.mapIt((it.cid, rand(uint16).u256)).toTable
engine.request.sendPayment = proc(receiver: PeerID, payment: SignedState) = engine.request.sendPayment = proc(receiver: PeerID, payment: SignedState) =
let amount = blocks.mapIt(peerContext.peerPrices[it.cid]).foldl(a+b) let amount = blocks.mapIt(peerContext.peerPrices[it.cid]).foldl(a+b)
let balances = !payment.state.outcome.balances(Asset) let balances = !payment.state.outcome.balances(Asset)
check receiver == peerId check receiver == peerId
check balances[pricing.address.toDestination] == amount check balances[account.address.toDestination] == amount
done.complete() done.complete()
engine.blocksHandler(peerId, blocks) engine.blocksHandler(peerId, blocks)

View File

@ -103,16 +103,16 @@ suite "Bitswap network":
await done.wait(500.millis) await done.wait(500.millis)
test "handles pricing messages": test "handles account messages":
let pricing = Pricing.example let account = Account(address: EthAddress.example)
proc handlePricing(peer: PeerID, received: Pricing) = proc handleAccount(peer: PeerID, received: Account) =
check received == pricing check received == account
done.complete() done.complete()
network.handlers.onPricing = handlePricing network.handlers.onAccount = handleAccount
let message = Message(pricing: PricingMessage.init(pricing)) let message = Message(account: AccountMessage.init(account))
await buffer.pushData(lenPrefix(Protobuf.encode(message))) await buffer.pushData(lenPrefix(Protobuf.encode(message)))
await done.wait(100.millis) await done.wait(100.millis)
@ -224,16 +224,16 @@ suite "Bitswap Network - e2e":
await done.wait(500.millis) await done.wait(500.millis)
test "broadcasts pricing": test "broadcasts account":
let pricing = Pricing.example let account = Account(address: EthAddress.example)
proc handlePricing(peer: PeerID, received: Pricing) = proc handleAccount(peer: PeerID, received: Account) =
check received == pricing check received == account
done.complete() done.complete()
network2.handlers.onPricing = handlePricing network2.handlers.onAccount = handleAccount
network1.broadcastPricing(switch2.peerInfo.peerId, pricing) network1.broadcastAccount(switch2.peerInfo.peerId, account)
await done.wait(100.millis) await done.wait(100.millis)

View File

@ -5,6 +5,7 @@ import pkg/nitro
import pkg/dagger/p2p/rng import pkg/dagger/p2p/rng
import pkg/dagger/bitswap/protobuf/payments import pkg/dagger/bitswap/protobuf/payments
import pkg/dagger/bitswap/peercontext import pkg/dagger/bitswap/peercontext
import pkg/dagger/bitswap/engine
import pkg/dagger/blocktype import pkg/dagger/blocktype
proc example*(_: type EthAddress): EthAddress = proc example*(_: type EthAddress): EthAddress =