Future主要用于处理flutter中的异步输入事件, 通过指定不同的初始化构造函数将异步任务转化为同步执行, 提交到当前Isolate的microTask队列或eventTask队列
此外还提供了遍历的方法对future进行管理, 错捕捉和拦截,future遍历, 合并, 同时还可以转换为更为灵活的Stream处理。在实际使用中应特殊注意异常的捕捉。它和Timer,Zone, Isolate密切相关.
Future执行顺序
根据执行效率来分, Future有3种创建方法, 它们的优先级按如下先后顺序
- Future.sync: 绝对同步指令, 在当前函数栈中同步执行, 优先级最高
- Future.microTask/Future.value: 优先级相同, FIFO
- Future(()=> { }): 异步创建, 优先级最低
顺序补充说明, 其实在理解上面 1, 2, 3之后就很容推导了
- 同优先级别按照先后顺序执行
- 同优先级内层嵌套Future优先级比外层底
- Future.sync后接then, 它的优先级降级为
mircoTask 级别, 其它的Future创建then的优先级沿用创建时的优先级。
以下是测试代码:
void main() {
print('start ....');
Future.microtask(() => print('microtask0'));
//通过Future.value的方式直接返回一个完成的Future,它直接插入到micoktask中
Future.value('feature 0').then((value) => print('feature0->1')).then((value) => print('feature0->2'));
//插入到当前Isolate EventLoop的EventQueue中
Future(()=> print('feature 1'));
//同步执行插入到当前Isolate EventLoop microTask队列中
Future.microtask(() => print('microtask2'));
//Future.sync为同步执行指令,和函数内的同步方法具有相同优先级
final syncFuture = Future.sync(() => print('sync feature2'));
syncFuture.then((value) => print('sync then feature3')).then((value) => print('sync then feature4'));
Future.microtask(() => print('microtask4'));
//顺序执行,插入到EventQueue中,执行顺序 `feature4` -> `feature5` -> feature6`,小于microTask
final future4 = Future(() => print('feature4'));
final microtask = Future.microtask(() => print('microtask5'));
microtask.then((value) => print('microtask5-0'));
future4.then((value) => print('feature5')).then((value) => print('feature6'));
Future.microtask(() => print('microtask6'));
//同步等待 `feature7` 和 `feature8` 执行完毕后再执行后面的 `feature9`
Future.wait([Future(() => print('feature7')), Future(() => print('feature8'))]).then((value) => print('feature 9'));
//同层级的Future执行按先进先出的规则,嵌套的内层feature11,feature12的future要慢于外层 `feature15`
Future((){
print('feature 10');
Future.microtask(() => 'feature 10 mictask');
Future(() => print('feature11')).then((value) => print('feature 12')); //内层feature11,feature12的future要慢于外层 `feature15`
}).then((value) => print('feature 13'));
Future(() => print('feature 15'));
Future.microtask(() => print('microtask15')).then((value) => print('microtask15 then'));
Future.microtask(() => print('microtask16'));
print('end ....');
//Log输出
/**
start ....
sync feature2
end ....
sync feature3
sync feature4
microtask1
microtask1 then
microtask2 then
microtask2
feature 1
feature4
feature5
feature6
feature7
feature8
feature 9
feature 10
*/
}
相关的类介绍
FutureOr<T> //用于描述一个Future对象,返回Future和Future的值
//T可以是任意的数据类型
Comparable (dart.core)
BigInt (dart.core) //大整数,hexacdemical literal
DateTime (dart.core)
Duration (dart.core)
num (dart.core)
double (dart.core)
int (dart.core)
String (dart.core)
````
``` dart
Future (dart.async)
_Future (dart.async) //一个延迟计算的对象,它是一个抽象类
SynchronousFuture (synchronous_future.dart) //提供一个同步执行的futrue
TickerFuture (ticker.dart) //提供一个间隔执行的Future对象,动画vsyn信号回调专用
DelegatingFuture (future.dart) //对 `_Future` 类的包装,简化用户调用接口
ResultFuture (future.dart) //用于提供Future value,当future完成时可以同步获取到它对应的value
Future源码解读
abstract class Future<T> {
//在当前函数栈中同步创建一个带有数据的Future,访问时会被提交到microTaskQueue中
_Future.value(T value) : this.zoneValue(value, Zone.current);
//计算Future执行的命令,如果有嵌套则会异步全部执行完毕
factory Future(FutureOr<T> computation()) {
Timer.run(() { //通过Timer插入到EventLoop的EventQueue中,在CPU分配的时间片内执行本次任务
result._complete(computation());//串联所有的Future,之后计算出最后的结果
//直接将计算任务添加到当前Isolate的microTaskQueue中
factory Future.microtask(FutureOr<T> computation()) {
scheduleMicrotask(() {
//在本次调度的时间片内,完成计算操作
result._complete(computation());
//同步执行,直接采用当前函数分配的时间片执行
factory Future.sync(FutureOr<T> computation()) { ...
var result = computation(); ...
//提交任务到microTask完成和value绑定
@pragma("vm:entry-point")
factory Future.value([FutureOr<T> value]) {
return new _Future<T>.immediate(value);
//提交一个错误的值到 microTaskQueue中
factory Future.error(Object error, [StackTrace stackTrace]) { ...
return new _Future<T>.immediateError(error, stackTrace);
//提交一延迟执行的timer source到 EventLoop的EventQueue,
factory Future.delayed(Duration duration, [FutureOr<T> computation()]) { ...
new Timer(duration, () {
//合并多个Future事件,任何一个错误会导致整个list future抛出异常,通过指定cleanUp可以挽回部分成功的数据
static Future<List<T>> wait<T>(Iterable<Future<T>> futures,
{bool eagerError: false, void cleanUp(T successValue)}) { ...
handleError(Object theError, StackTrace theStackTrace) {
cleanUp(value); ...
//遍历合并数据
for (var future in futures) {
int pos = remaining;
future.then((T value) { ...
result._completeWithValue(values);
/**
* Returns the result of the first future in [futures] to complete.
*
* The returned future is completed with the result of the first
* future in [futures] to report that it is complete,
* whether it's with a value or an error.
* The results of all the other futures are discarded.
*
* If [futures] is empty, or if none of its futures complete,
* the returned future never completes.
*/
//返回第一个future的执行结果
static Future<T> any<T>(Iterable<Future<T>> futures) {
var completer = new Completer<T>.sync();
var onValue = (T value) {
if (!completer.isCompleted) completer.complete(value);
};
var onError = (error, StackTrace stack) {
if (!completer.isCompleted) completer.completeError(error, stack);
};
for (var future in futures) {
future.then(onValue, onError: onError);
}
return completer.future;
}
//遍历执行 `elements` ,可以支持同步和异步
static Future forEach<T>(Iterable<T> elements, FutureOr action(T element)) { ...
//配合forEach对elements进行遍历,并执行
static Future doWhile(FutureOr<bool> action()) {
_Future doneSignal = new _Future();
void Function(bool) nextIteration;
nextIteration = Zone.current.bindUnaryCallbackGuarded((bool keepGoing) {
while (keepGoing) {
FutureOr<bool> result;
try {
result = action(); //具体实现暴露给开发者,对数组类的元素进行转换
//callbak注册 订阅成功/失败/完成的几件
Future<R> then<R>(FutureOr<R> onValue(T value), {Function onError});
Future<T> catchError(Function onError, {bool test(Object error)});
Future<T> whenComplete(FutureOr action());
//转化为流事件
Stream<T> asStream();
//注册timeout事件,通常用于对突发异常进行retry操作
Future<T> timeout(Duration timeLimit, {FutureOr<T> onTimeout()});
}
_Future
它是对 Future 的具体实现, 对future输出的控制
class _Future<T> implements Future<T> { ...
//状态标志位,代表了当前 `_future` 执行的状态
static const int _stateIncomplete = 0;
static const int _statePendingComplete = 1;
static const int _stateChained = 2;
static const int _stateValue = 4;
static const int _stateError = 8;
//初始化当前future的状态,当有订阅时开始变更
int _state = _stateIncomplete;
//当前Future执行的zone,用于提交then的microTask事件,绑定success和error事件
final Zone _zone;
//存储当前的result或listeners,中间存储值
@pragma("vm:entry-point")
var _resultOrListeners;
static List<Function> _continuationFunctions(_Future<Object> future) {
List<Function> result = null;
while (true) {
//当pendding的时候才能添加future的listener
if (future._mayAddListener) return result; ...
//遍历执行listenr的绑定操作,Future每次listener操作完成之后返回一个新的Future用于串联future
(result ??= <Function>[]).add(listener.handleValue);
future = listener.result;
Future<R> then<R>(FutureOr<R> f(T value), {Function onError}) { ...
//绑定成功的订阅事件到CurrentZone
f = currentZone.registerUnaryCallback<FutureOr<R>, T>(f);
//选择性绑定onError事件到CurrentZone
onError = _registerErrorHandler(onError, currentZone);
}
//添加listener,每个listen持有了 then注册的回掉函数,并且在执行完成之后会返回一个新的Future,这也是为什么then能够连续串联好多个,由于 `f` 绑定的是同一个zone,所以他们执行在同一个zone下
_Future<R> result = new _Future<R>();
_addListener(new _FutureListener<T, R>.then(result, f, onError)); ...
/// Registers a system created result and error continuation.
///
/// Used by the implementation of `await` to listen to a future.
/// The system created liseners are not registered in the zone,
/// and the listener is marked as being from an `await` .
/// This marker is used in [_continuationFunctions].
//注册处一个系统回调事件,返回result由 `await` 关键字决定, `await future` 就相当于执行这个方法
Future<E> _thenAwait<E>(FutureOr<E> f(T value), Function onError) {
_Future<E> result = new _Future<E>();
_addListener(new _FutureListener<T, E>.thenAwait(result, f, onError));
return result;
}
Future<T> catchError(Function onError, {bool test(error)}) {
Future<T> whenComplete(dynamic action()) {
//转换成一个future对象
Stream<T> asStream() => new Stream<T>.fromFuture(this);
//添加listener对象
void _addListener(_FutureListener listener) {...
_zone.scheduleMicrotask(() { ...
//反转链表,确保先添加的先执行FIFO
_FutureListener _reverseListeners(_FutureListener listeners) { ...
//传递future用
static void _chainForeignFuture(Future source, _Future target) { ...
static void _chainCoreFuture(_Future source, _Future target) { ...
void _complete(FutureOr<T> value) {
/**
* Propagates the value/error of [source] to its [listeners], executing the
* listeners' callbacks.
*/
//在future完成时遍历listener发送事件给订阅者
static void _propagateToListeners(_Future source, _FutureListener listeners) {
while (true) { ...
assert(source._isComplete); ...
while (listeners._nextListener != null) { ...
void handleValueCallback() { ...
listenerValueOrError = listener.handleValue(sourceResult);
void handleError() {
listenerValueOrError = listener.handleError(asyncError);
//注册timeout订阅事件
Future<T> timeout(Duration timeLimit, {FutureOr<T> onTimeout()}) { ...
timer = new Timer(timeLimit, () { ...
result._complete(zone.run(onTimeout)); ..
result._completeError(e, s);
//自动实现了它的链式订阅
this.then((T v) { ...
result._completeWithValue(v);
result._completeError(e, s);
SynchronousFuture
从名字上看, 它是一个同步执行的future
class SynchronousFuture<T> implements Future<T> { ...
@override
Future<T> catchError(Function onError, { bool test(Object error) }) => Completer<T>().future;
//f函数执行执行,没有绑定zone的复杂流程,Future.sync
@override
Future<E> then<E>(FutureOr<E> f(T value), { Function onError }) {...
final dynamic result = f(_value);
@override
Future<T> whenComplete(FutureOr<dynamic> action()) { ...
}
DelegatingFuture
代理Future, ResultFuture继承于它, 提供了 Future直接访问reuslt的能力, 具体实现在 Result 类中
class DelegatingFuture<T> implements Future<T> { ...
DelegatingFuture(this._future);
@override
Stream<T> asStream() => _future.asStream(); ...
@override
Future<T> catchError(Function onError, {bool Function(Object error) test}) => ...
@override
Future<S> then<S>(FutureOr<S> Function(T) onValue, {Function onError}) => ...
@override
Future<T> whenComplete(FutureOr Function() action) => ...
@override
Future<T> timeout(Duration timeLimit, {FutureOr<T> Function() onTimeout}) ...
class ResultFuture<T> extends DelegatingFuture<T> {
bool get isComplete => result != null;
Result<T> get result => _result;
Result<T> _result;
ResultFuture(Future<T> future) : super(future) {
Result.capture(future).then((result) {
_result = result;
});
}
}
abstract class Result<T> { ....
final T value; ...
static Future<Result<T>> capture<T>(Future<T> future) {
return future.then((value) => ValueResult(value),
onError: (error, StackTrace stackTrace) =>
ErrorResult(error, stackTrace));
}
Completer
用来包装future, 避免then嵌套, 优化代码结构
Completer (dart.async)
_Completer (dart.async)
_AsyncCompleter (dart.async)
_SyncCompleter (dart.async)
与FutureBuilder的结合使用
FutureBuilder用于实现一个异步构建的widget, 根据当前异步事件执行的状态显示不同的widget, 在实际工作中会经常用到, 它的本质就是将 需要传递的数据分成不同的的阶段获取, 然后再拿到每个阶段的数据后调用setState去触发Widget的build, 实际的widget封装在它的构造然后中,
下面是一个简易的FutureBuider, 只是为了理解其中的运作流程, 具体细节暂时忽略
//提供构建的四个阶段的数据,根据实际需要还可以定义得更多。
enum FutureStatus {
initial,
done,
error,
pending,
}
//包装状态和数据,用于构建每个阶段的widget
class FutureSnapshot {
dynamic data;
FutureStatus status;
FutureSnapshot({this.data, this.status});
}
typedef WidgetBuilder = Widget Function(BuildContext, FutureSnapshot);
class CustomFutureBuilder extends StatefulWidget {
final Future future;
final WidgetBuilder builder;
CustomFutureBuilder({this.future, this.builder});
@override
CustomFutureBuilderState createState() => CustomFutureBuilderState();
}
class CustomFutureBuilderState extends State<CustomFutureBuilder> {
FutureSnapshot snapshot;
@override
Widget build(BuildContext context) => widget.builder(context, snapshot);
@override
void initState() {
super.initState();
snapshot = FutureSnapshot(status: FutureStatus.initial, data: null); //初始化阶段,
subscribe();
}
void subscribe() {
widget.future.then((value) {
setState(() {
snapshot = FutureSnapshot(status: FutureStatus.initial, data: value); //数据加载成功阶段
});
}, onError: (error, stackTrace) {
setState(() {
snapshot = FutureSnapshot(
status: FutureStatus.initial,
data: FlutterErrorDetails(exception: error, stack: stackTrace)); //数据加载失败阶段
});
});
snapshot = FutureSnapshot(
status: FutureStatus.initial, data: FlutterErrorDetails()); //等待阶段
}
}
class CustomFutureBuilderDemo extends StatelessWidget {
@override
Widget build(BuildContext context) {
return CustomFutureBuilder( //具体使用.
future: Future.value('xxxxx'),
builder: (contxt, snapShot) {
switch (snapShot.status) {
case FutureStatus.error:
return Text('error');
case FutureStatus.initial:
return Text('initial');
case FutureStatus.done:
return Text('done');
case FutureStatus.pending:
return CircularProgressIndicator();
}
return Container();
});
}
}
上面的例子主要介绍了FutureBuilder的一些基本功能, 在实际的使用中, 还需要考虑到widget的刷新, future的变化, 如何去解除订阅, 重新初始化构建树, 下面是标准的futurebuilder实现。
class _FutureBuilderState<T> extends State<FutureBuilder<T>> {
Object _activeCallbackIdentity; //增加了Callback标志符,来控制future的同步
AsyncSnapshot<T> _snapshot; //同上面的FutureSnapShot一样,包装状态和值
@override
void initState() {
super.initState();
_snapshot = AsyncSnapshot<T>.withData(ConnectionState.none, widget.initialData); //初始化阶段状态和值
_subscribe(); //订阅Future
}
@override
void didUpdateWidget(FutureBuilder<T> oldWidget) {
super.didUpdateWidget(oldWidget);
if (oldWidget.future != widget.future) { //视图更新后对future解绑操作,重新订阅新的future
if (_activeCallbackIdentity != null) {
_unsubscribe();
_snapshot = _snapshot.inState(ConnectionState.none); //重制阶段状态和值
}
_subscribe();
}
}
@override
Widget build(BuildContext context) => widget.builder(context, _snapshot);
@override
void dispose() {
_unsubscribe();
super.dispose();
}
void _subscribe() {
if (widget.future != null) {
final Object callbackIdentity = Object();
_activeCallbackIdentity = callbackIdentity;
widget.future.then<void>((T data) {
if (_activeCallbackIdentity == callbackIdentity) {
setState(() {
_snapshot = AsyncSnapshot<T>.withData(ConnectionState.done, data); //完成阶段状态值
});
}
}, onError: (Object error) {
if (_activeCallbackIdentity == callbackIdentity) {
setState(() {
_snapshot = AsyncSnapshot<T>.withError(ConnectionState.done, error); //错误阶段状态值
});
}
});
_snapshot = _snapshot.inState(ConnectionState.waiting); //等待阶段状态值
}
}
void _unsubscribe() {
_activeCallbackIdentity = null; //控制future和build同步的标志位,类似信号量(不过flutter ui线程为单线程)避免future执行时widget已经被移除了.
}
}
StreamBuilder与Future应用
和Future的设计方式基本一致, 只不过他的状态多一点点, 数据源是持续的,不想future的数据是单次的,StreamBuilder继承了 _StreamBuilderBase , _StreamBuilderBase 用来管理stream流的订阅和更新, StreamBuilder 则更多的是用来实现暴露给用户的接口,状态值的变化, Widget的builder实现.
相比较FutureBuilder主要差异主要在这里,状态值多一些.
class _StreamBuilderBaseState<T, S> extends State<StreamBuilderBase<T, S>> ...
void _subscribe() {
if (widget.stream != null) {
_subscription = widget.stream.listen((T data) {
setState(() {
_summary = widget.afterData(_summary, data);
});
}, onError: (Object error) {
setState(() {
_summary = widget.afterError(_summary, error);
});
}, onDone: () {
setState(() {
_summary = widget.afterDone(_summary);
});
});
_summary = widget.afterConnected(_summary);
}
}
通常而言, 处理简单的网络回调请求一般用 FutureBuider 就能满组需求了,如果是涉及到持续的数据订阅如蓝牙数据, 定位信息等, 可以采用StreamBuilder. 在使用过程中需要特别注意 异步Future/Stream对象变更后, 他所构建的wiget的作用域问题. 因为widget切换会导致他们所依赖的 InheritedWidget 被移除, 如果是项目中使用了Bloc应特别注意。
小结
Future是为适配Isolate中的 microTask和eventTask2而涉及, 包含了常规的数据订阅,输入输出, 异常捕捉, 多个异步数据合并, 通过异步数据顺序执行, 并且提供了便利的Completer对future的使用进行解耦操作, 基于Future的基本功能, 同时也为Stream流和RxData的封装奠定了基石。
|
请发表评论