Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions src/itsy/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
[clojure.tools.logging :refer [debug error info trace warn]]
[clojure.set :as set]
[clj-http.client :as http]
[itsy.queues :as queues]
[itsy.robots :as robots]
[slingshot.slingshot :refer [get-thrown-object try+]])
(:import (java.net URL)
Expand All @@ -23,7 +24,6 @@
(defn- enqueue*
  "Internal function to enqueue a url as a map with :url and :count."
  [config url]
  (trace :enqueue-url url)
  ;; :count is the crawl-order index: the dereferenced value of the
  ;; shared url-count atom at enqueue time.
  (.put (-> config :state :url-queue)
        {:url url :count @(-> config :state :url-count)})
  ;; Bump the shared counter so the next enqueued URL gets the next index.
  (swap! (-> config :state :url-count) inc))
Expand All @@ -42,8 +42,8 @@
(swap! (-> config :state :seen-urls) assoc url 1)
(if-let [host-limiter (:host-limit config)]
(when (.contains (:host url-info) host-limiter)
(enqueue* config url))
(enqueue* config url))))))
(queues/enqueue* config url))
(queues/enqueue* config url))))))


(defn enqueue-urls
Expand All @@ -58,6 +58,7 @@
(defn extract-all
"Dumb URL extraction based on regular expressions. Extracts relative URLs."
[original-url body]
(trace :extracting original-url)
(when body
(let [candidates1 (->> (re-seq #"href=\"([^\"]+)\"" body)
(map second)
Expand All @@ -77,6 +78,15 @@
all (set (concat fq fq-ufq))]
all)))

(defn request-and-record-time
  "Fetches `a-url` with clj-http using the crawl config's :http-opts.
  Before issuing the request, records the current time as the host's
  last-request timestamp and clears the host's waiting flag so the
  queue worker may release that host's next URL.
  Returns the clj-http response map."
  [config a-url]
  (let [current-ts (System/currentTimeMillis)
        the-host   (:host (url a-url))]
    ;; A `let` body is an implicit do; the original wrapped these in a
    ;; redundant explicit `do`.
    (swap! queues/*host-last-request* assoc the-host current-ts)
    (swap! queues/*host-waiting-queue* assoc the-host false)
    (http/get a-url (:http-opts config))))

(defn- crawl-page
"Internal crawling function that fetches a page, enqueues url found on that
page and calls the handler with the page body."
Expand All @@ -85,14 +95,15 @@
(trace :retrieving-body-for url-map)
(let [url (:url url-map)
score (:count url-map)
body (:body (http/get url (:http-opts config)))
body (:body (request-and-record-time config url))
_ (trace :extracting-urls)
urls ((:url-extractor config) url body)]
(enqueue-urls config urls)
(try
((:handler config) (assoc url-map :body body))
(swap! (-> config :state :url-count) inc)
(catch Exception e
(error e "Exception executing handler"))))
(error e (format "Exception executing handler %s" (:url url-map))))))
(catch java.net.SocketTimeoutException e
(trace "connection timed out to" (:url url-map)))
(catch org.apache.http.conn.ConnectTimeoutException e
Expand Down Expand Up @@ -139,8 +150,8 @@
(trace :running? (get @(-> config :state :worker-canaries) tid))
(let [state (:state config)
limit-reached (and (pos? (:url-limit config))
(= @(:url-count state) (:url-limit config))
(zero? (.size (:url-queue state))))]
(>= @(:url-count state) (:url-limit config)))]
(trace :count @(:url-count state))
(when-not (get @(:worker-canaries state) tid)
(debug "my canary has died, terminating myself"))
(when limit-reached
Expand Down Expand Up @@ -233,7 +244,7 @@
config (merge {:workers 5
:url-limit 100
:url-extractor extract-all
:state {:url-queue (LinkedBlockingQueue.)
:state {:url-queue queues/*ready-queue*
:url-count (atom 0)
:running-workers (ref [])
:worker-canaries (ref {})
Expand All @@ -249,6 +260,7 @@
:insecure? true}
(dotimes [_ (:workers config)]
(start-worker config))
(queues/start-url-queue-worker)
(info "Starting crawl of" (:url config))
(enqueue-url config (:url config)))
config))
Expand Down
60 changes: 60 additions & 0 deletions src/itsy/queues.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
(ns itsy.queues
"Maintain queues that enforce delay policies."
(:require [cemerick.url :refer [url]]
[clojure.tools.logging :refer [trace]])
(:import (java.util.concurrent LinkedBlockingQueue)))

;; NOTE(review): earmuffs (*name*) conventionally mark dynamic vars in
;; Clojure; these are plain vars and will emit "not declared dynamic"
;; compiler warnings. Renaming would require updating callers in
;; core.clj — TODO confirm before changing.

;; Timestamp (ms since epoch) of the most recent request made to each
;; host, keyed by host name; written by request-and-record-time.
(def *host-last-request* (atom {}))
;; Map of host name -> LinkedBlockingQueue of that host's pending URLs.
(def *host-urls-queue* (atom {}))
;; Queue of URL maps cleared for fetching; crawler workers consume it.
(def *ready-queue* (LinkedBlockingQueue.))
;; Per-host flag: set true when a URL is released onto *ready-queue*,
;; set false again when the request is actually made, so at most one
;; released URL per host is outstanding at a time.
(def *host-waiting-queue* (atom {}))

(defn default-delay-policy
  "Returns true when at least `delay-ms` milliseconds (default 3000)
  have elapsed since the last request to `a-host`, or when the host has
  never been requested. `timestamp` is milliseconds since the epoch."
  ([a-host timestamp]
     ;; Backward-compatible 2-arity keeps the original 3-second policy.
     (default-delay-policy a-host timestamp 3000))
  ([a-host timestamp delay-ms]
     (if-let [last-hit-ts (@*host-last-request* a-host)]
       (< delay-ms (- timestamp last-hit-ts))
       ;; Never-seen hosts are always ready.
       true)))

(defn setup-new-host
  "Registers `a-host` in the per-host bookkeeping maps: gives it an
  empty URL queue and marks it as not waiting on a request."
  [a-host]
  (swap! *host-urls-queue* assoc a-host (LinkedBlockingQueue.))
  (swap! *host-waiting-queue* assoc a-host false))

(defn enqueue*
  "Enqueues `a-url` onto its host's queue as a map with :url and :count.
  Creates the host's queue via `setup-new-host` on first sight of the
  host, then retries."
  [config a-url]
  (let [processed-url (url a-url)
        the-host (:host processed-url)]
    (if-let [host-queue (@*host-urls-queue* the-host)]
      ;; BUG FIX: deref the counter so the entry carries the numeric
      ;; count — the original stored the atom itself, unlike the
      ;; corresponding enqueue* in core.clj which used `@`.
      (.put host-queue {:url a-url
                        :count @(-> config :state :url-count)})
      (do
        (setup-new-host the-host)
        (recur config a-url)))))

(defn ready?
  "True when `a-host` may be requested: the host is not already waiting
  on an in-flight request and its delay policy has elapsed."
  [a-host]
  (if (@*host-waiting-queue* a-host)
    false
    (default-delay-policy a-host (System/currentTimeMillis))))

(defn url-queue-worker-fn
  "Worker loop: on each pass, scans every host queue and moves at most
  one URL per ready host onto *ready-queue*, marking that host as
  waiting. Loops forever."
  []
  (doseq [[host host-queue] @*host-urls-queue*]
    (when (ready? host)
      (when-let [next-url (.poll host-queue)]
        (trace :next-url-ready (format "%d-%s" (System/currentTimeMillis) next-url))
        (.put *ready-queue* next-url)
        (swap! *host-waiting-queue* assoc host true))))
  ;; BUG FIX: the original recurred immediately, busy-spinning a full
  ;; CPU core when all hosts are idle; sleep briefly between passes.
  (Thread/sleep 50)
  (recur))

(defn start-url-queue-worker
  "Starts the url-queue worker loop on a named background daemon
  thread and returns the started Thread.
  Marked daemon because url-queue-worker-fn never terminates on its
  own — a non-daemon thread would keep the JVM alive forever."
  []
  (doto (Thread. ^Runnable url-queue-worker-fn "itsy-url-queue-worker")
    (.setDaemon true)
    (.start)))