Skip to content

Commit 366ff67

Browse files
author
Komal Yadav
committed
Add SpannerMetadataModule with ExtensionLoader
1 parent 60e58d4 commit 366ff67

File tree

23 files changed

+1098
-563
lines changed

23 files changed

+1098
-563
lines changed

cdap-app-fabric/src/main/java/io/cdap/cdap/metadata/MetadataHttpHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@
3131
import io.cdap.cdap.common.security.AuditDetail;
3232
import io.cdap.cdap.common.security.AuditPolicy;
3333
import io.cdap.cdap.data2.metadata.MetadataCompatibility;
34-
import io.cdap.cdap.metadata.elastic.ScopedNameOfKindTypeAdapter;
35-
import io.cdap.cdap.metadata.elastic.ScopedNameTypeAdapter;
34+
import io.cdap.cdap.spi.metadata.ScopedNameTypeAdapter;
35+
import io.cdap.cdap.spi.metadata.ScopedNameOfKindTypeAdapter;
3636
import io.cdap.cdap.proto.EntityScope;
3737
import io.cdap.cdap.proto.ProgramType;
3838
import io.cdap.cdap.proto.codec.NamespacedEntityIdCodec;

cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2114,9 +2114,13 @@ public static final class Metadata {
21142114
public static final String STORAGE_PROVIDER_IMPLEMENTATION = "metadata.storage.implementation";
21152115
public static final String STORAGE_PROVIDER_NOSQL = "nosql";
21162116
public static final String STORAGE_PROVIDER_ELASTICSEARCH = "elastic";
2117+
public static final String STORAGE_PROVIDER_SPANNER = "spanner";
21172118

21182119
public static final String METADATA_WRITER_SUBSCRIBER = "metadata.writer";
21192120
public static final String METADATA_CONSUMER_WRITER_SUBSCRIBER = "metadata.consumer.writer";
2121+
2122+
// Metadata configs
2123+
public static final String METADATA_STORAGE_EXT_DIR = "metadata.storage.extensions.dir";
21202124
}
21212125

