Files
skybridge/faas/internal/services/execution_service.go
2025-08-31 17:01:07 -04:00

458 lines
15 KiB
Go

package services
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"go.uber.org/zap"
"github.com/RyanCopley/skybridge/faas/internal/domain"
"github.com/RyanCopley/skybridge/faas/internal/repository"
"github.com/RyanCopley/skybridge/faas/internal/runtime"
"github.com/google/uuid"
)
// executionService is the ExecutionService implementation. It persists
// execution records through the repositories and obtains runtime backends
// from a RuntimeService.
type executionService struct {
	// Persistence for execution records and the function definitions they run.
	executionRepo repository.ExecutionRepository
	functionRepo  repository.FunctionRepository

	// Resolves the runtime backend for a function's Runtime.
	runtimeService RuntimeService

	logger *zap.Logger
}
// NewExecutionService assembles an ExecutionService from its dependencies.
// The arguments are stored as given; none of them is validated here.
func NewExecutionService(
	executionRepo repository.ExecutionRepository,
	functionRepo repository.FunctionRepository,
	runtimeService RuntimeService,
	logger *zap.Logger,
) ExecutionService {
	svc := &executionService{
		logger:         logger,
		runtimeService: runtimeService,
		functionRepo:   functionRepo,
		executionRepo:  executionRepo,
	}
	return svc
}
// Execute records a new execution of req.FunctionID on behalf of userID and
// runs it.
//
// When req.Async is set, the run happens on a new goroutine with
// context.Background() (so it is not tied to ctx) and the response carries
// only the execution ID and StatusPending; the stored record is updated as the
// run progresses. Otherwise the function runs to completion before returning.
//
// An error is returned only when the function cannot be loaded or the
// execution record cannot be stored; failures of the run itself are reported
// through the response's Status and Error fields.
func (s *executionService) Execute(ctx context.Context, req *domain.ExecuteFunctionRequest, userID string) (*domain.ExecuteFunctionResponse, error) {
	function, err := s.functionRepo.GetByID(ctx, req.FunctionID)
	if err != nil {
		return nil, fmt.Errorf("function not found: %w", err)
	}

	// Store `{}` rather than an empty payload when no input was supplied.
	// len(nil) == 0, so this also covers a nil req.Input.
	input := req.Input
	if len(input) == 0 {
		input = json.RawMessage(`{}`)
	}

	execution := &domain.FunctionExecution{
		ID:         uuid.New(),
		FunctionID: req.FunctionID,
		Status:     domain.StatusPending,
		Input:      input,
		Output:     json.RawMessage(`{}`), // Initialize with empty JSON object
		ExecutorID: userID,
		CreatedAt:  time.Now(),
	}

	createdExecution, err := s.executionRepo.Create(ctx, execution)
	if err != nil {
		s.logger.Error("Failed to create execution record",
			zap.String("function_id", req.FunctionID.String()),
			zap.Error(err))
		return nil, fmt.Errorf("failed to create execution record: %w", err)
	}

	if !req.Async {
		return s.executeSync(ctx, createdExecution, function)
	}

	// Capture the ID before handing the record to the goroutine, which
	// mutates it; nothing below touches the shared struct afterwards.
	executionID := createdExecution.ID
	go s.executeAsync(context.Background(), createdExecution, function)
	return &domain.ExecuteFunctionResponse{
		ExecutionID: executionID,
		Status:      domain.StatusPending,
	}, nil
}
// executeSync runs execution to completion on the calling goroutine and
// reports its final state.
//
// The backend runs under ctx (further bounded by the function's timeout, if
// one is set), but the terminal state is persisted with context.Background():
// ctx is the caller's context and may already be canceled when the run ends
// (e.g. the client disconnected). A write under a canceled context would
// likely fail (assuming the repository honors ctx) and leave the record in
// StatusRunning. The log-streaming callback persists with a background context
// for the same reason.
//
// Run failures are reported through the response's Status and Error fields;
// the returned error is always nil.
func (s *executionService) executeSync(ctx context.Context, execution *domain.FunctionExecution, function *domain.FunctionDefinition) (*domain.ExecuteFunctionResponse, error) {
	produced, _ := s.runExecution(ctx, context.Background(), execution, function)
	if !produced {
		// The run ended before the backend returned a result, so only the
		// identifying and failure fields are meaningful.
		return &domain.ExecuteFunctionResponse{
			ExecutionID: execution.ID,
			Status:      execution.Status,
			Error:       execution.Error,
		}, nil
	}
	return &domain.ExecuteFunctionResponse{
		ExecutionID: execution.ID,
		Status:      execution.Status,
		Output:      execution.Output,
		Error:       execution.Error,
		Duration:    execution.Duration,
		MemoryUsed:  execution.MemoryUsed,
	}, nil
}

