From 8d8880dc627add6cec8bf89f656fa593eb464286 Mon Sep 17 00:00:00 2001 From: Pedro Pombeiro Date: Wed, 18 Dec 2019 14:02:43 +0100 Subject: [PATCH] Get rid of `types.Whisper.Poll` method --- eth-node/bridge/geth/whisper.go | 4 - eth-node/bridge/nimbus/node.go | 131 ++++++++++++++---- eth-node/bridge/nimbus/routine_queue.go | 4 + eth-node/bridge/nimbus/whisper.go | 35 ++--- eth-node/types/whisper.go | 3 - go.sum | 1 + .../status-go/eth-node/bridge/geth/whisper.go | 4 - .../status-go/eth-node/types/whisper.go | 3 - 8 files changed, 124 insertions(+), 61 deletions(-) diff --git a/eth-node/bridge/geth/whisper.go b/eth-node/bridge/geth/whisper.go index c4d73bc1c..9fbfced31 100644 --- a/eth-node/bridge/geth/whisper.go +++ b/eth-node/bridge/geth/whisper.go @@ -32,10 +32,6 @@ func (w *gethWhisperWrapper) PublicWhisperAPI() types.PublicWhisperAPI { return NewGethPublicWhisperAPIWrapper(whisper.NewPublicWhisperAPI(w.whisper)) } -func (w *gethWhisperWrapper) Poll() { - // noop -} - // MinPow returns the PoW value required by this node. func (w *gethWhisperWrapper) MinPow() float64 { return w.whisper.MinPow() diff --git a/eth-node/bridge/nimbus/node.go b/eth-node/bridge/nimbus/node.go index d168c107c..3007daae0 100644 --- a/eth-node/bridge/nimbus/node.go +++ b/eth-node/bridge/nimbus/node.go @@ -5,6 +5,7 @@ package nimbusbridge // https://golang.org/cmd/cgo/ /* +#cgo LDFLAGS: -Wl,-rpath,'$ORIGIN' -L${SRCDIR} -lnimbus -lm #include #include #include @@ -18,6 +19,10 @@ import ( "runtime" "strconv" "strings" + "sync" + "syscall" + "time" + "unsafe" "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" @@ -25,44 +30,84 @@ import ( "go.uber.org/zap" ) -func Init() { - runtime.LockOSThread() +type nimbusNodeWrapper struct { + mu sync.Mutex + + routineQueue *RoutineQueue + tid int + nodeStarted bool + cancelPollingChan chan struct{} + + w types.Whisper } -func StartNimbus(privateKey *ecdsa.PrivateKey, listenAddr string, staging bool) error { - C.NimMain() +type Node interface { + types.Node - port, err := strconv.Atoi(strings.Split(listenAddr, ":")[1]) - if err != nil { - return fmt.Errorf("failed to parse port number from %s", listenAddr) + StartNimbus(privateKey *ecdsa.PrivateKey, listenAddr string, staging bool) error + Stop() +} + +func NewNodeBridge() Node { + c := make(chan Node, 1) + go func(c chan<- Node, delay time.Duration) { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + n := &nimbusNodeWrapper{ + routineQueue: NewRoutineQueue(), + tid: syscall.Gettid(), + cancelPollingChan: make(chan struct{}, 1), + } + c <- n + + for { + select { + case <-time.After(delay): + n.poll() + case <-n.cancelPollingChan: + return + } + } + }(c, 50*time.Millisecond) + + return <-c +} + +func (n *nimbusNodeWrapper) StartNimbus(privateKey *ecdsa.PrivateKey, listenAddr string, staging bool) error { + retVal := n.routineQueue.Send(func(c chan<- interface{}) { + c <- startNimbus(privateKey, listenAddr, staging) + n.nodeStarted = true + }) + if err, ok := retVal.(error); ok { + return err } - - privateKeyC := C.CBytes(crypto.FromECDSA(privateKey)) - defer C.free(privateKeyC) - if !C.nimbus_start(C.ushort(port), true, false, 0.002, (*C.uchar)(privateKeyC), C.bool(staging)) { - return errors.New("failed to start Nimbus node") - } - return nil } -type nimbusNodeWrapper struct { - w types.Whisper +func (n *nimbusNodeWrapper) Stop() { + if n.cancelPollingChan != nil { + close(n.cancelPollingChan) + n.nodeStarted = false + n.cancelPollingChan = nil + } } -func NewNodeBridge() types.Node { - return &nimbusNodeWrapper{w: NewNimbusWhisperWrapper()} -} - -func (w *nimbusNodeWrapper) NewENSVerifier(_ *zap.Logger) enstypes.ENSVerifier { +func (n *nimbusNodeWrapper) NewENSVerifier(_ *zap.Logger) enstypes.ENSVerifier { panic("not implemented") } -func (w *nimbusNodeWrapper) GetWhisper(ctx interface{}) (types.Whisper, error) { - return w.w, nil +func (n *nimbusNodeWrapper) GetWhisper(ctx interface{}) (types.Whisper, error) { + n.mu.Lock() + defer n.mu.Unlock() + + if n.w == nil { + n.w = NewNimbusWhisperWrapper(n.routineQueue) + } + return n.w, nil } -func (w *nimbusNodeWrapper) AddPeer(url string) error { +func (n *nimbusNodeWrapper) AddPeer(url string) error { urlC := C.CString(url) defer C.free(unsafe.Pointer(urlC)) if !C.nimbus_add_peer(urlC) { @@ -72,6 +117,42 @@ func (w *nimbusNodeWrapper) AddPeer(url string) error { return nil } -func (w *nimbusNodeWrapper) RemovePeer(url string) error { +func (n *nimbusNodeWrapper) RemovePeer(url string) error { panic("TODO: RemovePeer") } + +func (n *nimbusNodeWrapper) poll() { + if syscall.Gettid() != n.tid { + panic("poll called from wrong thread") + } + + if (n.nodeStarted) { + C.nimbus_poll() + } + + n.routineQueue.HandleEvent() +} + +func startNimbus(privateKey *ecdsa.PrivateKey, listenAddr string, staging bool) error { + C.NimMain() + + if listenAddr == "" { + listenAddr = ":30304" + } + addrParts := strings.Split(listenAddr, ":") + port, err := strconv.Atoi(addrParts[len(addrParts)-1]) + if err != nil { + return fmt.Errorf("failed to parse port number from %s", listenAddr) + } + + var privateKeyC unsafe.Pointer + if privateKey != nil { + privateKeyC = C.CBytes(crypto.FromECDSA(privateKey)) + defer C.free(privateKeyC) + } + if !C.nimbus_start(C.ushort(port), true, false, 0.002, (*C.uchar)(privateKeyC), C.bool(staging)) { + return errors.New("failed to start Nimbus node") + } + + return nil +} diff --git a/eth-node/bridge/nimbus/routine_queue.go b/eth-node/bridge/nimbus/routine_queue.go index 3b8029b3b..5a76e8cb8 100644 --- a/eth-node/bridge/nimbus/routine_queue.go +++ b/eth-node/bridge/nimbus/routine_queue.go @@ -31,6 +31,10 @@ type event struct { } func (q *RoutineQueue) HandleEvent() { + if syscall.Gettid() != q.tid { + panic("HandleEvent called from wrong thread") + } + select { case ev := <-q.events: ev.f(ev.done) diff --git a/eth-node/bridge/nimbus/whisper.go b/eth-node/bridge/nimbus/whisper.go index 8bb4be764..994954d72 100644 --- a/eth-node/bridge/nimbus/whisper.go +++ b/eth-node/bridge/nimbus/whisper.go @@ -20,7 +20,6 @@ import ( "errors" "fmt" "sync" - "syscall" "time" "unsafe" @@ -35,17 +34,15 @@ type nimbusWhisperWrapper struct { filterMessagesMu sync.Mutex filterMessages map[string]*list.List routineQueue *RoutineQueue - tid int } // NewNimbusWhisperWrapper returns an object that wraps Nimbus' Whisper in a types interface -func NewNimbusWhisperWrapper() types.Whisper { +func NewNimbusWhisperWrapper(routineQueue *RoutineQueue) types.Whisper { return &nimbusWhisperWrapper{ timesource: func() time.Time { return time.Now() }, filters: map[string]types.Filter{}, filterMessages: map[string]*list.List{}, - routineQueue: NewRoutineQueue(), - tid: syscall.Gettid(), + routineQueue: routineQueue, } } @@ -53,16 +50,6 @@ func (w *nimbusWhisperWrapper) PublicWhisperAPI() types.PublicWhisperAPI { return NewNimbusPublicWhisperAPIWrapper(&w.filterMessagesMu, &w.filterMessages, w.routineQueue) } -func (w *nimbusWhisperWrapper) Poll() { - if syscall.Gettid() != w.tid { - panic("Poll called from wrong thread") - } - - C.nimbus_poll() - - w.routineQueue.HandleEvent() -} - // MinPow returns the PoW value required by this node. func (w *nimbusWhisperWrapper) MinPow() float64 { return w.routineQueue.Send(func(c chan<- interface{}) { @@ -80,7 +67,7 @@ func (w *nimbusWhisperWrapper) BloomFilter() []byte { dataC := C.malloc(C.size_t(C.BLOOM_LEN)) defer C.free(unsafe.Pointer(dataC)) - C.nimbus_get_bloom_filter((*C.uchar)(unsafe.Pointer(dataC))) + C.nimbus_get_bloom_filter((*C.uchar)(dataC)) // Move the returned data into a Go array data := make([]byte, C.BLOOM_LEN) @@ -121,7 +108,7 @@ func (w *nimbusWhisperWrapper) GetPrivateKey(id string) (*ecdsa.PrivateKey, erro privKeyC := C.malloc(types.AesKeyLength) defer C.free(unsafe.Pointer(privKeyC)) - if ok := C.nimbus_get_private_key(idC, (*C.uchar)(unsafe.Pointer(privKeyC))); !ok { + if !C.nimbus_get_private_key(idC, (*C.uchar)(privKeyC)) { c <- errors.New("failed to get private key from Nimbus") return } @@ -150,7 +137,7 @@ func (w *nimbusWhisperWrapper) AddKeyPair(key *ecdsa.PrivateKey) (string, error) idC := C.malloc(C.size_t(C.ID_LEN)) defer C.free(idC) - if !C.nimbus_add_keypair((*C.uchar)(unsafe.Pointer(privKeyC)), (*C.uchar)(idC)) { + if !C.nimbus_add_keypair((*C.uchar)(privKeyC), (*C.uchar)(idC)) { c <- errors.New("failed to add keypair to Nimbus") return } @@ -185,7 +172,7 @@ func (w *nimbusWhisperWrapper) AddSymKeyDirect(key []byte) (string, error) { idC := C.malloc(C.size_t(C.ID_LEN)) defer C.free(idC) - if !C.nimbus_add_symkey((*C.uchar)(unsafe.Pointer(keyC)), (*C.uchar)(idC)) { + if !C.nimbus_add_symkey((*C.uchar)(keyC), (*C.uchar)(idC)) { c <- errors.New("failed to add symkey to Nimbus") return } @@ -245,7 +232,7 @@ func (w *nimbusWhisperWrapper) GetSymKey(id string) ([]byte, error) { // Allocate a buffer for Nimbus to return the symkey on dataC := C.malloc(C.size_t(C.SYMKEY_LEN)) defer C.free(unsafe.Pointer(dataC)) - if ok := C.nimbus_get_symkey(idC, (*C.uchar)(unsafe.Pointer(dataC))); !ok { + if !C.nimbus_get_symkey(idC, (*C.uchar)(dataC)) { c <- errors.New("symkey not found") return } @@ -329,7 +316,7 @@ func (w *nimbusWhisperWrapper) GetFilter(id string) types.Filter { } func (w *nimbusWhisperWrapper) Unsubscribe(id string) error { - return w.routineQueue.Send(func(c chan<- interface{}) { + retVal := w.routineQueue.Send(func(c chan<- interface{}) { idC, err := decodeHexID(id) if err != nil { c <- err @@ -355,7 +342,11 @@ func (w *nimbusWhisperWrapper) Unsubscribe(id string) error { } c <- nil - }).(error) + }) + if err, ok := retVal.(error); ok { + return err + } + return nil } func decodeHexID(id string) (*C.uint8_t, error) { diff --git a/eth-node/types/whisper.go b/eth-node/types/whisper.go index 17fc26480..a8a34eab3 100644 --- a/eth-node/types/whisper.go +++ b/eth-node/types/whisper.go @@ -26,9 +26,6 @@ type SubscriptionOptions struct { type Whisper interface { PublicWhisperAPI() PublicWhisperAPI - // Poll must be run periodically on the main thread by the host application - Poll() - // MinPow returns the PoW value required by this node. MinPow() float64 // BloomFilter returns the aggregated bloom filter for all the topics of interest. diff --git a/go.sum b/go.sum index 6eae071ad..53a1d07fd 100644 --- a/go.sum +++ b/go.sum @@ -416,6 +416,7 @@ github.com/mattn/go-isatty v0.0.5 h1:tHXDdz1cpzGaovsTB+TVB8q90WEokoVmfMqoVcrLUgw github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.7 h1:UvyT9uN+3r7yLEYSlJsbQGdsaB/a0DlgWP3pql6iwOc= github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-pointer v0.0.0-20190911064623-a0a44394634f h1:QTRRO+ozoYgT3CQRIzNVYJRU3DB8HRnkZv6mr4ISmMA= github.com/mattn/go-pointer v0.0.0-20190911064623-a0a44394634f/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc= github.com/mattn/go-runewidth v0.0.3 h1:a+kO+98RDGEfo6asOGMmpodZq4FNtnGP54yps8BzLR4= github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= diff --git a/vendor/github.com/status-im/status-go/eth-node/bridge/geth/whisper.go b/vendor/github.com/status-im/status-go/eth-node/bridge/geth/whisper.go index c4d73bc1c..9fbfced31 100644 --- a/vendor/github.com/status-im/status-go/eth-node/bridge/geth/whisper.go +++ b/vendor/github.com/status-im/status-go/eth-node/bridge/geth/whisper.go @@ -32,10 +32,6 @@ func (w *gethWhisperWrapper) PublicWhisperAPI() types.PublicWhisperAPI { return NewGethPublicWhisperAPIWrapper(whisper.NewPublicWhisperAPI(w.whisper)) } -func (w *gethWhisperWrapper) Poll() { - // noop -} - // MinPow returns the PoW value required by this node. func (w *gethWhisperWrapper) MinPow() float64 { return w.whisper.MinPow() diff --git a/vendor/github.com/status-im/status-go/eth-node/types/whisper.go b/vendor/github.com/status-im/status-go/eth-node/types/whisper.go index 17fc26480..a8a34eab3 100644 --- a/vendor/github.com/status-im/status-go/eth-node/types/whisper.go +++ b/vendor/github.com/status-im/status-go/eth-node/types/whisper.go @@ -26,9 +26,6 @@ type SubscriptionOptions struct { type Whisper interface { PublicWhisperAPI() PublicWhisperAPI - // Poll must be run periodically on the main thread by the host application - Poll() - // MinPow returns the PoW value required by this node. MinPow() float64 // BloomFilter returns the aggregated bloom filter for all the topics of interest.