swarm, p2p/protocols: Stream accounting (#18337)

* swarm: completed 1st phase of swap accounting

* swarm, p2p/protocols: added stream pricing

* swarm/network/stream: gofmt simplify stream.go

* swarm: fixed review comments

* swarm: used snapshots for swap tests

* swarm: custom retrieve for swap (less cascaded requests at any one time)

* swarm: addressed PR comments

* swarm: log output formatting

* swarm: removed parallelism in swap tests

* swarm: swap tests simplification

* swarm: removed swap_test.go

* swarm/network/stream: added prefix space for comments

* swarm/network/stream: unit test for prices

* swarm/network/stream: don't hardcode price

* swarm/network/stream: fixed invalid price check
This commit is contained in:
holisticode 2019-01-07 18:59:00 -05:00 committed by Viktor Trón
parent 56a3f6c03c
commit ae857e74bf
3 changed files with 185 additions and 104 deletions

View File

@ -22,31 +22,33 @@ import (
"github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/metrics"
) )
//define some metrics // define some metrics
var ( var (
//All metrics are cumulative // All metrics are cumulative
//total amount of units credited // total amount of units credited
mBalanceCredit metrics.Counter mBalanceCredit metrics.Counter
//total amount of units debited // total amount of units debited
mBalanceDebit metrics.Counter mBalanceDebit metrics.Counter
//total amount of bytes credited // total amount of bytes credited
mBytesCredit metrics.Counter mBytesCredit metrics.Counter
//total amount of bytes debited // total amount of bytes debited
mBytesDebit metrics.Counter mBytesDebit metrics.Counter
//total amount of credited messages // total amount of credited messages
mMsgCredit metrics.Counter mMsgCredit metrics.Counter
//total amount of debited messages // total amount of debited messages
mMsgDebit metrics.Counter mMsgDebit metrics.Counter
//how many times local node had to drop remote peers // how many times local node had to drop remote peers
mPeerDrops metrics.Counter mPeerDrops metrics.Counter
//how many times local node overdrafted and dropped // how many times local node overdrafted and dropped
mSelfDrops metrics.Counter mSelfDrops metrics.Counter
MetricsRegistry metrics.Registry
) )
//Prices defines how prices are being passed on to the accounting instance // Prices defines how prices are being passed on to the accounting instance
type Prices interface { type Prices interface {
//Return the Price for a message // Return the Price for a message
Price(interface{}) *Price Price(interface{}) *Price
} }
@ -57,20 +59,20 @@ const (
Receiver = Payer(false) Receiver = Payer(false)
) )
//Price represents the costs of a message // Price represents the costs of a message
type Price struct { type Price struct {
Value uint64 // Value uint64
PerByte bool //True if the price is per byte or for unit PerByte bool // True if the price is per byte or for unit
Payer Payer Payer Payer
} }
//For gives back the price for a message // For gives back the price for a message
//A protocol provides the message price in absolute value // A protocol provides the message price in absolute value
//This method then returns the correct signed amount, // This method then returns the correct signed amount,
//depending on who pays, which is identified by the `payer` argument: // depending on who pays, which is identified by the `payer` argument:
//`Send` will pass a `Sender` payer, `Receive` will pass the `Receiver` argument. // `Send` will pass a `Sender` payer, `Receive` will pass the `Receiver` argument.
//Thus: If Sending and sender pays, amount positive, otherwise negative // Thus: If Sending and sender pays, amount positive, otherwise negative
//If Receiving, and receiver pays, amount positive, otherwise negative // If Receiving, and receiver pays, amount positive, otherwise negative
func (p *Price) For(payer Payer, size uint32) int64 { func (p *Price) For(payer Payer, size uint32) int64 {
price := p.Value price := p.Value
if p.PerByte { if p.PerByte {
@ -82,22 +84,22 @@ func (p *Price) For(payer Payer, size uint32) int64 {
return int64(price) return int64(price)
} }
//Balance is the actual accounting instance // Balance is the actual accounting instance
//Balance defines the operations needed for accounting // Balance defines the operations needed for accounting
//Implementations internally maintain the balance for every peer // Implementations internally maintain the balance for every peer
type Balance interface { type Balance interface {
//Adds amount to the local balance with remote node `peer`; // Adds amount to the local balance with remote node `peer`;
//positive amount = credit local node // positive amount = credit local node
//negative amount = debit local node // negative amount = debit local node
Add(amount int64, peer *Peer) error Add(amount int64, peer *Peer) error
} }
//Accounting implements the Hook interface // Accounting implements the Hook interface
//It interfaces to the balances through the Balance interface, // It interfaces to the balances through the Balance interface,
//while interfacing with protocols and its prices through the Prices interface // while interfacing with protocols and its prices through the Prices interface
type Accounting struct { type Accounting struct {
Balance //interface to accounting logic Balance // interface to accounting logic
Prices //interface to prices logic Prices // interface to prices logic
} }
func NewAccounting(balance Balance, po Prices) *Accounting { func NewAccounting(balance Balance, po Prices) *Accounting {
@ -108,70 +110,68 @@ func NewAccounting(balance Balance, po Prices) *Accounting {
return ah return ah
} }
//SetupAccountingMetrics creates a separate registry for p2p accounting metrics; // SetupAccountingMetrics creates a separate registry for p2p accounting metrics;
//this registry should be independent of any other metrics as it persists at different endpoints. // this registry should be independent of any other metrics as it persists at different endpoints.
//It also instantiates the given metrics and starts the persisting go-routine which // It also instantiates the given metrics and starts the persisting go-routine which
//at the passed interval writes the metrics to a LevelDB // at the passed interval writes the metrics to a LevelDB
func SetupAccountingMetrics(reportInterval time.Duration, path string) *AccountingMetrics { func SetupAccountingMetrics(reportInterval time.Duration, path string) *AccountingMetrics {
//create an empty registry // create an empty registry
registry := metrics.NewRegistry() MetricsRegistry = metrics.NewRegistry()
//instantiate the metrics // instantiate the metrics
mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", registry) mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", MetricsRegistry)
mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", registry) mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", MetricsRegistry)
mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", registry) mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", MetricsRegistry)
mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", registry) mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", MetricsRegistry)
mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", registry) mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", MetricsRegistry)
mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", registry) mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", MetricsRegistry)
mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", registry) mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", MetricsRegistry)
mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", registry) mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", MetricsRegistry)
//create the DB and start persisting // create the DB and start persisting
return NewAccountingMetrics(registry, reportInterval, path) return NewAccountingMetrics(MetricsRegistry, reportInterval, path)
} }
//Implement Hook.Send
// Send takes a peer, a size and a msg and // Send takes a peer, a size and a msg and
// - calculates the cost for the local node sending a msg of size to peer using the Prices interface // - calculates the cost for the local node sending a msg of size to peer using the Prices interface
// - credits/debits local node using balance interface // - credits/debits local node using balance interface
func (ah *Accounting) Send(peer *Peer, size uint32, msg interface{}) error { func (ah *Accounting) Send(peer *Peer, size uint32, msg interface{}) error {
//get the price for a message (through the protocol spec) // get the price for a message (through the protocol spec)
price := ah.Price(msg) price := ah.Price(msg)
//this message doesn't need accounting // this message doesn't need accounting
if price == nil { if price == nil {
return nil return nil
} }
//evaluate the price for sending messages // evaluate the price for sending messages
costToLocalNode := price.For(Sender, size) costToLocalNode := price.For(Sender, size)
//do the accounting // do the accounting
err := ah.Add(costToLocalNode, peer) err := ah.Add(costToLocalNode, peer)
//record metrics: just increase counters for user-facing metrics // record metrics: just increase counters for user-facing metrics
ah.doMetrics(costToLocalNode, size, err) ah.doMetrics(costToLocalNode, size, err)
return err return err
} }
//Implement Hook.Receive
// Receive takes a peer, a size and a msg and // Receive takes a peer, a size and a msg and
// - calculates the cost for the local node receiving a msg of size from peer using the Prices interface // - calculates the cost for the local node receiving a msg of size from peer using the Prices interface
// - credits/debits local node using balance interface // - credits/debits local node using balance interface
func (ah *Accounting) Receive(peer *Peer, size uint32, msg interface{}) error { func (ah *Accounting) Receive(peer *Peer, size uint32, msg interface{}) error {
//get the price for a message (through the protocol spec) // get the price for a message (through the protocol spec)
price := ah.Price(msg) price := ah.Price(msg)
//this message doesn't need accounting // this message doesn't need accounting
if price == nil { if price == nil {
return nil return nil
} }
//evaluate the price for receiving messages // evaluate the price for receiving messages
costToLocalNode := price.For(Receiver, size) costToLocalNode := price.For(Receiver, size)
//do the accounting // do the accounting
err := ah.Add(costToLocalNode, peer) err := ah.Add(costToLocalNode, peer)
//record metrics: just increase counters for user-facing metrics // record metrics: just increase counters for user-facing metrics
ah.doMetrics(costToLocalNode, size, err) ah.doMetrics(costToLocalNode, size, err)
return err return err
} }
//record some metrics // record some metrics
//this is not an error handling. `err` is returned by both `Send` and `Receive` // this is not an error handling. `err` is returned by both `Send` and `Receive`
//`err` will only be non-nil if a limit has been violated (overdraft), in which case the peer has been dropped. // `err` will only be non-nil if a limit has been violated (overdraft), in which case the peer has been dropped.
//if the limit has been violated and `err` is thus not nil: // if the limit has been violated and `err` is thus not nil:
// * if the price is positive, local node has been credited; thus `err` implicitly signals the REMOTE has been dropped // * if the price is positive, local node has been credited; thus `err` implicitly signals the REMOTE has been dropped
// * if the price is negative, local node has been debited, thus `err` implicitly signals LOCAL node "overdraft" // * if the price is negative, local node has been debited, thus `err` implicitly signals LOCAL node "overdraft"
func (ah *Accounting) doMetrics(price int64, size uint32, err error) { func (ah *Accounting) doMetrics(price int64, size uint32, err error) {
@ -180,7 +180,7 @@ func (ah *Accounting) doMetrics(price int64, size uint32, err error) {
mBytesCredit.Inc(int64(size)) mBytesCredit.Inc(int64(size))
mMsgCredit.Inc(1) mMsgCredit.Inc(1)
if err != nil { if err != nil {
//increase the number of times a remote node has been dropped due to "overdraft" // increase the number of times a remote node has been dropped due to "overdraft"
mPeerDrops.Inc(1) mPeerDrops.Inc(1)
} }
} else { } else {
@ -188,7 +188,7 @@ func (ah *Accounting) doMetrics(price int64, size uint32, err error) {
mBytesDebit.Inc(int64(size)) mBytesDebit.Inc(int64(size))
mMsgDebit.Inc(1) mMsgDebit.Inc(1)
if err != nil { if err != nil {
//increase the number of times the local node has done an "overdraft" in respect to other nodes // increase the number of times the local node has done an "overdraft" in respect to other nodes
mSelfDrops.Inc(1) mSelfDrops.Inc(1)
} }
} }

View File

@ -48,28 +48,28 @@ const (
HashSize = 32 HashSize = 32
) )
//Enumerate options for syncing and retrieval // Enumerate options for syncing and retrieval
type SyncingOption int type SyncingOption int
type RetrievalOption int type RetrievalOption int
//Syncing options // Syncing options
const ( const (
//Syncing disabled // Syncing disabled
SyncingDisabled SyncingOption = iota SyncingDisabled SyncingOption = iota
//Register the client and the server but not subscribe // Register the client and the server but not subscribe
SyncingRegisterOnly SyncingRegisterOnly
//Both client and server funcs are registered, subscribe sent automatically // Both client and server funcs are registered, subscribe sent automatically
SyncingAutoSubscribe SyncingAutoSubscribe
) )
const ( const (
//Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only) // Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only)
RetrievalDisabled RetrievalOption = iota RetrievalDisabled RetrievalOption = iota
//Only the client side of the retrieve request is registered. // Only the client side of the retrieve request is registered.
//(light nodes do not serve retrieve requests) // (light nodes do not serve retrieve requests)
//once the client is registered, subscription to retrieve request stream is always sent // once the client is registered, subscription to retrieve request stream is always sent
RetrievalClientOnly RetrievalClientOnly
//Both client and server funcs are registered, subscribe sent automatically // Both client and server funcs are registered, subscribe sent automatically
RetrievalEnabled RetrievalEnabled
) )
@ -86,18 +86,18 @@ type Registry struct {
peers map[enode.ID]*Peer peers map[enode.ID]*Peer
delivery *Delivery delivery *Delivery
intervalsStore state.Store intervalsStore state.Store
autoRetrieval bool //automatically subscribe to retrieve request stream autoRetrieval bool // automatically subscribe to retrieve request stream
maxPeerServers int maxPeerServers int
spec *protocols.Spec //this protocol's spec balance protocols.Balance // implements protocols.Balance, for accounting
balance protocols.Balance //implements protocols.Balance, for accounting prices protocols.Prices // implements protocols.Prices, provides prices to accounting
prices protocols.Prices //implements protocols.Prices, provides prices to accounting spec *protocols.Spec // this protocol's spec
} }
// RegistryOptions holds optional values for NewRegistry constructor. // RegistryOptions holds optional values for NewRegistry constructor.
type RegistryOptions struct { type RegistryOptions struct {
SkipCheck bool SkipCheck bool
Syncing SyncingOption //Defines syncing behavior Syncing SyncingOption // Defines syncing behavior
Retrieval RetrievalOption //Defines retrieval behavior Retrieval RetrievalOption // Defines retrieval behavior
SyncUpdateDelay time.Duration SyncUpdateDelay time.Duration
MaxPeerServers int // The limit of servers for each peer in registry MaxPeerServers int // The limit of servers for each peer in registry
} }
@ -110,7 +110,7 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
if options.SyncUpdateDelay <= 0 { if options.SyncUpdateDelay <= 0 {
options.SyncUpdateDelay = 15 * time.Second options.SyncUpdateDelay = 15 * time.Second
} }
//check if retriaval has been disabled // check if retrieval has been disabled
retrieval := options.Retrieval != RetrievalDisabled retrieval := options.Retrieval != RetrievalDisabled
streamer := &Registry{ streamer := &Registry{
@ -130,7 +130,7 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
streamer.api = NewAPI(streamer) streamer.api = NewAPI(streamer)
delivery.getPeer = streamer.getPeer delivery.getPeer = streamer.getPeer
//if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only) // if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only)
if options.Retrieval == RetrievalEnabled { if options.Retrieval == RetrievalEnabled {
streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) { streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) {
if !live { if !live {
@ -140,20 +140,20 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
}) })
} }
//if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests) // if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests)
if options.Retrieval != RetrievalDisabled { if options.Retrieval != RetrievalDisabled {
streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) { streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live)) return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live))
}) })
} }
//If syncing is not disabled, the syncing functions are registered (both client and server) // If syncing is not disabled, the syncing functions are registered (both client and server)
if options.Syncing != SyncingDisabled { if options.Syncing != SyncingDisabled {
RegisterSwarmSyncerServer(streamer, syncChunkStore) RegisterSwarmSyncerServer(streamer, syncChunkStore)
RegisterSwarmSyncerClient(streamer, syncChunkStore) RegisterSwarmSyncerClient(streamer, syncChunkStore)
} }
//if syncing is set to automatically subscribe to the syncing stream, start the subscription process // if syncing is set to automatically subscribe to the syncing stream, start the subscription process
if options.Syncing == SyncingAutoSubscribe { if options.Syncing == SyncingAutoSubscribe {
// latestIntC function ensures that // latestIntC function ensures that
// - receiving from the in chan is not blocked by processing inside the for loop // - receiving from the in chan is not blocked by processing inside the for loop
@ -235,13 +235,17 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
return streamer return streamer
} }
//we need to construct a spec instance per node instance // This is an accounted protocol, therefore we need to provide a pricing Hook to the spec
// For simulations to be able to run multiple nodes and not override the hook's balance,
// we need to construct a spec instance per node instance
func (r *Registry) setupSpec() { func (r *Registry) setupSpec() {
//first create the "bare" spec // first create the "bare" spec
r.createSpec() r.createSpec()
//if balance is nil, this node has been started without swap support (swapEnabled flag is false) // now create the pricing object
r.createPriceOracle()
// if balance is nil, this node has been started without swap support (swapEnabled flag is false)
if r.balance != nil && !reflect.ValueOf(r.balance).IsNil() { if r.balance != nil && !reflect.ValueOf(r.balance).IsNil() {
//swap is enabled, so setup the hook // swap is enabled, so setup the hook
r.spec.Hook = protocols.NewAccounting(r.balance, r.prices) r.spec.Hook = protocols.NewAccounting(r.balance, r.prices)
} }
} }
@ -533,11 +537,11 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error {
return p.handleWantedHashesMsg(ctx, msg) return p.handleWantedHashesMsg(ctx, msg)
case *ChunkDeliveryMsgRetrieval: case *ChunkDeliveryMsgRetrieval:
//handling chunk delivery is the same for retrieval and syncing, so let's cast the msg // handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
case *ChunkDeliveryMsgSyncing: case *ChunkDeliveryMsgSyncing:
//handling chunk delivery is the same for retrieval and syncing, so let's cast the msg // handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
case *RetrieveRequestMsg: case *RetrieveRequestMsg:
@ -726,9 +730,9 @@ func (c *clientParams) clientCreated() {
close(c.clientCreatedC) close(c.clientCreatedC)
} }
//GetSpec returns the streamer spec to callers // GetSpec returns the streamer spec to callers
//This used to be a global variable but for simulations with // This used to be a global variable but for simulations with
//multiple nodes its fields (notably the Hook) would be overwritten // multiple nodes its fields (notably the Hook) would be overwritten
func (r *Registry) GetSpec() *protocols.Spec { func (r *Registry) GetSpec() *protocols.Spec {
return r.spec return r.spec
} }
@ -756,6 +760,52 @@ func (r *Registry) createSpec() {
r.spec = spec r.spec = spec
} }
// An accountable message needs some meta information attached to it
// in order to evaluate the correct price
type StreamerPrices struct {
priceMatrix map[reflect.Type]*protocols.Price
registry *Registry
}
// Price implements the accounting interface and returns the price for a specific message
func (sp *StreamerPrices) Price(msg interface{}) *protocols.Price {
t := reflect.TypeOf(msg).Elem()
return sp.priceMatrix[t]
}
// Instead of hardcoding the price, get it
// through a function - it could be quite complex in the future
func (sp *StreamerPrices) getRetrieveRequestMsgPrice() uint64 {
return uint64(1)
}
// Instead of hardcoding the price, get it
// through a function - it could be quite complex in the future
func (sp *StreamerPrices) getChunkDeliveryMsgRetrievalPrice() uint64 {
return uint64(1)
}
// createPriceOracle sets up a matrix which can be queried to get
// the price for a message via the Price method
func (r *Registry) createPriceOracle() {
sp := &StreamerPrices{
registry: r,
}
sp.priceMatrix = map[reflect.Type]*protocols.Price{
reflect.TypeOf(ChunkDeliveryMsgRetrieval{}): {
Value: sp.getChunkDeliveryMsgRetrievalPrice(), // arbitrary price for now
PerByte: true,
Payer: protocols.Receiver,
},
reflect.TypeOf(RetrieveRequestMsg{}): {
Value: sp.getRetrieveRequestMsgPrice(), // arbitrary price for now
PerByte: false,
Payer: protocols.Sender,
},
}
r.prices = sp
}
func (r *Registry) Protocols() []p2p.Protocol { func (r *Registry) Protocols() []p2p.Protocol {
return []p2p.Protocol{ return []p2p.Protocol{
{ {

View File

@ -921,3 +921,34 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
} }
} }
} }
//TestHasPriceImplementation is to check that the Registry has a
//`Price` interface implementation
func TestHasPriceImplementation(t *testing.T) {
_, r, _, teardown, err := newStreamerTester(t, &RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingDisabled,
})
defer teardown()
if err != nil {
t.Fatal(err)
}
if r.prices == nil {
t.Fatal("No prices implementation available for the stream protocol")
}
pricesInstance, ok := r.prices.(*StreamerPrices)
if !ok {
t.Fatal("`Registry` does not have the expected Prices instance")
}
price := pricesInstance.Price(&ChunkDeliveryMsgRetrieval{})
if price == nil || price.Value == 0 || price.Value != pricesInstance.getChunkDeliveryMsgRetrievalPrice() {
t.Fatal("No prices set for chunk delivery msg")
}
price = pricesInstance.Price(&RetrieveRequestMsg{})
if price == nil || price.Value == 0 || price.Value != pricesInstance.getRetrieveRequestMsgPrice() {
t.Fatal("No prices set for chunk delivery msg")
}
}