// executeAsync is the body of the goroutine Execute starts for asynchronous
// requests. It runs execution to completion, persisting state with ctx
// (Execute passes context.Background()), and logs the outcome because there
// is no caller to return it to.
func (s *executionService) executeAsync(ctx context.Context, execution *domain.FunctionExecution, function *domain.FunctionDefinition) {
	produced, backendErr := s.runExecution(ctx, ctx, execution, function)
	if backendErr != nil {
		s.logger.Error("Failed to get runtime backend for async execution",
			zap.String("execution_id", execution.ID.String()),
			zap.Error(backendErr))
		return
	}
	if !produced {
		return
	}
	s.logger.Info("Async function execution completed",
		zap.String("execution_id", execution.ID.String()),
		zap.String("status", string(execution.Status)),
		zap.Duration("duration", execution.Duration))
}

// runExecution drives execution through its lifecycle, shared by the sync
// and async paths. It:
//   - marks the record StatusRunning and persists that with ctx (failure is
//     only logged),
//   - resolves the runtime backend for function.Runtime,
//   - invokes the function under ctx, bounded by function.Timeout when > 0,
//   - records the outcome on execution and persists it via
//     updateExecutionComplete using persistCtx.
//
// produced is true when the backend returned a result (Output, Duration,
// MemoryUsed and Logs were filled from it) and false when the run ended
// earlier. backendErr is the GetBackend error when no runtime backend could be
// obtained, nil otherwise.
func (s *executionService) runExecution(ctx, persistCtx context.Context, execution *domain.FunctionExecution, function *domain.FunctionDefinition) (produced bool, backendErr error) {
	startedAt := time.Now()
	execution.Status = domain.StatusRunning
	execution.StartedAt = &startedAt
	if _, err := s.executionRepo.Update(ctx, execution.ID, execution); err != nil {
		s.logger.Warn("Failed to update execution status to running", zap.Error(err))
	}

	backend, err := s.runtimeService.GetBackend(ctx, string(function.Runtime))
	if err != nil {
		execution.Status = domain.StatusFailed
		execution.Error = fmt.Sprintf("failed to get runtime backend: %v", err)
		s.updateExecutionComplete(persistCtx, execution)
		return false, err
	}

	// Bound the run by the function's timeout, if one is configured.
	execCtx := ctx
	if function.Timeout.Duration > 0 {
		var cancel context.CancelFunc
		execCtx, cancel = context.WithTimeout(ctx, function.Timeout.Duration)
		defer cancel()
	}

	// Backends that implement this optional interface receive incremental
	// log batches while the function runs.
	type logStreamingBackend interface {
		ExecuteWithLogStreaming(ctx context.Context, function *domain.FunctionDefinition, input json.RawMessage, logCallback runtime.LogStreamCallback) (*domain.ExecutionResult, error)
	}

	var result *domain.ExecutionResult
	if lsBackend, ok := backend.(logStreamingBackend); ok {
		s.logger.Info("Backend supports log streaming, using ExecuteWithLogStreaming",
			zap.String("execution_id", execution.ID.String()),
			zap.String("function_id", function.ID.String()))
		result, err = lsBackend.ExecuteWithLogStreaming(execCtx, function, execution.Input, s.newLogStreamCallback(execution))
	} else {
		s.logger.Info("Backend does not support log streaming, using regular Execute",
			zap.String("execution_id", execution.ID.String()),
			zap.String("function_id", function.ID.String()))
		result, err = backend.Execute(execCtx, function, execution.Input)
	}

	if err == nil && result == nil {
		// Defensive: a nil result with a nil error would nil-deref below; on
		// the async goroutine that panic would take down the whole process.
		err = fmt.Errorf("runtime returned no result")
	}

	if err != nil {
		// Only attribute the failure to the function timeout when one is
		// configured; with no timeout, an expired deadline inherited from ctx
		// is reported as an ordinary failure instead of "timed out after 0s".
		if function.Timeout.Duration > 0 && execCtx.Err() == context.DeadlineExceeded {
			execution.Status = domain.StatusTimeout
			execution.Error = fmt.Sprintf("function execution timed out after %v", function.Timeout.Duration)
		} else {
			execution.Status = domain.StatusFailed
			execution.Error = fmt.Sprintf("execution failed: %v", err)
		}
		s.updateExecutionComplete(persistCtx, execution)
		return false, nil
	}

	execution.Status = domain.StatusCompleted
	execution.Output = result.Output
	if len(execution.Output) == 0 {
		execution.Output = json.RawMessage(`{}`)
	}
	execution.Error = result.Error
	execution.Duration = result.Duration
	execution.MemoryUsed = result.MemoryUsed
	execution.Logs = result.Logs

	// The backend can also report a failure in-band via result.Error; a
	// message containing "timed out" is classified as a timeout.
	if result.Error != "" {
		if strings.Contains(result.Error, "timed out") {
			execution.Status = domain.StatusTimeout
		} else {
			execution.Status = domain.StatusFailed
		}
	}

	s.updateExecutionComplete(persistCtx, execution)
	return true, nil
}

