go-waku/waku/v2/protocol/relay/waku_relay.go
2023-06-09 14:57:43 -04:00

398 lines
11 KiB
Go
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package relay
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.uber.org/zap"
proto "google.golang.org/protobuf/proto"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/hash"
"github.com/waku-org/go-waku/waku/v2/metrics"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
)
const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0")
var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic().String()
type WakuRelay struct {
host host.Host
opts []pubsub.Option
pubsub *pubsub.PubSub
params pubsub.GossipSubParams
timesource timesource.Timesource
log *zap.Logger
bcaster Broadcaster
minPeersToPublish int
// TODO: convert to concurrent maps
topicsMutex sync.Mutex
wakuRelayTopics map[string]*pubsub.Topic
relaySubs map[string]*pubsub.Subscription
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
func msgIdFn(pmsg *pubsub_pb.Message) string {
return string(hash.SHA256(pmsg.Data))
}
// NewWakuRelay returns a new instance of a WakuRelay struct
func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesource.Timesource, log *zap.Logger, opts ...pubsub.Option) *WakuRelay {
w := new(WakuRelay)
w.timesource = timesource
w.wakuRelayTopics = make(map[string]*pubsub.Topic)
w.relaySubs = make(map[string]*pubsub.Subscription)
w.bcaster = bcaster
w.minPeersToPublish = minPeersToPublish
w.wg = sync.WaitGroup{}
w.log = log.Named("relay")
cfg := pubsub.DefaultGossipSubParams()
cfg.PruneBackoff = time.Minute
cfg.UnsubscribeBackoff = 5 * time.Second
cfg.GossipFactor = 0.25
cfg.D = 6
cfg.Dlo = 4
cfg.Dhi = 12
cfg.Dout = 3
cfg.Dlazy = 6
cfg.HeartbeatInterval = time.Second
cfg.HistoryLength = 6
cfg.HistoryGossip = 3
cfg.FanoutTTL = time.Minute
peerScoreParams := &pubsub.PeerScoreParams{
DecayInterval: 12 * time.Second, // how often peer scoring is updated
DecayToZero: 0.01, // below this we consider the parameter to be zero
RetainScore: 10 * time.Minute, // remember peer score during x after it disconnects
AppSpecificScore: func(p peer.ID) float64 {
return 0
},
AppSpecificWeight: 0.0, // p5: application specific, unset
IPColocationFactorWeight: -50, // p6: penalizes peers sharing more than threshold ips
IPColocationFactorThreshold: 5.0, //
BehaviourPenaltyWeight: -10, // p7: penalizes bad behaviour (weight and decay)
BehaviourPenaltyDecay: 0.986,
}
peerScoreThresholds := &pubsub.PeerScoreThresholds{
GossipThreshold: -100, // no gossip is sent to peers below this score
PublishThreshold: -1000, // no self-published msgs are sent to peers below this score
GraylistThreshold: -10000, // used to trigger disconnections + ignore peer if below this score
OpportunisticGraftThreshold: 0, // grafts better peers if the mesh median score drops below this. unset.
}
// default options required by WakuRelay
w.opts = append([]pubsub.Option{
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
pubsub.WithNoAuthor(),
pubsub.WithMessageIdFn(msgIdFn),
pubsub.WithGossipSubProtocols(
[]protocol.ID{pubsub.GossipSubID_v11, pubsub.GossipSubID_v10, pubsub.FloodSubID, WakuRelayID_v200},
func(feat pubsub.GossipSubFeature, proto protocol.ID) bool {
switch feat {
case pubsub.GossipSubFeatureMesh:
return proto == pubsub.GossipSubID_v11 || proto == pubsub.GossipSubID_v10
case pubsub.GossipSubFeaturePX:
return proto == pubsub.GossipSubID_v11
default:
return false
}
},
),
pubsub.WithGossipSubParams(cfg),
pubsub.WithFloodPublish(true),
pubsub.WithSeenMessagesTTL(2 * time.Minute),
pubsub.WithPeerScore(peerScoreParams, peerScoreThresholds),
pubsub.WithDefaultValidator(func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool {
msg := new(pb.WakuMessage)
err := proto.Unmarshal(message.Data, msg)
return err == nil
}),
}, opts...)
return w
}
// Sets the host to be able to mount or consume a protocol
func (w *WakuRelay) SetHost(h host.Host) {
w.host = h
}
func (w *WakuRelay) Start(ctx context.Context) error {
w.wg.Wait()
ctx, cancel := context.WithCancel(ctx)
w.ctx = ctx // TODO: create worker for creating subscriptions instead of storing context
w.cancel = cancel
ps, err := pubsub.NewGossipSub(ctx, w.host, w.opts...)
if err != nil {
return err
}
w.pubsub = ps
w.log.Info("Relay protocol started")
return nil
}
// PubSub returns the implementation of the pubsub system
func (w *WakuRelay) PubSub() *pubsub.PubSub {
return w.pubsub
}
// Topics returns a list of all the pubsub topics currently subscribed to
func (w *WakuRelay) Topics() []string {
defer w.topicsMutex.Unlock()
w.topicsMutex.Lock()
var result []string
for topic := range w.relaySubs {
result = append(result, topic)
}
return result
}
func (w *WakuRelay) IsSubscribed(topic string) bool {
defer w.topicsMutex.Unlock()
w.topicsMutex.Lock()
_, ok := w.relaySubs[topic]
return ok
}
// SetPubSub is used to set an implementation of the pubsub system
func (w *WakuRelay) SetPubSub(pubSub *pubsub.PubSub) {
w.pubsub = pubSub
}
func (w *WakuRelay) upsertTopic(topic string) (*pubsub.Topic, error) {
defer w.topicsMutex.Unlock()
w.topicsMutex.Lock()
pubSubTopic, ok := w.wakuRelayTopics[topic]
if !ok { // Joins topic if node hasn't joined yet
newTopic, err := w.pubsub.Join(string(topic))
if err != nil {
return nil, err
}
w.wakuRelayTopics[topic] = newTopic
pubSubTopic = newTopic
}
return pubSubTopic, nil
}
func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err error) {
sub, ok := w.relaySubs[topic]
if !ok {
pubSubTopic, err := w.upsertTopic(topic)
if err != nil {
return nil, err
}
sub, err = pubSubTopic.Subscribe()
if err != nil {
return nil, err
}
w.relaySubs[topic] = sub
if w.bcaster != nil {
w.wg.Add(1)
go w.subscribeToTopic(topic, sub)
}
w.log.Info("subscribing to topic", zap.String("topic", sub.Topic()))
}
return sub, nil
}
// PublishToTopic is used to broadcast a WakuMessage to a pubsub topic
func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage, topic string) ([]byte, error) {
// Publish a `WakuMessage` to a PubSub topic.
if w.pubsub == nil {
return nil, errors.New("PubSub hasn't been set")
}
if message == nil {
return nil, errors.New("message can't be null")
}
if !w.EnoughPeersToPublishToTopic(topic) {
return nil, errors.New("not enough peers to publish")
}
pubSubTopic, err := w.upsertTopic(topic)
if err != nil {
return nil, err
}
out, err := proto.Marshal(message)
if err != nil {
return nil, err
}
err = pubSubTopic.Publish(ctx, out)
if err != nil {
return nil, err
}
hash := message.Hash(topic)
w.log.Debug("waku.relay published", zap.String("pubsubTopic", topic), logging.HexString("hash", hash), zap.Int64("publishTime", w.timesource.Now().UnixNano()), zap.Int("payloadSizeBytes", len(message.Payload)))
return hash, nil
}
// Publish is used to broadcast a WakuMessage to the default waku pubsub topic
func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage) ([]byte, error) {
return w.PublishToTopic(ctx, message, DefaultWakuTopic)
}
// Stop unmounts the relay protocol and stops all subscriptions
func (w *WakuRelay) Stop() {
if w.cancel == nil {
return // Not started
}
w.host.RemoveStreamHandler(WakuRelayID_v200)
w.cancel()
w.wg.Wait()
}
// EnoughPeersToPublish returns whether there are enough peers connected in the default waku pubsub topic
func (w *WakuRelay) EnoughPeersToPublish() bool {
return w.EnoughPeersToPublishToTopic(DefaultWakuTopic)
}
// EnoughPeersToPublish returns whether there are enough peers connected in a pubsub topic
func (w *WakuRelay) EnoughPeersToPublishToTopic(topic string) bool {
return len(w.PubSub().ListPeers(topic)) >= w.minPeersToPublish
}
// SubscribeToTopic returns a Subscription to receive messages from a pubsub topic
func (w *WakuRelay) SubscribeToTopic(ctx context.Context, topic string) (*Subscription, error) {
_, err := w.subscribe(topic)
if err != nil {
return nil, err
}
// Create client subscription
subscription := NoopSubscription()
if w.bcaster != nil {
subscription = w.bcaster.Register(topic, 1024)
}
go func() {
<-ctx.Done()
subscription.Unsubscribe()
}()
return &subscription, nil
}
// SubscribeToTopic returns a Subscription to receive messages from the default waku pubsub topic
func (w *WakuRelay) Subscribe(ctx context.Context) (*Subscription, error) {
return w.SubscribeToTopic(ctx, DefaultWakuTopic)
}
// Unsubscribe closes a subscription to a pubsub topic
func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error {
sub, ok := w.relaySubs[topic]
if !ok {
return fmt.Errorf("not subscribed to topic")
}
w.log.Info("unsubscribing from topic", zap.String("topic", sub.Topic()))
w.relaySubs[topic].Cancel()
delete(w.relaySubs, topic)
err := w.wakuRelayTopics[topic].Close()
if err != nil {
return err
}
delete(w.wakuRelayTopics, topic)
return nil
}
func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <-chan *pubsub.Message {
msgChannel := make(chan *pubsub.Message, 1024)
go func() {
defer close(msgChannel)
for {
msg, err := sub.Next(ctx)
if err != nil {
if !errors.Is(err, context.Canceled) {
w.log.Error("getting message from subscription", zap.Error(err))
}
sub.Cancel()
return
}
msgChannel <- msg
}
}()
return msgChannel
}
func (w *WakuRelay) subscribeToTopic(pubsubTopic string, sub *pubsub.Subscription) {
defer w.wg.Done()
ctx, err := tag.New(w.ctx, tag.Insert(metrics.KeyType, "relay"))
if err != nil {
w.log.Error("creating tag map", zap.Error(err))
return
}
subChannel := w.nextMessage(w.ctx, sub)
for {
select {
case <-ctx.Done():
return
// TODO: if there are no more relay subscriptions, close the pubsub subscription
case msg, ok := <-subChannel:
if !ok {
return
}
wakuMessage := &pb.WakuMessage{}
if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil {
w.log.Error("decoding message", zap.Error(err))
return
}
payloadSizeInBytes := len(wakuMessage.Payload)
payloadSizeInKb := payloadSizeInBytes / 1000
stats.Record(ctx, metrics.Messages.M(1), metrics.MessageSize.M(int64(payloadSizeInKb)))
envelope := waku_proto.NewEnvelope(wakuMessage, w.timesource.Now().UnixNano(), pubsubTopic)
w.log.Debug("waku.relay received", zap.String("pubsubTopic", pubsubTopic), logging.HexString("hash", envelope.Hash()), zap.Int64("receivedTime", envelope.Index().ReceiverTime), zap.Int("payloadSizeBytes", payloadSizeInBytes))
if w.bcaster != nil {
w.bcaster.Submit(envelope)
}
}
}
}
func (w *WakuRelay) Params() pubsub.GossipSubParams {
return w.params
}