feature/on-message-receive (#1)

* started working on message recieving handling

* formatting

* updated

* minor refactor

* todo
This commit is contained in:
Dean Eigenmann 2019-07-16 01:22:23 +02:00 committed by GitHub
parent 23e616bc9b
commit 6f508d13fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 153 additions and 31 deletions

View File

@ -2,10 +2,14 @@
package client package client
import ( import (
"log"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/status-im/dasy/protobuf" "github.com/status-im/dasy/protobuf"
mvds "github.com/status-im/mvds/node" mvds "github.com/status-im/mvds/node"
mvdsproto "github.com/status-im/mvds/protobuf"
"github.com/status-im/mvds/state" "github.com/status-im/mvds/state"
"github.com/status-im/mvds/store"
) )
// Chat is the ID for a specific chat. // Chat is the ID for a specific chat.
@ -16,7 +20,8 @@ type Peer state.PeerID
// Client is the actual daisy client. // Client is the actual daisy client.
type Client struct { type Client struct {
node mvds.Node node mvds.Node
store store.MessageStore // @todo we probably need a different message store, not sure tho
lastMessage state.MessageID // @todo maybe make type lastMessage state.MessageID // @todo maybe make type
} }
@ -75,3 +80,31 @@ func (c *Client) send(chat Chat, t protobuf.Message_MessageType, body []byte) er
return nil return nil
} }
func (c *Client) onReceive(message mvdsproto.Message) {
var msg protobuf.Message
err := proto.Unmarshal(message.Body, &msg)
if err != nil {
log.Printf("error while unmarshalling message: %s", err.Error())
return
}
// @todo pump messages to subscriber channels
if len(msg.PreviousMessage) == 0 {
return
}
c.handlePreviousMessage(bytesToGroupID(message.GroupId), bytesToMessageID(msg.PreviousMessage))
}
func (c *Client) handlePreviousMessage(group state.GroupID, previousMessage state.MessageID) {
if c.store.Has(previousMessage) {
return
}
err := c.node.RequestMessage(group, previousMessage)
if err != nil {
log.Printf("error while requesting message: %s", err.Error())
}
}

15
client/util.go Normal file
View File

@ -0,0 +1,15 @@
package client
import "github.com/status-im/mvds/state"
func bytesToMessageID(b []byte) state.MessageID {
var id state.MessageID
copy(id[:], b)
return id
}
func bytesToGroupID(b []byte) state.GroupID {
var id state.GroupID
copy(id[:], b)
return id
}

20
go.mod
View File

@ -2,4 +2,22 @@ module github.com/status-im/dasy
go 1.12 go 1.12
require github.com/status-im/mvds v0.0.16 require (
github.com/btcsuite/btcd v0.0.0-20190629003639-c26ffa870fd8 // indirect
github.com/btcsuite/goleveldb v1.0.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/ethereum/go-ethereum v1.9.0 // indirect
github.com/golang/protobuf v1.3.2
github.com/jessevdk/go-flags v1.4.0 // indirect
github.com/kkdai/bstream v1.0.0 // indirect
github.com/kr/pty v1.1.8 // indirect
github.com/onsi/ginkgo v1.8.0 // indirect
github.com/onsi/gomega v1.5.0 // indirect
github.com/status-im/mvds v0.0.18
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 // indirect
golang.org/x/net v0.0.0-20190628185345-da137c7871d7 // indirect
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 // indirect
golang.org/x/text v0.3.2 // indirect
golang.org/x/tools v0.0.0-20190715221914-9b2cb0e5f602 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
)

25
go.sum
View File

