1
0
Fork 1
mirror of https://akkoma.dev/AkkomaGang/akkoma.git synced 2024-11-17 18:49:15 +00:00

Introduce new ingestion pipeline structure, implement internal Likes with it.

This commit is contained in:
lain 2019-10-16 16:16:39 +02:00
parent e3b4a3e96b
commit 6e4f52f8a2
22 changed files with 284 additions and 47 deletions

View file

@ -18,6 +18,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.Web.ActivityPub.MRF
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.ActivityPub.ObjectValidator
alias Pleroma.Web.ActivityPub.SideEffects
alias Pleroma.Web.Streamer
alias Pleroma.Web.WebFinger
alias Pleroma.Workers.BackgroundWorker
@ -123,6 +125,38 @@ def increase_poll_votes_if_vote(%{
def increase_poll_votes_if_vote(_create_data), do: :noop
@spec common_pipeline(map(), keyword()) :: {:ok, Activity.t(), keyword()} | {:error, any()}
def common_pipeline(object, meta) do
with {_, {:ok, validated_object, meta}} <-
{:validate_object, ObjectValidator.validate(object, meta)},
{_, {:ok, mrfd_object}} <- {:mrf_object, MRF.filter(validated_object)},
{_, {:ok, %Activity{} = activity, meta}} <-
{:persist_object, persist(mrfd_object, meta)},
{_, {:ok, %Activity{} = activity, meta}} <-
{:execute_side_effects, SideEffects.handle(activity, meta)} do
{:ok, activity, meta}
else
e -> {:error, e}
end
end
# TODO rewrite in with style
@spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()}
def persist(object, meta) do
local = Keyword.get(meta, :local)
{recipients, _, _} = get_recipients(object)
{:ok, activity} =
Repo.insert(%Activity{
data: object,
local: local,
recipients: recipients,
actor: object["actor"]
})
{:ok, activity, meta}
end
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
with nil <- Activity.normalize(map),
map <- lazy_put_activity_defaults(map, fake),
@ -130,6 +164,7 @@ def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when
{_, true} <- {:remote_limit_error, check_remote_limit(map)},
{:ok, map} <- MRF.filter(map),
{recipients, _, _} = get_recipients(map),
# ???
{:fake, false, map, recipients} <- {:fake, fake, map, recipients},
:ok <- Containment.contain_child(map),
{:ok, map, object} <- insert_full_object(map) do

View file

@ -0,0 +1,43 @@
defmodule Pleroma.Web.ActivityPub.Builder do
@moduledoc """
This module builds the objects. Meant to be used for creating local objects.
This module encodes our addressing policies and general shape of our objects.
"""
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.User
alias Pleroma.Object
@spec like(User.t(), Object.t()) :: {:ok, map(), keyword()}
def like(actor, object) do
object_actor = User.get_cached_by_ap_id(object.data["actor"])
# Address the actor of the object, and our actor's follower collection if the post is public.
to =
if Visibility.is_public?(object) do
[actor.follower_address, object.data["actor"]]
else
[object.data["actor"]]
end
# CC everyone who's been addressed in the object, except ourself and the object actor's
# follower collection
cc =
(object.data["to"] ++ (object.data["cc"] || []))
|> List.delete(actor.ap_id)
|> List.delete(object_actor.follower_address)
{:ok,
%{
"id" => Utils.generate_activity_id(),
"actor" => actor.ap_id,
"type" => "Like",
"object" => object.data["id"],
"to" => to,
"cc" => cc,
"context" => object.data["context"]
}, []}
end
end

View file

@ -0,0 +1,57 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.ActivityPub.ObjectValidator do
@moduledoc """
This module is responsible for validating an object (which can be an activity)
and checking if it is both well formed and also compatible with our view of
the system.
"""
alias Pleroma.User
alias Pleroma.Object
alias Pleroma.Web.ActivityPub.Utils
def validate_id(object, meta) do
with {_, true} <- {:id_presence, Map.has_key?(object, "id")} do
{:ok, object, meta}
else
e -> {:error, e}
end
end
def validate_actor(object, meta) do
with {_, %User{}} <- {:actor_validation, User.get_cached_by_ap_id(object["actor"])} do
{:ok, object, meta}
else
e -> {:error, e}
end
end
def common_validations(object, meta) do
with {_, {:ok, object, meta}} <- {:validate_id, validate_id(object, meta)},
{_, {:ok, object, meta}} <- {:validate_actor, validate_actor(object, meta)} do
{:ok, object, meta}
else
e -> {:error, e}
end
end
@spec validate(map(), keyword()) :: {:ok, map(), keyword()} | {:error, any()}
def validate(object, meta)
def validate(%{"type" => "Like"} = object, meta) do
with {:ok, object, meta} <- common_validations(object, meta),
{_, %Object{} = liked_object} <- {:find_liked_object, Object.normalize(object["object"])},
{_, nil} <- {:existing_like, Utils.get_existing_like(object["actor"], liked_object)} do
{:ok, object, meta}
else
e -> {:error, e}
end
end
def validate(object, meta) do
common_validations(object, meta)
end
end

View file

@ -0,0 +1,28 @@
defmodule Pleroma.Web.ActivityPub.SideEffects do
@moduledoc """
This module looks at an inserted object and executes the side effects that it
implies. For example, a `Like` activity will increase the like count on the
liked object, a `Follow` activity will add the user to the follower
collection, and so on.
"""
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Object
alias Pleroma.Notification
def handle(object, meta \\ [])
# Tasks this handles:
# - Add like to object
# - Set up notification
def handle(%{data: %{"type" => "Like"}} = object, meta) do
liked_object = Object.get_by_ap_id(object.data["object"])
Utils.add_like_to_object(object, liked_object)
Notification.create_notifications(object)
{:ok, object, meta}
end
# Nothing to do
def handle(object, meta) do
{:ok, object, meta}
end
end

View file

@ -10,6 +10,7 @@ defmodule Pleroma.Web.CommonAPI do
alias Pleroma.ThreadMute
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Builder
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.ActivityPub.Visibility
@ -17,6 +18,7 @@ defmodule Pleroma.Web.CommonAPI do
import Pleroma.Web.CommonAPI.Utils
require Pleroma.Constants
require Logger
def follow(follower, followed) do
timeout = Pleroma.Config.get([:activitypub, :follow_handshake_timeout])
@ -98,16 +100,31 @@ def unrepeat(id_or_ap_id, user) do
end
end
def favorite(id_or_ap_id, user) do
with %Activity{} = activity <- get_by_id_or_ap_id(id_or_ap_id),
object <- Object.normalize(activity),
nil <- Utils.get_existing_like(user.ap_id, object) do
ActivityPub.like(user, object)
@spec favorite(User.t(), binary()) :: {:ok, Activity.t()} | {:error, any()}
def favorite(%User{} = user, id) do
with {_, %Activity{object: object}} <- {:find_object, Activity.get_by_id_with_object(id)},
{_, {:ok, like_object, meta}} <- {:build_object, Builder.like(user, object)},
{_, {:ok, %Activity{} = activity, _meta}} <-
{:common_pipeline,
ActivityPub.common_pipeline(like_object, Keyword.put(meta, :local, true))} do
{:ok, activity}
else
_ -> {:error, dgettext("errors", "Could not favorite")}
e ->
Logger.error("Could not favorite #{id}. Error: #{inspect(e, pretty: true)}")
{:error, dgettext("errors", "Could not favorite")}
end
end
# def favorite(id_or_ap_id, user) do
# with %Activity{} = activity <- get_by_id_or_ap_id(id_or_ap_id),
# object <- Object.normalize(activity),
# nil <- Utils.get_existing_like(user.ap_id, object) do
# ActivityPub.like(user, object)
# else
# _ -> {:error, dgettext("errors", "Could not favorite")}
# end
# end
def unfavorite(id_or_ap_id, user) do
with %Activity{} = activity <- get_by_id_or_ap_id(id_or_ap_id) do
object = Object.normalize(activity)

View file

@ -201,9 +201,9 @@ def unreblog(%{assigns: %{user: user}} = conn, %{"id" => ap_id_or_id}) do
end
@doc "POST /api/v1/statuses/:id/favourite"
def favourite(%{assigns: %{user: user}} = conn, %{"id" => ap_id_or_id}) do
with {:ok, _fav, %{data: %{"id" => id}}} <- CommonAPI.favorite(ap_id_or_id, user),
%Activity{} = activity <- Activity.get_create_by_object_ap_id(id) do
def favourite(%{assigns: %{user: user}} = conn, %{"id" => activity_id}) do
with {:ok, _fav} <- CommonAPI.favorite(user, activity_id),
%Activity{} = activity <- Activity.get_by_id(activity_id) do
try_render(conn, "show.json", activity: activity, for: user, as: :activity)
end
end

View file

@ -431,7 +431,7 @@ test "it does not send notification to mentioned users in likes" do
"status" => "hey @#{other_user.nickname}!"
})
{:ok, activity_two, _} = CommonAPI.favorite(activity_one.id, third_user)
{:ok, activity_two} = CommonAPI.favorite(third_user, activity_one.id)
assert other_user not in Notification.get_notified_from_activity(activity_two)
end
@ -461,7 +461,7 @@ test "liking an activity results in 1 notification, then 0 if the activity is de
assert Enum.empty?(Notification.for_user(user))
{:ok, _, _} = CommonAPI.favorite(activity.id, other_user)
{:ok, _} = CommonAPI.favorite(other_user, activity.id)
assert length(Notification.for_user(user)) == 1
@ -478,7 +478,7 @@ test "liking an activity results in 1 notification, then 0 if the activity is un
assert Enum.empty?(Notification.for_user(user))
{:ok, _, _} = CommonAPI.favorite(activity.id, other_user)
{:ok, _} = CommonAPI.favorite(other_user, activity.id)
assert length(Notification.for_user(user)) == 1
@ -533,7 +533,7 @@ test "liking an activity which is already deleted does not generate a notificati
assert Enum.empty?(Notification.for_user(user))
{:error, _} = CommonAPI.favorite(activity.id, other_user)
{:error, _} = CommonAPI.favorite(other_user, activity.id)
assert Enum.empty?(Notification.for_user(user))
end

