mirror of
https://github.com/status-im/go-waku.git
synced 2025-01-15 08:14:33 +00:00
536 lines
14 KiB
Go
536 lines
14 KiB
Go
package main
|
|
|
|
import (
|
|
"chat2/pb"
|
|
"context"
|
|
"encoding/hex"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/multiformats/go-multiaddr"
|
|
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
|
|
"github.com/waku-org/go-waku/waku/v2/node"
|
|
"github.com/waku-org/go-waku/waku/v2/payload"
|
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
|
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
|
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
|
|
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
|
|
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
|
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
|
wrln "github.com/waku-org/go-waku/waku/v2/protocol/rln"
|
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
// Chat represents a subscription to a single PubSub topic. Messages
|
|
// can be published to the topic with Chat.Publish, and received
|
|
// messages are pushed to the Messages channel.
|
|
type Chat struct {
|
|
ctx context.Context
|
|
wg sync.WaitGroup
|
|
node *node.WakuNode
|
|
ui UI
|
|
uiReady chan struct{}
|
|
inputChan chan string
|
|
options Options
|
|
|
|
C chan *protocol.Envelope
|
|
|
|
nick string
|
|
}
|
|
|
|
func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.PeerConnection, options Options) *Chat {
|
|
chat := &Chat{
|
|
ctx: ctx,
|
|
node: node,
|
|
options: options,
|
|
nick: options.Nickname,
|
|
uiReady: make(chan struct{}, 1),
|
|
inputChan: make(chan string, 100),
|
|
}
|
|
|
|
chat.ui = NewUIModel(chat.uiReady, chat.inputChan)
|
|
|
|
topics := options.Relay.Topics.Value()
|
|
if len(topics) == 0 {
|
|
topics = append(topics, relay.DefaultWakuTopic)
|
|
}
|
|
|
|
if options.Filter.Enable {
|
|
cf := protocol.ContentFilter{
|
|
PubsubTopic: relay.DefaultWakuTopic,
|
|
ContentTopics: protocol.NewContentTopicSet(options.ContentTopic),
|
|
}
|
|
var filterOpt filter.FilterSubscribeOption
|
|
peerID, err := options.Filter.NodePeerID()
|
|
if err != nil {
|
|
filterOpt = filter.WithAutomaticPeerSelection()
|
|
} else {
|
|
filterOpt = filter.WithPeer(peerID)
|
|
chat.ui.InfoMessage(fmt.Sprintf("Subscribing to filter node %s", peerID))
|
|
}
|
|
theFilters, err := node.FilterLightnode().Subscribe(ctx, cf, filterOpt)
|
|
if err != nil {
|
|
chat.ui.ErrorMessage(err)
|
|
} else {
|
|
chat.C = theFilters[0].C //Picking first subscription since there is only 1 contentTopic specified.
|
|
}
|
|
} else {
|
|
|
|
for _, topic := range topics {
|
|
sub, err := node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic))
|
|
if err != nil {
|
|
chat.ui.ErrorMessage(err)
|
|
} else {
|
|
chat.C = make(chan *protocol.Envelope)
|
|
go func() {
|
|
for e := range sub[0].Ch {
|
|
chat.C <- e
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
}
|
|
|
|
chat.wg.Add(7)
|
|
go chat.parseInput()
|
|
go chat.receiveMessages()
|
|
|
|
connectionWg := sync.WaitGroup{}
|
|
connectionWg.Add(2)
|
|
|
|
go chat.welcomeMessage()
|
|
|
|
go chat.connectionWatcher(&connectionWg, connNotifier)
|
|
go chat.staticNodes(&connectionWg)
|
|
go chat.discoverNodes(&connectionWg)
|
|
go chat.retrieveHistory(&connectionWg)
|
|
|
|
return chat
|
|
}
|
|
|
|
func (c *Chat) Stop() {
|
|
c.wg.Wait()
|
|
close(c.inputChan)
|
|
}
|
|
|
|
func (c *Chat) connectionWatcher(connectionWg *sync.WaitGroup, connNotifier <-chan node.PeerConnection) {
|
|
defer c.wg.Done()
|
|
|
|
for conn := range connNotifier {
|
|
if conn.Connected {
|
|
c.ui.InfoMessage(fmt.Sprintf("Peer %s connected", conn.PeerID.String()))
|
|
} else {
|
|
c.ui.InfoMessage(fmt.Sprintf("Peer %s disconnected", conn.PeerID.String()))
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Chat) receiveMessages() {
|
|
defer c.wg.Done()
|
|
for {
|
|
select {
|
|
case <-c.ctx.Done():
|
|
return
|
|
case value := <-c.C:
|
|
|
|
msgContentTopic := value.Message().ContentTopic
|
|
if msgContentTopic != c.options.ContentTopic {
|
|
continue // Discard messages from other topics
|
|
}
|
|
|
|
msg, err := decodeMessage(c.options.ContentTopic, value.Message())
|
|
if err == nil {
|
|
// send valid messages to the UI
|
|
c.ui.ChatMessage(int64(msg.Timestamp), msg.Nick, string(msg.Payload))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
func (c *Chat) parseInput() {
|
|
defer c.wg.Done()
|
|
for {
|
|
select {
|
|
case <-c.ctx.Done():
|
|
return
|
|
case line := <-c.inputChan:
|
|
c.ui.SetSending(true)
|
|
go func() {
|
|
defer c.ui.SetSending(false)
|
|
|
|
// bail if requested
|
|
if line == "/exit" {
|
|
c.ui.Quit()
|
|
fmt.Println("Bye!")
|
|
return
|
|
}
|
|
|
|
// add peer
|
|
if strings.HasPrefix(line, "/connect") {
|
|
peer := strings.TrimPrefix(line, "/connect ")
|
|
c.wg.Add(1)
|
|
go func(peer string) {
|
|
defer c.wg.Done()
|
|
|
|
ma, err := multiaddr.NewMultiaddr(peer)
|
|
if err != nil {
|
|
c.ui.ErrorMessage(err)
|
|
return
|
|
}
|
|
|
|
peerID, err := ma.ValueForProtocol(multiaddr.P_P2P)
|
|
if err != nil {
|
|
c.ui.ErrorMessage(err)
|
|
return
|
|
}
|
|
|
|
c.ui.InfoMessage(fmt.Sprintf("Connecting to peer: %s", peerID))
|
|
ctx, cancel := context.WithTimeout(c.ctx, time.Duration(10)*time.Second)
|
|
defer cancel()
|
|
|
|
err = c.node.DialPeerWithMultiAddress(ctx, ma)
|
|
if err != nil {
|
|
c.ui.ErrorMessage(err)
|
|
}
|
|
}(peer)
|
|
return
|
|
}
|
|
|
|
// list peers
|
|
if line == "/peers" {
|
|
peers := c.node.Host().Network().Peers()
|
|
if len(peers) == 0 {
|
|
c.ui.InfoMessage("No peers available")
|
|
} else {
|
|
peerInfoMsg := "Peers: \n"
|
|
for _, p := range peers {
|
|
peerInfo := c.node.Host().Peerstore().PeerInfo(p)
|
|
peerProtocols, err := c.node.Host().Peerstore().GetProtocols(p)
|
|
if err != nil {
|
|
c.ui.ErrorMessage(err)
|
|
return
|
|
}
|
|
peerInfoMsg += fmt.Sprintf("• %s:\n", p.String())
|
|
|
|
var strProtocols []string
|
|
for _, p := range peerProtocols {
|
|
strProtocols = append(strProtocols, string(p))
|
|
}
|
|
|
|
peerInfoMsg += fmt.Sprintf(" Protocols: %s\n", strings.Join(strProtocols, ", "))
|
|
peerInfoMsg += " Addresses:\n"
|
|
for _, addr := range peerInfo.Addrs {
|
|
peerInfoMsg += fmt.Sprintf(" - %s/p2p/%s\n", addr.String(), p.String())
|
|
}
|
|
}
|
|
c.ui.InfoMessage(peerInfoMsg)
|
|
}
|
|
return
|
|
}
|
|
|
|
// change nick
|
|
if strings.HasPrefix(line, "/nick") {
|
|
newNick := strings.TrimSpace(strings.TrimPrefix(line, "/nick "))
|
|
if newNick != "" {
|
|
c.nick = newNick
|
|
} else {
|
|
c.ui.ErrorMessage(errors.New("invalid nickname"))
|
|
}
|
|
return
|
|
}
|
|
|
|
if line == "/help" {
|
|
c.ui.InfoMessage(`Available commands:
|
|
/connect multiaddress - dials a node adding it to the list of connected peers
|
|
/peers - list of peers connected to this node
|
|
/nick newNick - change the user's nickname
|
|
/exit - closes the app`)
|
|
return
|
|
}
|
|
|
|
c.SendMessage(line)
|
|
}()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Chat) SendMessage(line string) {
|
|
tCtx, cancel := context.WithTimeout(c.ctx, 3*time.Second)
|
|
defer func() {
|
|
cancel()
|
|
}()
|
|
|
|
err := c.publish(tCtx, line)
|
|
if err != nil {
|
|
if err.Error() == "validation failed" {
|
|
err = errors.New("message rate violation")
|
|
}
|
|
c.ui.ErrorMessage(err)
|
|
}
|
|
}
|
|
|
|
func (c *Chat) publish(ctx context.Context, message string) error {
|
|
msg := &pb.Chat2Message{
|
|
Timestamp: uint64(c.node.Timesource().Now().Unix()),
|
|
Nick: c.nick,
|
|
Payload: []byte(message),
|
|
}
|
|
|
|
msgBytes, err := proto.Marshal(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
version := uint32(0)
|
|
timestamp := utils.GetUnixEpochFrom(c.node.Timesource().Now())
|
|
keyInfo := &payload.KeyInfo{
|
|
Kind: payload.None,
|
|
}
|
|
|
|
p := new(payload.Payload)
|
|
p.Data = msgBytes
|
|
p.Key = keyInfo
|
|
|
|
payload, err := p.Encode(version)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
wakuMsg := &wpb.WakuMessage{
|
|
Payload: payload,
|
|
Version: proto.Uint32(version),
|
|
ContentTopic: options.ContentTopic,
|
|
Timestamp: timestamp,
|
|
}
|
|
|
|
if c.options.RLNRelay.Enable {
|
|
// for future version when we support more than one rln protected content topic,
|
|
// we should check the message content topic as well
|
|
err = c.node.RLNRelay().AppendRLNProof(wakuMsg, c.node.Timesource().Now())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
rateLimitProof, err := wrln.BytesToRateLimitProof(wakuMsg.RateLimitProof)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
c.ui.InfoMessage(fmt.Sprintf("RLN Epoch: %d", rateLimitProof.Epoch.Uint64()))
|
|
}
|
|
|
|
if c.options.LightPush.Enable {
|
|
lightOpt := []lightpush.RequestOption{lightpush.WithDefaultPubsubTopic()}
|
|
var peerID peer.ID
|
|
peerID, err = options.LightPush.NodePeerID()
|
|
if err != nil {
|
|
lightOpt = append(lightOpt, lightpush.WithAutomaticPeerSelection())
|
|
} else {
|
|
lightOpt = append(lightOpt, lightpush.WithPeer(peerID))
|
|
}
|
|
|
|
_, err = c.node.Lightpush().Publish(c.ctx, wakuMsg, lightOpt...)
|
|
} else {
|
|
_, err = c.node.Relay().Publish(ctx, wakuMsg, relay.WithDefaultPubsubTopic())
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func decodeMessage(contentTopic string, wakumsg *wpb.WakuMessage) (*pb.Chat2Message, error) {
|
|
keyInfo := &payload.KeyInfo{
|
|
Kind: payload.None,
|
|
}
|
|
|
|
payload, err := payload.DecodePayload(wakumsg, keyInfo)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
msg := &pb.Chat2Message{}
|
|
if err := proto.Unmarshal(payload.Data, msg); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return msg, nil
|
|
}
|
|
|
|
func (c *Chat) retrieveHistory(connectionWg *sync.WaitGroup) {
|
|
defer c.wg.Done()
|
|
|
|
connectionWg.Wait() // Wait until node connection operations are done
|
|
|
|
if !c.options.Store.Enable {
|
|
return
|
|
}
|
|
|
|
var storeOpt legacy_store.HistoryRequestOption
|
|
if c.options.Store.Node == nil {
|
|
c.ui.InfoMessage("No store node configured. Choosing one at random...")
|
|
storeOpt = legacy_store.WithAutomaticPeerSelection()
|
|
} else {
|
|
peerID, err := (*c.options.Store.Node).ValueForProtocol(multiaddr.P_P2P)
|
|
if err != nil {
|
|
c.ui.ErrorMessage(err)
|
|
return
|
|
}
|
|
pID, err := peer.Decode(peerID)
|
|
if err != nil {
|
|
c.ui.ErrorMessage(err)
|
|
return
|
|
}
|
|
storeOpt = legacy_store.WithPeer(pID)
|
|
c.ui.InfoMessage(fmt.Sprintf("Querying historic messages from %s", peerID))
|
|
|
|
}
|
|
|
|
tCtx, cancel := context.WithTimeout(c.ctx, 10*time.Second)
|
|
defer cancel()
|
|
|
|
q := legacy_store.Query{
|
|
ContentTopics: []string{options.ContentTopic},
|
|
}
|
|
|
|
response, err := c.node.LegacyStore().Query(tCtx, q,
|
|
legacy_store.WithAutomaticRequestID(),
|
|
storeOpt,
|
|
legacy_store.WithPaging(false, 100))
|
|
|
|
if err != nil {
|
|
c.ui.ErrorMessage(fmt.Errorf("could not query storenode: %w", err))
|
|
} else {
|
|
if len(response.Messages) == 0 {
|
|
c.ui.InfoMessage("0 historic messages available")
|
|
} else {
|
|
for _, msg := range response.Messages {
|
|
c.C <- protocol.NewEnvelope(msg, msg.GetTimestamp(), relay.DefaultWakuTopic)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Chat) staticNodes(connectionWg *sync.WaitGroup) {
|
|
defer c.wg.Done()
|
|
defer connectionWg.Done()
|
|
|
|
<-c.uiReady // wait until UI is ready
|
|
|
|
wg := sync.WaitGroup{}
|
|
|
|
wg.Add(len(options.StaticNodes))
|
|
for _, n := range options.StaticNodes {
|
|
go func(addr multiaddr.Multiaddr) {
|
|
defer wg.Done()
|
|
ctx, cancel := context.WithTimeout(c.ctx, time.Duration(10)*time.Second)
|
|
defer cancel()
|
|
|
|
c.ui.InfoMessage(fmt.Sprintf("Connecting to %s", addr.String()))
|
|
|
|
err := c.node.DialPeerWithMultiAddress(ctx, addr)
|
|
if err != nil {
|
|
c.ui.ErrorMessage(err)
|
|
}
|
|
}(n)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
}
|
|
|
|
func (c *Chat) welcomeMessage() {
|
|
defer c.wg.Done()
|
|
|
|
<-c.uiReady // wait until UI is ready
|
|
|
|
c.ui.InfoMessage("Welcome, " + c.nick + "!")
|
|
c.ui.InfoMessage("type /help to see available commands \n")
|
|
|
|
addrMessage := "Listening on:\n"
|
|
for _, addr := range c.node.ListenAddresses() {
|
|
addrMessage += " -" + addr.String() + "\n"
|
|
}
|
|
c.ui.InfoMessage(addrMessage)
|
|
|
|
if !c.options.RLNRelay.Enable {
|
|
return
|
|
}
|
|
|
|
credential, err := c.node.RLNRelay().IdentityCredential()
|
|
if err != nil {
|
|
c.ui.Quit()
|
|
fmt.Println(err.Error())
|
|
}
|
|
|
|
idx := c.node.RLNRelay().MembershipIndex()
|
|
|
|
idTrapdoor := credential.IDTrapdoor
|
|
idNullifier := credential.IDSecretHash
|
|
idSecretHash := credential.IDSecretHash
|
|
idCommitment := credential.IDCommitment
|
|
|
|
rlnMessage := "RLN config:\n"
|
|
rlnMessage += fmt.Sprintf("- Your membership index is: %d\n", idx)
|
|
rlnMessage += fmt.Sprintf("- Your rln identity trapdoor is: 0x%s\n", hex.EncodeToString(idTrapdoor[:]))
|
|
rlnMessage += fmt.Sprintf("- Your rln identity nullifier is: 0x%s\n", hex.EncodeToString(idNullifier[:]))
|
|
rlnMessage += fmt.Sprintf("- Your rln identity secret hash is: 0x%s\n", hex.EncodeToString(idSecretHash[:]))
|
|
rlnMessage += fmt.Sprintf("- Your rln identity commitment key is: 0x%s\n", hex.EncodeToString(idCommitment[:]))
|
|
|
|
c.ui.InfoMessage(rlnMessage)
|
|
}
|
|
|
|
func (c *Chat) discoverNodes(connectionWg *sync.WaitGroup) {
|
|
defer c.wg.Done()
|
|
defer connectionWg.Done()
|
|
|
|
<-c.uiReady // wait until UI is ready
|
|
|
|
var dnsDiscoveryUrl string
|
|
if options.Fleet != fleetNone {
|
|
if options.Fleet == fleetTest {
|
|
dnsDiscoveryUrl = "enrtree://AOGYWMBYOUIMOENHXCHILPKY3ZRFEULMFI4DOM442QSZ73TT2A7VI@test.waku.nodes.status.im"
|
|
} else {
|
|
// Connect to prod by default
|
|
dnsDiscoveryUrl = "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im"
|
|
}
|
|
}
|
|
|
|
if options.DNSDiscovery.Enable && options.DNSDiscovery.URL != "" {
|
|
dnsDiscoveryUrl = options.DNSDiscovery.URL
|
|
}
|
|
|
|
if dnsDiscoveryUrl != "" {
|
|
c.ui.InfoMessage(fmt.Sprintf("attempting DNS discovery with %s", dnsDiscoveryUrl))
|
|
nodes, err := dnsdisc.RetrieveNodes(c.ctx, dnsDiscoveryUrl, dnsdisc.WithNameserver(options.DNSDiscovery.Nameserver))
|
|
if err != nil {
|
|
c.ui.ErrorMessage(errors.New(err.Error()))
|
|
} else {
|
|
var nodeList []peer.AddrInfo
|
|
for _, n := range nodes {
|
|
nodeList = append(nodeList, n.PeerInfo)
|
|
}
|
|
c.ui.InfoMessage(fmt.Sprintf("Discovered and connecting to %v ", nodeList))
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(len(nodeList))
|
|
for _, n := range nodeList {
|
|
go func(ctx context.Context, info peer.AddrInfo) {
|
|
defer wg.Done()
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(10)*time.Second)
|
|
defer cancel()
|
|
err = c.node.DialPeerWithInfo(ctx, info)
|
|
if err != nil {
|
|
|
|
c.ui.ErrorMessage(fmt.Errorf("co!!uld not connect to %s: %w", info.ID.String(), err))
|
|
}
|
|
}(c.ctx, n)
|
|
|
|
}
|
|
wg.Wait()
|
|
}
|
|
}
|
|
}
|