21222126
/**

cdap-common/src/main/resources/cdap-default.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2845,6 +2845,11 @@
28452845
<value>/opt/cdap/master/ext/log-publisher</value>
28462846
</property>
28472847

2848+
<property>
2849+
<name>metadata.storage.extensions.dir</name>
2850+
<value>/opt/cdap/master/ext/metadata-storage</value>
2851+
</property>
2852+
28482853
<!-- Metrics Configuration -->
28492854

28502855
<property>

cdap-data-fabric/src/main/java/io/cdap/cdap/data/runtime/DataSetsModules.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@
4343
import io.cdap.cdap.data2.registry.BasicUsageRegistry;
4444
import io.cdap.cdap.data2.registry.UsageRegistry;
4545
import io.cdap.cdap.data2.registry.UsageWriter;
46-
import io.cdap.cdap.metadata.elastic.ElasticsearchMetadataStorage;
4746
import io.cdap.cdap.security.impersonation.OwnerStore;
47+
import io.cdap.cdap.spi.metadata.DefaultMetadataStorageProvider;
4848
import io.cdap.cdap.spi.metadata.MetadataStorage;
4949
import io.cdap.cdap.spi.metadata.dataset.DatasetMetadataStorage;
5050
import io.cdap.cdap.spi.metadata.noop.NoopMetadataStorage;
@@ -179,7 +179,7 @@ public MetadataStorage get() {
179179
return injector.getInstance(DatasetMetadataStorage.class);
180180
}
181181
if (Constants.Metadata.STORAGE_PROVIDER_ELASTICSEARCH.equalsIgnoreCase(config)) {
182-
return injector.getInstance(ElasticsearchMetadataStorage.class);
182+
return injector.getInstance(DefaultMetadataStorageProvider.class);
183183
}
184184
throw new IllegalArgumentException("Unsupported MetadataStorage '" + config + "'. Only '"
185185
+ Constants.Metadata.STORAGE_PROVIDER_NOSQL + "' and '"

cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/AuditMetadataStorage.java

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,15 @@
3333
import io.cdap.cdap.proto.audit.AuditType;
3434
import io.cdap.cdap.proto.audit.payload.metadata.MetadataPayload;
3535
import io.cdap.cdap.spi.metadata.Metadata;
36+
import io.cdap.cdap.spi.metadata.MetadataStorage;
37+
import io.cdap.cdap.spi.metadata.SearchRequest;
3638
import io.cdap.cdap.spi.metadata.MetadataChange;
3739
import io.cdap.cdap.spi.metadata.MetadataMutation;
38-
import io.cdap.cdap.spi.metadata.MetadataStorage;
3940
import io.cdap.cdap.spi.metadata.MutationOptions;
4041
import io.cdap.cdap.spi.metadata.Read;
41-
import io.cdap.cdap.spi.metadata.SearchRequest;
4242
import io.cdap.cdap.spi.metadata.SearchResponse;
4343
import java.io.IOException;
44+
import java.util.Collections;
4445
import java.util.EnumSet;
4546
import java.util.List;
4647
import java.util.Map;
@@ -72,7 +73,7 @@ public class AuditMetadataStorage implements MetadataStorage {
7273

7374
@Inject
7475
public AuditMetadataStorage(@Named(DataSetsModules.SPI_BASE_IMPL) MetadataStorage storage,
75-
MetricsCollectionService metricsCollectionService) {
76+
MetricsCollectionService metricsCollectionService) {
7677
this.storage = storage;
7778
this.metricsCollectionService = metricsCollectionService;
7879
}
@@ -83,6 +84,16 @@ public void setAuditPublisher(AuditPublisher auditPublisher) {
8384
this.auditPublisher = auditPublisher;
8485
}
8586

87+
@Override
88+
public String getName() {
89+
return "audit";
90+
}
91+
92+
@Override
93+
public Object getDatasetMetadata(String datasetName) {
94+
return Collections.emptyMap();
95+
}
96+
8697
@Override
8798
public void createIndex() throws IOException {
8899
try {
@@ -107,7 +118,7 @@ public void dropIndex() throws IOException {
107118

108119
@Override
109120
public MetadataChange apply(MetadataMutation mutation, MutationOptions options)
110-
throws IOException {
121+
throws IOException {
111122
MetadataChange change;
112123
try {
113124
change = storage.apply(mutation, options);
@@ -122,7 +133,7 @@ public MetadataChange apply(MetadataMutation mutation, MutationOptions options)
122133

123134
@Override
124135
public List<MetadataChange> batch(List<? extends MetadataMutation> mutations,
125-
MutationOptions options) throws IOException {
136+
MutationOptions options) throws IOException {
126137
List<MetadataChange> changes;
127138
try {
128139
changes = storage.batch(mutations, options);
@@ -173,8 +184,8 @@ public void close() {
173184

174185
private void emitMetrics(String metricSuffix) {
175186
MetricsCollector metricsCollector = metricsCollectionService.getContext(
176-
Constants.Metrics.STORAGE_METRICS_TAGS);
177-
metricsCollector.increment(Constants.Metrics.MetadataStorage.METRICS_PREFIX + metricSuffix, 1L);
187+
Constants.Metrics.STORAGE_METRICS_TAGS);
188+
metricsCollector.increment(Constants.Metrics.MetadataStorage.METRICS_PREFIX + metricSuffix,1L);
178189
}
179190

180191
private void publishAudit(MetadataChange change) {
@@ -196,30 +207,30 @@ private void publishAudit(MetadataChange change, MetadataScope scope) {
196207

197208
// previous state is already given
198209
MetadataRecord previous = new MetadataRecord(change.getEntity(), scope, propsBefore,
199-
tagsBefore);
210+
tagsBefore);
200211

201212
// compute what was added
202213
@SuppressWarnings("ConstantConditions")
203214
Map<String, String> propsAdded = Maps.filterEntries(
204-
propsAfter, entry -> !entry.getValue().equals(propsBefore.get(entry.getKey())));
215+
propsAfter, entry -> !entry.getValue().equals(propsBefore.get(entry.getKey())));
205216
Set<String> tagsAdded = Sets.difference(tagsAfter, tagsBefore);
206217
MetadataRecord additions = new MetadataRecord(change.getEntity(), scope, propsAdded, tagsAdded);
207218

208219
// compute what was deleted
209220
@SuppressWarnings("ConstantConditions")
210221
Map<String, String> propsDeleted = Maps.filterEntries(
211-
propsBefore, entry -> !entry.getValue().equals(propsAfter.get(entry.getKey())));
222+
propsBefore, entry -> !entry.getValue().equals(propsAfter.get(entry.getKey())));
212223
Set<String> tagsDeleted = Sets.difference(tagsBefore, tagsAfter);
213224
MetadataRecord deletions = new MetadataRecord(change.getEntity(), scope, propsDeleted,
214-
tagsDeleted);
225+
tagsDeleted);
215226

216227
// and publish
217228
MetadataPayload payload = new MetadataPayloadBuilder()
218-
.addPrevious(previous)
219-
.addAdditions(additions)
220-
.addDeletions(deletions)
221-
.build();
229+
.addPrevious(previous)
230+
.addAdditions(additions)
231+
.addDeletions(deletions)
232+
.build();
222233
AuditPublishers.publishAudit(auditPublisher, previous.getMetadataEntity(),
223-
AuditType.METADATA_CHANGE, payload);
234+
AuditType.METADATA_CHANGE, payload);
224235
}
225236
}

cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/writer/DefaultMetadataServiceClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import io.cdap.cdap.common.http.DefaultHttpRequestConfig;
2323
import io.cdap.cdap.common.internal.remote.RemoteClient;
2424
import io.cdap.cdap.common.internal.remote.RemoteClientFactory;
25-
import io.cdap.cdap.metadata.elastic.ScopedNameOfKindTypeAdapter;
26-
import io.cdap.cdap.metadata.elastic.ScopedNameTypeAdapter;
25+
import io.cdap.cdap.spi.metadata.ScopedNameTypeAdapter;
26+
import io.cdap.cdap.spi.metadata.ScopedNameOfKindTypeAdapter;
2727
import io.cdap.cdap.proto.codec.NamespacedEntityIdCodec;
2828
import io.cdap.cdap.proto.id.NamespacedEntityId;
2929
import io.cdap.cdap.spi.metadata.Metadata;
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package io.cdap.cdap.spi.metadata;
2+
3+
import com.google.inject.Inject;
4+
import io.cdap.cdap.common.conf.CConfiguration;
5+
import io.cdap.cdap.common.conf.Constants;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
import java.io.IOException;
10+
import java.util.List;
11+
12+
public class DefaultMetadataStorageProvider implements MetadataStorage {
13+
private static final Logger LOG = LoggerFactory.getLogger(DefaultMetadataStorageProvider.class);
14+
15+
private final CConfiguration cConf;
16+
private final MetadataStorageExtensionLoader extensionLoader;
17+
18+
private volatile MetadataStorage delegate;
19+
20+
@Inject
21+
DefaultMetadataStorageProvider(CConfiguration cConf, MetadataStorageExtensionLoader extensionLoader) {
22+
this.cConf = cConf;
23+
this.extensionLoader = extensionLoader;
24+
this.extensionLoader.getAll();
25+
26+
}
27+
28+
@Override
29+
public void createIndex() throws IOException {
30+
getDelegate().createIndex();
31+
}
32+
33+
@Override
34+
public void close() {
35+
if (delegate != null) {
36+
delegate.close();
37+
}
38+
}
39+
40+
public Object getDatasetMetadata(String datasetName) {
41+
return getDelegate().getDatasetMetadata(datasetName);
42+
}
43+
44+
public String getName() {
45+
return cConf.get(Constants.Metadata.STORAGE_PROVIDER_SPANNER);
46+
}
47+
48+
@Override
49+
public void dropIndex() throws IOException {
50+
getDelegate().dropIndex();
51+
}
52+
53+
@Override
54+
public MetadataChange apply(MetadataMutation mutation, MutationOptions options)
55+
throws IOException {
56+
return getDelegate().apply(mutation, options);
57+
}
58+
59+
@Override
60+
public List<MetadataChange> batch(List<? extends MetadataMutation> mutations,
61+
MutationOptions options) throws IOException {
62+
return getDelegate().batch(mutations, options);
63+
}
64+
65+
@Override
66+
public Metadata read(Read read) throws IOException {
67+
return getDelegate().read(read);
68+
}
69+
70+
@Override
71+
public SearchResponse search(SearchRequest request)
72+
throws IOException {
73+
return getDelegate().search(request);
74+
}
75+
76+
private MetadataStorage getDelegate() {
77+
MetadataStorage metadataStorage = this.delegate;
78+
if (metadataStorage != null) {
79+
return metadataStorage;
80+
}
81+
synchronized (this) {
82+
metadataStorage = this.delegate;
83+
if (metadataStorage != null) {
84+
return metadataStorage;
85+
}
86+
LOG.info(extensionLoader.get(getName()).toString());
87+
metadataStorage = extensionLoader.get(getName());
88+
89+
if (metadataStorage == null) {
90+
throw new IllegalArgumentException(
91+
"Unsupported metadata storage implementation " + getName());
92+
}
93+
LOG.info("Metadata Storage {} is loaded", metadataStorage.getName());
94+
try {
95+
metadataStorage.initialize(new DefaultMetadataStorageProviderContext(this.cConf,
96+
metadataStorage.getName()));
97+
} catch (Exception e) {
98+
throw new RuntimeException(e);
99+
}
100+
LOG.info("Metadata storage {} is initialized.", metadataStorage.getName());
101+
102+
this.delegate = metadataStorage;
103+
return metadataStorage;
104+
}
105+
}
106+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package io.cdap.cdap.spi.metadata;
2+
3+
import io.cdap.cdap.common.conf.CConfiguration;
4+
import io.cdap.cdap.common.conf.Constants;
5+
6+
import java.util.Collections;
7+
import java.util.Map;
8+
9+
public class DefaultMetadataStorageProviderContext implements MetadataStorageContext {
10+
11+
private static final String storageImpl = "gcp-spanner";
12+
private final Map<String, String> cConf;
13+
private final Map<String, String> properties;
14+
15+
protected DefaultMetadataStorageProviderContext(CConfiguration cConf, String storageName) {
16+
String propertiesPrefix =
17+
Constants.Dataset.STORAGE_EXTENSION_PROPERTY_PREFIX + storageImpl + ".";
18+
this.cConf = Collections.unmodifiableMap(cConf.getPropsWithPrefix(propertiesPrefix));
19+
this.properties = Collections.unmodifiableMap(cConf.getPropsWithPrefix(propertiesPrefix));
20+
}
21+
22+
@Override
23+
public Map<String, String> getProperties() {
24+
return this.properties;
25+
}
26+
27+
@Override
28+
public Map<String, String> getConfiguration() {
29+
return cConf;
30+
}
31+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package io.cdap.cdap.spi.metadata;
2+
3+
import com.google.inject.Inject;
4+
import io.cdap.cdap.common.conf.CConfiguration;
5+
import io.cdap.cdap.common.conf.Constants;
6+
import io.cdap.cdap.common.lang.ClassPathResources;
7+
import io.cdap.cdap.common.lang.FilterClassLoader;
8+
import io.cdap.cdap.extension.AbstractExtensionLoader;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
import java.io.IOException;
13+
import java.util.Collections;
14+
import java.util.Set;
15+
16+
/**
17+
* Extension loader for {@link MetadataStorage} implementations.
18+
*/
19+
public class MetadataStorageExtensionLoader extends AbstractExtensionLoader<String, MetadataStorage> {
20+
21+
private static final Logger LOG = LoggerFactory.getLogger(MetadataStorageExtensionLoader.class);
22+
private static final Set<String> ALLOWED_RESOURCES = createAllowedResources();
23+
private static final Set<String> ALLOWED_PACKAGES = createPackageSets(ALLOWED_RESOURCES);
24+
25+
/**
26+
* Constructs a {@link MetadataStorageExtensionLoader} to manage the loading of SpannerMetadata
27+
* extensions.
28+
*
29+
* @param cConf The configuration object containing properties for loading SpannerMetadata
30+
* extensions.
31+
*/
32+
@Inject
33+
public MetadataStorageExtensionLoader(CConfiguration cConf) {
34+
super(cConf.get(Constants.Metadata.METADATA_STORAGE_EXT_DIR));
35+
LOG.debug("Metadata Storage extensions directory: {}",
36+
cConf.get(Constants.Metadata.METADATA_STORAGE_EXT_DIR));
37+
}
38+
39+
private static Set<String> createAllowedResources() {
40+
try {
41+
return ClassPathResources.getResourcesWithDependencies(MetadataStorage.class.getClassLoader(),
42+
MetadataStorage.class);
43+
} catch (IOException e) {
44+
throw new RuntimeException("Failed to trace dependencies for MetadataStorage extension.", e);
45+
}
46+
}
47+
48+
@Override
49+
protected Set<String> getSupportedTypesForProvider(MetadataStorage metadataStorage) {
50+
return Collections.singleton(metadataStorage.getName());
51+
}
52+
53+
@Override
54+
protected FilterClassLoader.Filter getExtensionParentClassLoaderFilter() {
55+
return new FilterClassLoader.Filter() {
56+
@Override
57+
public boolean acceptResource(String resource) {
58+
return ALLOWED_RESOURCES.contains(resource);
59+
}
60+
61+
@Override
62+
public boolean acceptPackage(String packageName) {
63+
return ALLOWED_PACKAGES.contains(packageName);
64+
}
65+
};
66+
}
67+
}

0 commit comments

Comments
 (0)