mirror of https://github.com/vacp2p/research.git
Fetching previous message dependencies recursively, works surprisingly well
- Also pretty up info prints - Still re-downloads deps
This commit is contained in:
parent
a6f99e568d
commit
98392ebac7
|
@ -114,6 +114,60 @@ func newNode(port int) (*node.Node, error) {
|
||||||
return node.New(cfg)
|
return node.New(cfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Have in-memory, but these only have json, not hash
|
||||||
|
|
||||||
|
// TODO: How do we know this
|
||||||
|
// XXX: We can probably cheat with in-memory only by downloading all messages and A is restarting
|
||||||
|
func seen(hash string) bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func fetchMessage(hash string) {
|
||||||
|
// download from swarm
|
||||||
|
httpClient := feedsapi.NewClient("http://localhost:9602") // XXX 9601
|
||||||
|
response, _, err := httpClient.DownloadRaw(hash)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("Unable to download raw", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
buf := new(bytes.Buffer)
|
||||||
|
buf.ReadFrom(response)
|
||||||
|
str := buf.String()
|
||||||
|
msg := deserialize(str)
|
||||||
|
|
||||||
|
//fmt.Println("***Download raw", str)
|
||||||
|
handleMessage(msg)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// XXX: Assumes no duplicates
|
||||||
|
func handleMessage(msg message) {
|
||||||
|
// Cheating
|
||||||
|
// XXX so much duplication eh
|
||||||
|
//httpClient := feedsapi.NewClient("http://localhost:9602") // XXX 9601
|
||||||
|
|
||||||
|
// How many cases are there?
|
||||||
|
// We have all the dependencies
|
||||||
|
// We need to fetch more dependencies
|
||||||
|
// We have seen it before
|
||||||
|
// We have not seen it before
|
||||||
|
|
||||||
|
|
||||||
|
// Interesting, this seems to unroll and deal with order automatically
|
||||||
|
parent0 := msg.Parents[0]
|
||||||
|
if (parent0 != "") && !seen(parent0) {
|
||||||
|
fmt.Printf("[Unmet dependency, downloading: %s]\n", parent0)
|
||||||
|
fetchMessage(parent0)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only print it if all dependencies are met
|
||||||
|
fmt.Println("Alice:", string(msg.Text))
|
||||||
|
// fmt.Println("Alice:", string(msg.Text), "- parent0:", string(msg.Parents[0]))
|
||||||
|
// fmt.Println("Feed result: ", msg.Text, "- parent0:", msg.Parents[0])
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func listenForMessages(msgC chan pss.APIMsg) {
|
func listenForMessages(msgC chan pss.APIMsg) {
|
||||||
for {
|
for {
|
||||||
in := <-msgC
|
in := <-msgC
|
||||||
|
@ -126,8 +180,8 @@ func listenForMessages(msgC chan pss.APIMsg) {
|
||||||
//fmt.Println("Alice old:", string(in.Msg))
|
//fmt.Println("Alice old:", string(in.Msg))
|
||||||
// XXX Only one parent
|
// XXX Only one parent
|
||||||
// TODO: Get all the parents here
|
// TODO: Get all the parents here
|
||||||
fmt.Println("Alice:", string(parsed.Text), "- parent0:", string(parsed.Parents[0]))
|
|
||||||
|
|
||||||
|
handleMessage(parsed)
|
||||||
//fmt.Println("\nReceived message", string(in.Msg), "from", fmt.Sprintf("%x", in.Key))
|
//fmt.Println("\nReceived message", string(in.Msg), "from", fmt.Sprintf("%x", in.Key))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -213,8 +267,7 @@ type message struct {
|
||||||
|
|
||||||
// XXX: Lame signature, should be may more compact
|
// XXX: Lame signature, should be may more compact
|
||||||
func runREPL(client *rpc.Client, signer *feed.GenericSigner, receiver string, topic string) {
|
func runREPL(client *rpc.Client, signer *feed.GenericSigner, receiver string, topic string) {
|
||||||
fmt.Println("I am Alice, and I am ready to send messages.")
|
fmt.Println("[I am Alice, and I am ready to send messages.]")
|
||||||
|
|
||||||
fmt.Printf("> ")
|
fmt.Printf("> ")
|
||||||
// Basic REPL functionality
|
// Basic REPL functionality
|
||||||
scanner := bufio.NewScanner(os.Stdin)
|
scanner := bufio.NewScanner(os.Stdin)
|
||||||
|
@ -293,29 +346,7 @@ func pullMessages() {
|
||||||
feedStr := buf.String()
|
feedStr := buf.String()
|
||||||
|
|
||||||
parsed := deserialize(feedStr)
|
parsed := deserialize(feedStr)
|
||||||
parent0 := parsed.Parents[0]
|
handleMessage(parsed)
|
||||||
|
|
||||||
if parent0 != "" {
|
|
||||||
// TODO: Keep track of seen message ids and do basic if
|
|
||||||
// Need unmetdep(hash) bool
|
|
||||||
fmt.Println("Message dependency! We might need to download", parent0)
|
|
||||||
|
|
||||||
response, _, err := httpClient.DownloadRaw(parent0)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println("Unable to download raw", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
// TODO: This should loop, so if there are more unmetdeps keep downloading
|
|
||||||
buf := new(bytes.Buffer)
|
|
||||||
buf.ReadFrom(response)
|
|
||||||
str := buf.String()
|
|
||||||
fmt.Println("***Download raw", str)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
//fmt.Println("Feed result old: ", feedStr)
|
|
||||||
fmt.Println("Feed result: ", parsed.Text, "- parent0:", parsed.Parents[0])
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// XXX: This is so sloppy, passing privatekey around
|
// XXX: This is so sloppy, passing privatekey around
|
||||||
|
@ -524,15 +555,15 @@ func run(port int, privateKey *ecdsa.PrivateKey) {
|
||||||
// XXX: Only running REPL for Alice Sender for now
|
// XXX: Only running REPL for Alice Sender for now
|
||||||
runREPL(client, signer, receiver, topic)
|
runREPL(client, signer, receiver, topic)
|
||||||
} else if port == 9601 {
|
} else if port == 9601 {
|
||||||
fmt.Println("I am Bob, and I am ready to receive messages")
|
fmt.Println("[I am Bob, and I am ready to receive messages.]")
|
||||||
|
|
||||||
fmt.Println("First, let's see if we missed something while gone")
|
fmt.Println("[Syncing data...]")
|
||||||
pullMessages()
|
pullMessages()
|
||||||
fmt.Println("Alright, up to speed, let's listen in background")
|
fmt.Println("[Synced up, listening for new mesages.]")
|
||||||
go listenForMessages(msgC)
|
go listenForMessages(msgC)
|
||||||
mockPassiveREPL()
|
mockPassiveREPL()
|
||||||
} else if port == 9602 {
|
} else if port == 9602 {
|
||||||
fmt.Println("I am Charlie, I'm just chilling here")
|
fmt.Println("[I am Charlie, I'm just chilling here.]")
|
||||||
mockPassiveREPL()
|
mockPassiveREPL()
|
||||||
} else {
|
} else {
|
||||||
fmt.Println("*** I don't know who you are")
|
fmt.Println("*** I don't know who you are")
|
||||||
|
@ -634,8 +665,8 @@ func deserialize(strJSON string) message {
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
fmt.Printf("Hello PSS\n")
|
fmt.Println("[Hello PSS/Feeds!]")
|
||||||
fmt.Printf("Setting up node and connecting to the network...\n")
|
fmt.Println("[Setting up node and connecting to the network...]")
|
||||||
|
|
||||||
// XXX Lets use already running node because why not
|
// XXX Lets use already running node because why not
|
||||||
// TODO: Replace with 9600 once end to end
|
// TODO: Replace with 9600 once end to end
|
||||||
|
|
Loading…
Reference in New Issue