Upgrade status-protocol-go (#1586)

This commit is contained in:
Andrea Maria Piana 2019-08-29 08:33:46 +02:00 committed by GitHub
parent 0b403ce25b
commit b8ea79a3f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 359 additions and 142 deletions

View File

@ -1 +1 @@
0.32.0-beta.0
0.33.0-beta.0

2
go.mod
View File

@ -21,7 +21,7 @@ require (
github.com/status-im/doubleratchet v2.0.0+incompatible
github.com/status-im/migrate/v4 v4.0.0-20190821140204-a9d340ec8fb76af4afda06acf01740d45d2661ed
github.com/status-im/rendezvous v1.3.0
github.com/status-im/status-protocol-go v0.0.0-20190701094942-1dd03f74d712ac9b965b718963dc86ca9c3540fb
github.com/status-im/status-protocol-go v0.0.0-20190701094942-822d18916e417b768a13f85601a7b10e02236655
github.com/status-im/whisper v1.4.14
github.com/stretchr/testify v1.3.1-0.20190712000136-221dbe5ed467
github.com/syndtr/goleveldb v1.0.0

8
go.sum
View File

@ -450,8 +450,8 @@ github.com/status-im/migrate/v4 v4.0.0-20190821140204-a9d340ec8fb76af4afda06acf0
github.com/status-im/migrate/v4 v4.0.0-20190821140204-a9d340ec8fb76af4afda06acf01740d45d2661ed/go.mod h1:r8HggRBZ/k7TRwByq/Hp3P/ubFppIna0nvyavVK0pjA=
github.com/status-im/rendezvous v1.3.0 h1:7RK/MXXW+tlm0asKm1u7Qp7Yni6AO29a7j8+E4Lbjg4=
github.com/status-im/rendezvous v1.3.0/go.mod h1:+hzjuP+j/XzLPeF6E50b88pWOTLdTcwjvNYt+Gh1W1s=
github.com/status-im/status-protocol-go v0.0.0-20190701094942-1dd03f74d712ac9b965b718963dc86ca9c3540fb h1:zJ7gHLGOw3CeTOri61Y6dO+j/vxlg8TKxFgydMWErnU=
github.com/status-im/status-protocol-go v0.0.0-20190701094942-1dd03f74d712ac9b965b718963dc86ca9c3540fb/go.mod h1:ASO3pqc3cEFT3LPKfClp4f39jbP1m2XvSmIfBk9Ssxc=
github.com/status-im/status-protocol-go v0.0.0-20190701094942-822d18916e417b768a13f85601a7b10e02236655 h1:7lJRfkt9fzt+wpL+1rgfhvFThs2yP8VJm3BHGbgfWsg=
github.com/status-im/status-protocol-go v0.0.0-20190701094942-822d18916e417b768a13f85601a7b10e02236655/go.mod h1:zEvtd2lNRzsqo4RCLU6WlK0EKFTGEoAAuYfj+lZjHNQ=
github.com/status-im/whisper v1.4.14 h1:9VHqx4+PUYfhDnYYtDxHkg/3cfVvkHjPNciY4LO83yc=
github.com/status-im/whisper v1.4.14/go.mod h1:WS6z39YJQ8WJa9s+DmTuEM/s2nVF6Iz3B1SZYw5cYf0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@ -469,8 +469,8 @@ github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpP
github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/uber/jaeger-client-go v0.0.0-20180607151842-f7e0d4744fa6/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v0.0.0-20180615202729-a51202d6f4a7/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/vacp2p/mvds v0.0.0-20190705123435-a8dc37599bf99958b0ea19431c9bb7a47bd7fa8f h1:iBJcVbX4RaPtufXJ/PJtCF1jYPzlmyEF+imcQq5pTDs=
github.com/vacp2p/mvds v0.0.0-20190705123435-a8dc37599bf99958b0ea19431c9bb7a47bd7fa8f/go.mod h1:Xc3UWtA230lUzZoXStJHQd/BkqJK5BAZyXBsiR6XrXM=
github.com/vacp2p/mvds v0.0.0-20190705123435-943ab5e228078f6389dffe00946c0d0a5b495a5c h1:O7gT6vNipoBxFe19iWtDyUjIcIbkJ5MYhMXLS+RRCFs=
github.com/vacp2p/mvds v0.0.0-20190705123435-943ab5e228078f6389dffe00946c0d0a5b495a5c/go.mod h1:pIqr2Hg4cIkTJniGPCp4ptong2jxgxx6uToVoY94+II=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc=
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc h1:9lDbC6Rz4bwmou+oE6Dt4Cb2BGMur5eR/GYptkKUVHo=
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM=

View File

@ -571,8 +571,8 @@ func (api *PublicAPI) SaveChat(parent context.Context, chat statusproto.Chat) er
return api.service.messenger.SaveChat(chat)
}
func (api *PublicAPI) Chats(parent context.Context, to, from int) ([]*statusproto.Chat, error) {
return api.service.messenger.Chats(to, from)
func (api *PublicAPI) Chats(parent context.Context) ([]*statusproto.Chat, error) {
return api.service.messenger.Chats()
}
func (api *PublicAPI) DeleteChat(parent context.Context, chatID string) error {

View File

@ -1,6 +1,11 @@
package statusproto
import "crypto/ecdsa"
import (
"crypto/ecdsa"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
)
type ChatPagination struct {
From uint
@ -79,3 +84,11 @@ type ChatMember struct {
// Joined indicates if the member has joined the group chat
Joined bool `json:"joined"`
}
func (c ChatMember) PublicKey() (*ecdsa.PublicKey, error) {
b, err := hexutil.Decode(c.ID)
if err != nil {
return nil, err
}
return crypto.UnmarshalPubkey(b)
}

View File

@ -1,5 +1,18 @@
package statusproto
import (
"crypto/ecdsa"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
)
const (
contactBlocked = "contact/blocked"
contactAdded = "contact/added"
contactRequestReceived = "contact/request-received"
)
// ContactDeviceInfo is a struct containing information about a particular device owned by a contact
type ContactDeviceInfo struct {
// The installation id of the device
@ -13,7 +26,7 @@ type ContactDeviceInfo struct {
// Contact has information about a "Contact". A contact is not necessarily one
// that we added or added us, that's based on SystemTags.
type Contact struct {
// ID of the contact
// ID of the contact. It's a hex-encoded public key (prefixed with 0x).
ID string `json:"id"`
// Ethereum address of the contact
Address string `json:"address"`
@ -31,3 +44,33 @@ type Contact struct {
DeviceInfo []ContactDeviceInfo `json:"deviceInfo"`
TributeToTalk string `json:"tributeToTalk"`
}
func (c Contact) PublicKey() (*ecdsa.PublicKey, error) {
b, err := hexutil.Decode(c.ID)
if err != nil {
return nil, err
}
return crypto.UnmarshalPubkey(b)
}
func (c Contact) IsAdded() bool {
return existsInStringSlice(c.SystemTags, contactAdded)
}
func (c Contact) HasBeenAdded() bool {
return existsInStringSlice(c.SystemTags, contactRequestReceived)
}
func (c Contact) IsBlocked() bool {
return existsInStringSlice(c.SystemTags, contactBlocked)
}
// existsInStringSlice checks if a string is in a set.
func existsInStringSlice(set []string, find string) bool {
for _, s := range set {
if s == find {
return true
}
}
return false
}

View File

@ -20,9 +20,7 @@ require (
github.com/status-im/migrate/v4 v4.0.0-20190821140204-a9d340ec8fb76af4afda06acf01740d45d2661ed
github.com/status-im/whisper v1.4.14
github.com/stretchr/testify v1.3.1-0.20190712000136-221dbe5ed467
github.com/vacp2p/mvds v0.0.0-20190705123435-a8dc37599bf99958b0ea19431c9bb7a47bd7fa8f
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
github.com/vacp2p/mvds v0.0.0-20190705123435-943ab5e228078f6389dffe00946c0d0a5b495a5c
go.uber.org/zap v1.10.0
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4
golang.org/x/net v0.0.0-20190628185345-da137c7871d7 // indirect

View File

@ -252,8 +252,8 @@ github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2/go.mod h1:Z4AUp2K
github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/uber/jaeger-client-go v0.0.0-20180607151842-f7e0d4744fa6/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v0.0.0-20180615202729-a51202d6f4a7/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/vacp2p/mvds v0.0.0-20190705123435-a8dc37599bf99958b0ea19431c9bb7a47bd7fa8f h1:iBJcVbX4RaPtufXJ/PJtCF1jYPzlmyEF+imcQq5pTDs=
github.com/vacp2p/mvds v0.0.0-20190705123435-a8dc37599bf99958b0ea19431c9bb7a47bd7fa8f/go.mod h1:Xc3UWtA230lUzZoXStJHQd/BkqJK5BAZyXBsiR6XrXM=
github.com/vacp2p/mvds v0.0.0-20190705123435-943ab5e228078f6389dffe00946c0d0a5b495a5c h1:O7gT6vNipoBxFe19iWtDyUjIcIbkJ5MYhMXLS+RRCFs=
github.com/vacp2p/mvds v0.0.0-20190705123435-943ab5e228078f6389dffe00946c0d0a5b495a5c/go.mod h1:pIqr2Hg4cIkTJniGPCp4ptong2jxgxx6uToVoY94+II=
github.com/xanzy/go-gitlab v0.15.0/go.mod h1:8zdQa/ri1dfn8eS3Ir1SyfvOKlw7WBJ8DVThkpGiXrs=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=

View File

@ -249,6 +249,7 @@ func NewMessenger(
nil,
c.envelopesMonitorConfig,
logger,
transport.SetGenericDiscoveryTopicSupport(c.featureFlags.genericDiscoveryTopicEnabled),
)
if err != nil {
return nil, errors.Wrap(err, "failed to create a WhisperServiceTransport")
@ -272,6 +273,7 @@ func NewMessenger(
datasyncpeer.PublicKeyToPeerID(identity.PublicKey),
datasyncnode.BATCH,
datasync.CalculateSendTime,
logger,
)
if err != nil {
return nil, errors.Wrap(err, "failed to create a persistent datasync node")
@ -315,6 +317,66 @@ func NewMessenger(
return messenger, nil
}
// Init analyzes chats and contacts in order to setup filters
// which are responsible for retrieving messages.
func (m *Messenger) Init() error {
logger := m.logger.With(zap.String("site", "Init"))
var (
publicChatIDs []string
publicKeys []*ecdsa.PublicKey
)
// Get chat IDs and public keys from the existing chats.
// TODO: Get only active chats by the query.
chats, err := m.Chats()
if err != nil {
return err
}
for _, chat := range chats {
if !chat.Active {
continue
}
switch chat.ChatType {
case ChatTypePublic:
publicChatIDs = append(publicChatIDs, chat.ID)
case ChatTypeOneToOne:
publicKeys = append(publicKeys, chat.PublicKey)
case ChatTypePrivateGroupChat:
for _, member := range chat.Members {
publicKey, err := member.PublicKey()
if err != nil {
return errors.Wrapf(err, "invalid public key for member %s in chat %s", member.ID, chat.Name)
}
publicKeys = append(publicKeys, publicKey)
}
default:
return errors.New("invalid chat type")
}
}
// Get chat IDs and public keys from the contacts.
contacts, err := m.Contacts()
if err != nil {
return err
}
for _, contact := range contacts {
// We only need filters for contacts added by us and not blocked.
if !contact.IsAdded() || contact.IsBlocked() {
continue
}
publicKey, err := contact.PublicKey()
if err != nil {
logger.Error("failed to get contact's public key", zap.Error(err))
continue
}
publicKeys = append(publicKeys, publicKey)
}
_, err = m.adapter.transport.InitFilters(publicChatIDs, publicKeys)
return err
}
// Shutdown takes care of ensuring a clean shutdown of Messenger
func (m *Messenger) Shutdown() (err error) {
for _, task := range m.shutdownTasks {
@ -394,8 +456,8 @@ func (m *Messenger) SaveChat(chat Chat) error {
return m.persistence.SaveChat(chat)
}
func (m *Messenger) Chats(from, to int) ([]*Chat, error) {
return m.persistence.Chats(from, to)
func (m *Messenger) Chats() ([]*Chat, error) {
return m.persistence.Chats()
}
func (m *Messenger) DeleteChat(chatID string) error {
@ -592,13 +654,13 @@ func (m *Messenger) RetrieveRawAll() (map[transport.Filter][]*protocol.StatusMes
}
// DEPRECATED
func (m *Messenger) LoadFilters(chats []*transport.Filter) ([]*transport.Filter, error) {
return m.adapter.transport.LoadFilters(chats, m.featureFlags.genericDiscoveryTopicEnabled)
func (m *Messenger) LoadFilters(filters []*transport.Filter) ([]*transport.Filter, error) {
return m.adapter.transport.LoadFilters(filters)
}
// DEPRECATED
func (m *Messenger) RemoveFilters(chats []*transport.Filter) error {
return m.adapter.transport.RemoveFilters(chats)
func (m *Messenger) RemoveFilters(filters []*transport.Filter) error {
return m.adapter.transport.RemoveFilters(filters)
}
// DEPRECATED

View File

@ -113,11 +113,11 @@ func (db sqlitePersistence) DeleteChat(chatID string) error {
return err
}
func (db sqlitePersistence) Chats(from, to int) ([]*Chat, error) {
return db.chats(from, to, nil)
func (db sqlitePersistence) Chats() ([]*Chat, error) {
return db.chats(nil)
}
func (db sqlitePersistence) chats(from, to int, tx *sql.Tx) ([]*Chat, error) {
func (db sqlitePersistence) chats(tx *sql.Tx) ([]*Chat, error) {
var err error
if tx == nil {
@ -137,22 +137,22 @@ func (db sqlitePersistence) chats(from, to int, tx *sql.Tx) ([]*Chat, error) {
}
rows, err := tx.Query(`SELECT
id,
name,
color,
active,
type,
timestamp,
deleted_at_clock_value,
public_key,
unviewed_message_count,
last_clock_value,
last_message_content_type,
last_message_content,
members,
membership_updates
id,
name,
color,
active,
type,
timestamp,
deleted_at_clock_value,
public_key,
unviewed_message_count,
last_clock_value,
last_message_content_type,
last_message_content,
members,
membership_updates
FROM chats
ORDER BY chats.timestamp DESC LIMIT ? OFFSET ?`, to, from)
ORDER BY chats.timestamp DESC`)
if err != nil {
return nil, err
}

View File

@ -377,5 +377,5 @@ func (db sqlitePersistence) BlockContact(contact Contact) ([]*Chat, error) {
}
// return the updated chats
return db.chats(0, -1, tx)
return db.chats(tx)
}

View File

@ -145,29 +145,29 @@ func (s *filtersManager) Init(
s.mutex.Lock()
defer s.mutex.Unlock()
var allChats []*Filter
for _, chat := range s.filters {
allChats = append(allChats, chat)
var allFilters []*Filter
for _, f := range s.filters {
allFilters = append(allFilters, f)
}
return allChats, nil
return allFilters, nil
}
// DEPRECATED
func (s *filtersManager) InitWithChats(chats []*Filter, genericDiscoveryTopicEnabled bool) ([]*Filter, error) {
func (s *filtersManager) InitWithFilters(filters []*Filter, genericDiscoveryTopicEnabled bool) ([]*Filter, error) {
var (
chatIDs []string
publicKeys []*ecdsa.PublicKey
)
for _, chat := range chats {
if chat.Identity != "" && chat.OneToOne {
publicKey, err := strToPublicKey(chat.Identity)
for _, filter := range filters {
if filter.Identity != "" && filter.OneToOne {
publicKey, err := strToPublicKey(filter.Identity)
if err != nil {
return nil, err
}
publicKeys = append(publicKeys, publicKey)
} else if chat.ChatID != "" {
chatIDs = append(chatIDs, chat.ChatID)
} else if filter.ChatID != "" {
chatIDs = append(chatIDs, filter.ChatID)
}
}
@ -175,55 +175,55 @@ func (s *filtersManager) InitWithChats(chats []*Filter, genericDiscoveryTopicEna
}
func (s *filtersManager) Reset() error {
var chats []*Filter
var filters []*Filter
s.mutex.Lock()
for _, chat := range s.filters {
chats = append(chats, chat)
for _, f := range s.filters {
filters = append(filters, f)
}
s.mutex.Unlock()
return s.Remove(chats...)
return s.Remove(filters...)
}
func (s *filtersManager) Chats() (result []*Filter) {
func (s *filtersManager) Filters() (result []*Filter) {
s.mutex.Lock()
defer s.mutex.Unlock()
for _, chat := range s.filters {
result = append(result, chat)
for _, f := range s.filters {
result = append(result, f)
}
return
}
// ChatByID returns a chat by id.
func (s *filtersManager) ChatByID(chatID string) *Filter {
func (s *filtersManager) Filter(chatID string) *Filter {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.filters[chatID]
}
func (s *filtersManager) ChatByFilterID(filterID string) *Filter {
// FilterByFilterID returns a Filter with a given Whisper filter ID.
func (s *filtersManager) FilterByFilterID(filterID string) *Filter {
s.mutex.Lock()
defer s.mutex.Unlock()
for _, chat := range s.filters {
if chat.FilterID == filterID {
return chat
for _, f := range s.filters {
if f.FilterID == filterID {
return f
}
}
return nil
}
func (s *filtersManager) ChatsByPublicKey(publicKey *ecdsa.PublicKey) (result []*Filter) {
func (s *filtersManager) FiltersByPublicKey(publicKey *ecdsa.PublicKey) (result []*Filter) {
s.mutex.Lock()
defer s.mutex.Unlock()
identityStr := publicKeyToStr(publicKey)
for _, chat := range s.filters {
if chat.Identity == identityStr {
result = append(result, chat)
for _, f := range s.filters {
if f.Identity == identityStr {
result = append(result, f)
}
}
@ -231,18 +231,18 @@ func (s *filtersManager) ChatsByPublicKey(publicKey *ecdsa.PublicKey) (result []
}
// Remove remove all the filters associated with a chat/identity
func (s *filtersManager) Remove(chats ...*Filter) error {
func (s *filtersManager) Remove(filters ...*Filter) error {
s.mutex.Lock()
defer s.mutex.Unlock()
for _, chat := range chats {
if err := s.whisper.Unsubscribe(chat.FilterID); err != nil {
for _, f := range filters {
if err := s.whisper.Unsubscribe(f.FilterID); err != nil {
return err
}
if chat.SymKeyID != "" {
s.whisper.DeleteSymKey(chat.SymKeyID)
if f.SymKeyID != "" {
s.whisper.DeleteSymKey(f.SymKeyID)
}
delete(s.filters, chat.ChatID)
delete(s.filters, f.ChatID)
}
return nil

View File

@ -56,16 +56,27 @@ func (m *whisperServiceKeysManager) RawSymKey(id string) ([]byte, error) {
return m.shh.GetSymKey(id)
}
type Option func(*WhisperServiceTransport) error
func SetGenericDiscoveryTopicSupport(val bool) Option {
return func(t *WhisperServiceTransport) error {
t.genericDiscoveryTopicEnabled = val
return nil
}
}
// WhisperServiceTransport is a transport based on Whisper service.
type WhisperServiceTransport struct {
shh *whisper.Whisper
shhAPI *whisper.PublicWhisperAPI // only PublicWhisperAPI implements logic to send messages
keysManager *whisperServiceKeysManager
chats *filtersManager
filters *filtersManager
logger *zap.Logger
mailservers []string
envelopesMonitor *EnvelopesMonitor
genericDiscoveryTopicEnabled bool
}
// NewWhisperService returns a new WhisperServiceTransport.
@ -76,8 +87,9 @@ func NewWhisperServiceTransport(
mailservers []string,
envelopesMonitorConfig *EnvelopesMonitorConfig,
logger *zap.Logger,
opts ...Option,
) (*WhisperServiceTransport, error) {
chats, err := newFiltersManager(db, shh, privateKey, logger)
filtersManager, err := newFiltersManager(db, shh, privateKey, logger)
if err != nil {
return nil, err
}
@ -87,38 +99,54 @@ func NewWhisperServiceTransport(
envelopesMonitor = NewEnvelopesMonitor(shh, envelopesMonitorConfig)
envelopesMonitor.Start()
}
return &WhisperServiceTransport{
t := &WhisperServiceTransport{
shh: shh,
shhAPI: whisper.NewPublicWhisperAPI(shh),
envelopesMonitor: envelopesMonitor,
keysManager: &whisperServiceKeysManager{
shh: shh,
privateKey: privateKey,
passToSymKeyCache: make(map[string]string),
},
chats: chats,
filters: filtersManager,
mailservers: mailservers,
logger: logger.With(zap.Namespace("WhisperServiceTransport")),
}, nil
}
for _, opt := range opts {
if err := opt(t); err != nil {
return nil, err
}
}
return t, nil
}
func (a *WhisperServiceTransport) InitFilters(chatIDs []string, publicKeys []*ecdsa.PublicKey) ([]*Filter, error) {
return a.filters.Init(chatIDs, publicKeys, a.genericDiscoveryTopicEnabled)
}
func (a *WhisperServiceTransport) Filters() []*Filter {
return a.filters.Filters()
}
// DEPRECATED
func (a *WhisperServiceTransport) LoadFilters(chats []*Filter, genericDiscoveryTopicEnabled bool) ([]*Filter, error) {
return a.chats.InitWithChats(chats, genericDiscoveryTopicEnabled)
func (a *WhisperServiceTransport) LoadFilters(filters []*Filter) ([]*Filter, error) {
return a.filters.InitWithFilters(filters, a.genericDiscoveryTopicEnabled)
}
// DEPRECATED
func (a *WhisperServiceTransport) RemoveFilters(chats []*Filter) error {
return a.chats.Remove(chats...)
func (a *WhisperServiceTransport) RemoveFilters(filters []*Filter) error {
return a.filters.Remove(filters...)
}
func (a *WhisperServiceTransport) Reset() error {
return a.chats.Reset()
return a.filters.Reset()
}
func (a *WhisperServiceTransport) ProcessNegotiatedSecret(secret NegotiatedSecret) (*Filter, error) {
filter, err := a.chats.LoadNegotiated(secret)
filter, err := a.filters.LoadNegotiated(secret)
if err != nil {
return nil, err
}
@ -126,26 +154,26 @@ func (a *WhisperServiceTransport) ProcessNegotiatedSecret(secret NegotiatedSecre
}
func (a *WhisperServiceTransport) JoinPublic(chatID string) error {
_, err := a.chats.LoadPublic(chatID)
_, err := a.filters.LoadPublic(chatID)
return err
}
func (a *WhisperServiceTransport) LeavePublic(chatID string) error {
chat := a.chats.ChatByID(chatID)
chat := a.filters.Filter(chatID)
if chat != nil {
return nil
}
return a.chats.Remove(chat)
return a.filters.Remove(chat)
}
func (a *WhisperServiceTransport) JoinPrivate(publicKey *ecdsa.PublicKey) error {
_, err := a.chats.LoadContactCode(publicKey)
_, err := a.filters.LoadContactCode(publicKey)
return err
}
func (a *WhisperServiceTransport) LeavePrivate(publicKey *ecdsa.PublicKey) error {
chats := a.chats.ChatsByPublicKey(publicKey)
return a.chats.Remove(chats...)
filters := a.filters.FiltersByPublicKey(publicKey)
return a.filters.Remove(filters...)
}
type ChatMessages struct {
@ -157,15 +185,15 @@ type ChatMessages struct {
func (a *WhisperServiceTransport) RetrieveAllMessages() ([]ChatMessages, error) {
chatMessages := make(map[string]ChatMessages)
for _, chat := range a.chats.Chats() {
f := a.shh.GetFilter(chat.FilterID)
for _, filter := range a.filters.Filters() {
f := a.shh.GetFilter(filter.FilterID)
if f == nil {
return nil, errors.New("failed to return a filter")
}
messages := chatMessages[chat.ChatID]
messages.ChatID = chat.ChatID
messages.Public = chat.IsPublic()
messages := chatMessages[filter.ChatID]
messages.ChatID = filter.ChatID
messages.Public = filter.IsPublic()
messages.Messages = append(messages.Messages, f.Retrieve()...)
}
@ -177,12 +205,12 @@ func (a *WhisperServiceTransport) RetrieveAllMessages() ([]ChatMessages, error)
}
func (a *WhisperServiceTransport) RetrievePublicMessages(chatID string) ([]*whisper.ReceivedMessage, error) {
chat, err := a.chats.LoadPublic(chatID)
filter, err := a.filters.LoadPublic(chatID)
if err != nil {
return nil, err
}
f := a.shh.GetFilter(chat.FilterID)
f := a.shh.GetFilter(filter.FilterID)
if f == nil {
return nil, errors.New("failed to return a filter")
}
@ -191,8 +219,8 @@ func (a *WhisperServiceTransport) RetrievePublicMessages(chatID string) ([]*whis
}
func (a *WhisperServiceTransport) RetrievePrivateMessages(publicKey *ecdsa.PublicKey) ([]*whisper.ReceivedMessage, error) {
chats := a.chats.ChatsByPublicKey(publicKey)
discoveryChats, err := a.chats.Init(nil, nil, true)
chats := a.filters.FiltersByPublicKey(publicKey)
discoveryChats, err := a.filters.Init(nil, nil, true)
if err != nil {
return nil, err
}
@ -215,14 +243,14 @@ func (a *WhisperServiceTransport) RetrievePrivateMessages(publicKey *ecdsa.Publi
func (a *WhisperServiceTransport) RetrieveRawAll() (map[Filter][]*whisper.ReceivedMessage, error) {
result := make(map[Filter][]*whisper.ReceivedMessage)
allChats := a.chats.Chats()
for _, chat := range allChats {
f := a.shh.GetFilter(chat.FilterID)
allFilters := a.filters.Filters()
for _, filter := range allFilters {
f := a.shh.GetFilter(filter.FilterID)
if f == nil {
return nil, errors.New("failed to return a filter")
}
result[*chat] = append(result[*chat], f.Retrieve()...)
result[*filter] = append(result[*filter], f.Retrieve()...)
}
return result, nil
@ -245,13 +273,13 @@ func (a *WhisperServiceTransport) SendPublic(ctx context.Context, newMessage *wh
return nil, err
}
chat, err := a.chats.LoadPublic(chatName)
filter, err := a.filters.LoadPublic(chatName)
if err != nil {
return nil, err
}
newMessage.SymKeyID = chat.SymKeyID
newMessage.Topic = chat.Topic
newMessage.SymKeyID = filter.SymKeyID
newMessage.Topic = filter.Topic
return a.shhAPI.Post(ctx, *newMessage)
}
@ -261,7 +289,7 @@ func (a *WhisperServiceTransport) SendPrivateWithSharedSecret(ctx context.Contex
return nil, err
}
chat, err := a.chats.LoadNegotiated(NegotiatedSecret{
filter, err := a.filters.LoadNegotiated(NegotiatedSecret{
PublicKey: publicKey,
Key: secret,
})
@ -269,8 +297,8 @@ func (a *WhisperServiceTransport) SendPrivateWithSharedSecret(ctx context.Contex
return nil, err
}
newMessage.SymKeyID = chat.SymKeyID
newMessage.Topic = chat.Topic
newMessage.SymKeyID = filter.SymKeyID
newMessage.Topic = filter.Topic
newMessage.PublicKey = nil
return a.shhAPI.Post(ctx, *newMessage)
@ -281,12 +309,12 @@ func (a *WhisperServiceTransport) SendPrivateWithPartitioned(ctx context.Context
return nil, err
}
chat, err := a.chats.LoadPartitioned(publicKey)
filter, err := a.filters.LoadPartitioned(publicKey)
if err != nil {
return nil, err
}
newMessage.Topic = chat.Topic
newMessage.Topic = filter.Topic
newMessage.PublicKey = crypto.FromECDSAPub(publicKey)
return a.shhAPI.Post(ctx, *newMessage)

View File

@ -6,11 +6,13 @@ package node
import (
"context"
"database/sql"
"encoding/hex"
"fmt"
"log"
"sync/atomic"
"time"
"go.uber.org/zap"
"github.com/vacp2p/mvds/peers"
"github.com/vacp2p/mvds/protobuf"
"github.com/vacp2p/mvds/state"
@ -31,6 +33,8 @@ type CalculateNextEpoch func(count uint64, epoch int64) int64
// Node represents an MVDS node, it runs all the logic like sending and receiving protocol messages.
type Node struct {
// This needs to be declared first: https://github.com/golang/go/issues/9959
epoch int64
ctx context.Context
cancel context.CancelFunc
@ -48,10 +52,11 @@ type Node struct {
ID state.PeerID
epochPersistence *epochSQLitePersistence
epoch int64
mode Mode
subscription chan protobuf.Message
logger *zap.Logger
}
func NewPersistentNode(
@ -60,8 +65,13 @@ func NewPersistentNode(
id state.PeerID,
mode Mode,
nextEpoch CalculateNextEpoch,
logger *zap.Logger,
) (*Node, error) {
ctx, cancel := context.WithCancel(context.Background())
if logger == nil {
logger = zap.NewNop()
}
node := Node{
ID: id,
ctx: ctx,
@ -73,6 +83,7 @@ func NewPersistentNode(
payloads: newPayloads(),
epochPersistence: newEpochSQLitePersistence(db),
nextEpoch: nextEpoch,
logger: logger.With(zap.Namespace("mvds")),
mode: mode,
}
if currentEpoch, err := node.epochPersistence.Get(id); err != nil {
@ -89,8 +100,13 @@ func NewEphemeralNode(
nextEpoch CalculateNextEpoch,
currentEpoch int64,
mode Mode,
logger *zap.Logger,
) *Node {
ctx, cancel := context.WithCancel(context.Background())
if logger == nil {
logger = zap.NewNop()
}
return &Node{
ID: id,
ctx: ctx,
@ -102,6 +118,7 @@ func NewEphemeralNode(
payloads: newPayloads(),
nextEpoch: nextEpoch,
epoch: currentEpoch,
logger: logger.With(zap.Namespace("mvds")),
mode: mode,
}
}
@ -116,8 +133,13 @@ func NewNode(
id state.PeerID,
mode Mode,
pp peers.Persistence,
logger *zap.Logger,
) *Node {
ctx, cancel := context.WithCancel(context.Background())
if logger == nil {
logger = zap.NewNop()
}
return &Node{
ctx: ctx,
cancel: cancel,
@ -129,6 +151,7 @@ func NewNode(
nextEpoch: nextEpoch,
ID: id,
epoch: currentEpoch,
logger: logger.With(zap.Namespace("mvds")),
mode: mode,
}
}
@ -143,7 +166,7 @@ func (n *Node) Start(duration time.Duration) {
for {
select {
case <-n.ctx.Done():
log.Print("Watch stopped")
n.logger.Info("Watch stopped")
return
default:
p := n.transport.Watch()
@ -156,20 +179,20 @@ func (n *Node) Start(duration time.Duration) {
for {
select {
case <-n.ctx.Done():
log.Print("Epoch processing stopped")
n.logger.Info("Epoch processing stopped")
return
default:
log.Printf("Node: %x Epoch: %d", n.ID[:4], n.epoch)
n.logger.Debug("Epoch processing", zap.String("node", hex.EncodeToString(n.ID[:4])), zap.Int64("epoch", n.epoch))
time.Sleep(duration)
err := n.sendMessages()
if err != nil {
log.Printf("Error sending messages: %+v\n", err)
n.logger.Error("Error sending messages.", zap.Error(err))
}
atomic.AddInt64(&n.epoch, 1)
// When a persistent node is used, the epoch needs to be saved.
if n.epochPersistence != nil {
if err := n.epochPersistence.Set(n.ID, n.epoch); err != nil {
log.Printf("Failed to persisten epoch: %v", err)
n.logger.Error("Failed to persisten epoch", zap.Error(err))
}
}
}
@ -179,7 +202,7 @@ func (n *Node) Start(duration time.Duration) {
// Stop message reading and epoch processing
func (n *Node) Stop() {
log.Print("Stopping node")
n.logger.Info("Stopping node")
n.Unsubscribe()
n.cancel()
}
@ -227,7 +250,10 @@ func (n *Node) AppendMessage(groupID state.GroupID, data []byte) (state.MessageI
n.insertSyncState(&groupID, id, p, t)
}
log.Printf("[%x] node %x sending %x\n", groupID[:4], n.ID[:4], id[:4])
n.logger.Debug("Sending message",
zap.String("node", hex.EncodeToString(n.ID[:4])),
zap.String("groupID", hex.EncodeToString(groupID[:4])),
zap.String("id", hex.EncodeToString(id[:4])))
// @todo think about a way to insta trigger send messages when send was selected, we don't wanna wait for ticks here
return id, nil
@ -275,7 +301,12 @@ func (n *Node) sendMessages() error {
n.payloads.AddOffers(p, m[:])
case state.REQUEST:
n.payloads.AddRequests(p, m[:])
log.Printf("sending REQUEST (%x -> %x): %x\n", n.ID[:4], p[:4], m[:4])
n.logger.Debug("sending REQUEST",
zap.String("from", hex.EncodeToString(n.ID[:4])),
zap.String("to", hex.EncodeToString(p[:4])),
zap.String("messageID", hex.EncodeToString(m[:4])),
)
case state.MESSAGE:
g := *s.GroupID
// TODO: Handle errors
@ -290,26 +321,36 @@ func (n *Node) sendMessages() error {
msg, err := n.store.Get(m)
if err != nil {
log.Printf("failed to retreive message %x %s", m[:4], err.Error())
n.logger.Error("Failed to retreive message",
zap.String("messageID", hex.EncodeToString(m[:4])),
zap.Error(err),
)
return s
}
n.payloads.AddMessages(p, msg)
log.Printf("[%x] sending MESSAGE (%x -> %x): %x\n", g[:4], n.ID[:4], p[:4], m[:4])
n.logger.Debug("sending MESSAGE",
zap.String("groupID", hex.EncodeToString(g[:4])),
zap.String("from", hex.EncodeToString(n.ID[:4])),
zap.String("to", hex.EncodeToString(p[:4])),
zap.String("messageID", hex.EncodeToString(m[:4])),
)
}
return n.updateSendEpoch(s)
})
if err != nil {
log.Printf("error while mapping sync state: %s", err.Error())
n.logger.Error("error while mapping sync state", zap.Error(err))
return err
}
return n.payloads.MapAndClear(func(peer state.PeerID, payload protobuf.Payload) error {
err := n.transport.Send(n.ID, peer, payload)
if err != nil {
log.Printf("error sending message: %s", err.Error())
n.logger.Error("error sending message", zap.Error(err))
return err
}
return nil
@ -320,13 +361,13 @@ func (n *Node) sendMessages() error {
func (n *Node) onPayload(sender state.PeerID, payload protobuf.Payload) {
// Acks, Requests and Offers are all arrays of bytes as protobuf doesn't allow type aliases otherwise arrays of messageIDs would be nicer.
if err := n.onAck(sender, payload.Acks); err != nil {
log.Printf("error processing acks: %s", err.Error())
n.logger.Error("error processing acks", zap.Error(err))
}
if err := n.onRequest(sender, payload.Requests); err != nil {
log.Printf("error processing requests: %s", err.Error())
n.logger.Error("error processing requests", zap.Error(err))
}
if err := n.onOffer(sender, payload.Offers); err != nil {
log.Printf("error processing offers: %s", err.Error())
n.logger.Error("error processing offers", zap.Error(err))
}
messageIds := n.onMessages(sender, payload.Messages)
n.payloads.AddAcks(sender, messageIds)
@ -335,7 +376,11 @@ func (n *Node) onPayload(sender state.PeerID, payload protobuf.Payload) {
func (n *Node) onOffer(sender state.PeerID, offers [][]byte) error {
for _, raw := range offers {
id := toMessageID(raw)
log.Printf("OFFER (%x -> %x): %x received.\n", sender[:4], n.ID[:4], id[:4])
n.logger.Debug("OFFER received",
zap.String("from", hex.EncodeToString(sender[:4])),
zap.String("to", hex.EncodeToString(n.ID[:4])),
zap.String("messageID", hex.EncodeToString(id[:4])),
)
exist, err := n.store.Has(id)
// @todo maybe ack?
@ -355,7 +400,11 @@ func (n *Node) onOffer(sender state.PeerID, offers [][]byte) error {
func (n *Node) onRequest(sender state.PeerID, requests [][]byte) error {
for _, raw := range requests {
id := toMessageID(raw)
log.Printf("REQUEST (%x -> %x): %x received.\n", sender[:4], n.ID[:4], id[:4])
n.logger.Debug("REQUEST received",
zap.String("from", hex.EncodeToString(sender[:4])),
zap.String("to", hex.EncodeToString(n.ID[:4])),
zap.String("messageID", hex.EncodeToString(id[:4])),
)
message, err := n.store.Get(id)
if err != nil {
@ -363,7 +412,7 @@ func (n *Node) onRequest(sender state.PeerID, requests [][]byte) error {
}
if message == nil {
log.Printf("message %x does not exist", id[:4])
n.logger.Error("message does not exist", zap.String("messageID", hex.EncodeToString(id[:4])))
continue
}
@ -375,7 +424,10 @@ func (n *Node) onRequest(sender state.PeerID, requests [][]byte) error {
}
if !exist {
log.Printf("[%x] peer %x is not in group", groupID, sender[:4])
n.logger.Error("peer is not in group",
zap.String("groupID", hex.EncodeToString(groupID[:4])),
zap.String("peer", hex.EncodeToString(sender[:4])),
)
continue
}
@ -391,11 +443,16 @@ func (n *Node) onAck(sender state.PeerID, acks [][]byte) error {
err := n.syncState.Remove(id, sender)
if err != nil {
log.Printf("error while removing sync state %s", err.Error())
n.logger.Error("Error while removing sync state.", zap.Error(err))
return err
}
log.Printf("ACK (%x -> %x): %x received.\n", sender[:4], n.ID[:4], id[:4])
n.logger.Debug("ACK received",
zap.String("from", hex.EncodeToString(sender[:4])),
zap.String("to", hex.EncodeToString(n.ID[:4])),
zap.String("messageID", hex.EncodeToString(id[:4])),
)
}
return nil
}
@ -407,12 +464,18 @@ func (n *Node) onMessages(sender state.PeerID, messages []*protobuf.Message) [][
groupID := toGroupID(m.GroupId)
err := n.onMessage(sender, *m)
if err != nil {
log.Printf("Error processing messsage: %+v\n", err)
n.logger.Error("Error processing message", zap.Error(err))
continue
}
id := m.ID()
log.Printf("[%x] sending ACK (%x -> %x): %x\n", groupID[:4], n.ID[:4], sender[:4], id[:4])
n.logger.Debug("sending ACK",
zap.String("groupID", hex.EncodeToString(groupID[:4])),
zap.String("from", hex.EncodeToString(n.ID[:4])),
zap.String("", hex.EncodeToString(sender[:4])),
zap.String("messageID", hex.EncodeToString(id[:4])),
)
a = append(a, id[:])
}
@ -422,7 +485,11 @@ func (n *Node) onMessages(sender state.PeerID, messages []*protobuf.Message) [][
func (n *Node) onMessage(sender state.PeerID, msg protobuf.Message) error {
id := msg.ID()
groupID := toGroupID(msg.GroupId)
log.Printf("MESSAGE (%x -> %x): %x received.\n", sender[:4], n.ID[:4], id[:4])
n.logger.Debug("MESSAGE received",
zap.String("from", hex.EncodeToString(sender[:4])),
zap.String("to", hex.EncodeToString(n.ID[:4])),
zap.String("messageID", hex.EncodeToString(id[:4])),
)
err := n.syncState.Remove(id, sender)
if err != nil {
@ -466,7 +533,13 @@ func (n *Node) insertSyncState(groupID *state.GroupID, messageID state.MessageID
err := n.syncState.Add(s)
if err != nil {
log.Printf("error (%s) setting sync state group: %x id: %x peer: %x", err.Error(), groupID, messageID, peerID)
n.logger.Error("error setting sync states",
zap.Error(err),
zap.String("groupID", hex.EncodeToString(groupID[:4])),
zap.String("messageID", hex.EncodeToString(messageID[:4])),
zap.String("peerID", hex.EncodeToString(peerID[:4])),
)
}
}

6
vendor/modules.txt vendored
View File

@ -31,6 +31,7 @@ github.com/ethereum/go-ethereum/common
github.com/ethereum/go-ethereum/common/hexutil
github.com/ethereum/go-ethereum/crypto
github.com/ethereum/go-ethereum/ethclient
github.com/ethereum/go-ethereum/event
github.com/ethereum/go-ethereum/log
github.com/ethereum/go-ethereum/node
github.com/ethereum/go-ethereum/p2p/enode
@ -40,7 +41,6 @@ github.com/ethereum/go-ethereum/metrics
github.com/ethereum/go-ethereum
github.com/ethereum/go-ethereum/crypto/ecies
github.com/ethereum/go-ethereum/p2p/enr
github.com/ethereum/go-ethereum/event
github.com/ethereum/go-ethereum/rlp
github.com/ethereum/go-ethereum/accounts/abi
github.com/ethereum/go-ethereum/accounts/abi/bind
@ -317,7 +317,7 @@ github.com/status-im/migrate/v4/database/sqlcipher
github.com/status-im/rendezvous
github.com/status-im/rendezvous/protocol
github.com/status-im/rendezvous/server
# github.com/status-im/status-protocol-go v0.0.0-20190701094942-1dd03f74d712ac9b965b718963dc86ca9c3540fb
# github.com/status-im/status-protocol-go v0.0.0-20190701094942-822d18916e417b768a13f85601a7b10e02236655
github.com/status-im/status-protocol-go/zaputil
github.com/status-im/status-protocol-go
github.com/status-im/status-protocol-go/encryption/multidevice
@ -353,7 +353,7 @@ github.com/syndtr/goleveldb/leveldb/filter
github.com/syndtr/goleveldb/leveldb/journal
github.com/syndtr/goleveldb/leveldb/memdb
github.com/syndtr/goleveldb/leveldb/table
# github.com/vacp2p/mvds v0.0.0-20190705123435-a8dc37599bf99958b0ea19431c9bb7a47bd7fa8f
# github.com/vacp2p/mvds v0.0.0-20190705123435-943ab5e228078f6389dffe00946c0d0a5b495a5c
github.com/vacp2p/mvds/node
github.com/vacp2p/mvds/protobuf
github.com/vacp2p/mvds/state