Skip to content

Commit

Permalink
delay revamp, pools revamp, improved version view
Browse files Browse the repository at this point in the history
  • Loading branch information
hrefhref committed Dec 2, 2018
1 parent 0de576b commit 48d150d
Show file tree
Hide file tree
Showing 18 changed files with 244 additions and 112 deletions.
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ server: true,
{'/proxy/[...]', Plug.Adapters.Cowboy.Handler, {Pleroma.Web.Endpoint, []}},
{'/media/[...]', Plug.Adapters.Cowboy.Handler, {Pleroma.Web.Endpoint, []}},
{'/static/[...]', Plug.Adapters.Cowboy.Handler, {Pleroma.Web.Endpoint, []}},
{'/api/v1/instance/[...]', Plug.Adapters.Cowboy.Handler, {Pleroma.Web.Endpoint, []}},
{'/api/statusnet/config', Plug.Adapters.Cowboy.Handler, {Pleroma.Web.Endpoint, []}},
{:_, Plug.Adapters.Cowboy.Handler, {FdWeb.Endpoint, []}}
]}
]
Expand Down
1 change: 0 additions & 1 deletion lib/fd/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ defmodule Fd.Cache do
for key <- get(ctx)||[] do
[ctx, key]
|> key()
|> IO.inspect
|> delete()
end
delete(ctx)
Expand Down
2 changes: 0 additions & 2 deletions lib/fd/domain.ex
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
defmodule Fd.Domain do
  @moduledoc "Helpers for reading per-domain crawl statistics."

  # Returns the "total" counter recorded for `domain` under `stats["domains"]`,
  # or 0 when the domain is absent or carries no "total" key.
  # `stats["domains"]` is assumed enumerable as `{domain, counters}` pairs
  # (a map or a list of 2-tuples).
  # Removed leftover debug output (IO.puts / IO.inspect) from the original.
  def total_count(stats, domain) do
    case Enum.find(stats["domains"], fn({d, _}) -> d == domain end) do
      {_, %{"total" => total}} -> total
      _ -> 0
    end
  end
end
8 changes: 7 additions & 1 deletion lib/fd/human_error.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@ defmodule Fd.HumanError do
end

# Formats an `{:error, reason}` tuple by delegating to the atom clause below.
# Removed a leftover dead expression (`to_string(atom)` whose result was
# discarded before calling `format(atom)`).
def format({:error, atom}) when is_atom(atom) do
  format(atom)
end

# Formats an error atom into a human-readable string by replacing
# underscores with spaces, e.g. :connect_timeout -> "connect timeout".
def format(atom) when is_atom(atom) do
  atom
  |> to_string()
  |> String.replace("_", " ")
end

def format(error) do
Expand Down
113 changes: 65 additions & 48 deletions lib/fd/instances/crawler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ defmodule Fd.Instances.Crawler do
alias Fd.Instances.{Instance, InstanceCheck, Instrumenter}

@hackney_pool :hackney_crawler
@hackney_pool_opts [{:timeout, 150_000}, {:max_connections, 500}, {:connect_timeout, 300_000}]
@hackney_pool_opts [{:timeout, 50_000}, {:max_connections, 200}, {:connect_timeout, 50_000}]
@hackney_mon_pool :hackney_crawler_mon
@hackney_mon_pool_opts [{:timeout, 150_000}, {:max_connections, 100}, {:connect_timeout, 300_000}]
@hackney_opts [{:connect_timeout, 50_000}, {:recv_timeout, 50_000}, {:pool, @hackney_pool}]
@hackney_mon_opt [{:pool, @hackney_pool}]
@hackney_mon_pool_opts [{:timeout, 50_000}, {:max_connections, 100}, {:connect_timeout, 50_000}]
@hackney_dead_pool :hackney_crawler_dead
@hackney_dead_pool_opts [{:timeout, 50_000}, {:max_connections, 100}, {:connect_timeout, 150_000}]
@hackney_opts [{:connect_timeout, 150_000}, {:recv_timeout, 50_000}, {:pool, @hackney_pool}]
@hackney_mon_opts [{:pool, @hackney_mon_pool}]
@hackney_dead_opts [{:pool, @hackney_dead_pool}]

