Add Fetch All Replies Part 1: Backend (#32615)
Signed-off-by: sneakers-the-rat <sneakers-the-rat@protonmail.com> Co-authored-by: jonny <j@nny.fyi> Co-authored-by: Claire <claire.github-309c@sitedethib.com> Co-authored-by: Kouhai <66407198+kouhaidev@users.noreply.github.com>
This commit is contained in:
		@@ -86,3 +86,24 @@ S3_ALIAS_HOST=files.example.com
 | 
			
		||||
# -----------------------
 | 
			
		||||
IP_RETENTION_PERIOD=31556952
 | 
			
		||||
SESSION_RETENTION_PERIOD=31556952
 | 
			
		||||
 | 
			
		||||
# Fetch All Replies Behavior
 | 
			
		||||
# --------------------------
 | 
			
		||||
# When a user expands a post (DetailedStatus view), fetch all of its replies
 | 
			
		||||
# (default: true if unset, set explicitly to ``false`` to disable)
 | 
			
		||||
FETCH_REPLIES_ENABLED=true
 | 
			
		||||
 | 
			
		||||
# Period to wait between fetching replies (in minutes)
 | 
			
		||||
FETCH_REPLIES_COOLDOWN_MINUTES=15
 | 
			
		||||
 | 
			
		||||
# Period to wait after a post is first created before fetching its replies (in minutes)
 | 
			
		||||
FETCH_REPLIES_INITIAL_WAIT_MINUTES=5
 | 
			
		||||
 | 
			
		||||
# Max number of replies to fetch - total, recursively through a whole reply tree
 | 
			
		||||
FETCH_REPLIES_MAX_GLOBAL=1000
 | 
			
		||||
 | 
			
		||||
# Max number of replies to fetch - for a single post
 | 
			
		||||
FETCH_REPLIES_MAX_SINGLE=500
 | 
			
		||||
 | 
			
		||||
# Max number of replies Collection pages to fetch - total
 | 
			
		||||
FETCH_REPLIES_MAX_PAGES=500
 | 
			
		||||
 
 | 
			
		||||
@@ -58,6 +58,8 @@ class Api::V1::StatusesController < Api::BaseController
 | 
			
		||||
    statuses = [@status] + @context.ancestors + @context.descendants
 | 
			
		||||
 | 
			
		||||
    render json: @context, serializer: REST::ContextSerializer, relationships: StatusRelationshipsPresenter.new(statuses, current_user&.account_id)
 | 
			
		||||
 | 
			
		||||
    ActivityPub::FetchAllRepliesWorker.perform_async(@status.id) if !current_account.nil? && @status.should_fetch_replies?
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def create
 | 
			
		||||
 
 | 
			
		||||
@@ -155,24 +155,49 @@ module JsonLdHelper
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def fetch_resource(uri, id_is_known, on_behalf_of = nil, request_options: {})
 | 
			
		||||
  # Fetch the resource given by uri.
 | 
			
		||||
  # @param uri [String]
 | 
			
		||||
  # @param id_is_known [Boolean]
 | 
			
		||||
  # @param on_behalf_of [nil, Account]
 | 
			
		||||
  # @param raise_on_error [Boolean, Symbol<:all, :temporary>] See {#fetch_resource_without_id_validation} for possible values
 | 
			
		||||
  def fetch_resource(uri, id_is_known, on_behalf_of = nil, raise_on_error: false, request_options: {})
 | 
			
		||||
    unless id_is_known
 | 
			
		||||
      json = fetch_resource_without_id_validation(uri, on_behalf_of)
 | 
			
		||||
      json = fetch_resource_without_id_validation(uri, on_behalf_of, raise_on_error: raise_on_error)
 | 
			
		||||
 | 
			
		||||
      return if !json.is_a?(Hash) || unsupported_uri_scheme?(json['id'])
 | 
			
		||||
 | 
			
		||||
      uri = json['id']
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    json = fetch_resource_without_id_validation(uri, on_behalf_of, request_options: request_options)
 | 
			
		||||
    json = fetch_resource_without_id_validation(uri, on_behalf_of, raise_on_error: raise_on_error, request_options: request_options)
 | 
			
		||||
    json.present? && json['id'] == uri ? json : nil
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def fetch_resource_without_id_validation(uri, on_behalf_of = nil, raise_on_temporary_error = false, request_options: {})
 | 
			
		||||
  # Fetch the resource given by uri
 | 
			
		||||
  #
 | 
			
		||||
  # If an error is raised, it contains the response and can be captured for handling like
 | 
			
		||||
  #
 | 
			
		||||
  #     begin
 | 
			
		||||
  #       fetch_resource_without_id_validation(uri, nil, true)
 | 
			
		||||
  #     rescue Mastodon::UnexpectedResponseError => e
 | 
			
		||||
  #       e.response
 | 
			
		||||
  #     end
 | 
			
		||||
  #
 | 
			
		||||
  # @param uri [String]
 | 
			
		||||
  # @param on_behalf_of [nil, Account]
 | 
			
		||||
  # @param raise_on_error [Boolean, Symbol<:all, :temporary>]
 | 
			
		||||
  #   - +true+, +:all+ - raise if response code is not in the 2** range
 | 
			
		||||
  #   - +:temporary+ - raise if the response code is not an "unsalvageable error" like a 404
 | 
			
		||||
  #     (see {#response_error_unsalvageable} )
 | 
			
		||||
  #   - +false+ - do not raise, return +nil+
 | 
			
		||||
  def fetch_resource_without_id_validation(uri, on_behalf_of = nil, raise_on_error: false, request_options: {})
 | 
			
		||||
    on_behalf_of ||= Account.representative
 | 
			
		||||
 | 
			
		||||
    build_request(uri, on_behalf_of, options: request_options).perform do |response|
 | 
			
		||||
      raise Mastodon::UnexpectedResponseError, response unless response_successful?(response) || response_error_unsalvageable?(response) || !raise_on_temporary_error
 | 
			
		||||
      raise Mastodon::UnexpectedResponseError, response if !response_successful?(response) && (
 | 
			
		||||
        [true, :all].include?(raise_on_error) ||
 | 
			
		||||
        (!response_error_unsalvageable?(response) && raise_on_error == :temporary)
 | 
			
		||||
      )
 | 
			
		||||
 | 
			
		||||
      body_to_json(response.body_with_limit) if response.code == 200 && valid_activitypub_content_type?(response)
 | 
			
		||||
    end
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										53
									
								
								app/models/concerns/status/fetch_replies_concern.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										53
									
								
								app/models/concerns/status/fetch_replies_concern.rb
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,53 @@
 | 
			
		||||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
module Status::FetchRepliesConcern
  extend ActiveSupport::Concern

  # Feature switch for fetching all replies. On by default; any value of the
  # FETCH_REPLIES_ENABLED environment variable other than 'true' turns it off.
  FETCH_REPLIES_ENABLED = ENV.fetch('FETCH_REPLIES_ENABLED', 'true') == 'true'

  # Debounce windows used to minimize DoS potential. Both are read from the
  # environment as a number of minutes and converted to durations.
  #
  # Minimum time between two reply fetches for the same status.
  FETCH_REPLIES_COOLDOWN_MINUTES = ENV.fetch('FETCH_REPLIES_COOLDOWN_MINUTES', 15).to_i.minutes
  # Minimum age of a status before its replies are fetched.
  FETCH_REPLIES_INITIAL_WAIT_MINUTES = ENV.fetch('FETCH_REPLIES_INITIAL_WAIT_MINUTES', 5).to_i.minutes

  included do
    # Statuses created inside / outside the initial wait window.
    scope :created_recently, lambda {
      where(created_at: (FETCH_REPLIES_INITIAL_WAIT_MINUTES.ago..))
    }
    scope :not_created_recently, lambda {
      where(created_at: (..FETCH_REPLIES_INITIAL_WAIT_MINUTES.ago))
    }

    # Statuses whose replies were fetched inside the cooldown window, and the
    # complement (never fetched, or fetched before the window started).
    scope :fetched_recently, lambda {
      where(fetched_replies_at: (FETCH_REPLIES_COOLDOWN_MINUTES.ago..))
    }
    scope :not_fetched_recently, lambda {
      where(fetched_replies_at: [nil, (..FETCH_REPLIES_COOLDOWN_MINUTES.ago)])
    }

    # Relation-level counterparts of the debounce rules: a status is skipped
    # when it is local, brand new, or was fetched within the cooldown.
    # NOTE: unlike #should_fetch_replies?, these scopes do not consult
    # FETCH_REPLIES_ENABLED.
    scope :should_not_fetch_replies, lambda {
      too_fresh = created_recently.or(fetched_recently)
      local.or(too_fresh)
    }
    scope :should_fetch_replies, lambda {
      remote.not_created_recently.not_fetched_recently
    }

    # Remote statuses for which we won't receive update or deletion
    # activities, and which should therefore be refreshed when fetching
    # replies. The conditions select statuses whose account
    # a) has a follower on a remote domain,
    # b) has a follow that was not created before the status' last update, or
    # c) has no known follows at all.
    scope :unsubscribed, lambda {
      follower_conditions = Status.left_outer_joins(account: :followers)
                                  .where.not(followers_accounts: { domain: nil })
                                  .or(where.not('follows.created_at < statuses.updated_at'))
                                  .or(where(follows: { id: nil }))

      remote.merge(follower_conditions)
    }
  end

  # Whether replies to this status should be fetched right now.
  #
  # True only when the feature is enabled, the status is remote, it is older
  # than the initial wait, and its replies have either never been fetched or
  # were last fetched before the cooldown window.
  #
  # @return [Boolean]
  def should_fetch_replies?
    return false unless FETCH_REPLIES_ENABLED
    return false if local?
    return false if created_at > FETCH_REPLIES_INITIAL_WAIT_MINUTES.ago

    fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_COOLDOWN_MINUTES.ago
  end

  # Whether no local account was following the author at the time of the
  # status' last update. Always false for local statuses.
  #
  # @return [Boolean]
  def unsubscribed?
    return false if local?

    local_follow_conditions = {
      target_account: account.id,
      account: { domain: nil },
      created_at: (..updated_at),
    }

    !Follow.joins(:account).exists?(local_follow_conditions)
  end
end
 | 
			
		||||
@@ -27,6 +27,7 @@
 | 
			
		||||
#  edited_at                    :datetime
 | 
			
		||||
#  trendable                    :boolean
 | 
			
		||||
#  ordered_media_attachment_ids :bigint(8)        is an Array
 | 
			
		||||
#  fetched_replies_at           :datetime
 | 
			
		||||
#
 | 
			
		||||
 | 
			
		||||
class Status < ApplicationRecord
 | 
			
		||||
@@ -34,6 +35,7 @@ class Status < ApplicationRecord
 | 
			
		||||
  include Discard::Model
 | 
			
		||||
  include Paginable
 | 
			
		||||
  include RateLimitable
 | 
			
		||||
  include Status::FetchRepliesConcern
 | 
			
		||||
  include Status::SafeReblogInsert
 | 
			
		||||
  include Status::SearchConcern
 | 
			
		||||
  include Status::SnapshotConcern
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										68
									
								
								app/services/activitypub/fetch_all_replies_service.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										68
									
								
								app/services/activitypub/fetch_all_replies_service.rb
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,68 @@
 | 
			
		||||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService
  include JsonLdHelper

  # Upper bound on the number of reply URIs returned for a single status
  MAX_REPLIES = ENV.fetch('FETCH_REPLIES_MAX_SINGLE', 500).to_i

  # Walk a replies collection, queue a fetch for every reply worth fetching
  # and report what was queued.
  #
  # @param collection_or_uri [Hash, String] the replies collection, or its URI
  # @param status_uri [String] URI of the status the collection belongs to
  # @param max_pages [nil, Integer] page limit passed on to +collection_items+
  # @param request_id [nil, String] forwarded to every queued FetchReplyWorker job
  # @return [nil, Array] +nil+ when no collection items could be obtained,
  #   otherwise +[queued_uris, number_of_pages]+
  def call(collection_or_uri, status_uri, max_pages = nil, request_id: nil)
    @allow_synchronous_requests = true
    @collection_or_uri = collection_or_uri
    @status_uri = status_uri

    @items, page_count = collection_items(collection_or_uri, max_pages)
    return if @items.nil?

    @items = filtered_replies

    FetchReplyWorker.push_bulk(@items) do |uri|
      [uri, { 'request_id' => request_id }]
    end

    [@items, page_count]
  end

  private

  # Reduce the collected items to the URIs that should actually be fetched.
  #
  # This works as a negative filter: we cannot assume a Status row exists for
  # every URI in the collection, so rather than keeping the URIs that match
  # some condition on existing rows, we drop the URIs whose existing rows say
  # "do not fetch". The set to drop is typically the smaller one, which also
  # keeps the number of URIs loaded from the database low.
  #
  # @return [nil, Array<String>]
  def filtered_replies
    return if @items.nil?

    uris = @items.map { |item| value_or_id(item) }

    # Also consider replies we hold in the database that the collection did
    # not list and that we receive no updates/deletes for (no local followers).
    parent_id = Status.where(uri: @status_uri).pick(:id)
    if parent_id
      known_replies = Status.where.not(uri: uris).where(in_reply_to_id: parent_id)
      uris.concat(known_replies.unsubscribed.pluck(:uri))
    end

    candidates = Status.where(uri: uris)
    skipped_uris = candidates.should_not_fetch_replies.pluck(:uri)

    # Mark every existing status we are about to refresh as fetched
    Status.where(uri: uris).should_fetch_replies.touch_all(:fetched_replies_at)

    # Drop the URIs of existing statuses that must not be refreshed right now,
    # then cap the result
    uris = (uris - skipped_uris).first(MAX_REPLIES)

    Rails.logger.debug { "FetchAllRepliesService - #{@collection_or_uri}: Fetching filtered statuses: #{uris}" }
    uris
  end

  # Replies hosted on a different domain than the parent status are accepted.
  def filter_by_host?
    false
  end
end
 | 
			
		||||
@@ -33,7 +33,7 @@ class ActivityPub::FetchFeaturedCollectionService < BaseService
 | 
			
		||||
    return collection_or_uri if collection_or_uri.is_a?(Hash)
 | 
			
		||||
    return if non_matching_uri_hosts?(@account.uri, collection_or_uri)
 | 
			
		||||
 | 
			
		||||
    fetch_resource_without_id_validation(collection_or_uri, local_follower, true)
 | 
			
		||||
    fetch_resource_without_id_validation(collection_or_uri, local_follower, raise_on_error: :temporary)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def process_items(items)
 | 
			
		||||
 
 | 
			
		||||
@@ -45,7 +45,7 @@ class ActivityPub::FetchFeaturedTagsCollectionService < BaseService
 | 
			
		||||
    return collection_or_uri if collection_or_uri.is_a?(Hash)
 | 
			
		||||
    return if non_matching_uri_hosts?(@account.uri, collection_or_uri)
 | 
			
		||||
 | 
			
		||||
    fetch_resource_without_id_validation(collection_or_uri, local_follower, true)
 | 
			
		||||
    fetch_resource_without_id_validation(collection_or_uri, local_follower, raise_on_error: :temporary)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def process_items(items)
 | 
			
		||||
 
 | 
			
		||||
@@ -13,7 +13,7 @@ class ActivityPub::FetchRemoteStatusService < BaseService
 | 
			
		||||
 | 
			
		||||
    @request_id = request_id || "#{Time.now.utc.to_i}-status-#{uri}"
 | 
			
		||||
    @json = if prefetched_body.nil?
 | 
			
		||||
              fetch_resource(uri, true, on_behalf_of)
 | 
			
		||||
              fetch_status(uri, true, on_behalf_of)
 | 
			
		||||
            else
 | 
			
		||||
              body_to_json(prefetched_body, compare_id: uri)
 | 
			
		||||
            end
 | 
			
		||||
@@ -80,4 +80,20 @@ class ActivityPub::FetchRemoteStatusService < BaseService
 | 
			
		||||
  def expected_object_type?
 | 
			
		||||
    equals_or_includes_any?(@json['type'], ActivityPub::Activity::Create::SUPPORTED_TYPES + ActivityPub::Activity::Create::CONVERTED_TYPES)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  # Fetch the ActivityPub JSON for a status, cleaning up the local copy when
  # the remote server reports that the status no longer exists.
  #
  # A Mastodon::UnexpectedResponseError raised by +fetch_resource+ is handled
  # here and never reaches the caller. On a 404, an already-known status that
  # is unsubscribed and distributable gets a tombstone and is removed.
  #
  # @param uri [String]
  # @param id_is_known [Boolean]
  # @param on_behalf_of [nil, Account]
  # @return [Hash, nil] the fetched document, or +nil+ whenever the fetch failed
  def fetch_status(uri, id_is_known, on_behalf_of = nil)
    fetch_resource(uri, id_is_known, on_behalf_of, raise_on_error: true)
  rescue Mastodon::UnexpectedResponseError => e
    # The error may carry no response at all, hence the safe navigation
    return unless e.response&.code == 404

    # If this is a 404 from a status from an account that has no local followers, delete it
    existing_status = Status.find_by(uri: uri)
    if existing_status&.unsubscribed? && existing_status.distributable?
      Rails.logger.debug { "FetchRemoteStatusService - Got 404 for orphaned status with URI #{uri}, deleting" }
      Tombstone.find_or_create_by(uri: uri, account: existing_status.account)
      RemoveStatusService.new.call(existing_status, redraft: false)
    end

    # The result is used as the status JSON by the caller: make sure the value
    # of the clean-up above is never mistaken for a fetched document.
    nil
  end
 | 
			
		||||
end
 | 
			
		||||
 
 | 
			
		||||
@@ -3,11 +3,14 @@
 | 
			
		||||
class ActivityPub::FetchRepliesService < BaseService
 | 
			
		||||
  include JsonLdHelper
 | 
			
		||||
 | 
			
		||||
  # Limit of fetched replies
 | 
			
		||||
  MAX_REPLIES = 5
 | 
			
		||||
 | 
			
		||||
  def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil)
 | 
			
		||||
    @account = parent_status.account
 | 
			
		||||
    @allow_synchronous_requests = allow_synchronous_requests
 | 
			
		||||
 | 
			
		||||
    @items = collection_items(collection_or_uri)
 | 
			
		||||
    @items, = collection_items(collection_or_uri)
 | 
			
		||||
    return if @items.nil?
 | 
			
		||||
 | 
			
		||||
    FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id }] }
 | 
			
		||||
