Skip to content

Commit 3c377c5

Browse files
committed
Add support for timeout notification callbacks.
When building an application (such as a server) that uses multiple messagers, managing calls to curvecpr_messager_next_timeout() can be tedious. This change introduces an optional callback-based mechanism for handling updates to timeouts. Also, there's a CHANGELOG now.
1 parent c8375da commit 3c377c5

10 files changed

+166
-5
lines changed

CHANGELOG.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
## v0.1.2
2+
3+
* Add support for a custom callback to receive timeouts for the messager,
4+
eliminating the need to repeatedly poll every messager instance
5+
(particularly useful in servers).
6+
* Increment the major component of the shared library version due to ABI
7+
compatibility break.
8+
9+
## v0.1.1
10+
11+
* Include `check_extras.h` in `EXTRA_DIST` so `make check` executes correctly.
12+
13+
## v0.1.0
14+
15+
* Initial release.

configure.ac

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ AC_CONFIG_HEADER([config.h])
77
AC_CONFIG_MACRO_DIR([m4])
88

99
# Library version.
10-
CURVECPR_LIBRARY_VERSION=1:0:0
10+
CURVECPR_LIBRARY_VERSION=2:0:0
1111
AC_SUBST(CURVECPR_LIBRARY_VERSION)
1212

1313
# Checks for programs.

libcurvecpr/include/curvecpr/messager.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ struct curvecpr_messager_ops {
2828
int (*recvmarkq_remove_range)(struct curvecpr_messager *messager, unsigned long long start, unsigned long long end);
2929

3030
int (*send)(struct curvecpr_messager *messager, const unsigned char *buf, size_t num);
31+
32+
void (*put_next_timeout)(struct curvecpr_messager *messager, const long long timeout_ns);
3133
};
3234

3335
struct curvecpr_messager_cf {

libcurvecpr/lib/messager.c

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ void curvecpr_messager_new (struct curvecpr_messager *messager, const struct cur
6161
/* If we're in client mode, initiate packets have a maximum size of 512 bytes.
6262
Otherwise, we're in server mode, and we can start at 1024. */
6363
messager->my_maximum_send_bytes = client ? 512 : 1024;
64+
65+
/* Fire off initial timeout notification. */
66+
curvecpr_messager_next_timeout(messager);
6467
}
6568

6669
int curvecpr_messager_recv (struct curvecpr_messager *messager, const unsigned char *buf, size_t num)
@@ -197,6 +200,9 @@ int curvecpr_messager_recv (struct curvecpr_messager *messager, const unsigned c
197200
}
198201
}
199202

203+
/* Update timeout (if callback defined). */
204+
curvecpr_messager_next_timeout(messager);
205+
200206
/* Update acknowledgment information (but only if this isn't a pure
201207
acknowledgment). */
202208
if (id) {
@@ -418,6 +424,9 @@ static int _send_block (struct curvecpr_messager *messager, struct curvecpr_bloc
418424
/* Reset last received ID so we don't acknowledge an old message. */
419425
messager->their_sent_id = 0;
420426

427+
/* Update timeout (if callback defined). */
428+
curvecpr_messager_next_timeout(messager);
429+
421430
return 0;
422431
}
423432

@@ -479,7 +488,7 @@ long long curvecpr_messager_next_timeout (struct curvecpr_messager *messager)
479488

480489
struct curvecpr_block *block = NULL;
481490

482-
long long at;
491+
long long at, timeout;
483492

484493
curvecpr_chicago_refresh_clock(chicago);
485494

@@ -505,7 +514,12 @@ long long curvecpr_messager_next_timeout (struct curvecpr_messager *messager)
505514
/* If the current time is after the next action time, the timeout is 0. However, we
506515
always have at least a 1 millisecond timeout to prevent the CPU from spinning. */
507516
if (chicago->clock > at)
508-
return 1000000;
517+
timeout = 1000000;
509518
else
510-
return at - chicago->clock + 1000000;
519+
timeout = at - chicago->clock + 1000000;
520+
521+
if (cf->ops.put_next_timeout)
522+
cf->ops.put_next_timeout(messager, timeout);
523+
524+
return timeout;
511525
}

libcurvecpr/test/Makefile.am

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ messager_test_recv_requests_removal_from_sendmarkq_SOURCES = messager/test_recv_
1515
check_PROGRAMS += messager/test_send_with_1_failure_moves_message_from_sendq
1616
messager_test_send_with_1_failure_moves_message_from_sendq_SOURCES = messager/test_send_with_1_failure_moves_message_from_sendq.c
1717

18+
check_PROGRAMS += messager/test_timeout_callback_fires
19+
messager_test_timeout_callback_fires_SOURCES = messager/test_timeout_callback_fires.c
20+
1821
check_PROGRAMS += util/test_nanoseconds
1922
util_test_nanoseconds_SOURCES = util/test_nanoseconds.c
2023

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
/test_new_configures_object
22
/test_recv_requests_removal_from_sendmarkq
33
/test_send_with_1_failure_moves_message_from_sendq
4+
/test_timeout_callback_fires

libcurvecpr/test/messager/test_new_configures_object.c

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,32 @@
55

66
static char static_priv[] = "Hello!";
77

