diff options
| author | AAGaming <aa@mail.catvibers.me> | 2023-07-03 23:11:44 -0400 |
|---|---|---|
| committer | marios8543 <marios8543@gmail.com> | 2023-11-14 00:03:47 +0200 |
| commit | 05b41b341027dc80e62935a2d0a953c847bdb44b (patch) | |
| tree | 69251b158c0de09ebfec4eea1f29a82612ace24f | |
| parent | 18d89e76fd1f90066fbad08000c71ba149d2a4aa (diff) | |
| download | decky-loader-05b41b341027dc80e62935a2d0a953c847bdb44b.tar.gz decky-loader-05b41b341027dc80e62935a2d0a953c847bdb44b.zip | |
more progress on WS router
| -rw-r--r-- | backend/decky_loader/wsrouter.py | 49 | ||||
| -rw-r--r-- | frontend/src/wsrouter.ts | 129 |
2 files changed, 160 insertions, 18 deletions
diff --git a/backend/decky_loader/wsrouter.py b/backend/decky_loader/wsrouter.py index 84b83179..9c8fe424 100644 --- a/backend/decky_loader/wsrouter.py +++ b/backend/decky_loader/wsrouter.py @@ -15,23 +15,24 @@ class MessageType(Enum): CALL = 0 REPLY = 1 ERROR = 2 - # Pub/sub + # # Pub/sub + # SUBSCRIBE = 3 + # UNSUBSCRIBE = 4 + # PUBLISH = 5 -running_calls: Dict[str, Future] = {} - -subscriptions: Dict[str, Callable[[Any]]] - -# {type: MessageType, data: dta, id: id} +# see wsrouter.ts for typings class WSRouter: def __init__(self) -> None: self.ws = None self.req_id = 0 self.routes = {} + self.running_calls: Dict[int, Future] = {} + # self.subscriptions: Dict[str, Callable[[Any]]] = {} self.logger = getLogger("WSRouter") - async def add_routes(self, routes): - self.routes.update(routes) + async def add_route(self, name, route): + self.routes[name] = route async def handle(self, request): self.logger.debug('Websocket connection starting') @@ -39,6 +40,13 @@ class WSRouter: await ws.prepare(request) self.logger.debug('Websocket connection ready') + if self.ws != None: + try: + await self.ws.close() + except: + pass + self.ws = None + self.ws = ws try: @@ -56,19 +64,30 @@ class WSRouter: data = msg.json() if self.routes[data.route]: try: - res = await self.routes[data.route](data.data) - await ws.send_json({type: MessageType.REPLY, id: data.id, data: res}) + res = await self.routes[data.route](*data.args) + await self.write({"type": MessageType.REPLY, "id": data.id, "result": res}) + self.logger.debug(f"Started PY call {data.route} ID {data.id}") except: - await ws.send_json({type: MessageType.ERROR, id: data.ud, data: format_exc()}) + await self.write({"type": MessageType.ERROR, "id": data.id, "error": format_exc()}) + else: + await self.write({"type": MessageType.ERROR, "id": data.id, "error": "Route does not exist."}) case MessageType.REPLY: - if running_calls[data.id]: - running_calls[data.id].set_result(data.data) + if self.running_calls[data.id]: + self.running_calls[data.id].set_result(data.result) + del self.running_calls[data.id] + self.logger.debug(f"Resolved JS call {data.id} with value {str(data.result)}") case MessageType.ERROR: - if running_calls[data.id]: - running_calls[data.id].set_exception(data.data) + if self.running_calls[data.id]: + self.running_calls[data.id].set_exception(data.error) + del self.running_calls[data.id] + self.logger.debug(f"Errored JS call {data.id} with error {data.error}") + + case _: + self.logger.error("Unknown message type", data) finally: try: await ws.close() + self.ws = None except: pass diff --git a/frontend/src/wsrouter.ts b/frontend/src/wsrouter.ts index 8e033f30..b6437568 100644 --- a/frontend/src/wsrouter.ts +++ b/frontend/src/wsrouter.ts @@ -1,14 +1,52 @@ import Logger from './logger'; enum MessageType { + // Call-reply CALL, REPLY, ERROR, + // Pub/sub + // SUBSCRIBE, + // UNSUBSCRIBE, + // PUBLISH +} + +interface CallMessage { + type: MessageType.CALL; + args: any[]; + route: string; + id: number; + // TODO implement this + // skipResponse?: boolean; +} + +interface ReplyMessage { + type: MessageType.REPLY; + result: any; + id: number; +} + +interface ErrorMessage { + type: MessageType.ERROR; + error: any; + id: number; +} + +type Message = CallMessage | ReplyMessage | ErrorMessage; + +// Helper to resolve a promise from the outside +interface PromiseResolver<T> { + resolve: (res: T) => void; + reject: (error: string) => void; + promise: Promise<T>; } class WSRouter extends Logger { - routes: Map<string, (args: any) => any> = new Map(); + routes: Map<string, (...args: any) => any> = new Map(); + runningCalls: Map<number, PromiseResolver<any>> = new Map(); ws?: WebSocket; + // Used to map results and errors to calls + reqId: number = 0; constructor() { super('WSRouter'); } @@ -21,7 +59,92 @@ class WSRouter extends Logger { this.ws.addEventListener('message', this.onError.bind(this)); } - onMessage() {} + createPromiseResolver<T>(): PromiseResolver<T> { + let resolver: PromiseResolver<T>; + const promise = new Promise<T>((resolve, reject) => { + resolver = { + promise, + resolve, + reject, + }; + this.debug('Created new PromiseResolver'); + }); + this.debug('Returning new PromiseResolver'); + // The promise will always run first + // @ts-expect-error 2454 + return resolver; + } + + write(data: Message) { + this.ws?.send(JSON.stringify(data)); + } + + addRoute(name: string, route: (args: any) => any) { + this.routes.set(name, route); + } + + removeRoute(name: string) { + this.routes.delete(name); + } + + async onMessage(msg: MessageEvent) { + this.debug('WS Message', msg); + try { + const data = JSON.parse(msg.data) as Message; + switch (data.type) { + case MessageType.CALL: + if (this.routes.has(data.route)) { + try { + const res = await this.routes.get(data.route)!(...data.args); + this.write({ type: MessageType.REPLY, id: data.id, result: res }); + this.debug(`Started JS call ${data.route} ID ${data.id}`); + } catch (e) { + await this.write({ type: MessageType.ERROR, id: data.id, error: (e as Error)?.stack || e }); + } + } else { + await this.write({ type: MessageType.ERROR, id: data.id, error: 'Route does not exist.' }); + } + break; + + case MessageType.REPLY: + if (this.runningCalls.has(data.id)) { + this.runningCalls.get(data.id)!.resolve(data.result); + this.runningCalls.delete(data.id); + this.debug(`Resolved PY call ${data.id} with value`, data.result); + } + break; + + case MessageType.ERROR: + if (this.runningCalls.has(data.id)) { + this.runningCalls.get(data.id)!.reject(data.error); + this.runningCalls.delete(data.id); + this.debug(`Errored PY call ${data.id} with error`, data.error); + } + break; + + default: + this.error('Unknown message type', data); + break; + } + } catch (e) { + this.error('Error parsing WebSocket message', e); + } + this.call<[number, number], string>('methodName', 1, 2); + } + + call<Args extends any[] = any[], Return = void>(route: string, ...args: Args): Promise<Return> { + const resolver = this.createPromiseResolver<Return>(); + + const id = ++this.reqId; + + this.runningCalls.set(id, resolver); + + this.write({ type: MessageType.CALL, route, args, id }); - onError() {} + return resolver.promise; + } + + onError(error: any) { + this.error('WS ERROR', error); + } } |
