diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index d3b0e89e9..e25b161af 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -2,6 +2,7 @@ class Api::V1::StatusesController < Api::BaseController include Authorization + include AsyncRefreshesConcern before_action -> { authorize_if_got_token! :read, :'read:statuses' }, except: [:create, :update, :destroy] before_action -> { doorkeeper_authorize! :write, :'write:statuses' }, only: [:create, :update, :destroy] @@ -57,9 +58,17 @@ class Api::V1::StatusesController < Api::BaseController @context = Context.new(ancestors: loaded_ancestors, descendants: loaded_descendants) statuses = [@status] + @context.ancestors + @context.descendants - render json: @context, serializer: REST::ContextSerializer, relationships: StatusRelationshipsPresenter.new(statuses, current_user&.account_id) + refresh_key = "context:#{@status.id}:refresh" + async_refresh = AsyncRefresh.new(refresh_key) - ActivityPub::FetchAllRepliesWorker.perform_async(@status.id) if !current_account.nil? && @status.should_fetch_replies? + if async_refresh.running? + add_async_refresh_header(async_refresh) + elsif !current_account.nil? && @status.should_fetch_replies? + add_async_refresh_header(AsyncRefresh.create(refresh_key)) + ActivityPub::FetchAllRepliesWorker.perform_async(@status.id) + end + + render json: @context, serializer: REST::ContextSerializer, relationships: StatusRelationshipsPresenter.new(statuses, current_user&.account_id) end def create diff --git a/app/javascript/mastodon/actions/statuses_typed.ts b/app/javascript/mastodon/actions/statuses_typed.ts index b98abbe12..cc9c389cd 100644 --- a/app/javascript/mastodon/actions/statuses_typed.ts +++ b/app/javascript/mastodon/actions/statuses_typed.ts @@ -1,3 +1,5 @@ +import { createAction } from '@reduxjs/toolkit'; + import { apiGetContext } from 'mastodon/api/statuses'; import { createDataLoadingThunk } from 'mastodon/store/typed_functions'; @@ -6,13 +8,18 @@ import { importFetchedStatuses } from './importer'; export const fetchContext = createDataLoadingThunk( 'status/context', ({ statusId }: { statusId: string }) => apiGetContext(statusId), - (context, { dispatch }) => { + ({ context, refresh }, { dispatch }) => { const statuses = context.ancestors.concat(context.descendants); dispatch(importFetchedStatuses(statuses)); return { context, + refresh, }; }, ); + +export const completeContextRefresh = createAction<{ statusId: string }>( + 'status/context/complete', +); diff --git a/app/javascript/mastodon/api.ts b/app/javascript/mastodon/api.ts index dc9c20b50..1820e00a5 100644 --- a/app/javascript/mastodon/api.ts +++ b/app/javascript/mastodon/api.ts @@ -20,6 +20,50 @@ export const getLinks = (response: AxiosResponse) => { return LinkHeader.parse(value); }; +export interface AsyncRefreshHeader { + id: string; + retry: number; +} + +const isAsyncRefreshHeader = (obj: object): obj is AsyncRefreshHeader => + 'id' in obj && 'retry' in obj; + +export const getAsyncRefreshHeader = ( + response: AxiosResponse, +): AsyncRefreshHeader | null => { + const value = response.headers['mastodon-async-refresh'] as + | string + | undefined; + + if (!value) { + return null; + } + + const asyncRefreshHeader: Record = {}; + + value.split(/,\s*/).forEach((pair) => { + const [key, val] = pair.split('=', 2); + + let typedValue: string | number; + + if (key && ['id', 'retry'].includes(key) && val) { + if (val.startsWith('"')) { + typedValue = val.slice(1, -1); + } else { + typedValue = parseInt(val); + } + + asyncRefreshHeader[key] = typedValue; + } + }); + + if (isAsyncRefreshHeader(asyncRefreshHeader)) { + return asyncRefreshHeader; + } + + return null; +}; + const csrfHeader: RawAxiosRequestHeaders = {}; const setCSRFHeader = () => { @@ -83,7 +127,7 @@ export default function api(withAuthorization = true) { return instance; } -type ApiUrl = `v${1 | 2}/${string}`; +type ApiUrl = `v${1 | '1_alpha' | 2}/${string}`; type RequestParamsOrData = Record; export async function apiRequest( diff --git a/app/javascript/mastodon/api/async_refreshes.ts b/app/javascript/mastodon/api/async_refreshes.ts new file mode 100644 index 000000000..953300a4a --- /dev/null +++ b/app/javascript/mastodon/api/async_refreshes.ts @@ -0,0 +1,5 @@ +import { apiRequestGet } from 'mastodon/api'; +import type { ApiAsyncRefreshJSON } from 'mastodon/api_types/async_refreshes'; + +export const apiGetAsyncRefresh = (id: string) => + apiRequestGet(`v1_alpha/async_refreshes/${id}`); diff --git a/app/javascript/mastodon/api/statuses.ts b/app/javascript/mastodon/api/statuses.ts index 921a7bfe6..48eff2a69 100644 --- a/app/javascript/mastodon/api/statuses.ts +++ b/app/javascript/mastodon/api/statuses.ts @@ -1,5 +1,14 @@ -import { apiRequestGet } from 'mastodon/api'; +import api, { getAsyncRefreshHeader } from 'mastodon/api'; import type { ApiContextJSON } from 'mastodon/api_types/statuses'; -export const apiGetContext = (statusId: string) => - apiRequestGet(`v1/statuses/${statusId}/context`); +export const apiGetContext = async (statusId: string) => { + const response = await api().request({ + method: 'GET', + url: `/api/v1/statuses/${statusId}/context`, + }); + + return { + context: response.data, + refresh: getAsyncRefreshHeader(response), + }; +}; diff --git a/app/javascript/mastodon/api_types/async_refreshes.ts b/app/javascript/mastodon/api_types/async_refreshes.ts new file mode 100644 index 000000000..2d2fed241 --- /dev/null +++ b/app/javascript/mastodon/api_types/async_refreshes.ts @@ -0,0 +1,7 @@ +export interface ApiAsyncRefreshJSON { + async_refresh: { + id: string; + status: 'running' | 'finished'; + result_count: number; + }; +} diff --git a/app/javascript/mastodon/features/status/components/refresh_controller.tsx b/app/javascript/mastodon/features/status/components/refresh_controller.tsx new file mode 100644 index 000000000..04046302b --- /dev/null +++ b/app/javascript/mastodon/features/status/components/refresh_controller.tsx @@ -0,0 +1,111 @@ +import { useEffect, useState, useCallback } from 'react'; + +import { useIntl, defineMessages, FormattedMessage } from 'react-intl'; + +import classNames from 'classnames'; + +import { + fetchContext, + completeContextRefresh, +} from 'mastodon/actions/statuses'; +import type { AsyncRefreshHeader } from 'mastodon/api'; +import { apiGetAsyncRefresh } from 'mastodon/api/async_refreshes'; +import { LoadingIndicator } from 'mastodon/components/loading_indicator'; +import { useAppSelector, useAppDispatch } from 'mastodon/store'; + +const messages = defineMessages({ + loading: { + id: 'status.context.loading', + defaultMessage: 'Checking for more replies', + }, +}); + +export const RefreshController: React.FC<{ + statusId: string; + withBorder?: boolean; +}> = ({ statusId, withBorder }) => { + const refresh = useAppSelector( + (state) => state.contexts.refreshing[statusId], + ); + const dispatch = useAppDispatch(); + const intl = useIntl(); + const [ready, setReady] = useState(false); + const [loading, setLoading] = useState(false); + + useEffect(() => { + let timeoutId: ReturnType; + + const scheduleRefresh = (refresh: AsyncRefreshHeader) => { + timeoutId = setTimeout(() => { + void apiGetAsyncRefresh(refresh.id).then((result) => { + if (result.async_refresh.status === 'finished') { + dispatch(completeContextRefresh({ statusId })); + + if (result.async_refresh.result_count > 0) { + setReady(true); + } + } else { + scheduleRefresh(refresh); + } + + return ''; + }); + }, refresh.retry * 1000); + }; + + if (refresh) { + scheduleRefresh(refresh); + } + + return () => { + clearTimeout(timeoutId); + }; + }, [dispatch, setReady, statusId, refresh]); + + const handleClick = useCallback(() => { + setLoading(true); + setReady(false); + + dispatch(fetchContext({ statusId })) + .then(() => { + setLoading(false); + return ''; + }) + .catch(() => { + setLoading(false); + }); + }, [dispatch, setReady, statusId]); + + if (ready && !loading) { + return ( + + ); + } + + if (!refresh && !loading) { + return null; + } + + return ( +
+ +
+ ); +}; diff --git a/app/javascript/mastodon/features/status/index.jsx b/app/javascript/mastodon/features/status/index.jsx index 64cd0c4f8..77d23f55f 100644 --- a/app/javascript/mastodon/features/status/index.jsx +++ b/app/javascript/mastodon/features/status/index.jsx @@ -68,7 +68,7 @@ import { attachFullscreenListener, detachFullscreenListener, isFullscreen } from import ActionBar from './components/action_bar'; import { DetailedStatus } from './components/detailed_status'; - +import { RefreshController } from './components/refresh_controller'; const messages = defineMessages({ revealAll: { id: 'status.show_more_all', defaultMessage: 'Show more for all' }, @@ -548,7 +548,7 @@ class Status extends ImmutablePureComponent { render () { let ancestors, descendants, remoteHint; - const { isLoading, status, ancestorsIds, descendantsIds, intl, domain, multiColumn, pictureInPicture } = this.props; + const { isLoading, status, ancestorsIds, descendantsIds, refresh, intl, domain, multiColumn, pictureInPicture } = this.props; const { fullscreen } = this.state; if (isLoading) { @@ -578,11 +578,9 @@ class Status extends ImmutablePureComponent { if (!isLocal) { remoteHint = ( - } - label={{status.getIn(['account', 'acct']).split('@')[1]} }} />} + ); } diff --git a/app/javascript/mastodon/locales/en.json b/app/javascript/mastodon/locales/en.json index 59d39a153..13b7aa421 100644 --- a/app/javascript/mastodon/locales/en.json +++ b/app/javascript/mastodon/locales/en.json @@ -424,8 +424,6 @@ "hints.profiles.see_more_followers": "See more followers on {domain}", "hints.profiles.see_more_follows": "See more follows on {domain}", "hints.profiles.see_more_posts": "See more posts on {domain}", - "hints.threads.replies_may_be_missing": "Replies from other servers may be missing.", - "hints.threads.see_more": "See more replies on {domain}", "home.column_settings.show_quotes": "Show quotes", "home.column_settings.show_reblogs": "Show boosts", "home.column_settings.show_replies": "Show replies", @@ -847,6 +845,8 @@ "status.bookmark": "Bookmark", "status.cancel_reblog_private": "Unboost", "status.cannot_reblog": "This post cannot be boosted", + "status.context.load_new_replies": "New replies available", + "status.context.loading": "Checking for more replies", "status.continued_thread": "Continued thread", "status.copy": "Copy link to post", "status.delete": "Delete", diff --git a/app/javascript/mastodon/reducers/contexts.ts b/app/javascript/mastodon/reducers/contexts.ts index 7ecc6e3b2..cf378b4c0 100644 --- a/app/javascript/mastodon/reducers/contexts.ts +++ b/app/javascript/mastodon/reducers/contexts.ts @@ -4,6 +4,7 @@ import type { Draft, UnknownAction } from '@reduxjs/toolkit'; import type { List as ImmutableList } from 'immutable'; import { timelineDelete } from 'mastodon/actions/timelines_typed'; +import type { AsyncRefreshHeader } from 'mastodon/api'; import type { ApiRelationshipJSON } from 'mastodon/api_types/relationships'; import type { ApiStatusJSON, @@ -12,7 +13,7 @@ import type { import type { Status } from 'mastodon/models/status'; import { blockAccountSuccess, muteAccountSuccess } from '../actions/accounts'; -import { fetchContext } from '../actions/statuses'; +import { fetchContext, completeContextRefresh } from '../actions/statuses'; import { TIMELINE_UPDATE } from '../actions/timelines'; import { compareId } from '../compare_id'; @@ -25,11 +26,13 @@ interface TimelineUpdateAction extends UnknownAction { interface State { inReplyTos: Record; replies: Record; + refreshing: Record; } const initialState: State = { inReplyTos: {}, replies: {}, + refreshing: {}, }; const normalizeContext = ( @@ -127,6 +130,13 @@ export const contextsReducer = createReducer(initialState, (builder) => { builder .addCase(fetchContext.fulfilled, (state, action) => { normalizeContext(state, action.meta.arg.statusId, action.payload.context); + + if (action.payload.refresh) { + state.refreshing[action.meta.arg.statusId] = action.payload.refresh; + } + }) + .addCase(completeContextRefresh, (state, action) => { + delete state.refreshing[action.payload.statusId]; }) .addCase(blockAccountSuccess, (state, action) => { filterContexts( diff --git a/app/models/concerns/status/fetch_replies_concern.rb b/app/models/concerns/status/fetch_replies_concern.rb index fd9929aba..cc117cb5a 100644 --- a/app/models/concerns/status/fetch_replies_concern.rb +++ b/app/models/concerns/status/fetch_replies_concern.rb @@ -3,9 +3,6 @@ module Status::FetchRepliesConcern extend ActiveSupport::Concern - # enable/disable fetching all replies - FETCH_REPLIES_ENABLED = ENV['FETCH_REPLIES_ENABLED'] == 'true' - # debounce fetching all replies to minimize DoS FETCH_REPLIES_COOLDOWN_MINUTES = (ENV['FETCH_REPLIES_COOLDOWN_MINUTES'] || 15).to_i.minutes FETCH_REPLIES_INITIAL_WAIT_MINUTES = (ENV['FETCH_REPLIES_INITIAL_WAIT_MINUTES'] || 5).to_i.minutes @@ -36,7 +33,7 @@ module Status::FetchRepliesConcern def should_fetch_replies? # we aren't brand new, and we haven't fetched replies since the debounce window - FETCH_REPLIES_ENABLED && !local? && created_at <= FETCH_REPLIES_INITIAL_WAIT_MINUTES.ago && ( + !local? && created_at <= FETCH_REPLIES_INITIAL_WAIT_MINUTES.ago && ( fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_COOLDOWN_MINUTES.ago ) end diff --git a/app/models/worker_batch.rb b/app/models/worker_batch.rb new file mode 100644 index 000000000..f741071ba --- /dev/null +++ b/app/models/worker_batch.rb @@ -0,0 +1,79 @@ +# frozen_string_literal: true + +class WorkerBatch + include Redisable + + TTL = 3600 + + def initialize(id = nil) + @id = id || SecureRandom.hex(12) + end + + attr_reader :id + + # Connect the batch with an async refresh. When the number of processed jobs + # passes the given threshold, the async refresh will be marked as finished. + # @param [String] async_refresh_key + # @param [Float] threshold + def connect(async_refresh_key, threshold: 1.0) + redis.hset(key, { 'async_refresh_key' => async_refresh_key, 'threshold' => threshold }) + end + + # Add jobs to the batch. Usually when the batch is created. + # @param [Array] jids + def add_jobs(jids) + if jids.blank? + async_refresh_key = redis.hget(key, 'async_refresh_key') + + if async_refresh_key.present? + async_refresh = AsyncRefresh.new(async_refresh_key) + async_refresh.finish! + end + + return + end + + redis.multi do |pipeline| + pipeline.sadd(key('jobs'), jids) + pipeline.expire(key('jobs'), TTL) + pipeline.hincrby(key, 'pending', jids.size) + pipeline.expire(key, TTL) + end + end + + # Remove a job from the batch, such as when it's been processed or it has failed. + # @param [String] jid + def remove_job(jid) + _, pending, processed, async_refresh_key, threshold = redis.multi do |pipeline| + pipeline.srem(key('jobs'), jid) + pipeline.hincrby(key, 'pending', -1) + pipeline.hincrby(key, 'processed', 1) + pipeline.hget(key, 'async_refresh_key') + pipeline.hget(key, 'threshold') + end + + if async_refresh_key.present? + async_refresh = AsyncRefresh.new(async_refresh_key) + async_refresh.increment_result_count(by: 1) + async_refresh.finish! if pending.zero? || processed >= threshold.to_f * (processed + pending) + end + end + + # Get pending jobs. + # @returns [Array] + def jobs + redis.smembers(key('jobs')) + end + + # Inspect the batch. + # @returns [Hash] + def info + redis.hgetall(key) + end + + private + + def key(suffix = nil) + "worker_batch:#{@id}#{":#{suffix}" if suffix}" + end +end diff --git a/app/services/activitypub/fetch_all_replies_service.rb b/app/services/activitypub/fetch_all_replies_service.rb index 765e5c8ae..e9c1712ed 100644 --- a/app/services/activitypub/fetch_all_replies_service.rb +++ b/app/services/activitypub/fetch_all_replies_service.rb @@ -6,7 +6,7 @@ class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService # Limit of replies to fetch per status MAX_REPLIES = (ENV['FETCH_REPLIES_MAX_SINGLE'] || 500).to_i - def call(status_uri, collection_or_uri, max_pages: 1, request_id: nil) + def call(status_uri, collection_or_uri, max_pages: 1, async_refresh_key: nil, request_id: nil) @status_uri = status_uri super diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index 6a6d9e391..25eb275ca 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -6,7 +6,7 @@ class ActivityPub::FetchRepliesService < BaseService # Limit of fetched replies MAX_REPLIES = 5 - def call(reference_uri, collection_or_uri, max_pages: 1, allow_synchronous_requests: true, request_id: nil) + def call(reference_uri, collection_or_uri, max_pages: 1, allow_synchronous_requests: true, async_refresh_key: nil, request_id: nil) @reference_uri = reference_uri @allow_synchronous_requests = allow_synchronous_requests @@ -14,7 +14,10 @@ class ActivityPub::FetchRepliesService < BaseService return if @items.nil? @items = filter_replies(@items) - FetchReplyWorker.push_bulk(@items) { |reply_uri| [reply_uri, { 'request_id' => request_id }] } + + batch = WorkerBatch.new + batch.connect(async_refresh_key) if async_refresh_key.present? + batch.add_jobs(FetchReplyWorker.push_bulk(@items) { |reply_uri| [reply_uri, { 'request_id' => request_id, 'batch_id' => batch.id }] }) [@items, n_pages] end diff --git a/app/workers/activitypub/fetch_all_replies_worker.rb b/app/workers/activitypub/fetch_all_replies_worker.rb index 40b251cf1..ab9eebc4e 100644 --- a/app/workers/activitypub/fetch_all_replies_worker.rb +++ b/app/workers/activitypub/fetch_all_replies_worker.rb @@ -55,7 +55,7 @@ class ActivityPub::FetchAllRepliesWorker replies_collection_or_uri = get_replies_uri(status) return if replies_collection_or_uri.nil? - ActivityPub::FetchAllRepliesService.new.call(value_or_id(status), replies_collection_or_uri, max_pages: max_pages, **options.deep_symbolize_keys) + ActivityPub::FetchAllRepliesService.new.call(value_or_id(status), replies_collection_or_uri, max_pages: max_pages, async_refresh_key: "context:#{@root_status.id}:refresh", **options.deep_symbolize_keys) end # Get the URI of the replies collection of a status diff --git a/app/workers/fetch_reply_worker.rb b/app/workers/fetch_reply_worker.rb index ecb232bbb..da3b9a8c1 100644 --- a/app/workers/fetch_reply_worker.rb +++ b/app/workers/fetch_reply_worker.rb @@ -7,6 +7,9 @@ class FetchReplyWorker sidekiq_options queue: 'pull', retry: 3 def perform(child_url, options = {}) + batch = WorkerBatch.new(options.delete('batch_id')) if options['batch_id'] FetchRemoteStatusService.new.call(child_url, **options.symbolize_keys) + ensure + batch&.remove_job(jid) end end diff --git a/spec/models/worker_batch_spec.rb b/spec/models/worker_batch_spec.rb new file mode 100644 index 000000000..b58dc4861 --- /dev/null +++ b/spec/models/worker_batch_spec.rb @@ -0,0 +1,104 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe WorkerBatch do + subject { described_class.new } + + let(:async_refresh_key) { 'test_refresh' } + let(:async_refresh) { nil } + + describe '#id' do + it 'returns a string' do + expect(subject.id).to be_a String + end + end + + describe '#connect' do + before do + subject.connect(async_refresh_key, threshold: 0.75) + end + + it 'persists the async refresh key' do + expect(subject.info['async_refresh_key']).to eq async_refresh_key + end + + it 'persists the threshold' do + expect(subject.info['threshold']).to eq '0.75' + end + end + + describe '#add_jobs' do + before do + subject.connect(async_refresh_key, threshold: 0.5) if async_refresh.present? + subject.add_jobs([]) + end + + context 'when called with empty array' do + it 'does not persist the number of pending jobs' do + expect(subject.info).to be_empty + end + + it 'does not persist the job IDs' do + expect(subject.jobs).to eq [] + end + + context 'when async refresh is connected' do + let(:async_refresh) { AsyncRefresh.new(async_refresh_key) } + + it 'immediately marks the async refresh as finished' do + expect(async_refresh.reload.finished?).to be true + end + end + end + + context 'when called with an array of job IDs' do + before do + subject.add_jobs(%w(foo bar)) + end + + it 'persists the number of pending jobs' do + expect(subject.info['pending']).to eq '2' + end + + it 'persists the job IDs' do + expect(subject.jobs).to eq %w(foo bar) + end + end + end + + describe '#remove_job' do + before do + subject.connect(async_refresh_key, threshold: 0.5) if async_refresh.present? + subject.add_jobs(%w(foo bar baz)) + subject.remove_job('foo') + end + + it 'removes the job from pending jobs' do + expect(subject.jobs).to eq %w(bar baz) + end + + it 'decrements the number of pending jobs' do + expect(subject.info['pending']).to eq '2' + end + + context 'when async refresh is connected' do + let(:async_refresh) { AsyncRefresh.new(async_refresh_key) } + + it 'increments async refresh progress' do + expect(async_refresh.reload.result_count).to eq 1 + end + + it 'marks the async refresh as finished when the threshold is reached' do + subject.remove_job('bar') + expect(async_refresh.reload.finished?).to be true + end + end + end + + describe '#info' do + it 'returns a hash' do + expect(subject.info).to be_a Hash + end + end +end diff --git a/spec/workers/activitypub/fetch_all_replies_worker_spec.rb b/spec/workers/activitypub/fetch_all_replies_worker_spec.rb index 9a8bdac03..9795c4619 100644 --- a/spec/workers/activitypub/fetch_all_replies_worker_spec.rb +++ b/spec/workers/activitypub/fetch_all_replies_worker_spec.rb @@ -123,7 +123,6 @@ RSpec.describe ActivityPub::FetchAllRepliesWorker do end before do - stub_const('Status::FetchRepliesConcern::FETCH_REPLIES_ENABLED', true) all_items.each do |item| next if [top_note_uri, reply_note_uri].include? item