2019-09-04 06:25:33 +00:00
package mailservers
2019-09-04 10:04:17 +00:00
import (
"database/sql"
2019-09-04 18:23:17 +00:00
"database/sql/driver"
"encoding/json"
"errors"
2019-09-04 10:04:17 +00:00
"fmt"
"strings"
2021-01-14 22:15:13 +00:00
"time"
2022-11-04 13:56:45 +00:00
"github.com/libp2p/go-libp2p/core/peer"
2024-08-16 18:24:21 +00:00
"github.com/multiformats/go-multiaddr"
2024-09-05 14:25:26 +00:00
"github.com/waku-org/go-waku/waku/v2/protocol/enr"
2024-08-16 18:24:21 +00:00
"github.com/waku-org/go-waku/waku/v2/utils"
2022-01-31 10:33:56 +00:00
"github.com/ethereum/go-ethereum/p2p/enode"
2023-05-22 21:38:02 +00:00
2021-01-14 22:15:13 +00:00
"github.com/status-im/status-go/protocol/transport"
2019-09-04 10:04:17 +00:00
)
2019-09-04 06:25:33 +00:00
2024-09-05 14:25:26 +00:00
// MustDecodeENR parses enrStr into an enode.Node using enode.ValidSchemes.
// It panics when parsing fails or when the parser yields a nil node.
func MustDecodeENR(enrStr string) *enode.Node {
	parsed, err := enode.Parse(enode.ValidSchemes, enrStr)
	if err == nil && parsed != nil {
		return parsed
	}
	panic("could not decode enr: " + enrStr)
}
// MustDecodeMultiaddress parses multiaddrsStr into a multiaddress and returns
// a pointer to it. It panics when parsing fails or yields a nil multiaddress.
func MustDecodeMultiaddress(multiaddrsStr string) *multiaddr.Multiaddr {
	decoded, err := multiaddr.NewMultiaddr(multiaddrsStr)
	if err == nil && decoded != nil {
		return &decoded
	}
	panic("could not decode multiaddr: " + multiaddrsStr)
}
2019-09-04 06:25:33 +00:00
type Mailserver struct {
2024-09-05 14:25:26 +00:00
ID string ` json:"id" `
Name string ` json:"name" `
Custom bool ` json:"custom" `
ENR * enode . Node ` json:"enr" `
Addr * multiaddr . Multiaddr ` json:"addr" `
// Deprecated: only used with WakuV1
2022-01-31 10:33:56 +00:00
Password string ` json:"password,omitempty" `
Fleet string ` json:"fleet" `
FailedRequests uint ` json:"-" `
}
2024-09-05 14:25:26 +00:00
func ( m Mailserver ) PeerInfo ( ) ( * peer . AddrInfo , error ) {
var maddrs [ ] multiaddr . Multiaddr
2022-01-31 10:33:56 +00:00
2024-09-05 14:25:26 +00:00
if m . ENR != nil {
addrInfo , err := enr . EnodeToPeerInfo ( m . ENR )
2022-01-31 10:33:56 +00:00
if err != nil {
return nil , err
}
2024-09-05 14:25:26 +00:00
addrInfo . Addrs = utils . EncapsulatePeerID ( addrInfo . ID , addrInfo . Addrs ... )
maddrs = append ( maddrs , addrInfo . Addrs ... )
}
2024-08-16 18:24:21 +00:00
2024-09-05 14:25:26 +00:00
if m . Addr != nil {
maddrs = append ( maddrs , * m . Addr )
2022-01-31 10:33:56 +00:00
}
2024-09-05 14:25:26 +00:00
p , err := peer . AddrInfosFromP2pAddrs ( maddrs ... )
2022-01-31 10:33:56 +00:00
if err != nil {
return nil , err
}
2024-09-05 14:25:26 +00:00
if len ( p ) != 1 {
return nil , errors . New ( "invalid mailserver setup" )
2022-01-31 10:33:56 +00:00
}
2024-09-05 14:25:26 +00:00
return & p [ 0 ] , nil
2022-01-31 10:33:56 +00:00
}
2024-09-05 14:25:26 +00:00
func ( m Mailserver ) PeerID ( ) ( peer . ID , error ) {
p , err := m . PeerInfo ( )
if err != nil {
return "" , err
2022-01-31 10:33:56 +00:00
}
2024-09-05 14:25:26 +00:00
return p . ID , nil
2019-09-04 06:25:33 +00:00
}
2019-09-04 18:23:17 +00:00
// nullablePassword converts the password into a sql.NullString so that an
// empty password is written as NULL rather than as an empty string.
func (m Mailserver) nullablePassword() (val sql.NullString) {
	if m.Password == "" {
		return val
	}
	return sql.NullString{String: m.Password, Valid: true}
}
2019-09-04 10:04:17 +00:00
// MailserverRequestGap is a row of the mailserver_request_gaps table: a gap
// identified by ID, belonging to a chat, bounded by From and To.
type MailserverRequestGap struct {
	// ID identifies the gap.
	ID string ` json:"id" `
	// ChatID is the chat the gap belongs to.
	ChatID string ` json:"chatId" `
	// From is stored in the gap_from column.
	From uint64 ` json:"from" `
	// To is stored in the gap_to column.
	To uint64 ` json:"to" `
}
2019-09-04 18:23:17 +00:00
// MailserverTopic is a row of the mailserver_topics table, keyed by the
// pubsub topic and the content topic.
type MailserverTopic struct {
	PubsubTopic  string ` json:"pubsubTopic" `
	ContentTopic string ` json:"topic" `
	Discovery    bool   ` json:"discovery?" `
	Negotiated   bool   ` json:"negotiated?" `
	// ChatIDs is persisted as a JSON array in a single column.
	ChatIDs     []string ` json:"chat-ids" `
	LastRequest int      ` json:"last-request" ` // default is 1
}
2019-09-06 13:02:31 +00:00
// ChatRequestRange is a row of the mailserver_chat_request_ranges table: the
// lowest "from" and highest "to" request bounds recorded for a chat.
type ChatRequestRange struct {
	// ChatID identifies the chat the range belongs to.
	ChatID string ` json:"chat-id" `
	// LowestRequestFrom is stored in the lowest_request_from column.
	LowestRequestFrom int ` json:"lowest-request-from" `
	// HighestRequestTo is stored in the highest_request_to column.
	HighestRequestTo int ` json:"highest-request-to" `
}
2019-09-04 18:23:17 +00:00
// sqlStringSlice helps to serialize a slice of strings into a single column using JSON serialization.
type sqlStringSlice []string

