diff --git a/lib/pleroma/broadway.ex b/lib/pleroma/broadway.ex index a9a8bf3a1..965462f58 100644 --- a/lib/pleroma/broadway.ex +++ b/lib/pleroma/broadway.ex @@ -4,6 +4,9 @@ defmodule Pleroma.Broadway do require Logger @queue "akkoma" + @exchange "akkoma_exchange" + @retry_header "x-retries" + @delay_header "x-delay" def start_link(_args) do Broadway.start_link(__MODULE__, @@ -12,16 +15,9 @@ def start_link(_args) do module: {BroadwayRabbitMQ.Producer, queue: @queue, - declare: [ - durable: true, - auto_delete: false, - exclusive: true, - arguments: [ - {"x-delayed-type", "direct"} - ] - ], - on_failure: :reject_and_requeue - } + after_connect: &declare_rabbitmq/1, + metadata: [:routing_key, :headers], + on_failure: :reject} ], processors: [ default: [ @@ -38,23 +34,61 @@ def start_link(_args) do ) end + defp declare_rabbitmq(amqp_channel) do + declare_exchanges(amqp_channel) + declare_queues(amqp_channel) + declare_bindings(amqp_channel) + end + + defp declare_exchanges(amqp_channel) do + # Main exchange, all messages go here + :ok = + AMQP.Exchange.declare(amqp_channel, @exchange, :"x-delayed-message", + durable: true, + arguments: [{"x-delayed-type", :longstr, "topic"}] + ) + end + + defp declare_queues(amqp_channel) do + # Main queue, bound to main exchange + {:ok, _} = AMQP.Queue.declare(amqp_channel, @queue, durable: true) + end + + defp declare_bindings(amqp_channel) do + :ok = AMQP.Queue.bind(amqp_channel, @queue, @exchange, routing_key: "#") + end + + defp retry_count(:undefined), do: 0 + defp retry_count(headers) do + match = Enum.find(headers, fn {k, _t, _v} -> k == @retry_header end) + if is_nil(match) do + 0 + else + elem(match, 2) + end + end + @impl true - def handle_message(_, %Message{data: data} = message, _) do + def handle_message(_, %Message{data: data, metadata: %{routing_key: routing_key, headers: headers}} = message, _) do + Logger.debug("Received message on #{routing_key}") with {:ok, data} <- Jason.decode(data), {module, data} <- Map.pop(data, "__module__"), module <- String.to_existing_atom(module), :ok <- perform_message(module, data) do - Logger.debug("Received message: #{inspect(data)}") message else err -> - IO.inspect(err) - Message.failed(message, err) + retries = retry_count(headers) + if retries > 5 do + Message.failed(message, err) + else + {:ok, _} = produce(routing_key, data, scheduled_in: 5000, retry_count: retries + 1) + message + end end end defp perform_message(module, args) do - IO.inspect(args) case module.perform(%Oban.Job{args: args}) do :ok -> :ok @@ -74,7 +108,12 @@ def handle_batch(_, batch, _, _) do @impl true def handle_failed(messages, _) do - Logger.error("Failed messages: #{inspect(messages)}") + for message <- messages do + %Message{data: data, metadata: %{routing_key: topic}, status: {:failed, reason}} = message + {:ok, %{"op" => op}} = Jason.decode(data) + Logger.error("Processing task on #{topic}(#{op}) failed: #{inspect(reason)}") + end + messages end @@ -87,35 +126,65 @@ def children do [Pleroma.Broadway] end - defp maybe_add_headers([headers: headers] = opts, key, value) when is_list(headers) do - Keyword.put(opts, :headers, [{key, value} | headers]) - end - defp maybe_add_headers(opts, key, value) do - Keyword.put(opts, :headers, [{key, value}]) + defp add_headers([headers: headers] = opts, key, type, value) when is_list(headers) do + Keyword.put(opts, :headers, [{key, type, value} | headers]) end - defp maybe_with_priority(opts, [priority: priority]) when is_integer(priority) do - Keyword.put(opts, :priority, priority) + defp add_headers(opts, key, type, value) do + Keyword.put(opts, :headers, [{key, type, value}]) end - defp maybe_with_priority(opts, _), do: opts - defp maybe_with_delay(opts, [scheduled_at: scheduled_at]) do - time_in_ms = DateTime.diff(DateTime.utc_now(), scheduled_at) - opts - |> maybe_add_headers("x-delay", to_string(time_in_ms)) + defp maybe_with_priority(opts, params) do + if !is_nil(params[:priority]) do + Keyword.put(opts, :priority, params[:priority]) + else + opts + end + end + + defp maybe_schedule_at(opts, params) do + if !is_nil(params[:scheduled_at]) do + time_in_ms = DateTime.diff(params[:scheduled_at], DateTime.utc_now()) + opts + |> add_headers(@delay_header, :long, time_in_ms) + else + opts + end + end + + defp maybe_schedule_in(opts, params) do + if !is_nil(params[:scheduled_in]) do + opts + |> add_headers(@delay_header, :long, params[:scheduled_in]) + else + opts + end + end + + + defp maybe_with_retry_count(opts, params) do + if !is_nil(params[:retry_count]) do + opts + |> add_headers(@retry_header, :long, params[:retry_count]) + else + opts + end end - defp maybe_with_delay(opts, _), do: opts def produce(topic, args, opts \\ []) do - IO.puts("Producing message on #{topic}: #{inspect(args)}") {:ok, connection} = AMQP.Connection.open() {:ok, channel} = AMQP.Channel.open(connection) - publish_options = - [] - |> maybe_with_priority(opts) - |> maybe_with_delay(opts) - AMQP.Basic.publish(channel, "", @queue, args, publish_options) - AMQP.Connection.close(connection) + publish_options = + [] + |> maybe_with_priority(opts) + |> maybe_schedule_at(opts) + |> maybe_schedule_in(opts) + |> maybe_with_retry_count(opts) + + Logger.debug("Sending to #{topic} with #{inspect(publish_options)}") + :ok = AMQP.Basic.publish(channel, @exchange, topic, args, publish_options) + :ok = AMQP.Connection.close(connection) + {:ok, args} end end diff --git a/lib/pleroma/workers/publisher_worker.ex b/lib/pleroma/workers/publisher_worker.ex index 84ba2d179..545887071 100644 --- a/lib/pleroma/workers/publisher_worker.ex +++ b/lib/pleroma/workers/publisher_worker.ex @@ -15,7 +15,6 @@ def backoff(%Job{attempt: attempt}) when is_integer(attempt) do @impl Oban.Worker def perform(%Job{args: %{"op" => "publish", "activity_id" => activity_id, "object_data" => nil}}) do activity = Activity.get_by_id(activity_id) - IO.inspect(activity) Federator.perform(:publish, activity) end diff --git a/lib/pleroma/workers/purge_expired_activity.ex b/lib/pleroma/workers/purge_expired_activity.ex index e326727ba..2399cb367 100644 --- a/lib/pleroma/workers/purge_expired_activity.ex +++ b/lib/pleroma/workers/purge_expired_activity.ex @@ -7,7 +7,10 @@ defmodule Pleroma.Workers.PurgeExpiredActivity do Worker which purges expired activity. """ - use Pleroma.Workers.WorkerHelper, queue: "activity_expiration", max_attempts: 1, unique: [period: :infinity] + use Pleroma.Workers.WorkerHelper, + queue: "activity_expiration", + max_attempts: 1, + unique: [period: :infinity] import Ecto.Query diff --git a/lib/pleroma/workers/purge_expired_filter.ex b/lib/pleroma/workers/purge_expired_filter.ex index 13f8ec681..67cc7347e 100644 --- a/lib/pleroma/workers/purge_expired_filter.ex +++ b/lib/pleroma/workers/purge_expired_filter.ex @@ -7,7 +7,10 @@ defmodule Pleroma.Workers.PurgeExpiredFilter do Worker which purges expired filters """ - use Pleroma.Workers.WorkerHelper, queue: "filter_expiration", max_attempts: 1, unique: [period: :infinity] + use Pleroma.Workers.WorkerHelper, + queue: "filter_expiration", + max_attempts: 1, + unique: [period: :infinity] import Ecto.Query diff --git a/lib/pleroma/workers/worker_helper.ex b/lib/pleroma/workers/worker_helper.ex index a55dfc14a..39c441ecd 100644 --- a/lib/pleroma/workers/worker_helper.ex +++ b/lib/pleroma/workers/worker_helper.ex @@ -71,7 +71,7 @@ def enqueue(op, params, worker_args \\ []) do |> Map.put("__module__", worker) |> Map.put("op", op) - Pleroma.Broadway.produce(unquote(queue), Jason.encode!(params)) + Pleroma.Broadway.produce(unquote(queue), Jason.encode!(params), worker_args) end end end