Dart VM 线程池剖析

Dart VM 线程池剖析

要研究Dart VM,我们可以到Dart 的 GitHub仓库 去下载查看源码。

源码剖析

今天这篇文章主要研究VM底层的线程池,具体源码参见 sdk\runtime\vm\thread_pool.hsdk\runtime\vm\thread_pool.cc

class ThreadPool {
public:
  class Task : public IntrusiveDListEntry<Task> {
   protected:
    Task() {}
   public:
    virtual ~Task() {}
    virtual void Run() = 0;
   private:
    DISALLOW_COPY_AND_ASSIGN(Task);
  };
 
  explicit ThreadPool(uintptr_t max_pool_size = 0);
  virtual ~ThreadPool();
 
  // 在线程池上运行任务
  template <typename T, typename... Args>
  bool Run(Args&&... args) {
    return RunImpl(std::unique_ptr<Task>(new T(std::forward<Args>(args)...)));
  }
 
  bool CurrentThreadIsWorker();
  void MarkCurrentWorkerAsBlocked();
  void MarkCurrentWorkerAsUnBlocked();
 
  // 关闭,阻止调度新任务。
  void Shutdown();
 
private:
  class Worker : public IntrusiveDListEntry<Worker> {
   public:
    explicit Worker(ThreadPool* pool);
 
    // 启动工作线程
    void StartThread();
 
   private:
    friend class ThreadPool;
 
    // 新工作线程的主要入口点
    static void Main(uword args);
 
    ThreadPool* pool_;
    ThreadJoinId join_id_;
    OSThread* os_thread_ = nullptr;
    bool is_blocked_ = false;
 
    DISALLOW_COPY_AND_ASSIGN(Worker);
  };
 
protected:
  virtual void OnEnterIdleLocked(MonitorLocker* ml) {}
  bool ShuttingDownLocked() { return shutting_down_; }
  bool TasksWaitingToRunLocked() { return !tasks_.IsEmpty(); }
 
private:
  using TaskList = IntrusiveDList<Task>;
  using WorkerList = IntrusiveDList<Worker>;
 
  bool RunImpl(std::unique_ptr<Task> task);
  void WorkerLoop(Worker* worker);
 
  Worker* ScheduleTaskLocked(MonitorLocker* ml, std::unique_ptr<Task> task);
 
  void IdleToRunningLocked(Worker* worker);
  void RunningToIdleLocked(Worker* worker);
  void IdleToDeadLocked(Worker* worker);
  void ObtainDeadWorkersLocked(WorkerList* dead_workers_to_join);
  void JoinDeadWorkersLocked(WorkerList* dead_workers_to_join);
 
  Monitor pool_monitor_;
  bool shutting_down_ = false;
  uint64_t count_running_ = 0;
  uint64_t count_idle_ = 0;
  uint64_t count_dead_ = 0;
  WorkerList running_workers_;
  WorkerList idle_workers_;
  WorkerList dead_workers_;
  uint64_t pending_tasks_ = 0;
  TaskList tasks_;
 
  Monitor exit_monitor_;
  std::atomic<bool> all_workers_dead_;
  uintptr_t max_pool_size_ = 0;
  DISALLOW_COPY_AND_ASSIGN(ThreadPool);
};

以上是thread_pool.h删除部分注释和函数之后的简化版,线程池的大概逻辑还是很清晰的,如果你从来没有了解过线程池的概念,那么建议先阅读一些参考资料,熟悉线程池的概念再看此文。