// Scan implements the sql.Scanner interface.
//
// A nil source resets the slice to nil. Both []byte and string sources are
// decoded as JSON, because database/sql drivers may hand either type to a
// Scanner. Any other source type is rejected with an error.
func (ss *sqlStringSlice) Scan(value interface{}) error {
	switch src := value.(type) {
	case nil:
		*ss = nil
		return nil
	case []byte:
		return json.Unmarshal(src, ss)
	case string:
		return json.Unmarshal([]byte(src), ss)
	default:
		return errors.New("invalid value type, expected byte slice or string")
	}
}

// Value implements the driver Valuer interface by encoding the slice as JSON.
func (ss sqlStringSlice) Value() (driver.Value, error) {
	return json.Marshal(ss)
}
// Database wraps a *sql.DB and exposes the persistence operations for
// mailservers, request gaps, topics and chat request ranges.
type Database struct {
	// db is the underlying connection handle supplied to NewDB.
	db *sql.DB
}
// NewDB returns a Database that performs its operations on db.
func NewDB(db *sql.DB) *Database {
	wrapper := new(Database)
	wrapper.db = db
	return wrapper
}
func ( d * Database ) Add ( mailserver Mailserver ) error {
2024-09-05 14:25:26 +00:00
// TODO: we are only storing the multiaddress.
// In a future PR we must allow storing multiple multiaddresses and ENR
2019-09-04 06:25:33 +00:00
_ , err := d . db . Exec ( ` INSERT OR REPLACE INTO mailservers (
id ,
name ,
address ,
password ,
fleet
) VALUES ( ? , ? , ? , ? , ? ) ` ,
mailserver . ID ,
mailserver . Name ,
2024-09-05 14:25:26 +00:00
( * mailserver . Addr ) . String ( ) ,
2019-09-04 06:25:33 +00:00
mailserver . nullablePassword ( ) ,
mailserver . Fleet ,
)
return err
}
func ( d * Database ) Mailservers ( ) ( [ ] Mailserver , error ) {
rows , err := d . db . Query ( ` SELECT id, name, address, password, fleet FROM mailservers ` )
if err != nil {
return nil , err
}
defer rows . Close ( )
2024-02-20 15:49:39 +00:00
return toMailservers ( rows )
}
func toMailservers ( rows * sql . Rows ) ( [ ] Mailserver , error ) {
var result [ ] Mailserver
2019-09-04 06:25:33 +00:00
for rows . Next ( ) {
var (
m Mailserver
2024-09-05 14:25:26 +00:00
addrStr string
2019-09-04 06:25:33 +00:00
password sql . NullString
)
if err := rows . Scan (
& m . ID ,
& m . Name ,
2024-09-05 14:25:26 +00:00
& addrStr ,
2019-09-04 06:25:33 +00:00
& password ,
& m . Fleet ,
) ; err != nil {
return nil , err
}
2022-01-31 10:33:56 +00:00
m . Custom = true
2019-09-04 06:25:33 +00:00
if password . Valid {
m . Password = password . String
}
2024-09-05 14:25:26 +00:00
// TODO: we are only storing the multiaddress.
// In a future PR we must allow storing multiple multiaddresses and ENR
maddr , err := multiaddr . NewMultiaddr ( addrStr )
if err != nil {
return nil , err
}
m . Addr = & maddr
2019-09-04 06:25:33 +00:00
result = append ( result , m )
}
return result , nil
}
// Delete removes the mailserver with the given id from the mailservers table.
func (d *Database) Delete(id string) error {
	const stmt = ` DELETE FROM mailservers WHERE id = ? `
	if _, err := d.db.Exec(stmt, id); err != nil {
		return err
	}
	return nil
}
2019-09-04 10:04:17 +00:00
func ( d * Database ) AddGaps ( gaps [ ] MailserverRequestGap ) error {
tx , err := d . db . Begin ( )
if err != nil {
return err
}
defer func ( ) {
if err == nil {
err = tx . Commit ( )
return
}
_ = tx . Rollback ( )
} ( )
for _ , gap := range gaps {
2024-02-20 15:49:39 +00:00
_ , err = tx . Exec ( ` INSERT OR REPLACE INTO mailserver_request_gaps (
2019-09-04 10:04:17 +00:00
id ,
chat_id ,
gap_from ,
gap_to
) VALUES ( ? , ? , ? , ? ) ` ,
gap . ID ,
gap . ChatID ,
gap . From ,
gap . To ,
)
if err != nil {
return err
}
}
return nil
}
2019-09-04 18:23:17 +00:00
func ( d * Database ) RequestGaps ( chatID string ) ( [ ] MailserverRequestGap , error ) {
2019-09-04 10:04:17 +00:00
var result [ ] MailserverRequestGap
rows , err := d . db . Query ( ` SELECT id, chat_id, gap_from, gap_to FROM mailserver_request_gaps WHERE chat_id = ? ` , chatID )
if err != nil {
return nil , err
}
defer rows . Close ( )
for rows . Next ( ) {
var m MailserverRequestGap
if err := rows . Scan (
& m . ID ,
& m . ChatID ,
& m . From ,
& m . To ,
) ; err != nil {
return nil , err
}
result = append ( result , m )
}
return result , nil
}
// DeleteGaps removes the gaps with the given ids. It is a no-op for an empty
// id list.
func (d *Database) DeleteGaps(ids []string) error {
	count := len(ids)
	if count == 0 {
		return nil
	}

	// Bind every id through its own placeholder; only the placeholder list is
	// interpolated into the statement.
	args := make([]interface{}, count)
	for i := range ids {
		args[i] = ids[i]
	}

	placeholders := strings.Repeat("?, ", count-1) + "?"
	stmt := fmt.Sprintf(` DELETE FROM mailserver_request_gaps WHERE id IN (%s) `, placeholders) // nolint: gosec

	if _, err := d.db.Exec(stmt, args...); err != nil {
		return err
	}
	return nil
}
// DeleteGapsByChatID removes every gap stored for the given chat.
func (d *Database) DeleteGapsByChatID(chatID string) error {
	const stmt = ` DELETE FROM mailserver_request_gaps WHERE chat_id = ? `
	if _, err := d.db.Exec(stmt, chatID); err != nil {
		return err
	}
	return nil
}
2019-09-04 18:23:17 +00:00
func ( d * Database ) AddTopic ( topic MailserverTopic ) error {
2021-01-14 22:15:13 +00:00
2019-09-04 18:23:17 +00:00
chatIDs := sqlStringSlice ( topic . ChatIDs )
_ , err := d . db . Exec ( ` INSERT OR REPLACE INTO mailserver_topics (
2023-05-22 21:38:02 +00:00
pubsub_topic ,
2019-09-04 18:23:17 +00:00
topic ,
chat_ids ,
Move to protobuf for Message type (#1706)
* Use a single Message type `v1/message.go` and `message.go` are the same now, and they embed `protobuf.ChatMessage`
* Use `SendChatMessage` for sending chat messages, this is basically the old `Send` but a bit more flexible so we can send different message types (stickers,commands), and not just text.
* Remove dedup from services/shhext. Because now we process in status-protocol, dedup makes less sense, as those messages are going to be processed anyway, so removing for now, we can re-evaluate if bringing it to status-go or not.
* Change the various retrieveX method to a single one:
`RetrieveAll` will be processing those messages that it can process (Currently only `Message`), and return the rest in `RawMessages` (still transit). The format for the response is:
`Chats`: -> The chats updated by receiving the message
`Messages`: -> The messages retrieved (already matched to a chat)
`Contacts`: -> The contacts updated by the messages
`RawMessages` -> Anything else that can't be parsed, eventually as we move everything to status-protocol-go this will go away.
2019-12-05 16:25:34 +00:00
last_request ,
discovery ,
negotiated
2023-05-22 21:38:02 +00:00
) VALUES ( ? , ? , ? , ? , ? , ? ) ` ,
topic . PubsubTopic ,
topic . ContentTopic ,
2019-09-04 18:23:17 +00:00
chatIDs ,
topic . LastRequest ,
Move to protobuf for Message type (#1706)
* Use a single Message type `v1/message.go` and `message.go` are the same now, and they embed `protobuf.ChatMessage`
* Use `SendChatMessage` for sending chat messages, this is basically the old `Send` but a bit more flexible so we can send different message types (stickers,commands), and not just text.
* Remove dedup from services/shhext. Because now we process in status-protocol, dedup makes less sense, as those messages are going to be processed anyway, so removing for now, we can re-evaluate if bringing it to status-go or not.
* Change the various retrieveX method to a single one:
`RetrieveAll` will be processing those messages that it can process (Currently only `Message`), and return the rest in `RawMessages` (still transit). The format for the response is:
`Chats`: -> The chats updated by receiving the message
`Messages`: -> The messages retrieved (already matched to a chat)
`Contacts`: -> The contacts updated by the messages
`RawMessages` -> Anything else that can't be parsed, eventually as we move everything to status-protocol-go this will go away.
2019-12-05 16:25:34 +00:00
topic . Discovery ,
topic . Negotiated ,
2019-09-04 18:23:17 +00:00
)
return err
}
2021-01-14 22:15:13 +00:00
func ( d * Database ) AddTopics ( topics [ ] MailserverTopic ) ( err error ) {
var tx * sql . Tx
tx , err = d . db . Begin ( )
if err != nil {
return
}
defer func ( ) {
if err == nil {
err = tx . Commit ( )
return
}
_ = tx . Rollback ( )
} ( )
for _ , topic := range topics {
chatIDs := sqlStringSlice ( topic . ChatIDs )
_ , err = tx . Exec ( ` INSERT OR REPLACE INTO mailserver_topics (
2023-05-22 21:38:02 +00:00
pubsub_topic ,
2021-01-14 22:15:13 +00:00
topic ,
chat_ids ,
last_request ,
discovery ,
negotiated
2023-05-22 21:38:02 +00:00
) VALUES ( ? , ? , ? , ? , ? , ? ) ` ,
topic . PubsubTopic ,
topic . ContentTopic ,
2021-01-14 22:15:13 +00:00
chatIDs ,
topic . LastRequest ,
topic . Discovery ,
topic . Negotiated ,
)
if err != nil {
return
}
}
return
}
2019-09-04 18:23:17 +00:00
func ( d * Database ) Topics ( ) ( [ ] MailserverTopic , error ) {
var result [ ] MailserverTopic
2023-05-22 21:38:02 +00:00
rows , err := d . db . Query ( ` SELECT pubsub_topic, topic, chat_ids, last_request,discovery,negotiated FROM mailserver_topics ` )
2019-09-04 18:23:17 +00:00
if err != nil {
return nil , err
}
2020-05-14 10:51:32 +00:00
defer rows . Close ( )
2019-09-04 18:23:17 +00:00
for rows . Next ( ) {
var (
t MailserverTopic
chatIDs sqlStringSlice
)
if err := rows . Scan (
2023-05-22 21:38:02 +00:00
& t . PubsubTopic ,
& t . ContentTopic ,
2019-09-04 18:23:17 +00:00
& chatIDs ,
& t . LastRequest ,
Move to protobuf for Message type (#1706)
* Use a single Message type `v1/message.go` and `message.go` are the same now, and they embed `protobuf.ChatMessage`
* Use `SendChatMessage` for sending chat messages, this is basically the old `Send` but a bit more flexible so we can send different message types (stickers,commands), and not just text.
* Remove dedup from services/shhext. Because now we process in status-protocol, dedup makes less sense, as those messages are going to be processed anyway, so removing for now, we can re-evaluate if bringing it to status-go or not.
* Change the various retrieveX method to a single one:
`RetrieveAll` will be processing those messages that it can process (Currently only `Message`), and return the rest in `RawMessages` (still transit). The format for the response is:
`Chats`: -> The chats updated by receiving the message
`Messages`: -> The messages retrieved (already matched to a chat)
`Contacts`: -> The contacts updated by the messages
`RawMessages` -> Anything else that can't be parsed, eventually as we move everything to status-protocol-go this will go away.
2019-12-05 16:25:34 +00:00
& t . Discovery ,
& t . Negotiated ,
2019-09-04 18:23:17 +00:00
) ; err != nil {
return nil , err
}
t . ChatIDs = chatIDs
result = append ( result , t )
}
return result , nil
}
2023-05-22 21:38:02 +00:00
func ( d * Database ) ResetLastRequest ( pubsubTopic , contentTopic string ) error {
_ , err := d . db . Exec ( "UPDATE mailserver_topics SET last_request = 0 WHERE pubsub_topic = ? AND topic = ?" , pubsubTopic , contentTopic )
2021-12-09 21:55:00 +00:00
return err
}
2023-05-22 21:38:02 +00:00
func ( d * Database ) DeleteTopic ( pubsubTopic , contentTopic string ) error {
_ , err := d . db . Exec ( ` DELETE FROM mailserver_topics WHERE pubsub_topic = ? AND topic = ? ` , pubsubTopic , contentTopic )
2019-09-04 18:23:17 +00:00
return err
}
2019-09-06 13:02:31 +00:00
2021-01-14 22:15:13 +00:00
// SetTopics deletes all topics excepts the one set, or upsert those if
// missing
func ( d * Database ) SetTopics ( filters [ ] * transport . Filter ) ( err error ) {
var tx * sql . Tx
tx , err = d . db . Begin ( )
if err != nil {
return err
}
defer func ( ) {
if err == nil {
err = tx . Commit ( )
return
}
_ = tx . Rollback ( )
} ( )
if len ( filters ) == 0 {
return nil
}
2023-05-22 21:38:02 +00:00
contentTopicsPerPubsubTopic := make ( map [ string ] map [ string ] struct { } )
2021-01-14 22:15:13 +00:00
for _ , filter := range filters {
2023-05-22 21:38:02 +00:00
contentTopics , ok := contentTopicsPerPubsubTopic [ filter . PubsubTopic ]
if ! ok {
contentTopics = make ( map [ string ] struct { } )
}
contentTopics [ filter . ContentTopic . String ( ) ] = struct { } { }
contentTopicsPerPubsubTopic [ filter . PubsubTopic ] = contentTopics
2021-01-14 22:15:13 +00:00
}
2023-05-22 21:38:02 +00:00
for pubsubTopic , contentTopics := range contentTopicsPerPubsubTopic {
topicsArgs := make ( [ ] interface { } , 0 , len ( contentTopics ) + 1 )
topicsArgs = append ( topicsArgs , pubsubTopic )
for ct := range contentTopics {
topicsArgs = append ( topicsArgs , ct )
}
2021-01-14 22:15:13 +00:00
2023-05-22 21:38:02 +00:00
inVector := strings . Repeat ( "?, " , len ( contentTopics ) - 1 ) + "?"
// Delete topics
query := "DELETE FROM mailserver_topics WHERE pubsub_topic = ? AND topic NOT IN (" + inVector + ")" // nolint: gosec
_ , err = tx . Exec ( query , topicsArgs ... )
}
2021-01-14 22:15:13 +00:00
// Default to now - 1.day
lastRequest := ( time . Now ( ) . Add ( - 24 * time . Hour ) ) . Unix ( )
// Insert if not existing
for _ , filter := range filters {
// fetch
var topic string
2023-05-22 21:38:02 +00:00
err = tx . QueryRow ( ` SELECT topic FROM mailserver_topics WHERE topic = ? AND pubsub_topic = ? ` , filter . ContentTopic . String ( ) , filter . PubsubTopic ) . Scan ( & topic )
2021-01-14 22:15:13 +00:00
if err != nil && err != sql . ErrNoRows {
return
} else if err == sql . ErrNoRows {
// we insert the topic
2023-05-22 21:38:02 +00:00
_ , err = tx . Exec ( ` INSERT INTO mailserver_topics(topic,pubsub_topic,last_request,discovery,negotiated) VALUES (?,?,?,?,?) ` , filter . ContentTopic . String ( ) , filter . PubsubTopic , lastRequest , filter . Discovery , filter . Negotiated )
2021-01-14 22:15:13 +00:00
}
if err != nil {
return
}
}
return
}
2019-09-06 13:02:31 +00:00
// AddChatRequestRange inserts the request range of a chat, replacing any
// existing row with the same key.
func (d *Database) AddChatRequestRange(req ChatRequestRange) error {
	const upsert = ` INSERT OR REPLACE INTO mailserver_chat_request_ranges (
chat_id ,
lowest_request_from ,
highest_request_to
) VALUES ( ? , ? , ? ) `

	if _, err := d.db.Exec(upsert, req.ChatID, req.LowestRequestFrom, req.HighestRequestTo); err != nil {
		return err
	}
	return nil
}
2021-01-14 22:15:13 +00:00
// AddChatRequestRanges inserts or replaces all given request ranges inside a
// single transaction.
//
// The transaction is rolled back when any insert fails and committed
// otherwise; a failing Commit is reported through the named result.
func (d *Database) AddChatRequestRanges(reqs []ChatRequestRange) (err error) {
	const upsert = ` INSERT OR REPLACE INTO mailserver_chat_request_ranges (
chat_id ,
lowest_request_from ,
highest_request_to
) VALUES ( ? , ? , ? ) `

	var tx *sql.Tx
	if tx, err = d.db.Begin(); err != nil {
		return err
	}

	defer func() {
		if err != nil {
			// The insert error is the one worth reporting; ignore rollback failures.
			_ = tx.Rollback()
			return
		}
		err = tx.Commit()
	}()

	for i := range reqs {
		if _, err = tx.Exec(upsert, reqs[i].ChatID, reqs[i].LowestRequestFrom, reqs[i].HighestRequestTo); err != nil {
			return err
		}
	}

	return err
}
2019-09-06 13:02:31 +00:00
func ( d * Database ) ChatRequestRanges ( ) ( [ ] ChatRequestRange , error ) {
var result [ ] ChatRequestRange
rows , err := d . db . Query ( ` SELECT chat_id, lowest_request_from, highest_request_to FROM mailserver_chat_request_ranges ` )
if err != nil {
return nil , err
}
2020-05-14 10:51:32 +00:00
defer rows . Close ( )
2019-09-06 13:02:31 +00:00
for rows . Next ( ) {
var req ChatRequestRange
if err := rows . Scan (
& req . ChatID ,
& req . LowestRequestFrom ,
& req . HighestRequestTo ,
) ; err != nil {
return nil , err
}
result = append ( result , req )
}
return result , nil
}
// DeleteChatRequestRange removes the request range stored for the given chat.
func (d *Database) DeleteChatRequestRange(chatID string) error {
	const stmt = ` DELETE FROM mailserver_chat_request_ranges WHERE chat_id = ? `
	if _, err := d.db.Exec(stmt, chatID); err != nil {
		return err
	}
	return nil
}