feat: use protected topics for communities

refactor: associate chats to pubsub topics and populate these depending if the chat belongs to a community or not
refactor: add pubsub topic to mailserver batches
chore: ensure default relay messages continue working as they should
refactor: mailserver functions should be aware of pubsub topics
fix: use []byte for communityIDs
This commit is contained in:
Richard Ramos 2023-05-22 17:38:02 -04:00 committed by richΛrd
parent 8764170149
commit f9ec588c4e
36 changed files with 1033 additions and 531 deletions

View File

@ -0,0 +1,24 @@
CREATE TABLE IF NOT EXISTS pubsubtopic_signing_key (
topic VARCHAR NOT NULL,
priv_key BLOB NULL,
pub_key BLOB NOT NULL,
PRIMARY KEY (topic)
) WITHOUT ROWID;
CREATE TABLE IF NOT EXISTS mailserver_topics_new (
topic VARCHAR NOT NULL DEFAULT "",
pubsub_topic VARCHAR NOT NULL DEFAULT "/waku/2/default-waku/proto",
chat_ids VARCHAR,
last_request INTEGER DEFAULT 1,
discovery BOOLEAN DEFAULT FALSE,
negotiated BOOLEAN DEFAULT FALSE,
PRIMARY KEY(topic, pubsub_topic)
) WITHOUT ROWID;
INSERT INTO mailserver_topics_new
SELECT topic, "/waku/2/default-waku/proto", chat_ids, last_request, discovery, negotiated
FROM mailserver_topics;
DROP TABLE mailserver_topics;
ALTER TABLE mailserver_topics_new RENAME TO mailserver_topics;

View File

