Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1.4.0]Fix null pointers in sendemail #5121

Open
wants to merge 28 commits into
base: release-1.4.0
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4776462
Fix null pointers in sendemail
liaowenlen May 21, 2024
341c7b3
Flink adaptation to scala2.12
liaowenlen May 21, 2024
8c30587
Flink supports jdbc
liaowenlen May 21, 2024
918e295
Failed to submit task1 scala.MatchError: null
liaowenlen May 21, 2024
bcd7e6e
Spark Import/Export Excel
liaowenlen May 21, 2024
c170298
Fix the creation of jdbcURL
liaowenlen Jul 16, 2024
ac6dc30
Fix the creation of jdbcURL(doris)
liaowenlen Jul 16, 2024
8fedf2a
Fix the creation of jdbcURL(kingbase)
liaowenlen Jul 17, 2024
c9872cd
Add prompt info for SqlConnection
liaowenlen Jul 17, 2024
d7621ba
Use uris as a brokers
liaowenlen Jul 22, 2024
f7ff5d7
Fix getAllTables for kingbase SqlConnection
liaowenlen Jul 23, 2024
75bffb4
Fix getColumns for mysql SqlConnection
liaowenlen Jul 23, 2024
8149ec6
Fix getAllTables for kingbase SqlConnection("dbname"."table" -> table)
liaowenlen Jul 23, 2024
a0a7991
clear server
liaowenlen Oct 10, 2024
7763834
fair create engineconn
liaowenlen Oct 11, 2024
a19f9a9
Fix the issue where the value of ${run_date} is one day smaller than …
liaowenlen Oct 31, 2024
06c94e0
Modify the version of Spark engine in SEATUNNEL
liaowenlen Nov 27, 2024
d5cca8f
Fix the health and status addresses of Eureka
liaowenlen Nov 30, 2024
22f8785
Specify the debugging port for EngineConnServer
liaowenlen Dec 30, 2024
e8f6f8b
rollback
liaowenlen Dec 30, 2024
e75daeb
Fix the issue of secondary caching of CSTable in JDBC nodes
liaowenlen Jan 13, 2025
9393540
Provide CSTable as a variable to engine nodes for use
liaowenlen Jan 13, 2025
ea2a585
Provide CSTable as a variable to pipeline engine for use
liaowenlen Jan 17, 2025
7632965
Provide CSTable as a variable to engine nodes for use(external call)
liaowenlen Jan 17, 2025
738aff6
Workflow adds support for OpenLookeng and Impala
liaowenlen Jan 21, 2025
573587f
Fix the issue of columnInfos being empty
liaowenlen Feb 6, 2025
fe0bd0b
Fix kingbase getColumns
liaowenlen Feb 11, 2025
fe8f15d
Remove ContextID from local cache
liaowenlen Feb 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Fix null pointers in sendemail
liaowenlen committed May 21, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 4776462ea0076fcbd93720962135560fb25543b1
Original file line number Diff line number Diff line change
@@ -20,15 +20,14 @@
import org.apache.linkis.storage.exception.StorageWarnException;
import org.apache.linkis.storage.utils.StorageConfiguration;
import org.apache.linkis.storage.utils.StorageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary.FAILED_TO_READ_INTEGER;

