Skip to content

Commit fe0dfb3

Browse files
author
Komal Yadav
committed
Add SpannerMetadataModule with ExtensionLoader
Format Changes changes
1 parent 60e58d4 commit fe0dfb3

File tree

24 files changed

+908
-363
lines changed

24 files changed

+908
-363
lines changed

cdap-app-fabric/src/main/java/io/cdap/cdap/metadata/MetadataHttpHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@
3131
import io.cdap.cdap.common.security.AuditDetail;
3232
import io.cdap.cdap.common.security.AuditPolicy;
3333
import io.cdap.cdap.data2.metadata.MetadataCompatibility;
34-
import io.cdap.cdap.metadata.elastic.ScopedNameOfKindTypeAdapter;
35-
import io.cdap.cdap.metadata.elastic.ScopedNameTypeAdapter;
34+
import io.cdap.cdap.spi.metadata.ScopedNameTypeAdapter;
35+
import io.cdap.cdap.spi.metadata.ScopedNameOfKindTypeAdapter;
3636
import io.cdap.cdap.proto.EntityScope;
3737
import io.cdap.cdap.proto.ProgramType;
3838
import io.cdap.cdap.proto.codec.NamespacedEntityIdCodec;

cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2114,9 +2114,13 @@ public static final class Metadata {
21142114
public static final String STORAGE_PROVIDER_IMPLEMENTATION = "metadata.storage.implementation";
21152115
public static final String STORAGE_PROVIDER_NOSQL = "nosql";
21162116
public static final String STORAGE_PROVIDER_ELASTICSEARCH = "elastic";
2117+
public static final String STORAGE_PROVIDER_SPANNER = "spanner";
21172118

21182119
public static final String METADATA_WRITER_SUBSCRIBER = "metadata.writer";
21192120
public static final String METADATA_CONSUMER_WRITER_SUBSCRIBER = "metadata.consumer.writer";
2121+
2122+
// Metadata configs
2123+
public static final String METADATA_STORAGE_EXT_DIR = "metadata.storage.extensions.dir";
21202124
}
21212125

