Skip to content

Commit 6cfb407

Browse files
committed
allow parallel foreign scan.
For postgres >= 10 we now allow a parallel foreign scan. The number of workers used is currently chosen based on the `num_partitions` table option. We might make smarter decisions here in the future. Each worker now creates a todo list in festate->scan_data, which should be the same for every worker. festate->scan_data_desc then points to a shared memory segment from which every worker picks and increments the next_scanp work item number.
1 parent 1ee40d5 commit 6cfb407

File tree

10 files changed

+572
-233
lines changed

10 files changed

+572
-233
lines changed

Makefile

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
EXTENSION = kafka_fdw
33
EXTVERSION = $(shell grep default_version $(EXTENSION).control | sed -e "s/default_version[[:space:]]*=[[:space:]]*'\([^']*\)'/\1/")
44

5-
DATA = $(wildcard *--*.sql)
5+
DATA = $(filter-out $(EXTENSION)--$(EXTVERSION).sql, $(wildcard *--*.sql)) $(EXTENSION)--$(EXTVERSION).sql
66
# DOCS = $(wildcard doc/*.md)
77
TESTS = $(wildcard test/sql/*.sql)
88
REGRESS ?= $(patsubst test/sql/%.sql,%,$(TESTS))
@@ -24,6 +24,15 @@ endif
2424
PGXS := $(shell $(PG_CONFIG) --pgxs)
2525
include $(PGXS)
2626

27+
ifeq ($(shell test $(VERSION_NUM) -lt 100000; echo $$?),0)
28+
REGRESS := $(filter-out parallel, $(REGRESS))
29+
endif
30+
31+
ifeq ($(shell test $(VERSION_NUM) -ge 90600; echo $$?),0)
32+
PGOPTIONS+= "--max_parallel_workers_per_gather=0"
33+
endif
34+
35+
2736
PLATFORM = $(shell uname -s)
2837

2938
ifeq ($(PLATFORM),Darwin)
@@ -38,14 +47,16 @@ ifdef TEST
3847
REGRESS = $(TEST)
3948
endif
4049

50+
4151
all: $(EXTENSION)--$(EXTVERSION).sql
4252

4353
$(EXTENSION)--$(EXTVERSION).sql: sql/$(EXTENSION).sql
4454
cp $< $@
4555

56+
installcheck: submake $(REGRESS_PREP)
57+
PGOPTIONS=$(PGOPTIONS) $(pg_regress_installcheck) $(REGRESS_OPTS) $(REGRESS)
4658

4759
prep_kafka:
4860
./test/init_kafka.sh
4961

50-
5162
.PHONY: prep_kafka

kafka_fdw--0.0.1.sql

Lines changed: 0 additions & 22 deletions
This file was deleted.

src/kafka_expr.c

Lines changed: 114 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
/*
2+
TODO: consider and simplify
3+
SELECT * FROM kafka_test_part where (part between 1 and 2 and offs > 100) OR (part between 1 and 2 and offs > 200);
4+
SELECT * FROM kafka_test_part where (part between 3 and 2 and offs > 100) ;
5+
*/
6+
17
#include "kafka_fdw.h"
28

39
#include "catalog/pg_operator.h"
@@ -27,7 +33,7 @@ enum low_high
2733
HIGH
2834
};
2935

30-
static List *append_scan_p(List *list, KafkaScanP *scan_p, int64 batch_size);
36+
static void append_scan_p(KafkaScanPData *scand, KafkaScanP scan_p, int64 batch_size);
3137

3238
static KafkaScanOp *applyOperator(OpExpr *oper, int partition_attnum, int offset_attnum);
3339
static KafkaScanOp *getKafkaScanOp(kafka_op op, scanfield field, Node *val_p);
@@ -253,6 +259,11 @@ get_offset(List * param_id_list,
253259
return initial;
254260
}
255261

