1414
1515import csv
1616import json
17+ import os
1718import random
1819import string
1920import time
2021from io import BytesIO , StringIO
2122from textwrap import dedent
2223
23- import pyarrow .parquet #
24+ import boto3
25+ import pyarrow .parquet
2426from minio import Minio
2527
2628from materialize .mzcompose .composition import (
@@ -178,6 +180,87 @@ def workflow_nightly(c: Composition, parser: WorkflowArgumentParser) -> None:
178180 del actual_parquet
179181
180182
183+ def workflow_gcs (c : Composition ) -> None :
184+ """
185+ Test COPY TO S3 with GCS using HMAC authentication. This test requires access to GCS.
186+ """
187+ # We can't use the current GCP_SERVICE_ACCOUNT_JSON as that is a credentials file used by GCP
188+ # SDK. See <https://cloud.google.com/docs/authentication/application-default-credentials>
189+ #
190+ # We need HMAC keys, which contain access and secret keys (these don't exist in CI today)
191+ # see <https://cloud.google.com/storage/docs/authentication/hmackeys>
192+
193+ def env_get_or_fail (key : str ) -> str :
194+ value = os .environ .get (key )
195+ assert value is not None , f"{ key } evironment variable must be set"
196+ return value
197+
198+ gcs_access_key = env_get_or_fail ("GCS_ACCESS_KEY" )
199+ gcs_secret_key = env_get_or_fail ("GCS_SECRET_KEY" )
200+ gcs_bucket = env_get_or_fail ("GCS_BUCKET" )
201+ gcs_region = env_get_or_fail ("GCS_REGION" )
202+ gcs_endpoint = os .environ .get ("GCS_ENDPOINT" , "https://storage.googleapis.com" )
203+
204+ key_prefix = f"copy_to/{ make_random_key (10 )} /"
205+
206+ s3 = boto3 .client (
207+ "s3" ,
208+ endpoint_url = gcs_endpoint ,
209+ region_name = gcs_region ,
210+ aws_access_key_id = gcs_access_key ,
211+ aws_secret_access_key = gcs_secret_key ,
212+ )
213+ contents = []
214+ try :
215+ with c .override (Testdrive (no_reset = True )):
216+ c .up ("materialized" )
217+ c .testdrive (
218+ dedent (
219+ f"""
220+ > CREATE SECRET gcs_secret AS '{ gcs_secret_key } ';
221+ > CREATE CONNECTION gcs_conn
222+ TO AWS (
223+ ACCESS KEY ID = '{ gcs_access_key } ',
224+ SECRET ACCESS KEY = SECRET gcs_secret,
225+ ENDPOINT = '{ gcs_endpoint } ',
226+ REGION = '{ gcs_region } '
227+ );
228+ > CREATE TABLE t (a int, b text, c float);
229+ > INSERT INTO t VALUES (1, 'a', 1.1), (2, 'b', 2.2);
230+
231+ > COPY (SELECT * FROM t)
232+ TO 's3://{ gcs_bucket } /{ key_prefix } '
233+ WITH (
234+ AWS CONNECTION = gcs_conn, FORMAT = 'csv', HEADER
235+ );
236+ """
237+ )
238+ )
239+ # list should contain a single file (we won't know the name ahead of time, just the prefix).
240+ listing = s3 .list_objects_v2 (Bucket = gcs_bucket , Prefix = key_prefix )
241+
242+ contents = listing ["Contents" ]
243+ assert len (contents ) == 1 , f"expected 1 file, got { contents } "
244+
245+ key = contents [0 ]["Key" ]
246+ assert key .endswith (".csv" ), f"expected .csv suffix, got { key } "
247+
248+ object_response = s3 .get_object (Bucket = gcs_bucket , Key = key )
249+ body = object_response ["Body" ].read ().decode ("utf-8" )
250+ csv_reader = csv .DictReader (StringIO (body ))
251+ actual = [row for row in csv_reader ]
252+
253+ # all fields are read in as strings
254+ expected = [{"a" : "1" , "b" : "a" , "c" : "1.1" }, {"a" : "2" , "b" : "b" , "c" : "2.2" }]
255+ assert actual == expected , f"actual: { actual } != expected: { expected } "
256+
257+ finally :
258+ # This is a best effort cleanup, as the process may exit before reaching this point.
259+ for obj in contents :
260+ key = obj ["Key" ]
261+ s3 .delete_object (Bucket = gcs_bucket , Key = key )
262+
263+
181264def workflow_ci (c : Composition , _parser : WorkflowArgumentParser ) -> None :
182265 """
183266 Workflows to run during CI
@@ -187,6 +270,13 @@ def workflow_ci(c: Composition, _parser: WorkflowArgumentParser) -> None:
187270 c .workflow (name )
188271
189272
273+ def make_random_key (n : int ):
274+ return "" .join (
275+ random .SystemRandom ().choice (string .ascii_uppercase + string .digits )
276+ for _ in range (n )
277+ )
278+
279+
190280def workflow_auth (c : Composition ) -> None :
191281 c .up (Service ("mc" , idle = True ), "materialized" , "minio" )
192282
@@ -195,12 +285,6 @@ def workflow_auth(c: Composition) -> None:
195285 # User 'nodelete': PutObject, ListBucket
196286 # User 'read': GetObject, ListBucket
197287
198- def make_random_key (n : int ):
199- return "" .join (
200- random .SystemRandom ().choice (string .ascii_uppercase + string .digits )
201- for _ in range (n )
202- )
203-
204288 def make_user (username : str , actions : list [str ]):
205289 return (
206290 username ,
0 commit comments