diff --git a/src/itsy/core.clj b/src/itsy/core.clj index c4d66f6..d127080 100644 --- a/src/itsy/core.clj +++ b/src/itsy/core.clj @@ -5,6 +5,7 @@ [clojure.tools.logging :refer [debug error info trace warn]] [clojure.set :as set] [clj-http.client :as http] + [itsy.queues :as queues] [itsy.robots :as robots] [slingshot.slingshot :refer [get-thrown-object try+]]) (:import (java.net URL) @@ -23,7 +24,6 @@ (defn- enqueue* "Internal function to enqueue a url as a map with :url and :count." [config url] - (trace :enqueue-url url) (.put (-> config :state :url-queue) {:url url :count @(-> config :state :url-count)}) (swap! (-> config :state :url-count) inc)) @@ -42,8 +42,8 @@ (swap! (-> config :state :seen-urls) assoc url 1) (if-let [host-limiter (:host-limit config)] (when (.contains (:host url-info) host-limiter) - (enqueue* config url)) - (enqueue* config url)))))) + (queues/enqueue* config url)) + (queues/enqueue* config url)))))) (defn enqueue-urls @@ -58,6 +58,7 @@ (defn extract-all "Dumb URL extraction based on regular expressions. Extracts relative URLs." [original-url body] + (trace :extracting original-url) (when body (let [candidates1 (->> (re-seq #"href=\"([^\"]+)\"" body) (map second) @@ -77,6 +78,15 @@ all (set (concat fq fq-ufq))] all))) +(defn request-and-record-time + [config a-url] + (let [current-ts (System/currentTimeMillis) + the-host (-> a-url url :host)] + (do + (swap! queues/*host-last-request* merge {the-host current-ts}) + (swap! queues/*host-waiting-queue* merge {the-host false}) + (http/get a-url (:http-opts config))))) + (defn- crawl-page "Internal crawling function that fetches a page, enqueues url found on that page and calls the handler with the page body." 
@@ -85,14 +95,15 @@ (trace :retrieving-body-for url-map) (let [url (:url url-map) score (:count url-map) - body (:body (http/get url (:http-opts config))) + body (:body (request-and-record-time config url)) _ (trace :extracting-urls) urls ((:url-extractor config) url body)] (enqueue-urls config urls) (try ((:handler config) (assoc url-map :body body)) + (swap! (-> config :state :url-count) inc) (catch Exception e - (error e "Exception executing handler")))) + (error e (format "Exception executing handler %s" (:url url-map)))))) (catch java.net.SocketTimeoutException e (trace "connection timed out to" (:url url-map))) (catch org.apache.http.conn.ConnectTimeoutException e @@ -139,8 +150,8 @@ (trace :running? (get @(-> config :state :worker-canaries) tid)) (let [state (:state config) limit-reached (and (pos? (:url-limit config)) - (= @(:url-count state) (:url-limit config)) - (zero? (.size (:url-queue state))))] + (>= @(:url-count state) (:url-limit config)))] + (trace :count @(:url-count state)) (when-not (get @(:worker-canaries state) tid) (debug "my canary has died, terminating myself")) (when limit-reached @@ -233,7 +244,7 @@ config (merge {:workers 5 :url-limit 100 :url-extractor extract-all - :state {:url-queue (LinkedBlockingQueue.) + :state {:url-queue queues/*ready-queue* :url-count (atom 0) :running-workers (ref []) :worker-canaries (ref {}) @@ -249,6 +260,7 @@ :insecure? true} (dotimes [_ (:workers config)] (start-worker config)) + (queues/start-url-queue-worker) (info "Starting crawl of" (:url config)) (enqueue-url config (:url config))) config)) diff --git a/src/itsy/queues.clj b/src/itsy/queues.clj new file mode 100644 index 0000000..4ebb30d --- /dev/null +++ b/src/itsy/queues.clj @@ -0,0 +1,60 @@ +(ns itsy.queues + "Maintain queues that enforce delay policies." 
+ (:require [cemerick.url :refer [url]] + [clojure.tools.logging :refer [trace]]) + (:import (java.util.concurrent LinkedBlockingQueue))) + +(def *host-last-request* (atom {})) ;record the timestamp of the last + ;request made to a host +(def *host-urls-queue* (atom {})) ;maintains a queue per each host +(def *ready-queue* (LinkedBlockingQueue.)) +(def *host-waiting-queue* (atom {})) ; keep track of a host's url + ; waiting to be requested. Next + ; url enqueued only when no other + ; requests to a host are pending + +(defn default-delay-policy + "Waits 3 seconds before making the next request. +We expect a timestamp to be milliseconds since epoch" + [a-host timestamp] + (if-let [last-hit-ts (@*host-last-request* a-host)] + (< 3000 (- timestamp last-hit-ts)) + true)) + +(defn setup-new-host + [a-host] + (swap! *host-urls-queue* merge {a-host (LinkedBlockingQueue.)}) + (swap! *host-waiting-queue* merge {a-host false})) + +(defn enqueue* + [config a-url] + (let [processed-url (url a-url) + the-host (-> processed-url :host)] + (if-let [host-queue (@*host-urls-queue* the-host)] + (.put host-queue {:url a-url + :count @(-> config :state :url-count)}) + (do + (setup-new-host the-host) + (recur config a-url))))) + +(defn ready? + "Delay policy must be enforced and the host's +queue must not be waiting on a request" + [a-host] + (and (default-delay-policy a-host (System/currentTimeMillis)) + (not (@*host-waiting-queue* a-host)))) + +(defn url-queue-worker-fn + [] + (doseq [[host host-queue] @*host-urls-queue*] + (when (ready? host) + (when-let [next-url (.poll host-queue)] + (trace :next-url-ready (format "%d-%s" (System/currentTimeMillis) next-url)) + (.put *ready-queue* next-url) + (swap! *host-waiting-queue* merge {host true})))) + (Thread/sleep 100) (recur)) + +(defn start-url-queue-worker + [] + (let [url-thread (Thread. url-queue-worker-fn)] + (.start url-thread)))