262+
/*
263+
* get the high/low partition number for the given parameters and operator
264+
* param_id_list is an integer list of parameter ids (aka the column numbers)
265+
* param_op_list is the operator to be applied (typically > < =)
266+
*/
256267
static int32
257268
get_partition(List * param_id_list,
258269
List * param_op_list,
@@ -268,6 +279,11 @@ get_partition(List * param_id_list,
268279
kafka_op op;
269280
*isnull = false;
270281

282+
/*
283+
* loop through both lists, find the column and operator, and evaluate
284+
* the parameter as needed
285+
* given low_high we return min/max
286+
*/
271287
forboth(lc_id, param_id_list, lc_op, param_op_list)
272288
{
273289
id = lfirst_int(lc_id);
@@ -300,7 +316,7 @@ get_partition(List * param_id_list,
300316

301317
if (val >= INT32_MAX)
302318
ereport(ERROR, (errcode(ERRCODE_FDW_ERROR), errmsg("partition number out of range")));
303-
319+
/* if we are looking for the low water mark we need max, otherwise min */
304320
if (low_high == LOW)
305321
initial = max_int64(initial, val);
306322
else
@@ -318,24 +334,25 @@ get_partition(List * param_id_list,
318334
we go through each element of that list and check if we need to expand
319335
and existing item in the result list or append to it
320336
*/
321-
List *
337+
void
322338
KafkaFlattenScanlist(List * scan_list,
323339
KafKaPartitionList *partition_list,
324340
int64 batch_size,
325341
KafkaParamValue * param_values,
326-
int num_params)
342+
int num_params,
343+
KafkaScanPData * scanp_data)
327344
{
328-
ListCell * lc;
329-
List * scan_op_list;
330-
KafkaScanP *scan_p;
331-
List * result = NIL;
332-
int32 p = 0;
333-
int32 lowest_p, highest_p;
345+
ListCell *lc;
346+
List * scan_op_list;
347+
int32 p = 0;
348+
int32 lowest_p, highest_p;
334349

335350
qsort(partition_list->partitions, partition_list->partition_cnt, sizeof(int32), cmpfunc);
336351
lowest_p = partition_list->partitions[0];
337352
highest_p = partition_list->partitions[partition_list->partition_cnt - 1];
338353

354+
DEBUGLOG("PARTITION LIST LOW %d, HIGH %d, LENGTH %d", lowest_p, highest_p, partition_list->partition_cnt);
355+
339356
foreach (lc, scan_list)
340357
{
341358
int32 pl, ph;
@@ -351,103 +368,128 @@ KafkaFlattenScanlist(List * scan_list,
351368
oh = ScanopListGetOh(scan_op_list);
352369
ph_infinite = ScanopListGetPhInvinite(scan_op_list);
353370
oh_infinite = ScanopListGetOhInvinite(scan_op_list);
371+
ph = ph_infinite ? highest_p : ph;
372+
oh = oh_infinite ? PG_INT64_MAX : oh;
354373

355-
pl = get_partition(list_nth(scan_op_list, PartitionParamId),
356-
list_nth(scan_op_list, PartitionParamOP),
357-
param_values,
358-
pl,
359-
num_params,
360-
LOW,
361-
&isnull);
362-
if (isnull)
363-
continue;
364-
365-
ph = get_partition(list_nth(scan_op_list, PartitionParamId),
366-
list_nth(scan_op_list, PartitionParamOP),
367-
param_values,
368-
ph_infinite ? highest_p : ph,
369-
num_params,
370-
HIGH,
371-
&isnull);
372-
if (isnull)
373-
continue;
374-
375-
ol = get_offset(list_nth(scan_op_list, OffsetParamId),
376-
list_nth(scan_op_list, OffsetParamOP),
377-
param_values,
378-
ol,
379-
num_params,
380-
LOW,
381-
&isnull);
382-
383-
if (isnull)
384-
continue;
385-
386-
oh = get_offset(list_nth(scan_op_list, OffsetParamId),
387-
list_nth(scan_op_list, OffsetParamOP),
388-
param_values,
389-
oh_infinite ? PG_INT64_MAX : oh,
390-
num_params,
391-
HIGH,
392-
&isnull);
393-
394-
if (isnull)
395-
continue;
374+
/* if we have any params evaluate them */
375+
if (num_params > 0)
376+
{
377+
pl = get_partition(list_nth(scan_op_list, PartitionParamId),
378+
list_nth(scan_op_list, PartitionParamOP),
379+
param_values,
380+
pl,
381+
num_params,
382+
LOW,
383+
&isnull);
384+
if (isnull)
385+
continue;
386+
387+
if (num_params > 0)
388+
ph = get_partition(list_nth(scan_op_list, PartitionParamId),
389+
list_nth(scan_op_list, PartitionParamOP),
390+
param_values,
391+
ph_infinite ? highest_p : ph,
392+
num_params,
393+
HIGH,
394+
&isnull);
395+
if (isnull)
396+
continue;
397+
398+
ol = get_offset(list_nth(scan_op_list, OffsetParamId),
399+
list_nth(scan_op_list, OffsetParamOP),
400+
param_values,
401+
ol,
402+
num_params,
403+
LOW,
404+
&isnull);
405+
406+
if (isnull)
407+
continue;
408+
409+
oh = get_offset(list_nth(scan_op_list, OffsetParamId),
410+
list_nth(scan_op_list, OffsetParamOP),
411+
param_values,
412+
oh_infinite ? PG_INT64_MAX : oh,
413+
num_params,
414+
HIGH,
415+
&isnull);
416+
417+
if (isnull)
418+
continue;
419+
}
396420

421+
/* if what we got makes any sense we create a KafkaScanP item per partition needed */
397422
if (pl <= ph && ol <= oh)
398423
{
399424
for (p = max_int32(pl, lowest_p); p <= ph; p++)
400425
{
401426
if (partion_member(partition_list, p))
402427
{
403-
scan_p = palloc(sizeof(KafkaScanP));
404-
scan_p->partition = p;
405-
scan_p->offset = ol;
406-
scan_p->offset_lim = oh == PG_INT64_MAX ? -1 : oh;
407-
result = append_scan_p(result, scan_p, batch_size);
428+
KafkaScanP scan_p = { .partition = p, .offset = ol, .offset_lim = (oh == PG_INT64_MAX ? -1 : oh) };
429+
append_scan_p(scanp_data, scan_p, batch_size);
408430
}
409431
}
410432
}
411433
}
434+
}
412435

413-
return result;
436+
static void
437+
appendKafkaScanPData(KafkaScanPData *scanp_data, KafkaScanP scan_p)
438+
{
439+
int newlen;
440+
DEBUGLOG("%s ", __func__);
441+
442+
/* enlarge if needed */
443+
if (scanp_data->len + 1 >= scanp_data->max_len)
444+
{
445+
newlen = 2 * scanp_data->max_len;
446+
scanp_data->data = (KafkaScanP *) repalloc(scanp_data->data, sizeof(KafkaScanP) * newlen);
447+
scanp_data->max_len = newlen;
448+
}
449+
scanp_data->data[scanp_data->len].partition = scan_p.partition;
450+
scanp_data->data[scanp_data->len].offset = scan_p.offset;
451+
scanp_data->data[scanp_data->len].offset_lim = scan_p.offset_lim;
452+
++scanp_data->len;
414453
}
415454

416455
/*
417456
* takes a list of KafkaScanP and
418457
* either expands an existing element with scan_p in case of overlap
419458
* or appends scan_p to the list
420459
*/
421-
static List *
422-
append_scan_p(List *list, KafkaScanP *scan_p, int64 batch_size)
460+
static void
461+
append_scan_p(KafkaScanPData *scanp_data, KafkaScanP scan_p, int64 batch_size)
423462
{
424-
ListCell * lc;
425463
KafkaScanP *cur_scan_p;
464+
int i = 0;
426465

427-
if (list == NIL)
428-
return lappend(list, scan_p);
466+
if (scanp_data->len == 0)
467+
{
468+
appendKafkaScanPData(scanp_data, scan_p);
469+
return;
470+
}
429471

430-
foreach (lc, list)
472+
for (i = 0; i < scanp_data->len; i++)
431473
{
432-
cur_scan_p = (KafkaScanP *) lfirst(lc);
433-
if (cur_scan_p->partition == scan_p->partition)
474+
cur_scan_p = &scanp_data->data[i];
475+
if (cur_scan_p->partition == scan_p.partition)
434476
{
435477
// expand if overlap
436478
// [a, b] overlaps with [x, y] if a <= y and x <= b.
437-
if ((cur_scan_p->offset <= scan_p->offset_lim + batch_size || scan_p->offset_lim == -1) &&
438-
(scan_p->offset <= cur_scan_p->offset_lim + batch_size || cur_scan_p->offset_lim == -1))
479+
if ((cur_scan_p->offset <= scan_p.offset_lim + batch_size || scan_p.offset_lim == -1) &&
480+
(scan_p.offset <= cur_scan_p->offset_lim + batch_size || cur_scan_p->offset_lim == -1))
439481
{
440-
cur_scan_p->offset = min_int64(cur_scan_p->offset, scan_p->offset);
441-
cur_scan_p->offset_lim = cur_scan_p->offset_lim == -1 || scan_p->offset_lim == -1
482+
cur_scan_p->offset = min_int64(cur_scan_p->offset, scan_p.offset);
483+
cur_scan_p->offset_lim = cur_scan_p->offset_lim == -1 || scan_p.offset_lim == -1
442484
? -1
443-
: max_int64(cur_scan_p->offset_lim, scan_p->offset_lim);
444-
return list;
485+
: max_int64(cur_scan_p->offset_lim, scan_p.offset_lim);
486+
return;
445487
}
446488
}
447489
}
448490

449491
// we did not return in the expand case thus append
450-
return lappend(list, scan_p);
492+
appendKafkaScanPData(scanp_data, scan_p);
451493
}
452494

453495
bool
@@ -732,12 +774,10 @@ applyKafkaScanOpList(List *a, List *b)
732774

733775
if (a == NIL)
734776
{
735-
DEBUGLOG("a NIL");
736777
return b;
737778
}
738779
if (b == NIL)
739780
{
740-
DEBUGLOG("b NIL");
741781
return a;
742782
}
743783

0 commit comments

Comments
 (0)