2021-04-28 20:10:44 +00:00
|
|
|
|
package relay
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"errors"
|
2021-11-01 14:42:55 +00:00
|
|
|
|
"fmt"
|
2021-04-28 20:10:44 +00:00
|
|
|
|
"sync"
|
2023-06-06 18:09:44 +00:00
|
|
|
|
"time"
|
2021-04-28 20:10:44 +00:00
|
|
|
|
|
2023-07-05 19:17:43 +00:00
|
|
|
|
"github.com/libp2p/go-libp2p/core/event"
|
2022-10-19 19:39:32 +00:00
|
|
|
|
"github.com/libp2p/go-libp2p/core/host"
|
2023-06-06 18:09:44 +00:00
|
|
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
2022-10-19 19:39:32 +00:00
|
|
|
|
"github.com/libp2p/go-libp2p/core/protocol"
|
2023-07-05 19:17:43 +00:00
|
|
|
|
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
|
2021-11-01 14:42:55 +00:00
|
|
|
|
"go.opencensus.io/stats"
|
|
|
|
|
"go.opencensus.io/tag"
|
2022-01-18 18:17:06 +00:00
|
|
|
|
"go.uber.org/zap"
|
2023-02-06 22:16:20 +00:00
|
|
|
|
proto "google.golang.org/protobuf/proto"
|
2021-04-28 20:10:44 +00:00
|
|
|
|
|
2021-11-01 14:42:55 +00:00
|
|
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
2021-07-29 22:08:53 +00:00
|
|
|
|
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
2022-11-03 13:53:33 +00:00
|
|
|
|
"github.com/waku-org/go-waku/logging"
|
2023-03-04 21:51:51 +00:00
|
|
|
|
"github.com/waku-org/go-waku/waku/v2/hash"
|
2022-11-09 19:53:01 +00:00
|
|
|
|
"github.com/waku-org/go-waku/waku/v2/metrics"
|
|
|
|
|
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
|
|
|
|
|
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
2022-12-09 03:08:04 +00:00
|
|
|
|
"github.com/waku-org/go-waku/waku/v2/timesource"
|
2021-04-28 20:10:44 +00:00
|
|
|
|
)
|
|
|
|
|
|
2023-07-19 16:25:35 +00:00
|
|
|
|
// WakuRelayID_v200 is the current protocol ID used for WakuRelay
|
2021-07-29 22:08:53 +00:00
|
|
|
|
const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0")
|
2021-11-10 14:28:45 +00:00
|
|
|
|
|
2023-07-19 16:25:35 +00:00
|
|
|
|
// DefaultWakuTopic is the default pubsub topic used across all Waku protocols
|
2021-11-19 16:19:48 +00:00
|
|
|
|
var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic().String()
|
2021-04-28 20:10:44 +00:00
|
|
|
|
|
2023-07-19 16:25:35 +00:00
|
|
|
|
// WakuRelay is the implementation of the Waku Relay protocol
|
2021-04-28 20:10:44 +00:00
|
|
|
|
type WakuRelay struct {
|
2023-06-07 20:18:37 +00:00
|
|
|
|
host host.Host
|
|
|
|
|
opts []pubsub.Option
|
|
|
|
|
pubsub *pubsub.PubSub
|
|
|
|
|
params pubsub.GossipSubParams
|
|
|
|
|
peerScoreParams *pubsub.PeerScoreParams
|
|
|
|
|
peerScoreThresholds *pubsub.PeerScoreThresholds
|
|
|
|
|
topicParams *pubsub.TopicScoreParams
|
|
|
|
|
timesource timesource.Timesource
|
2021-04-28 20:10:44 +00:00
|
|
|
|
|
2022-05-30 15:55:30 +00:00
|
|
|
|
log *zap.Logger
|
2022-01-18 18:17:06 +00:00
|
|
|
|
|
2023-05-05 09:49:15 +00:00
|
|
|
|
bcaster Broadcaster
|
2021-11-06 22:46:58 +00:00
|
|
|
|
|
2021-12-06 08:43:00 +00:00
|
|
|
|
minPeersToPublish int
|
|
|
|
|
|
2021-11-06 22:46:58 +00:00
|
|
|
|
// TODO: convert to concurrent maps
|
2021-04-28 20:10:44 +00:00
|
|
|
|
topicsMutex sync.Mutex
|
2021-11-19 16:19:48 +00:00
|
|
|
|
wakuRelayTopics map[string]*pubsub.Topic
|
|
|
|
|
relaySubs map[string]*pubsub.Subscription
|
2021-11-01 14:42:55 +00:00
|
|
|
|
|
2023-07-05 19:17:43 +00:00
|
|
|
|
events event.Bus
|
|
|
|
|
emitters struct {
|
|
|
|
|
EvtRelaySubscribed event.Emitter
|
|
|
|
|
EvtRelayUnsubscribed event.Emitter
|
|
|
|
|
}
|
|
|
|
|
|
2023-02-08 19:20:42 +00:00
|
|
|
|
ctx context.Context
|
2023-02-08 14:20:40 +00:00
|
|
|
|
cancel context.CancelFunc
|
|
|
|
|
wg sync.WaitGroup
|
2021-07-29 22:08:53 +00:00
|
|
|
|
}
|
|
|
|
|
|
2023-07-05 19:17:43 +00:00
|
|
|
|
// EvtRelaySubscribed is emitted on the relay event bus when a new
// subscription to a pubsub topic is created.
type EvtRelaySubscribed struct {
	// Topic is the pubsub topic that was subscribed to.
	Topic string
}
|
|
|
|
|
|
|
|
|
|
// EvtRelayUnsubscribed is emitted on the relay event bus when a subscription
// to a pubsub topic is closed.
type EvtRelayUnsubscribed struct {
	// Topic is the pubsub topic whose subscription was closed.
	Topic string
}
|
|
|
|
|
|
2021-07-29 22:08:53 +00:00
|
|
|
|
func msgIdFn(pmsg *pubsub_pb.Message) string {
|
2023-03-04 21:51:51 +00:00
|
|
|
|
return string(hash.SHA256(pmsg.Data))
|
2021-04-28 20:10:44 +00:00
|
|
|
|
}
|
|
|
|
|
|
2022-05-04 21:08:24 +00:00
|
|
|
|
// NewWakuRelay returns a new instance of a WakuRelay struct
|
2023-05-05 09:49:15 +00:00
|
|
|
|
func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesource.Timesource, log *zap.Logger, opts ...pubsub.Option) *WakuRelay {
|
2021-04-28 20:10:44 +00:00
|
|
|
|
w := new(WakuRelay)
|
2022-12-09 03:08:04 +00:00
|
|
|
|
w.timesource = timesource
|
2021-11-19 16:19:48 +00:00
|
|
|
|
w.wakuRelayTopics = make(map[string]*pubsub.Topic)
|
|
|
|
|
w.relaySubs = make(map[string]*pubsub.Subscription)
|
2021-11-01 14:42:55 +00:00
|
|
|
|
w.bcaster = bcaster
|
2021-12-06 08:43:00 +00:00
|
|
|
|
w.minPeersToPublish = minPeersToPublish
|
2023-02-08 14:20:40 +00:00
|
|
|
|
w.wg = sync.WaitGroup{}
|
2022-01-18 18:17:06 +00:00
|
|
|
|
w.log = log.Named("relay")
|
2023-07-05 19:17:43 +00:00
|
|
|
|
w.events = eventbus.NewBus()
|
2021-07-29 22:08:53 +00:00
|
|
|
|
|
2023-06-06 18:09:44 +00:00
|
|
|
|
cfg := pubsub.DefaultGossipSubParams()
|
|
|
|
|
cfg.PruneBackoff = time.Minute
|
|
|
|
|
cfg.UnsubscribeBackoff = 5 * time.Second
|
|
|
|
|
cfg.GossipFactor = 0.25
|
|
|
|
|
cfg.D = 6
|
|
|
|
|
cfg.Dlo = 4
|
|
|
|
|
cfg.Dhi = 12
|
|
|
|
|
cfg.Dout = 3
|
|
|
|
|
cfg.Dlazy = 6
|
|
|
|
|
cfg.HeartbeatInterval = time.Second
|
|
|
|
|
cfg.HistoryLength = 6
|
|
|
|
|
cfg.HistoryGossip = 3
|
|
|
|
|
cfg.FanoutTTL = time.Minute
|
|
|
|
|
|
2023-06-07 20:18:37 +00:00
|
|
|
|
w.peerScoreParams = &pubsub.PeerScoreParams{
|
|
|
|
|
Topics: make(map[string]*pubsub.TopicScoreParams),
|
2023-06-06 18:09:44 +00:00
|
|
|
|
DecayInterval: 12 * time.Second, // how often peer scoring is updated
|
|
|
|
|
DecayToZero: 0.01, // below this we consider the parameter to be zero
|
|
|
|
|
RetainScore: 10 * time.Minute, // remember peer score during x after it disconnects
|
2023-06-07 20:18:37 +00:00
|
|
|
|
// p5: application specific, unset
|
2023-06-06 18:09:44 +00:00
|
|
|
|
AppSpecificScore: func(p peer.ID) float64 {
|
|
|
|
|
return 0
|
|
|
|
|
},
|
2023-06-07 20:18:37 +00:00
|
|
|
|
AppSpecificWeight: 0.0,
|
|
|
|
|
// p6: penalizes peers sharing more than threshold ips
|
|
|
|
|
IPColocationFactorWeight: -50,
|
|
|
|
|
IPColocationFactorThreshold: 5.0,
|
|
|
|
|
// p7: penalizes bad behaviour (weight and decay)
|
|
|
|
|
BehaviourPenaltyWeight: -10,
|
|
|
|
|
BehaviourPenaltyDecay: 0.986,
|
2023-06-06 18:09:44 +00:00
|
|
|
|
}
|
|
|
|
|
|
2023-06-07 20:18:37 +00:00
|
|
|
|
w.peerScoreThresholds = &pubsub.PeerScoreThresholds{
|
2023-06-06 18:09:44 +00:00
|
|
|
|
GossipThreshold: -100, // no gossip is sent to peers below this score
|
|
|
|
|
PublishThreshold: -1000, // no self-published msgs are sent to peers below this score
|
|
|
|
|
GraylistThreshold: -10000, // used to trigger disconnections + ignore peer if below this score
|
|
|
|
|
OpportunisticGraftThreshold: 0, // grafts better peers if the mesh median score drops below this. unset.
|
2023-06-07 20:18:37 +00:00
|
|
|
|
}
|
2023-06-06 18:09:44 +00:00
|
|
|
|
|
2023-06-07 20:18:37 +00:00
|
|
|
|
w.topicParams = &pubsub.TopicScoreParams{
|
|
|
|
|
TopicWeight: 1,
|
|
|
|
|
// p1: favours peers already in the mesh
|
|
|
|
|
TimeInMeshWeight: 0.01,
|
|
|
|
|
TimeInMeshQuantum: time.Second,
|
|
|
|
|
TimeInMeshCap: 10.0,
|
|
|
|
|
// p2: rewards fast peers
|
|
|
|
|
FirstMessageDeliveriesWeight: 1.0,
|
|
|
|
|
FirstMessageDeliveriesDecay: 0.5,
|
|
|
|
|
FirstMessageDeliveriesCap: 10.0,
|
|
|
|
|
// p3: penalizes lazy peers. safe low value
|
|
|
|
|
MeshMessageDeliveriesWeight: 0,
|
|
|
|
|
MeshMessageDeliveriesDecay: 0,
|
|
|
|
|
MeshMessageDeliveriesCap: 0,
|
|
|
|
|
MeshMessageDeliveriesThreshold: 0,
|
|
|
|
|
MeshMessageDeliveriesWindow: 0,
|
|
|
|
|
MeshMessageDeliveriesActivation: 0,
|
|
|
|
|
// p3b: tracks history of prunes
|
|
|
|
|
MeshFailurePenaltyWeight: 0,
|
|
|
|
|
MeshFailurePenaltyDecay: 0,
|
|
|
|
|
// p4: penalizes invalid messages. highly penalize peers sending wrong messages
|
|
|
|
|
InvalidMessageDeliveriesWeight: -100.0,
|
|
|
|
|
InvalidMessageDeliveriesDecay: 0.5,
|
2023-06-06 18:09:44 +00:00
|
|
|
|
}
|
|
|
|
|
|
2021-07-29 22:08:53 +00:00
|
|
|
|
// default options required by WakuRelay
|
2023-06-05 14:39:38 +00:00
|
|
|
|
w.opts = append([]pubsub.Option{
|
|
|
|
|
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
|
|
|
|
|
pubsub.WithNoAuthor(),
|
|
|
|
|
pubsub.WithMessageIdFn(msgIdFn),
|
|
|
|
|
pubsub.WithGossipSubProtocols(
|
2023-06-28 18:04:51 +00:00
|
|
|
|
[]protocol.ID{WakuRelayID_v200, pubsub.GossipSubID_v11, pubsub.GossipSubID_v10, pubsub.FloodSubID},
|
2023-06-05 14:39:38 +00:00
|
|
|
|
func(feat pubsub.GossipSubFeature, proto protocol.ID) bool {
|
|
|
|
|
switch feat {
|
|
|
|
|
case pubsub.GossipSubFeatureMesh:
|
2023-06-29 13:29:08 +00:00
|
|
|
|
return proto == pubsub.GossipSubID_v11 || proto == pubsub.GossipSubID_v10 || proto == WakuRelayID_v200
|
2023-06-05 14:39:38 +00:00
|
|
|
|
case pubsub.GossipSubFeaturePX:
|
2023-06-29 13:29:08 +00:00
|
|
|
|
return proto == pubsub.GossipSubID_v11 || proto == WakuRelayID_v200
|
2023-06-05 14:39:38 +00:00
|
|
|
|
default:
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
),
|
2023-06-06 18:09:44 +00:00
|
|
|
|
pubsub.WithGossipSubParams(cfg),
|
|
|
|
|
pubsub.WithFloodPublish(true),
|
|
|
|
|
pubsub.WithSeenMessagesTTL(2 * time.Minute),
|
2023-06-07 20:18:37 +00:00
|
|
|
|
pubsub.WithPeerScore(w.peerScoreParams, w.peerScoreThresholds),
|
|
|
|
|
pubsub.WithPeerScoreInspect(w.peerScoreInspector, 6*time.Second),
|
2023-06-06 18:09:44 +00:00
|
|
|
|
pubsub.WithDefaultValidator(func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool {
|
|
|
|
|
msg := new(pb.WakuMessage)
|
|
|
|
|
err := proto.Unmarshal(message.Data, msg)
|
|
|
|
|
return err == nil
|
|
|
|
|
}),
|
2023-06-05 14:39:38 +00:00
|
|
|
|
}, opts...)
|
2021-04-28 20:10:44 +00:00
|
|
|
|
|
2023-01-06 22:37:57 +00:00
|
|
|
|
return w
|
|
|
|
|
}
|
|
|
|
|
|
2023-06-07 20:18:37 +00:00
|
|
|
|
func (w *WakuRelay) peerScoreInspector(peerScoresSnapshots map[peer.ID]*pubsub.PeerScoreSnapshot) {
|
|
|
|
|
if w.host == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for pid, snap := range peerScoresSnapshots {
|
|
|
|
|
if snap.Score < w.peerScoreThresholds.GraylistThreshold {
|
|
|
|
|
// Disconnect bad peers
|
|
|
|
|
err := w.host.Network().ClosePeer(pid)
|
|
|
|
|
if err != nil {
|
|
|
|
|
w.log.Error("could not disconnect peer", logging.HostID("peer", pid), zap.Error(err))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-07-19 16:25:35 +00:00
|
|
|
|
// SetHost sets the host to be able to mount or consume a protocol
|
2023-04-17 00:04:12 +00:00
|
|
|
|
func (w *WakuRelay) SetHost(h host.Host) {
|
|
|
|
|
w.host = h
|
|
|
|
|
}
|
|
|
|
|
|
2023-07-19 16:25:35 +00:00
|
|
|
|
// Start initiates the WakuRelay protocol
|
2023-01-06 22:37:57 +00:00
|
|
|
|
func (w *WakuRelay) Start(ctx context.Context) error {
|
2023-02-08 14:20:40 +00:00
|
|
|
|
w.wg.Wait()
|
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
2023-02-08 19:20:42 +00:00
|
|
|
|
w.ctx = ctx // TODO: create worker for creating subscriptions instead of storing context
|
2023-02-08 14:20:40 +00:00
|
|
|
|
w.cancel = cancel
|
|
|
|
|
|
2023-01-06 22:37:57 +00:00
|
|
|
|
ps, err := pubsub.NewGossipSub(ctx, w.host, w.opts...)
|
2021-04-28 20:10:44 +00:00
|
|
|
|
if err != nil {
|
2023-01-06 22:37:57 +00:00
|
|
|
|
return err
|
2021-04-28 20:10:44 +00:00
|
|
|
|
}
|
|
|
|
|
w.pubsub = ps
|
|
|
|
|
|
2023-07-05 19:17:43 +00:00
|
|
|
|
w.emitters.EvtRelaySubscribed, err = w.events.Emitter(new(EvtRelaySubscribed))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
w.emitters.EvtRelayUnsubscribed, err = w.events.Emitter(new(EvtRelayUnsubscribed))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
2022-01-18 18:17:06 +00:00
|
|
|
|
w.log.Info("Relay protocol started")
|
2023-01-06 22:37:57 +00:00
|
|
|
|
return nil
|
2021-04-28 20:10:44 +00:00
|
|
|
|
}
|
|
|
|
|
|
2022-05-04 21:08:24 +00:00
|
|
|
|
// PubSub returns the implementation of the pubsub system
|
2021-07-29 22:08:53 +00:00
|
|
|
|
func (w *WakuRelay) PubSub() *pubsub.PubSub {
|
2021-04-28 20:10:44 +00:00
|
|
|
|
return w.pubsub
|
|
|
|
|
}
|
|
|
|
|
|
2022-05-04 21:08:24 +00:00
|
|
|
|
// Topics returns a list of all the pubsub topics currently subscribed to
|
2021-11-19 16:19:48 +00:00
|
|
|
|
func (w *WakuRelay) Topics() []string {
|
2021-04-28 20:10:44 +00:00
|
|
|
|
defer w.topicsMutex.Unlock()
|
|
|
|
|
w.topicsMutex.Lock()
|
|
|
|
|
|
2021-11-19 16:19:48 +00:00
|
|
|
|
var result []string
|
|
|
|
|
for topic := range w.relaySubs {
|
2021-04-28 20:10:44 +00:00
|
|
|
|
result = append(result, topic)
|
|
|
|
|
}
|
|
|
|
|
return result
|
|
|
|
|
}
|
|
|
|
|
|
2023-07-19 16:25:35 +00:00
|
|
|
|
// IsSubscribed indicates whether the node is subscribed to a pubsub topic or not
|
2023-05-04 14:04:54 +00:00
|
|
|
|
func (w *WakuRelay) IsSubscribed(topic string) bool {
|
|
|
|
|
defer w.topicsMutex.Unlock()
|
|
|
|
|
w.topicsMutex.Lock()
|
|
|
|
|
_, ok := w.relaySubs[topic]
|
|
|
|
|
return ok
|
|
|
|
|
}
|
|
|
|
|
|
2022-05-30 15:55:30 +00:00
|
|
|
|
// SetPubSub is used to set an implementation of the pubsub system
|
2021-07-29 22:08:53 +00:00
|
|
|
|
func (w *WakuRelay) SetPubSub(pubSub *pubsub.PubSub) {
|
2021-04-28 20:10:44 +00:00
|
|
|
|
w.pubsub = pubSub
|
|
|
|
|
}
|
|
|
|
|
|
2021-11-19 16:19:48 +00:00
|
|
|
|
func (w *WakuRelay) upsertTopic(topic string) (*pubsub.Topic, error) {
|
2021-04-28 20:10:44 +00:00
|
|
|
|
w.topicsMutex.Lock()
|
2023-07-05 19:17:43 +00:00
|
|
|
|
defer w.topicsMutex.Unlock()
|
2021-04-28 20:10:44 +00:00
|
|
|
|
|
|
|
|
|
pubSubTopic, ok := w.wakuRelayTopics[topic]
|
|
|
|
|
if !ok { // Joins topic if node hasn't joined yet
|
|
|
|
|
newTopic, err := w.pubsub.Join(string(topic))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
2023-06-07 20:18:37 +00:00
|
|
|
|
|
|
|
|
|
err = newTopic.SetScoreParams(w.topicParams)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
2021-04-28 20:10:44 +00:00
|
|
|
|
w.wakuRelayTopics[topic] = newTopic
|
|
|
|
|
pubSubTopic = newTopic
|
|
|
|
|
}
|
|
|
|
|
return pubSubTopic, nil
|
|
|
|
|
}
|
|
|
|
|
|
2021-11-19 16:19:48 +00:00
|
|
|
|
func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err error) {
|
2021-04-28 20:10:44 +00:00
|
|
|
|
sub, ok := w.relaySubs[topic]
|
|
|
|
|
if !ok {
|
|
|
|
|
pubSubTopic, err := w.upsertTopic(topic)
|
|
|
|
|
if err != nil {
|
2021-11-01 14:42:55 +00:00
|
|
|
|
return nil, err
|
2021-04-28 20:10:44 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sub, err = pubSubTopic.Subscribe()
|
|
|
|
|
if err != nil {
|
2021-11-01 14:42:55 +00:00
|
|
|
|
return nil, err
|
2021-04-28 20:10:44 +00:00
|
|
|
|
}
|
2023-07-05 19:17:43 +00:00
|
|
|
|
|
2021-04-28 20:10:44 +00:00
|
|
|
|
w.relaySubs[topic] = sub
|
2023-07-05 19:17:43 +00:00
|
|
|
|
|
|
|
|
|
err = w.emitters.EvtRelaySubscribed.Emit(EvtRelaySubscribed{topic})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
2023-05-05 15:04:08 +00:00
|
|
|
|
if w.bcaster != nil {
|
|
|
|
|
w.wg.Add(1)
|
|
|
|
|
go w.subscribeToTopic(topic, sub)
|
|
|
|
|
}
|
2022-07-24 20:51:42 +00:00
|
|
|
|
w.log.Info("subscribing to topic", zap.String("topic", sub.Topic()))
|
2021-04-28 20:10:44 +00:00
|
|
|
|
}
|
|
|
|
|
|
2021-11-01 14:42:55 +00:00
|
|
|
|
return sub, nil
|
2021-04-28 20:10:44 +00:00
|
|
|
|
}
|
|
|
|
|
|
2022-05-04 21:08:24 +00:00
|
|
|
|
// PublishToTopic is used to broadcast a WakuMessage to a pubsub topic
|
2021-11-20 00:03:05 +00:00
|
|
|
|
func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage, topic string) ([]byte, error) {
|
2021-04-28 20:10:44 +00:00
|
|
|
|
// Publish a `WakuMessage` to a PubSub topic.
|
|
|
|
|
if w.pubsub == nil {
|
2021-07-29 12:40:54 +00:00
|
|
|
|
return nil, errors.New("PubSub hasn't been set")
|
2021-04-28 20:10:44 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if message == nil {
|
|
|
|
|
return nil, errors.New("message can't be null")
|
|
|
|
|
}
|
|
|
|
|
|
2021-12-06 08:43:00 +00:00
|
|
|
|
if !w.EnoughPeersToPublishToTopic(topic) {
|
2022-05-30 15:55:30 +00:00
|
|
|
|
return nil, errors.New("not enough peers to publish")
|
2021-12-06 08:43:00 +00:00
|
|
|
|
}
|
|
|
|
|
|
2021-11-19 20:01:52 +00:00
|
|
|
|
pubSubTopic, err := w.upsertTopic(topic)
|
2021-04-28 20:10:44 +00:00
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
out, err := proto.Marshal(message)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
err = pubSubTopic.Publish(ctx, out)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
2023-03-04 21:51:51 +00:00
|
|
|
|
hash := message.Hash(topic)
|
2021-04-28 20:10:44 +00:00
|
|
|
|
|
2023-05-03 14:23:07 +00:00
|
|
|
|
w.log.Debug("waku.relay published", zap.String("pubsubTopic", topic), logging.HexString("hash", hash), zap.Int64("publishTime", w.timesource.Now().UnixNano()), zap.Int("payloadSizeBytes", len(message.Payload)))
|
2022-11-03 13:53:33 +00:00
|
|
|
|
|
2021-04-28 20:10:44 +00:00
|
|
|
|
return hash, nil
|
|
|
|
|
}
|
|
|
|
|
|
2022-05-04 21:08:24 +00:00
|
|
|
|
// Publish is used to broadcast a WakuMessage to the default waku pubsub topic
|
2021-11-19 20:01:52 +00:00
|
|
|
|
func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage) ([]byte, error) {
|
2021-11-20 00:03:05 +00:00
|
|
|
|
return w.PublishToTopic(ctx, message, DefaultWakuTopic)
|
2021-04-28 20:10:44 +00:00
|
|
|
|
}
|
2021-10-11 22:45:54 +00:00
|
|
|
|
|
2022-05-04 21:08:24 +00:00
|
|
|
|
// Stop unmounts the relay protocol and stops all subscriptions
|
2021-10-11 22:45:54 +00:00
|
|
|
|
func (w *WakuRelay) Stop() {
|
2023-02-08 14:20:40 +00:00
|
|
|
|
if w.cancel == nil {
|
|
|
|
|
return // Not started
|
|
|
|
|
}
|
|
|
|
|
|
2021-10-11 22:45:54 +00:00
|
|
|
|
w.host.RemoveStreamHandler(WakuRelayID_v200)
|
2023-07-05 19:17:43 +00:00
|
|
|
|
w.emitters.EvtRelaySubscribed.Close()
|
|
|
|
|
w.emitters.EvtRelayUnsubscribed.Close()
|
2023-02-08 14:20:40 +00:00
|
|
|
|
w.cancel()
|
|
|
|
|
w.wg.Wait()
|
2021-11-01 14:42:55 +00:00
|
|
|
|
}
|
|
|
|
|
|
2022-05-04 21:08:24 +00:00
|
|
|
|
// EnoughPeersToPublish returns whether there are enough peers connected in the default waku pubsub topic
|
2021-12-06 08:43:00 +00:00
|
|
|
|
func (w *WakuRelay) EnoughPeersToPublish() bool {
|
|
|
|
|
return w.EnoughPeersToPublishToTopic(DefaultWakuTopic)
|
|
|
|
|
}
|
|
|
|
|
|
2022-05-04 21:08:24 +00:00
|
|
|
|
// EnoughPeersToPublish returns whether there are enough peers connected in a pubsub topic
|
2021-12-06 08:43:00 +00:00
|
|
|
|
func (w *WakuRelay) EnoughPeersToPublishToTopic(topic string) bool {
|
|
|
|
|
return len(w.PubSub().ListPeers(topic)) >= w.minPeersToPublish
|
|
|
|
|
}
|
|
|
|
|
|
2022-05-04 21:08:24 +00:00
|
|
|
|
// SubscribeToTopic returns a Subscription to receive messages from a pubsub topic
|
2021-11-20 00:03:05 +00:00
|
|
|
|
func (w *WakuRelay) SubscribeToTopic(ctx context.Context, topic string) (*Subscription, error) {
|
2023-05-05 09:49:15 +00:00
|
|
|
|
_, err := w.subscribe(topic)
|
2021-11-01 14:42:55 +00:00
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create client subscription
|
2023-05-05 09:49:15 +00:00
|
|
|
|
subscription := NoopSubscription()
|
2021-11-01 14:42:55 +00:00
|
|
|
|
if w.bcaster != nil {
|
2023-05-05 15:05:44 +00:00
|
|
|
|
subscription = w.bcaster.Register(topic, 1024)
|
2021-11-01 14:42:55 +00:00
|
|
|
|
}
|
2023-05-05 09:49:15 +00:00
|
|
|
|
go func() {
|
|
|
|
|
<-ctx.Done()
|
|
|
|
|
subscription.Unsubscribe()
|
|
|
|
|
}()
|
|
|
|
|
return &subscription, nil
|
2021-11-01 14:42:55 +00:00
|
|
|
|
}
|
|
|
|
|
|
2022-05-04 21:08:24 +00:00
|
|
|
|
// SubscribeToTopic returns a Subscription to receive messages from the default waku pubsub topic
|
2021-11-19 20:01:52 +00:00
|
|
|
|
func (w *WakuRelay) Subscribe(ctx context.Context) (*Subscription, error) {
|
2021-11-20 00:03:05 +00:00
|
|
|
|
return w.SubscribeToTopic(ctx, DefaultWakuTopic)
|
2021-11-19 20:01:52 +00:00
|
|
|
|
}
|
|
|
|
|
|
2022-05-04 21:08:24 +00:00
|
|
|
|
// Unsubscribe closes a subscription to a pubsub topic
|
2021-11-19 16:19:48 +00:00
|
|
|
|
func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error {
|
2022-07-24 20:51:42 +00:00
|
|
|
|
sub, ok := w.relaySubs[topic]
|
|
|
|
|
if !ok {
|
|
|
|
|
return fmt.Errorf("not subscribed to topic")
|
2021-11-06 10:49:47 +00:00
|
|
|
|
}
|
2022-07-24 20:51:42 +00:00
|
|
|
|
w.log.Info("unsubscribing from topic", zap.String("topic", sub.Topic()))
|
2021-11-06 10:49:47 +00:00
|
|
|
|
|
|
|
|
|
w.relaySubs[topic].Cancel()
|
|
|
|
|
delete(w.relaySubs, topic)
|
|
|
|
|
|
|
|
|
|
err := w.wakuRelayTopics[topic].Close()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
delete(w.wakuRelayTopics, topic)
|
|
|
|
|
|
2023-07-05 19:17:43 +00:00
|
|
|
|
err = w.emitters.EvtRelayUnsubscribed.Emit(EvtRelayUnsubscribed{topic})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
2021-11-06 10:49:47 +00:00
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2021-11-01 14:42:55 +00:00
|
|
|
|
func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <-chan *pubsub.Message {
|
|
|
|
|
msgChannel := make(chan *pubsub.Message, 1024)
|
2023-05-05 09:49:15 +00:00
|
|
|
|
go func() {
|
|
|
|
|
defer close(msgChannel)
|
2021-11-01 14:42:55 +00:00
|
|
|
|
for {
|
|
|
|
|
msg, err := sub.Next(ctx)
|
|
|
|
|
if err != nil {
|
2023-01-06 22:37:57 +00:00
|
|
|
|
if !errors.Is(err, context.Canceled) {
|
|
|
|
|
w.log.Error("getting message from subscription", zap.Error(err))
|
|
|
|
|
}
|
2021-11-01 14:42:55 +00:00
|
|
|
|
sub.Cancel()
|
2023-05-05 09:49:15 +00:00
|
|
|
|
return
|
2021-11-01 14:42:55 +00:00
|
|
|
|
}
|
|
|
|
|
msgChannel <- msg
|
|
|
|
|
}
|
2023-05-05 09:49:15 +00:00
|
|
|
|
}()
|
2021-11-01 14:42:55 +00:00
|
|
|
|
return msgChannel
|
|
|
|
|
}
|
|
|
|
|
|
2023-05-05 09:49:15 +00:00
|
|
|
|
func (w *WakuRelay) subscribeToTopic(pubsubTopic string, sub *pubsub.Subscription) {
|
2023-02-08 19:20:42 +00:00
|
|
|
|
defer w.wg.Done()
|
|
|
|
|
|
|
|
|
|
ctx, err := tag.New(w.ctx, tag.Insert(metrics.KeyType, "relay"))
|
2021-11-01 14:42:55 +00:00
|
|
|
|
if err != nil {
|
2022-05-30 15:55:30 +00:00
|
|
|
|
w.log.Error("creating tag map", zap.Error(err))
|
2021-11-01 14:42:55 +00:00
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
2023-02-08 19:20:42 +00:00
|
|
|
|
subChannel := w.nextMessage(w.ctx, sub)
|
2021-11-01 14:42:55 +00:00
|
|
|
|
for {
|
|
|
|
|
select {
|
2023-02-08 19:20:42 +00:00
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return
|
2021-11-01 14:42:55 +00:00
|
|
|
|
// TODO: if there are no more relay subscriptions, close the pubsub subscription
|
2023-05-05 09:49:15 +00:00
|
|
|
|
case msg, ok := <-subChannel:
|
|
|
|
|
if !ok {
|
2021-11-06 10:49:47 +00:00
|
|
|
|
return
|
|
|
|
|
}
|
2023-04-04 18:01:44 +00:00
|
|
|
|
wakuMessage := &pb.WakuMessage{}
|
|
|
|
|
if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil {
|
|
|
|
|
w.log.Error("decoding message", zap.Error(err))
|
|
|
|
|
return
|
|
|
|
|
}
|
2021-11-01 14:42:55 +00:00
|
|
|
|
|
2023-05-03 14:23:07 +00:00
|
|
|
|
payloadSizeInBytes := len(wakuMessage.Payload)
|
|
|
|
|
payloadSizeInKb := payloadSizeInBytes / 1000
|
|
|
|
|
stats.Record(ctx, metrics.Messages.M(1), metrics.MessageSize.M(int64(payloadSizeInKb)))
|
2023-04-25 18:48:47 +00:00
|
|
|
|
|
2023-04-04 18:01:44 +00:00
|
|
|
|
envelope := waku_proto.NewEnvelope(wakuMessage, w.timesource.Now().UnixNano(), pubsubTopic)
|
2023-05-03 14:23:07 +00:00
|
|
|
|
w.log.Debug("waku.relay received", zap.String("pubsubTopic", pubsubTopic), logging.HexString("hash", envelope.Hash()), zap.Int64("receivedTime", envelope.Index().ReceiverTime), zap.Int("payloadSizeBytes", payloadSizeInBytes))
|
2022-11-03 13:53:33 +00:00
|
|
|
|
|
2021-11-01 14:42:55 +00:00
|
|
|
|
if w.bcaster != nil {
|
|
|
|
|
w.bcaster.Submit(envelope)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2023-06-05 14:39:38 +00:00
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
2023-07-19 16:25:35 +00:00
|
|
|
|
// Params returns the gossipsub configuration parameters used by WakuRelay
|
2023-06-05 14:39:38 +00:00
|
|
|
|
func (w *WakuRelay) Params() pubsub.GossipSubParams {
|
|
|
|
|
return w.params
|
2021-10-11 22:45:54 +00:00
|
|
|
|
}
|
2023-07-05 19:17:43 +00:00
|
|
|
|
|
|
|
|
|
// Events returns the event bus on which WakuRelay events will be emitted
|
|
|
|
|
func (w *WakuRelay) Events() event.Bus {
|
|
|
|
|
return w.events
|
|
|
|
|
}
|