@down_http_codes [301, 410, 502, 503, 504, 505, 520, 521, 522, 523, 524, 525, 526, 527, 530]
@retry_http_codes [500, 502, 503, 504, 505, 520, 521, 522, 523, 524]
Expand Down Expand Up @@ -74,11 +77,12 @@ defmodule Fd.Instances.Crawler do
# Starts the three hackney connection pools used by the crawler:
# the default pool, the monitored-instance pool and the dead/likely-dead
# instance pool (pool names and options come from module attributes above).
# Each start is asserted with `:ok =`, so a pool failing to start raises
# a MatchError at boot rather than failing silently later.
def setup() do
  :ok = :hackney_pool.start_pool(@hackney_pool, @hackney_pool_opts)
  :ok = :hackney_pool.start_pool(@hackney_mon_pool, @hackney_mon_pool_opts)
  :ok = :hackney_pool.start_pool(@hackney_dead_pool, @hackney_dead_pool_opts)
end

def run(instance = %Instance{domain: domain}) do
state = %Crawler{instance: instance, halted?: false, has_mastapi?: false, has_statusnet?: false, has_peertubeapi?:
false, has_nodeinfo?: false, has_misskey?: false, changes: %{}, check: %{}}
false, has_nodeinfo?: false, has_misskey?: false, changes: %{}, check: %{}, diffs: %{}}


start = :erlang.monotonic_time
Expand Down Expand Up @@ -110,33 +114,34 @@ defmodule Fd.Instances.Crawler do
|> Map.put("last_checked_at", DateTime.utc_now())
|> Map.put("nodeinfo", state.nodeinfo)

state = %Crawler{state | changes: changes}

debug(state, "changes: #{inspect changes}")

check = state.check
check_changeset = InstanceCheck.changeset(%InstanceCheck{instance_id: instance.id}, check)
check_changeset = InstanceCheck.changeset(%InstanceCheck{instance_id: instance.id}, state.check)
Fd.Repo.insert!(check_changeset)

state = case Instances.update_instance(instance, changes) do
state = case Instances.update_instance(instance, state.changes) do
{:ok, instance} ->

state = check_for_changes(state)

if Application.get_env(:fd, :monitoring_alerts, false) && state.instance.monitor && state.instance.settings && state.instance.settings.alerts_to_contact do
spawn(fn() ->
became_down? = Map.get(state.diffs, :became_down, false)
became_up? = Map.get(state.diffs, :became_up, false)
if became_down? do
Fd.DownEmail.down_email(state.instance, state.check)
|> Fd.Mailer.deliver()
end
if became_up? do
Fd.UpEmail.up_email(state.instance)
|> Fd.Mailer.deliver()
if Application.get_env(:fd, :monitoring_alerts, false) && state.instance.monitor && state.instance.settings && state.instance.settings.alerts_to_contact do
spawn(fn() ->
became_down? = Map.get(state.diffs, :became_down, false)
became_up? = Map.get(state.diffs, :became_up, false)
if became_down? do
Fd.DownEmail.down_email(state.instance, state.check)
|> Fd.Mailer.deliver()
end
if became_up? do
Fd.UpEmail.up_email(state.instance)
|> Fd.Mailer.deliver()
end
end)
end
end)
end

info(state, "OK -- updated!")
warn(state, "Checked! #{inspect state.diffs}")
state
error ->
error(state, "FAIL: #{inspect error}")
Expand All @@ -147,7 +152,7 @@ defmodule Fd.Instances.Crawler do
pipeline_duration = pipeline_stop - start
total_duration = finished - start

