Skip to content

Commit 46af7da

Browse files
committed
Fixing serialization
1 parent da75db0 commit 46af7da

File tree

4 files changed

+447
-375
lines changed

4 files changed

+447
-375
lines changed

collector.c

Lines changed: 72 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
PGWS_DIMENSIONS_APPNAME))
4141
static volatile sig_atomic_t shutdown_requested = false;
4242

43-
int saved_profile_dimensions; //TODO should be initialized with the same value as GUC?
43+
int saved_profile_dimensions;
4444
int saved_history_dimensions;
4545

4646
static void handle_sigterm(SIGNAL_ARGS);
@@ -72,12 +72,15 @@ pgws_register_wait_collector(void)
7272
static void
7373
alloc_history(History *observations, int count)
7474
{
75-
observations->items = (HistoryItem *) palloc0(sizeof(HistoryItem) * count);
75+
int serialized_size;
76+
77+
saved_history_dimensions = pgws_history_dimensions;
78+
serialized_size = get_serialized_size(saved_history_dimensions, true);
79+
80+
observations->serialized_items = (char *) palloc0(serialized_size * count);
7681
observations->index = 0;
7782
observations->count = count;
7883
observations->wraparound = false;
79-
80-
saved_history_dimensions = pgws_history_dimensions;
8184
}
8285

8386
/*
@@ -86,13 +89,17 @@ alloc_history(History *observations, int count)
8689
static void
8790
realloc_history(History *observations, int count)
8891
{
89-
HistoryItem *newitems;
92+
char *newitems;
9093
int copyCount,
9194
i,
9295
j;
96+
int serialized_size;
97+
98+
//saved_history_dimensions = pgws_history_dimensions; // TODO вроде как
99+
serialized_size = get_serialized_size(saved_history_dimensions, true);
93100

94101
/* Allocate new array for history */
95-
newitems = (HistoryItem *) palloc0(sizeof(HistoryItem) * count);
102+
newitems = (char *) palloc0(serialized_size * count);
96103

97104
/* Copy entries from old array to the new */
98105
if (observations->wraparound)
@@ -111,19 +118,19 @@ realloc_history(History *observations, int count)
111118
{
112119
if (j >= observations->count)
113120
j = 0;
114-
memcpy(&newitems[i], &observations->items[j], sizeof(HistoryItem));
121+
memcpy((newitems + i * serialized_size),
122+
(observations->serialized_items + j * serialized_size),
123+
serialized_size);
115124
i++;
116125
j++;
117126
}
118127

119128
/* Switch to new history array */
120-
pfree(observations->items);
121-
observations->items = newitems;
129+
pfree(observations->serialized_items);
130+
observations->serialized_items = newitems;
122131
observations->index = copyCount;
123132
observations->count = count;
124133
observations->wraparound = false;
125-
126-
saved_history_dimensions = pgws_history_dimensions;
127134
}
128135

129136
static void
@@ -140,18 +147,19 @@ handle_sigterm(SIGNAL_ARGS)
140147
/*
141148
* Get next item of history with rotation.
142149
*/
143-
static HistoryItem *
150+
static char *
144151
get_next_observation(History *observations)
145152
{
146-
HistoryItem *result;
153+
char *result;
154+
int serialized_size = get_serialized_size(saved_history_dimensions, true);
147155

148156
/* Check for wraparound */
149157
if (observations->index >= observations->count)
150158
{
151159
observations->index = 0;
152160
observations->wraparound = true;
153161
}
154-
result = &observations->items[observations->index];
162+
result = &observations->serialized_items[observations->index * serialized_size];
155163
observations->index++;
156164
return result;
157165
}
@@ -413,8 +421,8 @@ serialize_item(SamplingDimensions dimensions, int dimensions_mask,
413421
}
414422

415423
/* copy all the fields without ts/count */
416-
*serialized_key = palloc0(*serialized_size + 1);
417-
strcpy(*serialized_key, dummy_array);
424+
*serialized_key = palloc0(*serialized_size);
425+
memcpy(*serialized_key, dummy_array, *serialized_size);
418426

419427
if (is_history)
420428
{
@@ -430,8 +438,8 @@ serialize_item(SamplingDimensions dimensions, int dimensions_mask,
430438
}
431439

432440
/* copy everything */
433-
*serialized_item = palloc0(*serialized_size + 1);
434-
strcpy(*serialized_item, dummy_array);
441+
*serialized_item = palloc0(*serialized_size);
442+
memcpy(*serialized_item, dummy_array, *serialized_size);
435443
}
436444

