diff --git a/gearman-client/src/main/java/net/johnewart/gearman/client/NetworkGearmanWorker.java b/gearman-client/src/main/java/net/johnewart/gearman/client/NetworkGearmanWorker.java index 49f2ba6..ce8011f 100644 --- a/gearman-client/src/main/java/net/johnewart/gearman/client/NetworkGearmanWorker.java +++ b/gearman-client/src/main/java/net/johnewart/gearman/client/NetworkGearmanWorker.java @@ -11,6 +11,7 @@ import net.johnewart.gearman.common.JobState; import net.johnewart.gearman.common.JobStatus; import net.johnewart.gearman.common.events.WorkEvent; +import net.johnewart.gearman.common.packets.request.WorkCompleteRequest; import net.johnewart.gearman.common.packets.response.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,7 +97,7 @@ public void doWork() jobConnectionMap.put(nextJob, c); WorkEvent workEvent = new WorkEvent(nextJob, this); result = callbacks.get(nextJob.getFunctionName()).process(workEvent); - c.sendPacket(new WorkCompleteResponse(nextJob.getJobHandle(), result)); + c.sendPacket(new WorkCompleteRequest(nextJob.getJobHandle(), result)); } } catch (IOException ioe) { diff --git a/gearman-common/src/main/java/net/johnewart/gearman/common/packets/request/WorkCompleteRequest.java b/gearman-common/src/main/java/net/johnewart/gearman/common/packets/request/WorkCompleteRequest.java new file mode 100644 index 0000000..eefca46 --- /dev/null +++ b/gearman-common/src/main/java/net/johnewart/gearman/common/packets/request/WorkCompleteRequest.java @@ -0,0 +1,17 @@ +package net.johnewart.gearman.common.packets.request; + +import net.johnewart.gearman.constants.PacketType; + +public class WorkCompleteRequest extends WorkDataRequest { + public WorkCompleteRequest(String jobhandle, byte[] data) + { + super(jobhandle, data); + this.type = PacketType.WORK_COMPLETE; + } + + public WorkCompleteRequest(byte[] pktdata) + { + super(pktdata); + this.type = PacketType.WORK_COMPLETE; + } +} \ No newline at end of file diff --git a/gearman-common/src/main/java/net/johnewart/gearman/common/packets/request/WorkDataRequest.java b/gearman-common/src/main/java/net/johnewart/gearman/common/packets/request/WorkDataRequest.java new file mode 100644 index 0000000..4e247a6 --- /dev/null +++ b/gearman-common/src/main/java/net/johnewart/gearman/common/packets/request/WorkDataRequest.java @@ -0,0 +1,52 @@ +package net.johnewart.gearman.common.packets.request; + +import net.johnewart.gearman.constants.PacketType; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicReference; + +public class WorkDataRequest extends RequestPacket implements WorkRequest { + + public AtomicReference jobHandle; + public byte[] data; + + public WorkDataRequest(String jobhandle, byte[] data) + { + this.jobHandle = new AtomicReference<>(jobhandle); + this.data = data.clone(); + this.type = PacketType.WORK_DATA; + } + + public WorkDataRequest(byte[] pktdata) + { + super(pktdata); + this.jobHandle = new AtomicReference<>(); + + int pOff = 0; + pOff = parseString(pOff, jobHandle); + data = Arrays.copyOfRange(rawdata, pOff, rawdata.length); + + } + + @Override + public byte[] toByteArray() + { + byte[] metadata = stringsToTerminatedByteArray(jobHandle.get()); + return concatByteArrays(getHeader(), metadata, data); + } + + public String getJobHandle() { + return jobHandle.get(); + } + + public int getPayloadSize() + { + return this.jobHandle.get().length() + 1 + + this.data.length; + } + + public byte[] getData() + { + return data; + } +} diff --git a/gearman-common/src/main/java/net/johnewart/gearman/common/packets/request/WorkRequest.java b/gearman-common/src/main/java/net/johnewart/gearman/common/packets/request/WorkRequest.java new file mode 100644 index 0000000..06248c9 --- /dev/null +++ b/gearman-common/src/main/java/net/johnewart/gearman/common/packets/request/WorkRequest.java @@ -0,0 +1,8 @@ +package net.johnewart.gearman.common.packets.request; + +import net.johnewart.gearman.constants.PacketType; + +public interface WorkRequest { + public abstract String getJobHandle(); + public abstract PacketType getType(); +}