Skip to content

Commit 46aae6b

Browse files
committed
Now message-box/bt class will restart thread if it was aborted because of a non-local exit from processing loop.
I've measured performance before these changes and after them and didn't notice significant changes. Results of the benchmark along with the baseline results are available in this gist: https://gist.github.com/svetlyak40wt/a1097ab7d501087ca366d5addb410ebe
1 parent 04a1e39 commit 46aae6b

File tree

5 files changed

+279
-27
lines changed

5 files changed

+279
-27
lines changed

bench.lisp

Lines changed: 56 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@
9292
(wait-if-queue-larger-than 10000 wait-if-queue-larger-than-given-p)
9393
(duration 10)
9494
(num-iterations 60)
95-
(load-threads 8))
95+
(load-threads 8)
96+
(time-out nil))
9697

9798
(log:config :warn)
9899

@@ -124,7 +125,8 @@
124125
:async-ask-p async-ask-p
125126
:num-shared-workers num-shared-workers
126127
:queue-size queue-size
127-
:wait-if-queue-larger-than wait-if-queue-larger-than))
128+
:wait-if-queue-larger-than wait-if-queue-larger-than
129+
:time-out time-out))
128130
(force-output)
129131

130132
(with-timing (num-iterations
@@ -255,33 +257,72 @@
255257

256258

257259
(defun run-all (&key
258-
(num-iterations 10)
259-
(duration 10))
260+
(num-iterations 60)
261+
(duration 10)
262+
(queue-size 100)
263+
(time-out 3)
264+
&aux (started-at (get-internal-real-time)))
260265
(run-benchmark :num-iterations num-iterations
261-
:duration duration)
266+
:duration duration
267+
:with-reply-p nil
268+
:async-ask-p nil)
262269

263-
(format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p nil))
264270
(run-benchmark :num-iterations num-iterations
265271
:duration duration
266-
:with-reply-p t :async-ask-p nil)
272+
:with-reply-p t
273+
:async-ask-p nil)
274+
275+
(run-benchmark :num-iterations num-iterations
276+
:duration duration
277+
:with-reply-p t
278+
:async-ask-p t)
279+
280+
281+
(format t "With queue size limited to ~A:~2%"
282+
queue-size)
267283

268-
(format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p t))
269284
(run-benchmark :num-iterations num-iterations
270285
:duration duration
271-
:with-reply-p t :async-ask-p t)
286+
:with-reply-p nil
287+
:async-ask-p nil
288+
:queue-size queue-size)
272289

273-
(format t "Running ~A:~%" '(run-benchmark :queue-size 100))
274290
(run-benchmark :num-iterations num-iterations
275291
:duration duration
276-
:queue-size 100)
292+
:with-reply-p t
293+
:async-ask-p nil
294+
:queue-size queue-size)
277295

278-
(format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p nil :queue-size 100))
279296
(run-benchmark :num-iterations num-iterations
280297
:duration duration
281-
:with-reply-p t :async-ask-p nil :queue-size 100)
298+
:with-reply-p t
299+
:async-ask-p t
300+
:queue-size queue-size)
301+
302+
303+
(format t "With time-out ~A:~2%"
304+
time-out)
282305

283-
(format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p t :queue-size 100))
284306
(run-benchmark :num-iterations num-iterations
285307
:duration duration
286-
:with-reply-p t :async-ask-p t :queue-size 100))
308+
:with-reply-p nil
309+
:async-ask-p nil
310+
;; This should not make sense for with-reply-p = nil
311+
:time-out time-out)
312+
313+
(run-benchmark :num-iterations num-iterations
314+
:duration duration
315+
:with-reply-p t
316+
:async-ask-p nil
317+
:time-out time-out)
318+
319+
(run-benchmark :num-iterations num-iterations
320+
:duration duration
321+
:with-reply-p t
322+
:async-ask-p t
323+
:time-out time-out)
324+
325+
(format t "All tests are performed in ~,2f seconds.~%"
326+
(/ (- (get-internal-real-time) started-at)
327+
internal-time-units-per-second)))
287328