21222126
/**

cdap-common/src/main/resources/cdap-default.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2845,6 +2845,11 @@
28452845
<value>/opt/cdap/master/ext/log-publisher</value>
28462846
</property>
28472847

2848+
<property>
2849+
<name>metadata.storage.extensions.dir</name>
2850+
<value>/opt/cdap/master/ext/metadata-storage</value>
2851+
</property>
2852+
28482853
<!-- Metrics Configuration -->
28492854

28502855
<property>

cdap-data-fabric/src/main/java/io/cdap/cdap/data/runtime/DataSetsModules.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@
4343
import io.cdap.cdap.data2.registry.BasicUsageRegistry;
4444
import io.cdap.cdap.data2.registry.UsageRegistry;
4545
import io.cdap.cdap.data2.registry.UsageWriter;
46-
import io.cdap.cdap.metadata.elastic.ElasticsearchMetadataStorage;
4746
import io.cdap.cdap.security.impersonation.OwnerStore;
47+
import io.cdap.cdap.spi.metadata.DefaultMetadataStorageProvider;
4848
import io.cdap.cdap.spi.metadata.MetadataStorage;
4949
import io.cdap.cdap.spi.metadata.dataset.DatasetMetadataStorage;
5050
import io.cdap.cdap.spi.metadata.noop.NoopMetadataStorage;
@@ -179,7 +179,7 @@ public MetadataStorage get() {
179179
return injector.getInstance(DatasetMetadataStorage.class);
180180
}
181181
if (Constants.Metadata.STORAGE_PROVIDER_ELASTICSEARCH.equalsIgnoreCase(config)) {
182-
return injector.getInstance(ElasticsearchMetadataStorage.class);
182+
return injector.getInstance(DefaultMetadataStorageProvider.class);
183183
}
184184
throw new IllegalArgumentException("Unsupported MetadataStorage '" + config + "'. Only '"
185185
+ Constants.Metadata.STORAGE_PROVIDER_NOSQL + "' and '"

cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/AuditMetadataStorage.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.List;
4646
import java.util.Map;
4747
import java.util.Set;
48+
import java.util.Collections;
4849

4950
/**
5051
* A metadata storage that delegates to another storage implementation and publishes all metadata
@@ -83,6 +84,16 @@ public void setAuditPublisher(AuditPublisher auditPublisher) {
8384
this.auditPublisher = auditPublisher;
8485
}
8586

87+
@Override
88+
public String getName() {
89+
return "audit";
90+
}
91+
92+
@Override
93+
public Object getDatasetMetadata(String datasetName) {
94+
return Collections.emptyMap();
95+
}
96+
8697
@Override
8798
public void createIndex() throws IOException {
8899
try {

cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/writer/DefaultMetadataServiceClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import io.cdap.cdap.common.http.DefaultHttpRequestConfig;
2323
import io.cdap.cdap.common.internal.remote.RemoteClient;
2424
import io.cdap.cdap.common.internal.remote.RemoteClientFactory;
25-
import io.cdap.cdap.metadata.elastic.ScopedNameOfKindTypeAdapter;
26-
import io.cdap.cdap.metadata.elastic.ScopedNameTypeAdapter;
25+
import io.cdap.cdap.spi.metadata.ScopedNameTypeAdapter;
26+
import io.cdap.cdap.spi.metadata.ScopedNameOfKindTypeAdapter;
2727
import io.cdap.cdap.proto.codec.NamespacedEntityIdCodec;
2828
import io.cdap.cdap.proto.id.NamespacedEntityId;
2929
import io.cdap.cdap.spi.metadata.Metadata;
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package io.cdap.cdap.spi.metadata;
2+
3+
import com.google.inject.Inject;
4+
import io.cdap.cdap.common.conf.CConfiguration;
5+
import io.cdap.cdap.common.conf.Constants;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
import java.io.IOException;
10+
import java.util.List;
11+
12+
public class DefaultMetadataStorageProvider implements MetadataStorage {
13+
private static final Logger LOG = LoggerFactory.getLogger(DefaultMetadataStorageProvider.class);
14+
15+
private final CConfiguration cConf;
16+
private final MetadataStorageExtensionLoader extensionLoader;
17+
18+
private volatile MetadataStorage delegate;
19+
20+
@Inject
21+
DefaultMetadataStorageProvider(CConfiguration cConf, MetadataStorageExtensionLoader extensionLoader) {
22+
this.cConf = cConf;
23+
this.extensionLoader = extensionLoader;
24+
this.extensionLoader.getAll();
25+
}
26+
27+
@Override
28+
public void createIndex() throws IOException {
29+
getDelegate().createIndex();
30+
}
31+
32+
@Override
33+
public void close() {
34+
if (delegate != null) {
35+
delegate.close();
36+
}
37+
}
38+
39+
public Object getDatasetMetadata(String datasetName) {
40+
return getDelegate().getDatasetMetadata(datasetName);
41+
}
42+
43+
public String getName() {
44+
return cConf.get(Constants.Metadata.STORAGE_PROVIDER_SPANNER);
45+
}
46+
47+
@Override
48+
public void dropIndex() throws IOException {
49+
getDelegate().dropIndex();
50+
}
51+
52+
@Override
53+
public MetadataChange apply(MetadataMutation mutation, MutationOptions options)
54+
throws IOException {
55+
return getDelegate().apply(mutation, options);
56+
}
57+
58+
@Override
59+
public List<MetadataChange> batch(List<? extends MetadataMutation> mutations,
60+
MutationOptions options) throws IOException {
61+
return getDelegate().batch(mutations, options);
62+
}
63+
64+
@Override
65+
public Metadata read(Read read) throws IOException {
66+
return getDelegate().read(read);
67+
}
68+
69+
@Override
70+
public SearchResponse search(SearchRequest request)
71+
throws IOException {
72+
return getDelegate().search(request);
73+
}
74+
75+
private MetadataStorage getDelegate() {
76+
MetadataStorage metadataStorage = this.delegate;
77+
if (metadataStorage != null) {
78+
return metadataStorage;
79+
}
80+
synchronized (this) {
81+
metadataStorage = this.delegate;
82+
if (metadataStorage != null) {
83+
return metadataStorage;
84+
}
85+
LOG.info(extensionLoader.get(getName()).toString());
86+
metadataStorage = extensionLoader.get(getName());
87+
88+
if (metadataStorage == null) {
89+
throw new IllegalArgumentException(
90+
"Unsupported metadata storage implementation " + getName());
91+
}
92+
LOG.info("Metadata Storage {} is loaded", metadataStorage.getName());
93+
try {
94+
metadataStorage.initialize(new DefaultMetadataStorageProviderContext(this.cConf,
95+
metadataStorage.getName()));
96+
} catch (Exception e) {
97+
throw new RuntimeException(e);
98+
}
99+
LOG.info("Metadata storage {} is initialized.", metadataStorage.getName());
100+
101+
this.delegate = metadataStorage;
102+
return metadataStorage;
103+
}
104+
}
105+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package io.cdap.cdap.spi.metadata;
2+
3+
import io.cdap.cdap.common.conf.CConfiguration;
4+
import io.cdap.cdap.common.conf.Constants;
5+
6+
import java.util.Collections;
7+
import java.util.Map;
8+
9+
public class DefaultMetadataStorageProviderContext implements MetadataStorageContext {
10+
11+
private static final String storageImpl = "gcp-spanner";
12+
private final Map<String, String> cConf;
13+
private final Map<String, String> properties;
14+
15+
protected DefaultMetadataStorageProviderContext(CConfiguration cConf, String storageName) {
16+
String propertiesPrefix =
17+
Constants.Dataset.STORAGE_EXTENSION_PROPERTY_PREFIX + storageImpl + ".";
18+
this.cConf = Collections.unmodifiableMap(cConf.getPropsWithPrefix(propertiesPrefix));
19+
this.properties = Collections.unmodifiableMap(cConf.getPropsWithPrefix(propertiesPrefix));
20+
}
21+
22+
@Override
23+
public Map<String, String> getProperties() {
24+
return this.properties;
25+
}
26+
27+
@Override
28+
public Map<String, String> getConfiguration() {
29+
return cConf;
30+
}
31+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package io.cdap.cdap.spi.metadata;
2+
3+
import com.google.inject.Inject;
4+
import io.cdap.cdap.common.conf.CConfiguration;
5+
import io.cdap.cdap.common.conf.Constants;
6+
import io.cdap.cdap.common.lang.ClassPathResources;
7+
import io.cdap.cdap.common.lang.FilterClassLoader;
8+
import io.cdap.cdap.extension.AbstractExtensionLoader;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
import java.io.IOException;
13+
import java.util.Collections;
14+
import java.util.Set;
15+
16+
/**
17+
* Extension loader for {@link MetadataStorage} implementations.
18+
*/
19+
public class MetadataStorageExtensionLoader extends AbstractExtensionLoader<String, MetadataStorage> {
20+
21+
private static final Logger LOG = LoggerFactory.getLogger(MetadataStorageExtensionLoader.class);
22+
private static final Set<String> ALLOWED_RESOURCES = createAllowedResources();
23+
private static final Set<String> ALLOWED_PACKAGES = createPackageSets(ALLOWED_RESOURCES);
24+
25+
/**
26+
* Constructs a {@link MetadataStorageExtensionLoader} to manage the loading of SpannerMetadata
27+
* extensions.
28+
*
29+
* @param cConf The configuration object containing properties for loading SpannerMetadata
30+
* extensions.
31+
*/
32+
@Inject
33+
public MetadataStorageExtensionLoader(CConfiguration cConf) {
34+
super(cConf.get(Constants.Metadata.METADATA_STORAGE_EXT_DIR));
35+
LOG.debug("Metadata Storage extensions directory: {}",
36+
cConf.get(Constants.Metadata.METADATA_STORAGE_EXT_DIR));
37+
}
38+
39+
private static Set<String> createAllowedResources() {
40+
try {
41+
return ClassPathResources.getResourcesWithDependencies(MetadataStorage.class.getClassLoader(),
42+
MetadataStorage.class);
43+
} catch (IOException e) {
44+
throw new RuntimeException("Failed to trace dependencies for MetadataStorage extension.", e);
45+
}
46+
}
47+
48+
@Override
49+
protected Set<String> getSupportedTypesForProvider(MetadataStorage metadataStorage) {
50+
return Collections.singleton(metadataStorage.getName());
51+
}
52+
53+
@Override
54+
protected FilterClassLoader.Filter getExtensionParentClassLoaderFilter() {
55+
return new FilterClassLoader.Filter() {
56+
@Override
57+
public boolean acceptResource(String resource) {
58+
return ALLOWED_RESOURCES.contains(resource);
59+
}
60+
61+
@Override
62+
public boolean acceptPackage(String packageName) {
63+
return ALLOWED_PACKAGES.contains(packageName);
64+
}
65+
};
66+
}
67+
}