437445
void
@@ -570,17 +578,17 @@ probe_waits(History *observations, HTAB *profile_hash,
570578
LWLockAcquire(ProcArrayLock, LW_SHARED);
571579
for (i = 0; i < ProcGlobal->allProcCount; i++)
572580
{
573-
HistoryItem item_history,
574-
*observation;
575-
ProfileItem item_profile;
581+
//HistoryItem item_history,
582+
// *observation;
583+
//ProfileItem item_profile;
576584
PGPROC *proc = &ProcGlobal->allProcs[i];
577585
int pid;
578586
uint32 wait_event_info;
579587
SamplingDimensions common_dimensions,
580588
history_dimensions,
581589
profile_dimensions;
582-
int dimensions_mask_common = pgws_history_dimensions |
583-
pgws_profile_dimensions;
590+
int dimensions_mask_common = saved_history_dimensions |
591+
saved_profile_dimensions;
584592

585593
/* Check if we need to sample this process */
586594
if (!pgws_should_sample_proc(proc, &pid, &wait_event_info))
@@ -598,21 +606,27 @@ probe_waits(History *observations, HTAB *profile_hash,
598606

599607
copy_dimensions(&history_dimensions,
600608
&common_dimensions,
601-
pgws_history_dimensions);
609+
saved_history_dimensions);
602610
copy_dimensions(&profile_dimensions,
603611
&common_dimensions,
604-
pgws_profile_dimensions);
612+
saved_profile_dimensions);
605613

606-
item_history.ts = ts;
607-
item_history.dimensions = history_dimensions;
614+
//item_history.ts = ts;
615+
//item_history.dimensions = history_dimensions;
608616

609617
/* Write to the history if needed */
610618
if (write_history)
611619
{
612-
//TODO вот тут что-то сделать нужно??? потому что мы не запаковываем
613-
//историю
620+
char *serialized_key,
621+
*serialized_item,
622+
*observation;
623+
int serialized_size = 0;
624+
614625
observation = get_next_observation(observations);
615-
*observation = item_history;
626+
serialize_item(history_dimensions, saved_history_dimensions,
627+
&serialized_item, &serialized_key, &serialized_size,
628+
ts, (uint64) 0, true);
629+
memcpy(observation, serialized_item, serialized_size);
616630
}
617631

618632
/* Write to the profile if needed */
@@ -626,9 +640,9 @@ probe_waits(History *observations, HTAB *profile_hash,
626640
*stored_item;
627641

628642
if (!profile_pid)
629-
item_profile.dimensions.pid = 0;
643+
profile_dimensions.pid = 0;
630644

631-
serialize_item(item_profile.dimensions, saved_profile_dimensions,
645+
serialize_item(profile_dimensions, saved_profile_dimensions,
632646
&serialized_item, &serialized_key, &serialized_size,
633647
(TimestampTz) 0, count, false);
634648

@@ -659,8 +673,9 @@ probe_waits(History *observations, HTAB *profile_hash,
659673
* Send waits history to shared memory queue.
660674
*/
661675
static void
662-
send_history(History *observations, shm_mq_handle *mqh)
676+
send_history(History *observations, shm_mq_handle *mqh) //TODO TODO TODO
663677
{
678+
int serialized_size = get_serialized_size(saved_history_dimensions, true);
664679
Size count,
665680
i;
666681
shm_mq_result mq_result;
@@ -679,11 +694,20 @@ send_history(History *observations, shm_mq_handle *mqh)
679694
"receiver of message queue has been detached")));
680695
return;
681696
}
697+
/* Send saved_dimensions next */
698+
mq_result = shm_mq_send_compat(mqh, sizeof(saved_history_dimensions), &saved_history_dimensions, false, true);
699+
if (mq_result == SHM_MQ_DETACHED)
700+
{
701+
ereport(WARNING,
702+
(errmsg("pg_wait_sampling collector: "
703+
"receiver of message queue has been detached")));
704+
return;
705+
}
682706
for (i = 0; i < count; i++)
683707
{
684708
mq_result = shm_mq_send_compat(mqh,
685-
sizeof(HistoryItem),
686-
&observations->items[i],
709+
serialized_size,
710+
(observations->serialized_items + i * serialized_size),
687711
false,
688712
true);
689713
if (mq_result == SHM_MQ_DETACHED)
@@ -703,7 +727,8 @@ static void
703727
send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
704728
{
705729
HASH_SEQ_STATUS scan_status;
706-
ProfileItem *item;
730+
char *serialized_item;
731+
int serialized_size = get_serialized_size(saved_profile_dimensions, true);
707732
Size count = hash_get_num_entries(profile_hash);
708733
shm_mq_result mq_result;
709734

@@ -716,10 +741,19 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
716741
"receiver of message queue has been detached")));
717742
return;
718743
}
744+
/* Send saved_dimensions next */
745+
mq_result = shm_mq_send_compat(mqh, sizeof(saved_profile_dimensions), &saved_profile_dimensions, false, true);
746+
if (mq_result == SHM_MQ_DETACHED)
747+
{
748+
ereport(WARNING,
749+
(errmsg("pg_wait_sampling collector: "
750+
"receiver of message queue has been detached")));
751+
return;
752+
}
719753
hash_seq_init(&scan_status, profile_hash);
720-
while ((item = (ProfileItem *) hash_seq_search(&scan_status)) != NULL)
754+
while ((serialized_item = (char *) hash_seq_search(&scan_status)) != NULL)
721755
{
722-
mq_result = shm_mq_send_compat(mqh, sizeof(ProfileItem), item, false,
756+
mq_result = shm_mq_send_compat(mqh, serialized_size, serialized_item, false,
723757
true);
724758
if (mq_result == SHM_MQ_DETACHED)
725759
{

pg_wait_sampling--1.1--1.2.sql

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,26 @@
33
-- complain if script is sourced in psql, rather than via ALTER EXTENSION
44
\echo Use "ALTER EXTENSION pg_wait_sampling UPDATE TO 1.2" to load this file. \quit
55

6-
DROP FUNCTION pg_wait_sampling_get_current CASCADE;
7-
DROP FUNCTION pg_wait_sampling_get_profile CASCADE;
8-
DROP FUNCTION pg_wait_sampling_get_history CASCADE;
6+
--DROP FUNCTION pg_wait_sampling_get_current (
7+
-- pid int4,
8+
-- OUT pid int4,
9+
-- OUT event_type text,
10+
-- OUT event text
11+
--) CASCADE;
12+
--
13+
--DROP FUNCTION pg_wait_sampling_get_history (
14+
-- OUT pid int4,
15+
-- OUT ts timestamptz,
16+
-- OUT event_type text,
17+
-- OUT event text
18+
--) CASCADE;
19+
--
20+
--DROP FUNCTION pg_wait_sampling_get_profile (
21+
-- OUT pid int4,
22+
-- OUT event_type text,
23+
-- OUT event text,
24+
-- OUT count bigint
25+
--) CASCADE;
926

1027
CREATE FUNCTION pg_wait_sampling_get_current_extended (
1128
pid int4,
@@ -35,7 +52,6 @@ GRANT SELECT ON pg_wait_sampling_current TO PUBLIC;
3552

3653
CREATE FUNCTION pg_wait_sampling_get_history_extended (
3754
OUT pid int4,
38-
OUT ts timestamptz,
3955
OUT event_type text,
4056
OUT event text,
4157
OUT queryid int8,
@@ -48,7 +64,8 @@ CREATE FUNCTION pg_wait_sampling_get_history_extended (
4864
OUT proc_start timestamptz,
4965
OUT client_addr text,
5066
OUT client_hostname text,
51-
OUT appname text
67+
OUT appname text,
68+
OUT ts timestamptz
5269
)
5370
RETURNS SETOF record
5471
AS 'MODULE_PATHNAME'
@@ -85,9 +102,9 @@ CREATE VIEW pg_wait_sampling_profile_extended AS
85102

86103
GRANT SELECT ON pg_wait_sampling_profile_extended TO PUBLIC;
87104

88-
CREATE VIEW pg_wait_sampling_profile AS
89-
SELECT pid, event_type, event, queryid, SUM(count) FROM pg_wait_sampling_profile_extended
90-
GROUP BY pid, event_type, event, queryid;
91-
92-
GRANT SELECT ON pg_wait_sampling_profile TO PUBLIC;
105+
--CREATE VIEW pg_wait_sampling_profile AS
106+
-- SELECT pid, event_type, event, queryid, SUM(count) FROM pg_wait_sampling_profile_extended
107+
-- GROUP BY pid, event_type, event, queryid;
108+
--
109+
--GRANT SELECT ON pg_wait_sampling_profile TO PUBLIC;
93110

0 commit comments

Comments
 (0)