Fix unbounded recursion in post discovery (#23506)
* Add a limit to how many posts can get fetched as a result of a single request * Add tests * Always pass `request_id` when processing `Announce` activities --------- Co-authored-by: nametoolong <nametoolong@users.noreply.github.com>
This commit is contained in:
		@@ -106,7 +106,8 @@ class ActivityPub::Activity
 | 
			
		||||
      actor_id = value_or_id(first_of_value(@object['attributedTo']))
 | 
			
		||||
 | 
			
		||||
      if actor_id == @account.uri
 | 
			
		||||
        return ActivityPub::Activity.factory({ 'type' => 'Create', 'actor' => actor_id, 'object' => @object }, @account).perform
 | 
			
		||||
        virtual_object = { 'type' => 'Create', 'actor' => actor_id, 'object' => @object }
 | 
			
		||||
        return ActivityPub::Activity.factory(virtual_object, @account, request_id: @options[:request_id]).perform
 | 
			
		||||
      end
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
@@ -152,9 +153,9 @@ class ActivityPub::Activity
 | 
			
		||||
  def fetch_remote_original_status
 | 
			
		||||
    if object_uri.start_with?('http')
 | 
			
		||||
      return if ActivityPub::TagManager.instance.local_uri?(object_uri)
 | 
			
		||||
      ActivityPub::FetchRemoteStatusService.new.call(object_uri, id: true, on_behalf_of: @account.followers.local.first)
 | 
			
		||||
      ActivityPub::FetchRemoteStatusService.new.call(object_uri, id: true, on_behalf_of: @account.followers.local.first, request_id: @options[:request_id])
 | 
			
		||||
    elsif @object['url'].present?
 | 
			
		||||
      ::FetchRemoteStatusService.new.call(@object['url'])
 | 
			
		||||
      ::FetchRemoteStatusService.new.call(@object['url'], request_id: @options[:request_id])
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -327,18 +327,18 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
 | 
			
		||||
  def resolve_thread(status)
 | 
			
		||||
    return unless status.reply? && status.thread.nil? && Request.valid_url?(in_reply_to_uri)
 | 
			
		||||
 | 
			
		||||
    ThreadResolveWorker.perform_async(status.id, in_reply_to_uri)
 | 
			
		||||
    ThreadResolveWorker.perform_async(status.id, in_reply_to_uri, { 'request_id' => @options[:request_id]})
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def fetch_replies(status)
 | 
			
		||||
    collection = @object['replies']
 | 
			
		||||
    return if collection.nil?
 | 
			
		||||
 | 
			
		||||
    replies = ActivityPub::FetchRepliesService.new.call(status, collection, false)
 | 
			
		||||
    replies = ActivityPub::FetchRepliesService.new.call(status, collection, allow_synchronous_requests: false, request_id: @options[:request_id])
 | 
			
		||||
    return unless replies.nil?
 | 
			
		||||
 | 
			
		||||
    uri = value_or_id(collection)
 | 
			
		||||
    ActivityPub::FetchRepliesWorker.perform_async(status.id, uri) unless uri.nil?
 | 
			
		||||
    ActivityPub::FetchRepliesWorker.perform_async(status.id, uri, { 'request_id' => @options[:request_id]}) unless uri.nil?
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def conversation_from_uri(uri)
 | 
			
		||||
 
 | 
			
		||||
@@ -2,10 +2,13 @@
 | 
			
		||||
 | 
			
		||||
class ActivityPub::FetchRemoteStatusService < BaseService
 | 
			
		||||
  include JsonLdHelper
 | 
			
		||||
  include Redisable
 | 
			
		||||
 | 
			
		||||
  DISCOVERIES_PER_REQUEST = 1000
 | 
			
		||||
 | 
			
		||||
  # Should be called when uri has already been checked for locality
 | 
			
		||||
  def call(uri, id: true, prefetched_body: nil, on_behalf_of: nil, expected_actor_uri: nil, request_id: nil)
 | 
			
		||||
    @request_id = request_id
 | 
			
		||||
    @request_id = request_id || "#{Time.now.utc.to_i}-status-#{uri}"
 | 
			
		||||
    @json = begin
 | 
			
		||||
      if prefetched_body.nil?
 | 
			
		||||
        fetch_resource(uri, id, on_behalf_of)
 | 
			
		||||
@@ -42,7 +45,13 @@ class ActivityPub::FetchRemoteStatusService < BaseService
 | 
			
		||||
    # activity as an update rather than create
 | 
			
		||||
    activity_json['type'] = 'Update' if equals_or_includes_any?(activity_json['type'], %w(Create)) && Status.where(uri: object_uri, account_id: actor.id).exists?
 | 
			
		||||
 | 
			
		||||
    ActivityPub::Activity.factory(activity_json, actor, request_id: request_id).perform
 | 
			
		||||
    with_redis do |redis|
 | 
			
		||||
      discoveries = redis.incr("status_discovery_per_request:#{@request_id}")
 | 
			
		||||
      redis.expire("status_discovery_per_request:#{@request_id}", 5.minutes.seconds)
 | 
			
		||||
      return nil if discoveries > DISCOVERIES_PER_REQUEST
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    ActivityPub::Activity.factory(activity_json, actor, request_id: @request_id).perform
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  private
 | 
			
		||||
 
 | 
			
		||||
@@ -3,14 +3,14 @@
 | 
			
		||||
class ActivityPub::FetchRepliesService < BaseService
 | 
			
		||||
  include JsonLdHelper
 | 
			
		||||
 | 
			
		||||
  def call(parent_status, collection_or_uri, allow_synchronous_requests = true)
 | 
			
		||||
  def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil)
 | 
			
		||||
    @account = parent_status.account
 | 
			
		||||
    @allow_synchronous_requests = allow_synchronous_requests
 | 
			
		||||
 | 
			
		||||
    @items = collection_items(collection_or_uri)
 | 
			
		||||
    return if @items.nil?
 | 
			
		||||
 | 
			
		||||
    FetchReplyWorker.push_bulk(filtered_replies)
 | 
			
		||||
    FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id}] }
 | 
			
		||||
 | 
			
		||||
    @items
 | 
			
		||||
  end
 | 
			
		||||
 
 | 
			
		||||