sento.asd

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
:author "Manfred Bergmann"
6666
:depends-on ("sento"
6767
"fiveam"
68+
"serapeum"
6869
"lparallel"
6970
"cl-mock")
7071
:components ((:module "tests"
@@ -95,7 +96,9 @@
9596
(:file "actor-system-test")
9697
(:file "actor-tree-test")
9798
(:file "spawn-in-receive-test")
98-
)))
99+
(:file "test-utils")
100+
(:file "message-box-test"
101+
:depends-on ("test-utils")))))
99102
:description "Test system for sento"
100103
:perform (test-op (op c) (symbol-call :fiveam :run!
101104
(uiop:find-symbol* '#:test-suite

src/mbox/message-box.lisp

Lines changed: 60 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -113,18 +113,41 @@ This is used to break the environment possibly captured as closure at 'submit' s
113113
(defclass message-box/bt (message-box-base)
114114
((queue-thread :initform nil
115115
:documentation
116-
"The thread that pops queue items."))
116+
"The thread that pops queue items.")
117+
(thread-is-running-p :initform nil
118+
:type boolean
119+
:documentation
120+
"Will be set to NIL if processing loop will be broken because of an error or a restart invocation."))
117121
(:documentation
118122
"Bordeaux-Threads based message-box with a single thread operating on a message queue.
119123
This is used when the actor is created using a `:pinned` dispatcher type.
120124
There is a limit on the maximum number of actors/agents that can be created with
121125
this kind of queue because each message-box (and with that each actor) requires exactly one thread."))
122126

127+
128+
(declaim (ftype (function (message-box/bt &key (:thread-name (or null string)))
129+
(values &optional))
130+
start-thread))
131+
132+
(defun start-thread (msgbox &key thread-name)
133+
(with-slots (name queue-thread thread-is-running-p)
134+
msgbox
135+
(flet ((run-processing-loop ()
136+
(setf thread-is-running-p t)
137+
(unwind-protect
138+
(message-processing-loop msgbox)
139+
(setf thread-is-running-p
140+
nil))))
141+
(setf queue-thread
142+
(bt2:make-thread #'run-processing-loop
143+
:name (or thread-name
144+
(mkstr "message-thread-" name))))))
145+
(values))
146+
147+
123148
(defmethod initialize-instance :after ((self message-box/bt) &key)
124-
(with-slots (name queue-thread) self
125-
(setf queue-thread (bt2:make-thread
126-
(lambda () (message-processing-loop self))
127-
:name (mkstr "message-thread-" name))))
149+
(start-thread self)
150+
128151
(when (next-method-p)
129152
(call-next-method)))
130153

@@ -179,6 +202,24 @@ This function sets the result as `handler-result' in `item'. The return of this
179202
(bt2:condition-notify withreply-cvar)))
180203
(handler-fun)))))
181204

205+
206+
(declaim (ftype (function (message-box/bt)
207+
(values &optional))
208+
ensure-thread-is-running))
209+
210+
(defun ensure-thread-is-running (msgbox)
211+
(with-slots (queue-thread thread-is-running-p should-run)
212+
msgbox
213+
(when (and (not thread-is-running-p)
214+
should-run)
215+
;; Just to be sure that thread is not alive:
216+
(unless (bt2:thread-alive-p queue-thread)
217+
(let ((thread-name (bt2:thread-name queue-thread)))
218+
(log:warn "Restarting thread" thread-name)
219+
(start-thread msgbox
220+
:thread-name thread-name))))
221+
(values)))
222+
182223
(defmethod submit ((self message-box/bt) message withreply-p time-out handler-fun-args)
183224
"The `handler-fun-args` argument must contain a handler function as first list item.
184225
It will be apply'ed with the rest of the args when the message was 'popped' from queue."
@@ -200,15 +241,21 @@ It will be apply'ed with the rest of the args when the message was 'popped' from
200241
:time-out time-out
201242
:handler-fun-args handler-fun-args
202243
:handler-result 'no-result)))
203-
(log:trace "~a: withreply: waiting for arrival of result..." (name msgbox))
244+
204245
(bt2:with-lock-held (withreply-lock)
205246
(log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item)
206247
(queue:pushq queue push-item)
207-
208-
(if time-out
209-
(wait-and-probe-for-msg-handler-result msgbox push-item)
210-
(bt2:condition-wait withreply-cvar withreply-lock)))
211-
248+
(ensure-thread-is-running msgbox)
249+
250+
(log:trace "~a: withreply: waiting for arrival of result..." (name msgbox))
251+
252+
(unless (bt2:condition-wait withreply-cvar withreply-lock
253+
:timeout time-out)
254+
(log:warn "~a: time-out elapsed but result not available yet!" (name msgbox))
255+
(setf (slot-value push-item 'cancelled-p) t)
256+
(error 'ask-timeout
257+
:wait-time time-out)))
258+
212259
(with-slots (handler-result) push-item
213260
(log:trace "~a: withreply: result should be available: ~a" (name msgbox) handler-result)
214261
handler-result)))
@@ -222,6 +269,7 @@ The submitting code has to await the side-effect and possibly handle a timeout."
222269
:handler-fun-args handler-fun-args)))
223270
(log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item)
224271
(queue:pushq queue push-item)
272+
(ensure-thread-is-running msgbox)
225273
t))
226274

