Get rid of `types.Whisper.Poll` method

This commit is contained in:
Pedro Pombeiro 2019-12-18 14:02:43 +01:00 committed by Pedro Pombeiro
parent dd894ece15
commit 8d8880dc62
8 changed files with 124 additions and 61 deletions

View File

@ -32,10 +32,6 @@ func (w *gethWhisperWrapper) PublicWhisperAPI() types.PublicWhisperAPI {
return NewGethPublicWhisperAPIWrapper(whisper.NewPublicWhisperAPI(w.whisper))
}
func (w *gethWhisperWrapper) Poll() {
// noop
}
// MinPow returns the PoW value required by this node.
func (w *gethWhisperWrapper) MinPow() float64 {
return w.whisper.MinPow()

View File

@ -5,6 +5,7 @@ package nimbusbridge
// https://golang.org/cmd/cgo/
/*
#cgo LDFLAGS: -Wl,-rpath,'$ORIGIN' -L${SRCDIR} -lnimbus -lm
#include <stddef.h>
#include <stdbool.h>
#include <stdlib.h>
@ -18,6 +19,10 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"syscall"
"time"
"unsafe"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
@ -25,44 +30,84 @@ import (
"go.uber.org/zap"
)
func Init() {
runtime.LockOSThread()
type nimbusNodeWrapper struct {
mu sync.Mutex
routineQueue *RoutineQueue
tid int
nodeStarted bool
cancelPollingChan chan struct{}
w types.Whisper
}
func StartNimbus(privateKey *ecdsa.PrivateKey, listenAddr string, staging bool) error {
C.NimMain()
type Node interface {
types.Node
port, err := strconv.Atoi(strings.Split(listenAddr, ":")[1])
if err != nil {
return fmt.Errorf("failed to parse port number from %s", listenAddr)
StartNimbus(privateKey *ecdsa.PrivateKey, listenAddr string, staging bool) error
Stop()
}
func NewNodeBridge() Node {
c := make(chan Node, 1)
go func(c chan<- Node, delay time.Duration) {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
n := &nimbusNodeWrapper{
routineQueue: NewRoutineQueue(),
tid: syscall.Gettid(),
cancelPollingChan: make(chan struct{}, 1),
}
c <- n
for {
select {
case <-time.After(delay):
n.poll()
case <-n.cancelPollingChan:
return
}
}
}(c, 50*time.Millisecond)
return <-c
}
func (n *nimbusNodeWrapper) StartNimbus(privateKey *ecdsa.PrivateKey, listenAddr string, staging bool) error {
retVal := n.routineQueue.Send(func(c chan<- interface{}) {
c <- startNimbus(privateKey, listenAddr, staging)
n.nodeStarted = true
})
if err, ok := retVal.(error); ok {
return err
}
privateKeyC := C.CBytes(crypto.FromECDSA(privateKey))
defer C.free(privateKeyC)
if !C.nimbus_start(C.ushort(port), true, false, 0.002, (*C.uchar)(privateKeyC), C.bool(staging)) {
return errors.New("failed to start Nimbus node")
}
return nil
}
type nimbusNodeWrapper struct {
w types.Whisper
func (n *nimbusNodeWrapper) Stop() {
if n.cancelPollingChan != nil {
close(n.cancelPollingChan)
n.nodeStarted = false
n.cancelPollingChan = nil
}
}
func NewNodeBridge() types.Node {
return &nimbusNodeWrapper{w: NewNimbusWhisperWrapper()}
}
func (w *nimbusNodeWrapper) NewENSVerifier(_ *zap.Logger) enstypes.ENSVerifier {
func (n *nimbusNodeWrapper) NewENSVerifier(_ *zap.Logger) enstypes.ENSVerifier {
panic("not implemented")
}
func (w *nimbusNodeWrapper) GetWhisper(ctx interface{}) (types.Whisper, error) {
return w.w, nil
func (n *nimbusNodeWrapper) GetWhisper(ctx interface{}) (types.Whisper, error) {
n.mu.Lock()
defer n.mu.Unlock()
if n.w == nil {
n.w = NewNimbusWhisperWrapper(n.routineQueue)
}
return n.w, nil
}
func (w *nimbusNodeWrapper) AddPeer(url string) error {
func (n *nimbusNodeWrapper) AddPeer(url string) error {
urlC := C.CString(url)
defer C.free(unsafe.Pointer(urlC))
if !C.nimbus_add_peer(urlC) {
@ -72,6 +117,42 @@ func (w *nimbusNodeWrapper) AddPeer(url string) error {
return nil
}
func (w *nimbusNodeWrapper) RemovePeer(url string) error {
func (n *nimbusNodeWrapper) RemovePeer(url string) error {
panic("TODO: RemovePeer")
}
func (n *nimbusNodeWrapper) poll() {
if syscall.Gettid() != n.tid {
panic("poll called from wrong thread")
}
if (n.nodeStarted) {
C.nimbus_poll()
}
n.routineQueue.HandleEvent()
}
func startNimbus(privateKey *ecdsa.PrivateKey, listenAddr string, staging bool) error {
C.NimMain()
if listenAddr == "" {
listenAddr = ":30304"
}
addrParts := strings.Split(listenAddr, ":")
port, err := strconv.Atoi(addrParts[len(addrParts)-1])
if err != nil {
return fmt.Errorf("failed to parse port number from %s", listenAddr)
}
var privateKeyC unsafe.Pointer
if privateKey != nil {
privateKeyC = C.CBytes(crypto.FromECDSA(privateKey))
defer C.free(privateKeyC)
}
if !C.nimbus_start(C.ushort(port), true, false, 0.002, (*C.uchar)(privateKeyC), C.bool(staging)) {
return errors.New("failed to start Nimbus node")
}
return nil
}

View File

@ -31,6 +31,10 @@ type event struct {
}
func (q *RoutineQueue) HandleEvent() {
if syscall.Gettid() != q.tid {
panic("HandleEvent called from wrong thread")
}
select {
case ev := <-q.events:
ev.f(ev.done)

View File

@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"sync"
"syscall"
"time"
"unsafe"
@ -35,17 +34,15 @@ type nimbusWhisperWrapper struct {
filterMessagesMu sync.Mutex
filterMessages map[string]*list.List
routineQueue *RoutineQueue
tid int
}
// NewNimbusWhisperWrapper returns an object that wraps Nimbus' Whisper in a types interface
func NewNimbusWhisperWrapper() types.Whisper {
func NewNimbusWhisperWrapper(routineQueue *RoutineQueue) types.Whisper {
return &nimbusWhisperWrapper{
timesource: func() time.Time { return time.Now() },
filters: map[string]types.Filter{},
filterMessages: map[string]*list.List{},
routineQueue: NewRoutineQueue(),
tid: syscall.Gettid(),
routineQueue: routineQueue,
}
}
@ -53,16 +50,6 @@ func (w *nimbusWhisperWrapper) PublicWhisperAPI() types.PublicWhisperAPI {
return NewNimbusPublicWhisperAPIWrapper(&w.filterMessagesMu, &w.filterMessages, w.routineQueue)
}
func (w *nimbusWhisperWrapper) Poll() {
if syscall.Gettid() != w.tid {
panic("Poll called from wrong thread")
}
C.nimbus_poll()
w.routineQueue.HandleEvent()
}
// MinPow returns the PoW value required by this node.
func (w *nimbusWhisperWrapper) MinPow() float64 {
return w.routineQueue.Send(func(c chan<- interface{}) {
@ -80,7 +67,7 @@ func (w *nimbusWhisperWrapper) BloomFilter() []byte {
dataC := C.malloc(C.size_t(C.BLOOM_LEN))
defer C.free(unsafe.Pointer(dataC))
C.nimbus_get_bloom_filter((*C.uchar)(unsafe.Pointer(dataC)))
C.nimbus_get_bloom_filter((*C.uchar)(dataC))
// Move the returned data into a Go array
data := make([]byte, C.BLOOM_LEN)
@ -121,7 +108,7 @@ func (w *nimbusWhisperWrapper) GetPrivateKey(id string) (*ecdsa.PrivateKey, erro
privKeyC := C.malloc(types.AesKeyLength)
defer C.free(unsafe.Pointer(privKeyC))
if ok := C.nimbus_get_private_key(idC, (*C.uchar)(unsafe.Pointer(privKeyC))); !ok {
if !C.nimbus_get_private_key(idC, (*C.uchar)(privKeyC)) {
c <- errors.New("failed to get private key from Nimbus")
return
}
@ -150,7 +137,7 @@ func (w *nimbusWhisperWrapper) AddKeyPair(key *ecdsa.PrivateKey) (string, error)
idC := C.malloc(C.size_t(C.ID_LEN))
defer C.free(idC)
if !C.nimbus_add_keypair((*C.uchar)(unsafe.Pointer(privKeyC)), (*C.uchar)(idC)) {
if !C.nimbus_add_keypair((*C.uchar)(privKeyC), (*C.uchar)(idC)) {
c <- errors.New("failed to add keypair to Nimbus")
return
}
@ -185,7 +172,7 @@ func (w *nimbusWhisperWrapper) AddSymKeyDirect(key []byte) (string, error) {
idC := C.malloc(C.size_t(C.ID_LEN))
defer C.free(idC)
if !C.nimbus_add_symkey((*C.uchar)(unsafe.Pointer(keyC)), (*C.uchar)(idC)) {
if !C.nimbus_add_symkey((*C.uchar)(keyC), (*C.uchar)(idC)) {
c <- errors.New("failed to add symkey to Nimbus")
return
}
@ -245,7 +232,7 @@ func (w *nimbusWhisperWrapper) GetSymKey(id string) ([]byte, error) {
// Allocate a buffer for Nimbus to return the symkey on
dataC := C.malloc(C.size_t(C.SYMKEY_LEN))
defer C.free(unsafe.Pointer(dataC))
if ok := C.nimbus_get_symkey(idC, (*C.uchar)(unsafe.Pointer(dataC))); !ok {
if !C.nimbus_get_symkey(idC, (*C.uchar)(dataC)) {
c <- errors.New("symkey not found")
return
}
@ -329,7 +316,7 @@ func (w *nimbusWhisperWrapper) GetFilter(id string) types.Filter {
}
func (w *nimbusWhisperWrapper) Unsubscribe(id string) error {
return w.routineQueue.Send(func(c chan<- interface{}) {
retVal := w.routineQueue.Send(func(c chan<- interface{}) {
idC, err := decodeHexID(id)
if err != nil {
c <- err
@ -355,7 +342,11 @@ func (w *nimbusWhisperWrapper) Unsubscribe(id string) error {
}
c <- nil
}).(error)
})
if err, ok := retVal.(error); ok {
return err
}
return nil
}
func decodeHexID(id string) (*C.uint8_t, error) {

View File

@ -26,9 +26,6 @@ type SubscriptionOptions struct {
type Whisper interface {
PublicWhisperAPI() PublicWhisperAPI
// Poll must be run periodically on the main thread by the host application
Poll()
// MinPow returns the PoW value required by this node.
MinPow() float64
// BloomFilter returns the aggregated bloom filter for all the topics of interest.

1
go.sum
View File

@ -416,6 +416,7 @@ github.com/mattn/go-isatty v0.0.5 h1:tHXDdz1cpzGaovsTB+TVB8q90WEokoVmfMqoVcrLUgw
github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.7 h1:UvyT9uN+3r7yLEYSlJsbQGdsaB/a0DlgWP3pql6iwOc=
github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-pointer v0.0.0-20190911064623-a0a44394634f h1:QTRRO+ozoYgT3CQRIzNVYJRU3DB8HRnkZv6mr4ISmMA=
github.com/mattn/go-pointer v0.0.0-20190911064623-a0a44394634f/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc=
github.com/mattn/go-runewidth v0.0.3 h1:a+kO+98RDGEfo6asOGMmpodZq4FNtnGP54yps8BzLR4=
github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=

View File

@ -32,10 +32,6 @@ func (w *gethWhisperWrapper) PublicWhisperAPI() types.PublicWhisperAPI {
return NewGethPublicWhisperAPIWrapper(whisper.NewPublicWhisperAPI(w.whisper))
}
func (w *gethWhisperWrapper) Poll() {
// noop
}
// MinPow returns the PoW value required by this node.
func (w *gethWhisperWrapper) MinPow() float64 {
return w.whisper.MinPow()

View File

@ -26,9 +26,6 @@ type SubscriptionOptions struct {
type Whisper interface {
PublicWhisperAPI() PublicWhisperAPI
// Poll must be run periodically on the main thread by the host application
Poll()
// MinPow returns the PoW value required by this node.
MinPow() float64
// BloomFilter returns the aggregated bloom filter for all the topics of interest.