-
This commit is contained in:
@ -83,12 +83,6 @@ func NewSimpleDockerRuntime(logger *zap.Logger) (*SimpleDockerRuntime, error) {
|
|||||||
func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.FunctionDefinition, input json.RawMessage) (*domain.ExecutionResult, error) {
|
func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.FunctionDefinition, input json.RawMessage) (*domain.ExecutionResult, error) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
|
|
||||||
s.logger.Info("🐳 TIMEOUT DEBUG: Executing function in Docker container",
|
|
||||||
zap.String("function_id", function.ID.String()),
|
|
||||||
zap.String("name", function.Name),
|
|
||||||
zap.String("image", function.Image),
|
|
||||||
zap.Duration("timeout", function.Timeout.Duration))
|
|
||||||
|
|
||||||
// Create container
|
// Create container
|
||||||
containerID, err := s.createContainer(ctx, function, input)
|
containerID, err := s.createContainer(ctx, function, input)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -105,20 +99,12 @@ func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.Func
|
|||||||
var timeoutCtx context.Context
|
var timeoutCtx context.Context
|
||||||
var cancel context.CancelFunc
|
var cancel context.CancelFunc
|
||||||
if function.Timeout.Duration > 0 {
|
if function.Timeout.Duration > 0 {
|
||||||
s.logger.Info("⏰ TIMEOUT DEBUG: Creating timeout context for function execution",
|
|
||||||
zap.Duration("timeout", function.Timeout.Duration),
|
|
||||||
zap.String("container_id", containerID))
|
|
||||||
timeoutCtx, cancel = context.WithTimeout(ctx, function.Timeout.Duration)
|
timeoutCtx, cancel = context.WithTimeout(ctx, function.Timeout.Duration)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
} else {
|
} else {
|
||||||
s.logger.Info("❌ TIMEOUT DEBUG: No timeout specified, using original context",
|
|
||||||
zap.String("container_id", containerID))
|
|
||||||
timeoutCtx = ctx
|
timeoutCtx = ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
s.logger.Debug("Starting container wait with timeout",
|
|
||||||
zap.String("container_id", containerID),
|
|
||||||
zap.Duration("timeout", function.Timeout.Duration))
|
|
||||||
|
|
||||||
// Wait for container to finish with timeout
|
// Wait for container to finish with timeout
|
||||||
statusCh, errCh := s.client.ContainerWait(timeoutCtx, containerID, container.WaitConditionNotRunning)
|
statusCh, errCh := s.client.ContainerWait(timeoutCtx, containerID, container.WaitConditionNotRunning)
|
||||||
@ -126,28 +112,16 @@ func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.Func
|
|||||||
var timedOut bool
|
var timedOut bool
|
||||||
select {
|
select {
|
||||||
case err := <-errCh:
|
case err := <-errCh:
|
||||||
s.logger.Debug("Container wait returned error",
|
|
||||||
zap.String("container_id", containerID),
|
|
||||||
zap.Error(err))
|
|
||||||
s.cleanupContainer(ctx, containerID)
|
s.cleanupContainer(ctx, containerID)
|
||||||
return nil, fmt.Errorf("error waiting for container: %w", err)
|
return nil, fmt.Errorf("error waiting for container: %w", err)
|
||||||
case status := <-statusCh:
|
case <-statusCh:
|
||||||
s.logger.Debug("Container finished normally",
|
|
||||||
zap.String("container_id", containerID),
|
|
||||||
zap.Int64("exit_code", status.StatusCode))
|
|
||||||
// Container finished normally
|
// Container finished normally
|
||||||
case <-timeoutCtx.Done():
|
case <-timeoutCtx.Done():
|
||||||
// Timeout occurred
|
// Timeout occurred
|
||||||
timedOut = true
|
timedOut = true
|
||||||
s.logger.Info("💥 TIMEOUT DEBUG: Function execution timed out, stopping container",
|
|
||||||
zap.String("container_id", containerID),
|
|
||||||
zap.Duration("timeout", function.Timeout.Duration),
|
|
||||||
zap.Error(timeoutCtx.Err()))
|
|
||||||
|
|
||||||
// Stop the container in the background - don't wait for it to complete
|
// Stop the container in the background - don't wait for it to complete
|
||||||
go func() {
|
go func() {
|
||||||
s.logger.Debug("Attempting to stop timed out container in background",
|
|
||||||
zap.String("container_id", containerID))
|
|
||||||
// Use a very short timeout for stopping, then kill if needed
|
// Use a very short timeout for stopping, then kill if needed
|
||||||
if err := s.client.ContainerStop(context.Background(), containerID, container.StopOptions{
|
if err := s.client.ContainerStop(context.Background(), containerID, container.StopOptions{
|
||||||
Timeout: &[]int{1}[0], // Only 1 second grace period for stop
|
Timeout: &[]int{1}[0], // Only 1 second grace period for stop
|
||||||
@ -160,13 +134,7 @@ func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.Func
|
|||||||
s.logger.Error("Failed to kill timed out container",
|
s.logger.Error("Failed to kill timed out container",
|
||||||
zap.String("container_id", containerID),
|
zap.String("container_id", containerID),
|
||||||
zap.Error(killErr))
|
zap.Error(killErr))
|
||||||
} else {
|
|
||||||
s.logger.Debug("Successfully killed timed out container",
|
|
||||||
zap.String("container_id", containerID))
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
s.logger.Debug("Successfully stopped timed out container",
|
|
||||||
zap.String("container_id", containerID))
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@ -176,8 +144,6 @@ func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.Func
|
|||||||
|
|
||||||
// For timed-out containers, skip log retrieval and inspection to return quickly
|
// For timed-out containers, skip log retrieval and inspection to return quickly
|
||||||
if timedOut {
|
if timedOut {
|
||||||
s.logger.Debug("Skipping log retrieval and inspection for timed-out container",
|
|
||||||
zap.String("container_id", containerID))
|
|
||||||
logs = []string{"Container execution timed out"}
|
logs = []string{"Container execution timed out"}
|
||||||
} else {
|
} else {
|
||||||
// Get container logs
|
// Get container logs
|
||||||
@ -205,19 +171,11 @@ func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.Func
|
|||||||
|
|
||||||
// Handle timeout case
|
// Handle timeout case
|
||||||
if timedOut {
|
if timedOut {
|
||||||
s.logger.Debug("Processing timeout result",
|
|
||||||
zap.String("container_id", containerID),
|
|
||||||
zap.Duration("timeout", function.Timeout.Duration))
|
|
||||||
result.Error = fmt.Sprintf("Function execution timed out after %v", function.Timeout.Duration)
|
result.Error = fmt.Sprintf("Function execution timed out after %v", function.Timeout.Duration)
|
||||||
result.Output = json.RawMessage(`{"error": "Function execution timed out"}`)
|
result.Output = json.RawMessage(`{"error": "Function execution timed out"}`)
|
||||||
} else {
|
} else {
|
||||||
s.logger.Debug("Processing normal execution result",
|
|
||||||
zap.String("container_id", containerID))
|
|
||||||
// Try to get output from container for successful executions
|
// Try to get output from container for successful executions
|
||||||
if stats.State != nil {
|
if stats.State != nil {
|
||||||
s.logger.Debug("Container state available",
|
|
||||||
zap.String("container_id", containerID),
|
|
||||||
zap.Int("exit_code", stats.State.ExitCode))
|
|
||||||
if stats.State.ExitCode == 0 {
|
if stats.State.ExitCode == 0 {
|
||||||
// Try to get output from container
|
// Try to get output from container
|
||||||
output, err := s.getContainerOutput(ctx, containerID)
|
output, err := s.getContainerOutput(ctx, containerID)
|
||||||
@ -225,28 +183,19 @@ func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.Func
|
|||||||
s.logger.Warn("Failed to get container output", zap.Error(err))
|
s.logger.Warn("Failed to get container output", zap.Error(err))
|
||||||
result.Output = json.RawMessage(`{"error": "Failed to retrieve output"}`)
|
result.Output = json.RawMessage(`{"error": "Failed to retrieve output"}`)
|
||||||
} else {
|
} else {
|
||||||
s.logger.Debug("Successfully retrieved container output",
|
|
||||||
zap.String("container_id", containerID),
|
|
||||||
zap.Int("output_size", len(output)))
|
|
||||||
result.Output = output
|
result.Output = output
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
s.logger.Debug("Container exited with non-zero code",
|
|
||||||
zap.String("container_id", containerID),
|
|
||||||
zap.Int("exit_code", stats.State.ExitCode))
|
|
||||||
result.Error = fmt.Sprintf("Container exited with code %d", stats.State.ExitCode)
|
result.Error = fmt.Sprintf("Container exited with code %d", stats.State.ExitCode)
|
||||||
result.Output = json.RawMessage(`{"error": "Container execution failed"}`)
|
result.Output = json.RawMessage(`{"error": "Container execution failed"}`)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
s.logger.Warn("Container state not available",
|
s.logger.Warn("Container state not available")
|
||||||
zap.String("container_id", containerID))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cleanup container - for timed-out containers, do this in background
|
// Cleanup container - for timed-out containers, do this in background
|
||||||
if timedOut {
|
if timedOut {
|
||||||
s.logger.Debug("Scheduling background cleanup for timed-out container",
|
|
||||||
zap.String("container_id", containerID))
|
|
||||||
go func() {
|
go func() {
|
||||||
s.cleanupContainer(context.Background(), containerID)
|
s.cleanupContainer(context.Background(), containerID)
|
||||||
}()
|
}()
|
||||||
|
|||||||
@ -36,21 +36,11 @@ func NewExecutionService(
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *executionService) Execute(ctx context.Context, req *domain.ExecuteFunctionRequest, userID string) (*domain.ExecuteFunctionResponse, error) {
|
func (s *executionService) Execute(ctx context.Context, req *domain.ExecuteFunctionRequest, userID string) (*domain.ExecuteFunctionResponse, error) {
|
||||||
s.logger.Info("🔄 TIMEOUT DEBUG: Starting function execution",
|
|
||||||
zap.String("function_id", req.FunctionID.String()),
|
|
||||||
zap.String("user_id", userID),
|
|
||||||
zap.Bool("async", req.Async))
|
|
||||||
|
|
||||||
// Get function definition
|
// Get function definition
|
||||||
function, err := s.functionRepo.GetByID(ctx, req.FunctionID)
|
function, err := s.functionRepo.GetByID(ctx, req.FunctionID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("function not found: %w", err)
|
return nil, fmt.Errorf("function not found: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.logger.Info("⏱️ TIMEOUT DEBUG: Function loaded with timeout",
|
|
||||||
zap.String("function_id", req.FunctionID.String()),
|
|
||||||
zap.String("function_name", function.Name),
|
|
||||||
zap.Duration("timeout", function.Timeout.Duration))
|
|
||||||
|
|
||||||
// Create execution record
|
// Create execution record
|
||||||
// Initialize input with empty JSON if nil or empty
|
// Initialize input with empty JSON if nil or empty
|
||||||
@ -118,41 +108,18 @@ func (s *executionService) executeSync(ctx context.Context, execution *domain.Fu
|
|||||||
execCtx := ctx
|
execCtx := ctx
|
||||||
var cancel context.CancelFunc
|
var cancel context.CancelFunc
|
||||||
if function.Timeout.Duration > 0 {
|
if function.Timeout.Duration > 0 {
|
||||||
s.logger.Info("⏰ TIMEOUT DEBUG: Creating execution timeout context for sync execution",
|
|
||||||
zap.String("execution_id", execution.ID.String()),
|
|
||||||
zap.String("function_id", function.ID.String()),
|
|
||||||
zap.Duration("timeout", function.Timeout.Duration))
|
|
||||||
execCtx, cancel = context.WithTimeout(ctx, function.Timeout.Duration)
|
execCtx, cancel = context.WithTimeout(ctx, function.Timeout.Duration)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
} else {
|
|
||||||
s.logger.Info("❌ TIMEOUT DEBUG: No timeout specified for sync execution",
|
|
||||||
zap.String("execution_id", execution.ID.String()),
|
|
||||||
zap.String("function_id", function.ID.String()))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Execute function
|
// Execute function
|
||||||
s.logger.Debug("Starting function execution in runtime backend (sync)",
|
|
||||||
zap.String("execution_id", execution.ID.String()),
|
|
||||||
zap.String("function_id", function.ID.String()),
|
|
||||||
zap.String("runtime", string(function.Runtime)))
|
|
||||||
result, err := backend.Execute(execCtx, function, execution.Input)
|
result, err := backend.Execute(execCtx, function, execution.Input)
|
||||||
s.logger.Debug("Function execution completed in runtime backend (sync)",
|
|
||||||
zap.String("execution_id", execution.ID.String()),
|
|
||||||
zap.String("function_id", function.ID.String()),
|
|
||||||
zap.Error(err))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Check if this was a timeout error
|
// Check if this was a timeout error
|
||||||
if execCtx.Err() == context.DeadlineExceeded {
|
if execCtx.Err() == context.DeadlineExceeded {
|
||||||
s.logger.Debug("Sync execution timed out (context deadline exceeded)",
|
|
||||||
zap.String("execution_id", execution.ID.String()),
|
|
||||||
zap.Duration("timeout", function.Timeout.Duration),
|
|
||||||
zap.Error(execCtx.Err()))
|
|
||||||
execution.Status = domain.StatusTimeout
|
execution.Status = domain.StatusTimeout
|
||||||
execution.Error = fmt.Sprintf("function execution timed out after %v", function.Timeout.Duration)
|
execution.Error = fmt.Sprintf("function execution timed out after %v", function.Timeout.Duration)
|
||||||
} else {
|
} else {
|
||||||
s.logger.Debug("Sync execution failed (non-timeout error)",
|
|
||||||
zap.String("execution_id", execution.ID.String()),
|
|
||||||
zap.Error(err))
|
|
||||||
execution.Status = domain.StatusFailed
|
execution.Status = domain.StatusFailed
|
||||||
execution.Error = fmt.Sprintf("execution failed: %v", err)
|
execution.Error = fmt.Sprintf("execution failed: %v", err)
|
||||||
}
|
}
|
||||||
@ -178,23 +145,11 @@ func (s *executionService) executeSync(ctx context.Context, execution *domain.Fu
|
|||||||
|
|
||||||
// Check if the result indicates a timeout
|
// Check if the result indicates a timeout
|
||||||
if result.Error != "" {
|
if result.Error != "" {
|
||||||
s.logger.Debug("Execution result contains error",
|
|
||||||
zap.String("execution_id", execution.ID.String()),
|
|
||||||
zap.String("error", result.Error))
|
|
||||||
if strings.Contains(result.Error, "timed out") {
|
if strings.Contains(result.Error, "timed out") {
|
||||||
s.logger.Debug("Result indicates timeout - setting status to timeout",
|
|
||||||
zap.String("execution_id", execution.ID.String()),
|
|
||||||
zap.String("error", result.Error))
|
|
||||||
execution.Status = domain.StatusTimeout
|
execution.Status = domain.StatusTimeout
|
||||||
} else {
|
} else {
|
||||||
s.logger.Debug("Result indicates failure - setting status to failed",
|
|
||||||
zap.String("execution_id", execution.ID.String()),
|
|
||||||
zap.String("error", result.Error))
|
|
||||||
execution.Status = domain.StatusFailed
|
execution.Status = domain.StatusFailed
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
s.logger.Debug("Execution successful - no errors",
|
|
||||||
zap.String("execution_id", execution.ID.String()))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
s.updateExecutionComplete(ctx, execution)
|
s.updateExecutionComplete(ctx, execution)
|
||||||
@ -234,44 +189,18 @@ func (s *executionService) executeAsync(ctx context.Context, execution *domain.F
|
|||||||
execCtx := ctx
|
execCtx := ctx
|
||||||
var cancel context.CancelFunc
|
var cancel context.CancelFunc
|
||||||
if function.Timeout.Duration > 0 {
|
if function.Timeout.Duration > 0 {
|
||||||
s.logger.Debug("Creating execution timeout context for async execution",
|
|
||||||
zap.String("execution_id", execution.ID.String()),
|
|
||||||
zap.String("function_id", function.ID.String()),
|
|
||||||
zap.Duration("timeout", function.Timeout.Duration))
|
|
||||||
execCtx, cancel = context.WithTimeout(ctx, function.Timeout.Duration)
|
execCtx, cancel = context.WithTimeout(ctx, function.Timeout.Duration)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
} else {
|
|
||||||
s.logger.Debug("No timeout specified for async execution",
|
|
||||||
zap.String("execution_id", execution.ID.String()),
|
|
||||||
zap.String("function_id", function.ID.String()))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Execute function
|
// Execute function
|
||||||
s.logger.Debug("Starting function execution in runtime backend (async)",
|
|
||||||
zap.String("execution_id", execution.ID.String()),
|
|
||||||
zap.String("function_id", function.ID.String()),
|
|
||||||
zap.String("runtime", string(function.Runtime)))
|
|
||||||
result, err := backend.Execute(execCtx, function, execution.Input)
|
result, err := backend.Execute(execCtx, function, execution.Input)
|
||||||
s.logger.Debug("Function execution completed in runtime backend (async)",
|
|
||||||
zap.String("execution_id", execution.ID.String()),
|
|
||||||
zap.String("function_id", function.ID.String()),
|
|
||||||
zap.Error(err))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.Error("Async function execution failed",
|
|
||||||
zap.String("execution_id", execution.ID.String()),
|
|
||||||
zap.Error(err))
|
|
||||||
// Check if this was a timeout error
|
// Check if this was a timeout error
|
||||||
if execCtx.Err() == context.DeadlineExceeded {
|
if execCtx.Err() == context.DeadlineExceeded {
|
||||||
s.logger.Debug("Async execution timed out (context deadline exceeded)",
|
|
||||||
zap.String("execution_id", execution.ID.String()),
|
|
||||||
zap.Duration("timeout", function.Timeout.Duration),
|
|
||||||
zap.Error(execCtx.Err()))
|
|
||||||
execution.Status = domain.StatusTimeout
|
execution.Status = domain.StatusTimeout
|
||||||
execution.Error = fmt.Sprintf("function execution timed out after %v", function.Timeout.Duration)
|
execution.Error = fmt.Sprintf("function execution timed out after %v", function.Timeout.Duration)
|
||||||
} else {
|
} else {
|
||||||
s.logger.Debug("Async execution failed (non-timeout error)",
|
|
||||||
zap.String("execution_id", execution.ID.String()),
|
|
||||||
zap.Error(err))
|
|
||||||
execution.Status = domain.StatusFailed
|
execution.Status = domain.StatusFailed
|
||||||
execution.Error = fmt.Sprintf("execution failed: %v", err)
|
execution.Error = fmt.Sprintf("execution failed: %v", err)
|
||||||
}
|
}
|
||||||
@ -293,23 +222,11 @@ func (s *executionService) executeAsync(ctx context.Context, execution *domain.F
|
|||||||
|
|
||||||
// Check if the result indicates a timeout
|
// Check if the result indicates a timeout
|
||||||
if result.Error != "" {
|
if result.Error != "" {
|
||||||
s.logger.Debug("Execution result contains error",
|
|
||||||
zap.String("execution_id", execution.ID.String()),
|
|
||||||
zap.String("error", result.Error))
|
|
||||||
if strings.Contains(result.Error, "timed out") {
|
if strings.Contains(result.Error, "timed out") {
|
||||||
s.logger.Debug("Result indicates timeout - setting status to timeout",
|
|
||||||
zap.String("execution_id", execution.ID.String()),
|
|
||||||
zap.String("error", result.Error))
|
|
||||||
execution.Status = domain.StatusTimeout
|
execution.Status = domain.StatusTimeout
|
||||||
} else {
|
} else {
|
||||||
s.logger.Debug("Result indicates failure - setting status to failed",
|
|
||||||
zap.String("execution_id", execution.ID.String()),
|
|
||||||
zap.String("error", result.Error))
|
|
||||||
execution.Status = domain.StatusFailed
|
execution.Status = domain.StatusFailed
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
s.logger.Debug("Execution successful - no errors",
|
|
||||||
zap.String("execution_id", execution.ID.String()))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
s.updateExecutionComplete(ctx, execution)
|
s.updateExecutionComplete(ctx, execution)
|
||||||
|
|||||||
Reference in New Issue
Block a user