diff --git a/eth-node/bridge/geth/public_waku_api.go b/eth-node/bridge/geth/public_waku_api.go index 7016d52b1..32b7365d9 100644 --- a/eth-node/bridge/geth/public_waku_api.go +++ b/eth-node/bridge/geth/public_waku_api.go @@ -10,7 +10,7 @@ import ( wakucommon "github.com/status-im/status-go/waku/common" ) -type gethPublicWakuAPIWrapper struct { +type GethPublicWakuAPIWrapper struct { api *waku.PublicWakuAPI } @@ -20,29 +20,29 @@ func NewGethPublicWakuAPIWrapper(api *waku.PublicWakuAPI) types.PublicWakuAPI { panic("PublicWakuAPI cannot be nil") } - return &gethPublicWakuAPIWrapper{ + return &GethPublicWakuAPIWrapper{ api: api, } } // AddPrivateKey imports the given private key. -func (w *gethPublicWakuAPIWrapper) AddPrivateKey(ctx context.Context, privateKey types.HexBytes) (string, error) { +func (w *GethPublicWakuAPIWrapper) AddPrivateKey(ctx context.Context, privateKey types.HexBytes) (string, error) { return w.api.AddPrivateKey(ctx, hexutil.Bytes(privateKey)) } // GenerateSymKeyFromPassword derives a key from the given password, stores it, and returns its ID. -func (w *gethPublicWakuAPIWrapper) GenerateSymKeyFromPassword(ctx context.Context, passwd string) (string, error) { +func (w *GethPublicWakuAPIWrapper) GenerateSymKeyFromPassword(ctx context.Context, passwd string) (string, error) { return w.api.GenerateSymKeyFromPassword(ctx, passwd) } // DeleteKeyPair removes the key with the given key if it exists. -func (w *gethPublicWakuAPIWrapper) DeleteKeyPair(ctx context.Context, key string) (bool, error) { +func (w *GethPublicWakuAPIWrapper) DeleteKeyPair(ctx context.Context, key string) (bool, error) { return w.api.DeleteKeyPair(ctx, key) } // NewMessageFilter creates a new filter that can be used to poll for // (new) messages that satisfy the given criteria. -func (w *gethPublicWakuAPIWrapper) NewMessageFilter(req types.Criteria) (string, error) { +func (w *GethPublicWakuAPIWrapper) NewMessageFilter(req types.Criteria) (string, error) { topics := make([]wakucommon.TopicType, len(req.Topics)) for index, tt := range req.Topics { topics[index] = wakucommon.TopicType(tt) @@ -59,13 +59,13 @@ func (w *gethPublicWakuAPIWrapper) NewMessageFilter(req types.Criteria) (string, return w.api.NewMessageFilter(criteria) } -func (w *gethPublicWakuAPIWrapper) BloomFilter() []byte { +func (w *GethPublicWakuAPIWrapper) BloomFilter() []byte { return w.api.BloomFilter() } // GetFilterMessages returns the messages that match the filter criteria and // are received between the last poll and now. -func (w *gethPublicWakuAPIWrapper) GetFilterMessages(id string) ([]*types.Message, error) { +func (w *GethPublicWakuAPIWrapper) GetFilterMessages(id string) ([]*types.Message, error) { msgs, err := w.api.GetFilterMessages(id) if err != nil { return nil, err @@ -91,7 +91,7 @@ func (w *gethPublicWakuAPIWrapper) GetFilterMessages(id string) ([]*types.Messag // Post posts a message on the network. // returns the hash of the message in case of success. -func (w *gethPublicWakuAPIWrapper) Post(ctx context.Context, req types.NewMessage) ([]byte, error) { +func (w *GethPublicWakuAPIWrapper) Post(ctx context.Context, req types.NewMessage) ([]byte, error) { msg := waku.NewMessage{ SymKeyID: req.SymKeyID, PublicKey: req.PublicKey, diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index 7b67b31ce..42a222d88 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -15,7 +15,7 @@ import ( wakucommon "github.com/status-im/status-go/waku/common" ) -type gethWakuWrapper struct { +type GethWakuWrapper struct { waku *waku.Waku } @@ -25,112 +25,112 @@ func NewGethWakuWrapper(w *waku.Waku) types.Waku { panic("waku cannot be nil") } - return &gethWakuWrapper{ + return &GethWakuWrapper{ waku: w, } } // GetGethWhisperFrom retrieves the underlying whisper Whisper struct from a wrapped Whisper interface func GetGethWakuFrom(m types.Waku) *waku.Waku { - return m.(*gethWakuWrapper).waku + return m.(*GethWakuWrapper).waku } -func (w *gethWakuWrapper) PublicWakuAPI() types.PublicWakuAPI { +func (w *GethWakuWrapper) PublicWakuAPI() types.PublicWakuAPI { return NewGethPublicWakuAPIWrapper(waku.NewPublicWakuAPI(w.waku)) } -func (w *gethWakuWrapper) Version() uint { +func (w *GethWakuWrapper) Version() uint { return 1 } // Added for compatibility with waku V2 -func (w *gethWakuWrapper) PeerCount() int { +func (w *GethWakuWrapper) PeerCount() int { return -1 } // Added for compatibility with waku V2 -func (w *gethWakuWrapper) StartDiscV5() error { +func (w *GethWakuWrapper) StartDiscV5() error { return errors.New("not available in WakuV1") } // Added for compatibility with waku V2 -func (w *gethWakuWrapper) StopDiscV5() error { +func (w *GethWakuWrapper) StopDiscV5() error { return errors.New("not available in WakuV1") } // PeerCount function only added for compatibility with waku V2 -func (w *gethWakuWrapper) AddStorePeer(address string) (peer.ID, error) { +func (w *GethWakuWrapper) AddStorePeer(address string) (peer.ID, error) { return "", errors.New("not available in WakuV1") } // SubscribeToPubsubTopic function only added for compatibility with waku V2 -func (w *gethWakuWrapper) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error { +func (w *GethWakuWrapper) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error { // not available in WakuV1 return errors.New("not available in WakuV1") } -func (w *gethWakuWrapper) UnsubscribeFromPubsubTopic(topic string) error { +func (w *GethWakuWrapper) UnsubscribeFromPubsubTopic(topic string) error { // not available in WakuV1 return errors.New("not available in WakuV1") } -func (w *gethWakuWrapper) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) { +func (w *GethWakuWrapper) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) { // not available in WakuV1 return nil, errors.New("not available in WakuV1") } -func (w *gethWakuWrapper) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error { +func (w *GethWakuWrapper) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error { // not available in WakuV1 return errors.New("not available in WakuV1") } -func (w *gethWakuWrapper) RemovePubsubTopicKey(topic string) error { +func (w *GethWakuWrapper) RemovePubsubTopicKey(topic string) error { // not available in WakuV1 return errors.New("not available in WakuV1") } // AddRelayPeer function only added for compatibility with waku V2 -func (w *gethWakuWrapper) AddRelayPeer(address string) (peer.ID, error) { +func (w *GethWakuWrapper) AddRelayPeer(address string) (peer.ID, error) { return "", errors.New("not available in WakuV1") } // DialPeer function only added for compatibility with waku V2 -func (w *gethWakuWrapper) DialPeer(address string) error { +func (w *GethWakuWrapper) DialPeer(address string) error { return errors.New("not available in WakuV1") } // DialPeerByID function only added for compatibility with waku V2 -func (w *gethWakuWrapper) DialPeerByID(peerID string) error { +func (w *GethWakuWrapper) DialPeerByID(peerID string) error { return errors.New("not available in WakuV1") } // ListenAddresses function only added for compatibility with waku V2 -func (w *gethWakuWrapper) ListenAddresses() ([]string, error) { +func (w *GethWakuWrapper) ListenAddresses() ([]string, error) { return nil, errors.New("not available in WakuV1") } // PeerCount function only added for compatibility with waku V2 -func (w *gethWakuWrapper) DropPeer(peerID string) error { +func (w *GethWakuWrapper) DropPeer(peerID string) error { return errors.New("not available in WakuV1") } -func (w *gethWakuWrapper) SubscribeToConnStatusChanges() (*types.ConnStatusSubscription, error) { +func (w *GethWakuWrapper) SubscribeToConnStatusChanges() (*types.ConnStatusSubscription, error) { return nil, errors.New("not available in WakuV1") } // Peers function only added for compatibility with waku V2 -func (w *gethWakuWrapper) Peers() map[string]types.WakuV2Peer { +func (w *GethWakuWrapper) Peers() map[string]types.WakuV2Peer { p := make(map[string]types.WakuV2Peer) return p } // MinPow returns the PoW value required by this node. -func (w *gethWakuWrapper) MinPow() float64 { +func (w *GethWakuWrapper) MinPow() float64 { return w.waku.MinPow() } // MaxMessageSize returns the MaxMessageSize set -func (w *gethWakuWrapper) MaxMessageSize() uint32 { +func (w *GethWakuWrapper) MaxMessageSize() uint32 { return w.waku.MaxMessageSize() } @@ -138,16 +138,16 @@ func (w *gethWakuWrapper) MaxMessageSize() uint32 { // The nodes are required to send only messages that match the advertised bloom filter. // If a message does not match the bloom, it will tantamount to spam, and the peer will // be disconnected. -func (w *gethWakuWrapper) BloomFilter() []byte { +func (w *GethWakuWrapper) BloomFilter() []byte { return w.waku.BloomFilter() } // GetCurrentTime returns current time. -func (w *gethWakuWrapper) GetCurrentTime() time.Time { +func (w *GethWakuWrapper) GetCurrentTime() time.Time { return w.waku.CurrentTime() } -func (w *gethWakuWrapper) SubscribeEnvelopeEvents(eventsProxy chan<- types.EnvelopeEvent) types.Subscription { +func (w *GethWakuWrapper) SubscribeEnvelopeEvents(eventsProxy chan<- types.EnvelopeEvent) types.Subscription { events := make(chan wakucommon.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper go func() { for e := range events { @@ -158,37 +158,37 @@ func (w *gethWakuWrapper) SubscribeEnvelopeEvents(eventsProxy chan<- types.Envel return NewGethSubscriptionWrapper(w.waku.SubscribeEnvelopeEvents(events)) } -func (w *gethWakuWrapper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) { +func (w *GethWakuWrapper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) { return w.waku.GetPrivateKey(id) } // AddKeyPair imports a asymmetric private key and returns a deterministic identifier. -func (w *gethWakuWrapper) AddKeyPair(key *ecdsa.PrivateKey) (string, error) { +func (w *GethWakuWrapper) AddKeyPair(key *ecdsa.PrivateKey) (string, error) { return w.waku.AddKeyPair(key) } // DeleteKeyPair deletes the key with the specified ID if it exists. -func (w *gethWakuWrapper) DeleteKeyPair(keyID string) bool { +func (w *GethWakuWrapper) DeleteKeyPair(keyID string) bool { return w.waku.DeleteKeyPair(keyID) } -func (w *gethWakuWrapper) AddSymKeyDirect(key []byte) (string, error) { +func (w *GethWakuWrapper) AddSymKeyDirect(key []byte) (string, error) { return w.waku.AddSymKeyDirect(key) } -func (w *gethWakuWrapper) AddSymKeyFromPassword(password string) (string, error) { +func (w *GethWakuWrapper) AddSymKeyFromPassword(password string) (string, error) { return w.waku.AddSymKeyFromPassword(password) } -func (w *gethWakuWrapper) DeleteSymKey(id string) bool { +func (w *GethWakuWrapper) DeleteSymKey(id string) bool { return w.waku.DeleteSymKey(id) } -func (w *gethWakuWrapper) GetSymKey(id string) ([]byte, error) { +func (w *GethWakuWrapper) GetSymKey(id string) ([]byte, error) { return w.waku.GetSymKey(id) } -func (w *gethWakuWrapper) Subscribe(opts *types.SubscriptionOptions) (string, error) { +func (w *GethWakuWrapper) Subscribe(opts *types.SubscriptionOptions) (string, error) { var ( err error keyAsym *ecdsa.PrivateKey @@ -222,23 +222,23 @@ func (w *gethWakuWrapper) Subscribe(opts *types.SubscriptionOptions) (string, er return id, nil } -func (w *gethWakuWrapper) GetStats() types.StatsSummary { +func (w *GethWakuWrapper) GetStats() types.StatsSummary { return w.waku.GetStats() } -func (w *gethWakuWrapper) GetFilter(id string) types.Filter { +func (w *GethWakuWrapper) GetFilter(id string) types.Filter { return NewWakuFilterWrapper(w.waku.GetFilter(id), id) } -func (w *gethWakuWrapper) Unsubscribe(ctx context.Context, id string) error { +func (w *GethWakuWrapper) Unsubscribe(ctx context.Context, id string) error { return w.waku.Unsubscribe(id) } -func (w *gethWakuWrapper) UnsubscribeMany(ids []string) error { +func (w *GethWakuWrapper) UnsubscribeMany(ids []string) error { return w.waku.UnsubscribeMany(ids) } -func (w *gethWakuWrapper) createFilterWrapper(id string, keyAsym *ecdsa.PrivateKey, keySym []byte, pow float64, topics [][]byte) (types.Filter, error) { +func (w *GethWakuWrapper) createFilterWrapper(id string, keyAsym *ecdsa.PrivateKey, keySym []byte, pow float64, topics [][]byte) (types.Filter, error) { return NewWakuFilterWrapper(&wakucommon.Filter{ KeyAsym: keyAsym, KeySym: keySym, @@ -249,7 +249,7 @@ func (w *gethWakuWrapper) createFilterWrapper(id string, keyAsym *ecdsa.PrivateK }, id), nil } -func (w *gethWakuWrapper) SendMessagesRequest(peerID []byte, r types.MessagesRequest) error { +func (w *GethWakuWrapper) SendMessagesRequest(peerID []byte, r types.MessagesRequest) error { return w.waku.SendMessagesRequest(peerID, wakucommon.MessagesRequest{ ID: r.ID, From: r.From, @@ -266,25 +266,25 @@ func (w *gethWakuWrapper) SendMessagesRequest(peerID []byte, r types.MessagesReq // request and respond with a number of peer-to-peer messages (possibly expired), // which are not supposed to be forwarded any further. // The whisper protocol is agnostic of the format and contents of envelope. -func (w *gethWakuWrapper) RequestHistoricMessagesWithTimeout(peerID []byte, envelope types.Envelope, timeout time.Duration) error { +func (w *GethWakuWrapper) RequestHistoricMessagesWithTimeout(peerID []byte, envelope types.Envelope, timeout time.Duration) error { return w.waku.RequestHistoricMessagesWithTimeout(peerID, envelope.Unwrap().(*wakucommon.Envelope), timeout) } -func (w *gethWakuWrapper) ProcessingP2PMessages() bool { +func (w *GethWakuWrapper) ProcessingP2PMessages() bool { return w.waku.ProcessingP2PMessages() } -func (w *gethWakuWrapper) MarkP2PMessageAsProcessed(hash common.Hash) { +func (w *GethWakuWrapper) MarkP2PMessageAsProcessed(hash common.Hash) { w.waku.MarkP2PMessageAsProcessed(hash) } -func (w *gethWakuWrapper) RequestStoreMessages(ctx context.Context, peerID []byte, r types.MessagesRequest, processEnvelopes bool) (*types.StoreRequestCursor, int, error) { +func (w *GethWakuWrapper) RequestStoreMessages(ctx context.Context, peerID []byte, r types.MessagesRequest, processEnvelopes bool) (*types.StoreRequestCursor, int, error) { return nil, 0, errors.New("not implemented") } -func (w *gethWakuWrapper) ConnectionChanged(_ connection.State) {} +func (w *GethWakuWrapper) ConnectionChanged(_ connection.State) {} -func (w *gethWakuWrapper) ClearEnvelopesCache() { +func (w *GethWakuWrapper) ClearEnvelopesCache() { w.waku.ClearEnvelopesCache() } diff --git a/protocol/messenger_waku_wrapper_test.go b/protocol/messenger_waku_wrapper_test.go new file mode 100644 index 000000000..2bdfd68e2 --- /dev/null +++ b/protocol/messenger_waku_wrapper_test.go @@ -0,0 +1,76 @@ +package protocol + +import ( + "context" + + "go.uber.org/zap" + + gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" + "github.com/status-im/status-go/eth-node/types" + "github.com/status-im/status-go/waku" +) + +type testWakuWrapper struct { + *gethbridge.GethWakuWrapper + + publicWakuAPIWrapper *testPublicWakuAPIWrapper +} + +func newTestWaku(w *waku.Waku) types.Waku { + wrapper := gethbridge.NewGethWakuWrapper(w) + return &testWakuWrapper{ + GethWakuWrapper: wrapper.(*gethbridge.GethWakuWrapper), + publicWakuAPIWrapper: newTestPublicWakuAPI(waku.NewPublicWakuAPI(w)).(*testPublicWakuAPIWrapper), + } +} + +func (tw *testWakuWrapper) PublicWakuAPI() types.PublicWakuAPI { + return tw.publicWakuAPIWrapper +} + +func (tw *testWakuWrapper) SubscribePostEvents() chan *PostMessageSubscription { + subscription := make(chan *PostMessageSubscription, 100) + tw.publicWakuAPIWrapper.postSubscriptions = append(tw.publicWakuAPIWrapper.postSubscriptions, subscription) + return subscription +} + +type PostMessageSubscription struct { + id []byte + msg *types.NewMessage +} + +type testPublicWakuAPIWrapper struct { + *gethbridge.GethPublicWakuAPIWrapper + + postSubscriptions []chan *PostMessageSubscription +} + +func newTestPublicWakuAPI(api *waku.PublicWakuAPI) types.PublicWakuAPI { + wrapper := gethbridge.NewGethPublicWakuAPIWrapper(api) + return &testPublicWakuAPIWrapper{ + GethPublicWakuAPIWrapper: wrapper.(*gethbridge.GethPublicWakuAPIWrapper), + } +} + +func (tp *testPublicWakuAPIWrapper) Post(ctx context.Context, req types.NewMessage) ([]byte, error) { + id, err := tp.GethPublicWakuAPIWrapper.Post(ctx, req) + if err != nil { + return nil, err + } + for _, s := range tp.postSubscriptions { + select { + case s <- &PostMessageSubscription{id: id, msg: &req}: + default: + // subscription channel full + } + } + return id, err +} + +func newTestWakuWrapper(config *waku.Config, logger *zap.Logger) (*testWakuWrapper, error) { + if config == nil { + config = &waku.DefaultConfig + } + w := waku.New(config, logger) + return newTestWaku(w).(*testWakuWrapper), w.Start() +}