Skip to content
This repository was archived by the owner on Jan 1, 2019. It is now read-only.

Update AKKA to the lastest version ,add support http method for Trace… #67

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>io.parallec</groupId>
<artifactId>parallec-core</artifactId>
<version>0.9.4-beta</version>
<version>0.10.6</version>
<packaging>jar</packaging>
<name>io.parallec:parallec-core</name>
<url>https://github.com/eBay/parallec</url>
Expand Down Expand Up @@ -56,7 +56,7 @@
<commons-codec.version>1.10</commons-codec.version>
<commons-io.version>2.3</commons-io.version>

<async-http-client.version>1.6.5</async-http-client.version>
<async-http-client.version>1.9.40</async-http-client.version>
<akka-actor_2.10.version>2.3.3</akka-actor_2.10.version>
<akka-cluster_2.10.version>2.3.3</akka-cluster_2.10.version>

Expand Down
77 changes: 67 additions & 10 deletions src/main/java/io/parallec/core/ParallelClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,18 @@
*/
package io.parallec.core;

import com.ning.http.client.AsyncHttpClient;
import io.parallec.core.actor.ActorConfig;
import io.parallec.core.monitor.MonitorProvider;
import io.parallec.core.resources.HttpClientStore;
import io.parallec.core.resources.HttpClientType;
import io.parallec.core.resources.HttpMethod;
import io.parallec.core.resources.TcpSshPingResourceStore;
import io.parallec.core.resources.TcpUdpSshPingResourceStore;
import io.parallec.core.task.ParallelTaskManager;

import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ning.http.client.AsyncHttpClient;
import java.util.concurrent.atomic.AtomicBoolean;

/**
*
Expand Down Expand Up @@ -95,7 +93,7 @@ public class ParallelClient {
public HttpClientStore httpClientStore = HttpClientStore.getInstance();

/** The tcp client store. */
public TcpSshPingResourceStore tcpSshPingResourceStore = TcpSshPingResourceStore.getInstance();
public TcpUdpSshPingResourceStore tcpSshPingResourceStore = TcpUdpSshPingResourceStore.getInstance();

