From 4eecb9bc3e4d0952ca4f54585d08a077cbd1f3c3 Mon Sep 17 00:00:00 2001 From: Alexander Artemenko Date: Wed, 15 Jan 2025 01:08:31 +0000 Subject: [PATCH 1/4] Now message-box/bt class will restart thresad if it was aborted because of an error. --- sento.asd | 1 + src/mbox/message-box.lisp | 75 ++++++++++++++++++++++++++++++------- tests/message-box-test.lisp | 72 +++++++++++++++++++++++++++++++++++ 3 files changed, 135 insertions(+), 13 deletions(-) create mode 100644 tests/message-box-test.lisp diff --git a/sento.asd b/sento.asd index 14ef62f..a7f3097 100644 --- a/sento.asd +++ b/sento.asd @@ -94,6 +94,7 @@ (:file "actor-system-test") (:file "actor-tree-test") (:file "spawn-in-receive-test") + (:file "message-box-test") ))) :description "Test system for sento" :perform (test-op (op c) (symbol-call :fiveam :run! diff --git a/src/mbox/message-box.lisp b/src/mbox/message-box.lisp index aaa2a16..3b7e072 100644 --- a/src/mbox/message-box.lisp +++ b/src/mbox/message-box.lisp @@ -113,18 +113,37 @@ This is used to break the environment possibly captured as closure at 'submit' s (defclass message-box/bt (message-box-base) ((queue-thread :initform nil :documentation - "The thread that pops queue items.")) + "The thread that pops queue items.") + (thread-lock :initform (bt2:make-lock) + :documentation + "A lock which should be taken when queue-thread slot is set.")) (:documentation "Bordeaux-Threads based message-box with a single thread operating on a message queue. This is used when the actor is created using a `:pinned` dispatcher type. There is a limit on the maximum number of actors/agents that can be created with this kind of queue because each message-box (and with that each actor) requires exactly one thread.")) + +(declaim (ftype (function (message-box/bt &key (:thread-name (or null string))) + (values &optional)) + start-thread)) + +(defun start-thread (msgbox &key thread-name) + (with-slots (name queue-thread) + msgbox + (flet ((run-processing-loop () + (message-processing-loop msgbox))) + (setf queue-thread + (bt2:make-thread + #'run-processing-loop + :name (or thread-name + (mkstr "message-thread-" name)))))) + (values)) + + (defmethod initialize-instance :after ((self message-box/bt) &key) - (with-slots (name queue-thread) self - (setf queue-thread (bt2:make-thread - (lambda () (message-processing-loop self)) - :name (mkstr "message-thread-" name)))) + (start-thread self) + (when (next-method-p) (call-next-method))) @@ -179,6 +198,23 @@ This function sets the result as `handler-result' in `item'. The return of this (bt2:condition-notify withreply-cvar))) (handler-fun))))) + +(declaim (ftype (function (message-box/bt) + (values &optional)) + ensure-thread-is-running)) + +(defun ensure-thread-is-running (msgbox) + (with-slots (queue-thread thread-lock) + msgbox + (bt2:with-lock-held (thread-lock) + (unless (bt2:thread-alive-p queue-thread) + (log:trace "Restarting thread ~A" + (bt2:thread-name queue-thread)) + (start-thread msgbox + :thread-name (bt2:thread-name queue-thread))) + (values)))) + + (defmethod submit ((self message-box/bt) message withreply-p time-out handler-fun-args) "The `handler-fun-args` argument must contain a handler function as first list item. It will be apply'ed with the rest of the args when the message was 'popped' from queue." @@ -200,14 +236,26 @@ It will be apply'ed with the rest of the args when the message was 'popped' from :time-out time-out :handler-fun-args handler-fun-args :handler-result 'no-result))) - (log:trace "~a: withreply: waiting for arrival of result..." (name msgbox)) - (bt2:with-lock-held (withreply-lock) - (log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item) - (queue:pushq queue push-item) - - (if time-out - (wait-and-probe-for-msg-handler-result msgbox push-item) - (bt2:condition-wait withreply-cvar withreply-lock))) + (cond + (time-out + (bt2:with-lock-held (withreply-lock) + (log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item) + (queue:pushq queue push-item) + (ensure-thread-is-running msgbox)) + + ;; It is important to leave lock withreply-lock + ;; before we will wait for result. Otherwisee handler-fun + ;; will not be able to do it's job: + (log:trace "~a: withreply: waiting for arrival of result..." (name msgbox)) + (wait-and-probe-for-msg-handler-result msgbox push-item)) + (t + (bt2:with-lock-held (withreply-lock) + (log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item) + (queue:pushq queue push-item) + (ensure-thread-is-running msgbox) + + (log:trace "~a: withreply: waiting for arrival of result..." (name msgbox)) + (bt2:condition-wait withreply-cvar withreply-lock)))) (with-slots (handler-result) push-item (log:trace "~a: withreply: result should be available: ~a" (name msgbox) handler-result) @@ -310,6 +358,7 @@ Returns the handler-result if `withreply-p' is eq to `T', otherwise the return i (log:debug "~a: enqueuing... withreply-p: ~a, time-out: ~a, message: ~a" (name self) withreply-p time-out message) (pushq queue push-item) + (ensure-thread-is-running self) (if withreply-p (dispatch/reply self push-item dispatcher dispatcher-fun-args time-out) diff --git a/tests/message-box-test.lisp b/tests/message-box-test.lisp new file mode 100644 index 0000000..b821b56 --- /dev/null +++ b/tests/message-box-test.lisp @@ -0,0 +1,72 @@ +(defpackage :sento.message-box-test + (:use :cl :fiveam :cl-mock :sento.actor :sento.future) + (:shadow #:! #:?) + (:import-from #:miscutils + #:assert-cond + #:await-cond + #:filter) + (:import-from #:timeutils + #:ask-timeout) + (:import-from #:sento.messageb + #:message-box/bt + #:submit + #:no-result + #:queue-thread + #:stop) + (:import-from #:ac + #:actor-of)) + +(in-package :sento.message-box-test) + +(def-suite message-box-tests + :description "message-box tests" + :in sento.tests:test-suite) + +(in-suite message-box-tests) + + +(defun wait-while-thread-will-die (msgbox &key (timeout 10)) + (let ((wait-until (+ (get-internal-real-time) (* timeout + internal-time-units-per-second)))) + (with-slots (queue-thread) + msgbox + (loop while (bt2:thread-alive-p queue-thread) + do (sleep 0.1) + (when (< wait-until + (get-internal-real-time)) + (error "Thread didn't die in ~A seconds." + timeout)))))) + + +(test bt-box-resurrects-thread-after-error + "Tests that if an error happends during message processing, a thread will remain running." + + (let ((box (make-instance 'message-box/bt + :name "foo"))) + (unwind-protect + (progn + (let ((first-reply + (submit box "The Message" + t + ;; Don't wait for result here, because we are + ;; intentionally raise error here and will never + ;; return a result: + nil + (list (lambda (msg) + (handler-bind ((serious-condition #'abort)) + (error "Die, thread, die!"))))))) + (is (equal first-reply + 'no-result))) + + (wait-while-thread-will-die box) + + (let ((result (handler-case + (submit box "The Message" t 1 + (list (lambda (msg) + (reverse msg)))) + (ask-timeout () + :timeout)))) + (is (string= "egasseM ehT" result)))) + + ;; Cleanup a thread: + (stop box t)))) From a40221ecd4bf90dc2c30c4835e103c0fd7eef0bb Mon Sep 17 00:00:00 2001 From: Alexander Artemenko Date: Wed, 15 Jan 2025 08:21:19 +0000 Subject: [PATCH 2/4] Added new test to check if error-handler could help. --- src/mbox/message-box.lisp | 7 +++--- tests/message-box-test.lisp | 49 +++++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 3 deletions(-) diff --git a/src/mbox/message-box.lisp b/src/mbox/message-box.lisp index 3b7e072..07a62cc 100644 --- a/src/mbox/message-box.lisp +++ b/src/mbox/message-box.lisp @@ -241,7 +241,8 @@ It will be apply'ed with the rest of the args when the message was 'popped' from (bt2:with-lock-held (withreply-lock) (log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item) (queue:pushq queue push-item) - (ensure-thread-is-running msgbox)) + ;; (ensure-thread-is-running msgbox) + ) ;; It is important to leave lock withreply-lock ;; before we will wait for result. Otherwisee handler-fun @@ -252,7 +253,7 @@ It will be apply'ed with the rest of the args when the message was 'popped' from (bt2:with-lock-held (withreply-lock) (log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item) (queue:pushq queue push-item) - (ensure-thread-is-running msgbox) + ;; (ensure-thread-is-running msgbox) (log:trace "~a: withreply: waiting for arrival of result..." (name msgbox)) (bt2:condition-wait withreply-cvar withreply-lock)))) @@ -358,7 +359,7 @@ Returns the handler-result if `withreply-p' is eq to `T', otherwise the return i (log:debug "~a: enqueuing... withreply-p: ~a, time-out: ~a, message: ~a" (name self) withreply-p time-out message) (pushq queue push-item) - (ensure-thread-is-running self) + ;; (ensure-thread-is-running self) (if withreply-p (dispatch/reply self push-item dispatcher dispatcher-fun-args time-out) diff --git a/tests/message-box-test.lisp b/tests/message-box-test.lisp index b821b56..ddd755c 100644 --- a/tests/message-box-test.lisp +++ b/tests/message-box-test.lisp @@ -53,6 +53,7 @@ ;; return a result: nil (list (lambda (msg) + (declare (ignore msg)) (handler-bind ((serious-condition #'abort)) (error "Die, thread, die!"))))))) (is (equal first-reply @@ -70,3 +71,51 @@ ;; Cleanup a thread: (stop box t)))) + + +(test bt-box-resurrects-thread-after-abort-if-handler-catches-all-signals + "Tests that if an error happends during message processing, a thread will remain running." + + (let ((box (make-instance 'message-box/bt + :name "foo"))) + (unwind-protect + (progn + (let ((first-reply + (submit box "The Message" + t + ;; Don't wait for result here, because we are + ;; intentionally raise error here and will never + ;; return a result: + nil + (list (lambda (msg) + (declare (ignore msg)) + (handler-case + ;; This way we are simulating that the user choose + ;; an ABORT restart in the IDE during debug session: + (handler-bind ((serious-condition #'abort)) + (error "Die, thread, die!")) + ;; This part the same as error handling code in the + ;; SENTO.ACTOR-CELL:HANDLE-MESSAGE function: + ;; + ;; TODO: t was used to check if it is able to + ;; catch stack unwinding because of INVOKE-RESTART, + ;; but it cant. + (t (c) + (log:error "error condition was raised: ~%~a~%" + c) + (cons :handler-error c)))))))) + (is (equal first-reply + 'no-result))) + + (wait-while-thread-will-die box) + + (let ((result (handler-case + (submit box "The Message" t 1 + (list (lambda (msg) + (reverse msg)))) + (ask-timeout () + :timeout)))) + (is (string= "egasseM ehT" result)))) + + ;; Cleanup a thread: + (stop box t)))) From 3df36a79ccd6a3dad93b45afcf47783b4ad56187 Mon Sep 17 00:00:00 2001 From: Alexander Artemenko Date: Wed, 15 Jan 2025 08:51:39 +0000 Subject: [PATCH 3/4] Removed ensure-thread-is-running for DP box. --- src/mbox/message-box.lisp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mbox/message-box.lisp b/src/mbox/message-box.lisp index 07a62cc..6fea15c 100644 --- a/src/mbox/message-box.lisp +++ b/src/mbox/message-box.lisp @@ -350,6 +350,7 @@ Returns the handler-result if `withreply-p' is eq to `T', otherwise the return i processed-messages dispatcher) self (incf processed-messages) + (let ((push-item (make-message-item/dp :message message :handler-fun-args handler-fun-args @@ -359,7 +360,6 @@ Returns the handler-result if `withreply-p' is eq to `T', otherwise the return i (log:debug "~a: enqueuing... withreply-p: ~a, time-out: ~a, message: ~a" (name self) withreply-p time-out message) (pushq queue push-item) - ;; (ensure-thread-is-running self) (if withreply-p (dispatch/reply self push-item dispatcher dispatcher-fun-args time-out) From da61abf7a15e3bf67f93674cf80de0a902009a43 Mon Sep 17 00:00:00 2001 From: Alexander Artemenko Date: Wed, 15 Jan 2025 08:59:48 +0000 Subject: [PATCH 4/4] Returned ensure thread is running. --- src/mbox/message-box.lisp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/mbox/message-box.lisp b/src/mbox/message-box.lisp index 6fea15c..c1c6c2c 100644 --- a/src/mbox/message-box.lisp +++ b/src/mbox/message-box.lisp @@ -241,8 +241,7 @@ It will be apply'ed with the rest of the args when the message was 'popped' from (bt2:with-lock-held (withreply-lock) (log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item) (queue:pushq queue push-item) - ;; (ensure-thread-is-running msgbox) - ) + (ensure-thread-is-running msgbox)) ;; It is important to leave lock withreply-lock ;; before we will wait for result. Otherwisee handler-fun @@ -253,7 +252,7 @@ It will be apply'ed with the rest of the args when the message was 'popped' from (bt2:with-lock-held (withreply-lock) (log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item) (queue:pushq queue push-item) - ;; (ensure-thread-is-running msgbox) + (ensure-thread-is-running msgbox) (log:trace "~a: withreply: waiting for arrival of result..." (name msgbox)) (bt2:condition-wait withreply-cvar withreply-lock))))