在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
转自:https://www.cnblogs.com/hymenz/p/9334297.html 在许多编程语言里,我们都非常乐于去研究在这个语言中所使用的异步网络编程的框架,比如说Python的 Gevent、asyncio,Nginx 和 OpenResty,Go 等,今年年初我开始接触 Rust,并被其无 GC、内存安全、极小的运行时等特性所吸引,经过一段时间的学习,开始寻找构建实际项目的解决方案,很快 mio、tokio 等框架进入了我的视野,于是开始从更加底层的 mio 出发实验。 可以看到 mio 是一个非常底层的异步编程的框架,这意味着如果我们要在实际的项目开发中使用它时,就不得不从 event loop 开始编写我们的软件,这并不是我们所期望的,于是我们需要一个更高层次抽象的框架,这便是本文要为大家讲述的 tokio。 tokiotokio 是 Rust 中的异步编程框架,它将复杂的异步编程抽象为 Futures、Tasks 和 Executor,并提供了 Timers 等基础设施,下文中我们将一一展开。 运行时模型tokio 是一个基于轮训的模型。比如我们要在 tokio 上调度我们的 task,我们需要为其实现 /// A task that polls a single widget and writes it to STDOUT. pub struct MyTask; impl Future for MyTask { type Item = (); type Error = (); fn poll(&mut self) -> Result<Async<()>, ()> { match poll_widget() { Async::Ready(widget) => { println!("widget={:?}", widget); Ok(Async::Ready(())) } Async::NotReady => { return Ok(Async::NotReady); } } } } 在最简单的情况下,Executor 可能会长这样。(注:这不是真实的实现,只是用来说明概念) pub struct SpinExecutor { tasks: VecDeque<Box<Future<Item = (), Error = ()>>>, } impl SpinExecutor { pub fn spawn<T>(&mut self, task: T) where T: Future<Item = (), Error = ()> + 'static { self.tasks.push_back(Box::new(task)); } pub fn run(&mut self) { while let Some(mut task) = self.tasks.pop_front() { match task.poll().unwrap() { Async::Ready(_) => {} Async::NotReady => { self.tasks.push_back(task); } } } } } Executor 频繁地轮询所有 task,即使某些 task 仍然会以 Futuresfuture 是对一个未来事件的抽象。比如你可以将各种事件抽象为 future:
这里我们举一个例子: extern crate futures; extern crate tokio; extern crate tokio_core; use std::error::Error; use futures::Future; use futures::future::{ok, done}; use tokio_core::reactor::Core; fn my_fn_squared(i: u32) -> Result<u32, Box<Error>> { Ok(i * i) } fn my_fut_squared(i: u32) -> impl Future<Item = u32, Error = Box<Error + 'static>> { ok(i * i) } fn my_fut() -> impl Future<Item = u32, Error = Box<Error + 'static>> { ok(10) } fn main() { let mut reactor = Core::new().unwrap(); let chained_future = my_fut().and_then(|retval| { done(my_fn_squared(retval)).and_then(|retval2| my_fut_squared(retval2)) }); let retval3 = reactor.run(chained_future).unwrap(); println!("{:?}", retval3); } 这里,我们的 TasksTasks 是应用程序的 “逻辑单元”。他们以 Future trait 来表示。一旦 task 完成处理,task 的 future 实现将以值 Tasks 被传递给 Executor,Executor 处理 task 的调度。Executor 通常在一组或一组线程中调度许多 task。task 不得执行计算繁重的逻辑,否则将阻止其他 task 执行。 Tasks 既可以通过实现 Future trait 来实现,也可以通过使用 I/O
所有这些类型都提供了 Tokio 网络类型被一个基于 使用 future API一些帮助使用 future API 的函数包括:
这些函数中的许多都是源于
注意 例如,以下是如何接受连接,从中读取5个字节,然后将5个字节写回 socket 的例子: let server = listener.incoming().for_each(|socket| { println!("accepted socket; addr={:?}", socket.peer_addr().unwrap()); let buf = vec![0; 5]; let connection = io::read_exact(socket, buf) .and_then(|(socket, buf)| { io::write_all(socket, buf) }) .then(|_| Ok(())); // Just discard the socket and buffer // Spawn a new task that processes the socket: tokio::spawn(connection); Ok(()) }) 使用 Poll API当手动实现 Future 时,需要使用基于 Poll 的 API,并且你需要返回 例如,这就是如何为 TcpStream 实现 pub struct ReadExact { state: State, } enum State { Reading { stream: TcpStream, buf: Vec<u8>, pos: usize, }, Empty, } impl Future for ReadExact { type Item = (TcpStream, Vec<u8>); type Error = io::Error; fn poll(&mut self) -> Result<Async<Self::Item>, io::Error> { match self.state { State::Reading { ref mut stream, ref mut buf, ref mut pos } => { while *pos < buf.len() { let n = try_ready!({ stream.poll_read(&mut buf[*pos..]) }); *pos += n; if n == 0 { let err = io::Error::new( io::ErrorKind::UnexpectedEof, "early eof"); return Err(err) } } } State::Empty => panic!("poll a ReadExact after it's done"), } match mem::replace(&mut self.state, State::Empty) { State::Reading { stream, buf, .. } => { Ok(Async::Ready((stream, buf))) } State::Empty => panic!(), } } } 数据报UdpSocket 类型提供了许多方便的方法:
示例#[macro_use] extern crate log; extern crate futures; extern crate pretty_env_logger; extern crate tokio; use futures::future::{done, ok}; use futures::{Future, Stream}; use tokio::io::{self as tio, AsyncRead}; use tokio::net::{TcpListener, TcpStream}; use std::error; use std::fmt; use std::io; fn client_fut(socket: TcpStream) -> impl Future<Item = (), Error = ()> + 'static + Send { futures::lazy(move || match socket.peer_addr() { Ok(peer) => { info!("Tcp connection [{:?}] connected to server", peer); Ok((socket, peer)) } Err(err) => { error!("Fetch peer address failed: {:?}", err); Err(()) } }).and_then(move |(socket, peer)| { let buf = vec![0; 5]; let svc_fut = tio::read_exact(socket, buf) .and_then(|(socket, buf)| { tio::write_all(socket, buf) }) .then(|_| Ok(())); tokio::spawn(svc_fut); ok(()) }) } fn server_fut(listener: TcpListener) -> impl Future<Item = (), Error = ()> + 'static + Send { listener .incoming() .for_each(|socket| { tokio::spawn(client_fut(socket)); Ok(()) }) .map_err(|err| { error!("Accept connection failed: {:?}", err); }) } fn run() -> Result<(), io::Error> { let addr = "127.0.0.1:1234".parse().unwrap(); info!("Listening on {:?}", addr); let listener = TcpListener::bind(&addr)?; let server_fut = server_fut(listener); tokio::run(server_fut); Ok(()) } fn print<T: fmt::Debug, E: error::Error>(result: Result<T, E>) { match result { Ok(any) => info!("Result: {:?}", any), Err(err) => error!("Error: {:?}", err), } } fn init() { pretty_env_logger::init(); } fn main() { init(); print(run()); } Timers在编写基于网络的应用程序时,通常需要根据时间执行操作。
这些用例通过使用 延迟运行代码在这个例子中,我们希望在一段时间后执行任务。为此,我们使用 use tokio::prelude::*; use tokio::timer::Delay; use std::time::{Duration, Instant}; fn main() { let when = Instant::now() + Duration::from_millis(100); let task = Delay::new(when) .and_then(|_| { println!("Hello world!"); Ok(()) }) .map_err(|e| panic!("delay errored; err={:?}", e)); tokio::run(task); } 为长时间运行的操作设置 Timeout在编写健壮的网络应用程序时,确保在合理的时间内完成操作至关重要。在等待来自外部的,不受信任的来源的数据时尤其如此。 该 use tokio::io; use tokio::net::TcpStream; use tokio::prelude::*; use std::time::{Duration, Instant}; fn read_four_bytes(socket: TcpStream) -> Box<Future<Item = (TcpStream, Vec<u8>), Error = ()>> { // The instant at which the read will be aborted if // it has not yet completed. let when = Instant::now() + Duration::from_secs(5); let buf = vec![0; 4]; let fut = io::read_exact(socket, buf) .deadline(when) .map_err(|_| println!("failed to read 4 bytes by deadline")); Box::new(fut) } 周期性运行代码在一个时间间隔内重复运行代码对于在套接字上发送 PING 消息,或经常检查配置文件等情况很有用。
use tokio::prelude::*; use tokio::timer::Interval; use std::time::{Duration, Instant}; fn main() { let task = Interval::new(Instant::now(), Duration::from_millis(100)) .take(10) .for_each(|instant| { println!("fire; instant={:?}", instant); Ok(()) }) .map_err(|e| panic!("interval errored; err={:?}", e)); tokio::run(task); } 计时器的注意事项Tokio 计时器的粒度为 1 毫秒。任何更小的间隔都会向上舍入到最接近的毫秒。定时器在用户域中实现(即不使用操作系统定时器,像 linux 上的 timerfd)。它使用分层散列计时器轮实现,在创建,取消和触发超时时提供有效的恒定时间复杂度。 Tokio 运行时包括每个工作线程一个计时器实例。这意味着,如果运行时启动4个工作线程,则将有4个计时器实例。这在大多数情况下避免了同步,因为当使用计时器时,任务将在位于当前线程上的状态下操作。 也就是说,计时器实现是线程安全的,并支持从任何线程使用。 基本组合器下面是关于 Future 的图表,来自于 Cheatsheet for Futures 。 // Constructing leaf futures fn empty () -> Future<T, E> fn ok (T) -> Future<T, E> fn err (E) -> Future<T, E> fn result(Result<T, E>) -> Future<T, E> // General future constructor fn poll_fn(FnMut(thread_local!(Task)) -> Poll<T, E>) -> Future<T, E> // Mapping futures fn Future::map (Future<T, E>, FnOnce(T) -> U) -> Future<U, E> fn Future::map_err (Future<T, E>, FnOnce(E) -> F) -> Future<T, F> fn Future::from_err(Future<T, Into<E>>) -> Future<T, E> // Chaining (sequencing) futures fn Future::then (Future<T, E>, FnOnce(Result<T, E>) -> IntoFuture<U, F>) -> Future<U, F> fn Future::and_then(Future<T, E>, FnOnce(T) -> IntoFuture<U, E>) -> Future<U, E> fn Future::or_else (Future<T, E>, FnOnce(E) -> IntoFuture<T, F>) -> Future<T, F> fn Future::flatten (Future<Future<T, E>, Into<E>>) -> Future<T, E> // Joining (waiting) futures fn Future::join (Future<T, E>, IntoFuture<U, E>) -> Future<(T, U), E> fn Future::join3(Future<T, E>, IntoFuture<U, E>, IntoFuture<V, E>) -> Future<(T, U, V), E> fn Future::join4(Future<T, E>, IntoFuture<U, E>, IntoFuture<V, E>, IntoFuture<W, E>) -> Future<(T, U, V, W), E> fn Future::join5(Future<T, E>, IntoFuture<U, E>, IntoFuture<V, E>, IntoFuture<W, E>, IntoFuture<X, E>) -> Future<(T, U, V, W, X), E> fn join_all (IntoIterator<IntoFuture<T, E>>) -> Future<Vec<T>, E> // Selecting (racing) futures fn Future::select (Future<T, E>, IntoFuture<T, E>) -> Future<(T, Future<T, E>), (E, Future<T, E>)> fn Future::select2(Future<T, E>, IntoFuture<U, F>) -> Future<Either<(T, Future<U, F>), (U, Future<T, E>)>, Either<(E, Future<U, F>), (F, Future<T, E>)>> fn select_all (IntoIterator<IntoFuture<T, E>>) -> Future<(T, usize, Vec<Future<T, E>>), (E, usize, Vec<Future<T, E>>)> fn select_ok (IntoIterator<IntoFuture<T, E>>) -> Future<(T, Vec<Future<T, E>>), E> // Utility fn lazy (FnOnce() -> IntoFuture<T, E>) -> Future<T, E> fn loop_fn (S, FnMut(S) -> IntoFuture<Loop<T, S>, E>) -> Future<T, E> // Miscellaneous fn Future::into_stream (Future<T, E>) -> Stream<T, E> fn Future::flatten_stream(Future<Stream<T, E>, E>) -> Stream<T, E> fn Future::fuse (Future<T, E>) -> Future<T, E> fn Future::catch_unwind (Future<T, E>+UnwindSafe) -> Future<Result<T, E>, Any+Send> fn Future::shared (Future<T, E>) -> Future<SharedItem<T>, SharedError<E>>+Clone fn Future::wait (Future<T, E>) -> Result<T, E> 这部分的内容推荐参考这篇文章,https://www.jianshu.com/p/5059c403a335。 本文不再赘述。 返回 futures在使用 futures 时,您可能需要做的第一件事就是返回一个
Trait 对象首先,您始终可以选择返回一个 boxed fn foo() -> Box<Future<Item = u32, Error = io::Error>> { // ... } 这个策略的好处是它很容易写出来并且易于创建。 这种方法的缺点是,在构建 future 时需要运行时分配,在使用该 future 时需要动态分派。 通常可以通过仅在您想要返回的 future 链的最后来
|
2023-10-27
2022-08-15
2022-08-17
2022-09-23
2022-08-13
请发表评论