@@ -17,25 +20,39 @@ class ActivityPub::FetchRepliesService < BaseService
 | 
			
		||||
 | 
			
		||||
  private
 | 
			
		||||
 | 
			
		||||
  def collection_items(collection_or_uri)
 | 
			
		||||
  def collection_items(collection_or_uri, max_pages = nil)
 | 
			
		||||
    collection = fetch_collection(collection_or_uri)
 | 
			
		||||
    return unless collection.is_a?(Hash)
 | 
			
		||||
 | 
			
		||||
    collection = fetch_collection(collection['first']) if collection['first'].present?
 | 
			
		||||
    return unless collection.is_a?(Hash)
 | 
			
		||||
 | 
			
		||||
    case collection['type']
 | 
			
		||||
    when 'Collection', 'CollectionPage'
 | 
			
		||||
      as_array(collection['items'])
 | 
			
		||||
    when 'OrderedCollection', 'OrderedCollectionPage'
 | 
			
		||||
      as_array(collection['orderedItems'])
 | 
			
		||||
    all_items = []
 | 
			
		||||
    n_pages = 1
 | 
			
		||||
    while collection.is_a?(Hash)
 | 
			
		||||
      items = case collection['type']
 | 
			
		||||
              when 'Collection', 'CollectionPage'
 | 
			
		||||
                collection['items']
 | 
			
		||||
              when 'OrderedCollection', 'OrderedCollectionPage'
 | 
			
		||||
                collection['orderedItems']
 | 
			
		||||
              end
 | 
			
		||||
 | 
			
		||||
      all_items.concat(as_array(items))
 | 
			
		||||
 | 
			
		||||
      break if all_items.size >= MAX_REPLIES
 | 
			
		||||
      break if !max_pages.nil? && n_pages >= max_pages
 | 
			
		||||
 | 
			
		||||
      collection = collection['next'].present? ? fetch_collection(collection['next']) : nil
 | 
			
		||||
      n_pages += 1
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    [all_items, n_pages]
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def fetch_collection(collection_or_uri)
 | 
			
		||||
    return collection_or_uri if collection_or_uri.is_a?(Hash)
 | 
			
		||||
    return unless @allow_synchronous_requests
 | 
			
		||||
    return if non_matching_uri_hosts?(@account.uri, collection_or_uri)
 | 
			
		||||
    return if filter_by_host? && non_matching_uri_hosts?(@account.uri, collection_or_uri)
 | 
			
		||||
 | 
			
		||||
    # NOTE: For backward compatibility reasons, Mastodon signs outgoing
 | 
			
		||||
    # queries incorrectly by default.
 | 
			
		||||
