从0到1实现一个小型comlink(仿comlink)

WebWorker-专用线程-从0到1实现一个小型comlink(仿comlink)

目前Q&A,看看有无需要了解的部分

  • Q1:为什么在Proxy中返回一个fn,需要bind?
  • Q2:为什么Await一个Proxy会产生对then的一次拦截?

对于webWorker的封装,主要还是使用一些策略方法来处理调用方的请求,即在eventCallback中额外增加一个区别不同请求的key(type),以便于处理不同的任务

第一步,处理worker中的的message事件

expose用于为ep(Worker/MessagePort/etc)增加一个监听事件,一般对worker中需要暴露的对象使用

这里的expose方法不是最终的expose方法,会随着后续而改动

//先不用在意ts类型
const expose = (
    rawObject: unknown,
    ep: Endpoint = globalThis as any,
    allowedOrigins: string[] = ['*']
) => {
    ep.addEventListener('message', function call(ev: MessageEvent<MessageValue<WireValue>>) {
        if (!ev || !ev.data) return; //TODO catchError;
        if (!allowedOrigins.includes(`*`) && !allowedOrigins.includes(ev.origin as (typeof allowedOrigins)[number])) return; //TODO catchError;
		const { id, path = [], type, value } = ev.data; //这里是内部用于传递数据的自定义类型
        const parentObj = path.slice(0, -1).reduce((obj, prop) => obj[prop], rawObject);//取到其父对象,用其父对象调用方法,保证this指向不变
        const propKey = path.slice(-1)[0];
        let returnValue: any;
        switch (type) {
            case MessageType.GET:
                {
                    returnValue = parentObj[propKey];
                }
                break;
            default:
                {
                    returnValue = undefined;
                }
                break;
        }
        if ('start' in ep) ep.start();
        ep.postMessage(
            {
                id,
              	returnValue
            }
        );
    });
    if ('start' in ep) {
		ep.start();
	}
};

第二步,代理调用方的操作

处理完接收方,我们还需要处理调用方的逻辑,在处理调用方逻辑之前,我们还需要封装一个用于发送消息的函数,通过调用createMessage向另一端发送消息,同时监听另一端处理完成数据之后返回的结果(也就是expose函数中的ep.postMessage)将其转换为一个Promise

const createMessage = (ep: Endpoint, obj: Partial<MessageValue>, transfer?: Transferable[]) => {
	const id = generateUUID();//创建一个随机ID,用于区分不同的消息,因为存在多个文件调用同一个worker的情况,所以需要一个id去确认使用的双方
	return new Promise((_res, _rej) => {
		ep.addEventListener('message', function call(ev: MessageEvent<MessageValue>) {
			if (!ev || !ev.data || ev.data.id !== id) return _rej();//这里应该直接return,后面会改动,并赋有解释
			ep.removeEventListener('message', call);
			_res(ev.data);
		});
		if ('start' in ep) ep.start();
		ep.postMessage(
			{
				id,
				...obj,
			},
			transfer
		);
	});
};

现在让我们来处理调用方的逻辑,将调用方对worker的调用,变为主线程让proxy去帮他操作worker,而交给调用方的是一个Promise

const wrap = (ep: Endpoint) => {
	return createProxy(ep);
};
const createProxy = (ep: Endpoint, path: string[] = []) => {
	const proxy = new Proxy(
		{},//这里应该function(){},后面会改动,并赋有解释
		{
			get(_, prop) {
				if (prop === 'then') {//这里表示只有 prop 为 then 时才发送消息执行取值操作,体现在 Comlink 上就是针对代理对象的操作都需要使用 await 获取。
					//thenable
					const r = createMessage(ep, {
						type: MessageType.GET,
						path: path,
					})
					return r.then.bind(r);//这里必须要返回bind之后的then
				}
				return createProxy(ep, [...path, prop]);
			},
		}
	);
	return proxy;
};

好了,我们对于wroker的封装已经初具雏形了,现在让我们来试着使用一下,在使用之前我们先用tsc转译成js文件