info(state, "finished in #{:erlang.convert_time_unit(total_duration, :native, :millisecond)}ms (pipeline took #{:erlang.convert_time_unit(pipeline_duration, :native, :millisecond)} ms)!")
warn(state, "finished in #{:erlang.convert_time_unit(total_duration, :native, :millisecond)}ms (pipeline took #{:erlang.convert_time_unit(pipeline_duration, :native, :millisecond)} ms)!")

spawn(fn() ->
domains = state.m_peers || []
Expand All @@ -164,8 +169,19 @@ defmodule Fd.Instances.Crawler do
|> Enum.filter(fn(domain) -> domain end)
|> Enum.reject(fn(domain) -> Enum.member?(existings, domain) end)

for domain <- new_domains, do: Instances.create_instance(%{"domain" => domain})
for domain <- new_domains do
case Instances.create_instance(%{"domain" => domain}) do
{:ok, instance} ->
spawn(fn() ->
:timer.sleep(:rand.uniform(60) * 1000)
Fd.Instances.Server.crawl(instance.id)
end)
_ -> :ok
end
end
end)

state
end

defp put_public_suffix(crawler) do
Expand Down Expand Up @@ -220,10 +236,6 @@ defmodule Fd.Instances.Crawler do
server_changed? = if last_up_check && is_up? do
Map.get(crawler.changes, "server", 0) != last_up_check.server
else false end

diffs = %{new: new?, became_up: became_up?, became_down: became_down?, version_changed: version_changed?,
server_changed: server_changed?, is_up?: is_up?, was_up?: was_up?}

{became_open?, became_closed?} = cond do
signup_changed? && Map.get(crawler.changes, "signup", true) == false ->
{false, true}
Expand All @@ -233,13 +245,15 @@ defmodule Fd.Instances.Crawler do
{false, false}
end

IO.puts Map.get(crawler.changes, "server")
diffs = %{new: new?, became_up: became_up?, became_down: became_down?, version_changed: version_changed?,
server_changed: server_changed?, is_up?: is_up?, was_up?: was_up?, became_open?: became_open?, became_closed?: became_closed?}


unless (crawler.instance.hidden || false) or Map.get(crawler.changes, "server") == 0 do
if became_up? do
post("{instance} is back up :)", crawler.instance, [:mon])
end
if became_down? do
IO.puts "BECAME DOWN"
error = if error = Map.get(crawler.check, "error_s") do
" (#{error})"
else
Expand Down Expand Up @@ -285,6 +299,7 @@ defmodule Fd.Instances.Crawler do
post("{instance} upgraded #{server} from #{old_version} to #{new_version}:", crawler.instance, [:watch, :mon])
true -> :nothing_changed
end

end

debug(crawler, "Diffs: " <> inspect(diffs))
Expand Down Expand Up @@ -813,7 +828,6 @@ defmodule Fd.Instances.Crawler do
def query_html_index(crawler = %Crawler{halted?: false}) do
case request(crawler, "/", [json: false, follow_redirects: true]) do
{:ok, resp = %HTTPoison.Response{status_code: 200, body: body}} ->
IO.inspect resp
%Crawler{crawler | html: body}
error ->
info(crawler, "failed to get index page #{inspect error}")
Expand All @@ -833,11 +847,18 @@ defmodule Fd.Instances.Crawler do
timeout = Keyword.get(options, :timeout, 15_000)
recv_timeout = Keyword.get(options, :recv_timeout, 15_000)
body = Keyword.get(options, :body, "")
{mon_ua, options} = if crawler.instance.monitor do
mon_ua = " - monitoring enabled https://fediverse.network/monitoring"
{mon_ua, [hackney: @hackney_mon_opts]}
else
{"", [hackney: @hackney_opts]}
{mon_ua, options} = cond do
crawler.instance.monitor ->
mon_ua = " - monitoring enabled https://fediverse.network/monitoring"
{mon_ua, [hackney: @hackney_mon_opts]}
crawler.instance.dead || !crawler.instance.last_up_at ->
{"", [hackney: @hackney_dead_opts]}
crawler.instance.last_up_at && DateTime.diff(DateTime.utc_now(), crawler.instance.last_up_at) >= 2678400 ->
{"", [hackney: @hackney_dead_opts]}
!crawler.instance.server || crawler.instance.server == 0 ->
{"", [hackney: @hackney_dead_opts]}
true ->
{"", [hackney: @hackney_opts]}
end
options = [timeout: timeout, recv_timeout: recv_timeout, follow_redirect: follow_redirects] ++ options
dev_ua = if @env == :dev, do: " [dev]", else: ""
Expand All @@ -851,7 +872,6 @@ defmodule Fd.Instances.Crawler do
Map.put(headers, "Accept", accept)
else headers end
start = :erlang.monotonic_time
IO.puts "-- #{domain} #{path} #{inspect(headers)}"
case HTTPoison.request(method, "https://#{domain}#{path}", body, headers, options) do
{:ok, response = %HTTPoison.Response{status_code: 200, body: body}} ->
Instrumenter.http_request(path, response, start)
Expand Down Expand Up @@ -893,12 +913,12 @@ defmodule Fd.Instances.Crawler do
end