@@ -45,19 +62,28 @@ class ActivityPub::FetchRepliesService < BaseService
 | 
			
		||||
    #
 | 
			
		||||
    # Therefore, retry with correct signatures if this fails.
 | 
			
		||||
    begin
 | 
			
		||||
      fetch_resource_without_id_validation(collection_or_uri, nil, true)
 | 
			
		||||
      fetch_resource_without_id_validation(collection_or_uri, nil, raise_on_error: :temporary)
 | 
			
		||||
    rescue Mastodon::UnexpectedResponseError => e
 | 
			
		||||
      raise unless e.response && e.response.code == 401 && Addressable::URI.parse(collection_or_uri).query.present?
 | 
			
		||||
 | 
			
		||||
      fetch_resource_without_id_validation(collection_or_uri, nil, true, request_options: { omit_query_string: false })
 | 
			
		||||
      fetch_resource_without_id_validation(collection_or_uri, nil, raise_on_error: :temporary, request_options: { omit_query_string: false })
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def filtered_replies
 | 
			
		||||
    # Only fetch replies to the same server as the original status to avoid
 | 
			
		||||
    # amplification attacks.
 | 
			
		||||
    if filter_by_host?
 | 
			
		||||
      # Only fetch replies to the same server as the original status to avoid
 | 
			
		||||
      # amplification attacks.
 | 
			
		||||
 | 
			
		||||
    # Also limit to 5 fetched replies to limit potential for DoS.
 | 
			
		||||
    @items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(5)
 | 
			
		||||
      # Also limit to 5 fetched replies to limit potential for DoS.
 | 
			
		||||
      @items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(MAX_REPLIES)
 | 
			
		||||
    else
 | 
			
		||||
      @items.map { |item| value_or_id(item) }.take(MAX_REPLIES)
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  # Whether replies with a different domain than the replied_to post should be rejected
 | 
			
		||||
  def filter_by_host?
 | 
			
		||||
    true
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
 
 | 
			
		||||
