From a6ee4c1ff27f398dbe7cc5f07c39a4953970be26 Mon Sep 17 00:00:00 2001 From: Peter Marshall Date: Tue, 9 Jul 2024 11:18:43 +0100 Subject: [PATCH 1/4] Create 16-batch-systemfields.ipynb Brought forward content from the old batch ingestion intro. --- .../02-ingestion/16-batch-systemfields.ipynb | 594 ++++++++++++++++++ 1 file changed, 594 insertions(+) create mode 100644 notebooks/02-ingestion/16-batch-systemfields.ipynb diff --git a/notebooks/02-ingestion/16-batch-systemfields.ipynb b/notebooks/02-ingestion/16-batch-systemfields.ipynb new file mode 100644 index 00000000..1e5d647f --- /dev/null +++ b/notebooks/02-ingestion/16-batch-systemfields.ipynb @@ -0,0 +1,594 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0cb3b009-ebde-4d56-9d59-a028d66d8309", + "metadata": {}, + "source": [ + "# (Result) by (action) using (feature)\n", + "\n", + "\n", + "Introductory paragraph - for example:\n", + "\n", + "This tutorial demonstrates how to work with [feature](link to feature doc). In this tutorial you perform the following tasks:\n", + "\n", + "- Task 1\n", + "- Task 2\n", + "- Task 3\n", + "- etc" + ] + }, + { + "cell_type": "markdown", + "id": "bbdbf6ad-ca7b-40f5-8ca3-1070f4a3ee42", + "metadata": {}, + "source": [ + "## Prerequisites\n", + "\n", + "This tutorial works with Druid XX.0.0 or later.\n", + "\n", + "\n", + "\n", + "Launch this tutorial and all prerequisites using the `` profile of the Docker Compose file for Jupyter-based Druid tutorials. For more information, see the Learn Druid repository [readme](https://github.com/implydata/learn-druid).\n", + " " + ] + }, + { + "cell_type": "markdown", + "id": "5007a243-b81a-4601-8f57-5b14940abbff", + "metadata": {}, + "source": [ + "## Initialization\n", + "\n", + "The following cells set up the notebook and learning environment ready for use." + ] + }, + { + "cell_type": "markdown", + "id": "0b769122-c5a4-404e-9ef8-9c0ebd97695a", + "metadata": {}, + "source": [ + "### Set up a connection to Apache Druid\n", + "\n", + "Run the next cell to set up the Druid Python client's connection to Apache Druid.\n", + "\n", + "If successful, the Druid version number will be shown in the output." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c1ec783b-df3f-4168-9be2-cdc6ad3e33c2", + "metadata": {}, + "outputs": [], + "source": [ + "import druidapi\n", + "import os\n", + "\n", + "druid_headers = {'Content-Type': 'application/json'}\n", + "\n", + "if 'DRUID_HOST' not in os.environ.keys():\n", + " druid_host=f\"http://localhost:8888\"\n", + "else:\n", + " druid_host=f\"http://{os.environ['DRUID_HOST']}:8888\"\n", + "\n", + "print(f\"Opening a connection to {druid_host}.\")\n", + "druid = druidapi.jupyter_client(druid_host)\n", + "display = druid.display\n", + "sql_client = druid.sql\n", + "status_client = druid.status\n", + "\n", + "status_client.version" + ] + }, + { + "cell_type": "markdown", + "id": "2efdbee0-62da-4fd3-84e1-f66b8c0150b3", + "metadata": {}, + "source": [ + "### Set up a connection to Apache Kafka\n", + "\n", + "\n", + "\n", + "Run the next cell to set up the connection to Apache Kafka." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c075de81-04c9-4b23-8253-20a15d46252e", + "metadata": {}, + "outputs": [], + "source": [ + "if 'KAFKA_HOST' not in os.environ.keys():\n", + " kafka_host=f\"http://localhost:9092\"\n", + "else:\n", + " kafka_host=f\"{os.environ['KAFKA_HOST']}:9092\"" + ] + }, + { + "cell_type": "markdown", + "id": "bbffed9a-87a5-4b26-9f06-972fdbccd55a", + "metadata": {}, + "source": [ + "### Set up a connection to the data generator\n", + "\n", + "\n", + "\n", + "Run the next cell to set up the connection to the data generator." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a807280f-b387-4ffb-8bc3-764c5b511458", + "metadata": {}, + "outputs": [], + "source": [ + "import requests\n", + "import json\n", + "\n", + "datagen_host = \"http://datagen:9999\"\n", + "datagen_headers = {'Content-Type': 'application/json'}" + ] + }, + { + "cell_type": "markdown", + "id": "9c3d6b39-6551-4b2a-bdfb-9606aa92c853", + "metadata": {}, + "source": [ + "\n", + "\n", + "### Import additional modules\n", + "\n", + "Run the following cell to import additional Python modules that you will use to X, Y, Z." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "dc4c2524-0eba-4bc6-84ed-da3a25aa5fbe", + "metadata": {}, + "outputs": [], + "source": [ + "# Add your modules here, remembering to align this with the prerequisites section\n", + "\n", + "import json\n", + "import matplotlib\n", + "import matplotlib.pyplot as plt\n", + "import pandas as pd" + ] + }, + { + "cell_type": "markdown", + "id": "472589e4-1026-4b3b-bb79-eedabb2b44c4", + "metadata": {}, + "source": [ + "## Create a table using batch ingestion\n", + "\n", + "\n", + "\n", + "Run the following cell to create a table using batch ingestion. Notice {the use of X as a timestamp | only required columns are ingested | WHERE / expressions / GROUP BY are front-loaded | partitions on X period and clusters by Y}.\n", + "\n", + "When completed, you'll see a description of the final table." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f52a94fb-d2e4-403f-ab10-84d3af7bf2c8", + "metadata": {}, + "outputs": [], + "source": [ + "# Replace example-dataset-topic with a unique table name for this notebook.\n", + "\n", + "# - Always prefix your table name with `example-`\n", + "# - If using the standard example datasets, use the following standard values for `dataset`:\n", + "\n", + "# wikipedia wikipedia\n", + "# koalas KoalasToTheMax one day\n", + "# koalanest KoalasToTheMax one day (nested)\n", + "# nyctaxi3 NYC Taxi cabs (3 files)\n", + "# nyctaxi NYC Taxi cabs (all files)\n", + "# flights FlightCarrierOnTime (1 month)\n", + "\n", + "# Remember to apply good data modelling practice to your INSERT / REPLACE.\n", + "\n", + "table_name = 'example-dataset-topic'\n", + "\n", + "sql='''\n", + "REPLACE INTO \"''' + table_name + '''\" OVERWRITE ALL\n", + "'''\n", + "\n", + "display.run_task(sql)\n", + "sql_client.wait_until_ready(table_name)\n", + "display.table(table_name)" + ] + }, + { + "cell_type": "markdown", + "id": "5d8ff0b2-ee98-40d6-b0b7-77274e34881d", + "metadata": {}, + "source": [ + "## Create a table using streaming ingestion\n", + "\n", + "In this section, you use the data generator to generate a stream of messages into a Apache Kafka topic. Next, you'll set up an on-going ingestion into Druid." 
+ ] + }, + { + "cell_type": "markdown", + "id": "a590b732-c843-4483-896d-c7972ba9e4be", + "metadata": {}, + "source": [ + "### Use the data generator to populate a Kafka topic\n", + "\n", + "Run the following cell to instruct the data generator to start producing data." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ff2ef424-cd35-416c-9cd0-14f71d43d2e5", + "metadata": {}, + "outputs": [], + "source": [ + "# For more information on the available configurations and settings for the data generator, see the dedicated notebook in \"99-contributing\"\n", + "\n", + "# Replace example-dataset-topic with a unique table name for this notebook.\n", + "\n", + "# - Always prefix your table name with `example-`\n", + "# - If using the standard example datasets, use the following standard values for `dataset`:\n", + "\n", + "# social/socialposts social\n", + "# clickstream/clickstream clickstream\n", + "\n", + "# Remember to apply good data modelling practice to your data schema.\n", + "\n", + "datagen_topic = \"example-dataset-topic\"\n", + "datagen_job = datagen_topic\n", + "datagen_config = \"social/social_posts.json\"\n", + "\n", + "datagen_request = {\n", + " \"name\": datagen_job,\n", + " \"target\": { \"type\": \"kafka\", \"endpoint\": kafka_host, \"topic\": datagen_topic },\n", + " \"config_file\": datagen_config, \n", + " \"time\":\"10m\",\n", + " \"concurrency\":100\n", + "}\n", + "\n", + "requests.post(f\"{datagen_host}/start\", json.dumps(datagen_request), headers=datagen_headers)\n", + "requests.get(f\"{datagen_host}/status/{datagen_job}\").json()" + ] + }, + { + "cell_type": "markdown", + "id": "d5fedf1f-566f-4501-be65-d20e216d2c59", + "metadata": {}, + "source": [ + "### Use streaming ingestion to populate the table\n", + "\n", + "Ingest data from an Apache Kafka topic into Apache Druid by submitting an [ingestion specification](https://druid.apache.org/docs/latest/ingestion/ingestion-spec.html) to the [streaming ingestion supervisor API](https://druid.apache.org/docs/latest/api-reference/supervisor-api).\n", + "\n", + "Run the next cell to set up the [`ioConfig`](https://druid.apache.org/docs/latest/ingestion/ingestion-spec#ioconfig), [`tuningConfig`](https://druid.apache.org/docs/latest/ingestion/ingestion-spec#tuningconfig), and [`dataSchema`](https://druid.apache.org/docs/latest/ingestion/ingestion-spec#dataschema). Notice {the use of X as a timestamp | only required columns are ingested | WHERE / expressions / GROUP BY are front-loaded | partitions on X period and clusters by Y}." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b9c8734c-e968-45de-a1bf-197d3bffb688", + "metadata": {}, + "outputs": [], + "source": [ + "ioConfig = {\n", + " \"type\": \"kafka\",\n", + " \"consumerProperties\": { \"bootstrap.servers\": kafka_host },\n", + " \"topic\": datagen_topic,\n", + " \"inputFormat\": { \"type\": \"json\" },\n", + " \"useEarliestOffset\": \"true\" }\n", + "\n", + "tuningConfig = { \"type\": \"kafka\" }\n", + "\n", + "# Replace example-dataset-topic with a unique table name for this notebook.\n", + "\n", + "# - Always prefix your table name with `example-`\n", + "# - If using the standard example datasets, use the following standard values for `dataset`:\n", + "\n", + "# social/socialposts social\n", + "# clickstream/clickstream clickstream\n", + "\n", + "# Remember to apply good data modelling practice to your data schema.\n", + "\n", + "table_name = 'example-dataset-topic'\n", + "\n", + "dataSchema = {\n", + " \"dataSource\": table_name,\n", + " \"timestampSpec\": { \"column\": \"time\", \"format\": \"iso\" },\n", + " \"granularitySpec\": { \"rollup\": \"false\", \"segmentGranularity\": \"hour\" },\n", + " \"dimensionsSpec\": { \"useSchemaDiscovery\" : \"true\"}\n", + " }\n", + "\n", + "ingestion_spec = {\n", + " \"type\": \"kafka\",\n", + " \"spec\": {\n", + " \"ioConfig\": ioConfig,\n", + " \"tuningConfig\": tuningConfig,\n", + " \"dataSchema\": dataSchema\n", + " }\n", + "}\n", + "\n", + "requests.post(f\"{druid_host}/druid/indexer/v1/supervisor\", json.dumps(ingestion_spec), headers=druid_headers)\n", + "sql_client.wait_until_ready(table_name, verify_load_status=False)\n", + "display.table(table_name)" + ] + }, + { + "cell_type": "markdown", + "id": "1b6c9b88-837d-4c80-a28d-36184ba63355", + "metadata": {}, + "source": [ + "## System Fields for Batch Ingestion\n", + "\n", + "When doing ingestion of multiple files, it is generally helpful to know the specific source of the data. This feature allows you to do just that. It provides system fields that identifty the input source and which can be added to the ingestion job.\n", + "\n", + "Each Input Source has slightly different input fields. In the example below we use HTTP [checkout in the docs to see the fields that are available](https://druid.apache.org/docs/latest/ingestion/input-sources#http-input-source). 
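The snippet below highlights where the property goes; for orientation, a complete call has roughly this shape. This is a sketch only — the URI and the `page` column are placeholders rather than data used elsewhere in this notebook:

```sql
SELECT
  "__file_uri",    -- system field: full URI of the source file
  "__file_path",   -- system field: path portion of the URI
  "page"           -- placeholder for an ordinary data column
FROM TABLE(
  EXTERN(
    '{"type":"http","uris":["https://example.org/data-1.json.gz"],"systemFields":["__file_uri","__file_path"]}',
    '{"type":"json"}'
  )
) EXTEND ("page" VARCHAR, "__file_uri" VARCHAR, "__file_path" VARCHAR)
```

System fields that you want to SELECT are declared in the `EXTEND` clause alongside the ordinary data columns.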
\n", + "\n", + "To enable this functionality, add the new property \"systemFields\" the Input Source field in the EXTERN clause:\n", + "```sql\n", + "FROM TABLE(\n", + " EXTERN(\n", + " '{\n", + " \"type\":\"http\",\n", + " \"systemFields\":[\"__file_uri\",\"__file_path\"], <<<<<< list of system fields to capture\n", + " \"uris\":[ 2 -- less rows to process for this example\n", + "GROUP BY 1,2,3\n", + "PARTITIONED BY ALL\n", + "'''\n", + "\n", + "request = druid.sql.sql_request(sql)\n", + "request.add_context('maxNumTasks', 3)\n", + "\n", + "druid.display.run_task(request)\n", + "druid.sql.wait_until_ready(table_name)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "dbfefc28-b7c3-4819-bc5b-196dcc6c3072", + "metadata": {}, + "outputs": [], + "source": [ + "Query the system fields that were ingested to see information about how each file was ingested:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "932c7feb-a55d-4395-bd8c-02b2913358c4", + "metadata": {}, + "outputs": [], + "source": [ + "sql='''\n", + "SELECT \"__file_uri\", \"__file_path\", \n", + " SUM(\"row_count\") \"total_file_rows\" \n", + "FROM \"example-taxi-trips-rollup\"\n", + "GROUP BY 1,2\n", + "'''\n", + "\n", + "druid.display.sql(sql)" + ] + }, + { + "cell_type": "markdown", + "id": "8dcb6664-9699-4f56-86f1-5c4543624230", + "metadata": {}, + "source": [ + "While the above examples are rather simple, this is a powerful tool to enhance data when the files are organized in folder structures where the path contains infomation about the data. It is common to see this kind of file system organization in cloud storage where that data has already been partitioned by time or other dimensions. Take this list of files as an example:\n", + "```\n", + "/data/activity_log/customer=501/hour=2024-01-01T10:00:00/datafile1.csv\n", + "/data/activity_log/customer=501/hour=2024-01-01T10:00:00/datafile2.csv\n", + "/data/activity_log/customer=376/hour=2024-01-01T11:00:00/datafile1.csv\n", + "...\n", + "```\n", + "With this example, the __file_uri or __file_path columns can be parsed at ingestion to create other fields using functions like REGEXP_EXTRACT to extract `customer` and `hour` in this example." + ] + }, + { + "cell_type": "markdown", + "id": "44738d6d-cec2-40ad-aaba-998c758c63f4", + "metadata": {}, + "source": [ + "## Clean up\n", + "\n", + "Run the following cell to remove the XXX used in this notebook from the database." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8082b545-ba7f-4ede-bb6e-2a6dd62ba0d8", + "metadata": {}, + "outputs": [], + "source": [ + "# Use this for batch ingested tables\n", + "\n", + "print(f\"Drop table: [{druid.datasources.drop(table_name)}]\")\n", + "\n", + "# Use this when doing streaming with the data generator\n", + "\n", + "print(f\"Stop streaming generator: [{requests.post(f'{datagen_host}/stop/{datagen_job}','')}]\")\n", + "print(f'Pause streaming ingestion: [{requests.post(f\"{druid_host}/druid/indexer/v1/supervisor/{datagen_topic}/suspend\",\"\")}]')\n", + "\n", + "print(f'Shutting down running tasks ...')\n", + "\n", + "tasks = druid.tasks.tasks(state='running', table=table_name)\n", + "while len(tasks)>0:\n", + " for task in tasks:\n", + " print(f\"...stopping task [{task['id']}]\")\n", + " druid.tasks.shut_down_task(task['id'])\n", + " tasks = druid.tasks.tasks(state='running', table=table_name)\n", + "\n", + "print(f'Reset offsets for re-runnability: [{requests.post(f\"{druid_host}/druid/indexer/v1/supervisor/{datagen_topic}/reset\",\"\")}]')\n", + "print(f'Terminate streaming ingestion: [{requests.post(f\"{druid_host}/druid/indexer/v1/supervisor/{datagen_topic}/terminate\",\"\")}]')\n", + "print(f\"Drop table: [{druid.datasources.drop(table_name)}]\")" + ] + }, + { + "cell_type": "markdown", + "id": "54b8d5fe-ba85-4b5b-9669-0dd47dfbccd1", + "metadata": {}, + "source": [ + "## Summary\n", + "\n", + "* You learned this\n", + "* Remember this\n", + "\n", + "## Learn more\n", + "\n", + "* Try this out on your own data\n", + "* Solve for problem X that is't covered here\n", + "* Read docs pages\n", + "* Watch or read something cool from the community\n", + "* Do some exploratory stuff on your own" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ca4d3362-b1a4-47a4-a782-9773c216b3ba", + "metadata": {}, + "outputs": [], + "source": [ + "# Here are some useful code elements that you can re-use.\n", + "\n", + "# When just wanting to display some SQL results\n", + "sql = f'''SELECT * FROM \"{table_name}\" LIMIT 5'''\n", + "display.sql(sql)\n", + "\n", + "# When ingesting data and wanting to describe the schema\n", + "display.run_task(sql)\n", + "sql_client.wait_until_ready('{table_name}')\n", + "display.table('{table_name}')\n", + "\n", + "# When you want to show the native version of a SQL statement\n", + "print(json.dumps(json.loads(sql_client.explain_sql(sql)['PLAN']), indent=2))\n", + "\n", + "# When you want a simple plot\n", + "df = pd.DataFrame(sql_client.sql(sql))\n", + "df.plot(x='x-axis', y='y-axis', marker='o')\n", + "plt.xticks(rotation=45, ha='right')\n", + "plt.gca().get_legend().remove()\n", + "plt.show()\n", + "\n", + "# When you want to add some query context parameters\n", + "req = sql_client.sql_request(sql)\n", + "req.add_context(\"useApproximateTopN\", \"false\")\n", + "resp = sql_client.sql_query(req)\n", + "\n", + "# When you want to compare two different sets of results\n", + "df3 = df1.compare(df2, keep_equal=True)\n", + "df3\n", + "\n", + "# When you want to see some messages from a Kafka topic\n", + "from kafka import KafkaConsumer\n", + "\n", + "consumer = KafkaConsumer(bootstrap_servers=kafka_host)\n", + "consumer.subscribe(topics=datagen_topic)\n", + "count = 0\n", + "for message in consumer:\n", + " count += 1\n", + " if count == 5:\n", + " break\n", + " print (\"%d:%d: v=%s\" % (message.partition,\n", + " message.offset,\n", + " message.value))\n", + "consumer.unsubscribe()" + ] + } + ], + 
"metadata": { + "execution": { + "allow_errors": true, + "timeout": 300 + }, + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} From 724f6f35e5fdc4c4c49e7d09d3453493d6186ac5 Mon Sep 17 00:00:00 2001 From: Peter Marshall Date: Tue, 9 Jul 2024 12:13:29 +0100 Subject: [PATCH 2/4] Update 16-batch-systemfields.ipynb Applied the template and added a transform to the field. Reported an issue with the ingestion not completing. --- .../02-ingestion/16-batch-systemfields.ipynb | 432 +++--------------- 1 file changed, 52 insertions(+), 380 deletions(-) diff --git a/notebooks/02-ingestion/16-batch-systemfields.ipynb b/notebooks/02-ingestion/16-batch-systemfields.ipynb index 1e5d647f..e87d5736 100644 --- a/notebooks/02-ingestion/16-batch-systemfields.ipynb +++ b/notebooks/02-ingestion/16-batch-systemfields.ipynb @@ -5,7 +5,7 @@ "id": "0cb3b009-ebde-4d56-9d59-a028d66d8309", "metadata": {}, "source": [ - "# (Result) by (action) using (feature)\n", + "# Incorporate system fields into table data\n", "\n", "\n", - "Introductory paragraph - for example:\n", + "When doing ingestion from multiple sources, it can be helpful to incorporate specific information from the source of the data.\n", "\n", - "This tutorial demonstrates how to work with [feature](link to feature doc). In this tutorial you perform the following tasks:\n", + "For example, you may want to parse a file name to use as the primary or a secondary `__time` stamp.\n", "\n", - "- Task 1\n", - "- Task 2\n", - "- Task 3\n", - "- etc" + "This tutorial demonstrates how to incorporate `systemFields` with batch ingestion to add this information to rows in your table." ] }, { @@ -42,15 +39,9 @@ "source": [ "## Prerequisites\n", "\n", - "This tutorial works with Druid XX.0.0 or later.\n", + "This tutorial works with Druid 29.0.0 or later.\n", "\n", - "\n", - "\n", - "Launch this tutorial and all prerequisites using the `` profile of the Docker Compose file for Jupyter-based Druid tutorials. For more information, see the Learn Druid repository [readme](https://github.com/implydata/learn-druid).\n", - " " + "Launch this tutorial and all prerequisites using the `druid-jupyter` profile of the Docker Compose file for Jupyter-based Druid tutorials. For more information, see the Learn Druid repository [readme](https://github.com/implydata/learn-druid)." ] }, { @@ -101,94 +92,22 @@ "status_client.version" ] }, - { - "cell_type": "markdown", - "id": "2efdbee0-62da-4fd3-84e1-f66b8c0150b3", - "metadata": {}, - "source": [ - "### Set up a connection to Apache Kafka\n", - "\n", - "\n", - "\n", - "Run the next cell to set up the connection to Apache Kafka." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "c075de81-04c9-4b23-8253-20a15d46252e", - "metadata": {}, - "outputs": [], - "source": [ - "if 'KAFKA_HOST' not in os.environ.keys():\n", - " kafka_host=f\"http://localhost:9092\"\n", - "else:\n", - " kafka_host=f\"{os.environ['KAFKA_HOST']}:9092\"" - ] - }, - { - "cell_type": "markdown", - "id": "bbffed9a-87a5-4b26-9f06-972fdbccd55a", - "metadata": {}, - "source": [ - "### Set up a connection to the data generator\n", - "\n", - "\n", - "\n", - "Run the next cell to set up the connection to the data generator." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a807280f-b387-4ffb-8bc3-764c5b511458", - "metadata": {}, - "outputs": [], - "source": [ - "import requests\n", - "import json\n", - "\n", - "datagen_host = \"http://datagen:9999\"\n", - "datagen_headers = {'Content-Type': 'application/json'}" - ] - }, - { - "cell_type": "markdown", - "id": "9c3d6b39-6551-4b2a-bdfb-9606aa92c853", - "metadata": {}, - "source": [ - "\n", - "\n", - "### Import additional modules\n", - "\n", - "Run the following cell to import additional Python modules that you will use to X, Y, Z." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "dc4c2524-0eba-4bc6-84ed-da3a25aa5fbe", - "metadata": {}, - "outputs": [], - "source": [ - "# Add your modules here, remembering to align this with the prerequisites section\n", - "\n", - "import json\n", - "import matplotlib\n", - "import matplotlib.pyplot as plt\n", - "import pandas as pd" - ] - }, { "cell_type": "markdown", "id": "472589e4-1026-4b3b-bb79-eedabb2b44c4", "metadata": {}, "source": [ - "## Create a table using batch ingestion\n", + "## Create a table with system fields using batch ingestion\n", "\n", - "\n", + "The `systemFields` property can be added to a number of [`inputSource` definitions](https://druid.apache.org/docs/latest/ingestion/input-sources) in the EXTERN clause.\n", "\n", - "Run the following cell to create a table using batch ingestion. Notice {the use of X as a timestamp | only required columns are ingested | WHERE / expressions / GROUP BY are front-loaded | partitions on X period and clusters by Y}.\n", + "Run the cell to ingest three files. Notice:\n", + "\n", + "* The EXTERN statement includes the `systemFields` object, with an array containing two fields to add.\n", + "* The EXTEND includes a type definition for both of the additional fields.\n", + "* The SELECT explicitly names both of the system fields, alongside the fields to ingest.\n", + "* The REGEXP_EXTRACT function applies a simple regular expression to the `__file_path` to store only the filename as `__file_name`.\n", + "* Only rows with a `passenger_count` of 5 are ingested.\n", "\n", "When completed, you'll see a description of the final table." 
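Before you run it, here is a minimal sketch of the shape that statement takes. The URI, input format, and the `pickup_datetime` column name are placeholders — the cell below drives the table name through the `table_name` variable and uses the notebook's actual source files and column list:

```sql
REPLACE INTO "example-taxitrips-systemfields" OVERWRITE ALL
SELECT
  TIME_PARSE("pickup_datetime") AS "__time",                       -- placeholder timestamp column
  "passenger_count",
  "__file_uri",                                                    -- system field: full source URI
  "__file_path",                                                   -- system field: source path
  REGEXP_EXTRACT("__file_path", '([^/]+)$', 1) AS "__file_name"    -- keep just the filename
FROM TABLE(
  EXTERN(
    '{"type":"http","uris":["https://example.org/trips-1.csv.gz"],"systemFields":["__file_uri","__file_path"]}',
    '{"type":"csv","findColumnsFromHeader":true}'
  )
) EXTEND ("pickup_datetime" VARCHAR, "passenger_count" BIGINT, "__file_uri" VARCHAR, "__file_path" VARCHAR)
WHERE "passenger_count" = 5
PARTITIONED BY DAY
```

The regular expression here simply captures everything after the final `/`; the notebook's own expression may differ, but the pattern of deriving new columns from `__file_path` at ingestion time is the same.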
] @@ -200,187 +119,7 @@ "metadata": {}, "outputs": [], "source": [ - "# Replace example-dataset-topic with a unique table name for this notebook.\n", - "\n", - "# - Always prefix your table name with `example-`\n", - "# - If using the standard example datasets, use the following standard values for `dataset`:\n", - "\n", - "# wikipedia wikipedia\n", - "# koalas KoalasToTheMax one day\n", - "# koalanest KoalasToTheMax one day (nested)\n", - "# nyctaxi3 NYC Taxi cabs (3 files)\n", - "# nyctaxi NYC Taxi cabs (all files)\n", - "# flights FlightCarrierOnTime (1 month)\n", - "\n", - "# Remember to apply good data modelling practice to your INSERT / REPLACE.\n", - "\n", - "table_name = 'example-dataset-topic'\n", - "\n", - "sql='''\n", - "REPLACE INTO \"''' + table_name + '''\" OVERWRITE ALL\n", - "'''\n", - "\n", - "display.run_task(sql)\n", - "sql_client.wait_until_ready(table_name)\n", - "display.table(table_name)" - ] - }, - { - "cell_type": "markdown", - "id": "5d8ff0b2-ee98-40d6-b0b7-77274e34881d", - "metadata": {}, - "source": [ - "## Create a table using streaming ingestion\n", - "\n", - "In this section, you use the data generator to generate a stream of messages into a Apache Kafka topic. Next, you'll set up an on-going ingestion into Druid." - ] - }, - { - "cell_type": "markdown", - "id": "a590b732-c843-4483-896d-c7972ba9e4be", - "metadata": {}, - "source": [ - "### Use the data generator to populate a Kafka topic\n", - "\n", - "Run the following cell to instruct the data generator to start producing data." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "ff2ef424-cd35-416c-9cd0-14f71d43d2e5", - "metadata": {}, - "outputs": [], - "source": [ - "# For more information on the available configurations and settings for the data generator, see the dedicated notebook in \"99-contributing\"\n", - "\n", - "# Replace example-dataset-topic with a unique table name for this notebook.\n", - "\n", - "# - Always prefix your table name with `example-`\n", - "# - If using the standard example datasets, use the following standard values for `dataset`:\n", - "\n", - "# social/socialposts social\n", - "# clickstream/clickstream clickstream\n", - "\n", - "# Remember to apply good data modelling practice to your data schema.\n", - "\n", - "datagen_topic = \"example-dataset-topic\"\n", - "datagen_job = datagen_topic\n", - "datagen_config = \"social/social_posts.json\"\n", - "\n", - "datagen_request = {\n", - " \"name\": datagen_job,\n", - " \"target\": { \"type\": \"kafka\", \"endpoint\": kafka_host, \"topic\": datagen_topic },\n", - " \"config_file\": datagen_config, \n", - " \"time\":\"10m\",\n", - " \"concurrency\":100\n", - "}\n", - "\n", - "requests.post(f\"{datagen_host}/start\", json.dumps(datagen_request), headers=datagen_headers)\n", - "requests.get(f\"{datagen_host}/status/{datagen_job}\").json()" - ] - }, - { - "cell_type": "markdown", - "id": "d5fedf1f-566f-4501-be65-d20e216d2c59", - "metadata": {}, - "source": [ - "### Use streaming ingestion to populate the table\n", - "\n", - "Ingest data from an Apache Kafka topic into Apache Druid by submitting an [ingestion specification](https://druid.apache.org/docs/latest/ingestion/ingestion-spec.html) to the [streaming ingestion supervisor API](https://druid.apache.org/docs/latest/api-reference/supervisor-api).\n", - "\n", - "Run the next cell to set up the [`ioConfig`](https://druid.apache.org/docs/latest/ingestion/ingestion-spec#ioconfig), 
[`tuningConfig`](https://druid.apache.org/docs/latest/ingestion/ingestion-spec#tuningconfig), and [`dataSchema`](https://druid.apache.org/docs/latest/ingestion/ingestion-spec#dataschema). Notice {the use of X as a timestamp | only required columns are ingested | WHERE / expressions / GROUP BY are front-loaded | partitions on X period and clusters by Y}." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "b9c8734c-e968-45de-a1bf-197d3bffb688", - "metadata": {}, - "outputs": [], - "source": [ - "ioConfig = {\n", - " \"type\": \"kafka\",\n", - " \"consumerProperties\": { \"bootstrap.servers\": kafka_host },\n", - " \"topic\": datagen_topic,\n", - " \"inputFormat\": { \"type\": \"json\" },\n", - " \"useEarliestOffset\": \"true\" }\n", - "\n", - "tuningConfig = { \"type\": \"kafka\" }\n", - "\n", - "# Replace example-dataset-topic with a unique table name for this notebook.\n", - "\n", - "# - Always prefix your table name with `example-`\n", - "# - If using the standard example datasets, use the following standard values for `dataset`:\n", - "\n", - "# social/socialposts social\n", - "# clickstream/clickstream clickstream\n", - "\n", - "# Remember to apply good data modelling practice to your data schema.\n", - "\n", - "table_name = 'example-dataset-topic'\n", - "\n", - "dataSchema = {\n", - " \"dataSource\": table_name,\n", - " \"timestampSpec\": { \"column\": \"time\", \"format\": \"iso\" },\n", - " \"granularitySpec\": { \"rollup\": \"false\", \"segmentGranularity\": \"hour\" },\n", - " \"dimensionsSpec\": { \"useSchemaDiscovery\" : \"true\"}\n", - " }\n", - "\n", - "ingestion_spec = {\n", - " \"type\": \"kafka\",\n", - " \"spec\": {\n", - " \"ioConfig\": ioConfig,\n", - " \"tuningConfig\": tuningConfig,\n", - " \"dataSchema\": dataSchema\n", - " }\n", - "}\n", - "\n", - "requests.post(f\"{druid_host}/druid/indexer/v1/supervisor\", json.dumps(ingestion_spec), headers=druid_headers)\n", - "sql_client.wait_until_ready(table_name, verify_load_status=False)\n", - "display.table(table_name)" - ] - }, - { - "cell_type": "markdown", - "id": "1b6c9b88-837d-4c80-a28d-36184ba63355", - "metadata": {}, - "source": [ - "## System Fields for Batch Ingestion\n", - "\n", - "When doing ingestion of multiple files, it is generally helpful to know the specific source of the data. This feature allows you to do just that. It provides system fields that identifty the input source and which can be added to the ingestion job.\n", - "\n", - "Each Input Source has slightly different input fields. In the example below we use HTTP [checkout in the docs to see the fields that are available](https://druid.apache.org/docs/latest/ingestion/input-sources#http-input-source). 
\n", - "\n", - "To enable this functionality, add the new property \"systemFields\" the Input Source field in the EXTERN clause:\n", - "```sql\n", - "FROM TABLE(\n", - " EXTERN(\n", - " '{\n", - " \"type\":\"http\",\n", - " \"systemFields\":[\"__file_uri\",\"__file_path\"], <<<<<< list of system fields to capture\n", - " \"uris\":[ 2 -- less rows to process for this example\n", - "GROUP BY 1,2,3\n", - "PARTITIONED BY ALL\n", + "WHERE \"passenger_count\" = 5\n", + "PARTITIONED BY DAY\n", "'''\n", "\n", "request = druid.sql.sql_request(sql)\n", - "request.add_context('maxNumTasks', 3)\n", + "request.add_context('maxNumTasks', 4)\n", "\n", - "druid.display.run_task(request)\n", - "druid.sql.wait_until_ready(table_name)" + "display.run_task(request)\n", + "sql_client.wait_until_ready(table_name)\n", + "display.table(table_name)" ] }, { @@ -428,31 +186,18 @@ "metadata": {}, "outputs": [], "source": [ - "sql='''\n", - "SELECT \"__file_uri\", \"__file_path\", \n", - " SUM(\"row_count\") \"total_file_rows\" \n", - "FROM \"example-taxi-trips-rollup\"\n", + "sql=f'''\n", + "SELECT\n", + " \"__file_uri\",\n", + " \"__file_path\", \n", + " COUNT(*) AS \"total_file_rows\" \n", + "FROM \"{table_name}\"\n", "GROUP BY 1,2\n", "'''\n", "\n", "druid.display.sql(sql)" ] }, - { - "cell_type": "markdown", - "id": "8dcb6664-9699-4f56-86f1-5c4543624230", - "metadata": {}, - "source": [ - "While the above examples are rather simple, this is a powerful tool to enhance data when the files are organized in folder structures where the path contains infomation about the data. It is common to see this kind of file system organization in cloud storage where that data has already been partitioned by time or other dimensions. Take this list of files as an example:\n", - "```\n", - "/data/activity_log/customer=501/hour=2024-01-01T10:00:00/datafile1.csv\n", - "/data/activity_log/customer=501/hour=2024-01-01T10:00:00/datafile2.csv\n", - "/data/activity_log/customer=376/hour=2024-01-01T11:00:00/datafile1.csv\n", - "...\n", - "```\n", - "With this example, the __file_uri or __file_path columns can be parsed at ingestion to create other fields using functions like REGEXP_EXTRACT to extract `customer` and `hour` in this example." - ] - }, { "cell_type": "markdown", "id": "44738d6d-cec2-40ad-aaba-998c758c63f4", @@ -460,7 +205,7 @@ "source": [ "## Clean up\n", "\n", - "Run the following cell to remove the XXX used in this notebook from the database." + "Run the following cell to drop the table." 
] }, { @@ -470,26 +215,6 @@ "metadata": {}, "outputs": [], "source": [ - "# Use this for batch ingested tables\n", - "\n", - "print(f\"Drop table: [{druid.datasources.drop(table_name)}]\")\n", - "\n", - "# Use this when doing streaming with the data generator\n", - "\n", - "print(f\"Stop streaming generator: [{requests.post(f'{datagen_host}/stop/{datagen_job}','')}]\")\n", - "print(f'Pause streaming ingestion: [{requests.post(f\"{druid_host}/druid/indexer/v1/supervisor/{datagen_topic}/suspend\",\"\")}]')\n", - "\n", - "print(f'Shutting down running tasks ...')\n", - "\n", - "tasks = druid.tasks.tasks(state='running', table=table_name)\n", - "while len(tasks)>0:\n", - " for task in tasks:\n", - " print(f\"...stopping task [{task['id']}]\")\n", - " druid.tasks.shut_down_task(task['id'])\n", - " tasks = druid.tasks.tasks(state='running', table=table_name)\n", - "\n", - "print(f'Reset offsets for re-runnability: [{requests.post(f\"{druid_host}/druid/indexer/v1/supervisor/{datagen_topic}/reset\",\"\")}]')\n", - "print(f'Terminate streaming ingestion: [{requests.post(f\"{druid_host}/druid/indexer/v1/supervisor/{datagen_topic}/terminate\",\"\")}]')\n", "print(f\"Drop table: [{druid.datasources.drop(table_name)}]\")" ] }, @@ -511,59 +236,6 @@ "* Watch or read something cool from the community\n", "* Do some exploratory stuff on your own" ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "ca4d3362-b1a4-47a4-a782-9773c216b3ba", - "metadata": {}, - "outputs": [], - "source": [ - "# Here are some useful code elements that you can re-use.\n", - "\n", - "# When just wanting to display some SQL results\n", - "sql = f'''SELECT * FROM \"{table_name}\" LIMIT 5'''\n", - "display.sql(sql)\n", - "\n", - "# When ingesting data and wanting to describe the schema\n", - "display.run_task(sql)\n", - "sql_client.wait_until_ready('{table_name}')\n", - "display.table('{table_name}')\n", - "\n", - "# When you want to show the native version of a SQL statement\n", - "print(json.dumps(json.loads(sql_client.explain_sql(sql)['PLAN']), indent=2))\n", - "\n", - "# When you want a simple plot\n", - "df = pd.DataFrame(sql_client.sql(sql))\n", - "df.plot(x='x-axis', y='y-axis', marker='o')\n", - "plt.xticks(rotation=45, ha='right')\n", - "plt.gca().get_legend().remove()\n", - "plt.show()\n", - "\n", - "# When you want to add some query context parameters\n", - "req = sql_client.sql_request(sql)\n", - "req.add_context(\"useApproximateTopN\", \"false\")\n", - "resp = sql_client.sql_query(req)\n", - "\n", - "# When you want to compare two different sets of results\n", - "df3 = df1.compare(df2, keep_equal=True)\n", - "df3\n", - "\n", - "# When you want to see some messages from a Kafka topic\n", - "from kafka import KafkaConsumer\n", - "\n", - "consumer = KafkaConsumer(bootstrap_servers=kafka_host)\n", - "consumer.subscribe(topics=datagen_topic)\n", - "count = 0\n", - "for message in consumer:\n", - " count += 1\n", - " if count == 5:\n", - " break\n", - " print (\"%d:%d: v=%s\" % (message.partition,\n", - " message.offset,\n", - " message.value))\n", - "consumer.unsubscribe()" - ] } ], "metadata": { From a711db34ebcb3cf0bc04bb9f28b71cec62645818 Mon Sep 17 00:00:00 2001 From: Peter Marshall Date: Tue, 9 Jul 2024 13:11:53 +0100 Subject: [PATCH 3/4] Update 16-batch-systemfields.ipynb Added some refs, added a bit to the query. 
--- notebooks/02-ingestion/16-batch-systemfields.ipynb | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/notebooks/02-ingestion/16-batch-systemfields.ipynb b/notebooks/02-ingestion/16-batch-systemfields.ipynb index e87d5736..3ec6c9e3 100644 --- a/notebooks/02-ingestion/16-batch-systemfields.ipynb +++ b/notebooks/02-ingestion/16-batch-systemfields.ipynb @@ -5,7 +5,7 @@ "id": "0cb3b009-ebde-4d56-9d59-a028d66d8309", "metadata": {}, "source": [ - "# Incorporate system fields into table data\n", + "# Incorporate data from source systems into table data\n", " 47\u001b[0m \u001b[43msql_client\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mwait_until_ready\u001b[49m\u001b[43m(\u001b[49m\u001b[43mtable_name\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 48\u001b[0m display\u001b[38;5;241m.\u001b[39mtable(table_name)\n", + "File \u001b[0;32m/opt/conda/lib/python3.11/site-packages/druidapi/sql.py:1108\u001b[0m, in \u001b[0;36mQueryClient.wait_until_ready\u001b[0;34m(self, table_name, verify_load_status)\u001b[0m\n\u001b[1;32m 1096\u001b[0m \u001b[38;5;250m\u001b[39m\u001b[38;5;124;03m'''\u001b[39;00m\n\u001b[1;32m 1097\u001b[0m \u001b[38;5;124;03mWaits for a datasource to be loaded in the cluster, and to become available to SQL.\u001b[39;00m\n\u001b[1;32m 1098\u001b[0m \n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 1105\u001b[0m \u001b[38;5;124;03m If false, tries the test query before checking whether all published segments are loaded.\u001b[39;00m\n\u001b[1;32m 1106\u001b[0m \u001b[38;5;124;03m'''\u001b[39;00m\n\u001b[1;32m 1107\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m verify_load_status:\n\u001b[0;32m-> 1108\u001b[0m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdruid_client\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdatasources\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mwait_until_ready\u001b[49m\u001b[43m(\u001b[49m\u001b[43mtable_name\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1109\u001b[0m \u001b[38;5;28;01mwhile\u001b[39;00m \u001b[38;5;28;01mTrue\u001b[39;00m:\n\u001b[1;32m 1110\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n", + "File \u001b[0;32m/opt/conda/lib/python3.11/site-packages/druidapi/datasource.py:83\u001b[0m, in \u001b[0;36mDatasourceClient.wait_until_ready\u001b[0;34m(self, ds_name)\u001b[0m\n\u001b[1;32m 81\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mwait_until_ready\u001b[39m(\u001b[38;5;28mself\u001b[39m, ds_name):\n\u001b[1;32m 82\u001b[0m \u001b[38;5;28;01mwhile\u001b[39;00m \u001b[38;5;28;01mTrue\u001b[39;00m:\n\u001b[0;32m---> 83\u001b[0m resp \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mload_status\u001b[49m\u001b[43m(\u001b[49m\u001b[43mds_name\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 84\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m dict_get(resp, ds_name) \u001b[38;5;241m==\u001b[39m \u001b[38;5;241m100.0\u001b[39m:\n\u001b[1;32m 85\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m\n", + "File \u001b[0;32m/opt/conda/lib/python3.11/site-packages/druidapi/datasource.py:77\u001b[0m, in \u001b[0;36mDatasourceClient.load_status\u001b[0;34m(self, ds_name)\u001b[0m\n\u001b[1;32m 76\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mload_status\u001b[39m(\u001b[38;5;28mself\u001b[39m, ds_name):\n\u001b[0;32m---> 77\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m 
\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mload_status_req\u001b[49m\u001b[43m(\u001b[49m\u001b[43mds_name\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43m{\u001b[49m\n\u001b[1;32m 78\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[38;5;124;43mforceMetadataRefresh\u001b[39;49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[43m:\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[38;5;124;43mtrue\u001b[39;49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[43m,\u001b[49m\n\u001b[1;32m 79\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[38;5;124;43minterval\u001b[39;49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[43m:\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[38;5;124;43m1970-01-01/2999-01-01\u001b[39;49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[43m}\u001b[49m\u001b[43m)\u001b[49m\n", + "File \u001b[0;32m/opt/conda/lib/python3.11/site-packages/druidapi/datasource.py:73\u001b[0m, in \u001b[0;36mDatasourceClient.load_status_req\u001b[0;34m(self, ds_name, params)\u001b[0m\n\u001b[1;32m 71\u001b[0m response \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mrest_client\u001b[38;5;241m.\u001b[39mget(REQ_DS_LOAD_STATUS, args\u001b[38;5;241m=\u001b[39m[ds_name], params\u001b[38;5;241m=\u001b[39mparams)\n\u001b[1;32m 72\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mlen\u001b[39m(response\u001b[38;5;241m.\u001b[39mtext)\u001b[38;5;241m==\u001b[39m\u001b[38;5;241m0\u001b[39m:\n\u001b[0;32m---> 73\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m ClientError(\u001b[38;5;124mf\u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mTable \u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;132;01m{\u001b[39;00mds_name\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m not found.\u001b[39m\u001b[38;5;124m'\u001b[39m)\n\u001b[1;32m 74\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m json\u001b[38;5;241m.\u001b[39mloads(response\u001b[38;5;241m.\u001b[39mtext)\n", + "\u001b[0;31mClientError\u001b[0m: Table \"example-taxitrips-systemfields\" not found." + ] + } + ], "source": [ "table_name = \"example-taxitrips-systemfields\"\n", "\n",