Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notebook on System Field ingestion #110

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
308 changes: 308 additions & 0 deletions notebooks/02-ingestion/17-batch-systemfields.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,308 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "0cb3b009-ebde-4d56-9d59-a028d66d8309",
"metadata": {},
"source": [
"# Incorporate data from source systems into table data\n",
"<!--\n",
" ~ Licensed to the Apache Software Foundation (ASF) under one\n",
" ~ or more contributor license agreements. See the NOTICE file\n",
" ~ distributed with this work for additional information\n",
" ~ regarding copyright ownership. The ASF licenses this file\n",
" ~ to you under the Apache License, Version 2.0 (the\n",
" ~ \"License\"); you may not use this file except in compliance\n",
" ~ with the License. You may obtain a copy of the License at\n",
" ~\n",
" ~ http://www.apache.org/licenses/LICENSE-2.0\n",
" ~\n",
" ~ Unless required by applicable law or agreed to in writing,\n",
" ~ software distributed under the License is distributed on an\n",
" ~ \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
" ~ KIND, either express or implied. See the License for the\n",
" ~ specific language governing permissions and limitations\n",
" ~ under the License.\n",
" -->\n",
"\n",
"When doing ingestion from multiple sources, it can be helpful to incorporate specific information from the source of the data.\n",
"\n",
"For example, you might parse a source system URI using a [datetime function](https://druid.apache.org/docs/latest/querying/sql-scalar#date-and-time-functions) to use as the primary `__time` stamp.\n",
"\n",
"This tutorial demonstrates how to incorporate `systemFields` with batch ingestion to add this information to rows in your table."
]
},
{
"cell_type": "markdown",
"id": "bbdbf6ad-ca7b-40f5-8ca3-1070f4a3ee42",
"metadata": {},
"source": [
"## Prerequisites\n",
"\n",
"This tutorial works with Druid 29.0.0 or later.\n",
"\n",
"Launch this tutorial and all prerequisites using the `druid-jupyter` profile of the Docker Compose file for Jupyter-based Druid tutorials. For more information, see the Learn Druid repository [readme](https://github.com/implydata/learn-druid)."
]
},
{
"cell_type": "markdown",
"id": "5007a243-b81a-4601-8f57-5b14940abbff",
"metadata": {},
"source": [
"## Initialization\n",
"\n",
"The following cells set up the notebook and learning environment ready for use."
]
},
{
"cell_type": "markdown",
"id": "0b769122-c5a4-404e-9ef8-9c0ebd97695a",
"metadata": {},
"source": [
"### Set up a connection to Apache Druid\n",
"\n",
"Run the next cell to set up the Druid Python client's connection to Apache Druid.\n",
"\n",
"If successful, the Druid version number will be shown in the output."
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "c1ec783b-df3f-4168-9be2-cdc6ad3e33c2",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Opening a connection to http://router:8888.\n"
]
},
{
"data": {
"text/plain": [
"'31.0.0'"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import druidapi\n",
"import os\n",
"\n",
"druid_headers = {'Content-Type': 'application/json'}\n",
"\n",
"if 'DRUID_HOST' not in os.environ.keys():\n",
" druid_host=f\"http://localhost:8888\"\n",
"else:\n",
" druid_host=f\"http://{os.environ['DRUID_HOST']}:8888\"\n",
"\n",
"print(f\"Opening a connection to {druid_host}.\")\n",
"druid = druidapi.jupyter_client(druid_host)\n",
"display = druid.display\n",
"sql_client = druid.sql\n",
"status_client = druid.status\n",
"\n",
"status_client.version"
]
},
{
"cell_type": "markdown",
"id": "472589e4-1026-4b3b-bb79-eedabb2b44c4",
"metadata": {},
"source": [
"## Create a table with system fields using batch ingestion\n",
"\n",
"The `systemFields` property can be added to a number of [`inputSource` definitions](https://druid.apache.org/docs/latest/ingestion/input-sources) in the EXTERN clause.\n",
"\n",
"Run the cell to ingest three files. Notice:\n",
"\n",
"* The EXTERN statement includes the `systemFields` object, with an array containing two fields to add.\n",
"* The EXTEND includes a type definition for both of the additional fields.\n",
"* The SELECT explicitly names both of the system fields, alongside the fields to ingest.\n",
"* The REGEXP_EXTRACT function applies a simple regular expression to the `__file_path` to store only the filename as `__file_name`.\n",
"* Only rows with a `passenger_count` of 5 are ingested.\n",
"\n",
"When completed, you'll see a description of the final table."
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "f52a94fb-d2e4-403f-ab10-84d3af7bf2c8",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"Loading data, status:[FAILED]: 0%| | 0.0/100.0 [00:07<?, ?it/s] \n"
]
},
{
"ename": "ClientError",
"evalue": "Table \"example-taxitrips-systemfields\" not found.",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mClientError\u001b[0m Traceback (most recent call last)",
"Cell \u001b[0;32mIn[3], line 47\u001b[0m\n\u001b[1;32m 44\u001b[0m request\u001b[38;5;241m.\u001b[39madd_context(\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mmaxNumTasks\u001b[39m\u001b[38;5;124m'\u001b[39m, \u001b[38;5;241m4\u001b[39m)\n\u001b[1;32m 46\u001b[0m display\u001b[38;5;241m.\u001b[39mrun_task(request)\n\u001b[0;32m---> 47\u001b[0m \u001b[43msql_client\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mwait_until_ready\u001b[49m\u001b[43m(\u001b[49m\u001b[43mtable_name\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 48\u001b[0m display\u001b[38;5;241m.\u001b[39mtable(table_name)\n",
"File \u001b[0;32m/opt/conda/lib/python3.11/site-packages/druidapi/sql.py:1108\u001b[0m, in \u001b[0;36mQueryClient.wait_until_ready\u001b[0;34m(self, table_name, verify_load_status)\u001b[0m\n\u001b[1;32m 1096\u001b[0m \u001b[38;5;250m\u001b[39m\u001b[38;5;124;03m'''\u001b[39;00m\n\u001b[1;32m 1097\u001b[0m \u001b[38;5;124;03mWaits for a datasource to be loaded in the cluster, and to become available to SQL.\u001b[39;00m\n\u001b[1;32m 1098\u001b[0m \n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 1105\u001b[0m \u001b[38;5;124;03m If false, tries the test query before checking whether all published segments are loaded.\u001b[39;00m\n\u001b[1;32m 1106\u001b[0m \u001b[38;5;124;03m'''\u001b[39;00m\n\u001b[1;32m 1107\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m verify_load_status:\n\u001b[0;32m-> 1108\u001b[0m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdruid_client\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdatasources\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mwait_until_ready\u001b[49m\u001b[43m(\u001b[49m\u001b[43mtable_name\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1109\u001b[0m \u001b[38;5;28;01mwhile\u001b[39;00m \u001b[38;5;28;01mTrue\u001b[39;00m:\n\u001b[1;32m 1110\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n",
"File \u001b[0;32m/opt/conda/lib/python3.11/site-packages/druidapi/datasource.py:83\u001b[0m, in \u001b[0;36mDatasourceClient.wait_until_ready\u001b[0;34m(self, ds_name)\u001b[0m\n\u001b[1;32m 81\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mwait_until_ready\u001b[39m(\u001b[38;5;28mself\u001b[39m, ds_name):\n\u001b[1;32m 82\u001b[0m \u001b[38;5;28;01mwhile\u001b[39;00m \u001b[38;5;28;01mTrue\u001b[39;00m:\n\u001b[0;32m---> 83\u001b[0m resp \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mload_status\u001b[49m\u001b[43m(\u001b[49m\u001b[43mds_name\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 84\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m dict_get(resp, ds_name) \u001b[38;5;241m==\u001b[39m \u001b[38;5;241m100.0\u001b[39m:\n\u001b[1;32m 85\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m\n",
"File \u001b[0;32m/opt/conda/lib/python3.11/site-packages/druidapi/datasource.py:77\u001b[0m, in \u001b[0;36mDatasourceClient.load_status\u001b[0;34m(self, ds_name)\u001b[0m\n\u001b[1;32m 76\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mload_status\u001b[39m(\u001b[38;5;28mself\u001b[39m, ds_name):\n\u001b[0;32m---> 77\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mload_status_req\u001b[49m\u001b[43m(\u001b[49m\u001b[43mds_name\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43m{\u001b[49m\n\u001b[1;32m 78\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[38;5;124;43mforceMetadataRefresh\u001b[39;49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[43m:\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[38;5;124;43mtrue\u001b[39;49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[43m,\u001b[49m\n\u001b[1;32m 79\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[38;5;124;43minterval\u001b[39;49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[43m:\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[38;5;124;43m1970-01-01/2999-01-01\u001b[39;49m\u001b[38;5;124;43m'\u001b[39;49m\u001b[43m}\u001b[49m\u001b[43m)\u001b[49m\n",
"File \u001b[0;32m/opt/conda/lib/python3.11/site-packages/druidapi/datasource.py:73\u001b[0m, in \u001b[0;36mDatasourceClient.load_status_req\u001b[0;34m(self, ds_name, params)\u001b[0m\n\u001b[1;32m 71\u001b[0m response \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mrest_client\u001b[38;5;241m.\u001b[39mget(REQ_DS_LOAD_STATUS, args\u001b[38;5;241m=\u001b[39m[ds_name], params\u001b[38;5;241m=\u001b[39mparams)\n\u001b[1;32m 72\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mlen\u001b[39m(response\u001b[38;5;241m.\u001b[39mtext)\u001b[38;5;241m==\u001b[39m\u001b[38;5;241m0\u001b[39m:\n\u001b[0;32m---> 73\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m ClientError(\u001b[38;5;124mf\u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mTable \u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;132;01m{\u001b[39;00mds_name\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m not found.\u001b[39m\u001b[38;5;124m'\u001b[39m)\n\u001b[1;32m 74\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m json\u001b[38;5;241m.\u001b[39mloads(response\u001b[38;5;241m.\u001b[39mtext)\n",
"\u001b[0;31mClientError\u001b[0m: Table \"example-taxitrips-systemfields\" not found."
]
}
],
"source": [
"table_name = \"example-taxitrips-systemfields\"\n",
"\n",
"sql = '''\n",
"REPLACE INTO \"''' + table_name + '''\" OVERWRITE ALL\n",
"WITH \"ext\" AS (\n",
" SELECT *\n",
" FROM TABLE(\n",
" EXTERN(\n",
" '{\n",
" \"type\":\"http\",\n",
" \"systemFields\":[\"__file_uri\",\"__file_path\"],\n",
" \"uris\":\n",
" [\"https://static.imply.io/example-data/trips/trips_xaa.csv.gz\",\n",
" \"https://static.imply.io/example-data/trips/trips_xac.csv.gz\"]}',\n",
" '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"trip_id\",\"vendor_id\",\"pickup_datetime\",\"dropoff_datetime\",\"store_and_fwd_flag\",\"rate_code_id\",\"pickup_longitude\",\"pickup_latitude\",\"dropoff_longitude\",\"dropoff_latitude\",\"passenger_count\",\"trip_distance\",\"fare_amount\",\"extra\",\"mta_tax\",\"tip_amount\",\"tolls_amount\",\"ehail_fee\",\"improvement_surcharge\",\"total_amount\",\"payment_type\",\"trip_type\",\"pickup\",\"dropoff\",\"cab_type\",\"precipitation\",\"snow_depth\",\"snowfall\",\"max_temperature\",\"min_temperature\",\"average_wind_speed\",\"pickup_nyct2010_gid\",\"pickup_ctlabel\",\"pickup_borocode\",\"pickup_boroname\",\"pickup_ct2010\",\"pickup_boroct2010\",\"pickup_cdeligibil\",\"pickup_ntacode\",\"pickup_ntaname\",\"pickup_puma\",\"dropoff_nyct2010_gid\",\"dropoff_ctlabel\",\"dropoff_borocode\",\"dropoff_boroname\",\"dropoff_ct2010\",\"dropoff_boroct2010\",\"dropoff_cdeligibil\",\"dropoff_ntacode\",\"dropoff_ntaname\",\"dropoff_puma\"]}'\n",
" )\n",
" ) EXTEND (\"__file_uri\" VARCHAR, \"__file_path\" VARCHAR,\n",
" \"trip_id\" BIGINT, \"vendor_id\" BIGINT, \"pickup_datetime\" VARCHAR, \"dropoff_datetime\" VARCHAR, \"store_and_fwd_flag\" VARCHAR, \"rate_code_id\" BIGINT, \"pickup_longitude\" DOUBLE, \"pickup_latitude\" DOUBLE, \"dropoff_longitude\" DOUBLE, \"dropoff_latitude\" DOUBLE, \"passenger_count\" BIGINT, \"trip_distance\" DOUBLE, \"fare_amount\" DOUBLE, \"extra\" DOUBLE, \"mta_tax\" DOUBLE, \"tip_amount\" DOUBLE, \"tolls_amount\" DOUBLE, \"ehail_fee\" VARCHAR, \"improvement_surcharge\" VARCHAR, \"total_amount\" DOUBLE, \"payment_type\" BIGINT, \"trip_type\" VARCHAR, \"pickup\" VARCHAR, \"dropoff\" VARCHAR, \"cab_type\" VARCHAR, \"precipitation\" DOUBLE, \"snow_depth\" BIGINT, \"snowfall\" DOUBLE, \"max_temperature\" BIGINT, \"min_temperature\" BIGINT, \"average_wind_speed\" DOUBLE, \"pickup_nyct2010_gid\" BIGINT, \"pickup_ctlabel\" BIGINT, \"pickup_borocode\" BIGINT, \"pickup_boroname\" VARCHAR, \"pickup_ct2010\" BIGINT, \"pickup_boroct2010\" BIGINT, \"pickup_cdeligibil\" VARCHAR, \"pickup_ntacode\" VARCHAR, \"pickup_ntaname\" VARCHAR, \"pickup_puma\" BIGINT, \"dropoff_nyct2010_gid\" BIGINT, \"dropoff_ctlabel\" BIGINT, \"dropoff_borocode\" BIGINT, \"dropoff_boroname\" VARCHAR, \"dropoff_ct2010\" BIGINT, \"dropoff_boroct2010\" BIGINT, \"dropoff_cdeligibil\" VARCHAR, \"dropoff_ntacode\" VARCHAR, \"dropoff_ntaname\" VARCHAR, \"dropoff_puma\" BIGINT)\n",
")\n",
"SELECT\n",
" TIME_PARSE(TRIM(\"pickup_datetime\")) AS \"__time\",\n",
" \"__file_uri\",\n",
" \"__file_path\",\n",
" REGEXP_EXTRACT(\"__file_path\",'(?:.+\\/)(.+)',1) AS \"__file_name\",\n",
" \"trip_id\",\n",
" \"vendor_id\",\n",
" \"dropoff_datetime\",\n",
" \"rate_code_id\",\n",
" \"passenger_count\",\n",
" \"trip_distance\",\n",
" \"fare_amount\",\n",
" \"extra\",\n",
" \"mta_tax\",\n",
" \"tip_amount\",\n",
" \"tolls_amount\",\n",
" \"total_amount\",\n",
" \"payment_type\"\n",
"FROM \"ext\"\n",
"WHERE \"passenger_count\" = 5\n",
"PARTITIONED BY DAY\n",
"'''\n",
"\n",
"request = druid.sql.sql_request(sql)\n",
"request.add_context('maxNumTasks', 4)\n",
"\n",
"display.run_task(request)\n",
"sql_client.wait_until_ready(table_name)\n",
"display.table(table_name)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "dbfefc28-b7c3-4819-bc5b-196dcc6c3072",
"metadata": {},
"outputs": [],
"source": [
"Run the next cell to run a simple query that shows the results of the ingestion."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "932c7feb-a55d-4395-bd8c-02b2913358c4",
"metadata": {},
"outputs": [],
"source": [
"sql=f'''\n",
"SELECT\n",
" \"__file_uri\",\n",
" \"__file_path\", \n",
" \"__file_name\",\n",
" COUNT(*) AS \"total_file_rows\" \n",
"FROM \"{table_name}\"\n",
"GROUP BY 1,2, 3\n",
"'''\n",
"\n",
"druid.display.sql(sql)"
]
},
{
"cell_type": "markdown",
"id": "44738d6d-cec2-40ad-aaba-998c758c63f4",
"metadata": {},
"source": [
"## Clean up\n",
"\n",
"Run the following cell to drop the table."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8082b545-ba7f-4ede-bb6e-2a6dd62ba0d8",
"metadata": {},
"outputs": [],
"source": [
"print(f\"Drop table: [{druid.datasources.drop(table_name)}]\")"
]
},
{
"cell_type": "markdown",
"id": "54b8d5fe-ba85-4b5b-9669-0dd47dfbccd1",
"metadata": {},
"source": [
"## Summary\n",
"\n",
"* You learned this\n",
"* Remember this\n",
"\n",
"## Learn more\n",
"\n",
"* Try this out on your own data\n",
"* Solve for problem X that is't covered here\n",
"* Read docs pages\n",
"* Watch or read something cool from the community\n",
"* Do some exploratory stuff on your own"
]
}
],
"metadata": {
"execution": {
"allow_errors": true,
"timeout": 300
},
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Loading