From 2f8e1c61e4f632425419a24b4a2c4ba5e9614ec5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dandelion=20Man=C3=A9?= Date: Thu, 15 Aug 2019 16:08:04 +0200 Subject: [PATCH] Add a Discourse API mirror (#1266) The mirror wraps a SQLite database which will store all of the data we download from Discourse. On a call to `update`, it downloads new data from the server and stores it. Then, when it is asked for information like the topics and posts, it can just pull from its local copy. This means that we don't need to re-download the content every time we load a Discourse instance, which makes the load more performant, more robust to network failures, etc. Thanks to @wchargin, whose work on the GraphQL mirror for GitHub (#622) inspired this mirror. Test plan: I've written unit tests that use a mock fetcher to validate the update logic. I've also used this to do a full load of the real SourceCred Discourse instance, and to create a corresponding graph (using subsequent commits). Progress towards #865. --- src/plugins/discourse/mirror.js | 299 +++++++++++++++++++++++++++ src/plugins/discourse/mirror.test.js | 298 ++++++++++++++++++++++++++ 2 files changed, 597 insertions(+) create mode 100644 src/plugins/discourse/mirror.js create mode 100644 src/plugins/discourse/mirror.test.js diff --git a/src/plugins/discourse/mirror.js b/src/plugins/discourse/mirror.js new file mode 100644 index 0000000..12ab0f8 --- /dev/null +++ b/src/plugins/discourse/mirror.js @@ -0,0 +1,299 @@ +// @flow + +import type Database from "better-sqlite3"; +import stringify from "json-stable-stringify"; +import dedent from "../../util/dedent"; +import { + type Discourse, + type TopicId, + type PostId, + type Topic, + type Post, +} from "./fetch"; + +// The version should be bumped any time the database schema is changed, +// so that the cache will be properly invalidated. +const VERSION = "discourse_mirror_v1"; + +/** + * Mirrors data from the Discourse API into a local sqlite db. + * + * This class allows us to persist a local copy of data from a Discourse + * instance. We have it for reasons similar to why we have a GraphQL mirror for + * GitHub; it allows us to avoid re-doing expensive IO every time we re-load + * SourceCred. It also gives us robustness in the face of network failures (we + * can keep however much we downloaded until the fault). + * + * As implemented, the Mirror will never update already-downloaded content, + * meaning it will not catch edits or deletions. As such, it's advisable to + * replace the cache periodically (perhaps once a week or month). We may + * implement automatic cache invalidation in the future. + * + * Each Mirror instance is tied to a particular server. Trying to use a mirror + * for multiple Discourse servers is not permitted; use separate Mirrors. + */ +export class Mirror { + +_db: Database; + +_fetcher: Discourse; + + /** + * Construct a new Mirror instance. + * + * Takes a Database, which may be a pre-existing Mirror database. The + * provided DiscourseInterface will be used to retrieve new data from Discourse. + * + * A serverUrl is required so that we can ensure that this Mirror is only storing + * data from a particular Discourse server. + */ + constructor(db: Database, fetcher: Discourse, serverUrl: string) { + if (db == null) throw new Error("db: " + String(db)); + this._db = db; + this._fetcher = fetcher; + if (db.inTransaction) { + throw new Error("already in transaction"); + } + try { + db.prepare("BEGIN").run(); + this._initialize(serverUrl); + if (db.inTransaction) { + db.prepare("COMMIT").run(); + } + } finally { + if (db.inTransaction) { + db.prepare("ROLLBACK").run(); + } + } + } + + _initialize(serverUrl: string) { + const db = this._db; + // We store the config in a singleton table `meta`, whose unique row + // has primary key `0`. Only the first ever insert will succeed; we + // are locked into the first config. + db.prepare( + dedent`\ + CREATE TABLE IF NOT EXISTS meta ( + zero INTEGER PRIMARY KEY, + config TEXT NOT NULL + ) + ` + ).run(); + + const config = stringify({ + version: VERSION, + serverUrl: serverUrl, + }); + + const existingConfig: string | void = db + .prepare("SELECT config FROM meta") + .pluck() + .get(); + if (existingConfig === config) { + // Already set up; nothing to do. + return; + } else if (existingConfig !== undefined) { + throw new Error( + "Database already populated with incompatible server or version" + ); + } + db.prepare("INSERT INTO meta (zero, config) VALUES (0, ?)").run(config); + + const tables = [ + dedent`\ + CREATE TABLE topics ( + id INTEGER PRIMARY KEY, + title TEXT NOT NULL, + timestamp_ms INTEGER NOT NULL, + author_username TEXT NOT NULL + ) + `, + dedent`\ + CREATE TABLE posts ( + id INTEGER PRIMARY KEY, + timestamp_ms INTEGER NOT NULL, + author_username TEXT NOT NULL, + topic_id INTEGER NOT NULL, + index_within_topic INTEGER NOT NULL, + reply_to_post_index INTEGER, + FOREIGN KEY(topic_id) REFERENCES topics(id) + ) + `, + ]; + for (const sql of tables) { + db.prepare(sql).run(); + } + } + + topics(): $ReadOnlyArray { + return this._db + .prepare( + dedent`\ + SELECT + id, + timestamp_ms, + author_username, + title + FROM topics` + ) + .all() + .map((x) => ({ + id: x.id, + timestampMs: x.timestamp_ms, + authorUsername: x.author_username, + title: x.title, + })); + } + + posts(): $ReadOnlyArray { + return this._db + .prepare( + dedent`\ + SELECT + id, + timestamp_ms, + author_username, + topic_id, + index_within_topic, + reply_to_post_index + FROM posts` + ) + .all() + .map((x) => ({ + id: x.id, + timestampMs: x.timestamp_ms, + authorUsername: x.author_username, + topicId: x.topic_id, + indexWithinTopic: x.index_within_topic, + replyToPostIndex: x.reply_to_post_index, + })); + } + + /** + * Given a TopicId and a post number, find that numbered post within the topic. + * + * Returns undefined if no such post exists. + */ + findPostInTopic(topicId: TopicId, indexWithinTopic: number): ?PostId { + return this._db + .prepare( + dedent`\ + SELECT id + FROM posts + WHERE topic_id = :topic_id AND index_within_topic = :index_within_topic + ` + ) + .pluck() + .get({topic_id: topicId, index_within_topic: indexWithinTopic}); + } + + async update() { + const db = this._db; + const latestTopicId = await this._fetcher.latestTopicId(); + const lastLocalPostId = + db + .prepare("SELECT MAX(id) FROM posts") + .pluck() + .get() || 0; + + const lastLocalTopicId = + db + .prepare("SELECT MAX(id) FROM topics") + .pluck() + .get() || 0; + + const encounteredPostIds = new Set(); + + function addPost(post: Post) { + const { + id, + timestampMs, + replyToPostIndex, + indexWithinTopic, + topicId, + authorUsername, + } = post; + db.prepare( + dedent`\ + REPLACE INTO posts ( + id, + timestamp_ms, + author_username, + topic_id, + index_within_topic, + reply_to_post_index + ) VALUES ( + :id, + :timestamp_ms, + :author_username, + :topic_id, + :index_within_topic, + :reply_to_post_index + ) + ` + ).run({ + id, + timestamp_ms: timestampMs, + reply_to_post_index: replyToPostIndex, + index_within_topic: indexWithinTopic, + topic_id: topicId, + author_username: authorUsername, + }); + encounteredPostIds.add(id); + } + + for ( + let topicId = lastLocalTopicId + 1; + topicId <= latestTopicId; + topicId++ + ) { + const topicWithPosts = await this._fetcher.topicWithPosts(topicId); + if (topicWithPosts != null) { + const {topic, posts} = topicWithPosts; + const {id, title, timestampMs, authorUsername} = topic; + this._db + .prepare( + dedent`\ + REPLACE INTO topics ( + id, + title, + timestamp_ms, + author_username + ) VALUES ( + :id, + :title, + :timestamp_ms, + :author_username + )` + ) + .run({ + id, + title, + timestamp_ms: timestampMs, + author_username: authorUsername, + }); + for (const post of posts) { + addPost(post); + } + } + } + + const latestPosts = await this._fetcher.latestPosts(); + for (const post of latestPosts) { + if (!encounteredPostIds.has(post.id) && post.id > lastLocalPostId) { + addPost(post); + } + } + + const latestPost = latestPosts[0]; + const latestPostId = latestPost == null ? 0 : latestPost.id; + for (let postId = lastLocalPostId + 1; postId <= latestPostId; postId++) { + if (encounteredPostIds.has(postId)) { + continue; + } + const post = await this._fetcher.post(postId); + if (post != null) { + addPost(post); + } + } + } +} diff --git a/src/plugins/discourse/mirror.test.js b/src/plugins/discourse/mirror.test.js new file mode 100644 index 0000000..c4ed010 --- /dev/null +++ b/src/plugins/discourse/mirror.test.js @@ -0,0 +1,298 @@ +// @flow + +import Database from "better-sqlite3"; +import fs from "fs"; +import tmp from "tmp"; +import {Mirror} from "./mirror"; +import { + type Discourse, + type TopicId, + type PostId, + type Topic, + type Post, + type TopicWithPosts, +} from "./fetch"; +import * as MapUtil from "../../util/map"; +import * as NullUtil from "../../util/null"; + +type PostInfo = {| + +indexWithinTopic: number, + +replyToPostIndex: number | null, + +topicId: number, +|}; +class MockFetcher implements Discourse { + _latestTopicId: number; + _latestPostId: number; + _topicToPostIds: Map; + _posts: Map; + + constructor() { + this._latestTopicId = 1; + this._latestPostId = 1; + this._topicToPostIds = new Map(); + this._posts = new Map(); + } + async latestTopicId(): Promise { + return this._latestTopicId; + } + + async latestPosts(): Promise { + const latestPost = this._post(this._latestPostId - 1); + if (latestPost == null) { + return []; + } + return [latestPost]; + } + + async topicWithPosts(id: TopicId): Promise { + const postIds = this._topicToPostIds.get(id); + if (postIds == null || postIds.length === 0) { + return null; + } + const firstPost = this._post(postIds[0]); + if (firstPost == null) { + throw new Error("invalid firstPost"); + } + // Only return the first post in the posts array, to ensure that we have to + // test the functionality where we manually grab posts by ID + const posts = [firstPost]; + return {topic: this._topic(id), posts}; + } + + _topic(id: TopicId): Topic { + return { + id, + title: `topic ${id}`, + timestampMs: 1000, + authorUsername: "credbot", + }; + } + + async post(id: PostId): Promise { + return this._post(id); + } + + _post(id: PostId): Post | null { + const postInfo = this._posts.get(id); + if (postInfo == null) { + return null; + } + const {replyToPostIndex, topicId, indexWithinTopic} = postInfo; + return { + id, + timestampMs: 2003, + replyToPostIndex, + topicId, + indexWithinTopic, + authorUsername: "credbot", + }; + } + + addPost(topicId: TopicId, replyToNumber: number | null): PostId { + const postId = this._latestPostId++; + this._latestTopicId = Math.max(topicId, this._latestTopicId); + const postsOnTopic = MapUtil.pushValue( + this._topicToPostIds, + topicId, + postId + ); + if (replyToNumber != null && replyToNumber >= postsOnTopic.length) { + throw new Error("invalid replyToNumber"); + } + const postInfo: PostInfo = { + indexWithinTopic: postsOnTopic.length, + replyToPostIndex: replyToNumber, + topicId: topicId, + }; + this._posts.set(postId, postInfo); + return postId; + } +} + +describe("plugins/discourse/mirror", () => { + const example = () => { + const fetcher = new MockFetcher(); + const db = new Database(":memory:"); + const mirror = new Mirror(db, fetcher, "https://some-url.io"); + return {fetcher, mirror}; + }; + + it("rejects a different server url without changing the database", () => { + // We use an on-disk database file here so that we can dump the + // contents to ensure that the database is physically unchanged. + const filename = tmp.fileSync().name; + const db = new Database(filename); + const fetcher = new MockFetcher(); + const url1 = "https://foo.bar"; + const url2 = "https://foo.zod"; + expect(() => new Mirror(db, fetcher, url1)).not.toThrow(); + const data = fs.readFileSync(filename).toJSON(); + + expect(() => new Mirror(db, fetcher, url2)).toThrow( + "incompatible server or version" + ); + expect(fs.readFileSync(filename).toJSON()).toEqual(data); + + expect(() => new Mirror(db, fetcher, url1)).not.toThrow(); + expect(fs.readFileSync(filename).toJSON()).toEqual(data); + }); + + it("mirrors topics from the fetcher", async () => { + const {mirror, fetcher} = example(); + fetcher.addPost(2, null); + fetcher.addPost(3, null); + const topic2 = fetcher._topic(2); + const topic3 = fetcher._topic(3); + await mirror.update(); + expect(mirror.topics()).toEqual([topic2, topic3]); + }); + + it("mirrors posts from the fetcher", async () => { + const {mirror, fetcher} = example(); + const p1 = fetcher.addPost(2, null); + const p2 = fetcher.addPost(3, null); + const p3 = fetcher.addPost(3, 1); + await mirror.update(); + const posts = [fetcher._post(p1), fetcher._post(p2), fetcher._post(p3)]; + expect(mirror.posts()).toEqual(posts); + }); + + describe("update semantics", () => { + it("only fetches new topics on `update`", async () => { + const {mirror, fetcher} = example(); + fetcher.addPost(1, null); + fetcher.addPost(2, null); + await mirror.update(); + fetcher.addPost(3, null); + const fetchTopicWithPosts = jest.spyOn(fetcher, "topicWithPosts"); + await mirror.update(); + expect(fetchTopicWithPosts).toHaveBeenCalledTimes(1); + expect(fetchTopicWithPosts).toHaveBeenCalledWith(3); + expect(mirror.topics().map((x) => x.id)).toEqual([1, 2, 3]); + }); + + it("gets new posts on old topics on update", async () => { + const {mirror, fetcher} = example(); + fetcher.addPost(1, null); + fetcher.addPost(2, null); + await mirror.update(); + const id = fetcher.addPost(1, 1); + fetcher.addPost(3, null); + await mirror.update(); + const latestPosts = await fetcher.latestPosts(); + // The post added to the old topic wasn't retrieved by latest post + expect(latestPosts.map((x) => x.id)).not.toContain(id); + const allPostIds = mirror.posts().map((x) => x.id); + // The post was still included, meaning the mirror scanned for new posts by id + expect(allPostIds).toContain(id); + }); + + it("skips null/missing topics", async () => { + const {mirror, fetcher} = example(); + fetcher.addPost(1, null); + fetcher.addPost(3, null); + await mirror.update(); + expect(mirror.topics().map((x) => x.id)).toEqual([1, 3]); + }); + + it("skips null/missing posts", async () => { + const {mirror, fetcher} = example(); + const p1 = fetcher.addPost(1, null); + fetcher._latestPostId += 2; + const p2 = fetcher.addPost(3, null); + await mirror.update(); + expect(mirror.posts().map((x) => x.id)).toEqual([p1, p2]); + }); + + it("queries explicitly for posts that are not present in topicWithPosts.posts", async () => { + const {mirror, fetcher} = example(); + const p1 = fetcher.addPost(1, null); + const p2 = fetcher.addPost(1, 1); + const p3 = fetcher.addPost(1, 1); + const fetchPost = jest.spyOn(fetcher, "post"); + await mirror.update(); + const getId = (x) => x.id; + + const postsFromTopic = NullUtil.get(await fetcher.topicWithPosts(1)) + .posts; + expect(postsFromTopic.map(getId)).toEqual([p1]); + + const postsFromLatest = await fetcher.latestPosts(); + expect(postsFromLatest.map(getId)).toEqual([p3]); + + expect(fetchPost).toHaveBeenCalledTimes(1); + expect(fetchPost).toHaveBeenCalledWith(p2); + + expect(mirror.posts().map(getId)).toEqual([p1, p2, p3]); + }); + + it("does not explicitly query for posts that were in topicWithPosts.posts", async () => { + const {mirror, fetcher} = example(); + fetcher.addPost(1, null); + const fetchPost = jest.spyOn(fetcher, "post"); + await mirror.update(); + expect(fetchPost).not.toHaveBeenCalled(); + }); + + it("does not explicitly query for posts that were provided in latest posts", async () => { + const {mirror, fetcher} = example(); + fetcher.addPost(1, null); + await mirror.update(); + const id = fetcher.addPost(1, 1); + const fetchPost = jest.spyOn(fetcher, "post"); + await mirror.update(); + expect(fetchPost).not.toHaveBeenCalled(); + expect(mirror.posts().map((x) => x.id)).toContain(id); + }); + + it("does not query for topics at all if there were no new topics", async () => { + const {mirror, fetcher} = example(); + fetcher.addPost(1, null); + await mirror.update(); + const fetchTopic = jest.spyOn(fetcher, "topicWithPosts"); + await mirror.update(); + expect(fetchTopic).not.toHaveBeenCalled(); + }); + }); + + describe("findPostInTopic", () => { + it("works for the first post in a topic", async () => { + const {mirror, fetcher} = example(); + const id = fetcher.addPost(5, null); + const post = NullUtil.get(fetcher._post(id)); + expect(post.topicId).toEqual(5); + expect(post.indexWithinTopic).toEqual(1); + await mirror.update(); + expect(mirror.findPostInTopic(5, 1)).toEqual(id); + }); + + it("works for the second post in a topic", async () => { + const {mirror, fetcher} = example(); + fetcher.addPost(1, null); + const id = fetcher.addPost(1, 1); + const post = NullUtil.get(fetcher._post(id)); + expect(post.indexWithinTopic).toEqual(2); + await mirror.update(); + expect(mirror.findPostInTopic(1, 2)).toEqual(id); + }); + + it("returns undefined for a post with too high an index", async () => { + const {mirror, fetcher} = example(); + fetcher.addPost(1, null); + await mirror.update(); + expect(mirror.findPostInTopic(1, 2)).toBe(undefined); + }); + + it("returns undefined for topic that doesnt exist", async () => { + const {mirror, fetcher} = example(); + fetcher.addPost(1, null); + await mirror.update(); + expect(mirror.findPostInTopic(2, 1)).toBe(undefined); + }); + + it("returns undefined for a mirror that never updated", async () => { + const {mirror} = example(); + expect(mirror.findPostInTopic(1, 1)).toBe(undefined); + }); + }); +});