Skip to content

Commit 9130913

Browse files
authored
HBASE-29473 Obtain target cluster's token for cross clusters job (#7175)
Signed-off-by: Nihal Jain <[email protected]> Signed-off-by: Junegunn Choi <[email protected]> Signed-off-by: Pankaj Kumar <[email protected]>
1 parent 643b161 commit 9130913

File tree

4 files changed

+179
-39
lines changed

4 files changed

+179
-39
lines changed

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -753,7 +753,7 @@ public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDes
753753
* @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY
754754
* @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY
755755
*/
756-
public static void configureRemoteCluster(Job job, Configuration clusterConf) {
756+
public static void configureRemoteCluster(Job job, Configuration clusterConf) throws IOException {
757757
Configuration conf = job.getConfiguration();
758758

759759
if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
@@ -770,6 +770,8 @@ public static void configureRemoteCluster(Job job, Configuration clusterConf) {
770770
conf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, clientPort);
771771
conf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, parent);
772772

773+
TableMapReduceUtil.initCredentialsForCluster(job, clusterConf);
774+
773775
LOG.info("ZK configs for remote cluster of bulkload is configured: " + quorum + ":" + clientPort
774776
+ "/" + parent);
775777
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.mapreduce;
19+
20+
import static org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab;
21+
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertTrue;
23+
24+
import java.io.Closeable;
25+
import java.io.File;
26+
import java.util.ArrayList;
27+
import java.util.List;
28+
import java.util.Map;
29+
import org.apache.commons.io.IOUtils;
30+
import org.apache.hadoop.conf.Configuration;
31+
import org.apache.hadoop.hbase.HBaseClassTestRule;
32+
import org.apache.hadoop.hbase.HBaseTestingUtil;
33+
import org.apache.hadoop.hbase.TableName;
34+
import org.apache.hadoop.hbase.client.RegionLocator;
35+
import org.apache.hadoop.hbase.client.Table;
36+
import org.apache.hadoop.hbase.testclassification.LargeTests;
37+
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
38+
import org.apache.hadoop.hbase.util.Bytes;
39+
import org.apache.hadoop.io.Text;
40+
import org.apache.hadoop.mapreduce.Job;
41+
import org.apache.hadoop.minikdc.MiniKdc;
42+
import org.apache.hadoop.security.UserGroupInformation;
43+
import org.apache.hadoop.security.token.Token;
44+
import org.apache.hadoop.security.token.TokenIdentifier;
45+
import org.junit.After;
46+
import org.junit.Before;
47+
import org.junit.ClassRule;
48+
import org.junit.Test;
49+
import org.junit.experimental.categories.Category;
50+
51+
/**
52+
* Tests for {@link HFileOutputFormat2} with secure mode.
53+
*/
54+
@Category({ VerySlowMapReduceTests.class, LargeTests.class })
55+
public class TestHFileOutputFormat2WithSecurity extends HFileOutputFormat2TestBase {
56+
@ClassRule
57+
public static final HBaseClassTestRule CLASS_RULE =
58+
HBaseClassTestRule.forClass(TestHFileOutputFormat2WithSecurity.class);
59+
60+
private static final byte[] FAMILIES = Bytes.toBytes("test_cf");
61+
62+
private static final String HTTP_PRINCIPAL = "HTTP/localhost";
63+
64+
private HBaseTestingUtil utilA;
65+
66+
private Configuration confA;
67+
68+
private HBaseTestingUtil utilB;
69+
70+
private MiniKdc kdc;
71+
72+
private List<Closeable> clusters = new ArrayList<>();
73+
74+
@Before
75+
public void setupSecurityClusters() throws Exception {
76+
utilA = new HBaseTestingUtil();
77+
confA = utilA.getConfiguration();
78+
79+
utilB = new HBaseTestingUtil();
80+
81+
// Prepare security configs.
82+
File keytab = new File(utilA.getDataTestDir("keytab").toUri().getPath());
83+
kdc = utilA.setupMiniKdc(keytab);
84+
String username = UserGroupInformation.getLoginUser().getShortUserName();
85+
String userPrincipal = username + "/localhost";
86+
kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
87+
loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
88+
89+
// Start security clusterA
90+
clusters.add(utilA.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL));
91+
92+
// Start security clusterB
93+
clusters.add(utilB.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL));
94+
}
95+
96+
@After
97+
public void teardownSecurityClusters() {
98+
IOUtils.closeQuietly(clusters);
99+
clusters.clear();
100+
if (kdc != null) {
101+
kdc.stop();
102+
}
103+
}
104+
105+
@Test
106+
public void testIncrementalLoadInMultiClusterWithSecurity() throws Exception {
107+
TableName tableName = TableName.valueOf("testIncrementalLoadInMultiClusterWithSecurity");
108+
109+
// Create table in clusterB
110+
try (Table table = utilB.createTable(tableName, FAMILIES);
111+
RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
112+
113+
// Create job in clusterA
114+
Job job = Job.getInstance(confA, "testIncrementalLoadInMultiClusterWithSecurity");
115+
job.setWorkingDirectory(
116+
utilA.getDataTestDirOnTestFS("testIncrementalLoadInMultiClusterWithSecurity"));
117+
setupRandomGeneratorMapper(job, false);
118+
HFileOutputFormat2.configureIncrementalLoad(job, table, r);
119+
120+
Map<Text, Token<? extends TokenIdentifier>> tokenMap = job.getCredentials().getTokenMap();
121+
assertEquals(2, tokenMap.size());
122+
123+
String remoteClusterId = utilB.getHBaseClusterInterface().getClusterMetrics().getClusterId();
124+
assertTrue(tokenMap.containsKey(new Text(remoteClusterId)));
125+
} finally {
126+
if (utilB.getAdmin().tableExists(tableName)) {
127+
utilB.deleteTable(tableName);
128+
}
129+
}
130+
}
131+
}

hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java

Lines changed: 6 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,8 @@
3030
import org.apache.hadoop.hbase.HBaseClassTestRule;
3131
import org.apache.hadoop.hbase.HBaseTestingUtil;
3232
import org.apache.hadoop.hbase.client.Scan;
33-
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
34-
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
35-
import org.apache.hadoop.hbase.security.access.AccessController;
36-
import org.apache.hadoop.hbase.security.access.PermissionStorage;
37-
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
3833
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders;
3934
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
40-
import org.apache.hadoop.hbase.security.token.TokenProvider;
41-
import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
4235
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
4336
import org.apache.hadoop.hbase.testclassification.MediumTests;
4437
import org.apache.hadoop.hbase.util.Bytes;
@@ -134,31 +127,6 @@ public void testInitTableMapperJob4() throws Exception {
134127
assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
135128
}
136129

137-
private static Closeable startSecureMiniCluster(HBaseTestingUtil util, MiniKdc kdc,
138-
String principal) throws Exception {
139-
Configuration conf = util.getConfiguration();
140-
141-
SecureTestUtil.enableSecurity(conf);
142-
VisibilityTestUtil.enableVisiblityLabels(conf);
143-
SecureTestUtil.verifyConfiguration(conf);
144-
145-
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
146-
AccessController.class.getName() + ',' + TokenProvider.class.getName());
147-
148-
HBaseKerberosUtils.setSecuredConfiguration(conf, principal + '@' + kdc.getRealm(),
149-
HTTP_PRINCIPAL + '@' + kdc.getRealm());
150-
151-
util.startMiniCluster();
152-
try {
153-
util.waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
154-
} catch (Exception e) {
155-
util.shutdownMiniCluster();
156-
throw e;
157-
}
158-
159-
return util::shutdownMiniCluster;
160-
}
161-
162130
@Test
163131
public void testInitCredentialsForCluster1() throws Exception {
164132
HBaseTestingUtil util1 = new HBaseTestingUtil();
@@ -198,8 +166,8 @@ public void testInitCredentialsForCluster2() throws Exception {
198166
kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
199167
loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
200168

201-
try (Closeable ignored1 = startSecureMiniCluster(util1, kdc, userPrincipal);
202-
Closeable ignored2 = startSecureMiniCluster(util2, kdc, userPrincipal)) {
169+
try (Closeable ignored1 = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL);
170+
Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
203171
Configuration conf1 = util1.getConfiguration();
204172
Job job = Job.getInstance(conf1);
205173

@@ -232,7 +200,7 @@ public void testInitCredentialsForCluster3() throws Exception {
232200
kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
233201
loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
234202

235-
try (Closeable ignored1 = startSecureMiniCluster(util1, kdc, userPrincipal)) {
203+
try (Closeable ignored1 = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
236204
HBaseTestingUtil util2 = new HBaseTestingUtil();
237205
// Assume util2 is insecure cluster
238206
// Do not start util2 because cannot boot secured mini cluster and insecure mini cluster at
@@ -268,7 +236,7 @@ public void testInitCredentialsForCluster4() throws Exception {
268236
kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
269237
loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
270238

271-
try (Closeable ignored2 = startSecureMiniCluster(util2, kdc, userPrincipal)) {
239+
try (Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
272240
Configuration conf1 = util1.getConfiguration();
273241
Job job = Job.getInstance(conf1);
274242

@@ -303,8 +271,8 @@ public void testInitCredentialsForClusterUri() throws Exception {
303271
kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
304272
loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
305273

306-
try (Closeable ignored1 = startSecureMiniCluster(util1, kdc, userPrincipal);
307-
Closeable ignored2 = startSecureMiniCluster(util2, kdc, userPrincipal)) {
274+
try (Closeable ignored1 = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL);
275+
Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
308276
Configuration conf1 = util1.getConfiguration();
309277
Job job = Job.getInstance(conf1);
310278

hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtil.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.junit.Assert.fail;
2323

2424
import edu.umd.cs.findbugs.annotations.Nullable;
25+
import java.io.Closeable;
2526
import java.io.File;
2627
import java.io.IOException;
2728
import java.io.OutputStream;
@@ -88,6 +89,7 @@
8889
import org.apache.hadoop.hbase.client.TableDescriptor;
8990
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
9091
import org.apache.hadoop.hbase.client.TableState;
92+
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
9193
import org.apache.hadoop.hbase.fs.HFileSystem;
9294
import org.apache.hadoop.hbase.io.compress.Compression;
9395
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -119,7 +121,12 @@
119121
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
120122
import org.apache.hadoop.hbase.security.User;
121123
import org.apache.hadoop.hbase.security.UserProvider;
124+
import org.apache.hadoop.hbase.security.access.AccessController;
125+
import org.apache.hadoop.hbase.security.access.PermissionStorage;
126+
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
127+
import org.apache.hadoop.hbase.security.token.TokenProvider;
122128
import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
129+
import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
123130
import org.apache.hadoop.hbase.util.Bytes;
124131
import org.apache.hadoop.hbase.util.CommonFSUtils;
125132
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -345,6 +352,38 @@ public static void closeRegionAndWAL(final HRegion r) throws IOException {
345352
r.getWAL().close();
346353
}
347354

355+
/**
356+
* Start mini secure cluster with given kdc and principals.
357+
* @param kdc Mini kdc server
358+
* @param servicePrincipal Service principal without realm.
359+
* @param spnegoPrincipal Spnego principal without realm.
360+
* @return Handler to shutdown the cluster
361+
*/
362+
public Closeable startSecureMiniCluster(MiniKdc kdc, String servicePrincipal,
363+
String spnegoPrincipal) throws Exception {
364+
Configuration conf = getConfiguration();
365+
366+
SecureTestUtil.enableSecurity(conf);
367+
VisibilityTestUtil.enableVisiblityLabels(conf);
368+
SecureTestUtil.verifyConfiguration(conf);
369+
370+
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
371+
AccessController.class.getName() + ',' + TokenProvider.class.getName());
372+
373+
HBaseKerberosUtils.setSecuredConfiguration(conf, servicePrincipal + '@' + kdc.getRealm(),
374+
spnegoPrincipal + '@' + kdc.getRealm());
375+
376+
startMiniCluster();
377+
try {
378+
waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
379+
} catch (Exception e) {
380+
shutdownMiniCluster();
381+
throw e;
382+
}
383+
384+
return this::shutdownMiniCluster;
385+
}
386+
348387
/**
349388
* Returns this classes's instance of {@link Configuration}. Be careful how you use the returned
350389
* Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed

0 commit comments

Comments
 (0)