cdap-data-fabric/src/main/java/io/cdap/cdap/spi/metadata/dataset/DatasetMetadataStorage.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,29 +39,29 @@
3939
import io.cdap.cdap.proto.id.NamespaceId;
4040
import io.cdap.cdap.proto.metadata.MetadataSearchResponse;
4141
import io.cdap.cdap.spi.metadata.Metadata;
42+
import io.cdap.cdap.spi.metadata.MetadataStorage;
43+
import io.cdap.cdap.spi.metadata.SearchRequest;
4244
import io.cdap.cdap.spi.metadata.MetadataChange;
43-
import io.cdap.cdap.spi.metadata.MetadataDirective;
44-
import io.cdap.cdap.spi.metadata.MetadataKind;
4545
import io.cdap.cdap.spi.metadata.MetadataMutation;
46-
import io.cdap.cdap.spi.metadata.MetadataRecord;
47-
import io.cdap.cdap.spi.metadata.MetadataStorage;
4846
import io.cdap.cdap.spi.metadata.MutationOptions;
49-
import io.cdap.cdap.spi.metadata.Read;
5047
import io.cdap.cdap.spi.metadata.ScopedName;
48+
import io.cdap.cdap.spi.metadata.MetadataKind;
5149
import io.cdap.cdap.spi.metadata.ScopedNameOfKind;
52-
import io.cdap.cdap.spi.metadata.SearchRequest;
50+
import io.cdap.cdap.spi.metadata.MetadataDirective;
51+
import io.cdap.cdap.spi.metadata.Read;
5352
import io.cdap.cdap.spi.metadata.SearchResponse;
5453
import io.cdap.cdap.spi.metadata.Sorting;
54+
import io.cdap.cdap.spi.metadata.MetadataRecord;
55+
import org.apache.tephra.TransactionSystemClient;
5556
import java.io.IOException;
56-
import java.util.Collections;
57-
import java.util.EnumSet;
58-
import java.util.HashMap;
59-
import java.util.HashSet;
6057
import java.util.List;
6158
import java.util.Map;
6259
import java.util.Set;
60+
import java.util.EnumSet;
61+
import java.util.HashSet;
62+
import java.util.HashMap;
63+
import java.util.Collections;
6364
import java.util.stream.Collectors;
64-
import org.apache.tephra.TransactionSystemClient;
6565

6666
/**
6767
* A dataset-based implementation of the Metadata SPI.
@@ -74,6 +74,16 @@ public class DatasetMetadataStorage extends SearchHelper implements MetadataStor
7474
super(txClient, tableDefinition);
7575
}
7676

77+
@Override
78+
public String getName(){
79+
return "DatasetMetadataStorage";
80+
}
81+
82+
@Override
83+
public Object getDatasetMetadata(String datasetName) {
84+
return Collections.emptyMap();
85+
}
86+
7787
@Override
7888
public void createIndex() throws IOException {
7989
createDatasets();

0 commit comments

Comments
 (0)