@@ -69,6 +69,6 @@ class ActivityPub::SynchronizeFollowersService < BaseService
 | 
			
		||||
    return collection_or_uri if collection_or_uri.is_a?(Hash)
 | 
			
		||||
    return if non_matching_uri_hosts?(@account.uri, collection_or_uri)
 | 
			
		||||
 | 
			
		||||
    fetch_resource_without_id_validation(collection_or_uri, nil, true)
 | 
			
		||||
    fetch_resource_without_id_validation(collection_or_uri, nil, raise_on_error: :temporary)
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										77
									
								
								app/workers/activitypub/fetch_all_replies_worker.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										77
									
								
								app/workers/activitypub/fetch_all_replies_worker.rb
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,77 @@
 | 
			
		||||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
# Fetch all replies to a status, querying recursively through
 | 
			
		||||
# ActivityPub replies collections, fetching any statuses that
 | 
			
		||||
# we either don't already have or we haven't checked for new replies
 | 
			
		||||
# in the Status::FETCH_REPLIES_COOLDOWN_MINUTES interval
 | 
			
		||||
class ActivityPub::FetchAllRepliesWorker
  include Sidekiq::Worker
  include ExponentialBackoff
  include JsonLdHelper

  sidekiq_options queue: 'pull', retry: 3

  # Global cap on replies collected per job (all replies, recursively)
  MAX_REPLIES = ENV.fetch('FETCH_REPLIES_MAX_GLOBAL', 1000).to_i
  # Global cap on replies collection pages requested per job
  MAX_PAGES = ENV.fetch('FETCH_REPLIES_MAX_PAGES', 500).to_i

  # Collect the replies of a status, then the replies of those replies, until
  # there is nothing left to visit or one of the global caps is reached.
  #
  # @param parent_status_id [Integer] id of the status to start from
  # @param options [Hash] forwarded as keyword arguments to
  #   ActivityPub::FetchAllRepliesService#call
  # @return [nil, Set<String>] URIs of every reply queued for fetching
  def perform(parent_status_id, options = {})
    @parent_status = Status.find(parent_status_id)
    return unless @parent_status.should_fetch_replies?

    # Stamp the status before doing any work so the debounce window starts now
    @parent_status.touch(:fetched_replies_at)
    Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: Fetching all replies for status: #{@parent_status}" }

    pending_uris, page_count = get_replies(@parent_status.uri, MAX_PAGES, options)
    return if pending_uris.nil?

    seen_uris = pending_uris.to_set

    while !pending_uris.empty? && seen_uris.length < MAX_REPLIES && page_count < MAX_PAGES
      current_uri = pending_uris.pop
      next if current_uri.nil?

      child_uris, child_pages = get_replies(current_uri, MAX_PAGES - page_count, options)
      next if child_uris.nil?

      # Only keep replies that have not been seen yet in this run
      unseen_uris = child_uris.reject { |uri| seen_uris.member?(uri) }

      pending_uris.concat(unseen_uris)
      seen_uris.merge(unseen_uris)
      page_count += child_pages
    end

    Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: fetched #{seen_uris.length} replies" }
    seen_uris
  end

  private

  # Resolve the replies collection of a status and hand it to the service.
  #
  # @return [nil, Array] +nil+ when the status exposes no replies collection,
  #   otherwise whatever the service returns
  def get_replies(status_uri, max_pages, options = {})
    collection = get_replies_uri(status_uri)
    return if collection.nil?

    service_options = options.deep_symbolize_keys
    ActivityPub::FetchAllRepliesService.new.call(collection, status_uri, max_pages, **service_options)
  end

  # Fetch a status and return the value of its +replies+ property.
  #
  # @return [nil, Hash, String] +nil+ when the status could not be fetched or
  #   has no +replies+ property
  def get_replies_uri(parent_status_uri)
    json_status = fetch_resource(parent_status_uri, true)

    if json_status.nil?
      Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: Could not get replies URI for #{parent_status_uri}, returned nil" }
      return
    end

    unless json_status.key?('replies')
      Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: No replies collection found in ActivityPub object: #{json_status}" }
      return
    end

    json_status['replies']
  rescue => e
    Rails.logger.error { "FetchAllRepliesWorker - #{@parent_status.uri}: Caught exception while resolving replies URI #{parent_status_uri}: #{e} - #{e.message}" }

    # A failure on the top-level status is re-raised to trigger a retry;
    # failures further down the tree only skip that branch
    raise e if parent_status_uri == @parent_status.uri

    nil
  end
