From ccfac2716d29cc46d80006c06c0b148f815c2dba Mon Sep 17 00:00:00 2001 From: Claire Date: Tue, 28 Oct 2025 14:23:05 +0100 Subject: [PATCH] Add streaming server side filtering for live/topic feed settings (#36585) --- streaming/index.js | 127 +++++++++++++++++++++++++++++---------------- 1 file changed, 81 insertions(+), 46 deletions(-) diff --git a/streaming/index.js b/streaming/index.js index 952b4584a..113eabc03 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -19,6 +19,7 @@ import * as Redis from './redis.js'; import { isTruthy, normalizeHashtag, firstParam } from './utils.js'; const environment = process.env.NODE_ENV || 'development'; +const PERMISSION_VIEW_FEEDS = 0x0000000000100000; // Correctly detect and load .env or .env.production file based on environment: const dotenvFile = environment === 'production' ? '.env.production' : '.env'; @@ -44,6 +45,7 @@ initializeLogLevel(process.env, environment); * @property {string[]} scopes * @property {string} accountId * @property {string[]} chosenLanguages + * @property {number} permissions */ @@ -351,7 +353,7 @@ const startServer = async () => { * @returns {Promise} */ const accountFromToken = async (token, req) => { - const result = await pgPool.query('SELECT oauth_access_tokens.id, oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id INNER JOIN accounts ON accounts.id = users.account_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL AND users.disabled IS FALSE AND accounts.suspended_at IS NULL LIMIT 1', [token]); + const result = await pgPool.query('SELECT oauth_access_tokens.id, oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes, COALESCE(user_roles.permissions, 0) AS permissions FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id INNER JOIN accounts ON accounts.id = users.account_id LEFT OUTER JOIN user_roles ON user_roles.id = users.role_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL AND users.disabled IS FALSE AND accounts.suspended_at IS NULL LIMIT 1', [token]); if (result.rows.length === 0) { throw new AuthenticationError('Invalid access token'); @@ -367,6 +369,7 @@ const startServer = async () => { scopes: result.rows[0].scopes.split(' '), accountId: result.rows[0].account_id, chosenLanguages: result.rows[0].chosen_languages, + permissions: result.rows[0].permissions, }; }; @@ -583,6 +586,41 @@ const startServer = async () => { } }; + /** + * @param {string} kind + * @param {ResolvedAccount} account + * @returns {Promise.<{ localAccess: boolean, remoteAccess: boolean }>} + */ + const getFeedAccessSettings = async (kind, account) => { + const access = { localAccess: true, remoteAccess: true }; + + if (account.permissions & PERMISSION_VIEW_FEEDS) { + return access; + } + + let localAccessVar, remoteAccessVar; + + if (kind === 'hashtag') { + localAccessVar = 'local_topic_feed_access'; + remoteAccessVar = 'remote_topic_feed_access'; + } else { + localAccessVar = 'local_live_feed_access'; + remoteAccessVar = 'remote_live_feed_access'; + } + + const result = await pgPool.query('SELECT var, value FROM settings WHERE var IN ($1, $2)', [localAccessVar, remoteAccessVar]); + + result.rows.forEach((row) => { + if (row.var === localAccessVar) { + access.localAccess = row.value !== "--- disabled\n"; + } else { + access.remoteAccess = row.value !== "--- disabled\n"; + } + }); + + return access; + }; + /** * @param {string[]} channelIds * @param {http.IncomingMessage & ResolvedAccount} req @@ -590,10 +628,13 @@ const startServer = async () => { * @param {function(string, string): void} output * @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler * @param {'websocket' | 'eventsource'} destinationType - * @param {boolean=} needsFiltering + * @param {Object} options + * @param {boolean} options.needsFiltering + * @param {boolean=} options.filterLocal + * @param {boolean=} options.filterRemote * @returns {SubscriptionListener} */ - const streamFrom = (channelIds, req, log, output, attachCloseHandler, destinationType, needsFiltering = false) => { + const streamFrom = (channelIds, req, log, output, attachCloseHandler, destinationType, { needsFiltering, filterLocal, filterRemote } = { needsFiltering: false, filterLocal: false, filterRemote: false }) => { log.info({ channelIds }, `Starting stream`); /** @@ -641,6 +682,12 @@ const startServer = async () => { // The rest of the logic from here on in this function is to handle // filtering of statuses: + const localPayload = payload.account.username === payload.account.acct; + if (localPayload ? filterLocal : filterRemote) { + log.debug(`Message ${payload.id} filtered by feed settings`); + return; + } + // Filter based on language: if (Array.isArray(req.chosenLanguages) && req.chosenLanguages.indexOf(payload.language) === -1) { log.debug(`Message ${payload.id} filtered by language (${payload.language})`); @@ -946,7 +993,7 @@ const startServer = async () => { const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds)); // @ts-ignore - streamFrom(channelIds, req, req.log, onSend, onEnd, 'eventsource', options.needsFiltering); + streamFrom(channelIds, req, req.log, onSend, onEnd, 'eventsource', options); }).catch(err => { const {statusCode, errorMessage } = extractErrorStatusAndMessage(err); @@ -982,9 +1029,25 @@ const startServer = async () => { * @param {any} req * @param {string} name * @param {StreamParams} params - * @returns {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>} + * @returns {Promise.<{ channelIds: string[], options: { needsFiltering: boolean, filterLocal?: boolean, filterRemote?: boolean } }>} */ const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => { + /** + * @param {string} feedKind + * @param {string} channelId + * @param {{ needsFiltering: boolean }} options + */ + const resolveFeed = (feedKind, channelId, options) => { + getFeedAccessSettings(feedKind, req).then(({ localAccess, remoteAccess }) => { + resolve({ + channelIds: [channelId], + options: { ...options, filterLocal: !localAccess, filterRemote: !remoteAccess }, + }); + }).catch(() => { + reject(new Error('Error getting feed access settings')); + }); + }; + switch (name) { case 'user': resolve({ @@ -1001,46 +1064,22 @@ const startServer = async () => { break; case 'public': - resolve({ - channelIds: ['timeline:public'], - options: { needsFiltering: true }, - }); - + resolveFeed('public', 'timeline:public', { needsFiltering: true }); break; case 'public:local': - resolve({ - channelIds: ['timeline:public:local'], - options: { needsFiltering: true }, - }); - + resolveFeed('public', 'timeline:public:local', { needsFiltering: true }); break; case 'public:remote': - resolve({ - channelIds: ['timeline:public:remote'], - options: { needsFiltering: true }, - }); - + resolveFeed('public', 'timeline:public:remote', { needsFiltering: true }); break; case 'public:media': - resolve({ - channelIds: ['timeline:public:media'], - options: { needsFiltering: true }, - }); - + resolveFeed('public', 'timeline:public:media', { needsFiltering: true }); break; case 'public:local:media': - resolve({ - channelIds: ['timeline:public:local:media'], - options: { needsFiltering: true }, - }); - + resolveFeed('public', 'timeline:public:local:media', { needsFiltering: true }); break; case 'public:remote:media': - resolve({ - channelIds: ['timeline:public:remote:media'], - options: { needsFiltering: true }, - }); - + resolveFeed('public', 'timeline:public:remote:media', { needsFiltering: true }); break; case 'direct': resolve({ @@ -1052,24 +1091,20 @@ const startServer = async () => { case 'hashtag': if (!params.tag) { reject(new RequestError('Missing tag name parameter')); - } else { - resolve({ - channelIds: [`timeline:hashtag:${normalizeHashtag(params.tag)}`], - options: { needsFiltering: true }, - }); + return; } + resolveFeed('hashtag', `timeline:hashtag:${normalizeHashtag(params.tag)}`, { needsFiltering: true }); + break; case 'hashtag:local': if (!params.tag) { reject(new RequestError('Missing tag name parameter')); - } else { - resolve({ - channelIds: [`timeline:hashtag:${normalizeHashtag(params.tag)}:local`], - options: { needsFiltering: true }, - }); + return; } + resolveFeed('hashtag', `timeline:hashtag:${normalizeHashtag(params.tag)}:local`, { needsFiltering: true }); + break; case 'list': if (!params.list) { @@ -1132,7 +1167,7 @@ const startServer = async () => { const onSend = streamToWs(request, websocket, streamNameFromChannelName(channelName, params)); const stopHeartbeat = subscriptionHeartbeat(channelIds); - const listener = streamFrom(channelIds, request, logger, onSend, undefined, 'websocket', options.needsFiltering); + const listener = streamFrom(channelIds, request, logger, onSend, undefined, 'websocket', options); metrics.connectedChannels.labels({ type: 'websocket', channel: channelName }).inc();