defp retry(crawler, path, options, error, retries) do
if retries > 5 do
if retries > 2 do
error(crawler, "HTTP ERROR (max retries reached): #{inspect error}")
error
else
debug(crawler, "HTTP retry #{inspect retries}: #{inspect error}")
:timer.sleep(:crypto.rand_uniform(retries*2000, retries*5000))
:timer.sleep(:crypto.rand_uniform(retries*500, retries*1000))
Instrumenter.retry_http_request()
request(crawler, path, options, retries + 1)
end
Expand All @@ -912,14 +932,18 @@ defmodule Fd.Instances.Crawler do
domain = crawler.instance.domain
Logger.info "Crawler(#{inspect self()} ##{crawler.instance.id} #{domain}): #{message}"
end
# Logs `message` at warning level, prefixed with the crawler pid,
# instance id and instance domain.
def warn(crawler, message) do
  Logger.warn(log_prefix(crawler) <> message)
end

# Logs `message` at error level with the same crawler-identifying prefix.
def error(crawler, message) do
  Logger.error(log_prefix(crawler) <> message)
end

# Builds the shared "Crawler(#PID<...> #id domain): " log prefix.
defp log_prefix(crawler) do
  "Crawler(#{inspect self()} ##{crawler.instance.id} #{crawler.instance.domain}): "
end

defp post(text, instance, accounts, replaces \\ %{}) do
Logger.warn inspect(instance.settings)
[post_acct | repeat_accts] = if Map.get(instance.settings || %{}, :fedibot) do
accounts = if Map.get(instance.settings || %{}, :fedibot) do
[instance.domain] ++ accounts
else
accounts
Expand All @@ -944,14 +968,7 @@ defmodule Fd.Instances.Crawler do
|> Enum.filter(&(&1))
|> Enum.join(" - ")

case Fd.Pleroma.post(post_acct, text) do
{:ok, activity} ->
Fd.Pleroma.repeat(activity.id, repeat_accts)
{:ok, activity}
{:error, error} ->
Logger.error "Failed to post status: #{inspect error}"
{:error, error}
end
Fd.Pleroma.post(accounts, text)
end

def downcase(nil), do: nil
Expand Down
28 changes: 20 additions & 8 deletions lib/fd/instances/crawler/nodeinfo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,22 @@ defmodule Fd.Instances.Crawler.Nodeinfo do
href = Map.get(schema, "href")
uri = URI.parse(href)
version = detect_version(Map.get(schema, "rel"))
if version && uri.host == crawler.instance.domain do
[{version, uri.path} | acc]
else
acc
cond do
version && uri.host == crawler.instance.domain -> [{version, uri.path} | acc]
true -> acc
end
end)
|> Enum.sort_by(fn({v, _}) -> v end, &>=/2)
{version, path} = List.first(links)
query_nodeinfo(crawler, version, path)
query_nodeinfo(crawler, List.first(links))
end