end
 | 
			
		||||
@@ -0,0 +1,7 @@
 | 
			
		||||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
# Adds the nullable +fetched_replies_at+ timestamp column to +statuses+.
class AddFetchedRepliesAtToStatus < ActiveRecord::Migration[7.1]
  def change
    add_column(:statuses, :fetched_replies_at, :datetime, null: true)
  end
end
 | 
			
		||||
@@ -1053,6 +1053,7 @@ ActiveRecord::Schema[8.0].define(version: 2025_03_05_074104) do
 | 
			
		||||
    t.datetime "edited_at", precision: nil
 | 
			
		||||
    t.boolean "trendable"
 | 
			
		||||
    t.bigint "ordered_media_attachment_ids", array: true
 | 
			
		||||
    t.datetime "fetched_replies_at"
 | 
			
		||||
    t.index ["account_id", "id", "visibility", "updated_at"], name: "index_statuses_20190820", order: { id: :desc }, where: "(deleted_at IS NULL)"
 | 
			
		||||
    t.index ["account_id"], name: "index_statuses_on_account_id"
 | 
			
		||||
    t.index ["deleted_at"], name: "index_statuses_on_deleted_at", where: "(deleted_at IS NOT NULL)"
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										132
									
								
								spec/models/concerns/status/fetch_replies_concern_spec.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										132
									
								
								spec/models/concerns/status/fetch_replies_concern_spec.rb
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,132 @@
 | 
			
		||||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
require 'rails_helper'
 | 
			
		||||
 | 
			
		||||
