Replace token bucket based throttling with feedback based throttling (#591)

* Replace token bucket based throttling with rate based throttling.

Signed-off-by: Greg Becker <[email protected]>

* Document fields of struct throttle_tls...

Signed-off-by: Greg Becker <[email protected]>

* Rearrange code to appease reviewers.

Signed-off-by: Greg Becker <[email protected]>

* Provide hse_timer_cb_unregister() to undo the effects of hse_timer_cb_register().

Signed-off-by: Greg Becker <[email protected]>

---------

Signed-off-by: Greg Becker <[email protected]>
beckerg authored Feb 17, 2023
1 parent 22b19f2 commit 6a109e8
Showing 18 changed files with 680 additions and 352 deletions.
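For orientation before the per-file diffs: a token bucket throttles puts against a preconfigured fill rate, whereas the scheme introduced here measures the rate at which c0 actually spills to cn and feeds that measurement back into the put path. The sketch below is illustrative only — every name in it is hypothetical, not HSE's API — but it shows the shape of such a feedback loop: spill threads account bytes, a periodic timer folds the samples into a smoothed rate, and the put path paces itself against that rate, tightened by a backlog sensor.

#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

struct feedback_throttle {
    atomic_ulong bytes;   /* bytes spilled in the current sample window */
    uint64_t rate;        /* smoothed spill rate, bytes/sec */
    unsigned sensor;      /* backlog sensor: >= 1000 engages throttling */
};

/* Spill path: account for work actually retired to media. */
static void
spill_account(struct feedback_throttle *ft, size_t len)
{
    atomic_fetch_add(&ft->bytes, len);
}

/* Periodic timer callback: fold the window's sample into a smoothed
 * rate estimate (EWMA with gain 1/8) and reset the window.
 */
static void
timer_cb(struct feedback_throttle *ft, uint64_t window_ns)
{
    uint64_t sample = atomic_exchange(&ft->bytes, 0);
    uint64_t rate = sample * 1000000000ull / window_ns;

    ft->rate = (ft->rate * 7 + rate) / 8;
}

/* Put path: pace incoming bytes at the measured spill rate, tightened
 * in proportion to the sensor once it crosses its trigger point.
 */
static uint64_t
put_delay_ns(const struct feedback_throttle *ft, size_t len)
{
    if (ft->rate == 0 || ft->sensor < 1000)
        return 0; /* no rate measured yet, or backlog is small */

    return (len * 1000000000ull / ft->rate) * ft->sensor / 1000;
}

In the commit itself, the accounting side is the ts_cntrv/ts_cntrgen counter ring updated from c0sk_cningest_cb() (lib/c0/c0sk_internal.c below), the sensor values come from c0sk_adjust_throttling(), and the timer hook is the hse_timer_cb_register() facility mentioned in the commit message.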
1 change: 1 addition & 0 deletions include/hse/kvdb_perfc.h
@@ -115,6 +115,7 @@ enum kvdb_perfc_sidx_kvdb_metrics {
};

enum kvdb_perfc_sidx_api_throttle {
+    PERFC_BA_THSR_KVDB,
    PERFC_BA_THSR_CNROOT,
    PERFC_BA_THSR_C0SK,
    PERFC_BA_THSR_WAL,
9 changes: 8 additions & 1 deletion lib/c0/c0_ingest_work.h
@@ -38,7 +38,8 @@
 * [HSE_REVISIT]
 */
struct c0_ingest_work {
-    struct work_struct c0iw_work;
+    struct throttle_sensor *c0iw_thr_sensor;
+    struct throttle_tls *c0iw_thr_tls;
    struct c0sk *c0iw_c0sk;
    struct element_source *c0iw_kvms_sourcev[HSE_C0_INGEST_WIDTH_MAX];
    struct c0_kvset_iterator c0iw_kvms_iterv[HSE_C0_INGEST_WIDTH_MAX];
@@ -64,11 +65,17 @@ struct c0_ingest_work {
    uint64_t t0, t3, t4, t5, t6, t7, t8, t9, t10;
    uint64_t gencur, gen;

+    uint64_t c0iw_kbytes;
+    uint64_t c0iw_vbytes;
+    uint64_t c0iw_mask;
+
    /* Establishing view for ingest */
    uint64_t c0iw_ingest_max_seqno;
    uint64_t c0iw_ingest_min_seqno;
    uint64_t c0iw_ingest_order;

+    struct work_struct c0iw_work;
+
    /* c0iw_magic is last field to verify it didn't get clobbered
     * by c0kvs_reset().
     */
5 changes: 4 additions & 1 deletion lib/c0/c0sk.c
@@ -505,7 +505,10 @@ c0sk_open(

    stashp = HSE_LIKELY(atomic_read(&c0sk->c0sk_replaying) == 0) ? &c0sk->c0sk_stash : NULL;

-    err = c0kvms_create(c0sk->c0sk_ingest_width, c0sk->c0sk_kvdb_seq, stashp, &c0kvms);
+    /* Start with the minimum width to elicit a c0 spill as soon as possible, thereby
+     * allowing the throttle put-rate limit to be quickly determined.
+     */
+    err = c0kvms_create(HSE_C0_INGEST_WIDTH_MIN, c0sk->c0sk_kvdb_seq, stashp, &c0kvms);
    if (err)
        goto errout;
137 changes: 86 additions & 51 deletions lib/c0/c0sk_internal.c
@@ -46,73 +46,38 @@
static void
c0sk_adjust_throttling(struct c0sk_impl *self, int amt)
{
-    const uint sensorv[] = { 0, 0, 300, 700, 900, 1000, 1100, 1300, 1500, 1700 };
-    const struct kvdb_rparams *rp = self->c0sk_kvdb_rp;
-    uint finlat, new;
-    uint tfill = 0;
-    int cnt;
+    static const uint sensorv[] = { 100, 100, 300, 600, 925, 1000, 1100, 1200, 1300, 1400, 1500 };
+    int cnt, sval;

    if (!self->c0sk_sensor)
        return;

-    finlat = self->c0sk_ingest_finlat;
-    cnt = self->c0sk_kvmultisets_cnt;
-
-    /* Throttle heavily until the first ingest completes.
-     */
-    if (finlat == UINT_MAX && cnt > 0) {
-        new = 1000 + ((cnt / 2) + 1) * 100;
-        throttle_sensor_set(self->c0sk_sensor, new);
-        return;
-    }
-
-    /* Use the ingest finish latency (i.e., the running average time it takes
-     * to process and write k/v tuples to media) to adjust the hwm to try and
-     * maintain a max backlog of between 2.9 and 5.2 c0kvms (based upon the
-     * default value of throttle_c0_hi_th=3.5).
-     */
-
-    /* If (amt > 0) it means a new ingest was enqueued, so the time taken
-     * to fill the kvms buffer is now minus the last time we did this...
-     */
-    if (amt > 0) {
-        tfill = (jclock_ns - self->c0sk_ingest_ctime) / 1000000;
-        self->c0sk_ingest_ctime = jclock_ns;
-    }
-
-    /* Adjust the throttle depending upon the kvms backlog and whether we
-     * are adding to or removing from the backlog. The backlog of inflight
-     * ingests is always (cnt - 1), where "cnt" is used to select throttle
-     * sensor. Use the ingest finish latency (i.e., the running average
-     * time it takes to process and write k/v tuples to media) and the
-     * fill rate to increase or decrease the sensor value.
+    /* Wait until the first ingest completes.
     */
-    if ((amt < 0 && (cnt + amt) < 2) || cnt < 1) {
-        cnt = 0;
-    } else if (amt > 0 && (cnt + amt) == 2) {
-        if (tfill < finlat * 110 / 100 || finlat > 8000) {
-            if (tfill < finlat * 90 / 100) {
-                cnt = 3; /* (fill rate > ingest rate), high throttle */
-            } else {
-                cnt = 2; /* (fill rate == ingest rate), low throttle */
-            }
-        } else {
-            cnt = 0; /* (fill rate < ingest rate), disengage throttle */
-        }
-    } else {
-        cnt += amt; /* normal kvms count based throttling */
-
-        if (finlat > 8000 && cnt > rp->c0_ingest_threads)
-            cnt++;
-    }
+    if (self->c0sk_ingest_finlat == UINT_MAX)
+        return;

    /* Use sensor trigger values from throttle.c, where values of 1000
     * and above increase throttling, and values below 1000 decrease
     * throttling faster inversely proportional to the value.
     */
-    new = (cnt < NELEM(sensorv)) ? sensorv[cnt] : 1800;
+    cnt = self->c0sk_kvmultisets_cnt + amt;
+    sval = (cnt < NELEM(sensorv)) ? sensorv[cnt] : 1800;

+    /* Add 50 when a new kvms is ready to spill, and subtract 50 once
+     * a kvms has been spilled.
+     */
+    sval = sval + (amt * 50) + amt;
+    assert(sval >= 0 && sval <= 1851);

-    throttle_sensor_set(self->c0sk_sensor, new);
+    throttle_sensor_set(self->c0sk_sensor, sval);
}

static uint64_t
@@ -312,6 +277,7 @@ c0sk_cningest_cb(void *rock, struct bonsai_kv *bkv, struct bonsai_val *vlist)
    struct cn *cn = c0sk->c0sk_cnv[skidx];
    struct kvset_builder **kvbldrs = ingest->c0iw_bldrs;
    struct kvset_builder *bldr = kvbldrs[skidx];
+    size_t klen, vlen;

    assert(bkv);
    assert(vlist);
@@ -339,7 +305,9 @@ c0sk_cningest_cb(void *rock, struct bonsai_kv *bkv, struct bonsai_val *vlist)

    seqno_prev = UINT64_MAX;
    pt_seqno_prev = UINT64_MAX;
-    key2kobj(&ko, bkv->bkv_key, key_imm_klen(&bkv->bkv_key_imm));
+    klen = key_imm_klen(&bkv->bkv_key_imm);
+    vlen = 0;
+    key2kobj(&ko, bkv->bkv_key, klen);

    for (val = vlist; val; val = val->bv_priv) {
        enum hse_seqno_state state HSE_MAYBE_UNUSED;
@@ -361,6 +329,8 @@ c0sk_cningest_cb(void *rock, struct bonsai_kv *bkv, struct bonsai_val *vlist)
            else
                seqno_prev = seqno;

+            vlen += bonsai_val_vlen(val);
+
            err = kvset_builder_add_val(
                bldr, &ko, val->bv_value, bonsai_val_ulen(val), seqno, bonsai_val_clen(val));

@@ -372,6 +342,50 @@ c0sk_cningest_cb(void *rock, struct bonsai_kv *bkv, struct bonsai_val *vlist)
    if (ev(err))
        return err;

+    ingest->c0iw_kbytes += klen;
+    ingest->c0iw_vbytes += vlen;
+
+    if (ingest->c0iw_kbytes + ingest->c0iw_vbytes > ingest->c0iw_mask) {
+        struct throttle_tls *tls = ingest->c0iw_thr_tls;
+
+        tls->bytes += klen + vlen;
+
+        if (tls->bytes > ingest->c0iw_mask) {
+            struct throttle_sensor *ts = ingest->c0iw_thr_sensor;
+            uint64_t resid = tls->bytes & ingest->c0iw_mask;
+            uint64_t gen;
+
+            tls->bytes -= resid;
+
+            gen = atomic_read_acq(ts->ts_cntrgenp);
+            if (gen > tls->cntrgen) {
+                atomic_ulong *cntrp;
+                uint64_t pct;
+
+                pct = (ingest->c0iw_kbytes * 100) / (ingest->c0iw_kbytes + ingest->c0iw_vbytes);
+                if (pct < 10) {
+                    ingest->c0iw_mask = (32ul << 20) - 1;
+                } else {
+                    tls->bytes += (tls->bytes * pct) / 100;
+                    ingest->c0iw_mask = (16ul << 20) - 1;
+                }
+
+                /* Average out the byte count for each missed generation.
+                 */
+                if (gen - tls->cntrgen > 1)
+                    tls->bytes /= (gen - tls->cntrgen);
+
+                cntrp = &ts->ts_cntrv[gen % NELEM(ts->ts_cntrv)];
+                atomic_add(cntrp, (tls->bytes << 20) | 1);
+
+                tls->bytes = 0;
+                tls->cntrgen = gen;
+            }
+
+            tls->bytes += resid;
+        }
+    }
+
    return 0;
}

@@ -691,10 +705,28 @@ c0sk_ingest_worker(struct work_struct *work)

    ingest->t6 = get_time_ns();

+    /* Initialize the per-thread throttle state so that c0sk_cningest_cb() (called
+     * repeatedly by bkv_collection_finish_pair()) can update the c0sk throttle
+     * sensor with the observed spill rate.
+     */
+    if (c0sk->c0sk_sensor) {
+        ingest->c0iw_thr_sensor = c0sk->c0sk_sensor;
+        ingest->c0iw_thr_tls = &hse_throttle_tls;
+        ingest->c0iw_thr_tls->bytes = 0;
+        ingest->c0iw_thr_tls->cntrgen = atomic_read_acq(ingest->c0iw_thr_sensor->ts_cntrgenp);
+        ingest->c0iw_mask = (32ul << 20) - 1;
+    } else {
+        ingest->c0iw_mask = UINT64_MAX;
+    }
+
    err = bkv_collection_finish_pair(cn_list[0], cn_list[1]);
    if (ev(err))
        goto health_err;

+    /* Prevent any other thread from accessing this thread's local storage via c0iw_thr_tls.
+     */
+    ingest->c0iw_thr_tls = NULL;
+
    ingest->t7 = get_time_ns();

    for (i = 0; i < HSE_KVS_COUNT_MAX; ++i) {
@@ -1068,6 +1100,7 @@ c0sk_queue_ingest(struct c0sk_impl *self, struct c0_kvmultiset *old)
    struct c0_kvmultiset *new;
    void *_Atomic *stashp;
    merr_t err;
+    uint width;

    c0kvms_ingesting(old);

@@ -1092,7 +1125,9 @@

    stashp = HSE_LIKELY(atomic_read(&self->c0sk_replaying) == 0) ? &self->c0sk_stash : NULL;

-    err = c0kvms_create(self->c0sk_ingest_width, self->c0sk_kvdb_seq, stashp, &new);
+    width = atomic_read(&self->c0sk_ingest_width);
+
+    err = c0kvms_create(width, self->c0sk_kvdb_seq, stashp, &new);
    if (!err) {
        c0kvms_getref(new);
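The feedback accounting added to c0sk_cningest_cb() above is compact and easy to misread, so here is a self-contained model of the publish step. It is a sketch under assumptions: the ring size and names are made up (the real ring is ts_cntrv with NELEM(ts_cntrv) slots, and its generation is advanced elsewhere, presumably by the timer callback registered via hse_timer_cb_register()). The idea: each spill thread batches bytes thread-locally and touches the shared atomics only when the batch exceeds a power-of-two mask (16 or 32 MiB above), publishing (bytes << 20) | 1 so a single atomic add records both the byte count (upper bits) and a sample count (low 20 bits).

#include <stdatomic.h>
#include <stdint.h>

#define CNTRV_LEN 4 /* assumed ring size; the real NELEM(ts_cntrv) may differ */

struct sensor {
    atomic_ulong cntrv[CNTRV_LEN]; /* each holds (bytes << 20) | sample count */
    atomic_ulong cntrgen;          /* generation, advanced by a timer callback */
};

struct worker_tls {
    uint64_t bytes;   /* locally batched, unpublished byte count */
    uint64_t cntrgen; /* generation at the time of the last publish */
};

/* Publish locally batched bytes into the counter ring. Called only once
 * the local batch exceeds mask (a power of two minus one), so the shared
 * cache line is written at most once per mask+1 bytes of spilled data,
 * regardless of how many key/value pairs the spill visits.
 */
static void
sensor_publish(struct sensor *ts, struct worker_tls *tls, uint64_t mask)
{
    uint64_t resid = tls->bytes & mask; /* sub-batch remainder, kept local */
    uint64_t gen = atomic_load_explicit(&ts->cntrgen, memory_order_acquire);

    tls->bytes -= resid;

    if (gen > tls->cntrgen) {
        /* Spread the batch evenly over generations we slept through so
         * no single generation's rate sample is inflated.
         */
        if (gen - tls->cntrgen > 1)
            tls->bytes /= (gen - tls->cntrgen);

        /* Low 20 bits count samples, upper bits accumulate bytes. */
        atomic_fetch_add_explicit(&ts->cntrv[gen % CNTRV_LEN],
                                  (tls->bytes << 20) | 1,
                                  memory_order_relaxed);
        tls->bytes = 0;
        tls->cntrgen = gen;
    }

    tls->bytes += resid;
}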
8 changes: 4 additions & 4 deletions lib/c0/c0sk_internal.h
@@ -110,10 +110,10 @@ struct c0sk_impl {
    atomic_int c0sk_ingest_ldrcnt;
    sem_t c0sk_sync_sema;

-    uint32_t c0sk_ingest_width HSE_L1D_ALIGNED;
-    int c0sk_boost;
-    char *c0sk_kvdb_alias;
-    void * _Atomic c0sk_stash;
+    atomic_uint c0sk_ingest_width HSE_L1D_ALIGNED;
+    int c0sk_boost;
+    char *c0sk_kvdb_alias;
+    void * _Atomic c0sk_stash;

    struct {
        atomic_int refcnt HSE_ACP_ALIGNED;
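Note the type change: c0sk_ingest_width goes from a plain uint32_t to an atomic_uint, matching the new atomic_read() in c0sk_queue_ingest() above. The point is a data-race-free snapshot: the width is presumably adjusted dynamically (c0sk_open now starts at HSE_C0_INGEST_WIDTH_MIN) while ingest queuing reads it concurrently. A minimal sketch of the access pattern, with hypothetical names:

#include <stdatomic.h>

static atomic_uint ingest_width; /* illustrative stand-in for c0sk_ingest_width */

/* Reader (cf. c0sk_queue_ingest): snapshot the width once and use the
 * snapshot for the whole kvms creation, so a concurrent update cannot
 * change it mid-operation.
 */
static unsigned
ingest_width_get(void)
{
    return atomic_load_explicit(&ingest_width, memory_order_relaxed);
}

/* Writer: e.g., a feedback path growing the width after observing how
 * quickly the previous kvms filled.
 */
static void
ingest_width_set(unsigned width)
{
    atomic_store_explicit(&ingest_width, width, memory_order_relaxed);
}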
1 change: 0 additions & 1 deletion lib/cn/cn.c
@@ -71,7 +71,6 @@

#define ENDPOINT_FMT_CN_TREE "/kvdbs/%s/kvs/%s/cn/tree"

-struct tbkt;
struct mclass_policy;

static struct kmem_cache *cn_cursor_cache;
2 changes: 1 addition & 1 deletion lib/cn/csched_sp3.c
@@ -899,7 +899,7 @@ sp3_dirty_node_locked(struct sp3 *sp, struct cn_tree_node *tn)
        sp3_node_unlink(sp, spn);
        sp3_node_insert(sp, spn, wtype_garbage, weight);
        ev_debug(1);
-    } else if (garbage > 0) {
+    } else if (garbage > 0 && nkvsets > 1) {
        weight = ((uint64_t)garbage << 32) | (cn_ns_alen(ns) >> 20);

        sp3_node_insert(sp, spn, wtype_garbage, weight);
2 changes: 1 addition & 1 deletion lib/cn/csched_sp3_work.h
@@ -21,7 +21,7 @@
 */
#define SP3_RSPILL_RUNLEN_MIN (1u) /* root spill requires at least 1 kvset */
#define SP3_RSPILL_RUNLEN_MAX (UINT8_MAX)
-#define SP3_RSPILL_RUNLEN_MIN_DEFAULT (7u)
+#define SP3_RSPILL_RUNLEN_MIN_DEFAULT (5u)
#define SP3_RSPILL_RUNLEN_MAX_DEFAULT (9u)

#define SP3_RSPILL_WLEN_MIN (0u)
2 changes: 0 additions & 2 deletions lib/cn/kblock_builder.c
@@ -41,8 +41,6 @@
#include "wbt_builder.h"
#include "wbt_reader.h"

-extern struct tbkt sp3_tbkt;
-
/* The max number of keys in a struct hash_set_part is sized such that the allocation
 * size of a hash_set_part is one page less than VLB_ALLOCSZ_MAX so that vlb_free()
 * can cache it. This eliminates many trips into the kernel that would otherwise
6 changes: 3 additions & 3 deletions lib/include/hse/ikvdb/kvdb_rparams.h
@@ -82,11 +82,11 @@ struct kvdb_rparams {
    uint8_t dur_mclass;

    uint64_t throttle_update_ns;
-    uint throttle_init_policy; /* [HSE_REVISIT]: Make this a fixed width type */
+    uint64_t throttle_rate_limit;
+    uint64_t throttle_rate_fastmedia;
    uint32_t throttle_debug;
    uint32_t throttle_debug_intvl_s;
-    uint64_t throttle_burst;
-    uint64_t throttle_rate;
+    uint throttle_init_policy; /* [HSE_REVISIT]: Make this a fixed width type */

    /* The following fields are typically only accessed by kvdb open
     * and hence are extremely cold.