diff --git a/src/source/Signaling/LwsApiCalls.c b/src/source/Signaling/LwsApiCalls.c index 1b6721b374..45d9904834 100644 --- a/src/source/Signaling/LwsApiCalls.c +++ b/src/source/Signaling/LwsApiCalls.c @@ -246,7 +246,9 @@ INT32 lwsHttpCallbackRoutine(struct lws* wsi, enum lws_callback_reasons reason, if (size != (INT32) pRequestInfo->bodySize) { DLOGW("Failed to write out the body of POST request entirely. Expected to write %d, wrote %d", pRequestInfo->bodySize, size); if (size > 0) { - // Schedule again + // Update remainig data and schedule again + pRequestInfo->bodySize -= size; + pRequestInfo->body += size; lws_client_http_body_pending(wsi, 1); lws_callback_on_writable(wsi); } else { @@ -306,6 +308,8 @@ INT32 lwsWssCallbackRoutine(struct lws* wsi, enum lws_callback_reasons reason, P case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: case LWS_CALLBACK_CLIENT_RECEIVE: case LWS_CALLBACK_CLIENT_WRITEABLE: + case LWS_CALLBACK_TIMER: + case LWS_CALLBACK_CLIENT_RECEIVE_PONG: break; default: DLOGI("WSS callback with reason %d", reason); @@ -378,9 +382,16 @@ INT32 lwsWssCallbackRoutine(struct lws* wsi, enum lws_callback_reasons reason, P pSignalingClient->diagnostics.connectTime = SIGNALING_GET_CURRENT_TIME(pSignalingClient); MUTEX_UNLOCK(pSignalingClient->diagnosticsLock); + lws_set_timer_usecs(wsi, SIGNALING_SERVICE_WSS_PING_PONG_INTERVAL_IN_SECONDS * HUNDREDS_OF_NANOS_IN_A_SECOND); // Notify the listener thread CVAR_BROADCAST(pSignalingClient->connectedCvar); + // Keep connection alive + lws_callback_on_writable(wsi); + break; + case LWS_CALLBACK_TIMER: + lws_callback_on_writable(wsi); + lws_set_timer_usecs(wsi, SIGNALING_SERVICE_WSS_PING_PONG_INTERVAL_IN_SECONDS * HUNDREDS_OF_NANOS_IN_A_SECOND); break; case LWS_CALLBACK_CLIENT_CLOSED: @@ -426,79 +437,106 @@ INT32 lwsWssCallbackRoutine(struct lws* wsi, enum lws_callback_reasons reason, P DLOGD("Peer initiated close with %d (0x%08x). Message: %.*s", status, (UINT32) status, size, pCurPtr); - // Store the state as the result - retValue = -1; + if ((status != 0 && status != LWS_CLOSE_STATUS_NORMAL) && !ATOMIC_LOAD_BOOL(&pLwsCallInfo->receiveMessage)) { + ATOMIC_STORE(&pSignalingClient->result, SERVICE_CALL_INTERNAL_ERROR); + retValue = -1; + } else { + // Store normal closure status + ATOMIC_STORE(&pSignalingClient->result, SERVICE_CALL_RESULT_OK); + retValue = 0; + } - ATOMIC_STORE(&pSignalingClient->result, (SIZE_T) status); + break; + case LWS_CALLBACK_CLIENT_RECEIVE_PONG: + DLOGV("Received PONG from server"); break; case LWS_CALLBACK_CLIENT_RECEIVE: + lwsl_info("WS receive callback, len: %zu, is_final: %d\n", dataSize, lws_is_final_fragment(wsi)); + // Check if it's a binary data + if (lws_frame_is_binary(wsi)) { + DLOGW("Received binary data"); + } CHK(!lws_frame_is_binary(wsi), STATUS_SIGNALING_RECEIVE_BINARY_DATA_NOT_SUPPORTED); - // Skip if it's the first and last fragment and the size is 0 - CHK(!(lws_is_first_fragment(wsi) && lws_is_final_fragment(wsi) && dataSize == 0), retStatus); + // Mark as receiving a message + ATOMIC_STORE_BOOL(&pLwsCallInfo->receiveMessage, TRUE); - // Check what type of a message it is. We will set the size to 0 on first and flush on last - if (lws_is_first_fragment(wsi)) { - pLwsCallInfo->receiveBufferSize = 0; - } - - // Store the data in the buffer + // Store the data in the receive buffer CHK(pLwsCallInfo->receiveBufferSize + (UINT32) dataSize + LWS_PRE <= SIZEOF(pLwsCallInfo->receiveBuffer), STATUS_SIGNALING_RECEIVED_MESSAGE_LARGER_THAN_MAX_DATA_LEN); MEMCPY(&pLwsCallInfo->receiveBuffer[LWS_PRE + pLwsCallInfo->receiveBufferSize], pDataIn, dataSize); pLwsCallInfo->receiveBufferSize += (UINT32) dataSize; - // Flush on last + // Process complete message if (lws_is_final_fragment(wsi)) { - CHK_STATUS(receiveLwsMessage(pLwsCallInfo->pSignalingClient, (PCHAR) &pLwsCallInfo->receiveBuffer[LWS_PRE], + CHK_STATUS(receiveLwsMessage(pSignalingClient, (PCHAR) &pLwsCallInfo->receiveBuffer[LWS_PRE], pLwsCallInfo->receiveBufferSize / SIZEOF(CHAR))); + pLwsCallInfo->receiveBufferSize = 0; + ATOMIC_STORE_BOOL(&pLwsCallInfo->receiveMessage, FALSE); } - lws_callback_on_writable(wsi); - + // Keep connection alive after receiving data + if (!ATOMIC_LOAD_BOOL(&pSignalingClient->shutdown)) { + lws_callback_on_writable(wsi); + } break; case LWS_CALLBACK_CLIENT_WRITEABLE: DLOGD("Client is writable"); - // Check if we are attempting to terminate the connection - if (!ATOMIC_LOAD_BOOL(&pSignalingClient->connected) && ATOMIC_LOAD(&pSignalingClient->messageResult) == SERVICE_CALL_UNKNOWN) { - retValue = 1; - CHK(FALSE, retStatus); + // Add buffer state check + if (lws_send_pipe_choked(wsi)) { + DLOGI("WS send pipe choked, retrying"); + lws_callback_on_writable(wsi); + break; } - offset = (UINT32) ATOMIC_LOAD(&pLwsCallInfo->sendOffset); - bufferSize = (UINT32) ATOMIC_LOAD(&pLwsCallInfo->sendBufferSize); - writeSize = (INT32) (bufferSize - offset); - - // Check if we need to do anything - CHK(writeSize > 0, retStatus); + // Log buffer state before write + DLOGD("Send buffer size before write: %zu", pLwsCallInfo->sendBufferSize); - // Send data and notify on completion - size = lws_write(wsi, &(pLwsCallInfo->sendBuffer[pLwsCallInfo->sendOffset]), (SIZE_T) writeSize, LWS_WRITE_TEXT); - - if (size < 0) { - DLOGW("Write failed. Returned write size is %d", size); - // Quit - retValue = -1; - CHK(FALSE, retStatus); + // Only check termination if we're not in the middle of receiving a message + if (!ATOMIC_LOAD_BOOL(&pLwsCallInfo->receiveMessage)) { + CHK(!ATOMIC_LOAD_BOOL(&pRequestInfo->terminating), retStatus); } - if (size == writeSize) { - // Notify the listener - ATOMIC_STORE(&pLwsCallInfo->sendOffset, 0); - ATOMIC_STORE(&pLwsCallInfo->sendBufferSize, 0); - CVAR_BROADCAST(pLwsCallInfo->pSignalingClient->sendCvar); - } else { - // Partial write - DLOGV("Failed to write out the data entirely. Wrote %d out of %d", size, writeSize); - // Schedule again - lws_callback_on_writable(wsi); + // Send data if anything is in the buffer + if (pLwsCallInfo->sendBufferSize != 0) { + SIZE_T remainingSize = pLwsCallInfo->sendBufferSize - LWS_PRE; + // Log write attempt + DLOGD("Attempting to write %zu bytes", remainingSize); + + retValue = (INT32) lws_write(wsi, pLwsCallInfo->sendBuffer + LWS_PRE, remainingSize, LWS_WRITE_TEXT); + if (retValue < 0) { + DLOGW("Write failed with %d", retValue); + CHK(FALSE, !STATUS_SUCCESS); + } else if ((SIZE_T) retValue < remainingSize) { + DLOGW("Partial write occurred: %d of %zu bytes", retValue, remainingSize); + // Move remaining data to start of buffer + MEMMOVE(pLwsCallInfo->sendBuffer + LWS_PRE, pLwsCallInfo->sendBuffer + LWS_PRE + retValue, remainingSize - retValue); + // Update buffer size + pLwsCallInfo->sendBufferSize = (remainingSize - retValue) + LWS_PRE; + + // Handle partial write + lws_callback_on_writable(wsi); + } else { + // Complete write + DLOGI("Write complete: %d bytes", retValue); + pLwsCallInfo->sendBufferSize = 0; + // Keep connection alive after write + if (!ATOMIC_LOAD_BOOL(&pSignalingClient->shutdown)) { + lws_callback_on_writable(wsi); + } + ATOMIC_STORE(&pSignalingClient->messageResult, (SIZE_T) SERVICE_CALL_RESULT_OK); + // Signal completion immediately after successful write + CVAR_BROADCAST(pSignalingClient->sendCvar); + } } + // Always return success from writeable callback + retValue = 0; break; @@ -569,8 +607,10 @@ STATUS lwsCompleteSync(PLwsCallInfo pCallInfo) // Execute the LWS REST call MEMSET(&connectInfo, 0x00, SIZEOF(struct lws_client_connect_info)); connectInfo.context = pContext; - connectInfo.ssl_connection = LCCSCF_USE_SSL; + connectInfo.ssl_connection = LCCSCF_USE_SSL | LCCSCF_H2_QUIRK_OVERFLOWS_TXCR; // Add flag to handle H2 flow control connectInfo.port = SIGNALING_DEFAULT_SSL_PORT; + connectInfo.alpn = "http/1.1"; // Force HTTP/1.1 only + connectInfo.protocol = "http/1.1"; // Force HTTP/1.1 protocol CHK_STATUS(getRequestHost(pCallInfo->callInfo.pRequestInfo->url, &pHostStart, &pHostEnd)); CHK(pHostEnd == NULL || *pHostEnd == '/' || *pHostEnd == '?', STATUS_INTERNAL_ERROR); @@ -1344,6 +1384,7 @@ STATUS createLwsCallInfo(PSignalingClient pSignalingClient, PRequestInfo pReques pLwsCallInfo->callInfo.pRequestInfo = pRequestInfo; pLwsCallInfo->pSignalingClient = pSignalingClient; pLwsCallInfo->protocolIndex = protocolIndex; + ATOMIC_STORE_BOOL(&pLwsCallInfo->receiveMessage, FALSE); *ppLwsCallInfo = pLwsCallInfo; @@ -1907,6 +1948,9 @@ STATUS writeLwsData(PSignalingClient pSignalingClient, BOOL awaitForResponse) SIZE_T offset, size; SERVICE_CALL_RESULT result; + UINT32 retryCount = 0; + const UINT32 MAX_RETRY_COUNT = 3; + CHK(pSignalingClient != NULL && pSignalingClient->pOngoingCallInfo != NULL, STATUS_NULL_ARG); // See if anything needs to be done @@ -1920,7 +1964,7 @@ STATUS writeLwsData(PSignalingClient pSignalingClient, BOOL awaitForResponse) MUTEX_LOCK(pSignalingClient->sendLock); sendLocked = TRUE; - while (iterate) { + while (iterate && retryCount < MAX_RETRY_COUNT) { offset = ATOMIC_LOAD(&pSignalingClient->pOngoingCallInfo->sendOffset); size = ATOMIC_LOAD(&pSignalingClient->pOngoingCallInfo->sendBufferSize); @@ -1928,14 +1972,28 @@ STATUS writeLwsData(PSignalingClient pSignalingClient, BOOL awaitForResponse) if (offset != size && result == SERVICE_CALL_RESULT_NOT_SET) { CHK_STATUS(CVAR_WAIT(pSignalingClient->sendCvar, pSignalingClient->sendLock, SIGNALING_SEND_TIMEOUT)); + retryCount++; + } else if (result == SERVICE_CALL_UNKNOWN) { + // Partial write occurred, continue waiting + retryCount = 0; + continue; } else { iterate = FALSE; } + if (iterate) { + // Wake up the service event loop + CHK_STATUS(wakeLwsServiceEventLoop(pSignalingClient, PROTOCOL_INDEX_WSS)); + } } - MUTEX_UNLOCK(pSignalingClient->sendLock); sendLocked = FALSE; + // Check if we timed out + if (retryCount >= MAX_RETRY_COUNT) { + DLOGW("Failed to send data after %d attempts", MAX_RETRY_COUNT); + CHK(FALSE, STATUS_SIGNALING_MESSAGE_DELIVERY_FAILED); + } + // Do not await for the response in case of correlation id not specified CHK(awaitForResponse, retStatus); diff --git a/src/source/Signaling/LwsApiCalls.h b/src/source/Signaling/LwsApiCalls.h index 4749b4b7c1..4a7dc44443 100644 --- a/src/source/Signaling/LwsApiCalls.h +++ b/src/source/Signaling/LwsApiCalls.h @@ -208,6 +208,9 @@ struct __LwsCallInfo { // Service exit indicator; volatile ATOMIC_BOOL cancelService; + // Message receiving indicator + volatile ATOMIC_BOOL receiveMessage; + // Protocol index UINT32 protocolIndex;