@ -1,46 +1,71 @@
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/btcsuite/btcd v0.0.0-20190427004231-96897255fd17/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= github.com/btcsuite/btcd v0.0.0-20190427004231-96897255fd17/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
github.com/btcsuite/btcd v0.0.0-20190629003639-c26ffa870fd8/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg= github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg=
github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY= github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY=
github.com/btcsuite/goleveldb v1.0.0/go.mod h1:QiK9vBlgftBg6rWQIj6wFzbPfRjiykIEhBH4obrXJ/I=
github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc= github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
github.com/btcsuite/snappy-go v1.0.0/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/ethereum/go-ethereum v1.8.27/go.mod h1:PwpWDrCLZrV+tfrhqqF6kPknbISMHaJv9Ln3kPCZLwY= github.com/ethereum/go-ethereum v1.8.27/go.mod h1:PwpWDrCLZrV+tfrhqqF6kPknbISMHaJv9Ln3kPCZLwY=
github.com/ethereum/go-ethereum v1.9.0/go.mod h1:PwpWDrCLZrV+tfrhqqF6kPknbISMHaJv9Ln3kPCZLwY=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/kkdai/bstream v1.0.0/go.mod h1:FDnDOHt5Yx4p3FaHcioFT0QjDOtgUpvjeZqAs+NVZZA=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/status-im/mvds v0.0.16 h1:09VCUTgETAJnxYW81rtGTh0vJS7yBLUOMRpuOXUucGQ= github.com/status-im/mvds v0.0.16 h1:09VCUTgETAJnxYW81rtGTh0vJS7yBLUOMRpuOXUucGQ=
github.com/status-im/mvds v0.0.16/go.mod h1:cgNgcRbLwE4TMPMSax7xl+Ca/Q8rIklZLDwAD6iBKjo= github.com/status-im/mvds v0.0.16/go.mod h1:cgNgcRbLwE4TMPMSax7xl+Ca/Q8rIklZLDwAD6iBKjo=
github.com/status-im/mvds v0.0.18 h1:GkSbkTmRCAqnBhCSaeeghENlS5CICEPKpfmI3dkUgNg=
github.com/status-im/mvds v0.0.18/go.mod h1:cgNgcRbLwE4TMPMSax7xl+Ca/Q8rIklZLDwAD6iBKjo=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190525145741-7be61e1b0e51/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190525145741-7be61e1b0e51/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190715221914-9b2cb0e5f602/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View File

