From 6e50f2395ebd598089fef7c50cd42cf2be3377c7 Mon Sep 17 00:00:00 2001 From: Andrew Pietila Date: Mon, 17 Apr 2023 23:19:25 -0500 Subject: [PATCH] Perform fetch_object queue tasks. --- lib/queue_worker.js | 37 ++++++++++++++++++++++++++++++++++--- routes/inbox.js | 1 + 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/lib/queue_worker.js b/lib/queue_worker.js index 5a1e218..6bad41b 100644 --- a/lib/queue_worker.js +++ b/lib/queue_worker.js @@ -8,10 +8,10 @@ var crypto = require('crypto'); module.exports = { do_one_queue: async () => { var result = await db('queue').orderBy('id', 'asc').limit(1); + var data = JSON.parse(result[0]["data"]); if ( result[0] ) { await db('queue').del().where('id', '=', result[0]["id"]); if ( result[0]["type"] === "verify_inbox" ) { - var data = JSON.parse(result[0]["data"]); var sig_header = data.sig_header; var signature_split = sig_header.split(/,/); var signature_elements = {}; @@ -60,8 +60,8 @@ module.exports = { // TODO: We should care that the key hostname and the object hostname match. await db("object_box").insert({ origin: parsedBody.id, - to, - cc, + to: `[${to.join(',')}]`, + cc: `[${cc.join(',')}]`, object: parsedBody, signedby: keyUrl.toString(), expires: (new Date()).getTime() + 604800000 // 7 days. @@ -69,6 +69,37 @@ module.exports = { } else { await db("queue").insert({task: "fetch_object", data: JSON.stringify({object: parsedBody.id, actor: data["actor"]})}); } + } else if ( result[0] === "fetch_object" ) { + await db('queue').del().where('id', '=', result[0]["id"]); + var exists = await db("object_box").where("origin", "=", data["object"]).andWhere("expires", ">=", (new Date()).getTime()); + if ( exists.length !== 0 ) { + return; + } + let parsedBody = await jsonld.compact(JSON.parse(http_agent.request(secondKeyUrl, {actor: data["actor"]})["data"]), ["https://www.w3.org/ns/activitystreams", { security: "https://w3id.org/security#"}]); + let to = parsedBody.to; + if ( typeof to === "string" ) { + to = [to]; + } + let cc = parsedBody.cc; + if ( typeof cc === "string" ) { + cc = [cc]; + } + await db("object_box").insert({ + origin: parsedBody.id, + to: `[${to.join(',')}]`, + cc: `[${cc.join(',')}]`, + object: parsedBody, + signedby: "fetch", + // TODO: We should respect the caching instructions provided by origin. + expires: (new Date()).getTime() + 604800000 // 7 days. + }); + } else { + // TODO: Surface this one as an error. + await db('queue').del().where('id', '=', result[0]["id"]); + await db('queue').insert({ + task: result[0]["task"], + data: result[0]["data"] + }); } } } diff --git a/routes/inbox.js b/routes/inbox.js index f4bc949..45ed467 100644 --- a/routes/inbox.js +++ b/routes/inbox.js @@ -35,6 +35,7 @@ module.exports = { } }).join('\n'); // ... then lets dump this in the queue for now. + // TODO: This'll probably be a bug someday: https://stackoverflow.com/a/69299910 await db("queue").insert({task: "verify_inbox", data: JSON.stringify({sig_header: req.header("signature"), signed_block, body: req.rawBody.toString("UTF-8"), date: (new Date()).toISOString(), actor: `${req.hostname}@${req.hostname}`})}); res.status(204); return res.end();