Skip to content

Commit

Permalink
rabbitmq: update to jepsen 0.1.11
Browse files Browse the repository at this point in the history
  • Loading branch information
vjuranek committed Feb 14, 2019
1 parent 3523e6f commit 167429e
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 44 deletions.
4 changes: 2 additions & 2 deletions rabbitmq/project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
:url "https://github.com/aphyr/jepsen"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.6.0"]
[jepsen "0.0.5"]
:dependencies [[org.clojure/clojure "1.9.0"]
[jepsen "0.1.11"]
[com.novemberain/langohr "2.7.1" ]])
58 changes: 25 additions & 33 deletions rabbitmq/src/jepsen/rabbitmq.clj
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
(c/cd "/tmp"
(let [version "3.5.6"
file (str "rabbitmq-server_" version "-1_all.deb")]
(when-not (cu/file? file)
(when-not (cu/exists? file)
(info "Fetching deb package")
(c/exec :wget (str "http://www.rabbitmq.com/releases/rabbitmq-server/v" version "/" file)))

Expand All @@ -40,8 +40,10 @@
(c/exec :dpkg :-i file)))

; Set cookie
(when-not (= "jepsen-rabbitmq"
(c/exec :cat "/var/lib/rabbitmq/.erlang.cookie"))
(when-not (and
(cu/exists? "/var/lib/rabbitmq/.erlang.cookie")
(= "jepsen-rabbitmq"
(c/exec :cat "/var/lib/rabbitmq/.erlang.cookie")))
(info "Setting cookie")
(c/exec :service :rabbitmq-server :stop)
(c/exec :echo "jepsen-rabbitmq"
Expand Down Expand Up @@ -106,11 +108,11 @@
; Rabbit+Langohr's auto-ack dynamics mean that even if we issue a dequeue req
; then crash, the message should be re-delivered and we can count this as a
; failure.
(timeout 5000 (assoc op :type :fail :value :timeout)
(timeout 5000 (assoc op :type :fail :error :timeout)
(let [[meta payload] (lb/get ch queue)
value (codec/decode payload)]
(if (nil? meta)
(assoc op :type :fail :value :exhausted)
(assoc op :type :fail :error :empty)
(assoc op :type :ok :value value)))))

(defmacro with-ch
Expand All @@ -125,27 +127,26 @@

(defrecord QueueClient [conn]
client/Client
(setup! [_ test node]
(open! [client test node]
(let [conn (rmq/connect {:host (name node)})]
(with-ch [ch conn]
; Initialize queue
(lq/declare ch queue
:durable true
:auto-delete false
:exclusive false))

; Return client
(assoc client :conn conn)
(QueueClient. conn)))

(teardown! [_ test]
; Purge
(setup! [client test]
(with-ch [ch conn]
(lq/declare ch queue
:durable true
:auto-delete false
:exclusive false)))

(teardown! [client test]
(meh (with-ch [ch conn]
(lq/purge ch queue)))
(lq/purge ch queue))))

; Close
(close! [client test]
(meh (rmq/close conn)))

(invoke! [this test op]
(invoke! [client test op]
(with-ch [ch conn]
(case (:f op)
:enqueue (do
Expand All @@ -165,20 +166,11 @@

:dequeue (dequeue! ch op)

:drain (do
; Note that this does more dequeues than strictly necessary
; owing to lazy sequence chunking.
(->> (repeat op) ; Explode drain into
(map #(assoc % :f :dequeue)) ; infinite dequeues, then
(map (partial dequeue! ch)) ; dequeue something
(take-while op/ok?) ; as long as stuff arrives,
(interleave (repeat op)) ; interleave with invokes
(drop 1) ; except the initial one
(map (fn [completion]
(log-op completion)
(core/conj-op! test completion)))
dorun)
(assoc op :type :ok :value :exhausted))))))
:drain (loop [values []]
(let [v (dequeue! ch op)]
(if (= (:type v) :ok)
(recur (conj values (:value v)))
(assoc op :type :ok, :value values))))))))

(defn queue-client [] (QueueClient. nil))

Expand Down
18 changes: 9 additions & 9 deletions rabbitmq/test/jepsen/rabbitmq_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
clojure.test
clojure.pprint)
(:require [clojure.string :as str]
[jepsen.util :as util]
[jepsen.os.debian :as debian]
[jepsen.checker :as checker]
[jepsen.checker.timeline :as timeline]
[jepsen.model :as model]
[jepsen.core :as jepsen]
[jepsen.generator :as gen]
[jepsen.nemesis :as nemesis]
[jepsen.os.debian :as debian]
[jepsen.store :as store]
[jepsen.report :as report]))
[jepsen.report :as report]
[jepsen.util :as util]
[knossos.model :as model]))

;(deftest mutex-test
; (let [test (run!
Expand Down Expand Up @@ -44,7 +45,7 @@
; (report/linearizability (:linear (:results test)))))

(deftest rabbit-test
(let [test (run!
(let [test (jepsen/run!
(assoc
noop-test
:name "rabbitmq-simple-partition"
Expand All @@ -54,8 +55,8 @@
:nemesis (nemesis/partition-random-halves)
:model (model/unordered-queue)
:checker (checker/compose
{:queue checker/queue
:total-queue checker/total-queue})
{:queue (checker/queue)
:total-queue (checker/total-queue)})
:generator (gen/phases
(->> (gen/queue)
(gen/delay 1/10)
Expand All @@ -72,6 +73,5 @@
(gen/sleep 60)
(gen/clients
(gen/each
(gen/once {:type :invoke
:f :drain}))))))]
(gen/once {:type :invoke, :f :drain}))))))]
(is (:valid? (:results test)))))

0 comments on commit 167429e

Please sign in to comment.