Reasons: - We need only one directional channel - SSE easier to proxy - we can use only one port for apppull/1/head
parent
98252d9f57
commit
7ea83a6224
@ -0,0 +1 @@
|
|||||||
|
@radioiceberg:registry=https://gitea.bjornmossa.net/api/packages/Radioiceberg/npm/
|
||||||
@ -0,0 +1,25 @@
|
|||||||
|
import { events, eventsStream } from "./events-stream.js";
|
||||||
|
import { state } from "./state.js";
|
||||||
|
|
||||||
|
function metadataChange(data) {
|
||||||
|
state.lastPlayed = data;
|
||||||
|
eventsStream.emit(events.stateChange, { action: "metadataChange", data });
|
||||||
|
}
|
||||||
|
|
||||||
|
function liveStarted() {
|
||||||
|
state.isOnline = true;
|
||||||
|
eventsStream.emit(events.stateChange, { action: "liveStarted" });
|
||||||
|
}
|
||||||
|
|
||||||
|
function liveEnded() {
|
||||||
|
state.isOnline = false;
|
||||||
|
eventsStream.emit(events.stateChange, { action: "liveEnded" });
|
||||||
|
}
|
||||||
|
|
||||||
|
const actions = {
|
||||||
|
metadataChange,
|
||||||
|
liveStarted,
|
||||||
|
liveEnded,
|
||||||
|
};
|
||||||
|
|
||||||
|
export { actions };
|
||||||
@ -0,0 +1,11 @@
|
|||||||
|
import { EventEmitter } from "events";
|
||||||
|
|
||||||
|
class EventStream extends EventEmitter {}
|
||||||
|
|
||||||
|
const events = Object.freeze({
|
||||||
|
stateChange: "stateChange",
|
||||||
|
});
|
||||||
|
|
||||||
|
const eventsStream = new EventStream();
|
||||||
|
|
||||||
|
export { eventsStream, events };
|
||||||
@ -1,49 +1,24 @@
|
|||||||
import { createServer } from "node:http";
|
import { createServer } from "node:http";
|
||||||
import { randomUUID } from "node:crypto";
|
import { actions } from "./actions.js";
|
||||||
import { WebSocketServer } from "ws";
|
import { parse } from "node:url";
|
||||||
import { notifier } from "./notifier.js";
|
import { routes } from "./routes/index.js";
|
||||||
|
import { messageProcessor } from "@radioiceberg/message-processor";
|
||||||
|
|
||||||
const wss = new WebSocketServer({ port: 7000 });
|
messageProcessor.actions = actions;
|
||||||
|
|
||||||
wss.on("connection", (ws) => {
|
|
||||||
const id = randomUUID();
|
|
||||||
const message = {
|
|
||||||
action: "clientConnected",
|
|
||||||
data: {
|
|
||||||
socket: ws,
|
|
||||||
id,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
notifier.processMessage(message);
|
|
||||||
|
|
||||||
ws.on("close", () => {
|
|
||||||
const message = {
|
|
||||||
action: "cliendDisconnected",
|
|
||||||
data: {
|
|
||||||
id,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
notifier.processMessage(message);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
createServer((req, res) => {
|
createServer((req, res) => {
|
||||||
let body = "";
|
if (!req.url) res.end();
|
||||||
|
|
||||||
|
var url = parse(req.url);
|
||||||
|
var path = url.pathname.replace(/^\/+|\/$/g, "").toLowerCase();
|
||||||
|
var data = "";
|
||||||
|
|
||||||
req.on("data", (chunk) => {
|
req.on("data", (chunk) => {
|
||||||
body += chunk;
|
data += chunk;
|
||||||
});
|
});
|
||||||
|
|
||||||
req.on("end", () => {
|
req.on("end", () => {
|
||||||
try {
|
var route = routes[path] ?? routes["notFound"];
|
||||||
const message = JSON.parse(body);
|
return route(req, res, data);
|
||||||
notifier.processMessage(message);
|
|
||||||
} catch (error) {
|
|
||||||
console.error(error);
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
|
|
||||||
res.end();
|
|
||||||
}).listen(4000);
|
}).listen(4000);
|
||||||
|
|||||||
@ -1,60 +0,0 @@
|
|||||||
import { state } from "./state.js";
|
|
||||||
|
|
||||||
function updateMeta(message) {
|
|
||||||
const { data } = message;
|
|
||||||
this.state.lastPlayed = data;
|
|
||||||
this.notifyAll(message);
|
|
||||||
}
|
|
||||||
|
|
||||||
function startOnline(message) {
|
|
||||||
this.state.isOnline = true;
|
|
||||||
this.notifyAll(message);
|
|
||||||
}
|
|
||||||
|
|
||||||
function stopOnline(message) {
|
|
||||||
this.state.isOnline = false;
|
|
||||||
this.notifyAll(message);
|
|
||||||
}
|
|
||||||
|
|
||||||
function addClient({ data }) {
|
|
||||||
const { socket, id } = data;
|
|
||||||
this.clients.set(id, socket);
|
|
||||||
this.notifyOne(socket, { action: "setState", data: this.state });
|
|
||||||
}
|
|
||||||
|
|
||||||
function deleteClient({ data }) {
|
|
||||||
const { id } = data;
|
|
||||||
this.clients.delete(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
const actions = {
|
|
||||||
clientConnected: addClient,
|
|
||||||
cliendDisconnected: deleteClient,
|
|
||||||
metadataChange: updateMeta,
|
|
||||||
liveStarted: startOnline,
|
|
||||||
liveEnded: stopOnline,
|
|
||||||
};
|
|
||||||
|
|
||||||
const notifier = {
|
|
||||||
__proto__: state,
|
|
||||||
notifyAll: function (message) {
|
|
||||||
const messageStrinified = JSON.stringify(message);
|
|
||||||
this.clients.forEach((c) => c.send(messageStrinified));
|
|
||||||
},
|
|
||||||
notifyOne: function (socket, message) {
|
|
||||||
const messageStrinified = JSON.stringify(message);
|
|
||||||
console.log("NOTIFY ONE:", message);
|
|
||||||
socket.send(messageStrinified);
|
|
||||||
},
|
|
||||||
processMessage: function (message) {
|
|
||||||
const action = actions[message.action];
|
|
||||||
if (action) {
|
|
||||||
action.call(this, message);
|
|
||||||
console.log("PERFORM ACTION", message.action);
|
|
||||||
} else {
|
|
||||||
console.log("no avaible command for state mutation", message.action);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
export { notifier };
|
|
||||||
@ -0,0 +1,11 @@
|
|||||||
|
import { notFound } from "./notfound.js";
|
||||||
|
import { post } from "./post.js";
|
||||||
|
import { liveData } from "./livedata.js";
|
||||||
|
|
||||||
|
const routes = {
|
||||||
|
livedata: liveData,
|
||||||
|
post,
|
||||||
|
notFound,
|
||||||
|
};
|
||||||
|
|
||||||
|
export { routes };
|
||||||
@ -0,0 +1,25 @@
|
|||||||
|
import { state } from "../state.js";
|
||||||
|
import { events, eventsStream } from "../events-stream.js";
|
||||||
|
import { SSE } from "../sse.js";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {import('http').ServerResponse} res somebody Somebody's name.
|
||||||
|
*/
|
||||||
|
function liveData(req, res) {
|
||||||
|
const sse = new SSE(res);
|
||||||
|
|
||||||
|
// Client connected. Send last saved state
|
||||||
|
const startupMessage = {
|
||||||
|
action: "setState",
|
||||||
|
data: state,
|
||||||
|
};
|
||||||
|
|
||||||
|
sse.send(startupMessage);
|
||||||
|
|
||||||
|
// Subscribe on events update and send it to client
|
||||||
|
eventsStream.on(events.stateChange, (data) => {
|
||||||
|
sse.send(data);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
export { liveData };
|
||||||
@ -0,0 +1,9 @@
|
|||||||
|
/**
|
||||||
|
* @param {import('http').ServerResponse} res somebody Somebody's name.
|
||||||
|
*/
|
||||||
|
function notFound(_, res) {
|
||||||
|
res.writeHead(404);
|
||||||
|
res.end();
|
||||||
|
}
|
||||||
|
|
||||||
|
export { notFound };
|
||||||
@ -0,0 +1,13 @@
|
|||||||
|
import { messageProcessor } from "@radioiceberg/message-processor";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {import('http').ServerResponse} res somebody Somebody's name.
|
||||||
|
* @param {string} data possibly transformable to json string data.
|
||||||
|
*/
|
||||||
|
function post(_, res, data) {
|
||||||
|
messageProcessor.processMessage(data);
|
||||||
|
res.writeHead(202);
|
||||||
|
res.end();
|
||||||
|
}
|
||||||
|
|
||||||
|
export { post };
|
||||||
@ -0,0 +1,27 @@
|
|||||||
|
import { EOL } from "os";
|
||||||
|
|
||||||
|
class SSE {
|
||||||
|
/**
|
||||||
|
* @param {import('http').ServerResponse} res somebody Somebody's name.
|
||||||
|
*/
|
||||||
|
constructor(res) {
|
||||||
|
res.setHeader("Content-Type", "text/event-stream");
|
||||||
|
res.setHeader("Cache-Control", "no-cache");
|
||||||
|
res.setHeader("Connection", "keep-alive");
|
||||||
|
|
||||||
|
this.res = res;
|
||||||
|
this.id = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
send(data) {
|
||||||
|
try {
|
||||||
|
this.res.write(`id: ${++this.id}${EOL}`);
|
||||||
|
this.res.write(`data: ${JSON.stringify(data)}${EOL}`);
|
||||||
|
this.res.write(`${EOL}`);
|
||||||
|
} catch (e) {
|
||||||
|
console.error("Can not send SSE event", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export { SSE };
|
||||||
@ -1,9 +1,6 @@
|
|||||||
const state = {
|
const state = {
|
||||||
state: {
|
lastPlayed: null,
|
||||||
lastPlayed: null,
|
isOnline: false,
|
||||||
isOnline: false,
|
|
||||||
},
|
|
||||||
clients: new Map(),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
export { state };
|
export { state };
|
||||||
|
|||||||
Reference in new issue