|
1 |
| -;;(push #P"~/Development/MySources/sento/" asdf:*central-registry*) |
2 |
| -(asdf:load-system "sento") |
3 |
| - |
4 |
| -(log:config :warn) |
5 |
| - |
6 |
| -(defparameter *starttime* 0) |
7 |
| -(defparameter *endtime* 0) |
8 |
| - |
9 |
| -(defparameter *withreply-p* nil) |
10 |
| - |
11 |
| -(defparameter *system* nil) |
12 |
| -(defparameter *actor* nil) |
13 |
| -(defparameter *counter* 0) |
14 |
| -(defparameter +threads+ 8) |
15 |
| -(defparameter *per-thread* nil) |
16 |
| - |
17 |
| -(defun max-loop () (* *per-thread* +threads+)) |
18 |
| - |
19 |
| -(defun runner-bt (&optional (withreply-p nil) (asyncask nil) (queue-size 0)) |
20 |
| - (declare (ignore queue-size)) |
21 |
| - ;; dispatchers used for the async-ask |
22 |
| - (setf *per-thread* 125000) |
23 |
| - (setf *system* (asys:make-actor-system '(:dispatchers (:shared (:workers 8))))) |
24 |
| - (setf *actor* (ac:actor-of *system* |
25 |
| - :receive (lambda (msg) |
26 |
| - (declare (ignore msg)) |
27 |
| - (incf *counter*)) |
28 |
| - :dispatcher :pinned)) |
29 |
| - (setf *withreply-p* withreply-p) |
30 |
| - (setf *counter* 0) |
31 |
| - (setf *starttime* (get-universal-time)) |
32 |
| - (format t "Times: ~a~%" (max-loop)) |
33 |
| - (time |
34 |
| - (progn |
35 |
| - (map nil #'bt2:join-thread |
36 |
| - (mapcar (lambda (x) |
37 |
| - (bt2:make-thread |
38 |
| - (lambda () |
39 |
| - (dotimes (n *per-thread*) |
40 |
| - (if withreply-p |
41 |
| - (if asyncask |
42 |
| - (act:ask *actor* :foo) |
43 |
| - (act:ask-s *actor* :foo)) |
44 |
| - (act:tell *actor* :foo)))) |
45 |
| - :name x)) |
46 |
| - (mapcar (lambda (n) (format nil "thread-~a" n)) |
47 |
| - (loop :for n :from 1 :to +threads+ :collect n)))) |
48 |
| - (miscutils:assert-cond (lambda () (= *counter* (max-loop))) 20))) |
49 |
| - (setf *endtime* (get-universal-time)) |
50 |
| - (format t "Counter: ~a~%" *counter*) |
51 |
| - (format t "Elapsed: ~a~%" (- *endtime* *starttime*)) |
52 |
| - (print *system*) |
53 |
| - (ac:shutdown *system*)) |
54 |
| - |
55 |
| -(defun runner-dp (&optional (withreply-p nil) (asyncask nil) (queue-size 0)) |
56 |
| - (declare (ignore queue-size)) |
57 |
| - (setf *per-thread* 125000) |
58 |
| - (setf *system* (asys:make-actor-system '(:dispatchers (:shared (:workers 8))))) |
59 |
| - (setf *actor* (ac:actor-of *system* |
60 |
| - :receive (lambda (msg) |
61 |
| - (declare (ignore msg)) |
62 |
| - (incf *counter*)) |
63 |
| - :dispatcher :shared)) |
64 |
| - ;;(print *actor*) |
65 |
| - (setf *withreply-p* withreply-p) |
66 |
| - (setf *counter* 0) |
67 |
| - (setf *starttime* (get-universal-time)) |
68 |
| - (format t "Times: ~a~%" (max-loop)) |
69 |
| - (time |
70 |
| - (progn |
71 |
| - (map nil #'bt2:join-thread |
72 |
| - (mapcar (lambda (x) |
73 |
| - (bt2:make-thread |
74 |
| - (lambda () |
75 |
| - (dotimes (n *per-thread*) |
76 |
| - (if withreply-p |
77 |
| - (if asyncask |
78 |
| - (act:ask *actor* :foo) |
79 |
| - (act:ask-s *actor* :foo)) |
80 |
| - (act:tell *actor* :foo)))) |
81 |
| - :name x)) |
82 |
| - (mapcar (lambda (n) (format nil "thread-~a" n)) |
83 |
| - (loop for n from 1 to +threads+ collect n)))) |
84 |
| - (miscutils:assert-cond (lambda () (= *counter* (max-loop))) 120))) |
85 |
| - (setf *endtime* (get-universal-time)) |
86 |
| - (format t "Counter: ~a~%" *counter*) |
87 |
| - (format t "Elapsed: ~a~%" (- *endtime* *starttime*)) |
88 |
| - ;;(print *system*) |
89 |
| - (ac:shutdown *system*)) |
| 1 | +(uiop:define-package #:sento/bench |
| 2 | + (:use #:cl) |
| 3 | + (:import-from #:org.shirakumo.trivial-benchmark |
| 4 | + #:*default-samplers* |
| 5 | + #:define-sampler |
| 6 | + #:with-timing) |
| 7 | + (:import-from #:serapeum |
| 8 | + #:eval-always |
| 9 | + #:defvar-unbound) |
| 10 | + (:import-from #:alexandria |
| 11 | + #:with-gensyms) |
| 12 | + (:import-from #:sento.queue |
| 13 | + #:queue-full-error)) |
| 14 | +(in-package #:sento/bench) |
| 15 | + |
| 16 | + |
| 17 | +(defun actor-queue-size (a) |
| 18 | + (let* ((msgbox (sento.actor-cell:msgbox a)) |
| 19 | + (queue (slot-value msgbox |
| 20 | + 'sento.messageb::queue))) |
| 21 | + (sento.queue:queued-count queue))) |
| 22 | + |
| 23 | + |
| 24 | +(defun total-queues-size (system) |
| 25 | + (+ (loop for actor in (sento.actor-system::%all-actors system :user) |
| 26 | + summing (actor-queue-size actor)) |
| 27 | + (loop for actor in (sento.actor-system::%all-actors system :internal) |
| 28 | + summing (actor-queue-size actor)))) |
| 29 | + |
| 30 | + |
| 31 | +(defvar-unbound *num-processed-messages* |
| 32 | + "We will bind this varible during a benchmark.") |
| 33 | + |
| 34 | + |
| 35 | +(eval-always |
| 36 | + (define-sampler messages-per-second (test-duration processed-messages-count) |
| 37 | + (:measure (form) |
| 38 | + (with-gensyms (test-started-at) |
| 39 | + `(let ((*num-processed-messages* 0) |
| 40 | + (,test-started-at (get-internal-real-time))) |
| 41 | + |
| 42 | + ;; Benchmark code will be called here: |
| 43 | + ,form |
| 44 | + |
| 45 | + (setf ,test-duration |
| 46 | + (- (get-internal-real-time) |
| 47 | + ,test-started-at)) |
| 48 | + (setf ,processed-messages-count |
| 49 | + *num-processed-messages*)))) |
| 50 | + (:commit (commit-fn) |
| 51 | + `(,commit-fn messages-per-second |
| 52 | + (/ (float ,processed-messages-count 0d0) |
| 53 | + (/ ,test-duration |
| 54 | + internal-time-units-per-second)))))) |
| 55 | + |
| 56 | + |
| 57 | +(eval-always |
| 58 | + (defparameter *samplers* |
| 59 | + (list* 'messages-per-second |
| 60 | + *default-samplers*))) |
| 61 | + |
| 62 | + |
| 63 | +;; Use only our message counter. |
| 64 | +;; *default-samplers* includes different system |
| 65 | +;; metrics similar to metrics TIME macro collects. |
| 66 | +;; (eval-always |
| 67 | +;; (defparameter *samplers* |
| 68 | +;; (list 'messages-per-second))) |
| 69 | + |
| 70 | + |
| 71 | +(defun run-benchmark (&key |
| 72 | + (dispatcher :pinned) |
| 73 | + (with-reply-p nil) |
| 74 | + (async-ask-p nil) |
| 75 | + (num-shared-workers 8) |
| 76 | + ;; When queue-size is given, then Actor will be created |
| 77 | + ;; with bound-queue. Otherwise, queue will be unbound. |
| 78 | + ;; To prevent unbound-queue grow, set wait-if-queue-large-than |
| 79 | + ;; argument to some value. |
| 80 | + (queue-size nil queue-size-given-p) |
| 81 | + ;; When actor's goes abover this value, |
| 82 | + ;; generator threads will stop and wait while |
| 83 | + ;; actor will process some messages from the queue. |
| 84 | + ;; Can be turned off if set to NIL, but this could |
| 85 | + ;; lead to a high memory consumption and probably |
| 86 | + ;; program failure. |
| 87 | + |
| 88 | + ;; This setting applies some kind of backpressure, |
| 89 | + ;; when queue-size is 0 and no other way |
| 90 | + ;; to keep generators from filling all the memory |
| 91 | + ;; with messages. |
| 92 | + (wait-if-queue-large-than 10000 wait-if-queue-large-than-given-p) |
| 93 | + (duration 10) |
| 94 | + (num-iterations 60) |
| 95 | + (load-threads 8)) |
| 96 | + |
| 97 | + (log:config :warn) |
| 98 | + |
| 99 | + (check-type dispatcher (member :shared :pinned)) |
| 100 | + |
| 101 | + ;; Leave only one default |
| 102 | + (when (and queue-size-given-p |
| 103 | + (not wait-if-queue-large-than-given-p)) |
| 104 | + (setf wait-if-queue-large-than nil)) |
| 105 | + |
| 106 | + (when (and (not queue-size-given-p) |
| 107 | + wait-if-queue-large-than-given-p) |
| 108 | + (setf queue-size nil)) |
| 109 | + |
| 110 | + (when (and queue-size |
| 111 | + (not (zerop queue-size)) |
| 112 | + wait-if-queue-large-than) |
| 113 | + (error "Argument WAIT-IF-QUEUE-LARGE-THAN does not makes sense when QUEUE-SIZE is not zero.")) |
| 114 | + |
| 115 | + (when (and async-ask-p |
| 116 | + (not with-reply-p)) |
| 117 | + (error "Argument ASYNC-ASK-P should be given together with WITH-REPLY-P argument.")) |
| 118 | + |
| 119 | + ;; It is useful to save benchmark results along with all params |
| 120 | + ;; used to run the benchmark. |
| 121 | + (format t "~2&Results for benchmark: ~S~%" |
| 122 | + (list :dispatcher dispatcher |
| 123 | + :with-reply-p with-reply-p |
| 124 | + :async-ask-p async-ask-p |
| 125 | + :num-shared-workers num-shared-workers |
| 126 | + :queue-size queue-size |
| 127 | + :wait-if-queue-large-than wait-if-queue-large-than)) |
| 128 | + (force-output) |
| 129 | + |
| 130 | + (with-timing (num-iterations |
| 131 | + :samplers *samplers*) |
| 132 | + (let ((counter 0) |
| 133 | + (stop-at (+ (get-internal-real-time) |
| 134 | + (* duration internal-time-units-per-second)))) |
| 135 | + (flet ((receiver (msg) |
| 136 | + (declare (ignore msg)) |
| 137 | + (incf counter))) |
| 138 | + (let* ((system (asys:make-actor-system `(:dispatchers (:shared (:workers ,num-shared-workers))))) |
| 139 | + (actor (ac:actor-of system |
| 140 | + :receive #'receiver |
| 141 | + :dispatcher dispatcher |
| 142 | + :queue-size queue-size))) |
| 143 | + (flet ((sender () |
| 144 | + (loop with check-every = 1000 |
| 145 | + for iteration upfrom 0 |
| 146 | + while (< (get-internal-real-time) |
| 147 | + stop-at) |
| 148 | + do (cond |
| 149 | + ((and wait-if-queue-large-than |
| 150 | + ;; Calling queue-size function |
| 151 | + ;; requires lock acquisition which hits performance |
| 152 | + ;; and makes message generation up to 10 times slower |
| 153 | + ;; depending on generator threads cound. |
| 154 | + ;; That is why each thread checks this count only |
| 155 | + ;; at some iterations: |
| 156 | + (zerop |
| 157 | + (mod iteration |
| 158 | + check-every)) |
| 159 | + (< wait-if-queue-large-than |
| 160 | + (actor-queue-size actor))) |
| 161 | + (sleep (random 0.1))) |
| 162 | + (t |
| 163 | + (if with-reply-p |
| 164 | + (if async-ask-p |
| 165 | + (act:ask actor :foo) |
| 166 | + (act:ask-s actor :foo)) |
| 167 | + (handler-case |
| 168 | + (act:tell actor :foo) |
| 169 | + (queue-full-error () |
| 170 | + ;; For this test it is ok to just sleep a little |
| 171 | + ;; before the next attempt to send message |
| 172 | + (sleep (random 0.1)))))))))) |
| 173 | + |
| 174 | + (unwind-protect |
| 175 | + (progn |
| 176 | + (let ((threads |
| 177 | + (loop for thread-id from 1 upto load-threads |
| 178 | + for thread-name = (format nil "thread-~a" thread-id) |
| 179 | + collect (bt2:make-thread #'sender |
| 180 | + :name thread-name)))) |
| 181 | + |
| 182 | + (unwind-protect (mapc #'bt2:join-thread threads) |
| 183 | + ;; If user will interrupt execution while we are waiting for threads, |
| 184 | + ;; we need to clean rest threads: |
| 185 | + (loop for thread in threads |
| 186 | + when (bt2:thread-alive-p thread) |
| 187 | + do (bt2:destroy-thread thread)))) |
| 188 | + |
| 189 | + ;; Wait while receiver will process all messages in the queue |
| 190 | + (miscutils:assert-cond |
| 191 | + (lambda () |
| 192 | + (zerop (total-queues-size system))) |
| 193 | + 60) |
| 194 | + |
| 195 | + (trivial-garbage:gc :full t) |
| 196 | + |
| 197 | + ;; To make trivial-benchmark collector see our counter. |
| 198 | + (setf *num-processed-messages* |
| 199 | + counter)) |
| 200 | + (ac:shutdown system)))))))) |
90 | 201 |
|
91 | 202 |
|
92 | 203 | ;; (defun runner-lp ()
|
|
141 | 252 | ;; (format t "Counter: ~a~%" *counter*)
|
142 | 253 | ;; (lparallel:end-kernel)
|
143 | 254 | ;; (sento.messageb::stop *msgbox*)))
|
| 255 | + |
| 256 | + |
| 257 | +(defun run-all (&key |
| 258 | + (num-iterations 10) |
| 259 | + (duration 10)) |
| 260 | + (run-benchmark :num-iterations num-iterations |
| 261 | + :duration duration) |
| 262 | + |
| 263 | + (format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p nil)) |
| 264 | + (run-benchmark :num-iterations num-iterations |
| 265 | + :duration duration |
| 266 | + :with-reply-p t :async-ask-p nil) |
| 267 | + |
| 268 | + (format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p t)) |
| 269 | + (run-benchmark :num-iterations num-iterations |
| 270 | + :duration duration |
| 271 | + :with-reply-p t :async-ask-p t) |
| 272 | + |
| 273 | + (format t "Running ~A:~%" '(run-benchmark :queue-size 100)) |
| 274 | + (run-benchmark :num-iterations num-iterations |
| 275 | + :duration duration |
| 276 | + :queue-size 100) |
| 277 | + |
| 278 | + (format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p nil :queue-size 100)) |
| 279 | + (run-benchmark :num-iterations num-iterations |
| 280 | + :duration duration |
| 281 | + :with-reply-p t :async-ask-p nil :queue-size 100) |
| 282 | + |
| 283 | + (format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p t :queue-size 100)) |
| 284 | + (run-benchmark :num-iterations num-iterations |
| 285 | + :duration duration |
| 286 | + :with-reply-p t :async-ask-p t :queue-size 100)) |
| 287 | + |
0 commit comments