@@ -1,7 +1,7 @@
 | 
			
		||||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
class FetchRemoteStatusService < BaseService
 | 
			
		||||
  def call(url, prefetched_body = nil)
 | 
			
		||||
  def call(url, prefetched_body: nil, request_id: nil)
 | 
			
		||||
    if prefetched_body.nil?
 | 
			
		||||
      resource_url, resource_options = FetchResourceService.new.call(url)
 | 
			
		||||
    else
 | 
			
		||||
@@ -9,6 +9,6 @@ class FetchRemoteStatusService < BaseService
 | 
			
		||||
      resource_options = { prefetched_body: prefetched_body }
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    ActivityPub::FetchRemoteStatusService.new.call(resource_url, **resource_options) unless resource_url.nil?
 | 
			
		||||
    ActivityPub::FetchRemoteStatusService.new.call(resource_url, **resource_options.merge(request_id: request_id)) unless resource_url.nil?
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
 
 | 
			
		||||
@@ -25,7 +25,7 @@ class ResolveURLService < BaseService
 | 
			
		||||
    if equals_or_includes_any?(type, ActivityPub::FetchRemoteActorService::SUPPORTED_TYPES)
 | 
			
		||||
      ActivityPub::FetchRemoteActorService.new.call(resource_url, prefetched_body: body)
 | 
			
		||||
    elsif equals_or_includes_any?(type, ActivityPub::Activity::Create::SUPPORTED_TYPES + ActivityPub::Activity::Create::CONVERTED_TYPES)
 | 
			
		||||
      status = FetchRemoteStatusService.new.call(resource_url, body)
 | 
			
		||||
      status = FetchRemoteStatusService.new.call(resource_url, prefetched_body: body)
 | 
			
		||||
      authorize_with @on_behalf_of, status, :show? unless status.nil?
 | 
			
		||||
      status
 | 
			
		||||
    end
 | 
			
		||||
 
 | 
			
		||||
@@ -6,8 +6,8 @@ class ActivityPub::FetchRepliesWorker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options queue: 'pull', retry: 3
 | 
			
		||||
 | 
			
		||||
  def perform(parent_status_id, replies_uri)
 | 
			
		||||
    ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id), replies_uri)
 | 
			
		||||
  def perform(parent_status_id, replies_uri, options = {})
 | 
			
		||||
    ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id), replies_uri, **options.deep_symbolize_keys)
 | 
			
		||||
  rescue ActiveRecord::RecordNotFound
 | 
			
		||||
    true
 | 
			
		||||
  end
 | 
			
		||||
 
 | 
			
		||||
