Skip to content

Benchmark was refactored to not overflow memory #102

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
322 changes: 233 additions & 89 deletions bench.lisp
Original file line number Diff line number Diff line change
@@ -1,92 +1,203 @@
;;(push #P"~/Development/MySources/sento/" asdf:*central-registry*)
(asdf:load-system "sento")

(log:config :warn)

(defparameter *starttime* 0)
(defparameter *endtime* 0)

(defparameter *withreply-p* nil)

(defparameter *system* nil)
(defparameter *actor* nil)
(defparameter *counter* 0)
(defparameter +threads+ 8)
(defparameter *per-thread* nil)

(defun max-loop () (* *per-thread* +threads+))

(defun runner-bt (&optional (withreply-p nil) (asyncask nil) (queue-size 0))
(declare (ignore queue-size))
;; dispatchers used for the async-ask
(setf *per-thread* 125000)
(setf *system* (asys:make-actor-system '(:dispatchers (:shared (:workers 8)))))
(setf *actor* (ac:actor-of *system*
:receive (lambda (msg)
(declare (ignore msg))
(incf *counter*))
:dispatcher :pinned))
(setf *withreply-p* withreply-p)
(setf *counter* 0)
(setf *starttime* (get-universal-time))
(format t "Times: ~a~%" (max-loop))
(time
(progn
(map nil #'bt2:join-thread
(mapcar (lambda (x)
(bt2:make-thread
(lambda ()
(dotimes (n *per-thread*)
(if withreply-p
(if asyncask
(act:ask *actor* :foo)
(act:ask-s *actor* :foo))
(act:tell *actor* :foo))))
:name x))
(mapcar (lambda (n) (format nil "thread-~a" n))
(loop :for n :from 1 :to +threads+ :collect n))))
(miscutils:assert-cond (lambda () (= *counter* (max-loop))) 20)))
(setf *endtime* (get-universal-time))
(format t "Counter: ~a~%" *counter*)
(format t "Elapsed: ~a~%" (- *endtime* *starttime*))
(print *system*)
(ac:shutdown *system*))

(defun runner-dp (&optional (withreply-p nil) (asyncask nil) (queue-size 0))
(declare (ignore queue-size))
(setf *per-thread* 125000)
(setf *system* (asys:make-actor-system '(:dispatchers (:shared (:workers 8)))))
(setf *actor* (ac:actor-of *system*
:receive (lambda (msg)
(declare (ignore msg))
(incf *counter*))
:dispatcher :shared))
;;(print *actor*)
(setf *withreply-p* withreply-p)
(setf *counter* 0)
(setf *starttime* (get-universal-time))
(format t "Times: ~a~%" (max-loop))
(time
(progn
(map nil #'bt2:join-thread
(mapcar (lambda (x)
(bt2:make-thread
(lambda ()
(dotimes (n *per-thread*)
(if withreply-p
(if asyncask
(act:ask *actor* :foo)
(act:ask-s *actor* :foo))
(act:tell *actor* :foo))))
:name x))
(mapcar (lambda (n) (format nil "thread-~a" n))
(loop for n from 1 to +threads+ collect n))))
(miscutils:assert-cond (lambda () (= *counter* (max-loop))) 120)))
(setf *endtime* (get-universal-time))
(format t "Counter: ~a~%" *counter*)
(format t "Elapsed: ~a~%" (- *endtime* *starttime*))
;;(print *system*)
(ac:shutdown *system*))
(uiop:define-package #:sento/bench
(:use #:cl)
(:import-from #:org.shirakumo.trivial-benchmark
#:*default-samplers*
#:define-sampler
#:with-timing)
(:import-from #:serapeum
#:eval-always
#:defvar-unbound)
(:import-from #:alexandria
#:with-gensyms)
(:import-from #:sento.queue
#:queue-full-error))
(in-package #:sento/bench)


(defun actor-queue-size (a)
(let* ((msgbox (sento.actor-cell:msgbox a))
(queue (slot-value msgbox
'sento.messageb::queue)))
(sento.queue:queued-count queue)))


(defun total-queues-size (system)
(+ (loop for actor in (sento.actor-system::%all-actors system :user)
summing (actor-queue-size actor))
(loop for actor in (sento.actor-system::%all-actors system :internal)
summing (actor-queue-size actor))))


(defvar-unbound *num-processed-messages*
"We will bind this varible during a benchmark.")


(eval-always
(define-sampler messages-per-second (test-duration processed-messages-count)
(:measure (form)
(with-gensyms (test-started-at)
`(let ((*num-processed-messages* 0)
(,test-started-at (get-internal-real-time)))

;; Benchmark code will be called here:
,form

(setf ,test-duration
(- (get-internal-real-time)
,test-started-at))
(setf ,processed-messages-count
*num-processed-messages*))))
(:commit (commit-fn)
`(,commit-fn messages-per-second
(/ (float ,processed-messages-count 0d0)
(/ ,test-duration
internal-time-units-per-second))))))


(eval-always
(defparameter *samplers*
(list* 'messages-per-second
*default-samplers*)))


;; Use only our message counter.
;; *default-samplers* includes different system
;; metrics similar to metrics TIME macro collects.
;; (eval-always
;; (defparameter *samplers*
;; (list 'messages-per-second)))


(defun run-benchmark (&key
(dispatcher :pinned)
(with-reply-p nil)
(async-ask-p nil)
(num-shared-workers 8)
;; When queue-size is given, then Actor will be created
;; with bound-queue. Otherwise, queue will be unbound.
;; To prevent unbound-queue grow, set wait-if-queue-large-than
;; argument to some value.
(queue-size nil queue-size-given-p)
;; When actor's goes abover this value,
;; generator threads will stop and wait while
;; actor will process some messages from the queue.
;; Can be turned off if set to NIL, but this could
;; lead to a high memory consumption and probably
;; program failure.

;; This setting applies some kind of backpressure,
;; when queue-size is 0 and no other way
;; to keep generators from filling all the memory
;; with messages.
(wait-if-queue-large-than 10000 wait-if-queue-large-than-given-p)
(duration 10)
(num-iterations 60)
(load-threads 8))

(log:config :warn)

(check-type dispatcher (member :shared :pinned))

;; Leave only one default
(when (and queue-size-given-p
(not wait-if-queue-large-than-given-p))
(setf wait-if-queue-large-than nil))

(when (and (not queue-size-given-p)
wait-if-queue-large-than-given-p)
(setf queue-size nil))

(when (and queue-size
(not (zerop queue-size))
wait-if-queue-large-than)
(error "Argument WAIT-IF-QUEUE-LARGE-THAN does not makes sense when QUEUE-SIZE is not zero."))

(when (and async-ask-p
(not with-reply-p))
(error "Argument ASYNC-ASK-P should be given together with WITH-REPLY-P argument."))

;; It is useful to save benchmark results along with all params
;; used to run the benchmark.
(format t "~2&Results for benchmark: ~S~%"
(list :dispatcher dispatcher
:with-reply-p with-reply-p
:async-ask-p async-ask-p
:num-shared-workers num-shared-workers
:queue-size queue-size
:wait-if-queue-large-than wait-if-queue-large-than))
(force-output)

(with-timing (num-iterations
:samplers *samplers*)
(let ((counter 0)
(stop-at (+ (get-internal-real-time)
(* duration internal-time-units-per-second))))
(flet ((receiver (msg)
(declare (ignore msg))
(incf counter)))
(let* ((system (asys:make-actor-system `(:dispatchers (:shared (:workers ,num-shared-workers)))))
(actor (ac:actor-of system
:receive #'receiver
:dispatcher dispatcher
:queue-size queue-size)))
(flet ((sender ()
(loop with check-every = 1000
for iteration upfrom 0
while (< (get-internal-real-time)
stop-at)
do (cond
((and wait-if-queue-large-than
;; Calling queue-size function
;; requires lock acquisition which hits performance
;; and makes message generation up to 10 times slower
;; depending on generator threads cound.
;; That is why each thread checks this count only
;; at some iterations:
(zerop
(mod iteration
check-every))
(< wait-if-queue-large-than
(actor-queue-size actor)))
(sleep (random 0.1)))
(t
(if with-reply-p
(if async-ask-p
(act:ask actor :foo)
(act:ask-s actor :foo))
(handler-case
(act:tell actor :foo)
(queue-full-error ()
;; For this test it is ok to just sleep a little
;; before the next attempt to send message
(sleep (random 0.1))))))))))

(unwind-protect
(progn
(let ((threads
(loop for thread-id from 1 upto load-threads
for thread-name = (format nil "thread-~a" thread-id)
collect (bt2:make-thread #'sender
:name thread-name))))

(unwind-protect (mapc #'bt2:join-thread threads)
;; If user will interrupt execution while we are waiting for threads,
;; we need to clean rest threads:
(loop for thread in threads
when (bt2:thread-alive-p thread)
do (bt2:destroy-thread thread))))

;; Wait while receiver will process all messages in the queue
(miscutils:assert-cond
(lambda ()
(zerop (total-queues-size system)))
60)

(trivial-garbage:gc :full t)

;; To make trivial-benchmark collector see our counter.
(setf *num-processed-messages*
counter))
(ac:shutdown system))))))))


;; (defun runner-lp ()
Expand Down Expand Up @@ -141,3 +252,36 @@
;; (format t "Counter: ~a~%" *counter*)
;; (lparallel:end-kernel)
;; (sento.messageb::stop *msgbox*)))


(defun run-all (&key
(num-iterations 10)
(duration 10))
(run-benchmark :num-iterations num-iterations
:duration duration)

(format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p nil))
(run-benchmark :num-iterations num-iterations
:duration duration
:with-reply-p t :async-ask-p nil)

(format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p t))
(run-benchmark :num-iterations num-iterations
:duration duration
:with-reply-p t :async-ask-p t)

(format t "Running ~A:~%" '(run-benchmark :queue-size 100))
(run-benchmark :num-iterations num-iterations
:duration duration
:queue-size 100)

(format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p nil :queue-size 100))
(run-benchmark :num-iterations num-iterations
:duration duration
:with-reply-p t :async-ask-p nil :queue-size 100)

(format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p t :queue-size 100))
(run-benchmark :num-iterations num-iterations
:duration duration
:with-reply-p t :async-ask-p t :queue-size 100))

13 changes: 13 additions & 0 deletions sento.asd
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,19 @@
"mgl-pax/full")
:components ((:file "documentation")))


;; --------------------------------
;; benchmark
;; --------------------------------

(defsystem "sento/bench"
:author "Manfred Bergmann"
:description "Benchmark for Sento"
:depends-on ("sento"
"trivial-benchmark"
"trivial-garbage")
:components ((:file "bench")))

;; load system
;; (asdf:load-system "sento")
;;
Expand Down
3 changes: 2 additions & 1 deletion src/actor-context.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ The `actor-system` and the `actor` itself are composed of an `actor-context`."))

(defun %message-box-for-dispatcher-id (context dispatcher-id queue-size)
(case dispatcher-id
(:pinned (make-instance 'mesgb:message-box/bt))
(:pinned (make-instance 'mesgb:message-box/bt
:max-queue-size queue-size))
(otherwise (let ((dispatcher (%get-shared-dispatcher (system context) dispatcher-id)))
(unless dispatcher
(error (format nil "No such dispatcher identifier '~a' exists!" dispatcher-id)))
Expand Down
12 changes: 12 additions & 0 deletions src/queue/queue-locked.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ than the 'queue' implementation of lparallel.
(not (or (queue-head queue)
(queue-tail queue))))

(defun size (queue)
(let ((head (queue-head queue))
(tail (queue-tail queue)))
(+ (length head)
(length tail))))


#|
queue implementation from lparallel.
Expand Down Expand Up @@ -104,3 +110,9 @@ Copyright (c) 2011-2012, James M. Lawrence. All rights reserved.
(defmethod emptyq-p ((self queue-unbounded))
(with-slots (queue) self
(emptyp queue)))


(defmethod queued-count ((self queue-unbounded))
(with-slots (queue lock) self
(bt2:with-lock-held (lock)
(size queue))))
Loading