monitor Monitor(value: Type) {
def foo() {
statements
}
...
}It is equivalent to:
class Monitor(value: Type) {
synchronized def foo() {
try {
statements
} finally {
this.notifyAll;
}
}
...
}wait condition until deadlineconditionis a boolean expression, evaluated on each loop iteration.deadlineis of type Time, computed on each iteration.
Wait for condition to become true until deadline occurs.
It is equivalent to
while (!condition && deadline > now()) {
this.wait(deadline - now());
}wait conditionThe statement is unblocked when the condition becomes true.
wait until deadlineIt just waits till system time reaches the given deadline.
wait condition for timeoutThis is equivalent to
val beg = now()
wait condition until beg + timeoutIt also has the following short forms, like:
wait condition
wait for timeoutwhile (condition1) {
statements1
} else if (condition2) {
statements2
...
}I find elif is nicer than else if, but it is very unusual for Java/Scala world.
The Dijkstra's while loop is equivalent to
while (true) {
if (condition1) {
statements1
} else if (condition2) {
statements2
...
} else {
break;
}
}We prefer to use not, or, and over !, ||, &&.
monitor AtomicInc(var value: Int = 0) {
def get = value
def incAndGet: Int = {
value++
value
}
def set(v: Int) {
value = v
}
def waitUntilExceeds(v: Int) {
wait value > v
}
}monitor CompareAndSwap[V](var value: V = _) {
def get = value
def cas(prevValue: V, newValue: V): Boolean =
if (value == prevValue) {
value = newValue
true
} else false
}monitor Barrier(maxArrivals: Int) {
private var nArrivals: Int
def arrive() {
nArrivals++
wait nArrivals >= maxArrivals
}
}monitor AutoBarrier(maxArrivals: Int) {
private var nArrivals: Int
def arrive() {
nArrivals++
thisMaxArrivals = nArrivals - nArrivals % maxArrivals + maxArrivals
wait nArrivals >= thisMaxArrivals
}
}monitor Event {
private var signaled: Boolean
def signal() {
signaled = true
}
def wait() {
wait signaled
}
def reset() {
signaled = false
}
}monitor AutoEvent {
private var signaled: Boolean
def signal() {
signaled = true
}
def wait() {
wait signaled
signaled = false
}
}monitor ConditionEvent {
private var nSignaled: Int
private var nPending: Int
def signal() {
nSignaled++
}
def signalAll() {
nSignaled = nPending
}
def wait() {
val id = nPending
nPending++
wait nSignaled > id
}
}Note that ConditionVar is just a class, since we use ConditionEvent
class ConditionVar(mutex: Mutex) {
private val event = new ConditionEvent
def await() {
mutex.unlock
try {
event.wait
} finally {
mutex.lock
}
}
def notify() {
event.signal
}
def notifyAll() {
event.signalAll
}
}monitor Mutex {
private var locked: Boolean
def isLocked = locked
def lock() {
wait not locked
locked = true
}
def unlock() {
locked = false
}
def tryLock(timeout: Duration = 0): Boolean = {
wait not locked for timeout
if (not locked) {
locked = true
true
} else false
}
def conditionVar(): ConditionVar = new ConditionVar(this)
}monitor Semaphore(nUnits: Int = 0) {
def release() {
nUnits++
}
def acquire() {
wait nUnits > 0
nUnits--
}
def tryAcquire(timeout: Duration = 0): Boolean = {
wait nUnits > 0 for timeout
if (nUnits > 0) {
nUnits--
true
} else false
}
}monitor ReadWriteLock {
private var nReaders: Int
private var nWriters: Int
def lockRead() {
wait nWriters == 0
nReaders++
}
def unlockRead() {
nReaders--
}
def lockWrite() {
wait nWriters + nReaders == 0
nWriters++
}
def unlockWrite() {
nWriters--
}
}monitor ReadWriteLock {
private var nReaders: Int
private var nWriters: Int
private var nWriteReqs: Int
def lockRead() {
wait nWriters + nWriteReqs == 0
nReaders++
}
def unlockRead() {
nReaders--
}
def lockWrite() {
nWriteReqs++
wait nWriters + nReaders == 0
nWriteReqs--
nWriters++
}
def unlockWrite() {
nWriters--
}
}monitor UnboundedBlockingQueue[V] {
private val values = new Queue[V]
def enqueue(value: V) {
values.enqueue(value)
}
def dequeue(): V = {
wait not values.isEmpty
values.dequeue
}
}monitor BlockingQueue[V](capacity: Int) {
private val values = new Queue[V]
def enqueue(value: V) {
wait values.size < capacity
values.enqueue(value)
}
def dequeue(): V = {
wait not values.isEmpty
values.dequeue
}
}monitor DelayQueue[V] {
case class Entry(value: V, when: Time)
private val entries = new PriorityQueue[Entry](e => e.when)
def enqueue(value: V, timeout: Duration = 0) {
entries.enqueue(Entry(value, now() + timeout))
}
def dequeue(): V = {
while (entries.isEmpty) {
this.wait
} else if (entries.head.when > now()) {
this.wait(entries.head.when - now())
}
entries.dequeue.value
}
}monitor DelayQueue[V] {
case class Entry(value: V, when: Time)
private val entries = new PriorityQueue[Entry]()(Ordering.by{e => -e.when})
def enqueue(value: V, timeout: Duration = 0) {
entries.enqueue(Entry(value, now() + timeout))
}
def dequeue(): V = {
wait not entries.isEmpty and entries.head.when <= now()
until if (entries.isEmpty) INFINITY else entries.head.when
entries.dequeue.value
}
}class DelayExecutor(exec: Executor) {
private val runnables = new DelayQueue[Runnable]
private val worker = new Thread {
override def run() {
while (true) {
exec.execute(runnables.dequeue)
}
}
}
worker.start
def execute(r: Runnable, timeout: Duration = 0) {
if (timeout == 0) {
exec.execute(r)
} else {
runnables.enqueue(r, timeout)
}
}
}monitor ExpireMap[K, V] {
class MEntry(var value: V = _, var it: DelayQueue[Entry].iterator=_)
private val keys = new DelayQueue[K]
private val table = new HashMap[K, MEntry]
private val worker = new Thread {
override def run() {
while (true) {
remove(keys.dequeue)
}
}
}
worker.start
def contains(key K): Boolean = table.contains(key)
def get(key: K): V = table.getOrElse(key, new MEntry).value
def put(key: K, value: V) {
table.getOrElseUpdate(key, new MEntry).value = value
}
def remove(key: K, timeout: Duration = 0) {
table.get(key) match {
case Some(e) => {
if (e.it != null) {
e.it.remove
}
if (timeout > 0) {
e.it = keys.enqueue(key, timeout)
} else {
table.remove(key)
}
}
case _ =>
}
}
}