defp query_nodeinfo(crawler, version, path) do
error(crawler, "Should crawl Nodeinfo ver #{inspect version} at path #{inspect path}")
# Fallback when nodeinfo discovery is skipped entirely: record that the
# instance has no nodeinfo and let the pipeline continue un-halted.
defp query_nodeinfo(crawler) do
  %Crawler{crawler | has_nodeinfo?: false}
end


defp query_nodeinfo(crawler, {version, path}) do
debug(crawler, "Should crawl Nodeinfo ver #{inspect version} at path #{inspect path}")
case request(crawler, path) do
{:ok, %HTTPoison.Response{status_code: 200, body: body}} ->
debug(crawler, "got nodeinfo#{inspect version} #{inspect path} " <> inspect(body))
Expand All @@ -72,6 +75,15 @@ defmodule Fd.Instances.Crawler.Nodeinfo do
end
end

# No usable nodeinfo link was advertised (the caller passed
# `List.first(links)` and the list was empty): halt the crawl with a
# fatal :invalid_nodeinfo error.
defp query_nodeinfo(crawler, nil) do
  debug(crawler, "no valid nodeinfo")
  %Crawler{crawler | halted?: true, fatal_error: :invalid_nodeinfo}
end

# Any other (unexpected) link shape: treat as "no nodeinfo" and continue.
defp query_nodeinfo(crawler, _) do
  %Crawler{crawler | has_nodeinfo?: false}
end

defp detect_version("http://nodeinfo.diaspora.software/ns/schema/"<>float) do
case Float.parse(float) do
{version, _} -> version
Expand Down
22 changes: 22 additions & 0 deletions lib/fd/instances/instances.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ defmodule Fd.Instances do
** (Ecto.NoResultsError)
"""

# Fetches a single instance by primary key; raises Ecto.NoResultsError when absent.
def get_instance!(id), do: Repo.get!(Instance, id)

def get_instance_by_domain!(domain) do
Expand Down Expand Up @@ -324,4 +325,25 @@ defmodule Fd.Instances do
|> Map.put_new(:settings, %InstanceSettings{})
|> Instance.changeset(%{})
end

# Returns an atom naming the crawl-delay bucket for this instance.
# Clause order is priority: dead beats monitoring, monitoring beats calm,
# calm beats the default bucket.
def delay(%Instance{} = instance) do
  cond do
    # NOTE(review): `dead_instance?/1` is not defined anywhere visible in this
    # module — `dead?/1` below looks like the intended call; confirm.
    dead_instance?(instance) -> :instance_dead
    # Monitored instance whose owner opted into a relaxed schedule.
    instance.settings && instance.monitor && instance.settings.keep_calm -> :instance_monitor_calm
    instance.monitor -> :instance_monitor
    instance.settings && instance.settings.keep_calm -> :instance_calm
    # server == 0 — presumably "unknown software"; crawled calmly. TODO confirm.
    instance.server == 0 -> :instance_calm
    true -> :instance_default
  end
end

# True when the instance is explicitly flagged dead, or has not been seen
# up for at least @dead_after_secs — falling back to its insertion time
# when it was never seen up at all.
def dead?(%Instance{dead: true}), do: true

# Fixed: the original head read `%Instance{} = instance})` — a stray `}`
# that would not compile.
def dead?(%Instance{} = instance) do
  cond do
    # Was up at some point, but too long ago.
    instance.last_up_at && DateTime.diff(DateTime.utc_now(), instance.last_up_at) >= @dead_after_secs -> true
    # Never seen up, and known to us for too long.
    !instance.last_up_at && DateTime.diff(DateTime.utc_now(), instance.inserted_at) >= @dead_after_secs -> true
    true -> false
  end
end

end
Loading

0 comments on commit 48d150d

Please sign in to comment.