# Covers the +should_fetch_replies+, +should_not_fetch_replies+ and
# +unsubscribed+ scopes, plus the +unsubscribed?+ predicate, that
# Status::FetchRepliesConcern adds to Status.
RSpec.describe Status::FetchRepliesConcern do
  let!(:alice)  { Fabricate(:account, username: 'alice') }
  let!(:bob)    { Fabricate(:account, username: 'bob', domain: 'other.com') }

  # Owner of the fixture statuses below; the "remote status" contexts override
  # this with bob.
  let!(:account) { alice }
  let!(:status_old) { Fabricate(:status, account: account, fetched_replies_at: 1.year.ago, created_at: 1.year.ago) }
  let!(:status_fetched_recently) { Fabricate(:status, account: account, fetched_replies_at: 1.second.ago, created_at: 1.year.ago) }
  let!(:status_created_recently) { Fabricate(:status, account: account, created_at: 1.second.ago) }
  let!(:status_never_fetched) { Fabricate(:status, account: account, created_at: 1.year.ago) }

  describe 'should_fetch_replies' do
    let!(:statuses) { Status.should_fetch_replies.all }

    context 'with a local status' do
      it 'never fetches local replies' do
        expect(statuses).to eq([])
      end
    end

    context 'with a remote status' do
      let(:account) { bob }

      it 'fetches old statuses' do
        expect(statuses).to include(status_old)
      end

      it 'fetches statuses that have never been fetched and weren\'t created recently' do
        expect(statuses).to include(status_never_fetched)
      end

      it 'does not fetch statuses that were fetched recently' do
        expect(statuses).to_not include(status_fetched_recently)
      end

      it 'does not fetch statuses that were created recently' do
        expect(statuses).to_not include(status_created_recently)
      end
    end
  end

  describe 'should_not_fetch_replies' do
    let!(:statuses) { Status.should_not_fetch_replies.all }

    context 'with a local status' do
      it 'does not fetch local statuses' do
        # Every one of the four local fixtures must be excluded from fetching.
        expect(statuses).to include(status_old, status_never_fetched, status_fetched_recently, status_created_recently)
      end
    end

    context 'with a remote status' do
      let(:account) { bob }

      it 'fetches old statuses' do
        expect(statuses).to_not include(status_old)
      end

      it 'fetches statuses that have never been fetched and weren\'t created recently' do
        expect(statuses).to_not include(status_never_fetched)
      end

      it 'does not fetch statuses that were fetched recently' do
        expect(statuses).to include(status_fetched_recently)
      end

      it 'does not fetch statuses that were created recently' do
        expect(statuses).to include(status_created_recently)
      end
    end
  end

  describe 'unsubscribed' do
    let!(:spike)  { Fabricate(:account, username: 'spike', domain: 'other.com') }
    let!(:status) { Fabricate(:status, account: bob, updated_at: 1.day.ago) }

    context 'when the status is from an account with only remote followers after last update' do
      before do
        Fabricate(:follow, account: spike, target_account: bob)
      end

      it 'shows the status as unsubscribed' do
        expect(Status.unsubscribed).to eq([status])
        expect(status.unsubscribed?).to be(true)
      end
    end

    context 'when the status is from an account with only remote followers before last update' do
      before do
        Fabricate(:follow, account: spike, target_account: bob, created_at: 2.days.ago)
      end

      it 'shows the status as unsubscribed' do
        expect(Status.unsubscribed).to eq([status])
        expect(status.unsubscribed?).to be(true)
      end
    end

    context 'when status is from account with local followers after last update' do
      before do
        Fabricate(:follow, account: alice, target_account: bob)
      end

      it 'shows the status as unsubscribed' do
        expect(Status.unsubscribed).to eq([status])
        expect(status.unsubscribed?).to be(true)
      end
    end

    context 'when status is from account with local followers before last update' do
      before do
        Fabricate(:follow, account: alice, target_account: bob, created_at: 2.days.ago)
      end

      it 'does not show the status as unsubscribed' do
        expect(Status.unsubscribed).to eq([])
        expect(status.unsubscribed?).to be(false)
      end
    end

    context 'when the status has no followers' do
      it 'shows the status as unsubscribed' do
        expect(Status.unsubscribed).to eq([status])
        expect(status.unsubscribed?).to be(true)
      end
    end
  end
end
 | 
			
		||||
							
								
								
									
										90
									
								
								spec/services/activitypub/fetch_all_replies_service_spec.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										90
									
								
								spec/services/activitypub/fetch_all_replies_service_spec.rb
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,90 @@
 | 
			
		||||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
require 'rails_helper'
 | 
			
		||||
 | 
			
		||||
# Checks which reply URIs ActivityPub::FetchAllRepliesService hands to
# FetchReplyWorker.push_bulk for an inline replies Collection.
RSpec.describe ActivityPub::FetchAllRepliesService do
  subject { described_class.new }

  let(:actor)          { Fabricate(:account, domain: 'example.com', uri: 'http://example.com/account') }
  let(:status)         { Fabricate(:status, account: actor) }
  let(:collection_uri) { 'http://example.com/replies/1' }

  # Reply URIs listed by the collection, in collection order.
  let(:items) do
    %w(
      http://example.com/self-reply-1
      http://example.com/self-reply-2
      http://example.com/self-reply-3
      http://other.com/other-reply-1
      http://other.com/other-reply-2
      http://other.com/other-reply-3
      http://example.com/self-reply-4
      http://example.com/self-reply-5
      http://example.com/self-reply-6
    )
  end

  let(:payload) do
    {
      '@context': 'https://www.w3.org/ns/activitystreams',
      type: 'Collection',
      id: collection_uri,
      items: items,
    }.with_indifferent_access
  end

  describe '#call' do
    it 'fetches more than the default maximum and from multiple domains' do
      allow(FetchReplyWorker).to receive(:push_bulk)

      subject.call(payload, status.uri)

      expect(FetchReplyWorker).to have_received(:push_bulk).with(items)
    end

    context 'with a recent status' do
      let(:recently_fetched_uri) { 'http://example.com/self-reply-2' }

      before do
        Fabricate(:status, uri: recently_fetched_uri, fetched_replies_at: 1.second.ago, local: false)
      end

      it 'skips statuses that have been updated recently' do
        allow(FetchReplyWorker).to receive(:push_bulk)

        subject.call(payload, status.uri)

        # Everything in the collection except the recently fetched reply.
        expect(FetchReplyWorker).to have_received(:push_bulk).with(items - [recently_fetched_uri])
      end
    end

    context 'with an old status' do
      let(:stale_uri) { 'http://other.com/other-reply-1' }

      before do
        Fabricate(:status, uri: stale_uri, fetched_replies_at: 1.year.ago, created_at: 1.year.ago, account: actor)
      end

      it 'updates the time that fetched statuses were last fetched' do
        allow(FetchReplyWorker).to receive(:push_bulk)

        subject.call(payload, status.uri)

        refreshed = Status.find_by(uri: stale_uri)
        expect(refreshed.fetched_replies_at).to be >= 1.minute.ago
      end
    end

    context 'with unsubscribed replies' do
      let(:unsubscribed_uri) { 'http://other.com/account/unsubscribed' }

      before do
        remote_actor = Fabricate(:account, domain: 'other.com', uri: 'http://other.com/account')
        # reply not in the collection from the remote instance, but we know about anyway without anyone following the account
        Fabricate(:status, account: remote_actor, in_reply_to_id: status.id, uri: unsubscribed_uri, fetched_replies_at: 1.year.ago, created_at: 1.year.ago)
      end

      it 'updates the unsubscribed replies' do
        allow(FetchReplyWorker).to receive(:push_bulk)

        subject.call(payload, status.uri)

        # The locally known reply is appended after the collection's own items.
        expect(FetchReplyWorker).to have_received(:push_bulk).with(items + [unsubscribed_uri])
      end
    end
  end
end
 | 
			
		||||
@@ -9,6 +9,9 @@ RSpec.describe ActivityPub::FetchRemoteStatusService do
 | 
			
		||||
 | 
			
		||||
  let!(:sender) { Fabricate(:account, domain: 'foo.bar', uri: 'https://foo.bar') }
 | 
			
		||||
 | 
			
		||||
  let(:follower) { Fabricate(:account, username: 'alice') }
 | 
			
		||||
  let(:follow) { nil }
 | 
			
		||||
  let(:response) { { body: Oj.dump(object), headers: { 'content-type': 'application/activity+json' } } }
 | 
			
		||||
  let(:existing_status) { nil }
 | 
			
		||||
 | 
			
		||||
  let(:note) do
 | 
			
		||||
@@ -23,13 +26,14 @@ RSpec.describe ActivityPub::FetchRemoteStatusService do
 | 
			
		||||
 | 
			
		||||
  before do
 | 
			
		||||
    stub_request(:get, 'https://foo.bar/watch?v=12345').to_return(status: 404, body: '')
 | 
			
		||||
    stub_request(:get, object[:id]).to_return(body: Oj.dump(object))
 | 
			
		||||
    stub_request(:get, object[:id]).to_return(**response)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  describe '#call' do
 | 
			
		||||
    before do
 | 
			
		||||
      follow
 | 
			
		||||
      existing_status
 | 
			
		||||
      subject.call(object[:id], prefetched_body: Oj.dump(object))
 | 
			
		||||
      subject.call(object[:id])
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    context 'with Note object' do
 | 
			
		||||
@@ -254,6 +258,51 @@ RSpec.describe ActivityPub::FetchRemoteStatusService do
 | 
			
		||||
          expect(existing_status.text).to eq 'Lorem ipsum'
 | 
			
		||||
          expect(existing_status.edits).to_not be_empty
 | 
			
		||||
        end
 | 
			
		||||
 | 
			
		||||
        context 'when the status appears to have been deleted at source' do
 | 
			
		||||
          let(:response) { { status: 404, body: '' } }
 | 
			
		||||
 | 
			
		||||
          shared_examples 'no delete' do
 | 
			
		||||
            it 'does not delete the status' do
 | 
			
		||||
              existing_status.reload
 | 
			
		||||
              expect(existing_status.text).to eq 'Foo'
 | 
			
		||||
              expect(existing_status.edits).to be_empty
 | 
			
		||||
            end
 | 
			
		||||
          end
 | 
			
		||||
 | 
			
		||||
          context 'when the status is orphaned/unsubscribed' do
 | 
			
		||||
            it 'deletes the orphaned status' do
 | 
			
		||||
              expect { existing_status.reload }.to raise_error(ActiveRecord::RecordNotFound)
 | 
			
		||||
            end
 | 
			
		||||
          end
 | 
			
		||||
 | 
			
		||||
          context 'when the status is from an account with only remote followers' do
 | 
			
		||||
            let(:follower) { Fabricate(:account, username: 'alice', domain: 'foo.bar') }
 | 
			
		||||
            let(:follow) { Fabricate(:follow, account: follower, target_account: sender, created_at: 2.days.ago) }
 | 
			
		||||
 | 
			
		||||
            it 'deletes the orphaned status' do
 | 
			
		||||
              expect { existing_status.reload }.to raise_error(ActiveRecord::RecordNotFound)
 | 
			
		||||
            end
 | 
			
		||||
 | 
			
		||||
            context 'when the status is private' do
 | 
			
		||||
              let(:existing_status) { Fabricate(:status, account: sender, text: 'Foo', uri: note[:id], visibility: :private) }
 | 
			
		||||
 | 
			
		||||
              it_behaves_like 'no delete'
 | 
			
		||||
            end
 | 
			
		||||
 | 
			
		||||
            context 'when the status is direct' do
 | 
			
		||||
              let(:existing_status) { Fabricate(:status, account: sender, text: 'Foo', uri: note[:id], visibility: :direct) }
 | 
			
		||||
 | 
			
		||||
              it_behaves_like 'no delete'
 | 
			
		||||
            end
 | 
			
		||||
          end
 | 
			
		||||
 | 
			
		||||
          context 'when the status is from an account with local followers' do
 | 
			
		||||
            let(:follow) { Fabricate(:follow, account: follower, target_account: sender, created_at: 2.days.ago) }
 | 
			
		||||
 | 
			
		||||
            it_behaves_like 'no delete'
 | 
			
		||||
          end
 | 
			
		||||
        end
 | 
			
		||||
      end
 | 
			
		||||
 | 
			
		||||
      context 'with a Create activity' do
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										280
									
								
								spec/workers/activitypub/fetch_all_replies_worker_spec.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										280
									
								
								spec/workers/activitypub/fetch_all_replies_worker_spec.rb
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,280 @@
 | 
			
		||||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
require 'rails_helper'
 | 
			
		||||
 | 
			
		||||
