Skip to content

Commit

Permalink
HIVE-24059: Llap external client - Initial changes for running in clo…
Browse files Browse the repository at this point in the history
…ud environment(Shubham Chaurasia, reviewed by Prasanth Jayachandran)
  • Loading branch information
anishek committed Sep 2, 2020
1 parent ff59864 commit 7586af3
Show file tree
Hide file tree
Showing 27 changed files with 1,240 additions and 215 deletions.
26 changes: 26 additions & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -4895,6 +4895,32 @@ public static enum ConfVars {
LLAP_EXTERNAL_CLIENT_USE_HYBRID_CALENDAR("hive.llap.external.client.use.hybrid.calendar",
    false,
    "Whether to use hybrid calendar for parsing of data/timestamps."),

// ====== confs for llap-external-client cloud deployment ======
// Note: each description below is built from concatenated fragments; every fragment that continues a
// sentence must end with a space (or a newline), otherwise the words run together in the rendered help text.
LLAP_EXTERNAL_CLIENT_CLOUD_DEPLOYMENT_SETUP_ENABLED(
    "hive.llap.external.client.cloud.deployment.setup.enabled", false,
    "Tells whether to enable additional RPC port, auth mechanism for llap external clients. This is meant "
        + "for cloud based deployments. When true, it has following effects - \n"
        + "1. Enables an extra RPC port on LLAP daemon to accept fragments from external clients. See "
        + "hive.llap.external.client.cloud.rpc.port\n"
        + "2. Uses external hostnames of LLAP in splits, so that clients can submit from outside of cloud. "
        + "Env variable PUBLIC_HOSTNAME should be available on LLAP machines.\n"
        + "3. Uses JWT based authentication for splits to be validated at LLAP. See "
        + "hive.llap.external.client.cloud.jwt.shared.secret.provider"),
LLAP_EXTERNAL_CLIENT_CLOUD_RPC_PORT("hive.llap.external.client.cloud.rpc.port", 30004,
    "The LLAP daemon RPC port for external clients when llap is running in cloud environment."),
LLAP_EXTERNAL_CLIENT_CLOUD_OUTPUT_SERVICE_PORT("hive.llap.external.client.cloud.output.service.port", 30005,
    "LLAP output service port when llap is running in cloud environment"),
LLAP_EXTERNAL_CLIENT_CLOUD_JWT_SHARED_SECRET_PROVIDER(
    "hive.llap.external.client.cloud.jwt.shared.secret.provider",
    "org.apache.hadoop.hive.llap.security.DefaultJwtSharedSecretProvider",
    "Shared secret provider to be used to sign JWT"),
// The description must talk about the secret itself (not the RPC port) so that generated
// configuration docs are not misleading.
LLAP_EXTERNAL_CLIENT_CLOUD_JWT_SHARED_SECRET("hive.llap.external.client.cloud.jwt.shared.secret",
    "",
    "Shared secret to be used to sign JWT for external clients when llap is running in cloud environment. "
        + "Length of the secret should be >= 32 bytes"),
// ====== end of confs for llap-external-client cloud deployment ======

LLAP_ENABLE_GRACE_JOIN_IN_LLAP("hive.llap.enable.grace.join.in.llap", false,
    "Override if grace join should be allowed to run in llap."),


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,19 @@ public void testWritable() throws Exception {
new SplitLocationInfo("location2", false),
};

LlapDaemonInfo daemonInfo1 = new LlapDaemonInfo("host1", 30004, 15003);
LlapDaemonInfo daemonInfo2 = new LlapDaemonInfo("host2", 30004, 15003);

LlapDaemonInfo[] llapDaemonInfos = {daemonInfo1, daemonInfo2};

ArrayList<FieldDesc> colDescs = new ArrayList<FieldDesc>();
colDescs.add(new FieldDesc("col1", TypeInfoFactory.stringTypeInfo));
colDescs.add(new FieldDesc("col2", TypeInfoFactory.intTypeInfo));
Schema schema = new Schema(colDescs);

byte[] tokenBytes = new byte[] { 1 };
LlapInputSplit split1 = new LlapInputSplit(splitNum, planBytes, fragmentBytes, null,
locations, schema, "hive", tokenBytes);
locations, llapDaemonInfos, schema, "hive", tokenBytes, "some-dummy-jwt");
ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(byteOutStream);
split1.write(dataOut);
Expand Down Expand Up @@ -83,6 +88,8 @@ static void checkLlapSplits(LlapInputSplit split1, LlapInputSplit split2) throws
assertArrayEquals(split1.getLocations(), split2.getLocations());
assertEquals(split1.getSchema().toString(), split2.getSchema().toString());
assertEquals(split1.getLlapUser(), split2.getLlapUser());
assertEquals(split1.getJwt(), split2.getJwt());
assertArrayEquals(split1.getLlapDaemonInfos(), split2.getLlapDaemonInfos());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hive.jdbc;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
import org.apache.hadoop.hive.llap.LlapInputSplit;
import org.apache.hadoop.hive.llap.Row;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.junit.BeforeClass;
import org.junit.Ignore;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
 * Runs the LLAP external-client tests inherited from {@link BaseJdbcWithMiniLlap} with the cloud
 * deployment setup enabled (extra RPC port, JWT shared secret) and Arrow as the LLAP output format.
 */