/** The is closed is marked when all resources are released/not initialized. */
public static AtomicBoolean isClosed = new AtomicBoolean(true);
Expand All @@ -116,7 +114,6 @@ public void initialize() {
ActorConfig.createAndGetActorSystem();
httpClientStore.init();
tcpSshPingResourceStore.init();
ParallelTaskManager.getInstance();
isClosed.set(false);
logger.info("Parallel Client Resources has been initialized.");
} else {
Expand All @@ -141,8 +138,8 @@ public void releaseExternalResources() {
ActorConfig.shutDownActorSystemForce();
httpClientStore.shutdown();
tcpSshPingResourceStore.shutdown();
taskManager.cleanWaitTaskQueue();
taskManager.cleanInprogressJobMap();
cleanWaitTaskQueue();
cleanInprogressJobMap();
isClosed.set(true);
logger.info("Have released all ParallelClient resources "
+ "(actor system + async+sync http client + task queue)"
Expand Down Expand Up @@ -170,7 +167,7 @@ public void reinitIfClosed() {
} catch (InterruptedException e) {
logger.error("error reinit httpClientStore", e);
}
isClosed.set(true);
isClosed.set(false);
logger.info("Parallel Client Resources has been reinitialized.");
} else {
logger.debug("NO OP. Resource was not released.");
Expand Down Expand Up @@ -215,6 +212,21 @@ public ParallelTaskBuilder prepareTcp(String command) {
cb.getTcpMeta().setCommand(command);
return cb;
}

/**
* Prepare a parallel UDP Task.
*
* @param command
* the command
* @return the parallel task builder
*/
public ParallelTaskBuilder prepareUdp(String command) {
reinitIfClosed();
ParallelTaskBuilder cb = new ParallelTaskBuilder();
cb.setProtocol(RequestProtocol.UDP);
cb.getUdpMeta().setCommand(command);
return cb;
}

/**
* Prepare a parallel HTTP GET Task.
Expand Down Expand Up @@ -310,7 +322,52 @@ public ParallelTaskBuilder prepareHttpOptions(String url) {
return cb;

}
/**
* Prepare a parallel HTTP Trace Task.
*
* @param url
* the UrlPostfix: e.g. in http://localhost:8080/index.html.,the url is "/index.html"
* @return the parallel task builder
*/
public ParallelTaskBuilder prepareHttpTrace(String url) {
reinitIfClosed();
ParallelTaskBuilder cb = new ParallelTaskBuilder();
cb.getHttpMeta().setHttpMethod(HttpMethod.TRACE);
cb.getHttpMeta().setRequestUrlPostfix(url);
return cb;

}

/**
* Prepare a parallel HTTP Connect Task.
*
* @param url
* the UrlPostfix: e.g. in http://localhost:8080/index.html.,the url is "/index.html"
* @return the parallel task builder
*/
public ParallelTaskBuilder prepareHttpConnect(String url) {
reinitIfClosed();
ParallelTaskBuilder cb = new ParallelTaskBuilder();
cb.getHttpMeta().setHttpMethod(HttpMethod.CONNECT);
cb.getHttpMeta().setRequestUrlPostfix(url);
return cb;

}
/**
* Prepare a parallel HTTP PATCH Task.
*
* @param url
* the UrlPostfix: e.g. in http://localhost:8080/index.html.,the url is "/index.html"
* @return the parallel task builder
*/
public ParallelTaskBuilder prepareHttpPatch(String url) {
reinitIfClosed();
ParallelTaskBuilder cb = new ParallelTaskBuilder();
cb.getHttpMeta().setHttpMethod(HttpMethod.PATCH);
cb.getHttpMeta().setRequestUrlPostfix(url);
return cb;

}
/**
* Sets the custom fast client in the httpClientStore.
*
Expand Down
84 changes: 67 additions & 17 deletions src/main/java/io/parallec/core/actor/HttpWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import io.parallec.core.actor.message.ResponseOnSingeRequest;
import io.parallec.core.actor.message.type.RequestWorkerMsgType;
import io.parallec.core.bean.ResponseHeaderMeta;
import io.parallec.core.config.ParallecGlobalConfig;
import io.parallec.core.exception.ActorMessageTypeInvalidException;
import io.parallec.core.exception.HttpRequestCreateException;
import io.parallec.core.resources.HttpMethod;
Expand All @@ -25,6 +27,9 @@

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

Expand All @@ -44,9 +49,9 @@
import com.ning.http.client.AsyncHttpClient.BoundRequestBuilder;
import com.ning.http.client.ListenableFuture;
import com.ning.http.client.Response;
import com.ning.http.util.AsyncHttpProviderUtils;



// TODO: Auto-generated Javadoc
/**
* This is an akka actor with async http client.
*
Expand All @@ -73,9 +78,14 @@ public class HttpWorker extends UntypedActor {
private static Logger logger = LoggerFactory.getLogger(HttpWorker.class);

/** The http header map. */
// 20140310
private final Map<String, String> httpHeaderMap = new HashMap<String, String>();

/**
* The response header meta: which keys are needed to get from response
* header.
*/
private final ResponseHeaderMeta responseHeaderMeta;

/** The sender. */
private ActorRef sender = null;

Expand All @@ -98,7 +108,7 @@ public class HttpWorker extends UntypedActor {

/** The response future. */
ListenableFuture<ResponseOnSingeRequest> responseFuture = null;

/**
* Instantiates a new http worker.
*
Expand All @@ -108,11 +118,13 @@ public class HttpWorker extends UntypedActor {
* @param httpMethod the http method
* @param postData the post data
* @param httpHeaderMap the http header map
* @param responseHeaderMeta the response header meta
*/
public HttpWorker(final int actorMaxOperationTimeoutSec,
final AsyncHttpClient client, final String requestUrl,
final HttpMethod httpMethod, final String postData,
final Map<String, String> httpHeaderMap
final Map<String, String> httpHeaderMap,
final ResponseHeaderMeta responseHeaderMeta

) {
this.actorMaxOperationTimeoutSec = actorMaxOperationTimeoutSec;
Expand All @@ -122,6 +134,7 @@ public HttpWorker(final int actorMaxOperationTimeoutSec,
this.postData = postData;
if (httpHeaderMap != null)
this.httpHeaderMap.putAll(httpHeaderMap);
this.responseHeaderMeta = responseHeaderMeta;

}

Expand Down Expand Up @@ -159,6 +172,17 @@ public BoundRequestBuilder createRequest()
case DELETE:
builder = client.prepareDelete(requestUrl);
break;
case TRACE:
builder = client.prepareTrace(requestUrl);
break;
case CONNECT:
builder = client.prepareConnect(requestUrl);
break;
case PATCH:
builder = client.preparePatch(requestUrl);
break;


default:
break;
}
Expand Down Expand Up @@ -233,7 +257,7 @@ public void onReceive(Object message) throws Exception {
sender = getSender();
reply(null, true, PcConstants.REQUEST_CANCELED,
PcConstants.REQUEST_CANCELED, PcConstants.NA,
PcConstants.NA_INT);
PcConstants.NA_INT, null);
break;

case PROCESS_ON_EXCEPTION:
Expand All @@ -243,7 +267,7 @@ public void onReceive(Object message) throws Exception {
String stackTrace = PcStringUtils.printStackTrace(cause);
cancelCancellable();
reply(null, true, errorSummary, stackTrace, PcConstants.NA,
PcConstants.NA_INT);
PcConstants.NA_INT, null);

break;

Expand All @@ -258,7 +282,7 @@ public void onReceive(Object message) throws Exception {
actorMaxOperationTimeoutSec);

reply(null, true, errorMsg, errorMsg, PcConstants.NA,
PcConstants.NA_INT);
PcConstants.NA_INT, null);
break;

case CHECK_FUTURE_STATE:
Expand Down Expand Up @@ -317,43 +341,69 @@ public void cancelCancellable() {
* the status code
* @param statusCodeInt
* the status code int
* @param responseHeaders
* the response headers
*/
private void reply(final String response, final boolean error,
final String errorMessage, final String stackTrace,
final String statusCode, final int statusCodeInt) {
final String statusCode, final int statusCodeInt,
Map<String, List<String>> responseHeaders) {

if (!sentReply) {
//must update sentReply first to avoid duplicated msg.
// must update sentReply first to avoid duplicated msg.
sentReply = true;

final ResponseOnSingeRequest res = new ResponseOnSingeRequest(
response, error, errorMessage, stackTrace, statusCode,
statusCodeInt, PcDateUtils.getNowDateTimeStrStandard());
statusCodeInt, PcDateUtils.getNowDateTimeStrStandard(),
responseHeaders);
if (!getContext().system().deadLetters().equals(sender)) {
sender.tell(res, getSelf());
}

getContext().stop(getSelf());
if (getContext() != null) {
getContext().stop(getSelf());
}
}

}

/**
* On complete.
* Save response headers when needed.
*
* @param response
* the response
* @return the response on singe request
* @return the response on single request
*/
public ResponseOnSingeRequest onComplete(Response response) {

cancelCancellable();
try {
Map<String, List<String>> responseHeaders = null;
if (responseHeaderMeta != null) {
responseHeaders = new LinkedHashMap<String, List<String>>();
if (responseHeaderMeta.isGetAll()) {
for (Map.Entry<String, List<String>> header : response
.getHeaders()) {
responseHeaders.put(header.getKey().toLowerCase(Locale.ROOT), header.getValue());
}
} else {
for (String key : responseHeaderMeta.getKeys()) {
if (response.getHeaders().containsKey(key)) {
responseHeaders.put(key.toLowerCase(Locale.ROOT),
response.getHeaders().get(key));
}
}
}
}

int statusCodeInt = response.getStatusCode();
String statusCode = statusCodeInt + " " + response.getStatusText();

reply(response.getResponseBody(), false, null, null, statusCode,
statusCodeInt);
String charset = ParallecGlobalConfig.httpResponseBodyCharsetUsesResponseContentType ?
AsyncHttpProviderUtils.parseCharset(response.getContentType())
: ParallecGlobalConfig.httpResponseBodyDefaultCharset;
reply(response.getResponseBody(charset), false, null, null, statusCode,
statusCodeInt, responseHeaders);
} catch (IOException e) {
getLogger().error("fail response.getResponseBody " + e);
}
Expand Down
Loading