diff --git a/faas/internal/domain/models.go b/faas/internal/domain/models.go
index d216bed..3609a4c 100644
--- a/faas/internal/domain/models.go
+++ b/faas/internal/domain/models.go
@@ -71,6 +71,7 @@ type FunctionExecution struct {
Error string `json:"error,omitempty" db:"error"`
Duration time.Duration `json:"duration" db:"duration"`
MemoryUsed int `json:"memory_used" db:"memory_used"`
+ Logs []string `json:"logs,omitempty" db:"logs"`
ContainerID string `json:"container_id,omitempty" db:"container_id"`
ExecutorID string `json:"executor_id" db:"executor_id"`
CreatedAt time.Time `json:"created_at" db:"created_at"`
diff --git a/faas/internal/repository/postgres/execution_repository.go b/faas/internal/repository/postgres/execution_repository.go
index 499f375..7992bab 100644
--- a/faas/internal/repository/postgres/execution_repository.go
+++ b/faas/internal/repository/postgres/execution_repository.go
@@ -8,6 +8,7 @@ import (
"time"
"github.com/google/uuid"
+ "github.com/lib/pq"
"go.uber.org/zap"
"github.com/RyanCopley/skybridge/faas/internal/domain"
@@ -97,7 +98,7 @@ func (r *executionRepository) Create(ctx context.Context, execution *domain.Func
func (r *executionRepository) GetByID(ctx context.Context, id uuid.UUID) (*domain.FunctionExecution, error) {
query := `
SELECT id, function_id, status, input, output, error, duration, memory_used,
- container_id, executor_id, created_at, started_at, completed_at
+ logs, container_id, executor_id, created_at, started_at, completed_at
FROM executions WHERE id = $1`
execution := &domain.FunctionExecution{}
@@ -106,7 +107,7 @@ func (r *executionRepository) GetByID(ctx context.Context, id uuid.UUID) (*domai
err := r.db.QueryRowContext(ctx, query, id).Scan(
&execution.ID, &execution.FunctionID, &execution.Status, &execution.Input,
&execution.Output, &execution.Error, &durationInterval, &execution.MemoryUsed,
- &execution.ContainerID, &execution.ExecutorID, &execution.CreatedAt,
+ pq.Array(&execution.Logs), &execution.ContainerID, &execution.ExecutorID, &execution.CreatedAt,
&execution.StartedAt, &execution.CompletedAt,
)
@@ -135,12 +136,13 @@ func (r *executionRepository) Update(ctx context.Context, id uuid.UUID, executio
query := `
UPDATE executions
SET status = $2, output = $3, error = $4, duration = $5, memory_used = $6,
- container_id = $7, started_at = $8, completed_at = $9
+ logs = $7, container_id = $8, started_at = $9, completed_at = $10
WHERE id = $1`
_, err := r.db.ExecContext(ctx, query,
id, execution.Status, jsonField(execution.Output), execution.Error,
- durationToInterval(execution.Duration), execution.MemoryUsed, execution.ContainerID,
+ durationToInterval(execution.Duration), execution.MemoryUsed,
+ pq.Array(execution.Logs), execution.ContainerID,
execution.StartedAt, execution.CompletedAt,
)
@@ -209,7 +211,7 @@ func (r *executionRepository) List(ctx context.Context, functionID *uuid.UUID, l
err := rows.Scan(
&execution.ID, &execution.FunctionID, &execution.Status, &execution.Input,
&execution.Output, &execution.Error, &durationInterval, &execution.MemoryUsed,
- &execution.ContainerID, &execution.ExecutorID, &execution.CreatedAt,
+ pq.Array(&execution.Logs), &execution.ContainerID, &execution.ExecutorID, &execution.CreatedAt,
&execution.StartedAt, &execution.CompletedAt,
)
@@ -245,7 +247,7 @@ func (r *executionRepository) GetByFunctionID(ctx context.Context, functionID uu
func (r *executionRepository) GetByStatus(ctx context.Context, status domain.ExecutionStatus, limit, offset int) ([]*domain.FunctionExecution, error) {
query := `
SELECT id, function_id, status, input, output, error, duration, memory_used,
- container_id, executor_id, created_at, started_at, completed_at
+ logs, container_id, executor_id, created_at, started_at, completed_at
FROM executions WHERE status = $1
ORDER BY created_at DESC LIMIT $2 OFFSET $3`
@@ -264,7 +266,7 @@ func (r *executionRepository) GetByStatus(ctx context.Context, status domain.Exe
err := rows.Scan(
&execution.ID, &execution.FunctionID, &execution.Status, &execution.Input,
&execution.Output, &execution.Error, &durationInterval, &execution.MemoryUsed,
- &execution.ContainerID, &execution.ExecutorID, &execution.CreatedAt,
+ pq.Array(&execution.Logs), &execution.ContainerID, &execution.ExecutorID, &execution.CreatedAt,
&execution.StartedAt, &execution.CompletedAt,
)
diff --git a/faas/internal/runtime/docker/simple.go b/faas/internal/runtime/docker/simple.go
index b65c014..74f332f 100644
--- a/faas/internal/runtime/docker/simple.go
+++ b/faas/internal/runtime/docker/simple.go
@@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"fmt"
+ "io"
+ "regexp"
"strings"
"time"
@@ -341,7 +343,15 @@ func (s *SimpleDockerRuntime) createContainer(ctx context.Context, function *dom
echo "const handler = require('/tmp/index.js').handler;
const input = process.env.FUNCTION_INPUT ? JSON.parse(process.env.FUNCTION_INPUT) : {};
const context = { functionName: '` + function.Name + `' };
- handler(input, context).then(result => console.log(JSON.stringify(result))).catch(err => { console.error(err); process.exit(1); });" > /tmp/runner.js &&
+ console.log('<stdout>');
+ handler(input, context).then(result => {
+ console.log('</stdout>');
+ console.log('<result>' + JSON.stringify(result) + '</result>');
+ }).catch(err => {
+ console.log('</stdout>');
+ console.error('{\"error\": \"' + err.message + '\"}');
+ process.exit(1);
+ });" > /tmp/runner.js &&
node /tmp/runner.js
`}
case "python", "python3", "python3.9", "python3.10", "python3.11":
@@ -350,8 +360,15 @@ func (s *SimpleDockerRuntime) createContainer(ctx context.Context, function *dom
echo "import json, os, sys; sys.path.insert(0, '/tmp'); from handler import handler;
input_data = json.loads(os.environ.get('FUNCTION_INPUT', '{}'));
context = {'function_name': '` + function.Name + `'};
- result = handler(input_data, context);
- print(json.dumps(result))" > /tmp/runner.py &&
+ print('<stdout>');
+ try:
+ result = handler(input_data, context);
+ print('</stdout>');
+ print('<result>' + json.dumps(result) + '</result>');
+ except Exception as e:
+ print('</stdout>');
+ print('{\"error\": \"' + str(e) + '\"}', file=sys.stderr);
+ sys.exit(1);" > /tmp/runner.py &&
python /tmp/runner.py
`}
default:
@@ -386,20 +403,48 @@ func (s *SimpleDockerRuntime) getContainerLogs(ctx context.Context, containerID
logs, err := s.client.ContainerLogs(ctx, containerID, container.LogsOptions{
ShowStdout: true,
ShowStderr: true,
- Tail: "50", // Get last 50 lines
+ Tail: "100", // Get last 100 lines
})
if err != nil {
return nil, fmt.Errorf("failed to get container logs: %w", err)
}
defer logs.Close()
- // For simplicity, we'll return a placeholder
- // In a real implementation, you'd parse the log output
- return []string{
- "Container logs would appear here",
- "Function execution started",
- "Function execution completed",
- }, nil
+ // Read the actual logs content
+ logData, err := io.ReadAll(logs)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read log data: %w", err)
+ }
+
+ // Parse Docker logs to remove binary headers
+ rawOutput := parseDockerLogs(logData)
+
+ // Parse the XML-tagged output to extract logs
+ parsedLogs, _, err := s.parseContainerOutput(rawOutput)
+ if err != nil {
+ s.logger.Warn("Failed to parse container output for logs", zap.Error(err))
+ // Fallback to raw output split by lines
+ lines := strings.Split(strings.TrimSpace(rawOutput), "\n")
+ cleanLines := make([]string, 0, len(lines))
+ for _, line := range lines {
+ if trimmed := strings.TrimSpace(line); trimmed != "" {
+ cleanLines = append(cleanLines, trimmed)
+ }
+ }
+ return cleanLines, nil
+ }
+
+ // If no logs were parsed from tags, fallback to basic parsing
+ if len(parsedLogs) == 0 {
+ lines := strings.Split(strings.TrimSpace(rawOutput), "\n")
+ for _, line := range lines {
+ if trimmed := strings.TrimSpace(line); trimmed != "" && !strings.Contains(trimmed, "<stdout>") && !strings.Contains(trimmed, "<result>") {
+ parsedLogs = append(parsedLogs, trimmed)
+ }
+ }
+ }
+
+ return parsedLogs, nil
}
func (s *SimpleDockerRuntime) getContainerOutput(ctx context.Context, containerID string) (json.RawMessage, error) {
@@ -415,36 +460,143 @@ func (s *SimpleDockerRuntime) getContainerOutput(ctx context.Context, containerI
defer logs.Close()
// Read the actual logs content
- buf := make([]byte, 4096)
- var output strings.Builder
- for {
- n, err := logs.Read(buf)
- if n > 0 {
- // Docker logs include 8-byte headers, skip them for stdout content
- if n > 8 {
- output.Write(buf[8:n])
+ logData, err := io.ReadAll(logs)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read log data: %w", err)
+ }
+
+ // Parse Docker logs to remove binary headers
+ rawOutput := parseDockerLogs(logData)
+
+ // Parse the XML-tagged output to extract the result
+ _, result, err := s.parseContainerOutput(rawOutput)
+ if err != nil {
+ s.logger.Warn("Failed to parse container output for result", zap.Error(err))
+ // Fallback to legacy parsing
+ logContent := strings.TrimSpace(rawOutput)
+ if json.Valid([]byte(logContent)) && logContent != "" {
+ return json.RawMessage(logContent), nil
+ } else {
+ // Return the output wrapped in a JSON object
+ fallbackResult := map[string]interface{}{
+ "result": "Function executed successfully",
+ "output": logContent,
+ "timestamp": time.Now().UTC(),
}
- }
- if err != nil {
- break
+ resultJSON, _ := json.Marshal(fallbackResult)
+ return json.RawMessage(resultJSON), nil
}
}
- logContent := strings.TrimSpace(output.String())
-
- // Try to parse as JSON first, if that fails, wrap in a JSON object
- if json.Valid([]byte(logContent)) && logContent != "" {
- return json.RawMessage(logContent), nil
- } else {
- // Return the output wrapped in a JSON object
- result := map[string]interface{}{
+ // If no result was found in XML tags, provide a default success result
+ if result == nil {
+ defaultResult := map[string]interface{}{
"result": "Function executed successfully",
- "output": logContent,
+ "message": "No result output found",
"timestamp": time.Now().UTC(),
}
- resultJSON, _ := json.Marshal(result)
+ resultJSON, _ := json.Marshal(defaultResult)
return json.RawMessage(resultJSON), nil
}
+
+ return result, nil
+}
+
+// parseDockerLogs parses Docker log output which includes 8-byte headers
+func parseDockerLogs(logData []byte) string {
+ var cleanOutput strings.Builder
+
+ for len(logData) > 8 {
+ // Docker log header: [STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4]
+ // Skip the first 8 bytes (header)
+ headerSize := 8
+ if len(logData) < headerSize {
+ break
+ }
+
+ // Extract size from bytes 4-7 (big endian)
+ size := int(logData[4])<<24 + int(logData[5])<<16 + int(logData[6])<<8 + int(logData[7])
+
+ if len(logData) < headerSize+size {
+ // If the remaining data is less than expected size, take what we have
+ size = len(logData) - headerSize
+ }
+
+ if size > 0 {
+ // Extract the actual log content
+ content := string(logData[headerSize : headerSize+size])
+ cleanOutput.WriteString(content)
+ }
+
+ // Move to next log entry
+ logData = logData[headerSize+size:]
+ }
+
+ return cleanOutput.String()
+}
+
+// parseContainerOutput parses container output that contains <stdout> and <result> XML tags
+func (s *SimpleDockerRuntime) parseContainerOutput(rawOutput string) (logs []string, result json.RawMessage, err error) {
+ // Extract stdout content (logs) - use DOTALL flag for multiline matching
+ stdoutRegex := regexp.MustCompile(`(?s)<stdout>(.*?)</stdout>`)
+ stdoutMatch := stdoutRegex.FindStringSubmatch(rawOutput)
+ if len(stdoutMatch) > 1 {
+ stdoutContent := strings.TrimSpace(stdoutMatch[1])
+ if stdoutContent != "" {
+ // Split stdout content into lines for logs
+ lines := strings.Split(stdoutContent, "\n")
+ // Clean up empty lines and trim whitespace
+ cleanLogs := make([]string, 0, len(lines))
+ for _, line := range lines {
+ if trimmed := strings.TrimSpace(line); trimmed != "" {
+ cleanLogs = append(cleanLogs, trimmed)
+ }
+ }
+ logs = cleanLogs
+ }
+ }
+
+ // Extract result content - use DOTALL flag for multiline matching
+ resultRegex := regexp.MustCompile(`(?s)<result>(.*?)</result>`)
+ resultMatch := resultRegex.FindStringSubmatch(rawOutput)
+ if len(resultMatch) > 1 {
+ resultContent := strings.TrimSpace(resultMatch[1])
+ if resultContent != "" {
+ // Validate JSON
+ if json.Valid([]byte(resultContent)) {
+ result = json.RawMessage(resultContent)
+ } else {
+ // If not valid JSON, wrap it
+ wrappedResult := map[string]interface{}{
+ "output": resultContent,
+ }
+ resultJSON, _ := json.Marshal(wrappedResult)
+ result = json.RawMessage(resultJSON)
+ }
+ }
+ }
+
+ // If no result tag found, treat entire output as result (fallback for non-tagged output)
+ if result == nil {
+ // Remove any XML tags from the output for fallback
+ cleanOutput := regexp.MustCompile(`(?s)<[^>]*>`).ReplaceAllString(rawOutput, "")
+ cleanOutput = strings.TrimSpace(cleanOutput)
+
+ if cleanOutput != "" {
+ if json.Valid([]byte(cleanOutput)) {
+ result = json.RawMessage(cleanOutput)
+ } else {
+ // Wrap non-JSON output
+ wrappedResult := map[string]interface{}{
+ "output": cleanOutput,
+ }
+ resultJSON, _ := json.Marshal(wrappedResult)
+ result = json.RawMessage(resultJSON)
+ }
+ }
+ }
+
+ return logs, result, nil
}
func (s *SimpleDockerRuntime) cleanupContainer(ctx context.Context, containerID string) {
diff --git a/faas/internal/services/execution_service.go b/faas/internal/services/execution_service.go
index 5cda974..46b0d0d 100644
--- a/faas/internal/services/execution_service.go
+++ b/faas/internal/services/execution_service.go
@@ -142,6 +142,7 @@ func (s *executionService) executeSync(ctx context.Context, execution *domain.Fu
execution.Error = result.Error
execution.Duration = result.Duration
execution.MemoryUsed = result.MemoryUsed
+ execution.Logs = result.Logs
// Check if the result indicates a timeout
if result.Error != "" {
@@ -219,6 +220,7 @@ func (s *executionService) executeAsync(ctx context.Context, execution *domain.F
execution.Error = result.Error
execution.Duration = result.Duration
execution.MemoryUsed = result.MemoryUsed
+ execution.Logs = result.Logs
// Check if the result indicates a timeout
if result.Error != "" {
@@ -327,31 +329,18 @@ func (s *executionService) Cancel(ctx context.Context, id uuid.UUID, userID stri
}
func (s *executionService) GetLogs(ctx context.Context, id uuid.UUID) ([]string, error) {
- // Get execution
+ // Get execution with logs from database
execution, err := s.executionRepo.GetByID(ctx, id)
if err != nil {
return nil, fmt.Errorf("execution not found: %w", err)
}
- // Get function to determine runtime
- function, err := s.functionRepo.GetByID(ctx, execution.FunctionID)
- if err != nil {
- return nil, fmt.Errorf("function not found: %w", err)
+ // Return logs from execution record
+ if execution.Logs == nil {
+ return []string{}, nil
}
-
- // Get runtime backend
- backend, err := s.runtimeService.GetBackend(ctx, string(function.Runtime))
- if err != nil {
- return nil, fmt.Errorf("failed to get runtime backend: %w", err)
- }
-
- // Get logs from runtime
- logs, err := backend.GetLogs(ctx, id)
- if err != nil {
- return nil, fmt.Errorf("failed to get logs: %w", err)
- }
-
- return logs, nil
+
+ return execution.Logs, nil
}
func (s *executionService) GetRunningExecutions(ctx context.Context) ([]*domain.FunctionExecution, error) {
diff --git a/faas/migrations/002_add_execution_logs.down.sql b/faas/migrations/002_add_execution_logs.down.sql
new file mode 100644
index 0000000..195f0f1
--- /dev/null
+++ b/faas/migrations/002_add_execution_logs.down.sql
@@ -0,0 +1,2 @@
+-- Remove logs column from executions table
+ALTER TABLE executions DROP COLUMN IF EXISTS logs;
\ No newline at end of file
diff --git a/faas/migrations/002_add_execution_logs.up.sql b/faas/migrations/002_add_execution_logs.up.sql
new file mode 100644
index 0000000..0dabf15
--- /dev/null
+++ b/faas/migrations/002_add_execution_logs.up.sql
@@ -0,0 +1,2 @@
+-- Add logs column to executions table to store function execution logs
+ALTER TABLE executions ADD COLUMN logs TEXT[];
\ No newline at end of file
diff --git a/faas/web/src/components/ExecutionModal.tsx b/faas/web/src/components/ExecutionModal.tsx
index 6612b4b..473cc79 100644
--- a/faas/web/src/components/ExecutionModal.tsx
+++ b/faas/web/src/components/ExecutionModal.tsx
@@ -1,4 +1,4 @@
-import React, { useState } from 'react';
+import React, { useState, useEffect, useRef } from 'react';
import {
Modal,
Button,
@@ -39,6 +39,40 @@ export const ExecutionModal: React.FC<ExecutionModalProps> = ({
const [execution, setExecution] = useState<FunctionExecution | null>(null);
const [logs, setLogs] = useState<string[]>([]);
const [loadingLogs, setLoadingLogs] = useState(false);
+ const [autoRefreshLogs, setAutoRefreshLogs] = useState(false);
+ const pollIntervalRef = useRef<NodeJS.Timeout | null>(null);
+ const logsPollIntervalRef = useRef<NodeJS.Timeout | null>(null);
+
+ const stopLogsAutoRefresh = () => {
+ if (logsPollIntervalRef.current) {
+ clearInterval(logsPollIntervalRef.current);
+ logsPollIntervalRef.current = null;
+ }
+ setAutoRefreshLogs(false);
+ };
+
+ // Cleanup intervals on unmount or when modal closes
+ useEffect(() => {
+ if (!opened) {
+ // Stop auto-refresh when modal closes
+ stopLogsAutoRefresh();
+ if (pollIntervalRef.current) {
+ clearTimeout(pollIntervalRef.current);
+ }
+ }
+ }, [opened]);
+
+ // Cleanup intervals on unmount
+ useEffect(() => {
+ return () => {
+ if (pollIntervalRef.current) {
+ clearTimeout(pollIntervalRef.current);
+ }
+ if (logsPollIntervalRef.current) {
+ clearInterval(logsPollIntervalRef.current);
+ }
+ };
+ }, []);
if (!func) return null;
@@ -69,8 +103,13 @@ export const ExecutionModal: React.FC = ({
setResult(response.data);
if (async) {
- // Poll for execution status
+ // Poll for execution status and start auto-refreshing logs
pollExecution(response.data.execution_id);
+ } else {
+ // For synchronous executions, load logs immediately
+ if (response.data.execution_id) {
+ loadLogs(response.data.execution_id);
+ }
}
notifications.show({
@@ -91,19 +130,24 @@ export const ExecutionModal: React.FC = ({
};
const pollExecution = async (executionId: string) => {
+ // Start auto-refreshing logs immediately for async executions
+ startLogsAutoRefresh(executionId);
+
const poll = async () => {
try {
const response = await executionApi.getById(executionId);
setExecution(response.data);
if (response.data.status === 'running' || response.data.status === 'pending') {
- setTimeout(poll, 2000); // Poll every 2 seconds
+ pollIntervalRef.current = setTimeout(poll, 2000); // Poll every 2 seconds
} else {
- // Execution completed, get logs
+ // Execution completed, stop auto-refresh and load final logs
+ stopLogsAutoRefresh();
loadLogs(executionId);
}
} catch (error) {
console.error('Error polling execution:', error);
+ stopLogsAutoRefresh();
}
};
@@ -122,6 +166,28 @@ export const ExecutionModal: React.FC = ({
}
};
+ const startLogsAutoRefresh = (executionId: string) => {
+ // Clear any existing interval
+ if (logsPollIntervalRef.current) {
+ clearInterval(logsPollIntervalRef.current);
+ }
+
+ setAutoRefreshLogs(true);
+
+ // Load logs immediately
+ loadLogs(executionId);
+
+ // Set up auto-refresh every 2 seconds
+ logsPollIntervalRef.current = setInterval(async () => {
+ try {
+ const response = await executionApi.getLogs(executionId);
+ setLogs(response.data.logs || []);
+ } catch (error) {
+ console.error('Error auto-refreshing logs:', error);
+ }
+ }, 2000);
+ };
+
const handleCancel = async () => {
if (result && async) {
try {
@@ -285,35 +351,60 @@ export const ExecutionModal: React.FC = ({
)}
{/* Logs */}
- {async && (
-
-
+
+
+
Logs:
+ {autoRefreshLogs && (
+
+ Auto-refreshing
+
+ )}
+
+
+ {result.execution_id && (
+ }
+ onClick={() => {
+ if (autoRefreshLogs) {
+ stopLogsAutoRefresh();
+ } else {
+ startLogsAutoRefresh(result.execution_id);
+ }
+ }}
+ >
+ {autoRefreshLogs ? 'Stop Auto-refresh' : 'Auto-refresh'}
+
+ )}
}
onClick={() => result.execution_id && loadLogs(result.execution_id)}
loading={loadingLogs}
+ disabled={autoRefreshLogs}
>
- Refresh
+ Manual Refresh
-
- {loadingLogs ? (
-
-
-
- ) : logs.length > 0 ? (
-
- {logs.join('\n')}
-
- ) : (
- No logs available
- )}
-
-
- )}
+
+
+ {loadingLogs ? (
+
+
+
+ ) : (logs.length > 0 || (execution?.logs && execution.logs.length > 0)) ? (
+
+ {(execution?.logs || logs).join('\n')}
+
+ ) : (
+ No logs available
+ )}
+
+
>
)}
diff --git a/faas/web/src/types/index.ts b/faas/web/src/types/index.ts
index fe98b58..24b7db5 100644
--- a/faas/web/src/types/index.ts
+++ b/faas/web/src/types/index.ts
@@ -35,6 +35,7 @@ export interface FunctionExecution {
error?: string;
duration?: number;
memory_used?: number;
+ logs?: string[];
container_id?: string;
executor_id: string;
created_at: string;