diff --git a/notebooks/02-ingestion/17-batch-systemfields.ipynb b/notebooks/02-ingestion/17-batch-systemfields.ipynb new file mode 100644 index 00000000..0a3b3da7 --- /dev/null +++ b/notebooks/02-ingestion/17-batch-systemfields.ipynb @@ -0,0 +1,308 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0cb3b009-ebde-4d56-9d59-a028d66d8309", + "metadata": {}, + "source": [ + "# Incorporate data from source systems into table data\n", + "\n", + "\n", + "When doing ingestion from multiple sources, it can be helpful to incorporate specific information from the source of the data.\n", + "\n", + "For example, you might parse a source system URI using a [datetime function](https://druid.apache.org/docs/latest/querying/sql-scalar#date-and-time-functions) to use as the primary `__time` stamp.\n", + "\n", + "This tutorial demonstrates how to incorporate `systemFields` with batch ingestion to add this information to rows in your table." + ] + }, + { + "cell_type": "markdown", + "id": "bbdbf6ad-ca7b-40f5-8ca3-1070f4a3ee42", + "metadata": {}, + "source": [ + "## Prerequisites\n", + "\n", + "This tutorial works with Druid 29.0.0 or later.\n", + "\n", + "Launch this tutorial and all prerequisites using the `druid-jupyter` profile of the Docker Compose file for Jupyter-based Druid tutorials. For more information, see the Learn Druid repository [readme](https://github.com/implydata/learn-druid)." + ] + }, + { + "cell_type": "markdown", + "id": "5007a243-b81a-4601-8f57-5b14940abbff", + "metadata": {}, + "source": [ + "## Initialization\n", + "\n", + "The following cells set up the notebook and learning environment ready for use." + ] + }, + { + "cell_type": "markdown", + "id": "0b769122-c5a4-404e-9ef8-9c0ebd97695a", + "metadata": {}, + "source": [ + "### Set up a connection to Apache Druid\n", + "\n", + "Run the next cell to set up the Druid Python client's connection to Apache Druid.\n", + "\n", + "If successful, the Druid version number will be shown in the output." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "c1ec783b-df3f-4168-9be2-cdc6ad3e33c2", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Opening a connection to http://router:8888.\n" + ] + }, + { + "data": { + "text/plain": [ + "'31.0.0'" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import druidapi\n", + "import os\n", + "\n", + "druid_headers = {'Content-Type': 'application/json'}\n", + "\n", + "if 'DRUID_HOST' not in os.environ.keys():\n", + " druid_host=f\"http://localhost:8888\"\n", + "else:\n", + " druid_host=f\"http://{os.environ['DRUID_HOST']}:8888\"\n", + "\n", + "print(f\"Opening a connection to {druid_host}.\")\n", + "druid = druidapi.jupyter_client(druid_host)\n", + "display = druid.display\n", + "sql_client = druid.sql\n", + "status_client = druid.status\n", + "\n", + "status_client.version" + ] + }, + { + "cell_type": "markdown", + "id": "472589e4-1026-4b3b-bb79-eedabb2b44c4", + "metadata": {}, + "source": [ + "## Create a table with system fields using batch ingestion\n", + "\n", + "The `systemFields` property can be added to a number of [`inputSource` definitions](https://druid.apache.org/docs/latest/ingestion/input-sources) in the EXTERN clause.\n", + "\n", + "Run the following cell to ingest two files. Notice:\n", + "\n", + "* The EXTERN statement includes the `systemFields` property, an array that names the two fields to add.\n", + "* The EXTEND includes a type definition for both of the additional fields.\n", + "* The SELECT explicitly names both of the system fields, alongside the fields to ingest.\n", + "* The REGEXP_EXTRACT function applies a simple regular expression to the `__file_path` to store only the filename as `__file_name`.\n", + "* Only rows with a `passenger_count` of 5 are ingested.\n", + "\n", + "When completed, you'll see a description of the final table." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "f52a94fb-d2e4-403f-ab10-84d3af7bf2c8", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Loading data, status:[FAILED]: 0%| | 0.0/100.0 [00:07 47\u001b[0m \u001b[43msql_client\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mwait_until_ready\u001b[49m\u001b[43m(\u001b[49m\u001b[43mtable_name\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 48\u001b[0m display\u001b[38;5;241m.\u001b[39mtable(table_name)\n", + "File \u001b[0;32m/opt/conda/lib/python3.11/site-packages/druidapi/sql.py:1108\u001b[0m, in \u001b[0;36mQueryClient.wait_until_ready\u001b[0;34m(self, table_name, verify_load_status)\u001b[0m\n\u001b[1;32m 1096\u001b[0m \u001b[38;5;250m\u001b[39m\u001b[38;5;124;03m'''\u001b[39;00m\n\u001b[1;32m 1097\u001b[0m \u001b[38;5;124;03mWaits for a datasource to be loaded in the cluster, and to become available to SQL.\u001b[39;00m\n\u001b[1;32m 1098\u001b[0m \n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 1105\u001b[0m \u001b[38;5;124;03m If false, tries the test query before checking whether all published segments are loaded.\u001b[39;00m\n\u001b[1;32m 1106\u001b[0m \u001b[38;5;124;03m'''\u001b[39;00m\n\u001b[1;32m 1107\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m verify_load_status:\n\u001b[0;32m-> 1108\u001b[0m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdruid_client\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdatasources\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mwait_until_ready\u001b[49m\u001b[43m(\u001b[49m\u001b[43mtable_name\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1109\u001b[0m \u001b[38;5;28;01mwhile\u001b[39;00m \u001b[38;5;28;01mTrue\u001b[39;00m:\n\u001b[1;32m 1110\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n", + "File \u001b[0;32m/opt/conda/lib/python3.11/site-packages/druidapi/datasource.py:83\u001b[0m, in \u001b[0;36mDatasourceClient.wait_until_ready\u001b[0;34m(self, 
ds_name)\u001b[0m\n\u001b[1;32m 81\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mwait_until_ready\u001b[39m(\u001b[38;5;28mself\u001b[39m, ds_name):\n\u001b[1;32m 82\u001b[0m \u001b[38;5;28;01mwhile\u001b[39;00m \u001b[38;5;28;01mTrue\u001b[39;00m:\n\u001b[0;32m---> 83\u001b[0m resp \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mload_status\u001b[49m\u001b[43m(\u001b[49m\u001b[43mds_name\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 84\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m dict_get(resp, ds_name) \u001b[38;5;241m==\u001b[39m \u001b[38;5;241m100.0\u001b[39m:\n\u001b[1;32m 85\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m\n", + "File \u001b[0;32m/opt/conda/lib/python3.11/site-packages/druidapi/datasource.py:77\u001b[0m, in \u001b[0;36mDatasourceClient.load_status\u001b[0;34m(self, ds_name)\u001b[0m\n\u001b[1;32m 76\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mload_status\u001b[39m(\u001b[38;5;28mself\u001b[39m, ds_name):\n\u001b[0;32m---> 77\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mload_status_req\u001b[49m\u001b[43m(\u001b[49m\u001b[43mds_name\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43m{\u001b[49m\n\u001b[1;32m 78\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[38;5;124;43mforceMetadataRefresh\u001b[39;49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[43m:\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[38;5;124;43mtrue\u001b[39;49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[43m,\u001b[49m\n\u001b[1;32m 79\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[38;5;124;43minterval\u001b[39;49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[43m:\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[38;5;124;43m1970-01-01/2999-01-01\u001b[39;49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[43m}\u001b[49m\u001b[43m)\u001b[49m\n", + 
"File \u001b[0;32m/opt/conda/lib/python3.11/site-packages/druidapi/datasource.py:73\u001b[0m, in \u001b[0;36mDatasourceClient.load_status_req\u001b[0;34m(self, ds_name, params)\u001b[0m\n\u001b[1;32m 71\u001b[0m response \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mrest_client\u001b[38;5;241m.\u001b[39mget(REQ_DS_LOAD_STATUS, args\u001b[38;5;241m=\u001b[39m[ds_name], params\u001b[38;5;241m=\u001b[39mparams)\n\u001b[1;32m 72\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mlen\u001b[39m(response\u001b[38;5;241m.\u001b[39mtext)\u001b[38;5;241m==\u001b[39m\u001b[38;5;241m0\u001b[39m:\n\u001b[0;32m---> 73\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m ClientError(\u001b[38;5;124mf\u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mTable \u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;132;01m{\u001b[39;00mds_name\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m not found.\u001b[39m\u001b[38;5;124m'\u001b[39m)\n\u001b[1;32m 74\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m json\u001b[38;5;241m.\u001b[39mloads(response\u001b[38;5;241m.\u001b[39mtext)\n", + "\u001b[0;31mClientError\u001b[0m: Table \"example-taxitrips-systemfields\" not found." 
+ ] + } + ], + "source": [ + "table_name = \"example-taxitrips-systemfields\"\n", + "\n", + "sql = '''\n", + "REPLACE INTO \"''' + table_name + '''\" OVERWRITE ALL\n", + "WITH \"ext\" AS (\n", + " SELECT *\n", + " FROM TABLE(\n", + " EXTERN(\n", + " '{\n", + " \"type\":\"http\",\n", + " \"systemFields\":[\"__file_uri\",\"__file_path\"],\n", + " \"uris\":\n", + " [\"https://static.imply.io/example-data/trips/trips_xaa.csv.gz\",\n", + " \"https://static.imply.io/example-data/trips/trips_xac.csv.gz\"]}',\n", + " '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"trip_id\",\"vendor_id\",\"pickup_datetime\",\"dropoff_datetime\",\"store_and_fwd_flag\",\"rate_code_id\",\"pickup_longitude\",\"pickup_latitude\",\"dropoff_longitude\",\"dropoff_latitude\",\"passenger_count\",\"trip_distance\",\"fare_amount\",\"extra\",\"mta_tax\",\"tip_amount\",\"tolls_amount\",\"ehail_fee\",\"improvement_surcharge\",\"total_amount\",\"payment_type\",\"trip_type\",\"pickup\",\"dropoff\",\"cab_type\",\"precipitation\",\"snow_depth\",\"snowfall\",\"max_temperature\",\"min_temperature\",\"average_wind_speed\",\"pickup_nyct2010_gid\",\"pickup_ctlabel\",\"pickup_borocode\",\"pickup_boroname\",\"pickup_ct2010\",\"pickup_boroct2010\",\"pickup_cdeligibil\",\"pickup_ntacode\",\"pickup_ntaname\",\"pickup_puma\",\"dropoff_nyct2010_gid\",\"dropoff_ctlabel\",\"dropoff_borocode\",\"dropoff_boroname\",\"dropoff_ct2010\",\"dropoff_boroct2010\",\"dropoff_cdeligibil\",\"dropoff_ntacode\",\"dropoff_ntaname\",\"dropoff_puma\"]}'\n", + " )\n", + " ) EXTEND (\"__file_uri\" VARCHAR, \"__file_path\" VARCHAR,\n", + " \"trip_id\" BIGINT, \"vendor_id\" BIGINT, \"pickup_datetime\" VARCHAR, \"dropoff_datetime\" VARCHAR, \"store_and_fwd_flag\" VARCHAR, \"rate_code_id\" BIGINT, \"pickup_longitude\" DOUBLE, \"pickup_latitude\" DOUBLE, \"dropoff_longitude\" DOUBLE, \"dropoff_latitude\" DOUBLE, \"passenger_count\" BIGINT, \"trip_distance\" DOUBLE, \"fare_amount\" DOUBLE, \"extra\" DOUBLE, \"mta_tax\" DOUBLE, 
\"tip_amount\" DOUBLE, \"tolls_amount\" DOUBLE, \"ehail_fee\" VARCHAR, \"improvement_surcharge\" VARCHAR, \"total_amount\" DOUBLE, \"payment_type\" BIGINT, \"trip_type\" VARCHAR, \"pickup\" VARCHAR, \"dropoff\" VARCHAR, \"cab_type\" VARCHAR, \"precipitation\" DOUBLE, \"snow_depth\" BIGINT, \"snowfall\" DOUBLE, \"max_temperature\" BIGINT, \"min_temperature\" BIGINT, \"average_wind_speed\" DOUBLE, \"pickup_nyct2010_gid\" BIGINT, \"pickup_ctlabel\" BIGINT, \"pickup_borocode\" BIGINT, \"pickup_boroname\" VARCHAR, \"pickup_ct2010\" BIGINT, \"pickup_boroct2010\" BIGINT, \"pickup_cdeligibil\" VARCHAR, \"pickup_ntacode\" VARCHAR, \"pickup_ntaname\" VARCHAR, \"pickup_puma\" BIGINT, \"dropoff_nyct2010_gid\" BIGINT, \"dropoff_ctlabel\" BIGINT, \"dropoff_borocode\" BIGINT, \"dropoff_boroname\" VARCHAR, \"dropoff_ct2010\" BIGINT, \"dropoff_boroct2010\" BIGINT, \"dropoff_cdeligibil\" VARCHAR, \"dropoff_ntacode\" VARCHAR, \"dropoff_ntaname\" VARCHAR, \"dropoff_puma\" BIGINT)\n", + ")\n", + "SELECT\n", + " TIME_PARSE(TRIM(\"pickup_datetime\")) AS \"__time\",\n", + " \"__file_uri\",\n", + " \"__file_path\",\n", + " REGEXP_EXTRACT(\"__file_path\",'(?:.+\/)(.+)',1) AS \"__file_name\",\n", + " \"trip_id\",\n", + " \"vendor_id\",\n", + " \"dropoff_datetime\",\n", + " \"rate_code_id\",\n", + " \"passenger_count\",\n", + " \"trip_distance\",\n", + " \"fare_amount\",\n", + " \"extra\",\n", + " \"mta_tax\",\n", + " \"tip_amount\",\n", + " \"tolls_amount\",\n", + " \"total_amount\",\n", + " \"payment_type\"\n", + "FROM \"ext\"\n", + "WHERE \"passenger_count\" = 5\n", + "PARTITIONED BY DAY\n", + "'''\n", + "\n", + "request = druid.sql.sql_request(sql)\n", + "request.add_context('maxNumTasks', 4)\n", + "\n", + "display.run_task(request)\n", + "sql_client.wait_until_ready(table_name)\n", + "display.table(table_name)" + ] + }, + { + "cell_type": "markdown", + "id": "dbfefc28-b7c3-4819-bc5b-196dcc6c3072", + "metadata": {}, + "source": [ + "Run the next 
cell to run a simple query that shows the results of the ingestion." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "932c7feb-a55d-4395-bd8c-02b2913358c4", + "metadata": {}, + "outputs": [], + "source": [ + "sql=f'''\n", + "SELECT\n", + " \"__file_uri\",\n", + " \"__file_path\", \n", + " \"__file_name\",\n", + " COUNT(*) AS \"total_file_rows\" \n", + "FROM \"{table_name}\"\n", + "GROUP BY 1,2, 3\n", + "'''\n", + "\n", + "druid.display.sql(sql)" + ] + }, + { + "cell_type": "markdown", + "id": "44738d6d-cec2-40ad-aaba-998c758c63f4", + "metadata": {}, + "source": [ + "## Clean up\n", + "\n", + "Run the following cell to drop the table." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8082b545-ba7f-4ede-bb6e-2a6dd62ba0d8", + "metadata": {}, + "outputs": [], + "source": [ + "print(f\"Drop table: [{druid.datasources.drop(table_name)}]\")" + ] + }, + { + "cell_type": "markdown", + "id": "54b8d5fe-ba85-4b5b-9669-0dd47dfbccd1", + "metadata": {}, + "source": [ + "## Summary\n", + "\n", + "* You used the `systemFields` property of an `inputSource` to add `__file_uri` and `__file_path` to the rows returned by EXTERN, declared their types in EXTEND, and named them in the SELECT.\n", + "* You applied REGEXP_EXTRACT to `__file_path` during ingestion to store only the file name as `__file_name`.\n", + "\n", + "## Learn more\n", + "\n", + "* Try this out on your own data.\n", + "* Change the list of `uris` or the WHERE clause, then re-run the query to see how the row count for each file changes.\n", + "* Read about `systemFields` in the documentation for [input sources](https://druid.apache.org/docs/latest/ingestion/input-sources).\n", + "* Read about the [date and time functions](https://druid.apache.org/docs/latest/querying/sql-scalar#date-and-time-functions) that you can use to derive `__time` from a system field.\n", + "* Explore the other notebooks in the [Learn Druid repository](https://github.com/implydata/learn-druid)." + ] + } + ], + "metadata": { + "execution": { + "allow_errors": true, + "timeout": 300 + }, + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}