View file

@ -182,7 +182,8 @@ test "preserves internal fields on refetch", %{mock_modified: mock_modified} do
user = insert(:user)
activity = Activity.get_create_by_object_ap_id(object.data["id"])
{:ok, _activity, object} = CommonAPI.favorite(activity.id, user)
{:ok, activity} = CommonAPI.favorite(user, activity.id)
object = Object.get_by_ap_id(activity.data["object"])
assert object.data["like_count"] == 1

View file

@ -102,7 +102,7 @@ test "it turns OrderedCollection likes into empty arrays" do
{:ok, %{id: id, object: object}} = CommonAPI.post(user, %{"status" => "test"})
{:ok, %{object: object2}} = CommonAPI.post(user, %{"status" => "test test"})
CommonAPI.favorite(id, user2)
CommonAPI.favorite(user2, id)
likes = %{
"first" =>

View file

@ -1059,8 +1059,8 @@ test "it deletes a user, all follow relationships and all activities", %{user: u
object_two = insert(:note, user: follower)
activity_two = insert(:note_activity, user: follower, note: object_two)
{:ok, like, _} = CommonAPI.favorite(activity_two.id, user)
{:ok, like_two, _} = CommonAPI.favorite(activity.id, follower)
{:ok, like} = CommonAPI.favorite(user, activity_two.id)
{:ok, like_two} = CommonAPI.favorite(follower, activity.id)
{:ok, repeat, _} = CommonAPI.repeat(activity_two.id, user)
{:ok, job} = User.delete(user)

View file

@ -0,0 +1,21 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.ActivityPub.ObjectValidatorTest do
use Pleroma.DataCase
import Pleroma.Factory
describe "likes" do
test "it is well formed" do
_required_fields = [
"id",
"actor",
"object"
]
_user = insert(:user)
end
end
end

View file

@ -0,0 +1,32 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.ActivityPub.SideEffectsTest do
use Pleroma.DataCase
alias Pleroma.Object
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.ActivityPub.Builder
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.SideEffects
import Pleroma.Factory
describe "like objects" do
setup do
user = insert(:user)
{:ok, post} = CommonAPI.post(user, %{"status" => "hey"})
{:ok, like_data, _meta} = Builder.like(user, post.object)
{:ok, like, _meta} = ActivityPub.persist(like_data, [])
%{like: like, user: user}
end
test "add the like to the original object", %{like: like, user: user} do
{:ok, like, _} = SideEffects.handle(like)
object = Object.get_by_ap_id(like.data["object"])
assert object.data["like_count"] == 1
assert user.ap_id in object.data["likes"]
end
end
end

View file

@ -1187,7 +1187,7 @@ test "it translates ostatus IDs to external URLs" do
user = insert(:user)
{:ok, activity, _} = CommonAPI.favorite(referent_activity.id, user)
{:ok, activity} = CommonAPI.favorite(user, referent_activity.id)
{:ok, modified} = Transmogrifier.prepare_outgoing(activity.data)
assert modified["object"] == "http://gs.example.org:4040/index.php/notice/29"

View file

@ -41,7 +41,7 @@ test "renders a like activity" do
object = Object.normalize(note)
user = insert(:user)
{:ok, like_activity, _} = CommonAPI.favorite(note.id, user)
{:ok, like_activity} = CommonAPI.favorite(user, note.id)
result = ObjectView.render("object.json", %{object: like_activity})

View file

@ -251,9 +251,12 @@ test "favoriting a status" do
user = insert(:user)
other_user = insert(:user)
{:ok, activity} = CommonAPI.post(other_user, %{"status" => "cofe"})
{:ok, post_activity} = CommonAPI.post(other_user, %{"status" => "cofe"})
{:ok, %Activity{}, _} = CommonAPI.favorite(activity.id, user)
{:ok, %Activity{data: data}} = CommonAPI.favorite(user, post_activity.id)
assert data["type"] == "Like"
assert data["actor"] == user.ap_id
assert data["object"] == post_activity.data["object"]
end
test "retweeting a status twice returns an error" do
@ -270,8 +273,8 @@ test "favoriting a status twice returns an error" do
other_user = insert(:user)
{:ok, activity} = CommonAPI.post(other_user, %{"status" => "cofe"})
{:ok, %Activity{}, _object} = CommonAPI.favorite(activity.id, user)
{:error, _} = CommonAPI.favorite(activity.id, user)
{:ok, %Activity{}} = CommonAPI.favorite(user, activity.id)
{:error, _} = CommonAPI.favorite(user, activity.id)
end
end

View file

@ -143,7 +143,7 @@ test "filters notifications using exclude_types", %{conn: conn} do
{:ok, mention_activity} = CommonAPI.post(other_user, %{"status" => "hey @#{user.nickname}"})
{:ok, create_activity} = CommonAPI.post(user, %{"status" => "hey"})
{:ok, favorite_activity, _} = CommonAPI.favorite(create_activity.id, other_user)
{:ok, favorite_activity} = CommonAPI.favorite(other_user, create_activity.id)
{:ok, reblog_activity, _} = CommonAPI.repeat(create_activity.id, other_user)
{:ok, _, _, follow_activity} = CommonAPI.follow(other_user, user)

View file

@ -589,7 +589,7 @@ test "reblogged status for another user", %{conn: conn} do
user1 = insert(:user)
user2 = insert(:user)
user3 = insert(:user)
CommonAPI.favorite(activity.id, user2)
{:ok, _} = CommonAPI.favorite(user2, activity.id)
{:ok, _bookmark} = Pleroma.Bookmark.create(user2.id, activity.id)
{:ok, reblog_activity1, _object} = CommonAPI.repeat(activity.id, user1)
{:ok, _, _object} = CommonAPI.repeat(activity.id, user2)
@ -695,7 +695,7 @@ test "unfavorites a status and returns it", %{conn: conn} do
activity = insert(:note_activity)
user = insert(:user)
{:ok, _, _} = CommonAPI.favorite(activity.id, user)
{:ok, _} = CommonAPI.favorite(user, activity.id)
conn =
conn
@ -1047,7 +1047,7 @@ test "Repeated posts that are replies incorrectly have in_reply_to_id null", %{c
test "returns users who have favorited the status", %{conn: conn, activity: activity} do
other_user = insert(:user)
{:ok, _, _} = CommonAPI.favorite(activity.id, other_user)
{:ok, _} = CommonAPI.favorite(other_user, activity.id)
response =
conn
@ -1078,7 +1078,7 @@ test "does not return users who have favorited the status but are blocked", %{
other_user = insert(:user)
{:ok, user} = User.block(user, other_user)
{:ok, _, _} = CommonAPI.favorite(activity.id, other_user)
{:ok, _} = CommonAPI.favorite(other_user, activity.id)
response =
conn
@ -1091,7 +1091,7 @@ test "does not return users who have favorited the status but are blocked", %{
test "does not fail on an unauthenticated request", %{conn: conn, activity: activity} do
other_user = insert(:user)
{:ok, _, _} = CommonAPI.favorite(activity.id, other_user)
{:ok, _} = CommonAPI.favorite(other_user, activity.id)
response =
conn
@ -1112,7 +1112,7 @@ test "requires authentification for private posts", %{conn: conn, user: user} do
"visibility" => "direct"
})
{:ok, _, _} = CommonAPI.favorite(activity.id, other_user)
{:ok, _} = CommonAPI.favorite(other_user, activity.id)
conn
|> assign(:user, nil)
@ -1269,7 +1269,7 @@ test "returns the favorites of a user", %{conn: conn} do
{:ok, _} = CommonAPI.post(other_user, %{"status" => "bla"})
{:ok, activity} = CommonAPI.post(other_user, %{"status" => "traps are happy"})
{:ok, _, _} = CommonAPI.favorite(activity.id, user)
{:ok, _} = CommonAPI.favorite(user, activity.id)
first_conn =
conn
@ -1289,7 +1289,7 @@ test "returns the favorites of a user", %{conn: conn} do
"Trees Are Never Sad Look At Them Every Once In Awhile They're Quite Beautiful."
})
{:ok, _, _} = CommonAPI.favorite(second_activity.id, user)
{:ok, _} = CommonAPI.favorite(user, second_activity.id)
last_like = status["id"]

View file

@ -42,7 +42,7 @@ test "Favourite notification" do
user = insert(:user)
another_user = insert(:user)
{:ok, create_activity} = CommonAPI.post(user, %{"status" => "hey"})
{:ok, favorite_activity, _object} = CommonAPI.favorite(create_activity.id, another_user)
{:ok, favorite_activity} = CommonAPI.favorite(another_user, create_activity.id)
{:ok, [notification]} = Notification.create_notifications(favorite_activity)
create_activity = Activity.get_by_id(create_activity.id)

View file

@ -271,7 +271,7 @@ test "only gets a notice in AS2 format for Create messages", %{conn: conn} do
user = insert(:user)
{:ok, like_activity, _} = CommonAPI.favorite(note_activity.id, user)
{:ok, like_activity} = CommonAPI.favorite(user, note_activity.id)
url = "/notice/#{like_activity.id}"
assert like_activity.data["type"] == "Like"
@ -298,7 +298,7 @@ test "render html for redirect for html format", %{conn: conn} do
user = insert(:user)
{:ok, like_activity, _} = CommonAPI.favorite(note_activity.id, user)
{:ok, like_activity} = CommonAPI.favorite(user, note_activity.id)
assert like_activity.data["type"] == "Like"

View file

@ -165,7 +165,7 @@ test "returns list of statuses favorited by specified user", %{
user: user
} do
[activity | _] = insert_pair(:note_activity)
CommonAPI.favorite(activity.id, user)
CommonAPI.favorite(user, activity.id)
response =
conn
@ -184,7 +184,7 @@ test "returns favorites for specified user_id when user is not logged in", %{
user: user
} do
activity = insert(:note_activity)
CommonAPI.favorite(activity.id, user)
CommonAPI.favorite(user, activity.id)
response =
conn
@ -205,7 +205,7 @@ test "returns favorited DM only when user is logged in and he is one of recipien
"visibility" => "direct"
})
CommonAPI.favorite(direct.id, user)
CommonAPI.favorite(user, direct.id)
response =
conn
@ -236,7 +236,7 @@ test "does not return others' favorited DM when user is not one of recipients",
"visibility" => "direct"
})
CommonAPI.favorite(direct.id, user)
CommonAPI.favorite(user, direct.id)
response =
conn
@ -255,7 +255,7 @@ test "paginates favorites using since_id and max_id", %{
activities = insert_list(10, :note_activity)
Enum.each(activities, fn activity ->
CommonAPI.favorite(activity.id, user)
CommonAPI.favorite(user, activity.id)
end)
third_activity = Enum.at(activities, 2)
@ -283,7 +283,7 @@ test "limits favorites using limit parameter", %{
7
|> insert_list(:note_activity)
|> Enum.each(fn activity ->
CommonAPI.favorite(activity.id, user)
CommonAPI.favorite(user, activity.id)
end)
response =
@ -321,7 +321,7 @@ test "returns 403 error when user has hidden own favorites", %{
} do
user = insert(:user, %{info: %{hide_favorites: true}})
activity = insert(:note_activity)
CommonAPI.favorite(activity.id, user)
CommonAPI.favorite(user, activity.id)
conn =
conn
@ -334,7 +334,7 @@ test "returns 403 error when user has hidden own favorites", %{
test "hides favorites for new users by default", %{conn: conn, current_user: current_user} do
user = insert(:user)
activity = insert(:note_activity)
CommonAPI.favorite(activity.id, user)
CommonAPI.favorite(user, activity.id)
conn =
conn

View file

@ -152,7 +152,7 @@ test "renders body for like activity" do
"<span>Lorem ipsum dolor sit amet</span>, consectetur :firefox: adipiscing elit. Fusce sagittis finibus turpis."
})
{:ok, activity, _} = CommonAPI.favorite(activity.id, user)
{:ok, activity} = CommonAPI.favorite(user, activity.id)
object = Object.normalize(activity)
assert Impl.format_body(%{activity: activity}, user, object) == "@Bob has favorited your post"

View file

@ -68,7 +68,7 @@ test "it doesn't send notify to the 'user:notification' stream when a user is bl
)
{:ok, activity} = CommonAPI.post(user, %{"status" => ":("})
{:ok, notif, _} = CommonAPI.favorite(activity.id, blocked)
{:ok, notif} = CommonAPI.favorite(blocked, activity.id)
Streamer.stream("user:notification", notif)
Task.await(task)
@ -87,7 +87,7 @@ test "it doesn't send notify to the 'user:notification' stream when a thread is
{:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
{:ok, activity} = CommonAPI.add_mute(user, activity)
{:ok, notif, _} = CommonAPI.favorite(activity.id, user2)
{:ok, notif} = CommonAPI.favorite(user2, activity.id)
Streamer.stream("user:notification", notif)
Task.await(task)
end
@ -105,7 +105,7 @@ test "it doesn't send notify to the 'user:notification' stream' when a domain is
{:ok, user} = User.block_domain(user, "hecking-lewd-place.com")
{:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
{:ok, notif, _} = CommonAPI.favorite(activity.id, user2)
{:ok, notif} = CommonAPI.favorite(user2, activity.id)
Streamer.stream("user:notification", notif)
Task.await(task)