Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message delivery tag support #362

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions inc/azure_uamqp_c/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ extern "C" {
MOCKABLE_FUNCTION(, int, message_get_body_type, MESSAGE_HANDLE, message, MESSAGE_BODY_TYPE*, body_type);
MOCKABLE_FUNCTION(, int, message_set_message_format, MESSAGE_HANDLE, message, uint32_t, message_format);
MOCKABLE_FUNCTION(, int, message_get_message_format, MESSAGE_HANDLE, message, uint32_t*, message_format);
MOCKABLE_FUNCTION(, int, message_set_delivery_tag, MESSAGE_HANDLE, message, AMQP_VALUE, delivery_tag_value);
MOCKABLE_FUNCTION(, int, message_get_delivery_tag, MESSAGE_HANDLE, message, AMQP_VALUE*, delivery_tag_value);

#ifdef __cplusplus
}
Expand Down
104 changes: 104 additions & 0 deletions src/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ typedef struct MESSAGE_INSTANCE_TAG
application_properties application_properties;
annotations footer;
uint32_t message_format;
AMQP_VALUE delivery_tag;
} MESSAGE_INSTANCE;

MESSAGE_BODY_TYPE internal_get_body_type(MESSAGE_HANDLE message)
Expand Down Expand Up @@ -119,6 +120,7 @@ MESSAGE_HANDLE message_create(void)
result->body_amqp_value = NULL;
result->body_amqp_sequence_items = NULL;
result->body_amqp_sequence_count = 0;
result->delivery_tag = NULL;

/* Codes_SRS_MESSAGE_01_135: [ By default a message on which `message_set_message_format` was not called shall have message format set to 0. ]*/
result->message_format = 0;
Expand Down Expand Up @@ -229,6 +231,17 @@ MESSAGE_HANDLE message_clone(MESSAGE_HANDLE source_message)
}
}

if ((result != NULL) && (source_message->delivery_tag != NULL))
{
result->delivery_tag = amqpvalue_clone(source_message->delivery_tag);
if (result->delivery_tag == NULL)
{
LogError("Cannot clone message delivery tag");
message_destroy(result);
result = NULL;
}
}

if ((result != NULL) && (source_message->body_amqp_data_count > 0))
{
size_t i;
Expand Down Expand Up @@ -375,6 +388,11 @@ void message_destroy(MESSAGE_HANDLE message)
amqpvalue_destroy(message->body_amqp_value);
}

if (message->delivery_tag != NULL)
{
amqpvalue_destroy(message->delivery_tag);
}

/* Codes_SRS_MESSAGE_01_136: [ If the message body is made of several AMQP data items, they shall all be freed. ]*/
free_all_body_data_items(message);

Expand Down Expand Up @@ -1447,3 +1465,89 @@ int message_get_message_format(MESSAGE_HANDLE message, uint32_t *message_format)

return result;
}

int message_set_delivery_tag(MESSAGE_HANDLE message, AMQP_VALUE delivery_tag_value)
{
int result;

if (message == NULL)
{
LogError("NULL message");
result = MU_FAILURE;
}
else
{
if (delivery_tag_value == NULL)
{
if (message->delivery_tag != NULL)
{
amqpvalue_destroy(message->delivery_tag);
message->delivery_tag = NULL;
}

/* Codes_SRS_MESSAGE_01_053: [ On success it shall return 0. ]*/
result = 0;
}
else
{

AMQP_VALUE new_delivery_tag = amqpvalue_clone(delivery_tag_value);
if (new_delivery_tag == NULL)
{
LogError("Cannot clone delivery tag");
result = MU_FAILURE;
}
else
{
if (message->delivery_tag != NULL)
{
amqpvalue_destroy(message->delivery_tag);
}

message->delivery_tag = new_delivery_tag;

/* Codes_SRS_MESSAGE_01_102: [ On success it shall return 0. ]*/
result = 0;
}
}
}

return result;
}

int message_get_delivery_tag(MESSAGE_HANDLE message, AMQP_VALUE *delivery_tag_value)
{
int result;

if ((message == NULL) ||
(delivery_tag_value == NULL))
{
LogError("Bad arguments: message = %p, delivery_tag = %p",
message, delivery_tag_value);
result = MU_FAILURE;
}
else
{
if (message->delivery_tag == NULL)
{
*delivery_tag_value = NULL;
result = 0;
}
else
{
AMQP_VALUE new_delivery_tag = amqpvalue_clone(message->delivery_tag);
if (new_delivery_tag == NULL)
{
LogError("Cannot clone delivery tag");
result = MU_FAILURE;
}
else
{
*delivery_tag_value = new_delivery_tag;
result = 0;
}
}
}

return result;
}
25 changes: 22 additions & 3 deletions src/message_receiver.c
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ static AMQP_VALUE on_transfer_received(void* context, TRANSFER_HANDLE transfer,
AMQP_VALUE result = NULL;
MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context;

(void)transfer;
if (message_receiver->on_message_received != NULL)
{
MESSAGE_HANDLE message = message_create();
Expand All @@ -236,7 +235,25 @@ static AMQP_VALUE on_transfer_received(void* context, TRANSFER_HANDLE transfer,
}
else
{
AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver);
delivery_tag received_message_tag;
AMQP_VALUE delivery_tag_value;
AMQPVALUE_DECODER_HANDLE amqpvalue_decoder;

if (transfer_get_delivery_tag(transfer, &received_message_tag) == 0)
{
delivery_tag_value = amqpvalue_create_delivery_tag(received_message_tag);
if ((delivery_tag_value != NULL) && (message_set_delivery_tag(message, delivery_tag_value) != 0))
{
LogError("Could not set message delivery tag");
set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
}
}
else
{
delivery_tag_value = NULL;
}

amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver);
if (amqpvalue_decoder == NULL)
{
LogError("Cannot create AMQP value decoder");
Expand Down Expand Up @@ -266,7 +283,9 @@ static AMQP_VALUE on_transfer_received(void* context, TRANSFER_HANDLE transfer,

amqpvalue_decoder_destroy(amqpvalue_decoder);
}

if ( delivery_tag_value != NULL ) {
amqpvalue_destroy(delivery_tag_value);
}
message_destroy(message);
}
}
Expand Down