diff --git a/src/main/java/com/sforce/async/BulkConnection.java b/src/main/java/com/sforce/async/BulkConnection.java new file mode 100644 index 00000000..d4db9493 --- /dev/null +++ b/src/main/java/com/sforce/async/BulkConnection.java @@ -0,0 +1,932 @@ +/* + * Copyright (c) 2025, salesforce.com, inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are permitted provided + * that thhttp://www.apache.org/licenses/LICENSE-2.0e following conditions are met: + * + * Redistributions of source code must retain the above copyright notice, this list of conditions and the + * following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and + * the following disclaimer in the documentation and/or other materials provided with the distribution. + * + * Neither the name of salesforce.com, inc. nor the names of its contributors may be used to endorse or + * promote products derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +/** + * Every line changed/added is suffixed with a comment // %EDIT% + * Author: @harshpruthi + */ +package com.sforce.async; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.io.ByteStreams; +import com.sforce.ws.ConnectionException; +import com.sforce.ws.ConnectorConfig; +import com.sforce.ws.MessageHandler; +import com.sforce.ws.MessageHandlerWithHeaders; +import com.sforce.ws.bind.CalendarCodec; +import com.sforce.ws.bind.TypeMapper; +import com.sforce.ws.parser.PullParserException; +import com.sforce.ws.parser.XmlInputStream; +import com.sforce.ws.parser.XmlOutputStream; +import com.sforce.ws.transport.Transport; +import com.sforce.ws.util.FileUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import java.util.zip.GZIPInputStream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLContext; +import javax.xml.namespace.QName; + +/** + * BulkConnection + * + * @author mcheenath + * @since 160 + */ +public class BulkConnection { + public static final String NAMESPACE = "http://www.force.com/2009/06/asyncapi/dataload"; + public static final String SESSION_ID = "X-SFDC-Session"; + public static final String XML_CONTENT_TYPE = "application/xml"; + public static final String CSV_CONTENT_TYPE = "text/csv"; + public static final String JSON_CONTENT_TYPE = "application/json"; + public static final String ZIP_XML_CONTENT_TYPE = "zip/xml"; + public static final String ZIP_CSV_CONTENT_TYPE = "zip/csv"; + public static final String ZIP_JSON_CONTENT_TYPE = "zip/json"; + public static final QName JOB_QNAME = new QName(NAMESPACE, "jobInfo"); + public static final QName BATCH_QNAME = new QName(NAMESPACE, "batchInfo"); + public static final QName BATCH_LIST_QNAME = new QName(NAMESPACE, "batchInfoList"); + public static final QName ERROR_QNAME = new QName(NAMESPACE, "error"); + // CHECKSTYLE:OFF: ConstantName + public static final TypeMapper typeMapper = new TypeMapper(null, null, false); + // CHECKSTYLE:ON: ConstantName + private static final JsonFactory factory = new JsonFactory(new ObjectMapper()); + private static final Logger logger = LoggerFactory.getLogger(BulkConnection.class); // %EDIT% + private final ConnectorConfig config; + private final HashMap headers = new HashMap(); + + public BulkConnection(ConnectorConfig config) throws AsyncApiException { + if (config == null) { + throw new AsyncApiException("config can not be null", AsyncExceptionCode.ClientInputError); + } + if (config.getRestEndpoint() == null) { + throw new AsyncApiException("rest endpoint cannot be null", + AsyncExceptionCode.ClientInputError); + } + this.config = config; + if (config.getSessionId() == null) { + throw new AsyncApiException("session ID not found", + AsyncExceptionCode.ClientInputError); + } + } + + static void parseAndThrowException(InputStream in, ContentType type) throws AsyncApiException { + try { + AsyncApiException exception; + if (type == ContentType.XML || type == ContentType.ZIP_XML || type == ContentType.CSV || + type == ContentType.ZIP_CSV) { + exception = new AsyncApiException(); + XmlInputStream xin = new XmlInputStream(); + xin.setInput(in, "UTF-8"); + exception.load(xin, typeMapper); + } else if (type == ContentType.JSON || type == ContentType.ZIP_JSON) { + JsonParser parser = factory.createJsonParser(in); + exception = parser.readValueAs(AsyncApiException.class); + } else { + throw new AsyncApiException("Server error returned in unknown format", AsyncExceptionCode.ClientInputError); + } + throw exception; + } catch (PullParserException e) { + throw new AsyncApiException("Failed to parse exception ", AsyncExceptionCode.ClientInputError, e); + } catch (IOException e) { + throw new AsyncApiException("Failed to parse exception", AsyncExceptionCode.ClientInputError, e); + } catch (ConnectionException e) { + throw new AsyncApiException("Failed to parse exception ", AsyncExceptionCode.ClientInputError, e); + } + } + + /** + * Serialize to json + * + * @param out + * @param value + * @throws IOException + */ + static void serializeToJson(OutputStream out, Object value) throws IOException { + JsonGenerator generator = factory.createJsonGenerator(out); + ObjectMapper mapper = new ObjectMapper(); + mapper.setDateFormat(CalendarCodec.getDateFormat()); + mapper.writeValue(generator, value); + } + + /** + * Deserialize JSON input + */ + static T deserializeJsonToObject(InputStream in, Class tmpClass) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true); + // By default, ObjectMapper generates Calendar instances with UTC TimeZone. + // Here, override that to "GMT" to better match the behavior of the WSC XML parser. + mapper.setTimeZone(TimeZone.getTimeZone("GMT")); + return mapper.readValue(in, tmpClass); + } + + public JobInfo createJob(String object, String operation) throws AsyncApiException { + JobInfo job = new JobInfo(); + job.setObject(object); + job.setOperation(OperationEnum.valueOf(operation)); + return createJob(job); + } + + public JobInfo createJob(JobInfo job) throws AsyncApiException { + String endpoint = getRestEndpoint(); + endpoint = endpoint + "job/"; + return createOrUpdateJob(job, endpoint); + } + + public JobInfo createJob(JobInfo job, ContentType contentType) throws AsyncApiException { + String endpoint = getRestEndpoint(); + endpoint = endpoint + "job/"; + return createOrUpdateJob(job, endpoint, contentType); + } + + private JobInfo createOrUpdateJob(JobInfo job, String endpoint) throws AsyncApiException { + return createOrUpdateJob(job, endpoint, job.getContentType() == null ? ContentType.XML + : job.getContentType()); + } + + private JobInfo createOrUpdateJob(JobInfo job, String endpoint, ContentType contentType) throws AsyncApiException { + InputStream in = null; // %EDIT% + OutputStream out = null; // %EDIT + try { + Transport transport = config.createTransport(); + // OutputStream out; // %EDIT + if (contentType == ContentType.ZIP_JSON || contentType == ContentType.JSON) { + out = transport.connect(endpoint, getHeaders(JSON_CONTENT_TYPE)); + serializeToJson(out, job); + // out.close(); // %EDIT + } else { + out = transport.connect(endpoint, getHeaders(XML_CONTENT_TYPE)); + XmlOutputStream xout = new AsyncXmlOutputStream(out, true); + job.write(JOB_QNAME, xout, typeMapper); + xout.close(); + } + in = transport.getContent(); // %EDIT% + if (transport.isSuccessful()) { + if (contentType == ContentType.ZIP_JSON || contentType == ContentType.JSON) { + return deserializeJsonToObject(in, JobInfo.class); + } else { + XmlInputStream xin = new XmlInputStream(); + xin.setInput(in, "UTF-8"); + JobInfo result = new JobInfo(); + result.load(xin, typeMapper); + return result; + } + } else { + parseAndThrowException(in, contentType); + } + } catch (PullParserException e) { + throw new AsyncApiException("Failed to create job ", AsyncExceptionCode.ClientInputError, e); + } catch (IOException e) { + throw new AsyncApiException("Failed to create job ", AsyncExceptionCode.ClientInputError, e); + } catch (ConnectionException e) { + throw new AsyncApiException("Failed to create job ", AsyncExceptionCode.ClientInputError, e); + } finally { + if (in != null) { // %EDIT% + try { // %EDIT% + in.close(); // %EDIT% + } catch (IOException e) { // %EDIT% + logger.warn("Failed to close output stream with error: {}, cause: {}", e.getMessage(), + e.getCause()); // %EDIT% + } // %EDIT% + } // %EDIT% + if (out != null) { // %EDIT% + try { // %EDIT% + out.close(); // %EDIT% + } catch (IOException e) { // %EDIT% + logger.warn("Failed to close output stream with error: {}, cause: {}", e.getMessage(), + e.getCause()); // %EDIT% + } // %EDIT% + } + } + return null; + } + + public void addHeader(String headerName, String headerValue) { + headers.put(headerName, headerValue); + } + + private String getRestEndpoint() { + String endpoint = config.getRestEndpoint(); + endpoint = endpoint.endsWith("/") ? endpoint : endpoint + "/"; + return endpoint; + } + + public BatchInfo createBatchFromStream(JobInfo jobInfo, InputStream input) throws AsyncApiException { + return createBatchFromStreamImpl(jobInfo, input, false); + } + + public BatchInfo createBatchFromZipStream(JobInfo jobInfo, InputStream zipInput) throws AsyncApiException { + return createBatchFromStreamImpl(jobInfo, zipInput, true); + } + + private BatchInfo createBatchFromStreamImpl(JobInfo jobInfo, InputStream input, boolean isZip) + throws AsyncApiException { + try { + String endpoint = getRestEndpoint(); + Transport transport = config.createTransport(); + endpoint = endpoint + "job/" + jobInfo.getId() + "/batch"; + String contentType = getContentTypeString(jobInfo.getContentType(), isZip); + HashMap httpHeaders = getHeaders(contentType); + // TODO do we want to allow the zip content to be gzipped + boolean allowZipToBeGzipped = false; + OutputStream out = transport.connect(endpoint, httpHeaders, allowZipToBeGzipped || !isZip); + FileUtil.copy(input, out); + InputStream result = transport.getContent(); + if (!transport.isSuccessful()) { + parseAndThrowException(result, jobInfo.getContentType()); + } + // xml/json content type + if (jobInfo.getContentType() == ContentType.JSON || jobInfo.getContentType() == ContentType.ZIP_JSON) { + return deserializeJsonToObject(result, BatchInfo.class); + } + return BatchRequest.loadBatchInfo(result); + } catch (IOException e) { + throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, e); + } catch (PullParserException e) { + throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, e); + } catch (ConnectionException e) { + throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, e); + } + } + + public BatchInfo createBatchFromDir(JobInfo job, InputStream batchContent, File attachmentDir) + throws AsyncApiException { + final List files = FileUtil.listFilesRecursive(attachmentDir, false); + final Map fileMap = new HashMap(files.size()); + final String rootPath = attachmentDir.getAbsolutePath() + File.separator; + for (File f : files) { + String name = f.getAbsolutePath().replace(rootPath, ""); + fileMap.put(name, f); + } + return createBatchWithFileAttachments(job, batchContent, fileMap); + } + + public BatchInfo createBatchWithFileAttachments(JobInfo jobInfo, InputStream batchContent, File rootDirectory, + String... files) throws AsyncApiException { + Map fileMap = new HashMap(files.length); + for (String fileName : files) { + File f = new File(rootDirectory, fileName); + fileMap.put(fileName, f); + } + return createBatchWithFileAttachments(jobInfo, batchContent, fileMap); + } + + public BatchInfo createBatchWithFileAttachments(JobInfo jobInfo, InputStream batchContent, + Map attachedFiles) throws AsyncApiException { + Map inputStreamMap = new HashMap(attachedFiles.size()); + for (Map.Entry entry : attachedFiles.entrySet()) { + final File file = entry.getValue(); + try { + inputStreamMap.put(entry.getKey(), new FileInputStream(file)); + } catch (IOException e) { + throw new AsyncApiException("Failed to create batch. Could not read file : " + file, + AsyncExceptionCode.ClientInputError, e); + } + } + return createBatchWithInputStreamAttachments(jobInfo, batchContent, inputStreamMap); + } + + /** + * @param jobInfo Parent job for new batch. + * @param batchContent InputStream containing the xml or csv content of the batch, or null only if request.txt + * is contained + * in attachments map + * @param attachments Map of attachments where the key is the filename to be used in the zip file and the value is + * the + * InputStream representing that file. + * @return BatchInfo of uploaded batch. + */ + public BatchInfo createBatchWithInputStreamAttachments( + JobInfo jobInfo, + InputStream batchContent, + Map attachments + ) throws AsyncApiException { + if (batchContent != null && attachments.get("request.txt") != null) { + throw new AsyncApiException("Request content cannot be included as both input stream and attachment", + AsyncExceptionCode.ClientInputError); + } + try { + String endpoint = getRestEndpoint(); + endpoint = endpoint + "job/" + jobInfo.getId() + "/batch"; + Transport transport = config.createTransport(); + ZipOutputStream zipOut = new ZipOutputStream(transport.connect(endpoint, getHeaders(getContentTypeString( + jobInfo.getContentType(), true)), false)); + try { + if (batchContent != null) { + zipOut.putNextEntry(new ZipEntry("request.txt")); + FileUtil.copy(batchContent, zipOut, false); + } + for (Map.Entry entry : attachments.entrySet()) { + zipOut.putNextEntry(new ZipEntry(entry.getKey())); + FileUtil.copy(entry.getValue(), zipOut, false); + } + } finally { + zipOut.close(); + } + InputStream result = transport.getContent(); + if (jobInfo.getContentType() == ContentType.JSON || jobInfo.getContentType() == ContentType.ZIP_JSON) { + return deserializeJsonToObject(result, BatchInfo.class); + } + return BatchRequest.loadBatchInfo(result); + } catch (IOException e) { + throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, e); + } catch (PullParserException e) { + throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, e); + } catch (ConnectionException e) { + throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, e); + } + } + + /* + * Creates a compliant Async Api batch from a stream containing an arbitrary CSV source (eg. Outlook contacts). + * The stream does not have to be UTF-8, and it's contents are transformed into a compliant + * batch using the previously created transformation specification (a mapping of columns to fields). + * The stream is still limited according to the same limit rules as apply to normal batches. + */ + public BatchInfo createBatchFromForeignCsvStream(JobInfo jobInfo, InputStream input, String charSet) + throws AsyncApiException { + try { + String endpoint = getRestEndpoint(); + Transport transport = config.createTransport(); + endpoint = endpoint + "job/" + jobInfo.getId() + "/batch"; + String contentType = getContentTypeString(ContentType.CSV, false); + if (charSet != null) { + contentType = contentType + ";charset=" + charSet; + } + HashMap httpHeaders = getHeaders(contentType); + final boolean allowZipToBeGzipped = false; + OutputStream out = transport.connect(endpoint, httpHeaders, allowZipToBeGzipped); + FileUtil.copy(input, out); + InputStream result = transport.getContent(); + if (!transport.isSuccessful()) { + parseAndThrowException(result, jobInfo.getContentType()); + } + return BatchRequest.loadBatchInfo(result); + } catch (IOException e) { + throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, e); + } catch (PullParserException e) { + throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, e); + } catch (ConnectionException e) { + throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, e); + } + } + + /* + * Creates a transformation specification for this job. Any subsequent batches that are submitted will + * be assumed to be non-compliant batches and will be transformed into compliant batches using this specification. + * An example spec for a Contact job might look like ... + * + * Salesforce Field,Csv Header,Value,Hint + * LastName,Surname,#N/A, + * Birthdate,Birthday,,MM-dd-YYYY + * + */ + public void createTransformationSpecFromStream(JobInfo jobInfo, InputStream input) throws AsyncApiException { + try { + String endpoint = getRestEndpoint(); + Transport transport = config.createTransport(); + endpoint = endpoint + "job/" + jobInfo.getId() + "/spec"; + String contentType = getContentTypeString(ContentType.CSV, false); + HashMap httpHeaders = getHeaders(contentType); + final boolean allowZipToBeGzipped = false; + OutputStream out = transport.connect(endpoint, httpHeaders, allowZipToBeGzipped); + FileUtil.copy(input, out); + InputStream result = transport.getContent(); + if (!transport.isSuccessful()) { + parseAndThrowException(result, jobInfo.getContentType()); + } + } catch (IOException e) { + throw new AsyncApiException("Failed to create transformation specification", AsyncExceptionCode.ClientInputError, + e); + } catch (ConnectionException e) { + throw new AsyncApiException("Failed to create transformation specification", AsyncExceptionCode.ClientInputError, + e); + } + } + + private String getContentTypeString(ContentType contentType, boolean isZip) throws AsyncApiException { + ContentType ct = contentType == null ? ContentType.XML : contentType; + if (isZip) { + switch (ct) { + case ZIP_CSV: + return ZIP_CSV_CONTENT_TYPE; + case ZIP_XML: + return ZIP_XML_CONTENT_TYPE; + case ZIP_JSON: + return ZIP_JSON_CONTENT_TYPE; + default: + // non zip content type + throw new AsyncApiException("Invalid zip content type: " + contentType, + AsyncExceptionCode.ClientInputError); + } + } else { + switch (ct) { + case XML: + return XML_CONTENT_TYPE; + case CSV: + return CSV_CONTENT_TYPE; + case JSON: + return JSON_CONTENT_TYPE; + default: + // zip content type + throw new AsyncApiException("Not expecting zip content type: " + contentType, + AsyncExceptionCode.ClientInputError); + } + } + } + + private HashMap getHeaders(String contentType) { + HashMap newMap = new HashMap(); + for (Map.Entry entry : headers.entrySet()) { + newMap.put(entry.getKey(), entry.getValue()); + } + /* 05-Apr-2012, tsaloranta: This is incorrect, as the content type for request + * should indicate type of request payload and NOT the expected response + * payload type (which would be indicated with "Accept" header). + * See [https://gus.salesforce.com/a07B0000000Lp8A] for more details. + * So ideally we would change header to use here; alas, it is bit of risky + * change right, so leaving code as is + * with just a note saying "that ain't working -- that's the way to do it!" + */ + newMap.put("Content-Type", contentType); + newMap.put(SESSION_ID, config.getSessionId()); + return newMap; + } + + public BatchRequest createBatch(JobInfo job) throws AsyncApiException { + try { + String endpoint = getRestEndpoint(); + Transport transport = config.createTransport(); + endpoint = endpoint + "job/" + job.getId() + "/batch"; + ContentType ct = job.getContentType(); + if (ct != null && ct != ContentType.XML && ct != ContentType.JSON) { + throw new AsyncApiException( + "This method can only be used with xml or JSON content type", AsyncExceptionCode.ClientInputError); + } + String jobContentType = ""; + if (ct == null) { + jobContentType = XML_CONTENT_TYPE; + } else { + switch (ct) { + case JSON: + jobContentType = JSON_CONTENT_TYPE; + break; + case XML: + default: + jobContentType = XML_CONTENT_TYPE; + break; + } + } + OutputStream out = transport.connect(endpoint, getHeaders(jobContentType)); + return new BatchRequest(transport, out); + } catch (IOException e) { + throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, e); + } catch (ConnectionException x) { + throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, x); + } + } + + public CsvBatchRequest createCsvBatch(JobInfo job) throws AsyncApiException { + try { + String endpoint = getRestEndpoint(); + Transport transport = config.createTransport(); + endpoint = endpoint + "job/" + job.getId() + "/batch"; + ContentType ct = job.getContentType(); + if (ct != null && ct != ContentType.CSV) { + throw new AsyncApiException( + "This method can only be used with csv content type", AsyncExceptionCode.ClientInputError); + } + OutputStream out = transport.connect(endpoint, getHeaders(CSV_CONTENT_TYPE)); + return new CsvBatchRequest(transport, out); + } catch (IOException e) { + throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, e); + } catch (ConnectionException e) { + throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, e); + } + } + + public TransformationSpecRequest createTransformationSpec(JobInfo job) throws AsyncApiException { + try { + String endpoint = getRestEndpoint(); + Transport transport = config.createTransport(); + endpoint = endpoint + "job/" + job.getId() + "/spec"; + ContentType ct = job.getContentType(); + if (ct != null && ct != ContentType.CSV) { + throw new AsyncApiException( + "This method can only be used with csv content type", AsyncExceptionCode.ClientInputError); + } + final boolean allowZipToBeGzipped = false; + OutputStream out = transport.connect(endpoint, getHeaders(CSV_CONTENT_TYPE), allowZipToBeGzipped); + return new TransformationSpecRequest(transport, out); + } catch (IOException e) { + throw new AsyncApiException("Failed to create transformation spec", AsyncExceptionCode.ClientInputError, e); + } catch (ConnectionException e) { + throw new AsyncApiException("Failed to create transformation spec", AsyncExceptionCode.ClientInputError, e); + } + } + + public BatchInfoList getBatchInfoList(String jobId) throws AsyncApiException { + return getBatchInfoList(jobId, ContentType.XML); + } + + public BatchInfoList getBatchInfoList(String jobId, ContentType contentType) throws AsyncApiException { + InputStream stream = null; // %EDIT% + try { + String endpoint = getRestEndpoint() + "job/" + jobId + "/batch/"; + URL url = new URL(endpoint); + stream = doHttpGet(url); // %EDIT% + if (contentType == ContentType.JSON || contentType == ContentType.ZIP_JSON) { + return deserializeJsonToObject(stream, BatchInfoList.class); + } else { + XmlInputStream xin = new XmlInputStream(); + xin.setInput(stream, "UTF-8"); + BatchInfoList result = new BatchInfoList(); + result.load(xin, typeMapper); + return result; + } + } catch (IOException e) { + throw new AsyncApiException("Failed to get batch info list ", AsyncExceptionCode.ClientInputError, e); + } catch (PullParserException e) { + throw new AsyncApiException("Failed to get batch info list ", AsyncExceptionCode.ClientInputError, e); + } catch (ConnectionException e) { + throw new AsyncApiException("Failed to get batch info list ", AsyncExceptionCode.ClientInputError, e); + } finally { // %EDIT% + if (stream != null) { // %EDIT% + try { // %EDIT% + stream.close(); // %EDIT% + } catch (IOException e) { // %EDIT% + logger.warn("Failed to close output stream with error: {}, cause: {}", e.getMessage(), + e.getCause()); // %EDIT% + } // %EDIT% + } // %EDIT% + } + } + + public BatchInfo getBatchInfo(String jobId, String batchId) throws AsyncApiException { + return getBatchInfo(jobId, batchId, ContentType.XML); + } + + public BatchInfo getBatchInfo(String jobId, String batchId, ContentType contentType) throws AsyncApiException { + InputStream stream = null; // %EDIT% + try { + String endpoint = getRestEndpoint() + "job/" + jobId + "/batch/" + batchId; + URL url = new URL(endpoint); + stream = doHttpGet(url); // %EDIT% + if (contentType == ContentType.JSON || contentType == ContentType.ZIP_JSON) { + return deserializeJsonToObject(stream, BatchInfo.class); + } else { + XmlInputStream xin = new XmlInputStream(); + xin.setInput(stream, "UTF-8"); + BatchInfo result = new BatchInfo(); + result.load(xin, typeMapper); + return result; + } + } catch (IOException e) { + throw new AsyncApiException("Failed to parse batch info ", AsyncExceptionCode.ClientInputError, e); + } catch (PullParserException e) { + throw new AsyncApiException("Failed to parse batch info ", AsyncExceptionCode.ClientInputError, e); + } catch (ConnectionException e) { + throw new AsyncApiException("Failed to parse batch info ", AsyncExceptionCode.ClientInputError, e); + } finally { // %EDIT% + if (stream != null) { // %EDIT% + try { // %EDIT% + stream.close(); // %EDIT% + } catch (IOException e) { // %EDIT% + logger.warn("Failed to close output stream with error: {}, cause: {}", e.getMessage(), + e.getCause()); // %EDIT% + } // %EDIT% + } + } + } + + public BatchResult getBatchResult(String jobId, String batchId) throws AsyncApiException { + return getBatchResult(jobId, batchId, ContentType.XML); + } + + public BatchResult getBatchResult(String jobId, String batchId, ContentType contentType) throws AsyncApiException { + InputStream stream = null; //%EDIT% + try { + stream = doHttpGet(buildBatchResultURL(jobId, batchId)); //%EDIT% + if (contentType == ContentType.JSON || contentType == ContentType.ZIP_JSON) { + BatchResult batchResult = new BatchResult(); + Result[] results = deserializeJsonToObject(stream, Result[].class); + batchResult.setResult(results); + return batchResult; + } else { + XmlInputStream xin = new XmlInputStream(); + xin.setInput(stream, "UTF-8"); + BatchResult result = new BatchResult(); + result.load(xin, typeMapper); + return result; + } + } catch (PullParserException e) { + throw new AsyncApiException("Failed to parse result ", AsyncExceptionCode.ClientInputError, e); + } catch (IOException e) { + throw new AsyncApiException("Failed to get result ", AsyncExceptionCode.ClientInputError, e); + } catch (ConnectionException e) { + throw new AsyncApiException("Failed to get result ", AsyncExceptionCode.ClientInputError, e); + } finally { // %EDIT% + if (stream != null) { // %EDIT% + try { // %EDIT% + stream.close(); // %EDIT% + } catch (IOException e) { // %EDIT% + logger.warn("Failed to close output stream with error: {}, cause: {}", e.getMessage(), + e.getCause()); // %EDIT% + } // %EDIT% + } + } + } + + public InputStream getBatchResultStream(String jobId, String batchId) throws AsyncApiException { + try { + String endpoint = getRestEndpoint() + "job/" + jobId + "/batch/" + batchId + "/result"; + URL url = new URL(endpoint); + return doHttpGet(url); + } catch (IOException e) { + throw new AsyncApiException("Failed to get result ", AsyncExceptionCode.ClientInputError, e); + } + } + + public URL buildBatchResultURL(String jobId, String batchId) throws AsyncApiException { + try { + return new URL(getRestEndpoint() + "job/" + jobId + "/batch/" + batchId + "/result"); + } catch (MalformedURLException e) { + throw new AsyncApiException("Failed to construct URL for getting batch results: " + e.getMessage(), + AsyncExceptionCode.ClientInputError, e); + } + } + + public InputStream getBatchRequestInputStream(String jobId, String batchId) throws AsyncApiException { + try { + String endpoint = getRestEndpoint() + "job/" + jobId + "/batch/" + batchId + "/request"; + URL url = new URL(endpoint); + return doHttpGet(url); + } catch (IOException e) { + throw new AsyncApiException("Failed to get request ", AsyncExceptionCode.ClientInputError, e); + } + } + + public QueryResultList getQueryResultList(String jobId, String batchId) throws AsyncApiException { + return getQueryResultList(jobId, batchId, ContentType.XML); + } + + public QueryResultList getQueryResultList(String jobId, String batchId, ContentType contentType) throws + AsyncApiException { + InputStream stream = getBatchResultStream(jobId, batchId); + try { + if (contentType == ContentType.JSON || contentType == ContentType.ZIP_JSON) { + String[] results = deserializeJsonToObject(stream, String[].class); + QueryResultList list = new QueryResultList(); + list.setResult(results); + return list; + } else { + XmlInputStream xin = new XmlInputStream(); + xin.setInput(stream, "UTF-8"); + QueryResultList result = new QueryResultList(); + result.load(xin, typeMapper); + return result; + } + } catch (ConnectionException e) { + throw new AsyncApiException("Failed to parse query result list ", AsyncExceptionCode.ClientInputError, e); + } catch (PullParserException e) { + throw new AsyncApiException("Failed to parse query result list ", AsyncExceptionCode.ClientInputError, e); + } catch (IOException e) { + throw new AsyncApiException("Failed to parse query result list ", AsyncExceptionCode.ClientInputError, e); + } finally { // %EDIT% + if (stream != null) { // %EDIT% + try { // %EDIT% + stream.close(); // %EDIT% + } catch (IOException e) { // %EDIT% + logger.warn("Failed to close output stream with error: {}, cause: {}", e.getMessage(), + e.getCause()); // %EDIT% + } // %EDIT% + } + } + } + + public InputStream getQueryResultStream(String jobId, String batchId, String resultId) throws AsyncApiException { + try { + return doHttpGet(buildQueryResultURL(jobId, batchId, resultId)); + } catch (IOException e) { + throw new AsyncApiException("Failed to get result ", AsyncExceptionCode.ClientInputError, e); + } + } + + public URL buildQueryResultURL(String jobId, String batchId, String resultId) throws AsyncApiException { + try { + return new URL(getRestEndpoint() + "job/" + jobId + "/batch/" + batchId + "/result" + "/" + resultId); + } catch (MalformedURLException e) { + throw new AsyncApiException("Failed to construct URL for getting query result: " + e.getMessage(), + AsyncExceptionCode.ClientInputError, e); + } + } + + private InputStream doHttpGet(URL url) throws IOException, AsyncApiException { + HttpURLConnection connection = config.createConnection(url, null); + SSLContext sslContext = config.getSslContext(); + if (sslContext != null && connection instanceof HttpsURLConnection) { + ((HttpsURLConnection) connection).setSSLSocketFactory(sslContext.getSocketFactory()); + } + connection.setRequestProperty(SESSION_ID, config.getSessionId()); + boolean success = true; + InputStream in = null; // %EDIT% + byte[] responseBody = null; // %EDIT% + try { + in = connection.getInputStream(); + } catch (IOException e) { + logger.warn("IOException occurred while doHttpGet", e); // %EDIT% + success = false; + InputStream errorStream = connection.getErrorStream(); // Get error stream separately // %EDIT% + if (errorStream != null) { // %EDIT% + try { // %EDIT% + responseBody = ByteStreams.toByteArray(errorStream); // %EDIT% + logger.warn("Received error response from {}. Status: {}. Response body: {}", // %EDIT% + url.getPath(), connection.getResponseCode(), new String(responseBody)); // %EDIT% + in = new ByteArrayInputStream(responseBody); // %EDIT% + } catch (IOException logEx) { // %EDIT% + logger.warn("Failed to read error stream for logging.", logEx); // %EDIT% + // If reading for logging fails, and we couldn't get the response body, re-throw the original exception + // %EDIT% + if (responseBody == null) { // %EDIT% + throw e; // %EDIT% + } // %EDIT% + } finally { // %EDIT% + try { // %EDIT% + errorStream.close(); // %EDIT% + } catch (IOException closeEx) { // %EDIT% + logger.warn("Failed to close error stream after reading.", closeEx); // %EDIT% + } // %EDIT% + } // %EDIT% + } else { // %EDIT% + logger.warn("Received error response from {}. Status: {}. Error stream is null.", + url.getPath(), connection.getResponseCode()); // %EDIT% + // If error stream is null, re-throw the original exception as we have no response to process // %EDIT% + throw e; // %EDIT% + } + } + + // Check if 'in' is still null after the try-catch block // %EDIT% + if (in == null) { // %EDIT% + // This should ideally not happen if the above logic is correct, but as a safeguard // %EDIT% + throw new IOException("Failed to obtain input or error stream from HTTP connection."); // %EDIT% + } // %EDIT% + + String encoding = connection.getHeaderField("Content-Encoding"); + if ("gzip".equals(encoding)) { + in = new GZIPInputStream(in); + } + if (config.isTraceMessage() || config.hasMessageHandlers()) { + byte[] bytes = FileUtil.toBytes(in); + in = new ByteArrayInputStream(bytes); + if (config.hasMessageHandlers()) { + Iterator it = config.getMessagerHandlers(); + while (it.hasNext()) { + MessageHandler handler = it.next(); + if (handler instanceof MessageHandlerWithHeaders) { + ((MessageHandlerWithHeaders) handler).handleRequest(url, new byte[0], null); + ((MessageHandlerWithHeaders) handler).handleResponse(url, bytes, connection.getHeaderFields()); + } else { + handler.handleRequest(url, new byte[0]); + handler.handleResponse(url, bytes); + } + } + } + if (config.isTraceMessage()) { + config.getTraceStream().println(url.toExternalForm()); + Map> headers = connection.getHeaderFields(); + for (Map.Entry> entry : headers.entrySet()) { + StringBuffer sb = new StringBuffer(); + List values = entry.getValue(); + if (values != null) { + for (String v : values) { + sb.append(v); + } + } + config.getTraceStream().println(entry.getKey() + ": " + sb); + } + config.teeInputStream(bytes); + } + } + if (!success) { + ContentType type = null; + String contentTypeHeader = connection.getContentType(); + if (contentTypeHeader != null) { + if (contentTypeHeader.contains(XML_CONTENT_TYPE)) { + type = ContentType.XML; + } else if (contentTypeHeader.contains(JSON_CONTENT_TYPE)) { + type = ContentType.JSON; + } + } + parseAndThrowException(in, type); + } + return in; + } + + public JobInfo getJobStatus(String jobId) throws AsyncApiException { + return getJobStatus(jobId, ContentType.XML); + } + + public JobInfo getJobStatus(String jobId, ContentType contentType) throws AsyncApiException { + InputStream in = null; // %EDIT% + try { + String endpoint = getRestEndpoint(); + endpoint += "job/" + jobId; + URL url = new URL(endpoint); + in = doHttpGet(url); //%EDIT% + if (contentType == ContentType.JSON || contentType == ContentType.ZIP_JSON) { + return deserializeJsonToObject(in, JobInfo.class); + } else { + JobInfo result = new JobInfo(); + XmlInputStream xin = new XmlInputStream(); + xin.setInput(in, "UTF-8"); + result.load(xin, typeMapper); + return result; + } + } catch (PullParserException e) { + throw new AsyncApiException("Failed to get job status ", AsyncExceptionCode.ClientInputError, e); + } catch (IOException e) { + throw new AsyncApiException("Failed to get job status ", AsyncExceptionCode.ClientInputError, e); + } catch (ConnectionException e) { + throw new AsyncApiException("Failed to get job status ", AsyncExceptionCode.ClientInputError, e); + } finally { // %EDIT% + if (in != null) { // %EDIT% + try { // %EDIT% + in.close(); // %EDIT% + } catch (IOException e) { // %EDIT% + logger.warn("Failed to close output stream with error: {}, cause: {}", e.getMessage(), e.getCause()); + // %EDIT% + } // %EDIT% + } + } + } + + public JobInfo abortJob(String jobId) throws AsyncApiException { + JobInfo job = new JobInfo(); + job.setId(jobId); + job.setState(JobStateEnum.Aborted); + return updateJob(job); + } + + public JobInfo closeJob(String jobId) throws AsyncApiException { + JobInfo job = new JobInfo(); + job.setId(jobId); + job.setState(JobStateEnum.Closed); + return updateJob(job); + } + + public JobInfo updateJob(JobInfo job) throws AsyncApiException { + return updateJob(job, ContentType.XML); + } + + public JobInfo updateJob(JobInfo job, ContentType contentType) throws AsyncApiException { + String endpoint = getRestEndpoint(); + endpoint += "job/" + job.getId(); + return createOrUpdateJob(job, endpoint, contentType); + } + + public ConnectorConfig getConfig() { + return config; + } +} diff --git a/suppressions.xml b/suppressions.xml index fd7a058f..7766b95b 100644 --- a/suppressions.xml +++ b/suppressions.xml @@ -30,8 +30,16 @@ + + + +