1
0
Fork 0
forked from mirrors/akkoma

Merge branch 'develop' of gitssh.ihatebeinga.live:IHBAGang/pleroma into develop

This commit is contained in:
sadposter 2021-12-14 14:01:09 +00:00
commit 8f78cb3050
14 changed files with 431 additions and 102 deletions

View file

@ -8,32 +8,38 @@ defmodule Mix.Tasks.Pleroma.Search do
import Ecto.Query
alias Pleroma.Activity
alias Pleroma.Pagination
alias Pleroma.User
alias Pleroma.Hashtag
@shortdoc "Manages elasticsearch"
def run(["import_since", d | _rest]) do
start_pleroma()
{:ok, since, _} = DateTime.from_iso8601(d)
from(a in Activity, where: not ilike(a.actor, "%/relay") and a.inserted_at > ^since)
|> Activity.with_preloaded_object()
|> Activity.with_preloaded_user_actor()
|> get_all
end
def run(["import" | _rest]) do
def run(["import", "activities" | _rest]) do
start_pleroma()
from(a in Activity, where: not ilike(a.actor, "%/relay"))
|> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data))
|> Activity.with_preloaded_object()
|> Activity.with_preloaded_user_actor()
|> get_all
|> get_all(:activities)
end
defp get_all(query, max_id \\ nil) do
IO.puts(max_id)
params = %{limit: 2000}
def run(["import", "users" | _rest]) do
start_pleroma()
from(u in User, where: u.nickname not in ["internal.fetch", "relay"])
|> get_all(:users)
end
def run(["import", "hashtags" | _rest]) do
start_pleroma()
from(h in Hashtag)
|> Pleroma.Repo.all()
|> Pleroma.Elasticsearch.bulk_post(:hashtags)
end
defp get_all(query, index, max_id \\ nil) do
params = %{limit: 1000}
params =
if max_id == nil do
@ -50,17 +56,9 @@ defp get_all(query, max_id \\ nil) do
:ok
else
res
|> Enum.filter(fn x ->
t =
x.object
|> Map.get(:data, %{})
|> Map.get("type", "")
|> Pleroma.Elasticsearch.bulk_post(index)
t == "Note"
end)
|> Pleroma.Elasticsearch.bulk_post(:activities)
get_all(query, List.last(res).id)
get_all(query, index, List.last(res).id)
end
end
end

View file

@ -0,0 +1,10 @@
defmodule Pleroma.Elasticsearch.DocumentMappings.Hashtag do
def id(obj), do: obj.id
def encode(hashtag) do
%{
hashtag: hashtag.name,
timestamp: hashtag.inserted_at
}
end
end

View file

@ -0,0 +1,13 @@
defmodule Pleroma.Elasticsearch.DocumentMappings.User do
def id(obj), do: obj.id
def encode(%{actor_type: "Person"} = user) do
%{
timestamp: user.inserted_at,
instance: URI.parse(user.ap_id).host,
nickname: user.nickname,
bio: user.bio,
display_name: user.name
}
end
end

View file

