Tung varning Jag har aldrig använt det här biblioteket förut, och min lågnivåkunskap om några av begreppen är lite... bristfällig. För det mesta läser jag igenom handledningen. Jag är ganska säker på att alla som har gjort asynkarbete kommer att läsa detta och skratta, men det kan vara en användbar utgångspunkt för andra människor. Varning emptor!
Låt oss börja med något lite enklare, som visar hur en Stream
Arbetar. Vi kan konvertera en iterator av Result
s in i en ström:
extern crate futures;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
let payloads = stream::iter(payloads.into_iter());
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
}
Detta visar oss ett sätt att konsumera strömmen. Vi använder and_then
att göra något med varje nyttolast (här är det bara att skriva ut det) och sedan for_each
för att konvertera Stream
tillbaka till en Future
. Vi kan sedan köra framtiden genom att anropa den konstigt namngivna forget
metod.
Nästa är att knyta Redis-biblioteket i mixen, hantera bara ett meddelande. Sedan get_message()
metoden blockerar måste vi introducera några trådar i mixen. Det är inte en bra idé att utföra stora mängder arbete i den här typen av asynkrona system eftersom allt annat kommer att blockeras. Till exempel:
Om det inte är arrangerat på annat sätt bör det säkerställas att implementering av denna funktion slutförs mycket snabbt .
I en idealisk värld skulle redis-lådan byggas ovanpå ett bibliotek som terminer och exponera allt detta på ett naturligt sätt.
extern crate redis;
extern crate futures;
use std::thread;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let msg = pubsub.get_message().expect("Unable to get message");
let payload: Result<String, _> = msg.get_payload();
tx.send(payload).forget();
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
Min förståelse blir suddigare här. I en separat tråd blockerar vi för meddelandet och trycker in det i kanalen när vi får det. Vad jag inte förstår är varför vi måste hålla i trådens handtag. Jag skulle förvänta mig att foo.forget
skulle blockera sig själv och vänta tills strömmen är tom.
I en telnet-anslutning till Redis-servern, skicka detta:
publish rust awesome
Och du kommer att se att det fungerar. Att lägga till utskriftssatser visar att (för mig) foo.forget
uttalandet körs innan tråden skapas.
Flera meddelanden är svårare. Sender
förbrukar sig själv för att förhindra att den genererande sidan kommer för långt före den konsumerande sidan. Detta uppnås genom att returnera en annan framtid från send
! Vi måste skjutsa tillbaka den därifrån för att återanvända den för nästa iteration av slingan:
extern crate redis;
extern crate futures;
use std::thread;
use std::sync::mpsc;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let mut tx = tx;
while let Ok(msg) = pubsub.get_message() {
let payload: Result<String, _> = msg.get_payload();
let (next_tx_tx, next_tx_rx) = mpsc::channel();
tx.send(payload).and_then(move |new_tx| {
next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
futures::finished(())
}).forget();
tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
}
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
Jag är säker på att det kommer att finnas fler ekosystem för den här typen av interoperation med tiden. Till exempel kan futures-cpupool-lådan förmodligen utökas för att stödja ett liknande användningsfall som detta.