RosettaCodeData/Task/Checkpoint-synchronization/E/checkpoint-synchronization.e

115 lines
3.1 KiB
Plaintext

/** A flagSet solves this problem: There are N things, each in a true or false
* state, and we want to know whether they are all true (or all false), and be
* able to bulk-change all of them, and all this without allowing double-
* counting -- setting a flag twice is idempotent.
*/
def makeFlagSet() {
  # While a flag is a member it sits in exactly one of these two mutable sets;
  # a flag that has left is in neither.
  def trues := [].asSet().diverge()
  def falses := [].asSet().diverge()
  return def flagSet {
    /** Add a new flag to the set. It starts out false. Returns the flag. */
    to join() {
      def flag {
        /**
         * Get the value of this flag: true exactly when the flag is currently
         * in the true set. A flag that has left reports false.
         */
        to get() :boolean {
          return trues.contains(flag)
        }
        /**
         * Set the value of this flag. Does nothing if the flag already has
         * that value, or if it is no longer a member of the set.
         */
        to put(v :boolean) {
          def [del,add] := if (v) { [falses,trues] } else { [trues,falses] }
          # Move the flag only when it is on the opposite side; this is what
          # keeps repeated puts from double-counting and keeps a departed
          # flag from being re-added.
          if (del.contains(flag)) {
            del.remove(flag)
            add.addElement(flag)
          }
        }
        /** Remove this flag from the set. */
        to leave() :void {
          trues.remove(flag)
          falses.remove(flag)
        }
      }
      # New members are counted as false until they say otherwise.
      falses.addElement(flag)
      return flag
    }
    /** Are all the flags true (none false)? Also true when there are no flags. */
    to allTrue() { return falses.size().isZero() }
    /** Are all the flags false (none true)? Also true when there are no flags. */
    to allFalse() { return trues.size().isZero() }
    /** Set all the flags to the same value. */
    to setAll(v :boolean) {
      def [del,add] := if (v) { [falses,trues] } else { [trues,falses] }
      # Copy everything to the target side first, then empty the source side.
      add.addAll(del)
      del.removeAll(del)
    }
  }
}
def makeCheckpoint() {
  # The promise handed to tasks waiting on the current round, and the resolver
  # that releases them. Both are replaced each time a round completes.
  def [firstSignal, firstResolver] := Ref.promise()
  var currentSignal := firstSignal
  var currentResolver := firstResolver
  # One flag per member task; true means "has arrived at the checkpoint".
  def arrivals := makeFlagSet()
  /** If every member has arrived, reset the flags, release the waiters, and
    * set up a fresh signal for the next round. */
  def releaseIfAllArrived() {
    if (arrivals.allTrue()) {
      arrivals.setAll(false)
      # Release everyone waiting on this round.
      currentResolver.resolve(null)
      # Prepare the promise/resolver pair for the next round.
      def [freshSignal, freshResolver] := Ref.promise()
      currentSignal := freshSignal
      currentResolver := freshResolver
    }
  }
  return def checkpoint {
    /** Register a new task with the checkpoint; returns its membership. */
    to join() {
      # The flag object returned by the flag set, used directly via put/leave.
      def arrivalFlag := arrivals.join()
      return def membership {
        /** Withdraw this task; the remaining tasks may then be released. */
        to leave() {
          arrivalFlag.leave()
          releaseIfAllArrived <- ()
        }
        /** Mark this task as arrived; returns the promise for the current
          * round's continue signal. */
        to deliver() {
          arrivalFlag.put(true)
          # The check is sent eventually, so the signal returned below is
          # still the current round's, read before the check can replace it.
          releaseIfAllArrived <- ()
          return currentSignal
        }
      }
    }
  }
}
def makeWorker(piece, checkpoint) {
  # Time after which this worker quits: 3000 ms from now plus a random extra
  # of up to 2000 ms.
  def stops := timer.now() + 3000 + entropy.nextInt(2000)
  var count := 0
  def checkpointMember := checkpoint <- join()
  # Promise returned to the caller; bound to true once the worker has left.
  def stopped
  /** Do one unit of pretend work, then either deliver to the checkpoint and
    * go again, or leave the checkpoint if the time is up. */
  def run() {
    # Pretend to do something lengthy; up to 1000 ms.
    def wakeAt := timer.now() + entropy.nextInt(1000)
    timer.whenPast(wakeAt, fn {
      if (timer.now() < stops) {
        count += 1
        println(`Delivering $piece#$count`)
        # Continue only after the checkpoint has released this round.
        when (checkpointMember <- deliver()) -> {
          println(`Delivered $piece#$count`)
          run()
        }
      } else {
        # Out of time: withdraw so the other workers are not held up.
        checkpointMember <- leave()
        bind stopped := true
      }
    })
  }
  run()
  return stopped
}
# Start five workers sharing one checkpoint and collect their "stopped" promises.
def checkpoint := makeCheckpoint()
var waits := []
for piece in 1..5 {
  def workerStopped := makeWorker(piece, checkpoint)
  waits := waits.with(workerStopped)
}
# Keep the interpreter alive until every worker has stopped.
def allStopped := promiseAllFulfilled(waits)
interp.waitAtTop(allStopped)