Skip to content

Commit

Permalink
[#noissue] Optimize gRpc TheradLocal
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Feb 11, 2025
1 parent 55ca14a commit 34e56a2
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.navercorp.pinpoint.collector.mapper.grpc.stat.GrpcAgentUriStatMapper;
import com.navercorp.pinpoint.collector.service.AgentUriStatService;
import com.navercorp.pinpoint.common.server.bo.stat.AgentUriStatBo;
import com.navercorp.pinpoint.grpc.Header;
import com.navercorp.pinpoint.grpc.MessageFormatUtils;
import com.navercorp.pinpoint.grpc.trace.PAgentUriStat;
import com.navercorp.pinpoint.io.request.ServerRequest;
Expand Down Expand Up @@ -50,8 +51,9 @@ public void handle(ServerRequest<GeneratedMessageV3> request) {
if (!uriStatEnable) {
return;
}
final Header header = request.getHeader();
final PAgentUriStat agentUriStat = (PAgentUriStat) request.getData();
final AgentUriStatBo agentUriStatBo = agentUriStatMapper.map(agentUriStat);
final AgentUriStatBo agentUriStatBo = agentUriStatMapper.map(header, agentUriStat);
agentUriStatService.save(agentUriStatBo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@
import com.navercorp.pinpoint.common.server.bo.stat.EachUriStatBo;
import com.navercorp.pinpoint.common.server.bo.stat.UriStatHistogram;
import com.navercorp.pinpoint.grpc.Header;
import com.navercorp.pinpoint.grpc.server.ServerContext;
import com.navercorp.pinpoint.grpc.trace.PAgentUriStat;
import com.navercorp.pinpoint.grpc.trace.PEachUriStat;
import com.navercorp.pinpoint.grpc.trace.PUriHistogram;

import org.springframework.stereotype.Component;

import java.util.List;
Expand All @@ -35,11 +33,9 @@
@Component
public class GrpcAgentUriStatMapper {

public AgentUriStatBo map(final PAgentUriStat agentUriStat) {
final Header agentInfo = ServerContext.getAgentInfo();

final String agentId = agentInfo.getAgentId();
final String applicationName = agentInfo.getApplicationName();
public AgentUriStatBo map(Header header, final PAgentUriStat agentUriStat) {
final String agentId = header.getAgentId();
final String applicationName = header.getApplicationName();

int bucketVersion = agentUriStat.getBucketVersion();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,12 @@ public void requestAgentInfo(PAgentInfo agentInfo, StreamObserver<PResult> respo
logger.debug("Request PAgentInfo={}", MessageFormatUtils.debugLog(agentInfo));
}

final Header header = ServerContext.getAgentInfo();

Check warning on line 73 in collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/service/AgentService.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/service/AgentService.java#L73

Added line #L73 was not covered by tests
try {
executor.execute(new Runnable() {
@Override
public void run() {
final Message<PAgentInfo> message = newMessage(agentInfo, MessageType.AGENT_INFO);
final Message<PAgentInfo> message = newMessage(header, agentInfo, MessageType.AGENT_INFO);

Check warning on line 78 in collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/service/AgentService.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/service/AgentService.java#L78

Added line #L78 was not covered by tests
simpleRequestHandlerAdaptor.request(message, responseObserver);
// Update service type of PingSession
AgentService.this.pingEventHandler.update((short) agentInfo.getServiceType());
Expand Down Expand Up @@ -152,8 +153,7 @@ private long nextSessionId() {
return idAllocator.getAndIncrement();
}

private <T> Message<T> newMessage(T requestData, MessageType type) {
Header header = ServerContext.getAgentInfo();
private <T> Message<T> newMessage(Header header, T requestData, MessageType type) {
return new DefaultMessage<>(header, type, requestData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.navercorp.pinpoint.collector.receiver.grpc.service;

import com.navercorp.pinpoint.grpc.Header;
import com.navercorp.pinpoint.grpc.server.ServerContext;
import com.navercorp.pinpoint.grpc.server.TransportMetadata;
import com.navercorp.pinpoint.io.request.DefaultServerRequest;
Expand Down Expand Up @@ -47,11 +46,6 @@ public <T> ServerRequest<T> newServerRequest(Message<T> message) throws StatusEx
public <T> ServerRequest<T> newServerRequest(Context context, Message<T> message) throws StatusException {
Objects.requireNonNull(context, "context");

final Header header = message.getHeader();
if (header == null) {
throw Status.INTERNAL.withDescription("Not found request header").asException();
}

final TransportMetadata transportMetadata = ServerContext.getTransportMetadata(context);
if (transportMetadata == null) {
throw Status.INTERNAL.withDescription("Not found transportMetadata").asException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,10 @@ public void onNext(Req req) {

@Override
public void onError(Throwable throwable) {
Header header = ServerContext.getAgentInfo();

Status status = Status.fromThrowable(throwable);
Metadata metadata = Status.trailersFromThrowable(throwable);
if (logger.isInfoEnabled()) {
Header header = ServerContext.getAgentInfo();
logger.info("onError: Failed to span streamId=, {} {} {}", streamId, header, status, metadata);
}

Expand All @@ -68,8 +67,10 @@ public void onError(Throwable throwable) {

@Override
public void onCompleted() {
Header header = ServerContext.getAgentInfo();
logger.info("onCompleted streamId={} {}", streamId, header);
if (logger.isInfoEnabled()) {
Header header = ServerContext.getAgentInfo();
logger.info("onCompleted streamId={} {}", streamId, header);

Check warning on line 72 in collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/service/ServerCallStream.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/service/ServerCallStream.java#L71-L72

Added lines #L71 - L72 were not covered by tests
}

responseCompleted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@
import com.navercorp.pinpoint.io.request.ServerRequest;
import io.grpc.Context;
import io.grpc.StatusRuntimeException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
Expand All @@ -53,38 +51,28 @@
*/
public class GrpcAgentUriMetricHandlerV2Test {

private Context prevContext;

@BeforeEach
public void setUp() {
Context root = Context.ROOT;
prevContext = root.attach();
}

@AfterEach
public void tearDown() {
Context root = Context.ROOT;
if (prevContext != null) {
root.detach(prevContext);
}
}

@Test
public void throwExceptionTest() {
Assertions.assertThrows(StatusRuntimeException.class, () -> {
AgentUriStatService mockAgentUriStatService = mock(AgentUriStatService.class);
ServerRequest<GeneratedMessageV3> mockServerRequest = mock(ServerRequest.class);
ServerRequest<GeneratedMessageV3> mockServerRequest = serverRequestMock();

GrpcAgentStatHandlerV2 handler = createMockHandler(mockAgentUriStatService, false);

handler.handleSimple(mockServerRequest);
});
}

private ServerRequest<GeneratedMessageV3> serverRequestMock() {
@SuppressWarnings("unchecked")
ServerRequest<GeneratedMessageV3> mockServerRequest = mock(ServerRequest.class);
return mockServerRequest;
}

@Test
public void skipTest() {
AgentUriStatService mockAgentUriStatService = mock(AgentUriStatService.class);
ServerRequest<GeneratedMessageV3> mockServerRequest = mock(ServerRequest.class);
ServerRequest<GeneratedMessageV3> mockServerRequest = serverRequestMock();
when(mockServerRequest.getData()).thenReturn(PAgentUriStat.getDefaultInstance());

GrpcAgentStatHandlerV2 handler = createMockHandler(mockAgentUriStatService, false);
Expand All @@ -95,11 +83,11 @@ public void skipTest() {
public void handleTest() {
AgentUriStatService mockAgentUriStatService = mock(AgentUriStatService.class);

attachContext(new Header("name", "agentId", "agentName", "applicationName", ServiceType.UNKNOWN.getCode(), System.currentTimeMillis(), Header.SOCKET_ID_NOT_EXIST, new ArrayList<>()));

PAgentUriStat pAgentUriStat = createPAgentUriStat();

ServerRequest<GeneratedMessageV3> mockServerRequest = mock(ServerRequest.class);
ServerRequest<GeneratedMessageV3> mockServerRequest = serverRequestMock();
Header header = new Header("name", "agentId", "agentName", "applicationName", ServiceType.UNKNOWN.getCode(), System.currentTimeMillis(), Header.SOCKET_ID_NOT_EXIST, new ArrayList<>());
when(mockServerRequest.getHeader()).thenReturn(header);
when(mockServerRequest.getData()).thenReturn(pAgentUriStat);

GrpcAgentStatHandlerV2 handler = createMockHandler(mockAgentUriStatService, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.navercorp.pinpoint.grpc.Header;
import com.navercorp.pinpoint.grpc.server.ServerContext;
import com.navercorp.pinpoint.grpc.server.TransportMetadata;
import io.grpc.Context;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -42,14 +43,15 @@ public DefaultPingEventHandler(PingSessionRegistry pingSessionRegistry, Lifecycl

@Override
public void connect() {
final TransportMetadata transportMetadata = ServerContext.getTransportMetadata();
final Context context = Context.current();
final TransportMetadata transportMetadata = ServerContext.getTransportMetadata(context);
final Header header = ServerContext.getAgentInfo(context);

Check warning on line 48 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandler.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandler.java#L46-L48

Added lines #L46 - L48 were not covered by tests
if (transportMetadata == null) {
logger.info("Skip connect event handle of ping, not found TransportMetadata. header={}", ServerContext.getAgentInfo());
logger.info("Skip connect event handle of ping, not found TransportMetadata. header={}", header);

Check warning on line 50 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandler.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandler.java#L50

Added line #L50 was not covered by tests
return;
}

final Long transportId = transportMetadata.getTransportId();
final Header header = ServerContext.getAgentInfo();
final PingSession pingSession = PingSession.of(transportId, header);
pingSession.setLastPingTimeMillis(System.currentTimeMillis());
final PingSession oldSession = pingSessionRegistry.add(pingSession.getId(), pingSession);
Expand All @@ -61,9 +63,10 @@ public void connect() {

@Override
public void ping() {
final TransportMetadata transportMetadata = ServerContext.getTransportMetadata();
final Context context = Context.current();
final TransportMetadata transportMetadata = ServerContext.getTransportMetadata(context);

Check warning on line 67 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandler.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandler.java#L66-L67

Added lines #L66 - L67 were not covered by tests
if (transportMetadata == null) {
logger.info("Skip ping event handle of ping, not found TransportMetadata. header={}", ServerContext.getAgentInfo());
logger.info("Skip ping event handle of ping, not found TransportMetadata. header={}", ServerContext.getAgentInfo(context));

Check warning on line 69 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandler.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandler.java#L69

Added line #L69 was not covered by tests
return;
}

Expand All @@ -84,9 +87,10 @@ public void ping() {

@Override
public void close() {
final TransportMetadata transportMetadata = ServerContext.getTransportMetadata();
final Context context = Context.current();
final TransportMetadata transportMetadata = ServerContext.getTransportMetadata(context);

Check warning on line 91 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandler.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandler.java#L90-L91

Added lines #L90 - L91 were not covered by tests
if (transportMetadata == null) {
logger.info("Skip close event handle of ping, not found TransportMetadata. header={}", ServerContext.getAgentInfo());
logger.info("Skip close event handle of ping, not found TransportMetadata. header={}", ServerContext.getAgentInfo(context));

Check warning on line 93 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandler.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandler.java#L93

Added line #L93 was not covered by tests
return;
}

Expand All @@ -102,9 +106,10 @@ public void close() {

@Override
public void update(final short serviceType) {
final TransportMetadata transportMetadata = ServerContext.getTransportMetadata();
final Context context = Context.current();
final TransportMetadata transportMetadata = ServerContext.getTransportMetadata(context);

Check warning on line 110 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandler.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandler.java#L109-L110

Added lines #L109 - L110 were not covered by tests
if (transportMetadata == null) {
logger.info("Skip update event handle of ping, not found TransportMetadata. header={}", ServerContext.getAgentInfo());
logger.info("Skip update event handle of ping, not found TransportMetadata. header={}", ServerContext.getAgentInfo(context));

Check warning on line 112 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandler.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandler.java#L112

Added line #L112 was not covered by tests
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.navercorp.pinpoint.realtime.collector.sink.EchoPublisher;
import com.navercorp.pinpoint.realtime.collector.sink.Publisher;
import com.navercorp.pinpoint.realtime.collector.sink.SinkRepository;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
Expand Down Expand Up @@ -78,12 +79,15 @@ public GrpcCommandService(
@Override
@SuppressWarnings("deprecation")
public StreamObserver<PCmdMessage> handleCommand(StreamObserver<PCmdRequest> requestObserver) {
Long transportId = getTransportIdFromContext();
ClusterKey clusterKey = getClusterKeyFromContext();
final Context context = Context.current();
Long transportId = getTransportIdFromContext(context);

Check warning on line 83 in realtime/realtime-collector/src/main/java/com/navercorp/pinpoint/realtime/collector/receiver/grpc/GrpcCommandService.java

View check run for this annotation

Codecov / codecov/patch

realtime/realtime-collector/src/main/java/com/navercorp/pinpoint/realtime/collector/receiver/grpc/GrpcCommandService.java#L82-L83

Added lines #L82 - L83 were not covered by tests

final Header header = ServerContext.getAgentInfo(context);
ClusterKey clusterKey = getClusterKeyFromContext(header);

Check warning on line 86 in realtime/realtime-collector/src/main/java/com/navercorp/pinpoint/realtime/collector/receiver/grpc/GrpcCommandService.java

View check run for this annotation

Codecov / codecov/patch

realtime/realtime-collector/src/main/java/com/navercorp/pinpoint/realtime/collector/receiver/grpc/GrpcCommandService.java#L85-L86

Added lines #L85 - L86 were not covered by tests

logger.info("{} => local. handleCommand(). transportId:{}.", clusterKey, transportId);

List<Integer> supportCommandCodeList = getSupportCommandCodeListFromContext();
List<Integer> supportCommandCodeList = header.getSupportCommandCodeList();

Check warning on line 90 in realtime/realtime-collector/src/main/java/com/navercorp/pinpoint/realtime/collector/receiver/grpc/GrpcCommandService.java

View check run for this annotation

Codecov / codecov/patch

realtime/realtime-collector/src/main/java/com/navercorp/pinpoint/realtime/collector/receiver/grpc/GrpcCommandService.java#L90

Added line #L90 was not covered by tests
if (supportCommandCodeList != Header.SUPPORT_COMMAND_CODE_LIST_NOT_EXIST) {
logger.warn(
"handleCommand() not support included Header:{}. Connection will be disconnected.",
Expand Down Expand Up @@ -120,7 +124,7 @@ public void onNext(PCmdMessage value) {
List<Integer> supportCommandServiceKeyList =
value.getHandshakeMessage().getSupportCommandServiceKeyList();
GrpcAgentConnection conn =
buildAgentConnection(serverCallStreamObserver, supportCommandServiceKeyList);
buildAgentConnection(header, serverCallStreamObserver, supportCommandServiceKeyList);

Check warning on line 127 in realtime/realtime-collector/src/main/java/com/navercorp/pinpoint/realtime/collector/receiver/grpc/GrpcCommandService.java

View check run for this annotation

Codecov / codecov/patch

realtime/realtime-collector/src/main/java/com/navercorp/pinpoint/realtime/collector/receiver/grpc/GrpcCommandService.java#L127

Added line #L127 was not covered by tests

if (connRef.compareAndSet(null, conn)) {
GrpcCommandService.this.agentConnectionRepository.add(conn);
Expand All @@ -146,9 +150,12 @@ public void onCompleted() {

@Override
public StreamObserver<PCmdMessage> handleCommandV2(StreamObserver<PCmdRequest> requestObserver) {
Long transportId = getTransportIdFromContext();
ClusterKey clusterKey = getClusterKeyFromContext();
List<Integer> supportCommandCodeList = getSupportCommandCodeListFromContext();
final Context context = Context.current();
Long transportId = getTransportIdFromContext(context);

final Header header = ServerContext.getAgentInfo(context);
ClusterKey clusterKey = getClusterKeyFromContext(header);
List<Integer> supportCommandCodeList = header.getSupportCommandCodeList();
logger.info(
"{} => local. handleCommandV2(). transportId:{}, supportCommandCodeList{}",
clusterKey,
Expand All @@ -171,7 +178,7 @@ public StreamObserver<PCmdMessage> handleCommandV2(StreamObserver<PCmdRequest> r
(ServerCallStreamObserver<PCmdRequest>) requestObserver;

serverCallStreamObserver.setOnReadyHandler(() -> {
GrpcAgentConnection conn = buildAgentConnection(serverCallStreamObserver, supportCommandCodeList);
GrpcAgentConnection conn = buildAgentConnection(header, serverCallStreamObserver, supportCommandCodeList);
if (connRef.compareAndSet(null, conn)) {
logger.info("{} => local. ready() transportId:{}", clusterKey, transportId);
GrpcCommandService.this.agentConnectionRepository.add(conn);
Expand Down Expand Up @@ -215,13 +222,13 @@ public void onCompleted() {
};
}

private GrpcAgentConnection buildAgentConnection(
private GrpcAgentConnection buildAgentConnection(Header header,
ServerCallStreamObserver<PCmdRequest> requestObserver,
List<Integer> supportCommandServiceCodeList
) {
return new GrpcAgentConnection(
getRemoteAddressFromContext(),
getClusterKeyFromContext(),
getClusterKeyFromContext(header),
requestObserver,
supportCommandServiceCodeList
);
Expand Down Expand Up @@ -294,7 +301,10 @@ public StreamObserver<PCmdActiveThreadCountRes> commandStreamActiveThreadCount(S

private <T> void emitMono(T response, StreamObserver<Empty> responseObserver, Publisher<T> sink) {
if (sink == null) {
logger.warn("Could not find echo sink: clusterKey = {}", getClusterKeyFromContext());
if (logger.isWarnEnabled()) {
Header header = ServerContext.getAgentInfo();
logger.warn("Could not find echo sink: clusterKey = {}", getClusterKeyFromContext(header));

Check warning on line 306 in realtime/realtime-collector/src/main/java/com/navercorp/pinpoint/realtime/collector/receiver/grpc/GrpcCommandService.java

View check run for this annotation

Codecov / codecov/patch

realtime/realtime-collector/src/main/java/com/navercorp/pinpoint/realtime/collector/receiver/grpc/GrpcCommandService.java#L305-L306

Added lines #L305 - L306 were not covered by tests
}
responseObserver.onError(new StatusException(Status.NOT_FOUND));
return;
}
Expand All @@ -308,18 +318,12 @@ private InetSocketAddress getRemoteAddressFromContext() {
return transportMetadata.getRemoteAddress();
}

private ClusterKey getClusterKeyFromContext() {
Header header = ServerContext.getAgentInfo();
private ClusterKey getClusterKeyFromContext(Header header) {
return new ClusterKey(header.getApplicationName(), header.getAgentId(), header.getAgentStartTime());
}

private List<Integer> getSupportCommandCodeListFromContext() {
Header header = ServerContext.getAgentInfo();
return header.getSupportCommandCodeList();
}

private Long getTransportIdFromContext() {
TransportMetadata transportMetadata = ServerContext.getTransportMetadata();
private Long getTransportIdFromContext(Context context) {
TransportMetadata transportMetadata = ServerContext.getTransportMetadata(context);
return transportMetadata.getTransportId();
}

Expand Down

0 comments on commit 34e56a2

Please sign in to comment.