8+
static unsigned char t_sendmarkq_is_full (struct curvecpr_messager *messager)
9+
{
10+
return 0;
11+
}
12+
13+
static unsigned char t_sendq_is_empty (struct curvecpr_messager *messager)
14+
{
15+
return 1;
16+
}
17+
18+
static int t_sendmarkq_head (struct curvecpr_messager *messager, struct curvecpr_block **block_stored)
19+
{
20+
return 1;
21+
}
22+
823
START_TEST (test_new_configures_object)
924
{
1025
struct curvecpr_messager messager;
11-
struct curvecpr_messager_cf cf = { .priv = static_priv };
26+
struct curvecpr_messager_cf cf = {
27+
.ops = {
28+
.sendmarkq_is_full = t_sendmarkq_is_full,
29+
.sendq_is_empty = t_sendq_is_empty,
30+
.sendmarkq_head = t_sendmarkq_head
31+
},
32+
.priv = static_priv
33+
};
1234

1335
curvecpr_messager_new(&messager, &cf, 1);
1436

libcurvecpr/test/messager/test_recv_requests_removal_from_sendmarkq.c

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,21 @@
55

66
#include <curvecpr/bytes.h>
77

8+
static unsigned char t_sendmarkq_is_full (struct curvecpr_messager *messager)
9+
{
10+
return 0;
11+
}
12+
13+
static unsigned char t_sendq_is_empty (struct curvecpr_messager *messager)
14+
{
15+
return 1;
16+
}
17+
18+
static int t_sendmarkq_head (struct curvecpr_messager *messager, struct curvecpr_block **block_stored)
19+
{
20+
return 1;
21+
}
22+
823
static int t_sendmarkq_remove_range (struct curvecpr_messager *messager, unsigned long long start, unsigned long long end)
924
{
1025
fail_unless(start == 0);
@@ -18,6 +33,9 @@ START_TEST (test_recv_requests_removal_from_sendmarkq)
1833
struct curvecpr_messager messager;
1934
struct curvecpr_messager_cf cf = {
2035
.ops = {
36+
.sendmarkq_is_full = t_sendmarkq_is_full,
37+
.sendq_is_empty = t_sendq_is_empty,
38+
.sendmarkq_head = t_sendmarkq_head,
2139
.sendmarkq_remove_range = t_sendmarkq_remove_range
2240
}
2341
};

libcurvecpr/test/messager/test_send_with_1_failure_moves_message_from_sendq.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ START_TEST (test_send_with_1_failure_moves_message_from_sendq)
6767
struct curvecpr_messager messager;
6868
struct curvecpr_messager_cf cf = {
6969
.ops = {
70+
.sendq_is_empty = t_q_is_empty,
7071
.sendmarkq_is_full = t_q_is_full,
7172
.recvmarkq_is_empty = t_q_is_empty,
7273
.recvmarkq_get_nth_unacknowledged = t_recvmarkq_get_nth_unacknowledged,
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
#include <check.h>
2+
#include <check_extras.h>
3+
4+
#include <curvecpr/messager.h>
5+
6+
long long test_timeout = -1LL;
7+
8+
static struct curvecpr_block static_block = {
9+
.id = 0,
10+
.clock = 0,
11+
.eof = CURVECPR_BLOCK_STREAM,
12+
.data_len = 7,
13+
.data = "Hello!"
14+
};
15+
16+
static unsigned char t_q_is_full (struct curvecpr_messager *messager)
17+
{
18+
return 0;
19+
}
20+
21+
static unsigned char t_q_is_empty (struct curvecpr_messager *messager)
22+
{
23+
return 1;
24+
}
25+
26+
static int t_recvmarkq_get_nth_unacknowledged (struct curvecpr_messager *messager, unsigned int n, struct curvecpr_block **block_stored)
27+
{
28+
return 1;
29+
}
30+
31+
static int t_sendmarkq_head (struct curvecpr_messager *messager, struct curvecpr_block **block_stored)
32+
{
33+
return 1;
34+
}
35+
36+
static int t_sendq_head (struct curvecpr_messager *messager, struct curvecpr_block **block_stored)
37+
{
38+
*block_stored = &static_block;
39+
return 0;
40+
}
41+
42+
static int t_send (struct curvecpr_messager *messager, const unsigned char *buf, size_t num)
43+
{
44+
return 0;
45+
}
46+
47+
static int t_sendq_move_to_sendmarkq (struct curvecpr_messager *messager, const struct curvecpr_block *block, struct curvecpr_block **block_stored)
48+
{
49+
return 0;
50+
}
51+
52+
static void t_put_next_timeout (struct curvecpr_messager *messager, long long timeout)
53+
{
54+
test_timeout = timeout;
55+
}
56+
57+
START_TEST (test_timeout_callback_fires)
58+
{
59+
struct curvecpr_messager messager;
60+
struct curvecpr_messager_cf cf = {
61+
.ops = {
62+
.sendq_is_empty = t_q_is_empty,
63+
.sendmarkq_is_full = t_q_is_full,
64+
.recvmarkq_is_empty = t_q_is_empty,
65+
.recvmarkq_get_nth_unacknowledged = t_recvmarkq_get_nth_unacknowledged,
66+
.sendmarkq_head = t_sendmarkq_head,
67+
.sendq_head = t_sendq_head,
68+
.send = t_send,
69+
.sendq_move_to_sendmarkq = t_sendq_move_to_sendmarkq,
70+
.put_next_timeout = t_put_next_timeout
71+
}
72+
};
73+
74+
curvecpr_messager_new(&messager, &cf, 1);
75+
76+
fail_unless(test_timeout >= 0);
77+
test_timeout = -1LL;
78+
79+
curvecpr_messager_process_sendq(&messager);
80+
81+
fail_unless(test_timeout >= 0);
82+
}
83+
END_TEST
84+
85+
RUN_TEST (test_timeout_callback_fires)

0 commit comments

Comments
 (0)