diff --git a/bench.lisp b/bench.lisp index ef4c81e..ccc018f 100644 --- a/bench.lisp +++ b/bench.lisp @@ -92,7 +92,8 @@ (wait-if-queue-larger-than 10000 wait-if-queue-larger-than-given-p) (duration 10) (num-iterations 60) - (load-threads 8)) + (load-threads 8) + (time-out nil)) (log:config :warn) @@ -124,7 +125,8 @@ :async-ask-p async-ask-p :num-shared-workers num-shared-workers :queue-size queue-size - :wait-if-queue-larger-than wait-if-queue-larger-than)) + :wait-if-queue-larger-than wait-if-queue-larger-than + :time-out time-out)) (force-output) (with-timing (num-iterations @@ -255,33 +257,72 @@ (defun run-all (&key - (num-iterations 10) - (duration 10)) + (num-iterations 60) + (duration 10) + (queue-size 100) + (time-out 3) + &aux (started-at (get-internal-real-time))) (run-benchmark :num-iterations num-iterations - :duration duration) + :duration duration + :with-reply-p nil + :async-ask-p nil) - (format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p nil)) (run-benchmark :num-iterations num-iterations :duration duration - :with-reply-p t :async-ask-p nil) + :with-reply-p t + :async-ask-p nil) + + (run-benchmark :num-iterations num-iterations + :duration duration + :with-reply-p t + :async-ask-p t) + + + (format t "With queue size limited to ~A:~2%" + queue-size) - (format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p t)) (run-benchmark :num-iterations num-iterations :duration duration - :with-reply-p t :async-ask-p t) + :with-reply-p nil + :async-ask-p nil + :queue-size queue-size) - (format t "Running ~A:~%" '(run-benchmark :queue-size 100)) (run-benchmark :num-iterations num-iterations :duration duration - :queue-size 100) + :with-reply-p t + :async-ask-p nil + :queue-size queue-size) - (format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p nil :queue-size 100)) (run-benchmark :num-iterations num-iterations :duration duration - :with-reply-p t :async-ask-p nil :queue-size 100) + :with-reply-p t + :async-ask-p t + :queue-size queue-size) + + + (format t "With time-out ~A:~2%" + time-out) - (format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p t :queue-size 100)) (run-benchmark :num-iterations num-iterations :duration duration - :with-reply-p t :async-ask-p t :queue-size 100)) + :with-reply-p nil + :async-ask-p nil + ;; This should not make sense for with-reply-p = nil + :time-out time-out) + + (run-benchmark :num-iterations num-iterations + :duration duration + :with-reply-p t + :async-ask-p nil + :time-out time-out) + + (run-benchmark :num-iterations num-iterations + :duration duration + :with-reply-p t + :async-ask-p t + :time-out time-out) + + (format t "All tests are performed in ~,2f seconds.~%" + (/ (- (get-internal-real-time) started-at) + internal-time-units-per-second))) diff --git a/sento.asd b/sento.asd index 6ccfc25..6a58d83 100644 --- a/sento.asd +++ b/sento.asd @@ -65,9 +65,11 @@ :author "Manfred Bergmann" :depends-on ("sento" "fiveam" + "serapeum" "lparallel" "cl-mock") :components ((:module "tests" + :serial t :components ((:file "all-test") (:file "miscutils-test") @@ -95,7 +97,8 @@ (:file "actor-system-test") (:file "actor-tree-test") (:file "spawn-in-receive-test") - ))) + (:file "test-utils") + (:file "message-box-test")))) :description "Test system for sento" :perform (test-op (op c) (symbol-call :fiveam :run! (uiop:find-symbol* '#:test-suite diff --git a/src/mbox/message-box.lisp b/src/mbox/message-box.lisp index aaa2a16..81fa0ba 100644 --- a/src/mbox/message-box.lisp +++ b/src/mbox/message-box.lisp @@ -113,18 +113,41 @@ This is used to break the environment possibly captured as closure at 'submit' s (defclass message-box/bt (message-box-base) ((queue-thread :initform nil :documentation - "The thread that pops queue items.")) + "The thread that pops queue items.") + (thread-is-running-p :initform nil + :type boolean + :documentation + "Will be set to NIL if processing loop will be broken because of an error or a restart invocation.")) (:documentation "Bordeaux-Threads based message-box with a single thread operating on a message queue. This is used when the actor is created using a `:pinned` dispatcher type. There is a limit on the maximum number of actors/agents that can be created with this kind of queue because each message-box (and with that each actor) requires exactly one thread.")) + +(declaim (ftype (function (message-box/bt &key (:thread-name (or null string))) + (values &optional)) + start-thread)) + +(defun start-thread (msgbox &key thread-name) + (with-slots (name queue-thread thread-is-running-p) + msgbox + (flet ((run-processing-loop () + (setf thread-is-running-p t) + (unwind-protect + (message-processing-loop msgbox) + (setf thread-is-running-p + nil)))) + (setf queue-thread + (bt2:make-thread #'run-processing-loop + :name (or thread-name + (mkstr "message-thread-" name)))))) + (values)) + + (defmethod initialize-instance :after ((self message-box/bt) &key) - (with-slots (name queue-thread) self - (setf queue-thread (bt2:make-thread - (lambda () (message-processing-loop self)) - :name (mkstr "message-thread-" name)))) + (start-thread self) + (when (next-method-p) (call-next-method))) @@ -179,6 +202,24 @@ This function sets the result as `handler-result' in `item'. The return of this (bt2:condition-notify withreply-cvar))) (handler-fun))))) + +(declaim (ftype (function (message-box/bt) + (values &optional)) + ensure-thread-is-running)) + +(defun ensure-thread-is-running (msgbox) + (with-slots (queue-thread thread-is-running-p should-run) + msgbox + (when (and (not thread-is-running-p) + should-run) + ;; Just to be sure that thread is not alive: + (unless (bt2:thread-alive-p queue-thread) + (let ((thread-name (bt2:thread-name queue-thread))) + (log:warn "Restarting thread" thread-name) + (start-thread msgbox + :thread-name thread-name)))) + (values))) + (defmethod submit ((self message-box/bt) message withreply-p time-out handler-fun-args) "The `handler-fun-args` argument must contain a handler function as first list item. It will be apply'ed with the rest of the args when the message was 'popped' from queue." @@ -200,15 +241,21 @@ It will be apply'ed with the rest of the args when the message was 'popped' from :time-out time-out :handler-fun-args handler-fun-args :handler-result 'no-result))) - (log:trace "~a: withreply: waiting for arrival of result..." (name msgbox)) + (bt2:with-lock-held (withreply-lock) (log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item) (queue:pushq queue push-item) - - (if time-out - (wait-and-probe-for-msg-handler-result msgbox push-item) - (bt2:condition-wait withreply-cvar withreply-lock))) - + (ensure-thread-is-running msgbox) + + (log:trace "~a: withreply: waiting for arrival of result..." (name msgbox)) + + (unless (bt2:condition-wait withreply-cvar withreply-lock + :timeout time-out) + (log:warn "~a: time-out elapsed but result not available yet!" (name msgbox)) + (setf (slot-value push-item 'cancelled-p) t) + (error 'ask-timeout + :wait-time time-out))) + (with-slots (handler-result) push-item (log:trace "~a: withreply: result should be available: ~a" (name msgbox) handler-result) handler-result))) @@ -222,6 +269,7 @@ The submitting code has to await the side-effect and possibly handle a timeout." :handler-fun-args handler-fun-args))) (log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item) (queue:pushq queue push-item) + (ensure-thread-is-running msgbox) t)) (defmethod stop ((self message-box/bt) &optional (wait nil)) @@ -301,6 +349,7 @@ Returns the handler-result if `withreply-p' is eq to `T', otherwise the return i processed-messages dispatcher) self (incf processed-messages) + (let ((push-item (make-message-item/dp :message message :handler-fun-args handler-fun-args diff --git a/tests/message-box-test.lisp b/tests/message-box-test.lisp new file mode 100644 index 0000000..bbe1620 --- /dev/null +++ b/tests/message-box-test.lisp @@ -0,0 +1,111 @@ +(defpackage :sento.message-box-test + (:use :cl :fiveam :cl-mock :sento.actor :sento.future) + (:shadow #:! #:?) + (:import-from #:miscutils + #:assert-cond + #:await-cond + #:filter) + (:import-from #:timeutils + #:ask-timeout) + (:import-from #:sento.messageb + #:message-box/bt + #:submit + #:no-result + #:queue-thread + #:stop) + (:import-from #:sento.test-utils + #:parametrized-test) + (:import-from #:ac + #:actor-of)) + +(in-package :sento.message-box-test) + +(def-suite message-box-tests + :description "message-box tests" + :in sento.tests:test-suite) + +(in-suite message-box-tests) + + +(defun wait-while-thread-will-die (msgbox &key (timeout 10)) + (let ((wait-until (+ (get-internal-real-time) (* timeout + internal-time-units-per-second)))) + (with-slots (queue-thread) + msgbox + (loop :while (bt2:thread-alive-p queue-thread) + :do (sleep 0.1) + (when (< wait-until + (get-internal-real-time)) + (error "Thread didn't die in ~A seconds." + timeout)))))) + + +(parametrized-test bt-box-resurrects-thread-after-abort-if-handler-catches-all-signals + ((withreply-p timeout) + (nil nil) + (t 1) + (t nil)) + + "Simulates a situation when error has happened during message processing, and ABORT restart was invoked. + Usually this kill a thread, but here we ensure that by the thread is resurrected when we submit a + subsequent message." + + (flet ((kill-by-restart-invoke (msg) + (declare (ignore msg)) + (handler-case + ;; This way we are simulating that the user choose + ;; an ABORT restart in the IDE during debug session: + (handler-bind ((serious-condition #'abort)) + (error "Die, thread, die!")) + ;; This part the same as error handling code in the + ;; SENTO.ACTOR-CELL:HANDLE-MESSAGE function: + ;; + ;; TODO: t was used to check if it is able to + ;; catch stack unwinding because of INVOKE-RESTART, + ;; but it can't. + (t (c) + (log:error "error condition was raised: ~%~a~%" + c) + (cons :handler-error c))))) + + (let ((box (make-instance 'message-box/bt + :name "foo"))) + (unwind-protect + (progn + (let ((first-reply + (submit box "The Message" + t + ;; Don't wait for result here, because we are + ;; intentionally raise error here and will never + ;; return a result: + nil + (list #'kill-by-restart-invoke)))) + (is (equal first-reply + 'no-result))) + + (wait-while-thread-will-die box) + + (is (not + (bt2:thread-alive-p + (slot-value box 'queue-thread)))) + + (let ((result (handler-case + (submit box "The Message" + withreply-p + timeout + (list (lambda (msg) + (reverse msg)))) + (ask-timeout () + :timeout)))) + + (cond + (withreply-p + (is (string= "egasseM ehT" result))) + (t + (is (eql result t))))) + + (is (bt2:thread-alive-p + (slot-value box 'queue-thread)))) + + ;; Cleanup a thread: + (stop box t))))) diff --git a/tests/test-utils.lisp b/tests/test-utils.lisp new file mode 100644 index 0000000..7d552ff --- /dev/null +++ b/tests/test-utils.lisp @@ -0,0 +1,77 @@ +(defpackage #:sento.test-utils + (:use #:cl) + (:import-from #:serapeum + #:eval-always) + (:import-from #:alexandria + #:parse-body) + (:export #:parametrized-test)) +(in-package #:sento.test-utils) + + + +(eval-always + (defun generate-test-form (base-test-name parameter-names parameters docstring body-form) + (let* ((test-name-str (format nil + "~A-[~{~A=~S~^ ~}]" + base-test-name + (loop :for name :in parameter-names + :for value :in parameters + :appending (list name value)))) + (test-name (intern test-name-str)) + (bindings (loop :for name :in parameter-names + :for value :in parameters + :collect (list name value)))) + `(5am:test ,test-name + ,docstring + (let ,bindings + ,@body-form))))) + + +(defmacro parametrized-test (name ((&rest parameter-names) &rest parameter-tuples) &body body) + "Generates a separate tests for each parameter combination. + + - NAME is the prefix for all tests in the group. The rest of each test name consists of parameters and values. + - PARAMETER-NAMES should be a list of symbolic names of variables to be bound during BODY execution. + - PARAMETER-TUPLES should be a list of lists of values to be bound to variables given in PARAMETER-NAMES. + + Example: + + (parametrized-test bt-box-test + ((withreply-p timeout) + (nil nil) + (t 1) + (t nil)) + + (do-something with-reply-p timeout)) + + This form will be expanded to the code which will remove all 5AM tests starting with BT-BOX-TEST- + and then will create 3 tests like this one: + + + (test |BT-BOX-TEST-[WITHREPLY-P=T TIMEOUT=1]| + (let ((withreply-p t) (timeout 1)) + (do-something with-reply-p timeout))) + + As you can see, this test binds WITHREPLY-P and TIMEOUT variables to a values given in the second row of PARAMETER-TUPLES. + + Name of each test will include parameter variables for this test. This way it will be easy to tell which parameter combination + fails. +" + (multiple-value-bind (forms decls docstring) + (parse-body body :documentation t :whole name) + (let* ((docstring (or docstring "")) + (body-forms (append decls forms))) + + (let ((tests (loop :for parameters :in parameter-tuples + :collect (generate-test-form name parameter-names parameters docstring body-forms)))) + `(progn + ;; If somebody has changed parameters, we need to remove obsolte tests from the 5AM test registry. + (loop :with prefix-to-search := ,(format nil "~A-" name) + :for candidate-name in (5am:test-names) + :for candidate-name-str := (symbol-name candidate-name) + :when (and (serapeum:length<= prefix-to-search candidate-name-str) + (string= (subseq candidate-name-str 0 (length prefix-to-search)) + prefix-to-search)) + :do (5am:rem-test candidate-name)) + ,@tests))))) +