diff --git a/rabbitmq/project.clj b/rabbitmq/project.clj index 35cb2ed1b..96a4b3fc1 100644 --- a/rabbitmq/project.clj +++ b/rabbitmq/project.clj @@ -3,6 +3,6 @@ :url "https://github.com/aphyr/jepsen" :license {:name "Eclipse Public License" :url "http://www.eclipse.org/legal/epl-v10.html"} - :dependencies [[org.clojure/clojure "1.6.0"] - [jepsen "0.0.5"] + :dependencies [[org.clojure/clojure "1.9.0"] + [jepsen "0.1.11"] [com.novemberain/langohr "2.7.1" ]]) diff --git a/rabbitmq/src/jepsen/rabbitmq.clj b/rabbitmq/src/jepsen/rabbitmq.clj index 906663d48..88294dcb5 100644 --- a/rabbitmq/src/jepsen/rabbitmq.clj +++ b/rabbitmq/src/jepsen/rabbitmq.clj @@ -27,7 +27,7 @@ (c/cd "/tmp" (let [version "3.5.6" file (str "rabbitmq-server_" version "-1_all.deb")] - (when-not (cu/file? file) + (when-not (cu/exists? file) (info "Fetching deb package") (c/exec :wget (str "http://www.rabbitmq.com/releases/rabbitmq-server/v" version "/" file))) @@ -40,8 +40,10 @@ (c/exec :dpkg :-i file))) ; Set cookie - (when-not (= "jepsen-rabbitmq" - (c/exec :cat "/var/lib/rabbitmq/.erlang.cookie")) + (when-not (and + (cu/exists? "/var/lib/rabbitmq/.erlang.cookie") + (= "jepsen-rabbitmq" + (c/exec :cat "/var/lib/rabbitmq/.erlang.cookie"))) (info "Setting cookie") (c/exec :service :rabbitmq-server :stop) (c/exec :echo "jepsen-rabbitmq" @@ -106,11 +108,11 @@ ; Rabbit+Langohr's auto-ack dynamics mean that even if we issue a dequeue req ; then crash, the message should be re-delivered and we can count this as a ; failure. - (timeout 5000 (assoc op :type :fail :value :timeout) + (timeout 5000 (assoc op :type :fail :error :timeout) (let [[meta payload] (lb/get ch queue) value (codec/decode payload)] (if (nil? meta) - (assoc op :type :fail :value :exhausted) + (assoc op :type :fail :error :empty) (assoc op :type :ok :value value))))) (defmacro with-ch @@ -125,27 +127,26 @@ (defrecord QueueClient [conn] client/Client - (setup! [_ test node] + (open! [client test node] (let [conn (rmq/connect {:host (name node)})] - (with-ch [ch conn] - ; Initialize queue - (lq/declare ch queue - :durable true - :auto-delete false - :exclusive false)) - - ; Return client + (assoc client :conn conn) (QueueClient. conn))) - (teardown! [_ test] - ; Purge + (setup! [client test] + (with-ch [ch conn] + (lq/declare ch queue + :durable true + :auto-delete false + :exclusive false))) + + (teardown! [client test] (meh (with-ch [ch conn] - (lq/purge ch queue))) + (lq/purge ch queue)))) - ; Close + (close! [client test] (meh (rmq/close conn))) - (invoke! [this test op] + (invoke! [client test op] (with-ch [ch conn] (case (:f op) :enqueue (do @@ -165,20 +166,11 @@ :dequeue (dequeue! ch op) - :drain (do - ; Note that this does more dequeues than strictly necessary - ; owing to lazy sequence chunking. - (->> (repeat op) ; Explode drain into - (map #(assoc % :f :dequeue)) ; infinite dequeues, then - (map (partial dequeue! ch)) ; dequeue something - (take-while op/ok?) ; as long as stuff arrives, - (interleave (repeat op)) ; interleave with invokes - (drop 1) ; except the initial one - (map (fn [completion] - (log-op completion) - (core/conj-op! test completion))) - dorun) - (assoc op :type :ok :value :exhausted)))))) + :drain (loop [values []] + (let [v (dequeue! ch op)] + (if (= (:type v) :ok) + (recur (conj values (:value v))) + (assoc op :type :ok, :value values)))))))) (defn queue-client [] (QueueClient. nil)) diff --git a/rabbitmq/test/jepsen/rabbitmq_test.clj b/rabbitmq/test/jepsen/rabbitmq_test.clj index dc0955839..7d3c3b3ee 100644 --- a/rabbitmq/test/jepsen/rabbitmq_test.clj +++ b/rabbitmq/test/jepsen/rabbitmq_test.clj @@ -5,15 +5,16 @@ clojure.test clojure.pprint) (:require [clojure.string :as str] - [jepsen.util :as util] - [jepsen.os.debian :as debian] [jepsen.checker :as checker] [jepsen.checker.timeline :as timeline] - [jepsen.model :as model] + [jepsen.core :as jepsen] [jepsen.generator :as gen] [jepsen.nemesis :as nemesis] + [jepsen.os.debian :as debian] [jepsen.store :as store] - [jepsen.report :as report])) + [jepsen.report :as report] + [jepsen.util :as util] + [knossos.model :as model])) ;(deftest mutex-test ; (let [test (run! @@ -44,7 +45,7 @@ ; (report/linearizability (:linear (:results test))))) (deftest rabbit-test - (let [test (run! + (let [test (jepsen/run! (assoc noop-test :name "rabbitmq-simple-partition" @@ -54,8 +55,8 @@ :nemesis (nemesis/partition-random-halves) :model (model/unordered-queue) :checker (checker/compose - {:queue checker/queue - :total-queue checker/total-queue}) + {:queue (checker/queue) + :total-queue (checker/total-queue)}) :generator (gen/phases (->> (gen/queue) (gen/delay 1/10) @@ -72,6 +73,5 @@ (gen/sleep 60) (gen/clients (gen/each - (gen/once {:type :invoke - :f :drain}))))))] + (gen/once {:type :invoke, :f :drain}))))))] (is (:valid? (:results test)))))