From 34ffa81f45c25a87cdec3d0750543e7165d6a857 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Sun, 1 Dec 2024 08:27:53 -0500 Subject: [PATCH] CXF-9057: Chunked Stream is closed regularly when Exception is thrown (#2107) (cherry picked from commit ff6cf7b74bca04a1dfa98082d7d0fb115b64c123) (cherry picked from commit 20e65dcc26bbd20c6f61e9c9f9f207b957dc74c6) --- .../interceptor/AttachmentOutInterceptor.java | 2 + .../java/org/apache/cxf/message/Message.java | 7 + .../Soap11FaultOutInterceptor.java | 11 +- .../Soap12FaultOutInterceptor.java | 10 +- systests/jaxws/pom.xml | 12 + .../systest/jaxws/AttachmentChunkingTest.java | 180 +++++++++++ .../test/resources/attachments/cxf9057.wsdl | 84 ++++++ .../http/jaxws/JAXWSAsyncClientTest.java | 282 ++++++++++++++++++ 8 files changed, 586 insertions(+), 2 deletions(-) create mode 100644 systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java create mode 100644 systests/jaxws/src/test/resources/attachments/cxf9057.wsdl create mode 100644 systests/transports/src/test/java/org/apache/cxf/systest/http/jaxws/JAXWSAsyncClientTest.java diff --git a/core/src/main/java/org/apache/cxf/interceptor/AttachmentOutInterceptor.java b/core/src/main/java/org/apache/cxf/interceptor/AttachmentOutInterceptor.java index 0e0b4dc5382..5fdc049e674 100644 --- a/core/src/main/java/org/apache/cxf/interceptor/AttachmentOutInterceptor.java +++ b/core/src/main/java/org/apache/cxf/interceptor/AttachmentOutInterceptor.java @@ -119,11 +119,13 @@ public void handleMessage(Message message) { AttachmentSerializer ser = message.getContent(AttachmentSerializer.class); if (ser != null) { try { + message.put(Message.PARTIAL_ATTACHMENTS_MESSAGE, true); String cte = (String)message.getContextualProperty(Message.CONTENT_TRANSFER_ENCODING); if (cte != null) { ser.setContentTransferEncoding(cte); } ser.writeAttachments(); + message.put(Message.PARTIAL_ATTACHMENTS_MESSAGE, false); } catch (IOException e) { throw new Fault(new org.apache.cxf.common.i18n.Message("WRITE_ATTACHMENTS", BUNDLE), e); } diff --git a/core/src/main/java/org/apache/cxf/message/Message.java b/core/src/main/java/org/apache/cxf/message/Message.java index 5a74b619f16..16bd59eaafa 100644 --- a/core/src/main/java/org/apache/cxf/message/Message.java +++ b/core/src/main/java/org/apache/cxf/message/Message.java @@ -81,6 +81,13 @@ public interface Message extends StringMap { String EMPTY_PARTIAL_RESPONSE_MESSAGE = "org.apache.cxf.partial.response.empty"; String ONE_WAY_REQUEST = "OnewayRequest"; + /** + * Boolean property specifying if the attachments have been partially written + * (due to I/O error, fe). + */ + String PARTIAL_ATTACHMENTS_MESSAGE = "org.apache.cxf.partial.attachments"; + + /** * Boolean property specifying if oneWay response must be processed. */ diff --git a/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap11FaultOutInterceptor.java b/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap11FaultOutInterceptor.java index 690e1a142e3..5e612a59326 100644 --- a/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap11FaultOutInterceptor.java +++ b/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap11FaultOutInterceptor.java @@ -35,6 +35,8 @@ import org.apache.cxf.common.logging.LogUtils; import org.apache.cxf.common.util.StringUtils; import org.apache.cxf.interceptor.Fault; +import org.apache.cxf.message.Message; +import org.apache.cxf.message.MessageUtils; import org.apache.cxf.phase.Phase; import org.apache.cxf.staxutils.StaxUtils; @@ -61,9 +63,16 @@ static class Soap11FaultOutInterceptorInternal extends AbstractSoapInterceptor { super(Phase.MARSHAL); } public void handleMessage(SoapMessage message) throws Fault { - XMLStreamWriter writer = message.getContent(XMLStreamWriter.class); Fault f = (Fault) message.getContent(Exception.class); + + // If only some attachments have been written (usually, using chunked transfer), we could + // have been streaming some data already and may not be able to inject a fault in the middle + // of the data transfer. + if (MessageUtils.getContextualBoolean(message, Message.PARTIAL_ATTACHMENTS_MESSAGE, false)) { + throw f; + } + XMLStreamWriter writer = message.getContent(XMLStreamWriter.class); SoapFault fault = SoapFault.createFault(f, message.getVersion()); try { diff --git a/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap12FaultOutInterceptor.java b/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap12FaultOutInterceptor.java index 35a6a5bae83..35cd7498a63 100644 --- a/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap12FaultOutInterceptor.java +++ b/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap12FaultOutInterceptor.java @@ -37,6 +37,7 @@ import org.apache.cxf.common.util.StringUtils; import org.apache.cxf.interceptor.Fault; import org.apache.cxf.message.Message; +import org.apache.cxf.message.MessageUtils; import org.apache.cxf.phase.Phase; import org.apache.cxf.staxutils.StaxUtils; @@ -64,9 +65,16 @@ static class Soap12FaultOutInterceptorInternal extends AbstractSoapInterceptor { } public void handleMessage(SoapMessage message) throws Fault { LOG.info(getClass() + (String) message.get(Message.CONTENT_TYPE)); + Fault f = (Fault)message.getContent(Exception.class); + + // If only some attachments have been written (usually, using chunked transfer), we could + // have been streaming some data already and may not be able to inject a fault in the middle + // of the data transfer. + if (MessageUtils.getContextualBoolean(message, Message.PARTIAL_ATTACHMENTS_MESSAGE, false)) { + throw f; + } XMLStreamWriter writer = message.getContent(XMLStreamWriter.class); - Fault f = (Fault)message.getContent(Exception.class); message.put(org.apache.cxf.message.Message.RESPONSE_CODE, f.getStatusCode()); SoapFault fault = SoapFault.createFault(f, message.getVersion()); diff --git a/systests/jaxws/pom.xml b/systests/jaxws/pom.xml index 2200a3b1075..9fc43cc679d 100644 --- a/systests/jaxws/pom.xml +++ b/systests/jaxws/pom.xml @@ -63,6 +63,18 @@ wsdl2java + + generate-attachments-test-sources + generate-test-sources + + ${cxf.codegenplugin.forkmode} + ${basedir}/target/generated/src/test/java + ${basedir}/src/test/resources/attachments + + + wsdl2java + + diff --git a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java new file mode 100644 index 00000000000..b6e9259b4e1 --- /dev/null +++ b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.jaxws; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.concurrent.ThreadLocalRandom; +import java.util.logging.Logger; + +import javax.activation.DataHandler; +import javax.activation.DataSource; +import javax.xml.ws.Binding; +import javax.xml.ws.BindingProvider; +import javax.xml.ws.Endpoint; +import javax.xml.ws.soap.SOAPBinding; +import javax.xml.ws.soap.SOAPFaultException; + +import org.apache.cxf.Download; +import org.apache.cxf.DownloadFault_Exception; +import org.apache.cxf.DownloadNextResponseType; +import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.jaxws.JaxWsProxyFactoryBean; +import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; +import org.apache.cxf.testutil.common.AbstractBusTestServerBase; + +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +public class AttachmentChunkingTest extends AbstractBusClientServerTestBase { + private static final String PORT = allocatePort(DownloadServer.class); + private static final Logger LOG = LogUtils.getLogger(AttachmentChunkingTest.class); + + private static final class DownloadImpl implements Download { + @Override + public DownloadNextResponseType downloadNext(Boolean simulate) { + final DownloadNextResponseType responseType = new DownloadNextResponseType(); + responseType.setDataContent(new DataHandler(new DataSource() { + @Override + public InputStream getInputStream() { + if (simulate) { + return simulate(); + } else { + return generate(100000); + } + } + + @Override + public OutputStream getOutputStream() { + return null; + } + + @Override + public String getContentType() { + return ""; + } + + @Override + public String getName() { + return ""; + } + })); + + return responseType; + } + } + + public static class DownloadServer extends AbstractBusTestServerBase { + protected void run() { + Object implementor = new DownloadImpl(); + String address = "http://localhost:" + PORT + "/SoapContext/SoapPort"; + Endpoint.publish(address, implementor); + } + + public static void main(String[] args) { + try { + DownloadServer s = new DownloadServer(); + s.start(); + } catch (Exception ex) { + ex.printStackTrace(); + System.exit(-1); + } finally { + LOG.info("done!"); + } + } + } + + @BeforeClass + public static void startServers() throws Exception { + assertTrue("server did not launch correctly", launchServer(DownloadServer.class, true)); + } + + @Test + public void testChunkingPartialFailure() { + final JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); + factory.setServiceClass(Download.class); + + final Download client = (Download) factory.create(); + final BindingProvider bindingProvider = (BindingProvider) client; + final Binding binding = bindingProvider.getBinding(); + + final String address = String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort", PORT); + bindingProvider.getRequestContext().put("jakarta.xml.ws.service.endpoint.address", address); + ((SOAPBinding) binding).setMTOMEnabled(true); + + // See please https://issues.apache.org/jira/browse/CXF-9057 + SOAPFaultException ex = assertThrows(SOAPFaultException.class, () -> client.downloadNext(true)); + assertThat(ex.getMessage(), containsString("simulated error during stream processing")); + } + + @Test + public void testChunking() throws IOException, DownloadFault_Exception { + final JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); + factory.setServiceClass(Download.class); + + final Download client = (Download) factory.create(); + final BindingProvider bindingProvider = (BindingProvider) client; + final Binding binding = bindingProvider.getBinding(); + + final String address = String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort", PORT); + bindingProvider.getRequestContext().put("jakarta.xml.ws.service.endpoint.address", address); + ((SOAPBinding) binding).setMTOMEnabled(true); + + final DownloadNextResponseType response = client.downloadNext(false); + assertThat(response.getDataContent().getInputStream().readAllBytes().length, equalTo(100000)); + } + + private static InputStream generate(int size) { + final byte[] buf = new byte[size]; + Arrays.fill(buf, (byte) 'x'); + return new ByteArrayInputStream(buf); + } + + private static InputStream simulate() { + return new InputStream() { + @Override + public int read() { + return (byte) 'x'; + } + + @Override + public int read(byte[] b, int off, int len) { + if (ThreadLocalRandom.current().nextBoolean()) { + throw new IllegalArgumentException("simulated error during stream processing"); + } + + for (int i = off; i < off + len; i++) { + b[i] = (byte) 'x'; + } + + return len; + } + }; + } +} diff --git a/systests/jaxws/src/test/resources/attachments/cxf9057.wsdl b/systests/jaxws/src/test/resources/attachments/cxf9057.wsdl new file mode 100644 index 00000000000..721fa08bb44 --- /dev/null +++ b/systests/jaxws/src/test/resources/attachments/cxf9057.wsdl @@ -0,0 +1,84 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/systests/transports/src/test/java/org/apache/cxf/systest/http/jaxws/JAXWSAsyncClientTest.java b/systests/transports/src/test/java/org/apache/cxf/systest/http/jaxws/JAXWSAsyncClientTest.java new file mode 100644 index 00000000000..40e287b441d --- /dev/null +++ b/systests/transports/src/test/java/org/apache/cxf/systest/http/jaxws/JAXWSAsyncClientTest.java @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.http.jaxws; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.StringWriter; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import javax.jws.WebService; +import javax.xml.namespace.QName; +import javax.xml.soap.MessageFactory; +import javax.xml.soap.SOAPException; +import javax.xml.soap.SOAPMessage; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; +import javax.xml.ws.Dispatch; +import javax.xml.ws.Response; +import javax.xml.ws.Service; +import javax.xml.ws.soap.SOAPBinding; +import javax.xml.ws.soap.SOAPFaultException; + +import org.apache.cxf.endpoint.Client; +import org.apache.cxf.frontend.ClientProxy; +import org.apache.cxf.greeter_control.AbstractGreeterImpl; +import org.apache.cxf.greeter_control.Greeter; +import org.apache.cxf.greeter_control.types.GreetMeResponse; +import org.apache.cxf.interceptor.LoggingInInterceptor; +import org.apache.cxf.interceptor.LoggingOutInterceptor; +import org.apache.cxf.jaxws.JaxWsProxyFactoryBean; +import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; +import org.apache.cxf.testutil.common.AbstractBusTestServerBase; +import org.apache.cxf.transport.http.HTTPConduit; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class JAXWSAsyncClientTest extends AbstractBusClientServerTestBase { + static final String PORT = allocatePort(Server.class); + static ScheduledExecutorService executor; + + public static class Server extends AbstractBusTestServerBase { + + protected void run() { + GreeterImpl implementor = new GreeterImpl(); + String address = "http://localhost:" + PORT + "/SoapContext/GreeterPort"; + javax.xml.ws.Endpoint.publish(address, implementor); + } + + public static void main(String[] args) { + try { + Server s = new Server(); + s.start(); + } catch (Exception ex) { + ex.printStackTrace(); + System.exit(-1); + } finally { + System.out.println("done!"); + } + } + + @WebService(serviceName = "BasicGreeterService", + portName = "GreeterPort", + endpointInterface = "org.apache.cxf.greeter_control.Greeter", + targetNamespace = "http://cxf.apache.org/greeter_control", + wsdlLocation = "testutils/greeter_control.wsdl") + public class GreeterImpl extends AbstractGreeterImpl { + @Override + public String greetMe(String arg) { + if ("timeout".equalsIgnoreCase(arg)) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Do nothing + } + } + + return super.greetMe(arg); + } + } + } + + + @BeforeClass + public static void startServers() throws Exception { + assertTrue("server did not launch correctly", launchServer(Server.class, true)); + executor = Executors.newScheduledThreadPool(5); + } + + @AfterClass + public static void stopServers() throws Exception { + stopAllServers(); + if (executor != null) { + executor.shutdown(); + if (!executor.awaitTermination(1, TimeUnit.MINUTES)) { + executor.shutdownNow(); + } + } + } + + @Test + public void testAsyncClient() throws Exception { + // setup the feature by using JAXWS front-end API + JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); + factory.setAddress("http://localhost:" + PORT + "/SoapContext/GreeterPort"); + factory.setServiceClass(Greeter.class); + Greeter proxy = factory.create(Greeter.class); + + Response response = proxy.greetMeAsync("cxf"); + int waitCount = 0; + while (!response.isDone() && waitCount < 15) { + Thread.sleep(1000); + waitCount++; + } + + assertTrue("Response still not received.", response.isDone()); + assertThat(response.get().getResponseType(), equalTo("CXF")); + } + + @Test + public void testAsyncClientChunking() throws Exception { + // setup the feature by using JAXWS front-end API + JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); + factory.setAddress("http://localhost:" + PORT + "/SoapContext/GreeterPort"); + factory.getOutInterceptors().add(new LoggingOutInterceptor()); + factory.getInInterceptors().add(new LoggingInInterceptor()); + factory.setServiceClass(Greeter.class); + Greeter proxy = factory.create(Greeter.class); + + Client client = ClientProxy.getClient(proxy); + HTTPConduit http = (HTTPConduit) client.getConduit(); + http.getClient().setAllowChunking(true); + + final char[] bytes = new char [32 * 1024]; + final Random random = new Random(); + for (int i = 0; i < bytes.length; ++i) { + bytes[i] = (char)(random.nextInt(26) + 'a'); + } + + final String greeting = new String(bytes); + Response response = proxy.greetMeAsync(greeting); + int waitCount = 0; + while (!response.isDone() && waitCount < 15) { + Thread.sleep(1000); + waitCount++; + } + + assertTrue("Response still not received.", response.isDone()); + assertThat(response.get().getResponseType(), equalTo(greeting.toUpperCase())); + } + + @Test + public void testTimeout() throws Exception { + // setup the feature by using JAXWS front-end API + JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); + factory.setAddress("http://localhost:" + PORT + "/SoapContext/GreeterPort"); + factory.setServiceClass(Greeter.class); + Greeter proxy = factory.create(Greeter.class); + + HTTPConduit cond = (HTTPConduit)((Client)proxy).getConduit(); + cond.getClient().setReceiveTimeout(500); + + try { + proxy.greetMeAsync("timeout").get(); + fail("Should have faulted"); + } catch (SOAPFaultException ex) { + fail("should not be a SOAPFaultException"); + } catch (ExecutionException ex) { + //expected + assertTrue(ex.getCause().getClass().getName(), + ex.getCause() instanceof java.net.ConnectException + || ex.getCause() instanceof java.net.SocketTimeoutException); + } + } + + /** + * Not 100% reproducible but used to sporadically fail with: + * + * java.util.concurrent.ExecutionException: jakarta.xml.ws.soap.SOAPFaultException: + * Cannot invoke "org.w3c.dom.Node.getOwnerDocument()" because "nd" is null + * at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) + * at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) + * at org.apache.cxf.endpoint.ClientCallback.get(ClientCallback.java:139) + * at org.apache.cxf.jaxws.JaxwsResponseCallback.get(JaxwsResponseCallback.java:48) + * at org.apache.cxf.systest.hc5.jaxws.JAXWSAsyncClientTest.testAsyncWsdl(JAXWSAsyncClientTest.java:193) + * + * @see https://issues.apache.org/jira/browse/CXF-9007 + */ + @Test + public void testAsyncWsdl() throws Exception { + final URL wsdlUrl = getClass().getClassLoader().getResource("greeting.wsdl"); + + final Service service = Service.create(wsdlUrl, new QName("http://apache.org/hello_world", "SOAPService")); + + service.addPort(new QName("http://apache.org/hello_world", "Greeter"), SOAPBinding.SOAP11HTTP_BINDING, + "http://localhost:" + PORT + "/SoapContext/GreeterPort"); + + final Dispatch client = service.createDispatch( + new QName("http://apache.org/hello_world", "Greeter"), + SOAPMessage.class, + Service.Mode.MESSAGE + ); + + final List>> tasks = new ArrayList<>(); + for (int i = 0; i < 50; i++) { + tasks.add(executor.submit(() -> client.invokeAsync(buildMessage()))); + } + + for (Future> task : tasks) { + final SOAPMessage result = task.get(5, TimeUnit.SECONDS).get(); + verifyResult(result); + } + } + + private static void verifyResult(SOAPMessage message) throws Exception { + TransformerFactory tf = TransformerFactory.newInstance(); + Transformer transformer = tf.newTransformer(); + + String output = null; + try (StringWriter writer = new StringWriter()) { + final DOMSource source = new DOMSource(message.getSOAPBody().extractContentAsDocument()); + transformer.transform(source, new StreamResult(writer)); + output = writer.getBuffer().toString().replaceAll("\n|\r", ""); + } + + assertThat(output, containsString("HELLO")); + } + + private static SOAPMessage buildMessage() throws SOAPException, IOException { + String soapMessage = + "" + + "" + + " " + + " " + + " Hello" + + " " + + " " + + ""; + + SOAPMessage message = null; + try (ByteArrayInputStream bis = new ByteArrayInputStream(soapMessage.getBytes())) { + message = MessageFactory.newInstance().createMessage(null, bis); + message.saveChanges(); + } + + return message; + } +}