public class TestLlapExtClientWithCloudDeploymentConfigs extends BaseJdbcWithMiniLlap {

  /**
   * Builds the configuration for the mini cluster and hands it to the base class.
   *
   * @throws Exception if the local host cannot be resolved or the base setup fails
   */
  @BeforeClass
  public static void beforeTest() throws Exception {
    // Set as a system property here; the cloud setup documents PUBLIC_HOSTNAME as the external host name.
    System.setProperty("PUBLIC_HOSTNAME", InetAddress.getLocalHost().getHostAddress());

    HiveConf conf = defaultConf();
    conf.set("minillap.usePortsFromConf", "true");

    // enable setup for cloud based deployment
    conf.setBoolVar(HiveConf.ConfVars.LLAP_EXTERNAL_CLIENT_CLOUD_DEPLOYMENT_SETUP_ENABLED, true);
    // the shared secret is documented to require a length of >= 32 bytes
    conf.setVar(HiveConf.ConfVars.LLAP_EXTERNAL_CLIENT_CLOUD_JWT_SHARED_SECRET,
        "Three may keep a secret, if two of them are dead -- Benjamin Franklin");

    conf.setBoolVar(HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
    conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED, true);
    BaseJdbcWithMiniLlap.beforeTest(conf);
  }

  /**
   * @return an Arrow row input format with an effectively unlimited allocator ceiling
   */
  @Override
  protected InputFormat<NullWritable, Row> getInputFormat() {
    //For unit testing, no harm in hard-coding allocator ceiling to LONG.MAX_VALUE
    return new LlapArrowRowInputFormat(Long.MAX_VALUE);
  }

  /**
   * Delegates to the data type test of {@link TestJdbcWithMiniLlapVectorArrow}.
   *
   * @throws Exception if the delegated test fails
   */
  @Override
  public void testDataTypes() throws Exception {
    TestJdbcWithMiniLlapVectorArrow testJdbcWithMiniLlapVectorArrow = new TestJdbcWithMiniLlapVectorArrow();
    testJdbcWithMiniLlapVectorArrow.testDataTypes();
  }

  @Override
  @Ignore
  public void testMultipleBatchesOfComplexTypes() {
    // TODO: something else has broken parent test, need to check
  }