这里,ThreadPool类就代表了线程池本身,它定义了一些公共接口和私有成员,现在我们来详细了解一下:

  • TaskWorker

    class Task : public IntrusiveDListEntry<Task> {
        // ...
    }
    
    class Worker : public IntrusiveDListEntry<Worker> {
        // ...
    }
    

    我们看到,线程池内部定义了两个嵌套类:TaskWorker,它们都继承自IntrusiveDListEntry。这里的IntrusiveDListEntry是用于Dart 内部实现的一个IntrusiveDList双向链表数据结构,当往双向链表存放元素时,这个元素必须继承IntrusiveDListEntry,相关的说明,可以查看sdk\runtime\vm\intrusive_dlist.h,它甚至提供了一个Usage example。总之,继承IntrusiveDListEntry之后,TaskWorker实例就可以添加到双向链表中。

    这里Task表示可以在ThreadPool上运行的任务,而Worker则表示线程池中的工作线程

  • ThreadPool构造方法

    explicit ThreadPool(uintptr_t max_pool_size = 0);
    

    它接受一个可选的max_pool_size参数,指定线程池的最大大小,一般是指线程池中允许存在的最大工作线程数量。

  • Run函数

    template <typename T, typename... Args>
    bool Run(Args&&... args) {
        return RunImpl(std::unique_ptr<Task>(new T(std::forward<Args>(args)...)));
    }
    

    这里的Run函数是一个模板函数,用于创建一个任务对象并将其添加到线程池中运行。其具体实现,是交给了RunImpl函数。

  • WorkerLoop函数

    这是一个最为关键的函数,在头文件中只有声明,具体实现在thread_pool.cc中,此函数定义了工作线程的执行逻辑,我们会在后面详细解析其内部实现。

  • MonitorLocker

    thread_pool.h头文件中,仅对其做了一个前置声明class MonitorLocker;。要注意,在线程池代码中,带Monitor字眼的,都是和线程同步相关的处理,这里的MonitorLocker就是一个同步锁。

接下来,我们详细看看thread_pool.cc中关于RunImpl函数的详细实现代码:

bool ThreadPool::RunImpl(std::unique_ptr<Task> task) {
  Worker* new_worker = nullptr;
  {
    MonitorLocker ml(&pool_monitor_);
    if (shutting_down_) {
      return false;
    }
    new_worker = ScheduleTaskLocked(&ml, std::move(task));
  }
  if (new_worker != nullptr) {
    new_worker->StartThread();
  }
  return true;
}

这个方法是线程池执行任务的具体实现。它接收一个任务对象 task,并尝试在线程池中安排这个任务执行。它首先创建了一个 MonitorLocker 锁对象,并通过持有pool_monitor_对象保护花括号包裹的代码块,确保线程安全。接下来判断shutting_down_变量,如果线程池正在关闭,则不接受新的任务,直接返回 false。然后调用 ScheduleTaskLocked 方法尝试调度任务。需要注意,如果线程池启动了一个新的工作线程,则该方法会返回一个新的 Worker 对象指针。接下来的代码离开了花括号的代码块,ml 会自动释放同步锁。最后,如果新的 Worker 对象不为空,则启动这个新线程。

再来详细看看ScheduleTaskLocked方法的具体实现:

ThreadPool::Worker* ThreadPool::ScheduleTaskLocked(MonitorLocker* ml,
                                                   std::unique_ptr<Task> task) {
  // 将新任务放入队列
  tasks_.Append(task.release());
  pending_tasks_++;
  ASSERT(pending_tasks_ >= 1);
 
  // 通知已有的空闲工作线程(如果有),其实就是唤醒休眠的线程
  if (count_idle_ >= pending_tasks_) {
    ASSERT(!idle_workers_.IsEmpty());
    ml->Notify();
    return nullptr;
  }
 
  // 如果已达到运行线程数的上限,将不会启动新线程
  if (max_pool_size_ > 0 && (count_idle_ + count_running_) >= max_pool_size_) {
    if (!idle_workers_.IsEmpty()) {
      ml->Notify();
    }
    return nullptr;
  }
 
  // 否则启动一个新的工作线程
  auto new_worker = new Worker(this);
  idle_workers_.Append(new_worker);
  count_idle_++;
  return new_worker;
}

首先,传入的任务被添加到任务队列 tasks_ 中。task.release() 释放 unique_ptr 拥有的对象的所有权,并返回其指针,这样任务就可以被队列接管。pending_tasks_ 是一个计数器,表示待处理任务的数量,并且断言至少有一个任务在队列中。

接下来,如果空闲工作线程的数量 count_idle_大于或等于待处理任务的数量,这意味着有足够的工作线程来处理这些任务。此时,将会通过 MonitorLocker 发出通知,让一个空闲的工作线程开始执行任务。由于已经有现成的工作线程可以使用,因此不需要创建新的工作线程,返回 nullptr

