diff --git a/sento.asd b/sento.asd index 14ef62f..a7f3097 100644 --- a/sento.asd +++ b/sento.asd @@ -94,6 +94,7 @@ (:file "actor-system-test") (:file "actor-tree-test") (:file "spawn-in-receive-test") + (:file "message-box-test") ))) :description "Test system for sento" :perform (test-op (op c) (symbol-call :fiveam :run! diff --git a/src/mbox/message-box.lisp b/src/mbox/message-box.lisp index aaa2a16..c1c6c2c 100644 --- a/src/mbox/message-box.lisp +++ b/src/mbox/message-box.lisp @@ -113,18 +113,37 @@ This is used to break the environment possibly captured as closure at 'submit' s (defclass message-box/bt (message-box-base) ((queue-thread :initform nil :documentation - "The thread that pops queue items.")) + "The thread that pops queue items.") + (thread-lock :initform (bt2:make-lock) + :documentation + "A lock which should be taken when queue-thread slot is set.")) (:documentation "Bordeaux-Threads based message-box with a single thread operating on a message queue. This is used when the actor is created using a `:pinned` dispatcher type. 
There is a limit on the maximum number of actors/agents that can be created with this kind of queue because each message-box (and with that each actor) requires exactly one thread.")) + +(declaim (ftype (function (message-box/bt &key (:thread-name (or null string))) + (values &optional)) + start-thread)) + +(defun start-thread (msgbox &key thread-name) + (with-slots (name queue-thread) + msgbox + (flet ((run-processing-loop () + (message-processing-loop msgbox))) + (setf queue-thread + (bt2:make-thread + #'run-processing-loop + :name (or thread-name + (mkstr "message-thread-" name)))))) + (values)) + + (defmethod initialize-instance :after ((self message-box/bt) &key) - (with-slots (name queue-thread) self - (setf queue-thread (bt2:make-thread - (lambda () (message-processing-loop self)) - :name (mkstr "message-thread-" name)))) + (start-thread self) + (when (next-method-p) (call-next-method))) @@ -179,6 +198,23 @@ This function sets the result as `handler-result' in `item'. The return of this (bt2:condition-notify withreply-cvar))) (handler-fun))))) + +(declaim (ftype (function (message-box/bt) + (values &optional)) + ensure-thread-is-running)) + +(defun ensure-thread-is-running (msgbox) + (with-slots (queue-thread thread-lock) + msgbox + (bt2:with-lock-held (thread-lock) + (unless (bt2:thread-alive-p queue-thread) + (log:trace "Restarting thread ~A" + (bt2:thread-name queue-thread)) + (start-thread msgbox + :thread-name (bt2:thread-name queue-thread))) + (values)))) + + (defmethod submit ((self message-box/bt) message withreply-p time-out handler-fun-args) "The `handler-fun-args` argument must contain a handler function as first list item. It will be apply'ed with the rest of the args when the message was 'popped' from queue." 
@@ -200,14 +236,26 @@ It will be apply'ed with the rest of the args when the message was 'popped' from :time-out time-out :handler-fun-args handler-fun-args :handler-result 'no-result))) - (log:trace "~a: withreply: waiting for arrival of result..." (name msgbox)) - (bt2:with-lock-held (withreply-lock) - (log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item) - (queue:pushq queue push-item) - - (if time-out - (wait-and-probe-for-msg-handler-result msgbox push-item) - (bt2:condition-wait withreply-cvar withreply-lock))) + (cond + (time-out + (bt2:with-lock-held (withreply-lock) + (log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item) + (queue:pushq queue push-item) + (ensure-thread-is-running msgbox)) + + ;; It is important to release the withreply-lock + ;; before we wait for the result. Otherwise handler-fun + ;; will not be able to do its job: + (log:trace "~a: withreply: waiting for arrival of result..." (name msgbox)) + (wait-and-probe-for-msg-handler-result msgbox push-item)) + (t + (bt2:with-lock-held (withreply-lock) + (log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item) + (queue:pushq queue push-item) + (ensure-thread-is-running msgbox) + + (log:trace "~a: withreply: waiting for arrival of result..." 
(name msgbox)) + (bt2:condition-wait withreply-cvar withreply-lock)))) (with-slots (handler-result) push-item (log:trace "~a: withreply: result should be available: ~a" (name msgbox) handler-result) @@ -301,6 +349,7 @@ Returns the handler-result if `withreply-p' is eq to `T', otherwise the return i processed-messages dispatcher) self (incf processed-messages) + (let ((push-item (make-message-item/dp :message message :handler-fun-args handler-fun-args diff --git a/tests/message-box-test.lisp b/tests/message-box-test.lisp new file mode 100644 index 0000000..ddd755c --- /dev/null +++ b/tests/message-box-test.lisp @@ -0,0 +1,121 @@ +(defpackage :sento.message-box-test + (:use :cl :fiveam :cl-mock :sento.actor :sento.future) + (:shadow #:! #:?) + (:import-from #:miscutils + #:assert-cond + #:await-cond + #:filter) + (:import-from #:timeutils + #:ask-timeout) + (:import-from #:sento.messageb + #:message-box/bt + #:submit + #:no-result + #:queue-thread + #:stop) + (:import-from #:ac + #:actor-of)) + +(in-package :sento.message-box-test) + +(def-suite message-box-tests + :description "message-box tests" + :in sento.tests:test-suite) + +(in-suite message-box-tests) + + +(defun wait-while-thread-will-die (msgbox &key (timeout 10)) + (let ((wait-until (+ (get-internal-real-time) (* timeout + internal-time-units-per-second)))) + (with-slots (queue-thread) + msgbox + (loop while (bt2:thread-alive-p queue-thread) + do (sleep 0.1) + (when (< wait-until + (get-internal-real-time)) + (error "Thread didn't die in ~A seconds." + timeout)))))) + + +(test bt-box-resurrects-thread-after-error + "Tests that if an error happens during message processing and kills the thread, the thread is restarted on the next submit." 
+ + (let ((box (make-instance 'message-box/bt + :name "foo"))) + (unwind-protect + (progn + (let ((first-reply + (submit box "The Message" + t + ;; Don't wait for result here, because we + ;; intentionally raise an error here and will never + ;; return a result: + nil + (list (lambda (msg) + (declare (ignore msg)) + (handler-bind ((serious-condition #'abort)) + (error "Die, thread, die!"))))))) + (is (equal first-reply + 'no-result))) + + (wait-while-thread-will-die box) + + (let ((result (handler-case + (submit box "The Message" t 1 + (list (lambda (msg) + (reverse msg)))) + (ask-timeout () + :timeout)))) + (is (string= "egasseM ehT" result)))) + + ;; Cleanup a thread: + (stop box t)))) + + +(test bt-box-resurrects-thread-after-abort-if-handler-catches-all-signals + "Tests that if processing is aborted inside a handler that catches all conditions, the thread is restarted on the next submit." + + (let ((box (make-instance 'message-box/bt + :name "foo"))) + (unwind-protect + (progn + (let ((first-reply + (submit box "The Message" + t + ;; Don't wait for result here, because we + ;; intentionally raise an error here and will never + ;; return a result: + nil + (list (lambda (msg) + (declare (ignore msg)) + (handler-case + ;; This way we are simulating that the user chose + ;; an ABORT restart in the IDE during a debug session: + (handler-bind ((serious-condition #'abort)) + (error "Die, thread, die!")) + ;; This part is the same as the error handling code in the + ;; SENTO.ACTOR-CELL:HANDLE-MESSAGE function: + ;; + ;; TODO: t was used to check if it is able to + ;; catch stack unwinding because of INVOKE-RESTART, + ;; but it can't. 
+ (t (c) + (log:error "error condition was raised: ~%~a~%" + c) + (cons :handler-error c)))))))) + (is (equal first-reply + 'no-result))) + + (wait-while-thread-will-die box) + + (let ((result (handler-case + (submit box "The Message" t 1 + (list (lambda (msg) + (reverse msg)))) + (ask-timeout () + :timeout)))) + (is (string= "egasseM ehT" result)))) + + ;; Cleanup a thread: + (stop box t))))