@@ -6,7 +6,7 @@ class FetchReplyWorker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options queue: 'pull', retry: 3
 | 
			
		||||
 | 
			
		||||
  def perform(child_url)
 | 
			
		||||
    FetchRemoteStatusService.new.call(child_url)
 | 
			
		||||
  def perform(child_url, options = {})
 | 
			
		||||
    FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys)
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
 
 | 
			
		||||
@@ -6,9 +6,9 @@ class ThreadResolveWorker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options queue: 'pull', retry: 3
 | 
			
		||||
 | 
			
		||||
  def perform(child_status_id, parent_url)
 | 
			
		||||
  def perform(child_status_id, parent_url, options = {})
 | 
			
		||||
    child_status  = Status.find(child_status_id)
 | 
			
		||||
    parent_status = FetchRemoteStatusService.new.call(parent_url)
 | 
			
		||||
    parent_status = FetchRemoteStatusService.new.call(parent_url, **options.deep_symbolize_keys)
 | 
			
		||||
 | 
			
		||||
    return if parent_status.nil?
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -48,7 +48,7 @@ RSpec.describe ActivityPub::Activity::Add do
 | 
			
		||||
        end
 | 
			
		||||
 | 
			
		||||
        it 'fetches the status and pins it' do
 | 
			
		||||
          allow(service_stub).to receive(:call) do |uri, id: true, on_behalf_of: nil|
 | 
			
		||||
          allow(service_stub).to receive(:call) do |uri, id: true, on_behalf_of: nil, request_id: nil|
 | 
			
		||||
            expect(uri).to eq 'https://example.com/unknown'
 | 
			
		||||
            expect(id).to eq true
 | 
			
		||||
            expect(on_behalf_of&.following?(sender)).to eq true
 | 
			
		||||
@@ -62,7 +62,7 @@ RSpec.describe ActivityPub::Activity::Add do
 | 
			
		||||
 | 
			
		||||
      context 'when there is no local follower' do
 | 
			
		||||
        it 'tries to fetch the status' do
 | 
			
		||||
          allow(service_stub).to receive(:call) do |uri, id: true, on_behalf_of: nil|
 | 
			
		||||
          allow(service_stub).to receive(:call) do |uri, id: true, on_behalf_of: nil, request_id: nil|
 | 
			
		||||
            expect(uri).to eq 'https://example.com/unknown'
 | 
			
		||||
            expect(id).to eq true
 | 
			
		||||
            expect(on_behalf_of).to eq nil
 | 
			
		||||
 
 | 
			
		||||
