From 064b5cb038d7a7909c5923e4007e263826433a61 Mon Sep 17 00:00:00 2001 From: Danny van Kooten Date: Wed, 8 Aug 2018 09:43:06 +0200 Subject: [PATCH] re-use buffer (to get rid of allocation) & separate inserts vs. updates early on in collect handler. --- pkg/api/collect.go | 287 ++++++++++++++++++++++++--------------------- pkg/api/routes.go | 5 +- 2 files changed, 158 insertions(+), 134 deletions(-) diff --git a/pkg/api/collect.go b/pkg/api/collect.go index c34bc57..ea787d8 100644 --- a/pkg/api/collect.go +++ b/pkg/api/collect.go @@ -2,18 +2,172 @@ package api import ( "encoding/base64" - log "github.com/sirupsen/logrus" "net/http" "net/url" "strings" "time" + log "github.com/sirupsen/logrus" + "github.com/mssola/user_agent" "github.com/usefathom/fathom/pkg/aggregator" "github.com/usefathom/fathom/pkg/datastore" "github.com/usefathom/fathom/pkg/models" ) +type Collector struct { + Store datastore.Datastore + Pageviews chan *models.Pageview + + // buffer vars + updates []*models.Pageview + inserts []*models.Pageview + sizeu int + sizei int +} + +func NewCollector(store datastore.Datastore) *Collector { + bufferCap := 512 // persist every 512 pageviews + bufferTimeout := 600 * time.Millisecond // or every 600ms + + c := &Collector{ + Store: store, + Pageviews: make(chan *models.Pageview), + updates: make([]*models.Pageview, bufferCap), + inserts: make([]*models.Pageview, bufferCap), + sizeu: 0, + sizei: 0, + } + go c.aggregate() + go c.worker(bufferCap, bufferTimeout) + return c +} + +func (c *Collector) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if !shouldCollect(r) { + return + } + + q := r.URL.Query() + now := time.Now() + + // get pageview details + pageview := &models.Pageview{ + ID: q.Get("id"), + Hostname: parseHostname(q.Get("h")), + Pathname: parsePathname(q.Get("p")), + IsNewVisitor: q.Get("nv") == "1", + IsNewSession: q.Get("ns") == "1", + IsUnique: q.Get("u") == "1", + IsBounce: q.Get("b") != "0", + Referrer: parseReferrer(q.Get("r")), + Duration: 0, + Timestamp: now, + } + + // push pageview onto channel to be inserted (in batch) later + c.Pageviews <- pageview + + // indicate that we're not tracking user data, see https://github.com/usefathom/fathom/issues/65 + w.Header().Set("Tk", "N") + + // headers to prevent caching + w.Header().Set("Content-Type", "image/gif") + w.Header().Set("Expires", "Mon, 01 Jan 1990 00:00:00 GMT") + w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate") + w.Header().Set("Pragma", "no-cache") + + // response, 1x1 px transparent GIF + w.WriteHeader(http.StatusOK) + b, _ := base64.StdEncoding.DecodeString("R0lGODlhAQABAIAAAAAAAP///yH5BAEAAAAALAAAAAABAAEAAAIBRAA7") + w.Write(b) + + // find previous pageview by same visitor + previousPageviewID := q.Get("pid") + if !pageview.IsNewSession && previousPageviewID != "" { + previousPageview, err := c.Store.GetPageview(previousPageviewID) + if err != nil && err != datastore.ErrNoResults { + log.Errorf("error getting previous pageview: %s", err) + return + } + + // if we have a recent pageview that is less than 30 minutes old + if previousPageview != nil && previousPageview.Timestamp.After(now.Add(-30*time.Minute)) { + previousPageview.Duration = (now.Unix() - previousPageview.Timestamp.Unix()) + previousPageview.IsBounce = false + + // push onto channel to be updated (in batch) later + c.Pageviews <- previousPageview + } + } +} + +func (c *Collector) aggregate() { + agg := aggregator.New(c.Store) + timeout := 1 * time.Minute + + agg.Run() + + for { + select { + case <-time.After(timeout): + agg.Run() + } + } +} + +func (c *Collector) worker(cap int, timeout time.Duration) { + var size int + + for { + select { + // persist pageviews in buffer when buffer at capacity + case p := <-c.Pageviews: + size = c.buffer(p) + if size >= cap { + c.persist() + } + + // or after timeout passed + case <-time.After(timeout): + c.persist() + } + } +} + +func (c *Collector) buffer(p *models.Pageview) int { + // a bounce is always an insert + if p.IsBounce { + c.inserts[c.sizei] = p + c.sizei++ + } else { + c.updates[c.sizeu] = p + c.sizeu++ + } + + return (c.sizeu + c.sizei) +} + +func (c *Collector) persist() { + if (c.sizeu + c.sizei) == 0 { + return + } + + log.Debugf("persisting %d pageviews (%d inserts, %d updates)", (c.sizeu + c.sizei), c.sizei, c.sizeu) + + if err := c.Store.InsertPageviews(c.inserts[0:c.sizei]); err != nil { + log.Errorf("error inserting pageviews: %s", err) + } + + if err := c.Store.UpdatePageviews(c.updates[0:c.sizeu]); err != nil { + log.Errorf("error updating pageviews: %s", err) + } + + // reset buffer + c.sizei = 0 + c.sizeu = 0 +} + func shouldCollect(r *http.Request) bool { // abort if DNT header is set to "1" (these should have been filtered client-side already) if r.Header.Get("DNT") == "1" { @@ -69,134 +223,3 @@ func parseHostname(r string) string { return u.Scheme + "://" + u.Host } - -func (api *API) NewCollectHandler() http.Handler { - pageviews := make(chan *models.Pageview, 10) - go aggregate(api.database) - go collect(api.database, pageviews) - - return HandlerFunc(func(w http.ResponseWriter, r *http.Request) error { - if !shouldCollect(r) { - return nil - } - - q := r.URL.Query() - now := time.Now() - - // get pageview details - pageview := &models.Pageview{ - ID: q.Get("id"), - Hostname: parseHostname(q.Get("h")), - Pathname: parsePathname(q.Get("p")), - IsNewVisitor: q.Get("nv") == "1", - IsNewSession: q.Get("ns") == "1", - IsUnique: q.Get("u") == "1", - IsBounce: q.Get("b") != "0", - Referrer: parseReferrer(q.Get("r")), - Duration: 0, - Timestamp: now, - } - - // find previous pageview by same visitor - previousPageviewID := q.Get("pid") - if !pageview.IsNewSession && previousPageviewID != "" { - previousPageview, err := api.database.GetPageview(previousPageviewID) - if err != nil && err != datastore.ErrNoResults { - return err - } - - // if we have a recent pageview that is less than 30 minutes old - if previousPageview != nil && previousPageview.Timestamp.After(now.Add(-30*time.Minute)) { - previousPageview.Duration = (now.Unix() - previousPageview.Timestamp.Unix()) - previousPageview.IsBounce = false - - // push onto channel to be updated (in batch) later - pageviews <- previousPageview - } - } - - // push pageview onto channel to be inserted (in batch) later - pageviews <- pageview - - // indicate that we're not tracking user data, see https://github.com/usefathom/fathom/issues/65 - w.Header().Set("Tk", "N") - - // headers to prevent caching - w.Header().Set("Content-Type", "image/gif") - w.Header().Set("Expires", "Mon, 01 Jan 1990 00:00:00 GMT") - w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate") - w.Header().Set("Pragma", "no-cache") - - // response - w.WriteHeader(http.StatusOK) - - // 1x1 px transparent GIF - b, _ := base64.StdEncoding.DecodeString("R0lGODlhAQABAIAAAAAAAP///yH5BAEAAAAALAAAAAABAAEAAAIBRAA7") - w.Write(b) - return nil - }) -} - -// runs the aggregate func every minute -func aggregate(db datastore.Datastore) { - agg := aggregator.New(db) - agg.Run() - - timeout := 1 * time.Minute - - for { - select { - case <-time.After(timeout): - agg.Run() - } - } -} - -func collect(db datastore.Datastore, pageviews chan *models.Pageview) { - var size = 800 - var timeout = 600 * time.Millisecond - var buffer = make([]*models.Pageview, 0) - - for { - select { - case pageview := <-pageviews: - buffer = append(buffer, pageview) - if len(buffer) >= size { - persist(db, buffer) - buffer = make([]*models.Pageview, 0) - } - case <-time.After(timeout): - if len(buffer) > 0 { - persist(db, buffer) - buffer = make([]*models.Pageview, 0) - } - } - } -} - -func persist(db datastore.Datastore, pageviews []*models.Pageview) { - n := len(pageviews) - updates := make([]*models.Pageview, 0, n) - inserts := make([]*models.Pageview, 0, n) - - for i := range pageviews { - if !pageviews[i].IsBounce { - updates = append(updates, pageviews[i]) - } else { - inserts = append(inserts, pageviews[i]) - } - } - - log.Debugf("persisting %d pageviews (%d inserts, %d updates)", len(pageviews), len(inserts), len(updates)) - - var err error - err = db.InsertPageviews(inserts) - if err != nil { - log.Errorf("error inserting pageviews: %s", err) - } - - err = db.UpdatePageviews(updates) - if err != nil { - log.Errorf("error updating pageviews: %s", err) - } -} diff --git a/pkg/api/routes.go b/pkg/api/routes.go index f8fb537..7553037 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -1,15 +1,16 @@ package api import ( + "net/http" + "github.com/gobuffalo/packr" "github.com/gorilla/mux" - "net/http" ) func (api *API) Routes() *mux.Router { // register routes r := mux.NewRouter() - r.Handle("/collect", api.NewCollectHandler()).Methods(http.MethodGet) + r.Handle("/collect", NewCollector(api.database)).Methods(http.MethodGet) r.Handle("/api/session", HandlerFunc(api.LoginHandler)).Methods(http.MethodPost) r.Handle("/api/session", HandlerFunc(api.LogoutHandler)).Methods(http.MethodDelete)