Description
Hello!
I am posting this issue in the hope of getting a better understanding of the best and most performant way of accessing large-scale KVstore collections through the Python SDK.
In short, my goal is to work with large collections, say >50,000 records, and in particular I need to retrieve all the records from a collection depending on the context.
From experience, I have found that the normal method of loading the KVstore records without specifying a limit and/or skip leads to partial access to the records rather than the complete list.
For instance, this:
records = collection.data.query()
will silently fail to retrieve all records once the collection exceeds roughly 10k records, depending on the size of the records.
Using limit and skip with a chunked approach allows retrieving all records and avoids hitting the limits that otherwise show up in splunkd with messages like the following:
WARN KVStorageProvider [2491855 TcpChannelThread] - Result size too large, max_size_per_result_mb=52428800, Consider applying a skip and/or limit.
or:
06-01-2025 18:55:33.536 +0000 WARN KVStorageProvider [1752033 TcpChannelThread] - Too many rows in result, max_rows_per_query=50000, Consider applying a skip and/or limit.
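For illustration, a minimal sketch (connection details and collection name are placeholders) that makes the symptom visible by comparing an unbounded query() against a chunked fetch:

    # Hypothetical illustration: the unbounded call can return fewer records
    # than the collection actually holds, while the chunked fetch gets them all.
    import splunklib.client as client

    service = client.connect(
        host="localhost", port=8089, username="admin", password="changeme"
    )
    collection = service.kvstore["kv_my_large_collection"]

    unbounded = collection.data.query()  # may be silently capped by server-side limits

    chunked = []
    skip = 0
    while True:
        batch = collection.data.query(limit=10000, skip=skip)
        if not batch:
            break
        chunked.extend(batch)
        skip += 10000

    # On large collections the first count can be lower than the second
    print(len(unbounded), len(chunked))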
My best performing function so far has been:
def search_kv_collection_sdkmode(
    service, collection_name, page=1, page_count=0, key_filter=None, object_filter=None
):
    """
    Get records from a KVstore collection using the Splunk Python SDK.
    :param service: The Splunk service object.
    :param collection_name: The name of the collection to query.
    :param page: The page number to retrieve.
    :param page_count: The number of records to retrieve per page (0 = all records).
    :param key_filter: Optional filter on the keyid field.
    :param object_filter: Optional filter on the object field.
    :return: A tuple containing the records, keys, a dictionary of the records, and last_page.
    """

    # connect to the collection
    collection = service.kvstore[collection_name]

    # add filter, if any
    if key_filter:
        query_string = {"keyid": key_filter}
    elif object_filter:
        query_string = {"object": object_filter}
    else:
        query_string = {}

    start_time = time.time()
    collection_dict = {}

    try:
        if query_string:
            # For filtered queries, we can fetch all matching records at once
            process_collection_records = collection.data.query(
                query=json.dumps(query_string)
            )
            for item in process_collection_records:
                collection_dict[item.get("_key")] = item
        else:
            # For unfiltered queries, we need to use chunked fetching
            chunk_size = 10000  # KVstore's default limit
            skip_tracker = 0
            while True:
                process_collection_records = collection.data.query(
                    limit=chunk_size, skip=skip_tracker
                )
                if not process_collection_records:
                    break
                for item in process_collection_records:
                    collection_dict[item.get("_key")] = item
                skip_tracker += chunk_size

        # Convert to list and set only once at the end
        collection_records = list(collection_dict.values())
        collection_records_keys = set(collection_dict.keys())

        # Handle pagination
        if page_count == 0:
            last_page = 1
        else:
            total_record_count = len(collection_records)
            last_page = (total_record_count + page_count - 1) // page_count
            # Apply pagination to the records
            start_index = (page - 1) * page_count
            end_index = page * page_count
            collection_records = collection_records[start_index:end_index]

    except Exception as e:
        msg = f'main search failed with exception="{str(e)}"'
        logging.error(msg)
        raise Exception(msg)

    logging.info(
        f'context="perf", search_kv_collection, KVstore select terminated, no_records="{len(collection_records)}", run_time="{round((time.time() - start_time), 3)}", collection="{collection_name}"'
    )

    return collection_records, collection_records_keys, collection_dict, last_page
This works great, and it essentially relies on the following logic to retrieve all records without hitting the limits, in the most performant way possible, as it seems:
chunk_size = 10000  # KVstore's default limit
skip_tracker = 0
while True:
    process_collection_records = collection.data.query(
        limit=chunk_size, skip=skip_tracker
    )
    if not process_collection_records:
        break
    for item in process_collection_records:
        collection_dict[item.get("_key")] = item
    skip_tracker += chunk_size
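For reference, a typical invocation would look like this (connection details and collection name are placeholders):

    # Hypothetical usage of the SDK-mode function above
    import splunklib.client as client

    service = client.connect(
        host="localhost", port=8089, username="admin", password="changeme"
    )

    records, keys, records_dict, last_page = search_kv_collection_sdkmode(
        service,
        "kv_my_collection",
        page=1,
        page_count=0,  # 0 = return all records in a single page
    )
    print(f"retrieved {len(records)} records, last_page={last_page}")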
Now, as I am running large-scale performance benchmarks, I am comparing this with two additional approaches: one leveraging a Splunk search using inputlookup, and one leveraging REST by accessing the KVstore REST endpoint on splunkd:
Search method:
def search_kv_collection_searchmode(
    service, collection_name, page=1, page_count=0, key_filter=None, object_filter=None
):
    """
    Get records from a KVstore collection using a Splunk search (inputlookup).
    :param service: The Splunk service object.
    :param collection_name: The name of the collection to query.
    :param page: The page number to retrieve.
    :param page_count: The number of records to retrieve per page (0 = all records).
    :param key_filter: Optional filter on the keyid field.
    :param object_filter: Optional filter on the object field.
    :return: A tuple containing the records, keys, a dictionary of the records, and last_page.
    """

    start_time = time.time()
    collection_dict = {}

    try:
        # Build the search command efficiently
        search_parts = [f'| inputlookup {collection_name.replace("kv_", "")}']

        # Add filter if specified
        if key_filter:
            search_parts.append(f'where keyid="{key_filter}"')
        elif object_filter:
            search_parts.append(f'where object="{object_filter}"')

        # Add pagination if needed: head up to the end of the requested page,
        # then keep only the last page_count rows
        if page_count > 0:
            search_parts.append(f"| head {page * page_count} | tail {page_count}")

        # Complete the search
        search_parts.append("| eval keyid=_key")
        search = " ".join(search_parts)

        # Optimize search parameters
        kwargs_search = {
            "earliest_time": "-5m",
            "latest_time": "now",
            "preview": "false",
            "output_mode": "json",
            "count": 0,
        }

        # Execute search and process results
        reader = run_splunk_search(
            service,
            search,
            kwargs_search,
            24,  # max_retries
            5,  # retry_delay
        )

        # Process results efficiently
        for item in reader:
            if isinstance(item, dict):
                key = item.get("keyid")
                if key:  # Only process items with valid keys
                    collection_dict[key] = item

        # Convert to required formats only once
        collection_records = list(collection_dict.values())
        collection_records_keys = set(collection_dict.keys())

        # Handle pagination
        if page_count == 0:
            last_page = 1
        else:
            # Get total count for pagination
            count_search = f'| inputlookup {collection_name.replace("kv_", "")}'
            if key_filter:
                count_search += f' where keyid="{key_filter}"'
            elif object_filter:
                count_search += f' where object="{object_filter}"'
            count_search += " | stats count"

            count_reader = run_splunk_search(
                service,
                count_search,
                kwargs_search,
                24,
                5,
            )

            total_count = 0
            for item in count_reader:
                if isinstance(item, dict) and "count" in item:
                    total_count = int(item["count"])
                    break

            last_page = (total_count + page_count - 1) // page_count

    except Exception as e:
        msg = f'main search failed with exception="{str(e)}"'
        logging.error(msg)
        raise Exception(msg)

    logging.info(
        f'context="perf", search_kv_collection, KVstore select terminated, no_records="{len(collection_records)}", run_time="{round((time.time() - start_time), 3)}", collection="{collection_name}"'
    )

    return collection_records, collection_records_keys, collection_dict, last_page
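Note: run_splunk_search is a helper of my app and is not shown above. As a rough sketch of what such a helper can look like, assuming splunklib's blocking search jobs and JSONResultsReader (the signature is inferred from the call above and the retry handling is simplified):

    import time
    import splunklib.results as splunk_results

    def run_splunk_search(service, search, kwargs_search, max_retries=24, retry_delay=5):
        """Sketch: run a blocking search and return result rows as dicts."""
        last_exc = None
        for _ in range(max_retries):
            try:
                kwargs = dict(kwargs_search)
                # these belong to results retrieval, not to job creation
                kwargs.pop("output_mode", None)
                kwargs.pop("preview", None)
                count = kwargs.pop("count", 0)
                kwargs["exec_mode"] = "blocking"  # wait for the job to complete
                job = service.jobs.create(search, **kwargs)
                reader = splunk_results.JSONResultsReader(
                    job.results(output_mode="json", count=count)
                )
                # the reader also yields Message objects, keep only result rows
                return [row for row in reader if isinstance(row, dict)]
            except Exception as exc:
                last_exc = exc
                time.sleep(retry_delay)
        raise last_exc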
REST method:
def search_kv_collection_restmode(
    headers,
    splunkd_uri,
    collection_name,
    page=1,
    page_count=0,
    key_filter=None,
    object_filter=None,
):
    """
    Get records from a KVstore collection using the REST API.
    :param headers: The headers to use for the request.
    :param splunkd_uri: The Splunkd URI.
    :param collection_name: The name of the collection to query.
    :param page: The page number to retrieve.
    :param page_count: The number of records to retrieve per page (0 = all records).
    :param key_filter: Optional filter on the keyid field.
    :param object_filter: Optional filter on the object field.
    :return: A tuple containing the records, keys, a dictionary of the records, and last_page.
    """

    start_time = time.time()
    collection_dict = {}

    try:
        # Create a session for connection pooling
        with requests.Session() as session:
            session.headers.update(headers)
            session.verify = False

            # Build base URL
            url = f"{splunkd_uri}/servicesNS/nobody/trackme/storage/collections/data/{collection_name}"

            # Add filter if specified
            if key_filter:
                url = f"{url}/{key_filter}"
            elif object_filter:
                query_dict = {"object": {"$eq": object_filter}}
                # note: urllib.parse.quote (not urlencode), since we are encoding a JSON string
                query = f"?query={quote(json.dumps(query_dict))}"
                url = f"{url}{query}"

            # If pagination is needed, use it directly in the request
            if page_count > 0:
                skip = (page - 1) * page_count
                params = {
                    "output_mode": "json",
                    "skip": skip,
                    "limit": page_count,
                }

                # Make the request
                response = session.get(
                    url,
                    params=params,
                    timeout=300,
                )
                response.raise_for_status()
                response_json = response.json()

                # Process results efficiently
                for item in response_json:
                    key = item.get("_key")
                    if key:  # Only process items with valid keys
                        collection_dict[key] = item

            else:
                # For non-paginated requests, fetch all records in chunks
                chunk_size = 10000  # KVstore's default limit
                skip = 0

                while True:
                    params = {
                        "output_mode": "json",
                        "skip": skip,
                        "limit": chunk_size,
                    }

                    # Make the request
                    response = session.get(
                        url,
                        params=params,
                        timeout=300,
                    )
                    response.raise_for_status()
                    response_json = response.json()

                    # If no more records, break the loop
                    if not response_json:
                        break

                    # Process results efficiently
                    for item in response_json:
                        key = item.get("_key")
                        if key:  # Only process items with valid keys
                            collection_dict[key] = item

                    # If we got less than chunk_size records, we've reached the end
                    if len(response_json) < chunk_size:
                        break

                    # Move to next chunk
                    skip += chunk_size

            # Convert to required formats only once
            collection_records = list(collection_dict.values())
            collection_records_keys = set(collection_dict.keys())

            # Handle pagination
            if page_count == 0:
                last_page = 1
            else:
                # Get total count for pagination
                count_url = f"{splunkd_uri}/servicesNS/nobody/trackme/storage/collections/data/{collection_name}/count"
                if object_filter:
                    count_url += f"?query={quote(json.dumps({'object': {'$eq': object_filter}}))}"

                count_response = session.get(
                    count_url,
                    params={"output_mode": "json"},
                    timeout=300,
                )
                count_response.raise_for_status()
                total_count = count_response.json().get("count", 0)
                last_page = (total_count + page_count - 1) // page_count

    except Exception as e:
        msg = f'REST query failed with exception="{str(e)}"'
        logging.error(msg)
        raise Exception(msg)

    logging.info(
        f'context="perf", search_kv_collection_rest, KVstore select terminated, no_records="{len(collection_records)}", run_time="{round((time.time() - start_time), 3)}", collection="{collection_name}"'
    )

    return collection_records, collection_records_keys, collection_dict, last_page
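For completeness, a typical invocation of the REST variant (URI and authentication header are placeholders):

    # Hypothetical usage of the REST-mode function above
    splunkd_uri = "https://localhost:8089"
    headers = {
        "Authorization": "Splunk <session_key>",  # or "Bearer <jwt_token>"
        "Content-Type": "application/json",
    }

    records, keys, records_dict, last_page = search_kv_collection_restmode(
        headers,
        splunkd_uri,
        "kv_my_collection",
        page=1,
        page_count=0,  # 0 = fetch everything, chunked internally
    )
    print(f"retrieved {len(records)} records")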
All three methods allow me to retrieve all records in large collections (>50k records), but performance differs:
- The fastest is the search method leveraging inputlookup; for instance, in my case a 55,000-record collection is retrieved in about 4-5 seconds.
- The SDK method is second, taking roughly twice as long, at least 9-10 seconds.
- REST is the slowest, at 11-12 seconds.
My objective is to avoid consuming a Splunk search slot (to reduce overall search usage) while still being able to access the entire collection at once, from Python.
However, I am still wondering whether there is a faster and more reliable method to access and retrieve all the records from a KVstore collection.
In the end, 55,000 records is not a lot: inputlookup is capable of loading many more records quickly, while the SDK and REST are very slow, and the SDK is also not reliable unless you implement chunking mechanisms yourself, which I did not find to be well documented.
Is this all we can achieve? Can't we be more performant, scalable and reliable when working with KVstore collections in Python through the Splunk SDK? Maybe I am missing something?
I would strongly appreciate any help / answers on this very interesting and valuable topic for the entire community of Splunk developers and users of the SDK - thanks!