Skip to content

Commit

Permalink
Merge pull request redpanda-data#6663 from VladLazar/fix-si-mode-sele…
Browse files Browse the repository at this point in the history
…ction

Fix shadow indexing mode selection
  • Loading branch information
Vlad Lazar authored Oct 7, 2022
2 parents 731e2f4 + 55fd2bf commit 31ed4f8
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 9 deletions.
27 changes: 23 additions & 4 deletions src/v/kafka/server/handlers/topics/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,32 @@ get_bool_value(const config_map_t& config, std::string_view key) {

static std::optional<model::shadow_indexing_mode>
get_shadow_indexing_mode(const config_map_t& config) {
std::optional<model::shadow_indexing_mode> mode;
auto arch_enabled = get_bool_value(config, topic_property_remote_write);
if (arch_enabled && *arch_enabled) {
auto si_enabled = get_bool_value(config, topic_property_remote_read);

// If the topic creation does not explicitly specify a shadow indexing mode
// we should use the default shadow indexing mode.
if (!arch_enabled && !si_enabled) {
return std::nullopt;
}

// If one of the topic properties is missing, patch it with the cluster
// config.
if (!arch_enabled) {
arch_enabled
= config::shard_local_cfg().cloud_storage_enable_remote_write();
}

if (!si_enabled) {
si_enabled
= config::shard_local_cfg().cloud_storage_enable_remote_read();
}

model::shadow_indexing_mode mode = model::shadow_indexing_mode::disabled;
if (*arch_enabled) {
mode = model::shadow_indexing_mode::archival;
}
auto si_enabled = get_bool_value(config, topic_property_remote_read);
if (si_enabled && *si_enabled) {
if (*si_enabled) {
mode = mode == model::shadow_indexing_mode::archival
? model::shadow_indexing_mode::full
: model::shadow_indexing_mode::fetch;
Expand Down
119 changes: 114 additions & 5 deletions tests/rptest/tests/topic_creation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from enum import Enum
import random
import string
import itertools
from time import sleep
from rptest.clients.default import DefaultClient
from rptest.services.admin import Admin
Expand Down Expand Up @@ -156,6 +157,11 @@ def topic_autocreate_test(self):
assert manual_topic_spec == auto_topic_spec


def topic_name():
return "test-topic-" + "".join(
random.choice(string.ascii_lowercase) for _ in range(16))


class CreateTopicsTest(RedpandaTest):

#TODO: add shadow indexing properties:
Expand Down Expand Up @@ -190,16 +196,12 @@ def __init__(self, test_context):
num_brokers=3,
si_settings=si_settings)

def _topic_name(self):
return "test-topic-" + "".join(
random.choice(string.ascii_lowercase) for _ in range(16))

@cluster(num_nodes=3)
def test_create_topic_with_single_configuration_property(self):
rpk = RpkTool(self.redpanda)

for p, generator in CreateTopicsTest._topic_properties.items():
name = self._topic_name()
name = topic_name()
partitions = random.randint(1, 10)
property_value = generator()
rpk.create_topic(topic=name,
Expand All @@ -211,6 +213,113 @@ def test_create_topic_with_single_configuration_property(self):
assert str(cfgs[p][0]) == str(property_value)


class CreateSITopicsTest(RedpandaTest):
def __init__(self, test_context):
super(CreateSITopicsTest, self).__init__(test_context=test_context,
num_brokers=1,
si_settings=SISettings())

def _to_bool(self, x: str) -> bool:
return True if x == "true" else False

def _from_bool(self, x: bool) -> str:
return "true" if x else "false"

@cluster(num_nodes=1)
def test_shadow_indexing_mode(self):
rpk = RpkTool(self.redpanda)

cluster_remote_read = [True, False]
cluster_remote_write = [True, False]
topic_remote_read = [True, False, None]
topic_remote_write = [True, False, None]

cases = list(
itertools.product(cluster_remote_read, cluster_remote_write,
topic_remote_read, topic_remote_write))

for c_read, c_write, t_read, t_write in cases:
self.logger.info(
f"Test case: cloud_storage_enable_remote_read={c_read}, "
f"cloud_storage_enable_remote_write={c_write}, "
f"redpanda.remote.read={t_read}, "
f"redpanda.remote.write={t_write}")

expected_read = t_read if t_read is not None \
else c_read
expected_write = t_write if t_write is not None \
else c_write

self.redpanda.set_cluster_config(
{
"cloud_storage_enable_remote_read": c_read,
"cloud_storage_enable_remote_write": c_write
},
expect_restart=True)

config = {}
if t_read is not None:
config["redpanda.remote.read"] = self._from_bool(t_read)
if t_write is not None:
config["redpanda.remote.write"] = self._from_bool(t_write)

topic = topic_name()
rpk.create_topic(topic=topic,
partitions=1,
replicas=1,
config=config)

ret = rpk.describe_topic_configs(topic=topic)

read = self._to_bool(ret["redpanda.remote.read"][0])
write = self._to_bool(ret["redpanda.remote.write"][0])
assert read == expected_read, f"{read} != {expected_read}"
assert write == expected_write, f"{write} != {expected_write}"

@cluster(num_nodes=1)
def test_shadow_indexing_mode_persistence(self):
rpk = RpkTool(self.redpanda)
self.redpanda.set_cluster_config(
{
"cloud_storage_enable_remote_read": True,
"cloud_storage_enable_remote_write": True
},
expect_restart=True)

default_si_topic = topic_name()
explicit_si_topic = topic_name()
rpk.create_topic(topic=default_si_topic, partitions=1, replicas=1)
rpk.create_topic(topic=explicit_si_topic,
partitions=1,
replicas=1,
config={"redpanda.remote.write": "false"})

self.redpanda.set_cluster_config(
{
"cloud_storage_enable_remote_read": False,
"cloud_storage_enable_remote_write": True
},
expect_restart=True)

default_si_configs = rpk.describe_topic_configs(topic=default_si_topic)
explicit_si_configs = rpk.describe_topic_configs(
topic=explicit_si_topic)

# Since this topic did not specifiy an explicit remote read/write
# policy, the values are bound to the cluster level configs.
assert default_si_configs["redpanda.remote.read"] == ("false",
"DEFAULT_CONFIG")
assert default_si_configs["redpanda.remote.write"] == (
"true", "DEFAULT_CONFIG")

# Since this topic specified an explicit remote read/write policy,
# the values are not overriden by the cluster config change.
assert explicit_si_configs["redpanda.remote.read"] == (
"true", "DYNAMIC_TOPIC_CONFIG")
assert explicit_si_configs["redpanda.remote.write"] == (
"false", "DYNAMIC_TOPIC_CONFIG")


# When quickly recreating topics after deleting them, redpanda's topic
# dir creation can trip up over the topic dir deletion. This is not
# harmful because creation is retried, but it does generate a log error.
Expand Down

0 comments on commit 31ed4f8

Please sign in to comment.