Scheduler 调度器

Scheduler(调度器)负责管理任务的调度和优先级。定义了任务调度的逻辑,包括任务的优先级、任务队列的管理等。是 React 中一个非常重要的组件。

React 并没有将本身的任务优先级调度和 Scheduler 耦合在一起,为了保证其通用性,Scheduler 是作为一个独立的包存在。可以直接安装 Scheduler 包来使用。

TIP

了解事件循环 EventLoop 能更好掌握本章内容,可参考笔者以前写的消息队列与事件循环

举一个例子

在深入到源码之前,先来看看:如果要实现一套任务调度机制,应该怎么做?

任务调度器的本质就是:将任务按照优先级进行排序,然后按照优先级依次执行任务。

流程示意图如下所示:

  1. schedule 负责创建和 push 任务
  2. pushtaskQueue 中的任务按照优先级和 id 进行排序
  3. loop 任务队列所有的任务,并依次执行

关键代码如下:

const taskQueue = []; let id = 0; function schedule(execute: () => void, priority: number) { const task = { id: id++, execute, priority }; push(task); console.log("schedule task: ", task); workLoop(); } function workLoop() { while (taskQueue.length) { const task = peek(); console.log("perform task: ", task); task.execute(); } } function push(task) { taskQueue.push(task); taskQueue.sort((a, b) => { // 先比较优先级,如果优先级相同,再比较 id const diff = a.priority - b.priority; return diff !== 0 ? diff : a.id - b.id; }); } function peek() { return taskQueue.shift(); }

最最基础版的 mini-scheduler 就已经实现了,可通过下面的 Demo 体验:

但这个版本有两个最大的问题:

  1. schedule 任务后,就立即开始了 loop 执行(即使同时插入底优,高优任务,也会先执行底优任务),现实中会延后执行
  2. 单纯的按照 priority 排序,如果期间不断有高优任务插入,低优任务就会一直被阻塞,而被“饿死”

针对这两点,v2 改进的流程示意图如下:

关键代码如下:

const taskQueue = []; let id = 0; function schedule(execute: () => void, priority: number) { // 根据优先级计算出任务的过期时间,并以过期时间作为排序依据 // 即使在插入底优任务后,不断有高优任务插入,也能保证随着时间的推移,底优任务被排在第一位 let timeout; if (priority === 0) { timeout = 0; } else { timeout = 100; } const expirationTime = Date.now() + timeout; const task = { id: id++, execute, expirationTime }; push(task); console.log("schedule task: ", task); scheduleWorkLoop(); } // 标识 loop 是否在执行中 let isLoopRunning = false; function scheduleWorkLoop() { // 只有在 loop 没有启动时,才需要 schedule if (!isLoopRunning) { isLoopRunning = true; // 暂时用 setTimeout 做延迟处理 setTimeout(() => { workLoop(); }, 0); } } function workLoop() { while (taskQueue.length) { const task = peek(); console.log("perform task: ", task); task.execute(); } isLoopRunning = false; } function push(task) { taskQueue.push(task); taskQueue.sort((a, b) => { // 先比较过期时间,再比较 id const diff = a.expirationTime - b.expirationTime; return diff !== 0 ? diff : a.id - b.id; }); } function peek() { return taskQueue.shift(); }

v2 版本核心链路相对更完善,但也只是达到了 Demo 演示的目的,距离业务可用还相差甚远。

来看看 Scheduler 具体是怎么做的。

Scheduler 流程概览

Scheduler 核心的流程示意图如下:

  • 包含两个任务队列 taskQueuetimerQueue, 其中 taskQueue 存储的是需要立即执行的任务,而 timerQueue 存储的是延迟执行的任务,两个队列的优先级排序的功能是一致的
  • 因为存在两个优先级队列,那么在调度时,也会存在两种不同的调度方式,requestHostTimeoutrequestHostCallback,但最终入口还是 requestHostCallback
  • 在 loop 执行任务时,并不是 Demo 示例一般一直到任务清空,而是设置了一些中断条件,以便让出主线程,让浏览器可以及时响应交互

事件优先级

Scheduler 中,任务从高优先级到低优先级分别为:ImmediatePriority(立即执行)、UserBlockingPriority(用户阻塞级别)、NormalPriority(普通优先级)、LowPriority(低优先级)、IdlePriority(闲置)。

react/packages/scheduler/src/SchedulerPriorities.js
export type PriorityLevel = 0 | 1 | 2 | 3 | 4 | 5; export const NoPriority = 0; export const ImmediatePriority = 1; export const UserBlockingPriority = 2; export const NormalPriority = 3; export const LowPriority = 4; export const IdlePriority = 5;

在任务开始调度时,会根据不同的优先级设置不同的过期时间:

react/packages/scheduler/src/SchedulerFeatureFlags.js
export const userBlockingPriorityTimeout = 250; export const normalPriorityTimeout = 5000; export const lowPriorityTimeout = 10000;
react/packages/scheduler/src/forks/Scheduler.js
var maxSigned31BitInt = 1073741823; /** * 任务调度的起点 */ function unstable_scheduleCallback( priorityLevel: PriorityLevel, callback: Callback, options?: {delay: number}, ): Task { var currentTime = getCurrentTime(); // 如果设置了 delay, 该任务就是延迟执行任务 var startTime; if (typeof options === 'object' && options !== null) { var delay = options.delay; if (typeof delay === 'number' && delay > 0) { startTime = currentTime + delay; } else { startTime = currentTime; } } else { startTime = currentTime; } // 根据不同优先级设置过期时间 var timeout; switch (priorityLevel) { case ImmediatePriority: // Times out immediately timeout = -1; break; case UserBlockingPriority: // Eventually times out timeout = userBlockingPriorityTimeout; break; case IdlePriority: // Never times out // Math.pow(2, 30) - 1,一个超大的数,可以认为没有超时时间 timeout = maxSigned31BitInt; break; case LowPriority: // Eventually times out timeout = lowPriorityTimeout; break; case NormalPriority: default: // Eventually times out timeout = normalPriorityTimeout; break; } var expirationTime = startTime + timeout; var newTask: Task = { id: taskIdCounter++, // callback 是实际执行的任务 callback, priorityLevel, startTime, expirationTime, sortIndex: -1, }; if (startTime > currentTime) { // This is a delayed task. // 延迟执行任务的排序依据是任务开始的时间 newTask.sortIndex = startTime; // 插入到延迟队列中 push(timerQueue, newTask); // 获取最高优的立即执行任务,如果当前没有立即执行任务,并且当前的任务是最高优的延迟任务 // 就通过 timeout 去延后调度 if (peek(taskQueue) === null && newTask === peek(timerQueue)) { // All tasks are delayed, and this is the task with the earliest delay. if (isHostTimeoutScheduled) { // Cancel an existing timeout. cancelHostTimeout(); } else { isHostTimeoutScheduled = true; } // Schedule a timeout. // 以最高优延迟任务的开始时间作为 delay 的依据,当到期后,该延迟任务也就变成立即执行任务 // 会被放到 taskQueue 中,然后真正被执行 requestHostTimeout(handleTimeout, startTime - currentTime); } } else { // 立即执行任务的排序依据是任务过期的时间 newTask.sortIndex = expirationTime; // 插入到立即执行队列中 push(taskQueue, newTask); // Schedule a host callback, if needed. If we're already performing work, // wait until the next time we yield. if (!isHostCallbackScheduled && !isPerformingWork) { isHostCallbackScheduled = true; requestHostCallback(); } } return newTask; }

小顶堆(优先级队列的实现)

前面 Demo 中,我们直接使用原生的 sort 方法来进行排序,由于它取决于具体实现,无法保证排序的时间和空间复杂度。

优先级队列的使用场景是:

  1. 数组是动态的,每次需要找最高优任务,找到之后需要从数组里删除这个任务
  2. 不断有新的任务插入数组

Scheduler 采用小顶堆来实现优先级队列的排序。小顶堆的特点:

  1. 是一棵完全的二叉树,即:除最后一层外,其它层的节点都是满的,且最后一层靠左排列
  2. 每一个节点的值都小于其子树节点的每一项

并且下标满足如下规律:

  1. 根据子节点下标推算父节点下标:parentIndex = (childIndex - 1) >>> 1
  2. 根据父节点下标推算子节点下标:
    • leftIndex = (index +1 )*2 - 1
    • rightIndex = leftIndex + 1

小顶堆包含三个方法:

  • push:向堆中推入数据
  • pop:从堆顶取出数据
  • peek:获取堆顶节点,即“排序依据最小的值”的节点
react/packages/scheduler/src/SchedulerMinHeap.js
export function push<T: Node>(heap: Heap<T>, node: T): void { const index = heap.length; heap.push(node); // 先把新元素插入数组尾部,然后从下往上调整最小堆: siftUp(heap, node, index); } export function peek<T: Node>(heap: Heap<T>): T | null { return heap.length === 0 ? null : heap[0]; } export function pop<T: Node>(heap: Heap<T>): T | null { if (heap.length === 0) { return null; } const first = heap[0]; const last = heap.pop(); if (last !== first) { // $FlowFixMe[incompatible-type] heap[0] = last; // $FlowFixMe[incompatible-call] siftDown(heap, last, 0); } return first; } /** * 调整尾部元素以及尾部元素的祖先,一直往上调整,直到不再需要调整为止 */ function siftUp<T: Node>(heap: Heap<T>, node: T, i: number): void { let index = i; while (index > 0) { const parentIndex = (index - 1) >>> 1; const parent = heap[parentIndex]; if (compare(parent, node) > 0) { // The parent is larger. Swap positions. heap[parentIndex] = node; heap[index] = parent; index = parentIndex; } else { // The parent is smaller. Exit. return; } } } /** * 检查每个子堆的结构,确保最小值在父节点,不满足就交换父节点与最小的子节点 */ function siftDown<T: Node>(heap: Heap<T>, node: T, i: number): void { let index = i; const length = heap.length; const halfLength = length >>> 1; while (index < halfLength) { const leftIndex = (index + 1) * 2 - 1; const left = heap[leftIndex]; const rightIndex = leftIndex + 1; const right = heap[rightIndex]; // If the left or right node is smaller, swap with the smaller of those. if (compare(left, node) < 0) { if (rightIndex < length && compare(right, left) < 0) { heap[index] = right; heap[rightIndex] = node; index = rightIndex; } else { heap[index] = left; heap[leftIndex] = node; index = leftIndex; } } else if (rightIndex < length && compare(right, node) < 0) { heap[index] = right; heap[rightIndex] = node; index = rightIndex; } else { // Neither child is smaller. Exit. return; } } } function compare(a: Node, b: Node) { // Compare sort index first, then task id. const diff = a.sortIndex - b.sortIndex; return diff !== 0 ? diff : a.id - b.id; }
  • pushpop 都涉及到堆化操作,需要在插入和删除时重新构造小顶堆,时间复杂度为 O(log N)
  • peek 则是直接取小顶堆的堆顶节点,时间复杂度为 O(1)
INFO

对算法感兴趣的同学,可以看看:

宏任务的选择

浏览器会在宏任务的间隙执行 Layout,Paint,响应用户的交互。而微任务(比如 PromisequeueMicrotask ) 是在当前宏任务结束之前执行,会阻塞用户的交互。

延迟任务的目的除了延迟批量执行 task 外,还希望不要阻塞当前用户的交互行为。所以延迟调度的最好选择是通过宏任务调度,并且希望执行时机越早越好,且调用频次足够的高,以保证任务尽快执行。

宏任务的选择通常有:setTimeoutrequestIdleCalbackrequestAnimationFrameMessageChannel等。

setTimeout 存在最小时延 4 ms 的限制,若嵌套层级超 5 层且 timeout 小于 4ms 则设为 4ms。

function scheduleTimeout() { console.log(Date.now()); setTimeout(scheduleTimeout, 0); } scheduleTimeout();

  • 按照 60 fps 流畅的帧率来计算,一帧大概只有 16.67 ms(1000/60),再减去页面渲染所需要的时间,留给开发者的时间就很少了,如果使用 setTimeout 会浪费掉宝贵的执行时间。
  • requestIdleCalback 的初衷是:能够在 EventLoop 中执行低优先级任务,减少对动画,交互等高优任务的影响。所以它主要的用来调度底优任务,且调用频率不可控,而 Scheduler 需要调度各种优先级的任务。
  • requestAnimationFrame 的执行时机是在下一次 Paint 之前,一般用于更新动画。调用频率是一帧一次,频率很低。

所以综合来看,只有 MessageChannel 满足一帧内可以多次调用,且执行时机足够早,不存在最小时延。

回到源码中,在 scheduleCallback 中,如果不存在任何立即执行任务,只存在延迟任务时,会通过 requestHostTimeout 调度:

react/packages/scheduler/src/forks/Scheduler.js
// 本质上就是 setTimeout 延迟执行,不需要那么高的执行频次,使用 setTimeout 是 ok 的 function requestHostTimeout( callback: (currentTime: number) => void, ms: number, ) { // $FlowFixMe[not-a-function] nullable value taskTimeoutID = localSetTimeout(() => { callback(getCurrentTime()); }, ms); } function handleTimeout(currentTime: number) { isHostTimeoutScheduled = false; // 将所有到期的延迟任务移动到 taskQueue 中 advanceTimers(currentTime); if (!isHostCallbackScheduled) { // 延迟后,如果当前存在了立即执行任务,则通过 requestHostCallback 调度 if (peek(taskQueue) !== null) { isHostCallbackScheduled = true; requestHostCallback(); } else { // 否则,继续使用 requestHostTimeout 延后,逻辑和 unstable_scheduleCallback 中的延迟逻辑类似 const firstTimer = peek(timerQueue); if (firstTimer !== null) { requestHostTimeout(handleTimeout, firstTimer.startTime - currentTime); } } } } /** * advanceTimers 做的事很简单: * 把 timerQueue 中所有到执行时间的任务移动到 taskQueue 中 * 并且把排序依据 sortIndex 切换成为 expirationTime */ function advanceTimers(currentTime: number) { // Check for tasks that are no longer delayed and add them to the queue. let timer = peek(timerQueue); while (timer !== null) { if (timer.callback === null) { // Timer was cancelled. pop(timerQueue); } else if (timer.startTime <= currentTime) { // Timer fired. Transfer to the task queue. pop(timerQueue); timer.sortIndex = timer.expirationTime; push(taskQueue, timer); if (enableProfiling) { markTaskStart(timer, currentTime); timer.isQueued = true; } } else { // Remaining timers are pending. return; } timer = peek(timerQueue); } }

不管是延迟任务,还是立即执行任务,最终的调度都会走到 requestHostCallbackrequesHostCallback 本质就是 setImmediate -> MessageChannel -> setTimeout 的降级调用:

react/packages/scheduler/src/forks/Scheduler.js
function requestHostCallback() { if (!isMessageLoopRunning) { isMessageLoopRunning = true; schedulePerformWorkUntilDeadline(); } } let schedulePerformWorkUntilDeadline; // setImmediate 主要是 node 和 老 IE 环境使用,选择的原因和 MessageChannel 是类似的 // 不过不同于 MesagcChamnel,它不会阻止 Nodejs 进程退出 if (typeof localSetImmediate === "function") { // Node.js and old IE. // There's a few reasons for why we prefer setImmediate. // // Unlike MessageChannel, it doesn't prevent a Node.js process from exiting. // (Even though this is a DOM fork of the Scheduler, you could get here // with a mix of Node.js 15+, which has a MessageChannel, and jsdom.) // https://github.com/facebook/react/issues/20756 // // But also, it runs earlier which is the semantic we want. // If other browsers ever implement it, it's better to use it. // Although both of these would be inferior to native scheduling. schedulePerformWorkUntilDeadline = () => { localSetImmediate(performWorkUntilDeadline); }; } else if (typeof MessageChannel !== "undefined") { // DOM and Worker environments. // We prefer MessageChannel because of the 4ms setTimeout clamping. const channel = new MessageChannel(); const port = channel.port2; channel.port1.onmessage = performWorkUntilDeadline; schedulePerformWorkUntilDeadline = () => { port.postMessage(null); }; } else { // We should only fallback here in non-browser environments. schedulePerformWorkUntilDeadline = () => { // $FlowFixMe[not-a-function] nullable value localSetTimeout(performWorkUntilDeadline, 0); }; }

任务执行

调度完成后,就来到了真正的执行阶段。代码执行链路是 performWorkUntilDeadline -> flushWork -> workLoop。最关键最复杂的逻辑就在 workLoop 中。

workLoop 和 Demo 示例类似,就是不断把 taskQueue 中的任务取出来执行,只要增加了提前中断和继续调度的场景。

react/packages/scheduler/src/forks/Scheduler.js
function workLoop(initialTime: number) { let currentTime = initialTime; advanceTimers(currentTime); // 取最高优的任务,注意是 peek,而不是 pop currentTask = peek(taskQueue); while ( currentTask !== null && // 开发调试用,不用关心 !(enableSchedulerDebugging && isSchedulerPaused) ) { // 中断条件:如果当前任务还到过期时间,但是需要让出主线程,那么就提前中断退出 // 所以,如果当前任务已经过期了,那么即使判断为需要让出主线程,也不会中断,继续执行任务,以保证已经过期的任务能尽快执行完 if (currentTask.expirationTime > currentTime && shouldYieldToHost()) { // This currentTask hasn't expired, and we've reached the deadline. break; } const callback = currentTask.callback; // 如果任务被取消或者完成了,callback 会被置为 null, 所以在执行前需要判断当前任务是否有效 if (typeof callback === 'function') { // 将 callback 先置为 null, 认为该任务会执行完成 currentTask.callback = null; currentPriorityLevel = currentTask.priorityLevel; // 判断当前任务是否已经超时了 const didUserCallbackTimeout = currentTask.expirationTime <= currentTime; if (enableProfiling) { markTaskRun(currentTask, currentTime); } // 和 Demo 的不同点: // 1. 接收 timeout 作为入参,让调用的任务感知到当前任务是否已经超时,当前任务中的业务逻辑可自行判断是否需要中断任务, // 比如 React Reconciler 层就会根据此参数判断 Reconciliation 是否需要中断 // 2. 接收一个返回函数,标记当前任务是否还有继续执行的任务, // 比如上面说的即使任务没有过期, 为了页面的流畅性,任务自行中断了让出主线程 // 那么会返回一个 continuationCallback,标记当前任务还没结束,还需要继续调度 const continuationCallback = callback(didUserCallbackTimeout); currentTime = getCurrentTime(); if (typeof continuationCallback === 'function') { // If a continuation is returned, immediately yield to the main thread // regardless of how much time is left in the current time slice. // $FlowFixMe[incompatible-use] found when upgrading Flow currentTask.callback = continuationCallback; if (enableProfiling) { // $FlowFixMe[incompatible-call] found when upgrading Flow markTaskYield(currentTask, currentTime); } advanceTimers(currentTime); // 对于还有继续任务的场景,快速退出,重新调度,这是为了快速让出主线程,影响用户的交互 // true 表示还有任务 return true; } else { if (enableProfiling) { // $FlowFixMe[incompatible-call] found when upgrading Flow markTaskCompleted(currentTask, currentTime); // $FlowFixMe[incompatible-use] found when upgrading Flow currentTask.isQueued = false; } // 如果当前任务执行完成了,并且还是最高优的任务(在任务执行中也可能会插入更高优的任务),就把当前任务推出 // 如果不是最高优,就不做处理,因为上面已经在任务执行前将 callback 置为 null,并且在 loop 时做了 check 处理 if (currentTask === peek(taskQueue)) { pop(taskQueue); } advanceTimers(currentTime); } } else { // 说明当前任务已经完成或者被取消,直接 pop 掉即可 pop(taskQueue); } currentTask = peek(taskQueue); } // 因为 loop 会提前中断,所以需要检测否是还存在任务,存在,返回 true,否则 false。 if (currentTask !== null) { return true; } else { const firstTimer = peek(timerQueue); if (firstTimer !== null) { // 如果所有立即执行任务已经完成,但还存在延迟任务,那么再次对延迟任务进行调度 requestHostTimeout(handleTimeout, firstTimer.startTime - currentTime); } return false; } }

提前中断 yield

来看看 shouldYieldToHost 做了什么:

react/packages/scheduler/src/SchedulerFeatureFlags.js
export const frameYieldMs = 5;
react/packages/scheduler/src/forks/Scheduler.js
let frameInterval = frameYieldMs; function shouldYieldToHost(): boolean { const timeElapsed = getCurrentTime() - startTime; if (timeElapsed < frameInterval) { // The main thread has only been blocked for a really short amount of time; // smaller than a single frame. Don't yield yet. return false; } // Yield now. return true; }

默认情况下, shouldYieldToHost 就是判断是否已经过了 5 ms, 是的话就返回 true 表明当前需要让出主线程。

所以在 React 项目的性能火焰图中,通常会看到如下的结果(每次宏任务的时长大概在 5ms 左右):

当然,Scheduler 也提供了更改 frameInterval 的方法:

react/packages/scheduler/src/forks/Scheduler.js
function forceFrameRate(fps: number) { if (fps < 0 || fps > 125) { // Using console['error'] to evade Babel and ESLint console['error']( 'forceFrameRate takes a positive int between 0 and 125, ' + 'forcing frame rates higher than 125 fps is not supported', ); return; } if (fps > 0) { frameInterval = Math.floor(1000 / fps); } else { // reset the framerate frameInterval = frameYieldMs; } }

再次调度

workLoop 返回 true 表示还有任务需要执行,但当前 loop 已经结束,那么就需要再次调度剩余的任务:

react/packages/scheduler/src/forks/Scheduler.js
const performWorkUntilDeadline = () => { if (isMessageLoopRunning) { const currentTime = getCurrentTime(); // Keep track of the start time so we can measure how long the main thread // has been blocked. startTime = currentTime; // If a scheduler task throws, exit the current browser task so the // error can be observed. // // Intentionally not using a try-catch, since that makes some debugging // techniques harder. Instead, if `flushWork` errors, then `hasMoreWork` will // remain true, and we'll continue the work loop. let hasMoreWork = true; try { // flushWork 返回的就是 workLoop 的结果 hasMoreWork = flushWork(currentTime); } finally { if (hasMoreWork) { // 如果还有任务,继续通过 schedulePerformWorkUntilDeadline 调度 schedulePerformWorkUntilDeadline(); } else { isMessageLoopRunning = false; } } } }; function flushWork(initialTime: number) { if (enableProfiling) { markSchedulerUnsuspended(initialTime); } isHostCallbackScheduled = false; // workLoop 开始任务执行前, 延时任务的 timer 可以取消掉,因为在任务执行过程中,延时任务队列可能还会变,之前的 timer 已经过时了 if (isHostTimeoutScheduled) { // We scheduled a timeout but it's no longer needed. Cancel it. isHostTimeoutScheduled = false; cancelHostTimeout(); } isPerformingWork = true; const previousPriorityLevel = currentPriorityLevel; try { if (enableProfiling) { try { return workLoop(initialTime); } catch (error) { if (currentTask !== null) { const currentTime = getCurrentTime(); // $FlowFixMe[incompatible-call] found when upgrading Flow markTaskErrored(currentTask, currentTime); // $FlowFixMe[incompatible-use] found when upgrading Flow currentTask.isQueued = false; } throw error; } } else { // No catch in prod code path. return workLoop(initialTime); } } finally { currentTask = null; currentPriorityLevel = previousPriorityLevel; isPerformingWork = false; if (enableProfiling) { const currentTime = getCurrentTime(); markSchedulerSuspended(currentTime); } } }

至此,就是 Scheduler 主链路的流程。