mirror of https://github.com/status-im/go-waku.git
594 lines
16 KiB
Go
594 lines
16 KiB
Go
package main
|
|
|
|
import (
|
|
"chat2-reliable/pb"
|
|
"context"
|
|
"encoding/hex"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/multiformats/go-multiaddr"
|
|
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
|
|
"github.com/waku-org/go-waku/waku/v2/node"
|
|
"github.com/waku-org/go-waku/waku/v2/payload"
|
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
|
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
|
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
|
|
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
|
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
|
wrln "github.com/waku-org/go-waku/waku/v2/protocol/rln"
|
|
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
// maxMessageHistory is a size limit for the chat's message history. It is not
// referenced in this part of the file; presumably it caps Chat.messageHistory
// (TODO confirm against the code that appends to the history).
const maxMessageHistory = 100
|
|
|
|
type Chat struct {
|
|
ctx context.Context
|
|
wg sync.WaitGroup
|
|
node *node.WakuNode
|
|
ui UI
|
|
uiReady chan struct{}
|
|
inputChan chan string
|
|
options Options
|
|
C chan *protocol.Envelope
|
|
nick string
|
|
lamportTimestamp int32
|
|
bloomFilter *RollingBloomFilter
|
|
outgoingBuffer []UnacknowledgedMessage
|
|
incomingBuffer []*pb.Message
|
|
messageHistory []*pb.Message
|
|
mutex sync.Mutex
|
|
lamportTSMutex sync.Mutex
|
|
}
|
|
|
|
func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.PeerConnection, options Options) *Chat {
|
|
chat := &Chat{
|
|
ctx: ctx,
|
|
node: node,
|
|
options: options,
|
|
nick: options.Nickname,
|
|
uiReady: make(chan struct{}, 1),
|
|
inputChan: make(chan string, 100),
|
|
lamportTimestamp: 0,
|
|
bloomFilter: NewRollingBloomFilter(),
|
|
outgoingBuffer: make([]UnacknowledgedMessage, 0),
|
|
incomingBuffer: make([]*pb.Message, 0),
|
|
messageHistory: make([]*pb.Message, 0),
|
|
mutex: sync.Mutex{},
|
|
lamportTSMutex: sync.Mutex{},
|
|
}
|
|
|
|
chat.ui = NewUIModel(chat.uiReady, chat.inputChan)
|
|
|
|
topics := options.Relay.Topics.Value()
|
|
if len(topics) == 0 {
|
|
topics = append(topics, relay.DefaultWakuTopic)
|
|
}
|
|
|
|
if options.Filter.Enable {
|
|
cf := protocol.ContentFilter{
|
|
PubsubTopic: relay.DefaultWakuTopic,
|
|
ContentTopics: protocol.NewContentTopicSet(options.ContentTopic),
|
|
}
|
|
var filterOpt filter.FilterSubscribeOption
|
|
peerID, err := options.Filter.NodePeerID()
|
|
if err != nil {
|
|
filterOpt = filter.WithAutomaticPeerSelection()
|
|
} else {
|
|
filterOpt = filter.WithPeer(peerID)
|
|
chat.ui.InfoMessage(fmt.Sprintf("Subscribing to filter node %s", peerID))
|
|
}
|
|
theFilters, err := node.FilterLightnode().Subscribe(ctx, cf, filterOpt)
|
|
if err != nil {
|
|
chat.ui.ErrorMessage(err)
|
|
} else {
|
|
chat.C = theFilters[0].C // Picking first subscription since there is only 1 contentTopic specified.
|
|
}
|
|
} else {
|
|
for _, topic := range topics {
|
|
sub, err := node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic))
|
|
if err != nil {
|
|
chat.ui.ErrorMessage(err)
|
|
} else {
|
|
chat.C = make(chan *protocol.Envelope)
|
|
go func() {
|
|
for e := range sub[0].Ch {
|
|
chat.C <- e
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
}
|
|
|
|
connWg := sync.WaitGroup{}
|
|
connWg.Add(2)
|
|
|
|
chat.wg.Add(7) // Added 2 more goroutines for periodic tasks
|
|
go chat.parseInput()
|
|
go chat.receiveMessages()
|
|
go chat.welcomeMessage()
|
|
go chat.connectionWatcher(connNotifier)
|
|
go chat.staticNodes(&connWg)
|
|
go chat.discoverNodes(&connWg)
|
|
go chat.retrieveHistory(&connWg)
|
|
|
|
chat.initReliabilityProtocol() // Initialize the reliability protocol
|
|
|
|
return chat
|
|
}
|
|
|
|
func (c *Chat) Stop() {
|
|
c.wg.Wait()
|
|
close(c.inputChan)
|
|
}
|
|
|
|
func (c *Chat) connectionWatcher(connNotifier <-chan node.PeerConnection) {
|
|
defer c.wg.Done()
|
|
for {
|
|
select {
|
|
case conn := <-connNotifier:
|
|
if conn.Connected {
|
|
c.ui.InfoMessage(fmt.Sprintf("Peer %s connected", conn.PeerID.String()))
|
|
} else {
|
|
c.ui.InfoMessage(fmt.Sprintf("Peer %s disconnected", conn.PeerID.String()))
|
|
}
|
|
case <-c.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Chat) receiveMessages() {
|
|
defer c.wg.Done()
|
|
for {
|
|
select {
|
|
case <-c.ctx.Done():
|
|
return
|
|
case value := <-c.C:
|
|
msgContentTopic := value.Message().ContentTopic
|
|
if msgContentTopic != c.options.ContentTopic {
|
|
continue // Discard messages from other topics
|
|
}
|
|
|
|
msg, err := decodeMessage(c.options.ContentTopic, value.Message())
|
|
if err != nil {
|
|
fmt.Printf("Error decoding message: %v\n", err)
|
|
continue
|
|
}
|
|
|
|
c.processReceivedMessage(msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Chat) parseInput() {
|
|
defer c.wg.Done()
|
|
|
|
var disconnectedPeers []peer.ID
|
|
|
|
for {
|
|
select {
|
|
case <-c.ctx.Done():
|
|
return
|
|
case line := <-c.inputChan:
|
|
c.ui.SetSending(true)
|
|
go func() {
|
|
defer c.ui.SetSending(false)
|
|
|
|
// bail if requested
|
|
if line == "/exit" {
|
|
c.ui.Quit()
|
|
fmt.Println("Bye!")
|
|
return
|
|
}
|
|
|
|
// add peer
|
|
if strings.HasPrefix(line, "/connect") {
|
|
peer := strings.TrimPrefix(line, "/connect ")
|
|
c.wg.Add(1)
|
|
go func(peer string) {
|
|
defer c.wg.Done()
|
|
|
|
ma, err := multiaddr.NewMultiaddr(peer)
|
|
if err != nil {
|
|
c.ui.ErrorMessage(err)
|
|
return
|
|
}
|
|
|
|
peerID, err := ma.ValueForProtocol(multiaddr.P_P2P)
|
|
if err != nil {
|
|
c.ui.ErrorMessage(err)
|
|
return
|
|
}
|
|
|
|
c.ui.InfoMessage(fmt.Sprintf("Connecting to peer: %s", peerID))
|
|
ctx, cancel := context.WithTimeout(c.ctx, time.Duration(10)*time.Second)
|
|
defer cancel()
|
|
|
|
err = c.node.DialPeerWithMultiAddress(ctx, ma)
|
|
if err != nil {
|
|
c.ui.ErrorMessage(err)
|
|
}
|
|
}(peer)
|
|
return
|
|
}
|
|
|
|
// list peers
|
|
if line == "/peers" {
|
|
peers := c.node.Host().Network().Peers()
|
|
if len(peers) == 0 {
|
|
c.ui.InfoMessage("No peers available")
|
|
} else {
|
|
peerInfoMsg := "Peers: \n"
|
|
for _, p := range peers {
|
|
peerInfo := c.node.Host().Peerstore().PeerInfo(p)
|
|
peerProtocols, err := c.node.Host().Peerstore().GetProtocols(p)
|
|
if err != nil {
|
|
c.ui.ErrorMessage(err)
|
|
return
|
|
}
|
|
peerInfoMsg += fmt.Sprintf("• %s:\n", p.String())
|
|
|
|
var strProtocols []string
|
|
for _, p := range peerProtocols {
|
|
strProtocols = append(strProtocols, string(p))
|
|
}
|
|
|
|
peerInfoMsg += fmt.Sprintf(" Protocols: %s\n", strings.Join(strProtocols, ", "))
|
|
peerInfoMsg += " Addresses:\n"
|
|
for _, addr := range peerInfo.Addrs {
|
|
peerInfoMsg += fmt.Sprintf(" - %s/p2p/%s\n", addr.String(), p.String())
|
|
}
|
|
}
|
|
c.ui.InfoMessage(peerInfoMsg)
|
|
}
|
|
return
|
|
}
|
|
|
|
// change nick
|
|
if strings.HasPrefix(line, "/nick") {
|
|
newNick := strings.TrimSpace(strings.TrimPrefix(line, "/nick "))
|
|
if newNick != "" {
|
|
c.nick = newNick
|
|
} else {
|
|
c.ui.ErrorMessage(errors.New("invalid nickname"))
|
|
}
|
|
return
|
|
}
|
|
|
|
if line == "/help" {
|
|
c.ui.InfoMessage(`Available commands:
|
|
/connect multiaddress - dials a node adding it to the list of connected peers
|
|
/peers - list of peers connected to this node
|
|
/nick newNick - change the user's nickname
|
|
/disconnect - disconnect from all currently connected peers
|
|
/reconnect - attempt to reconnect to previously disconnected peers
|
|
/exit - closes the app`)
|
|
return
|
|
}
|
|
|
|
// Disconnect from peers
|
|
if line == "/disconnect" {
|
|
disconnectedPeers = c.disconnectFromPeers()
|
|
c.ui.InfoMessage("Disconnected from all peers. Use /reconnect to reconnect.")
|
|
return
|
|
}
|
|
|
|
// Reconnect to peers
|
|
if line == "/reconnect" {
|
|
if len(disconnectedPeers) == 0 {
|
|
c.ui.InfoMessage("No disconnection active. Use /disconnect first.")
|
|
} else {
|
|
c.reconnectToPeers(disconnectedPeers)
|
|
disconnectedPeers = nil
|
|
c.ui.InfoMessage("Reconnection initiated.")
|
|
}
|
|
return
|
|
}
|
|
|
|
// If no command matched, send as a regular message
|
|
c.SendMessage(line)
|
|
}()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Chat) publish(ctx context.Context, message *pb.Message) error {
|
|
msgBytes, err := proto.Marshal(message)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
version := uint32(0)
|
|
timestamp := utils.GetUnixEpochFrom(c.node.Timesource().Now())
|
|
keyInfo := &payload.KeyInfo{
|
|
Kind: payload.None,
|
|
}
|
|
|
|
p := new(payload.Payload)
|
|
p.Data = msgBytes
|
|
p.Key = keyInfo
|
|
|
|
payload, err := p.Encode(version)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
wakuMsg := &wpb.WakuMessage{
|
|
Payload: payload,
|
|
Version: proto.Uint32(version),
|
|
ContentTopic: c.options.ContentTopic,
|
|
Timestamp: timestamp,
|
|
}
|
|
|
|
if c.options.RLNRelay.Enable {
|
|
err = c.node.RLNRelay().AppendRLNProof(wakuMsg, c.node.Timesource().Now())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
rateLimitProof, err := wrln.BytesToRateLimitProof(wakuMsg.RateLimitProof)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
c.ui.InfoMessage(fmt.Sprintf("RLN Epoch: %d", rateLimitProof.Epoch.Uint64()))
|
|
}
|
|
|
|
if c.options.LightPush.Enable {
|
|
lightOpt := []lightpush.RequestOption{lightpush.WithDefaultPubsubTopic()}
|
|
var peerID peer.ID
|
|
peerID, err = c.options.LightPush.NodePeerID()
|
|
if err != nil {
|
|
lightOpt = append(lightOpt, lightpush.WithAutomaticPeerSelection())
|
|
} else {
|
|
lightOpt = append(lightOpt, lightpush.WithPeer(peerID))
|
|
}
|
|
|
|
_, err = c.node.Lightpush().Publish(ctx, wakuMsg, lightOpt...)
|
|
} else {
|
|
_, err = c.node.Relay().Publish(ctx, wakuMsg, relay.WithDefaultPubsubTopic())
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func decodeMessage(contentTopic string, wakumsg *wpb.WakuMessage) (*pb.Message, error) {
|
|
keyInfo := &payload.KeyInfo{
|
|
Kind: payload.None,
|
|
}
|
|
|
|
payload, err := payload.DecodePayload(wakumsg, keyInfo)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
msg := &pb.Message{}
|
|
if err := proto.Unmarshal(payload.Data, msg); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return msg, nil
|
|
}
|
|
|
|
func (c *Chat) retrieveHistory(connectionWg *sync.WaitGroup) {
|
|
defer c.wg.Done()
|
|
|
|
connectionWg.Wait() // Wait until node connection operations are
|
|
|
|
if !c.options.Store.Enable {
|
|
return
|
|
}
|
|
|
|
var storeOpt store.RequestOption
|
|
if c.options.Store.Node == nil {
|
|
c.ui.InfoMessage("No store node configured. Choosing one at random...")
|
|
storeOpt = store.WithAutomaticPeerSelection()
|
|
} else {
|
|
pID, err := c.getStoreNodePID()
|
|
if err != nil {
|
|
c.ui.ErrorMessage(err)
|
|
return
|
|
}
|
|
storeOpt = store.WithPeer(*pID)
|
|
c.ui.InfoMessage(fmt.Sprintf("Querying historic messages from %s", pID.String()))
|
|
}
|
|
|
|
tCtx, cancel := context.WithTimeout(c.ctx, 10*time.Second)
|
|
defer cancel()
|
|
|
|
q := store.FilterCriteria{
|
|
ContentFilter: protocol.NewContentFilter(relay.DefaultWakuTopic, c.options.ContentTopic),
|
|
}
|
|
|
|
response, err := c.node.Store().Request(tCtx, q,
|
|
store.WithAutomaticRequestID(),
|
|
storeOpt,
|
|
store.WithPaging(false, 100))
|
|
if err != nil {
|
|
c.ui.ErrorMessage(fmt.Errorf("could not query storenode: %w", err))
|
|
} else {
|
|
if len(response.Messages()) == 0 {
|
|
c.ui.InfoMessage("0 historic messages available")
|
|
} else {
|
|
for _, msg := range response.Messages() {
|
|
c.C <- protocol.NewEnvelope(msg.Message, msg.Message.GetTimestamp(), relay.DefaultWakuTopic)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Chat) staticNodes(connectionWg *sync.WaitGroup) {
|
|
defer c.wg.Done()
|
|
defer connectionWg.Done()
|
|
|
|
<-c.uiReady // wait until UI is ready
|
|
|
|
wg := sync.WaitGroup{}
|
|
|
|
wg.Add(len(c.options.StaticNodes))
|
|
for _, n := range c.options.StaticNodes {
|
|
go func(addr multiaddr.Multiaddr) {
|
|
defer wg.Done()
|
|
ctx, cancel := context.WithTimeout(c.ctx, time.Duration(10)*time.Second)
|
|
defer cancel()
|
|
|
|
c.ui.InfoMessage(fmt.Sprintf("Connecting to %s", addr.String()))
|
|
|
|
err := c.node.DialPeerWithMultiAddress(ctx, addr)
|
|
if err != nil {
|
|
c.ui.ErrorMessage(err)
|
|
}
|
|
}(n)
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
func (c *Chat) welcomeMessage() {
|
|
defer c.wg.Done()
|
|
|
|
<-c.uiReady // wait until UI is ready
|
|
|
|
c.ui.InfoMessage("Welcome, " + c.nick + "!")
|
|
c.ui.InfoMessage("type /help to see available commands \n")
|
|
|
|
addrMessage := "Listening on:\n"
|
|
for _, addr := range c.node.ListenAddresses() {
|
|
addrMessage += " -" + addr.String() + "\n"
|
|
}
|
|
c.ui.InfoMessage(addrMessage)
|
|
|
|
if !c.options.RLNRelay.Enable {
|
|
return
|
|
}
|
|
|
|
credential, err := c.node.RLNRelay().IdentityCredential()
|
|
if err != nil {
|
|
c.ui.Quit()
|
|
}
|
|
|
|
idx := c.node.RLNRelay().MembershipIndex()
|
|
|
|
idTrapdoor := credential.IDTrapdoor
|
|
idNullifier := credential.IDSecretHash
|
|
idSecretHash := credential.IDSecretHash
|
|
idCommitment := credential.IDCommitment
|
|
|
|
rlnMessage := "RLN config:\n"
|
|
rlnMessage += fmt.Sprintf("- Your membership index is: %d\n", idx)
|
|
rlnMessage += fmt.Sprintf("- Your rln identity trapdoor is: 0x%s\n", hex.EncodeToString(idTrapdoor[:]))
|
|
rlnMessage += fmt.Sprintf("- Your rln identity nullifier is: 0x%s\n", hex.EncodeToString(idNullifier[:]))
|
|
rlnMessage += fmt.Sprintf("- Your rln identity secret hash is: 0x%s\n", hex.EncodeToString(idSecretHash[:]))
|
|
rlnMessage += fmt.Sprintf("- Your rln identity commitment key is: 0x%s\n", hex.EncodeToString(idCommitment[:]))
|
|
|
|
c.ui.InfoMessage(rlnMessage)
|
|
}
|
|
|
|
func (c *Chat) discoverNodes(connectionWg *sync.WaitGroup) {
|
|
defer c.wg.Done()
|
|
defer connectionWg.Done()
|
|
|
|
<-c.uiReady // wait until UI is ready
|
|
|
|
var dnsDiscoveryUrl string
|
|
if c.options.DNSDiscovery.Enable {
|
|
if c.options.Fleet != fleetNone {
|
|
if c.options.Fleet == fleetTest {
|
|
dnsDiscoveryUrl = "enrtree://AOGYWMBYOUIMOENHXCHILPKY3ZRFEULMFI4DOM442QSZ73TT2A7VI@test.waku.nodes.status.im"
|
|
} else {
|
|
// Connect to prod by default
|
|
dnsDiscoveryUrl = "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im"
|
|
}
|
|
}
|
|
|
|
if c.options.DNSDiscovery.URL != "" {
|
|
dnsDiscoveryUrl = c.options.DNSDiscovery.URL
|
|
}
|
|
}
|
|
|
|
if dnsDiscoveryUrl != "" {
|
|
c.ui.InfoMessage(fmt.Sprintf("attempting DNS discovery with %s", dnsDiscoveryUrl))
|
|
nodes, err := dnsdisc.RetrieveNodes(c.ctx, dnsDiscoveryUrl, dnsdisc.WithNameserver(c.options.DNSDiscovery.Nameserver))
|
|
if err != nil {
|
|
c.ui.ErrorMessage(errors.New(err.Error()))
|
|
} else {
|
|
var nodeList []peer.AddrInfo
|
|
for _, n := range nodes {
|
|
nodeList = append(nodeList, n.PeerInfo)
|
|
}
|
|
c.ui.InfoMessage(fmt.Sprintf("Discovered and connecting to %v ", nodeList))
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(len(nodeList))
|
|
for _, n := range nodeList {
|
|
go func(ctx context.Context, info peer.AddrInfo) {
|
|
defer wg.Done()
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(20)*time.Second)
|
|
defer cancel()
|
|
err = c.node.DialPeerWithInfo(ctx, info)
|
|
if err != nil {
|
|
c.ui.ErrorMessage(fmt.Errorf("could not connect to %s: %w", info.ID.String(), err))
|
|
}
|
|
}(c.ctx, n)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Chat) disconnectFromPeers() []peer.ID {
|
|
disconnectedPeers := c.node.Host().Network().Peers()
|
|
for _, peerID := range disconnectedPeers {
|
|
c.node.Host().Network().ClosePeer(peerID)
|
|
}
|
|
return disconnectedPeers
|
|
}
|
|
|
|
func (c *Chat) reconnectToPeers(peers []peer.ID) {
|
|
for _, peerID := range peers {
|
|
// We're using a goroutine here to avoid blocking if a peer is unreachable
|
|
go func(p peer.ID) {
|
|
ctx, cancel := context.WithTimeout(c.ctx, 10*time.Second)
|
|
defer cancel()
|
|
if _, err := c.node.Host().Network().DialPeer(ctx, p); err != nil {
|
|
c.ui.ErrorMessage(fmt.Errorf("failed to reconnect to peer %s: %w", p, err))
|
|
} else {
|
|
c.ui.InfoMessage(fmt.Sprintf("Successfully reconnected to peer %s", p))
|
|
}
|
|
}(peerID)
|
|
}
|
|
}
|
|
|
|
func generateUniqueID() string {
|
|
return uuid.New().String()
|
|
}
|
|
|
|
func (c *Chat) getRecentMessageIDs(n int) []string {
|
|
c.mutex.Lock()
|
|
defer c.mutex.Unlock()
|
|
|
|
result := make([]string, 0, n)
|
|
for i := len(c.messageHistory) - 1; i >= 0 && len(result) < n; i-- {
|
|
result = append(result, c.messageHistory[i].MessageId)
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (c *Chat) getStoreNodePID() (*peer.ID, error) {
|
|
pID, err := utils.GetPeerID(*c.options.Store.Node)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &pID, nil
|
|
}
|