public class Dolphin {
@@ -59,6 +58,9 @@ public class Dolphin {
public static final String NULL = "NULL";
public static final byte[] NULL_BYTES = "NULL".getBytes(Charset.forName("utf-8"));

public static final String LINKIS_NULL = "LINKIS_NULL";
public static final byte[] LINKIS_NULL_BYTES = LINKIS_NULL.getBytes(Charset.forName("utf-8"));

public static final int INT_LEN = 10;

public static final int FILE_EMPTY = 31;
Original file line number Diff line number Diff line change
@@ -20,7 +20,13 @@
public enum StorageErrorCode {

/** */
FS_NOT_INIT(53001, "please init first(请先初始化)");
FS_NOT_INIT(53001, "please init first(请先初始化)"),

INCONSISTENT_DATA(53001, "Inconsistent row data read,read %s,need rowLen %s"),

FS_OOM(53002, "OOM occurred while reading the file"),

FS_ERROR(53003, "Failed to operation fs");

StorageErrorCode(int errorCode, String message) {
this.code = errorCode;
Original file line number Diff line number Diff line change
@@ -29,27 +29,26 @@
import org.apache.linkis.storage.resultset.table.TableMetaData;
import org.apache.linkis.storage.resultset.table.TableRecord;
import org.apache.linkis.storage.resultset.table.TableResultSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ResultSetReaderFactory {
private static final Logger logger = LoggerFactory.getLogger(ResultSetReaderFactory.class);

public static <K extends MetaData, V extends Record> ResultSetReader getResultSetReader(
ResultSet<K, V> resultSet, InputStream inputStream) {
ResultSet<K, V> resultSet, InputStream inputStream) {
return new StorageResultSetReader<>(resultSet, inputStream);
}

public static <K extends MetaData, V extends Record> ResultSetReader getResultSetReader(
ResultSet<K, V> resultSet, String value) {
ResultSet<K, V> resultSet, String value) {
return new StorageResultSetReader<>(resultSet, value);
}

public static ResultSetReader getResultSetReader(String res) {
public static ResultSetReader getResultSetReader(String res) throws IOException {
ResultSetFactory rsFactory = ResultSetFactory.getInstance();
if (rsFactory.isResultSet(res)) {
ResultSet<? extends MetaData, ? extends Record> resultSet = rsFactory.getResultSet(res);
@@ -58,21 +57,12 @@ public static ResultSetReader getResultSetReader(String res) {
FsPath resPath = new FsPath(res);
ResultSet<? extends MetaData, ? extends Record> resultSet =
rsFactory.getResultSetByPath(resPath);
try {
FSFactory.getFs(resPath).init(null);
} catch (IOException e) {
logger.warn("ResultSetReaderFactory fs init failed", e);
}
ResultSetReader reader = null;
try {
reader =
ResultSetReaderFactory.getResultSetReader(
resultSet, FSFactory.getFs(resPath).read(resPath));
} catch (IOException e) {
logger.warn("ResultSetReaderFactory fs read failed", e);
}
Fs fs = FSFactory.getFs(resPath);
fs.init(null);
ResultSetReader reader =
ResultSetReaderFactory.getResultSetReader(resultSet, fs.read(resPath));
if (reader instanceof StorageResultSetReader) {
((StorageResultSetReader<?, ?>) reader).setFs(FSFactory.getFs(resPath));
((StorageResultSetReader<?, ?>) reader).setFs(fs);
}
return (StorageResultSetReader<?, ?>) reader;
}
Original file line number Diff line number Diff line change
@@ -22,14 +22,13 @@
import org.apache.linkis.common.io.Record;
import org.apache.linkis.common.io.resultset.ResultSet;
import org.apache.linkis.common.io.resultset.ResultSetReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ResultSetWriterFactory {
private static final Logger logger = LoggerFactory.getLogger(ResultSetWriterFactory.class);

@@ -51,28 +50,25 @@ org.apache.linkis.common.io.resultset.ResultSetWriter<K, V> getResultSetWriter(
public static Record[] getRecordByWriter(
org.apache.linkis.common.io.resultset.ResultSetWriter<? extends MetaData, ? extends Record>
writer,
long limit) {
long limit)
throws IOException {
String res = writer.toString();
return getRecordByRes(res, limit);
}

public static Record[] getRecordByRes(String res, long limit) {
public static Record[] getRecordByRes(String res, long limit) throws IOException {
ResultSetReader reader = ResultSetReaderFactory.getResultSetReader(res);
int count = 0;
List<Record> records = new ArrayList<>();
try {
reader.getMetaData();
while (reader.hasNext() && count < limit) {
records.add(reader.getRecord());
count++;
}
} catch (IOException e) {
logger.warn("ResultSetWriter getRecordByRes failed", e);
reader.getMetaData();
while (reader.hasNext() && count < limit) {
records.add(reader.getRecord());
count++;
}
return records.toArray(new Record[0]);
}

public static Record getLastRecordByRes(String res) {
public static Record getLastRecordByRes(String res) throws IOException {
ResultSetReader reader = ResultSetReaderFactory.getResultSetReader(res);
Record record = null;
try {
Original file line number Diff line number Diff line change
@@ -17,25 +17,30 @@

package org.apache.linkis.storage.resultset;

import org.apache.linkis.common.io.*;
import org.apache.linkis.common.io.resultset.*;
import org.apache.linkis.common.io.resultset.ResultSetWriter;
import org.apache.linkis.common.utils.*;
import org.apache.linkis.storage.*;
import org.apache.linkis.storage.conf.*;
import org.apache.linkis.storage.domain.*;
import org.apache.linkis.storage.utils.*;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.linkis.common.io.Fs;
import org.apache.linkis.common.io.FsPath;
import org.apache.linkis.common.io.MetaData;
import org.apache.linkis.common.io.Record;
import org.apache.linkis.common.io.resultset.ResultSerializer;
import org.apache.linkis.common.io.resultset.ResultSet;
import org.apache.linkis.common.io.resultset.ResultSetWriter;
import org.apache.linkis.storage.FSFactory;
import org.apache.linkis.storage.conf.LinkisStorageConf;
import org.apache.linkis.storage.domain.Dolphin;
import org.apache.linkis.storage.exception.StorageErrorException;
import org.apache.linkis.storage.utils.FileSystemUtils;
import org.apache.linkis.storage.utils.StorageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.linkis.storage.exception.StorageErrorCode.FS_ERROR;

public class StorageResultSetWriter<K extends MetaData, V extends Record>
extends ResultSetWriter<K, V> {
@@ -98,8 +103,9 @@ public void createNewFile() {
fs.init(null);
FileSystemUtils.createNewFile(storePath, proxyUser, true);
outputStream = fs.write(storePath, true);
} catch (IOException e) {
logger.warn("StorageResultSetWriter createNewFile failed", e);
} catch (Exception e) {
throw new StorageErrorException(
FS_ERROR.getCode(), "StorageResultSetWriter createNewFile failed", e);
}
logger.info("Succeed to create a new file:{}", storePath);
fileCreated = true;
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@

package org.apache.linkis.storage.source;

import org.apache.linkis.storage.domain.Dolphin;
import org.apache.linkis.storage.resultset.table.TableRecord;
import org.apache.linkis.storage.utils.StorageUtils;

@@ -38,7 +39,12 @@ record -> {
if (r == null || r.equals("NULL")) {
return nullValue;
} else if (r.equals("")) {
return getParams().getOrDefault("nullValue", "");
String emptyValue = getParams().getOrDefault("nullValue", "");
if (emptyValue.equals(Dolphin.LINKIS_NULL)) {
return "";
} else {
return emptyValue;
}
} else if (r instanceof Double) {
return StorageUtils.doubleToString((Double) r);
} else {
Original file line number Diff line number Diff line change
@@ -17,20 +17,18 @@

package org.apache.linkis.storage.utils;

import org.apache.commons.io.IOUtils;
import org.apache.linkis.common.io.FsPath;
import org.apache.linkis.storage.FSFactory;
import org.apache.linkis.storage.fs.FileSystem;
import org.apache.linkis.storage.fs.impl.LocalFileSystem;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Objects;
import java.util.Stack;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileSystemUtils {
private static final Logger logger = LoggerFactory.getLogger(FileSystemUtils.class);

@@ -61,23 +59,19 @@ public static void createNewFile(FsPath filePath, boolean createParentWhenNotExi
createNewFile(filePath, StorageUtils.getJvmUser(), createParentWhenNotExists);
}

public static void createNewFile(
FsPath filePath, String user, boolean createParentWhenNotExists) {
public static void createNewFile(FsPath filePath, String user, boolean createParentWhenNotExists)
throws Exception {
FileSystem fileSystem = (FileSystem) FSFactory.getFsByProxyUser(filePath, user);
try {
fileSystem.init(null);
createNewFileWithFileSystem(fileSystem, filePath, user, createParentWhenNotExists);
} catch (IOException e) {
logger.warn("FileSystemUtils createNewFile failed", e);
} catch (Exception e) {
logger.warn("FileSystemUtils createNewFile failed", e);
} finally {
IOUtils.closeQuietly(fileSystem);
}
}

public static void createNewFileWithFileSystem(
FileSystem fileSystem, FsPath filePath, String user, boolean createParentWhenNotExists)
FileSystem fileSystem, FsPath filePath, String user, boolean createParentWhenNotExists)
throws Exception {
if (!fileSystem.exists(filePath)) {
if (!fileSystem.exists(filePath.getParent())) {