Skip to content

Commit a95c938

Browse files
Add interconnect teardown hook
and add ability to get UDP IC stats from anywhere outside udpifc module. Both features are to be used in monitoring extensions.
1 parent 9da8dec commit a95c938

File tree

5 files changed

+101
-51
lines changed

5 files changed

+101
-51
lines changed

gpcontrib/gp_interconnect_stats/gp_interconnect_stats.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
#include "postgres.h"
99

1010
#include "cdb/cdbvars.h"
11-
#include "cdb/ml_ipc.h"
11+
#include "cdb/ic_udpifc.h"
1212
#include "fmgr.h"
1313

1414
PG_MODULE_MAGIC;

src/backend/cdb/motion/ic_common.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "cdb/ml_ipc.h"
2626
#include "cdb/cdbvars.h"
2727
#include "cdb/cdbdisp.h"
28+
#include "cdb/ic_udpifc.h"
2829

2930
#include <unistd.h>
3031
#include <arpa/inet.h>
@@ -58,6 +59,8 @@ int UDP_listenerFd;
5859
static interconnect_handle_t *open_interconnect_handles;
5960
static bool interconnect_resowner_callback_registered;
6061

62+
ic_teardown_hook_type ic_teardown_hook = NULL;
63+
6164
/*=========================================================================
6265
* FUNCTIONS PROTOTYPES
6366
*/
@@ -576,6 +579,9 @@ TeardownInterconnect(ChunkTransportState *transportStates, bool hasErrors)
576579
{
577580
interconnect_handle_t *h = find_interconnect_handle(transportStates);
578581

582+
if (ic_teardown_hook)
583+
ic_teardown_hook(transportStates, hasErrors);
584+
579585
if (Gp_interconnect_type == INTERCONNECT_TYPE_UDPIFC)
580586
{
581587
TeardownUDPIFCInterconnect(transportStates, hasErrors);

src/backend/cdb/motion/ic_udpifc.c

Lines changed: 10 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
#include "cdb/cdbdisp.h"
5454
#include "cdb/cdbdispatchresult.h"
5555
#include "cdb/cdbicudpfaultinjection.h"
56+
#include "cdb/ic_udpifc.h"
5657

5758
#include <fcntl.h>
5859
#include <limits.h>
@@ -586,52 +587,6 @@ typedef struct AckSendParam
586587
socklen_t peer_len;
587588
} AckSendParam;
588589

589-
/*
590-
* ICStatistics
591-
*
592-
* A structure keeping various statistics about interconnect internal.
593-
*
594-
* Note that the statistics for ic are not accurate for multiple cursor case on QD.
595-
*
596-
* totalRecvQueueSize - receive queue size sum when main thread is trying to get a packet.
597-
* recvQueueSizeCountingTime - counting times when computing totalRecvQueueSize.
598-
* totalCapacity - the capacity sum when packets are tried to be sent.
599-
* capacityCountingTime - counting times used to compute totalCapacity.
600-
* totalBuffers - total buffers available when sending packets.
601-
* bufferCountingTime - counting times when compute totalBuffers.
602-
* activeConnectionsNum - the number of active connections.
603-
* retransmits - the number of packet retransmits.
604-
* mismatchNum - the number of mismatched packets received.
605-
* crcErrors - the number of crc errors.
606-
* sndPktNum - the number of packets sent by sender.
607-
* recvPktNum - the number of packets received by receiver.
608-
* disorderedPktNum - disordered packet number.
609-
* duplicatedPktNum - duplicate packet number.
610-
* recvAckNum - the number of Acks received.
611-
* statusQueryMsgNum - the number of status query messages sent.
612-
*
613-
*/
614-
typedef struct ICStatistics
615-
{
616-
uint64 totalRecvQueueSize;
617-
uint64 recvQueueSizeCountingTime;
618-
uint64 totalCapacity;
619-
uint64 capacityCountingTime;
620-
uint64 totalBuffers;
621-
uint64 bufferCountingTime;
622-
uint32 activeConnectionsNum;
623-
int32 retransmits;
624-
int32 startupCachedPktNum;
625-
int32 mismatchNum;
626-
int32 crcErrors;
627-
int32 sndPktNum;
628-
int32 recvPktNum;
629-
int32 disorderedPktNum;
630-
int32 duplicatedPktNum;
631-
int32 recvAckNum;
632-
int32 statusQueryMsgNum;
633-
} ICStatistics;
634-
635590
/* Statistics for UDP interconnect. */
636591
static ICStatistics ic_statistics;
637592

@@ -7125,6 +7080,15 @@ getActiveMotionConns(void)
71257080
return ic_statistics.activeConnectionsNum;
71267081
}
71277082

