diff --git a/faas/cmd/server/main.go b/faas/cmd/server/main.go index cdd9369..37001ca 100644 --- a/faas/cmd/server/main.go +++ b/faas/cmd/server/main.go @@ -6,6 +6,7 @@ import ( "net/http" "os" "os/signal" + "strings" "syscall" "time" @@ -126,10 +127,29 @@ func initLogger(cfg config.ConfigProvider) *zap.Logger { var logger *zap.Logger var err error - if cfg.IsProduction() { + logLevel := cfg.GetString("FAAS_LOG_LEVEL") + + if cfg.IsProduction() && logLevel != "debug" { logger, err = zap.NewProduction() } else { - logger, err = zap.NewDevelopment() + // Use development logger for non-production or when debug is explicitly requested + config := zap.NewDevelopmentConfig() + + // Set log level based on environment variable + switch strings.ToLower(logLevel) { + case "debug": + config.Level = zap.NewAtomicLevelAt(zap.DebugLevel) + case "info": + config.Level = zap.NewAtomicLevelAt(zap.InfoLevel) + case "warn": + config.Level = zap.NewAtomicLevelAt(zap.WarnLevel) + case "error": + config.Level = zap.NewAtomicLevelAt(zap.ErrorLevel) + default: + config.Level = zap.NewAtomicLevelAt(zap.DebugLevel) // Default to debug for development + } + + logger, err = config.Build() } if err != nil { diff --git a/faas/faas-server b/faas/faas-server deleted file mode 100755 index a5d1e70..0000000 Binary files a/faas/faas-server and /dev/null differ diff --git a/faas/internal/domain/models.go b/faas/internal/domain/models.go index 3609a4c..7960cff 100644 --- a/faas/internal/domain/models.go +++ b/faas/internal/domain/models.go @@ -46,37 +46,37 @@ type Owner struct { // FunctionDefinition represents a serverless function type FunctionDefinition struct { - ID uuid.UUID `json:"id" db:"id"` - Name string `json:"name" validate:"required,min=1,max=255" db:"name"` - AppID string `json:"app_id" validate:"required" db:"app_id"` - Runtime RuntimeType `json:"runtime" validate:"required" db:"runtime"` - Image string `json:"image" validate:"required" db:"image"` - Handler string `json:"handler" validate:"required" db:"handler"` - Code string `json:"code,omitempty" db:"code"` - Environment map[string]string `json:"environment,omitempty" db:"environment"` - Timeout Duration `json:"timeout" validate:"required" db:"timeout"` - Memory int `json:"memory" validate:"required,min=64,max=3008" db:"memory"` - Owner Owner `json:"owner" validate:"required"` - CreatedAt time.Time `json:"created_at" db:"created_at"` - UpdatedAt time.Time `json:"updated_at" db:"updated_at"` + ID uuid.UUID `json:"id" db:"id"` + Name string `json:"name" validate:"required,min=1,max=255" db:"name"` + AppID string `json:"app_id" validate:"required" db:"app_id"` + Runtime RuntimeType `json:"runtime" validate:"required" db:"runtime"` + Image string `json:"image" validate:"required" db:"image"` + Handler string `json:"handler" validate:"required" db:"handler"` + Code string `json:"code,omitempty" db:"code"` + Environment map[string]string `json:"environment,omitempty" db:"environment"` + Timeout Duration `json:"timeout" validate:"required" db:"timeout"` + Memory int `json:"memory" validate:"required,min=64,max=3008" db:"memory"` + Owner Owner `json:"owner" validate:"required"` + CreatedAt time.Time `json:"created_at" db:"created_at"` + UpdatedAt time.Time `json:"updated_at" db:"updated_at"` } // FunctionExecution represents a function execution type FunctionExecution struct { - ID uuid.UUID `json:"id" db:"id"` - FunctionID uuid.UUID `json:"function_id" db:"function_id"` - Status ExecutionStatus `json:"status" db:"status"` - Input json.RawMessage 
`json:"input,omitempty" db:"input"` - Output json.RawMessage `json:"output,omitempty" db:"output"` - Error string `json:"error,omitempty" db:"error"` - Duration time.Duration `json:"duration" db:"duration"` - MemoryUsed int `json:"memory_used" db:"memory_used"` - Logs []string `json:"logs,omitempty" db:"logs"` - ContainerID string `json:"container_id,omitempty" db:"container_id"` - ExecutorID string `json:"executor_id" db:"executor_id"` - CreatedAt time.Time `json:"created_at" db:"created_at"` - StartedAt *time.Time `json:"started_at,omitempty" db:"started_at"` - CompletedAt *time.Time `json:"completed_at,omitempty" db:"completed_at"` + ID uuid.UUID `json:"id" db:"id"` + FunctionID uuid.UUID `json:"function_id" db:"function_id"` + Status ExecutionStatus `json:"status" db:"status"` + Input json.RawMessage `json:"input,omitempty" db:"input"` + Output json.RawMessage `json:"output,omitempty" db:"output"` + Error string `json:"error,omitempty" db:"error"` + Duration time.Duration `json:"duration" db:"duration"` + MemoryUsed int `json:"memory_used" db:"memory_used"` + Logs []string `json:"logs,omitempty" db:"logs"` + ContainerID string `json:"container_id,omitempty" db:"container_id"` + ExecutorID string `json:"executor_id" db:"executor_id"` + CreatedAt time.Time `json:"created_at" db:"created_at"` + StartedAt *time.Time `json:"started_at,omitempty" db:"started_at"` + CompletedAt *time.Time `json:"completed_at,omitempty" db:"completed_at"` } // CreateFunctionRequest represents a request to create a new function @@ -115,12 +115,12 @@ type ExecuteFunctionRequest struct { // ExecuteFunctionResponse represents a response for function execution type ExecuteFunctionResponse struct { - ExecutionID uuid.UUID `json:"execution_id"` - Status ExecutionStatus `json:"status"` - Output json.RawMessage `json:"output,omitempty"` - Error string `json:"error,omitempty"` - Duration time.Duration `json:"duration,omitempty"` - MemoryUsed int `json:"memory_used,omitempty"` + ExecutionID uuid.UUID `json:"execution_id"` + Status ExecutionStatus `json:"status"` + Output json.RawMessage `json:"output,omitempty"` + Error string `json:"error,omitempty"` + Duration time.Duration `json:"duration,omitempty"` + MemoryUsed int `json:"memory_used,omitempty"` } // DeployFunctionRequest represents a request to deploy a function @@ -131,17 +131,17 @@ type DeployFunctionRequest struct { // DeployFunctionResponse represents a response for function deployment type DeployFunctionResponse struct { - Status string `json:"status"` - Message string `json:"message,omitempty"` - Image string `json:"image,omitempty"` - ImageID string `json:"image_id,omitempty"` + Status string `json:"status"` + Message string `json:"message,omitempty"` + Image string `json:"image,omitempty"` + ImageID string `json:"image_id,omitempty"` } // RuntimeInfo represents runtime information type RuntimeInfo struct { - Type RuntimeType `json:"type"` - Version string `json:"version"` - Available bool `json:"available"` + Type RuntimeType `json:"type"` + Version string `json:"version"` + Available bool `json:"available"` DefaultImage string `json:"default_image"` Description string `json:"description"` } @@ -161,4 +161,4 @@ type AuthContext struct { AppID string `json:"app_id"` Permissions []string `json:"permissions"` Claims map[string]string `json:"claims"` -} \ No newline at end of file +} diff --git a/faas/internal/handlers/execution.go b/faas/internal/handlers/execution.go index cfaeb83..0fca418 100644 --- a/faas/internal/handlers/execution.go +++ 
b/faas/internal/handlers/execution.go @@ -218,17 +218,29 @@ func (h *ExecutionHandler) Cancel(c *gin.Context) { func (h *ExecutionHandler) GetLogs(c *gin.Context) { idStr := c.Param("id") + h.logger.Debug("GetLogs endpoint called", + zap.String("execution_id", idStr), + zap.String("client_ip", c.ClientIP())) + id, err := uuid.Parse(idStr) if err != nil { + h.logger.Warn("Invalid execution ID provided to GetLogs", + zap.String("id", idStr), + zap.Error(err)) c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid execution ID"}) return } if !h.authService.HasPermission(c.Request.Context(), "faas.read") { + h.logger.Warn("Insufficient permissions for GetLogs", + zap.String("execution_id", idStr)) c.JSON(http.StatusForbidden, gin.H{"error": "Insufficient permissions"}) return } + h.logger.Debug("Calling execution service GetLogs", + zap.String("execution_id", idStr)) + logs, err := h.executionService.GetLogs(c.Request.Context(), id) if err != nil { h.logger.Error("Failed to get execution logs", zap.String("id", idStr), zap.Error(err)) @@ -236,6 +248,10 @@ func (h *ExecutionHandler) GetLogs(c *gin.Context) { return } + h.logger.Debug("Successfully retrieved logs from execution service", + zap.String("execution_id", idStr), + zap.Int("log_count", len(logs))) + c.JSON(http.StatusOK, gin.H{ "logs": logs, }) diff --git a/faas/internal/runtime/docker/simple.go b/faas/internal/runtime/docker/simple.go index 74f332f..b235710 100644 --- a/faas/internal/runtime/docker/simple.go +++ b/faas/internal/runtime/docker/simple.go @@ -7,6 +7,7 @@ import ( "io" "regexp" "strings" + "sync" "time" "github.com/docker/docker/api/types/container" @@ -27,19 +28,19 @@ type SimpleDockerRuntime struct { func NewSimpleDockerRuntime(logger *zap.Logger) (*SimpleDockerRuntime, error) { var cli *client.Client var err error - + // Try different socket paths with ping test socketPaths := []string{ "unix:///run/user/1000/podman/podman.sock", // Podman socket (mounted from host) "unix:///var/run/docker.sock", // Standard Docker socket } - + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - + for _, socketPath := range socketPaths { logger.Info("Attempting to connect to socket", zap.String("path", socketPath)) - + cli, err = client.NewClientWithOpts( client.WithHost(socketPath), client.WithAPIVersionNegotiation(), @@ -54,11 +55,11 @@ func NewSimpleDockerRuntime(logger *zap.Logger) (*SimpleDockerRuntime, error) { logger.Warn("Failed to ping daemon", zap.String("path", socketPath), zap.Error(err)) continue } - + logger.Info("Successfully connected to Docker/Podman", zap.String("path", socketPath)) break } - + // Final fallback to environment if cli == nil { logger.Info("Trying default Docker environment") @@ -66,12 +67,12 @@ func NewSimpleDockerRuntime(logger *zap.Logger) (*SimpleDockerRuntime, error) { if err != nil { return nil, fmt.Errorf("failed to create Docker client: %w", err) } - + if _, err := cli.Ping(ctx); err != nil { return nil, fmt.Errorf("failed to ping Docker/Podman daemon: %w", err) } } - + if cli == nil { return nil, fmt.Errorf("no working Docker/Podman socket found") } @@ -83,13 +84,26 @@ func NewSimpleDockerRuntime(logger *zap.Logger) (*SimpleDockerRuntime, error) { } func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.FunctionDefinition, input json.RawMessage) (*domain.ExecutionResult, error) { + return s.ExecuteWithLogStreaming(ctx, function, input, nil) +} + +func (s *SimpleDockerRuntime) ExecuteWithLogStreaming(ctx context.Context, function 
*domain.FunctionDefinition, input json.RawMessage, logCallback runtime.LogStreamCallback) (*domain.ExecutionResult, error) { startTime := time.Now() + + s.logger.Info("Starting ExecuteWithLogStreaming", + zap.String("function_id", function.ID.String()), + zap.String("function_name", function.Name), + zap.Bool("has_log_callback", logCallback != nil)) // Create container containerID, err := s.createContainer(ctx, function, input) if err != nil { return nil, fmt.Errorf("failed to create container: %w", err) } + + s.logger.Debug("Container created successfully", + zap.String("container_id", containerID), + zap.String("function_id", function.ID.String())) // Start container if err := s.client.ContainerStart(ctx, containerID, container.StartOptions{}); err != nil { @@ -97,23 +111,129 @@ func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.Func return nil, fmt.Errorf("failed to start container: %w", err) } + // Create channels for log streaming + logChan := make(chan string, 1000) // Buffer for logs + doneChan := make(chan struct{}) // Signal to stop streaming + + // Start log streaming in a goroutine + s.logger.Debug("Starting log streaming goroutine", + zap.String("container_id", containerID), + zap.String("function_id", function.ID.String())) + go s.streamContainerLogs(context.Background(), containerID, logChan, doneChan) + // Create timeout context based on function timeout var timeoutCtx context.Context var cancel context.CancelFunc if function.Timeout.Duration > 0 { timeoutCtx, cancel = context.WithTimeout(ctx, function.Timeout.Duration) defer cancel() + s.logger.Debug("Set execution timeout", + zap.Duration("timeout", function.Timeout.Duration), + zap.String("container_id", containerID)) } else { timeoutCtx = ctx + s.logger.Debug("No execution timeout set", + zap.String("container_id", containerID)) } + // For streaming logs, collect logs in a separate goroutine and call the callback + var streamedLogs []string + logsMutex := &sync.Mutex{} + + if logCallback != nil { + s.logger.Info("Starting log callback goroutine", + zap.String("container_id", containerID)) + go func() { + // Keep track of the last time we called the callback to avoid too frequent updates + lastUpdate := time.Now() + ticker := time.NewTicker(1 * time.Second) // Update at most once per second + defer ticker.Stop() + + for { + select { + case log, ok := <-logChan: + if !ok { + // Channel closed, exit the goroutine + s.logger.Debug("Log channel closed, exiting callback goroutine", + zap.String("container_id", containerID)) + return + } + + s.logger.Debug("Received log line from channel", + zap.String("container_id", containerID), + zap.String("log_line", log)) + + logsMutex.Lock() + streamedLogs = append(streamedLogs, log) + shouldUpdate := time.Since(lastUpdate) >= 1*time.Second + currentLogCount := len(streamedLogs) + logsMutex.Unlock() + + // Call the callback if it's been at least 1 second since last update + if shouldUpdate { + logsMutex.Lock() + logsCopy := make([]string, len(streamedLogs)) + copy(logsCopy, streamedLogs) + logsMutex.Unlock() + + s.logger.Info("Calling log callback with accumulated logs", + zap.String("container_id", containerID), + zap.Int("log_count", len(logsCopy))) + + // Call the callback with the current logs + if err := logCallback(logsCopy); err != nil { + s.logger.Error("Failed to stream logs to callback", + zap.String("container_id", containerID), + zap.Error(err)) + } + lastUpdate = time.Now() + } else { + s.logger.Debug("Skipping callback update (too frequent)", + 
zap.String("container_id", containerID), + zap.Int("current_log_count", currentLogCount), + zap.Duration("time_since_last_update", time.Since(lastUpdate))) + } + case <-ticker.C: + // Periodic update to ensure logs are streamed even if no new logs arrive + logsMutex.Lock() + if len(streamedLogs) > 0 && time.Since(lastUpdate) >= 1*time.Second { + logsCopy := make([]string, len(streamedLogs)) + copy(logsCopy, streamedLogs) + logCount := len(logsCopy) + logsMutex.Unlock() + + s.logger.Debug("Periodic callback update triggered", + zap.String("container_id", containerID), + zap.Int("log_count", logCount)) + + // Call the callback with the current logs + if err := logCallback(logsCopy); err != nil { + s.logger.Error("Failed to stream logs to callback (periodic)", + zap.String("container_id", containerID), + zap.Error(err)) + } + lastUpdate = time.Now() + } else { + logsMutex.Unlock() + s.logger.Debug("Skipping periodic callback (no logs or too frequent)", + zap.String("container_id", containerID), + zap.Duration("time_since_last_update", time.Since(lastUpdate))) + } + } + } + }() + } else { + s.logger.Debug("No log callback provided, logs will be collected at the end", + zap.String("container_id", containerID)) + } // Wait for container to finish with timeout statusCh, errCh := s.client.ContainerWait(timeoutCtx, containerID, container.WaitConditionNotRunning) - + var timedOut bool select { case err := <-errCh: + close(doneChan) // Stop log streaming s.cleanupContainer(ctx, containerID) return nil, fmt.Errorf("error waiting for container: %w", err) case <-statusCh: @@ -121,19 +241,20 @@ func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.Func case <-timeoutCtx.Done(): // Timeout occurred timedOut = true - + // doneChan will be closed below in the common cleanup + // Stop the container in the background - don't wait for it to complete go func() { // Use a very short timeout for stopping, then kill if needed if err := s.client.ContainerStop(context.Background(), containerID, container.StopOptions{ Timeout: &[]int{1}[0], // Only 1 second grace period for stop }); err != nil { - s.logger.Warn("Failed to stop timed out container gracefully, attempting to kill", + s.logger.Warn("Failed to stop timed out container gracefully, attempting to kill", zap.String("container_id", containerID), zap.Error(err)) // If stop fails, try to kill it immediately if killErr := s.client.ContainerKill(context.Background(), containerID, "SIGKILL"); killErr != nil { - s.logger.Error("Failed to kill timed out container", + s.logger.Error("Failed to kill timed out container", zap.String("container_id", containerID), zap.Error(killErr)) } @@ -141,21 +262,67 @@ func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.Func }() } + // Collect all streamed logs var logs []string - var stats *container.InspectResponse - - // For timed-out containers, skip log retrieval and inspection to return quickly - if timedOut { - logs = []string{"Container execution timed out"} - } else { - // Get container logs - var err error - logs, err = s.getContainerLogs(ctx, containerID) - if err != nil { - s.logger.Warn("Failed to get container logs", zap.Error(err)) - logs = []string{"Failed to retrieve logs"} + if !timedOut { + // Collect any remaining logs from the channel + close(doneChan) // Stop log streaming + + // Give a moment for final logs to be processed + time.Sleep(100 * time.Millisecond) + + if logCallback == nil { + // If no callback, collect all logs at the end + for log := range logChan { + 
logs = append(logs, log) + } + } else { + // If we have a callback, use the streamed logs plus any remaining in channel + logsMutex.Lock() + logs = make([]string, len(streamedLogs)) + copy(logs, streamedLogs) + logsMutex.Unlock() + + // Collect any remaining logs in the channel + remainingLogs := make([]string, 0) + for { + select { + case log := <-logChan: + remainingLogs = append(remainingLogs, log) + default: + goto done + } + } + done: + logs = append(logs, remainingLogs...) } + } else { + logs = []string{"Container execution timed out"} + } + var stats *container.InspectResponse + + // For timed-out containers, still try to collect logs but with a short timeout + if timedOut { + // Collect any remaining logs from the channel before adding timeout message + // doneChan was already closed above + if logCallback == nil { + // If no callback was used, try to collect logs directly but with short timeout + logCtx, logCancel := context.WithTimeout(context.Background(), 2*time.Second) + finalLogs, err := s.getContainerLogs(logCtx, containerID) + logCancel() + if err == nil { + logs = finalLogs + } + } else { + // If callback was used, use the streamed logs + logsMutex.Lock() + logs = make([]string, len(streamedLogs)) + copy(logs, streamedLogs) + logsMutex.Unlock() + } + logs = append(logs, "Container execution timed out") + } else { // Get container stats statsResponse, err := s.client.ContainerInspect(ctx, containerID) if err != nil { @@ -418,7 +585,7 @@ func (s *SimpleDockerRuntime) getContainerLogs(ctx context.Context, containerID // Parse Docker logs to remove binary headers rawOutput := parseDockerLogs(logData) - + // Parse the XML-tagged output to extract logs parsedLogs, _, err := s.parseContainerOutput(rawOutput) if err != nil { @@ -467,7 +634,7 @@ func (s *SimpleDockerRuntime) getContainerOutput(ctx context.Context, containerI // Parse Docker logs to remove binary headers rawOutput := parseDockerLogs(logData) - + // Parse the XML-tagged output to extract the result _, result, err := s.parseContainerOutput(rawOutput) if err != nil { @@ -479,8 +646,8 @@ func (s *SimpleDockerRuntime) getContainerOutput(ctx context.Context, containerI } else { // Return the output wrapped in a JSON object fallbackResult := map[string]interface{}{ - "result": "Function executed successfully", - "output": logContent, + "result": "Function executed successfully", + "output": logContent, "timestamp": time.Now().UTC(), } resultJSON, _ := json.Marshal(fallbackResult) @@ -491,8 +658,8 @@ func (s *SimpleDockerRuntime) getContainerOutput(ctx context.Context, containerI // If no result was found in XML tags, provide a default success result if result == nil { defaultResult := map[string]interface{}{ - "result": "Function executed successfully", - "message": "No result output found", + "result": "Function executed successfully", + "message": "No result output found", "timestamp": time.Now().UTC(), } resultJSON, _ := json.Marshal(defaultResult) @@ -505,7 +672,7 @@ func (s *SimpleDockerRuntime) getContainerOutput(ctx context.Context, containerI // parseDockerLogs parses Docker log output which includes 8-byte headers func parseDockerLogs(logData []byte) string { var cleanOutput strings.Builder - + for len(logData) > 8 { // Docker log header: [STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4] // Skip the first 8 bytes (header) @@ -513,25 +680,25 @@ func parseDockerLogs(logData []byte) string { if len(logData) < headerSize { break } - + // Extract size from bytes 4-7 (big endian) size := int(logData[4])<<24 + 
int(logData[5])<<16 + int(logData[6])<<8 + int(logData[7]) - + if len(logData) < headerSize+size { // If the remaining data is less than expected size, take what we have size = len(logData) - headerSize } - + if size > 0 { // Extract the actual log content content := string(logData[headerSize : headerSize+size]) cleanOutput.WriteString(content) } - + // Move to next log entry logData = logData[headerSize+size:] } - + return cleanOutput.String() } @@ -581,7 +748,7 @@ func (s *SimpleDockerRuntime) parseContainerOutput(rawOutput string) (logs []str // Remove any XML tags from the output for fallback cleanOutput := regexp.MustCompile(`(?s)<[^>]*>`).ReplaceAllString(rawOutput, "") cleanOutput = strings.TrimSpace(cleanOutput) - + if cleanOutput != "" { if json.Valid([]byte(cleanOutput)) { result = json.RawMessage(cleanOutput) @@ -599,6 +766,130 @@ func (s *SimpleDockerRuntime) parseContainerOutput(rawOutput string) (logs []str return logs, result, nil } +// streamContainerLogs streams logs from a running container and sends them to a channel +func (s *SimpleDockerRuntime) streamContainerLogs(ctx context.Context, containerID string, logChan chan<- string, doneChan <-chan struct{}) { + defer close(logChan) + + s.logger.Info("Starting container log streaming", + zap.String("container_id", containerID)) + + // Get container logs with follow option + logs, err := s.client.ContainerLogs(ctx, containerID, container.LogsOptions{ + ShowStdout: true, + ShowStderr: true, + Follow: true, + Timestamps: false, + }) + if err != nil { + s.logger.Error("Failed to get container logs for streaming", + zap.String("container_id", containerID), + zap.Error(err)) + return + } + defer logs.Close() + + s.logger.Debug("Successfully got container logs stream", + zap.String("container_id", containerID)) + + // Create a context that cancels when doneChan receives a signal + streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + + // Goroutine to listen for done signal + go func() { + select { + case <-doneChan: + cancel() + case <-streamCtx.Done(): + } + }() + + // Buffer for reading log data + buf := make([]byte, 4096) + + // Continue reading until context is cancelled or EOF + totalLogLines := 0 + for { + select { + case <-streamCtx.Done(): + s.logger.Debug("Stream context cancelled, stopping log streaming", + zap.String("container_id", containerID), + zap.Int("total_lines_streamed", totalLogLines)) + return + default: + n, err := logs.Read(buf) + if n > 0 { + s.logger.Debug("Read log data from container", + zap.String("container_id", containerID), + zap.Int("bytes_read", n)) + + // Parse Docker logs to remove binary headers + logData := buf[:n] + rawOutput := parseDockerLogs(logData) + + // Send each line to the log channel, filtering out XML tags + lines := strings.Split(rawOutput, "\n") + for _, line := range lines { + trimmedLine := strings.TrimSpace(line) + // Skip empty lines and XML tags + if trimmedLine != "" && + !strings.HasPrefix(trimmedLine, "") && + !strings.HasPrefix(trimmedLine, "") && + !strings.HasPrefix(trimmedLine, "") && + !strings.HasPrefix(trimmedLine, "") && + trimmedLine != "" && + trimmedLine != "" && + trimmedLine != "" && + trimmedLine != "" { + + totalLogLines++ + s.logger.Debug("Sending filtered log line to channel", + zap.String("container_id", containerID), + zap.String("log_line", trimmedLine), + zap.Int("total_lines", totalLogLines)) + + select { + case logChan <- trimmedLine: + s.logger.Debug("Successfully sent filtered log line to channel", + zap.String("container_id", 
containerID)) + case <-streamCtx.Done(): + s.logger.Debug("Stream context cancelled while sending log line", + zap.String("container_id", containerID)) + return + default: + // Log buffer is full, warn but continue reading to avoid blocking + s.logger.Warn("Log buffer full, dropping log line", + zap.String("container_id", containerID), + zap.String("dropped_line", trimmedLine)) + } + } else if trimmedLine != "" { + s.logger.Debug("Filtered out XML tag", + zap.String("container_id", containerID), + zap.String("filtered_line", trimmedLine)) + } + } + } + + if err != nil { + if err == io.EOF { + s.logger.Debug("Got EOF from container logs, container might still be running", + zap.String("container_id", containerID), + zap.Int("total_lines_streamed", totalLogLines)) + // Container might still be running, continue reading + time.Sleep(100 * time.Millisecond) + continue + } else { + s.logger.Error("Error reading container logs", + zap.String("container_id", containerID), + zap.Error(err), + zap.Int("total_lines_streamed", totalLogLines)) + return + } + } + } + } +} + func (s *SimpleDockerRuntime) cleanupContainer(ctx context.Context, containerID string) { // Remove container if err := s.client.ContainerRemove(ctx, containerID, container.RemoveOptions{ diff --git a/faas/internal/runtime/interfaces.go b/faas/internal/runtime/interfaces.go index 9ff6ab8..6f8446f 100644 --- a/faas/internal/runtime/interfaces.go +++ b/faas/internal/runtime/interfaces.go @@ -8,20 +8,26 @@ import ( "github.com/google/uuid" ) +// LogStreamCallback is a function that can be called to stream logs during execution +type LogStreamCallback func(logs []string) error + // RuntimeBackend provides function execution capabilities type RuntimeBackend interface { // Execute runs a function with given input Execute(ctx context.Context, function *domain.FunctionDefinition, input json.RawMessage) (*domain.ExecutionResult, error) - + + // ExecuteWithLogStreaming runs a function with given input and streams logs during execution + ExecuteWithLogStreaming(ctx context.Context, function *domain.FunctionDefinition, input json.RawMessage, logCallback LogStreamCallback) (*domain.ExecutionResult, error) + // Deploy prepares function for execution Deploy(ctx context.Context, function *domain.FunctionDefinition) error - + // Remove cleans up function resources Remove(ctx context.Context, functionID uuid.UUID) error - + // GetLogs retrieves execution logs GetLogs(ctx context.Context, executionID uuid.UUID) ([]string, error) - + // HealthCheck verifies runtime availability HealthCheck(ctx context.Context) error @@ -37,11 +43,11 @@ type RuntimeBackend interface { // RuntimeInfo contains runtime backend information type RuntimeInfo struct { - Type string `json:"type"` - Version string `json:"version"` - Available bool `json:"available"` - Endpoint string `json:"endpoint,omitempty"` - Metadata map[string]string `json:"metadata,omitempty"` + Type string `json:"type"` + Version string `json:"version"` + Available bool `json:"available"` + Endpoint string `json:"endpoint,omitempty"` + Metadata map[string]string `json:"metadata,omitempty"` } // ContainerInfo contains information about a running container @@ -59,4 +65,4 @@ type RuntimeFactory interface { CreateRuntime(ctx context.Context, runtimeType string, config map[string]interface{}) (RuntimeBackend, error) GetSupportedRuntimes() []string GetDefaultConfig(runtimeType string) map[string]interface{} -} \ No newline at end of file +} diff --git a/faas/internal/services/execution_service.go 
b/faas/internal/services/execution_service.go index 46b0d0d..6fc8ecc 100644 --- a/faas/internal/services/execution_service.go +++ b/faas/internal/services/execution_service.go @@ -11,6 +11,7 @@ import ( "github.com/RyanCopley/skybridge/faas/internal/domain" "github.com/RyanCopley/skybridge/faas/internal/repository" + "github.com/RyanCopley/skybridge/faas/internal/runtime" "github.com/google/uuid" ) @@ -42,13 +43,13 @@ func (s *executionService) Execute(ctx context.Context, req *domain.ExecuteFunct return nil, fmt.Errorf("function not found: %w", err) } - // Create execution record + // Create execution record // Initialize input with empty JSON if nil or empty input := req.Input if input == nil || len(input) == 0 { input = json.RawMessage(`{}`) } - + execution := &domain.FunctionExecution{ ID: uuid.New(), FunctionID: req.FunctionID, @@ -112,8 +113,53 @@ func (s *executionService) executeSync(ctx context.Context, execution *domain.Fu defer cancel() } - // Execute function - result, err := backend.Execute(execCtx, function, execution.Input) + // Define log streaming callback + logCallback := func(logs []string) error { + s.logger.Info("Log streaming callback called", + zap.String("execution_id", execution.ID.String()), + zap.Int("log_count", len(logs)), + zap.Strings("logs_preview", logs)) + + // Update execution with current logs using background context + // to ensure updates continue even after HTTP request completes + // Create a copy of the execution to avoid race conditions + execCopy := *execution + execCopy.Logs = logs + _, err := s.executionRepo.Update(context.Background(), execution.ID, &execCopy) + if err == nil { + // Only update the original if database update succeeds + execution.Logs = logs + s.logger.Info("Successfully updated execution with logs in database", + zap.String("execution_id", execution.ID.String()), + zap.Int("log_count", len(logs))) + } else { + s.logger.Error("Failed to update execution with logs in database", + zap.String("execution_id", execution.ID.String()), + zap.Error(err)) + } + return err + } + + // Check if backend supports log streaming + type logStreamingBackend interface { + ExecuteWithLogStreaming(ctx context.Context, function *domain.FunctionDefinition, input json.RawMessage, logCallback runtime.LogStreamCallback) (*domain.ExecutionResult, error) + } + + var result *domain.ExecutionResult + if lsBackend, ok := backend.(logStreamingBackend); ok { + s.logger.Info("Backend supports log streaming, using ExecuteWithLogStreaming", + zap.String("execution_id", execution.ID.String()), + zap.String("function_id", function.ID.String())) + // Execute function with log streaming + result, err = lsBackend.ExecuteWithLogStreaming(execCtx, function, execution.Input, logCallback) + } else { + s.logger.Info("Backend does not support log streaming, using regular Execute", + zap.String("execution_id", execution.ID.String()), + zap.String("function_id", function.ID.String())) + // Fallback to regular execute + result, err = backend.Execute(execCtx, function, execution.Input) + } + if err != nil { // Check if this was a timeout error if execCtx.Err() == context.DeadlineExceeded { @@ -194,8 +240,53 @@ func (s *executionService) executeAsync(ctx context.Context, execution *domain.F defer cancel() } - // Execute function - result, err := backend.Execute(execCtx, function, execution.Input) + // Define log streaming callback + logCallback := func(logs []string) error { + s.logger.Info("Log streaming callback called", + zap.String("execution_id", execution.ID.String()), + 
zap.Int("log_count", len(logs)), + zap.Strings("logs_preview", logs)) + + // Update execution with current logs using background context + // to ensure updates continue even after HTTP request completes + // Create a copy of the execution to avoid race conditions + execCopy := *execution + execCopy.Logs = logs + _, err := s.executionRepo.Update(context.Background(), execution.ID, &execCopy) + if err == nil { + // Only update the original if database update succeeds + execution.Logs = logs + s.logger.Info("Successfully updated execution with logs in database", + zap.String("execution_id", execution.ID.String()), + zap.Int("log_count", len(logs))) + } else { + s.logger.Error("Failed to update execution with logs in database", + zap.String("execution_id", execution.ID.String()), + zap.Error(err)) + } + return err + } + + // Check if backend supports log streaming + type logStreamingBackend interface { + ExecuteWithLogStreaming(ctx context.Context, function *domain.FunctionDefinition, input json.RawMessage, logCallback runtime.LogStreamCallback) (*domain.ExecutionResult, error) + } + + var result *domain.ExecutionResult + if lsBackend, ok := backend.(logStreamingBackend); ok { + s.logger.Info("Backend supports log streaming, using ExecuteWithLogStreaming", + zap.String("execution_id", execution.ID.String()), + zap.String("function_id", function.ID.String())) + // Execute function with log streaming + result, err = lsBackend.ExecuteWithLogStreaming(execCtx, function, execution.Input, logCallback) + } else { + s.logger.Info("Backend does not support log streaming, using regular Execute", + zap.String("execution_id", execution.ID.String()), + zap.String("function_id", function.ID.String())) + // Fallback to regular execute + result, err = backend.Execute(execCtx, function, execution.Input) + } + if err != nil { // Check if this was a timeout error if execCtx.Err() == context.DeadlineExceeded { @@ -329,17 +420,35 @@ func (s *executionService) Cancel(ctx context.Context, id uuid.UUID, userID stri } func (s *executionService) GetLogs(ctx context.Context, id uuid.UUID) ([]string, error) { + s.logger.Debug("GetLogs called in execution service", + zap.String("execution_id", id.String())) + // Get execution with logs from database execution, err := s.executionRepo.GetByID(ctx, id) if err != nil { + s.logger.Error("Failed to get execution from database in GetLogs", + zap.String("execution_id", id.String()), + zap.Error(err)) return nil, fmt.Errorf("execution not found: %w", err) } + s.logger.Info("Retrieved execution from database", + zap.String("execution_id", id.String()), + zap.String("status", string(execution.Status)), + zap.Int("log_count", len(execution.Logs)), + zap.Bool("logs_nil", execution.Logs == nil)) + // Return logs from execution record if execution.Logs == nil { + s.logger.Debug("Execution has nil logs, returning empty slice", + zap.String("execution_id", id.String())) return []string{}, nil } - + + s.logger.Debug("Returning logs from execution", + zap.String("execution_id", id.String()), + zap.Int("log_count", len(execution.Logs))) + return execution.Logs, nil } diff --git a/faas/test_functions/streaming_logs.js b/faas/test_functions/streaming_logs.js new file mode 100644 index 0000000..98459ad --- /dev/null +++ b/faas/test_functions/streaming_logs.js @@ -0,0 +1,16 @@ +module.exports.handler = async (input, context) => { + console.log("Starting function execution"); + + for (let i = 1; i <= 10; i++) { + console.log(`Processing step ${i}`); + // Wait 1 second between log outputs + await 
new Promise(resolve => setTimeout(resolve, 1000)); + } + + console.log("Function execution completed"); + + return { + message: "Function executed successfully", + steps: 10 + }; +}; diff --git a/faas/test_functions/streaming_logs.py b/faas/test_functions/streaming_logs.py new file mode 100644 index 0000000..8e51fca --- /dev/null +++ b/faas/test_functions/streaming_logs.py @@ -0,0 +1,16 @@ +import time + +def handler(input, context): + print("Starting function execution") + + for i in range(1, 11): + print(f"Processing step {i}") + # Wait 1 second between log outputs + time.sleep(1) + + print("Function execution completed") + + return { + "message": "Function executed successfully", + "steps": 10 + } diff --git a/faas/web/src/components/ExecutionModal.tsx b/faas/web/src/components/ExecutionModal.tsx index 473cc79..02ac207 100644 --- a/faas/web/src/components/ExecutionModal.tsx +++ b/faas/web/src/components/ExecutionModal.tsx @@ -156,17 +156,24 @@ export const ExecutionModal: React.FC = ({ const loadLogs = async (executionId: string) => { try { + console.debug(`[ExecutionModal] Loading logs for execution ${executionId}`); setLoadingLogs(true); const response = await executionApi.getLogs(executionId); + console.debug(`[ExecutionModal] Loaded logs for execution ${executionId}:`, { + logCount: response.data.logs?.length || 0, + logs: response.data.logs + }); setLogs(response.data.logs || []); } catch (error) { - console.error('Error loading logs:', error); + console.error(`[ExecutionModal] Error loading logs for execution ${executionId}:`, error); } finally { setLoadingLogs(false); } }; const startLogsAutoRefresh = (executionId: string) => { + console.debug(`[ExecutionModal] Starting auto-refresh for execution ${executionId}`); + // Clear any existing interval if (logsPollIntervalRef.current) { clearInterval(logsPollIntervalRef.current); @@ -180,10 +187,15 @@ export const ExecutionModal: React.FC = ({ // Set up auto-refresh every 2 seconds logsPollIntervalRef.current = setInterval(async () => { try { + console.debug(`[ExecutionModal] Auto-refreshing logs for execution ${executionId}`); const response = await executionApi.getLogs(executionId); + console.debug(`[ExecutionModal] Auto-refresh got logs for execution ${executionId}:`, { + logCount: response.data.logs?.length || 0, + logs: response.data.logs + }); setLogs(response.data.logs || []); } catch (error) { - console.error('Error auto-refreshing logs:', error); + console.error(`[ExecutionModal] Error auto-refreshing logs for execution ${executionId}:`, error); } }, 2000); }; diff --git a/faas/web/src/services/apiService.ts b/faas/web/src/services/apiService.ts index e12cfff..1006a8a 100644 --- a/faas/web/src/services/apiService.ts +++ b/faas/web/src/services/apiService.ts @@ -75,8 +75,21 @@ export const executionApi = { cancel: (id: string) => api.delete(`/executions/${id}`), - getLogs: (id: string) => - api.get<{ logs: string[] }>(`/executions/${id}/logs`), + getLogs: (id: string) => { + console.debug(`[API] Fetching logs for execution ${id}`); + return api.get<{ logs: string[] }>(`/executions/${id}/logs`) + .then(response => { + console.debug(`[API] Successfully fetched logs for execution ${id}:`, { + logCount: response.data.logs?.length || 0, + logs: response.data.logs + }); + return response; + }) + .catch(error => { + console.error(`[API] Failed to fetch logs for execution ${id}:`, error); + throw error; + }); + }, getRunning: () => api.get<{ executions: FunctionExecution[]; count: number }>('/executions/running'),
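The `FAAS_LOG_LEVEL` handling in `initLogger` can be exercised in isolation. Below is a minimal sketch of the same behaviour, assuming a zap release that ships `zapcore.ParseLevel`; the explicit switch in the patch is the dependency-free equivalent:

```go
package main

import (
	"os"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

// buildDevLogger mirrors the non-production branch of initLogger: the level
// comes from FAAS_LOG_LEVEL and falls back to debug when unset or unrecognised.
func buildDevLogger() (*zap.Logger, error) {
	level := zapcore.DebugLevel // default to debug for development
	if raw := os.Getenv("FAAS_LOG_LEVEL"); raw != "" {
		if parsed, err := zapcore.ParseLevel(raw); err == nil {
			level = parsed
		}
	}

	cfg := zap.NewDevelopmentConfig()
	cfg.Level = zap.NewAtomicLevelAt(level)
	return cfg.Build()
}

func main() {
	logger, err := buildDevLogger()
	if err != nil {
		panic(err)
	}
	defer logger.Sync()

	logger.Debug("debug logging enabled") // only emitted at debug level
	logger.Info("server starting")
}
```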
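The new `ExecuteWithLogStreaming` entry point takes a `runtime.LogStreamCallback` (`func(logs []string) error`). A minimal caller sketch follows, using the module path visible in the imports above; the `AppID`, `Runtime`, `Image`, and `Handler` values are placeholders, and `RuntimeType` is assumed to be string-backed:

```go
package main

import (
	"context"
	"encoding/json"
	"log"
	"time"

	"github.com/RyanCopley/skybridge/faas/internal/domain"
	"github.com/RyanCopley/skybridge/faas/internal/runtime/docker"
	"github.com/google/uuid"
	"go.uber.org/zap"
)

func main() {
	logger, _ := zap.NewDevelopment()

	rt, err := docker.NewSimpleDockerRuntime(logger)
	if err != nil {
		log.Fatalf("no Docker/Podman socket available: %v", err)
	}

	fn := &domain.FunctionDefinition{
		ID:      uuid.New(),
		Name:    "streaming-logs-demo",
		AppID:   "demo-app",                   // placeholder
		Runtime: domain.RuntimeType("nodejs"), // placeholder; assumes a string-backed RuntimeType
		Image:   "node:20-alpine",             // placeholder image
		Handler: "streaming_logs.handler",
		Timeout: domain.Duration{Duration: 30 * time.Second},
		Memory:  128,
	}

	// The callback receives the full accumulated log slice, throttled to
	// roughly one call per second while the container runs.
	onLogs := func(logs []string) error {
		logger.Info("partial logs", zap.Int("count", len(logs)))
		return nil
	}

	result, err := rt.ExecuteWithLogStreaming(context.Background(), fn, json.RawMessage(`{}`), onLogs)
	if err != nil {
		log.Fatalf("execution failed: %v", err)
	}
	logger.Info("execution finished", zap.Any("result", result))
}
```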
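The callback goroutine in `ExecuteWithLogStreaming` throttles updates to roughly one per second and always hands the callback the full accumulated slice. A simplified, single-goroutine sketch of that pattern is below; the real code additionally guards `streamedLogs` with a mutex because the slice is read again after `ContainerWait` returns:

```go
package main

import (
	"fmt"
	"time"
)

// streamWithThrottle drains log lines from logChan and hands the full
// accumulated slice to flush, at most once per interval, with a periodic
// tick so slow-producing containers still get their logs pushed out.
func streamWithThrottle(logChan <-chan string, interval time.Duration, flush func([]string) error) []string {
	var accumulated []string
	lastFlush := time.Now()
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	doFlush := func() {
		if len(accumulated) == 0 || time.Since(lastFlush) < interval {
			return // nothing to send, or the last update was too recent
		}
		batch := make([]string, len(accumulated))
		copy(batch, accumulated)
		if err := flush(batch); err != nil {
			fmt.Println("flush failed:", err)
		}
		lastFlush = time.Now()
	}

	for {
		select {
		case line, ok := <-logChan:
			if !ok {
				return accumulated // producer closed the channel; caller keeps the rest
			}
			accumulated = append(accumulated, line)
			doFlush()
		case <-ticker.C:
			doFlush()
		}
	}
}

func main() {
	ch := make(chan string)
	go func() {
		for i := 1; i <= 5; i++ {
			ch <- fmt.Sprintf("Processing step %d", i)
			time.Sleep(400 * time.Millisecond)
		}
		close(ch)
	}()

	logs := streamWithThrottle(ch, time.Second, func(batch []string) error {
		fmt.Printf("callback received %d lines\n", len(batch))
		return nil
	})
	fmt.Printf("final log count: %d\n", len(logs))
}
```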
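A minimal test sketch for the `parseDockerLogs` helper in `faas/internal/runtime/docker/simple.go`, assuming the test sits in the same `docker` package; it hand-builds the 8-byte multiplexed-stream headers (stream type, three zero bytes, big-endian length) that `docker logs` emits for containers started without a TTY:

```go
package docker

import (
	"encoding/binary"
	"testing"
)

// buildFrame wraps a payload in the 8-byte header used by Docker's
// multiplexed log stream: stream type (1 = stdout, 2 = stderr), three zero
// bytes, then the payload length as a big-endian uint32.
func buildFrame(stream byte, payload string) []byte {
	frame := make([]byte, 8+len(payload))
	frame[0] = stream
	binary.BigEndian.PutUint32(frame[4:8], uint32(len(payload)))
	copy(frame[8:], payload)
	return frame
}

func TestParseDockerLogsStripsHeaders(t *testing.T) {
	data := append(buildFrame(1, "Processing step 1\n"), buildFrame(2, "warning on stderr\n")...)

	got := parseDockerLogs(data)
	want := "Processing step 1\nwarning on stderr\n"
	if got != want {
		t.Fatalf("parseDockerLogs() = %q, want %q", got, want)
	}
}
```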
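`executeSync` and `executeAsync` detect streaming support by asserting the backend against a locally declared interface, so runtimes that only implement `Execute` keep working unchanged. The pattern in isolation, as a self-contained sketch:

```go
package main

import "fmt"

// Backend mirrors the minimal runtime contract.
type Backend interface {
	Execute(input string) (string, error)
}

// streamingBackend is declared privately at the call site, so backends can
// opt in to log streaming without every implementation having to change.
type streamingBackend interface {
	ExecuteWithLogStreaming(input string, onLogs func([]string) error) (string, error)
}

type basicBackend struct{}

func (basicBackend) Execute(input string) (string, error) { return "ok: " + input, nil }

type fancyBackend struct{}

func (fancyBackend) Execute(input string) (string, error) { return "ok: " + input, nil }

func (fancyBackend) ExecuteWithLogStreaming(input string, onLogs func([]string) error) (string, error) {
	_ = onLogs([]string{"step 1", "step 2"}) // stream a couple of lines mid-flight
	return "ok: " + input, nil
}

func run(b Backend, input string) (string, error) {
	if sb, ok := b.(streamingBackend); ok {
		// Upgraded path, equivalent to the lsBackend branch in executeSync/executeAsync.
		return sb.ExecuteWithLogStreaming(input, func(logs []string) error {
			fmt.Printf("streamed %d lines so far\n", len(logs))
			return nil
		})
	}
	// Fallback path: logs only become available after execution completes.
	return b.Execute(input)
}

func main() {
	for _, b := range []Backend{basicBackend{}, fancyBackend{}} {
		out, _ := run(b, "{}")
		fmt.Println(out)
	}
}
```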
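On the client side, `ExecutionModal` polls `GET /executions/:id/logs` every two seconds. An equivalent Go polling sketch is below; the base URL and API prefix are placeholders and auth headers are omitted:

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

type logsResponse struct {
	Logs []string `json:"logs"`
}

// pollLogs mirrors the ExecutionModal auto-refresh: fetch
// GET {base}/executions/{id}/logs every two seconds until stop is closed.
func pollLogs(base, executionID string, stop <-chan struct{}) {
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			resp, err := http.Get(fmt.Sprintf("%s/executions/%s/logs", base, executionID))
			if err != nil {
				fmt.Println("log poll failed:", err)
				continue
			}
			var body logsResponse
			if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
				fmt.Println("decode failed:", err)
			} else {
				fmt.Printf("execution %s: %d log lines\n", executionID, len(body.Logs))
			}
			resp.Body.Close()
		}
	}
}

func main() {
	stop := make(chan struct{})
	go pollLogs("http://localhost:8080/api", "00000000-0000-0000-0000-000000000000", stop) // placeholder base URL and ID

	time.Sleep(10 * time.Second) // poll for a little while, then stop
	close(stop)
}
```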