package docker import ( "context" "encoding/json" "fmt" "io" "regexp" "strings" "time" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/image" "github.com/docker/docker/client" "github.com/google/uuid" "go.uber.org/zap" "github.com/RyanCopley/skybridge/faas/internal/domain" "github.com/RyanCopley/skybridge/faas/internal/runtime" ) type SimpleDockerRuntime struct { logger *zap.Logger client *client.Client } func NewSimpleDockerRuntime(logger *zap.Logger) (*SimpleDockerRuntime, error) { var cli *client.Client var err error // Try different socket paths with ping test socketPaths := []string{ "unix:///run/user/1000/podman/podman.sock", // Podman socket (mounted from host) "unix:///var/run/docker.sock", // Standard Docker socket } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() for _, socketPath := range socketPaths { logger.Info("Attempting to connect to socket", zap.String("path", socketPath)) cli, err = client.NewClientWithOpts( client.WithHost(socketPath), client.WithAPIVersionNegotiation(), ) if err != nil { logger.Warn("Failed to create client", zap.String("path", socketPath), zap.Error(err)) continue } // Test connection if _, err := cli.Ping(ctx); err != nil { logger.Warn("Failed to ping daemon", zap.String("path", socketPath), zap.Error(err)) continue } logger.Info("Successfully connected to Docker/Podman", zap.String("path", socketPath)) break } // Final fallback to environment if cli == nil { logger.Info("Trying default Docker environment") cli, err = client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) if err != nil { return nil, fmt.Errorf("failed to create Docker client: %w", err) } if _, err := cli.Ping(ctx); err != nil { return nil, fmt.Errorf("failed to ping Docker/Podman daemon: %w", err) } } if cli == nil { return nil, fmt.Errorf("no working Docker/Podman socket found") } return &SimpleDockerRuntime{ logger: logger, client: cli, }, nil } func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.FunctionDefinition, input json.RawMessage) (*domain.ExecutionResult, error) { startTime := time.Now() // Create container containerID, err := s.createContainer(ctx, function, input) if err != nil { return nil, fmt.Errorf("failed to create container: %w", err) } // Start container if err := s.client.ContainerStart(ctx, containerID, container.StartOptions{}); err != nil { s.cleanupContainer(ctx, containerID) return nil, fmt.Errorf("failed to start container: %w", err) } // Create timeout context based on function timeout var timeoutCtx context.Context var cancel context.CancelFunc if function.Timeout.Duration > 0 { timeoutCtx, cancel = context.WithTimeout(ctx, function.Timeout.Duration) defer cancel() } else { timeoutCtx = ctx } // Wait for container to finish with timeout statusCh, errCh := s.client.ContainerWait(timeoutCtx, containerID, container.WaitConditionNotRunning) var timedOut bool select { case err := <-errCh: s.cleanupContainer(ctx, containerID) return nil, fmt.Errorf("error waiting for container: %w", err) case <-statusCh: // Container finished normally case <-timeoutCtx.Done(): // Timeout occurred timedOut = true // Stop the container in the background - don't wait for it to complete go func() { // Use a very short timeout for stopping, then kill if needed if err := s.client.ContainerStop(context.Background(), containerID, container.StopOptions{ Timeout: &[]int{1}[0], // Only 1 second grace period for stop }); err != nil { s.logger.Warn("Failed to stop timed out container gracefully, attempting to kill", zap.String("container_id", containerID), zap.Error(err)) // If stop fails, try to kill it immediately if killErr := s.client.ContainerKill(context.Background(), containerID, "SIGKILL"); killErr != nil { s.logger.Error("Failed to kill timed out container", zap.String("container_id", containerID), zap.Error(killErr)) } } }() } var logs []string var stats *container.InspectResponse // For timed-out containers, skip log retrieval and inspection to return quickly if timedOut { logs = []string{"Container execution timed out"} } else { // Get container logs var err error logs, err = s.getContainerLogs(ctx, containerID) if err != nil { s.logger.Warn("Failed to get container logs", zap.Error(err)) logs = []string{"Failed to retrieve logs"} } // Get container stats statsResponse, err := s.client.ContainerInspect(ctx, containerID) if err != nil { s.logger.Warn("Failed to inspect container", zap.Error(err)) } else { stats = &statsResponse } } // Get execution result result := &domain.ExecutionResult{ Logs: logs, Duration: time.Since(startTime).Truncate(time.Millisecond), } // Handle timeout case if timedOut { result.Error = fmt.Sprintf("Function execution timed out after %v", function.Timeout.Duration) result.Output = json.RawMessage(`{"error": "Function execution timed out"}`) } else { // Try to get output from container for successful executions if stats.State != nil { if stats.State.ExitCode == 0 { // Try to get output from container output, err := s.getContainerOutput(ctx, containerID) if err != nil { s.logger.Warn("Failed to get container output", zap.Error(err)) result.Output = json.RawMessage(`{"error": "Failed to retrieve output"}`) } else { result.Output = output } } else { result.Error = fmt.Sprintf("Container exited with code %d", stats.State.ExitCode) result.Output = json.RawMessage(`{"error": "Container execution failed"}`) } } else { s.logger.Warn("Container state not available") } } // Cleanup container - for timed-out containers, do this in background if timedOut { go func() { s.cleanupContainer(context.Background(), containerID) }() } else { s.cleanupContainer(ctx, containerID) } return result, nil } func (s *SimpleDockerRuntime) Deploy(ctx context.Context, function *domain.FunctionDefinition) error { s.logger.Info("Deploying function image", zap.String("function_id", function.ID.String()), zap.String("image", function.Image)) // Pull the image if it doesn't exist _, _, err := s.client.ImageInspectWithRaw(ctx, function.Image) if err != nil { // Image doesn't exist, try to pull it s.logger.Info("Pulling image", zap.String("image", function.Image)) reader, err := s.client.ImagePull(ctx, function.Image, image.PullOptions{}) if err != nil { return fmt.Errorf("failed to pull image %s: %w", function.Image, err) } defer reader.Close() // Wait for pull to complete (we could parse the output but for now we'll just wait) buf := make([]byte, 1024) for { _, err := reader.Read(buf) if err != nil { break } } } return nil } func (s *SimpleDockerRuntime) Remove(ctx context.Context, functionID uuid.UUID) error { s.logger.Info("Removing function resources", zap.String("function_id", functionID.String())) // In a real implementation, we would remove any function-specific resources // For now, we don't need to do anything as containers are cleaned up after execution return nil } func (s *SimpleDockerRuntime) GetLogs(ctx context.Context, executionID uuid.UUID) ([]string, error) { // In a real implementation, we would need to store container IDs associated with execution IDs // For now, we'll return a placeholder return []string{ "Function execution logs would appear here", "In a full implementation, these would be retrieved from the Docker container", }, nil } func (s *SimpleDockerRuntime) HealthCheck(ctx context.Context) error { _, err := s.client.Ping(ctx) return err } func (s *SimpleDockerRuntime) GetInfo(ctx context.Context) (*runtime.RuntimeInfo, error) { info, err := s.client.Info(ctx) if err != nil { return nil, fmt.Errorf("failed to get Docker info: %w", err) } return &runtime.RuntimeInfo{ Type: "docker", Version: info.ServerVersion, Available: true, Endpoint: s.client.DaemonHost(), Metadata: map[string]string{ "containers": fmt.Sprintf("%d", info.Containers), "images": fmt.Sprintf("%d", info.Images), "docker_root_dir": info.DockerRootDir, }, }, nil } func (s *SimpleDockerRuntime) ListContainers(ctx context.Context) ([]runtime.ContainerInfo, error) { containers, err := s.client.ContainerList(ctx, container.ListOptions{}) if err != nil { return nil, fmt.Errorf("failed to list containers: %w", err) } var containerInfos []runtime.ContainerInfo for _, c := range containers { containerInfo := runtime.ContainerInfo{ ID: c.ID, Status: c.State, Image: c.Image, } if len(c.Names) > 0 { containerInfo.ID = c.Names[0] } containerInfos = append(containerInfos, containerInfo) } return containerInfos, nil } func (s *SimpleDockerRuntime) StopExecution(ctx context.Context, executionID uuid.UUID) error { s.logger.Info("Stopping execution", zap.String("execution_id", executionID.String())) // In a real implementation, we would need to map execution IDs to container IDs // For now, we'll just log that this was called return nil } // Helper methods func (s *SimpleDockerRuntime) createContainer(ctx context.Context, function *domain.FunctionDefinition, input json.RawMessage) (string, error) { // Prepare environment variables env := []string{} for key, value := range function.Environment { env = append(env, fmt.Sprintf("%s=%s", key, value)) } // Add input as environment variable inputStr := string(input) if inputStr != "" { env = append(env, fmt.Sprintf("FUNCTION_INPUT=%s", inputStr)) } // Add function code as environment variable for dynamic languages env = append(env, fmt.Sprintf("FUNCTION_CODE=%s", function.Code)) env = append(env, fmt.Sprintf("FUNCTION_HANDLER=%s", function.Handler)) // Create container config with proper command for runtime config := &container.Config{ Image: function.Image, Env: env, AttachStdout: true, AttachStderr: true, } // Set command based on runtime switch function.Runtime { case "nodejs", "nodejs18", "nodejs20": config.Cmd = []string{"sh", "-c", ` echo "$FUNCTION_CODE" > /tmp/index.js && echo "const handler = require('/tmp/index.js').handler; const input = process.env.FUNCTION_INPUT ? JSON.parse(process.env.FUNCTION_INPUT) : {}; const context = { functionName: '` + function.Name + `' }; console.log(''); handler(input, context).then(result => { console.log(''); console.log('' + JSON.stringify(result) + ''); }).catch(err => { console.log(''); console.error('{\"error\": \"' + err.message + '\"}'); process.exit(1); });" > /tmp/runner.js && node /tmp/runner.js `} case "python", "python3", "python3.9", "python3.10", "python3.11": config.Cmd = []string{"sh", "-c", ` echo "$FUNCTION_CODE" > /tmp/handler.py && echo "import json, os, sys; sys.path.insert(0, '/tmp'); from handler import handler; input_data = json.loads(os.environ.get('FUNCTION_INPUT', '{}')); context = {'function_name': '` + function.Name + `'}; print(''); try: result = handler(input_data, context); print(''); print('' + json.dumps(result) + ''); except Exception as e: print(''); print('{\"error\": \"' + str(e) + '\"}', file=sys.stderr); sys.exit(1);" > /tmp/runner.py && python /tmp/runner.py `} default: // For other runtimes, assume they handle execution themselves // This is for pre-built container images } // Create host config with resource limits hostConfig := &container.HostConfig{ Resources: container.Resources{ Memory: int64(function.Memory) * 1024 * 1024, // Convert MB to bytes }, } // Apply timeout if set if function.Timeout.Duration > 0 { // Docker doesn't have a direct timeout, but we can set a reasonable upper limit // In a production system, you'd want to implement actual timeout handling hostConfig.Resources.NanoCPUs = 1000000000 // 1 CPU } resp, err := s.client.ContainerCreate(ctx, config, hostConfig, nil, nil, "") if err != nil { return "", fmt.Errorf("failed to create container: %w", err) } return resp.ID, nil } func (s *SimpleDockerRuntime) getContainerLogs(ctx context.Context, containerID string) ([]string, error) { // Get container logs logs, err := s.client.ContainerLogs(ctx, containerID, container.LogsOptions{ ShowStdout: true, ShowStderr: true, Tail: "100", // Get last 100 lines }) if err != nil { return nil, fmt.Errorf("failed to get container logs: %w", err) } defer logs.Close() // Read the actual logs content logData, err := io.ReadAll(logs) if err != nil { return nil, fmt.Errorf("failed to read log data: %w", err) } // Parse Docker logs to remove binary headers rawOutput := parseDockerLogs(logData) // Parse the XML-tagged output to extract logs parsedLogs, _, err := s.parseContainerOutput(rawOutput) if err != nil { s.logger.Warn("Failed to parse container output for logs", zap.Error(err)) // Fallback to raw output split by lines lines := strings.Split(strings.TrimSpace(rawOutput), "\n") cleanLines := make([]string, 0, len(lines)) for _, line := range lines { if trimmed := strings.TrimSpace(line); trimmed != "" { cleanLines = append(cleanLines, trimmed) } } return cleanLines, nil } // If no logs were parsed from tags, fallback to basic parsing if len(parsedLogs) == 0 { lines := strings.Split(strings.TrimSpace(rawOutput), "\n") for _, line := range lines { if trimmed := strings.TrimSpace(line); trimmed != "" && !strings.Contains(trimmed, "") && !strings.Contains(trimmed, "") { parsedLogs = append(parsedLogs, trimmed) } } } return parsedLogs, nil } func (s *SimpleDockerRuntime) getContainerOutput(ctx context.Context, containerID string) (json.RawMessage, error) { // Get container logs as output logs, err := s.client.ContainerLogs(ctx, containerID, container.LogsOptions{ ShowStdout: true, ShowStderr: true, Tail: "100", // Get last 100 lines }) if err != nil { return nil, fmt.Errorf("failed to get container logs: %w", err) } defer logs.Close() // Read the actual logs content logData, err := io.ReadAll(logs) if err != nil { return nil, fmt.Errorf("failed to read log data: %w", err) } // Parse Docker logs to remove binary headers rawOutput := parseDockerLogs(logData) // Parse the XML-tagged output to extract the result _, result, err := s.parseContainerOutput(rawOutput) if err != nil { s.logger.Warn("Failed to parse container output for result", zap.Error(err)) // Fallback to legacy parsing logContent := strings.TrimSpace(rawOutput) if json.Valid([]byte(logContent)) && logContent != "" { return json.RawMessage(logContent), nil } else { // Return the output wrapped in a JSON object fallbackResult := map[string]interface{}{ "result": "Function executed successfully", "output": logContent, "timestamp": time.Now().UTC(), } resultJSON, _ := json.Marshal(fallbackResult) return json.RawMessage(resultJSON), nil } } // If no result was found in XML tags, provide a default success result if result == nil { defaultResult := map[string]interface{}{ "result": "Function executed successfully", "message": "No result output found", "timestamp": time.Now().UTC(), } resultJSON, _ := json.Marshal(defaultResult) return json.RawMessage(resultJSON), nil } return result, nil } // parseDockerLogs parses Docker log output which includes 8-byte headers func parseDockerLogs(logData []byte) string { var cleanOutput strings.Builder for len(logData) > 8 { // Docker log header: [STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4] // Skip the first 8 bytes (header) headerSize := 8 if len(logData) < headerSize { break } // Extract size from bytes 4-7 (big endian) size := int(logData[4])<<24 + int(logData[5])<<16 + int(logData[6])<<8 + int(logData[7]) if len(logData) < headerSize+size { // If the remaining data is less than expected size, take what we have size = len(logData) - headerSize } if size > 0 { // Extract the actual log content content := string(logData[headerSize : headerSize+size]) cleanOutput.WriteString(content) } // Move to next log entry logData = logData[headerSize+size:] } return cleanOutput.String() } // parseContainerOutput parses container output that contains and XML tags func (s *SimpleDockerRuntime) parseContainerOutput(rawOutput string) (logs []string, result json.RawMessage, err error) { // Extract stdout content (logs) - use DOTALL flag for multiline matching stdoutRegex := regexp.MustCompile(`(?s)(.*?)`) stdoutMatch := stdoutRegex.FindStringSubmatch(rawOutput) if len(stdoutMatch) > 1 { stdoutContent := strings.TrimSpace(stdoutMatch[1]) if stdoutContent != "" { // Split stdout content into lines for logs lines := strings.Split(stdoutContent, "\n") // Clean up empty lines and trim whitespace cleanLogs := make([]string, 0, len(lines)) for _, line := range lines { if trimmed := strings.TrimSpace(line); trimmed != "" { cleanLogs = append(cleanLogs, trimmed) } } logs = cleanLogs } } // Extract result content - use DOTALL flag for multiline matching resultRegex := regexp.MustCompile(`(?s)(.*?)`) resultMatch := resultRegex.FindStringSubmatch(rawOutput) if len(resultMatch) > 1 { resultContent := strings.TrimSpace(resultMatch[1]) if resultContent != "" { // Validate JSON if json.Valid([]byte(resultContent)) { result = json.RawMessage(resultContent) } else { // If not valid JSON, wrap it wrappedResult := map[string]interface{}{ "output": resultContent, } resultJSON, _ := json.Marshal(wrappedResult) result = json.RawMessage(resultJSON) } } } // If no result tag found, treat entire output as result (fallback for non-tagged output) if result == nil { // Remove any XML tags from the output for fallback cleanOutput := regexp.MustCompile(`(?s)<[^>]*>`).ReplaceAllString(rawOutput, "") cleanOutput = strings.TrimSpace(cleanOutput) if cleanOutput != "" { if json.Valid([]byte(cleanOutput)) { result = json.RawMessage(cleanOutput) } else { // Wrap non-JSON output wrappedResult := map[string]interface{}{ "output": cleanOutput, } resultJSON, _ := json.Marshal(wrappedResult) result = json.RawMessage(resultJSON) } } } return logs, result, nil } func (s *SimpleDockerRuntime) cleanupContainer(ctx context.Context, containerID string) { // Remove container if err := s.client.ContainerRemove(ctx, containerID, container.RemoveOptions{ Force: true, }); err != nil { s.logger.Warn("Failed to remove container", zap.String("container_id", containerID), zap.Error(err)) } }