#![feature(mpsc_select)] extern crate num; extern crate schedule_recv; use num::traits::Zero; use num::Float; use schedule_recv::periodic_ms; use std::f64::consts::PI; use std::ops::Mul; use std::sync::mpsc::{self, SendError, Sender}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; pub type Actor = Sender S + Send>>; pub type ActorResult = Result<(), SendError S + Send>>>; /// Rust supports both shared-memory and actor models of concurrency, and the `Integrator` utilizes /// both. We use an `Actor` to send the `Integrator` new functions, while we use a `Mutex` /// (shared-memory concurrency) to hold the result of the integration. /// /// Note that these are not the only options here--there are many, many ways you can deal with /// concurrent access. But when in doubt, a plain old `Mutex` is often a good bet. For example, /// this might look like a good situation for a `RwLock`--after all, there's no reason for a read /// in the main task to block writes. Unfortunately, unless you have significantly more reads than /// writes (which is certainly not the case here), a `Mutex` will usually outperform a `RwLock`. pub struct Integrator { input: Actor, output: Arc>, } /// In Rust, time durations are strongly typed. This is usually exactly what you want, but for a /// problem like this--where the integrated value has unusual (unspecified?) units--it can actually /// be a bit tricky. Right now, `Duration`s can only be multiplied or divided by `i32`s, so in /// order to be able to actually do math with them we say that the type parameter `S` (the result /// of the function being integrated) must yield `T` (the type of the integrated value) when /// multiplied by `f64`. We could possibly replace `f64` with a generic as well, but it would make /// things a bit more complex. impl Integrator where S: Mul + Float + Zero, T: 'static + Clone + Send + Float, { pub fn new(frequency: u32) -> Integrator { // We create a pipe allowing functions to be sent from tx (the sending end) to input (the // receiving end). In order to change the function we are integrating from the task in // which the Integrator lives, we simply send the function through tx. let (tx, input) = mpsc::channel(); // The easiest way to do shared-memory concurrency in Rust is to use atomic reference // counting, or Arc, around a synchronized type (like Mutex). Arc gives you a guarantee // that memory will not be freed as long as there is at least one reference to it. // It is similar to C++'s shared_ptr, but it is guaranteed to be safe and is never // incremented unless explicitly cloned (by default, it is moved). let s: Arc> = Arc::new(Mutex::new(Zero::zero())); let integrator = Integrator { input: tx, // Here is the aforementioned clone. We have to do it before s enters the closure, // because once that happens it is moved into the closure (and later, the new task) and // becomes inaccessible to the outside world. output: Arc::clone(&s), }; thread::spawn(move || -> () { // The frequency is how often we want to "tick" as we update our integrated total. In // Rust, timers can yield Receivers that are periodically notified with an empty // message (where the period is the frequency). This is useful because it lets us wait // on either a tick or another type of message (in this case, a request to change the // function we are integrating). let periodic = periodic_ms(frequency); let mut t = 0; let mut k: Box S + Send> = Box::new(|_| Zero::zero()); let mut k_0: S = Zero::zero(); loop { // Here's the selection we talked about above. Note that we are careful to call // the *non*-failing function, recv(), here. The reason we do this is because // recv() will return Err when the sending end of a channel is dropped. While // this is unlikely to happen for the timer (so again, you could argue for failure // there), it's normal behavior for the sending end of input to be dropped, since // it just happens when the Integrator falls out of scope. So we handle it cleanly // and break out of the loop, rather than failing. select! { res = periodic.recv() => match res { Ok(_) => { t += frequency; let k_1: S = k(t); // Rust Mutexes are a bit different from Mutexes in many other // languages, in that the protected data is actually encapsulated by // the Mutex. The reason for this is that Rust is actually capable of // enforcing (via its borrow checker) the invariant that the contents // of a Mutex may only be read when you have acquired its lock. This // is enforced by way of a MutexGuard, the return value of lock(), // which implements some special traits (Deref and DerefMut) that allow // access to the inner element "through" the guard. The element so // acquired has a lifetime bounded by that of the MutexGuard, the // MutexGuard can only be acquired by taking a lock, and the only way // to release the lock is by letting the MutexGuard fall out of scope, // so it's impossible to access the data incorrectly. There are some // additional subtleties around the actual implementation, but that's // the basic idea. let mut s = s.lock().unwrap(); *s = *s + (k_1 + k_0) * (f64::from(frequency) / 2.); k_0 = k_1; } Err(_) => break, }, res = input.recv() => match res { Ok(k_new) => k = k_new, Err(_) => break, } } } }); integrator } pub fn input(&self, k: Box S + Send>) -> ActorResult { // The meat of the work is done in the other thread, so to set the // input we just send along the Sender we set earlier... self.input.send(k) } pub fn output(&self) -> T { // ...and to read the input, we simply acquire a lock on the output Mutex and return a // copy. Why do we have to copy it? Because, as mentioned above, Rust won't let us // retain access to the interior of the Mutex unless we have possession of its lock. There // are ways and circumstances in which one can avoid this (e.g. by using atomic types) but // a copy is a perfectly reasonable solution as well, and a lot easier to reason about :) *self.output.lock().unwrap() } } /// This function is fairly straightforward. We create the integrator, set its input function k(t) /// to 2pi * f * t, and then wait as described in the Rosetta stone problem. fn integrate() -> f64 { let object = Integrator::new(10); object .input(Box::new(|t: u32| { let two_seconds_ms = 2 * 1000; let f = 1. / f64::from(two_seconds_ms); (2. * PI * f * f64::from(t)).sin() })) .expect("Failed to set input"); thread::sleep(Duration::from_secs(2)); object.input(Box::new(|_| 0.)).expect("Failed to set input"); thread::sleep(Duration::from_millis(500)); object.output() } fn main() { println!("{}", integrate()); } /// Will fail on a heavily loaded machine #[test] #[ignore] fn solution() { // We should just be able to call integrate, but can't represent the closure properly due to // rust-lang/rust issue #17060 if we make frequency or period a variable. // FIXME(pythonesque): When unboxed closures are fixed, fix integrate() to take two arguments. let object = Integrator::new(10); object .input(Box::new(|t: u32| { let two_seconds_ms = 2 * 1000; let f = 1. / (two_seconds_ms / 10) as f64; (2. * PI * f * t as f64).sin() })) .expect("Failed to set input"); thread::sleep(Duration::from_millis(200)); object.input(Box::new(|_| 0.)).expect("Failed to set input"); thread::sleep(Duration::from_millis(100)); assert_eq!(object.output() as u32, 0) }