// newLogStreamCallback returns the callback handed to log-streaming backends.
// Each call persists the logs received so far on a copy of execution using
// context.Background(), so the write is not tied to the request context, and
// copies the logs onto execution only if that write succeeds. The repository
// error is returned to the backend.
//
// NOTE(review): execution is read and written here as well as in
// runExecution; if a backend invokes the callback from another goroutine
// while runExecution is touching the record (e.g. after
// ExecuteWithLogStreaming returns), that is a data race — confirm the
// backend's calling contract.
func (s *executionService) newLogStreamCallback(execution *domain.FunctionExecution) func(logs []string) error {
	return func(logs []string) error {
		// Debug rather than Info: this fires on every batch and includes the
		// full log contents of user code.
		s.logger.Debug("Log streaming callback called",
			zap.String("execution_id", execution.ID.String()),
			zap.Int("log_count", len(logs)),
			zap.Strings("logs_preview", logs))

		execCopy := *execution
		execCopy.Logs = logs
		_, err := s.executionRepo.Update(context.Background(), execution.ID, &execCopy)
		if err != nil {
			s.logger.Error("Failed to update execution with logs in database",
				zap.String("execution_id", execution.ID.String()),
				zap.Error(err))
			return err
		}

		execution.Logs = logs
		s.logger.Debug("Successfully updated execution with logs in database",
			zap.String("execution_id", execution.ID.String()),
			zap.Int("log_count", len(logs)))
		return nil
	}
}
// updateExecutionComplete stamps execution.CompletedAt with the current time
// and persists the whole record under ctx. A repository failure is logged,
// not returned, so callers cannot observe it.
func (s *executionService) updateExecutionComplete(ctx context.Context, execution *domain.FunctionExecution) {
	completedAt := time.Now()
	execution.CompletedAt = &completedAt

	_, err := s.executionRepo.Update(ctx, execution.ID, execution)
	if err == nil {
		return
	}
	s.logger.Error("Failed to update execution completion",
		zap.String("execution_id", execution.ID.String()),
		zap.Error(err))
}
// GetByID loads a single execution record. Any repository error is wrapped as
// "execution not found", whatever its underlying cause.
func (s *executionService) GetByID(ctx context.Context, id uuid.UUID) (*domain.FunctionExecution, error) {
	record, err := s.executionRepo.GetByID(ctx, id)
	if err == nil {
		return record, nil
	}
	return nil, fmt.Errorf("execution not found: %w", err)
}
// List returns a page of executions from the repository. functionID is passed
// through unchanged (presumably nil means "all functions" — confirm against
// the repository). limit and offset are normalized by clampExecutionPage.
func (s *executionService) List(ctx context.Context, functionID *uuid.UUID, limit, offset int) ([]*domain.FunctionExecution, error) {
	limit, offset = clampExecutionPage(limit, offset)
	return s.executionRepo.List(ctx, functionID, limit, offset)
}

// GetByFunctionID returns a page of executions of functionID from the
// repository. limit and offset are normalized by clampExecutionPage.
func (s *executionService) GetByFunctionID(ctx context.Context, functionID uuid.UUID, limit, offset int) ([]*domain.FunctionExecution, error) {
	limit, offset = clampExecutionPage(limit, offset)
	return s.executionRepo.GetByFunctionID(ctx, functionID, limit, offset)
}