//main.js
import { wrap } from './harverWorker.js';
const proxy = wrap(new Worker('./worker1.js'));
console.log(await proxy.name);//22
console.log(proxy.name);//proxy

//worker1.js
importScripts('selfWorker.js');
// 要共享的对象或函数
const myObject = {
	name: '22',
};
MyWorker.expose(myObject);

结果是符合预期的

从0到1实现一个小型comlink(仿comlink)

Q1:为什么在Proxy中返回一个fn,需要bind?

因为Proxy存在一定的局限性,对于一些JS中的内置对象,比如Map,Promise等等,这些都使用了内部插槽。内部插槽类似于属性,但是仅限内部使用,用于在对象上记录状态或数据。

例如Map将项目存储在[[MapData]],内置的方法就可以直接访问他们,不会通过[[Get]/[[Set]]],所以Proxy拦截不到。

对于proxy,它会将目标对象上的方法和访问器内部的this的指向修改为代理对象,即使Proxy未代理任何操作(handler为{})

这些对象被代理之后,调用proxy.set相当于Map.prototype.set.call(proxy, 1, 2)Map.prototype.set即访问内部槽proxy.[[MapData]]。由于proxy对象没有此内部槽,将抛出错误。所以调用内置的方法将会失败

const map = new Map();
const proxy = new Proxy(map, {});
console.log(proxy.set === new Map().set)//true
proxy.set('test', 1); // Error

从0到1实现一个小型comlink(仿comlink)

其实,这个挺好理解,函数的调用者是谁,this就指向谁

我们可以手动将this指向改为正确的对象

const proxy = new Proxy(map, {
  get(target, property, receiver) {
    const value = Reflect.get(target, property, receiver)
    if (target instanceof Map) {
      value = value.bind(target)
    }
    return value
  },
})

对于Promise实例,proxy上没有 [[PromiseState]]内部槽,所以需要将then进行bind

Q2:为什么Await一个Proxy会产生对then的一次拦截?

thenable:thenable对象指的是具有then方法的对象thenable = {then: function (resolve, reject) {resolve(42);}};

这是因为 await v,会将其转换为Promise.resolve(v)

Promise.resolve()静态方法将给定值“解析”为 Promise,如果值是 promise,则返回该 promise;如果该值是 thenable,则将调用其then方法,并为其准备的两个回调;

如果要确定一个对象是否是thenable,需要判断其是否具有then方法,所以表现在proxy中,就是对其then的一次拦截

注意一点避免被绕进去,判断具有then方法,指的是拦截then之后返回的得是一个fn,而不是一个obj,如果不是fn,await 得到的不是返回的obj,而是原来的obj(也就是proxy)

第三步,实现SET

能够传递给worker的数据是能够被结构化克隆算法处理的数据,但是函数不具备这种性质,所以目前还不能赋值属性为一个function

const createProxy = (ep: Endpoint, path: string[] = []) => {
    const proxy = new Proxy(
        {},
        {
            get(_, prop) {
                if (prop === 'then') {
                    const r = createMessage(ep, {
                        type: MessageType.GET,
                        path: path,
                    });
                    return r.then.bind(r);
                }
                return createProxy(ep, [...path, prop]);
            },
            set(_, prop, newValue) {
                return createMessage(ep, {
                    type: MessageType.SET,
                    path: [...path, prop].map(p => p.toString()),
                    value: newValue,
                })
            },
        }
    );
    return proxy;
};

	switch (type) {
			case MessageType.GET:
				{
					returnValue = parentObj[propKey];
				}
				break;
			case MessageType.SET:
				{
					parentObj[propKey] = value;
					returnValue = true;
				}
				break;
			default:
				{
					returnValue = undefined;
				}
				break;
		}

这里需要注意的一个地方是,在createMessage中不应该对id不相同的消息给reject掉,

