diff --git a/backend/src/index.ts b/backend/src/index.ts index e1dd284..2faf2b1 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -3,6 +3,7 @@ import { cors } from "@elysiajs/cors"; import { taskRoutes } from "./routes/tasks"; import { adminRoutes } from "./routes/admin"; import { projectRoutes } from "./routes/projects"; +import { chatRoutes } from "./routes/chat"; import { auth } from "./lib/auth"; import { db } from "./db"; import { tasks, users } from "./db/schema"; @@ -116,6 +117,7 @@ const app = new Elysia() .use(taskRoutes) .use(projectRoutes) .use(adminRoutes) + .use(chatRoutes) // Current user info (role, etc.) .get("/api/me", async ({ request }) => { diff --git a/backend/src/lib/gateway-relay.ts b/backend/src/lib/gateway-relay.ts new file mode 100644 index 0000000..ace4ceb --- /dev/null +++ b/backend/src/lib/gateway-relay.ts @@ -0,0 +1,244 @@ +/** + * Gateway WebSocket Relay + * + * Maintains a single persistent WebSocket connection to the Clawdbot gateway. + * Dashboard clients connect through the backend (authenticated via BetterAuth), + * and messages are relayed bidirectionally. + * + * Architecture: + * Browser ←WSS→ Dashboard Backend ←WSS→ Clawdbot Gateway + * (BetterAuth) (relay) (token auth) + */ + +const GATEWAY_URL = process.env.GATEWAY_WS_URL || "wss://hammer.donovankelly.xyz"; +const GATEWAY_TOKEN = process.env.GATEWAY_WS_TOKEN || ""; + +type GatewayState = "disconnected" | "connecting" | "connected"; +type MessageHandler = (msg: any) => void; + +let reqCounter = 0; +function nextReqId() { + return `relay-${++reqCounter}`; +} + +class GatewayConnection { + private ws: WebSocket | null = null; + private state: GatewayState = "disconnected"; + private pendingRequests = new Map void; reject: (e: any) => void; timer: ReturnType }>(); + private eventListeners = new Set(); + private reconnectTimer: ReturnType | null = null; + private shouldReconnect = true; + + constructor() { + this.connect(); + } + + private connect() { + if (this.state === "connecting") return; + this.state = "connecting"; + + if (!GATEWAY_TOKEN) { + console.warn("[gateway-relay] No GATEWAY_WS_TOKEN set, chat relay disabled"); + this.state = "disconnected"; + return; + } + + console.log(`[gateway-relay] Connecting to ${GATEWAY_URL}...`); + + try { + this.ws = new WebSocket(GATEWAY_URL); + } catch (e) { + console.error("[gateway-relay] Failed to create WebSocket:", e); + this.state = "disconnected"; + this.scheduleReconnect(); + return; + } + + this.ws.addEventListener("open", () => { + console.log("[gateway-relay] WebSocket open, sending handshake..."); + const connectId = nextReqId(); + this.sendRaw({ + type: "req", + id: connectId, + method: "connect", + params: { + minProtocol: 3, + maxProtocol: 3, + client: { + id: "dashboard-relay", + displayName: "Hammer Dashboard Relay", + version: "1.0.0", + platform: "server", + mode: "webchat", + instanceId: `relay-${process.pid}-${Date.now()}`, + }, + auth: { + token: GATEWAY_TOKEN, + }, + }, + }); + + // Wait for handshake response + this.pendingRequests.set(connectId, { + resolve: () => { + console.log("[gateway-relay] Connected to gateway"); + this.state = "connected"; + }, + reject: (err) => { + console.error("[gateway-relay] Handshake failed:", err); + this.state = "disconnected"; + this.ws?.close(); + }, + timer: setTimeout(() => { + if (this.pendingRequests.has(connectId)) { + this.pendingRequests.delete(connectId); + console.error("[gateway-relay] Handshake timeout"); + this.state = "disconnected"; + this.ws?.close(); + } + }, 15000), + }); + }); + + this.ws.addEventListener("message", (event) => { + try { + const msg = JSON.parse(String(event.data)); + this.handleMessage(msg); + } catch (e) { + console.error("[gateway-relay] Failed to parse message:", e); + } + }); + + this.ws.addEventListener("close", () => { + console.log("[gateway-relay] Disconnected"); + this.state = "disconnected"; + this.ws = null; + // Reject all pending requests + for (const [id, pending] of this.pendingRequests) { + clearTimeout(pending.timer); + pending.reject(new Error("Connection closed")); + this.pendingRequests.delete(id); + } + if (this.shouldReconnect) { + this.scheduleReconnect(); + } + }); + + this.ws.addEventListener("error", (e) => { + console.error("[gateway-relay] WebSocket error"); + }); + } + + private scheduleReconnect() { + if (this.reconnectTimer) return; + this.reconnectTimer = setTimeout(() => { + this.reconnectTimer = null; + if (this.shouldReconnect && this.state === "disconnected") { + this.connect(); + } + }, 5000); + } + + private sendRaw(msg: any) { + if (this.ws?.readyState === WebSocket.OPEN) { + this.ws.send(JSON.stringify(msg)); + } + } + + private handleMessage(msg: any) { + if (msg.type === "res") { + const pending = this.pendingRequests.get(msg.id); + if (pending) { + clearTimeout(pending.timer); + this.pendingRequests.delete(msg.id); + if (msg.ok !== false) { + pending.resolve(msg.payload ?? msg.result ?? {}); + } else { + pending.reject(new Error(msg.error?.message || "Request failed")); + } + } + } else if (msg.type === "event") { + // Forward events to all listeners + for (const listener of this.eventListeners) { + try { + listener(msg); + } catch (e) { + console.error("[gateway-relay] Event listener error:", e); + } + } + } + } + + isConnected(): boolean { + return this.state === "connected"; + } + + async request(method: string, params?: any): Promise { + if (!this.isConnected()) { + throw new Error("Gateway not connected"); + } + + return new Promise((resolve, reject) => { + const id = nextReqId(); + const timer = setTimeout(() => { + if (this.pendingRequests.has(id)) { + this.pendingRequests.delete(id); + reject(new Error("Request timeout")); + } + }, 120000); + + this.pendingRequests.set(id, { resolve, reject, timer }); + this.sendRaw({ type: "req", id, method, params }); + }); + } + + onEvent(handler: MessageHandler): () => void { + this.eventListeners.add(handler); + return () => this.eventListeners.delete(handler); + } + + destroy() { + this.shouldReconnect = false; + if (this.reconnectTimer) clearTimeout(this.reconnectTimer); + if (this.ws) { + try { this.ws.close(); } catch {} + } + this.ws = null; + this.state = "disconnected"; + } +} + +// Singleton gateway connection +export const gateway = new GatewayConnection(); + +/** + * Send a chat message to the gateway + */ +export async function chatSend(sessionKey: string, message: string): Promise { + return gateway.request("chat.send", { + sessionKey, + message, + idempotencyKey: `dash-${Date.now()}-${Math.random().toString(36).slice(2)}`, + }); +} + +/** + * Get chat history from the gateway + */ +export async function chatHistory(sessionKey: string, limit = 50): Promise { + return gateway.request("chat.history", { sessionKey, limit }); +} + +/** + * Abort an in-progress chat response + */ +export async function chatAbort(sessionKey: string): Promise { + return gateway.request("chat.abort", { sessionKey }); +} + +/** + * List sessions from the gateway + */ +export async function sessionsList(limit = 50): Promise { + return gateway.request("sessions.list", { limit }); +} diff --git a/backend/src/routes/chat.ts b/backend/src/routes/chat.ts new file mode 100644 index 0000000..ea56a4f --- /dev/null +++ b/backend/src/routes/chat.ts @@ -0,0 +1,269 @@ +/** + * Chat routes - WebSocket relay + REST fallback for dashboard chat + * + * WebSocket: /api/chat/ws - Real-time bidirectional relay to gateway + * REST: /api/chat/send, /api/chat/history, /api/chat/sessions - Fallback endpoints + */ +import { Elysia } from "elysia"; +import { auth } from "../lib/auth"; +import { gateway, chatSend, chatHistory, chatAbort, sessionsList } from "../lib/gateway-relay"; + +// Track active WebSocket client connections +const activeClients = new Map; +}>(); + +export const chatRoutes = new Elysia() + // WebSocket endpoint for real-time chat relay + .ws("/api/chat/ws", { + open(ws) { + const clientId = `client-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + (ws.data as any).__clientId = clientId; + (ws.data as any).__authenticated = false; + console.log(`[chat-ws] Client connected: ${clientId}`); + }, + + async message(ws, rawMsg) { + const clientId = (ws.data as any).__clientId || "unknown"; + + let msg: any; + try { + msg = typeof rawMsg === "string" ? JSON.parse(rawMsg) : rawMsg; + } catch { + ws.send(JSON.stringify({ type: "error", error: "Invalid JSON" })); + return; + } + + // First message must be auth + if (!(ws.data as any).__authenticated) { + if (msg.type !== "auth") { + ws.send(JSON.stringify({ type: "error", error: "Must authenticate first" })); + ws.close(); + return; + } + + // Validate session cookie or token + const session = await validateAuth(msg); + if (!session) { + ws.send(JSON.stringify({ type: "error", error: "Authentication failed" })); + ws.close(); + return; + } + + (ws.data as any).__authenticated = true; + (ws.data as any).__userId = session.user.id; + (ws.data as any).__userName = session.user.name || session.user.email; + + // Register client + activeClients.set(clientId, { + ws, + userId: session.user.id, + sessionKeys: new Set(), + }); + + ws.send(JSON.stringify({ + type: "auth_ok", + user: { id: session.user.id, name: session.user.name }, + gatewayConnected: gateway.isConnected(), + })); + + console.log(`[chat-ws] Client authenticated: ${clientId} (${session.user.name || session.user.email})`); + return; + } + + // Handle authenticated messages + try { + await handleClientMessage(clientId, ws, msg); + } catch (e: any) { + ws.send(JSON.stringify({ + type: "error", + id: msg.id, + error: e.message || "Internal error", + })); + } + }, + + close(ws) { + const clientId = (ws.data as any).__clientId || "unknown"; + activeClients.delete(clientId); + console.log(`[chat-ws] Client disconnected: ${clientId}`); + }, + }) + + // REST: Send a chat message + .post("/api/chat/send", async ({ request, body }) => { + const session = await auth.api.getSession({ headers: request.headers }); + if (!session?.user) { + return new Response(JSON.stringify({ error: "Unauthorized" }), { status: 401 }); + } + + const { sessionKey, message } = body as { sessionKey: string; message: string }; + if (!sessionKey || !message) { + return new Response(JSON.stringify({ error: "sessionKey and message required" }), { status: 400 }); + } + + if (!gateway.isConnected()) { + return new Response(JSON.stringify({ error: "Gateway not connected" }), { status: 503 }); + } + + try { + const result = await chatSend(sessionKey, message); + return { ok: true, result }; + } catch (e: any) { + return new Response(JSON.stringify({ error: e.message }), { status: 500 }); + } + }) + + // REST: Get chat history + .get("/api/chat/history/:sessionKey", async ({ request, params }) => { + const session = await auth.api.getSession({ headers: request.headers }); + if (!session?.user) { + return new Response(JSON.stringify({ error: "Unauthorized" }), { status: 401 }); + } + + if (!gateway.isConnected()) { + return new Response(JSON.stringify({ error: "Gateway not connected" }), { status: 503 }); + } + + try { + const result = await chatHistory(params.sessionKey); + return { ok: true, ...result }; + } catch (e: any) { + return new Response(JSON.stringify({ error: e.message }), { status: 500 }); + } + }) + + // REST: List sessions + .get("/api/chat/sessions", async ({ request }) => { + const session = await auth.api.getSession({ headers: request.headers }); + if (!session?.user) { + return new Response(JSON.stringify({ error: "Unauthorized" }), { status: 401 }); + } + + if (!gateway.isConnected()) { + return new Response(JSON.stringify({ error: "Gateway not connected" }), { status: 503 }); + } + + try { + const result = await sessionsList(); + return { ok: true, ...result }; + } catch (e: any) { + return new Response(JSON.stringify({ error: e.message }), { status: 500 }); + } + }) + + // REST: Gateway connection status + .get("/api/chat/status", async ({ request }) => { + const session = await auth.api.getSession({ headers: request.headers }); + if (!session?.user) { + return new Response(JSON.stringify({ error: "Unauthorized" }), { status: 401 }); + } + + return { + gatewayConnected: gateway.isConnected(), + activeClients: activeClients.size, + }; + }); + +// Validate auth from WebSocket auth message +async function validateAuth(msg: any): Promise { + // Support cookie-based auth (pass cookie string) + if (msg.cookie) { + try { + // Create a fake request with the cookie header for BetterAuth + const headers = new Headers(); + headers.set("cookie", msg.cookie); + const session = await auth.api.getSession({ headers }); + return session; + } catch { + return null; + } + } + + // Support bearer token auth + if (msg.token) { + try { + const headers = new Headers(); + headers.set("authorization", `Bearer ${msg.token}`); + const session = await auth.api.getSession({ headers }); + return session; + } catch { + return null; + } + } + + return null; +} + +// Handle messages from authenticated WebSocket clients +async function handleClientMessage(clientId: string, ws: any, msg: any) { + const client = activeClients.get(clientId); + if (!client) return; + + switch (msg.type) { + case "chat.send": { + const { sessionKey, message } = msg; + if (!sessionKey || !message) { + ws.send(JSON.stringify({ type: "error", id: msg.id, error: "sessionKey and message required" })); + return; + } + client.sessionKeys.add(sessionKey); + const result = await chatSend(sessionKey, message); + ws.send(JSON.stringify({ type: "res", id: msg.id, ok: true, payload: result })); + break; + } + + case "chat.history": { + const { sessionKey, limit } = msg; + if (!sessionKey) { + ws.send(JSON.stringify({ type: "error", id: msg.id, error: "sessionKey required" })); + return; + } + client.sessionKeys.add(sessionKey); + const result = await chatHistory(sessionKey, limit); + ws.send(JSON.stringify({ type: "res", id: msg.id, ok: true, payload: result })); + break; + } + + case "chat.abort": { + const { sessionKey } = msg; + if (!sessionKey) { + ws.send(JSON.stringify({ type: "error", id: msg.id, error: "sessionKey required" })); + return; + } + const result = await chatAbort(sessionKey); + ws.send(JSON.stringify({ type: "res", id: msg.id, ok: true, payload: result })); + break; + } + + case "sessions.list": { + const result = await sessionsList(msg.limit); + ws.send(JSON.stringify({ type: "res", id: msg.id, ok: true, payload: result })); + break; + } + + default: + ws.send(JSON.stringify({ type: "error", id: msg.id, error: `Unknown message type: ${msg.type}` })); + } +} + +// Forward gateway events to relevant WebSocket clients +gateway.onEvent((msg: any) => { + if (msg.type !== "event") return; + + const payload = msg.payload || {}; + const sessionKey = payload.sessionKey; + + for (const [, client] of activeClients) { + // Forward to clients subscribed to this session key, or broadcast if no key + if (!sessionKey || client.sessionKeys.has(sessionKey)) { + try { + client.ws.send(JSON.stringify(msg)); + } catch { + // Client disconnected, will be cleaned up + } + } + } +}); diff --git a/docker-compose.dokploy.yml b/docker-compose.dokploy.yml index d9c90dc..73f8cb7 100644 --- a/docker-compose.dokploy.yml +++ b/docker-compose.dokploy.yml @@ -25,6 +25,8 @@ services: COOKIE_DOMAIN: .donovankelly.xyz CLAWDBOT_HOOK_URL: ${CLAWDBOT_HOOK_URL:-https://hammer.donovankelly.xyz/hooks/agent} CLAWDBOT_HOOK_TOKEN: ${CLAWDBOT_HOOK_TOKEN} + GATEWAY_WS_URL: ${GATEWAY_WS_URL:-wss://hammer.donovankelly.xyz} + GATEWAY_WS_TOKEN: ${GATEWAY_WS_TOKEN} PORT: "3100" depends_on: - db @@ -34,9 +36,6 @@ services: build: context: ./frontend dockerfile: Dockerfile - args: - VITE_WS_URL: ${VITE_WS_URL:-wss://hammer.donovankelly.xyz} - VITE_WS_TOKEN: ${VITE_WS_TOKEN} ports: - "80" depends_on: diff --git a/frontend/Dockerfile b/frontend/Dockerfile index 7216d49..a266438 100644 --- a/frontend/Dockerfile +++ b/frontend/Dockerfile @@ -1,11 +1,6 @@ FROM oven/bun:1 AS build WORKDIR /app -ARG VITE_WS_URL="" -ARG VITE_WS_TOKEN="" -ENV VITE_WS_URL=$VITE_WS_URL -ENV VITE_WS_TOKEN=$VITE_WS_TOKEN - COPY package.json bun.lock* ./ RUN bun install --frozen-lockfile 2>/dev/null || bun install diff --git a/frontend/nginx.conf b/frontend/nginx.conf index d818ee3..3bcdb35 100644 --- a/frontend/nginx.conf +++ b/frontend/nginx.conf @@ -8,12 +8,19 @@ server { try_files $uri $uri/ /index.html; } - # Proxy API requests to backend + # Proxy API requests to backend (including WebSocket for chat) location /api/ { proxy_pass http://backend:3100; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + + # WebSocket support + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + proxy_read_timeout 86400; } location /health { diff --git a/frontend/src/components/TaskDetailPanel.tsx b/frontend/src/components/TaskDetailPanel.tsx index f2d3514..45cdc28 100644 --- a/frontend/src/components/TaskDetailPanel.tsx +++ b/frontend/src/components/TaskDetailPanel.tsx @@ -1,6 +1,6 @@ import { useState, useEffect, useRef } from "react"; import type { Task, TaskStatus, TaskPriority, TaskSource, Project } from "../lib/types"; -import { updateTask, fetchProjects } from "../lib/api"; +import { updateTask, fetchProjects, addProgressNote } from "../lib/api"; const priorityColors: Record = { critical: "bg-red-500 text-white", @@ -244,6 +244,8 @@ export function TaskDetailPanel({ task, onClose, onStatusChange, onTaskUpdated, const actions = statusActions[task.status] || []; const isActive = task.status === "active"; const [saving, setSaving] = useState(false); + const [noteText, setNoteText] = useState(""); + const [addingNote, setAddingNote] = useState(false); // Draft state for editable fields const [draftTitle, setDraftTitle] = useState(task.title); @@ -514,6 +516,55 @@ export function TaskDetailPanel({ task, onClose, onStatusChange, onTaskUpdated, ({task.progressNotes.length}) )} + + {/* Add note input */} + {hasToken && ( +
+
+