From e0334289a2946a431bfb49ccf37ff96e1345b0e4 Mon Sep 17 00:00:00 2001 From: "oleksii.diagiliev" Date: Tue, 28 Jan 2025 15:09:12 -0800 Subject: [PATCH] [SPARK-51023] log remote address on RPC exception --- .../server/TransportRequestHandler.java | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index 687c3040ed083..2727051894b7a 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -176,8 +176,9 @@ public void onFailure(Throwable e) { } }); } catch (Exception e) { - logger.error("Error while invoking RpcHandler#receive() on RPC id {}", e, - MDC.of(LogKeys.REQUEST_ID$.MODULE$, req.requestId)); + logger.error("Error while invoking RpcHandler#receive() on RPC id {} from {}", e, + MDC.of(LogKeys.REQUEST_ID$.MODULE$, req.requestId), + MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel))); respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e))); } finally { req.body().release(); @@ -262,8 +263,9 @@ public String getID() { respond(new RpcResponse(req.requestId, new NioManagedBuffer(blockPushNonFatalFailure.getResponse()))); } else { - logger.error("Error while invoking RpcHandler#receive() on RPC id {}", e, - MDC.of(LogKeys.REQUEST_ID$.MODULE$, req.requestId)); + logger.error("Error while invoking RpcHandler#receive() on RPC id {} from {}", e, + MDC.of(LogKeys.REQUEST_ID$.MODULE$, req.requestId), + MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel))); respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e))); } // We choose to totally fail the channel, rather than trying to recover as we do in other @@ -279,7 +281,8 @@ private void processOneWayMessage(OneWayMessage req) { try { rpcHandler.receive(reverseClient, req.body().nioByteBuffer()); } catch (Exception e) { - logger.error("Error while invoking RpcHandler#receive() for one-way message.", e); + logger.error("Error while invoking RpcHandler#receive() for one-way message from {}.", e, + MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel))); } finally { req.body().release(); } @@ -304,9 +307,10 @@ public void onFailure(Throwable e) { }); } catch (Exception e) { logger.error("Error while invoking receiveMergeBlockMetaReq() for appId {} shuffleId {} " - + "reduceId {}", e, MDC.of(LogKeys.APP_ID$.MODULE$, req.appId), + + "reduceId {} from {}", e, MDC.of(LogKeys.APP_ID$.MODULE$, req.appId), MDC.of(LogKeys.SHUFFLE_ID$.MODULE$, req.shuffleId), - MDC.of(LogKeys.REDUCE_ID$.MODULE$, req.reduceId)); + MDC.of(LogKeys.REDUCE_ID$.MODULE$, req.reduceId), + MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel))); respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e))); } }