diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index a1a445cca..7b67b31ce 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -69,6 +69,11 @@ func (w *gethWakuWrapper) SubscribeToPubsubTopic(topic string, optPublicKey *ecd return errors.New("not available in WakuV1") } +func (w *gethWakuWrapper) UnsubscribeFromPubsubTopic(topic string) error { + // not available in WakuV1 + return errors.New("not available in WakuV1") +} + func (w *gethWakuWrapper) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) { // not available in WakuV1 return nil, errors.New("not available in WakuV1") @@ -79,6 +84,11 @@ func (w *gethWakuWrapper) StorePubsubTopicKey(topic string, privKey *ecdsa.Priva return errors.New("not available in WakuV1") } +func (w *gethWakuWrapper) RemovePubsubTopicKey(topic string) error { + // not available in WakuV1 + return errors.New("not available in WakuV1") +} + // AddRelayPeer function only added for compatibility with waku V2 func (w *gethWakuWrapper) AddRelayPeer(address string) (peer.ID, error) { return "", errors.New("not available in WakuV1") diff --git a/eth-node/bridge/geth/wakuv2.go b/eth-node/bridge/geth/wakuv2.go index 78acb5f5f..f17128d72 100644 --- a/eth-node/bridge/geth/wakuv2.go +++ b/eth-node/bridge/geth/wakuv2.go @@ -237,6 +237,10 @@ func (w *gethWakuV2Wrapper) SubscribeToPubsubTopic(topic string, optPublicKey *e return w.waku.SubscribeToPubsubTopic(topic, optPublicKey) } +func (w *gethWakuV2Wrapper) UnsubscribeFromPubsubTopic(topic string) error { + return w.waku.UnsubscribeFromPubsubTopic(topic) +} + func (w *gethWakuV2Wrapper) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) { return w.waku.RetrievePubsubTopicKey(topic) } @@ -245,6 +249,10 @@ func (w *gethWakuV2Wrapper) StorePubsubTopicKey(topic string, privKey *ecdsa.Pri return w.waku.StorePubsubTopicKey(topic, privKey) } +func (w *gethWakuV2Wrapper) RemovePubsubTopicKey(topic string) error { + return w.waku.RemovePubsubTopicKey(topic) +} + func (w *gethWakuV2Wrapper) AddStorePeer(address string) (peer.ID, error) { return w.waku.AddStorePeer(address) } diff --git a/eth-node/types/waku.go b/eth-node/types/waku.go index db0ff149d..9fed1a5af 100644 --- a/eth-node/types/waku.go +++ b/eth-node/types/waku.go @@ -90,10 +90,14 @@ type Waku interface { SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error + UnsubscribeFromPubsubTopic(topic string) error + StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) + RemovePubsubTopicKey(topic string) error + AddStorePeer(address string) (peer.ID, error) AddRelayPeer(address string) (peer.ID, error) diff --git a/protocol/communities/manager.go b/protocol/communities/manager.go index e62f60cc1..0a16cc4db 100644 --- a/protocol/communities/manager.go +++ b/protocol/communities/manager.go @@ -1077,6 +1077,10 @@ func (m *Manager) DeleteCommunity(id types.HexBytes) error { func (m *Manager) UpdateShard(community *Community, shard *shard.Shard, clock uint64) error { community.config.Shard = shard + if shard == nil { + return m.persistence.DeleteCommunityShard(community.ID()) + } + return m.persistence.SaveCommunityShard(community.ID(), shard, clock) } @@ -1086,9 +1090,6 @@ func (m *Manager) SetShard(communityID types.HexBytes, shard *shard.Shard) (*Com if err != nil { return nil, err } - if !community.IsControlNode() { - return nil, errors.New("not admin or owner") - } community.increaseClock() @@ -1105,17 +1106,12 @@ func (m *Manager) SetShard(communityID types.HexBytes, shard *shard.Shard) (*Com return community, nil } -func (m *Manager) UpdatePubsubTopicPrivateKey(community *Community, privKey *ecdsa.PrivateKey) error { - community.SetPubsubTopicPrivateKey(privKey) - +func (m *Manager) UpdatePubsubTopicPrivateKey(topic string, privKey *ecdsa.PrivateKey) error { if privKey != nil { - topic := community.PubsubTopic() - if err := m.transport.StorePubsubTopicKey(topic, privKey); err != nil { - return err - } + return m.transport.StorePubsubTopicKey(topic, privKey) } - return nil + return m.transport.RemovePubsubTopicKey(topic) } // EditCommunity takes a description, updates the community with the description, diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index d519388a4..d9d61a578 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -715,6 +715,16 @@ func (m *Messenger) subscribeToCommunityShard(communityID []byte, shard *shard.S return m.transport.SubscribeToPubsubTopic(pubsubTopic, pubK) } +func (m *Messenger) unsubscribeFromShard(shard *shard.Shard) error { + if m.transport.WakuVersion() != 2 { + return nil + } + + // TODO: this should probably be moved completely to transport once pubsub topic logic is implemented + + return m.transport.UnsubscribeFromPubsubTopic(shard.PubsubTopic()) +} + func (m *Messenger) joinCommunity(ctx context.Context, communityID types.HexBytes, forceJoin bool) (*MessengerResponse, error) { logger := m.logger.Named("joinCommunity") @@ -2053,26 +2063,57 @@ func (m *Messenger) SetCommunityShard(request *requests.SetCommunityShard) (*Mes return nil, err } - community, err := m.communitiesManager.SetShard(request.CommunityID, request.Shard) + community, err := m.communitiesManager.GetByID(request.CommunityID) if err != nil { return nil, err } - var topicPrivKey *ecdsa.PrivateKey - if request.PrivateKey != nil { - topicPrivKey, err = crypto.ToECDSA(*request.PrivateKey) - } else { - topicPrivKey, err = crypto.GenerateKey() + if !community.IsControlNode() { + return nil, errors.New("not admin or owner") } + + // Reset the community private key + community.SetPubsubTopicPrivateKey(nil) + + // Removing the private key (if it exist) + err = m.RemovePubsubTopicPrivateKey(community.PubsubTopic()) if err != nil { return nil, err } - err = m.communitiesManager.UpdatePubsubTopicPrivateKey(community, topicPrivKey) + // Unsubscribing from existing shard + if community.Shard() != nil { + err := m.unsubscribeFromShard(community.Shard()) + if err != nil { + return nil, err + } + } + + community, err = m.communitiesManager.SetShard(request.CommunityID, request.Shard) if err != nil { return nil, err } + if request.Shard != nil { + var topicPrivKey *ecdsa.PrivateKey + if request.PrivateKey != nil { + topicPrivKey, err = crypto.ToECDSA(*request.PrivateKey) + } else { + topicPrivKey, err = crypto.GenerateKey() + } + if err != nil { + return nil, err + } + + community.SetPubsubTopicPrivateKey(topicPrivKey) + + err = m.communitiesManager.UpdatePubsubTopicPrivateKey(community.PubsubTopic(), topicPrivKey) + if err != nil { + return nil, err + } + } + + // TODO: Check err = m.UpdateCommunityFilters(community) if err != nil { return nil, err @@ -2094,6 +2135,10 @@ func (m *Messenger) SetCommunityShard(request *requests.SetCommunityShard) (*Mes return response, nil } +func (m *Messenger) RemovePubsubTopicPrivateKey(topic string) error { + return m.transport.RemovePubsubTopicKey(topic) +} + func (m *Messenger) UpdateCommunityFilters(community *communities.Community) error { defaultFilters := m.DefaultFilters(community) publicFiltersToInit := make([]transport.FiltersToInitialize, 0, len(defaultFilters)+len(community.Chats())) @@ -2480,15 +2525,16 @@ func (m *Messenger) SendCommunityShardKey(community *communities.Community, pubk return nil } + keyBytes := make([]byte, 0) key := community.PubsubTopicPrivateKey() - if key == nil { - return nil // No community shard key available + if key != nil { + keyBytes = crypto.FromECDSA(key) } communityShardKey := &protobuf.CommunityShardKey{ Clock: community.Clock(), CommunityId: community.ID(), - PrivateKey: crypto.FromECDSA(key), + PrivateKey: keyBytes, Shard: community.Shard().Protobuffer(), } @@ -2831,14 +2877,32 @@ func (m *Messenger) handleCommunityShardAndFiltersFromProto(community *communiti } var privKey *ecdsa.PrivateKey = nil - if message.PrivateKey != nil { - privKey, err = crypto.ToECDSA(message.PrivateKey) + if message.Shard != nil { + if message.PrivateKey != nil { + privKey, err = crypto.ToECDSA(message.PrivateKey) + if err != nil { + return err + } + } + } + + // Removing the existing private key (if any) + err = m.RemovePubsubTopicPrivateKey(community.PubsubTopic()) + if err != nil { + return err + } + + // Unsubscribing from existing shard + if community.Shard() != nil { + err := m.unsubscribeFromShard(community.Shard()) if err != nil { return err } } - err = m.communitiesManager.UpdatePubsubTopicPrivateKey(community, privKey) + community.SetPubsubTopicPrivateKey(privKey) + + err = m.communitiesManager.UpdatePubsubTopicPrivateKey(community.PubsubTopic(), privKey) if err != nil { return err } diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 32adde08c..b3031101a 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -695,6 +695,14 @@ func (t *Transport) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.Pub return nil } +// Unsubscribe from a pubsub topic +func (t *Transport) UnsubscribeFromPubsubTopic(topic string) error { + if t.waku.Version() == 2 { + return t.waku.UnsubscribeFromPubsubTopic(topic) + } + return nil +} + func (t *Transport) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error { return t.waku.StorePubsubTopicKey(topic, privKey) } @@ -702,3 +710,10 @@ func (t *Transport) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) func (t *Transport) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) { return t.waku.RetrievePubsubTopicKey(topic) } + +func (t *Transport) RemovePubsubTopicKey(topic string) error { + if t.waku.Version() == 2 { + return t.waku.RemovePubsubTopicKey(topic) + } + return nil +} diff --git a/wakuv2/persistence/signed_messages.go b/wakuv2/persistence/signed_messages.go index e9522f464..aad6d812a 100644 --- a/wakuv2/persistence/signed_messages.go +++ b/wakuv2/persistence/signed_messages.go @@ -17,6 +17,7 @@ type ProtectedTopicsStore struct { insertStmt *sql.Stmt fetchPrivKeyStmt *sql.Stmt + deleteStmt *sql.Stmt } // Creates a new DB store using the db specified via options. @@ -33,11 +34,17 @@ func NewProtectedTopicsStore(log *zap.Logger, db *sql.DB) (*ProtectedTopicsStore return nil, err } + deleteStmt, err := db.Prepare("DELETE FROM pubsubtopic_signing_key WHERE topic = ?") + if err != nil { + return nil, err + } + result := new(ProtectedTopicsStore) result.log = log.Named("protected-topics-store") result.db = db result.insertStmt = insertStmt result.fetchPrivKeyStmt = fetchPrivKeyStmt + result.deleteStmt = deleteStmt return result, nil } @@ -64,6 +71,11 @@ func (p *ProtectedTopicsStore) Insert(pubsubTopic string, privKey *ecdsa.Private return err } +func (p *ProtectedTopicsStore) Delete(pubsubTopic string) error { + _, err := p.deleteStmt.Exec(pubsubTopic) + return err +} + func (p *ProtectedTopicsStore) FetchPrivateKey(topic string) (privKey *ecdsa.PrivateKey, err error) { var privKeyBytes []byte err = p.fetchPrivKeyStmt.QueryRow(topic).Scan(&privKeyBytes) diff --git a/wakuv2/waku.go b/wakuv2/waku.go index f99e10222..cc10657be 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -611,6 +611,18 @@ func (w *Waku) getPubsubTopic(topic string) string { return topic } +func (w *Waku) unsubscribeFromPubsubTopicWithWakuRelay(topic string) error { + topic = w.getPubsubTopic(topic) + + if !w.node.Relay().IsSubscribed(topic) { + return nil + } + + contentFilter := protocol.NewContentFilter(topic) + + return w.node.Relay().Unsubscribe(w.ctx, contentFilter) +} + func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.PublicKey) error { if w.settings.LightClient { return errors.New("only available for full nodes") @@ -1546,6 +1558,18 @@ func (w *Waku) SubscribeToPubsubTopic(topic string, pubkey *ecdsa.PublicKey) err return nil } +func (w *Waku) UnsubscribeFromPubsubTopic(topic string) error { + topic = w.getPubsubTopic(topic) + + if !w.settings.LightClient { + err := w.unsubscribeFromPubsubTopicWithWakuRelay(topic) + if err != nil { + return err + } + } + return nil +} + func (w *Waku) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) { topic = w.getPubsubTopic(topic) if w.protectedTopicStore == nil { @@ -1564,6 +1588,15 @@ func (w *Waku) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) erro return w.protectedTopicStore.Insert(topic, privKey, &privKey.PublicKey) } +func (w *Waku) RemovePubsubTopicKey(topic string) error { + topic = w.getPubsubTopic(topic) + if w.protectedTopicStore == nil { + return nil + } + + return w.protectedTopicStore.Delete(topic) +} + func (w *Waku) StartDiscV5() error { if w.node.DiscV5() == nil { return errors.New("discv5 is not setup")