@ -1,26 +1,32 @@
defmodule Pleroma.Elasticsearch do
alias Pleroma.Activity
alias Pleroma.User
alias Pleroma.Elasticsearch.DocumentMappings
alias Pleroma.Config
require Logger
defp url do
Config.get([:elasticsearch, :url])
end
def put_by_id(id) do
defp enabled? do
Config.get([:search, :provider]) == Pleroma.Search.Elasticsearch
end
def put_by_id(:activity, id) do
id
|> Activity.get_by_id_with_object()
|> maybe_put_into_elasticsearch()
end
def maybe_put_into_elasticsearch({:ok, activity}) do
maybe_put_into_elasticsearch(activity)
def maybe_put_into_elasticsearch({:ok, item}) do
maybe_put_into_elasticsearch(item)
end
def maybe_put_into_elasticsearch(
%{data: %{"type" => "Create"}, object: %{data: %{"type" => "Note"}}} = activity
) do
if Config.get([:search, :provider]) == Pleroma.Search.Elasticsearch do
if enabled?() do
actor = Pleroma.Activity.user_actor(activity)
activity
@ -29,23 +35,50 @@ def maybe_put_into_elasticsearch(
end
end
def maybe_put_into_elasticsearch(%User{} = user) do
if enabled?() do
put(user)
end
end
def maybe_put_into_elasticsearch(_) do
{:ok, :skipped}
end
def put(%Activity{} = activity) do
Elastix.Document.index(
{:ok, _} = Elastix.Document.index(
url(),
"activities",
"activity",
DocumentMappings.Activity.id(activity),
DocumentMappings.Activity.encode(activity)
)
{:ok, _} = bulk_post(
activity.object.hashtags, :hashtags
)
end
def put(%User{} = user) do
{:ok, _ } = Elastix.Document.index(
url(),
"users",
"user",
DocumentMappings.User.id(user),
DocumentMappings.User.encode(user)
)
end
def bulk_post(data, :activities) do
d =
data
|> Enum.filter(fn x ->
t =
x.object
|> Map.get(:data, %{})
|> Map.get("type", "")
t == "Note"
end)
|> Enum.map(fn d ->
[
%{index: %{_id: DocumentMappings.Activity.id(d)}},
@ -54,7 +87,7 @@ def bulk_post(data, :activities) do
end)
|> List.flatten()
Elastix.Bulk.post(
{:ok, %{body: %{"errors" => false}}} = Elastix.Bulk.post(
url(),
d,
index: "activities",
@ -62,12 +95,92 @@ def bulk_post(data, :activities) do
)
end
def search_activities(q) do
Elastix.Search.search(
def bulk_post(data, :users) do
d =
data
|> Enum.map(fn d ->
[
%{index: %{_id: DocumentMappings.User.id(d)}},
DocumentMappings.User.encode(d)
]
end)
|> List.flatten()
Elastix.Bulk.post(
url(),
"activities",
["activity"],
q
d,
index: "users",
type: "user"
)
end
def bulk_post(data, :hashtags) do
d =
data
|> Enum.map(fn d ->
[
%{index: %{_id: DocumentMappings.Hashtag.id(d)}},
DocumentMappings.Hashtag.encode(d)
]
end)
|> List.flatten()
Elastix.Bulk.post(
url(),
d,
index: "hashtags",
type: "hashtag"
)
end
def search(:raw, index, type, q) do
with {:ok, raw_results} <- Elastix.Search.search(url(), index, [type], q) do
results =
raw_results
|> Map.get(:body, %{})
|> Map.get("hits", %{})
|> Map.get("hits", [])
{:ok, results}
else
{:error, e} ->
Logger.error(e)
{:error, e}
end
end
def search(:activities, q) do
with {:ok, results} <- search(:raw, "activities", "activity", q) do
results
|> Enum.map(fn result -> result["_id"] end)
|> Pleroma.Activity.all_by_ids_with_object()
else
e ->
Logger.error(e)
[]
end
end
def search(:users, q) do
with {:ok, results} <- search(:raw, "users", "user", q) do
results
|> Enum.map(fn result -> result["_id"] end)
|> Pleroma.User.get_all_by_ids()
else
e ->
Logger.error(e)
[]
end
end
def search(:hashtags, q) do
with {:ok, results} <- search(:raw, "hashtags", "hashtag", q) do
results
|> Enum.map(fn result -> result["_source"]["hashtag"] end)
else
e ->
Logger.error(e)
[]
end
end
end

View file

@ -2,50 +2,13 @@ defmodule Pleroma.Search.Elasticsearch do
@behaviour Pleroma.Search
alias Pleroma.Web.MastodonAPI.StatusView
alias Pleroma.Web.MastodonAPI.AccountView
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Search.Elasticsearch.Parsers
alias Pleroma.Web.Endpoint
defp to_es(term) when is_binary(term) do
defp es_query(:activity, query) do
%{
match: %{
content: %{
query: term,
operator: "AND"
}
}
}
end
defp to_es({:quoted, term}), do: to_es(term)
defp to_es({:filter, ["hashtag", query]}) do
%{
term: %{
hashtags: %{
value: query
}
}
}
end
defp to_es({:filter, [field, query]}) do
%{
term: %{
field => %{
value: query
}
}
}
end
defp parse(query) do
query
|> SearchParser.parse!()
|> Enum.map(&to_es/1)
end
@impl Pleroma.Search
def search(%{assigns: %{user: user}} = _conn, %{q: query} = _params, _options) do
q = %{
size: 500,
terminate_after: 500,
timeout: "10s",
@ -54,34 +17,94 @@ def search(%{assigns: %{user: user}} = _conn, %{q: query} = _params, _options) d
],
query: %{
bool: %{
must: parse(String.trim(query))
must: Parsers.Activity.parse(query)
}
}
}
end
out = Pleroma.Elasticsearch.search_activities(q)
with {:ok, raw_results} <- out do
results =
raw_results
|> Map.get(:body, %{})
|> Map.get("hits", %{})
|> Map.get("hits", [])
|> Enum.map(fn result -> result["_id"] end)
|> Pleroma.Activity.all_by_ids_with_object()
|> Enum.filter(fn x -> Visibility.visible_for_user?(x, user) end)
|> Enum.reverse()
%{
"accounts" => [],
"hashtags" => [],
"statuses" =>
StatusView.render("index.json",
activities: results,
for: user,
as: :activity
)
defp es_query(:user, query) do
%{
size: 50,
terminate_after: 50,
timeout: "10s",
sort: [
%{"_timestamp" => "desc"}
],
query: %{
bool: %{
must: Parsers.User.parse(query)
}
}
end
}
end
defp es_query(:hashtag, query) do
%{
size: 50,
terminate_after: 50,
timeout: "10s",
query: %{
bool: %{
must: Parsers.Hashtag.parse(query)
}
}
}
end
@impl Pleroma.Search
def search(%{assigns: %{user: user}} = _conn, %{q: query} = _params, _options) do
parsed_query =
query
|> String.trim()
|> SearchParser.parse!()
activity_task =
Task.async(fn ->
q = es_query(:activity, parsed_query)
Pleroma.Elasticsearch.search(:activities, q)
|> Enum.filter(fn x -> Visibility.visible_for_user?(x, user) end)
end)
user_task =
Task.async(fn ->
q = es_query(:user, parsed_query)
Pleroma.Elasticsearch.search(:users, q)
|> Enum.filter(fn x -> Pleroma.User.visible_for(x, user) == :visible end)
end)
hashtag_task =
Task.async(fn ->
q = es_query(:hashtag, parsed_query)
Pleroma.Elasticsearch.search(:hashtags, q)
end)
activity_results = Task.await(activity_task)
user_results = Task.await(user_task)
hashtag_results = Task.await(hashtag_task)
%{
"accounts" =>
AccountView.render("index.json",
users: user_results,
for: user
),
"hashtags" =>
Enum.map(hashtag_results, fn x ->
%{
url: Endpoint.url() <> "/tag/" <> x,
name: x
}
end),
"statuses" =>
StatusView.render("index.json",
activities: activity_results,
for: user,
as: :activity
)
}
end
end

View file

@ -0,0 +1,38 @@
defmodule Pleroma.Search.Elasticsearch.Parsers.Activity do
defp to_es(term) when is_binary(term) do
%{
match: %{
content: %{
query: term,
operator: "AND"
}
}
}
end
defp to_es({:quoted, term}), do: to_es(term)
defp to_es({:filter, ["hashtag", query]}) do
%{
term: %{
hashtags: %{
value: query
}
}
}
end
defp to_es({:filter, [field, query]}) do
%{
term: %{
field => %{
value: query
}
}
}
end
def parse(q) do
Enum.map(q, &to_es/1)
end
end

View file

@ -0,0 +1,30 @@
defmodule Pleroma.Search.Elasticsearch.Parsers.Hashtag do
defp to_es(term) when is_binary(term) do
%{
term: %{
hashtag: %{
value: String.downcase(term),
}
}
}
end
defp to_es({:quoted, term}), do: to_es(term)
defp to_es({:filter, ["hashtag", query]}) do
%{
term: %{
hashtag: %{
value: String.downcase(query)
}
}
}
end
defp to_es({:filter, _}), do: nil
def parse(q) do
Enum.map(q, &to_es/1)
|> Enum.filter(fn x -> x != nil end)
end
end

View file

@ -0,0 +1,53 @@
defmodule Pleroma.Search.Elasticsearch.Parsers.User do
defp to_es(term) when is_binary(term) do
%{
bool: %{
minimum_should_match: 1,
should: [
%{
match: %{
bio: %{
query: term,
operator: "AND"
}
}
},
%{
term: %{
nickname: %{
value: term
}
}
},
%{
match: %{
display_name: %{
query: term,
operator: "AND"
}
}
}
]
}
}
end
defp to_es({:quoted, term}), do: to_es(term)
defp to_es({:filter, ["user", query]}) do
%{
term: %{
nickname: %{
value: query
}
}
}
end
defp to_es({:filter, _}), do: nil
def parse(q) do
Enum.map(q, &to_es/1)
|> Enum.filter(fn x -> x != nil end)
end
end

View file

@ -1088,6 +1088,7 @@ def update_and_set_cache(struct, params) do
def update_and_set_cache(changeset) do
with {:ok, user} <- Repo.update(changeset, stale_error_field: :id) do
Pleroma.Elasticsearch.maybe_put_into_elasticsearch(user)
set_cache(user)
end
end

View file

@ -538,7 +538,7 @@ defp add_notifications(meta, notifications) do
@impl true
def handle_after_transaction(%Pleroma.Activity{data: %{"type" => "Create"}} = activity) do
Pleroma.Elasticsearch.put_by_id(activity.id)
Pleroma.Elasticsearch.put_by_id(:activity, activity.id)
end
def handle_after_transaction(%Pleroma.Activity{}) do

View file

@ -0,0 +1,21 @@
{
"properties": {
"_timestamp": {
"type": "date",
"index": true
},
"instance": {
"type": "keyword"
},
"content": {
"type": "text"
},
"hashtags": {
"type": "keyword"
},
"user": {
"type": "text"
}
}
}

View file

@ -0,0 +1,11 @@
{
"properties": {
"timestamp": {
"type": "date",
"index": true
},
"hashtag": {
"type": "text"
}
}
}

View file

@ -0,0 +1,18 @@
{
"properties": {
"timestamp": {
"type": "date",
"index": true
},
"instance": {
"type": "keyword"
},
"nickname": {
"type": "text"
},
"bio": {
"type": "text"
}
}
}