@ -38,7 +38,6 @@ package proto
import ( import (
"fmt" "fmt"
"log" "log"
"os"
"reflect" "reflect"
"sort" "sort"
"strconv" "strconv"
@ -194,7 +193,7 @@ func (p *Properties) Parse(s string) {
// "bytes,49,opt,name=foo,def=hello!" // "bytes,49,opt,name=foo,def=hello!"
fields := strings.Split(s, ",") // breaks def=, but handled below. fields := strings.Split(s, ",") // breaks def=, but handled below.
if len(fields) < 2 { if len(fields) < 2 {
fmt.Fprintf(os.Stderr, "proto: tag has too few fields: %q\n", s) log.Printf("proto: tag has too few fields: %q", s)
return return
} }
@ -214,7 +213,7 @@ func (p *Properties) Parse(s string) {
p.WireType = WireBytes p.WireType = WireBytes
// no numeric converter for non-numeric types // no numeric converter for non-numeric types
default: default:
fmt.Fprintf(os.Stderr, "proto: tag has unknown wire type: %q\n", s) log.Printf("proto: tag has unknown wire type: %q", s)
return return
} }

View File

@ -37,7 +37,7 @@ type Node struct {
syncState state.SyncState syncState state.SyncState
peers map[state.GroupID][]state.PeerID peers map[state.GroupID][]state.PeerID
payloads payloads payloads payloads
@ -47,6 +47,8 @@ type Node struct {
epoch int64 epoch int64
mode Mode mode Mode
subscription chan<- protobuf.Message
} }
// NewNode returns a new node. // NewNode returns a new node.
@ -62,17 +64,17 @@ func NewNode(
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
return &Node{ return &Node{
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
store: ms, store: ms,
transport: st, transport: st,
syncState: ss, syncState: ss,
peers: make(map[state.GroupID][]state.PeerID), peers: make(map[state.GroupID][]state.PeerID),
payloads: newPayloads(), payloads: newPayloads(),
nextEpoch: nextEpoch, nextEpoch: nextEpoch,
ID: id, ID: id,
epoch: currentEpoch, epoch: currentEpoch,
mode: mode, mode: mode,
} }
} }
@ -113,6 +115,11 @@ func (n *Node) Stop() {
n.cancel() n.cancel()
} }
// Subscribe subscribes to incoming messages.
func (n *Node) Subscribe(sub chan <-protobuf.Message) {
n.subscription = sub
}
// AppendMessage sends a message to a given group. // AppendMessage sends a message to a given group.
func (n *Node) AppendMessage(group state.GroupID, data []byte) (state.MessageID, error) { func (n *Node) AppendMessage(group state.GroupID, data []byte) (state.MessageID, error) {
m := protobuf.Message{ m := protobuf.Message{
@ -121,7 +128,7 @@ func (n *Node) AppendMessage(group state.GroupID, data []byte) (state.MessageID,
Body: data, Body: data,
} }
id := state.ID(m) id := m.ID()
peers, ok := n.peers[group] peers, ok := n.peers[group]
if !ok { if !ok {
@ -154,6 +161,26 @@ func (n *Node) AppendMessage(group state.GroupID, data []byte) (state.MessageID,
return id, nil return id, nil
} }
// RequestMessage adds a REQUEST record to the next payload for a given message ID.
func (n *Node) RequestMessage(group state.GroupID, id state.MessageID) error {
peers, ok := n.peers[group]
if !ok {
return fmt.Errorf("trying to request from an unknown group %x", group[:4])
}
go func() {
for _, p := range peers {
if !n.IsPeerInGroup(group, p) {
continue
}
n.insertSyncState(group, id, p, state.REQUEST)
}
}()
return nil
}
// AddPeer adds a peer to a specific group making it a recipient of messages. // AddPeer adds a peer to a specific group making it a recipient of messages.
func (n *Node) AddPeer(group state.GroupID, id state.PeerID) { func (n *Node) AddPeer(group state.GroupID, id state.PeerID) {
if _, ok := n.peers[group]; !ok { if _, ok := n.peers[group]; !ok {
@ -278,7 +305,7 @@ func (n *Node) onMessages(group state.GroupID, sender state.PeerID, messages []*
continue continue
} }
id := state.ID(*m) id := m.ID()
log.Printf("[%x] sending ACK (%x -> %x): %x\n", group[:4], n.ID[:4], sender[:4], id[:4]) log.Printf("[%x] sending ACK (%x -> %x): %x\n", group[:4], n.ID[:4], sender[:4], id[:4])
a = append(a, id[:]) a = append(a, id[:])
} }
@ -287,7 +314,7 @@ func (n *Node) onMessages(group state.GroupID, sender state.PeerID, messages []*
} }
func (n *Node) onMessage(group state.GroupID, sender state.PeerID, msg protobuf.Message) error { func (n *Node) onMessage(group state.GroupID, sender state.PeerID, msg protobuf.Message) error {
id := state.ID(msg) id := msg.ID()
log.Printf("[%x] MESSAGE (%x -> %x): %x received.\n", group[:4], sender[:4], n.ID[:4], id[:4]) log.Printf("[%x] MESSAGE (%x -> %x): %x received.\n", group[:4], sender[:4], n.ID[:4], id[:4])
err := n.syncState.Remove(group, id, sender) err := n.syncState.Remove(group, id, sender)
@ -305,6 +332,10 @@ func (n *Node) onMessage(group state.GroupID, sender state.PeerID, msg protobuf.
} }
}() }()
if n.subscription != nil {
n.subscription <- msg
}
err = n.store.Add(msg) err = n.store.Add(msg)
if err != nil { if err != nil {
return err return err
@ -316,7 +347,7 @@ func (n *Node) onMessage(group state.GroupID, sender state.PeerID, msg protobuf.
func (n *Node) insertSyncState(group state.GroupID, id state.MessageID, p state.PeerID, t state.RecordType) { func (n *Node) insertSyncState(group state.GroupID, id state.MessageID, p state.PeerID, t state.RecordType) {
s := state.State{ s := state.State{
Type: t, Type: t,
SendEpoch: n.epoch + 1, SendEpoch: n.epoch + 1,
} }

View File

@ -1,17 +1,14 @@
package state package protobuf
import ( import (
"crypto/sha256" "crypto/sha256"
"encoding/binary" "encoding/binary"
"github.com/status-im/mvds/protobuf" "github.com/status-im/mvds/state"
) )
type MessageID [32]byte
type GroupID [32]byte
// ID creates the MessageID for a Message // ID creates the MessageID for a Message
func ID(m protobuf.Message) MessageID { func (m Message) ID() state.MessageID {
t := make([]byte, 8) t := make([]byte, 8)
binary.LittleEndian.PutUint64(t, uint64(m.Timestamp)) binary.LittleEndian.PutUint64(t, uint64(m.Timestamp))

4
vendor/github.com/status-im/mvds/state/sync_types.go generated vendored Normal file
View File

@ -0,0 +1,4 @@
package state
type MessageID [32]byte
type GroupID [32]byte

View File

@ -39,6 +39,6 @@ func (ds *DummyStore) Get(id state.MessageID) (protobuf.Message, error) {
func (ds *DummyStore) Add(message protobuf.Message) error { func (ds *DummyStore) Add(message protobuf.Message) error {
ds.Lock() ds.Lock()
defer ds.Unlock() defer ds.Unlock()
ds.ms[state.ID(message)] = message ds.ms[message.ID()] = message
return nil return nil
} }

6
vendor/modules.txt vendored
View File

@ -1,8 +1,8 @@
# github.com/golang/protobuf v1.3.1 # github.com/golang/protobuf v1.3.2
github.com/golang/protobuf/proto github.com/golang/protobuf/proto
# github.com/status-im/mvds v0.0.16 # github.com/status-im/mvds v0.0.18
github.com/status-im/mvds/node github.com/status-im/mvds/node
github.com/status-im/mvds/state
github.com/status-im/mvds/protobuf github.com/status-im/mvds/protobuf
github.com/status-im/mvds/state
github.com/status-im/mvds/store github.com/status-im/mvds/store
github.com/status-im/mvds/transport github.com/status-im/mvds/transport