@ -53,10 +53,11 @@ func (w *gethPublicWakuV2APIWrapper) NewMessageFilter(req types.Criteria) (strin
}
criteria := wakuv2.Criteria{
SymKeyID: req.SymKeyID,
PrivateKeyID: req.PrivateKeyID,
Sig: req.Sig,
Topics: topics,
SymKeyID: req.SymKeyID,
PrivateKeyID: req.PrivateKeyID,
Sig: req.Sig,
PubsubTopic: req.PubsubTopic,
ContentTopics: topics,
}
return w.api.NewMessageFilter(criteria)
}
@ -72,13 +73,14 @@ func (w *gethPublicWakuV2APIWrapper) GetFilterMessages(id string) ([]*types.Mess
wrappedMsgs := make([]*types.Message, len(msgs))
for index, msg := range msgs {
wrappedMsgs[index] = &types.Message{
Sig: msg.Sig,
Timestamp: msg.Timestamp,
Topic: types.TopicType(msg.Topic),
Payload: msg.Payload,
Padding: msg.Padding,
Hash: msg.Hash,
Dst: msg.Dst,
Sig: msg.Sig,
Timestamp: msg.Timestamp,
PubsubTopic: msg.PubsubTopic,
Topic: types.TopicType(msg.ContentTopic),
Payload: msg.Payload,
Padding: msg.Padding,
Hash: msg.Hash,
Dst: msg.Dst,
}
}
return wrappedMsgs, nil
@ -88,14 +90,15 @@ func (w *gethPublicWakuV2APIWrapper) GetFilterMessages(id string) ([]*types.Mess
// returns the hash of the message in case of success.
func (w *gethPublicWakuV2APIWrapper) Post(ctx context.Context, req types.NewMessage) ([]byte, error) {
msg := wakuv2.NewMessage{
SymKeyID: req.SymKeyID,
PublicKey: req.PublicKey,
Sig: req.SigID, // Sig is really a SigID
Topic: wakucommon.TopicType(req.Topic),
Payload: req.Payload,
Padding: req.Padding,
TargetPeer: req.TargetPeer,
Ephemeral: req.Ephemeral,
SymKeyID: req.SymKeyID,
PublicKey: req.PublicKey,
Sig: req.SigID, // Sig is really a SigID
PubsubTopic: req.PubsubTopic,
ContentTopic: wakucommon.TopicType(req.Topic),
Payload: req.Payload,
Padding: req.Padding,
TargetPeer: req.TargetPeer,
Ephemeral: req.Ephemeral,
}
return w.api.Post(ctx, msg)
}

View File

@ -63,6 +63,16 @@ func (w *gethWakuWrapper) AddStorePeer(address string) (peer.ID, error) {
return "", errors.New("not available in WakuV1")
}
// SubscribeToPubsubTopic function only added for compatibility with waku V2
func (w *gethWakuWrapper) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error {
// not available in WakuV1
return errors.New("not available in WakuV1")
}
func (w *gethWakuWrapper) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error {
return errors.New("not available in WakuV1")
}
// AddRelayPeer function only added for compatibility with waku V2
func (w *gethWakuWrapper) AddRelayPeer(address string) (peer.ID, error) {
return "", errors.New("not available in WakuV1")
@ -231,7 +241,7 @@ func (w *gethWakuWrapper) SendMessagesRequest(peerID []byte, r types.MessagesReq
Limit: r.Limit,
Cursor: r.Cursor,
Bloom: r.Bloom,
Topics: r.Topics,
Topics: r.ContentTopics,
})
}

View File

@ -130,7 +130,7 @@ func (w *gethWakuV2Wrapper) Subscribe(opts *types.SubscriptionOptions) (string,
}
}
f, err := w.createFilterWrapper("", keyAsym, keySym, opts.PoW, opts.Topics)
f, err := w.createFilterWrapper("", keyAsym, keySym, opts.PoW, opts.PubsubTopic, opts.Topics)
if err != nil {
return "", err
}
@ -160,12 +160,13 @@ func (w *gethWakuV2Wrapper) UnsubscribeMany(ids []string) error {
return w.waku.UnsubscribeMany(ids)
}
func (w *gethWakuV2Wrapper) createFilterWrapper(id string, keyAsym *ecdsa.PrivateKey, keySym []byte, pow float64, topics [][]byte) (types.Filter, error) {
func (w *gethWakuV2Wrapper) createFilterWrapper(id string, keyAsym *ecdsa.PrivateKey, keySym []byte, pow float64, pubsubTopic string, topics [][]byte) (types.Filter, error) {
return NewWakuV2FilterWrapper(&wakucommon.Filter{
KeyAsym: keyAsym,
KeySym: keySym,
Topics: topics,
Messages: wakucommon.NewMemoryMessageStore(),
KeyAsym: keyAsym,
KeySym: keySym,
Topics: topics,
PubsubTopic: pubsubTopic,
Messages: wakucommon.NewMemoryMessageStore(),
}, id), nil
}
@ -194,12 +195,12 @@ func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerID []b
}))
}
var topics []wakucommon.TopicType
for _, topic := range r.Topics {
topics = append(topics, wakucommon.BytesToTopic(topic))
var contentTopics []wakucommon.TopicType
for _, topic := range r.ContentTopics {
contentTopics = append(contentTopics, wakucommon.BytesToTopic(topic))
}
pbCursor, err := w.waku.Query(ctx, peer, topics, uint64(r.From), uint64(r.To), options)
pbCursor, err := w.waku.Query(ctx, peer, r.PubsubTopic, contentTopics, uint64(r.From), uint64(r.To), options)
if err != nil {
return nil, err
}
@ -229,6 +230,15 @@ func (w *gethWakuV2Wrapper) StopDiscV5() error {
return w.waku.StopDiscV5()
}
// Subscribe to a pubsub topic, passing an optional public key if the pubsub topic is protected
func (w *gethWakuV2Wrapper) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error {
return w.waku.SubscribeToPubsubTopic(topic, optPublicKey)
}
func (w *gethWakuV2Wrapper) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error {
return w.waku.StorePubsubTopicKey(topic, privKey)
}
func (w *gethWakuV2Wrapper) AddStorePeer(address string) (peer.ID, error) {
return w.waku.AddStorePeer(address)
}

View File

@ -28,10 +28,11 @@ type MessagesRequest struct {
StoreCursor *StoreRequestCursor `json:"storeCursor"`
// Bloom is a filter to match requested messages.
Bloom []byte `json:"bloom"`
// Topics is a list of topics. A returned message should
// PubsubTopic is the gossipsub topic on which the message was broadcasted
PubsubTopic string
// ContentTopics is a list of topics. A returned message should
// belong to one of the topics from the list.
Topics [][]byte `json:"topics"`
ContentTopics [][]byte `json:"topics"`
}
type StoreRequestCursor struct {

View File

@ -6,17 +6,18 @@ import (
// NewMessage represents a new whisper message that is posted through the RPC.
type NewMessage struct {
SymKeyID string `json:"symKeyID"`
PublicKey []byte `json:"pubKey"`
SigID string `json:"sig"`
TTL uint32 `json:"ttl"`
Topic TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
PowTime uint32 `json:"powTime"`
PowTarget float64 `json:"powTarget"`
TargetPeer string `json:"targetPeer"`
Ephemeral bool `json:"ephemeral"`
SymKeyID string `json:"symKeyID"`
PublicKey []byte `json:"pubKey"`
SigID string `json:"sig"`
TTL uint32 `json:"ttl"`
PubsubTopic string `json:"pubsubTopic"`
Topic TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
PowTime uint32 `json:"powTime"`
PowTarget float64 `json:"powTarget"`
TargetPeer string `json:"targetPeer"`
Ephemeral bool `json:"ephemeral"`
}
// Message is the RPC representation of a whisper message.
@ -24,6 +25,7 @@ type Message struct {
Sig []byte `json:"sig,omitempty"`
TTL uint32 `json:"ttl"`
Timestamp uint32 `json:"timestamp"`
PubsubTopic string `json:"pubsubTopic"`
Topic TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
@ -40,6 +42,7 @@ type Criteria struct {
PrivateKeyID string `json:"privateKeyID"`
Sig []byte `json:"sig"`
MinPow float64 `json:"minPow"`
PubsubTopic string `json:"pubsubTopic"`
Topics []TopicType `json:"topics"`
AllowP2P bool `json:"allowP2P"`
}

View File

@ -6,5 +6,6 @@ type SubscriptionOptions struct {
PrivateKeyID string
SymKeyID string
PoW float64
PubsubTopic string
Topics [][]byte
}

View File

@ -88,6 +88,10 @@ type Waku interface {
StopDiscV5() error
SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error
StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error
AddStorePeer(address string) (peer.ID, error)
AddRelayPeer(address string) (peer.ID, error)

View File

@ -4,6 +4,7 @@ import (
"context"
"crypto/ecdsa"
"database/sql"
"fmt"
"sync"
"time"
@ -336,7 +337,7 @@ func (s *MessageSender) sendCommunity(
if err != nil {
return nil, errors.Wrap(err, "failed to decompress pubkey")
}
hash, newMessage, err = s.dispatchCommunityMessage(ctx, pubkey, payload, messageIDs)
hash, newMessage, err = s.dispatchCommunityMessage(ctx, pubkey, payload, messageIDs, rawMessage.PubsubTopic)
if err != nil {
s.logger.Error("failed to send a community message", zap.Error(err))
return nil, errors.Wrap(err, "failed to send a message spec")
@ -524,10 +525,11 @@ func (s *MessageSender) EncodeAbridgedMembershipUpdate(
func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMessage *RawMessage, wrappedMessage []byte) ([]byte, *types.NewMessage, error) {
newMessage := &types.NewMessage{
TTL: whisperTTL,
Payload: wrappedMessage,
PowTarget: calculatePoW(wrappedMessage),
PowTime: whisperPoWTime,
TTL: whisperTTL,
Payload: wrappedMessage,
PowTarget: calculatePoW(wrappedMessage),
PowTime: whisperPoWTime,
PubsubTopic: rawMessage.PubsubTopic,
}
if rawMessage.BeforeDispatch != nil {
@ -586,6 +588,7 @@ func (s *MessageSender) SendPublic(
}
newMessage.Ephemeral = rawMessage.Ephemeral
newMessage.PubsubTopic = rawMessage.PubsubTopic
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
rawMessage.ID = types.EncodeHex(messageID)
@ -786,6 +789,7 @@ func (s *MessageSender) handleErrDeviceNotFound(ctx context.Context, publicKey *
}
func (s *MessageSender) wrapMessageV1(rawMessage *RawMessage) ([]byte, error) {
fmt.Println("wrapMessageV1: pubsubTopic: ", rawMessage.PubsubTopic, " message type", rawMessage.MessageType.String())
wrappedMessage, err := v1protocol.WrapMessageV1(rawMessage.Payload, rawMessage.MessageType, rawMessage.Sender)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
@ -856,10 +860,11 @@ func (s *MessageSender) sendDataSync(ctx context.Context, publicKey *ecdsa.Publi
// sendPrivateRawMessage sends a message not wrapped in an encryption layer
func (s *MessageSender) sendPrivateRawMessage(ctx context.Context, rawMessage *RawMessage, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte) ([]byte, *types.NewMessage, error) {
newMessage := &types.NewMessage{
TTL: whisperTTL,
Payload: payload,
PowTarget: calculatePoW(payload),
PowTime: whisperPoWTime,
TTL: whisperTTL,
Payload: payload,
PowTarget: calculatePoW(payload),
PowTime: whisperPoWTime,
PubsubTopic: rawMessage.PubsubTopic,
}
var hash []byte
var err error
@ -878,12 +883,13 @@ func (s *MessageSender) sendPrivateRawMessage(ctx context.Context, rawMessage *R
// sendCommunityMessage sends a message not wrapped in an encryption layer
// to a community
func (s *MessageSender) dispatchCommunityMessage(ctx context.Context, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte) ([]byte, *types.NewMessage, error) {
func (s *MessageSender) dispatchCommunityMessage(ctx context.Context, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte, pubsubTopic string) ([]byte, *types.NewMessage, error) {
newMessage := &types.NewMessage{
TTL: whisperTTL,
Payload: payload,
PowTarget: calculatePoW(payload),
PowTime: whisperPoWTime,
TTL: whisperTTL,
Payload: payload,
PowTarget: calculatePoW(payload),
PowTime: whisperPoWTime,
PubsubTopic: pubsubTopic,
}
hash, err := s.transport.SendCommunityMessage(ctx, newMessage, publicKey)

View File

@ -36,4 +36,5 @@ type RawMessage struct {
Ephemeral bool
BeforeDispatch func(*RawMessage) error
HashRatchetGroupID []byte
PubsubTopic string
}

View File

@ -10,6 +10,7 @@ import (
"time"
"github.com/golang/protobuf/proto"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/crypto"
@ -19,6 +20,7 @@ import (
community_token "github.com/status-im/status-go/protocol/communities/token"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/requests"
"github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/protocol/v1"
)
@ -1194,13 +1196,26 @@ func (o *Community) MemberUpdateChannelID() string {
return o.IDString() + "-memberUpdate"
}
func (o *Community) DefaultFilters() []string {
func (o *Community) PubsubTopic() string {
return transport.GetPubsubTopic(o.ID())
}
func (o *Community) DefaultFilters() []transport.FiltersToInitialize {
cID := o.IDString()
uncompressedPubKey := common.PubkeyToHex(o.config.ID)[2:]
updatesChannelID := o.StatusUpdatesChannelID()
mlChannelID := o.MagnetlinkMessageChannelID()
memberUpdateChannelID := o.MemberUpdateChannelID()
return []string{cID, uncompressedPubKey, updatesChannelID, mlChannelID, memberUpdateChannelID}
communityPubsubTopic := o.PubsubTopic()
return []transport.FiltersToInitialize{
{ChatID: cID, PubsubTopic: relay.DefaultWakuTopic}, // TODO: verify if this goes into default topic
{ChatID: uncompressedPubKey, PubsubTopic: communityPubsubTopic},
{ChatID: updatesChannelID, PubsubTopic: communityPubsubTopic},
{ChatID: mlChannelID, PubsubTopic: communityPubsubTopic},
{ChatID: memberUpdateChannelID, PubsubTopic: communityPubsubTopic},
}
}
func (o *Community) PrivateKey() *ecdsa.PrivateKey {

View File

@ -3369,7 +3369,7 @@ func (m *Manager) GetCommunityChatsTopics(communityID types.HexBytes) ([]types.T
topics := []types.TopicType{}
for _, filter := range filters {
topics = append(topics, filter.Topic)
topics = append(topics, filter.ContentTopic)
}
return topics, nil
@ -3412,7 +3412,7 @@ func (m *Manager) GetHistoryArchivePartitionStartTimestamp(communityID types.Hex
topics := []types.TopicType{}
for _, filter := range filters {
topics = append(topics, filter.Topic)
topics = append(topics, filter.ContentTopic)
}
lastArchiveEndDateTimestamp, err := m.GetLastMessageArchiveEndDate(communityID)

View File

@ -25,14 +25,14 @@ func (ckd *CommunitiesKeyDistributorImpl) Distribute(community *communities.Comm
return communities.ErrNotControlNode
}
err := ckd.distributeKey(community.ID(), community.ID(), &keyActions.CommunityKeyAction)
err := ckd.distributeKey(community, community.ID(), &keyActions.CommunityKeyAction)
if err != nil {
return err
}
for channelID := range keyActions.ChannelKeysActions {
keyAction := keyActions.ChannelKeysActions[channelID]
err := ckd.distributeKey(community.ID(), []byte(community.IDString()+channelID), &keyAction)
err := ckd.distributeKey(community, []byte(community.IDString()+channelID), &keyAction)
if err != nil {
return err
}
@ -46,7 +46,7 @@ func (ckd *CommunitiesKeyDistributorImpl) Rekey(community *communities.Community
return communities.ErrNotControlNode
}
err := ckd.distributeKey(community.ID(), community.ID(), &communities.EncryptionKeyAction{
err := ckd.distributeKey(community, community.ID(), &communities.EncryptionKeyAction{
ActionType: communities.EncryptionKeyRekey,
Members: community.Members(),
})
@ -55,7 +55,7 @@ func (ckd *CommunitiesKeyDistributorImpl) Rekey(community *communities.Community
}
for channelID, channel := range community.Chats() {
err := ckd.distributeKey(community.ID(), []byte(community.IDString()+channelID), &communities.EncryptionKeyAction{
err := ckd.distributeKey(community, []byte(community.IDString()+channelID), &communities.EncryptionKeyAction{
ActionType: communities.EncryptionKeyRekey,
Members: channel.Members,
})
@ -67,7 +67,7 @@ func (ckd *CommunitiesKeyDistributorImpl) Rekey(community *communities.Community
return nil
}
func (ckd *CommunitiesKeyDistributorImpl) distributeKey(communityID, hashRatchetGroupID []byte, keyAction *communities.EncryptionKeyAction) error {
func (ckd *CommunitiesKeyDistributorImpl) distributeKey(community *communities.Community, hashRatchetGroupID []byte, keyAction *communities.EncryptionKeyAction) error {
pubkeys := make([]*ecdsa.PublicKey, len(keyAction.Members))
i := 0
for hex := range keyAction.Members {
@ -82,19 +82,19 @@ func (ckd *CommunitiesKeyDistributorImpl) distributeKey(communityID, hashRatchet
return err
}
err = ckd.sendKeyExchangeMessage(communityID, hashRatchetGroupID, pubkeys, common.KeyExMsgReuse)
err = ckd.sendKeyExchangeMessage(community, hashRatchetGroupID, pubkeys, common.KeyExMsgReuse)
if err != nil {
return err
}
case communities.EncryptionKeyRekey:
err := ckd.sendKeyExchangeMessage(communityID, hashRatchetGroupID, pubkeys, common.KeyExMsgRekey)
err := ckd.sendKeyExchangeMessage(community, hashRatchetGroupID, pubkeys, common.KeyExMsgRekey)
if err != nil {
return err
}
case communities.EncryptionKeySendToMembers:
err := ckd.sendKeyExchangeMessage(communityID, hashRatchetGroupID, pubkeys, common.KeyExMsgReuse)
err := ckd.sendKeyExchangeMessage(community, hashRatchetGroupID, pubkeys, common.KeyExMsgReuse)
if err != nil {
return err
}
@ -103,14 +103,15 @@ func (ckd *CommunitiesKeyDistributorImpl) distributeKey(communityID, hashRatchet
return nil
}
func (ckd *CommunitiesKeyDistributorImpl) sendKeyExchangeMessage(communityID, hashRatchetGroupID []byte, pubkeys []*ecdsa.PublicKey, msgType common.CommKeyExMsgType) error {
func (ckd *CommunitiesKeyDistributorImpl) sendKeyExchangeMessage(community *communities.Community, hashRatchetGroupID []byte, pubkeys []*ecdsa.PublicKey, msgType common.CommKeyExMsgType) error {
rawMessage := common.RawMessage{
SkipProtocolLayer: false,
CommunityID: communityID,
CommunityID: community.ID(),
CommunityKeyExMsgType: msgType,
Recipients: pubkeys,
MessageType: protobuf.ApplicationMetadataMessage_CHAT_MESSAGE,
HashRatchetGroupID: hashRatchetGroupID,
PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic
}
_, err := ckd.sender.SendCommunityMessage(context.Background(), rawMessage)

View File

@ -19,6 +19,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
"golang.org/x/time/rate"
@ -988,6 +989,7 @@ func (m *Messenger) publishContactCode() error {
}
for _, community := range joinedCommunities {
rawMessage.LocalChatID = community.MemberUpdateChannelID()
rawMessage.PubsubTopic = community.PubsubTopic()
_, err = m.sender.SendPublic(ctx, rawMessage.LocalChatID, rawMessage)
if err != nil {
return err
@ -1586,7 +1588,7 @@ func (m *Messenger) Init() error {
logger := m.logger.With(zap.String("site", "Init"))
var (
publicChatIDs []string
filtersToInit []transport.FiltersToInitialize
publicKeys []*ecdsa.PublicKey
)
@ -1596,7 +1598,7 @@ func (m *Messenger) Init() error {
}
for _, org := range joinedCommunities {
// the org advertise on the public topic derived by the pk
publicChatIDs = append(publicChatIDs, org.DefaultFilters()...)
filtersToInit = append(filtersToInit, org.DefaultFilters()...)
// This is for status-go versions that didn't have `CommunitySettings`
// We need to ensure communities that existed before community settings
@ -1644,21 +1646,24 @@ func (m *Messenger) Init() error {
}
for _, org := range spectatedCommunities {
publicChatIDs = append(publicChatIDs, org.DefaultFilters()...)
filtersToInit = append(filtersToInit, org.DefaultFilters()...)
}
// Init filters for the communities we control
var controlledCommunitiesPks []*ecdsa.PrivateKey
var communityFiltersToInitialize []transport.CommunityFilterToInitialize
controlledCommunities, err := m.communitiesManager.ControlledCommunities()
if err != nil {
return err
}
for _, c := range controlledCommunities {
controlledCommunitiesPks = append(controlledCommunitiesPks, c.PrivateKey())
communityFiltersToInitialize = append(communityFiltersToInitialize, transport.CommunityFilterToInitialize{
CommunityID: c.ID(),
PrivKey: c.PrivateKey(),
})
}
_, err = m.transport.InitCommunityFilters(controlledCommunitiesPks)
_, err = m.transport.InitCommunityFilters(communityFiltersToInitialize)
if err != nil {
return err
}
@ -1688,10 +1693,13 @@ func (m *Messenger) Init() error {
switch chat.ChatType {
case ChatTypePublic, ChatTypeProfile:
publicChatIDs = append(publicChatIDs, chat.ID)
filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: relay.DefaultWakuTopic})
case ChatTypeCommunityChat:
// TODO not public chat now
publicChatIDs = append(publicChatIDs, chat.ID)
communityID, err := hexutil.Decode(chat.CommunityID)
if err != nil {
return err
}
filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: transport.GetPubsubTopic(communityID)})
case ChatTypeOneToOne:
pk, err := chat.PublicKey()
if err != nil {
@ -1766,7 +1774,7 @@ func (m *Messenger) Init() error {
return err
}
_, err = m.transport.InitFilters(publicChatIDs, publicKeys)
_, err = m.transport.InitFilters(filtersToInit, publicKeys)
return err
}
@ -1935,6 +1943,7 @@ func (m *Messenger) reSendRawMessage(ctx context.Context, messageID string) erro
_, err = m.dispatchMessage(ctx, common.RawMessage{
LocalChatID: chat.ID,
Payload: message.Payload,
PubsubTopic: message.PubsubTopic,
MessageType: message.MessageType,
Recipients: message.Recipients,
ResendAutomatically: message.ResendAutomatically,
@ -2047,6 +2056,13 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMe
return rawMessage, err
}
case ChatTypeCommunityChat:
communityID, err := hexutil.Decode(chat.CommunityID)
if err != nil {
return rawMessage, err
}
rawMessage.PubsubTopic = transport.GetPubsubTopic(communityID)
// TODO: add grant
canPost, err := m.communitiesManager.CanPost(&m.identity.PublicKey, chat.CommunityID, chat.CommunityChatID(), nil)
if err != nil {
@ -3423,7 +3439,7 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter
}
messageState.CurrentMessageState.Message = protoMessage
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.Topic, filter.ChatID, msg.Type, messageState.CurrentMessageState.Message)
m.outputToCSV(msg.TransportMessage.Timestamp, msg.ID, senderID, filter.ContentTopic, filter.ChatID, msg.Type, messageState.CurrentMessageState.Message)
err = m.HandleImportedChatMessage(messageState)
if err != nil {
logger.Warn("failed to handle ChatMessage", zap.Error(err))

View File

@ -415,7 +415,7 @@ func (m *Messenger) deactivateChat(chatID string, deactivationClock uint64, shou
continue
}
err := m.mailserversDatabase.ResetLastRequest(filter.Topic.String())
err := m.mailserversDatabase.ResetLastRequest(filter.PubsubTopic, filter.ContentTopic.String())
if err != nil {
return nil, err
}

View File

@ -100,6 +100,7 @@ func (m *Messenger) publishCommunityEvents(msg *communities.CommunityEventsMessa
// we don't want to wrap in an encryption layer message
SkipProtocolLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_EVENTS_MESSAGE,
PubsubTopic: transport.GetPubsubTopic(msg.CommunityID), // TODO: confirm if it should be sent in community pubsub topic
}
// TODO: resend in case of failure?
@ -129,6 +130,7 @@ func (m *Messenger) publishCommunityEventsRejected(community *communities.Commun
// we don't want to wrap in an encryption layer message
SkipProtocolLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_EVENTS_MESSAGE_REJECTED,
PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic
}
// TODO: resend in case of failure?
@ -568,16 +570,17 @@ func (m *Messenger) CuratedCommunities() (*communities.KnownCommunitiesResponse,
func (m *Messenger) initCommunityChats(community *communities.Community) ([]*Chat, error) {
logger := m.logger.Named("initCommunityChats")
chatIDs := community.DefaultFilters()
publicFiltersToInit := community.DefaultFilters()
chats := CreateCommunityChats(community, m.getTimesource())
for _, chat := range chats {
chatIDs = append(chatIDs, chat.ID)
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic()})
}
// Load transport filters
filters, err := m.transport.InitPublicFilters(chatIDs)
filters, err := m.transport.InitPublicFilters(publicFiltersToInit)
if err != nil {
logger.Debug("m.transport.InitPublicFilters error", zap.Error(err))
return nil, err
@ -585,7 +588,11 @@ func (m *Messenger) initCommunityChats(community *communities.Community) ([]*Cha
if community.IsControlNode() {
// Init the community filter so we can receive messages on the community
communityFilters, err := m.transport.InitCommunityFilters([]*ecdsa.PrivateKey{community.PrivateKey()})
communityFilters, err := m.transport.InitCommunityFilters([]transport.CommunityFilterToInitialize{{
CommunityID: community.ID(),
PrivKey: community.PrivateKey(),
}})
if err != nil {
return nil, err
}
@ -657,6 +664,16 @@ func (m *Messenger) JoinCommunity(ctx context.Context, communityID types.HexByte
return mr, nil
}
func (m *Messenger) subscribeToCommunityShard(communityID []byte) error {
// TODO: store private key and topic
// TODO: determine pubsub topic and public key for community
// TODO: this should probably be moved completely to transport once pubsub topic logic is implemented
pubsubTopic := transport.GetPubsubTopic(communityID)
var communityPubKey *ecdsa.PublicKey
return m.transport.SubscribeToPubsubTopic(pubsubTopic, communityPubKey)
}
func (m *Messenger) joinCommunity(ctx context.Context, communityID types.HexBytes, forceJoin bool) (*MessengerResponse, error) {
logger := m.logger.Named("joinCommunity")
@ -679,6 +696,10 @@ func (m *Messenger) joinCommunity(ctx context.Context, communityID types.HexByte
if _, err = m.initCommunitySettings(communityID); err != nil {
return nil, err
}
if err = m.subscribeToCommunityShard(communityID); err != nil {
return nil, err
}
}
communitySettings, err := m.communitiesManager.GetCommunitySettingsByID(communityID)
@ -730,6 +751,10 @@ func (m *Messenger) SpectateCommunity(communityID types.HexBytes) (*MessengerRes
response.AddCommunity(community)
if err = m.subscribeToCommunityShard(community.ID()); err != nil {
return nil, err
}
return response, nil
}
@ -1003,6 +1028,7 @@ func (m *Messenger) RequestToJoinCommunity(request *requests.RequestToJoinCommun
CommunityID: community.ID(),
SkipProtocolLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN,
PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic
}
_, err = m.sender.SendCommunityMessage(context.Background(), rawMessage)
@ -1146,6 +1172,7 @@ func (m *Messenger) EditSharedAddressesForCommunity(request *requests.EditShared
CommunityID: community.ID(),
SkipProtocolLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_EDIT_SHARED_ADDRESSES,
PubsubTopic: community.PubsubTopic(),
}
_, err = m.sender.SendCommunityMessage(context.Background(), rawMessage)
@ -1301,6 +1328,7 @@ func (m *Messenger) CancelRequestToJoinCommunity(request *requests.CancelRequest
CommunityID: community.ID(),
SkipProtocolLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_CANCEL_REQUEST_TO_JOIN,
PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be send in community pubsub topic
}
_, err = m.sender.SendCommunityMessage(context.Background(), rawMessage)
@ -1406,6 +1434,7 @@ func (m *Messenger) AcceptRequestToJoinCommunity(request *requests.AcceptRequest
Sender: community.PrivateKey(),
SkipProtocolLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN_RESPONSE,
PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic
}
_, err = m.sender.SendPrivate(context.Background(), pk, rawMessage)
@ -1611,6 +1640,7 @@ func (m *Messenger) LeaveCommunity(communityID types.HexBytes) (*MessengerRespon
CommunityID: communityID,
SkipProtocolLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_LEAVE,
PubsubTopic: transport.GetPubsubTopic(communityID), // TODO: confirm if it should be sent in the community pubsub topic
}
_, err = m.sender.SendCommunityMessage(context.Background(), rawMessage)
if err != nil {
@ -1757,16 +1787,17 @@ func (m *Messenger) CreateCommunityChat(communityID types.HexBytes, c *protobuf.
response.CommunityChanges = []*communities.CommunityChanges{changes}
var chats []*Chat
var chatIDs []string
var publicFiltersToInit []transport.FiltersToInitialize
for chatID, chat := range changes.ChatsAdded {
c := CreateCommunityChat(changes.Community.IDString(), chatID, chat, m.getTimesource())
chats = append(chats, c)
chatIDs = append(chatIDs, c.ID)
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: c.ID, PubsubTopic: changes.Community.PubsubTopic()})
response.AddChat(c)
}
// Load filters
filters, err := m.transport.InitPublicFilters(chatIDs)
filters, err := m.transport.InitPublicFilters(publicFiltersToInit)
if err != nil {
return nil, err
}
@ -1798,16 +1829,16 @@ func (m *Messenger) EditCommunityChat(communityID types.HexBytes, chatID string,
response.CommunityChanges = []*communities.CommunityChanges{changes}
var chats []*Chat
var chatIDs []string
var publicFiltersToInit []transport.FiltersToInitialize
for chatID, change := range changes.ChatsModified {
c := CreateCommunityChat(community.IDString(), chatID, change.ChatModified, m.getTimesource())
chats = append(chats, c)
chatIDs = append(chatIDs, c.ID)
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{ChatID: c.ID, PubsubTopic: community.PubsubTopic()})
response.AddChat(c)
}
// Load filters
filters, err := m.transport.InitPublicFilters(chatIDs)
filters, err := m.transport.InitPublicFilters(publicFiltersToInit)
if err != nil {
return nil, err
}
@ -1862,8 +1893,15 @@ func (m *Messenger) CreateCommunity(request *requests.CreateCommunity, createDef
return nil, err
}
if err = m.subscribeToCommunityShard(community.ID()); err != nil {
return nil, err
}
// Init the community filter so we can receive messages on the community
_, err = m.transport.InitCommunityFilters([]*ecdsa.PrivateKey{community.PrivateKey()})
_, err = m.transport.InitCommunityFilters([]transport.CommunityFilterToInitialize{{
CommunityID: community.ID(),
PrivKey: community.PrivateKey(),
}})
if err != nil {
return nil, err
}
@ -2359,11 +2397,19 @@ func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, waitF
return nil, nil
}
id, err := hexutil.Decode(communityID)
if err != nil {
return nil, err
}
//If filter wasn't installed we create it and remember for deinstalling after
//response received
filter := m.transport.FilterByChatID(communityID)
if filter == nil {
filters, err := m.transport.InitPublicFilters([]string{communityID})
filters, err := m.transport.InitPublicFilters([]transport.FiltersToInitialize{{
ChatID: communityID,
PubsubTopic: transport.GetPubsubTopic(id),
}})
if err != nil {
return nil, fmt.Errorf("Can't install filter for community: %v", err)
}
@ -2380,9 +2426,9 @@ func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, waitF
to := uint32(m.transport.GetCurrentTime() / 1000)
from := to - oneMonthInSeconds
_, err := m.performMailserverRequest(func() (*MessengerResponse, error) {
_, err = m.performMailserverRequest(func() (*MessengerResponse, error) {
batch := MailserverBatch{From: from, To: to, Topics: []types.TopicType{filter.Topic}}
batch := MailserverBatch{From: from, To: to, Topics: []types.TopicType{filter.ContentTopic}}
m.logger.Info("Requesting historic")
err := m.processMailserverBatch(batch)
return nil, err
@ -2446,11 +2492,19 @@ func (m *Messenger) requestCommunitiesFromMailserver(communityIDs []string) {
continue
}
id, err := hexutil.Decode(communityID)
if err != nil {
continue
}
//If filter wasn't installed we create it and remember for deinstalling after
//response received
filter := m.transport.FilterByChatID(communityID)
if filter == nil {
filters, err := m.transport.InitPublicFilters([]string{communityID})
filters, err := m.transport.InitPublicFilters([]transport.FiltersToInitialize{{
ChatID: communityID,
PubsubTopic: transport.GetPubsubTopic(id),
}})
if err != nil {
m.logger.Error("Can't install filter for community", zap.Error(err))
continue
@ -2465,7 +2519,7 @@ func (m *Messenger) requestCommunitiesFromMailserver(communityIDs []string) {
//we don't remember filter id associated with community because it was already installed
m.requestedCommunities[communityID] = nil
}
topics = append(topics, filter.Topic)
topics = append(topics, filter.ContentTopic)
}
to := uint32(m.transport.GetCurrentTime() / 1000)
@ -2602,7 +2656,7 @@ func (m *Messenger) handleCommunityResponse(state *ReceivedMessageState, communi
// Update relevant chats names and add new ones
// Currently removal is not supported
chats := CreateCommunityChats(community, state.Timesource)
var chatIDs []string
var publicFiltersToInit []transport.FiltersToInitialize
for i, chat := range chats {
oldChat, ok := state.AllChats.Load(chat.ID)
@ -2611,7 +2665,10 @@ func (m *Messenger) handleCommunityResponse(state *ReceivedMessageState, communi
state.AllChats.Store(chat.ID, chats[i])
state.Response.AddChat(chat)
chatIDs = append(chatIDs, chat.ID)
publicFiltersToInit = append(publicFiltersToInit, transport.FiltersToInitialize{
ChatID: chat.ID,
PubsubTopic: community.PubsubTopic(),
})
// Update name, currently is the only field is mutable
} else if oldChat.Name != chat.Name ||
oldChat.Description != chat.Description ||
@ -2636,7 +2693,7 @@ func (m *Messenger) handleCommunityResponse(state *ReceivedMessageState, communi
}
// Load transport filters
filters, err := m.transport.InitPublicFilters(chatIDs)
filters, err := m.transport.InitPublicFilters(publicFiltersToInit)
if err != nil {
return err
}
@ -2953,7 +3010,7 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
topics := []types.TopicType{}
for _, filter := range filters {
topics = append(topics, filter.Topic)
topics = append(topics, filter.ContentTopic)
}
// First we need to know the timestamp of the latest waku message
@ -3199,6 +3256,7 @@ func (m *Messenger) dispatchMagnetlinkMessage(communityID string) error {
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_MESSAGE_ARCHIVE_MAGNETLINK,
SkipGroupMessageWrap: true,
PubsubTopic: community.PubsubTopic(),
}
_, err = m.sender.SendPublic(context.Background(), chatID, rawMessage)
@ -4128,7 +4186,10 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
}
// Init the community filter so we can receive messages on the community
_, err = m.transport.InitCommunityFilters([]*ecdsa.PrivateKey{discordCommunity.PrivateKey()})
_, err = m.transport.InitCommunityFilters([]transport.CommunityFilterToInitialize{{
CommunityID: discordCommunity.ID(),
PrivKey: discordCommunity.PrivateKey(),
}})
if err != nil {
m.cleanUpImport(communityID)
importProgress.AddTaskError(discord.InitCommunityTask, discord.Error(err.Error()))
@ -4307,7 +4368,7 @@ func (m *Messenger) pinMessagesToWakuMessages(pinMessages []*common.PinMessage,
wakuMessage := &types.Message{
Sig: crypto.FromECDSAPub(&c.PrivateKey().PublicKey),
Timestamp: uint32(msg.WhisperTimestamp / 1000),
Topic: filter.Topic,
Topic: filter.ContentTopic,
Payload: wrappedPayload,
Padding: []byte{1},
Hash: hash[:],
@ -4346,7 +4407,7 @@ func (m *Messenger) chatMessagesToWakuMessages(chatMessages []*common.Message, c
wakuMessage := &types.Message{
Sig: crypto.FromECDSAPub(&c.PrivateKey().PublicKey),
Timestamp: uint32(msg.WhisperTimestamp / 1000),
Topic: filter.Topic,
Topic: filter.ContentTopic,
Payload: wrappedPayload,
Padding: []byte{1},
Hash: hash[:],

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/golang/protobuf/proto"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
"github.com/status-im/status-go/deprecation"
@ -468,7 +469,7 @@ func (m *Messenger) addContact(ctx context.Context, pubKey, ensName, nickname, d
if !deprecation.ChatProfileDeprecated {
response.AddChat(profileChat)
_, err = m.transport.InitFilters([]string{profileChat.ID}, []*ecdsa.PublicKey{publicKey})
_, err = m.transport.InitFilters([]transport.FiltersToInitialize{{ChatID: profileChat.ID, PubsubTopic: relay.DefaultWakuTopic}}, []*ecdsa.PublicKey{publicKey})
if err != nil {
return nil, err
}

View File

@ -9,6 +9,7 @@ import (
"time"
"github.com/pkg/errors"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
"github.com/status-im/status-go/connection"
@ -217,19 +218,19 @@ func (m *Messenger) filtersForChat(chatID string) ([]*transport.Filter, error) {
return filters, nil
}
func (m *Messenger) topicsForChat(chatID string) ([]types.TopicType, error) {
func (m *Messenger) topicsForChat(chatID string) (string, []types.TopicType, error) {
filters, err := m.filtersForChat(chatID)
if err != nil {
return nil, err
return "", nil, err
}
var topics []types.TopicType
var contentTopics []types.TopicType
for _, filter := range filters {
topics = append(topics, filter.Topic)
contentTopics = append(contentTopics, filter.ContentTopic)
}
return topics, nil
return filters[0].PubsubTopic, contentTopics, nil
}
// Assume is a public chat for now
@ -250,7 +251,7 @@ func (m *Messenger) syncBackup() error {
to := m.calculateMailserverTo()
from := uint32(m.getTimesource().GetCurrentTime()/1000) - oneMonthInSeconds
batch := MailserverBatch{From: from, To: to, Topics: []types.TopicType{filter.Topic}}
batch := MailserverBatch{From: from, To: to, PubsubTopic: relay.DefaultWakuTopic, Topics: []types.TopicType{filter.ContentTopic}}
err := m.processMailserverBatch(batch)
if err != nil {
return err
@ -346,7 +347,7 @@ func (m *Messenger) syncFiltersFrom(filters []*transport.Filter, lastRequest uin
topicsData := make(map[string]mailservers.MailserverTopic)
for _, topic := range topicInfo {
topicsData[topic.Topic] = topic
topicsData[fmt.Sprintf("%s-%s", topic.PubsubTopic, topic.ContentTopic)] = topic
}
batches := make(map[int]MailserverBatch)
@ -371,74 +372,88 @@ func (m *Messenger) syncFiltersFrom(filters []*transport.Filter, lastRequest uin
return nil, err
}
contentTopicsPerPubsubTopic := make(map[string]map[string]*transport.Filter)
for _, filter := range filters {
if !filter.Listen || filter.Ephemeral {
continue
}
var chatID string
// If the filter has an identity, we use it as a chatID, otherwise is a public chat/community chat filter
if len(filter.Identity) != 0 {
chatID = filter.Identity
} else {
chatID = filter.ChatID
}
topicData, ok := topicsData[filter.Topic.String()]
var capToDefaultSyncPeriod = true
contentTopics, ok := contentTopicsPerPubsubTopic[filter.PubsubTopic]
if !ok {
if lastRequest == 0 {
lastRequest = defaultPeriodFromNow
}
topicData = mailservers.MailserverTopic{
Topic: filter.Topic.String(),
LastRequest: int(defaultPeriodFromNow),
}
} else if lastRequest != 0 {
topicData.LastRequest = int(lastRequest)
capToDefaultSyncPeriod = false
contentTopics = make(map[string]*transport.Filter)
}
contentTopics[filter.ContentTopic.String()] = filter
contentTopicsPerPubsubTopic[filter.PubsubTopic] = contentTopics
}
batchID := topicData.LastRequest
if currentBatch < len(prioritizedBatches) {
batch, ok := batches[currentBatch]
if ok {
prevTopicData, ok := topicsData[batch.Topics[0].String()]
if (!ok && topicData.LastRequest != int(defaultPeriodFromNow)) ||
(ok && prevTopicData.LastRequest != topicData.LastRequest) {
currentBatch++
}
for pubsubTopic, contentTopics := range contentTopicsPerPubsubTopic {
for _, filter := range contentTopics {
var chatID string
// If the filter has an identity, we use it as a chatID, otherwise is a public chat/community chat filter
if len(filter.Identity) != 0 {
chatID = filter.Identity
} else {
chatID = filter.ChatID
}
topicData, ok := topicsData[filter.PubsubTopic+filter.ContentTopic.String()]
var capToDefaultSyncPeriod = true
if !ok {
if lastRequest == 0 {
lastRequest = defaultPeriodFromNow
}
topicData = mailservers.MailserverTopic{
PubsubTopic: filter.PubsubTopic,
ContentTopic: filter.ContentTopic.String(),
LastRequest: int(defaultPeriodFromNow),
}
} else if lastRequest != 0 {
topicData.LastRequest = int(lastRequest)
capToDefaultSyncPeriod = false
}
batchID := topicData.LastRequest
if currentBatch < len(prioritizedBatches) {
batchID = currentBatch
currentBatchCap := prioritizedBatches[currentBatch] - 1
if currentBatchCap == 0 {
currentBatch++
} else {
prioritizedBatches[currentBatch] = currentBatchCap
batch, ok := batches[currentBatch]
if ok {
prevTopicData, ok := topicsData[batch.PubsubTopic+batch.Topics[0].String()]
if (!ok && topicData.LastRequest != int(defaultPeriodFromNow)) ||
(ok && prevTopicData.LastRequest != topicData.LastRequest) {
currentBatch++
}
}
if currentBatch < len(prioritizedBatches) {
batchID = currentBatch
currentBatchCap := prioritizedBatches[currentBatch] - 1
if currentBatchCap == 0 {
currentBatch++
} else {
prioritizedBatches[currentBatch] = currentBatchCap
}
}
}
}
batch, ok := batches[batchID]
if !ok {
from := uint32(topicData.LastRequest)
if capToDefaultSyncPeriod {
from, err = m.capToDefaultSyncPeriod(uint32(topicData.LastRequest))
if err != nil {
return nil, err
batch, ok := batches[batchID]
if !ok {
from := uint32(topicData.LastRequest)
if capToDefaultSyncPeriod {
from, err = m.capToDefaultSyncPeriod(uint32(topicData.LastRequest))
if err != nil {
return nil, err
}
}
batch = MailserverBatch{From: from, To: to}
}
batch = MailserverBatch{From: from, To: to}
}
batch.ChatIDs = append(batch.ChatIDs, chatID)
batch.Topics = append(batch.Topics, filter.Topic)
batches[batchID] = batch
// Set last request to the new `to`
topicData.LastRequest = int(to)
syncedTopics = append(syncedTopics, topicData)
batch.ChatIDs = append(batch.ChatIDs, chatID)
batch.PubsubTopic = pubsubTopic
batch.Topics = append(batch.Topics, filter.ContentTopic)
batches[batchID] = batch
// Set last request to the new `to`
topicData.LastRequest = int(to)
syncedTopics = append(syncedTopics, topicData)
}
}
if m.config.messengerSignalsHandler != nil {
@ -460,10 +475,11 @@ func (m *Messenger) syncFiltersFrom(filters []*transport.Filter, lastRequest uin
batch := batches[k]
dayBatch := MailserverBatch{
To: batch.To,
Cursor: batch.Cursor,
Topics: batch.Topics,
ChatIDs: batch.ChatIDs,
To: batch.To,
Cursor: batch.Cursor,
PubsubTopic: batch.PubsubTopic,
Topics: batch.Topics,
ChatIDs: batch.ChatIDs,
}
from := batch.To - 86400
@ -586,9 +602,10 @@ func (m *Messenger) calculateGapForChat(chat *Chat, from uint32) (*common.Messag
}
type work struct {
topics []types.TopicType
cursor []byte
storeCursor *types.StoreRequestCursor
pubsubTopic string
contentTopics []types.TopicType
cursor []byte
storeCursor *types.StoreRequestCursor
}
type messageRequester interface {
@ -598,7 +615,8 @@ type messageRequester interface {
from, to uint32,
previousCursor []byte,
previousStoreCursor *types.StoreRequestCursor,
topics []types.TopicType,
pubsubTopic string,
contentTopics []types.TopicType,
waitForResponse bool,
) (cursor []byte, storeCursor *types.StoreRequestCursor, err error)
}
@ -645,7 +663,8 @@ func processMailserverBatch(ctx context.Context, messageRequester messageRequest
default:
logger.Debug("processBatch producer - creating work")
workCh <- work{
topics: batch.Topics[i:j],
pubsubTopic: batch.PubsubTopic,
contentTopics: batch.Topics[i:j],
}
time.Sleep(50 * time.Millisecond)
}
@ -685,7 +704,7 @@ loop:
}()
queryCtx, queryCancel := context.WithTimeout(ctx, mailserverRequestTimeout)
cursor, storeCursor, err := messageRequester.SendMessagesRequestForTopics(queryCtx, mailserverID, batch.From, batch.To, w.cursor, w.storeCursor, w.topics, true)
cursor, storeCursor, err := messageRequester.SendMessagesRequestForTopics(queryCtx, mailserverID, batch.From, batch.To, w.cursor, w.storeCursor, w.pubsubTopic, w.contentTopics, true)
queryCancel()
@ -700,9 +719,10 @@ loop:
workWg.Add(1)
workCh <- work{
topics: w.topics,
cursor: cursor,
storeCursor: storeCursor,
pubsubTopic: w.pubsubTopic,
contentTopics: w.contentTopics,
cursor: cursor,
storeCursor: storeCursor,
}
}
}(w)
@ -737,45 +757,18 @@ func (m *Messenger) processMailserverBatch(batch MailserverBatch) error {
}
type MailserverBatch struct {
From uint32
To uint32
Cursor string
Topics []types.TopicType
ChatIDs []string
}
func (m *Messenger) RequestHistoricMessagesForFilter(
ctx context.Context,
from, to uint32,
cursor []byte,
previousStoreCursor *types.StoreRequestCursor,
filter *transport.Filter,
waitForResponse bool,
) ([]byte, *types.StoreRequestCursor, error) {
activeMailserverID, err := m.activeMailserverID()
if err != nil {
return nil, nil, err
}
if activeMailserverID == nil {
m.cycleMailservers()
activeMailserverID, err = m.activeMailserverID()
if err != nil {
return nil, nil, err
}
if activeMailserverID == nil {
return nil, nil, errors.New("no mailserver selected")
}
}
return m.transport.SendMessagesRequestForFilter(ctx, activeMailserverID, from, to, cursor, previousStoreCursor, filter, waitForResponse)
From uint32
To uint32
Cursor string
PubsubTopic string
Topics []types.TopicType
ChatIDs []string
}
func (m *Messenger) SyncChatFromSyncedFrom(chatID string) (uint32, error) {
var from uint32
_, err := m.performMailserverRequest(func() (*MessengerResponse, error) {
topics, err := m.topicsForChat(chatID)
pubsubTopic, topics, err := m.topicsForChat(chatID)
if err != nil {
return nil, nil
}
@ -789,11 +782,13 @@ func (m *Messenger) SyncChatFromSyncedFrom(chatID string) (uint32, error) {
if err != nil {
return nil, err
}
batch := MailserverBatch{
ChatIDs: []string{chatID},
To: chat.SyncedFrom,
From: chat.SyncedFrom - defaultSyncPeriod,
Topics: topics,
ChatIDs: []string{chatID},
To: chat.SyncedFrom,
From: chat.SyncedFrom - defaultSyncPeriod,
PubsubTopic: pubsubTopic,
Topics: topics,
}
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestStarted(1)
@ -835,7 +830,7 @@ func (m *Messenger) FillGaps(chatID string, messageIDs []string) error {
return errors.New("chat not existing")
}
topics, err := m.topicsForChat(chatID)
pubsubTopic, topics, err := m.topicsForChat(chatID)
if err != nil {
return err
}
@ -857,10 +852,11 @@ func (m *Messenger) FillGaps(chatID string, messageIDs []string) error {
}
batch := MailserverBatch{
ChatIDs: []string{chatID},
To: highestTo,
From: lowestFrom,
Topics: topics,
ChatIDs: []string{chatID},
To: highestTo,
From: lowestFrom,
PubsubTopic: pubsubTopic,
Topics: topics,
}
if m.config.messengerSignalsHandler != nil {

View File

@ -41,12 +41,13 @@ func (t *mockTransport) SendMessagesRequestForTopics(
from, to uint32,
previousCursor []byte,
previousStoreCursor *types.StoreRequestCursor,
topics []types.TopicType,
pubsubTopic string,
contentTopics []types.TopicType,
waitForResponse bool,
) (cursor []byte, storeCursor *types.StoreRequestCursor, err error) {
var response queryResponse
if previousCursor == nil {
initialResponse := getInitialResponseKey(topics)
initialResponse := getInitialResponseKey(contentTopics)
response = t.queryResponses[initialResponse]
} else {
response = t.queryResponses[hex.EncodeToString(previousCursor)]

View File

@ -1,6 +1,8 @@
package protocol
import (
"crypto/ecdsa"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/status-im/status-go/eth-node/types"
@ -33,3 +35,12 @@ func (m *Messenger) Peers() map[string]types.WakuV2Peer {
func (m *Messenger) ListenAddresses() ([]string, error) {
return m.transport.ListenAddresses()
}
// Subscribe to a pubsub topic, passing an optional public key if the pubsub topic is protected
func (m *Messenger) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error {
return m.transport.SubscribeToPubsubTopic(topic, optPublicKey)
}
func (m *Messenger) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error {
return m.transport.StorePubsubTopicKey(topic, privKey)
}

View File

@ -83,6 +83,7 @@ func (m *Messenger) sendUserStatus(ctx context.Context, status UserStatus) error
}
for _, community := range joinedCommunities {
rawMessage.LocalChatID = community.StatusUpdatesChannelID()
rawMessage.PubsubTopic = community.PubsubTopic()
_, err = m.sender.SendPublic(ctx, rawMessage.LocalChatID, rawMessage)
if err != nil {
return err
@ -171,6 +172,7 @@ func (m *Messenger) sendCurrentUserStatusToCommunity(ctx context.Context, commun
MessageType: protobuf.ApplicationMetadataMessage_STATUS_UPDATE,
ResendAutomatically: true,
Ephemeral: statusUpdate.StatusType == protobuf.StatusUpdate_AUTOMATIC,
PubsubTopic: community.PubsubTopic(),
}
_, err = m.sender.SendPublic(ctx, rawMessage.LocalChatID, rawMessage)

View File

@ -15,8 +15,10 @@ type Filter struct {
// Identity is the public key of the other recipient for non-public filters.
// It's encoded using encoding/hex.
Identity string `json:"identity"`
// Topic is the whisper topic
Topic types.TopicType `json:"topic"`
// PubsubTopic is the waku2 pubsub topic
PubsubTopic string `json:"pubsubTopic"`
// ContentTopic is the waku topic
ContentTopic types.TopicType `json:"topic"`
// Discovery is whether this is a discovery topic
Discovery bool `json:"discovery"`
// Negotiated tells us whether is a negotiated topic

View File

@ -8,6 +8,7 @@ import (
"sync"
"github.com/pkg/errors"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/types"
@ -74,7 +75,7 @@ func NewFiltersManager(persistence KeysPersistence, service FiltersService, priv
}
func (f *FiltersManager) Init(
chatIDs []string,
filtersToInit []FiltersToInitialize,
publicKeys []*ecdsa.PublicKey,
) ([]*Filter, error) {
@ -97,8 +98,8 @@ func (f *FiltersManager) Init(
}
// Add public, one-to-one and negotiated filters.
for _, chatID := range chatIDs {
_, err := f.LoadPublic(chatID)
for _, fi := range filtersToInit {
_, err := f.LoadPublic(fi.ChatID, fi.PubsubTopic)
if err != nil {
return nil, err
}
@ -121,11 +122,16 @@ func (f *FiltersManager) Init(
return allFilters, nil
}
func (f *FiltersManager) InitPublicFilters(chatIDs []string) ([]*Filter, error) {
type FiltersToInitialize struct {
ChatID string
PubsubTopic string
}
func (f *FiltersManager) InitPublicFilters(publicFiltersToInit []FiltersToInitialize) ([]*Filter, error) {
var filters []*Filter
// Add public, one-to-one and negotiated filters.
for _, chatID := range chatIDs {
f, err := f.LoadPublic(chatID)
for _, pf := range publicFiltersToInit {
f, err := f.LoadPublic(pf.ChatID, pf.PubsubTopic) // TODO: pubsubtopic
if err != nil {
return nil, err
}
@ -134,15 +140,20 @@ func (f *FiltersManager) InitPublicFilters(chatIDs []string) ([]*Filter, error)
return filters, nil
}
func (f *FiltersManager) InitCommunityFilters(pks []*ecdsa.PrivateKey) ([]*Filter, error) {
type CommunityFilterToInitialize struct {
CommunityID []byte
PrivKey *ecdsa.PrivateKey
}
func (f *FiltersManager) InitCommunityFilters(communityFiltersToInitialize []CommunityFilterToInitialize) ([]*Filter, error) {
var filters []*Filter
f.mutex.Lock()
defer f.mutex.Unlock()
for _, pk := range pks {
identityStr := PublicKeyToStr(&pk.PublicKey)
rawFilter, err := f.addAsymmetric(identityStr, pk, true)
for _, cf := range communityFiltersToInitialize {
pubsubTopic := GetPubsubTopic(cf.CommunityID)
identityStr := PublicKeyToStr(&cf.PrivKey.PublicKey)
rawFilter, err := f.addAsymmetric(identityStr, pubsubTopic, cf.PrivKey, true)
if err != nil {
f.logger.Debug("could not register community filter", zap.Error(err))
return nil, err
@ -150,12 +161,13 @@ func (f *FiltersManager) InitCommunityFilters(pks []*ecdsa.PrivateKey) ([]*Filte
}
filterID := identityStr + "-admin"
filter := &Filter{
ChatID: filterID,
FilterID: rawFilter.FilterID,
Topic: rawFilter.Topic,
Identity: identityStr,
Listen: true,
OneToOne: true,
ChatID: filterID,
FilterID: rawFilter.FilterID,
PubsubTopic: pubsubTopic,
ContentTopic: rawFilter.Topic,
Identity: identityStr,
Listen: true,
OneToOne: true,
}
f.filters[filterID] = filter
@ -170,8 +182,8 @@ func (f *FiltersManager) InitCommunityFilters(pks []*ecdsa.PrivateKey) ([]*Filte
// DEPRECATED
func (f *FiltersManager) InitWithFilters(filters []*Filter) ([]*Filter, error) {
var (
chatIDs []string
publicKeys []*ecdsa.PublicKey
filtersToInit []FiltersToInitialize
publicKeys []*ecdsa.PublicKey
)
for _, filter := range filters {
@ -182,11 +194,11 @@ func (f *FiltersManager) InitWithFilters(filters []*Filter) ([]*Filter, error) {
}
publicKeys = append(publicKeys, publicKey)
} else if filter.ChatID != "" {
chatIDs = append(chatIDs, filter.ChatID)
filtersToInit = append(filtersToInit, FiltersToInitialize{ChatID: filter.ChatID, PubsubTopic: filter.PubsubTopic})
}
}
return f.Init(chatIDs, publicKeys)
return f.Init(filtersToInit, publicKeys)
}
func (f *FiltersManager) Reset(ctx context.Context) error {
@ -234,7 +246,7 @@ func (f *FiltersManager) FilterByTopic(topic []byte) *Filter {
f.mutex.Lock()
defer f.mutex.Unlock()
for _, f := range f.filters {
if bytes.Equal(types.TopicTypeToByteArray(f.Topic), topic) {
if bytes.Equal(types.TopicTypeToByteArray(f.ContentTopic), topic) {
return f
}
}
@ -318,6 +330,8 @@ func (f *FiltersManager) RemoveNoListenFilters() error {
// Remove remove all the filters associated with a chat/identity
func (f *FiltersManager) RemoveFilterByChatID(chatID string) (*Filter, error) {
// TODO: remove subscriptions from waku2 if required. Might need to be implemented in transport
f.mutex.Lock()
filter, ok := f.filters[chatID]
f.mutex.Unlock()
@ -354,21 +368,24 @@ func (f *FiltersManager) LoadPersonal(publicKey *ecdsa.PublicKey, identity *ecds
return f.filters[chatID], nil
}
pubsubTopic := relay.DefaultWakuTopic
// We set up a filter so we can publish,
// but we discard envelopes if listen is false.
filter, err := f.addAsymmetric(chatID, identity, listen)
filter, err := f.addAsymmetric(chatID, pubsubTopic, identity, listen)
if err != nil {
f.logger.Debug("could not register personal topic filter", zap.Error(err))
return nil, err
}
chat := &Filter{
ChatID: chatID,
FilterID: filter.FilterID,
Topic: filter.Topic,
Identity: PublicKeyToStr(publicKey),
Listen: listen,
OneToOne: true,
ChatID: chatID,
FilterID: filter.FilterID,
ContentTopic: filter.Topic,
PubsubTopic: pubsubTopic,
Identity: PublicKeyToStr(publicKey),
Listen: listen,
OneToOne: true,
}
f.filters[chatID] = chat
@ -392,22 +409,25 @@ func (f *FiltersManager) loadPartitioned(publicKey *ecdsa.PublicKey, identity *e
return f.filters[chatID], nil
}
pubsubTopic := relay.DefaultWakuTopic
// We set up a filter so we can publish,
// but we discard envelopes if listen is false.
filter, err := f.addAsymmetric(chatID, identity, listen)
filter, err := f.addAsymmetric(chatID, pubsubTopic, identity, listen)
if err != nil {
f.logger.Debug("could not register partitioned topic", zap.String("chatID", chatID), zap.Error(err))
return nil, err
}
chat := &Filter{
ChatID: chatID,
FilterID: filter.FilterID,
Topic: filter.Topic,
Identity: PublicKeyToStr(publicKey),
Listen: listen,
Ephemeral: ephemeral,
OneToOne: true,
ChatID: chatID,
FilterID: filter.FilterID,
ContentTopic: filter.Topic,
PubsubTopic: pubsubTopic,
Identity: PublicKeyToStr(publicKey),
Listen: listen,
Ephemeral: ephemeral,
OneToOne: true,
}
f.filters[chatID] = chat
@ -428,22 +448,24 @@ func (f *FiltersManager) LoadNegotiated(secret types.NegotiatedSecret) (*Filter,
return f.filters[chatID], nil
}
pubsubTopic := relay.DefaultWakuTopic
keyString := hex.EncodeToString(secret.Key)
filter, err := f.addSymmetric(keyString)
filter, err := f.addSymmetric(keyString, pubsubTopic)
if err != nil {
f.logger.Debug("could not register negotiated topic", zap.Error(err))
return nil, err
}
chat := &Filter{
ChatID: chatID,
Topic: filter.Topic,
SymKeyID: filter.SymKeyID,
FilterID: filter.FilterID,
Identity: PublicKeyToStr(secret.PublicKey),
Negotiated: true,
Listen: true,
OneToOne: true,
ChatID: chatID,
ContentTopic: filter.Topic,
PubsubTopic: pubsubTopic,
SymKeyID: filter.SymKeyID,
FilterID: filter.FilterID,
Identity: PublicKeyToStr(secret.PublicKey),
Negotiated: true,
Listen: true,
OneToOne: true,
}
f.filters[chat.ChatID] = chat
@ -478,25 +500,26 @@ func (f *FiltersManager) LoadDiscovery() ([]*Filter, error) {
// Load personal discovery
personalDiscoveryChat := &Filter{
ChatID: personalDiscoveryTopic,
Identity: identityStr,
Discovery: true,
Listen: true,
OneToOne: true,
ChatID: personalDiscoveryTopic,
Identity: identityStr,
PubsubTopic: relay.DefaultWakuTopic,
Discovery: true,
Listen: true,
OneToOne: true,
}
discoveryResponse, err := f.addAsymmetric(personalDiscoveryChat.ChatID, f.privateKey, true)
discoveryResponse, err := f.addAsymmetric(personalDiscoveryChat.ChatID, personalDiscoveryChat.PubsubTopic, f.privateKey, true)
if err != nil {
f.logger.Debug("could not register discovery topic", zap.String("chatID", personalDiscoveryChat.ChatID), zap.Error(err))
return nil, err
}
personalDiscoveryChat.Topic = discoveryResponse.Topic
personalDiscoveryChat.ContentTopic = discoveryResponse.Topic
personalDiscoveryChat.FilterID = discoveryResponse.FilterID
f.filters[personalDiscoveryChat.ChatID] = personalDiscoveryChat
f.logger.Debug("registering filter for", zap.String("chatID", personalDiscoveryChat.ChatID), zap.String("type", "discovery"), zap.String("topic", personalDiscoveryChat.Topic.String()))
f.logger.Debug("registering filter for", zap.String("chatID", personalDiscoveryChat.ChatID), zap.String("type", "discovery"), zap.String("topic", personalDiscoveryChat.ContentTopic.String()))
return []*Filter{personalDiscoveryChat}, nil
}
@ -508,7 +531,7 @@ func (f *FiltersManager) PersonalTopicFilter() *Filter {
}
// LoadPublic adds a filter for a public chat.
func (f *FiltersManager) LoadPublic(chatID string) (*Filter, error) {
func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
@ -516,19 +539,20 @@ func (f *FiltersManager) LoadPublic(chatID string) (*Filter, error) {
return chat, nil
}
filterAndTopic, err := f.addSymmetric(chatID)
filterAndTopic, err := f.addSymmetric(chatID, pubsubTopic)
if err != nil {
f.logger.Debug("could not register public chat topic", zap.String("chatID", chatID), zap.Error(err))
return nil, err
}
chat := &Filter{
ChatID: chatID,
FilterID: filterAndTopic.FilterID,
SymKeyID: filterAndTopic.SymKeyID,
Topic: filterAndTopic.Topic,
Listen: true,
OneToOne: false,
ChatID: chatID,
FilterID: filterAndTopic.FilterID,
SymKeyID: filterAndTopic.SymKeyID,
ContentTopic: filterAndTopic.Topic,
PubsubTopic: pubsubTopic,
Listen: true,
OneToOne: false,
}
f.filters[chatID] = chat
@ -549,19 +573,22 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro
return f.filters[chatID], nil
}
contactCodeFilter, err := f.addSymmetric(chatID)
pubsubTopic := relay.DefaultWakuTopic
contactCodeFilter, err := f.addSymmetric(chatID, pubsubTopic)
if err != nil {
f.logger.Debug("could not register contact code topic", zap.String("chatID", chatID), zap.Error(err))
return nil, err
}
chat := &Filter{
ChatID: chatID,
FilterID: contactCodeFilter.FilterID,
Topic: contactCodeFilter.Topic,
SymKeyID: contactCodeFilter.SymKeyID,
Identity: PublicKeyToStr(pubKey),
Listen: true,
ChatID: chatID,
FilterID: contactCodeFilter.FilterID,
ContentTopic: contactCodeFilter.Topic,
SymKeyID: contactCodeFilter.SymKeyID,
Identity: PublicKeyToStr(pubKey),
PubsubTopic: pubsubTopic,
Listen: true,
}
f.filters[chatID] = chat
@ -572,7 +599,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro
}
// addSymmetric adds a symmetric key filter
func (f *FiltersManager) addSymmetric(chatID string) (*RawFilter, error) {
func (f *FiltersManager) addSymmetric(chatID string, pubsubTopic string) (*RawFilter, error) {
var symKeyID string
var err error
@ -602,9 +629,10 @@ func (f *FiltersManager) addSymmetric(chatID string) (*RawFilter, error) {
}
id, err := f.service.Subscribe(&types.SubscriptionOptions{
SymKeyID: symKeyID,
PoW: minPow,
Topics: topics,
SymKeyID: symKeyID,
PoW: minPow,
Topics: topics,
PubsubTopic: pubsubTopic,
})
if err != nil {
return nil, err
@ -619,7 +647,7 @@ func (f *FiltersManager) addSymmetric(chatID string) (*RawFilter, error) {
// addAsymmetricFilter adds a filter with our private key
// and set minPow according to the listen parameter.
func (f *FiltersManager) addAsymmetric(chatID string, identity *ecdsa.PrivateKey, listen bool) (*RawFilter, error) {
func (f *FiltersManager) addAsymmetric(chatID string, pubsubTopic string, identity *ecdsa.PrivateKey, listen bool) (*RawFilter, error) {
var (
err error
pow = 1.0 // use PoW high enough to discard all messages for the filter
@ -641,6 +669,7 @@ func (f *FiltersManager) addAsymmetric(chatID string, identity *ecdsa.PrivateKey
PrivateKeyID: privateKeyID,
PoW: pow,
Topics: topics,
PubsubTopic: pubsubTopic,
})
if err != nil {
return nil, err

View File

@ -13,6 +13,7 @@ import (
"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/common"
@ -136,12 +137,12 @@ func NewTransport(
return t, nil
}
func (t *Transport) InitFilters(chatIDs []string, publicKeys []*ecdsa.PublicKey) ([]*Filter, error) {
func (t *Transport) InitFilters(chatIDs []FiltersToInitialize, publicKeys []*ecdsa.PublicKey) ([]*Filter, error) {
return t.filters.Init(chatIDs, publicKeys)
}
func (t *Transport) InitPublicFilters(chatIDs []string) ([]*Filter, error) {
return t.filters.InitPublicFilters(chatIDs)
func (t *Transport) InitPublicFilters(filtersToInit []FiltersToInitialize) ([]*Filter, error) {
return t.filters.InitPublicFilters(filtersToInit)
}
func (t *Transport) Filters() []*Filter {
@ -164,8 +165,8 @@ func (t *Transport) LoadFilters(filters []*Filter) ([]*Filter, error) {
return t.filters.InitWithFilters(filters)
}
func (t *Transport) InitCommunityFilters(pks []*ecdsa.PrivateKey) ([]*Filter, error) {
return t.filters.InitCommunityFilters(pks)
func (t *Transport) InitCommunityFilters(communityFiltersToInitialize []CommunityFilterToInitialize) ([]*Filter, error) {
return t.filters.InitCommunityFilters(communityFiltersToInitialize)
}
func (t *Transport) RemoveFilters(filters []*Filter) error {
@ -189,7 +190,7 @@ func (t *Transport) ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*Fil
}
func (t *Transport) JoinPublic(chatID string) (*Filter, error) {
return t.filters.LoadPublic(chatID)
return t.filters.LoadPublic(chatID, relay.DefaultWakuTopic)
}
func (t *Transport) LeavePublic(chatID string) error {
@ -279,13 +280,13 @@ func (t *Transport) SendPublic(ctx context.Context, newMessage *types.NewMessage
return nil, err
}
filter, err := t.filters.LoadPublic(chatName)
filter, err := t.filters.LoadPublic(chatName, newMessage.PubsubTopic)
if err != nil {
return nil, err
}
newMessage.SymKeyID = filter.SymKeyID
newMessage.Topic = filter.Topic
newMessage.Topic = filter.ContentTopic
return t.api.Post(ctx, *newMessage)
}
@ -304,7 +305,7 @@ func (t *Transport) SendPrivateWithSharedSecret(ctx context.Context, newMessage
}
newMessage.SymKeyID = filter.SymKeyID
newMessage.Topic = filter.Topic
newMessage.Topic = filter.ContentTopic
newMessage.PublicKey = nil
return t.api.Post(ctx, *newMessage)
@ -320,7 +321,7 @@ func (t *Transport) SendPrivateWithPartitioned(ctx context.Context, newMessage *
return nil, err
}
newMessage.Topic = filter.Topic
newMessage.Topic = filter.ContentTopic
newMessage.PublicKey = crypto.FromECDSAPub(publicKey)
return t.api.Post(ctx, *newMessage)
@ -336,7 +337,7 @@ func (t *Transport) SendPrivateOnPersonalTopic(ctx context.Context, newMessage *
return nil, err
}
newMessage.Topic = filter.Topic
newMessage.Topic = filter.ContentTopic
newMessage.PublicKey = crypto.FromECDSAPub(publicKey)
return t.api.Post(ctx, *newMessage)
@ -356,15 +357,15 @@ func (t *Transport) SendCommunityMessage(ctx context.Context, newMessage *types.
}
// We load the filter to make sure we can post on it
filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:])
filter, err := t.filters.LoadPublic(PubkeyToHex(publicKey)[2:], newMessage.PubsubTopic)
if err != nil {
return nil, err
}
newMessage.Topic = filter.Topic
newMessage.Topic = filter.ContentTopic
newMessage.PublicKey = crypto.FromECDSAPub(publicKey)
t.logger.Debug("SENDING message", zap.Binary("topic", filter.Topic[:]))
t.logger.Debug("SENDING message", zap.Binary("topic", filter.ContentTopic[:]))
return t.api.Post(ctx, *newMessage)
}
@ -450,7 +451,7 @@ func (t *Transport) createMessagesRequestV1(
topics []types.TopicType,
waitForResponse bool,
) (cursor []byte, err error) {
r := createMessagesRequest(from, to, previousCursor, nil, topics)
r := createMessagesRequest(from, to, previousCursor, nil, "", topics)
events := make(chan types.EnvelopeEvent, 10)
sub := t.waku.SubscribeEnvelopeEvents(events)
@ -481,10 +482,11 @@ func (t *Transport) createMessagesRequestV2(
peerID []byte,
from, to uint32,
previousStoreCursor *types.StoreRequestCursor,
topics []types.TopicType,
pubsubTopic string,
contentTopics []types.TopicType,
waitForResponse bool,
) (storeCursor *types.StoreRequestCursor, err error) {
r := createMessagesRequest(from, to, nil, previousStoreCursor, topics)
r := createMessagesRequest(from, to, nil, previousStoreCursor, pubsubTopic, contentTopics)
if waitForResponse {
resultCh := make(chan struct {
@ -524,55 +526,22 @@ func (t *Transport) SendMessagesRequestForTopics(
from, to uint32,
previousCursor []byte,
previousStoreCursor *types.StoreRequestCursor,
topics []types.TopicType,
pubsubTopic string,
contentTopics []types.TopicType,
waitForResponse bool,
) (cursor []byte, storeCursor *types.StoreRequestCursor, err error) {
switch t.waku.Version() {
case 2:
storeCursor, err = t.createMessagesRequestV2(ctx, peerID, from, to, previousStoreCursor, topics, waitForResponse)
storeCursor, err = t.createMessagesRequestV2(ctx, peerID, from, to, previousStoreCursor, pubsubTopic, contentTopics, waitForResponse)
case 1:
cursor, err = t.createMessagesRequestV1(ctx, peerID, from, to, previousCursor, topics, waitForResponse)
cursor, err = t.createMessagesRequestV1(ctx, peerID, from, to, previousCursor, contentTopics, waitForResponse)
default:
err = fmt.Errorf("unsupported version %d", t.waku.Version())
}
return
}
// RequestHistoricMessages requests historic messages for all registered filters.
func (t *Transport) SendMessagesRequest(
ctx context.Context,
peerID []byte,
from, to uint32,
previousCursor []byte,
previousStoreCursor *types.StoreRequestCursor,
waitForResponse bool,
) (cursor []byte, storeCursor *types.StoreRequestCursor, err error) {
topics := make([]types.TopicType, len(t.Filters()))
for _, f := range t.Filters() {
topics = append(topics, f.Topic)
}
return t.SendMessagesRequestForTopics(ctx, peerID, from, to, previousCursor, previousStoreCursor, topics, waitForResponse)
}
func (t *Transport) SendMessagesRequestForFilter(
ctx context.Context,
peerID []byte,
from, to uint32,
previousCursor []byte,
previousStoreCursor *types.StoreRequestCursor,
filter *Filter,
waitForResponse bool,
) (cursor []byte, storeCursor *types.StoreRequestCursor, err error) {
topics := make([]types.TopicType, len(t.Filters()))
topics = append(topics, filter.Topic)
return t.SendMessagesRequestForTopics(ctx, peerID, from, to, previousCursor, previousStoreCursor, topics, waitForResponse)
}
func createMessagesRequest(from, to uint32, cursor []byte, storeCursor *types.StoreRequestCursor, topics []types.TopicType) types.MessagesRequest {
func createMessagesRequest(from, to uint32, cursor []byte, storeCursor *types.StoreRequestCursor, pubsubTopic string, topics []types.TopicType) types.MessagesRequest {
aUUID := uuid.New()
// uuid is 16 bytes, converted to hex it's 32 bytes as expected by types.MessagesRequest
id := []byte(hex.EncodeToString(aUUID[:]))
@ -581,13 +550,14 @@ func createMessagesRequest(from, to uint32, cursor []byte, storeCursor *types.St
topicBytes = append(topicBytes, topics[idx][:])
}
return types.MessagesRequest{
ID: id,
From: from,
To: to,
Limit: 1000,
Cursor: cursor,
Topics: topicBytes,
StoreCursor: storeCursor,
ID: id,
From: from,
To: to,
Limit: 1000,
Cursor: cursor,
PubsubTopic: pubsubTopic,
ContentTopics: topicBytes,
StoreCursor: storeCursor,
}
}
@ -689,3 +659,25 @@ func (t *Transport) SubscribeToConnStatusChanges() (*types.ConnStatusSubscriptio
func (t *Transport) ConnectionChanged(state connection.State) {
t.waku.ConnectionChanged(state)
}
// Subscribe to a pubsub topic, passing an optional public key if the pubsub topic is protected
func (t *Transport) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error {
if t.waku.Version() == 2 {
return t.waku.SubscribeToPubsubTopic(topic, optPublicKey)
}
return nil
}
func (t *Transport) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error {
return t.waku.StorePubsubTopicKey(topic, privKey)
}
func GetPubsubTopic(communityID []byte) string {
// TODO: remove hardcoded pubsub topic and use shard
result := "/waku/2/status-signed-test-1"
if communityID == nil {
result = relay.DefaultWakuTopic
}
return result
}

View File

@ -2,6 +2,7 @@ package ext
import (
"context"
"crypto/ecdsa"
"encoding/hex"
"errors"
"fmt"
@ -1267,6 +1268,37 @@ func (api *PublicAPI) DisableCommunityHistoryArchiveProtocol() error {
return api.service.messenger.DisableCommunityHistoryArchiveProtocol()
}
func (api *PublicAPI) SubscribeToPubsubTopic(topic string, optPublicKey string) error {
var publicKey *ecdsa.PublicKey
if optPublicKey != "" {
keyBytes, err := hexutil.Decode(optPublicKey)
if err != nil {
return err
}
publicKey, err = crypto.UnmarshalPubkey(keyBytes)
if err != nil {
return err
}
}
return api.service.messenger.SubscribeToPubsubTopic(topic, publicKey)
}
func (api *PublicAPI) StorePubsubTopicKey(topic string, privKey string) error {
keyBytes, err := hexutil.Decode(privKey)
if err != nil {
return err
}
p, err := crypto.ToECDSA(keyBytes)
if err != nil {
return err
}
return api.service.messenger.StorePubsubTopicKey(topic, p)
}
func (api *PublicAPI) AddStorePeer(address string) (string, error) {
peerID, err := api.service.messenger.AddStorePeer(address)
return string(peerID), err

View File

@ -51,8 +51,8 @@ func (a *API) GetMailserverTopics(ctx context.Context) ([]MailserverTopic, error
return a.db.Topics()
}
func (a *API) DeleteMailserverTopic(ctx context.Context, topic string) error {
return a.db.DeleteTopic(topic)
func (a *API) DeleteMailserverTopic(ctx context.Context, pubsubTopic string, topic string) error {
return a.db.DeleteTopic(pubsubTopic, topic)
}
func (a *API) AddChatRequestRange(ctx context.Context, req ChatRequestRange) error {

View File

@ -5,6 +5,7 @@ import (
"testing"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/status-im/status-go/appdatabase"
"github.com/status-im/status-go/eth-node/types"
@ -58,9 +59,9 @@ func TestTopic(t *testing.T) {
defer close()
topicA := "0x61000000"
topicD := "0x64000000"
topic1 := MailserverTopic{Topic: topicA, LastRequest: 1}
topic2 := MailserverTopic{Topic: "0x6200000", LastRequest: 2}
topic3 := MailserverTopic{Topic: "0x6300000", LastRequest: 3}
topic1 := MailserverTopic{PubsubTopic: relay.DefaultWakuTopic, ContentTopic: topicA, LastRequest: 1}
topic2 := MailserverTopic{PubsubTopic: relay.DefaultWakuTopic, ContentTopic: "0x6200000", LastRequest: 2}
topic3 := MailserverTopic{PubsubTopic: relay.DefaultWakuTopic, ContentTopic: "0x6300000", LastRequest: 3}
require.NoError(t, db.AddTopic(topic1))
require.NoError(t, db.AddTopic(topic2))
@ -72,12 +73,16 @@ func TestTopic(t *testing.T) {
filters := []*transport.Filter{
// Existing topic, is not updated
{Topic: types.BytesToTopic([]byte{0x61})},
{
PubsubTopic: relay.DefaultWakuTopic,
ContentTopic: types.BytesToTopic([]byte{0x61}),
},
// Non existing topic is not inserted
{
Discovery: true,
Negotiated: true,
Topic: types.BytesToTopic([]byte{0x64}),
Discovery: true,
Negotiated: true,
PubsubTopic: relay.DefaultWakuTopic,
ContentTopic: types.BytesToTopic([]byte{0x64}),
},
}
@ -86,13 +91,13 @@ func TestTopic(t *testing.T) {
topics, err = db.Topics()
require.NoError(t, err)
require.Len(t, topics, 2)
require.Equal(t, topics[0].Topic, topicA)
require.Equal(t, topics[0].ContentTopic, topicA)
require.Equal(t, topics[0].LastRequest, 1)
require.Equal(t, topics[0].Topic, topicA)
require.Equal(t, topics[0].ContentTopic, topicA)
require.Equal(t, topics[0].LastRequest, 1)
require.Equal(t, topics[1].Topic, topicD)
require.Equal(t, topics[1].ContentTopic, topicD)
require.NotEmpty(t, topics[1].LastRequest)
require.True(t, topics[1].Negotiated)
require.True(t, topics[1].Discovery)
@ -152,9 +157,10 @@ func TestAddGetDeleteMailserverTopics(t *testing.T) {
defer close()
api := &API{db: db}
testTopic := MailserverTopic{
Topic: "topic-001",
ChatIDs: []string{"chatID01", "chatID02"},
LastRequest: 10,
PubsubTopic: relay.DefaultWakuTopic,
ContentTopic: "topic-001",
ChatIDs: []string{"chatID01", "chatID02"},
LastRequest: 10,
}
err := api.AddMailserverTopic(context.Background(), testTopic)
require.NoError(t, err)
@ -164,14 +170,14 @@ func TestAddGetDeleteMailserverTopics(t *testing.T) {
require.NoError(t, err)
require.EqualValues(t, []MailserverTopic{testTopic}, topics)
err = api.DeleteMailserverTopic(context.Background(), testTopic.Topic)
err = api.DeleteMailserverTopic(context.Background(), relay.DefaultWakuTopic, testTopic.ContentTopic)
require.NoError(t, err)
topics, err = api.GetMailserverTopics(context.Background())
require.NoError(t, err)
require.EqualValues(t, ([]MailserverTopic)(nil), topics)
// Delete non-existing topic.
err = api.DeleteMailserverTopic(context.Background(), "non-existing-topic")
err = api.DeleteMailserverTopic(context.Background(), relay.DefaultWakuTopic, "non-existing-topic")
require.NoError(t, err)
}

View File

@ -12,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/protocol/transport"
)
@ -83,11 +84,12 @@ type MailserverRequestGap struct {
}
type MailserverTopic struct {
Topic string `json:"topic"`
Discovery bool `json:"discovery?"`
Negotiated bool `json:"negotiated?"`
ChatIDs []string `json:"chat-ids"`
LastRequest int `json:"last-request"` // default is 1
PubsubTopic string `json:"pubsubTopic"`
ContentTopic string `json:"topic"`
Discovery bool `json:"discovery?"`
Negotiated bool `json:"negotiated?"`
ChatIDs []string `json:"chat-ids"`
LastRequest int `json:"last-request"` // default is 1
}
type ChatRequestRange struct {
@ -264,13 +266,15 @@ func (d *Database) AddTopic(topic MailserverTopic) error {
chatIDs := sqlStringSlice(topic.ChatIDs)
_, err := d.db.Exec(`INSERT OR REPLACE INTO mailserver_topics(
pubsub_topic,
topic,
chat_ids,
last_request,
discovery,
negotiated
) VALUES (?, ?, ?,?,?)`,
topic.Topic,
) VALUES (?, ?, ?, ?, ?, ?)`,
topic.PubsubTopic,
topic.ContentTopic,
chatIDs,
topic.LastRequest,
topic.Discovery,
@ -296,13 +300,15 @@ func (d *Database) AddTopics(topics []MailserverTopic) (err error) {
for _, topic := range topics {
chatIDs := sqlStringSlice(topic.ChatIDs)
_, err = tx.Exec(`INSERT OR REPLACE INTO mailserver_topics(
pubsub_topic,
topic,
chat_ids,
last_request,
discovery,
negotiated
) VALUES (?, ?, ?,?,?)`,
topic.Topic,
) VALUES (?, ?, ?, ?, ?, ?)`,
topic.PubsubTopic,
topic.ContentTopic,
chatIDs,
topic.LastRequest,
topic.Discovery,
@ -318,7 +324,7 @@ func (d *Database) AddTopics(topics []MailserverTopic) (err error) {
func (d *Database) Topics() ([]MailserverTopic, error) {
var result []MailserverTopic
rows, err := d.db.Query(`SELECT topic, chat_ids, last_request,discovery,negotiated FROM mailserver_topics`)
rows, err := d.db.Query(`SELECT pubsub_topic, topic, chat_ids, last_request,discovery,negotiated FROM mailserver_topics`)
if err != nil {
return nil, err
}
@ -330,7 +336,8 @@ func (d *Database) Topics() ([]MailserverTopic, error) {
chatIDs sqlStringSlice
)
if err := rows.Scan(
&t.Topic,
&t.PubsubTopic,
&t.ContentTopic,
&chatIDs,
&t.LastRequest,
&t.Discovery,
@ -345,13 +352,13 @@ func (d *Database) Topics() ([]MailserverTopic, error) {
return result, nil
}
func (d *Database) ResetLastRequest(topic string) error {
_, err := d.db.Exec("UPDATE mailserver_topics SET last_request = 0 WHERE topic = ?", topic)
func (d *Database) ResetLastRequest(pubsubTopic, contentTopic string) error {
_, err := d.db.Exec("UPDATE mailserver_topics SET last_request = 0 WHERE pubsub_topic = ? AND topic = ?", pubsubTopic, contentTopic)
return err
}
func (d *Database) DeleteTopic(topic string) error {
_, err := d.db.Exec(`DELETE FROM mailserver_topics WHERE topic = ?`, topic)
func (d *Database) DeleteTopic(pubsubTopic, contentTopic string) error {
_, err := d.db.Exec(`DELETE FROM mailserver_topics WHERE pubsub_topic = ? AND topic = ?`, pubsubTopic, contentTopic)
return err
}
@ -375,16 +382,29 @@ func (d *Database) SetTopics(filters []*transport.Filter) (err error) {
return nil
}
topicsArgs := make([]interface{}, 0, len(filters))
contentTopicsPerPubsubTopic := make(map[string]map[string]struct{})
for _, filter := range filters {
topicsArgs = append(topicsArgs, filter.Topic.String())
contentTopics, ok := contentTopicsPerPubsubTopic[filter.PubsubTopic]
if !ok {
contentTopics = make(map[string]struct{})
}
contentTopics[filter.ContentTopic.String()] = struct{}{}
contentTopicsPerPubsubTopic[filter.PubsubTopic] = contentTopics
}
inVector := strings.Repeat("?, ", len(filters)-1) + "?"
for pubsubTopic, contentTopics := range contentTopicsPerPubsubTopic {
topicsArgs := make([]interface{}, 0, len(contentTopics)+1)
topicsArgs = append(topicsArgs, pubsubTopic)
for ct := range contentTopics {
topicsArgs = append(topicsArgs, ct)
}
// Delete topics
query := "DELETE FROM mailserver_topics WHERE topic NOT IN (" + inVector + ")" // nolint: gosec
_, err = tx.Exec(query, topicsArgs...)
inVector := strings.Repeat("?, ", len(contentTopics)-1) + "?"
// Delete topics
query := "DELETE FROM mailserver_topics WHERE pubsub_topic = ? AND topic NOT IN (" + inVector + ")" // nolint: gosec
_, err = tx.Exec(query, topicsArgs...)
}
// Default to now - 1.day
lastRequest := (time.Now().Add(-24 * time.Hour)).Unix()
@ -392,12 +412,12 @@ func (d *Database) SetTopics(filters []*transport.Filter) (err error) {
for _, filter := range filters {
// fetch
var topic string
err = tx.QueryRow(`SELECT topic FROM mailserver_topics WHERE topic = ?`, filter.Topic.String()).Scan(&topic)
err = tx.QueryRow(`SELECT topic FROM mailserver_topics WHERE topic = ? AND pubsub_topic = ?`, filter.ContentTopic.String(), filter.PubsubTopic).Scan(&topic)
if err != nil && err != sql.ErrNoRows {
return
} else if err == sql.ErrNoRows {
// we insert the topic
_, err = tx.Exec(`INSERT INTO mailserver_topics(topic,last_request,discovery,negotiated) VALUES (?,?,?,?)`, filter.Topic.String(), lastRequest, filter.Discovery, filter.Negotiated)
_, err = tx.Exec(`INSERT INTO mailserver_topics(topic,pubsub_topic,last_request,discovery,negotiated) VALUES (?,?,?,?,?)`, filter.ContentTopic.String(), filter.PubsubTopic, lastRequest, filter.Discovery, filter.Negotiated)
}
if err != nil {
return

View File

@ -42,7 +42,8 @@ func (c *Client) PushReceivedMessages(filter transport.Filter, sshMessage *types
"messageHash": types.EncodeHex(sshMessage.Hash),
"messageId": message.ID,
"sentAt": sshMessage.Timestamp,
"topic": filter.Topic.String(),
"pubsubTopic": filter.PubsubTopic,
"topic": filter.ContentTopic.String(),
"messageType": message.Type.String(),
"receiverKeyUID": c.keyUID,
"nodeName": c.nodeName,

View File

@ -171,14 +171,15 @@ func (api *PublicWakuAPI) BloomFilter() []byte {
// NewMessage represents a new waku message that is posted through the RPC.
type NewMessage struct {
SymKeyID string `json:"symKeyID"`
PublicKey []byte `json:"pubKey"`
Sig string `json:"sig"`
Topic common.TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
TargetPeer string `json:"targetPeer"`
Ephemeral bool `json:"ephemeral"`
SymKeyID string `json:"symKeyID"`
PublicKey []byte `json:"pubKey"`
Sig string `json:"sig"`
PubsubTopic string `json:"pubsubTopic"`
ContentTopic common.TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
TargetPeer string `json:"targetPeer"`
Ephemeral bool `json:"ephemeral"`
}
// Post posts a message on the Waku network.
@ -195,27 +196,22 @@ func (api *PublicWakuAPI) Post(ctx context.Context, req NewMessage) (hexutil.Byt
return nil, ErrSymAsym
}
params := &common.MessageParams{
Payload: req.Payload,
Padding: req.Padding,
Topic: req.Topic,
}
var keyInfo *payload.KeyInfo = new(payload.KeyInfo)
// Set key that is used to sign the message
if len(req.Sig) > 0 {
if params.Src, err = api.w.GetPrivateKey(req.Sig); err != nil {
privKey, err := api.w.GetPrivateKey(req.Sig)
if err != nil {
return nil, err
}
keyInfo.PrivKey = params.Src
keyInfo.PrivKey = privKey
}
// Set symmetric key that is used to encrypt the message
if symKeyGiven {
keyInfo.Kind = payload.Symmetric
if params.Topic == (common.TopicType{}) { // topics are mandatory with symmetric encryption
if req.ContentTopic == (common.TopicType{}) { // topics are mandatory with symmetric encryption
return nil, ErrNoTopics
}
if keyInfo.SymKey, err = api.w.GetSymKey(req.SymKeyID); err != nil {
@ -251,13 +247,13 @@ func (api *PublicWakuAPI) Post(ctx context.Context, req NewMessage) (hexutil.Byt
wakuMsg := &pb.WakuMessage{
Payload: payload,
Version: version,
ContentTopic: req.Topic.ContentTopic(),
ContentTopic: req.ContentTopic.ContentTopic(),
Timestamp: api.w.timestamp(),
Meta: []byte{}, // TODO: empty for now. Once we use Waku Archive v2, we should deprecate the timestamp and use an ULID here
Ephemeral: req.Ephemeral,
}
hash, err := api.w.Send(wakuMsg)
hash, err := api.w.Send(req.PubsubTopic, wakuMsg)
if err != nil {
return nil, err
@ -278,10 +274,11 @@ func (api *PublicWakuAPI) Unsubscribe(ctx context.Context, id string) {
// Criteria holds various filter options for inbound messages.
type Criteria struct {
SymKeyID string `json:"symKeyID"`
PrivateKeyID string `json:"privateKeyID"`
Sig []byte `json:"sig"`
Topics []common.TopicType `json:"topics"`
SymKeyID string `json:"symKeyID"`
PrivateKeyID string `json:"privateKeyID"`
Sig []byte `json:"sig"`
PubsubTopic string `json:"pubsubTopic"`
ContentTopics []common.TopicType `json:"topics"`
}
// Messages set up a subscription that fires events when messages arrive that match
@ -314,7 +311,9 @@ func (api *PublicWakuAPI) Messages(ctx context.Context, crit Criteria) (*rpc.Sub
}
}
for _, bt := range crit.Topics {
filter.PubsubTopic = crit.PubsubTopic
for _, bt := range crit.ContentTopics {
filter.Topics = append(filter.Topics, bt[:])
}
@ -378,23 +377,25 @@ func (api *PublicWakuAPI) Messages(ctx context.Context, crit Criteria) (*rpc.Sub
// Message is the RPC representation of a waku message.
type Message struct {
Sig []byte `json:"sig,omitempty"`
Timestamp uint32 `json:"timestamp"`
Topic common.TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
Hash []byte `json:"hash"`
Dst []byte `json:"recipientPublicKey,omitempty"`
Sig []byte `json:"sig,omitempty"`
Timestamp uint32 `json:"timestamp"`
PubsubTopic string `json:"pubsubTopic"`
ContentTopic common.TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
Hash []byte `json:"hash"`
Dst []byte `json:"recipientPublicKey,omitempty"`
}
// ToWakuMessage converts an internal message into an API version.
func ToWakuMessage(message *common.ReceivedMessage) *Message {
msg := Message{
Payload: message.Data,
Padding: message.Padding,
Timestamp: message.Sent,
Hash: message.Hash().Bytes(),
Topic: message.Topic,
Payload: message.Data,
Padding: message.Padding,
Timestamp: message.Sent,
Hash: message.Hash().Bytes(),
PubsubTopic: message.PubsubTopic,
ContentTopic: message.ContentTopic,
}
if message.Dst != nil {
@ -494,20 +495,21 @@ func (api *PublicWakuAPI) NewMessageFilter(req Criteria) (string, error) {
}
}
if len(req.Topics) > 0 {
topics = make([][]byte, len(req.Topics))
for i, topic := range req.Topics {
if len(req.ContentTopics) > 0 {
topics = make([][]byte, len(req.ContentTopics))
for i, topic := range req.ContentTopics {
topics[i] = make([]byte, common.TopicLength)
copy(topics[i], topic[:])
}
}
f := &common.Filter{
Src: src,
KeySym: keySym,
KeyAsym: keyAsym,
Topics: topics,
Messages: common.NewMemoryMessageStore(),
Src: src,
KeySym: keySym,
KeyAsym: keyAsym,
PubsubTopic: req.PubsubTopic,
Topics: topics,
Messages: common.NewMemoryMessageStore(),
}
id, err := api.w.Subscribe(f)

View File

@ -23,6 +23,8 @@ import (
"testing"
"time"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/status-im/status-go/wakuv2/common"
)
@ -45,8 +47,8 @@ func TestMultipleTopicCopyInNewMessageFilter(t *testing.T) {
t2 := [4]byte{0xca, 0xfe, 0xde, 0xca}
crit := Criteria{
SymKeyID: keyID,
Topics: []common.TopicType{common.TopicType(t1), common.TopicType(t2)},
SymKeyID: keyID,
ContentTopics: []common.TopicType{common.TopicType(t1), common.TopicType(t2)},
}
_, err = api.NewMessageFilter(crit)
@ -55,7 +57,7 @@ func TestMultipleTopicCopyInNewMessageFilter(t *testing.T) {
}
found := false
candidates := w.filters.GetWatchersByTopic(t1)
candidates := w.filters.GetWatchersByTopic(relay.DefaultWakuTopic, t1)
for _, f := range candidates {
if len(f.Topics) == 2 {
if bytes.Equal(f.Topics[0], t1[:]) && bytes.Equal(f.Topics[1], t2[:]) {

View File

@ -23,6 +23,8 @@ import (
"fmt"
"sync"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
@ -30,22 +32,27 @@ import (
// Filter represents a Waku message filter
type Filter struct {
Src *ecdsa.PublicKey // Sender of the message
KeyAsym *ecdsa.PrivateKey // Private Key of recipient
KeySym []byte // Key associated with the Topic
Topics [][]byte // Topics to filter messages with
SymKeyHash common.Hash // The Keccak256Hash of the symmetric key, needed for optimization
id string // unique identifier
Src *ecdsa.PublicKey // Sender of the message
KeyAsym *ecdsa.PrivateKey // Private Key of recipient
KeySym []byte // Key associated with the Topic
PubsubTopic string // Pubsub topic used to filter messages with
Topics [][]byte // ContentTopics to filter messages with
SymKeyHash common.Hash // The Keccak256Hash of the symmetric key, needed for optimization
id string // unique identifier
Messages MessageStore
}
type FilterSet = map[*Filter]struct{}
type ContentTopicToFilter = map[TopicType]FilterSet
type PubsubTopicToContentTopic = map[string]ContentTopicToFilter
// Filters represents a collection of filters
type Filters struct {
watchers map[string]*Filter
topicMatcher map[TopicType]map[*Filter]struct{} // map a topic to the filters that are interested in being notified when a message matches that topic
allTopicsMatcher map[*Filter]struct{} // list all the filters that will be notified of a new message, no matter what its topic is
topicMatcher PubsubTopicToContentTopic // map a topic to the filters that are interested in being notified when a message matches that topic
allTopicsMatcher map[*Filter]struct{} // list all the filters that will be notified of a new message, no matter what its topic is
mutex sync.RWMutex
}
@ -54,7 +61,7 @@ type Filters struct {
func NewFilters() *Filters {
return &Filters{
watchers: make(map[string]*Filter),
topicMatcher: make(map[TopicType]map[*Filter]struct{}),
topicMatcher: make(PubsubTopicToContentTopic),
allTopicsMatcher: make(map[*Filter]struct{}),
}
}
@ -104,8 +111,10 @@ func (fs *Filters) AllTopics() []TopicType {
var topics []TopicType
fs.mutex.Lock()
defer fs.mutex.Unlock()
for t := range fs.topicMatcher {
topics = append(topics, t)
for _, topicsPerPubsubTopic := range fs.topicMatcher {
for t := range topicsPerPubsubTopic {
topics = append(topics, t)
}
}
return topics
@ -115,36 +124,57 @@ func (fs *Filters) AllTopics() []TopicType {
// If the filter's Topics array is empty, it will be tried on every topic.
// Otherwise, it will be tried on the topics specified.
func (fs *Filters) addTopicMatcher(watcher *Filter) {
if len(watcher.Topics) == 0 {
if len(watcher.Topics) == 0 && (watcher.PubsubTopic == relay.DefaultWakuTopic || watcher.PubsubTopic == "") {
fs.allTopicsMatcher[watcher] = struct{}{}
} else {
filtersPerContentTopic, ok := fs.topicMatcher[watcher.PubsubTopic]
if !ok {
filtersPerContentTopic = make(ContentTopicToFilter)
}
for _, t := range watcher.Topics {
topic := BytesToTopic(t)
if fs.topicMatcher[topic] == nil {
fs.topicMatcher[topic] = make(map[*Filter]struct{})
if filtersPerContentTopic[topic] == nil {
filtersPerContentTopic[topic] = make(FilterSet)
}
fs.topicMatcher[topic][watcher] = struct{}{}
filtersPerContentTopic[topic][watcher] = struct{}{}
}
fs.topicMatcher[watcher.PubsubTopic] = filtersPerContentTopic
}
}
// removeFromTopicMatchers removes a filter from the topic matchers
func (fs *Filters) removeFromTopicMatchers(watcher *Filter) {
delete(fs.allTopicsMatcher, watcher)
filtersPerContentTopic, ok := fs.topicMatcher[watcher.PubsubTopic]
if !ok {
return
}
for _, t := range watcher.Topics {
topic := BytesToTopic(t)
delete(fs.topicMatcher[topic], watcher)
delete(filtersPerContentTopic[topic], watcher)
}
fs.topicMatcher[watcher.PubsubTopic] = filtersPerContentTopic
}
// GetWatchersByTopic returns a slice containing the filters that
// match a specific topic
func (fs *Filters) GetWatchersByTopic(topic TopicType) []*Filter {
func (fs *Filters) GetWatchersByTopic(pubsubTopic string, contentTopic TopicType) []*Filter {
res := make([]*Filter, 0, len(fs.allTopicsMatcher))
for watcher := range fs.allTopicsMatcher {
res = append(res, watcher)
}
for watcher := range fs.topicMatcher[topic] {
filtersPerContentTopic, ok := fs.topicMatcher[pubsubTopic]
if !ok {
return res
}
for watcher := range filtersPerContentTopic[contentTopic] {
res = append(res, watcher)
}
return res
@ -166,10 +196,10 @@ func (fs *Filters) NotifyWatchers(recvMessage *ReceivedMessage) bool {
defer fs.mutex.RUnlock()
var matched bool
candidates := fs.GetWatchersByTopic(recvMessage.Topic)
candidates := fs.GetWatchersByTopic(recvMessage.PubsubTopic, recvMessage.ContentTopic)
if len(candidates) == 0 {
log.Debug("no filters available for this topic", "message", recvMessage.Hash().Hex(), "topic", recvMessage.Topic.String())
log.Debug("no filters available for this topic", "message", recvMessage.Hash().Hex(), "pubsubTopic", recvMessage.PubsubTopic, "contentTopic", recvMessage.ContentTopic.String())
}
for _, watcher := range candidates {

View File

@ -45,10 +45,12 @@ type ReceivedMessage struct {
Padding []byte
Signature []byte
Sent uint32 // Time when the message was posted into the network
Src *ecdsa.PublicKey // Message recipient (identity used to decode the message)
Dst *ecdsa.PublicKey // Message recipient (identity used to decode the message)
Topic TopicType
Sent uint32 // Time when the message was posted into the network
Src *ecdsa.PublicKey // Message recipient (identity used to decode the message)
Dst *ecdsa.PublicKey // Message recipient (identity used to decode the message)
PubsubTopic string
ContentTopic TopicType
SymKeyHash common.Hash // The Keccak256Hash of the key
@ -159,10 +161,11 @@ func NewReceivedMessage(env *protocol.Envelope, msgType MessageType) *ReceivedMe
}
return &ReceivedMessage{
Envelope: env,
MsgType: msgType,
Sent: uint32(env.Message().Timestamp / int64(time.Second)),
Topic: ct,
Envelope: env,
MsgType: msgType,
Sent: uint32(env.Message().Timestamp / int64(time.Second)),
ContentTopic: ct,
PubsubTopic: env.PubsubTopic(),
}
}
@ -242,7 +245,8 @@ func (msg *ReceivedMessage) Open(watcher *Filter) (result *ReceivedMessage) {
return nil
}
result.Topic = ct
result.PubsubTopic = watcher.PubsubTopic
result.ContentTopic = ct
return result
}

View File

@ -0,0 +1,112 @@
package persistence
import (
"crypto/ecdsa"
"database/sql"
"errors"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/crypto"
)
// DBStore is a MessageProvider that has a *sql.DB connection
type ProtectedTopicsStore struct {
db *sql.DB
log *zap.Logger
insertStmt *sql.Stmt
fetchPrivKeyStmt *sql.Stmt
}
// Creates a new DB store using the db specified via options.
// It will create a messages table if it does not exist and
// clean up records according to the retention policy used
func NewProtectedTopicsStore(log *zap.Logger, db *sql.DB) (*ProtectedTopicsStore, error) {
insertStmt, err := db.Prepare("INSERT OR REPLACE INTO pubsubtopic_signing_key (topic, priv_key, pub_key) VALUES (?, ?, ?)")
if err != nil {
return nil, err
}
fetchPrivKeyStmt, err := db.Prepare("SELECT priv_key FROM pubsubtopic_signing_key WHERE topic = ?")
if err != nil {
return nil, err
}
result := new(ProtectedTopicsStore)
result.log = log.Named("protected-topics-store")
result.db = db
result.insertStmt = insertStmt
result.fetchPrivKeyStmt = fetchPrivKeyStmt
return result, nil
}
func (p *ProtectedTopicsStore) Close() error {
err := p.insertStmt.Close()
if err != nil {
return err
}
return p.fetchPrivKeyStmt.Close()
}
func (p *ProtectedTopicsStore) Insert(pubsubTopic string, privKey *ecdsa.PrivateKey, publicKey *ecdsa.PublicKey) error {
var privKeyBytes []byte
if privKey != nil {
privKeyBytes = crypto.FromECDSA(privKey)
}
pubKeyBytes := crypto.FromECDSAPub(publicKey)
_, err := p.insertStmt.Exec(pubsubTopic, privKeyBytes, pubKeyBytes)
return err
}
func (p *ProtectedTopicsStore) FetchPrivateKey(topic string) (privKey *ecdsa.PrivateKey, err error) {
var privKeyBytes []byte
err = p.fetchPrivKeyStmt.QueryRow(topic).Scan(&privKeyBytes)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, err
}
return crypto.ToECDSA(privKeyBytes)
}
type ProtectedTopic struct {
PubKey *ecdsa.PublicKey
Topic string
}
func (p *ProtectedTopicsStore) ProtectedTopics() ([]ProtectedTopic, error) {
rows, err := p.db.Query("SELECT pub_key, topic FROM pubsubtopic_signing_key")
if err != nil {
return nil, err
}
var result []ProtectedTopic
for rows.Next() {
var pubKeyBytes []byte
var topic string
err := rows.Scan(&pubKeyBytes, &topic)
if err != nil {
return nil, err
}
pubk, err := crypto.UnmarshalPubkey(pubKeyBytes)
if err != nil {
return nil, err
}
result = append(result, ProtectedTopic{
PubKey: pubk,
Topic: topic,
})
}
return result, nil
}

View File

@ -118,8 +118,9 @@ type Waku struct {
bandwidthCounter *metrics.BandwidthCounter
sendQueue chan *protocol.Envelope
msgQueue chan *common.ReceivedMessage // Message queue for waku messages that havent been decoded
protectedTopicStore *persistence.ProtectedTopicsStore
sendQueue chan *protocol.Envelope
msgQueue chan *common.ReceivedMessage // Message queue for waku messages that havent been decoded
ctx context.Context
cancel context.CancelFunc
@ -305,6 +306,13 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
opts = append(opts, node.WithMessageProvider(dbStore))
}
if appDB != nil {
waku.protectedTopicStore, err = persistence.NewProtectedTopicsStore(logger, appDB)
if err != nil {
return nil, err
}
}
waku.settings.Options = opts
waku.logger.Info("setup the go-waku node successfully")
@ -602,34 +610,48 @@ func (w *Waku) runPeerExchangeLoop() {
}
}
func (w *Waku) runRelayMsgLoop() {
defer w.wg.Done()
func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.PublicKey) error {
if w.settings.LightClient {
return
return errors.New("only available for full nodes")
}
sub, err := w.node.Relay().Subscribe(w.ctx)
if err != nil {
fmt.Println("Could not subscribe:", err)
return
if w.node.Relay().IsSubscribed(topic) {
return nil
}
for {
select {
case <-w.ctx.Done():
sub.Unsubscribe()
return
case env := <-sub.Ch:
envelopeErrors, err := w.OnNewEnvelopes(env, common.RelayedMessageType)
if err != nil {
w.logger.Error("onNewEnvelope error", zap.Error(err))
}
// TODO: should these be handled?
_ = envelopeErrors
_ = err
if pubkey != nil {
err := w.node.Relay().AddSignedTopicValidator(topic, pubkey)
if err != nil {
return err
}
}
sub, err := w.node.Relay().SubscribeToTopic(context.Background(), topic)
if err != nil {
return err
}
w.wg.Add(1)
go func() {
defer w.wg.Done()
for {
select {
case <-w.ctx.Done():
sub.Unsubscribe()
return
case env := <-sub.Ch:
envelopeErrors, err := w.OnNewEnvelopes(env, common.RelayedMessageType)
if err != nil {
w.logger.Error("onNewEnvelope error", zap.Error(err))
}
// TODO: should these be handled?
_ = envelopeErrors
_ = err
}
}
}()
return nil
}
func (w *Waku) runFilterSubscriptionLoop(sub *filter.SubscriptionDetails) {
@ -680,7 +702,7 @@ func (w *Waku) runFilterMsgLoop() {
err := w.isFilterSubAlive(sub)
if err != nil {
// Unsubscribe on light node
contentFilter := w.buildContentFilter(f.Topics)
contentFilter := w.buildContentFilter(f.PubsubTopic, f.Topics)
// TODO Better return value handling for WakuFilterPushResult
_, err := w.node.FilterLightnode().Unsubscribe(w.ctx, contentFilter, filter.Peer(sub.PeerID))
if err != nil {
@ -712,9 +734,9 @@ func (w *Waku) runFilterMsgLoop() {
}
}
}
func (w *Waku) buildContentFilter(topics [][]byte) filter.ContentFilter {
func (w *Waku) buildContentFilter(pubsubTopic string, topics [][]byte) filter.ContentFilter {
contentFilter := filter.ContentFilter{
Topic: relay.DefaultWakuTopic,
Topic: pubsubTopic,
}
for _, topic := range topics {
contentFilter.ContentTopics = append(contentFilter.ContentTopics, common.BytesToTopic(topic).ContentTopic())
@ -1008,6 +1030,9 @@ func (w *Waku) GetSymKey(id string) ([]byte, error) {
// Subscribe installs a new message handler used for filtering, decrypting
// and subsequent storing of incoming messages.
func (w *Waku) Subscribe(f *common.Filter) (string, error) {
if f.PubsubTopic == "" {
f.PubsubTopic = relay.DefaultWakuTopic
}
s, err := w.filters.Install(f)
if err != nil {
@ -1033,7 +1058,7 @@ func (w *Waku) Subscribe(f *common.Filter) (string, error) {
func (w *Waku) Unsubscribe(ctx context.Context, id string) error {
f := w.filters.Get(id)
if f != nil && w.settings.LightClient {
contentFilter := w.buildContentFilter(f.Topics)
contentFilter := w.buildContentFilter(f.PubsubTopic, f.Topics)
if _, err := w.node.FilterLightnode().Unsubscribe(ctx, contentFilter); err != nil {
return fmt.Errorf("failed to unsubscribe: %w", err)
}
@ -1069,15 +1094,15 @@ func (w *Waku) broadcast() {
case envelope := <-w.sendQueue:
var err error
if w.settings.LightClient {
w.logger.Info("publishing message via lightpush", zap.String("envelopeHash", hexutil.Encode(envelope.Hash())))
_, err = w.node.Lightpush().Publish(w.ctx, envelope.Message())
w.logger.Info("publishing message via lightpush", zap.String("envelopeHash", hexutil.Encode(envelope.Hash())), zap.String("pubsubTopic", envelope.PubsubTopic()))
_, err = w.node.Lightpush().PublishToTopic(context.Background(), envelope.Message(), envelope.PubsubTopic())
} else {
w.logger.Info("publishing message via relay", zap.String("envelopeHash", hexutil.Encode(envelope.Hash())))
_, err = w.node.Relay().Publish(w.ctx, envelope.Message())
w.logger.Info("publishing message via relay", zap.String("envelopeHash", hexutil.Encode(envelope.Hash())), zap.String("pubsubTopic", envelope.PubsubTopic()))
_, err = w.node.Relay().PublishToTopic(context.Background(), envelope.Message(), envelope.PubsubTopic())
}
if err != nil {
w.logger.Error("could not send message", zap.String("envelopeHash", hexutil.Encode(envelope.Hash())), zap.Error(err))
w.logger.Error("could not send message", zap.String("envelopeHash", hexutil.Encode(envelope.Hash())), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.Error(err))
w.envelopeFeed.Send(common.EnvelopeEvent{
Hash: gethcommon.BytesToHash(envelope.Hash()),
Event: common.EventEnvelopeExpired,
@ -1101,8 +1126,26 @@ func (w *Waku) broadcast() {
// Send injects a message into the waku send queue, to be distributed in the
// network in the coming cycles.
func (w *Waku) Send(msg *pb.WakuMessage) ([]byte, error) {
envelope := protocol.NewEnvelope(msg, msg.Timestamp, relay.DefaultWakuTopic) // TODO: once sharding is defined, use the correct pubsub topic
func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) {
if pubsubTopic == "" {
pubsubTopic = relay.DefaultWakuTopic
}
if w.protectedTopicStore != nil {
privKey, err := w.protectedTopicStore.FetchPrivateKey(pubsubTopic)
if err != nil {
return nil, err
}
if privKey != nil {
err = relay.SignMessage(privKey, msg, pubsubTopic)
if err != nil {
return nil, err
}
}
}
envelope := protocol.NewEnvelope(msg, msg.Timestamp, pubsubTopic)
w.sendQueue <- envelope
@ -1118,7 +1161,7 @@ func (w *Waku) Send(msg *pb.WakuMessage) ([]byte, error) {
return envelope.Hash(), nil
}
func (w *Waku) query(ctx context.Context, peerID peer.ID, topics []common.TopicType, from uint64, to uint64, opts []store.HistoryRequestOption) (*store.Result, error) {
func (w *Waku) query(ctx context.Context, peerID peer.ID, pubsubTopic string, topics []common.TopicType, from uint64, to uint64, opts []store.HistoryRequestOption) (*store.Result, error) {
strTopics := make([]string, len(topics))
for i, t := range topics {
strTopics[i] = t.ContentTopic()
@ -1130,16 +1173,16 @@ func (w *Waku) query(ctx context.Context, peerID peer.ID, topics []common.TopicT
StartTime: int64(from) * int64(time.Second),
EndTime: int64(to) * int64(time.Second),
ContentTopics: strTopics,
Topic: relay.DefaultWakuTopic,
Topic: pubsubTopic,
}
return w.node.Store().Query(ctx, query, opts...)
}
func (w *Waku) Query(ctx context.Context, peerID peer.ID, topics []common.TopicType, from uint64, to uint64, opts []store.HistoryRequestOption) (cursor *storepb.Index, err error) {
func (w *Waku) Query(ctx context.Context, peerID peer.ID, pubsubTopic string, topics []common.TopicType, from uint64, to uint64, opts []store.HistoryRequestOption) (cursor *storepb.Index, err error) {
requestID := protocol.GenerateRequestId()
opts = append(opts, store.WithRequestId(requestID))
result, err := w.query(ctx, peerID, topics, from, to, opts)
result, err := w.query(ctx, peerID, pubsubTopic, topics, from, to, opts)
if err != nil {
w.logger.Error("error querying storenode", zap.String("requestID", hexutil.Encode(requestID)), zap.String("peerID", peerID.String()), zap.Error(err))
if w.onHistoricMessagesRequestFailed != nil {
@ -1153,8 +1196,8 @@ func (w *Waku) Query(ctx context.Context, peerID peer.ID, topics []common.TopicT
// See https://github.com/vacp2p/rfc/issues/563
msg.RateLimitProof = nil
envelope := protocol.NewEnvelope(msg, msg.Timestamp, relay.DefaultWakuTopic)
w.logger.Info("received waku2 store message", zap.Any("envelopeHash", hexutil.Encode(envelope.Hash())))
envelope := protocol.NewEnvelope(msg, msg.Timestamp, pubsubTopic)
w.logger.Info("received waku2 store message", zap.Any("envelopeHash", hexutil.Encode(envelope.Hash())), zap.String("pubsubTopic", pubsubTopic))
_, err = w.OnNewEnvelopes(envelope, common.StoreMessageType)
if err != nil {
return nil, err
@ -1205,7 +1248,7 @@ func (w *Waku) Start() error {
}
}
w.wg.Add(4)
w.wg.Add(3)
go func() {
defer w.wg.Done()
@ -1252,9 +1295,13 @@ func (w *Waku) Start() error {
}()
go w.telemetryBandwidthStats(w.cfg.TelemetryServerURL)
go w.runFilterMsgLoop()
go w.runRelayMsgLoop()
go w.runPeerExchangeLoop()
go w.runFilterMsgLoop()
err = w.setupRelaySubscriptions()
if err != nil {
return err
}
numCPU := runtime.NumCPU()
for i := 0; i < numCPU; i++ {
@ -1267,12 +1314,54 @@ func (w *Waku) Start() error {
return nil
}
func (w *Waku) setupRelaySubscriptions() error {
if w.settings.LightClient {
return nil
}
if w.protectedTopicStore != nil {
protectedTopics, err := w.protectedTopicStore.ProtectedTopics()
if err != nil {
return err
}
for _, pt := range protectedTopics {
// Adding subscription to protected topics
err = w.subscribeToPubsubTopicWithWakuRelay(pt.Topic, pt.PubKey)
if err != nil {
return err
}
}
}
// Default Waku Topic (TODO: remove once sharding is added)
err := w.subscribeToPubsubTopicWithWakuRelay(relay.DefaultWakuTopic, nil)
if err != nil {
return err
}
return nil
}
// Stop implements node.Service, stopping the background data propagation thread
// of the Waku protocol.
func (w *Waku) Stop() error {
w.cancel()
w.identifyService.Close()
err := w.identifyService.Close()
if err != nil {
return err
}
w.node.Stop()
if w.protectedTopicStore != nil {
err = w.protectedTopicStore.Close()
if err != nil {
return err
}
}
close(w.connectionChanged)
w.wg.Wait()
return nil
@ -1293,7 +1382,6 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag
logger := w.logger.With(zap.String("hash", recvMessage.Hash().Hex()))
logger.Debug("received new envelope")
trouble := false
_, err := w.add(recvMessage)
@ -1368,14 +1456,14 @@ func (w *Waku) processQueue() {
// If not matched we remove it
if !matched {
w.logger.Debug("filters did not match", zap.String("hash", e.Hash().String()), zap.String("contentTopic", e.Topic.ContentTopic()))
w.logger.Debug("filters did not match", zap.String("hash", e.Hash().String()), zap.String("contentTopic", e.ContentTopic.ContentTopic()))
w.storeMsgIDsMu.Lock()
delete(w.storeMsgIDs, e.Hash())
w.storeMsgIDsMu.Unlock()
}
w.envelopeFeed.Send(common.EnvelopeEvent{
Topic: e.Topic,
Topic: e.ContentTopic,
Hash: e.Hash(),
Event: common.EventEnvelopeAvailable,
})
@ -1429,6 +1517,20 @@ func (w *Waku) ListenAddresses() []string {
return result
}
func (w *Waku) SubscribeToPubsubTopic(topic string, pubkey *ecdsa.PublicKey) error {
if !w.settings.LightClient {
err := w.subscribeToPubsubTopicWithWakuRelay(topic, pubkey)
if err != nil {
return err
}
}
return nil
}
func (w *Waku) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error {
return w.protectedTopicStore.Insert(topic, privKey, &privKey.PublicKey)
}
func (w *Waku) StartDiscV5() error {
if w.node.DiscV5() == nil {
return errors.New("discv5 is not setup")
@ -1727,7 +1829,7 @@ func (w *Waku) subscribeToFilter(f *common.Filter) error {
peers := w.findFilterPeers()
if len(peers) > 0 {
contentFilter := w.buildContentFilter(f.Topics)
contentFilter := w.buildContentFilter(f.PubsubTopic, f.Topics)
for i := 0; i < len(peers) && i < w.settings.MinPeersForFilter; i++ {
subDetails, err := w.node.FilterLightnode().Subscribe(w.ctx, contentFilter, filter.WithPeer(peers[i]))
if err != nil {

View File

@ -14,6 +14,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
waku_filter "github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/status-im/status-go/protocol/tt"
@ -143,7 +144,7 @@ func TestBasicWakuV2(t *testing.T) {
msgTimestamp := w.timestamp()
contentTopic := common.BytesToTopic(filter.Topics[0])
_, err = w.Send(&pb.WakuMessage{
_, err = w.Send(relay.DefaultWakuTopic, &pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: contentTopic.ContentTopic(),
Version: 0,
@ -157,7 +158,7 @@ func TestBasicWakuV2(t *testing.T) {
require.Len(t, messages, 1)
timestampInSeconds := msgTimestamp / int64(time.Second)
storeResult, err := w.query(context.Background(), storeNode.PeerID, []common.TopicType{contentTopic}, uint64(timestampInSeconds-20), uint64(timestampInSeconds+20), []store.HistoryRequestOption{})
storeResult, err := w.query(context.Background(), storeNode.PeerID, relay.DefaultWakuTopic, []common.TopicType{contentTopic}, uint64(timestampInSeconds-20), uint64(timestampInSeconds+20), []store.HistoryRequestOption{})
require.NoError(t, err)
require.NotZero(t, len(storeResult.Messages))
@ -206,7 +207,7 @@ func TestWakuV2Filter(t *testing.T) {
msgTimestamp := w.timestamp()
contentTopic := common.BytesToTopic(filter.Topics[0])
_, err = w.Send(&pb.WakuMessage{
_, err = w.Send(relay.DefaultWakuTopic, &pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: contentTopic.ContentTopic(),
Version: 0,