// clampExecutionPage normalizes caller-supplied pagination for execution
// listings: a non-positive limit becomes the default of 50, a limit above 100
// is capped at 100, and a negative offset is treated as 0 instead of being
// passed on to the repository.
func clampExecutionPage(limit, offset int) (int, int) {
	const (
		defaultLimit = 50
		maxLimit     = 100
	)
	switch {
	case limit <= 0:
		limit = defaultLimit
	case limit > maxLimit:
		limit = maxLimit
	}
	if offset < 0 {
		offset = 0
	}
	return limit, offset
}
// Cancel stops a pending or running execution and records it as canceled.
//
// The execution and its function are loaded to find the runtime backend,
// which is asked to stop the run. A StopExecution failure is only logged; the
// record is still marked StatusCanceled with CompletedAt set. Executions in
// any other status are rejected with an error.
//
// NOTE(review): userID is only logged — no ownership check happens here;
// confirm authorization is enforced upstream. Also, an in-flight run calls
// updateExecutionComplete once its backend call returns, which may overwrite
// the canceled status written here.
func (s *executionService) Cancel(ctx context.Context, id uuid.UUID, userID string) error {
	s.logger.Info("Canceling execution",
		zap.String("execution_id", id.String()),
		zap.String("user_id", userID))

	record, err := s.executionRepo.GetByID(ctx, id)
	if err != nil {
		return fmt.Errorf("execution not found: %w", err)
	}

	switch record.Status {
	case domain.StatusRunning, domain.StatusPending:
		// Still cancellable.
	default:
		return fmt.Errorf("execution is not running (status: %s)", record.Status)
	}

	fn, err := s.functionRepo.GetByID(ctx, record.FunctionID)
	if err != nil {
		return fmt.Errorf("function not found: %w", err)
	}

	backend, err := s.runtimeService.GetBackend(ctx, string(fn.Runtime))
	if err != nil {
		return fmt.Errorf("failed to get runtime backend: %w", err)
	}
	if stopErr := backend.StopExecution(ctx, id); stopErr != nil {
		// Best effort: the record is marked canceled regardless.
		s.logger.Warn("Failed to stop execution in runtime",
			zap.String("execution_id", id.String()),
			zap.Error(stopErr))
	}

	record.Status = domain.StatusCanceled
	record.Error = "execution canceled by user"
	canceledAt := time.Now()
	record.CompletedAt = &canceledAt
	if _, updateErr := s.executionRepo.Update(ctx, record.ID, record); updateErr != nil {
		return fmt.Errorf("failed to update execution status: %w", updateErr)
	}

	s.logger.Info("Execution canceled successfully",
		zap.String("execution_id", id.String()))
	return nil
}
// GetLogs returns the log lines stored on the execution record identified by
// id. A record whose Logs is nil yields an empty, non-nil slice. Repository
// errors are logged and wrapped as "execution not found".
func (s *executionService) GetLogs(ctx context.Context, id uuid.UUID) ([]string, error) {
	idField := zap.String("execution_id", id.String())
	s.logger.Debug("GetLogs called in execution service", idField)

	record, err := s.executionRepo.GetByID(ctx, id)
	if err != nil {
		s.logger.Error("Failed to get execution from database in GetLogs",
			idField,
			zap.Error(err))
		return nil, fmt.Errorf("execution not found: %w", err)
	}

	logs := record.Logs
	s.logger.Info("Retrieved execution from database",
		idField,
		zap.String("status", string(record.Status)),
		zap.Int("log_count", len(logs)),
		zap.Bool("logs_nil", logs == nil))

	if logs != nil {
		s.logger.Debug("Returning logs from execution",
			idField,
			zap.Int("log_count", len(logs)))
		return logs, nil
	}

	s.logger.Debug("Execution has nil logs, returning empty slice", idField)
	return []string{}, nil
}
// GetRunningExecutions delegates to the repository's GetRunningExecutions and
// returns its result and error unchanged.
func (s *executionService) GetRunningExecutions(ctx context.Context) ([]*domain.FunctionExecution, error) {
	running, err := s.executionRepo.GetRunningExecutions(ctx)
	return running, err
}