Dart IO源码剖析(下)
在上一篇文章中,我们详细剖析了Dart 的文件IO,这一章,我们将剖析Dart 网络IO。
网络IO
基于上篇的基础,我们已经大概知道Dart VM的一些知识,那么这篇文章,我就不会解释得过于细致了。
还是让我们先从Dart代码开始吧!
var httpClient = HttpClient();
// 发起 GET 请求
var request = await httpClient.getUrl(Uri.parse('http://example.com/data.json'));
// 等待服务器响应
var response = await request.close();
// 如果请求成功(HTTP 状态码为 200)
if (response.statusCode == HttpStatus.ok) {
// TODO:
}
以上是不借助第三方库,直接使用Dart 标准库发起HTTP请求的Demo。我们知道,HTTP其实也是建立在TCP的基础之上的,Dart中的HTTP网络请求,也是封装的Socket 接口,当然,还有一系列对HTTP协议的实现,但我们去查看HttpClient
的实现,会发现里面就是调用的Socket发送数据。那么我们再看以下Dart中的Socket编程的Demo:
// 连接服务器的8081端口
Socket socket = await Socket.connect('127.0.0.1', 8081);
socket.write('Hello, Server!');
socket.cast<List<int>>().transform(utf8.decoder).listen(print);
当我们再去追查Socket的实现代码时,又会发现,它其实是对RawSocket
类的封装。所以,我们干脆来看一个例子,学习一下RawSocket
怎么用:
RawSocket socket = await RawSocket.connect("127.0.0.1", 8081);
socket.listen((event) {
switch(event){
case RawSocketEvent.read:
if(socket.available() > 0){
var buffer = socket.read();
// TODO:
}
break;
case RawSocketEvent.write:
socket.write([96,97,98]);
break;
case RawSocketEvent.readClosed:
case RawSocketEvent.closed:
socket.close();
break;
}
});
整个Demo很简单,首先通过RawSocket.connect
连接指定的IP和端口,然后监听连接事件,当事件为RawSocketEvent.read
时,说明服务器有数据发过来,我们需要去读取,当事件为RawSocketEvent.write
时,说明底层已经准备好,我们可以发送数据。
到这里,我们就找到了源码跟踪的目标,那就是RawSocket.connect
方法的实现。
socket.dart
abstract interface class RawSocket implements Stream<RawSocketEvent> {
...
external static Future<RawSocket> connect(host, int port,
{sourceAddress, int sourcePort = 0, Duration? timeout});
...
}
学习了上一篇文章,我们已经有了一点经验,实际上这里的connect
方法并不是一个本地扩展方法,很可能只是隐藏了实现,我们轻车路熟的找到sdk\lib\_internal\vm\bin\socket_patch.dart
:
class RawSocket {
@patch
static Future<RawSocket> connect(dynamic host, int port,
{dynamic sourceAddress, int sourcePort = 0, Duration? timeout}) {
return _RawSocket.connect(host, port, sourceAddress, sourcePort, timeout);
}
@patch
static Future<ConnectionTask<RawSocket>> startConnect(dynamic host, int port,
{dynamic sourceAddress, int sourcePort = 0}) {
return _RawSocket.startConnect(host, port, sourceAddress, sourcePort);
}
}
可以看到,补丁内容很少,主要是调用了子类_RawSocket.connect
方法:
static Future<RawSocket> connect(dynamic host, int port,
dynamic sourceAddress, int sourcePort, Duration? timeout) {
return _NativeSocket.connect(host, port, sourceAddress, sourcePort, timeout)
.then((socket) {
if (!const bool.fromEnvironment("dart.vm.product")) {
_SocketProfile.collectNewSocket(
socket.nativeGetSocketId(), _tcpSocket, socket.address, port);
}
return _RawSocket(socket);
});
}
继续查看_NativeSocket
中的connect
实现:
static Future<_NativeSocket> connect(dynamic host, int port,
dynamic sourceAddress, int sourcePort, Duration? timeout) {
return startConnect(host, port, sourceAddress, sourcePort)
.then((ConnectionTask<_NativeSocket> task) {
Future<_NativeSocket> socketFuture = task.socket;
if (timeout != null) {
socketFuture = socketFuture.timeout(timeout, onTimeout: () {
task.cancel();
throw createError(
null, "Connection timed out, host: ${host}, port: ${port}");
});
}
return socketFuture;
});
}
继续查看startConnect
:
static Future<ConnectionTask<_NativeSocket>> startConnect(
dynamic host, int port, dynamic sourceAddress, int sourcePort) {
// ...省略部分代码
return new Future.value(host).then<ConnectionTask<_NativeSocket>>((host) {
if (host is String) {
host = _InternetAddress.tryParse(host) ?? host;
}
if (host is _InternetAddress) {
return tryConnectToResolvedAddresses(host, port, source, sourcePort,
Stream.value(<_InternetAddress>[host]), stackTrace);
}
final hostname = host as String;
final Stream<List<InternetAddress>> addresses =
staggeredLookup(hostname, source);
return tryConnectToResolvedAddresses(
host, port, source, sourcePort, addresses, stackTrace);
});
}
可以看到,主要是调用了tryConnectToResolvedAddresses
方法,这个方法的实现代码非常多,这里我做一些删减,只显示最关键的代码:
static ConnectionTask<_NativeSocket> tryConnectToResolvedAddresses(
dynamic host,
int port,
_InternetAddress? source,
int sourcePort,
Stream<List<InternetAddress>> addresses,
StackTrace callerStackTrace) {
...
Object? createConnection(InternetAddress address, _InternetAddress? source,
_NativeSocket socket) {
Object? connectionResult;
if (address.type == InternetAddressType.unix) {
if (source == null) {
connectionResult = socket.nativeCreateUnixDomainConnect(
address.address, _Namespace._namespace);
} else {
...
connectionResult = socket.nativeCreateUnixDomainBindConnect(
address.address, source.address, _Namespace._namespace);
}
} else {
final address_ = address as _InternetAddress;
if (source == null && sourcePort == 0) {
connectionResult = socket.nativeCreateConnect(
address_._in_addr, port, address_._scope_id);
} else {
...
connectionResult = socket.nativeCreateBindConnect(address_._in_addr,
port, source._in_addr, sourcePort, address_._scope_id);
}
}
return connectionResult;
}
// Invoked either directly or via throttling Timer callback when we
// are ready to verify that we can connect to resolved address.
connectNext() {
...
final Object? connectionResult =
createConnection(address, source, socket);
if (connectionResult != true) {
// connectionResult was not a success.
error = createConnectionError(connectionResult, address, port, socket);
connectNext(); // Try again after failure to connect.
return;
}
...
}
connectNext();
return new ConnectionTask<_NativeSocket>._(result.future, onCancel);
}
可以看到,这个函数中又定义了几个闭包函数,我删减了一些,这里最关键的是调用createConnection
这个闭包创建连接。connectNext
实际上是对多个地址的遍历,它的内部就是调用createConnection
建立连接的。
createConnection
中,又根据地址的不同,分别调用三种不同的本地扩展方法:
nativeCreateUnixDomainConnect
nativeCreateConnect
nativeCreateBindConnect
其实它们底层实现都类似,这里按照我们Demo的调用方式,应该跟踪nativeCreateConnect
方法:
@pragma("vm:external-name", "Socket_CreateConnect")
external nativeCreateConnect(Uint8List addr, int port, int scope_id);
根据声明,我们去C++中检索Socket_CreateConnect
函数:
sdk\runtime\bin\socket.cc
void FUNCTION_NAME(Socket_CreateConnect)(Dart_NativeArguments args) {
RawAddr addr;
SocketAddress::GetSockAddr(Dart_GetNativeArgument(args, 1), &addr);
Dart_Handle port_arg = Dart_GetNativeArgument(args, 2);
int64_t port = DartUtils::GetInt64ValueCheckRange(port_arg, 0, 65535);
SocketAddress::SetAddrPort(&addr, static_cast<intptr_t>(port));
if (addr.addr.sa_family == AF_INET6) {
Dart_Handle scope_id_arg = Dart_GetNativeArgument(args, 3);
int64_t scope_id =
DartUtils::GetInt64ValueCheckRange(scope_id_arg, 0, 65535);
SocketAddress::SetAddrScope(&addr, scope_id);
}
intptr_t socket = Socket::CreateConnect(addr);
OSError error;
if (socket >= 0) {
Socket::SetSocketIdNativeField(Dart_GetNativeArgument(args, 0), socket,
Socket::kFinalizerNormal);
Dart_SetReturnValue(args, Dart_True());
} else {
Dart_SetReturnValue(args, DartUtils::NewDartOSError(&error));
}
}
这里主要是调用Socket::CreateConnect
函数创建连接,但是这里的socket.cc
并没有该函数的具体实现,头文件socket.h
中只有声明:
// Creates a socket which is bound and connected. The port to connect to is
// specified as the port component of the passed RawAddr structure.
static intptr_t CreateConnect(const RawAddr& addr);
其实Native层面的套接字是操作系统的API,不同的系统有不同的API,所以这里肯定是需要做条件编译的,不同的系统平台,其实现函数是不同的,但是总体来说,主要还是分为Windows、POSIX、Fuchsia 三大套,像Android、iOS、Linux、MacOS都是属于POSIX系列的Socket编程。这里我们先选取Android和Linux平台的实现来分析一下:
sdk\runtime\bin\socket_linux.cc
static intptr_t Create(const RawAddr& addr) {
intptr_t fd;
intptr_t type = SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC;
fd = NO_RETRY_EXPECTED(socket(addr.ss.ss_family, type, 0));
if (fd < 0) {
return -1;
}
return fd;
}
static intptr_t Connect(intptr_t fd, const RawAddr& addr) {
intptr_t result = TEMP_FAILURE_RETRY(
connect(fd, &addr.addr, SocketAddress::GetAddrLength(addr)));
if ((result == 0) || (errno == EINPROGRESS)) {
return fd;
}
FDUtils::SaveErrorAndClose(fd);
return -1;
}
intptr_t Socket::CreateConnect(const RawAddr& addr) {
intptr_t fd = Create(addr);
if (fd < 0) {
return fd;
}
return Connect(fd, addr);
}
CreateConnect
内部主要是调用了两个方法,Create
和Connect
,这里涉及的socket()
和connect()
函数调用,就是基本的POSIX Socket编程了,如果不清楚的,可以去了解一些C语言Socket编程相关知识。这里我们需要重点关注的是标志位SOCK_NONBLOCK
,在调用socket()
函数时,添加了此标志位,这就表示创建一个非阻塞的套接字。这就意味着在进行套接字相关的输入输出操作时,如果操作不能立即完成,这些操作将不会阻塞当前线程。
再看看sdk\runtime\bin\socket_android.cc
static intptr_t Create(const RawAddr& addr) {
intptr_t fd;
fd = NO_RETRY_EXPECTED(socket(addr.ss.ss_family, SOCK_STREAM, 0));
if (fd < 0) {
return -1;
}
if (!FDUtils::SetCloseOnExec(fd) || !FDUtils::SetNonBlocking(fd)) {
FDUtils::SaveErrorAndClose(fd);
return -1;
}
return fd;
}
static intptr_t Connect(intptr_t fd, const RawAddr& addr) {
intptr_t result = TEMP_FAILURE_RETRY(
connect(fd, &addr.addr, SocketAddress::GetAddrLength(addr)));
if ((result == 0) || (errno == EINPROGRESS)) {
return fd;
}
FDUtils::SaveErrorAndClose(fd);
return -1;
}
intptr_t Socket::CreateConnect(const RawAddr& addr) {
intptr_t fd = Create(addr);
if (fd < 0) {
return fd;
}
return Connect(fd, addr);
}
Android的代码大同小异,只是在调用socket()
函数时没有设置SOCK_NONBLOCK
标志位。由此我们就可以判断,linux实现和android的实现代码很可能不是同一个人写的。我们注意到,Android的Create
方法中调用了FDUtils::SetNonBlocking()
方法设置非阻塞方式。在POSIX Socket编程中,有两种方法可以创建非阻塞IO,一种是前面代码中调用socket()
函数时传标志位,另一种是调用fcntl
函数设置。这里的FDUtils::SetNonBlocking
其实就是封装了fcntl
函数的调用,使用的第二种方式。同一个人写代码,基本上风格都是统一的,不太可能一会儿一个风格。
通常,在阻塞模式下,如果一个套接字操作(例如 accept, read, write, connect 等)不能立即完成,该操作会挂起调用线程直到操作完成。例如,如果你在读取一个套接字,而没有数据可用,那么在阻塞模式下的 read 调用将会一直等待直到有数据到来。
而在非阻塞模式下,这些操作会立即返回,即使操作并未完成。如果操作因为需要等待某些条件而无法立即完成,操作将返回一个错误码(通常是
EWOULDBLOCK
或EAGAIN
),而不是挂起调用线程。
那么非阻塞IO如何读取数据呢?
非阻塞套接字通常与I/O多路复用(如 select, poll, epoll)结合使用,使单个线程能够高效地管理多个并发I/O操作。有些人喜欢把这称为异步IO,其实严格的说,非阻塞IO并不是异步IO,真正的异步IO是AIO技术。
Dart 在网络IO方面的实现,就是基于各个操作系统主流的I/O多路复用技术,Linux内核上是使用的epoll
,MacOS内核上是使用的kqueue
,这也是Unix内核的标准,WIndows上则是使用的IOCP技术。
至此,我们明白了,Dart是通过操作系统底层的非阻塞IO发送网络请求,理论上是不会阻塞当前线程的。当然,这只是理论,实际上,当调用epoll_wait()
监听文件描述符时,依然会阻塞Dart的主线程。所以Dart VM仍然是为网络连接单独创建了工作线程执行操作,这些并不是在Dart 层的单线程模型中进行操作。
接下来,我们继续分析底层源码。那么我们的线索是什么呢?研究源码没有线索是不行的,那会成为没头苍蝇一样乱飞。
socket.listen((event) {
switch(event){
case RawSocketEvent.read:
if(socket.available() > 0){
var buffer = socket.read();
// TODO:
}
break;
case RawSocketEvent.write:
socket.write([96,97,98]);
break;
case RawSocketEvent.readClosed:
case RawSocketEvent.closed:
socket.close();
break;
}
});
还记得我们前面学习的Dart RawSocket
的Demo吗?这个Demo当然是有用的!当我们创建好连接后,并不是马上就可以读写数据的,需要接收到RawSocketEvent
类型的相应事件,然后才会去操作读写。那么这些事件是谁发给我们的?是从哪里发出的?这就是我们要探寻的线索。
_RawSocket(this._socket) {
...
_socket.setHandlers(
read: () => _controller.add(RawSocketEvent.read),
write: () {
// The write event handler is automatically disabled by the
// event handler when it fires.
writeEventsEnabled = false;
_controller.add(RawSocketEvent.write);
},
closed: () => _controller.add(RawSocketEvent.readClosed),
destroyed: () {
_controller.add(RawSocketEvent.closed);
_controller.close();
},
error: zone.bindBinaryCallbackGuarded((Object e, StackTrace? st) {
_controller.addError(e, st);
_socket.close();
}));
}
// _NativeSocket 类
void setHandlers(
{void Function()? read,
void Function()? write,
void Function(Object e, StackTrace? st)? error,
void Function()? closed,
void Function()? destroyed}) {
readEventHandler = read;
writeEventHandler = write;
errorEventHandler = error;
closedEventHandler = closed;
destroyedEventHandler = destroyed;
}
我们可以看到Dart 层_RawSocket
类的构造方法,它通过调用_NativeSocket
类的setHandlers
方法设置了一些回调,当相应的回调被调用时,它就向Stream
中发射RawSocketEvent
事件。setHandlers
方法的实现也很简单,就是赋值,我们先追踪一下read
事件,检索一下readEventHandler
被谁调用了:
// Multiplexes socket events to the socket handlers.
void multiplex(Object eventsObj) {
int events = eventsObj as int;
for (int i = firstEvent; i <= lastEvent; i++) {
if (((events & (1 << i)) != 0)) {
if (isClosing && i != destroyedEvent) continue;
switch (i) {
case readEvent:
if (isClosedRead) continue;
if (isListening) {
connections++;
if (!isClosed) {
// If the connection is closed right after it's accepted, there's a
// chance the close-handler is not set.
var handler = readEventHandler;
if (handler != null) handler();
}
}
...
break;
case writeEvent:
...
continue;
case errorEvent:
...
break;
case closedEvent:
...
break;
case destroyedEvent:
...
continue;
}
}
}
...
}
看注释我们也知道,这个方法就是处理多路复用的,这里我精简了大量事件处理的代码,只留下了readEvent
事件处理的逻辑。
我们继续追踪:
void setListening({bool read = true, bool write = true}) {
sendReadEvents = read;
sendWriteEvents = write;
if (read) issueReadEvent();
if (write) issueWriteEvent();
if (!flagsSent && !isClosing) {
flagsSent = true;
int flags = 1 << setEventMaskCommand;
if (!isClosedRead) flags |= 1 << readEvent;
if (!isClosedWrite) flags |= 1 << writeEvent;
sendToEventHandler(flags);
}
}
void sendToEventHandler(int data) {
int fullData = (typeFlags & typeTypeMask) | data;
assert(!isClosing);
connectToEventHandler();
_EventHandler._sendData(this, eventPort!.sendPort, fullData);
}
void connectToEventHandler() {
assert(!isClosed);
if (eventPort == null) {
eventPort = new RawReceivePort(multiplex, 'Socket Event Handler');
}
}
来了来了,我们看见了什么,又见端口通信!还记得我在上一篇文章说了什么吗?在Dart中使用端口通信的目的就是为了跨线程。这里必然是有工作线程的。
不过connectToEventHandler()
这里使用了RawReceivePort
创建消息接收端口,一般大家都使用更高级的ReceivePort
,而这个类更底层,因此很少使用,可能很多人对这个类不熟。ReceivePort
直接通过listen
注册一个回调监听流接收消息,而RawReceivePort
则需要传一个消息处理器接收消息。这里multiplex
方法就是消息处理器。
创建好了消息接收端口后,又通过_EventHandler._sendData()
,将接收端口的sendPort
发送了出去,这里肯定是发给了Dart VM底层。而sendToEventHandler
方法又被setListening()
调用,setListening()
则是在前面我们见过的tryConnectToResolvedAddresses()
中被调用。也就是是说,创建连接后,立刻就设置了事件监听器。
我们可以稍微关注一下这里的sendToEventHandler()
被传入的参数data
,显然它是一个掩码。isClosedRead
和isClosedWrite
默认值都是false
,我们在Demo中没有主动设置它,因此,flags
经过按位或操作后,最终的值应该是0001 0000 0000 0011
。在sendToEventHandler()
方法中,这个掩码又经过了一些操作,但是经过我的演算,最终的fullData
值仍然是0001 0000 0000 0011
,这个值将会发送到Dart VM底层。
接下来,我们需要重点探索一下_EventHandler._sendData
方法:
@pragma("vm:external-name", "EventHandler_SendData")
external static void _sendData(Object? sender, SendPort sendPort, int data);
这是一个本地扩展方法,那么我们就要到C++源码中去检索EventHandler_SendData
函数了:
sdk\runtime\bin\eventhandler.cc
void FUNCTION_NAME(EventHandler_SendData)(Dart_NativeArguments args) {
Dart_Handle handle = Dart_GetNativeArgument(args, 1);
Dart_Port dart_port;
handle = Dart_SendPortGetId(handle, &dart_port);
if (Dart_IsError(handle)) {
Dart_PropagateError(handle);
UNREACHABLE();
}
Dart_Handle sender = Dart_GetNativeArgument(args, 0);
intptr_t id;
if (Dart_IsNull(sender)) {
id = kTimerId;
} else {
Socket* socket = Socket::GetSocketIdNativeField(sender);
ASSERT(dart_port != ILLEGAL_PORT);
socket->set_port(dart_port);
socket->Retain(); // inc refcount before sending to the eventhandler.
id = reinterpret_cast<intptr_t>(socket);
}
int64_t data = DartUtils::GetIntegerValue(Dart_GetNativeArgument(args, 2));
event_handler->SendData(id, dart_port, data);
}
看到这个函数,里面提取转换参数的过程就不说了,主要的逻辑其实只有三处:
- 获取C++层面的Socket对象:
Socket::GetSocketIdNativeField
。这里的Socket对象就是前面通过调用Socket_CreateConnect
函数创建的,还记得它里面有这么一行代码Socket::SetSocketIdNativeField(Dart_GetNativeArgument(args, 0), socket,Socket::kFinalizerNormal)
吗?显然这里的GetSocketIdNativeField
和SetSocketIdNativeField
是成对调用的。作用类似于Java JNI的反射,通过VM层面的C++代码去修改Dart类的成员属性。这里就是把C++的Socket对象句柄赋值给Dart的_NativeSocket
类成员属性,从而达到一个关联的效果。 - 在
Socket
类中设置了一个Dart层的发送端口:socket->set_port(dart_port)
,这也是调用_sendData
方法的一个主要目的之一。这样一来,将来VM层就可以通过这个发送端口给Dart上层发消息了。 - 处理事件:
event_handler->SendData(id, dart_port, data)
。我们前面解析了这里的data
参数,它其实是一个事件掩码,这里调用事件处理器,显然是为了处理我们传下来的掩码。
那么接下来跟踪的线索,显然就是event_handler->SendData()
方法:
static EventHandler* event_handler = NULL;
static Monitor* shutdown_monitor = NULL;
void EventHandler::Start() {
// Initialize global socket registry.
ListeningSocketRegistry::Initialize();
ASSERT(event_handler == NULL);
shutdown_monitor = new Monitor();
event_handler = new EventHandler();
event_handler->delegate_.Start(event_handler);
if (!SocketBase::Initialize()) {
FATAL("Failed to initialize sockets");
}
}
首先发现,event_handler
是一个静态的全局变量,它在Start()
方法中被初始化。在\sdk\runtime\bin\eventhandler.h
头文件中有其实现:
void SendData(intptr_t id, Dart_Port dart_port, int64_t data) {
delegate_.SendData(id, dart_port, data);
}
而这里的delegate_
其实是EventHandler
的子类EventHandlerImplementation
。在不同的操作系统平台上,其实现也是不同的,这里我们就查看linux系统上的实现,sdk\runtime\bin\eventhandler_linux.cc
:
void EventHandlerImplementation::SendData(intptr_t id,Dart_Port dart_port,int64_t data) {
WakeupHandler(id, dart_port, data);
}
void EventHandlerImplementation::WakeupHandler(intptr_t id,Dart_Port dart_port,int64_t data) {
InterruptMessage msg;
msg.id = id;
msg.dart_port = dart_port;
msg.data = data;
ASSERT(kInterruptMessageSize < PIPE_BUF);
intptr_t result =
FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize);
if (result != kInterruptMessageSize) {
if (result == -1) {
perror("Interrupt message failure:");
}
FATAL("Interrupt message failure. Wrote %" Pd " bytes.", result);
}
}
从这里开始的代码,就涉及更多C语言 Linux 系统编程的知识了,不易理解,大家可以去查询一些相关资料帮助理解,主要是涉及Linux 匿名管道编程,epoll编程知识,可自行学习。后面我就主要做简单的流程概括,不作代码的详细解释了。
Linux的管道主要是作为一种跨进程通信的手段,当然也可以用于两个线程通信,这里就是用于线程间通信。
以上WakeupHandler
方法的实现很容易理解,主要就是将我们前面的参数封装成一个InterruptMessage
消息类,然后通过FDUtils::WriteToBlocking
函数把消息写入了管道,其实就是发送了出去。
管道分为两头,一个写一个读:
int interrupt_fds_[2];
这里interrupt_fds_
就是一个含有两个文件描述符的数组,interrupt_fds_[1]
这头用来写,那么interrupt_fds_[0]
那一端肯定就是用来读取消息的。我们只要搜索interrupt_fds_[0]
就能找到接收消息的代码:
EventHandlerImplementation::EventHandlerImplementation()
: socket_map_(&SimpleHashMap::SamePointerValue, 16) {
intptr_t result;
result = NO_RETRY_EXPECTED(pipe(interrupt_fds_));
...
shutdown_ = false;
static const int kEpollInitialSize = 64;
epoll_fd_ = NO_RETRY_EXPECTED(epoll_create(kEpollInitialSize));
...
struct epoll_event event;
event.events = EPOLLIN;
event.data.ptr = NULL;
int status = NO_RETRY_EXPECTED(
epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fds_[0], &event));
if (status == -1) {
FATAL("Failed adding interrupt fd to epoll instance");
}
timer_fd_ = NO_RETRY_EXPECTED(timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC));
if (timer_fd_ == -1) {
FATAL("Failed creating timerfd file descriptor: %i", errno);
}
// Register the timer_fd_ with the epoll instance.
event.events = EPOLLIN;
event.data.fd = timer_fd_;
status =
NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &event));
if (status == -1) {
FATAL("Failed adding timerfd fd(%i) to epoll instance: %i", timer_fd_,
errno);
}
}
我们发现,管道是在EventHandlerImplementation
类的构造方法中创建的:pipe(interrupt_fds_)
,以上省略部分代码。然后通过epoll_create
函数创建了一个 epoll 实例,接着调用epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fds_[0], &event))
这行,把管道接收端添加到了epoll的监听中。那么根据Linux epoll多路复用的机制,必须使用epoll_wait
函数完成消息接收,所以我们只需要在代码中搜索epoll_wait
函数在哪即可。但是此处有一个地方需要注意,就是 struct epoll_event
结构体的成员赋值,我们监听管道时, event.data.ptr = NULL
。
void EventHandlerImplementation::Poll(uword args) {
ThreadSignalBlocker signal_blocker(SIGPROF);
static const intptr_t kMaxEvents = 16;
struct epoll_event events[kMaxEvents];
EventHandler* handler = reinterpret_cast<EventHandler*>(args);
EventHandlerImplementation* handler_impl = &handler->delegate_;
ASSERT(handler_impl != NULL);
while (!handler_impl->shutdown_) {
intptr_t result = TEMP_FAILURE_RETRY_NO_SIGNAL_BLOCKER(
epoll_wait(handler_impl->epoll_fd_, events, kMaxEvents, -1));
ASSERT(EAGAIN == EWOULDBLOCK);
if (result <= 0) {
if (errno != EWOULDBLOCK) {
perror("Poll failed");
}
} else {
handler_impl->HandleEvents(events, result);
}
}
DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
handler->NotifyShutdownDone();
}
全局只有一处,就是以上Poll()
方法。其实这里的核心代码也只有一行:handler_impl->HandleEvents(events, result)
,我们继续跟踪
void EventHandlerImplementation::HandleEvents(struct epoll_event* events,int size) {
bool interrupt_seen = false;
for (int i = 0; i < size; i++) {
if (events[i].data.ptr == NULL) {
interrupt_seen = true;
} else if (events[i].data.fd == timer_fd_) {
int64_t val;
VOID_TEMP_FAILURE_RETRY_NO_SIGNAL_BLOCKER(
read(timer_fd_, &val, sizeof(val)));
if (timeout_queue_.HasTimeout()) {
DartUtils::PostNull(timeout_queue_.CurrentPort());
timeout_queue_.RemoveCurrent();
}
UpdateTimerFd();
} else {
DescriptorInfo* di =
reinterpret_cast<DescriptorInfo*>(events[i].data.ptr);
const intptr_t old_mask = di->Mask();
const intptr_t event_mask = GetPollEvents(events[i].events, di);
if ((event_mask & (1 << kErrorEvent)) != 0) {
di->NotifyAllDartPorts(event_mask);
UpdateEpollInstance(old_mask, di);
} else if (event_mask != 0) {
Dart_Port port = di->NextNotifyDartPort(event_mask);
ASSERT(port != 0);
UpdateEpollInstance(old_mask, di);
DartUtils::PostInt32(port, event_mask);
}
}
}
if (interrupt_seen) {
HandleInterruptFd();
}
}
这里就是处理事件的地方了。还记得我们前面说的,关于struct epoll_event
结构体成员的赋值吗?显然,这里应该走interrupt_seen = true;
的逻辑。中间大段代码被跳过,最终直接执行HandleInterruptFd()
:
void EventHandlerImplementation::HandleInterruptFd() {
const intptr_t MAX_MESSAGES = kInterruptMessageSize;
InterruptMessage msg[MAX_MESSAGES];
ssize_t bytes = TEMP_FAILURE_RETRY_NO_SIGNAL_BLOCKER(
read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize));
for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) {
if (msg[i].id == kTimerId) {
timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data);
UpdateTimerFd();
} else if (msg[i].id == kShutdownId) {
shutdown_ = true;
} else {
ASSERT((msg[i].data & COMMAND_MASK) != 0);
Socket* socket = reinterpret_cast<Socket*>(msg[i].id);
RefCntReleaseScope<Socket> rs(socket);
if (socket->fd() == -1) {
continue;
}
DescriptorInfo* di =
GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg[i].data));
if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) {
ASSERT(!di->IsListeningSocket());
// Close the socket for reading.
VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD));
}
// ......省略部分事件处理代码
else if (IS_COMMAND(msg[i].data, kSetEventMaskCommand)) {
// `events` can only have kInEvent/kOutEvent flags set.
intptr_t events = msg[i].data & EVENT_MASK;
ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
intptr_t old_mask = di->Mask();
di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK);
UpdateEpollInstance(old_mask, di);
} else {
UNREACHABLE();
}
}
}
}
显然,此处读取出InterruptMessage
消息,并做事件处理,我省略了部分其他事件处理的逻辑,直接看kSetEventMaskCommand
事件的处理代码。还记得我们在Dart层生成掩码的代码吗:int flags = 1 << setEventMaskCommand
,正好与此处对应了。它首先将端口和掩码设置到DescriptorInfo
对象中,然后调用了UpdateEpollInstance(old_mask, di)
方法,看名称也知道,此处是修改epoll的监听,让我们来看看,具体是怎么修改的:
void EventHandlerImplementation::UpdateEpollInstance(intptr_t old_mask,DescriptorInfo* di) {
intptr_t new_mask = di->Mask();
if ((old_mask != 0) && (new_mask == 0)) {
RemoveFromEpollInstance(epoll_fd_, di);
} else if ((old_mask == 0) && (new_mask != 0)) {
AddToEpollInstance(epoll_fd_, di);
} else if ((old_mask != 0) && (new_mask != 0) && (old_mask != new_mask)) {
ASSERT(!di->IsListeningSocket());
RemoveFromEpollInstance(epoll_fd_, di);
AddToEpollInstance(epoll_fd_, di);
}
}
显然的,new_mask
和old_mask
都不为0,走最后的分支,也就是说,先删除之前的epoll监听,也就是对管道的监听。然后添加了一个新的对di
的监听:
static void AddToEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) {
struct epoll_event event;
event.events = EPOLLRDHUP | di->GetPollEvents();
if (!di->IsListeningSocket()) {
event.events |= EPOLLET;
}
event.data.ptr = di;
int status =
NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, di->fd(), &event));
if (status == -1) {
di->NotifyAllDartPorts(1 << kCloseEvent);
}
}
可见,依然是对系统APIepoll_ctl
的封装,具体监听的文件描述符,其实就是DescriptorInfo
对象中设置的文件描述符。那么这个传进来的DescriptorInfo
对象又是哪来的呢?
Socket* socket = reinterpret_cast<Socket*>(msg[i].id);
if (socket->fd() == -1) {
continue;
}
DescriptorInfo* di =
GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg[i].data));
其实就是监听的管道消息发过来的Socket
的文件描述符,也就是我们一开始创建连接打开的那个文件描述符。这一系列操作绕来绕去,其实就是干嘛呢?就是利用linux内核的epoll去监听我们最开始创建的连接。
假如此时服务器给我们的客户端发来了一条消息,那么接收消息的逻辑在哪呢?还是跟前面的管道一样,根据epoll机制,消息发过来由epoll_wait
函数处理。如此一来,又回到了我们前面的Poll
方法。我们再仔细看看之前跳过了一大段逻辑的HandleEvents
方法:
void EventHandlerImplementation::HandleEvents(struct epoll_event* events,int size) {
bool interrupt_seen = false;
for (int i = 0; i < size; i++) {
if (events[i].data.ptr == NULL) {
...
} else if (events[i].data.fd == timer_fd_) {
...
} else {
DescriptorInfo* di =
reinterpret_cast<DescriptorInfo*>(events[i].data.ptr);
const intptr_t old_mask = di->Mask();
const intptr_t event_mask = GetPollEvents(events[i].events, di);
if ((event_mask & (1 << kErrorEvent)) != 0) {
di->NotifyAllDartPorts(event_mask);
UpdateEpollInstance(old_mask, di);
} else if (event_mask != 0) {
Dart_Port port = di->NextNotifyDartPort(event_mask);
ASSERT(port != 0);
UpdateEpollInstance(old_mask, di);
DartUtils::PostInt32(port, event_mask);
}
}
}
if (interrupt_seen) {
HandleInterruptFd();
}
}
这一次,显然走else
分支。如果没有错误,那么应该从DescriptorInfo
中取出Dart_Port
,最后应该调用DartUtils::PostInt32(port, event_mask)
通过之前保存的发送端口,给Dart上层发消息。这里发送的应该就是网络IO可读的事件掩码。
看到此处,我们可以舒一口气了,整个流程基本梳理了一遍。但是有些小伙伴可能要质疑了,你前面不是说创建子线程工作,跨线程通信吗?怎么前面源码剖析的部分压根没看见线程呢?
其实前面我为了源码跟踪思路的流畅,故意忽略了一个问题,那就是Poll
方法是谁调用的?我们讲到WakeupHandler
方法向匿名管道发消息,然后马上就跳到Poll
方法收消息,这显然是有问题的,中间缺失了一环。
这里就不卖关子了,直接回到EventHandler
创建之处,来剖析一下,整个EventHandler
是怎么运转的。
void EventHandler::Start() {
...
event_handler = new EventHandler();
event_handler->delegate_.Start(event_handler);
...
}
可以看到,这里的Start
是子类实现的:
void EventHandlerImplementation::Start(EventHandler* handler) {
int result = Thread::Start("dart:io EventHandler", &EventHandlerImplementation::Poll,
reinterpret_cast<uword>(handler));
if (result != 0) {
FATAL("Failed to start event handler thread %d", result);
}
}
可见,EventHandler
其实是在一开始就创建了一个内部子线程,这个子线程专门用来调用Poll
方法执行事件处理逻辑。也就是说,Poll
里面执行的方法,都是在VM的另一个子线程中,因此Dart层的主线程必须并且只能通过端口通信的方法,与底层交互。
PS: 多说一句,在我们前面的Dart VM 线程池剖析一文中提到过OSThread
类,那个类就是Dart对操作系统线程的抽象,并且在不同系统平台有不同的实现文件。然而这里又冒出来一个Thread
类,它也是Dart对操作系统线程的抽象,根据不同系统平台有不同的实现文件。由此我们可以看出,Dart VM源码存在一些混乱,应该也是经历了几波不同的人接手,代码不统一,存在各写各的工具类的问题。
最后我们看看,EventHandler::Start
方法是哪里调用的呢?经过检索,我们发现其实是在sdk\runtime\bin\dart_embedder_api_impl.cc
中:
bool InitOnce(char** error) {
if (!bin::DartUtils::SetOriginalWorkingDirectory()) {
bin::OSError err;
*error = MallocFormatedString("Error determining current directory: %s\n",
err.message());
return false;
}
bin::TimerUtils::InitOnce();
bin::Process::Init();
#if !defined(DART_IO_SECURE_SOCKET_DISABLED)
bin::SSLFilter::Init();
#endif
bin::EventHandler::Start();
return true;
}
这里的InitOnce
,则是在整个虚拟机创建时调用的,也就是C++的入口,main
函数中进行调用初始化。
关注公众号:编程之路从0到1
原文链接:https://juejin.cn/post/7325132211311738906 作者:编程之路从0到1