原因是因为set操作没有阻塞的(没有使用await),所以如果在set下面再读取另一端的属性(await proxy.name),这时就会在上一个SET的事件没removeEventListener前又addEventListener一个GET的事件,这样在任意一个事件解绑前,worker那边发送的消息都会给这两个callback都去发送一次,所以id这时就可以去区分真正需要的消息,因此我们不能给他reject掉,否则可能会导致在正确的消息传过来之前我们对传过来的错误消息throw error,从而让promise达到终态

例如await proxy.name 如果我们在其前面有proxy.name=222,那么set可以正确的被处理,但是到了get时,get的callback会先收到set的一次message,然后我们如果进行了rej,那么createMessage所创建的promise会变为终态,这时获取值的操作已经有结果了,后面虽然会再收到一次正确的get message 但是promise的状态已经无法发生更改了,只能继续流程取消callback

同时他也不是很符合reject,因为本来使用的过程中就会存在多个EventListener,这里直接return是最正确的处理

之前我以为开源作者漏写了rej,结果发现还是自己太年轻。。。

第四步,实现APPLY

在实现之前,有个地方要注意一下Proxy的target应该为一个function,原因是当我们调用例如proxy.getName()时,是会先走get获取getName的值,发现是function然后执行到()会被apply捕获。

但在我们的实现中,非then都会返回一个Proxy,并非函数,所以当遇到getName的时候会返回一个proxy,然后执行()就会报错,提示is not a function,但是如果我们的target是一个function,相当于就是proxy(),这样是可以成功调用的

const proxy1 = new Proxy(function () {}, {});
console.log(proxy1(1, 2));//success run
//expose
switch (type) {
    case MessageType.GET:
        {
            returnValue = parentObj[propKey];
        }
        break;
    case MessageType.SET:
        {
            parentObj[propKey] = value;
            returnValue = true;
        }
        break;
    case MessageType.APPLY:
        {
            returnValue = parentObj[propKey].apply(parentObj, ...value);
        }
        break;
    default:
        {
            returnValue = undefined;
        }
        break;
}
//createProxy
const createProxy = (ep: Endpoint, path:string[] = []) => {
	const proxy = new Proxy(function () {}, {
		get(_, prop) {
			if (prop === 'then') {
				//这里表示只有 prop 为 then 时才发送消息执行取值操作,体现在 Comlink 上就是针对代理对象的操作都需要使用 await 获取。
				//thenable
				const r = createMessage(ep, {
					type: MessageType.GET,
					path: path,
				});
				return r.then.bind(r);
			}
			return createProxy(ep, [...path, prop]);
		},
		set(_, prop, newValue) {
			return createMessage(ep, {
				type: MessageType.SET,
				path: [...path, prop].map(p => p.toString()),
				value: newValue,
			}) as any;
		},
		apply(_, __, rawArgumentList) {
			return createMessage(ep, {
				type: MessageType.APPLY,
				path: [...path].map(p => p.toString()),
				value: rawArgumentList,
			}) as any;
		},
	});
	return proxy;
};

第五步,整理内置数据传递格式 & 实现可转移对象

是时候统一一下内部通信时的传递格式了,WeakMap是一种弱引用映射,我们可以利用他来实现可转移对象与raw Object的绑定

注意一个地方,在实际使用中,worker中引入的模块和我们在主线程引入模块是两个不同的模块,也就是说transferCacheObject所记录的映射在主线程和worker中是不同的


const transferCacheObject = new WeakMap<any, Transferable[]>();

const fromWireValue = (val: WireValue) => {
	switch (val.type) {
		case ValueType.RAW: {
			return val.value;
		}
	}
};
const toWireValue = (val: any): [WireValue, Transferable[]] => {
	return [
		{
			type: ValueType.RAW,
			value: val,
		},
		transferCacheObject.get(val) ?? [],
	];
};
const transfer = <T>(obj: T, transfers: Transferable[]) => {
	transferCacheObject.set(obj, transfers);
	return obj;
};

将我们所有有数据传输的地方都给它套上

第六步,实现函数传递并保留上下文