227275
(defmethod stop ((self message-box/bt) &optional (wait nil))
@@ -301,6 +349,7 @@ Returns the handler-result if `withreply-p' is eq to `T', otherwise the return i
301349
processed-messages
302350
dispatcher) self
303351
(incf processed-messages)
352+
304353
(let ((push-item (make-message-item/dp
305354
:message message
306355
:handler-fun-args handler-fun-args

tests/message-box-test.lisp

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
(defpackage :sento.message-box-test
2+
(:use :cl :fiveam :cl-mock :sento.actor :sento.future)
3+
(:shadow #:! #:?)
4+
(:import-from #:miscutils
5+
#:assert-cond
6+
#:await-cond
7+
#:filter)
8+
(:import-from #:timeutils
9+
#:ask-timeout)
10+
(:import-from #:sento.messageb
11+
#:message-box/bt
12+
#:submit
13+
#:no-result
14+
#:queue-thread
15+
#:stop)
16+
(:import-from #:sento.test-utils
17+
#:parametrized-test)
18+
(:import-from #:ac
19+
#:actor-of))
20+
21+
(in-package :sento.message-box-test)
22+
23+
(def-suite message-box-tests
24+
:description "message-box tests"
25+
:in sento.tests:test-suite)
26+
27+
(in-suite message-box-tests)
28+
29+
30+
(defun wait-while-thread-will-die (msgbox &key (timeout 10))
31+
(let ((wait-until (+ (get-internal-real-time) (* timeout
32+
internal-time-units-per-second))))
33+
(with-slots (queue-thread)
34+
msgbox
35+
(loop :while (bt2:thread-alive-p queue-thread)
36+
:do (sleep 0.1)
37+
(when (< wait-until
38+
(get-internal-real-time))
39+
(error "Thread didn't die in ~A seconds."
40+
timeout))))))
41+
42+
43+
(parametrized-test bt-box-resurrects-thread-after-abort-if-handler-catches-all-signals
44+
((withreply-p timeout)
45+
(nil nil)
46+
(t 1)
47+
(t nil))
48+
49+
"Simulates a situation when error has happened during message processing, and ABORT restart was invoked.
50+
Usually this kill a thread, but here we ensure that by the thread is resurrected when we submit a
51+
subsequent message."
52+
53+
(flet ((kill-by-restart-invoke (msg)
54+
(declare (ignore msg))
55+
(handler-case
56+
;; This way we are simulating that the user choose
57+
;; an ABORT restart in the IDE during debug session:
58+
(handler-bind ((serious-condition #'abort))
59+
(error "Die, thread, die!"))
60+
;; This part the same as error handling code in the
61+
;; SENTO.ACTOR-CELL:HANDLE-MESSAGE function:
62+
;;
63+
;; TODO: t was used to check if it is able to
64+
;; catch stack unwinding because of INVOKE-RESTART,
65+
;; but it can't.
66+
(t (c)
67+
(log:error "error condition was raised: ~%~a~%"
68+
c)
69+
(cons :handler-error c)))))
70+
71+
(let ((box (make-instance 'message-box/bt
72+
:name "foo")))
73+
(unwind-protect
74+
(progn
75+
(let ((first-reply
76+
(submit box "The Message"
77+
t
78+
;; Don't wait for result here, because we are
79+
;; intentionally raise error here and will never
80+
;; return a result:
81+
nil
82+
(list #'kill-by-restart-invoke))))
83+
(is (equal first-reply
84+
'no-result)))
85+
86+
(wait-while-thread-will-die box)
87+
88+
(is (not
89+
(bt2:thread-alive-p
90+
(slot-value box 'queue-thread))))
91+
92+
(let ((result (handler-case
93+
(submit box "The Message"
94+
withreply-p
95+
timeout
96+
(list (lambda (msg)
97+
(reverse msg))))
98+
(ask-timeout ()
99+
:timeout))))
100+
101+
(cond
102+
(withreply-p
103+
(is (string= "egasseM ehT" result)))
104+
(t
105+
(is (eql result t)))))
106+
107+
(is (bt2:thread-alive-p
108+
(slot-value box 'queue-thread))))
109+
110+
;; Cleanup a thread:
111+
(stop box t)))))

0 commit comments

Comments
 (0)