Scheduler 调度器
Scheduler(调度器)负责管理任务的调度和优先级。定义了任务调度的逻辑,包括任务的优先级、任务队列的管理等。是 React 中一个非常重要的组件。
React 并没有将本身的任务优先级调度和 Scheduler 耦合在一起,为了保证其通用性,Scheduler 是作为一个独立的包存在。可以直接安装 Scheduler 包来使用。
TIP
了解事件循环 EventLoop 能更好掌握本章内容,可参考笔者以前写的消息队列与事件循环
举一个例子
在深入到源码之前,先来看看:如果要实现一套任务调度机制,应该怎么做?
任务调度器的本质就是:将任务按照优先级进行排序,然后按照优先级依次执行任务。
流程示意图如下所示:
schedule
负责创建和 push
任务
push
到 taskQueue
中的任务按照优先级和 id 进行排序
- loop 任务队列所有的任务,并依次执行
关键代码如下:
const taskQueue = [];
let id = 0;
function schedule(execute: () => void, priority: number) {
const task = {
id: id++,
execute,
priority
};
push(task);
console.log("schedule task: ", task);
workLoop();
}
function workLoop() {
while (taskQueue.length) {
const task = peek();
console.log("perform task: ", task);
task.execute();
}
}
function push(task) {
taskQueue.push(task);
taskQueue.sort((a, b) => {
// 先比较优先级,如果优先级相同,再比较 id
const diff = a.priority - b.priority;
return diff !== 0 ? diff : a.id - b.id;
});
}
function peek() {
return taskQueue.shift();
}
最最基础版的 mini-scheduler
就已经实现了,可通过下面的 Demo 体验:
但这个版本有两个最大的问题:
schedule
任务后,就立即开始了 loop 执行(即使同时插入底优,高优任务,也会先执行底优任务),现实中会延后执行
- 单纯的按照
priority
排序,如果期间不断有高优任务插入,低优任务就会一直被阻塞,而被“饿死”
针对这两点,v2 改进的流程示意图如下:
关键代码如下:
const taskQueue = [];
let id = 0;
function schedule(execute: () => void, priority: number) {
// 根据优先级计算出任务的过期时间,并以过期时间作为排序依据
// 即使在插入底优任务后,不断有高优任务插入,也能保证随着时间的推移,底优任务被排在第一位
let timeout;
if (priority === 0) {
timeout = 0;
} else {
timeout = 100;
}
const expirationTime = Date.now() + timeout;
const task = {
id: id++,
execute,
expirationTime
};
push(task);
console.log("schedule task: ", task);
scheduleWorkLoop();
}
// 标识 loop 是否在执行中
let isLoopRunning = false;
function scheduleWorkLoop() {
// 只有在 loop 没有启动时,才需要 schedule
if (!isLoopRunning) {
isLoopRunning = true;
// 暂时用 setTimeout 做延迟处理
setTimeout(() => {
workLoop();
}, 0);
}
}
function workLoop() {
while (taskQueue.length) {
const task = peek();
console.log("perform task: ", task);
task.execute();
}
isLoopRunning = false;
}
function push(task) {
taskQueue.push(task);
taskQueue.sort((a, b) => {
// 先比较过期时间,再比较 id
const diff = a.expirationTime - b.expirationTime;
return diff !== 0 ? diff : a.id - b.id;
});
}
function peek() {
return taskQueue.shift();
}
v2 版本核心链路相对更完善,但也只是达到了 Demo 演示的目的,距离业务可用还相差甚远。
来看看 Scheduler
具体是怎么做的。
Scheduler 流程概览
Scheduler
核心的流程示意图如下:
- 包含两个任务队列
taskQueue
和 timerQueue
, 其中 taskQueue
存储的是需要立即执行的任务,而 timerQueue
存储的是延迟执行的任务,两个队列的优先级排序的功能是一致的
- 因为存在两个优先级队列,那么在调度时,也会存在两种不同的调度方式,
requestHostTimeout
和 requestHostCallback
,但最终入口还是 requestHostCallback
- 在 loop 执行任务时,并不是 Demo 示例一般一直到任务清空,而是设置了一些中断条件,以便让出主线程,让浏览器可以及时响应交互
事件优先级
在 Scheduler
中,任务从高优先级到低优先级分别为:ImmediatePriority
(立即执行)、UserBlockingPriority
(用户阻塞级别)、NormalPriority
(普通优先级)、LowPriority
(低优先级)、IdlePriority
(闲置)。
react/packages/scheduler/src/SchedulerPriorities.js
export type PriorityLevel = 0 | 1 | 2 | 3 | 4 | 5;
export const NoPriority = 0;
export const ImmediatePriority = 1;
export const UserBlockingPriority = 2;
export const NormalPriority = 3;
export const LowPriority = 4;
export const IdlePriority = 5;
在任务开始调度时,会根据不同的优先级设置不同的过期时间:
react/packages/scheduler/src/SchedulerFeatureFlags.js
export const userBlockingPriorityTimeout = 250;
export const normalPriorityTimeout = 5000;
export const lowPriorityTimeout = 10000;
react/packages/scheduler/src/forks/Scheduler.js
var maxSigned31BitInt = 1073741823;
/**
* 任务调度的起点
*/
function unstable_scheduleCallback(
priorityLevel: PriorityLevel,
callback: Callback,
options?: {delay: number},
): Task {
var currentTime = getCurrentTime();
// 如果设置了 delay, 该任务就是延迟执行任务
var startTime;
if (typeof options === 'object' && options !== null) {
var delay = options.delay;
if (typeof delay === 'number' && delay > 0) {
startTime = currentTime + delay;
} else {
startTime = currentTime;
}
} else {
startTime = currentTime;
}
// 根据不同优先级设置过期时间
var timeout;
switch (priorityLevel) {
case ImmediatePriority:
// Times out immediately
timeout = -1;
break;
case UserBlockingPriority:
// Eventually times out
timeout = userBlockingPriorityTimeout;
break;
case IdlePriority:
// Never times out
// Math.pow(2, 30) - 1,一个超大的数,可以认为没有超时时间
timeout = maxSigned31BitInt;
break;
case LowPriority:
// Eventually times out
timeout = lowPriorityTimeout;
break;
case NormalPriority:
default:
// Eventually times out
timeout = normalPriorityTimeout;
break;
}
var expirationTime = startTime + timeout;
var newTask: Task = {
id: taskIdCounter++,
// callback 是实际执行的任务
callback,
priorityLevel,
startTime,
expirationTime,
sortIndex: -1,
};
if (startTime > currentTime) {
// This is a delayed task.
// 延迟执行任务的排序依据是任务开始的时间
newTask.sortIndex = startTime;
// 插入到延迟队列中
push(timerQueue, newTask);
// 获取最高优的立即执行任务,如果当前没有立即执行任务,并且当前的任务是最高优的延迟任务
// 就通过 timeout 去延后调度
if (peek(taskQueue) === null && newTask === peek(timerQueue)) {
// All tasks are delayed, and this is the task with the earliest delay.
if (isHostTimeoutScheduled) {
// Cancel an existing timeout.
cancelHostTimeout();
} else {
isHostTimeoutScheduled = true;
}
// Schedule a timeout.
// 以最高优延迟任务的开始时间作为 delay 的依据,当到期后,该延迟任务也就变成立即执行任务
// 会被放到 taskQueue 中,然后真正被执行
requestHostTimeout(handleTimeout, startTime - currentTime);
}
} else {
// 立即执行任务的排序依据是任务过期的时间
newTask.sortIndex = expirationTime;
// 插入到立即执行队列中
push(taskQueue, newTask);
// Schedule a host callback, if needed. If we're already performing work,
// wait until the next time we yield.
if (!isHostCallbackScheduled && !isPerformingWork) {
isHostCallbackScheduled = true;
requestHostCallback();
}
}
return newTask;
}
小顶堆(优先级队列的实现)
前面 Demo 中,我们直接使用原生的 sort
方法来进行排序,由于它取决于具体实现,无法保证排序的时间和空间复杂度。
优先级队列的使用场景是:
- 数组是动态的,每次需要找最高优任务,找到之后需要从数组里删除这个任务
- 不断有新的任务插入数组
Scheduler
采用小顶堆来实现优先级队列的排序。小顶堆的特点:
- 是一棵完全的二叉树,即:除最后一层外,其它层的节点都是满的,且最后一层靠左排列
- 每一个节点的值都小于其子树节点的每一项
并且下标满足如下规律:
- 根据子节点下标推算父节点下标:
parentIndex = (childIndex - 1) >>> 1
- 根据父节点下标推算子节点下标:
leftIndex = (index +1 )*2 - 1
rightIndex = leftIndex + 1
小顶堆包含三个方法:
push
:向堆中推入数据
pop
:从堆顶取出数据
peek
:获取堆顶节点,即“排序依据最小的值”的节点
react/packages/scheduler/src/SchedulerMinHeap.js
export function push<T: Node>(heap: Heap<T>, node: T): void {
const index = heap.length;
heap.push(node);
// 先把新元素插入数组尾部,然后从下往上调整最小堆:
siftUp(heap, node, index);
}
export function peek<T: Node>(heap: Heap<T>): T | null {
return heap.length === 0 ? null : heap[0];
}
export function pop<T: Node>(heap: Heap<T>): T | null {
if (heap.length === 0) {
return null;
}
const first = heap[0];
const last = heap.pop();
if (last !== first) {
// $FlowFixMe[incompatible-type]
heap[0] = last;
// $FlowFixMe[incompatible-call]
siftDown(heap, last, 0);
}
return first;
}
/**
* 调整尾部元素以及尾部元素的祖先,一直往上调整,直到不再需要调整为止
*/
function siftUp<T: Node>(heap: Heap<T>, node: T, i: number): void {
let index = i;
while (index > 0) {
const parentIndex = (index - 1) >>> 1;
const parent = heap[parentIndex];
if (compare(parent, node) > 0) {
// The parent is larger. Swap positions.
heap[parentIndex] = node;
heap[index] = parent;
index = parentIndex;
} else {
// The parent is smaller. Exit.
return;
}
}
}
/**
* 检查每个子堆的结构,确保最小值在父节点,不满足就交换父节点与最小的子节点
*/
function siftDown<T: Node>(heap: Heap<T>, node: T, i: number): void {
let index = i;
const length = heap.length;
const halfLength = length >>> 1;
while (index < halfLength) {
const leftIndex = (index + 1) * 2 - 1;
const left = heap[leftIndex];
const rightIndex = leftIndex + 1;
const right = heap[rightIndex];
// If the left or right node is smaller, swap with the smaller of those.
if (compare(left, node) < 0) {
if (rightIndex < length && compare(right, left) < 0) {
heap[index] = right;
heap[rightIndex] = node;
index = rightIndex;
} else {
heap[index] = left;
heap[leftIndex] = node;
index = leftIndex;
}
} else if (rightIndex < length && compare(right, node) < 0) {
heap[index] = right;
heap[rightIndex] = node;
index = rightIndex;
} else {
// Neither child is smaller. Exit.
return;
}
}
}
function compare(a: Node, b: Node) {
// Compare sort index first, then task id.
const diff = a.sortIndex - b.sortIndex;
return diff !== 0 ? diff : a.id - b.id;
}
push
和 pop
都涉及到堆化操作,需要在插入和删除时重新构造小顶堆,时间复杂度为 O(log N)
peek
则是直接取小顶堆的堆顶节点,时间复杂度为 O(1)
宏任务的选择
浏览器会在宏任务的间隙执行 Layout,Paint,响应用户的交互。而微任务(比如 Promise
,queueMicrotask
) 是在当前宏任务结束之前执行,会阻塞用户的交互。
延迟任务的目的除了延迟批量执行 task 外,还希望不要阻塞当前用户的交互行为。所以延迟调度的最好选择是通过宏任务调度,并且希望执行时机越早越好,且调用频次足够的高,以保证任务尽快执行。
宏任务的选择通常有:setTimeout
,requestIdleCalback
,requestAnimationFrame
,MessageChannel
等。
setTimeout
存在最小时延 4 ms 的限制,若嵌套层级超 5 层且 timeout 小于 4ms 则设为 4ms。
function scheduleTimeout() {
console.log(Date.now());
setTimeout(scheduleTimeout, 0);
}
scheduleTimeout();
- 按照 60 fps 流畅的帧率来计算,一帧大概只有 16.67 ms(1000/60),再减去页面渲染所需要的时间,留给开发者的时间就很少了,如果使用
setTimeout
会浪费掉宝贵的执行时间。
requestIdleCalback
的初衷是:能够在 EventLoop 中执行低优先级任务,减少对动画,交互等高优任务的影响。所以它主要的用来调度底优任务,且调用频率不可控,而 Scheduler
需要调度各种优先级的任务。
requestAnimationFrame
的执行时机是在下一次 Paint 之前,一般用于更新动画。调用频率是一帧一次,频率很低。
所以综合来看,只有 MessageChannel
满足一帧内可以多次调用,且执行时机足够早,不存在最小时延。
回到源码中,在 scheduleCallback
中,如果不存在任何立即执行任务,只存在延迟任务时,会通过 requestHostTimeout
调度:
react/packages/scheduler/src/forks/Scheduler.js
// 本质上就是 setTimeout 延迟执行,不需要那么高的执行频次,使用 setTimeout 是 ok 的
function requestHostTimeout(
callback: (currentTime: number) => void,
ms: number,
) {
// $FlowFixMe[not-a-function] nullable value
taskTimeoutID = localSetTimeout(() => {
callback(getCurrentTime());
}, ms);
}
function handleTimeout(currentTime: number) {
isHostTimeoutScheduled = false;
// 将所有到期的延迟任务移动到 taskQueue 中
advanceTimers(currentTime);
if (!isHostCallbackScheduled) {
// 延迟后,如果当前存在了立即执行任务,则通过 requestHostCallback 调度
if (peek(taskQueue) !== null) {
isHostCallbackScheduled = true;
requestHostCallback();
} else {
// 否则,继续使用 requestHostTimeout 延后,逻辑和 unstable_scheduleCallback 中的延迟逻辑类似
const firstTimer = peek(timerQueue);
if (firstTimer !== null) {
requestHostTimeout(handleTimeout, firstTimer.startTime - currentTime);
}
}
}
}
/**
* advanceTimers 做的事很简单:
* 把 timerQueue 中所有到执行时间的任务移动到 taskQueue 中
* 并且把排序依据 sortIndex 切换成为 expirationTime
*/
function advanceTimers(currentTime: number) {
// Check for tasks that are no longer delayed and add them to the queue.
let timer = peek(timerQueue);
while (timer !== null) {
if (timer.callback === null) {
// Timer was cancelled.
pop(timerQueue);
} else if (timer.startTime <= currentTime) {
// Timer fired. Transfer to the task queue.
pop(timerQueue);
timer.sortIndex = timer.expirationTime;
push(taskQueue, timer);
if (enableProfiling) {
markTaskStart(timer, currentTime);
timer.isQueued = true;
}
} else {
// Remaining timers are pending.
return;
}
timer = peek(timerQueue);
}
}
不管是延迟任务,还是立即执行任务,最终的调度都会走到 requestHostCallback
,requesHostCallback
本质就是 setImmediate -> MessageChannel -> setTimeout
的降级调用:
react/packages/scheduler/src/forks/Scheduler.js
function requestHostCallback() {
if (!isMessageLoopRunning) {
isMessageLoopRunning = true;
schedulePerformWorkUntilDeadline();
}
}
let schedulePerformWorkUntilDeadline;
// setImmediate 主要是 node 和 老 IE 环境使用,选择的原因和 MessageChannel 是类似的
// 不过不同于 MesagcChamnel,它不会阻止 Nodejs 进程退出
if (typeof localSetImmediate === "function") {
// Node.js and old IE.
// There's a few reasons for why we prefer setImmediate.
//
// Unlike MessageChannel, it doesn't prevent a Node.js process from exiting.
// (Even though this is a DOM fork of the Scheduler, you could get here
// with a mix of Node.js 15+, which has a MessageChannel, and jsdom.)
// https://github.com/facebook/react/issues/20756
//
// But also, it runs earlier which is the semantic we want.
// If other browsers ever implement it, it's better to use it.
// Although both of these would be inferior to native scheduling.
schedulePerformWorkUntilDeadline = () => {
localSetImmediate(performWorkUntilDeadline);
};
} else if (typeof MessageChannel !== "undefined") {
// DOM and Worker environments.
// We prefer MessageChannel because of the 4ms setTimeout clamping.
const channel = new MessageChannel();
const port = channel.port2;
channel.port1.onmessage = performWorkUntilDeadline;
schedulePerformWorkUntilDeadline = () => {
port.postMessage(null);
};
} else {
// We should only fallback here in non-browser environments.
schedulePerformWorkUntilDeadline = () => {
// $FlowFixMe[not-a-function] nullable value
localSetTimeout(performWorkUntilDeadline, 0);
};
}
任务执行
调度完成后,就来到了真正的执行阶段。代码执行链路是 performWorkUntilDeadline -> flushWork -> workLoop
。最关键最复杂的逻辑就在 workLoop
中。
workLoop
和 Demo 示例类似,就是不断把 taskQueue 中的任务取出来执行,只要增加了提前中断和继续调度的场景。
react/packages/scheduler/src/forks/Scheduler.js
function workLoop(initialTime: number) {
let currentTime = initialTime;
advanceTimers(currentTime);
// 取最高优的任务,注意是 peek,而不是 pop
currentTask = peek(taskQueue);
while (
currentTask !== null &&
// 开发调试用,不用关心
!(enableSchedulerDebugging && isSchedulerPaused)
) {
// 中断条件:如果当前任务还到过期时间,但是需要让出主线程,那么就提前中断退出
// 所以,如果当前任务已经过期了,那么即使判断为需要让出主线程,也不会中断,继续执行任务,以保证已经过期的任务能尽快执行完
if (currentTask.expirationTime > currentTime && shouldYieldToHost()) {
// This currentTask hasn't expired, and we've reached the deadline.
break;
}
const callback = currentTask.callback;
// 如果任务被取消或者完成了,callback 会被置为 null, 所以在执行前需要判断当前任务是否有效
if (typeof callback === 'function') {
// 将 callback 先置为 null, 认为该任务会执行完成
currentTask.callback = null;
currentPriorityLevel = currentTask.priorityLevel;
// 判断当前任务是否已经超时了
const didUserCallbackTimeout = currentTask.expirationTime <= currentTime;
if (enableProfiling) {
markTaskRun(currentTask, currentTime);
}
// 和 Demo 的不同点:
// 1. 接收 timeout 作为入参,让调用的任务感知到当前任务是否已经超时,当前任务中的业务逻辑可自行判断是否需要中断任务,
// 比如 React Reconciler 层就会根据此参数判断 Reconciliation 是否需要中断
// 2. 接收一个返回函数,标记当前任务是否还有继续执行的任务,
// 比如上面说的即使任务没有过期, 为了页面的流畅性,任务自行中断了让出主线程
// 那么会返回一个 continuationCallback,标记当前任务还没结束,还需要继续调度
const continuationCallback = callback(didUserCallbackTimeout);
currentTime = getCurrentTime();
if (typeof continuationCallback === 'function') {
// If a continuation is returned, immediately yield to the main thread
// regardless of how much time is left in the current time slice.
// $FlowFixMe[incompatible-use] found when upgrading Flow
currentTask.callback = continuationCallback;
if (enableProfiling) {
// $FlowFixMe[incompatible-call] found when upgrading Flow
markTaskYield(currentTask, currentTime);
}
advanceTimers(currentTime);
// 对于还有继续任务的场景,快速退出,重新调度,这是为了快速让出主线程,影响用户的交互
// true 表示还有任务
return true;
} else {
if (enableProfiling) {
// $FlowFixMe[incompatible-call] found when upgrading Flow
markTaskCompleted(currentTask, currentTime);
// $FlowFixMe[incompatible-use] found when upgrading Flow
currentTask.isQueued = false;
}
// 如果当前任务执行完成了,并且还是最高优的任务(在任务执行中也可能会插入更高优的任务),就把当前任务推出
// 如果不是最高优,就不做处理,因为上面已经在任务执行前将 callback 置为 null,并且在 loop 时做了 check 处理
if (currentTask === peek(taskQueue)) {
pop(taskQueue);
}
advanceTimers(currentTime);
}
} else {
// 说明当前任务已经完成或者被取消,直接 pop 掉即可
pop(taskQueue);
}
currentTask = peek(taskQueue);
}
// 因为 loop 会提前中断,所以需要检测否是还存在任务,存在,返回 true,否则 false。
if (currentTask !== null) {
return true;
} else {
const firstTimer = peek(timerQueue);
if (firstTimer !== null) {
// 如果所有立即执行任务已经完成,但还存在延迟任务,那么再次对延迟任务进行调度
requestHostTimeout(handleTimeout, firstTimer.startTime - currentTime);
}
return false;
}
}
提前中断 yield
来看看 shouldYieldToHost
做了什么:
react/packages/scheduler/src/SchedulerFeatureFlags.js
export const frameYieldMs = 5;
react/packages/scheduler/src/forks/Scheduler.js
let frameInterval = frameYieldMs;
function shouldYieldToHost(): boolean {
const timeElapsed = getCurrentTime() - startTime;
if (timeElapsed < frameInterval) {
// The main thread has only been blocked for a really short amount of time;
// smaller than a single frame. Don't yield yet.
return false;
}
// Yield now.
return true;
}
默认情况下, shouldYieldToHost
就是判断是否已经过了 5 ms, 是的话就返回 true
表明当前需要让出主线程。
所以在 React 项目的性能火焰图中,通常会看到如下的结果(每次宏任务的时长大概在 5ms 左右):
当然,Scheduler
也提供了更改 frameInterval
的方法:
react/packages/scheduler/src/forks/Scheduler.js
function forceFrameRate(fps: number) {
if (fps < 0 || fps > 125) {
// Using console['error'] to evade Babel and ESLint
console['error'](
'forceFrameRate takes a positive int between 0 and 125, ' +
'forcing frame rates higher than 125 fps is not supported',
);
return;
}
if (fps > 0) {
frameInterval = Math.floor(1000 / fps);
} else {
// reset the framerate
frameInterval = frameYieldMs;
}
}
再次调度
workLoop
返回 true
表示还有任务需要执行,但当前 loop 已经结束,那么就需要再次调度剩余的任务:
react/packages/scheduler/src/forks/Scheduler.js
const performWorkUntilDeadline = () => {
if (isMessageLoopRunning) {
const currentTime = getCurrentTime();
// Keep track of the start time so we can measure how long the main thread
// has been blocked.
startTime = currentTime;
// If a scheduler task throws, exit the current browser task so the
// error can be observed.
//
// Intentionally not using a try-catch, since that makes some debugging
// techniques harder. Instead, if `flushWork` errors, then `hasMoreWork` will
// remain true, and we'll continue the work loop.
let hasMoreWork = true;
try {
// flushWork 返回的就是 workLoop 的结果
hasMoreWork = flushWork(currentTime);
} finally {
if (hasMoreWork) {
// 如果还有任务,继续通过 schedulePerformWorkUntilDeadline 调度
schedulePerformWorkUntilDeadline();
} else {
isMessageLoopRunning = false;
}
}
}
};
function flushWork(initialTime: number) {
if (enableProfiling) {
markSchedulerUnsuspended(initialTime);
}
isHostCallbackScheduled = false;
// workLoop 开始任务执行前, 延时任务的 timer 可以取消掉,因为在任务执行过程中,延时任务队列可能还会变,之前的 timer 已经过时了
if (isHostTimeoutScheduled) {
// We scheduled a timeout but it's no longer needed. Cancel it.
isHostTimeoutScheduled = false;
cancelHostTimeout();
}
isPerformingWork = true;
const previousPriorityLevel = currentPriorityLevel;
try {
if (enableProfiling) {
try {
return workLoop(initialTime);
} catch (error) {
if (currentTask !== null) {
const currentTime = getCurrentTime();
// $FlowFixMe[incompatible-call] found when upgrading Flow
markTaskErrored(currentTask, currentTime);
// $FlowFixMe[incompatible-use] found when upgrading Flow
currentTask.isQueued = false;
}
throw error;
}
} else {
// No catch in prod code path.
return workLoop(initialTime);
}
} finally {
currentTask = null;
currentPriorityLevel = previousPriorityLevel;
isPerformingWork = false;
if (enableProfiling) {
const currentTime = getCurrentTime();
markSchedulerSuspended(currentTime);
}
}
}
至此,就是 Scheduler
主链路的流程。