紧接着的if判断表示,如果线程池已达到最大线程数 max_pool_size_,并且线程池中的所有工作线程(空闲的 + 正在运行的)总数已经等于或超过了这个最大值,那么不会启动新的工作线程。如果还有空闲的工作线程,通过 MonitorLocker 发出通知,让一个空闲的工作线程开始执行任务。

走到最后的代码,则表示线程池中还可以添加新的工作线程(即没有达到最大线程数),那么就创建一个新的 Worker 对象,并将其添加到空闲工作线程列表 idle_workers_ 中。空闲工作线程计数器 count_idle_ 递增,并返回这个新创建的 Worker 对象的指针。此处就对应我们前面所说的,ScheduleTaskLocked返回值不为空,表示新创建了一个工作线程。

我们再来看一下RunImpl中调用的StartThread方法,它是Worker类中实现的成员方法:

void ThreadPool::Worker::StartThread() {
  int result = OSThread::Start("DartWorker", &Worker::Main,
                               reinterpret_cast<uword>(this));
  if (result != 0) {
    FATAL("Could not start worker thread: result = %d.", result);
  }
}

这个方法的实现非常简单,就调用了一行代码。主要就是启动一个被命名为”DartWorker”的操作系统线程,这个线程的入口是 Worker::Main 函数,表示将在线程中执行Worker::Main

再来详细看看Worker::Main的实现:

void ThreadPool::Worker::Main(uword args) {
  // 线程启动时的回调。如果设置了 Dart_ThreadStartCallback 回调函数,这里会调用它以通知嵌入者,线程池的线程已经启动
  Dart_ThreadStartCallback start_cb = Dart::thread_start_callback();
  if (start_cb != nullptr) {
    start_cb();
  }
  // 获取当前系统线程的指针
  OSThread* os_thread = OSThread::Current();
  ASSERT(os_thread != nullptr);
 
  // 将传入的参数(args)转换回 Worker 类型的指针
  // 这里假设 args 实际上是一个 Worker 对象的指针
  Worker* worker = reinterpret_cast<Worker*>(args);
  // 通过 worker 指针获取到它所属的 ThreadPool 对象
  ThreadPool* pool = worker->pool_;
  // 设置os_thread对象的所有者为此 worker
  os_thread->owning_thread_pool_worker_ = worker;
  // 将 worker 的系统线程设置为当前线程
  worker->os_thread_ = os_thread;
 
  // 设置线程退出时需要被 join 的 id
  worker->join_id_ = OSThread::GetCurrentThreadJoinId(os_thread);
 
  // 调用 WorkerLoop 方法,这是 worker 线程的主循环,负责执行任务
  pool->WorkerLoop(worker);
 
  // 当 WorkerLoop 结束后,清理 worker 的 os_thread_ 成员,表示不再拥有操作系统线程
  worker->os_thread_ = nullptr;
  // 并将操作系统线程的所有者清空
  os_thread->owning_thread_pool_worker_ = nullptr;
 
  // 线程退出时的回调。如果设置了 Dart_ThreadExitCallback 回调函数,这里会调用它以通知嵌入者,线程池的线程即将退出。
  Dart_ThreadExitCallback exit_cb = Dart::thread_exit_callback();
  if (exit_cb != nullptr) {
    exit_cb();
  }
}

以上代码,我都添加了详细注释。主要逻辑,包括线程启动、退出时的钩子函数调用,对于线程生命周期的维护(包括线程所有权的指派和清除),以及工作线程主循环的执行。其中,最为关键的代码是WorkerLoop函数调用,下面我们详细研究一下线程池WorkerLoop函数的实现:

void ThreadPool::WorkerLoop(Worker* worker) {
  WorkerList dead_workers_to_join;
  while (true) {
    MonitorLocker ml(&pool_monitor_);
    if (!tasks_.IsEmpty()) {
      IdleToRunningLocked(worker);
      while (!tasks_.IsEmpty()) {
        std::unique_ptr<Task> task(tasks_.RemoveFirst());
        pending_tasks_--;
        MonitorLeaveScope mls(&ml);
        task->Run();
        ASSERT(Isolate::Current() == nullptr);
        task.reset();
      }
      RunningToIdleLocked(worker);
    }
 
    if (running_workers_.IsEmpty()) {
      ASSERT(tasks_.IsEmpty());
      OnEnterIdleLocked(&ml);
      if (!tasks_.IsEmpty()) {
        continue;
      }
    }
 
    if (shutting_down_) {
      ObtainDeadWorkersLocked(&dead_workers_to_join);
      IdleToDeadLocked(worker);
      break;
    }
 
    // 休眠,直到收到新任务、超时或关闭
    const int64_t idle_start = OS::GetCurrentMonotonicMicros();
    bool done = false;
    while (!done) {
      const auto result = ml.WaitMicros(ComputeTimeout(idle_start));
      // 必须执行完所有待处理的任务
      if (!tasks_.IsEmpty()) break;
 
      if (shutting_down_ || result == Monitor::kTimedOut) {
        done = true;
        break;
      }
    }
    if (done) {
      ObtainDeadWorkersLocked(&dead_workers_to_join);
      IdleToDeadLocked(worker);
      break;
    }
  }
  JoinDeadWorkersLocked(&dead_workers_to_join);
}

这个函数首先定义了一个死亡的工作线程列表dead_workers_to_join,注意,这里的WorkerList类型其实就是我们前面说的双向链表,我们可以查看头文件中的声明:

private:
  using TaskList = IntrusiveDList<Task>;
  using WorkerList = IntrusiveDList<Worker>;

紧接着,WorkerLoop 函数中开始了一个无限循环,每个工作线程都会运行这个循环。MonitorLocker 是同步锁,用于在修改线程池状态或任务队列时保持互斥。它锁定了与线程池关联的监视器对象(pool_monitor_)。

接下来:

    if (!tasks_.IsEmpty()) {
      IdleToRunningLocked(worker);
      while (!tasks_.IsEmpty()) {
        std::unique_ptr<Task> task(tasks_.RemoveFirst());
        pending_tasks_--;
        MonitorLeaveScope mls(&ml);
        task->Run();
        ASSERT(Isolate::Current() == nullptr);
        task.reset();
      }
      RunningToIdleLocked(worker);
    }

判断任务队列如果不为空,调用IdleToRunningLocked函数,将当前工作线程的状态从空闲(Idle)转为运行中(Running)。

在任务队列不为空的情况下,工作线程会不断地取出任务并执行它们。MonitorLeaveScope 是一个作用域管理器,确保在执行任务时释放锁,以便其他线程可以访问线程池。task->Run() 执行任务。执行完毕后,工作线程的状态从运行中(Running)转回空闲(Idle)。

紧接着是另一个if判断:

    if (running_workers_.IsEmpty()) {
      ASSERT(tasks_.IsEmpty());
      OnEnterIdleLocked(&ml);
      if (!tasks_.IsEmpty()) {
        continue;
      }
    }

如果正在运行的工作线程队列为空,那么会假定任务队列也为空,并且会调用OnEnterIdleLocked函数,此函数实际上是一个虚函数,需要子类来实现。这里也并没有实现,此处没有什么实际作用(可能用于资源回收或其他优化)。最后,如果在处理完空闲状态后又发现任务队列不为空,则继续循环以处理新任务。

后面接着检查shutting_down_变量,如果线程池正在关闭,那么会获取所有待回收的死亡工作线程,并将当前工作线程的状态从闲置(Idle)转为死亡(Dead),然后退出循环

再看后面的代码:

    const int64_t idle_start = OS::GetCurrentMonotonicMicros();
    bool done = false;
    while (!done) {
      const auto result = ml.WaitMicros(ComputeTimeout(idle_start));
      // 必须执行完所有待处理的任务
      if (!tasks_.IsEmpty()) break;
 
      if (shutting_down_ || result == Monitor::kTimedOut) {
        done = true;
        break;
      }
    }
    if (done) {
      ObtainDeadWorkersLocked(&dead_workers_to_join);
      IdleToDeadLocked(worker);
      break;
    }
  }

