From 847ba2eedea640782a1344aee47ca637c587fb7c Mon Sep 17 00:00:00 2001 From: Ryan Copley Date: Sun, 31 Aug 2025 11:48:55 -0400 Subject: [PATCH] timeouts for faas --- faas/internal/runtime/docker/simple.go | 166 ++++++++++++++++---- faas/internal/services/execution_service.go | 137 ++++++++++++++-- 2 files changed, 262 insertions(+), 41 deletions(-) diff --git a/faas/internal/runtime/docker/simple.go b/faas/internal/runtime/docker/simple.go index 61e4a35..f584e0b 100644 --- a/faas/internal/runtime/docker/simple.go +++ b/faas/internal/runtime/docker/simple.go @@ -83,10 +83,11 @@ func NewSimpleDockerRuntime(logger *zap.Logger) (*SimpleDockerRuntime, error) { func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.FunctionDefinition, input json.RawMessage) (*domain.ExecutionResult, error) { startTime := time.Now() - s.logger.Info("Executing function in Docker container", + s.logger.Info("🐳 TIMEOUT DEBUG: Executing function in Docker container", zap.String("function_id", function.ID.String()), zap.String("name", function.Name), - zap.String("image", function.Image)) + zap.String("image", function.Image), + zap.Duration("timeout", function.Timeout.Duration)) // Create container containerID, err := s.createContainer(ctx, function, input) @@ -100,53 +101,158 @@ func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.Func return nil, fmt.Errorf("failed to start container: %w", err) } - // Wait for container to finish - statusCh, errCh := s.client.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) + // Create timeout context based on function timeout + var timeoutCtx context.Context + var cancel context.CancelFunc + if function.Timeout.Duration > 0 { + s.logger.Info("⏰ TIMEOUT DEBUG: Creating timeout context for function execution", + zap.Duration("timeout", function.Timeout.Duration), + zap.String("container_id", containerID)) + timeoutCtx, cancel = context.WithTimeout(ctx, function.Timeout.Duration) + defer cancel() + } else { + s.logger.Info("❌ TIMEOUT DEBUG: No timeout specified, using original context", + zap.String("container_id", containerID)) + timeoutCtx = ctx + } + + s.logger.Debug("Starting container wait with timeout", + zap.String("container_id", containerID), + zap.Duration("timeout", function.Timeout.Duration)) + + // Wait for container to finish with timeout + statusCh, errCh := s.client.ContainerWait(timeoutCtx, containerID, container.WaitConditionNotRunning) + + var timedOut bool select { case err := <-errCh: + s.logger.Debug("Container wait returned error", + zap.String("container_id", containerID), + zap.Error(err)) s.cleanupContainer(ctx, containerID) return nil, fmt.Errorf("error waiting for container: %w", err) - case <-statusCh: - // Container finished + case status := <-statusCh: + s.logger.Debug("Container finished normally", + zap.String("container_id", containerID), + zap.Int64("exit_code", status.StatusCode)) + // Container finished normally + case <-timeoutCtx.Done(): + // Timeout occurred + timedOut = true + s.logger.Info("💥 TIMEOUT DEBUG: Function execution timed out, stopping container", + zap.String("container_id", containerID), + zap.Duration("timeout", function.Timeout.Duration), + zap.Error(timeoutCtx.Err())) + + // Stop the container in the background - don't wait for it to complete + go func() { + s.logger.Debug("Attempting to stop timed out container in background", + zap.String("container_id", containerID)) + // Use a very short timeout for stopping, then kill if needed + if err := s.client.ContainerStop(context.Background(), containerID, container.StopOptions{ + Timeout: &[]int{1}[0], // Only 1 second grace period for stop + }); err != nil { + s.logger.Warn("Failed to stop timed out container gracefully, attempting to kill", + zap.String("container_id", containerID), + zap.Error(err)) + // If stop fails, try to kill it immediately + if killErr := s.client.ContainerKill(context.Background(), containerID, "SIGKILL"); killErr != nil { + s.logger.Error("Failed to kill timed out container", + zap.String("container_id", containerID), + zap.Error(killErr)) + } else { + s.logger.Debug("Successfully killed timed out container", + zap.String("container_id", containerID)) + } + } else { + s.logger.Debug("Successfully stopped timed out container", + zap.String("container_id", containerID)) + } + }() } - // Get container logs - logs, err := s.getContainerLogs(ctx, containerID) - if err != nil { - s.logger.Warn("Failed to get container logs", zap.Error(err)) - logs = []string{"Failed to retrieve logs"} - } + var logs []string + var stats *container.InspectResponse + + // For timed-out containers, skip log retrieval and inspection to return quickly + if timedOut { + s.logger.Debug("Skipping log retrieval and inspection for timed-out container", + zap.String("container_id", containerID)) + logs = []string{"Container execution timed out"} + } else { + // Get container logs + var err error + logs, err = s.getContainerLogs(ctx, containerID) + if err != nil { + s.logger.Warn("Failed to get container logs", zap.Error(err)) + logs = []string{"Failed to retrieve logs"} + } - // Get container stats - stats, err := s.client.ContainerInspect(ctx, containerID) - if err != nil { - s.logger.Warn("Failed to inspect container", zap.Error(err)) + // Get container stats + statsResponse, err := s.client.ContainerInspect(ctx, containerID) + if err != nil { + s.logger.Warn("Failed to inspect container", zap.Error(err)) + } else { + stats = &statsResponse + } } // Get execution result result := &domain.ExecutionResult{ - Logs: logs, + Logs: logs, + Duration: time.Since(startTime).Truncate(time.Millisecond), } - // Try to get output from container - if stats.State != nil { - result.Duration = time.Since(startTime).Truncate(time.Millisecond) - if stats.State.ExitCode == 0 { - // Try to get output from container - output, err := s.getContainerOutput(ctx, containerID) - if err != nil { - s.logger.Warn("Failed to get container output", zap.Error(err)) - result.Output = json.RawMessage(`{"error": "Failed to retrieve output"}`) + // Handle timeout case + if timedOut { + s.logger.Debug("Processing timeout result", + zap.String("container_id", containerID), + zap.Duration("timeout", function.Timeout.Duration)) + result.Error = fmt.Sprintf("Function execution timed out after %v", function.Timeout.Duration) + result.Output = json.RawMessage(`{"error": "Function execution timed out"}`) + } else { + s.logger.Debug("Processing normal execution result", + zap.String("container_id", containerID)) + // Try to get output from container for successful executions + if stats.State != nil { + s.logger.Debug("Container state available", + zap.String("container_id", containerID), + zap.Int("exit_code", stats.State.ExitCode)) + if stats.State.ExitCode == 0 { + // Try to get output from container + output, err := s.getContainerOutput(ctx, containerID) + if err != nil { + s.logger.Warn("Failed to get container output", zap.Error(err)) + result.Output = json.RawMessage(`{"error": "Failed to retrieve output"}`) + } else { + s.logger.Debug("Successfully retrieved container output", + zap.String("container_id", containerID), + zap.Int("output_size", len(output))) + result.Output = output + } } else { - result.Output = output + s.logger.Debug("Container exited with non-zero code", + zap.String("container_id", containerID), + zap.Int("exit_code", stats.State.ExitCode)) + result.Error = fmt.Sprintf("Container exited with code %d", stats.State.ExitCode) + result.Output = json.RawMessage(`{"error": "Container execution failed"}`) } } else { - result.Error = fmt.Sprintf("Container exited with code %d", stats.State.ExitCode) + s.logger.Warn("Container state not available", + zap.String("container_id", containerID)) } } - // Cleanup container - s.cleanupContainer(ctx, containerID) + // Cleanup container - for timed-out containers, do this in background + if timedOut { + s.logger.Debug("Scheduling background cleanup for timed-out container", + zap.String("container_id", containerID)) + go func() { + s.cleanupContainer(context.Background(), containerID) + }() + } else { + s.cleanupContainer(ctx, containerID) + } return result, nil } diff --git a/faas/internal/services/execution_service.go b/faas/internal/services/execution_service.go index ff08c1a..efb66ef 100644 --- a/faas/internal/services/execution_service.go +++ b/faas/internal/services/execution_service.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "time" "go.uber.org/zap" @@ -35,7 +36,7 @@ func NewExecutionService( } func (s *executionService) Execute(ctx context.Context, req *domain.ExecuteFunctionRequest, userID string) (*domain.ExecuteFunctionResponse, error) { - s.logger.Info("Executing function", + s.logger.Info("🔄 TIMEOUT DEBUG: Starting function execution", zap.String("function_id", req.FunctionID.String()), zap.String("user_id", userID), zap.Bool("async", req.Async)) @@ -45,6 +46,11 @@ func (s *executionService) Execute(ctx context.Context, req *domain.ExecuteFunct if err != nil { return nil, fmt.Errorf("function not found: %w", err) } + + s.logger.Info("⏱️ TIMEOUT DEBUG: Function loaded with timeout", + zap.String("function_id", req.FunctionID.String()), + zap.String("function_name", function.Name), + zap.Duration("timeout", function.Timeout.Duration)) // Create execution record // Initialize input with empty JSON if nil or empty @@ -108,15 +114,52 @@ func (s *executionService) executeSync(ctx context.Context, execution *domain.Fu }, nil } + // Create timeout context for execution + execCtx := ctx + var cancel context.CancelFunc + if function.Timeout.Duration > 0 { + s.logger.Info("⏰ TIMEOUT DEBUG: Creating execution timeout context for sync execution", + zap.String("execution_id", execution.ID.String()), + zap.String("function_id", function.ID.String()), + zap.Duration("timeout", function.Timeout.Duration)) + execCtx, cancel = context.WithTimeout(ctx, function.Timeout.Duration) + defer cancel() + } else { + s.logger.Info("❌ TIMEOUT DEBUG: No timeout specified for sync execution", + zap.String("execution_id", execution.ID.String()), + zap.String("function_id", function.ID.String())) + } + // Execute function - result, err := backend.Execute(ctx, function, execution.Input) + s.logger.Debug("Starting function execution in runtime backend (sync)", + zap.String("execution_id", execution.ID.String()), + zap.String("function_id", function.ID.String()), + zap.String("runtime", string(function.Runtime))) + result, err := backend.Execute(execCtx, function, execution.Input) + s.logger.Debug("Function execution completed in runtime backend (sync)", + zap.String("execution_id", execution.ID.String()), + zap.String("function_id", function.ID.String()), + zap.Error(err)) if err != nil { - execution.Status = domain.StatusFailed - execution.Error = fmt.Sprintf("execution failed: %v", err) + // Check if this was a timeout error + if execCtx.Err() == context.DeadlineExceeded { + s.logger.Debug("Sync execution timed out (context deadline exceeded)", + zap.String("execution_id", execution.ID.String()), + zap.Duration("timeout", function.Timeout.Duration), + zap.Error(execCtx.Err())) + execution.Status = domain.StatusTimeout + execution.Error = fmt.Sprintf("function execution timed out after %v", function.Timeout.Duration) + } else { + s.logger.Debug("Sync execution failed (non-timeout error)", + zap.String("execution_id", execution.ID.String()), + zap.Error(err)) + execution.Status = domain.StatusFailed + execution.Error = fmt.Sprintf("execution failed: %v", err) + } s.updateExecutionComplete(ctx, execution) return &domain.ExecuteFunctionResponse{ ExecutionID: execution.ID, - Status: domain.StatusFailed, + Status: execution.Status, Error: execution.Error, }, nil } @@ -132,12 +175,30 @@ func (s *executionService) executeSync(ctx context.Context, execution *domain.Fu execution.Error = result.Error execution.Duration = result.Duration execution.MemoryUsed = result.MemoryUsed - s.updateExecutionComplete(ctx, execution) + // Check if the result indicates a timeout if result.Error != "" { - execution.Status = domain.StatusFailed + s.logger.Debug("Execution result contains error", + zap.String("execution_id", execution.ID.String()), + zap.String("error", result.Error)) + if strings.Contains(result.Error, "timed out") { + s.logger.Debug("Result indicates timeout - setting status to timeout", + zap.String("execution_id", execution.ID.String()), + zap.String("error", result.Error)) + execution.Status = domain.StatusTimeout + } else { + s.logger.Debug("Result indicates failure - setting status to failed", + zap.String("execution_id", execution.ID.String()), + zap.String("error", result.Error)) + execution.Status = domain.StatusFailed + } + } else { + s.logger.Debug("Execution successful - no errors", + zap.String("execution_id", execution.ID.String())) } + s.updateExecutionComplete(ctx, execution) + return &domain.ExecuteFunctionResponse{ ExecutionID: execution.ID, Status: execution.Status, @@ -169,14 +230,51 @@ func (s *executionService) executeAsync(ctx context.Context, execution *domain.F return } + // Create timeout context for execution + execCtx := ctx + var cancel context.CancelFunc + if function.Timeout.Duration > 0 { + s.logger.Debug("Creating execution timeout context for async execution", + zap.String("execution_id", execution.ID.String()), + zap.String("function_id", function.ID.String()), + zap.Duration("timeout", function.Timeout.Duration)) + execCtx, cancel = context.WithTimeout(ctx, function.Timeout.Duration) + defer cancel() + } else { + s.logger.Debug("No timeout specified for async execution", + zap.String("execution_id", execution.ID.String()), + zap.String("function_id", function.ID.String())) + } + // Execute function - result, err := backend.Execute(ctx, function, execution.Input) + s.logger.Debug("Starting function execution in runtime backend (async)", + zap.String("execution_id", execution.ID.String()), + zap.String("function_id", function.ID.String()), + zap.String("runtime", string(function.Runtime))) + result, err := backend.Execute(execCtx, function, execution.Input) + s.logger.Debug("Function execution completed in runtime backend (async)", + zap.String("execution_id", execution.ID.String()), + zap.String("function_id", function.ID.String()), + zap.Error(err)) if err != nil { s.logger.Error("Async function execution failed", zap.String("execution_id", execution.ID.String()), zap.Error(err)) - execution.Status = domain.StatusFailed - execution.Error = fmt.Sprintf("execution failed: %v", err) + // Check if this was a timeout error + if execCtx.Err() == context.DeadlineExceeded { + s.logger.Debug("Async execution timed out (context deadline exceeded)", + zap.String("execution_id", execution.ID.String()), + zap.Duration("timeout", function.Timeout.Duration), + zap.Error(execCtx.Err())) + execution.Status = domain.StatusTimeout + execution.Error = fmt.Sprintf("function execution timed out after %v", function.Timeout.Duration) + } else { + s.logger.Debug("Async execution failed (non-timeout error)", + zap.String("execution_id", execution.ID.String()), + zap.Error(err)) + execution.Status = domain.StatusFailed + execution.Error = fmt.Sprintf("execution failed: %v", err) + } s.updateExecutionComplete(ctx, execution) return } @@ -193,8 +291,25 @@ func (s *executionService) executeAsync(ctx context.Context, execution *domain.F execution.Duration = result.Duration execution.MemoryUsed = result.MemoryUsed + // Check if the result indicates a timeout if result.Error != "" { - execution.Status = domain.StatusFailed + s.logger.Debug("Execution result contains error", + zap.String("execution_id", execution.ID.String()), + zap.String("error", result.Error)) + if strings.Contains(result.Error, "timed out") { + s.logger.Debug("Result indicates timeout - setting status to timeout", + zap.String("execution_id", execution.ID.String()), + zap.String("error", result.Error)) + execution.Status = domain.StatusTimeout + } else { + s.logger.Debug("Result indicates failure - setting status to failed", + zap.String("execution_id", execution.ID.String()), + zap.String("error", result.Error)) + execution.Status = domain.StatusFailed + } + } else { + s.logger.Debug("Execution successful - no errors", + zap.String("execution_id", execution.ID.String())) } s.updateExecutionComplete(ctx, execution)