From 66b114f374567141bc6ef4b762d464c5d5295883 Mon Sep 17 00:00:00 2001 From: Ryan Copley Date: Sun, 31 Aug 2025 12:24:50 -0400 Subject: [PATCH] logs --- faas/internal/domain/models.go | 1 + .../postgres/execution_repository.go | 16 +- faas/internal/runtime/docker/simple.go | 216 +++++++++++++++--- faas/internal/services/execution_service.go | 27 +-- .../002_add_execution_logs.down.sql | 2 + faas/migrations/002_add_execution_logs.up.sql | 2 + faas/web/src/components/ExecutionModal.tsx | 137 +++++++++-- faas/web/src/types/index.ts | 1 + 8 files changed, 321 insertions(+), 81 deletions(-) create mode 100644 faas/migrations/002_add_execution_logs.down.sql create mode 100644 faas/migrations/002_add_execution_logs.up.sql diff --git a/faas/internal/domain/models.go b/faas/internal/domain/models.go index d216bed..3609a4c 100644 --- a/faas/internal/domain/models.go +++ b/faas/internal/domain/models.go @@ -71,6 +71,7 @@ type FunctionExecution struct { Error string `json:"error,omitempty" db:"error"` Duration time.Duration `json:"duration" db:"duration"` MemoryUsed int `json:"memory_used" db:"memory_used"` + Logs []string `json:"logs,omitempty" db:"logs"` ContainerID string `json:"container_id,omitempty" db:"container_id"` ExecutorID string `json:"executor_id" db:"executor_id"` CreatedAt time.Time `json:"created_at" db:"created_at"` diff --git a/faas/internal/repository/postgres/execution_repository.go b/faas/internal/repository/postgres/execution_repository.go index 499f375..7992bab 100644 --- a/faas/internal/repository/postgres/execution_repository.go +++ b/faas/internal/repository/postgres/execution_repository.go @@ -8,6 +8,7 @@ import ( "time" "github.com/google/uuid" + "github.com/lib/pq" "go.uber.org/zap" "github.com/RyanCopley/skybridge/faas/internal/domain" @@ -97,7 +98,7 @@ func (r *executionRepository) Create(ctx context.Context, execution *domain.Func func (r *executionRepository) GetByID(ctx context.Context, id uuid.UUID) (*domain.FunctionExecution, error) { query := ` SELECT id, function_id, status, input, output, error, duration, memory_used, - container_id, executor_id, created_at, started_at, completed_at + logs, container_id, executor_id, created_at, started_at, completed_at FROM executions WHERE id = $1` execution := &domain.FunctionExecution{} @@ -106,7 +107,7 @@ func (r *executionRepository) GetByID(ctx context.Context, id uuid.UUID) (*domai err := r.db.QueryRowContext(ctx, query, id).Scan( &execution.ID, &execution.FunctionID, &execution.Status, &execution.Input, &execution.Output, &execution.Error, &durationInterval, &execution.MemoryUsed, - &execution.ContainerID, &execution.ExecutorID, &execution.CreatedAt, + pq.Array(&execution.Logs), &execution.ContainerID, &execution.ExecutorID, &execution.CreatedAt, &execution.StartedAt, &execution.CompletedAt, ) @@ -135,12 +136,13 @@ func (r *executionRepository) Update(ctx context.Context, id uuid.UUID, executio query := ` UPDATE executions SET status = $2, output = $3, error = $4, duration = $5, memory_used = $6, - container_id = $7, started_at = $8, completed_at = $9 + logs = $7, container_id = $8, started_at = $9, completed_at = $10 WHERE id = $1` _, err := r.db.ExecContext(ctx, query, id, execution.Status, jsonField(execution.Output), execution.Error, - durationToInterval(execution.Duration), execution.MemoryUsed, execution.ContainerID, + durationToInterval(execution.Duration), execution.MemoryUsed, + pq.Array(execution.Logs), execution.ContainerID, execution.StartedAt, execution.CompletedAt, ) @@ -209,7 +211,7 @@ func (r *executionRepository) List(ctx context.Context, functionID *uuid.UUID, l err := rows.Scan( &execution.ID, &execution.FunctionID, &execution.Status, &execution.Input, &execution.Output, &execution.Error, &durationInterval, &execution.MemoryUsed, - &execution.ContainerID, &execution.ExecutorID, &execution.CreatedAt, + pq.Array(&execution.Logs), &execution.ContainerID, &execution.ExecutorID, &execution.CreatedAt, &execution.StartedAt, &execution.CompletedAt, ) @@ -245,7 +247,7 @@ func (r *executionRepository) GetByFunctionID(ctx context.Context, functionID uu func (r *executionRepository) GetByStatus(ctx context.Context, status domain.ExecutionStatus, limit, offset int) ([]*domain.FunctionExecution, error) { query := ` SELECT id, function_id, status, input, output, error, duration, memory_used, - container_id, executor_id, created_at, started_at, completed_at + logs, container_id, executor_id, created_at, started_at, completed_at FROM executions WHERE status = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3` @@ -264,7 +266,7 @@ func (r *executionRepository) GetByStatus(ctx context.Context, status domain.Exe err := rows.Scan( &execution.ID, &execution.FunctionID, &execution.Status, &execution.Input, &execution.Output, &execution.Error, &durationInterval, &execution.MemoryUsed, - &execution.ContainerID, &execution.ExecutorID, &execution.CreatedAt, + pq.Array(&execution.Logs), &execution.ContainerID, &execution.ExecutorID, &execution.CreatedAt, &execution.StartedAt, &execution.CompletedAt, ) diff --git a/faas/internal/runtime/docker/simple.go b/faas/internal/runtime/docker/simple.go index b65c014..74f332f 100644 --- a/faas/internal/runtime/docker/simple.go +++ b/faas/internal/runtime/docker/simple.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" "fmt" + "io" + "regexp" "strings" "time" @@ -341,7 +343,15 @@ func (s *SimpleDockerRuntime) createContainer(ctx context.Context, function *dom echo "const handler = require('/tmp/index.js').handler; const input = process.env.FUNCTION_INPUT ? JSON.parse(process.env.FUNCTION_INPUT) : {}; const context = { functionName: '` + function.Name + `' }; - handler(input, context).then(result => console.log(JSON.stringify(result))).catch(err => { console.error(err); process.exit(1); });" > /tmp/runner.js && + console.log(''); + handler(input, context).then(result => { + console.log(''); + console.log('' + JSON.stringify(result) + ''); + }).catch(err => { + console.log(''); + console.error('{\"error\": \"' + err.message + '\"}'); + process.exit(1); + });" > /tmp/runner.js && node /tmp/runner.js `} case "python", "python3", "python3.9", "python3.10", "python3.11": @@ -350,8 +360,15 @@ func (s *SimpleDockerRuntime) createContainer(ctx context.Context, function *dom echo "import json, os, sys; sys.path.insert(0, '/tmp'); from handler import handler; input_data = json.loads(os.environ.get('FUNCTION_INPUT', '{}')); context = {'function_name': '` + function.Name + `'}; - result = handler(input_data, context); - print(json.dumps(result))" > /tmp/runner.py && + print(''); + try: + result = handler(input_data, context); + print(''); + print('' + json.dumps(result) + ''); + except Exception as e: + print(''); + print('{\"error\": \"' + str(e) + '\"}', file=sys.stderr); + sys.exit(1);" > /tmp/runner.py && python /tmp/runner.py `} default: @@ -386,20 +403,48 @@ func (s *SimpleDockerRuntime) getContainerLogs(ctx context.Context, containerID logs, err := s.client.ContainerLogs(ctx, containerID, container.LogsOptions{ ShowStdout: true, ShowStderr: true, - Tail: "50", // Get last 50 lines + Tail: "100", // Get last 100 lines }) if err != nil { return nil, fmt.Errorf("failed to get container logs: %w", err) } defer logs.Close() - // For simplicity, we'll return a placeholder - // In a real implementation, you'd parse the log output - return []string{ - "Container logs would appear here", - "Function execution started", - "Function execution completed", - }, nil + // Read the actual logs content + logData, err := io.ReadAll(logs) + if err != nil { + return nil, fmt.Errorf("failed to read log data: %w", err) + } + + // Parse Docker logs to remove binary headers + rawOutput := parseDockerLogs(logData) + + // Parse the XML-tagged output to extract logs + parsedLogs, _, err := s.parseContainerOutput(rawOutput) + if err != nil { + s.logger.Warn("Failed to parse container output for logs", zap.Error(err)) + // Fallback to raw output split by lines + lines := strings.Split(strings.TrimSpace(rawOutput), "\n") + cleanLines := make([]string, 0, len(lines)) + for _, line := range lines { + if trimmed := strings.TrimSpace(line); trimmed != "" { + cleanLines = append(cleanLines, trimmed) + } + } + return cleanLines, nil + } + + // If no logs were parsed from tags, fallback to basic parsing + if len(parsedLogs) == 0 { + lines := strings.Split(strings.TrimSpace(rawOutput), "\n") + for _, line := range lines { + if trimmed := strings.TrimSpace(line); trimmed != "" && !strings.Contains(trimmed, "") && !strings.Contains(trimmed, "") { + parsedLogs = append(parsedLogs, trimmed) + } + } + } + + return parsedLogs, nil } func (s *SimpleDockerRuntime) getContainerOutput(ctx context.Context, containerID string) (json.RawMessage, error) { @@ -415,36 +460,143 @@ func (s *SimpleDockerRuntime) getContainerOutput(ctx context.Context, containerI defer logs.Close() // Read the actual logs content - buf := make([]byte, 4096) - var output strings.Builder - for { - n, err := logs.Read(buf) - if n > 0 { - // Docker logs include 8-byte headers, skip them for stdout content - if n > 8 { - output.Write(buf[8:n]) + logData, err := io.ReadAll(logs) + if err != nil { + return nil, fmt.Errorf("failed to read log data: %w", err) + } + + // Parse Docker logs to remove binary headers + rawOutput := parseDockerLogs(logData) + + // Parse the XML-tagged output to extract the result + _, result, err := s.parseContainerOutput(rawOutput) + if err != nil { + s.logger.Warn("Failed to parse container output for result", zap.Error(err)) + // Fallback to legacy parsing + logContent := strings.TrimSpace(rawOutput) + if json.Valid([]byte(logContent)) && logContent != "" { + return json.RawMessage(logContent), nil + } else { + // Return the output wrapped in a JSON object + fallbackResult := map[string]interface{}{ + "result": "Function executed successfully", + "output": logContent, + "timestamp": time.Now().UTC(), } - } - if err != nil { - break + resultJSON, _ := json.Marshal(fallbackResult) + return json.RawMessage(resultJSON), nil } } - logContent := strings.TrimSpace(output.String()) - - // Try to parse as JSON first, if that fails, wrap in a JSON object - if json.Valid([]byte(logContent)) && logContent != "" { - return json.RawMessage(logContent), nil - } else { - // Return the output wrapped in a JSON object - result := map[string]interface{}{ + // If no result was found in XML tags, provide a default success result + if result == nil { + defaultResult := map[string]interface{}{ "result": "Function executed successfully", - "output": logContent, + "message": "No result output found", "timestamp": time.Now().UTC(), } - resultJSON, _ := json.Marshal(result) + resultJSON, _ := json.Marshal(defaultResult) return json.RawMessage(resultJSON), nil } + + return result, nil +} + +// parseDockerLogs parses Docker log output which includes 8-byte headers +func parseDockerLogs(logData []byte) string { + var cleanOutput strings.Builder + + for len(logData) > 8 { + // Docker log header: [STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4] + // Skip the first 8 bytes (header) + headerSize := 8 + if len(logData) < headerSize { + break + } + + // Extract size from bytes 4-7 (big endian) + size := int(logData[4])<<24 + int(logData[5])<<16 + int(logData[6])<<8 + int(logData[7]) + + if len(logData) < headerSize+size { + // If the remaining data is less than expected size, take what we have + size = len(logData) - headerSize + } + + if size > 0 { + // Extract the actual log content + content := string(logData[headerSize : headerSize+size]) + cleanOutput.WriteString(content) + } + + // Move to next log entry + logData = logData[headerSize+size:] + } + + return cleanOutput.String() +} + +// parseContainerOutput parses container output that contains and XML tags +func (s *SimpleDockerRuntime) parseContainerOutput(rawOutput string) (logs []string, result json.RawMessage, err error) { + // Extract stdout content (logs) - use DOTALL flag for multiline matching + stdoutRegex := regexp.MustCompile(`(?s)(.*?)`) + stdoutMatch := stdoutRegex.FindStringSubmatch(rawOutput) + if len(stdoutMatch) > 1 { + stdoutContent := strings.TrimSpace(stdoutMatch[1]) + if stdoutContent != "" { + // Split stdout content into lines for logs + lines := strings.Split(stdoutContent, "\n") + // Clean up empty lines and trim whitespace + cleanLogs := make([]string, 0, len(lines)) + for _, line := range lines { + if trimmed := strings.TrimSpace(line); trimmed != "" { + cleanLogs = append(cleanLogs, trimmed) + } + } + logs = cleanLogs + } + } + + // Extract result content - use DOTALL flag for multiline matching + resultRegex := regexp.MustCompile(`(?s)(.*?)`) + resultMatch := resultRegex.FindStringSubmatch(rawOutput) + if len(resultMatch) > 1 { + resultContent := strings.TrimSpace(resultMatch[1]) + if resultContent != "" { + // Validate JSON + if json.Valid([]byte(resultContent)) { + result = json.RawMessage(resultContent) + } else { + // If not valid JSON, wrap it + wrappedResult := map[string]interface{}{ + "output": resultContent, + } + resultJSON, _ := json.Marshal(wrappedResult) + result = json.RawMessage(resultJSON) + } + } + } + + // If no result tag found, treat entire output as result (fallback for non-tagged output) + if result == nil { + // Remove any XML tags from the output for fallback + cleanOutput := regexp.MustCompile(`(?s)<[^>]*>`).ReplaceAllString(rawOutput, "") + cleanOutput = strings.TrimSpace(cleanOutput) + + if cleanOutput != "" { + if json.Valid([]byte(cleanOutput)) { + result = json.RawMessage(cleanOutput) + } else { + // Wrap non-JSON output + wrappedResult := map[string]interface{}{ + "output": cleanOutput, + } + resultJSON, _ := json.Marshal(wrappedResult) + result = json.RawMessage(resultJSON) + } + } + } + + return logs, result, nil } func (s *SimpleDockerRuntime) cleanupContainer(ctx context.Context, containerID string) { diff --git a/faas/internal/services/execution_service.go b/faas/internal/services/execution_service.go index 5cda974..46b0d0d 100644 --- a/faas/internal/services/execution_service.go +++ b/faas/internal/services/execution_service.go @@ -142,6 +142,7 @@ func (s *executionService) executeSync(ctx context.Context, execution *domain.Fu execution.Error = result.Error execution.Duration = result.Duration execution.MemoryUsed = result.MemoryUsed + execution.Logs = result.Logs // Check if the result indicates a timeout if result.Error != "" { @@ -219,6 +220,7 @@ func (s *executionService) executeAsync(ctx context.Context, execution *domain.F execution.Error = result.Error execution.Duration = result.Duration execution.MemoryUsed = result.MemoryUsed + execution.Logs = result.Logs // Check if the result indicates a timeout if result.Error != "" { @@ -327,31 +329,18 @@ func (s *executionService) Cancel(ctx context.Context, id uuid.UUID, userID stri } func (s *executionService) GetLogs(ctx context.Context, id uuid.UUID) ([]string, error) { - // Get execution + // Get execution with logs from database execution, err := s.executionRepo.GetByID(ctx, id) if err != nil { return nil, fmt.Errorf("execution not found: %w", err) } - // Get function to determine runtime - function, err := s.functionRepo.GetByID(ctx, execution.FunctionID) - if err != nil { - return nil, fmt.Errorf("function not found: %w", err) + // Return logs from execution record + if execution.Logs == nil { + return []string{}, nil } - - // Get runtime backend - backend, err := s.runtimeService.GetBackend(ctx, string(function.Runtime)) - if err != nil { - return nil, fmt.Errorf("failed to get runtime backend: %w", err) - } - - // Get logs from runtime - logs, err := backend.GetLogs(ctx, id) - if err != nil { - return nil, fmt.Errorf("failed to get logs: %w", err) - } - - return logs, nil + + return execution.Logs, nil } func (s *executionService) GetRunningExecutions(ctx context.Context) ([]*domain.FunctionExecution, error) { diff --git a/faas/migrations/002_add_execution_logs.down.sql b/faas/migrations/002_add_execution_logs.down.sql new file mode 100644 index 0000000..195f0f1 --- /dev/null +++ b/faas/migrations/002_add_execution_logs.down.sql @@ -0,0 +1,2 @@ +-- Remove logs column from executions table +ALTER TABLE executions DROP COLUMN IF EXISTS logs; \ No newline at end of file diff --git a/faas/migrations/002_add_execution_logs.up.sql b/faas/migrations/002_add_execution_logs.up.sql new file mode 100644 index 0000000..0dabf15 --- /dev/null +++ b/faas/migrations/002_add_execution_logs.up.sql @@ -0,0 +1,2 @@ +-- Add logs column to executions table to store function execution logs +ALTER TABLE executions ADD COLUMN logs TEXT[]; \ No newline at end of file diff --git a/faas/web/src/components/ExecutionModal.tsx b/faas/web/src/components/ExecutionModal.tsx index 6612b4b..473cc79 100644 --- a/faas/web/src/components/ExecutionModal.tsx +++ b/faas/web/src/components/ExecutionModal.tsx @@ -1,4 +1,4 @@ -import React, { useState } from 'react'; +import React, { useState, useEffect, useRef } from 'react'; import { Modal, Button, @@ -39,6 +39,40 @@ export const ExecutionModal: React.FC = ({ const [execution, setExecution] = useState(null); const [logs, setLogs] = useState([]); const [loadingLogs, setLoadingLogs] = useState(false); + const [autoRefreshLogs, setAutoRefreshLogs] = useState(false); + const pollIntervalRef = useRef(null); + const logsPollIntervalRef = useRef(null); + + const stopLogsAutoRefresh = () => { + if (logsPollIntervalRef.current) { + clearInterval(logsPollIntervalRef.current); + logsPollIntervalRef.current = null; + } + setAutoRefreshLogs(false); + }; + + // Cleanup intervals on unmount or when modal closes + useEffect(() => { + if (!opened) { + // Stop auto-refresh when modal closes + stopLogsAutoRefresh(); + if (pollIntervalRef.current) { + clearTimeout(pollIntervalRef.current); + } + } + }, [opened]); + + // Cleanup intervals on unmount + useEffect(() => { + return () => { + if (pollIntervalRef.current) { + clearTimeout(pollIntervalRef.current); + } + if (logsPollIntervalRef.current) { + clearInterval(logsPollIntervalRef.current); + } + }; + }, []); if (!func) return null; @@ -69,8 +103,13 @@ export const ExecutionModal: React.FC = ({ setResult(response.data); if (async) { - // Poll for execution status + // Poll for execution status and start auto-refreshing logs pollExecution(response.data.execution_id); + } else { + // For synchronous executions, load logs immediately + if (response.data.execution_id) { + loadLogs(response.data.execution_id); + } } notifications.show({ @@ -91,19 +130,24 @@ export const ExecutionModal: React.FC = ({ }; const pollExecution = async (executionId: string) => { + // Start auto-refreshing logs immediately for async executions + startLogsAutoRefresh(executionId); + const poll = async () => { try { const response = await executionApi.getById(executionId); setExecution(response.data); if (response.data.status === 'running' || response.data.status === 'pending') { - setTimeout(poll, 2000); // Poll every 2 seconds + pollIntervalRef.current = setTimeout(poll, 2000); // Poll every 2 seconds } else { - // Execution completed, get logs + // Execution completed, stop auto-refresh and load final logs + stopLogsAutoRefresh(); loadLogs(executionId); } } catch (error) { console.error('Error polling execution:', error); + stopLogsAutoRefresh(); } }; @@ -122,6 +166,28 @@ export const ExecutionModal: React.FC = ({ } }; + const startLogsAutoRefresh = (executionId: string) => { + // Clear any existing interval + if (logsPollIntervalRef.current) { + clearInterval(logsPollIntervalRef.current); + } + + setAutoRefreshLogs(true); + + // Load logs immediately + loadLogs(executionId); + + // Set up auto-refresh every 2 seconds + logsPollIntervalRef.current = setInterval(async () => { + try { + const response = await executionApi.getLogs(executionId); + setLogs(response.data.logs || []); + } catch (error) { + console.error('Error auto-refreshing logs:', error); + } + }, 2000); + }; + const handleCancel = async () => { if (result && async) { try { @@ -285,35 +351,60 @@ export const ExecutionModal: React.FC = ({ )} {/* Logs */} - {async && ( -
- +
+ + Logs: + {autoRefreshLogs && ( + + Auto-refreshing + + )} + + + {result.execution_id && ( + + )} - - {loadingLogs ? ( - - - - ) : logs.length > 0 ? ( - - {logs.join('\n')} - - ) : ( - No logs available - )} - -
- )} +
+ + {loadingLogs ? ( + + + + ) : (logs.length > 0 || (execution?.logs && execution.logs.length > 0)) ? ( + + {(execution?.logs || logs).join('\n')} + + ) : ( + No logs available + )} + +
)} diff --git a/faas/web/src/types/index.ts b/faas/web/src/types/index.ts index fe98b58..24b7db5 100644 --- a/faas/web/src/types/index.ts +++ b/faas/web/src/types/index.ts @@ -35,6 +35,7 @@ export interface FunctionExecution { error?: string; duration?: number; memory_used?: number; + logs?: string[]; container_id?: string; executor_id: string; created_at: string;