diff --git a/.gitignore b/.gitignore index 7692143cb7..d00c04e809 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,4 @@ dependency-reduced-pom.xml /.metadata/ /workspace/ .vscode +gogo/command/.to diff --git a/gogo/runtime/pom.xml b/gogo/runtime/pom.xml index 9737db71bd..42c9f69f6d 100644 --- a/gogo/runtime/pom.xml +++ b/gogo/runtime/pom.xml @@ -85,6 +85,7 @@ org.apache.felix.service.command, org.apache.felix.service.command.annotations, org.apache.felix.service.threadio, + org.apache.felix.service.systemio, org.osgi.service.event.*; resolution:=optional, @@ -95,4 +96,31 @@ + + + felix + + true + + + + org.apache.felix + org.apache.felix.framework + test + + + + + equinox + + + org.eclipse + org.eclipse.osgi + 3.8.0.v20120529-1548 + test + + + + + diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandSessionImpl.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandSessionImpl.java index a2271b6605..4ef8a0081d 100644 --- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandSessionImpl.java +++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/CommandSessionImpl.java @@ -24,6 +24,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.PrintStream; +import java.lang.ref.WeakReference; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.nio.channels.Channel; @@ -63,819 +64,845 @@ public class CommandSessionImpl implements CommandSession, Converter { - public static final String SESSION_CLOSED = "session is closed"; - public static final String VARIABLES = ".variables"; - public static final String COMMANDS = ".commands"; - public static final String CONSTANTS = ".constants"; - private static final String COLUMN = "%-20s %s\n"; - - // Streams and channels - protected InputStream in; - protected OutputStream out; - protected PrintStream pout; - protected OutputStream err; - protected PrintStream perr; - protected Channel[] channels; - - private final CommandProcessorImpl processor; - protected final ConcurrentMap variables = new ConcurrentHashMap<>(); - private volatile boolean closed; - private final List jobs = new ArrayList<>(); - private JobListener jobListener; - - private final ExecutorService executor; - - private Path currentDir; - private ClassLoader classLoader; - - protected CommandSessionImpl(CommandProcessorImpl shell, CommandSessionImpl parent) - { - this.currentDir = parent.currentDir; - this.executor = Executors.newCachedThreadPool(ThreadUtils.namedThreadFactory("session")); - this.processor = shell; - this.channels = parent.channels; - this.in = parent.in; - this.out = parent.out; - this.err = parent.err; - this.pout = parent.pout; - this.perr = parent.perr; - } - - protected CommandSessionImpl(CommandProcessorImpl shell, InputStream in, OutputStream out, OutputStream err) - { - this.currentDir = Paths.get(System.getProperty("user.dir")).toAbsolutePath().normalize(); - this.executor = Executors.newCachedThreadPool(ThreadUtils.namedThreadFactory("session")); - this.processor = shell; - ReadableByteChannel inCh = Channels.newChannel(in); - WritableByteChannel outCh = Channels.newChannel(out); - WritableByteChannel errCh = out == err ? outCh : Channels.newChannel(err); - this.channels = new Channel[] { inCh, outCh, errCh }; - this.in = in; - this.out = out; - this.err = err; - this.pout = out instanceof PrintStream ? (PrintStream) out : new PrintStream(out, true); - this.perr = out == err ? pout : err instanceof PrintStream ? (PrintStream) err : new PrintStream(err, true); - } - - ThreadIO threadIO() - { - return processor.threadIO; - } - - public CommandProcessor processor() - { - return processor; - } - - public ConcurrentMap getVariables() - { - return variables; - } - - public Path currentDir() - { - return currentDir; - } - - public void currentDir(Path path) - { - currentDir = path; - } - - public ClassLoader classLoader() - { - return classLoader != null ? classLoader : getClass().getClassLoader(); - } - - public void classLoader(ClassLoader classLoader) - { - this.classLoader = classLoader; - } - - public void close() - { - if (!this.closed) - { - this.closed = true; - this.processor.closeSession(this); - executor.shutdownNow(); - } - } - - public Object execute(CharSequence commandline) throws Exception - { - assert processor != null; - - if (closed) - { - throw new IllegalStateException(SESSION_CLOSED); - } - - processor.beforeExecute(this, commandline); - - try - { - Closure impl = new Closure(this, null, commandline); - Object result = impl.execute(this, null); - processor.afterExecute(this, commandline, result); - return result; - } - catch (Exception e) - { - processor.afterExecute(this, commandline, e); - throw e; - } - } - - public InputStream getKeyboard() - { - return in; - } - - public Object get(String name) - { - // there is no API to list all variables, so overload name == null - if (name == null || VARIABLES.equals(name)) - { - return Collections.unmodifiableSet(variables.keySet()); - } - - if (COMMANDS.equals(name)) - { - return processor.getCommands(); - } - - if (CONSTANTS.equals(name)) - { - return Collections.unmodifiableSet(processor.constants.keySet()); - } - - Object val = processor.constants.get(name); - if (val != null) - { - return val; - } - - val = variables.get("#" + name); - if (val instanceof Function) - { - try - { - val = ((Function) val).execute(this, null); - } - catch (Exception e) - { - // Ignore - } - return val; - } - else if (val != null) - { - return val; - } - - val = variables.get(name); - if (val != null) - { - return val; - } - - return processor.getCommand(name, variables.get("SCOPE")); - } - - public Object put(String name, Object value) - { - if (value != null) - { - return variables.put(name, value); - } - else - { - return variables.remove(name); - } - } - - public PrintStream getConsole() - { - return pout; - } - - @SuppressWarnings("unchecked") - public CharSequence format(Object target, int level, Converter escape) throws Exception - { - if (target == null) - { - return "null"; - } - - if (target instanceof CharSequence) - { - return (CharSequence) target; - } - - for (Converter c : processor.converters) - { - CharSequence s = c.format(target, level, this); - if (s != null) - { - return s; - } - } - - if (target.getClass().isArray()) - { - if (target.getClass().getComponentType().isPrimitive()) - { - if (target.getClass().getComponentType() == boolean.class) - { - return Arrays.toString((boolean[]) target); - } - else - { - if (target.getClass().getComponentType() == byte.class) - { - return Arrays.toString((byte[]) target); - } - else - { - if (target.getClass().getComponentType() == short.class) - { - return Arrays.toString((short[]) target); - } - else - { - if (target.getClass().getComponentType() == int.class) - { - return Arrays.toString((int[]) target); - } - else - { - if (target.getClass().getComponentType() == long.class) - { - return Arrays.toString((long[]) target); - } - else - { - if (target.getClass().getComponentType() == float.class) - { - return Arrays.toString((float[]) target); - } - else - { - if (target.getClass().getComponentType() == double.class) - { - return Arrays.toString((double[]) target); - } - else - { - if (target.getClass().getComponentType() == char.class) - { - return Arrays.toString((char[]) target); - } - } - } - } - } - } - } - } - } - target = Arrays.asList((Object[]) target); - } - if (target instanceof Collection) - { - if (level == Converter.INSPECT) - { - StringBuilder sb = new StringBuilder(); - Collection c = (Collection) target; - for (Object o : c) - { - sb.append(format(o, level + 1, this)); - sb.append("\n"); - } - return sb; + public static final String SESSION_CLOSED = "session is closed"; + public static final String VARIABLES = ".variables"; + public static final String COMMANDS = ".commands"; + public static final String CONSTANTS = ".constants"; + private static final String COLUMN = "%-20s %s\n"; + + // Streams and channels + protected InputStream in; + protected OutputStream out; + protected PrintStream pout; + protected OutputStream err; + protected PrintStream perr; + protected Channel[] channels; + + private final CommandProcessorImpl processor; + protected final ConcurrentMap variables = new ConcurrentHashMap<>(); + private volatile boolean closed; + private final List jobs = new ArrayList<>(); + private JobListener jobListener; + private final List> onClose = new ArrayList<>(); + + private final ExecutorService executor; + + private Path currentDir; + private ClassLoader classLoader; + + protected CommandSessionImpl(CommandProcessorImpl shell, CommandSessionImpl parent) + { + this.currentDir = parent.currentDir; + this.executor = Executors.newCachedThreadPool(ThreadUtils.namedThreadFactory("session")); + this.processor = shell; + this.channels = parent.channels; + this.in = parent.in; + this.out = parent.out; + this.err = parent.err; + this.pout = parent.pout; + this.perr = parent.perr; + } + + protected CommandSessionImpl(CommandProcessorImpl shell, InputStream in, OutputStream out, OutputStream err) + { + this.currentDir = Paths.get(System.getProperty("user.dir")).toAbsolutePath().normalize(); + this.executor = Executors.newCachedThreadPool(ThreadUtils.namedThreadFactory("session")); + this.processor = shell; + ReadableByteChannel inCh = Channels.newChannel(in); + WritableByteChannel outCh = Channels.newChannel(out); + WritableByteChannel errCh = out == err ? outCh : Channels.newChannel(err); + this.channels = new Channel[] {inCh, outCh, errCh}; + this.in = in; + this.out = out; + this.err = err; + this.pout = out instanceof PrintStream ? (PrintStream) out : new PrintStream(out, true); + this.perr = out == err ? pout : err instanceof PrintStream ? (PrintStream) err : new PrintStream(err, true); + } + + ThreadIO threadIO() + { + return processor.threadIO; + } + + public CommandProcessor processor() + { + return processor; + } + + public ConcurrentMap getVariables() + { + return variables; + } + + public Path currentDir() + { + return currentDir; + } + + public void currentDir(Path path) + { + currentDir = path; + } + + public ClassLoader classLoader() + { + return classLoader != null ? classLoader : getClass().getClassLoader(); + } + + public void classLoader(ClassLoader classLoader) + { + this.classLoader = classLoader; + } + + public void close() + { + if (!this.closed) + { + for (WeakReference r : onClose) + { + Runnable runnable = r.get(); + if (runnable != null) + try + { + runnable.run(); + } + catch (Exception e) + { + // ignore, best effort + } + } + this.closed = true; + this.processor.closeSession(this); + executor.shutdownNow(); + } + } + + public Object execute(CharSequence commandline) throws Exception + { + assert processor != null; + + if (closed) + { + throw new IllegalStateException(SESSION_CLOSED); + } + + processor.beforeExecute(this, commandline); + + try + { + Closure impl = new Closure(this, null, commandline); + Object result = impl.execute(this, null); + processor.afterExecute(this, commandline, result); + return result; + } + catch (Exception e) + { + processor.afterExecute(this, commandline, e); + throw e; + } + } + + public InputStream getKeyboard() + { + return in; + } + + public Object get(String name) + { + // there is no API to list all variables, so overload name == null + if (name == null || VARIABLES.equals(name)) + { + return Collections.unmodifiableSet(variables.keySet()); + } + + if (COMMANDS.equals(name)) + { + return processor.getCommands(); + } + + if (CONSTANTS.equals(name)) + { + return Collections.unmodifiableSet(processor.constants.keySet()); + } + + Object val = processor.constants.get(name); + if (val != null) + { + return val; + } + + val = variables.get("#" + name); + if (val instanceof Function) + { + try + { + val = ((Function) val).execute(this, null); + } + catch (Exception e) + { + // Ignore + } + return val; + } + else if (val != null) + { + return val; + } + + val = variables.get(name); + if (val != null) + { + return val; + } + + return processor.getCommand(name, variables.get("SCOPE")); + } + + public Object put(String name, Object value) + { + if (value != null) + { + return variables.put(name, value); + } + else + { + return variables.remove(name); + } + } + + public PrintStream getConsole() + { + return pout; + } + + @SuppressWarnings("unchecked") + public CharSequence format(Object target, int level, Converter escape) throws Exception + { + if (target == null) + { + return "null"; + } + + if (target instanceof CharSequence) + { + return (CharSequence) target; + } + + for (Converter c : processor.converters) + { + CharSequence s = c.format(target, level, this); + if (s != null) + { + return s; + } + } + + if (target.getClass().isArray()) + { + if (target.getClass().getComponentType().isPrimitive()) + { + if (target.getClass().getComponentType() == boolean.class) + { + return Arrays.toString((boolean[]) target); } else { - if (level == Converter.LINE) - { - StringBuilder sb = new StringBuilder(); - Collection c = (Collection) target; - sb.append("["); - for (Object o : c) - { - if (sb.length() > 1) + if (target.getClass().getComponentType() == byte.class) + { + return Arrays.toString((byte[]) target); + } + else + { + if (target.getClass().getComponentType() == short.class) + { + return Arrays.toString((short[]) target); + } + else + { + if (target.getClass().getComponentType() == int.class) + { + return Arrays.toString((int[]) target); + } + else + { + if (target.getClass().getComponentType() == long.class) { - sb.append(", "); + return Arrays.toString((long[]) target); } - sb.append(format(o, level + 1, this)); - } - sb.append("]"); - return sb; - } - } - } - if (target instanceof Dictionary) - { - Map result = new HashMap<>(); - for (Enumeration e = ((Dictionary) target).keys(); e.hasMoreElements();) - { - Object key = e.nextElement(); - result.put(key, ((Dictionary) target).get(key)); - } - target = result; - } - if (target instanceof Map) - { - if (level == Converter.INSPECT) - { - StringBuilder sb = new StringBuilder(); - Map c = (Map) target; - for (Map.Entry entry : c.entrySet()) - { - CharSequence key = format(entry.getKey(), level + 1, this); - sb.append(key); - for (int i = key.length(); i < 20; i++) - { - sb.append(' '); - } - sb.append(format(entry.getValue(), level + 1, this)); - sb.append("\n"); - } - return sb; - } - else - { - if (level == Converter.LINE) - { - StringBuilder sb = new StringBuilder(); - Map c = (Map) target; - sb.append("["); - for (Map.Entry entry : c.entrySet()) - { - if (sb.length() > 1) + else { - sb.append(", "); + if (target.getClass().getComponentType() == float.class) + { + return Arrays.toString((float[]) target); + } + else + { + if (target.getClass().getComponentType() == double.class) + { + return Arrays.toString((double[]) target); + } + else + { + if (target.getClass().getComponentType() == char.class) + { + return Arrays.toString((char[]) target); + } + } + } } - sb.append(format(entry, level + 1, this)); - } - sb.append("]"); - return sb; - } - } - } - if (target instanceof Path) - { - return target.toString(); - } - if (level == Converter.INSPECT) - { - return inspect(target); - } - else - { - return target.toString(); - } - } - - CharSequence inspect(Object b) - { - boolean found = false; - try (Formatter f = new Formatter();) - { - Method methods[] = b.getClass().getMethods(); - for (Method m : methods) - { - try - { - String name = m.getName(); - if (m.getName().startsWith("get") && !m.getName().equals("getClass") && m.getParameterTypes().length == 0 && Modifier.isPublic(m.getModifiers())) - { - found = true; - name = name.substring(3); - m.setAccessible(true); - Object value = m.invoke(b, (Object[]) null); - f.format(COLUMN, name, format(value, Converter.LINE, this)); - } - } - catch (Exception e) - { - // Ignore - } - } - if (found) - { - return (StringBuilder) f.out(); - } - else - { - return b.toString(); - } - } - } - - public Object convert(Class desiredType, Object in) - { - return processor.convert(this, desiredType, in); - } - - public Object doConvert(Class desiredType, Object in) - { - if (desiredType == Class.class) - { + } + } + } + } + } + target = Arrays.asList((Object[]) target); + } + if (target instanceof Collection) + { + if (level == Converter.INSPECT) + { + StringBuilder sb = new StringBuilder(); + Collection c = (Collection) target; + for (Object o : c) + { + sb.append(format(o, level + 1, this)); + sb.append("\n"); + } + return sb; + } + else + { + if (level == Converter.LINE) + { + StringBuilder sb = new StringBuilder(); + Collection c = (Collection) target; + sb.append("["); + for (Object o : c) + { + if (sb.length() > 1) + { + sb.append(", "); + } + sb.append(format(o, level + 1, this)); + } + sb.append("]"); + return sb; + } + } + } + if (target instanceof Dictionary) + { + Map result = new HashMap<>(); + for (Enumeration e = ((Dictionary) target).keys(); e.hasMoreElements();) + { + Object key = e.nextElement(); + result.put(key, ((Dictionary) target).get(key)); + } + target = result; + } + if (target instanceof Map) + { + if (level == Converter.INSPECT) + { + StringBuilder sb = new StringBuilder(); + Map c = (Map) target; + for (Map.Entry entry : c.entrySet()) + { + CharSequence key = format(entry.getKey(), level + 1, this); + sb.append(key); + for (int i = key.length(); i < 20; i++) + { + sb.append(' '); + } + sb.append(format(entry.getValue(), level + 1, this)); + sb.append("\n"); + } + return sb; + } + else + { + if (level == Converter.LINE) + { + StringBuilder sb = new StringBuilder(); + Map c = (Map) target; + sb.append("["); + for (Map.Entry entry : c.entrySet()) + { + if (sb.length() > 1) + { + sb.append(", "); + } + sb.append(format(entry, level + 1, this)); + } + sb.append("]"); + return sb; + } + } + } + if (target instanceof Path) + { + return target.toString(); + } + if (level == Converter.INSPECT) + { + return inspect(target); + } + else + { + return target.toString(); + } + } + + CharSequence inspect(Object b) + { + boolean found = false; + try (Formatter f = new Formatter();) + { + Method methods[] = b.getClass().getMethods(); + for (Method m : methods) + { try { - return Class.forName(in.toString(), true, classLoader()); - } - catch (ClassNotFoundException e) - { - return null; - } - } - return processor.doConvert(desiredType, in); - } - - public CharSequence format(Object result, int inspect) - { - try - { - return format(result, inspect, this); - } - catch (Exception e) - { - return " args) throws Exception - { - return processor.invoke(this, target, name, args); - } - - public Path redirect(Path path, int mode) - { - return processor.redirect(this, path, mode); - } - - @Override - public List jobs() - { - synchronized (jobs) - { - return Collections.unmodifiableList(jobs); - } - } - - public static JobImpl currentJob() - { - return (JobImpl) Job.Utils.current(); - } - - @Override - public JobImpl foregroundJob() - { - List jobs; - synchronized (this.jobs) - { - jobs = new ArrayList<>(this.jobs); - } - for (JobImpl j : jobs) { - if (j.parent == null && j.status() == Status.Foreground) { - return j; - } - } - return null; - } - - @Override - public void setJobListener(JobListener listener) - { - synchronized (jobs) - { - jobListener = listener; - } - } - - public JobImpl createJob(CharSequence command) - { - synchronized (jobs) - { - int id = 1; - - boolean found; - do - { - found = false; - for (Job job : jobs) - { - if (job.id() == id) - { - found = true; - id++; - break; - } - } - } - while (found); - - JobImpl cur = currentJob(); - JobImpl job = new JobImpl(id, cur, command); - if (cur == null) - { - jobs.add(job); - } - else - { - cur.add(job); - } - return job; - } - } - - class JobImpl implements Job, Runnable - { - private final int id; - private final JobImpl parent; - private final CharSequence command; - private final List pipes = new ArrayList<>(); - private final List children = new ArrayList<>(); - private Status status = Status.Created; - private Future future; - private Result result; - - public JobImpl(int id, JobImpl parent, CharSequence command) - { - this.id = id; - this.parent = parent; - this.command = command; - } - - void addPipe(Pipe pipe) - { - pipes.add(pipe); - } - - @Override - public int id() - { - return id; - } - - public CharSequence command() - { - return command; - } - - @Override - public synchronized Status status() - { - return status; - } - - @Override - public synchronized void suspend() - { - if (status == Status.Done) - { - throw new IllegalStateException("Job is finished"); - } - if (status != Status.Suspended) - { - setStatus(Status.Suspended); - } - } - - @Override - public synchronized void background() - { - if (status == Status.Done) - { - throw new IllegalStateException("Job is finished"); - } - if (status != Status.Background) - { - setStatus(Status.Background); - } - } - - @Override - public synchronized void foreground() - { - if (status == Status.Done) - { - throw new IllegalStateException("Job is finished"); - } - JobImpl cr = CommandSessionImpl.currentJob(); - JobImpl fg = foregroundJob(); - if (parent == null && fg != null && fg != this && fg != cr) - { - throw new IllegalStateException("A job is already in foreground"); - } - if (status != Status.Foreground) - { - setStatus(Status.Foreground); - } - } - - @Override - public void interrupt() - { - Future future; - List children; - synchronized (this) - { - future = this.future; - } - synchronized (this.children) - { - children = new ArrayList<>(this.children); - } - for (Job job : children) - { - job.interrupt(); - } - if (future != null) - { - future.cancel(true); - } - } - - protected synchronized void done() - { - if (status == Status.Done) - { - throw new IllegalStateException("Job is finished"); - } - setStatus(Status.Done); - } - - private void setStatus(Status newStatus) - { - setStatus(newStatus, true); - } - - private void setStatus(Status newStatus, boolean callListeners) - { - Status previous; - synchronized (this) - { - previous = this.status; - status = newStatus; - } - if (callListeners) - { - JobListener listener; - synchronized (jobs) - { - listener = jobListener; - if (newStatus == Status.Done) - { - jobs.remove(this); - } - } - if (listener != null) - { - listener.jobChanged(this, previous, newStatus); - } - } - synchronized (this) - { - JobImpl.this.notifyAll(); - } - } - - @Override - public synchronized Result result() - { - return result; - } - - @Override - public Job parent() - { - return parent; - } - - /** - * Start the job. - * If the job is started in foreground, - * waits for the job to finish or to be - * suspended or moved to background. - * - * @param status the desired job status - * @return null if the job - * has been suspended or moved to background, - * - */ - public synchronized Result start(Status status) throws InterruptedException - { - if (status == Status.Created || status == Status.Done) - { - throw new IllegalArgumentException("Illegal start status"); - } - if (this.status != Status.Created) - { - throw new IllegalStateException("Job already started"); - } - switch (status) - { - case Suspended: - suspend(); - break; - case Background: - background(); - break; - case Foreground: - foreground(); - break; - case Created: - case Done: - } - future = executor.submit(this); - while (this.status == Status.Foreground) - { - JobImpl.this.wait(); - } - return result; - } - - public List processes() - { - return Collections.unmodifiableList((List)pipes); - } - - @Override - public CommandSession session() - { - return CommandSessionImpl.this; - } - - public void run() - { - Thread thread = Thread.currentThread(); - String name = thread.getName(); - try - { - thread.setName("job controller " + id); - - List> wrapped = new ArrayList>(pipes); - List> results = executor.invokeAll(wrapped); - - // Get pipe exceptions - Exception pipeException = null; - for (int i = 0; i < results.size() - 1; i++) - { - Future future = results.get(i); - Throwable e; - try - { - Result r = future.get(); - e = r.exception; - } - catch (ExecutionException ee) - { - e = ee.getCause(); - } - if (e != null) - { - if (pipeException == null) - { - pipeException = new Exception("Exception caught during pipe execution"); - } - pipeException.addSuppressed(e); - } - } - put(Closure.PIPE_EXCEPTION, pipeException); - - result = results.get(results.size() - 1).get(); + String name = m.getName(); + if (m.getName().startsWith("get") && !m.getName().equals("getClass") && m.getParameterTypes().length == 0 + && Modifier.isPublic(m.getModifiers())) + { + found = true; + name = name.substring(3); + m.setAccessible(true); + Object value = m.invoke(b, (Object[]) null); + f.format(COLUMN, name, format(value, Converter.LINE, this)); + } } catch (Exception e) { - result = new Result(e); - } - catch (Throwable t) - { - result = new Result(new ExecutionException(t)); - } - finally - { - done(); - thread.setName(name); - } - } - - public void add(Job child) - { - synchronized (children) - { - children.add(child); - } - } - } + // Ignore + } + } + if (found) + { + return (StringBuilder) f.out(); + } + else + { + return b.toString(); + } + } + } + + public Object convert(Class desiredType, Object in) + { + return processor.convert(this, desiredType, in); + } + + public Object doConvert(Class desiredType, Object in) + { + if (desiredType == Class.class) + { + try + { + return Class.forName(in.toString(), true, classLoader()); + } + catch (ClassNotFoundException e) + { + return null; + } + } + return processor.doConvert(desiredType, in); + } + + public CharSequence format(Object result, int inspect) + { + try + { + return format(result, inspect, this); + } + catch (Exception e) + { + return " args) throws Exception + { + return processor.invoke(this, target, name, args); + } + + public Path redirect(Path path, int mode) + { + return processor.redirect(this, path, mode); + } + + @Override + public List jobs() + { + synchronized (jobs) + { + return Collections.unmodifiableList(jobs); + } + } + + public static JobImpl currentJob() + { + return (JobImpl) Job.Utils.current(); + } + + @Override + public JobImpl foregroundJob() + { + List jobs; + synchronized (this.jobs) + { + jobs = new ArrayList<>(this.jobs); + } + for (JobImpl j : jobs) + { + if (j.parent == null && j.status() == Status.Foreground) + { + return j; + } + } + return null; + } + + @Override + public void setJobListener(JobListener listener) + { + synchronized (jobs) + { + jobListener = listener; + } + } + + public JobImpl createJob(CharSequence command) + { + synchronized (jobs) + { + int id = 1; + + boolean found; + do + { + found = false; + for (Job job : jobs) + { + if (job.id() == id) + { + found = true; + id++; + break; + } + } + } + while (found); + + JobImpl cur = currentJob(); + JobImpl job = new JobImpl(id, cur, command); + if (cur == null) + { + jobs.add(job); + } + else + { + cur.add(job); + } + return job; + } + } + + class JobImpl implements Job, Runnable + { + private final int id; + private final JobImpl parent; + private final CharSequence command; + private final List pipes = new ArrayList<>(); + private final List children = new ArrayList<>(); + private Status status = Status.Created; + private Future future; + private Result result; + + public JobImpl(int id, JobImpl parent, CharSequence command) + { + this.id = id; + this.parent = parent; + this.command = command; + } + + void addPipe(Pipe pipe) + { + pipes.add(pipe); + } + + @Override + public int id() + { + return id; + } + + public CharSequence command() + { + return command; + } + + @Override + public synchronized Status status() + { + return status; + } + + @Override + public synchronized void suspend() + { + if (status == Status.Done) + { + throw new IllegalStateException("Job is finished"); + } + if (status != Status.Suspended) + { + setStatus(Status.Suspended); + } + } + + @Override + public synchronized void background() + { + if (status == Status.Done) + { + throw new IllegalStateException("Job is finished"); + } + if (status != Status.Background) + { + setStatus(Status.Background); + } + } + + @Override + public synchronized void foreground() + { + if (status == Status.Done) + { + throw new IllegalStateException("Job is finished"); + } + JobImpl cr = CommandSessionImpl.currentJob(); + JobImpl fg = foregroundJob(); + if (parent == null && fg != null && fg != this && fg != cr) + { + throw new IllegalStateException("A job is already in foreground"); + } + if (status != Status.Foreground) + { + setStatus(Status.Foreground); + } + } + + @Override + public void interrupt() + { + Future future; + List children; + synchronized (this) + { + future = this.future; + } + synchronized (this.children) + { + children = new ArrayList<>(this.children); + } + for (Job job : children) + { + job.interrupt(); + } + if (future != null) + { + future.cancel(true); + } + } + + protected synchronized void done() + { + if (status == Status.Done) + { + throw new IllegalStateException("Job is finished"); + } + setStatus(Status.Done); + } + + private void setStatus(Status newStatus) + { + setStatus(newStatus, true); + } + + private void setStatus(Status newStatus, boolean callListeners) + { + Status previous; + synchronized (this) + { + previous = this.status; + status = newStatus; + } + if (callListeners) + { + JobListener listener; + synchronized (jobs) + { + listener = jobListener; + if (newStatus == Status.Done) + { + jobs.remove(this); + } + } + if (listener != null) + { + listener.jobChanged(this, previous, newStatus); + } + } + synchronized (this) + { + JobImpl.this.notifyAll(); + } + } + + @Override + public synchronized Result result() + { + return result; + } + + @Override + public Job parent() + { + return parent; + } + + /** + * Start the job. If the job is started in foreground, waits for the job to finish or to be suspended or moved to + * background. + * + * @param status the desired job status + * @return null if the job has been suspended or moved to background, + * + */ + public synchronized Result start(Status status) throws InterruptedException + { + if (status == Status.Created || status == Status.Done) + { + throw new IllegalArgumentException("Illegal start status"); + } + if (this.status != Status.Created) + { + throw new IllegalStateException("Job already started"); + } + switch (status) + { + case Suspended: + suspend(); + break; + case Background: + background(); + break; + case Foreground: + foreground(); + break; + case Created: + case Done: + } + future = executor.submit(this); + while (this.status == Status.Foreground) + { + JobImpl.this.wait(); + } + return result; + } + + public List processes() + { + return Collections.unmodifiableList((List) pipes); + } + + @Override + public CommandSession session() + { + return CommandSessionImpl.this; + } + + public void run() + { + Thread thread = Thread.currentThread(); + String name = thread.getName(); + try + { + thread.setName("job controller " + id); + + List> wrapped = new ArrayList>(pipes); + List> results = executor.invokeAll(wrapped); + + // Get pipe exceptions + Exception pipeException = null; + for (int i = 0; i < results.size() - 1; i++) + { + Future future = results.get(i); + Throwable e; + try + { + Result r = future.get(); + e = r.exception; + } + catch (ExecutionException ee) + { + e = ee.getCause(); + } + if (e != null) + { + if (pipeException == null) + { + pipeException = new Exception("Exception caught during pipe execution"); + } + pipeException.addSuppressed(e); + } + } + put(Closure.PIPE_EXCEPTION, pipeException); + + result = results.get(results.size() - 1).get(); + } + catch (Exception e) + { + result = new Result(e); + } + catch (Throwable t) + { + result = new Result(new ExecutionException(t)); + } + finally + { + done(); + thread.setName(name); + } + } + + public void add(Job child) + { + synchronized (children) + { + children.add(child); + } + } + } + + /** + * Register a runnable that is run when the session is closed. This should be a quick running method. + *

+ * The session will maintain a weak reference to this runnable. This means that if you do not hold a reference to it, + * it will be garbage collected to prevent class leaks when bundles are stopped. So make sure to hold a strong reference. + */ + @Override + public void onClose(Runnable runnable) + { + onClose.add(new WeakReference<>(runnable)); + } } diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/activator/Activator.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/activator/Activator.java index 7f4f2420e0..07f67372dd 100644 --- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/activator/Activator.java +++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/activator/Activator.java @@ -18,6 +18,7 @@ */ package org.apache.felix.gogo.runtime.activator; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -27,10 +28,12 @@ import org.apache.felix.gogo.runtime.CommandProcessorImpl; import org.apache.felix.gogo.runtime.CommandProxy; +import org.apache.felix.gogo.runtime.systemio.SystemIOImpl; import org.apache.felix.gogo.runtime.threadio.ThreadIOImpl; import org.apache.felix.service.command.CommandSessionListener; import org.apache.felix.service.command.CommandProcessor; import org.apache.felix.service.command.Converter; +import org.apache.felix.service.systemio.SystemIO; import org.apache.felix.service.threadio.ThreadIO; import org.osgi.annotation.bundle.Header; import org.osgi.framework.BundleActivator; @@ -46,16 +49,19 @@ public class Activator implements BundleActivator { protected CommandProcessorImpl processor; + private SystemIOImpl systemio; private ThreadIOImpl threadio; private ServiceTracker commandTracker; private ServiceTracker converterTracker; private ServiceTracker listenerTracker; private ServiceRegistration processorRegistration; private ServiceRegistration threadioRegistration; + private ServiceRegistration systemioRegistration; + private ServiceFacade systemioFacade; public static final String CONTEXT = ".context"; - protected ServiceRegistration newProcessor(ThreadIO tio, BundleContext context) + protected ServiceRegistration newProcessor(ThreadIO tio, SystemIO sio, BundleContext context) { processor = new CommandProcessorImpl(tio); try @@ -78,11 +84,22 @@ protected ServiceRegistration newProcessor(ThreadIO tio, BundleContext contex public void start(final BundleContext context) throws Exception { - threadio = new ThreadIOImpl(); + long timeout = toLong(context.getProperty(SystemIO.TIMEOUT)); + if ( timeout <= 0 ) { + systemio = new SystemIOImpl(); + systemio.start(); + systemioRegistration = context.registerService(SystemIO.class.getName(), systemio, null); + threadio = new ThreadIOImpl(systemio); + } else { + systemioFacade = new ServiceFacade<>(SystemIO.class, context, timeout); + SystemIO systemio = systemioFacade.get(); + threadio = new ThreadIOImpl(systemio); + } threadio.start(); threadioRegistration = context.registerService(ThreadIO.class.getName(), threadio, null); - processorRegistration = newProcessor(threadio, context); + + processorRegistration = newProcessor(threadio, systemio, context); commandTracker = trackOSGiCommands(context); commandTracker.open(); @@ -134,6 +151,19 @@ public void stop(BundleContext context) { listenerTracker.close(); threadio.stop(); processor.stop(); + if( systemioRegistration != null) { + systemioRegistration.unregister(); + systemio.stop(); + } else { + try + { + systemioFacade.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } } private ServiceTracker trackOSGiCommands(final BundleContext context) @@ -212,5 +242,16 @@ public void removedService(ServiceReference reference, List serv } }; } + + private long toLong(String v) { + if ( v != null) { + try { + return Long.parseLong(v); + } catch( NumberFormatException nfe) { + // ignore + } + } + return Long.MIN_VALUE; + } } diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/activator/ServiceFacade.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/activator/ServiceFacade.java new file mode 100644 index 0000000000..61b697c698 --- /dev/null +++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/activator/ServiceFacade.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.felix.gogo.runtime.activator; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceException; +import org.osgi.util.tracker.ServiceTracker; + +public class ServiceFacade implements Closeable +{ + final ServiceTracker tracker; + final S facade; + + @SuppressWarnings("unchecked") + public ServiceFacade(final Class clazz, BundleContext context, final long timeout) + { + this.tracker = new ServiceTracker<>(context, clazz, null); + this.tracker.open(); + this.facade = (S) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[] {clazz}, new InvocationHandler() { + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable + { + try + { + S s = tracker.waitForService(timeout); + if ( s == null) + throw new ServiceException( "No such service " + clazz.getName() + ", waited " + timeout + "ms"); + return method.invoke(s, args); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + }); + } + + public S get() { + return facade; + } + + @Override + public void close() throws IOException + { + tracker.close(); + } +} diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/systemio/DelegateStream.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/systemio/DelegateStream.java new file mode 100644 index 0000000000..3026fd3594 --- /dev/null +++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/systemio/DelegateStream.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.felix.gogo.runtime.systemio; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; + +class DelegateStream extends OutputStream +{ + + final List outs; + + + DelegateStream(List outs) + { + this.outs = outs; + } + + @Override + public void write(int b) throws IOException + { + for ( OutputStream out : outs) try { + out.write(b); + } catch( Exception e) { + // ignore + } + } + + @Override + public void write(byte b[]) throws IOException + { + write(b, 0, b.length); + } + + @Override + public void write(byte b[], int off, int len) throws IOException + { + for ( OutputStream out : outs) try { + out.write(b, off, len); + } catch( Exception e) { + // ignore + } + } + + + @Override + public void flush() + { + for ( OutputStream out : outs) try { + out.flush(); + } catch( Exception e) { + // ignore + } + + } + +} diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/systemio/SystemIOImpl.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/systemio/SystemIOImpl.java new file mode 100644 index 0000000000..17a95dfebc --- /dev/null +++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/systemio/SystemIOImpl.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.felix.gogo.runtime.systemio; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.logging.Logger; + +import org.apache.felix.service.systemio.SystemIO; +import org.osgi.annotation.bundle.Capability; +import org.osgi.namespace.service.ServiceNamespace; + +/** + * Implement the SystemIO API to allow the System streams to be overridden + */ +@Capability(namespace = ServiceNamespace.SERVICE_NAMESPACE, + attribute = "objectClass='org.apache.felix.service.systemio.SystemIO'") +public class SystemIOImpl extends InputStream implements SystemIO +{ + static private final Logger log = Logger.getLogger(SystemIOImpl.class.getName()); + + final List stdins = new CopyOnWriteArrayList<>(); + final List stdouts = new CopyOnWriteArrayList<>(); + final List stderrs = new CopyOnWriteArrayList<>(); + + final InputStream in = System.in; + final PrintStream out = System.out; + final PrintStream err = System.err; + + private PrintStream rout; + private PrintStream rerr; + + + public void start() + { + stdins.add(in); + stdouts.add(out); + stderrs.add(err); + rout = new PrintStream(new DelegateStream(stdouts), true); + rerr = new PrintStream(new DelegateStream(stderrs), true); + System.setOut(rout); + System.setErr(rerr); + System.setIn(this); + } + + public void stop() + { + if (System.in == this) + { + System.setIn(in); + } + else + { + log.warning("conflict: the dispatching input stream was replaced"); + } + if (System.out == rout) + { + System.setOut(out); + } + else + { + log.warning("conflict: the dispatching stdout stream was replaced"); + } + if (System.err == rerr) + { + System.setErr(err); + } + else + { + log.warning("conflict: the dispatching stderr stream was replaced"); + } + } + + @Override + public Closeable system(final InputStream stdin, final OutputStream stdout, final OutputStream stderr) + { + if (stdin != null && stdin != System.in && stdin != in) + { + stdins.add(0,stdin); + } + if (stdout != null) + stdouts.add(stdout); + if (stderr != null) + stderrs.add(stderr); + return new Closeable() + { + @Override + public void close() throws IOException + { + if (stdin != null && stdin != System.in && stdin != in) + { + int inInFront = stdins.indexOf(stdin); + assert inInFront >= 0; + stdins.remove(inInFront); + assert stdins.size() > 0; + } + if (stdout != null) + stdouts.remove(stdout); + if (stderr != null) + stderrs.remove(stderr); + } + }; + } + + @Override + public int read() throws IOException + { + assert stdins.size() > 0; + for ( InputStream in : stdins) { + int b = in.read(); + if ( b != SystemIO.NO_DATA) + return b; + } + return -1; // unreachable because stdin is at the end + } + +} diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/Marker.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/Marker.java deleted file mode 100644 index 84dc7cef8e..0000000000 --- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/Marker.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.felix.gogo.runtime.threadio; - -import java.io.InputStream; -import java.io.PrintStream; - -public class Marker -{ - final Marker previous; - InputStream in; - PrintStream out; - PrintStream err; - volatile boolean deactivated; - - public Marker(InputStream in, PrintStream out, PrintStream err, Marker previous) - { - this.previous = previous; - this.in = in; - this.out = out; - this.err = err; - } - - public InputStream getIn() - { - return deactivated ? previous.getIn() : in; - } - - public PrintStream getOut() - { - return deactivated ? previous.getOut() : out; - } - - public PrintStream getErr() - { - return deactivated ? previous.getErr() : err; - } - - void deactivate() - { - deactivated = true; - // Set to null for garbage collection - in = null; - out = null; - err = null; - } -} diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/ThreadIOImpl.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/ThreadIOImpl.java index 60d4ddfdba..b6e659486a 100644 --- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/ThreadIOImpl.java +++ b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/ThreadIOImpl.java @@ -19,120 +19,149 @@ // DWB20: ThreadIO should check and reset IO if something (e.g. jetty) overrides package org.apache.felix.gogo.runtime.threadio; +import java.io.Closeable; +import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.io.PrintStream; import java.util.logging.Logger; +import org.apache.felix.service.systemio.SystemIO; import org.apache.felix.service.threadio.ThreadIO; import org.osgi.annotation.bundle.Capability; import org.osgi.namespace.service.ServiceNamespace; -@Capability( - namespace = ServiceNamespace.SERVICE_NAMESPACE, - attribute = "objectClass='org.apache.felix.service.threadio.ThreadIO'" -) -public class ThreadIOImpl implements ThreadIO +@Capability(namespace = ServiceNamespace.SERVICE_NAMESPACE, + attribute = "objectClass='org.apache.felix.service.threadio.ThreadIO'") +public class ThreadIOImpl extends InputStream implements ThreadIO { - static private final Logger log = Logger.getLogger(ThreadIOImpl.class.getName()); - - final Marker defaultMarker = new Marker(System.in, System.out, System.err, null); - final ThreadPrintStream err = new ThreadPrintStream(this, System.err, true); - final ThreadPrintStream out = new ThreadPrintStream(this, System.out, false); - final ThreadInputStream in = new ThreadInputStream(this, System.in); - final ThreadLocal current = new InheritableThreadLocal() - { - @Override - protected Marker initialValue() - { - return defaultMarker; - } - }; - - public void start() - { - if (System.out instanceof ThreadPrintStream) - { - throw new IllegalStateException("Thread Print Stream already set"); - } - System.setOut(out); - System.setIn(in); - System.setErr(err); - } - - public void stop() - { - System.setErr(defaultMarker.err); - System.setOut(defaultMarker.out); - System.setIn(defaultMarker.in); - } - - private void checkIO() - { // derek - if (System.in != in) - { - log.fine("ThreadIO: eek! who's set System.in=" + System.in); - System.setIn(in); - } - - if (System.out != out) - { - log.fine("ThreadIO: eek! who's set System.out=" + System.out); - System.setOut(out); - } - - if (System.err != err) - { - log.fine("ThreadIO: eek! who's set System.err=" + System.err); - System.setErr(err); - } - } - - Marker current() - { - Marker m = current.get(); - if (m.deactivated) - { - while (m.deactivated) + static final Logger log = Logger.getLogger(ThreadIOImpl.class.getName()); + final SystemIO systemio; + final ThreadLocal threadLocal = new ThreadLocal<>(); + Closeable system; + + class Streams + { + final PrintStream out; + final PrintStream err; + final InputStream in; + Streams prev; + + Streams(InputStream in, PrintStream out, PrintStream err) + { + this.in = in; + this.out = out; + this.err = err; + } + + } + + abstract class ThreadOutStream extends OutputStream + { + @Override + public void write(int b) throws IOException + { + Streams streams = threadLocal.get(); + if (streams == null) + return; + + get(streams).write(b); + } + + @Override + public void write(byte[] b) throws IOException + { + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException + { + Streams streams = threadLocal.get(); + if (streams == null) + return; + + get(streams).write(b, off, len); + } + + abstract OutputStream get(Streams s); + } + + + public ThreadIOImpl(SystemIO systemio) + { + this.systemio = systemio; + } + + public void start() + {} + + public void stop() + { + if (system != null) + try + { + system.close(); + } + catch (IOException e) + { + // ignore + } + } + + public void close() + { + Streams streams = threadLocal.get(); + if (streams != null) + threadLocal.set(streams.prev); + } + + public void setStreams(InputStream in, PrintStream out, PrintStream err) + { + init(); + Streams s = new Streams(in, out, err); + s.prev = threadLocal.get(); + threadLocal.set(s); + } + + private synchronized void init() + { + if (system == null) + { + system = systemio.system(this, new ThreadOutStream() + { + + @Override + OutputStream get(Streams s) { - m = m.previous; + return s.out; } - current.set(m); - } - return m; - } - - public void close() - { - checkIO(); // derek - Marker top = this.current.get(); - if (top == null) - { - throw new IllegalStateException("No thread io active"); - } - if (top != defaultMarker) - { - top.deactivate(); - this.current.set(top.previous); - } - } - - public void setStreams(InputStream in, PrintStream out, PrintStream err) - { - assert in != null; - assert out != null; - assert err != null; - checkIO(); // derek - Marker prev = current(); - if (in == this.in) { - in = prev.getIn(); - } - if (out == this.out) { - out = prev.getOut(); - } - if (err == this.err) { - err = prev.getErr(); - } - Marker marker = new Marker(in, out, err, prev); - this.current.set(marker); - } + + }, new ThreadOutStream() + { + + @Override + OutputStream get(Streams s) + { + return s.err; + } + }); + } + } + + @Override + public int read() throws IOException + { + Streams s = threadLocal.get(); + while (s != null) + { + if (s.in == null) + s = s.prev; + else + { + return s.in.read(); + } + } + return SystemIO.NO_DATA; + } } diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/ThreadInputStream.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/ThreadInputStream.java deleted file mode 100644 index af86a316f0..0000000000 --- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/ThreadInputStream.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.felix.gogo.runtime.threadio; - -import java.io.IOException; -import java.io.InputStream; - -public class ThreadInputStream extends InputStream -{ - final InputStream dflt; - final ThreadIOImpl io; - - public ThreadInputStream(ThreadIOImpl threadIO, InputStream in) - { - io = threadIO; - dflt = in; - } - - private InputStream getCurrent() - { - Marker marker = io.current(); - return marker.getIn(); - } - - /** - * Access to the root stream through reflection - * @return InputStream - */ - public InputStream getRoot() - { - return dflt; - } - - // - // Delegate methods - // - - public int read() throws IOException - { - return getCurrent().read(); - } - - public int read(byte[] b) throws IOException - { - return getCurrent().read(b); - } - - public int read(byte[] b, int off, int len) throws IOException - { - return getCurrent().read(b, off, len); - } - - public long skip(long n) throws IOException - { - return getCurrent().skip(n); - } - - public int available() throws IOException - { - return getCurrent().available(); - } - - public void close() throws IOException - { - getCurrent().close(); - } - - public void mark(int readlimit) - { - getCurrent().mark(readlimit); - } - - public void reset() throws IOException - { - getCurrent().reset(); - } - - public boolean markSupported() - { - return getCurrent().markSupported(); - } -} diff --git a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/ThreadPrintStream.java b/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/ThreadPrintStream.java deleted file mode 100644 index decca062fe..0000000000 --- a/gogo/runtime/src/main/java/org/apache/felix/gogo/runtime/threadio/ThreadPrintStream.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.felix.gogo.runtime.threadio; - -import java.io.IOException; -import java.io.PrintStream; -import java.util.Locale; - -public class ThreadPrintStream extends PrintStream -{ - final PrintStream dflt; - final ThreadIOImpl io; - final boolean errorStream; - - public ThreadPrintStream(ThreadIOImpl threadIO, PrintStream out, boolean error) - { - super(out); - dflt = out; - io = threadIO; - errorStream = error; - } - - public PrintStream getCurrent() - { - Marker marker = io.current(); - return errorStream ? marker.getErr() : marker.getOut(); - } - - /** - * Access to the root stream through reflection - */ - public PrintStream getRoot() - { - return dflt; - } - - // - // Delegate methods - // - - public void flush() - { - getCurrent().flush(); - } - - public void close() - { - getCurrent().close(); - } - - public boolean checkError() - { - return getCurrent().checkError(); - } - - public void setError() - { - // getCurrent().setError(); - } - - public void clearError() - { - // getCurrent().clearError(); - } - - public void write(int b) - { - getCurrent().write(b); - } - - public void write(byte[] buf, int off, int len) - { - getCurrent().write(buf, off, len); - } - - public void print(boolean b) - { - getCurrent().print(b); - } - - public void print(char c) - { - getCurrent().print(c); - } - - public void print(int i) - { - getCurrent().print(i); - } - - public void print(long l) - { - getCurrent().print(l); - } - - public void print(float f) - { - getCurrent().print(f); - } - - public void print(double d) - { - getCurrent().print(d); - } - - public void print(char[] s) - { - getCurrent().print(s); - } - - public void print(String s) - { - getCurrent().print(s); - } - - public void print(Object obj) - { - getCurrent().print(obj); - } - - public void println() - { - getCurrent().println(); - } - - public void println(boolean x) - { - getCurrent().println(x); - } - - public void println(char x) - { - getCurrent().println(x); - } - - public void println(int x) - { - getCurrent().println(x); - } - - public void println(long x) - { - getCurrent().println(x); - } - - public void println(float x) - { - getCurrent().println(x); - } - - public void println(double x) - { - getCurrent().println(x); - } - - public void println(char[] x) - { - getCurrent().println(x); - } - - public void println(String x) - { - getCurrent().println(x); - } - - public void println(Object x) - { - getCurrent().println(x); - } - - public PrintStream printf(String format, Object... args) - { - return getCurrent().printf(format, args); - } - - public PrintStream printf(Locale l, String format, Object... args) - { - return getCurrent().printf(l, format, args); - } - - public PrintStream format(String format, Object... args) - { - return getCurrent().format(format, args); - } - - public PrintStream format(Locale l, String format, Object... args) - { - return getCurrent().format(l, format, args); - } - - public PrintStream append(CharSequence csq) - { - return getCurrent().append(csq); - } - - public PrintStream append(CharSequence csq, int start, int end) - { - return getCurrent().append(csq, start, end); - } - - public PrintStream append(char c) - { - return getCurrent().append(c); - } - - public void write(byte[] b) throws IOException - { - getCurrent().write(b); - } -} diff --git a/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandSession.java b/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandSession.java index e4e6df1800..7082d2e352 100644 --- a/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandSession.java +++ b/gogo/runtime/src/main/java/org/apache/felix/service/command/CommandSession.java @@ -111,6 +111,12 @@ public interface CommandSession extends AutoCloseable * @return Object */ Object convert(Class type, Object instance); + + /** + * When this session is stopped, execute the runnable. + * @param runnable the runnable to run + */ + void onClose( Runnable runnable); // // Job support diff --git a/gogo/runtime/src/main/java/org/apache/felix/service/systemio/SystemIO.java b/gogo/runtime/src/main/java/org/apache/felix/service/systemio/SystemIO.java new file mode 100644 index 0000000000..c02ab2d1a1 --- /dev/null +++ b/gogo/runtime/src/main/java/org/apache/felix/service/systemio/SystemIO.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.felix.service.systemio; + +import java.io.Closeable; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * A simple service to listen to the system streams. The System.out and System.err writes will be dispatched to all + * registered listeners as well as the original streams. If a read is done on System.in, the last registered will handle + * the read. If no one is registered, the original System.in is used. + *

+ * The purpose of this service is to share the System.in, System.out, and System.err singletons. + *

+ * Implementations must warn if someone else overrides the System.xxx streams and not reset them if this happens. + * + */ +public interface SystemIO +{ + /** + * A framework property signalling that Gogo should use an external SystemIO service instead of its build in one. The + * property value must be the number of milliseconds to wait for this external service. Any value <=0 or not numeric + * will result in using the internal implementation. + */ + + String TIMEOUT = "org.apache.felix.gogo.systemio.timeout"; + + /** + * An input stream can return this from {@link InputStream#read()} when it has no data. This should force the + * implementation to look for another input stream. + */ + int NO_DATA = -42; + + /** + * Register overrides for the System streams. If a stream is null, it will not be registered. The stdin InputStream + * can return {@link #NO_DATA} if it does not want to be used as the last input stream. This can be used to filter + * for example by the current thread. + * + * @param stdin the System.in handler or null. + * @param stdout the System.out listener + * @param stderr the System.err listener + * @return a closeable that when closed will unregister the streams + */ + Closeable system(InputStream stdin, OutputStream stdout, OutputStream stderr); +} diff --git a/gogo/runtime/src/main/java/org/apache/felix/service/threadio/ThreadIO.java b/gogo/runtime/src/main/java/org/apache/felix/service/threadio/ThreadIO.java index 08b04f53aa..b408433b91 100644 --- a/gogo/runtime/src/main/java/org/apache/felix/service/threadio/ThreadIO.java +++ b/gogo/runtime/src/main/java/org/apache/felix/service/threadio/ThreadIO.java @@ -21,6 +21,8 @@ import java.io.InputStream; import java.io.PrintStream; +import org.apache.felix.service.systemio.SystemIO; + /** * Enable multiplexing of the standard IO streams for input, output, and error. *

diff --git a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/AbstractParserTest.java b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/AbstractParserTest.java index 323842588b..eb88e11b33 100644 --- a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/AbstractParserTest.java +++ b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/AbstractParserTest.java @@ -23,6 +23,7 @@ import java.io.OutputStream; import java.io.PrintStream; +import org.apache.felix.gogo.runtime.systemio.SystemIOImpl; import org.apache.felix.gogo.runtime.threadio.ThreadIOImpl; import org.junit.After; import org.junit.Before; @@ -33,19 +34,23 @@ public abstract class AbstractParserTest { private InputStream sin; private PrintStream sout; private PrintStream serr; + private SystemIOImpl systemIO; @Before public void setUp() { sin = new NoCloseInputStream(System.in); sout = new NoClosePrintStream(System.out); serr = new NoClosePrintStream(System.err); - threadIO = new ThreadIOImpl(); + systemIO = new SystemIOImpl(); + systemIO.start(); + threadIO = new ThreadIOImpl(systemIO); threadIO.start(); } @After public void tearDown() { threadIO.stop(); + systemIO.stop(); } public class Context extends org.apache.felix.gogo.runtime.Context { diff --git a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/CommandSessionTest.java b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/CommandSessionTest.java new file mode 100644 index 0000000000..4cc5a08545 --- /dev/null +++ b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/CommandSessionTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.felix.gogo.runtime; + +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + + +public class CommandSessionTest +{ + + @Test + public void onCloseTest() + { + CommandProcessorImpl processor = new CommandProcessorImpl(null); + ByteArrayInputStream bais = new ByteArrayInputStream("".getBytes()); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final AtomicInteger b = new AtomicInteger(0); + try (CommandSessionImpl session = processor.createSession(bais, baos, baos)) + { + + + Runnable runnable = new Runnable() + { + + @Override + public void run() + { + b.incrementAndGet(); + } + }; + session.onClose(runnable); + session.onClose(runnable); + + assertEquals(0,b.get()); + System.gc(); + } + + assertEquals(2,b.get()); + } + + @Test + public void onCloseTestGc() + { + CommandProcessorImpl processor = new CommandProcessorImpl(null); + ByteArrayInputStream bais = new ByteArrayInputStream("".getBytes()); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final AtomicInteger b = new AtomicInteger(0); + try (CommandSessionImpl session = processor.createSession(bais, baos, baos)) + { + + + session.onClose(new Runnable() + { + + @Override + public void run() + { + b.incrementAndGet(); + } + }); + session.onClose(new Runnable() + { + + @Override + public void run() + { + b.incrementAndGet(); + } + }); + + assertEquals(0,b.get()); + } + + assertEquals(2,b.get()); + } + +} diff --git a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/TestTokenizer.java b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/TestTokenizer.java index d41d2617b7..44d50768ca 100644 --- a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/TestTokenizer.java +++ b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/TestTokenizer.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; +import org.apache.felix.gogo.runtime.systemio.SystemIOImpl; import org.apache.felix.gogo.runtime.threadio.ThreadIOImpl; import org.junit.Before; import org.junit.Test; @@ -501,7 +502,9 @@ public void close() { public void close() { } }; - ThreadIOImpl tio = new ThreadIOImpl(); + SystemIOImpl sio = new SystemIOImpl(); + sio.start(); + ThreadIOImpl tio = new ThreadIOImpl(sio); tio.start(); try @@ -520,6 +523,7 @@ public void close() { finally { tio.stop(); + sio.stop(); } } diff --git a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/threadio/TestIOWithFramework.java b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/threadio/TestIOWithFramework.java new file mode 100644 index 0000000000..59a4c55c85 --- /dev/null +++ b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/threadio/TestIOWithFramework.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.felix.gogo.runtime.threadio; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.util.HashMap; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.felix.gogo.runtime.activator.Activator; +import org.apache.felix.gogo.runtime.activator.ServiceFacade; +import org.apache.felix.service.systemio.SystemIO; +import org.apache.felix.service.threadio.ThreadIO; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.osgi.framework.BundleContext; +import org.osgi.framework.BundleException; +import org.osgi.framework.Constants; +import org.osgi.framework.ServiceException; +import org.osgi.framework.ServiceReference; +import org.osgi.framework.launch.Framework; + +public class TestIOWithFramework +{ + private Framework framework; + private File tmp; + + @Before + public void setup() throws Exception + { + tmp = Files.createTempDirectory("TestIOWithFramework").toFile(); + } + + private void fw(Map configuration) throws BundleException + { + if (configuration == null) + { + configuration = new HashMap<>(); + } + + configuration.put(Constants.FRAMEWORK_STORAGE, tmp.getAbsolutePath()); + configuration.put(Constants.FRAMEWORK_STORAGE_CLEAN, Constants.FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT); + framework = ServiceLoader.load(org.osgi.framework.launch.FrameworkFactory.class).iterator().next() + .newFramework(configuration); + framework.init(); + framework.start(); + } + + @After + public void after() throws Exception + { + framework.stop(); + framework.waitForStop(100000); + delete(tmp); + } + + void delete(File f) + { + if (f.isFile()) + { + f.delete(); + } + else + { + for (File sub : f.listFiles()) + { + delete(sub); + } + } + } + + @Test + public void testWithFrameworkAndDefault() throws Exception + { + fw(null); + BundleContext context = framework.getBundleContext(); + Activator a = new Activator(); + a.start(context); + assertNotNull(context.getServiceReference(SystemIO.class)); + assertNotNull(context.getServiceReference(ThreadIO.class)); + a.stop(context); + } + + @Test + public void testWithExternalSystemIO() throws Exception + { + Map configuration = new HashMap<>(); + configuration.put("org.apache.felix.gogo.systemio.timeout", "5000"); + fw(configuration); + BundleContext context = framework.getBundleContext(); + + Closeable c = mock(Closeable.class); + SystemIO sio = mock(SystemIO.class); + when(sio.system(Mockito.any(InputStream.class), Mockito.any(OutputStream.class), Mockito.any(OutputStream.class))) + .thenReturn(c); + context.registerService(SystemIO.class, sio, null); + + Activator a = new Activator(); + a.start(context); + + ServiceReference ref = context.getServiceReference(ThreadIO.class); + assertNotNull(ref); + ThreadIO tio = context.getService(ref); + assertNotNull(tio); + tio.setStreams(null, null, null); + a.stop(context); + + Mockito.verify(c).close(); + } + + public interface Foo + { + void bar(); + } + + @Test(expected = ServiceException.class) + public void testFacadeWithoutService() throws BundleException, IOException + { + fw(null); + try (ServiceFacade sf = new ServiceFacade<>(Foo.class, framework.getBundleContext(), 500)) + { + Foo foo = sf.get(); + + foo.bar(); + } + } + + @Test + public void testFacadeWithService() throws BundleException, IOException + { + fw(null); + final AtomicInteger ai = new AtomicInteger(0); + framework.getBundleContext().registerService(Foo.class, new Foo() { + + @Override + public void bar() + { + ai.incrementAndGet(); + } + + }, null); + try (ServiceFacade sf = new ServiceFacade<>(Foo.class, framework.getBundleContext(), 5000)) + { + Foo foo = sf.get(); + + long time = System.currentTimeMillis(); + foo.bar(); + if ( System.currentTimeMillis() > time + 4000) + fail("Took too much time"); + + assertEquals(1, ai.get()); + } + } +} diff --git a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/threadio/TestThreadIO.java b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/threadio/TestThreadIO.java index 5ffe41737e..0b3b39e437 100644 --- a/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/threadio/TestThreadIO.java +++ b/gogo/runtime/src/test/java/org/apache/felix/gogo/runtime/threadio/TestThreadIO.java @@ -18,74 +18,154 @@ */ package org.apache.felix.gogo.runtime.threadio; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.IOException; import java.io.PrintStream; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import org.apache.felix.gogo.runtime.systemio.SystemIOImpl; import org.junit.Assert; import org.junit.Test; -public class TestThreadIO { +public class TestThreadIO +{ - /** - * Test if the threadio works in a nested fashion. We first push - * ten markers on the stack and print a message for each, capturing - * the output in a ByteArrayOutputStream. Then we pop them, also printing - * a message identifying the level. Then we verify the output for each level. - */ - @Test - public void testNested() - { - ThreadIOImpl tio = new ThreadIOImpl(); - tio.start(); - List list = new ArrayList<>(); - for (int i = 0; i < 10; i++) - { + /** + * Test if the threadio works in a nested fashion. We first push ten markers on the stack and print a message for + * each, capturing the output in a ByteArrayOutputStream. Then we pop them, also printing a message identifying the + * level. Then we verify the output for each level. + */ + @Test + public void testNested() + { + SystemIOImpl systemio = new SystemIOImpl(); + systemio.start(); + ThreadIOImpl tio = new ThreadIOImpl(systemio); + try + { + tio.start(); + List list = new ArrayList<>(); + for (int i = 0; i < 10; i++) + { ByteArrayOutputStream out = new ByteArrayOutputStream(); list.add(out); tio.setStreams(System.in, new PrintStream(out), System.err); System.out.print("b" + i); - } - for (int i = 9; i >= 0; i--) - { + } + for (int i = 9; i >= 0; i--) + { System.out.println("e" + i); tio.close(); - } - tio.stop(); - for (int i = 0; i < 10; i++) - { + } + for (int i = 0; i < 10; i++) + { String message = list.get(i).toString().trim(); Assert.assertEquals("b" + i + "e" + i, message); - } - } + } + } + finally + { + tio.stop(); + systemio.stop(); + } + } - /** - * Simple test too see if the basics work. - */ - @Test - public void testSimple() - { - ThreadIOImpl tio = new ThreadIOImpl(); - tio.start(); - System.out.println("Hello World"); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - ByteArrayOutputStream err = new ByteArrayOutputStream(); - tio.setStreams(System.in, new PrintStream(out), new PrintStream(err)); - try - { - System.out.println("Simple Normal Message"); - System.err.println("Simple Error Message"); - } - finally - { - tio.close(); - } - tio.stop(); - String normal = out.toString().trim(); - //String error = err.toString().trim(); - Assert.assertEquals("Simple Normal Message", normal); - //assertEquals("Simple Error Message", error ); - System.out.println("Goodbye World"); - } + /** + * Simple test too see if the basics work. + * @throws IOException + */ + @SuppressWarnings("resource") + @Test + public void testSimple() throws IOException + { + SystemIOImpl systemio = new SystemIOImpl(); + systemio.start(); + ThreadIOImpl tio = new ThreadIOImpl(systemio); + tio.start(); + try + { + System.out.println("Hello World"); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ByteArrayOutputStream err = new ByteArrayOutputStream(); + tio.setStreams(System.in, new PrintStream(out), new PrintStream(err)); + + System.out.println("Simple Normal Message"); + System.err.println("Simple Error Message"); + tio.stop(); + String normal = out.toString().trim(); + // String error = err.toString().trim(); + Assert.assertEquals("Simple Normal Message", normal); + // assertEquals("Simple Error Message", error ); + System.out.println("Goodbye World"); + } + finally + { + systemio.close(); + tio.close(); + } + } + + @Test + @SuppressWarnings("resource") + public void testNullInputStream() throws IOException { + SystemIOImpl systemio = new SystemIOImpl(); + systemio.start(); + ThreadIOImpl tio = new ThreadIOImpl(systemio); + tio.start(); + try + { + byte[] test = "abc".getBytes(StandardCharsets.UTF_8); + ByteArrayInputStream bin = new ByteArrayInputStream( test); + tio.setStreams(bin, null, null); + tio.setStreams(null, null, null); + byte data[] = new byte[3]; + System.in.read(data); + assertTrue(Arrays.equals(test, data)); + + tio.close(); + } + finally + { + tio.stop(); + systemio.close(); + } + } + + @Test + @SuppressWarnings("resource") + public void testNoInputStreamSetInThreadIO() throws IOException { + SystemIOImpl systemio = new SystemIOImpl(); + systemio.start(); + byte[] test = "abc".getBytes(StandardCharsets.UTF_8); + ByteArrayInputStream bin = new ByteArrayInputStream( test); + Closeable system = systemio.system(bin, null,null); + ThreadIOImpl tio = new ThreadIOImpl(systemio); + tio.start(); + try + { + byte data[] = new byte[3]; + System.in.read(data); + assertTrue(Arrays.equals(test, data)); + tio.close(); + } + finally + { + system.close(); + systemio.close(); + tio.stop(); + } + } + + @Test + public void testWithFrameworkService() { + + } }