Skip to content

Increase robustness of LwsApiCall implementation #2134

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 105 additions & 47 deletions src/source/Signaling/LwsApiCalls.c
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,9 @@ INT32 lwsHttpCallbackRoutine(struct lws* wsi, enum lws_callback_reasons reason,
if (size != (INT32) pRequestInfo->bodySize) {
DLOGW("Failed to write out the body of POST request entirely. Expected to write %d, wrote %d", pRequestInfo->bodySize, size);
if (size > 0) {
// Schedule again
// Update remainig data and schedule again
pRequestInfo->bodySize -= size;
pRequestInfo->body += size;
lws_client_http_body_pending(wsi, 1);
lws_callback_on_writable(wsi);
} else {
Expand Down Expand Up @@ -306,6 +308,8 @@ INT32 lwsWssCallbackRoutine(struct lws* wsi, enum lws_callback_reasons reason, P
case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
case LWS_CALLBACK_CLIENT_RECEIVE:
case LWS_CALLBACK_CLIENT_WRITEABLE:
case LWS_CALLBACK_TIMER:
case LWS_CALLBACK_CLIENT_RECEIVE_PONG:
break;
default:
DLOGI("WSS callback with reason %d", reason);
Expand Down Expand Up @@ -378,9 +382,16 @@ INT32 lwsWssCallbackRoutine(struct lws* wsi, enum lws_callback_reasons reason, P
pSignalingClient->diagnostics.connectTime = SIGNALING_GET_CURRENT_TIME(pSignalingClient);
MUTEX_UNLOCK(pSignalingClient->diagnosticsLock);

lws_set_timer_usecs(wsi, SIGNALING_SERVICE_WSS_PING_PONG_INTERVAL_IN_SECONDS * HUNDREDS_OF_NANOS_IN_A_SECOND);
// Notify the listener thread
CVAR_BROADCAST(pSignalingClient->connectedCvar);

// Keep connection alive
lws_callback_on_writable(wsi);
break;
case LWS_CALLBACK_TIMER:
lws_callback_on_writable(wsi);
lws_set_timer_usecs(wsi, SIGNALING_SERVICE_WSS_PING_PONG_INTERVAL_IN_SECONDS * HUNDREDS_OF_NANOS_IN_A_SECOND);
break;

case LWS_CALLBACK_CLIENT_CLOSED:
Expand Down Expand Up @@ -426,79 +437,106 @@ INT32 lwsWssCallbackRoutine(struct lws* wsi, enum lws_callback_reasons reason, P

DLOGD("Peer initiated close with %d (0x%08x). Message: %.*s", status, (UINT32) status, size, pCurPtr);

// Store the state as the result
retValue = -1;
if ((status != 0 && status != LWS_CLOSE_STATUS_NORMAL) && !ATOMIC_LOAD_BOOL(&pLwsCallInfo->receiveMessage)) {
ATOMIC_STORE(&pSignalingClient->result, SERVICE_CALL_INTERNAL_ERROR);
retValue = -1;
} else {
// Store normal closure status
ATOMIC_STORE(&pSignalingClient->result, SERVICE_CALL_RESULT_OK);
retValue = 0;
}

ATOMIC_STORE(&pSignalingClient->result, (SIZE_T) status);
break;

case LWS_CALLBACK_CLIENT_RECEIVE_PONG:
DLOGV("Received PONG from server");
break;

case LWS_CALLBACK_CLIENT_RECEIVE:

lwsl_info("WS receive callback, len: %zu, is_final: %d\n", dataSize, lws_is_final_fragment(wsi));

// Check if it's a binary data
if (lws_frame_is_binary(wsi)) {
DLOGW("Received binary data");
}
CHK(!lws_frame_is_binary(wsi), STATUS_SIGNALING_RECEIVE_BINARY_DATA_NOT_SUPPORTED);

// Skip if it's the first and last fragment and the size is 0
CHK(!(lws_is_first_fragment(wsi) && lws_is_final_fragment(wsi) && dataSize == 0), retStatus);
// Mark as receiving a message
ATOMIC_STORE_BOOL(&pLwsCallInfo->receiveMessage, TRUE);

// Check what type of a message it is. We will set the size to 0 on first and flush on last
if (lws_is_first_fragment(wsi)) {
pLwsCallInfo->receiveBufferSize = 0;
}

// Store the data in the buffer
// Store the data in the receive buffer
CHK(pLwsCallInfo->receiveBufferSize + (UINT32) dataSize + LWS_PRE <= SIZEOF(pLwsCallInfo->receiveBuffer),
STATUS_SIGNALING_RECEIVED_MESSAGE_LARGER_THAN_MAX_DATA_LEN);
MEMCPY(&pLwsCallInfo->receiveBuffer[LWS_PRE + pLwsCallInfo->receiveBufferSize], pDataIn, dataSize);
pLwsCallInfo->receiveBufferSize += (UINT32) dataSize;

// Flush on last
// Process complete message
if (lws_is_final_fragment(wsi)) {
CHK_STATUS(receiveLwsMessage(pLwsCallInfo->pSignalingClient, (PCHAR) &pLwsCallInfo->receiveBuffer[LWS_PRE],
CHK_STATUS(receiveLwsMessage(pSignalingClient, (PCHAR) &pLwsCallInfo->receiveBuffer[LWS_PRE],
pLwsCallInfo->receiveBufferSize / SIZEOF(CHAR)));
pLwsCallInfo->receiveBufferSize = 0;
ATOMIC_STORE_BOOL(&pLwsCallInfo->receiveMessage, FALSE);
}

lws_callback_on_writable(wsi);

// Keep connection alive after receiving data
if (!ATOMIC_LOAD_BOOL(&pSignalingClient->shutdown)) {
lws_callback_on_writable(wsi);
}
break;

case LWS_CALLBACK_CLIENT_WRITEABLE:
DLOGD("Client is writable");

// Check if we are attempting to terminate the connection
if (!ATOMIC_LOAD_BOOL(&pSignalingClient->connected) && ATOMIC_LOAD(&pSignalingClient->messageResult) == SERVICE_CALL_UNKNOWN) {
retValue = 1;
CHK(FALSE, retStatus);
// Add buffer state check
if (lws_send_pipe_choked(wsi)) {
DLOGI("WS send pipe choked, retrying");
lws_callback_on_writable(wsi);
break;
}

offset = (UINT32) ATOMIC_LOAD(&pLwsCallInfo->sendOffset);
bufferSize = (UINT32) ATOMIC_LOAD(&pLwsCallInfo->sendBufferSize);
writeSize = (INT32) (bufferSize - offset);

// Check if we need to do anything
CHK(writeSize > 0, retStatus);
// Log buffer state before write
DLOGD("Send buffer size before write: %zu", pLwsCallInfo->sendBufferSize);

// Send data and notify on completion
size = lws_write(wsi, &(pLwsCallInfo->sendBuffer[pLwsCallInfo->sendOffset]), (SIZE_T) writeSize, LWS_WRITE_TEXT);

if (size < 0) {
DLOGW("Write failed. Returned write size is %d", size);
// Quit
retValue = -1;
CHK(FALSE, retStatus);
// Only check termination if we're not in the middle of receiving a message
if (!ATOMIC_LOAD_BOOL(&pLwsCallInfo->receiveMessage)) {
CHK(!ATOMIC_LOAD_BOOL(&pRequestInfo->terminating), retStatus);
}

if (size == writeSize) {
// Notify the listener
ATOMIC_STORE(&pLwsCallInfo->sendOffset, 0);
ATOMIC_STORE(&pLwsCallInfo->sendBufferSize, 0);
CVAR_BROADCAST(pLwsCallInfo->pSignalingClient->sendCvar);
} else {
// Partial write
DLOGV("Failed to write out the data entirely. Wrote %d out of %d", size, writeSize);
// Schedule again
lws_callback_on_writable(wsi);
// Send data if anything is in the buffer
if (pLwsCallInfo->sendBufferSize != 0) {
SIZE_T remainingSize = pLwsCallInfo->sendBufferSize - LWS_PRE;
// Log write attempt
DLOGD("Attempting to write %zu bytes", remainingSize);

retValue = (INT32) lws_write(wsi, pLwsCallInfo->sendBuffer + LWS_PRE, remainingSize, LWS_WRITE_TEXT);
if (retValue < 0) {
DLOGW("Write failed with %d", retValue);
CHK(FALSE, !STATUS_SUCCESS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should put an actual status here -- I believe this would get converted to 1 which is STATUS_NULL_ARG

} else if ((SIZE_T) retValue < remainingSize) {
DLOGW("Partial write occurred: %d of %zu bytes", retValue, remainingSize);
// Move remaining data to start of buffer
MEMMOVE(pLwsCallInfo->sendBuffer + LWS_PRE, pLwsCallInfo->sendBuffer + LWS_PRE + retValue, remainingSize - retValue);
// Update buffer size
pLwsCallInfo->sendBufferSize = (remainingSize - retValue) + LWS_PRE;

// Handle partial write
lws_callback_on_writable(wsi);
} else {
// Complete write
DLOGI("Write complete: %d bytes", retValue);
pLwsCallInfo->sendBufferSize = 0;
// Keep connection alive after write
if (!ATOMIC_LOAD_BOOL(&pSignalingClient->shutdown)) {
lws_callback_on_writable(wsi);
}
ATOMIC_STORE(&pSignalingClient->messageResult, (SIZE_T) SERVICE_CALL_RESULT_OK);
// Signal completion immediately after successful write
CVAR_BROADCAST(pSignalingClient->sendCvar);
}
}
// Always return success from writeable callback
retValue = 0;

break;

Expand Down Expand Up @@ -569,8 +607,10 @@ STATUS lwsCompleteSync(PLwsCallInfo pCallInfo)
// Execute the LWS REST call
MEMSET(&connectInfo, 0x00, SIZEOF(struct lws_client_connect_info));
connectInfo.context = pContext;
connectInfo.ssl_connection = LCCSCF_USE_SSL;
connectInfo.ssl_connection = LCCSCF_USE_SSL | LCCSCF_H2_QUIRK_OVERFLOWS_TXCR; // Add flag to handle H2 flow control
connectInfo.port = SIGNALING_DEFAULT_SSL_PORT;
connectInfo.alpn = "http/1.1"; // Force HTTP/1.1 only
connectInfo.protocol = "http/1.1"; // Force HTTP/1.1 protocol
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering the purpose of the H2 flag if 1.1 is forced, is it for latency/performance optimization?

It seems that HTTP 2 should from this quick test:

curl -sI https://m-xxxxxxxx.kinesisvideo.us-west-2.amazonaws.com -o/dev/null -w '%{http_version}\n'
2
curl -sI https://v-xxxxxxxx.kinesisvideo.us-west-2.amazonaws.com -o/dev/null -w '%{http_version}\n'
2


CHK_STATUS(getRequestHost(pCallInfo->callInfo.pRequestInfo->url, &pHostStart, &pHostEnd));
CHK(pHostEnd == NULL || *pHostEnd == '/' || *pHostEnd == '?', STATUS_INTERNAL_ERROR);
Expand Down Expand Up @@ -1344,6 +1384,7 @@ STATUS createLwsCallInfo(PSignalingClient pSignalingClient, PRequestInfo pReques
pLwsCallInfo->callInfo.pRequestInfo = pRequestInfo;
pLwsCallInfo->pSignalingClient = pSignalingClient;
pLwsCallInfo->protocolIndex = protocolIndex;
ATOMIC_STORE_BOOL(&pLwsCallInfo->receiveMessage, FALSE);

*ppLwsCallInfo = pLwsCallInfo;

Expand Down Expand Up @@ -1907,6 +1948,9 @@ STATUS writeLwsData(PSignalingClient pSignalingClient, BOOL awaitForResponse)
SIZE_T offset, size;
SERVICE_CALL_RESULT result;

UINT32 retryCount = 0;
const UINT32 MAX_RETRY_COUNT = 3;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking this should be a #define (very very minor memory reduction)


CHK(pSignalingClient != NULL && pSignalingClient->pOngoingCallInfo != NULL, STATUS_NULL_ARG);

// See if anything needs to be done
Expand All @@ -1920,22 +1964,36 @@ STATUS writeLwsData(PSignalingClient pSignalingClient, BOOL awaitForResponse)

MUTEX_LOCK(pSignalingClient->sendLock);
sendLocked = TRUE;
while (iterate) {
while (iterate && retryCount < MAX_RETRY_COUNT) {
offset = ATOMIC_LOAD(&pSignalingClient->pOngoingCallInfo->sendOffset);
size = ATOMIC_LOAD(&pSignalingClient->pOngoingCallInfo->sendBufferSize);

result = (SERVICE_CALL_RESULT) ATOMIC_LOAD(&pSignalingClient->messageResult);

if (offset != size && result == SERVICE_CALL_RESULT_NOT_SET) {
CHK_STATUS(CVAR_WAIT(pSignalingClient->sendCvar, pSignalingClient->sendLock, SIGNALING_SEND_TIMEOUT));
retryCount++;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if we can debug log (DLOGD) the retry count increased

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the CVAR_WAIT returned non-success (eg status operation timed out), CHK_STATUS would goto Cleanup and bypass the retry count -- doesn't seem intended since there's a "// Check if we timed out" down below

} else if (result == SERVICE_CALL_UNKNOWN) {
// Partial write occurred, continue waiting
retryCount = 0;
continue;
} else {
iterate = FALSE;
}
if (iterate) {
// Wake up the service event loop
CHK_STATUS(wakeLwsServiceEventLoop(pSignalingClient, PROTOCOL_INDEX_WSS));
}
}

MUTEX_UNLOCK(pSignalingClient->sendLock);
sendLocked = FALSE;

// Check if we timed out
if (retryCount >= MAX_RETRY_COUNT) {
DLOGW("Failed to send data after %d attempts", MAX_RETRY_COUNT);
CHK(FALSE, STATUS_SIGNALING_MESSAGE_DELIVERY_FAILED);
}

// Do not await for the response in case of correlation id not specified
CHK(awaitForResponse, retStatus);

Expand Down
3 changes: 3 additions & 0 deletions src/source/Signaling/LwsApiCalls.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ struct __LwsCallInfo {
// Service exit indicator;
volatile ATOMIC_BOOL cancelService;

// Message receiving indicator
volatile ATOMIC_BOOL receiveMessage;

// Protocol index
UINT32 protocolIndex;

Expand Down
Loading