Skip to content

Commit e41f25c

Browse files
Address feedback 2: Try composition and refactor TESTRestCatalog
1 parent f67b364 commit e41f25c

File tree

4 files changed

+1176
-804
lines changed

4 files changed

+1176
-804
lines changed

core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,12 @@
8989
public abstract class CatalogTests<C extends Catalog & SupportsNamespaces> {
9090
private static final String BASE_TABLE_LOCATION = "file:/tmp";
9191
protected static final Namespace NS = Namespace.of("newdb");
92-
protected static final TableIdentifier TABLE = TableIdentifier.of(NS, "newtable");
92+
public static final TableIdentifier TABLE = TableIdentifier.of(NS, "newtable");
9393
protected static final TableIdentifier RENAMED_TABLE = TableIdentifier.of(NS, "table_renamed");
9494
protected static final TableIdentifier TBL = TableIdentifier.of("ns", "tbl");
9595

9696
// Schema passed to create tables
97-
protected static final Schema SCHEMA =
97+
public static final Schema SCHEMA =
9898
new Schema(
9999
required(3, "id", Types.IntegerType.get(), "unique ID 🤪"),
100100
required(4, "data", Types.StringType.get()));
@@ -116,7 +116,7 @@ public abstract class CatalogTests<C extends Catalog & SupportsNamespaces> {
116116
new Schema(required(1, "some_id", Types.IntegerType.get()));
117117

118118
// Partition spec used to create tables
119-
protected static final PartitionSpec SPEC =
119+
public static final PartitionSpec SPEC =
120120
PartitionSpec.builderFor(SCHEMA).bucket("id", 16).build();
121121

122122
protected static final PartitionSpec TABLE_SPEC =
@@ -135,23 +135,23 @@ public abstract class CatalogTests<C extends Catalog & SupportsNamespaces> {
135135
protected static final SortOrder REPLACE_WRITE_ORDER =
136136
SortOrder.builderFor(REPLACE_SCHEMA).asc(Expressions.bucket("id", 16)).asc("id").build();
137137

138-
protected static final DataFile FILE_A =
138+
public static final DataFile FILE_A =
139139
DataFiles.builder(SPEC)
140140
.withPath("/path/to/data-a.parquet")
141141
.withFileSizeInBytes(10)
142142
.withPartitionPath("id_bucket=0") // easy way to set partition data for now
143143
.withRecordCount(2) // needs at least one record or else metrics will filter it out
144144
.build();
145145

146-
protected static final DataFile FILE_B =
146+
public static final DataFile FILE_B =
147147
DataFiles.builder(SPEC)
148148
.withPath("/path/to/data-b.parquet")
149149
.withFileSizeInBytes(10)
150150
.withPartitionPath("id_bucket=1") // easy way to set partition data for now
151151
.withRecordCount(2) // needs at least one record or else metrics will filter it out
152152
.build();
153153

154-
protected static final DataFile FILE_C =
154+
public static final DataFile FILE_C =
155155
DataFiles.builder(SPEC)
156156
.withPath("/path/to/data-c.parquet")
157157
.withFileSizeInBytes(10)
@@ -160,7 +160,7 @@ public abstract class CatalogTests<C extends Catalog & SupportsNamespaces> {
160160
.build();
161161

162162
// Delete files for testing
163-
protected static final DeleteFile FILE_A_DELETES =
163+
public static final DeleteFile FILE_A_DELETES =
164164
FileMetadata.deleteFileBuilder(SPEC)
165165
.ofPositionDeletes()
166166
.withPath("/path/to/data-a-deletes.parquet")
@@ -169,7 +169,7 @@ public abstract class CatalogTests<C extends Catalog & SupportsNamespaces> {
169169
.withRecordCount(1)
170170
.build();
171171

172-
protected static final DeleteFile FILE_A_EQUALITY_DELETES =
172+
public static final DeleteFile FILE_A_EQUALITY_DELETES =
173173
FileMetadata.deleteFileBuilder(SPEC)
174174
.ofEqualityDeletes(1) // delete on column 1 (id column)
175175
.withPath("/path/to/data-a-equality-deletes.parquet")
@@ -178,7 +178,7 @@ public abstract class CatalogTests<C extends Catalog & SupportsNamespaces> {
178178
.withRecordCount(1)
179179
.build();
180180

181-
protected static final DeleteFile FILE_B_DELETES =
181+
public static final DeleteFile FILE_B_DELETES =
182182
FileMetadata.deleteFileBuilder(SPEC)
183183
.ofPositionDeletes()
184184
.withPath("/path/to/data-b-deletes.parquet")
@@ -187,7 +187,7 @@ public abstract class CatalogTests<C extends Catalog & SupportsNamespaces> {
187187
.withRecordCount(1)
188188
.build();
189189

190-
protected static final DeleteFile FILE_B_EQUALITY_DELETES =
190+
public static final DeleteFile FILE_B_EQUALITY_DELETES =
191191
FileMetadata.deleteFileBuilder(SPEC)
192192
.ofEqualityDeletes(1) // delete on column 1 (id column)
193193
.withPath("/path/to/data-b-equality-deletes.parquet")
@@ -196,7 +196,7 @@ public abstract class CatalogTests<C extends Catalog & SupportsNamespaces> {
196196
.withRecordCount(1)
197197
.build();
198198

199-
protected static final DeleteFile FILE_C_EQUALITY_DELETES =
199+
public static final DeleteFile FILE_C_EQUALITY_DELETES =
200200
FileMetadata.deleteFileBuilder(SPEC)
201201
.ofEqualityDeletes(1) // delete on column 1 (id column)
202202
.withPath("/path/to/data-c-equality-deletes.parquet")
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.rest;
20+
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
23+
import com.fasterxml.jackson.core.JsonProcessingException;
24+
import com.fasterxml.jackson.databind.ObjectMapper;
25+
import com.fasterxml.jackson.databind.ObjectReader;
26+
import java.io.File;
27+
import java.net.InetAddress;
28+
import java.net.InetSocketAddress;
29+
import java.nio.file.Path;
30+
import java.util.Map;
31+
import java.util.UUID;
32+
import java.util.function.Consumer;
33+
import org.apache.hadoop.conf.Configuration;
34+
import org.apache.iceberg.CatalogProperties;
35+
import org.apache.iceberg.catalog.SessionCatalog;
36+
import org.apache.iceberg.inmemory.InMemoryCatalog;
37+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
38+
import org.apache.iceberg.rest.responses.ErrorResponse;
39+
import org.eclipse.jetty.server.Server;
40+
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
41+
import org.eclipse.jetty.servlet.ServletContextHandler;
42+
import org.eclipse.jetty.servlet.ServletHolder;
43+
import org.mockito.Mockito;
44+
45+
public class RESTCatalogTestInfrastructure {
46+
protected static final ObjectMapper MAPPER = RESTObjectMapper.mapper();
47+
48+
protected RESTCatalog restCatalog;
49+
protected InMemoryCatalog backendCatalog;
50+
protected Server httpServer;
51+
protected RESTCatalogAdapter adapterForRESTServer;
52+
protected ParserContext parserContext;
53+
54+
public void before(Path temp) throws Exception {
55+
File warehouse = temp.toFile();
56+
57+
this.backendCatalog = new InMemoryCatalog();
58+
this.backendCatalog.initialize(
59+
"in-memory",
60+
ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouse.getAbsolutePath()));
61+
62+
HTTPHeaders catalogHeaders =
63+
HTTPHeaders.of(
64+
Map.of(
65+
"Authorization",
66+
"Bearer client-credentials-token:sub=catalog",
67+
"test-header",
68+
"test-value"));
69+
HTTPHeaders contextHeaders =
70+
HTTPHeaders.of(
71+
Map.of(
72+
"Authorization",
73+
"Bearer client-credentials-token:sub=user",
74+
"test-header",
75+
"test-value"));
76+
77+
adapterForRESTServer =
78+
Mockito.spy(
79+
new RESTCatalogAdapter(backendCatalog) {
80+
@Override
81+
public <T extends RESTResponse> T execute(
82+
HTTPRequest request,
83+
Class<T> responseType,
84+
Consumer<ErrorResponse> errorHandler,
85+
Consumer<Map<String, String>> responseHeaders) {
86+
if (!ResourcePaths.tokens().equals(request.path())) {
87+
if (ResourcePaths.config().equals(request.path())) {
88+
assertThat(request.headers().entries()).containsAll(catalogHeaders.entries());
89+
} else {
90+
assertThat(request.headers().entries()).containsAll(contextHeaders.entries());
91+
}
92+
}
93+
Object body = roundTripSerialize(request.body(), "request");
94+
HTTPRequest req = ImmutableHTTPRequest.builder().from(request).body(body).build();
95+
T response = super.execute(req, responseType, errorHandler, responseHeaders);
96+
return roundTripSerialize(response, "response");
97+
}
98+
});
99+
100+
ServletContextHandler servletContext =
101+
new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
102+
servletContext.addServlet(
103+
new ServletHolder(new RESTCatalogServlet(adapterForRESTServer)), "/*");
104+
servletContext.setHandler(new GzipHandler());
105+
106+
this.httpServer = new Server(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
107+
httpServer.setHandler(servletContext);
108+
httpServer.start();
109+
110+
this.restCatalog = initCatalog("prod", ImmutableMap.of());
111+
}
112+
113+
public void after() throws Exception {
114+
if (restCatalog != null) {
115+
restCatalog.close();
116+
}
117+
118+
if (backendCatalog != null) {
119+
backendCatalog.close();
120+
}
121+
122+
if (httpServer != null) {
123+
httpServer.stop();
124+
httpServer.join();
125+
}
126+
}
127+
128+
public RESTCatalog initCatalog(String catalogName, Map<String, String> additionalProperties) {
129+
Configuration conf = new Configuration();
130+
SessionCatalog.SessionContext context =
131+
new SessionCatalog.SessionContext(
132+
UUID.randomUUID().toString(),
133+
"user",
134+
ImmutableMap.of("credential", "user:12345"),
135+
ImmutableMap.of());
136+
137+
RESTCatalog catalog =
138+
new RESTCatalog(
139+
context,
140+
(config) ->
141+
HTTPClient.builder(config)
142+
.uri(config.get(CatalogProperties.URI))
143+
.withHeaders(RESTUtil.configHeaders(config))
144+
.build());
145+
catalog.setConf(conf);
146+
Map<String, String> properties =
147+
ImmutableMap.of(
148+
CatalogProperties.URI,
149+
httpServer.getURI().toString(),
150+
CatalogProperties.FILE_IO_IMPL,
151+
"org.apache.iceberg.inmemory.InMemoryFileIO",
152+
CatalogProperties.TABLE_DEFAULT_PREFIX + "default-key1",
153+
"catalog-default-key1",
154+
CatalogProperties.TABLE_DEFAULT_PREFIX + "default-key2",
155+
"catalog-default-key2",
156+
CatalogProperties.TABLE_DEFAULT_PREFIX + "override-key3",
157+
"catalog-default-key3",
158+
CatalogProperties.TABLE_OVERRIDE_PREFIX + "override-key3",
159+
"catalog-override-key3",
160+
CatalogProperties.TABLE_OVERRIDE_PREFIX + "override-key4",
161+
"catalog-override-key4",
162+
"credential",
163+
"catalog:12345",
164+
"header.test-header",
165+
"test-value");
166+
catalog.initialize(
167+
catalogName,
168+
ImmutableMap.<String, String>builder()
169+
.putAll(properties)
170+
.putAll(additionalProperties)
171+
.build());
172+
return catalog;
173+
}
174+
175+
public RESTCatalog catalog() {
176+
return restCatalog;
177+
}
178+
179+
public InMemoryCatalog backendCatalog() {
180+
return backendCatalog;
181+
}
182+
183+
public Server httpServer() {
184+
return httpServer;
185+
}
186+
187+
public RESTCatalogAdapter adapter() {
188+
return adapterForRESTServer;
189+
}
190+
191+
@SuppressWarnings("unchecked")
192+
public <T> T roundTripSerialize(T payload, String description) {
193+
if (payload == null) {
194+
return null;
195+
}
196+
197+
try {
198+
if (payload instanceof RESTMessage) {
199+
RESTMessage message = (RESTMessage) payload;
200+
ObjectReader reader = MAPPER.readerFor(message.getClass());
201+
if (parserContext != null && !parserContext.isEmpty()) {
202+
reader = reader.with(parserContext.toInjectableValues());
203+
}
204+
return (T) reader.readValue(MAPPER.writeValueAsString(message));
205+
} else {
206+
return payload;
207+
}
208+
} catch (JsonProcessingException e) {
209+
throw new RuntimeException(
210+
String.format("Failed to serialize and deserialize %s: %s", description, payload), e);
211+
}
212+
}
213+
214+
public void setParserContext(org.apache.iceberg.Table table) {
215+
parserContext =
216+
ParserContext.builder().add("specsById", table.specs()).add("caseSensitive", false).build();
217+
}
218+
219+
public ParserContext parserContext() {
220+
return parserContext;
221+
}
222+
}

0 commit comments

Comments
 (0)