Jakub Sokołowski 768539255a
skip EventMsgDelete events in Status
Signed-off-by: Jakub Sokołowski <jakub@status.im>
2019-10-15 12:00:24 +02:00

257 lines
6.2 KiB
Go

package status
import (
"context"
"crypto/ecdsa"
"database/sql"
"fmt"
"strings"
"time"
"github.com/google/uuid"
"github.com/pkg/errors"
"go.uber.org/zap"
"github.com/42wim/matterbridge/bridge"
"github.com/42wim/matterbridge/bridge/config"
crypto "github.com/ethereum/go-ethereum/crypto"
gonode "github.com/status-im/status-go/node"
params "github.com/status-im/status-go/params"
status "github.com/status-im/status-protocol-go"
v1 "github.com/status-im/status-protocol-go/v1"
)
// Bstatus is the matterbridge bridge for Status. It embeds the generic
// bridge configuration and keeps the state of the Whisper node and the
// Status messenger that runs on top of it.
type Bstatus struct {
	*bridge.Config

	// Controls for the message fetching loop.
	fetchInterval, fetchTimeout time.Duration
	fetchDone                   chan bool

	// Settings of the Whisper node.
	whisperListenPort                 int
	whisperListenAddr, whisperDataDir string

	// Secret for the Status chat identity.
	privateKey *ecdsa.PrivateKey
	// Configuration for the Whisper node.
	nodeConfig *params.NodeConfig
	// Ethereum Whisper node that runs in the background.
	statusNode *gonode.StatusNode
	// Status messaging layer instance.
	messenger *status.Messenger
}
// New builds a Status bridge around cfg with the default Whisper listen
// address, data directory and fetch loop timings. Nothing is started here.
func New(cfg *bridge.Config) bridge.Bridger {
	b := &Bstatus{Config: cfg}
	b.fetchDone = make(chan bool)

	b.whisperListenPort = 30303
	b.whisperListenAddr = "0.0.0.0"

	// TODO parametrize those
	b.whisperDataDir = "/tmp/matterbridge-status-data"
	b.fetchTimeout = 500 * time.Millisecond
	b.fetchInterval = 500 * time.Millisecond

	return b
}
// Connect parses the configured private key, starts the background Status
// node, creates the messenger on top of its Whisper service and launches the
// message fetching loop.
//
// Once the node has been started, any later failure stops it again, so a
// failed Connect does not leave a node running in the background.
func (b *Bstatus) Connect() error {
	keyHex := strings.TrimPrefix(b.GetString("PrivateKey"), "0x")
	privKey, err := crypto.HexToECDSA(keyHex)
	if err != nil {
		return errors.Wrap(err, "Failed to parse PrivateKey")
	}
	b.privateKey = privKey

	b.nodeConfig = b.generateConfig()
	b.statusNode = gonode.New()
	// NOTE(review): this error has always been discarded here, and the node
	// has not been started yet at this point, so a failure may be the normal
	// case. Confirm against status-go before turning it into a hard error.
	accsMgr, _ := b.statusNode.AccountManager()
	if err := b.statusNode.Start(b.nodeConfig, accsMgr); err != nil {
		return errors.Wrap(err, "Failed to start Status node")
	}

	// The node is running from here on; undo that if the rest of the setup
	// does not complete.
	ready := false
	defer func() {
		if ready {
			return
		}
		if stopErr := b.statusNode.Stop(); stopErr != nil {
			b.Log.WithError(stopErr).Error("Failed to stop Status node")
		}
	}()

	shhService, err := b.statusNode.WhisperService()
	if err != nil {
		return errors.Wrap(err, "Failed to get Whisper service")
	}

	// Create a custom logger to suppress DEBUG messages
	//logger, _ := zap.NewProduction()
	logger := zap.NewNop()

	// Using an in-memory SQLite DB since we have nothing worth preserving
	db, err := sql.Open("sqlite3", "file:mem?mode=memory&cache=shared")
	if err != nil {
		return errors.Wrap(err, "Failed to open in-memory database")
	}
	options := []status.Option{
		status.WithDatabase(db),
		status.WithCustomLogger(logger),
	}

	messenger, err := status.NewMessenger(
		b.privateKey,
		shhService,
		uuid.New().String(),
		options...,
	)
	if err != nil {
		if closeErr := db.Close(); closeErr != nil {
			b.Log.WithError(closeErr).Error("Failed to close in-memory database")
		}
		return errors.Wrap(err, "Failed to create Messenger")
	}
	b.messenger = messenger

	// A previous Disconnect leaves fetchDone closed, which would make the
	// new fetch loop return immediately. Nothing in this file sends on the
	// channel, so a successful receive means it is closed: replace it.
	select {
	case <-b.fetchDone:
		b.fetchDone = make(chan bool)
	default:
	}

	ready = true
	// Start a routine for periodically fetching messages
	go b.fetchMessagesLoop()
	return nil
}
// Disconnect stops the message fetching loop, shuts the messenger down and
// stops the Status node, in that order. It returns the first error hit.
//
// The messenger and the node are only set by Connect, so both are checked
// for nil to keep Disconnect safe on a bridge that never connected.
func (b *Bstatus) Disconnect() error {
	b.stopMessagesLoops()
	if b.messenger != nil {
		if err := b.messenger.Shutdown(); err != nil {
			return errors.Wrap(err, "Failed to stop Status messenger")
		}
	}
	if b.statusNode != nil {
		if err := b.statusNode.Stop(); err != nil {
			return errors.Wrap(err, "Failed to stop Status node")
		}
	}
	return nil
}
// JoinChannel joins the public Status chat named channel.Name and saves it
// through the messenger. A failure of either step is returned to the caller
// instead of being dropped; the chat is not saved when joining fails.
func (b *Bstatus) JoinChannel(channel config.ChannelInfo) error {
	chat := status.CreatePublicChat(channel.Name)
	if err := b.messenger.Join(chat); err != nil {
		return errors.Wrap(err, "Failed to join chat")
	}
	if err := b.messenger.SaveChat(chat); err != nil {
		return errors.Wrap(err, "Failed to save chat")
	}
	return nil
}
// Send relays a bridge message to the Status chat named by msg.Channel and
// returns the hex-formatted hash reported by the messenger. The payload is
// the username immediately followed by the text.
//
// Messages rejected by skipBridgeMessage yield an empty ID and a nil error.
// An error is returned when the bridge is not connected or sending fails.
func (b *Bstatus) Send(msg config.Message) (string, error) {
	switch {
	case !b.Connected():
		return "", fmt.Errorf("bridge %s not connected, dropping message %#v to bridge", b.Account, msg)
	case skipBridgeMessage(msg):
		return "", nil
	}
	b.Log.Infof("=> Sending message %#v", msg)

	// Sending is bounded by the same timeout value as fetchTimeout.
	sendCtx, done := context.WithTimeout(context.Background(), b.fetchTimeout)
	defer done()

	hash, sendErr := b.messenger.Send(sendCtx, msg.Channel, []byte(msg.Username+msg.Text))
	if sendErr != nil {
		return "", errors.Wrap(sendErr, "failed to send message")
	}
	// TODO handle the delivery event?
	return fmt.Sprintf("%#x", hash), nil
}
// Connected reports whether the Status node is running. The node is only
// created by Connect, so a bridge that has not connected yet reports false
// instead of calling into a nil node.
func (b *Bstatus) Connected() bool {
	return b.statusNode != nil && b.statusNode.IsRunning()
}
// generateConfig builds the node configuration for the beta fleet on the
// main network, using the bridge's data directory and listen address and no
// extra config files. A failure is only logged; whatever the params package
// returned is handed back to the caller as is.
func (b *Bstatus) generateConfig() *params.NodeConfig {
	opts := []params.Option{
		params.WithFleet(params.FleetBeta),
		b.withListenAddr(),
	}

	nodeCfg, err := params.NewNodeConfigWithDefaultsAndFiles(
		b.whisperDataDir,
		params.MainNetworkID,
		opts,
		nil, // no additional config files
	)
	if err != nil {
		b.Log.WithError(err).Error("Failed to generate config")
	}
	return nodeCfg
}
// stopMessagesLoops signals the fetch loop to exit by closing fetchDone.
//
// Closing an already closed channel panics, so the channel is probed first.
// Nothing in this file sends on fetchDone, which means a receive can only
// succeed once it is closed. This makes repeated calls harmless; it does not
// make concurrent calls safe.
func (b *Bstatus) stopMessagesLoops() {
	select {
	case <-b.fetchDone:
		// Already closed by an earlier call.
	default:
		close(b.fetchDone)
	}
}
// fetchMessagesLoop retrieves the latest messages on every tick of
// fetchInterval and propagates those not rejected by skipStatusMessage.
// A failed retrieval is logged and the loop waits for the next tick.
// The loop ends once fetchDone becomes readable.
func (b *Bstatus) fetchMessagesLoop() {
	ticker := time.NewTicker(b.fetchInterval)
	defer ticker.Stop()

	for {
		// Wait for either the stop signal or the next tick.
		select {
		case <-b.fetchDone:
			return
		case <-ticker.C:
		}

		batch, err := b.retrieveLatestMessages()
		if err != nil {
			b.Log.WithError(err).Error("Failed to retrieve messages")
			continue
		}
		for _, m := range batch {
			if !b.skipStatusMessage(m) {
				b.propagateMessage(m)
			}
		}
	}
}
// retrieveLatestMessages asks the messenger for the latest messages, giving
// the call at most one second to complete.
func (b *Bstatus) retrieveLatestMessages() (msgs []*v1.Message, err error) {
	fetchCtx, release := context.WithTimeout(context.Background(), time.Second)
	defer release()

	return b.messenger.RetrieveAll(fetchCtx, status.RetrieveLatest)
}
// propagateMessage converts a Status message into a bridge message and
// pushes it onto the Remote channel. The sender's hex public key becomes the
// user ID, and the username is the alias getThreeWordName derives from it.
func (b *Bstatus) propagateMessage(msg *v1.Message) {
	senderKey := publicKeyToHex(msg.SigPubKey)

	outgoing := config.Message{
		Account:   b.Account,
		Channel:   msg.ChatID,
		ID:        fmt.Sprintf("%#x", msg.ID),
		Text:      msg.Text,
		Timestamp: msg.Timestamp.Time(),
		UserID:    senderKey,
		Username:  getThreeWordName(senderKey),
	}

	// Send message for processing
	b.Remote <- outgoing
}
// skipStatusMessage reports whether a Status message should be ignored:
// either it was signed with the bridge's own key, or its text is empty.
func (b *Bstatus) skipStatusMessage(msg *v1.Message) bool {
	fromSelf := isPubKeyEqual(msg.SigPubKey, &b.privateKey.PublicKey)
	return fromSelf || msg.Text == ""
}
// skipBridgeMessage reports whether a message coming from the bridge should
// be ignored. Only delete events are skipped.
func skipBridgeMessage(msg config.Message) bool {
	return msg.Event == config.EventMsgDelete
}
// withListenAddr overrides the default listen address and port with the
// "ListenAddr" and "ListenPort" settings when they are non-empty/non-zero,
// then returns an option that writes "addr:port" into the node config.
func (b *Bstatus) withListenAddr() params.Option {
	addr := b.GetString("ListenAddr")
	if addr != "" {
		b.whisperListenAddr = addr
	}

	port := b.GetInt("ListenPort")
	if port != 0 {
		b.whisperListenPort = port
	}

	// The option reads the bridge fields when it is applied.
	apply := func(nodeCfg *params.NodeConfig) error {
		nodeCfg.ListenAddr = fmt.Sprintf("%s:%d", b.whisperListenAddr, b.whisperListenPort)
		return nil
	}
	return apply
}