WebSocket Subscription

Use when: you need a persistent, server-pushed connection, such as a live chat feed, real-time dashboard, presence indicators, or a collaborative document.

Don’t use when: you only need data from the server occasionally and polling would suffice; a persistent socket adds reconnect overhead and counts against connection limits.

Open the socket in init, push received messages into state with patch, and close the socket in the dispose system event so it is always cleaned up.

class ChatCubit extends Cubit<ChatState, { channelId: string }> {
  private socket: ReconnectingWebSocket | null = null;

  constructor() {
    super({ messages: [], connected: false, error: null });

    this.onSystemEvent('dispose', () => {
      // Always close on disposal to avoid orphaned connections.
      this.socket?.close();
      this.socket = null;
    });
  }

  // Called once after construction with the args passed at acquire time,
  // before any consumer reads the first state snapshot.
  protected override init({ channelId }: { channelId: string }) {
    this.connect(channelId);
  }

  private connect(channelId: string) {
    const ws = new ReconnectingWebSocket(`${WS_URL}/chat/${channelId}`);
    this.socket = ws;

    ws.onopen = () => {
      this.patch({ connected: true, error: null });
    };

    ws.onclose = () => {
      this.patch({ connected: false });
    };

    ws.onerror = (e) => {
      // ⚠️ Never forward raw WebSocket error events to an analytics sink:
      // they may contain auth tokens present in the connection URL.
      this.patch({ connected: false, error: 'Connection error' });
    };

    ws.onmessage = ({ data }) => {
      // Parse incoming data defensively; the server may send system frames.
      let message: ChatMessage;
      try {
        message = JSON.parse(data) as ChatMessage;
      } catch {
        return; // ignore malformed frames
      }
      this.patch({
        messages: [...this.state.messages, message],
      });
    };
  }

  sendMessage = (text: string) => {
    if (!this.state.connected || !this.socket) return;
    this.socket.send(JSON.stringify({ text }));
  };
}
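
The `as ChatMessage` cast in `onmessage` only satisfies the compiler. A system frame that is valid JSON (a heartbeat, for instance) would still pass through `JSON.parse` and be appended to `messages`. One way to tighten this is a runtime type guard. The sketch below assumes `ChatMessage` has the four fields suggested by the `patch` signature (`id`, `author`, `text`, `timestamp`); `isChatMessage` and `parseChatMessage` are hypothetical helper names, not part of the library.

```typescript
// Assumed shape, inferred from the fields used in this example.
interface ChatMessage {
  id: string;
  author: string;
  text: string;
  timestamp: number;
}

// Hypothetical helper: runtime check that a parsed value is a ChatMessage.
function isChatMessage(value: unknown): value is ChatMessage {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.id === 'string' &&
    typeof v.author === 'string' &&
    typeof v.text === 'string' &&
    typeof v.timestamp === 'number'
  );
}

// Hypothetical helper: returns the message, or null for malformed JSON
// and for valid JSON that is not a chat message (e.g. system frames).
function parseChatMessage(data: string): ChatMessage | null {
  try {
    const parsed: unknown = JSON.parse(data);
    return isChatMessage(parsed) ? parsed : null;
  } catch {
    return null;
  }
}
```

Inside `onmessage`, the handler could call `parseChatMessage(data)` and return early on `null` before patching state.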
function ChatRoom({ channelId }: { channelId: string }) {
  // args: { channelId } both seeds init() and keys the instance: one per channel
  const [state, chat] = useBloc(ChatCubit, {
    args: { channelId },
  });

  return (
    <div>
      <span>{state.connected ? '🟢 Connected' : '🔴 Disconnected'}</span>
      <ul>
        {state.messages.map((m) => (
          <li key={m.id}>
            <strong>{m.author}</strong>: {m.text}
          </li>
        ))}
      </ul>
      <MessageInput onSend={chat.sendMessage} disabled={!state.connected} />
    </div>
  );
}
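
The example treats `ReconnectingWebSocket` as given and does not show how it reconnects. For readers without such a class, the following is a minimal sketch of the idea, not the class used above: a wrapper that reopens the connection with capped exponential backoff unless `close()` was called explicitly. Everything here is an assumption made for illustration: the `SocketLike` interface, the `backoffDelay` and `ReconnectingSocket` names, and the 500 ms / 30 s defaults. The socket factory and the scheduler are injected so the logic can be exercised without a network or real timers.

```typescript
// Minimal socket surface the wrapper needs (hypothetical). A real WebSocket
// would be adapted to this shape inside the factory passed to the wrapper.
interface SocketLike {
  onopen: (() => void) | null;
  onclose: (() => void) | null;
  onmessage: ((e: { data: string }) => void) | null;
  send(data: string): void;
  close(): void;
}

// Exponential backoff capped at maxMs: 500, 1000, 2000, ... for baseMs = 500.
function backoffDelay(attempt: number, baseMs = 500, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

class ReconnectingSocket {
  onopen: (() => void) | null = null;
  onclose: (() => void) | null = null;
  onmessage: ((e: { data: string }) => void) | null = null;

  private inner: SocketLike | null = null;
  private attempt = 0;
  private closedByUser = false;
  private readonly create: () => SocketLike;
  private readonly schedule: (fn: () => void, ms: number) => void;

  constructor(
    create: () => SocketLike,
    // Injectable so the retry logic can be tested without real timers.
    schedule: (fn: () => void, ms: number) => void = (fn, ms) => {
      setTimeout(fn, ms);
    },
  ) {
    this.create = create;
    this.schedule = schedule;
    this.open();
  }

  private open(): void {
    const s = this.create();
    this.inner = s;
    s.onopen = () => {
      this.attempt = 0; // a successful connection resets the backoff
      this.onopen?.();
    };
    s.onmessage = (e) => this.onmessage?.(e);
    s.onclose = () => {
      this.onclose?.();
      if (this.closedByUser) return; // explicit close(): stay closed
      this.schedule(() => {
        if (!this.closedByUser) this.open();
      }, backoffDelay(this.attempt++));
    };
  }

  send(data: string): void {
    this.inner?.send(data);
  }

  close(): void {
    this.closedByUser = true;
    this.inner?.close();
  }
}
```

Because `close()` sets a flag before closing the inner socket, a wrapper of this kind stays closed once the cubit's `dispose` handler runs, even if a retry was already scheduled.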