diff --git a/README b/README index d342f78..02156e5 100644 --- a/README +++ b/README @@ -1,18 +1,18 @@ Introduction – -Octobot is a task queue worker designed for reliability, ease of use, and +Octobot is a task queue worker designed for reliability, ease of use, and throughput. -Octobot can listen on any number of queues, with any number of workers -processing messages from each. Each queue can be set at a custom priority to +Octobot can listen on any number of queues, with any number of workers +processing messages from each. Each queue can be set at a custom priority to ensure that more system resources are available for more important tasks. AMQP -/ RabbitMQ, Redis, and Beanstalk are supported as backends, with an extensible +/ RabbitMQ, Redis, and Beanstalk are supported as backends, with an extensible architecture to allow for additional backends to be added as needed. Architecture – -Octobotʼs internal architecture is a shared-nothing, threadsafe design that +Octobotʼs internal architecture is a shared-nothing, threadsafe design that allows for any number of concurrent queues and tasks to be processed at once, limited only by system resources. Taking full advantage of parallel execution, Octobotʼs ability to spawn multiple workers on multiple queues can be used to diff --git a/build.xml b/build.xml index 4352907..1210411 100644 --- a/build.xml +++ b/build.xml @@ -38,7 +38,7 @@ - + diff --git a/src/com/urbanairship/octobot/Introspector.java b/src/com/urbanairship/octobot/Introspector.java index 6c765ac..53a6404 100644 --- a/src/com/urbanairship/octobot/Introspector.java +++ b/src/com/urbanairship/octobot/Introspector.java @@ -43,7 +43,7 @@ public void run() { } logger.info("Introspector launched on port: " + port); - + while (true) { try { Socket socket = server.accept(); @@ -64,7 +64,7 @@ public void run() { @SuppressWarnings("unchecked") public String introspect() { HashMap metrics = new HashMap(); - + // Make a quick copy of our runtime metrics data. ArrayList instrumentedTasks; HashMap> executionTimes; @@ -85,21 +85,21 @@ public String introspect() { task.put("failures", taskFailures.get(taskName)); task.put("retries", taskRetries.get(taskName)); task.put("average_time", average(executionTimes.get(taskName))); - + metrics.put("task_" + taskName, task); } - + metrics.put("tasks_instrumented", instrumentedTasks.size()); metrics.put("alive_since", mx.getUptime() / (new Long("1000"))); - + return JSONValue.toJSONString(metrics); } - + // Calculate and return the mean execution time of our sample. private float average(LinkedList times) { if (times == null) return 0; - + long timeSum = 0; for (long time : times) timeSum += time; diff --git a/src/com/urbanairship/octobot/MailQueue.java b/src/com/urbanairship/octobot/MailQueue.java index e53c538..5eef260 100644 --- a/src/com/urbanairship/octobot/MailQueue.java +++ b/src/com/urbanairship/octobot/MailQueue.java @@ -62,7 +62,7 @@ public static int remainingCapacity() { // As this thread runs, it consumes messages from the internal queue and // delivers each to the recipients configured in the YML file. public void run() { - + if (!validSettings()) { logger.error("Email settings invalid; check your configuration."); return; diff --git a/src/com/urbanairship/octobot/Metrics.java b/src/com/urbanairship/octobot/Metrics.java index 274299c..8e67fed 100644 --- a/src/com/urbanairship/octobot/Metrics.java +++ b/src/com/urbanairship/octobot/Metrics.java @@ -8,7 +8,7 @@ public class Metrics { // Keep track of all tasks we've seen executed. protected static final ArrayList instrumentedTasks = new ArrayList(); - + // Keep track of average task throughput (last 10k runs per task). protected static final HashMap> executionTimes = new HashMap>(); @@ -20,7 +20,7 @@ public class Metrics { // Keep track of total failures by task. protected static final HashMap taskFailures = new HashMap(); - + // Keep track of total retries by task. protected static final HashMap taskRetries = new HashMap(); @@ -34,7 +34,7 @@ public static void update(String task, long time, synchronized(metricsLock) { if (!instrumentedTasks.contains(task)) instrumentedTasks.add(task); - + updateExecutionTimes(task, time); updateTaskRetries(task, retries); updateTaskResults(task, status); @@ -67,7 +67,7 @@ private static void updateTaskRetries(String task, int retries) { retriesForTask += retries; taskRetries.put(task, retriesForTask); } - } + } } diff --git a/src/com/urbanairship/octobot/Octobot.java b/src/com/urbanairship/octobot/Octobot.java index 28d41a7..16814f7 100644 --- a/src/com/urbanairship/octobot/Octobot.java +++ b/src/com/urbanairship/octobot/Octobot.java @@ -52,7 +52,7 @@ public static void main(String[] args) { logger.info("Launching Introspector..."); new Thread(new Introspector(), "Introspector").start(); - + logger.info("Launching Workers..."); List> queues = null; try { diff --git a/src/com/urbanairship/octobot/QueueConsumer.java b/src/com/urbanairship/octobot/QueueConsumer.java index c525dab..056a88f 100644 --- a/src/com/urbanairship/octobot/QueueConsumer.java +++ b/src/com/urbanairship/octobot/QueueConsumer.java @@ -91,7 +91,7 @@ private void consumeFromBeanstalk() { try { job = beanstalkClient.reserve(1); } catch (BeanstalkException e) { logger.error("Beanstalk connection error.", e); - beanstalkClient = Beanstalk.getBeanstalkChannel(queue.host, + beanstalkClient = Beanstalk.getBeanstalkChannel(queue.host, queue.port, queue.queueName); continue; } @@ -105,7 +105,7 @@ private void consumeFromBeanstalk() { try { beanstalkClient.delete(job.getJobId()); } catch (BeanstalkException e) { logger.error("Error sending message receipt.", e); - beanstalkClient = Beanstalk.getBeanstalkChannel(queue.host, + beanstalkClient = Beanstalk.getBeanstalkChannel(queue.host, queue.port, queue.queueName); } } @@ -210,7 +210,7 @@ public boolean invokeTask(String rawMessage) { errorMessage = "An error occurred while running the task."; logger.error(errorMessage, e); } - + if (executedSuccessfully) break; else retryCount++; } @@ -229,7 +229,7 @@ public boolean invokeTask(String rawMessage) { long finishedAt = System.nanoTime(); Metrics.update(taskName, finishedAt - startedAt, executedSuccessfully, retryCount); - + return executedSuccessfully; } diff --git a/src/com/urbanairship/octobot/TaskExecutor.java b/src/com/urbanairship/octobot/TaskExecutor.java index 42dba9a..b3eec80 100644 --- a/src/com/urbanairship/octobot/TaskExecutor.java +++ b/src/com/urbanairship/octobot/TaskExecutor.java @@ -11,7 +11,7 @@ public class TaskExecutor { new HashMap(); @SuppressWarnings("unchecked") - public static void execute(String taskName, JSONObject message) + public static void execute(String taskName, JSONObject message) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException,