From 65d811fb44b7e3cd9af74531b85accdf953fb017 Mon Sep 17 00:00:00 2001 From: William Chargin Date: Thu, 27 Sep 2018 19:12:47 -0700 Subject: [PATCH] mirror: add helpers for full queries and updates (#898) Summary: An update step is now roughly as simple as: _updateData(postQuery(_queryFromPlan(_findOutdated()))) (with some option/config parameters thrown in). This makes progress toward #622. Test Plan: Unit tests included. They are light, because these functions are light. They still retain full coverage. wchargin-branch: mirror-full-pipeline --- src/graphql/mirror.js | 199 +++++++++++++++++ src/graphql/mirror.test.js | 437 +++++++++++++++++++++++++++++++++++++ 2 files changed, 636 insertions(+) diff --git a/src/graphql/mirror.js b/src/graphql/mirror.js index ffeb735..46d80a6 100644 --- a/src/graphql/mirror.js +++ b/src/graphql/mirror.js @@ -4,6 +4,7 @@ import type Database, {BindingDictionary, Statement} from "better-sqlite3"; import stringify from "json-stable-stringify"; import dedent from "../util/dedent"; +import * as MapUtil from "../util/map"; import * as NullUtil from "../util/null"; import * as Schema from "./schema"; import * as Queries from "./queries"; @@ -457,6 +458,164 @@ export class Mirror { }); } + /** + * Create a GraphQL selection set to fetch data corresponding to the + * given query plan. + * + * The resulting GraphQL should be embedded into a top-level query. + * + * The result of this query is an `UpdateResult`. + * + * This function is pure: it does not interact with the database. + */ + _queryFromPlan( + queryPlan: QueryPlan, + options: {| + +connectionLimit: number, + +connectionPageSize: number, + |} + ): Queries.Selection[] { + // Group objects by type, so that we only have to specify each + // type's fieldset once. + const objectsByType: Map = new Map(); + for (const object of queryPlan.objects) { + MapUtil.pushValue(objectsByType, object.typename, object.id); + } + + // Group connections by object, so that we only have to fetch the + // node once. + const connectionsByObject: Map< + Schema.ObjectId, + {| + +typename: Schema.Typename, + +connections: Array<{| + +fieldname: Schema.Fieldname, + +endCursor: EndCursor | void, + |}>, + |} + > = new Map(); + for (const connection of queryPlan.connections.slice( + 0, + options.connectionLimit + )) { + let existing = connectionsByObject.get(connection.objectId); + if (existing == null) { + existing = {typename: connection.objectTypename, connections: []}; + connectionsByObject.set(connection.objectId, existing); + } else { + if (connection.objectTypename !== existing.typename) { + const s = JSON.stringify; + throw new Error( + `Query plan has inconsistent typenames for ` + + `object ${s(connection.objectId)}: ` + + `${s(existing.typename)} vs. ${s(connection.objectTypename)}` + ); + } + } + existing.connections.push({ + fieldname: connection.fieldname, + endCursor: connection.endCursor, + }); + } + + const b = Queries.build; + + // Each top-level field corresponds to either an object type + // (fetching own data for objects of that type) or a particular node + // (updating connections on that node). We alias each such field, + // which is necessary to ensure that their names are all unique. The + // names chosen are sufficient to identify which _kind_ of query the + // field corresponds to (type's own data vs node's connections), but + // do not need to identify the particular type or node in question. + // This is because all descendant selections are self-describing: + // they include the ID of any relevant objects. 
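+    //
+    // As a rough sketch (the `_queryFromPlan` unit tests below pin down
+    // the exact expected structure), the generated top-level selections
+    // look like:
+    //
+    //     owndata_Issue: nodes(ids: [...]) { ... own data ... }
+    //     owndata_Repository: nodes(ids: [...]) { ... own data ... }
+    //     node_0: node(id: "...") { id ... connection pages ... }
+    //     node_1: node(id: "...") { id ... connection pages ... }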
+ return [].concat( + Array.from(objectsByType.entries()).map(([typename, ids]) => { + const name = `${_FIELD_PREFIXES.OWN_DATA}${typename}`; + return b.alias( + name, + b.field("nodes", {ids: b.list(ids.map((id) => b.literal(id)))}, [ + b.inlineFragment(typename, this._queryOwnData(typename)), + ]) + ); + }), + Array.from(connectionsByObject.entries()).map( + ([id, {typename, connections}], i) => { + const name = `${_FIELD_PREFIXES.NODE_CONNECTIONS}${i}`; + return b.alias( + name, + b.field("node", {id: b.literal(id)}, [ + b.field("id"), + b.inlineFragment( + typename, + [].concat( + ...connections.map(({fieldname, endCursor}) => + this._queryConnection( + typename, + fieldname, + endCursor, + options.connectionPageSize + ) + ) + ) + ), + ]) + ); + } + ) + ); + } + + /** + * Ingest data given by an `UpdateResult`. This is porcelain over + * `_updateConnection` and `_updateOwnData`. + * + * See: `_findOutdated`. + * See: `_queryFromPlan`. + */ + _updateData(updateId: UpdateId, queryResult: UpdateResult): void { + _inTransaction(this._db, () => { + this._nontransactionallyUpdateData(updateId, queryResult); + }); + } + + /** + * As `_updateData`, but do not enter any transactions. Other methods + * may call this method as a subroutine in a larger transaction. + */ + _nontransactionallyUpdateData( + updateId: UpdateId, + queryResult: UpdateResult + ): void { + for (const topLevelKey of Object.keys(queryResult)) { + if (topLevelKey.startsWith(_FIELD_PREFIXES.OWN_DATA)) { + const rawValue: OwnDataUpdateResult | NodeConnectionsUpdateResult = + queryResult[topLevelKey]; + const updateRecord: OwnDataUpdateResult = (rawValue: any); + this._nontransactionallyUpdateOwnData(updateId, updateRecord); + } else if (topLevelKey.startsWith(_FIELD_PREFIXES.NODE_CONNECTIONS)) { + const rawValue: OwnDataUpdateResult | NodeConnectionsUpdateResult = + queryResult[topLevelKey]; + const updateRecord: NodeConnectionsUpdateResult = (rawValue: any); + for (const fieldname of Object.keys(updateRecord)) { + if (fieldname === "id") { + continue; + } + this._nontransactionallyUpdateConnection( + updateId, + updateRecord.id, + fieldname, + updateRecord[fieldname] + ); + } + } else { + throw new Error( + "Bad key in query result: " + JSON.stringify(topLevelKey) + ); + } + } + } + /** * Create a GraphQL selection set required to identify the typename * and ID for an object of the given declared type, which may be @@ -1362,6 +1521,46 @@ type OwnDataUpdateResult = $ReadOnlyArray<{ | NodeFieldResult, }>; +/** + * Result describing new elements for connections on a single node. + * + * This type would be exact but for facebook/flow#2977, et al. + */ +type NodeConnectionsUpdateResult = { + +id: Schema.ObjectId, + +[connectionFieldname: Schema.Fieldname]: ConnectionFieldResult, +}; + +/** + * Result describing both own-data updates and connection updates. Each + * key's prefix determines what type of results the corresponding value + * represents (see constants below). No field prefix is a prefix of + * another, so this characterization is complete. + * + * This type would be exact but for facebook/flow#2977, et al. + * + * See: `_FIELD_PREFIXES`. + */ +type UpdateResult = { + // The prefix of each key determines what type of results the value + // represents. See constants below. + +[string]: OwnDataUpdateResult | NodeConnectionsUpdateResult, +}; + +export const _FIELD_PREFIXES = Object.freeze({ + /** + * A key of an `UpdateResult` has this prefix if and only if the + * corresponding value represents `OwnDataUpdateResult`s. 
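+   *
+   * For example, `_queryFromPlan` aliases own-data selections as
+   * `owndata_Issue`, `owndata_Repository`, and so on.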
+ */ + OWN_DATA: "owndata_", + + /** + * A key of an `UpdateResult` has this prefix if and only if the + * corresponding value represents `NodeConnectionsUpdateResult`s. + */ + NODE_CONNECTIONS: "node_", +}); + /** * Execute a function inside a database transaction. * diff --git a/src/graphql/mirror.test.js b/src/graphql/mirror.test.js index fe2eecf..d84df5f 100644 --- a/src/graphql/mirror.test.js +++ b/src/graphql/mirror.test.js @@ -8,6 +8,7 @@ import dedent from "../util/dedent"; import * as Schema from "./schema"; import * as Queries from "./queries"; import { + _FIELD_PREFIXES, _buildSchemaInfo, _inTransaction, _makeSingleUpdateFunction, @@ -522,6 +523,428 @@ describe("graphql/mirror", () => { }); }); + describe("_queryFromPlan", () => { + it("errors if connections for an object have distinct typename", () => { + const db = new Database(":memory:"); + const mirror = new Mirror(db, buildGithubSchema()); + const plan = { + objects: [], + connections: [ + { + objectTypename: "Issue", + objectId: "hmmm", + fieldname: "comments", + endCursor: null, + }, + { + objectTypename: "Repository", + objectId: "hmmm", + fieldname: "issues", + endCursor: null, + }, + ], + }; + expect(() => { + mirror._queryFromPlan(plan, { + connectionLimit: 5, + connectionPageSize: 23, + }); + }).toThrow( + 'Query plan has inconsistent typenames for object "hmmm": ' + + '"Issue" vs. "Repository"' + ); + }); + it("creates a good query", () => { + const db = new Database(":memory:"); + const mirror = new Mirror(db, buildGithubSchema()); + const plan = { + objects: [ + {typename: "Issue", id: "i#1"}, + {typename: "Repository", id: "repo#2"}, + {typename: "Issue", id: "i#3"}, + ], + connections: [ + { + objectTypename: "Issue", + objectId: "i#1", + fieldname: "comments", + endCursor: null, + }, + { + objectTypename: "Issue", + objectId: "i#4", + fieldname: "comments", + endCursor: null, + }, + { + objectTypename: "Issue", + objectId: "i#1", + fieldname: "timeline", + endCursor: undefined, + }, + { + objectTypename: "Issue", + objectId: "i#4", + fieldname: "timeline", + endCursor: "cursor#998", + }, + { + objectTypename: "Repository", + objectId: "repo#5", + fieldname: "issues", + endCursor: "cursor#998", + }, + // cut off below this point due to the page limit + { + objectTypename: "Repository", + objectId: "repo#5", + fieldname: "pulls", + endCursor: "cursor#997", + }, + { + objectTypename: "Repository", + objectId: "repo#6", + fieldname: "issues", + endCursor: "cursor#996", + }, + ], + }; + const actual = mirror._queryFromPlan(plan, { + connectionLimit: 5, + connectionPageSize: 23, + }); + const b = Queries.build; + expect(actual).toEqual([ + b.alias( + "owndata_Issue", + b.field( + "nodes", + {ids: b.list([b.literal("i#1"), b.literal("i#3")])}, + [b.inlineFragment("Issue", mirror._queryOwnData("Issue"))] + ) + ), + b.alias( + "owndata_Repository", + b.field("nodes", {ids: b.list([b.literal("repo#2")])}, [ + b.inlineFragment( + "Repository", + mirror._queryOwnData("Repository") + ), + ]) + ), + b.alias( + "node_0", + b.field("node", {id: b.literal("i#1")}, [ + b.field("id"), + b.inlineFragment("Issue", [ + ...mirror._queryConnection("Issue", "comments", null, 23), + ...mirror._queryConnection("Issue", "timeline", undefined, 23), + ]), + ]) + ), + b.alias( + "node_1", + b.field("node", {id: b.literal("i#4")}, [ + b.field("id"), + b.inlineFragment("Issue", [ + ...mirror._queryConnection("Issue", "comments", null, 23), + ...mirror._queryConnection( + "Issue", + "timeline", + "cursor#998", + 23 + ), + ]), + ]) + ), + 
b.alias( + "node_2", + b.field("node", {id: b.literal("repo#5")}, [ + b.field("id"), + b.inlineFragment("Repository", [ + ...mirror._queryConnection( + "Repository", + "issues", + "cursor#998", + 23 + ), + ]), + ]) + ), + ]); + }); + }); + + describe("_updateData", () => { + it("throws if given a key with invalid prefix", () => { + const db = new Database(":memory:"); + const mirror = new Mirror(db, buildGithubSchema()); + const updateId = mirror._createUpdate(new Date(123)); + const result = {wat_0: {id: "wot"}}; + expect(() => { + mirror._nontransactionallyUpdateData(updateId, result); + }).toThrow('Bad key in query result: "wat_0"'); + }); + + // We test the happy path lightly, because it just delegates to + // other methods, which are themselves tested. This test is + // sufficient to effect full coverage. + it("processes a reasonable input correctly", () => { + const db = new Database(":memory:"); + const mirror = new Mirror(db, buildGithubSchema()); + const updateId = mirror._createUpdate(new Date(123)); + mirror.registerObject({typename: "Repository", id: "repo:foo/bar"}); + mirror.registerObject({typename: "Issue", id: "issue:#1"}); + mirror.registerObject({typename: "Issue", id: "issue:#2"}); + const result = { + owndata_0: [ + { + __typename: "Repository", + id: "repo:foo/bar", + url: "url://foo/bar", + }, + ], + owndata_1: [ + { + __typename: "Issue", + id: "issue:#1", + url: "url://foo/bar/issue/1", + title: "something wicked", + repository: { + __typename: "Repository", + id: "repo:foo/bar", + }, + author: { + __typename: "User", + id: "user:alice", + }, + }, + { + __typename: "Issue", + id: "issue:#2", + url: "url://foo/bar/issue/2", + title: "this way comes", + repository: { + __typename: "Repository", + id: "repo:foo/bar", + }, + author: { + __typename: "User", + id: "user:alice", + }, + }, + ], + node_0: { + id: "repo:foo/bar", + issues: { + totalCount: 2, + pageInfo: { + hasNextPage: true, + endCursor: "cursor:repo:foo/bar.issues@0", + }, + nodes: [{__typename: "Issue", id: "issue:#1"}], + }, + }, + node_1: { + id: "issue:#1", + comments: { + totalCount: 1, + pageInfo: { + hasNextPage: false, + endCursor: "cursor:issue:#1.comments@0", + }, + nodes: [{__typename: "IssueComment", id: "comment:#1.1"}], + }, + timeline: { + totalCount: 1, + pageInfo: { + hasNextPage: false, + endCursor: "cursor:issue:#1.timeline@0", + }, + nodes: [{__typename: "ClosedEvent", id: "issue:#1!closed#0"}], + }, + }, + }; + mirror._updateData(updateId, result); + + // Check that the right objects are in the database. + expect( + db.prepare("SELECT * FROM objects ORDER BY id ASC").all() + ).toEqual([ + {typename: "IssueComment", id: "comment:#1.1", last_update: null}, + {typename: "Issue", id: "issue:#1", last_update: updateId}, + {typename: "ClosedEvent", id: "issue:#1!closed#0", last_update: null}, + {typename: "Issue", id: "issue:#2", last_update: updateId}, + {typename: "Repository", id: "repo:foo/bar", last_update: updateId}, + {typename: "User", id: "user:alice", last_update: null}, + ]); + + // Check that some objects have the right primitives. + // (These poke at the internals of the storage format a bit.) 
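+        // Primitive fields live in per-type tables (`primitives_Issue`,
+        // etc.); fetched values are stored JSON-encoded, and fields not
+        // yet fetched are NULL.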
+ expect( + db + .prepare("SELECT * FROM primitives_Repository ORDER BY id ASC") + .all() + ).toEqual([{id: "repo:foo/bar", url: JSON.stringify("url://foo/bar")}]); + expect( + db.prepare("SELECT * FROM primitives_Issue ORDER BY id ASC").all() + ).toEqual([ + { + id: "issue:#1", + url: JSON.stringify("url://foo/bar/issue/1"), + title: JSON.stringify("something wicked"), + }, + { + id: "issue:#2", + url: JSON.stringify("url://foo/bar/issue/2"), + title: JSON.stringify("this way comes"), + }, + ]); + expect( + db.prepare("SELECT * FROM primitives_User ORDER BY id ASC").all() + ).toEqual([{id: "user:alice", login: null, url: null}]); + expect( + db + .prepare("SELECT * FROM primitives_ClosedEvent ORDER BY id ASC") + .all() + ).toEqual([{id: "issue:#1!closed#0"}]); + + // Check that some links are correct. + expect( + db + .prepare( + "SELECT * FROM links WHERE parent_id LIKE 'issue:%' " + + "ORDER BY parent_id, fieldname" + ) + .all() + ).toEqual([ + { + parent_id: "issue:#1", + fieldname: "author", + child_id: "user:alice", + rowid: expect.anything(), + }, + { + parent_id: "issue:#1", + fieldname: "repository", + child_id: "repo:foo/bar", + rowid: expect.anything(), + }, + { + parent_id: "issue:#1!closed#0", + fieldname: "actor", + child_id: null, + rowid: expect.anything(), + }, + { + parent_id: "issue:#2", + fieldname: "author", + child_id: "user:alice", + rowid: expect.anything(), + }, + { + parent_id: "issue:#2", + fieldname: "repository", + child_id: "repo:foo/bar", + rowid: expect.anything(), + }, + ]); + + // Check that the connection metadata are correct. + expect( + db + .prepare("SELECT * FROM connections ORDER BY object_id, fieldname") + .all() + ).toEqual([ + // Issue #1 had both comments and timeline fetched. + { + object_id: "issue:#1", + fieldname: "comments", + last_update: updateId, + has_next_page: +false, + end_cursor: "cursor:issue:#1.comments@0", + total_count: 1, + rowid: expect.anything(), + }, + { + object_id: "issue:#1", + fieldname: "timeline", + last_update: updateId, + has_next_page: +false, + end_cursor: "cursor:issue:#1.timeline@0", + total_count: 1, + rowid: expect.anything(), + }, + // Issue #2 had no connections fetched. + { + object_id: "issue:#2", + fieldname: "comments", + last_update: null, + has_next_page: null, + end_cursor: null, + total_count: null, + rowid: expect.anything(), + }, + { + object_id: "issue:#2", + fieldname: "timeline", + last_update: null, + has_next_page: null, + end_cursor: null, + total_count: null, + rowid: expect.anything(), + }, + // The repository had one issue fetched. + { + object_id: "repo:foo/bar", + fieldname: "issues", + last_update: updateId, + has_next_page: +true, + end_cursor: "cursor:repo:foo/bar.issues@0", + total_count: 2, + rowid: expect.anything(), + }, + ]); + + // Check that the connection entries are correct. 
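+        // Each entry joins a connection (by rowid) to one child node,
+        // recording the child's position within the connection (idx).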
+ expect( + db + .prepare( + dedent`\ + SELECT + connections.object_id AS objectId, + connections.fieldname AS fieldname, + connection_entries.idx AS idx, + connection_entries.child_id AS childId + FROM connections JOIN connection_entries + ON connection_entries.connection_id = connections.rowid + ORDER BY objectId, fieldname, idx + ` + ) + .all() + ).toEqual([ + { + objectId: "issue:#1", + fieldname: "comments", + idx: 1, + childId: "comment:#1.1", + }, + { + objectId: "issue:#1", + fieldname: "timeline", + idx: 1, + childId: "issue:#1!closed#0", + }, + { + objectId: "repo:foo/bar", + fieldname: "issues", + idx: 1, + childId: "issue:#1", + }, + ]); + }); + }); + describe("_queryShallow", () => { it("fails when given a nonexistent type", () => { const db = new Database(":memory:"); @@ -2210,6 +2633,20 @@ describe("graphql/mirror", () => { }); }); + describe("_FIELD_PREFIXES", () => { + it("has the identity prefix relation", () => { + for (const k1 of Object.keys(_FIELD_PREFIXES)) { + for (const k2 of Object.keys(_FIELD_PREFIXES)) { + expect({k1, k2, isPrefix: k1.startsWith(k2)}).toEqual({ + k1, + k2, + isPrefix: k1 === k2, + }); + } + } + }); + }); + describe("_inTransaction", () => { it("runs its callback inside a transaction", () => { // We use an on-disk database file here because we need to open