Merge pull request 'move-to-sse' (#1) from move-to-sse into main

Reviewed-on: #1
main
lambda 1 year ago
commit 80b85f0c11

@ -0,0 +1 @@
@radioiceberg:registry=https://gitea.bjornmossa.net/api/packages/Radioiceberg/npm/

@ -2,6 +2,12 @@
All notable changes to this project will be documented in this file. See [conventional commits](https://www.conventionalcommits.org/) for commit guidelines.
- - -
## 0.2.0 - 2024-03-17
#### Features
- move from WebSockets to SSE - (7ea83a6) - Arseniusz
- - -
## 0.1.3 - 2024-01-27
#### Bug Fixes
- change commands for backward compat - (f8bc4de) - Arseniusz

@ -5,5 +5,5 @@ COPY package*.json ./
RUN npm ci --omit=dev
COPY src/. .
EXPOSE 4000 7000
EXPOSE 4000
CMD [ "node", "main.js" ]

@ -11,6 +11,9 @@ start:
build:
docker build . -t $(IMAGE_NAME):$(GIT_TAG)
build-experimental:
docker build . -t $(IMAGE_NAME):experimental
bump-latest:
docker tag $(IMAGE_NAME):$(GIT_TAG) $(IMAGE_NAME):latest

35
package-lock.json generated

@ -9,36 +9,21 @@
"version": "1.0.0",
"license": "GPL-3.0-or-later",
"dependencies": {
"ws": "^8.13.0"
"@radioiceberg/message-processor": "^0.0.1"
}
},
"node_modules/ws": {
"version": "8.14.2",
"resolved": "https://registry.npmjs.org/ws/-/ws-8.14.2.tgz",
"integrity": "sha512-wEBG1ftX4jcglPxgFCMJmZ2PLtSbJ2Peg6TmpJFTbe9GZYOQCDPdMYu/Tm0/bGZkw8paZnJY45J4K2PZrLYq8g==",
"engines": {
"node": ">=10.0.0"
},
"peerDependencies": {
"bufferutil": "^4.0.1",
"utf-8-validate": ">=5.0.2"
},
"peerDependenciesMeta": {
"bufferutil": {
"optional": true
},
"utf-8-validate": {
"optional": true
}
}
"node_modules/@radioiceberg/message-processor": {
"version": "0.0.1",
"resolved": "https://gitea.bjornmossa.net/api/packages/Radioiceberg/npm/%40radioiceberg%2Fmessage-processor/-/0.0.1/message-processor-0.0.1.tgz",
"integrity": "sha512-6hfnRGOXisOZCBueS1wYExFopMiK7Ig+UplP9pJQH7kWPmlnSFE33G0cU/gBuCUrS3v81uDJ8FGBse6H88dptw==",
"license": "GPL-3.0-or-later"
}
},
"dependencies": {
"ws": {
"version": "8.14.2",
"resolved": "https://registry.npmjs.org/ws/-/ws-8.14.2.tgz",
"integrity": "sha512-wEBG1ftX4jcglPxgFCMJmZ2PLtSbJ2Peg6TmpJFTbe9GZYOQCDPdMYu/Tm0/bGZkw8paZnJY45J4K2PZrLYq8g==",
"requires": {}
"@radioiceberg/message-processor": {
"version": "0.0.1",
"resolved": "https://gitea.bjornmossa.net/api/packages/Radioiceberg/npm/%40radioiceberg%2Fmessage-processor/-/0.0.1/message-processor-0.0.1.tgz",
"integrity": "sha512-6hfnRGOXisOZCBueS1wYExFopMiK7Ig+UplP9pJQH7kWPmlnSFE33G0cU/gBuCUrS3v81uDJ8FGBse6H88dptw=="
}
}
}

@ -18,6 +18,6 @@
},
"homepage": "https://github.com/Arseniusz/radioiceberg#readme",
"dependencies": {
"ws": "^8.13.0"
"@radioiceberg/message-processor": "^0.0.1"
}
}

@ -0,0 +1,25 @@
import { events, eventsStream } from "./events-stream.js";
import { state } from "./state.js";
function metadataChange(data) {
state.lastPlayed = data;
eventsStream.emit(events.stateChange, { action: "metadataChange", data });
}
function liveStarted() {
state.isOnline = true;
eventsStream.emit(events.stateChange, { action: "liveStarted" });
}
function liveEnded() {
state.isOnline = false;
eventsStream.emit(events.stateChange, { action: "liveEnded" });
}
const actions = {
metadataChange,
liveStarted,
liveEnded,
};
export { actions };

@ -0,0 +1,11 @@
import { EventEmitter } from "events";
class EventStream extends EventEmitter {}
const events = Object.freeze({
stateChange: "stateChange",
});
const eventsStream = new EventStream();
export { eventsStream, events };

@ -1,49 +1,24 @@
import { createServer } from "node:http";
import { randomUUID } from "node:crypto";
import { WebSocketServer } from "ws";
import { notifier } from "./notifier.js";
import { actions } from "./actions.js";
import { parse } from "node:url";
import { routes } from "./routes/index.js";
import { messageProcessor } from "@radioiceberg/message-processor";
const wss = new WebSocketServer({ port: 7000 });
wss.on("connection", (ws) => {
const id = randomUUID();
const message = {
action: "clientConnected",
data: {
socket: ws,
id,
},
};
notifier.processMessage(message);
ws.on("close", () => {
const message = {
action: "cliendDisconnected",
data: {
id,
},
};
notifier.processMessage(message);
});
});
messageProcessor.actions = actions;
createServer((req, res) => {
let body = "";
if (!req.url) res.end();
var url = parse(req.url);
var path = url.pathname.replace(/^\/+|\/$/g, "").toLowerCase();
var data = "";
req.on("data", (chunk) => {
body += chunk;
data += chunk;
});
req.on("end", () => {
try {
const message = JSON.parse(body);
notifier.processMessage(message);
} catch (error) {
console.error(error);
}
var route = routes[path] ?? routes["notFound"];
return route(req, res, data);
});
res.end();
}).listen(4000);

@ -1,60 +0,0 @@
import { state } from "./state.js";
function updateMeta(message) {
const { data } = message;
this.state.lastPlayed = data;
this.notifyAll(message);
}
function startOnline(message) {
this.state.isOnline = true;
this.notifyAll(message);
}
function stopOnline(message) {
this.state.isOnline = false;
this.notifyAll(message);
}
function addClient({ data }) {
const { socket, id } = data;
this.clients.set(id, socket);
this.notifyOne(socket, { action: "setState", data: this.state });
}
function deleteClient({ data }) {
const { id } = data;
this.clients.delete(id);
}
const actions = {
clientConnected: addClient,
cliendDisconnected: deleteClient,
metadataChange: updateMeta,
liveStarted: startOnline,
liveEnded: stopOnline,
};
const notifier = {
__proto__: state,
notifyAll: function (message) {
const messageStrinified = JSON.stringify(message);
this.clients.forEach((c) => c.send(messageStrinified));
},
notifyOne: function (socket, message) {
const messageStrinified = JSON.stringify(message);
console.log("NOTIFY ONE:", message);
socket.send(messageStrinified);
},
processMessage: function (message) {
const action = actions[message.action];
if (action) {
action.call(this, message);
console.log("PERFORM ACTION", message.action);
} else {
console.log("no avaible command for state mutation", message.action);
}
},
};
export { notifier };

@ -0,0 +1,11 @@
import { notFound } from "./notfound.js";
import { post } from "./post.js";
import { liveData } from "./livedata.js";
const routes = {
livedata: liveData,
post,
notFound,
};
export { routes };

@ -0,0 +1,25 @@
import { state } from "../state.js";
import { events, eventsStream } from "../events-stream.js";
import { SSE } from "../sse.js";
/**
* @param {import('http').ServerResponse} res somebody Somebody's name.
*/
function liveData(req, res) {
const sse = new SSE(res);
// Client connected. Send last saved state
const startupMessage = {
action: "setState",
data: state,
};
sse.send(startupMessage);
// Subscribe on events update and send it to client
eventsStream.on(events.stateChange, (data) => {
sse.send(data);
});
}
export { liveData };

@ -0,0 +1,9 @@
/**
* @param {import('http').ServerResponse} res somebody Somebody's name.
*/
function notFound(_, res) {
res.writeHead(404);
res.end();
}
export { notFound };

@ -0,0 +1,13 @@
import { messageProcessor } from "@radioiceberg/message-processor";
/**
* @param {import('http').ServerResponse} res somebody Somebody's name.
* @param {string} data possibly transformable to json string data.
*/
function post(_, res, data) {
messageProcessor.processMessage(data);
res.writeHead(202);
res.end();
}
export { post };

@ -0,0 +1,27 @@
import { EOL } from "os";
class SSE {
/**
* @param {import('http').ServerResponse} res somebody Somebody's name.
*/
constructor(res) {
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
this.res = res;
this.id = 0;
}
send(data) {
try {
this.res.write(`id: ${++this.id}${EOL}`);
this.res.write(`data: ${JSON.stringify(data)}${EOL}`);
this.res.write(`${EOL}`);
} catch (e) {
console.error("Can not send SSE event", e);
}
}
}
export { SSE };

@ -1,9 +1,6 @@
const state = {
state: {
lastPlayed: null,
isOnline: false,
},
clients: new Map(),
};
export { state };