From 7ea83a62245cf07b18a5418d0314a9992010b534 Mon Sep 17 00:00:00 2001 From: Arseniusz Date: Sun, 17 Mar 2024 15:01:27 +0300 Subject: [PATCH] feat!: move from WebSockets to SSE Reasons: - We need only one directional channel - SSE easier to proxy - we can use only one port for app --- .npmrc | 1 + Dockerfile | 2 +- Makefile | 3 +++ package-lock.json | 35 +++++++----------------- package.json | 2 +- src/actions.js | 25 ++++++++++++++++++ src/events-stream.js | 11 ++++++++ src/main.js | 51 +++++++++-------------------------- src/notifier.js | 60 ------------------------------------------ src/routes/index.js | 11 ++++++++ src/routes/livedata.js | 25 ++++++++++++++++++ src/routes/notfound.js | 9 +++++++ src/routes/post.js | 13 +++++++++ src/sse.js | 27 +++++++++++++++++++ src/state.js | 7 ++--- 15 files changed, 152 insertions(+), 130 deletions(-) create mode 100644 .npmrc create mode 100644 src/actions.js create mode 100644 src/events-stream.js delete mode 100644 src/notifier.js create mode 100644 src/routes/index.js create mode 100644 src/routes/livedata.js create mode 100644 src/routes/notfound.js create mode 100644 src/routes/post.js create mode 100644 src/sse.js diff --git a/.npmrc b/.npmrc new file mode 100644 index 0000000..1da4592 --- /dev/null +++ b/.npmrc @@ -0,0 +1 @@ +@radioiceberg:registry=https://gitea.bjornmossa.net/api/packages/Radioiceberg/npm/ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 2f28a2c..b6c6ce1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,5 +5,5 @@ COPY package*.json ./ RUN npm ci --omit=dev COPY src/. . -EXPOSE 4000 7000 +EXPOSE 4000 CMD [ "node", "main.js" ] \ No newline at end of file diff --git a/Makefile b/Makefile index a9c89dc..e5e9c9b 100644 --- a/Makefile +++ b/Makefile @@ -11,6 +11,9 @@ start: build: docker build . -t $(IMAGE_NAME):$(GIT_TAG) +build-experimental: + docker build . -t $(IMAGE_NAME):experimental + bump-latest: docker tag $(IMAGE_NAME):$(GIT_TAG) $(IMAGE_NAME):latest diff --git a/package-lock.json b/package-lock.json index 88eb9f8..413f604 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,36 +9,21 @@ "version": "1.0.0", "license": "GPL-3.0-or-later", "dependencies": { - "ws": "^8.13.0" + "@radioiceberg/message-processor": "^0.0.1" } }, - "node_modules/ws": { - "version": "8.14.2", - "resolved": "https://registry.npmjs.org/ws/-/ws-8.14.2.tgz", - "integrity": "sha512-wEBG1ftX4jcglPxgFCMJmZ2PLtSbJ2Peg6TmpJFTbe9GZYOQCDPdMYu/Tm0/bGZkw8paZnJY45J4K2PZrLYq8g==", - "engines": { - "node": ">=10.0.0" - }, - "peerDependencies": { - "bufferutil": "^4.0.1", - "utf-8-validate": ">=5.0.2" - }, - "peerDependenciesMeta": { - "bufferutil": { - "optional": true - }, - "utf-8-validate": { - "optional": true - } - } + "node_modules/@radioiceberg/message-processor": { + "version": "0.0.1", + "resolved": "https://gitea.bjornmossa.net/api/packages/Radioiceberg/npm/%40radioiceberg%2Fmessage-processor/-/0.0.1/message-processor-0.0.1.tgz", + "integrity": "sha512-6hfnRGOXisOZCBueS1wYExFopMiK7Ig+UplP9pJQH7kWPmlnSFE33G0cU/gBuCUrS3v81uDJ8FGBse6H88dptw==", + "license": "GPL-3.0-or-later" } }, "dependencies": { - "ws": { - "version": "8.14.2", - "resolved": "https://registry.npmjs.org/ws/-/ws-8.14.2.tgz", - "integrity": "sha512-wEBG1ftX4jcglPxgFCMJmZ2PLtSbJ2Peg6TmpJFTbe9GZYOQCDPdMYu/Tm0/bGZkw8paZnJY45J4K2PZrLYq8g==", - "requires": {} + "@radioiceberg/message-processor": { + "version": "0.0.1", + "resolved": "https://gitea.bjornmossa.net/api/packages/Radioiceberg/npm/%40radioiceberg%2Fmessage-processor/-/0.0.1/message-processor-0.0.1.tgz", + "integrity": "sha512-6hfnRGOXisOZCBueS1wYExFopMiK7Ig+UplP9pJQH7kWPmlnSFE33G0cU/gBuCUrS3v81uDJ8FGBse6H88dptw==" } } } diff --git a/package.json b/package.json index adc2d2c..8a7276e 100644 --- a/package.json +++ b/package.json @@ -18,6 +18,6 @@ }, "homepage": "https://github.com/Arseniusz/radioiceberg#readme", "dependencies": { - "ws": "^8.13.0" + "@radioiceberg/message-processor": "^0.0.1" } } diff --git a/src/actions.js b/src/actions.js new file mode 100644 index 0000000..a3b0f8c --- /dev/null +++ b/src/actions.js @@ -0,0 +1,25 @@ +import { events, eventsStream } from "./events-stream.js"; +import { state } from "./state.js"; + +function metadataChange(data) { + state.lastPlayed = data; + eventsStream.emit(events.stateChange, { action: "metadataChange", data }); +} + +function liveStarted() { + state.isOnline = true; + eventsStream.emit(events.stateChange, { action: "liveStarted" }); +} + +function liveEnded() { + state.isOnline = false; + eventsStream.emit(events.stateChange, { action: "liveEnded" }); +} + +const actions = { + metadataChange, + liveStarted, + liveEnded, +}; + +export { actions }; diff --git a/src/events-stream.js b/src/events-stream.js new file mode 100644 index 0000000..e976af8 --- /dev/null +++ b/src/events-stream.js @@ -0,0 +1,11 @@ +import { EventEmitter } from "events"; + +class EventStream extends EventEmitter {} + +const events = Object.freeze({ + stateChange: "stateChange", +}); + +const eventsStream = new EventStream(); + +export { eventsStream, events }; diff --git a/src/main.js b/src/main.js index 354d01e..c49b850 100644 --- a/src/main.js +++ b/src/main.js @@ -1,49 +1,24 @@ import { createServer } from "node:http"; -import { randomUUID } from "node:crypto"; -import { WebSocketServer } from "ws"; -import { notifier } from "./notifier.js"; +import { actions } from "./actions.js"; +import { parse } from "node:url"; +import { routes } from "./routes/index.js"; +import { messageProcessor } from "@radioiceberg/message-processor"; -const wss = new WebSocketServer({ port: 7000 }); - -wss.on("connection", (ws) => { - const id = randomUUID(); - const message = { - action: "clientConnected", - data: { - socket: ws, - id, - }, - }; - - notifier.processMessage(message); - - ws.on("close", () => { - const message = { - action: "cliendDisconnected", - data: { - id, - }, - }; - - notifier.processMessage(message); - }); -}); +messageProcessor.actions = actions; createServer((req, res) => { - let body = ""; + if (!req.url) res.end(); + + var url = parse(req.url); + var path = url.pathname.replace(/^\/+|\/$/g, "").toLowerCase(); + var data = ""; req.on("data", (chunk) => { - body += chunk; + data += chunk; }); req.on("end", () => { - try { - const message = JSON.parse(body); - notifier.processMessage(message); - } catch (error) { - console.error(error); - } + var route = routes[path] ?? routes["notFound"]; + return route(req, res, data); }); - - res.end(); }).listen(4000); diff --git a/src/notifier.js b/src/notifier.js deleted file mode 100644 index 59baa90..0000000 --- a/src/notifier.js +++ /dev/null @@ -1,60 +0,0 @@ -import { state } from "./state.js"; - -function updateMeta(message) { - const { data } = message; - this.state.lastPlayed = data; - this.notifyAll(message); -} - -function startOnline(message) { - this.state.isOnline = true; - this.notifyAll(message); -} - -function stopOnline(message) { - this.state.isOnline = false; - this.notifyAll(message); -} - -function addClient({ data }) { - const { socket, id } = data; - this.clients.set(id, socket); - this.notifyOne(socket, { action: "setState", data: this.state }); -} - -function deleteClient({ data }) { - const { id } = data; - this.clients.delete(id); -} - -const actions = { - clientConnected: addClient, - cliendDisconnected: deleteClient, - metadataChange: updateMeta, - liveStarted: startOnline, - liveEnded: stopOnline, -}; - -const notifier = { - __proto__: state, - notifyAll: function (message) { - const messageStrinified = JSON.stringify(message); - this.clients.forEach((c) => c.send(messageStrinified)); - }, - notifyOne: function (socket, message) { - const messageStrinified = JSON.stringify(message); - console.log("NOTIFY ONE:", message); - socket.send(messageStrinified); - }, - processMessage: function (message) { - const action = actions[message.action]; - if (action) { - action.call(this, message); - console.log("PERFORM ACTION", message.action); - } else { - console.log("no avaible command for state mutation", message.action); - } - }, -}; - -export { notifier }; diff --git a/src/routes/index.js b/src/routes/index.js new file mode 100644 index 0000000..8746885 --- /dev/null +++ b/src/routes/index.js @@ -0,0 +1,11 @@ +import { notFound } from "./notfound.js"; +import { post } from "./post.js"; +import { liveData } from "./livedata.js"; + +const routes = { + livedata: liveData, + post, + notFound, +}; + +export { routes }; diff --git a/src/routes/livedata.js b/src/routes/livedata.js new file mode 100644 index 0000000..4b3b5f5 --- /dev/null +++ b/src/routes/livedata.js @@ -0,0 +1,25 @@ +import { state } from "../state.js"; +import { events, eventsStream } from "../events-stream.js"; +import { SSE } from "../sse.js"; + +/** + * @param {import('http').ServerResponse} res somebody Somebody's name. + */ +function liveData(req, res) { + const sse = new SSE(res); + + // Client connected. Send last saved state + const startupMessage = { + action: "setState", + data: state, + }; + + sse.send(startupMessage); + + // Subscribe on events update and send it to client + eventsStream.on(events.stateChange, (data) => { + sse.send(data); + }); +} + +export { liveData }; diff --git a/src/routes/notfound.js b/src/routes/notfound.js new file mode 100644 index 0000000..0388ad0 --- /dev/null +++ b/src/routes/notfound.js @@ -0,0 +1,9 @@ +/** + * @param {import('http').ServerResponse} res somebody Somebody's name. + */ +function notFound(_, res) { + res.writeHead(404); + res.end(); +} + +export { notFound }; diff --git a/src/routes/post.js b/src/routes/post.js new file mode 100644 index 0000000..7579f89 --- /dev/null +++ b/src/routes/post.js @@ -0,0 +1,13 @@ +import { messageProcessor } from "@radioiceberg/message-processor"; + +/** + * @param {import('http').ServerResponse} res somebody Somebody's name. + * @param {string} data possibly transformable to json string data. + */ +function post(_, res, data) { + messageProcessor.processMessage(data); + res.writeHead(202); + res.end(); +} + +export { post }; diff --git a/src/sse.js b/src/sse.js new file mode 100644 index 0000000..3be8131 --- /dev/null +++ b/src/sse.js @@ -0,0 +1,27 @@ +import { EOL } from "os"; + +class SSE { + /** + * @param {import('http').ServerResponse} res somebody Somebody's name. + */ + constructor(res) { + res.setHeader("Content-Type", "text/event-stream"); + res.setHeader("Cache-Control", "no-cache"); + res.setHeader("Connection", "keep-alive"); + + this.res = res; + this.id = 0; + } + + send(data) { + try { + this.res.write(`id: ${++this.id}${EOL}`); + this.res.write(`data: ${JSON.stringify(data)}${EOL}`); + this.res.write(`${EOL}`); + } catch (e) { + console.error("Can not send SSE event", e); + } + } +} + +export { SSE }; diff --git a/src/state.js b/src/state.js index e8ba7d2..b7a879d 100644 --- a/src/state.js +++ b/src/state.js @@ -1,9 +1,6 @@ const state = { - state: { - lastPlayed: null, - isOnline: false, - }, - clients: new Map(), + lastPlayed: null, + isOnline: false, }; export { state };