7083+
ICStatistics
7084+
UDPIFCGetICStats(void)
7085+
{
7086+
pthread_mutex_lock(&ic_control_info.lock);
7087+
ICStatistics stats = ic_statistics;
7088+
pthread_mutex_unlock(&ic_control_info.lock);
7089+
return stats;
7090+
}
7091+
71287092
Datum
71297093
GpInterconnectGetStatsUDPIFC(PG_FUNCTION_ARGS)
71307094
{

src/include/cdb/ic_udpifc.h

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*-------------------------------------------------------------------------
2+
* ic_udpifc.h
3+
* Motion Layer IPC Layer.
4+
*
5+
* Portions Copyright (c) 2005-2008, Greenplum inc
6+
* Portions Copyright (c) 2012-2025 Pivotal Software, Inc.
7+
* Portions Copyright (c) 2025- Present open-gpdb
8+
*
9+
*
10+
* IDENTIFICATION
11+
* src/include/cdb/ic_udpifc.h
12+
*
13+
*-------------------------------------------------------------------------
14+
*/
15+
#ifndef IC_UDPIFC_H
16+
#define IC_UDPIFC_H
17+
18+
#include "postgres.h"
19+
#include "fmgr.h"
20+
21+
22+
#include <unistd.h>
23+
24+
/*
25+
* ICStatistics
26+
*
27+
* A structure keeping various statistics about interconnect internal.
28+
*
29+
* Note that the statistics for ic are not accurate for multiple cursor case on QD.
30+
*
31+
* totalRecvQueueSize - receive queue size sum when main thread is trying to get a packet.
32+
* recvQueueSizeCountingTime - counting times when computing totalRecvQueueSize.
33+
* totalCapacity - the capacity sum when packets are tried to be sent.
34+
* capacityCountingTime - counting times used to compute totalCapacity.
35+
* totalBuffers - total buffers available when sending packets.
36+
* bufferCountingTime - counting times when compute totalBuffers.
37+
* activeConnectionsNum - the number of active connections.
38+
* retransmits - the number of packet retransmits.
39+
* mismatchNum - the number of mismatched packets received.
40+
* crcErrors - the number of crc errors.
41+
* sndPktNum - the number of packets sent by sender.
42+
* recvPktNum - the number of packets received by receiver.
43+
* disorderedPktNum - disordered packet number.
44+
* duplicatedPktNum - duplicate packet number.
45+
* recvAckNum - the number of Acks received.
46+
* statusQueryMsgNum - the number of status query messages sent.
47+
*
48+
*/
49+
typedef struct ICStatistics
50+
{
51+
uint64 totalRecvQueueSize;
52+
uint64 recvQueueSizeCountingTime;
53+
uint64 totalCapacity;
54+
uint64 capacityCountingTime;
55+
uint64 totalBuffers;
56+
uint64 bufferCountingTime;
57+
uint32 activeConnectionsNum;
58+
int32 retransmits;
59+
int32 startupCachedPktNum;
60+
int32 mismatchNum;
61+
int32 crcErrors;
62+
int32 sndPktNum;
63+
int32 recvPktNum;
64+
int32 disorderedPktNum;
65+
int32 duplicatedPktNum;
66+
int32 recvAckNum;
67+
int32 statusQueryMsgNum;
68+
} ICStatistics;
69+
70+
extern void InterconnectShmemInitUDPIFC(void);
71+
extern Size InterconnectShmemSizeUDPIFC(void);
72+
extern void WaitInterconnectQuitUDPIFC(void);
73+
74+
/* Get interconnect statistics local to this slice */
75+
extern ICStatistics UDPIFCGetICStats(void);
76+
77+
/* Get global cummulative IC stats */
78+
extern Datum GpInterconnectGetStatsUDPIFC(PG_FUNCTION_ARGS);
79+
80+
#endif

src/include/cdb/ml_ipc.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -307,16 +307,12 @@ extern ChunkTransportStateEntry *removeChunkTransportState(ChunkTransportState *
307307
extern TupleChunkListItem RecvTupleChunk(MotionConn *conn, ChunkTransportState *transportStates);
308308

309309
extern Size InterconnectShmemSize(void);
310-
extern Size InterconnectShmemSizeUDPIFC(void);
311310
extern void InterconnectShmemInit(void);
312-
extern void InterconnectShmemInitUDPIFC(void);
313-
extern Datum GpInterconnectGetStatsUDPIFC(PG_FUNCTION_ARGS);
314311
extern void InitMotionTCP(int *listenerSocketFd, uint16 *listenerPort);
315312
extern void InitMotionUDPIFC(int *listenerSocketFd, uint16 *listenerPort);
316313
extern void markUDPConnInactiveIFC(MotionConn *conn);
317314
extern void CleanupMotionTCP(void);
318315
extern void CleanupMotionUDPIFC(void);
319-
extern void WaitInterconnectQuitUDPIFC(void);
320316
extern void SetupTCPInterconnect(EState *estate);
321317
extern void SetupUDPIFCInterconnect(EState *estate);
322318
extern void TeardownTCPInterconnect(ChunkTransportState *transportStates,
@@ -328,4 +324,8 @@ extern uint32 getActiveMotionConns(void);
328324

329325
extern char *format_sockaddr(struct sockaddr_storage *sa, char *buf, size_t len);
330326

327+
#define IC_TEARDOWN_HOOK
328+
typedef void (*ic_teardown_hook_type)(ChunkTransportState *, bool);
329+
extern PGDLLIMPORT ic_teardown_hook_type ic_teardown_hook;
330+
331331
#endif /* ML_IPC_H */

0 commit comments

Comments
 (0)