# Exercises ActivityPub::FetchAllRepliesWorker#perform against a two-level
# reply tree: a top post whose replies include one reply (reply_note_uri) that
# itself has nested replies. Each context serves the same tree in a different
# shape (inline Collection, Collection URI, paginated Collection).
RSpec.describe ActivityPub::FetchAllRepliesWorker do
  subject { described_class.new }

  let(:top_items) do
    %w(
      http://example.com/self-reply-1
      http://other.com/other-reply-2
      http://example.com/self-reply-3
    )
  end

  let(:top_items_paged) do
    %w(
      http://example.com/self-reply-4
      http://other.com/other-reply-5
      http://example.com/self-reply-6
    )
  end

  let(:nested_items) do
    %w(
      http://example.com/nested-self-reply-1
      http://other.com/nested-other-reply-2
      http://example.com/nested-self-reply-3
    )
  end

  let(:nested_items_paged) do
    %w(
      http://example.com/nested-self-reply-4
      http://other.com/nested-other-reply-5
      http://example.com/nested-self-reply-6
    )
  end

  let(:all_items) { top_items + top_items_paged + nested_items + nested_items_paged }

  let(:top_note_uri) { 'http://example.com/top-post' }
  let(:top_collection_uri) { 'http://example.com/top-post/replies' }

  # The reply uri that has the nested replies under it
  let(:reply_note_uri) { 'http://other.com/other-reply-2' }

  # The collection uri of nested replies
  let(:reply_collection_uri) { 'http://other.com/other-reply-2/replies' }

  let(:replies_top) do
    {
      '@context': 'https://www.w3.org/ns/activitystreams',
      id: top_collection_uri,
      type: 'Collection',
      items: top_items + top_items_paged,
    }
  end

  let(:replies_nested) do
    {
      '@context': 'https://www.w3.org/ns/activitystreams',
      id: reply_collection_uri,
      type: 'Collection',
      items: nested_items + nested_items_paged,
    }
  end

  # The status resource for the top post
  let(:top_object) do
    {
      '@context': 'https://www.w3.org/ns/activitystreams',
      id: top_note_uri,
      type: 'Note',
      content: 'Lorem ipsum',
      replies: replies_top,
      attributedTo: 'https://example.com',
    }
  end

  # The status resource that has the uri to the replies collection
  let(:reply_object) do
    {
      '@context': 'https://www.w3.org/ns/activitystreams',
      id: reply_note_uri,
      type: 'Note',
      content: 'Lorem ipsum',
      replies: replies_nested,
      attributedTo: 'https://other.com',
    }
  end

  # Served for every reply that has no replies of its own
  let(:empty_object) do
    {
      '@context': 'https://www.w3.org/ns/activitystreams',
      id: 'https://example.com/empty',
      type: 'Note',
      content: 'Lorem ipsum',
      replies: [],
      attributedTo: 'https://example.com',
    }
  end

  let(:account) { Fabricate(:account, domain: 'example.com') }
  let(:status) do
    Fabricate(
      :status,
      account: account,
      uri: top_note_uri,
      created_at: 1.day.ago - Status::FetchRepliesConcern::FETCH_REPLIES_INITIAL_WAIT_MINUTES
    )
  end

  # Stubs a GET of +uri+ to answer 200 with +object+ serialized as
  # application/activity+json.
  def stub_activitypub_get(uri, object)
    stub_request(:get, uri).to_return(status: 200, body: Oj.dump(object), headers: { 'Content-Type': 'application/activity+json' })
  end

  before do
    allow(FetchReplyWorker).to receive(:push_bulk)

    all_items.each do |item|
      # These two get their own, non-empty objects below.
      next if [top_note_uri, reply_note_uri].include?(item)

      stub_activitypub_get(item, empty_object)
    end

    stub_activitypub_get(top_note_uri, top_object)
    stub_activitypub_get(reply_note_uri, reply_object)
  end

  shared_examples 'fetches all replies' do
    it 'fetches statuses recursively' do
      got_uris = subject.perform(status.id)
      expect(got_uris).to match_array(all_items)
    end

    it 'respects the maximum limits set by not recursing after the max is reached' do
      stub_const('ActivityPub::FetchAllRepliesWorker::MAX_REPLIES', 5)
      got_uris = subject.perform(status.id)
      expect(got_uris).to match_array(top_items + top_items_paged)
    end
  end

  describe 'perform' do
    context 'when the payload is a Note with replies as a Collection of inlined replies' do
      it_behaves_like 'fetches all replies'
    end

    context 'when the payload is a Note with replies as a URI to a Collection' do
      let(:top_object) do
        {
          '@context': 'https://www.w3.org/ns/activitystreams',
          id: top_note_uri,
          type: 'Note',
          content: 'Lorem ipsum',
          replies: top_collection_uri,
          attributedTo: 'https://example.com',
        }
      end
      let(:reply_object) do
        {
          '@context': 'https://www.w3.org/ns/activitystreams',
          id: reply_note_uri,
          type: 'Note',
          content: 'Lorem ipsum',
          replies: reply_collection_uri,
          attributedTo: 'https://other.com',
        }
      end

      before do
        stub_activitypub_get(top_collection_uri, replies_top)
        stub_activitypub_get(reply_collection_uri, replies_nested)
      end

      it_behaves_like 'fetches all replies'
    end

    context 'when the payload is a Note with replies as a paginated collection' do
      let(:top_page_2_uri) { "#{top_collection_uri}/2" }
      let(:reply_page_2_uri) { "#{reply_collection_uri}/2" }

      let(:top_object) do
        {
          '@context': 'https://www.w3.org/ns/activitystreams',
          id: top_note_uri,
          type: 'Note',
          content: 'Lorem ipsum',
          replies: {
            type: 'Collection',
            id: top_collection_uri,
            first: {
              type: 'CollectionPage',
              partOf: top_collection_uri,
              items: top_items,
              next: top_page_2_uri,
            },
          },
          attributedTo: 'https://example.com',
        }
      end
      let(:reply_object) do
        {
          '@context': 'https://www.w3.org/ns/activitystreams',
          id: reply_note_uri,
          type: 'Note',
          content: 'Lorem ipsum',
          replies: {
            type: 'Collection',
            id: reply_collection_uri,
            first: {
              type: 'CollectionPage',
              partOf: reply_collection_uri,
              items: nested_items,
              next: reply_page_2_uri,
            },
          },
          attributedTo: 'https://other.com',
        }
      end

      let(:top_page_two) do
        {
          type: 'CollectionPage',
          id: top_page_2_uri,
          partOf: top_collection_uri,
          items: top_items_paged,
        }
      end

      let(:reply_page_two) do
        {
          type: 'CollectionPage',
          id: reply_page_2_uri,
          partOf: reply_collection_uri,
          items: nested_items_paged,
        }
      end

      before do
        stub_activitypub_get(top_page_2_uri, top_page_two)
        stub_activitypub_get(reply_page_2_uri, reply_page_two)
      end

      it_behaves_like 'fetches all replies'

      it 'limits by max pages' do
        stub_const('ActivityPub::FetchAllRepliesWorker::MAX_PAGES', 3)
        got_uris = subject.perform(status.id)
        expect(got_uris).to match_array(top_items + top_items_paged + nested_items)
      end
    end

    context 'when replies should not be fetched' do
      # ensure that we should not fetch by setting the status to be created in the debounce window
      let(:status) { Fabricate(:status, account: account, uri: top_note_uri, created_at: Time.current) }

      before do
        stub_const('Status::FetchRepliesConcern::FETCH_REPLIES_INITIAL_WAIT_MINUTES', 1.week)
      end

      it 'returns nil without fetching' do
        got_uris = subject.perform(status.id)
        expect(got_uris).to be_nil
        assert_not_requested :get, top_note_uri
      end
    end
  end
end
 | 
			
		||||
		Reference in New Issue
	
	Block a user