
Commit b1bafd5

Parthibwhoseoyster authored and committed
OPEN-5105: Changes to support monitoring data ingestion without InferencePipelineData objects
1 parent 2cfd273 commit b1bafd5

File tree

2 files changed: +60 -9 lines changed

openlayer/__init__.py

Lines changed: 28 additions & 4 deletions
@@ -31,6 +31,7 @@
 from typing import Dict, Optional, Tuple
 
 import pandas as pd
+import urllib.parse
 import yaml
 
 from . import api, constants, exceptions, utils
@@ -882,6 +883,7 @@ def create_inference_pipeline(
         inference_pipeline_config = {
             "name": name or "Production",
             "description": description or "Monitoring production data.",
+            "storageType": api.STORAGE.value,
         }
         inference_pipeline_validator = (
             inference_pipeline_validators.InferencePipelineValidator(
@@ -920,7 +922,7 @@ def create_inference_pipeline(
                 {"task_type": task_type.value, **reference_dataset_config}
             )
 
-        with tempfile.TemporaryDirectory() as tmp_dir:
+        with tempfile.TemporaryDirectory() as tmp_dir:
             # Copy relevant files to tmp dir if reference dataset is provided
             if reference_dataset_config_file_path is not None:
                 utils.write_yaml(
@@ -1134,21 +1136,31 @@ def publish_batch_data(
                 tar.add(tmp_dir, arcname=os.path.basename("batch_data"))
 
             payload = {
+                "performGroundTruthMerge": False,
+            }
+
+            presigned_url_query_params_dict = {
                 "earliestTimestamp": int(earliest_timestamp),
                 "latestTimestamp": int(latest_timestamp),
-                "inferenceIdColumnName": batch_data.get("inferenceIdColumnName"),
-                "timestampColumnName": batch_data.get("timestampColumnName"),
-                "performGroundTruthMerge": False,
+                "storageInterface": api.STORAGE.value,
+                "dataType": "data",
             }
 
+            presigned_url_query_params = urllib.parse.urlencode(
+                presigned_url_query_params_dict
+            )
+
             self.api.upload(
                 endpoint=f"inference-pipelines/{inference_pipeline_id}/data",
                 file_path=tar_file_path,
                 object_name="tarfile",
                 body=payload,
                 storage_uri_key="storageUri",
                 method="POST",
+                presigned_url_endpoint=f"inference-pipelines/{inference_pipeline_id}/presigned-url",
+                presigned_url_query_params=presigned_url_query_params
             )
+
             print("Batch of data published!")
 
     def _add_default_column(
@@ -1196,18 +1208,30 @@ def publish_ground_truths(
         with tempfile.TemporaryDirectory() as tmp_dir:
             # Copy save files to tmp dir
             df.to_csv(f"{tmp_dir}/dataset.csv", index=False)
+
             payload = {
                 "performGroundTruthMerge": True,
                 "groundTruthColumnName": ground_truth_column_name,
                 "inferenceIdColumnName": inference_id_column_name,
             }
 
+            presigned_url_query_params_dict = {
+                "storageInterface": api.STORAGE.value,
+                "dataType": "groundTruths",
+            }
+
+            presigned_url_query_params = urllib.parse.urlencode(
+                presigned_url_query_params_dict
+            )
+
             self.api.upload(
                 endpoint=f"inference-pipelines/{inference_pipeline_id}/data",
                 file_path=f"{tmp_dir}/dataset.csv",
                 object_name="dataset.csv",
                 body=payload,
                 storage_uri_key="storageUri",
                 method="POST",
+                presigned_url_endpoint=f"inference-pipelines/{inference_pipeline_id}/presigned-url",
+                presigned_url_query_params=presigned_url_query_params
            )
            print("Ground truths published!")

openlayer/api.py

Lines changed: 32 additions & 5 deletions
@@ -28,6 +28,7 @@
 from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
 from tqdm import tqdm
 from tqdm.utils import CallbackIOWrapper
+import urllib.parse
 
 from . import constants
 from .exceptions import ExceptionMap, OpenlayerException
@@ -189,6 +190,8 @@ def upload(
         body=None,
         method: str = "POST",
         storage_uri_key: str = "storageUri",
+        presigned_url_endpoint: str = "storage/presigned-url",
+        presigned_url_query_params: str = "",
     ):
         """Generic method to upload data to the default storage medium and create the
         appropriate resource in the backend.
@@ -201,13 +204,16 @@ def upload(
             upload = self.upload_blob_azure
         else:
             upload = self.transfer_blob
+
         return upload(
             endpoint=endpoint,
             file_path=file_path,
             object_name=object_name,
             body=body,
             method=method,
             storage_uri_key=storage_uri_key,
+            presigned_url_endpoint=presigned_url_endpoint,
+            presigned_url_query_params=presigned_url_query_params,
         )
 
     def upload_blob_s3(
@@ -218,12 +224,18 @@ def upload_blob_s3(
         body=None,
         method: str = "POST",
         storage_uri_key: str = "storageUri",
+        presigned_url_endpoint: str = "storage/presigned-url",
+        presigned_url_query_params: str = "",
     ):
         """Generic method to upload data to S3 storage and create the appropriate
         resource in the backend.
         """
+
         presigned_json = self.post_request(
-            f"storage/presigned-url?objectName={object_name}"
+            (
+                f"{presigned_url_endpoint}?objectName={object_name}"
+                f"&{presigned_url_query_params}"
+            )
         )
 
         with tqdm(
@@ -236,7 +248,7 @@ def upload_blob_s3(
             with open(file_path, "rb") as f:
                 # Avoid logging here as it will break the progress bar
                 fields = presigned_json["fields"]
-                fields["file"] = (presigned_json["id"], f, "application/x-tar")
+                fields["file"] = (object_name, f, "application/x-tar")
                 e = MultipartEncoder(fields=fields)
                 m = MultipartEncoderMonitor(
                     e, lambda monitor: t.update(min(t.total, monitor.bytes_read) - t.n)
@@ -267,12 +279,17 @@ def upload_blob_gcs(
         body=None,
         method: str = "POST",
         storage_uri_key: str = "storageUri",
+        presigned_url_endpoint: str = "storage/presigned-url",
+        presigned_url_query_params: str = "",
     ):
         """Generic method to upload data to Google Cloud Storage and create the
         appropriate resource in the backend.
         """
         presigned_json = self.post_request(
-            f"storage/presigned-url?objectName={object_name}"
+            (
+                f"{presigned_url_endpoint}?objectName={object_name}"
+                f"&{presigned_url_query_params}"
+            )
         )
         with open(file_path, "rb") as f:
             with tqdm(
@@ -306,12 +323,17 @@ def upload_blob_azure(
         body=None,
         method: str = "POST",
         storage_uri_key: str = "storageUri",
+        presigned_url_endpoint: str = "storage/presigned-url",
+        presigned_url_query_params: str = "",
     ):
         """Generic method to upload data to Azure Blob Storage and create the
         appropriate resource in the backend.
         """
         presigned_json = self.post_request(
-            f"storage/presigned-url?objectName={object_name}"
+            (
+                f"{presigned_url_endpoint}?objectName={object_name}"
+                f"&{presigned_url_query_params}"
+            )
         )
         with open(file_path, "rb") as f:
             with tqdm(
@@ -348,12 +370,17 @@ def transfer_blob(
         body=None,
         method: str = "POST",
         storage_uri_key: str = "storageUri",
+        presigned_url_endpoint: str = "storage/presigned-url",
+        presigned_url_query_params: str = "",
     ):
         """Generic method to transfer data to the openlayer folder and create the
         appropriate resource in the backend when using a local deployment.
         """
         presigned_json = self.post_request(
-            f"storage/presigned-url?objectName={object_name}"
+            (
+                f"{presigned_url_endpoint}?objectName={object_name}"
+                f"&{presigned_url_query_params}"
+            )
         )
         blob_path = presigned_json["storageUri"].replace("local://", "")
         dir_path = os.path.dirname(blob_path)