timeouts for faas

This commit is contained in:
2025-08-31 11:48:55 -04:00
parent 61bed7b412
commit 847ba2eede
2 changed files with 262 additions and 41 deletions

View File

@ -83,10 +83,11 @@ func NewSimpleDockerRuntime(logger *zap.Logger) (*SimpleDockerRuntime, error) {
func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.FunctionDefinition, input json.RawMessage) (*domain.ExecutionResult, error) { func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.FunctionDefinition, input json.RawMessage) (*domain.ExecutionResult, error) {
startTime := time.Now() startTime := time.Now()
s.logger.Info("Executing function in Docker container", s.logger.Info("🐳 TIMEOUT DEBUG: Executing function in Docker container",
zap.String("function_id", function.ID.String()), zap.String("function_id", function.ID.String()),
zap.String("name", function.Name), zap.String("name", function.Name),
zap.String("image", function.Image)) zap.String("image", function.Image),
zap.Duration("timeout", function.Timeout.Duration))
// Create container // Create container
containerID, err := s.createContainer(ctx, function, input) containerID, err := s.createContainer(ctx, function, input)
@ -100,53 +101,158 @@ func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.Func
return nil, fmt.Errorf("failed to start container: %w", err) return nil, fmt.Errorf("failed to start container: %w", err)
} }
// Wait for container to finish // Create timeout context based on function timeout
statusCh, errCh := s.client.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) var timeoutCtx context.Context
var cancel context.CancelFunc
if function.Timeout.Duration > 0 {
s.logger.Info("⏰ TIMEOUT DEBUG: Creating timeout context for function execution",
zap.Duration("timeout", function.Timeout.Duration),
zap.String("container_id", containerID))
timeoutCtx, cancel = context.WithTimeout(ctx, function.Timeout.Duration)
defer cancel()
} else {
s.logger.Info("❌ TIMEOUT DEBUG: No timeout specified, using original context",
zap.String("container_id", containerID))
timeoutCtx = ctx
}
s.logger.Debug("Starting container wait with timeout",
zap.String("container_id", containerID),
zap.Duration("timeout", function.Timeout.Duration))
// Wait for container to finish with timeout
statusCh, errCh := s.client.ContainerWait(timeoutCtx, containerID, container.WaitConditionNotRunning)
var timedOut bool
select { select {
case err := <-errCh: case err := <-errCh:
s.logger.Debug("Container wait returned error",
zap.String("container_id", containerID),
zap.Error(err))
s.cleanupContainer(ctx, containerID) s.cleanupContainer(ctx, containerID)
return nil, fmt.Errorf("error waiting for container: %w", err) return nil, fmt.Errorf("error waiting for container: %w", err)
case <-statusCh: case status := <-statusCh:
// Container finished s.logger.Debug("Container finished normally",
zap.String("container_id", containerID),
zap.Int64("exit_code", status.StatusCode))
// Container finished normally
case <-timeoutCtx.Done():
// Timeout occurred
timedOut = true
s.logger.Info("💥 TIMEOUT DEBUG: Function execution timed out, stopping container",
zap.String("container_id", containerID),
zap.Duration("timeout", function.Timeout.Duration),
zap.Error(timeoutCtx.Err()))
// Stop the container in the background - don't wait for it to complete
go func() {
s.logger.Debug("Attempting to stop timed out container in background",
zap.String("container_id", containerID))
// Use a very short timeout for stopping, then kill if needed
if err := s.client.ContainerStop(context.Background(), containerID, container.StopOptions{
Timeout: &[]int{1}[0], // Only 1 second grace period for stop
}); err != nil {
s.logger.Warn("Failed to stop timed out container gracefully, attempting to kill",
zap.String("container_id", containerID),
zap.Error(err))
// If stop fails, try to kill it immediately
if killErr := s.client.ContainerKill(context.Background(), containerID, "SIGKILL"); killErr != nil {
s.logger.Error("Failed to kill timed out container",
zap.String("container_id", containerID),
zap.Error(killErr))
} else {
s.logger.Debug("Successfully killed timed out container",
zap.String("container_id", containerID))
}
} else {
s.logger.Debug("Successfully stopped timed out container",
zap.String("container_id", containerID))
}
}()
} }
// Get container logs var logs []string
logs, err := s.getContainerLogs(ctx, containerID) var stats *container.InspectResponse
if err != nil {
s.logger.Warn("Failed to get container logs", zap.Error(err))
logs = []string{"Failed to retrieve logs"}
}
// Get container stats // For timed-out containers, skip log retrieval and inspection to return quickly
stats, err := s.client.ContainerInspect(ctx, containerID) if timedOut {
if err != nil { s.logger.Debug("Skipping log retrieval and inspection for timed-out container",
s.logger.Warn("Failed to inspect container", zap.Error(err)) zap.String("container_id", containerID))
logs = []string{"Container execution timed out"}
} else {
// Get container logs
var err error
logs, err = s.getContainerLogs(ctx, containerID)
if err != nil {
s.logger.Warn("Failed to get container logs", zap.Error(err))
logs = []string{"Failed to retrieve logs"}
}
// Get container stats
statsResponse, err := s.client.ContainerInspect(ctx, containerID)
if err != nil {
s.logger.Warn("Failed to inspect container", zap.Error(err))
} else {
stats = &statsResponse
}
} }
// Get execution result // Get execution result
result := &domain.ExecutionResult{ result := &domain.ExecutionResult{
Logs: logs, Logs: logs,
Duration: time.Since(startTime).Truncate(time.Millisecond),
} }
// Try to get output from container // Handle timeout case
if stats.State != nil { if timedOut {
result.Duration = time.Since(startTime).Truncate(time.Millisecond) s.logger.Debug("Processing timeout result",
if stats.State.ExitCode == 0 { zap.String("container_id", containerID),
// Try to get output from container zap.Duration("timeout", function.Timeout.Duration))
output, err := s.getContainerOutput(ctx, containerID) result.Error = fmt.Sprintf("Function execution timed out after %v", function.Timeout.Duration)
if err != nil { result.Output = json.RawMessage(`{"error": "Function execution timed out"}`)
s.logger.Warn("Failed to get container output", zap.Error(err)) } else {
result.Output = json.RawMessage(`{"error": "Failed to retrieve output"}`) s.logger.Debug("Processing normal execution result",
zap.String("container_id", containerID))
// Try to get output from container for successful executions
if stats.State != nil {
s.logger.Debug("Container state available",
zap.String("container_id", containerID),
zap.Int("exit_code", stats.State.ExitCode))
if stats.State.ExitCode == 0 {
// Try to get output from container
output, err := s.getContainerOutput(ctx, containerID)
if err != nil {
s.logger.Warn("Failed to get container output", zap.Error(err))
result.Output = json.RawMessage(`{"error": "Failed to retrieve output"}`)
} else {
s.logger.Debug("Successfully retrieved container output",
zap.String("container_id", containerID),
zap.Int("output_size", len(output)))
result.Output = output
}
} else { } else {
result.Output = output s.logger.Debug("Container exited with non-zero code",
zap.String("container_id", containerID),
zap.Int("exit_code", stats.State.ExitCode))
result.Error = fmt.Sprintf("Container exited with code %d", stats.State.ExitCode)
result.Output = json.RawMessage(`{"error": "Container execution failed"}`)
} }
} else { } else {
result.Error = fmt.Sprintf("Container exited with code %d", stats.State.ExitCode) s.logger.Warn("Container state not available",
zap.String("container_id", containerID))
} }
} }
// Cleanup container // Cleanup container - for timed-out containers, do this in background
s.cleanupContainer(ctx, containerID) if timedOut {
s.logger.Debug("Scheduling background cleanup for timed-out container",
zap.String("container_id", containerID))
go func() {
s.cleanupContainer(context.Background(), containerID)
}()
} else {
s.cleanupContainer(ctx, containerID)
}
return result, nil return result, nil
} }

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings"
"time" "time"
"go.uber.org/zap" "go.uber.org/zap"
@ -35,7 +36,7 @@ func NewExecutionService(
} }
func (s *executionService) Execute(ctx context.Context, req *domain.ExecuteFunctionRequest, userID string) (*domain.ExecuteFunctionResponse, error) { func (s *executionService) Execute(ctx context.Context, req *domain.ExecuteFunctionRequest, userID string) (*domain.ExecuteFunctionResponse, error) {
s.logger.Info("Executing function", s.logger.Info("🔄 TIMEOUT DEBUG: Starting function execution",
zap.String("function_id", req.FunctionID.String()), zap.String("function_id", req.FunctionID.String()),
zap.String("user_id", userID), zap.String("user_id", userID),
zap.Bool("async", req.Async)) zap.Bool("async", req.Async))
@ -46,6 +47,11 @@ func (s *executionService) Execute(ctx context.Context, req *domain.ExecuteFunct
return nil, fmt.Errorf("function not found: %w", err) return nil, fmt.Errorf("function not found: %w", err)
} }
s.logger.Info("⏱️ TIMEOUT DEBUG: Function loaded with timeout",
zap.String("function_id", req.FunctionID.String()),
zap.String("function_name", function.Name),
zap.Duration("timeout", function.Timeout.Duration))
// Create execution record // Create execution record
// Initialize input with empty JSON if nil or empty // Initialize input with empty JSON if nil or empty
input := req.Input input := req.Input
@ -108,15 +114,52 @@ func (s *executionService) executeSync(ctx context.Context, execution *domain.Fu
}, nil }, nil
} }
// Create timeout context for execution
execCtx := ctx
var cancel context.CancelFunc
if function.Timeout.Duration > 0 {
s.logger.Info("⏰ TIMEOUT DEBUG: Creating execution timeout context for sync execution",
zap.String("execution_id", execution.ID.String()),
zap.String("function_id", function.ID.String()),
zap.Duration("timeout", function.Timeout.Duration))
execCtx, cancel = context.WithTimeout(ctx, function.Timeout.Duration)
defer cancel()
} else {
s.logger.Info("❌ TIMEOUT DEBUG: No timeout specified for sync execution",
zap.String("execution_id", execution.ID.String()),
zap.String("function_id", function.ID.String()))
}
// Execute function // Execute function
result, err := backend.Execute(ctx, function, execution.Input) s.logger.Debug("Starting function execution in runtime backend (sync)",
zap.String("execution_id", execution.ID.String()),
zap.String("function_id", function.ID.String()),
zap.String("runtime", string(function.Runtime)))
result, err := backend.Execute(execCtx, function, execution.Input)
s.logger.Debug("Function execution completed in runtime backend (sync)",
zap.String("execution_id", execution.ID.String()),
zap.String("function_id", function.ID.String()),
zap.Error(err))
if err != nil { if err != nil {
execution.Status = domain.StatusFailed // Check if this was a timeout error
execution.Error = fmt.Sprintf("execution failed: %v", err) if execCtx.Err() == context.DeadlineExceeded {
s.logger.Debug("Sync execution timed out (context deadline exceeded)",
zap.String("execution_id", execution.ID.String()),
zap.Duration("timeout", function.Timeout.Duration),
zap.Error(execCtx.Err()))
execution.Status = domain.StatusTimeout
execution.Error = fmt.Sprintf("function execution timed out after %v", function.Timeout.Duration)
} else {
s.logger.Debug("Sync execution failed (non-timeout error)",
zap.String("execution_id", execution.ID.String()),
zap.Error(err))
execution.Status = domain.StatusFailed
execution.Error = fmt.Sprintf("execution failed: %v", err)
}
s.updateExecutionComplete(ctx, execution) s.updateExecutionComplete(ctx, execution)
return &domain.ExecuteFunctionResponse{ return &domain.ExecuteFunctionResponse{
ExecutionID: execution.ID, ExecutionID: execution.ID,
Status: domain.StatusFailed, Status: execution.Status,
Error: execution.Error, Error: execution.Error,
}, nil }, nil
} }
@ -132,12 +175,30 @@ func (s *executionService) executeSync(ctx context.Context, execution *domain.Fu
execution.Error = result.Error execution.Error = result.Error
execution.Duration = result.Duration execution.Duration = result.Duration
execution.MemoryUsed = result.MemoryUsed execution.MemoryUsed = result.MemoryUsed
s.updateExecutionComplete(ctx, execution)
// Check if the result indicates a timeout
if result.Error != "" { if result.Error != "" {
execution.Status = domain.StatusFailed s.logger.Debug("Execution result contains error",
zap.String("execution_id", execution.ID.String()),
zap.String("error", result.Error))
if strings.Contains(result.Error, "timed out") {
s.logger.Debug("Result indicates timeout - setting status to timeout",
zap.String("execution_id", execution.ID.String()),
zap.String("error", result.Error))
execution.Status = domain.StatusTimeout
} else {
s.logger.Debug("Result indicates failure - setting status to failed",
zap.String("execution_id", execution.ID.String()),
zap.String("error", result.Error))
execution.Status = domain.StatusFailed
}
} else {
s.logger.Debug("Execution successful - no errors",
zap.String("execution_id", execution.ID.String()))
} }
s.updateExecutionComplete(ctx, execution)
return &domain.ExecuteFunctionResponse{ return &domain.ExecuteFunctionResponse{
ExecutionID: execution.ID, ExecutionID: execution.ID,
Status: execution.Status, Status: execution.Status,
@ -169,14 +230,51 @@ func (s *executionService) executeAsync(ctx context.Context, execution *domain.F
return return
} }
// Create timeout context for execution
execCtx := ctx
var cancel context.CancelFunc
if function.Timeout.Duration > 0 {
s.logger.Debug("Creating execution timeout context for async execution",
zap.String("execution_id", execution.ID.String()),
zap.String("function_id", function.ID.String()),
zap.Duration("timeout", function.Timeout.Duration))
execCtx, cancel = context.WithTimeout(ctx, function.Timeout.Duration)
defer cancel()
} else {
s.logger.Debug("No timeout specified for async execution",
zap.String("execution_id", execution.ID.String()),
zap.String("function_id", function.ID.String()))
}
// Execute function // Execute function
result, err := backend.Execute(ctx, function, execution.Input) s.logger.Debug("Starting function execution in runtime backend (async)",
zap.String("execution_id", execution.ID.String()),
zap.String("function_id", function.ID.String()),
zap.String("runtime", string(function.Runtime)))
result, err := backend.Execute(execCtx, function, execution.Input)
s.logger.Debug("Function execution completed in runtime backend (async)",
zap.String("execution_id", execution.ID.String()),
zap.String("function_id", function.ID.String()),
zap.Error(err))
if err != nil { if err != nil {
s.logger.Error("Async function execution failed", s.logger.Error("Async function execution failed",
zap.String("execution_id", execution.ID.String()), zap.String("execution_id", execution.ID.String()),
zap.Error(err)) zap.Error(err))
execution.Status = domain.StatusFailed // Check if this was a timeout error
execution.Error = fmt.Sprintf("execution failed: %v", err) if execCtx.Err() == context.DeadlineExceeded {
s.logger.Debug("Async execution timed out (context deadline exceeded)",
zap.String("execution_id", execution.ID.String()),
zap.Duration("timeout", function.Timeout.Duration),
zap.Error(execCtx.Err()))
execution.Status = domain.StatusTimeout
execution.Error = fmt.Sprintf("function execution timed out after %v", function.Timeout.Duration)
} else {
s.logger.Debug("Async execution failed (non-timeout error)",
zap.String("execution_id", execution.ID.String()),
zap.Error(err))
execution.Status = domain.StatusFailed
execution.Error = fmt.Sprintf("execution failed: %v", err)
}
s.updateExecutionComplete(ctx, execution) s.updateExecutionComplete(ctx, execution)
return return
} }
@ -193,8 +291,25 @@ func (s *executionService) executeAsync(ctx context.Context, execution *domain.F
execution.Duration = result.Duration execution.Duration = result.Duration
execution.MemoryUsed = result.MemoryUsed execution.MemoryUsed = result.MemoryUsed
// Check if the result indicates a timeout
if result.Error != "" { if result.Error != "" {
execution.Status = domain.StatusFailed s.logger.Debug("Execution result contains error",
zap.String("execution_id", execution.ID.String()),
zap.String("error", result.Error))
if strings.Contains(result.Error, "timed out") {
s.logger.Debug("Result indicates timeout - setting status to timeout",
zap.String("execution_id", execution.ID.String()),
zap.String("error", result.Error))
execution.Status = domain.StatusTimeout
} else {
s.logger.Debug("Result indicates failure - setting status to failed",
zap.String("execution_id", execution.ID.String()),
zap.String("error", result.Error))
execution.Status = domain.StatusFailed
}
} else {
s.logger.Debug("Execution successful - no errors",
zap.String("execution_id", execution.ID.String()))
} }
s.updateExecutionComplete(ctx, execution) s.updateExecutionComplete(ctx, execution)