Implement eth_newPendingTransactionFilter (#1113)

Implement eth_newPendingTransactionFilter
This commit is contained in:
Sebastian Delgado 2018-07-27 08:54:40 -07:00 committed by GitHub
parent 82a709fbc1
commit 7577296b3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 230 additions and 14 deletions

View File

@ -42,6 +42,7 @@ type StatusBackend struct {
statusNode *node.StatusNode statusNode *node.StatusNode
pendingSignRequests *sign.PendingRequests pendingSignRequests *sign.PendingRequests
personalAPI *personal.PublicAPI personalAPI *personal.PublicAPI
rpcFilters *rpcfilters.Service
accountManager *account.Manager accountManager *account.Manager
transactor *transactions.Transactor transactor *transactions.Transactor
newNotification fcm.NotificationConstructor newNotification fcm.NotificationConstructor
@ -60,6 +61,7 @@ func NewStatusBackend() *StatusBackend {
transactor := transactions.NewTransactor(pendingSignRequests) transactor := transactions.NewTransactor(pendingSignRequests)
personalAPI := personal.NewAPI(pendingSignRequests) personalAPI := personal.NewAPI(pendingSignRequests)
notificationManager := fcm.NewNotification(fcmServerKey) notificationManager := fcm.NewNotification(fcmServerKey)
rpcFilters := rpcfilters.New(statusNode)
return &StatusBackend{ return &StatusBackend{
pendingSignRequests: pendingSignRequests, pendingSignRequests: pendingSignRequests,
@ -67,6 +69,7 @@ func NewStatusBackend() *StatusBackend {
accountManager: accountManager, accountManager: accountManager,
transactor: transactor, transactor: transactor,
personalAPI: personalAPI, personalAPI: personalAPI,
rpcFilters: rpcFilters,
newNotification: notificationManager, newNotification: notificationManager,
log: log.New("package", "status-go/api.StatusBackend"), log: log.New("package", "status-go/api.StatusBackend"),
} }
@ -223,7 +226,11 @@ func (b *StatusBackend) CallPrivateRPC(inputJSON string) string {
// SendTransaction creates a new transaction and waits until it's complete. // SendTransaction creates a new transaction and waits until it's complete.
func (b *StatusBackend) SendTransaction(ctx context.Context, args transactions.SendTxArgs) (hash gethcommon.Hash, err error) { func (b *StatusBackend) SendTransaction(ctx context.Context, args transactions.SendTxArgs) (hash gethcommon.Hash, err error) {
return b.transactor.SendTransaction(ctx, args) transactionHash, err := b.transactor.SendTransaction(ctx, args)
if err == nil {
go b.rpcFilters.TriggerTransactionSentToUpstreamEvent(transactionHash)
}
return transactionHash, err
} }
func (b *StatusBackend) getVerifiedAccount(password string) (*account.SelectedExtKey, error) { func (b *StatusBackend) getVerifiedAccount(password string) (*account.SelectedExtKey, error) {

View File

@ -39,16 +39,19 @@ func newFilter() *filter {
// PublicAPI represents filter API that is exported to `eth` namespace // PublicAPI represents filter API that is exported to `eth` namespace
type PublicAPI struct { type PublicAPI struct {
filters map[rpc.ID]*filter filters map[rpc.ID]*filter
filtersMu sync.Mutex filtersMu sync.Mutex
event *latestBlockChangedEvent latestBlockChangedEvent *latestBlockChangedEvent
transactionSentToUpstreamEvent *transactionSentToUpstreamEvent
} }
// NewPublicAPI returns a reference to the PublicAPI object // NewPublicAPI returns a reference to the PublicAPI object
func NewPublicAPI(event *latestBlockChangedEvent) *PublicAPI { func NewPublicAPI(latestBlockChangedEvent *latestBlockChangedEvent,
transactionSentToUpstreamEvent *transactionSentToUpstreamEvent) *PublicAPI {
return &PublicAPI{ return &PublicAPI{
filters: make(map[rpc.ID]*filter), filters: make(map[rpc.ID]*filter),
event: event, latestBlockChangedEvent: latestBlockChangedEvent,
transactionSentToUpstreamEvent: transactionSentToUpstreamEvent,
} }
} }
@ -64,8 +67,8 @@ func (api *PublicAPI) NewBlockFilter() rpc.ID {
api.filters[id] = f api.filters[id] = f
go func() { go func() {
id, s := api.event.Subscribe() id, s := api.latestBlockChangedEvent.Subscribe()
defer api.event.Unsubscribe(id) defer api.latestBlockChangedEvent.Unsubscribe(id)
for { for {
select { select {
@ -81,6 +84,36 @@ func (api *PublicAPI) NewBlockFilter() rpc.ID {
return id return id
} }
// NewPendingTransactionFilter is an implementation of `eth_newPendingTransactionFilter` API
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter
func (api *PublicAPI) NewPendingTransactionFilter() rpc.ID {
api.filtersMu.Lock()
defer api.filtersMu.Unlock()
f := newFilter()
id := rpc.ID(uuid.New())
api.filters[id] = f
go func() {
id, s := api.transactionSentToUpstreamEvent.Subscribe()
defer api.transactionSentToUpstreamEvent.Unsubscribe(id)
for {
select {
case hash := <-s:
f.AddHash(hash)
case <-f.done:
return
}
}
}()
return id
}
// UninstallFilter is an implemenation of `eth_uninstallFilter` API // UninstallFilter is an implemenation of `eth_uninstallFilter` API
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter
func (api *PublicAPI) UninstallFilter(id rpc.ID) bool { func (api *PublicAPI) UninstallFilter(id rpc.ID) bool {

View File

@ -1,6 +1,7 @@
package rpcfilters package rpcfilters
import ( import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
@ -11,14 +12,19 @@ var _ node.Service = (*Service)(nil)
// Service represents out own implementation of personal sign operations. // Service represents out own implementation of personal sign operations.
type Service struct { type Service struct {
latestBlockChangedEvent *latestBlockChangedEvent latestBlockChangedEvent *latestBlockChangedEvent
transactionSentToUpstreamEvent *transactionSentToUpstreamEvent
} }
// New returns a new Service. // New returns a new Service.
func New(rpc rpcProvider) *Service { func New(rpc rpcProvider) *Service {
provider := &latestBlockProviderRPC{rpc} provider := &latestBlockProviderRPC{rpc}
event := newLatestBlockChangedEvent(provider) latestBlockChangedEvent := newLatestBlockChangedEvent(provider)
return &Service{event} transactionSentToUpstreamEvent := newTransactionSentToUpstreamEvent()
return &Service{
latestBlockChangedEvent,
transactionSentToUpstreamEvent,
}
} }
// Protocols returns a new protocols list. In this case, there are none. // Protocols returns a new protocols list. In this case, there are none.
@ -32,19 +38,32 @@ func (s *Service) APIs() []rpc.API {
{ {
Namespace: "eth", Namespace: "eth",
Version: "1.0", Version: "1.0",
Service: NewPublicAPI(s.latestBlockChangedEvent), Service: NewPublicAPI(
Public: true, s.latestBlockChangedEvent,
s.transactionSentToUpstreamEvent),
Public: true,
}, },
} }
} }
// Start is run when a service is started. // Start is run when a service is started.
func (s *Service) Start(server *p2p.Server) error { func (s *Service) Start(server *p2p.Server) error {
err := s.transactionSentToUpstreamEvent.Start()
if err != nil {
return err
}
return s.latestBlockChangedEvent.Start() return s.latestBlockChangedEvent.Start()
} }
// Stop is run when a service is stopped. // Stop is run when a service is stopped.
func (s *Service) Stop() error { func (s *Service) Stop() error {
s.transactionSentToUpstreamEvent.Stop()
s.latestBlockChangedEvent.Stop() s.latestBlockChangedEvent.Stop()
return nil return nil
} }
// TriggerTransactionSentToUpstreamEvent notifies the subscribers
// of the TransactionSentToUpstream event
func (s *Service) TriggerTransactionSentToUpstreamEvent(transactionHash common.Hash) {
s.transactionSentToUpstreamEvent.Trigger(transactionHash)
}

View File

@ -0,0 +1,103 @@
package rpcfilters
import (
"errors"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
// transactionSentToUpstreamEvent represents an event that one can subscribe to
type transactionSentToUpstreamEvent struct {
sxMu sync.Mutex
sx map[int]chan common.Hash
listener chan common.Hash
quit chan struct{}
}
func newTransactionSentToUpstreamEvent() *transactionSentToUpstreamEvent {
return &transactionSentToUpstreamEvent{
sx: make(map[int]chan common.Hash),
listener: make(chan common.Hash),
}
}
func (e *transactionSentToUpstreamEvent) Start() error {
if e.quit != nil {
return errors.New("latest transaction sent to upstream event is already started")
}
e.quit = make(chan struct{})
go func() {
for {
select {
case transactionHash := <-e.listener:
if e.numberOfSubscriptions() == 0 {
continue
}
e.processTransactionSentToUpstream(transactionHash)
case <-e.quit:
return
}
}
}()
return nil
}
func (e *transactionSentToUpstreamEvent) numberOfSubscriptions() int {
e.sxMu.Lock()
defer e.sxMu.Unlock()
return len(e.sx)
}
func (e *transactionSentToUpstreamEvent) processTransactionSentToUpstream(transactionHash common.Hash) {
e.sxMu.Lock()
defer e.sxMu.Unlock()
for id, channel := range e.sx {
select {
case channel <- transactionHash:
default:
log.Error("dropping messages %s for subscriotion %d because the channel is full", transactionHash, id)
}
}
}
func (e *transactionSentToUpstreamEvent) Stop() {
if e.quit == nil {
return
}
select {
case <-e.quit:
return
default:
close(e.quit)
}
}
func (e *transactionSentToUpstreamEvent) Subscribe() (int, chan common.Hash) {
e.sxMu.Lock()
defer e.sxMu.Unlock()
channel := make(chan common.Hash, 512)
id := len(e.sx)
e.sx[id] = channel
return id, channel
}
func (e *transactionSentToUpstreamEvent) Unsubscribe(id int) {
e.sxMu.Lock()
defer e.sxMu.Unlock()
delete(e.sx, id)
}
// Trigger gets called in order to trigger the event
func (e *transactionSentToUpstreamEvent) Trigger(transactionHash common.Hash) {
e.listener <- transactionHash
}

View File

@ -0,0 +1,54 @@
package rpcfilters
import (
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var transactionHashes = []common.Hash{common.HexToHash("0xAA"), common.HexToHash("0xBB"), common.HexToHash("0xCC")}
func TestTransactionSentToUpstreamEventMultipleSubscribe(t *testing.T) {
event := newTransactionSentToUpstreamEvent()
require.NoError(t, event.Start())
defer event.Stop()
var subscriptionChannels []chan common.Hash
for i := 0; i < 3; i++ {
id, channel := event.Subscribe()
// test id assignment
require.Equal(t, i, id)
// test numberOfSubscriptions
require.Equal(t, event.numberOfSubscriptions(), i+1)
subscriptionChannels = append(subscriptionChannels, channel)
}
var wg sync.WaitGroup
wg.Add(9)
go func() {
for _, channel := range subscriptionChannels {
ch := channel
go func() {
for _, expectedHash := range transactionHashes {
select {
case receivedHash := <-ch:
require.Equal(t, expectedHash, receivedHash)
case <-time.After(1 * time.Second):
assert.Fail(t, "timeout")
}
wg.Done()
}
}()
}
}()
for _, hashToTrigger := range transactionHashes {
event.Trigger(hashToTrigger)
}
wg.Wait()
}