  /**
   * Fetches splits for {@code query} using the new split format, reads every row of every data split
   * and passes it to {@code rowProcessor}.
   *
   * @param currentDatabase database to run the query in, or {@code null} to leave it unset
   * @param query query to execute
   * @param numSplits number of splits requested from the input format
   * @param rowProcessor callback invoked once per row read
   * @return number of rows read, or 0 when no more than one split is returned
   * @throws Exception if split generation, reading or cleanup fails
   */
  @Override
  protected int processQuery(String currentDatabase, String query, int numSplits, RowProcessor rowProcessor)
      throws Exception {
    String url = miniHS2.getJdbcURL();
    String user = System.getProperty("user.name");
    String pwd = user;
    String handleId = UUID.randomUUID().toString();

    InputFormat<NullWritable, Row> inputFormat = getInputFormat();

    // Get splits
    JobConf job = new JobConf(conf);
    job.set(LlapBaseInputFormat.URL_KEY, url);
    job.set(LlapBaseInputFormat.USER_KEY, user);
    job.set(LlapBaseInputFormat.PWD_KEY, pwd);
    job.set(LlapBaseInputFormat.QUERY_KEY, query);
    job.set(LlapBaseInputFormat.HANDLE_ID, handleId);
    job.set(LlapBaseInputFormat.USE_NEW_SPLIT_FORMAT, "true");
    if (currentDatabase != null) {
      job.set(LlapBaseInputFormat.DB_KEY, currentDatabase);
    }

    try {
      InputSplit[] splits = inputFormat.getSplits(job, numSplits);

      if (splits.length <= 1) {
        return 0;
      }

      // populate actual splits with schema and planBytes[]
      // (with the new split format, index 0 carries the schema and index 1 the plan bytes)
      LlapInputSplit schemaSplit = (LlapInputSplit) splits[0];
      LlapInputSplit planSplit = (LlapInputSplit) splits[1];

      List<LlapInputSplit> actualSplits = new ArrayList<>();
      for (int i = 2; i < splits.length; i++) {
        LlapInputSplit actualSplit = (LlapInputSplit) splits[i];
        actualSplit.setSchema(schemaSplit.getSchema());
        actualSplit.setPlanBytes(planSplit.getPlanBytes());
        actualSplits.add(actualSplit);
      }

      // Fetch rows from splits
      int rowCount = 0;
      for (LlapInputSplit split : actualSplits) {
        // join the locations explicitly; concatenating the array would only print its identity hash
        System.out.println("Processing split " + String.join(", ", split.getLocations()));
        RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(split, job, null);
        try {
          Row row = reader.createValue();
          while (reader.next(NullWritable.get(), row)) {
            rowProcessor.process(row);
            ++rowCount;
          }
        } finally {
          //In arrow-mode this will throw exception unless all buffers have been released
          //See org.apache.hadoop.hive.llap.LlapArrowBatchRecordReader
          reader.close();
        }
      }
      return rowCount;
    } finally {
      // release everything registered under this handle, including on early return and on failure
      LlapBaseInputFormat.close(handleId);
    }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,43 @@
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.hive.llap.Schema;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.hive.llap.ext.LlapDaemonInfo;
import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.security.token.Token;

public class LlapInputSplit implements InputSplitWithLocationInfo {

private int splitNum;
private byte[] planBytes;
private byte[] fragmentBytes;
private SplitLocationInfo[] locations;
private LlapDaemonInfo[] llapDaemonInfos;
private Schema schema;
private String llapUser;
private byte[] fragmentBytesSignature;
private byte[] tokenBytes;
// Only needed in cloud deployments, where the LLAP server validates requests from external LLAP clients.
// HS2 generates a JWT and populates this field during the get_splits() call; the JWT is then validated at
// the LLAP server when the LlapInputSplit is submitted.
private String jwt;

/**
 * Creates an empty split with no field populated.
 * NOTE(review): presumably exists so that an instance can be filled in afterwards via
 * readFields(DataInput) - confirm against callers.
 */
public LlapInputSplit() {
}

/**
 * Creates a fully populated split.
 *
 * @param splitNum number of this split
 * @param planBytes serialized plan
 * @param fragmentBytes serialized fragment
 * @param fragmentBytesSignature signature of {@code fragmentBytes}
 * @param locations location info of this split
 * @param llapDaemonInfos LLAP daemon infos for this split; must not be {@code null} because
 *                        write(DataOutput) reads its length
 * @param schema schema of the rows produced by this split
 * @param llapUser LLAP user
 * @param tokenBytes token bytes; may be {@code null}, in which case write(DataOutput) emits a length of 0
 * @param jwt JWT generated by HS2, only needed in cloud deployments.
 *            NOTE(review): write(DataOutput) skips a {@code null} jwt while readFields(DataInput) always
 *            reads one, so pass a non-null value (e.g. an empty string) to keep the split round-trippable.
 */
public LlapInputSplit(int splitNum, byte[] planBytes, byte[] fragmentBytes,
    byte[] fragmentBytesSignature, SplitLocationInfo[] locations,
    LlapDaemonInfo[] llapDaemonInfos, Schema schema,
    String llapUser, byte[] tokenBytes, String jwt) {
  this.splitNum = splitNum;
  this.planBytes = planBytes;
  this.fragmentBytes = fragmentBytes;
  this.fragmentBytesSignature = fragmentBytesSignature;
  this.locations = locations;
  this.llapDaemonInfos = llapDaemonInfos;
  this.schema = schema;
  this.llapUser = llapUser;
  this.tokenBytes = tokenBytes;
  this.jwt = jwt;
}

public Schema getSchema() {
Expand Down Expand Up @@ -99,6 +105,10 @@ public void setSchema(Schema schema) {
this.schema = schema;
}

/**
 * @return the JWT carried by this split, as set by the constructor or read by readFields(DataInput)
 */
public String getJwt() {
  return this.jwt;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeInt(splitNum);
Expand All @@ -119,6 +129,11 @@ public void write(DataOutput out) throws IOException {
out.writeUTF(locations[i].getLocation());
}

out.writeInt(llapDaemonInfos.length);
for (LlapDaemonInfo llapDaemonInfo : llapDaemonInfos) {
llapDaemonInfo.write(out);
}

schema.write(out);
out.writeUTF(llapUser);
if (tokenBytes != null) {
Expand All @@ -127,6 +142,10 @@ public void write(DataOutput out) throws IOException {
} else {
out.writeInt(0);
}

if (jwt != null) {
out.writeUTF(jwt);
}
}

@Override
Expand All @@ -152,6 +171,12 @@ public void readFields(DataInput in) throws IOException {
locations[i] = new SplitLocationInfo(in.readUTF(), false);
}

llapDaemonInfos = new LlapDaemonInfo[in.readInt()];
for (int i = 0; i < llapDaemonInfos.length; i++) {
llapDaemonInfos[i] = new LlapDaemonInfo();
llapDaemonInfos[i].readFields(in);
}

schema = new Schema();
schema.readFields(in);
llapUser = in.readUTF();
Expand All @@ -160,6 +185,7 @@ public void readFields(DataInput in) throws IOException {
tokenBytes = new byte[length];
in.readFully(tokenBytes);
}
jwt = in.readUTF();
}

@Override
Expand All @@ -170,4 +196,8 @@ public SplitLocationInfo[] getLocationInfo() throws IOException {
/**
 * @return the LLAP user stored in this split
 */
public String getLlapUser() {
  return this.llapUser;
}

/**
 * @return the LLAP daemon infos stored in this split; the internal array is returned as-is, not a copy
 */
public LlapDaemonInfo[] getLlapDaemonInfos() {
  return this.llapDaemonInfos;
}
}
Loading

0 comments on commit 7586af3

Please sign in to comment.