目前在我们的的实现中无法实现函数的传递,但是我们可以借助MessageChannel来改变一种实现的方式,MessageChannel可以在不同线程中通信,那么我们也可以仿造代理worker线程一样,对MessageChannel也进行代理,具体来说MessageChannel会给出两个port(port1、port2),我们可以代理port1,接受来自port2的msg并做出处理

测试一下

从0到1实现一个小型comlink(仿comlink)

符合我们预期

第七步,回收信道

对于已经完成计算的worker/port,需要关闭ep

FinalizationRegistry可以在对值进行垃圾回收时请求回调,也就是GC时会调用的清理callback,一般调用就表示这个值已经被垃圾回收了

const releaseProxy = Symbol('releaseProxy');
const proxyFinalizers =
'FinalizationRegistry' in globalThis &&
new FinalizationRegistry((ep: Endpoint) => {
proxyCount.set(ep, (proxyCount.get(ep) ?? 1) - 1);
if (!proxyCount.get(ep)) {
releaseEndpoint(ep);
}
});
function isMessagePort(endpoint: Endpoint): endpoint is MessagePort {
return endpoint.constructor.name === 'MessagePort';
}
function closeEndPoint(endpoint: Endpoint) {
if (isMessagePort(endpoint)) endpoint.close();
}
const releaseEndpoint = (ep: Endpoint) => {
return createMessage(ep, { type: MessageType.RELEASE }).then(_ => {
closeEndPoint(ep);
});
};
const unregisterProxy = (proxy: any) => {
if (proxyFinalizers) {
proxyFinalizers.unregister(proxy);
}
};
const registerProxy = (proxy: any, ep: Endpoint) => {
proxyCount.set(ep, (proxyCount.get(ep) ?? 0) + 1);
if (proxyFinalizers) {
proxyFinalizers.register(proxy, ep, proxy);
}
};
const throwIfProxyReleased = (isReleased: boolean) => {
if (isReleased) {
throw new Error('Proxy has been released and is not useable');
}
};
const createProxy = (ep: Endpoint, path: string[] = []) => {
let isProxyReleased = false;
const proxy = new Proxy(function () {}, {
get(_, prop) {
throwIfProxyReleased(isProxyReleased);
if (prop === releaseProxy) {
return () => {
unregisterProxy(proxy);
releaseEndpoint(ep);
isProxyReleased = true;
};
}
if (prop === 'then') {
//这里表示只有 prop 为 then 时才发送消息执行取值操作,体现在 Comlink 上就是针对代理对象的操作都需要使用 await 获取。
//thenable
const r = createMessage(ep, {
type: MessageType.GET,
path: path,
}).then(fromWireValue);
return r.then.bind(r);
}
return createProxy(ep, [...path, prop]);
},
set(_, prop, newValue) {
const [wireValue, transfer] = toWireValue(newValue);
return createMessage(
ep,
{
type: MessageType.SET,
path: [...path, prop].map(p => p.toString()),
value: wireValue,
},
transfer
) as any;
},
apply(_, __, rawArgumentList) {
const [wireValues, transfers] = rawArgumentList.map(toWireValue).reduce(
(array, cur) => {
array = [
[...array[0], cur[0]],
[...array[1], ...cur[1]],
];
return array;
},
[[], []]
);
return createMessage(
ep,
{
type: MessageType.APPLY,
path: [...path].map(p => p.toString()),
value: wireValues,
},
transfers
).then(fromWireValue) as any;
},
});
registerProxy(proxy, ep);
return proxy;
};
const expose = (rawObject: unknown, ep: Endpoint = globalThis as any, allowedOrigins: string[] = ['*']) => {
ep.addEventListener('message', function call(ev: MessageEvent<MessageValue<any>>) {
if (!ev || !ev.data) return; //TODO catchError;
if (!allowedOrigins.includes(`*`) && !allowedOrigins.includes(ev.origin as (typeof allowedOrigins)[number])) return; //TODO catchError;
const { id, path = [], type, value } = ev.data; //这里是内部用于传递数据的自定义类型
const parentObj = path.slice(0, -1).reduce((obj, prop) => obj[prop], rawObject); //取到其父对象,用其父对象调用方法,保证this指向不变
const rawValue = path.reduce((obj, prop) => obj[prop], rawObject) as any;
let returnValue: any;
switch (type) {
case MessageType.GET:
{
returnValue = rawValue;
}
break;
case MessageType.SET:
{
parentObj[path.slice(-1)[0]] = fromWireValue(value);
returnValue = true;
}
break;
case MessageType.APPLY:
{
returnValue = rawValue.apply(parentObj, value.map(fromWireValue));
}
break;
case MessageType.RELEASE:
default:
{
returnValue = undefined;
}
break;
}
if ('start' in ep) ep.start();
Promise.resolve(returnValue).then(value => {
const [wireValue, transfer] = toWireValue(value);
ep.postMessage(
{
...wireValue,
id,
},
transfer
);
if (type === MessageType.RELEASE) {
//释放
ep.removeEventListener('message', call as any);
closeEndPoint(ep);
}
});
});
if ('start' in ep) {
ep.start();
}
};

