diff --git a/go.mod b/go.mod index cf7c59b..74ef14a 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/fjl/memsize v0.0.0-20180929194037-2a09253e352a // indirect github.com/go-playground/locales v0.12.1 // indirect github.com/go-playground/universal-translator v0.16.0 // indirect + github.com/gogo/protobuf v1.2.1 github.com/golang-migrate/migrate v3.5.4+incompatible // indirect github.com/golang-migrate/migrate/v4 v4.3.1 // indirect github.com/golang/mock v1.3.1 @@ -33,6 +34,7 @@ require ( github.com/status-im/doubleratchet v2.0.0+incompatible // indirect github.com/status-im/migrate v3.5.1-status+incompatible // indirect github.com/status-im/migrate/v4 v4.3.1-status + github.com/status-im/mvds v0.0.14 github.com/status-im/rendezvous v1.2.0 // indirect github.com/status-im/status-go v0.25.0-beta.2 github.com/status-im/whisper v1.4.13 diff --git a/go.sum b/go.sum index f6a31c6..c6bb3f7 100644 --- a/go.sum +++ b/go.sum @@ -40,9 +40,13 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dR github.com/btcsuite/btcd v0.0.0-20171128150713-2e60448ffcc6/go.mod h1:Dmm/EzmjnCiweXmzRIAiUWCInVmPgjkzgv5k4tVyXiQ= github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 h1:qkOC5Gd33k54tobS36cXdAzJbeHaduLtnLQQwNoIi78= github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8= +github.com/btcsuite/btcd v0.0.0-20190427004231-96897255fd17 h1:m0N5Vg5nP3zEz8TREZpwX3gt4Biw3/8fbIf4A3hO96g= +github.com/btcsuite/btcd v0.0.0-20190427004231-96897255fd17/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= github.com/btcsuite/btcutil v0.0.0-20190207003914-4c204d697803 h1:j3AgPKKZtZStM2nyhrDSLSYgT7YHrZKdSkq1OYeLjvM= github.com/btcsuite/btcutil v0.0.0-20190207003914-4c204d697803/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= +github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d h1:yJzD/yFppdVCf6ApMkVy8cUxV0XrxdP9rVf6D87/Mng= +github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg= github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY= github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc= @@ -109,6 +113,7 @@ github.com/elastic/gosigar v0.0.0-20180330100440-37f05ff46ffa/go.mod h1:cdorVVzy github.com/elastic/gosigar v0.10.2 h1:ktO7B8lAQQCf/CieNp1lQzKnbijg3FbfQqFUfAe6h/A= github.com/elastic/gosigar v0.10.2/go.mod h1:cdorVVzy1fhmEqmtgqkoE3bYtCfSCkVyjTyCIo22xvs= github.com/ethereum/go-ethereum v1.8.20/go.mod h1:PwpWDrCLZrV+tfrhqqF6kPknbISMHaJv9Ln3kPCZLwY= +github.com/ethereum/go-ethereum v1.8.27/go.mod h1:PwpWDrCLZrV+tfrhqqF6kPknbISMHaJv9Ln3kPCZLwY= github.com/fatih/color v1.3.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= @@ -488,6 +493,8 @@ github.com/status-im/migrate v3.5.1-status+incompatible h1:MhM/sOyGDrWMkJxamqc9A github.com/status-im/migrate v3.5.1-status+incompatible/go.mod h1:dZ3YxYs9/ik9SOXvNX3Yjl8kcaDbN5b4vZH5efdK+bk= github.com/status-im/migrate/v4 v4.3.1-status h1:tJwsEYLgbFkvlTSMk89APwRDfpr4yG8wcwPeudsqPwo= github.com/status-im/migrate/v4 v4.3.1-status/go.mod h1:r8HggRBZ/k7TRwByq/Hp3P/ubFppIna0nvyavVK0pjA= +github.com/status-im/mvds v0.0.14 h1:CBZNgewqYBzpo1C1bCLuFfm/2N+qvr0P4qlolQi5AnU= +github.com/status-im/mvds v0.0.14/go.mod h1:cgNgcRbLwE4TMPMSax7xl+Ca/Q8rIklZLDwAD6iBKjo= github.com/status-im/rendezvous v1.2.0 h1:d3HysveJI9w3QFFIGf0rz1HHW51LMb0F0sgGDF+Qq+c= github.com/status-im/rendezvous v1.2.0/go.mod h1:vUZNaCYr2AsFngV2IW5NsnPBRK/h+x57nsMINkhrVCQ= github.com/status-im/status-go v0.25.0-beta.2 h1:7o0CaezNhdHtxxeBliY7EoboDyaatRcsVtoKkTgr2gs= @@ -617,6 +624,7 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190425222832-ad9eeb80039a/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20190525145741-7be61e1b0e51/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= google.golang.org/api v0.0.0-20180724000608-2c45710c7f3f/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk= google.golang.org/api v0.3.2/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk= diff --git a/main.go b/main.go index 213b854..3d429c8 100644 --- a/main.go +++ b/main.go @@ -16,18 +16,28 @@ import ( "github.com/ethereum/go-ethereum/crypto" gethnode "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/rpc" - "github.com/jroimartin/gocui" - "github.com/peterbourgon/ff" - "github.com/pkg/errors" - "github.com/status-im/status-console-client/protocol/adapter" - "github.com/status-im/status-console-client/protocol/client" - "github.com/status-im/status-console-client/protocol/gethservice" - "github.com/status-im/status-console-client/protocol/transport" + "github.com/status-im/status-go/logutils" "github.com/status-im/status-go/node" "github.com/status-im/status-go/params" "github.com/status-im/status-go/services/shhext/chat" "github.com/status-im/status-go/signal" + + "github.com/jroimartin/gocui" + "github.com/peterbourgon/ff" + "github.com/pkg/errors" + + datasyncnode "github.com/status-im/mvds/node" + "github.com/status-im/mvds/state" + "github.com/status-im/mvds/store" + + "github.com/status-im/status-console-client/protocol/adapter" + "github.com/status-im/status-console-client/protocol/client" + "github.com/status-im/status-console-client/protocol/datasync" + dspeer "github.com/status-im/status-console-client/protocol/datasync/peer" + "github.com/status-im/status-console-client/protocol/gethservice" + "github.com/status-im/status-console-client/protocol/transport" + "github.com/status-im/status-console-client/protocol/v1" ) var g *gocui.Gui @@ -44,11 +54,12 @@ var ( addContact = fs.String("add-contact", "", "add contact using format: type,name[,public-key] where type can be 'private' or 'public' and 'public-key' is required for 'private' type") // flags for in-proc node - dataDir = fs.String("data-dir", filepath.Join(os.TempDir(), "status-term-client"), "data directory for Ethereum node") - noNamespace = fs.Bool("no-namespace", false, "disable data dir namespacing with public key") - fleet = fs.String("fleet", params.FleetBeta, fmt.Sprintf("Status nodes cluster to connect to: %s", []string{params.FleetBeta, params.FleetStaging})) - configFile = fs.String("node-config", "", "a JSON file with node config") - pfsEnabled = fs.Bool("pfs", false, "enable PFS") + dataDir = fs.String("data-dir", filepath.Join(os.TempDir(), "status-term-client"), "data directory for Ethereum node") + noNamespace = fs.Bool("no-namespace", false, "disable data dir namespacing with public key") + fleet = fs.String("fleet", params.FleetBeta, fmt.Sprintf("Status nodes cluster to connect to: %s", []string{params.FleetBeta, params.FleetStaging})) + configFile = fs.String("node-config", "", "a JSON file with node config") + pfsEnabled = fs.Bool("pfs", false, "enable PFS") + dataSyncEnabled = fs.Bool("ds", false, "enable data sync") // flags for external node providerURI = fs.String("provider", "", "an URI pointing at a provider") @@ -316,29 +327,50 @@ func createMessengerInProc(pk *ecdsa.PrivateKey, db client.Database) (*client.Me return nil, errors.Wrap(err, "failed to get Whisper service") } - var pfs *chat.ProtocolService + var protocolAdapter protocol.Protocol - // TODO: should be removed from StatusNode - if *pfsEnabled { - databasesDir := filepath.Join(*dataDir, "databases") + if *dataSyncEnabled { + transport := transport.NewWhisperServiceTransport(statusNode, shhService, pk) - if err := os.MkdirAll(databasesDir, 0755); err != nil { - exitErr(errors.Wrap(err, "failed to create databases dir")) + dataSyncTransport := datasync.NewDataSyncNodeTransport(transport) + dataSyncStore := store.NewDummyStore() + dataSyncNode := datasyncnode.NewNode( + &dataSyncStore, + dataSyncTransport, + state.NewSyncState(), // @todo sqlite syncstate + datasync.CalculateSendTime, + 0, + dspeer.PublicKeyToPeerID(pk.PublicKey), + datasyncnode.BATCH, + ) + + protocolAdapter = adapter.NewDataSyncWhisperAdapter(dataSyncNode, transport, dataSyncTransport) + } else { + var pfs *chat.ProtocolService + + // TODO: should be removed from StatusNode + if *pfsEnabled { + databasesDir := filepath.Join(*dataDir, "databases") + + if err := os.MkdirAll(databasesDir, 0755); err != nil { + exitErr(errors.Wrap(err, "failed to create databases dir")) + } + + pfs, err = initPFS(databasesDir) + if err != nil { + exitErr(errors.Wrap(err, "initialize PFS")) + } + + log.Printf("PFS has been initialized") } - pfs, err = initPFS(databasesDir) - if err != nil { - exitErr(errors.Wrap(err, "initialize PFS")) - } - - log.Printf("PFS has been initialized") + transport := transport.NewWhisperServiceTransport(statusNode, shhService, pk) + protocolAdapter = adapter.NewProtocolWhisperAdapter(transport, pfs) } - transport := transport.NewWhisperServiceTransport(statusNode, shhService, pk) - adapter := adapter.NewProtocolWhisperAdapter(transport, pfs) - messenger := client.NewMessenger(pk, adapter, db) + messenger := client.NewMessenger(pk, protocolAdapter, db) - protocolGethService.SetProtocol(adapter) + protocolGethService.SetProtocol(protocolAdapter) protocolGethService.SetMessenger(messenger) return messenger, nil diff --git a/protocol/adapter/datasync_whisper.go b/protocol/adapter/datasync_whisper.go new file mode 100644 index 0000000..fef9792 --- /dev/null +++ b/protocol/adapter/datasync_whisper.go @@ -0,0 +1,173 @@ +package adapter + +import ( + "context" + "crypto/ecdsa" + "errors" + "log" + "sort" + + "github.com/gogo/protobuf/proto" + + "github.com/status-im/mvds/node" + "github.com/status-im/mvds/protobuf" + "github.com/status-im/mvds/state" + dstrns "github.com/status-im/mvds/transport" + + dspeer "github.com/status-im/status-console-client/protocol/datasync/peer" + "github.com/status-im/status-console-client/protocol/subscription" + "github.com/status-im/status-console-client/protocol/transport" + "github.com/status-im/status-console-client/protocol/v1" + + whisper "github.com/status-im/whisper/whisperv6" +) + +type PacketHandler interface { + AddPacket(dstrns.Packet) +} + +type DataSyncWhisperAdapter struct { + node *node.Node + transport transport.WhisperTransport + packets PacketHandler +} + +// DataSyncWhisperAdapter must implement Protocol interface. +var _ protocol.Protocol = (*DataSyncWhisperAdapter)(nil) + +func NewDataSyncWhisperAdapter(n *node.Node, t transport.WhisperTransport, h PacketHandler) *DataSyncWhisperAdapter { + return &DataSyncWhisperAdapter{ + node: n, + transport: t, + packets: h, + } +} + +// Subscribe listens to new messages. +func (w *DataSyncWhisperAdapter) Subscribe( + ctx context.Context, + messages chan<- *protocol.Message, + options protocol.SubscribeOptions, +) (*subscription.Subscription, error) { + if err := options.Validate(); err != nil { + return nil, err + } + + filter := newFilter(w.transport.KeysManager()) + if err := updateFilterFromSubscribeOptions(filter, options); err != nil { + return nil, err + } + + // Messages income in batches and hence a buffered channel is used. + in := make(chan *whisper.ReceivedMessage, 1024) + sub, err := w.transport.Subscribe(ctx, in, filter.Filter) + if err != nil { + return nil, err + } + + go func() { + for item := range in { + payload, err := w.decodePayload(item) + if err != nil { + log.Printf("failed to decode message %#+x: %v", item.EnvelopeHash.Bytes(), err) + continue + } + + packet := dstrns.Packet{ + Group: toGroupId(item.Topic), + Sender: dspeer.PublicKeyToPeerID(*item.Src), + Payload: payload, + } + w.packets.AddPacket(packet) + + for _, m := range w.decodeMessages(payload) { + m.SigPubKey = item.Src + messages <- m + } + } + }() + + return sub, nil +} + +func (w *DataSyncWhisperAdapter) decodePayload(message *whisper.ReceivedMessage) (payload protobuf.Payload, err error) { + err = proto.Unmarshal(message.Payload, &payload) + return +} + +func (w *DataSyncWhisperAdapter) decodeMessages(payload protobuf.Payload) []*protocol.Message { + messages := make([]*protocol.Message, len(payload.Messages)) + + for _, message := range payload.Messages { + decoded, err := protocol.DecodeMessage(message.Body) + if err != nil { + // @todo log or something? + continue + } + + id := state.ID(*message) + decoded.ID = id[:] + + messages = append(messages, &decoded) + } + + sort.Slice(messages, func(i, j int) bool { + return messages[i].Clock < messages[j].Clock + }) + + return messages +} + +// Send sends a message to the network. +// Identity is required as the protocol requires +// all messages to be signed. +func (w *DataSyncWhisperAdapter) Send(ctx context.Context, data []byte, options protocol.SendOptions) ([]byte, error) { + if err := options.Validate(); err != nil { + return nil, err + } + + if options.ChatName == "" { + return nil, errors.New("missing chat name") + } + + topic, err := ToTopic(options.ChatName) + if err != nil { + return nil, err + } + + gid := toGroupId(topic) + + w.peer(gid, options.Recipient) + + id, err := w.node.AppendMessage(gid, data) + if err != nil { + return nil, err + } + + return id[:], nil +} + +// Request retrieves historic messages. +func (m *DataSyncWhisperAdapter) Request(ctx context.Context, params protocol.RequestOptions) error { + return nil +} + +func (c *DataSyncWhisperAdapter) peer(id state.GroupID, peer *ecdsa.PublicKey) { + if peer == nil { + return + } + + p := dspeer.PublicKeyToPeerID(*peer) + + if c.node.IsPeerInGroup(id, p) { + return + } + + c.node.AddPeer(id, p) +} + +func toGroupId(topicType whisper.TopicType) state.GroupID { + g := state.GroupID{} + copy(g[:], topicType[:]) + return g +} diff --git a/protocol/adapter/mvds_whisper.go b/protocol/adapter/mvds_whisper.go deleted file mode 100644 index 78c27c4..0000000 --- a/protocol/adapter/mvds_whisper.go +++ /dev/null @@ -1,38 +0,0 @@ -package adapter - -import ( - "context" - - "github.com/status-im/status-console-client/protocol/subscription" - "github.com/status-im/status-console-client/protocol/transport" - "github.com/status-im/status-console-client/protocol/v1" -) - -type MVDSWhisperAdapter struct { - transport transport.WhisperTransport //nolint: structcheck,unused - // TODO: probably some *mvds.MVDS struct implementing mvds. -} - -// MVDSWhisperAdapter must implement Protocol interface. -var _ protocol.Protocol = (*MVDSWhisperAdapter)(nil) - -// Subscribe listens to new messages. -func (m *MVDSWhisperAdapter) Subscribe( - ctx context.Context, - messages chan<- *protocol.Message, - options protocol.SubscribeOptions, -) (*subscription.Subscription, error) { - return nil, nil -} - -// Send sends a message to the network. -// Identity is required as the protocol requires -// all messages to be signed. -func (m *MVDSWhisperAdapter) Send(ctx context.Context, data []byte, options protocol.SendOptions) ([]byte, error) { - return nil, nil -} - -// Request retrieves historic messages. -func (m *MVDSWhisperAdapter) Request(ctx context.Context, params protocol.RequestOptions) error { - return nil -} diff --git a/protocol/adapter/protocol_whisper.go b/protocol/adapter/protocol_whisper.go index 6326df9..3e8e1e7 100644 --- a/protocol/adapter/protocol_whisper.go +++ b/protocol/adapter/protocol_whisper.go @@ -127,7 +127,7 @@ func (w *ProtocolWhisperAdapter) Send(ctx context.Context, data []byte, options data = encryptedData } - newMessage, err := newNewMessage(w.transport.KeysManager(), data) + newMessage, err := NewNewMessage(w.transport.KeysManager(), data) if err != nil { return nil, err } diff --git a/protocol/adapter/whisper.go b/protocol/adapter/whisper.go index 5a803ce..cab1bf5 100644 --- a/protocol/adapter/whisper.go +++ b/protocol/adapter/whisper.go @@ -93,18 +93,18 @@ func updateFilterFromSubscribeOptions(f *filter, options protocol.SubscribeOptio } } -type newMessage struct { +type NewMessage struct { whisper.NewMessage keys keysManager } -func newNewMessage(keys keysManager, data []byte) (*newMessage, error) { +func NewNewMessage(keys keysManager, data []byte) (*NewMessage, error) { sigKey, err := keys.AddOrGetKeyPair(keys.PrivateKey()) if err != nil { return nil, err } - return &newMessage{ + return &NewMessage{ NewMessage: whisper.NewMessage{ TTL: WhisperTTL, Payload: data, @@ -116,7 +116,7 @@ func newNewMessage(keys keysManager, data []byte) (*newMessage, error) { }, nil } -func (m *newMessage) updateForPrivate(name string, recipient *ecdsa.PublicKey) (err error) { +func (m *NewMessage) updateForPrivate(name string, recipient *ecdsa.PublicKey) (err error) { m.Topic, err = ToTopic(name) if err != nil { return @@ -127,7 +127,7 @@ func (m *newMessage) updateForPrivate(name string, recipient *ecdsa.PublicKey) ( return } -func (m *newMessage) updateForPublicGroup(name string) (err error) { +func (m *NewMessage) updateForPublicGroup(name string) (err error) { m.Topic, err = ToTopic(name) if err != nil { return @@ -137,7 +137,7 @@ func (m *newMessage) updateForPublicGroup(name string) (err error) { return } -func updateNewMessageFromSendOptions(m *newMessage, options protocol.SendOptions) error { +func updateNewMessageFromSendOptions(m *NewMessage, options protocol.SendOptions) error { if options.Recipient != nil && options.ChatName != "" { return m.updateForPrivate(options.ChatName, options.Recipient) } else if options.ChatName != "" { diff --git a/protocol/datasync/peer/utils.go b/protocol/datasync/peer/utils.go new file mode 100644 index 0000000..ce98d08 --- /dev/null +++ b/protocol/datasync/peer/utils.go @@ -0,0 +1,15 @@ +package peer + +import ( + "crypto/ecdsa" + + "github.com/ethereum/go-ethereum/crypto" + + "github.com/status-im/mvds/state" +) + +func PublicKeyToPeerID(k ecdsa.PublicKey) state.PeerID { + var p state.PeerID + copy(p[:], crypto.FromECDSAPub(&k)) + return p +} diff --git a/protocol/datasync/transport.go b/protocol/datasync/transport.go new file mode 100644 index 0000000..59de7fc --- /dev/null +++ b/protocol/datasync/transport.go @@ -0,0 +1,68 @@ +package datasync + +import ( + "context" + + "github.com/gogo/protobuf/proto" + + "github.com/status-im/mvds/protobuf" + "github.com/status-im/mvds/state" + "github.com/status-im/mvds/transport" + + "github.com/status-im/status-console-client/protocol/adapter" + prototrns "github.com/status-im/status-console-client/protocol/transport" + + whisper "github.com/status-im/whisper/whisperv6" +) + +type DataSyncNodeTransport struct { + transport prototrns.WhisperTransport + packets chan transport.Packet +} + +func NewDataSyncNodeTransport(t prototrns.WhisperTransport) *DataSyncNodeTransport { + return &DataSyncNodeTransport{ + transport: t, + packets: make(chan transport.Packet), + } +} + +func (t *DataSyncNodeTransport) AddPacket(p transport.Packet) { + t.packets <- p +} + +func (t *DataSyncNodeTransport) Watch() transport.Packet { + return <-t.packets +} + +func (t *DataSyncNodeTransport) Send(group state.GroupID, _ state.PeerID, peer state.PeerID, payload protobuf.Payload) error { + data, err := proto.Marshal(&payload) + if err != nil { + return err + } + + newMessage, err := adapter.NewNewMessage(t.transport.KeysManager(), data) + if err != nil { + return err + } + + newMessage.Topic = toTopicType(group) + + // @todo set SymKeyID or PublicKey depending on chat type + newMessage.PublicKey = peer[:] + + _, err = t.transport.Send(context.Background(), newMessage.NewMessage) + return err +} + +func toTopicType(g state.GroupID) whisper.TopicType { + t := whisper.TopicType{} + copy(t[:], g[:4]) + return t +} + +// CalculateSendTime calculates the next epoch +// at which a message should be sent. +func CalculateSendTime(count uint64, time int64) int64 { + return time + int64(count*2) // @todo this should match that time is increased by whisper periods, aka we only retransmit the first time when a message has expired. +} diff --git a/vendor/github.com/btcsuite/btcd/chaincfg/params.go b/vendor/github.com/btcsuite/btcd/chaincfg/params.go index 9da7102..54117b8 100644 --- a/vendor/github.com/btcsuite/btcd/chaincfg/params.go +++ b/vendor/github.com/btcsuite/btcd/chaincfg/params.go @@ -272,6 +272,13 @@ var MainNetParams = Params{ {343185, newHashFromStr("0000000000000000072b8bf361d01a6ba7d445dd024203fafc78768ed4368554")}, {352940, newHashFromStr("000000000000000010755df42dba556bb72be6a32f3ce0b6941ce4430152c9ff")}, {382320, newHashFromStr("00000000000000000a8dc6ed5b133d0eb2fd6af56203e4159789b092defd8ab2")}, + {400000, newHashFromStr("000000000000000004ec466ce4732fe6f1ed1cddc2ed4b328fff5224276e3f6f")}, + {430000, newHashFromStr("000000000000000001868b2bb3a285f3cc6b33ea234eb70facf4dcdf22186b87")}, + {460000, newHashFromStr("000000000000000000ef751bbce8e744ad303c47ece06c8d863e4d417efc258c")}, + {490000, newHashFromStr("000000000000000000de069137b17b8d5a3dfbd5b145b2dcfb203f15d0c4de90")}, + {520000, newHashFromStr("0000000000000000000d26984c0229c9f6962dc74db0a6d525f2f1640396f69c")}, + {550000, newHashFromStr("000000000000000000223b7a2298fb1c6c75fb0efc28a4c56853ff4112ec6bc9")}, + {560000, newHashFromStr("0000000000000000002c7b276daf6efb2b6aa68e2ce3be67ef925b3264ae7122")}, }, // Consensus rule change deployments. @@ -439,6 +446,9 @@ var TestNet3Params = Params{ {800010, newHashFromStr("000000000017ed35296433190b6829db01e657d80631d43f5983fa403bfdb4c1")}, {900000, newHashFromStr("0000000000356f8d8924556e765b7a94aaebc6b5c8685dcfa2b1ee8b41acd89b")}, {1000007, newHashFromStr("00000000001ccb893d8a1f25b70ad173ce955e5f50124261bbbc50379a612ddf")}, + {1100007, newHashFromStr("00000000000abc7b2cd18768ab3dee20857326a818d1946ed6796f42d66dd1e8")}, + {1200007, newHashFromStr("00000000000004f2dc41845771909db57e04191714ed8c963f7e56713a7b6cea")}, + {1300007, newHashFromStr("0000000072eab69d54df75107c052b26b0395b44f77578184293bf1bb1dbd9fa")}, }, // Consensus rule change deployments. diff --git a/vendor/github.com/btcsuite/btcutil/address.go b/vendor/github.com/btcsuite/btcutil/address.go index 9e3592a..8966de4 100644 --- a/vendor/github.com/btcsuite/btcutil/address.go +++ b/vendor/github.com/btcsuite/btcutil/address.go @@ -188,8 +188,8 @@ func DecodeAddress(addr string, defaultNet *chaincfg.Params) (Address, error) { } switch len(decoded) { case ripemd160.Size: // P2PKH or P2SH - isP2PKH := chaincfg.IsPubKeyHashAddrID(netID) - isP2SH := chaincfg.IsScriptHashAddrID(netID) + isP2PKH := netID == defaultNet.PubKeyHashAddrID + isP2SH := netID == defaultNet.ScriptHashAddrID switch hash160 := decoded; { case isP2PKH && isP2SH: return nil, ErrAddressCollision diff --git a/vendor/github.com/status-im/mvds/LICENSE b/vendor/github.com/status-im/mvds/LICENSE new file mode 100644 index 0000000..feec255 --- /dev/null +++ b/vendor/github.com/status-im/mvds/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2019 Status + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/status-im/mvds/node/node.go b/vendor/github.com/status-im/mvds/node/node.go new file mode 100644 index 0000000..b6fbc14 --- /dev/null +++ b/vendor/github.com/status-im/mvds/node/node.go @@ -0,0 +1,355 @@ +// Package Node contains node logic. +package node + +// @todo this is a very rough implementation that needs cleanup + +import ( + "bytes" + "context" + "fmt" + "log" + "sync/atomic" + "time" + + "github.com/status-im/mvds/protobuf" + "github.com/status-im/mvds/state" + "github.com/status-im/mvds/store" + "github.com/status-im/mvds/transport" +) + +// Mode represents the synchronization mode. +type Mode int + +const ( + INTERACTIVE Mode = iota + BATCH +) + +type calculateNextEpoch func(count uint64, epoch int64) int64 + +// Node represents an MVDS node, it runs all the logic like sending and receiving protocol messages. +type Node struct { + ctx context.Context + cancel context.CancelFunc + + store store.MessageStore + transport transport.Transport + + syncState state.SyncState + + peers map[state.GroupID][]state.PeerID + + payloads payloads + + nextEpoch calculateNextEpoch + + ID state.PeerID + + epoch int64 + mode Mode +} + +// NewNode returns a new node. +func NewNode( + ms store.MessageStore, + st transport.Transport, + ss state.SyncState, + nextEpoch calculateNextEpoch, + currentEpoch int64, + id state.PeerID, + mode Mode, +) *Node { + ctx, cancel := context.WithCancel(context.Background()) + + return &Node{ + ctx: ctx, + cancel: cancel, + store: ms, + transport: st, + syncState: ss, + peers: make(map[state.GroupID][]state.PeerID), + payloads: newPayloads(), + nextEpoch: nextEpoch, + ID: id, + epoch: currentEpoch, + mode: mode, + } +} + +// Start listens for new messages received by the node and sends out those required every epoch. +func (n *Node) Start() { + go func() { + for { + select { + case <-n.ctx.Done(): + log.Print("Watch stopped") + return + default: + p := n.transport.Watch() + go n.onPayload(p.Group, p.Sender, p.Payload) + } + } + }() + + go func() { + for { + select { + case <-n.ctx.Done(): + log.Print("Epoch processing stopped") + return + default: + log.Printf("Node: %x Epoch: %d", n.ID[:4], n.epoch) + time.Sleep(1 * time.Second) + + n.sendMessages() + atomic.AddInt64(&n.epoch, 1) + } + } + }() +} + +// Stop message reading and epoch processing +func (n *Node) Stop() { + n.cancel() +} + +// AppendMessage sends a message to a given group. +func (n *Node) AppendMessage(group state.GroupID, data []byte) (state.MessageID, error) { + m := protobuf.Message{ + GroupId: group[:], + Timestamp: time.Now().Unix(), + Body: data, + } + + id := state.ID(m) + + peers, ok := n.peers[group] + if !ok { + return state.MessageID{}, fmt.Errorf("trying to send to unknown group %x", group[:4]) + } + + err := n.store.Add(m) + if err != nil { + return state.MessageID{}, err + } + + go func() { + for _, p := range peers { + if !n.IsPeerInGroup(group, p) { + continue + } + + if n.mode == INTERACTIVE { + s := state.State{} + s.SendEpoch = n.epoch + 1 + err := n.syncState.Set(group, id, p, s) + + if err != nil { + log.Printf("error while setting sync state %s", err.Error()) + } + } + + if n.mode == BATCH { + // @TODO this if flawed cause we never retransmit + n.payloads.AddMessages(group, p, &m) + log.Printf("[%x] sending MESSAGE (%x -> %x): %x\n", group[:4], n.ID[:4], p[:4], id[:4]) + } + } + }() + + log.Printf("[%x] node %x sending %x\n", group[:4], n.ID[:4], id[:4]) + // @todo think about a way to insta trigger send messages when send was selected, we don't wanna wait for ticks here + + return id, nil +} + +// AddPeer adds a peer to a specific group making it a recipient of messages. +func (n *Node) AddPeer(group state.GroupID, id state.PeerID) { + if _, ok := n.peers[group]; !ok { + n.peers[group] = make([]state.PeerID, 0) + } + + n.peers[group] = append(n.peers[group], id) +} + +// IsPeerInGroup checks whether a peer is in the specified group. +func (n Node) IsPeerInGroup(g state.GroupID, p state.PeerID) bool { + for _, peer := range n.peers[g] { + if bytes.Equal(peer[:], p[:]) { + return true + } + } + + return false +} + +func (n *Node) sendMessages() { + err := n.syncState.Map(n.epoch, func(g state.GroupID, m state.MessageID, p state.PeerID, s state.State) state.State { + if !n.IsPeerInGroup(g, p) { + return s + } + + n.payloads.AddOffers(g, p, m[:]) + return n.updateSendEpoch(s) + }) + + if err != nil { + log.Printf("error while mapping sync state: %s", err.Error()) + } + + n.payloads.MapAndClear(func(id state.GroupID, peer state.PeerID, payload protobuf.Payload) { + err := n.transport.Send(id, n.ID, peer, payload) + if err != nil { + log.Printf("error sending message: %s", err.Error()) + // @todo + } + }) +} + +func (n *Node) onPayload(group state.GroupID, sender state.PeerID, payload protobuf.Payload) { + if payload.Ack != nil { + n.onAck(group, sender, *payload.Ack) + } + + if payload.Request != nil { + n.payloads.AddMessages(group, sender, n.onRequest(group, sender, *payload.Request)...) + } + + if payload.Offer != nil { + n.payloads.AddRequests(group, sender, n.onOffer(group, sender, *payload.Offer)...) + } + + if payload.Messages != nil { + n.payloads.AddAcks(group, sender, n.onMessages(group, sender, payload.Messages)...) + } +} + +func (n *Node) onOffer(group state.GroupID, sender state.PeerID, msg protobuf.Offer) [][]byte { + r := make([][]byte, 0) + + for _, raw := range msg.Id { + id := toMessageID(raw) + log.Printf("[%x] OFFER (%x -> %x): %x received.\n", group[:4], sender[:4], n.ID[:4], id[:4]) + + // @todo maybe ack? + if n.store.Has(id) { + continue + } + + r = append(r, raw) + log.Printf("[%x] sending REQUEST (%x -> %x): %x\n", group[:4], n.ID[:4], sender[:4], id[:4]) + } + + return r +} + +func (n *Node) onRequest(group state.GroupID, sender state.PeerID, msg protobuf.Request) []*protobuf.Message { + m := make([]*protobuf.Message, 0) + + for _, raw := range msg.Id { + id := toMessageID(raw) + log.Printf("[%x] REQUEST (%x -> %x): %x received.\n", group[:4], sender[:4], n.ID[:4], id[:4]) + + if !n.IsPeerInGroup(group, sender) { + log.Printf("[%x] peer %x is not in group", group[:4], sender[:4]) + continue + } + + message, err := n.store.Get(id) + if err != nil { + log.Printf("error requesting message %x", id[:4]) + continue + } + + // @todo this probably change the sync state to retransmit messages rather than offers + s, err := n.syncState.Get(group, id, sender) + if err != nil { + log.Printf("error (%s) getting sync state group: %x id: %x peer: %x", err.Error(), group[:4], id[:4], sender[:4]) + continue + } + + err = n.syncState.Set(group, id, sender, n.updateSendEpoch(s)) + if err != nil { + log.Printf("error (%s) setting sync state group: %x id: %x peer: %x", err.Error(), group[:4], id[:4], sender[:4]) + continue + } + + m = append(m, &message) + + log.Printf("[%x] sending MESSAGE (%x -> %x): %x\n", group[:4], n.ID[:4], sender[:4], id[:4]) + } + + return m +} + +func (n *Node) onAck(group state.GroupID, sender state.PeerID, msg protobuf.Ack) { + for _, raw := range msg.Id { + id := toMessageID(raw) + + err := n.syncState.Remove(group, id, sender) + if err != nil { + log.Printf("error while removing sync state %s", err.Error()) + continue + } + + log.Printf("[%x] ACK (%x -> %x): %x received.\n", group[:4], sender[:4], n.ID[:4], id[:4]) + } +} + +func (n *Node) onMessages(group state.GroupID, sender state.PeerID, messages []*protobuf.Message) [][]byte { + a := make([][]byte, 0) + + for _, m := range messages { + err := n.onMessage(group, sender, *m) + if err != nil { + // @todo + continue + } + + id := state.ID(*m) + log.Printf("[%x] sending ACK (%x -> %x): %x\n", group[:4], n.ID[:4], sender[:4], id[:4]) + a = append(a, id[:]) + } + + return a +} + +func (n *Node) onMessage(group state.GroupID, sender state.PeerID, msg protobuf.Message) error { + id := state.ID(msg) + log.Printf("[%x] MESSAGE (%x -> %x): %x received.\n", group[:4], sender[:4], n.ID[:4], id[:4]) + + go func() { + for _, peer := range n.peers[group] { + if peer == sender { + continue + } + + s := state.State{} + s.SendEpoch = n.epoch + 1 + err := n.syncState.Set(group, id, peer, s) + if err != nil { + log.Printf("error while setting sync state %s", err.Error()) + } + } + }() + + err := n.store.Add(msg) + if err != nil { + return err + // @todo process, should this function ever even have an error? + } + + return nil +} + +func (n Node) updateSendEpoch(s state.State) state.State { + s.SendCount += 1 + s.SendEpoch += n.nextEpoch(s.SendCount, n.epoch) + return s +} + +func toMessageID(b []byte) state.MessageID { + var id state.MessageID + copy(id[:], b) + return id +} diff --git a/vendor/github.com/status-im/mvds/node/payloads.go b/vendor/github.com/status-im/mvds/node/payloads.go new file mode 100644 index 0000000..e1f8838 --- /dev/null +++ b/vendor/github.com/status-im/mvds/node/payloads.go @@ -0,0 +1,103 @@ +package node + +import ( + "sync" + + "github.com/status-im/mvds/protobuf" + "github.com/status-im/mvds/state" +) + +type payloads struct { + sync.Mutex + + payloads map[state.GroupID]map[state.PeerID]protobuf.Payload +} +// @todo check in all the functions below that we aren't duplicating stuff + +func newPayloads() payloads { + return payloads{ + payloads: make(map[state.GroupID]map[state.PeerID]protobuf.Payload), + } +} + +func (p *payloads) AddOffers(group state.GroupID, peer state.PeerID, offers ...[]byte) { + p.Lock() + defer p.Unlock() + + payload := p.get(group, peer) + if payload.Offer == nil { + payload.Offer = &protobuf.Offer{Id: make([][]byte, 0)} + } + + payload.Offer.Id = append(payload.Offer.Id, offers...) + + p.set(group, peer, payload) +} + +func (p *payloads) AddAcks(group state.GroupID, peer state.PeerID, acks ...[]byte) { + p.Lock() + defer p.Unlock() + + payload := p.get(group, peer) + if payload.Ack == nil { + payload.Ack = &protobuf.Ack{Id: make([][]byte, 0)} + } + + payload.Ack.Id = append(payload.Ack.Id, acks...) + + p.set(group, peer, payload) +} + +func (p *payloads) AddRequests(group state.GroupID, peer state.PeerID, request ...[]byte) { + p.Lock() + defer p.Unlock() + + payload := p.get(group, peer) + if payload.Request == nil { + payload.Request = &protobuf.Request{Id: make([][]byte, 0)} + } + + payload.Request.Id = append(payload.Request.Id, request...) + + p.set(group, peer, payload) +} + +func (p *payloads) AddMessages(group state.GroupID, peer state.PeerID, messages ...*protobuf.Message) { + p.Lock() + defer p.Unlock() + + payload := p.get(group, peer) + if payload.Messages == nil { + payload.Messages = make([]*protobuf.Message, 0) + } + + payload.Messages = append(payload.Messages, messages...) + p.set(group, peer, payload) +} + +func (p *payloads) MapAndClear(f func(state.GroupID, state.PeerID, protobuf.Payload)) { + p.Lock() + defer p.Unlock() + + for g, payloads := range p.payloads { + for peer, payload := range payloads { + f(g, peer, payload) + } + } + + p.payloads = make(map[state.GroupID]map[state.PeerID]protobuf.Payload) +} + +func (p *payloads) get(id state.GroupID, peer state.PeerID) protobuf.Payload { + payload, _ := p.payloads[id][peer] + return payload +} + +func (p *payloads) set(id state.GroupID, peer state.PeerID, payload protobuf.Payload) { + _, ok := p.payloads[id] + if !ok { + p.payloads[id] = make(map[state.PeerID]protobuf.Payload) + } + + p.payloads[id][peer] = payload +} diff --git a/vendor/github.com/status-im/mvds/protobuf/sync.pb.go b/vendor/github.com/status-im/mvds/protobuf/sync.pb.go new file mode 100644 index 0000000..16e717b --- /dev/null +++ b/vendor/github.com/status-im/mvds/protobuf/sync.pb.go @@ -0,0 +1,287 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: protobuf/sync.proto + +package protobuf + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type Payload struct { + Ack *Ack `protobuf:"bytes,1,opt,name=ack,proto3" json:"ack,omitempty"` + Offer *Offer `protobuf:"bytes,2,opt,name=offer,proto3" json:"offer,omitempty"` + Request *Request `protobuf:"bytes,3,opt,name=request,proto3" json:"request,omitempty"` + Messages []*Message `protobuf:"bytes,4,rep,name=messages,proto3" json:"messages,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Payload) Reset() { *m = Payload{} } +func (m *Payload) String() string { return proto.CompactTextString(m) } +func (*Payload) ProtoMessage() {} +func (*Payload) Descriptor() ([]byte, []int) { + return fileDescriptor_2dca527c092c79d7, []int{0} +} + +func (m *Payload) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Payload.Unmarshal(m, b) +} +func (m *Payload) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Payload.Marshal(b, m, deterministic) +} +func (m *Payload) XXX_Merge(src proto.Message) { + xxx_messageInfo_Payload.Merge(m, src) +} +func (m *Payload) XXX_Size() int { + return xxx_messageInfo_Payload.Size(m) +} +func (m *Payload) XXX_DiscardUnknown() { + xxx_messageInfo_Payload.DiscardUnknown(m) +} + +var xxx_messageInfo_Payload proto.InternalMessageInfo + +func (m *Payload) GetAck() *Ack { + if m != nil { + return m.Ack + } + return nil +} + +func (m *Payload) GetOffer() *Offer { + if m != nil { + return m.Offer + } + return nil +} + +func (m *Payload) GetRequest() *Request { + if m != nil { + return m.Request + } + return nil +} + +func (m *Payload) GetMessages() []*Message { + if m != nil { + return m.Messages + } + return nil +} + +type Ack struct { + Id [][]byte `protobuf:"bytes,1,rep,name=id,proto3" json:"id,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Ack) Reset() { *m = Ack{} } +func (m *Ack) String() string { return proto.CompactTextString(m) } +func (*Ack) ProtoMessage() {} +func (*Ack) Descriptor() ([]byte, []int) { + return fileDescriptor_2dca527c092c79d7, []int{1} +} + +func (m *Ack) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Ack.Unmarshal(m, b) +} +func (m *Ack) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Ack.Marshal(b, m, deterministic) +} +func (m *Ack) XXX_Merge(src proto.Message) { + xxx_messageInfo_Ack.Merge(m, src) +} +func (m *Ack) XXX_Size() int { + return xxx_messageInfo_Ack.Size(m) +} +func (m *Ack) XXX_DiscardUnknown() { + xxx_messageInfo_Ack.DiscardUnknown(m) +} + +var xxx_messageInfo_Ack proto.InternalMessageInfo + +func (m *Ack) GetId() [][]byte { + if m != nil { + return m.Id + } + return nil +} + +type Message struct { + GroupId []byte `protobuf:"bytes,1,opt,name=group_id,json=groupId,proto3" json:"group_id,omitempty"` + Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Body []byte `protobuf:"bytes,3,opt,name=body,proto3" json:"body,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} +func (*Message) Descriptor() ([]byte, []int) { + return fileDescriptor_2dca527c092c79d7, []int{2} +} + +func (m *Message) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Message.Unmarshal(m, b) +} +func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Message.Marshal(b, m, deterministic) +} +func (m *Message) XXX_Merge(src proto.Message) { + xxx_messageInfo_Message.Merge(m, src) +} +func (m *Message) XXX_Size() int { + return xxx_messageInfo_Message.Size(m) +} +func (m *Message) XXX_DiscardUnknown() { + xxx_messageInfo_Message.DiscardUnknown(m) +} + +var xxx_messageInfo_Message proto.InternalMessageInfo + +func (m *Message) GetGroupId() []byte { + if m != nil { + return m.GroupId + } + return nil +} + +func (m *Message) GetTimestamp() int64 { + if m != nil { + return m.Timestamp + } + return 0 +} + +func (m *Message) GetBody() []byte { + if m != nil { + return m.Body + } + return nil +} + +type Offer struct { + Id [][]byte `protobuf:"bytes,1,rep,name=id,proto3" json:"id,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Offer) Reset() { *m = Offer{} } +func (m *Offer) String() string { return proto.CompactTextString(m) } +func (*Offer) ProtoMessage() {} +func (*Offer) Descriptor() ([]byte, []int) { + return fileDescriptor_2dca527c092c79d7, []int{3} +} + +func (m *Offer) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Offer.Unmarshal(m, b) +} +func (m *Offer) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Offer.Marshal(b, m, deterministic) +} +func (m *Offer) XXX_Merge(src proto.Message) { + xxx_messageInfo_Offer.Merge(m, src) +} +func (m *Offer) XXX_Size() int { + return xxx_messageInfo_Offer.Size(m) +} +func (m *Offer) XXX_DiscardUnknown() { + xxx_messageInfo_Offer.DiscardUnknown(m) +} + +var xxx_messageInfo_Offer proto.InternalMessageInfo + +func (m *Offer) GetId() [][]byte { + if m != nil { + return m.Id + } + return nil +} + +type Request struct { + Id [][]byte `protobuf:"bytes,1,rep,name=id,proto3" json:"id,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Request) Reset() { *m = Request{} } +func (m *Request) String() string { return proto.CompactTextString(m) } +func (*Request) ProtoMessage() {} +func (*Request) Descriptor() ([]byte, []int) { + return fileDescriptor_2dca527c092c79d7, []int{4} +} + +func (m *Request) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Request.Unmarshal(m, b) +} +func (m *Request) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Request.Marshal(b, m, deterministic) +} +func (m *Request) XXX_Merge(src proto.Message) { + xxx_messageInfo_Request.Merge(m, src) +} +func (m *Request) XXX_Size() int { + return xxx_messageInfo_Request.Size(m) +} +func (m *Request) XXX_DiscardUnknown() { + xxx_messageInfo_Request.DiscardUnknown(m) +} + +var xxx_messageInfo_Request proto.InternalMessageInfo + +func (m *Request) GetId() [][]byte { + if m != nil { + return m.Id + } + return nil +} + +func init() { + proto.RegisterType((*Payload)(nil), "mvds.Payload") + proto.RegisterType((*Ack)(nil), "mvds.Ack") + proto.RegisterType((*Message)(nil), "mvds.Message") + proto.RegisterType((*Offer)(nil), "mvds.Offer") + proto.RegisterType((*Request)(nil), "mvds.Request") +} + +func init() { proto.RegisterFile("protobuf/sync.proto", fileDescriptor_2dca527c092c79d7) } + +var fileDescriptor_2dca527c092c79d7 = []byte{ + // 258 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x90, 0x41, 0x4f, 0xb4, 0x30, + 0x10, 0x86, 0x03, 0x65, 0x3f, 0xd8, 0x59, 0x3e, 0x0f, 0x63, 0x8c, 0xdd, 0xe8, 0x01, 0xb9, 0x88, + 0x17, 0x4c, 0xf4, 0x17, 0xac, 0x37, 0x0f, 0x46, 0xd3, 0x83, 0x07, 0x2f, 0xa6, 0xd0, 0xb2, 0x21, + 0x88, 0x45, 0x0a, 0x26, 0xfc, 0x18, 0xff, 0xab, 0x61, 0xba, 0xeb, 0x26, 0x7a, 0x9b, 0x79, 0x9f, + 0x27, 0xe9, 0xbc, 0x85, 0xe3, 0xae, 0x37, 0x83, 0x29, 0xc6, 0xea, 0xda, 0x4e, 0xef, 0x65, 0x4e, + 0x1b, 0x06, 0xed, 0xa7, 0xb2, 0xe9, 0x97, 0x07, 0xe1, 0x93, 0x9c, 0xde, 0x8c, 0x54, 0x78, 0x06, + 0x4c, 0x96, 0x0d, 0xf7, 0x12, 0x2f, 0x5b, 0xdd, 0x2c, 0xf3, 0x99, 0xe7, 0x9b, 0xb2, 0x11, 0x73, + 0x8a, 0x17, 0xb0, 0x30, 0x55, 0xa5, 0x7b, 0xee, 0x13, 0x5e, 0x39, 0xfc, 0x38, 0x47, 0xc2, 0x11, + 0xbc, 0x84, 0xb0, 0xd7, 0x1f, 0xa3, 0xb6, 0x03, 0x67, 0x24, 0xfd, 0x77, 0x92, 0x70, 0xa1, 0xd8, + 0x53, 0xbc, 0x82, 0xa8, 0xd5, 0xd6, 0xca, 0xad, 0xb6, 0x3c, 0x48, 0xd8, 0xc1, 0x7c, 0x70, 0xa9, + 0xf8, 0xc1, 0xe9, 0x09, 0xb0, 0x4d, 0xd9, 0xe0, 0x11, 0xf8, 0xb5, 0xe2, 0x5e, 0xc2, 0xb2, 0x58, + 0xf8, 0xb5, 0x4a, 0x9f, 0x21, 0xdc, 0xb9, 0xb8, 0x86, 0x68, 0xdb, 0x9b, 0xb1, 0x7b, 0x25, 0xc1, + 0xcb, 0x62, 0x11, 0xd2, 0x7e, 0xaf, 0xf0, 0x1c, 0x96, 0x43, 0xdd, 0x6a, 0x3b, 0xc8, 0xb6, 0xa3, + 0xbb, 0x99, 0x38, 0x04, 0x88, 0x10, 0x14, 0x46, 0x4d, 0x74, 0x6b, 0x2c, 0x68, 0x4e, 0x4f, 0x61, + 0x41, 0x95, 0xfe, 0x3c, 0xb8, 0x86, 0x70, 0x57, 0xe3, 0x37, 0xba, 0x83, 0x97, 0x68, 0xff, 0xbf, + 0xc5, 0x3f, 0x9a, 0x6e, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0xd9, 0xf0, 0x22, 0x6e, 0x72, 0x01, + 0x00, 0x00, +} diff --git a/vendor/github.com/status-im/mvds/protobuf/sync.proto b/vendor/github.com/status-im/mvds/protobuf/sync.proto new file mode 100644 index 0000000..bbc48e9 --- /dev/null +++ b/vendor/github.com/status-im/mvds/protobuf/sync.proto @@ -0,0 +1,29 @@ +syntax = "proto3"; + +package mvds; +option go_package = "protobuf"; + +message Payload { + Ack ack = 1; + Offer offer = 2; + Request request = 3; + repeated Message messages = 4; +} + +message Ack { + repeated bytes id = 1; +} + +message Message { + bytes group_id = 1; + int64 timestamp = 2; + bytes body = 3; +} + +message Offer { + repeated bytes id = 1; +} + +message Request { + repeated bytes id = 1; +} diff --git a/vendor/github.com/status-im/mvds/state/peerid.go b/vendor/github.com/status-im/mvds/state/peerid.go new file mode 100644 index 0000000..d6cb837 --- /dev/null +++ b/vendor/github.com/status-im/mvds/state/peerid.go @@ -0,0 +1,3 @@ +package state + +type PeerID [65]byte diff --git a/vendor/github.com/status-im/mvds/state/state.go b/vendor/github.com/status-im/mvds/state/state.go new file mode 100644 index 0000000..2b84f97 --- /dev/null +++ b/vendor/github.com/status-im/mvds/state/state.go @@ -0,0 +1,14 @@ +// Package state contains everything related to the synchronization state for MVDS. +package state + +type State struct { + SendCount uint64 + SendEpoch int64 +} + +type SyncState interface { + Get(group GroupID, id MessageID, peer PeerID) (State, error) + Set(group GroupID, id MessageID, peer PeerID, newState State) error + Remove(group GroupID, id MessageID, peer PeerID) error + Map(epoch int64, process func(GroupID, MessageID, PeerID, State) State) error +} diff --git a/vendor/github.com/status-im/mvds/state/state_memory.go b/vendor/github.com/status-im/mvds/state/state_memory.go new file mode 100644 index 0000000..5045560 --- /dev/null +++ b/vendor/github.com/status-im/mvds/state/state_memory.go @@ -0,0 +1,68 @@ +package state + +import ( + "sync" +) + +type memorySyncState struct { + sync.Mutex + + state map[GroupID]map[MessageID]map[PeerID]State +} + +func NewSyncState() *memorySyncState { + return &memorySyncState{ + state: make(map[GroupID]map[MessageID]map[PeerID]State), + } +} + +func (s *memorySyncState) Get(group GroupID, id MessageID, peer PeerID) (State, error) { + s.Lock() + defer s.Unlock() + + state, _ := s.state[group][id][peer] + return state, nil +} + +func (s *memorySyncState) Set(group GroupID, id MessageID, peer PeerID, newState State) error { + s.Lock() + defer s.Unlock() + + if _, ok := s.state[group]; !ok { + s.state[group] = make(map[MessageID]map[PeerID]State) + } + + if _, ok := s.state[group][id]; !ok { + s.state[group][id] = make(map[PeerID]State) + } + + s.state[group][id][peer] = newState + return nil +} + +func (s *memorySyncState) Remove(group GroupID, id MessageID, peer PeerID) error { + s.Lock() + defer s.Unlock() + + delete(s.state[group][id], peer) + return nil +} + +func (s *memorySyncState) Map(epoch int64, process func(GroupID, MessageID, PeerID, State) State) error { + s.Lock() + defer s.Unlock() + + for group, syncstate := range s.state { + for id, peers := range syncstate { + for peer, state := range peers { + if state.SendEpoch < epoch { + continue + } + + s.state[group][id][peer] = process(group, id, peer, state) + } + } + } + + return nil +} diff --git a/vendor/github.com/status-im/mvds/state/sync_messageid.go b/vendor/github.com/status-im/mvds/state/sync_messageid.go new file mode 100644 index 0000000..64c1516 --- /dev/null +++ b/vendor/github.com/status-im/mvds/state/sync_messageid.go @@ -0,0 +1,23 @@ +package state + +import ( + "crypto/sha256" + "encoding/binary" + + "github.com/status-im/mvds/protobuf" +) + +type MessageID [32]byte +type GroupID [32]byte + +// ID creates the MessageID for a Message +func ID(m protobuf.Message) MessageID { + t := make([]byte, 8) + binary.LittleEndian.PutUint64(t, uint64(m.Timestamp)) + + b := append([]byte("MESSAGE_ID"), m.GroupId[:]...) + b = append(b, t...) + b = append(b, m.Body...) + + return sha256.Sum256(b) +} diff --git a/vendor/github.com/status-im/mvds/store/messagestore.go b/vendor/github.com/status-im/mvds/store/messagestore.go new file mode 100644 index 0000000..4976bb6 --- /dev/null +++ b/vendor/github.com/status-im/mvds/store/messagestore.go @@ -0,0 +1,13 @@ +// Package store contains everything storage related for MVDS. +package store + +import ( + "github.com/status-im/mvds/protobuf" + "github.com/status-im/mvds/state" +) + +type MessageStore interface { + Has(id state.MessageID) bool + Get(id state.MessageID) (protobuf.Message, error) + Add(message protobuf.Message) error +} diff --git a/vendor/github.com/status-im/mvds/store/messagestore_dummy.go b/vendor/github.com/status-im/mvds/store/messagestore_dummy.go new file mode 100644 index 0000000..e6ec962 --- /dev/null +++ b/vendor/github.com/status-im/mvds/store/messagestore_dummy.go @@ -0,0 +1,44 @@ +package store + +import ( + "errors" + "sync" + + "github.com/status-im/mvds/protobuf" + "github.com/status-im/mvds/state" +) + +type DummyStore struct { + sync.Mutex + ms map[state.MessageID]protobuf.Message +} + +func NewDummyStore() DummyStore { + return DummyStore{ms: make(map[state.MessageID]protobuf.Message)} +} + +func (ds *DummyStore) Has(id state.MessageID) bool { + ds.Lock() + defer ds.Unlock() + + _, ok := ds.ms[id]; return ok +} + +func (ds *DummyStore) Get(id state.MessageID) (protobuf.Message, error) { + ds.Lock() + defer ds.Unlock() + + m, ok := ds.ms[id] + if !ok { + return protobuf.Message{}, errors.New("message does not exist") + } + + return m, nil +} + +func (ds *DummyStore) Add(message protobuf.Message) error { + ds.Lock() + defer ds.Unlock() + ds.ms[state.ID(message)] = message + return nil +} diff --git a/vendor/github.com/status-im/mvds/transport/channel_transport.go b/vendor/github.com/status-im/mvds/transport/channel_transport.go new file mode 100644 index 0000000..c78642f --- /dev/null +++ b/vendor/github.com/status-im/mvds/transport/channel_transport.go @@ -0,0 +1,54 @@ +package transport + +import ( + "errors" + math "math/rand" + "sync" + "time" + + "github.com/status-im/mvds/protobuf" + "github.com/status-im/mvds/state" +) + +// ChannelTransport implements a basic MVDS transport using channels for basic testing purposes. +type ChannelTransport struct { + sync.Mutex + + offline int + + in <-chan Packet + out map[state.PeerID]chan<- Packet +} + +func NewChannelTransport(offline int, in <-chan Packet) *ChannelTransport { + return &ChannelTransport{ + offline: offline, + in: in, + out: make(map[state.PeerID]chan<- Packet), + } +} + +func (t *ChannelTransport) AddOutput(id state.PeerID, c chan<-Packet) { + t.out[id] = c +} + +func (t *ChannelTransport) Watch() Packet { + return <-t.in +} + +func (t *ChannelTransport) Send(group state.GroupID, sender state.PeerID, peer state.PeerID, payload protobuf.Payload) error { + // @todo we can do this better, we put node onlineness into a goroutine where we just stop the nodes for x seconds + // outside of this class + math.Seed(time.Now().UnixNano()) + if math.Intn(100) < t.offline { + return nil + } + + c, ok := t.out[peer] + if !ok { + return errors.New("peer unknown") + } + + c <- Packet{Group: group, Sender: sender, Payload: payload} + return nil +} \ No newline at end of file diff --git a/vendor/github.com/status-im/mvds/transport/transport.go b/vendor/github.com/status-im/mvds/transport/transport.go new file mode 100644 index 0000000..c369ede --- /dev/null +++ b/vendor/github.com/status-im/mvds/transport/transport.go @@ -0,0 +1,19 @@ +// Package transport contains transport related logic for MVDS. +package transport + +import ( + "github.com/status-im/mvds/protobuf" + "github.com/status-im/mvds/state" +) + +type Packet struct { + Group state.GroupID + Sender state.PeerID + Payload protobuf.Payload +} + +// Transport defines an interface allowing for agnostic transport implementations. +type Transport interface { + Watch() Packet + Send(group state.GroupID, sender state.PeerID, peer state.PeerID, payload protobuf.Payload) error +} diff --git a/vendor/modules.txt b/vendor/modules.txt index e0fb9fa..58ae026 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -5,12 +5,12 @@ github.com/allegro/bigcache/queue github.com/aristanetworks/goarista/monotime # github.com/beevik/ntp v0.2.0 github.com/beevik/ntp -# github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 +# github.com/btcsuite/btcd v0.0.0-20190427004231-96897255fd17 github.com/btcsuite/btcd/btcec github.com/btcsuite/btcd/chaincfg github.com/btcsuite/btcd/chaincfg/chainhash github.com/btcsuite/btcd/wire -# github.com/btcsuite/btcutil v0.0.0-20190207003914-4c204d697803 +# github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d github.com/btcsuite/btcutil github.com/btcsuite/btcutil/base58 github.com/btcsuite/btcutil/bech32 @@ -320,6 +320,12 @@ github.com/status-im/migrate/database/postgres github.com/status-im/migrate/v4 github.com/status-im/migrate/v4/database/sqlcipher github.com/status-im/migrate/v4/source/go_bindata +# github.com/status-im/mvds v0.0.14 +github.com/status-im/mvds/node +github.com/status-im/mvds/state +github.com/status-im/mvds/store +github.com/status-im/mvds/protobuf +github.com/status-im/mvds/transport # github.com/status-im/rendezvous v1.2.0 github.com/status-im/rendezvous github.com/status-im/rendezvous/protocol