工作线程在没有任务执行且线程池没有关闭的情况下会进入休眠状态。ComputeTimeout 函数计算了工作线程应该等待多长时间。如果在等待期间出现了新的任务或者线程池开始关闭,或者等待超时,工作线程将会结束休眠。如果因为关闭或超时而结束休眠,则将工作线程标记为死亡状态,并准备退出循环

最后,此函数会处理所有已标记为死亡的工作线程。JoinDeadWorkersLocked 负责回收工作线程资源,确保在工作线程完全停止之前不会退出。

那么这个线程池是在哪里初始化的,在哪里使用的呢?这里我们先看看线程池的初始化,至于使用,我将在下一篇文章剖析。

打开Dart VM的入口头文件sdk\runtime\vm\dart.h

// 省略部分代码
class Dart : public AllStatic {
public:
  static char* Init(const Dart_InitializeParams* params);
  ...
  static ThreadPool* thread_pool() { return thread_pool_; }
 
 
private:
  static char* DartInit(const Dart_InitializeParams* params);
  ...
  static ThreadPool* thread_pool_;
  ...
};

可以看到,Dart::thread_pool()就是返回了一个静态的全局变量,接下来看看这个变量是在哪里初始化的,打开实现文件sdk\runtime\vm\dart.c

char* Dart::DartInit(const Dart_InitializeParams* params) {
  ...
 
  // Create the VM isolate and finish the VM initialization.
  ASSERT(thread_pool_ == NULL);
  thread_pool_ = new ThreadPool();
 
  ...
  return NULL;
}
 
char* Dart::Init(const Dart_InitializeParams* params) {
  ...
  char* retval = DartInit(params);
  ...
  return NULL;
}

Dart::Init的调用,则是在sdk\runtime\vm\dart_api_impl.cc进行了封装:

DART_EXPORT char* Dart_Initialize(Dart_InitializeParams* params) {
  ...
  return Dart::Init(params);
}

最终,它是由Dart VM的真正入口源文件sdk\runtime\bin\main_impl.cc进行初始化调用:

void main(int argc, char** argv) {
  ...
 
  error = Dart_Initialize(&init_params);
  ...
 
  // Free environment if any.
  Options::DestroyEnvironment();
 
  Platform::Exit(global_exit_code);
}

这次终于见到了我们喜闻乐见的C/C++入口函数——main!由于这块的初始化代码特别多,以上过程省略大量代码。

最后,简单看一下OSThread是什么。sdk\runtime\vm\os_thread.h

class BaseThread {
public:
  bool is_os_thread() const { return is_os_thread_; }
 
private:
  explicit BaseThread(bool is_os_thread) : is_os_thread_(is_os_thread) {}
  virtual ~BaseThread() {}
 
  bool is_os_thread_;
 
  friend class ThreadState;
  friend class OSThread;
 
  DISALLOW_IMPLICIT_CONSTRUCTORS(BaseThread);
};
 
// Low-level operations on OS platform threads.
class OSThread : public BaseThread {
public:
  // The constructor of OSThread is never called directly, instead we call
  // this factory style method 'CreateOSThread' to create OSThread structures.
  // The method can return a NULL if the Dart VM is in shutdown mode.
  static OSThread* CreateOSThread();
  ~OSThread();
 
  ThreadId id() const {
    ASSERT(id_ != OSThread::kInvalidThreadId);
    return id_;
  }
 
  ......
 
};

基本可以看出来,OSThread就是对操作系统线程的一个抽象,其具体实现,是分平台的,如sdk\runtime\vm\os_thread_linux.ccsdk\runtime\vm\os_thread_android.cc等等。

总结

我将这个过程画成了流程图:

Dart VM 线程池剖析


关注公众号:编程之路从0到1

原文链接:https://juejin.cn/post/7325132165263704090 作者:编程之路从0到1

(0)
上一篇 2024年1月18日 下午4:40
下一篇 2024年1月18日 下午4:50

相关推荐

发表回复

登录后才能评论