第八步,错误处理

const throwMarker = Symbol('error');
const throwHandler: Handle<ErrorValue, any> = {
canHandle: value => isObject(value) && obj[throwMarker],
serialize(obj: unknown) {
if (obj instanceof Error) {
return [
{
isError: true,
value: {
message: obj.message,
name: obj.name,
stack: obj.stack,
},
},
[],
];
} else {
return [{ isError: false, value: obj }, []];
}
},
deserialize(serialized) {
if (serialized.isError) {
throw Object.assign(new Error(serialized.value.message), serialized.value);
}
throw serialized.value;
},
};
const handles = new Map<string, Handle<any, any>>([
['proxyMarker', transferHandle],
['throwMarker', throwHandler],
]);
const expose = (rawObject: unknown, ep: Endpoint = globalThis as any, allowedOrigins: string[] = ['*']) => {
ep.addEventListener('message', function call(ev: MessageEvent<MessageValue<any>>) {
if (!ev || !ev.data) return; //TODO catchError;
if (!allowedOrigins.includes(`*`) && !allowedOrigins.includes(ev.origin as (typeof allowedOrigins)[number])) return; //TODO catchError;
const { id, path = [], type, value } = ev.data; //这里是内部用于传递数据的自定义类型
let returnValue: any;
try {
const parentObj = path.slice(0, -1).reduce((obj, prop) => obj[prop], rawObject); //取到其父对象,用其父对象调用方法,保证this指向不变
const rawValue = path.reduce((obj, prop) => obj[prop], rawObject) as any;
switch (type) {
case MessageType.GET:
{
returnValue = rawValue;
}
break;
case MessageType.SET:
{
parentObj[path.slice(-1)[0]] = fromWireValue(value);
returnValue = true;
}
break;
case MessageType.APPLY:
{
returnValue = rawValue.apply(parentObj, value.map(fromWireValue));
}
break;
case MessageType.RELEASE:
default:
{
returnValue = undefined;
}
break;
}
} catch (value) {
returnValue = { value, throwMarker: true };
}
Promise.resolve(returnValue)
.catch(value => {
return { value, [throwMarker]: 0 };
})
.then(value => {
const [wireValue, transfer] = toWireValue(value);
ep.postMessage(
{
...wireValue,
id,
},
transfer
);
if (type === MessageType.RELEASE) {
//释放
ep.removeEventListener('message', call as any);
closeEndPoint(ep);
}
})
.catch(value => {
return { value, [throwMarker]: 0 };
});
});
if ('start' in ep) {
ep.start();
}
};

到此,我们对worker主要的封装处理已经完毕了,其中错误还望指出,后续会及时修改

原文链接:https://juejin.cn/post/7325791449664176180 作者:迦逻

(0)
上一篇 2024年1月21日 上午10:26
下一篇 2024年1月21日 上午10:36

相关推荐

发表回复

登录后才能评论