@@ -223,4 +223,98 @@ RSpec.describe ActivityPub::FetchRemoteStatusService, type: :service do
 | 
			
		||||
      end
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  context 'statuses referencing other statuses' do
 | 
			
		||||
    before do
 | 
			
		||||
      stub_const 'ActivityPub::FetchRemoteStatusService::DISCOVERIES_PER_REQUEST', 5
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    context 'using inReplyTo' do
 | 
			
		||||
      let(:object) do
 | 
			
		||||
        {
 | 
			
		||||
          '@context': 'https://www.w3.org/ns/activitystreams',
 | 
			
		||||
          id: "https://foo.bar/@foo/1",
 | 
			
		||||
          type: 'Note',
 | 
			
		||||
          content: 'Lorem ipsum',
 | 
			
		||||
          inReplyTo: 'https://foo.bar/@foo/2',
 | 
			
		||||
          attributedTo: ActivityPub::TagManager.instance.uri_for(sender),
 | 
			
		||||
        }
 | 
			
		||||
      end
 | 
			
		||||
 | 
			
		||||
      before do
 | 
			
		||||
        8.times do |i|
 | 
			
		||||
          status_json = {
 | 
			
		||||
            '@context': 'https://www.w3.org/ns/activitystreams',
 | 
			
		||||
            id: "https://foo.bar/@foo/#{i}",
 | 
			
		||||
            type: 'Note',
 | 
			
		||||
            content: 'Lorem ipsum',
 | 
			
		||||
            inReplyTo: "https://foo.bar/@foo/#{i + 1}",
 | 
			
		||||
            attributedTo: ActivityPub::TagManager.instance.uri_for(sender),
 | 
			
		||||
            to: 'as:Public',
 | 
			
		||||
          }.with_indifferent_access
 | 
			
		||||
          stub_request(:get, "https://foo.bar/@foo/#{i}").to_return(status: 200, body: status_json.to_json, headers: { 'Content-Type': 'application/activity+json' })
 | 
			
		||||
        end
 | 
			
		||||
      end
 | 
			
		||||
 | 
			
		||||
      it 'creates at least some statuses' do
 | 
			
		||||
        expect { subject.call(object[:id], prefetched_body: Oj.dump(object)) }.to change { sender.statuses.count }.by_at_least(2)
 | 
			
		||||
      end
 | 
			
		||||
 | 
			
		||||
      it 'creates no more account than the limit allows' do
 | 
			
		||||
        expect { subject.call(object[:id], prefetched_body: Oj.dump(object)) }.to change { sender.statuses.count }.by_at_most(5)
 | 
			
		||||
      end
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    context 'using replies' do
 | 
			
		||||
      let(:object) do
 | 
			
		||||
        {
 | 
			
		||||
          '@context': 'https://www.w3.org/ns/activitystreams',
 | 
			
		||||
          id: "https://foo.bar/@foo/1",
 | 
			
		||||
          type: 'Note',
 | 
			
		||||
          content: 'Lorem ipsum',
 | 
			
		||||
          replies: {
 | 
			
		||||
            type: 'Collection',
 | 
			
		||||
            id: 'https://foo.bar/@foo/1/replies',
 | 
			
		||||
            first: {
 | 
			
		||||
              type: 'CollectionPage',
 | 
			
		||||
              partOf: 'https://foo.bar/@foo/1/replies',
 | 
			
		||||
              items: ['https://foo.bar/@foo/2'],
 | 
			
		||||
            },
 | 
			
		||||
          },
 | 
			
		||||
          attributedTo: ActivityPub::TagManager.instance.uri_for(sender),
 | 
			
		||||
        }
 | 
			
		||||
      end
 | 
			
		||||
 | 
			
		||||
      before do
 | 
			
		||||
        8.times do |i|
 | 
			
		||||
          status_json = {
 | 
			
		||||
            '@context': 'https://www.w3.org/ns/activitystreams',
 | 
			
		||||
            id: "https://foo.bar/@foo/#{i}",
 | 
			
		||||
            type: 'Note',
 | 
			
		||||
            content: 'Lorem ipsum',
 | 
			
		||||
            replies: {
 | 
			
		||||
              type: 'Collection',
 | 
			
		||||
              id: "https://foo.bar/@foo/#{i}/replies",
 | 
			
		||||
              first: {
 | 
			
		||||
                type: 'CollectionPage',
 | 
			
		||||
                partOf: "https://foo.bar/@foo/#{i}/replies",
 | 
			
		||||
                items: ["https://foo.bar/@foo/#{i+1}"],
 | 
			
		||||
              },
 | 
			
		||||
            },
 | 
			
		||||
            attributedTo: ActivityPub::TagManager.instance.uri_for(sender),
 | 
			
		||||
            to: 'as:Public',
 | 
			
		||||
          }.with_indifferent_access
 | 
			
		||||
          stub_request(:get, "https://foo.bar/@foo/#{i}").to_return(status: 200, body: status_json.to_json, headers: { 'Content-Type': 'application/activity+json' })
 | 
			
		||||
        end
 | 
			
		||||
      end
 | 
			
		||||
 | 
			
		||||
      it 'creates at least some statuses' do
 | 
			
		||||
        expect { subject.call(object[:id], prefetched_body: Oj.dump(object)) }.to change { sender.statuses.count }.by_at_least(2)
 | 
			
		||||
      end
 | 
			
		||||
 | 
			
		||||
      it 'creates no more account than the limit allows' do
 | 
			
		||||
        expect { subject.call(object[:id], prefetched_body: Oj.dump(object)) }.to change { sender.statuses.count }.by_at_most(5)
 | 
			
		||||
      end
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
 
 | 
			
		||||
@@ -15,7 +15,7 @@ RSpec.describe FetchRemoteStatusService, type: :service do
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  context 'protocol is :activitypub' do
 | 
			
		||||
    subject { described_class.new.call(note[:id], prefetched_body) }
 | 
			
		||||
    subject { described_class.new.call(note[:id], prefetched_body: prefetched_body) }
 | 
			
		||||
    let(:prefetched_body) { Oj.dump(note) }
 | 
			
		||||
 | 
			
		||||
    before do
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user