-
This commit is contained in:
@ -6,6 +6,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -126,10 +127,29 @@ func initLogger(cfg config.ConfigProvider) *zap.Logger {
|
|||||||
var logger *zap.Logger
|
var logger *zap.Logger
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
if cfg.IsProduction() {
|
logLevel := cfg.GetString("FAAS_LOG_LEVEL")
|
||||||
|
|
||||||
|
if cfg.IsProduction() && logLevel != "debug" {
|
||||||
logger, err = zap.NewProduction()
|
logger, err = zap.NewProduction()
|
||||||
} else {
|
} else {
|
||||||
logger, err = zap.NewDevelopment()
|
// Use development logger for non-production or when debug is explicitly requested
|
||||||
|
config := zap.NewDevelopmentConfig()
|
||||||
|
|
||||||
|
// Set log level based on environment variable
|
||||||
|
switch strings.ToLower(logLevel) {
|
||||||
|
case "debug":
|
||||||
|
config.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
|
||||||
|
case "info":
|
||||||
|
config.Level = zap.NewAtomicLevelAt(zap.InfoLevel)
|
||||||
|
case "warn":
|
||||||
|
config.Level = zap.NewAtomicLevelAt(zap.WarnLevel)
|
||||||
|
case "error":
|
||||||
|
config.Level = zap.NewAtomicLevelAt(zap.ErrorLevel)
|
||||||
|
default:
|
||||||
|
config.Level = zap.NewAtomicLevelAt(zap.DebugLevel) // Default to debug for development
|
||||||
|
}
|
||||||
|
|
||||||
|
logger, err = config.Build()
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
BIN
faas/faas-server
BIN
faas/faas-server
Binary file not shown.
@ -46,37 +46,37 @@ type Owner struct {
|
|||||||
|
|
||||||
// FunctionDefinition represents a serverless function
|
// FunctionDefinition represents a serverless function
|
||||||
type FunctionDefinition struct {
|
type FunctionDefinition struct {
|
||||||
ID uuid.UUID `json:"id" db:"id"`
|
ID uuid.UUID `json:"id" db:"id"`
|
||||||
Name string `json:"name" validate:"required,min=1,max=255" db:"name"`
|
Name string `json:"name" validate:"required,min=1,max=255" db:"name"`
|
||||||
AppID string `json:"app_id" validate:"required" db:"app_id"`
|
AppID string `json:"app_id" validate:"required" db:"app_id"`
|
||||||
Runtime RuntimeType `json:"runtime" validate:"required" db:"runtime"`
|
Runtime RuntimeType `json:"runtime" validate:"required" db:"runtime"`
|
||||||
Image string `json:"image" validate:"required" db:"image"`
|
Image string `json:"image" validate:"required" db:"image"`
|
||||||
Handler string `json:"handler" validate:"required" db:"handler"`
|
Handler string `json:"handler" validate:"required" db:"handler"`
|
||||||
Code string `json:"code,omitempty" db:"code"`
|
Code string `json:"code,omitempty" db:"code"`
|
||||||
Environment map[string]string `json:"environment,omitempty" db:"environment"`
|
Environment map[string]string `json:"environment,omitempty" db:"environment"`
|
||||||
Timeout Duration `json:"timeout" validate:"required" db:"timeout"`
|
Timeout Duration `json:"timeout" validate:"required" db:"timeout"`
|
||||||
Memory int `json:"memory" validate:"required,min=64,max=3008" db:"memory"`
|
Memory int `json:"memory" validate:"required,min=64,max=3008" db:"memory"`
|
||||||
Owner Owner `json:"owner" validate:"required"`
|
Owner Owner `json:"owner" validate:"required"`
|
||||||
CreatedAt time.Time `json:"created_at" db:"created_at"`
|
CreatedAt time.Time `json:"created_at" db:"created_at"`
|
||||||
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
|
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// FunctionExecution represents a function execution
|
// FunctionExecution represents a function execution
|
||||||
type FunctionExecution struct {
|
type FunctionExecution struct {
|
||||||
ID uuid.UUID `json:"id" db:"id"`
|
ID uuid.UUID `json:"id" db:"id"`
|
||||||
FunctionID uuid.UUID `json:"function_id" db:"function_id"`
|
FunctionID uuid.UUID `json:"function_id" db:"function_id"`
|
||||||
Status ExecutionStatus `json:"status" db:"status"`
|
Status ExecutionStatus `json:"status" db:"status"`
|
||||||
Input json.RawMessage `json:"input,omitempty" db:"input"`
|
Input json.RawMessage `json:"input,omitempty" db:"input"`
|
||||||
Output json.RawMessage `json:"output,omitempty" db:"output"`
|
Output json.RawMessage `json:"output,omitempty" db:"output"`
|
||||||
Error string `json:"error,omitempty" db:"error"`
|
Error string `json:"error,omitempty" db:"error"`
|
||||||
Duration time.Duration `json:"duration" db:"duration"`
|
Duration time.Duration `json:"duration" db:"duration"`
|
||||||
MemoryUsed int `json:"memory_used" db:"memory_used"`
|
MemoryUsed int `json:"memory_used" db:"memory_used"`
|
||||||
Logs []string `json:"logs,omitempty" db:"logs"`
|
Logs []string `json:"logs,omitempty" db:"logs"`
|
||||||
ContainerID string `json:"container_id,omitempty" db:"container_id"`
|
ContainerID string `json:"container_id,omitempty" db:"container_id"`
|
||||||
ExecutorID string `json:"executor_id" db:"executor_id"`
|
ExecutorID string `json:"executor_id" db:"executor_id"`
|
||||||
CreatedAt time.Time `json:"created_at" db:"created_at"`
|
CreatedAt time.Time `json:"created_at" db:"created_at"`
|
||||||
StartedAt *time.Time `json:"started_at,omitempty" db:"started_at"`
|
StartedAt *time.Time `json:"started_at,omitempty" db:"started_at"`
|
||||||
CompletedAt *time.Time `json:"completed_at,omitempty" db:"completed_at"`
|
CompletedAt *time.Time `json:"completed_at,omitempty" db:"completed_at"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateFunctionRequest represents a request to create a new function
|
// CreateFunctionRequest represents a request to create a new function
|
||||||
@ -115,12 +115,12 @@ type ExecuteFunctionRequest struct {
|
|||||||
|
|
||||||
// ExecuteFunctionResponse represents a response for function execution
|
// ExecuteFunctionResponse represents a response for function execution
|
||||||
type ExecuteFunctionResponse struct {
|
type ExecuteFunctionResponse struct {
|
||||||
ExecutionID uuid.UUID `json:"execution_id"`
|
ExecutionID uuid.UUID `json:"execution_id"`
|
||||||
Status ExecutionStatus `json:"status"`
|
Status ExecutionStatus `json:"status"`
|
||||||
Output json.RawMessage `json:"output,omitempty"`
|
Output json.RawMessage `json:"output,omitempty"`
|
||||||
Error string `json:"error,omitempty"`
|
Error string `json:"error,omitempty"`
|
||||||
Duration time.Duration `json:"duration,omitempty"`
|
Duration time.Duration `json:"duration,omitempty"`
|
||||||
MemoryUsed int `json:"memory_used,omitempty"`
|
MemoryUsed int `json:"memory_used,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeployFunctionRequest represents a request to deploy a function
|
// DeployFunctionRequest represents a request to deploy a function
|
||||||
@ -131,17 +131,17 @@ type DeployFunctionRequest struct {
|
|||||||
|
|
||||||
// DeployFunctionResponse represents a response for function deployment
|
// DeployFunctionResponse represents a response for function deployment
|
||||||
type DeployFunctionResponse struct {
|
type DeployFunctionResponse struct {
|
||||||
Status string `json:"status"`
|
Status string `json:"status"`
|
||||||
Message string `json:"message,omitempty"`
|
Message string `json:"message,omitempty"`
|
||||||
Image string `json:"image,omitempty"`
|
Image string `json:"image,omitempty"`
|
||||||
ImageID string `json:"image_id,omitempty"`
|
ImageID string `json:"image_id,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// RuntimeInfo represents runtime information
|
// RuntimeInfo represents runtime information
|
||||||
type RuntimeInfo struct {
|
type RuntimeInfo struct {
|
||||||
Type RuntimeType `json:"type"`
|
Type RuntimeType `json:"type"`
|
||||||
Version string `json:"version"`
|
Version string `json:"version"`
|
||||||
Available bool `json:"available"`
|
Available bool `json:"available"`
|
||||||
DefaultImage string `json:"default_image"`
|
DefaultImage string `json:"default_image"`
|
||||||
Description string `json:"description"`
|
Description string `json:"description"`
|
||||||
}
|
}
|
||||||
@ -161,4 +161,4 @@ type AuthContext struct {
|
|||||||
AppID string `json:"app_id"`
|
AppID string `json:"app_id"`
|
||||||
Permissions []string `json:"permissions"`
|
Permissions []string `json:"permissions"`
|
||||||
Claims map[string]string `json:"claims"`
|
Claims map[string]string `json:"claims"`
|
||||||
}
|
}
|
||||||
|
|||||||
@ -218,17 +218,29 @@ func (h *ExecutionHandler) Cancel(c *gin.Context) {
|
|||||||
|
|
||||||
func (h *ExecutionHandler) GetLogs(c *gin.Context) {
|
func (h *ExecutionHandler) GetLogs(c *gin.Context) {
|
||||||
idStr := c.Param("id")
|
idStr := c.Param("id")
|
||||||
|
h.logger.Debug("GetLogs endpoint called",
|
||||||
|
zap.String("execution_id", idStr),
|
||||||
|
zap.String("client_ip", c.ClientIP()))
|
||||||
|
|
||||||
id, err := uuid.Parse(idStr)
|
id, err := uuid.Parse(idStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
h.logger.Warn("Invalid execution ID provided to GetLogs",
|
||||||
|
zap.String("id", idStr),
|
||||||
|
zap.Error(err))
|
||||||
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid execution ID"})
|
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid execution ID"})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if !h.authService.HasPermission(c.Request.Context(), "faas.read") {
|
if !h.authService.HasPermission(c.Request.Context(), "faas.read") {
|
||||||
|
h.logger.Warn("Insufficient permissions for GetLogs",
|
||||||
|
zap.String("execution_id", idStr))
|
||||||
c.JSON(http.StatusForbidden, gin.H{"error": "Insufficient permissions"})
|
c.JSON(http.StatusForbidden, gin.H{"error": "Insufficient permissions"})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
h.logger.Debug("Calling execution service GetLogs",
|
||||||
|
zap.String("execution_id", idStr))
|
||||||
|
|
||||||
logs, err := h.executionService.GetLogs(c.Request.Context(), id)
|
logs, err := h.executionService.GetLogs(c.Request.Context(), id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.logger.Error("Failed to get execution logs", zap.String("id", idStr), zap.Error(err))
|
h.logger.Error("Failed to get execution logs", zap.String("id", idStr), zap.Error(err))
|
||||||
@ -236,6 +248,10 @@ func (h *ExecutionHandler) GetLogs(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
h.logger.Debug("Successfully retrieved logs from execution service",
|
||||||
|
zap.String("execution_id", idStr),
|
||||||
|
zap.Int("log_count", len(logs)))
|
||||||
|
|
||||||
c.JSON(http.StatusOK, gin.H{
|
c.JSON(http.StatusOK, gin.H{
|
||||||
"logs": logs,
|
"logs": logs,
|
||||||
})
|
})
|
||||||
|
|||||||
@ -7,6 +7,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/docker/docker/api/types/container"
|
"github.com/docker/docker/api/types/container"
|
||||||
@ -27,19 +28,19 @@ type SimpleDockerRuntime struct {
|
|||||||
func NewSimpleDockerRuntime(logger *zap.Logger) (*SimpleDockerRuntime, error) {
|
func NewSimpleDockerRuntime(logger *zap.Logger) (*SimpleDockerRuntime, error) {
|
||||||
var cli *client.Client
|
var cli *client.Client
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
// Try different socket paths with ping test
|
// Try different socket paths with ping test
|
||||||
socketPaths := []string{
|
socketPaths := []string{
|
||||||
"unix:///run/user/1000/podman/podman.sock", // Podman socket (mounted from host)
|
"unix:///run/user/1000/podman/podman.sock", // Podman socket (mounted from host)
|
||||||
"unix:///var/run/docker.sock", // Standard Docker socket
|
"unix:///var/run/docker.sock", // Standard Docker socket
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
for _, socketPath := range socketPaths {
|
for _, socketPath := range socketPaths {
|
||||||
logger.Info("Attempting to connect to socket", zap.String("path", socketPath))
|
logger.Info("Attempting to connect to socket", zap.String("path", socketPath))
|
||||||
|
|
||||||
cli, err = client.NewClientWithOpts(
|
cli, err = client.NewClientWithOpts(
|
||||||
client.WithHost(socketPath),
|
client.WithHost(socketPath),
|
||||||
client.WithAPIVersionNegotiation(),
|
client.WithAPIVersionNegotiation(),
|
||||||
@ -54,11 +55,11 @@ func NewSimpleDockerRuntime(logger *zap.Logger) (*SimpleDockerRuntime, error) {
|
|||||||
logger.Warn("Failed to ping daemon", zap.String("path", socketPath), zap.Error(err))
|
logger.Warn("Failed to ping daemon", zap.String("path", socketPath), zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info("Successfully connected to Docker/Podman", zap.String("path", socketPath))
|
logger.Info("Successfully connected to Docker/Podman", zap.String("path", socketPath))
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// Final fallback to environment
|
// Final fallback to environment
|
||||||
if cli == nil {
|
if cli == nil {
|
||||||
logger.Info("Trying default Docker environment")
|
logger.Info("Trying default Docker environment")
|
||||||
@ -66,12 +67,12 @@ func NewSimpleDockerRuntime(logger *zap.Logger) (*SimpleDockerRuntime, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create Docker client: %w", err)
|
return nil, fmt.Errorf("failed to create Docker client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := cli.Ping(ctx); err != nil {
|
if _, err := cli.Ping(ctx); err != nil {
|
||||||
return nil, fmt.Errorf("failed to ping Docker/Podman daemon: %w", err)
|
return nil, fmt.Errorf("failed to ping Docker/Podman daemon: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if cli == nil {
|
if cli == nil {
|
||||||
return nil, fmt.Errorf("no working Docker/Podman socket found")
|
return nil, fmt.Errorf("no working Docker/Podman socket found")
|
||||||
}
|
}
|
||||||
@ -83,13 +84,26 @@ func NewSimpleDockerRuntime(logger *zap.Logger) (*SimpleDockerRuntime, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.FunctionDefinition, input json.RawMessage) (*domain.ExecutionResult, error) {
|
func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.FunctionDefinition, input json.RawMessage) (*domain.ExecutionResult, error) {
|
||||||
|
return s.ExecuteWithLogStreaming(ctx, function, input, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SimpleDockerRuntime) ExecuteWithLogStreaming(ctx context.Context, function *domain.FunctionDefinition, input json.RawMessage, logCallback runtime.LogStreamCallback) (*domain.ExecutionResult, error) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
|
|
||||||
|
s.logger.Info("Starting ExecuteWithLogStreaming",
|
||||||
|
zap.String("function_id", function.ID.String()),
|
||||||
|
zap.String("function_name", function.Name),
|
||||||
|
zap.Bool("has_log_callback", logCallback != nil))
|
||||||
|
|
||||||
// Create container
|
// Create container
|
||||||
containerID, err := s.createContainer(ctx, function, input)
|
containerID, err := s.createContainer(ctx, function, input)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create container: %w", err)
|
return nil, fmt.Errorf("failed to create container: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.logger.Debug("Container created successfully",
|
||||||
|
zap.String("container_id", containerID),
|
||||||
|
zap.String("function_id", function.ID.String()))
|
||||||
|
|
||||||
// Start container
|
// Start container
|
||||||
if err := s.client.ContainerStart(ctx, containerID, container.StartOptions{}); err != nil {
|
if err := s.client.ContainerStart(ctx, containerID, container.StartOptions{}); err != nil {
|
||||||
@ -97,23 +111,129 @@ func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.Func
|
|||||||
return nil, fmt.Errorf("failed to start container: %w", err)
|
return nil, fmt.Errorf("failed to start container: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create channels for log streaming
|
||||||
|
logChan := make(chan string, 1000) // Buffer for logs
|
||||||
|
doneChan := make(chan struct{}) // Signal to stop streaming
|
||||||
|
|
||||||
|
// Start log streaming in a goroutine
|
||||||
|
s.logger.Debug("Starting log streaming goroutine",
|
||||||
|
zap.String("container_id", containerID),
|
||||||
|
zap.String("function_id", function.ID.String()))
|
||||||
|
go s.streamContainerLogs(context.Background(), containerID, logChan, doneChan)
|
||||||
|
|
||||||
// Create timeout context based on function timeout
|
// Create timeout context based on function timeout
|
||||||
var timeoutCtx context.Context
|
var timeoutCtx context.Context
|
||||||
var cancel context.CancelFunc
|
var cancel context.CancelFunc
|
||||||
if function.Timeout.Duration > 0 {
|
if function.Timeout.Duration > 0 {
|
||||||
timeoutCtx, cancel = context.WithTimeout(ctx, function.Timeout.Duration)
|
timeoutCtx, cancel = context.WithTimeout(ctx, function.Timeout.Duration)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
s.logger.Debug("Set execution timeout",
|
||||||
|
zap.Duration("timeout", function.Timeout.Duration),
|
||||||
|
zap.String("container_id", containerID))
|
||||||
} else {
|
} else {
|
||||||
timeoutCtx = ctx
|
timeoutCtx = ctx
|
||||||
|
s.logger.Debug("No execution timeout set",
|
||||||
|
zap.String("container_id", containerID))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// For streaming logs, collect logs in a separate goroutine and call the callback
|
||||||
|
var streamedLogs []string
|
||||||
|
logsMutex := &sync.Mutex{}
|
||||||
|
|
||||||
|
if logCallback != nil {
|
||||||
|
s.logger.Info("Starting log callback goroutine",
|
||||||
|
zap.String("container_id", containerID))
|
||||||
|
go func() {
|
||||||
|
// Keep track of the last time we called the callback to avoid too frequent updates
|
||||||
|
lastUpdate := time.Now()
|
||||||
|
ticker := time.NewTicker(1 * time.Second) // Update at most once per second
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case log, ok := <-logChan:
|
||||||
|
if !ok {
|
||||||
|
// Channel closed, exit the goroutine
|
||||||
|
s.logger.Debug("Log channel closed, exiting callback goroutine",
|
||||||
|
zap.String("container_id", containerID))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.logger.Debug("Received log line from channel",
|
||||||
|
zap.String("container_id", containerID),
|
||||||
|
zap.String("log_line", log))
|
||||||
|
|
||||||
|
logsMutex.Lock()
|
||||||
|
streamedLogs = append(streamedLogs, log)
|
||||||
|
shouldUpdate := time.Since(lastUpdate) >= 1*time.Second
|
||||||
|
currentLogCount := len(streamedLogs)
|
||||||
|
logsMutex.Unlock()
|
||||||
|
|
||||||
|
// Call the callback if it's been at least 1 second since last update
|
||||||
|
if shouldUpdate {
|
||||||
|
logsMutex.Lock()
|
||||||
|
logsCopy := make([]string, len(streamedLogs))
|
||||||
|
copy(logsCopy, streamedLogs)
|
||||||
|
logsMutex.Unlock()
|
||||||
|
|
||||||
|
s.logger.Info("Calling log callback with accumulated logs",
|
||||||
|
zap.String("container_id", containerID),
|
||||||
|
zap.Int("log_count", len(logsCopy)))
|
||||||
|
|
||||||
|
// Call the callback with the current logs
|
||||||
|
if err := logCallback(logsCopy); err != nil {
|
||||||
|
s.logger.Error("Failed to stream logs to callback",
|
||||||
|
zap.String("container_id", containerID),
|
||||||
|
zap.Error(err))
|
||||||
|
}
|
||||||
|
lastUpdate = time.Now()
|
||||||
|
} else {
|
||||||
|
s.logger.Debug("Skipping callback update (too frequent)",
|
||||||
|
zap.String("container_id", containerID),
|
||||||
|
zap.Int("current_log_count", currentLogCount),
|
||||||
|
zap.Duration("time_since_last_update", time.Since(lastUpdate)))
|
||||||
|
}
|
||||||
|
case <-ticker.C:
|
||||||
|
// Periodic update to ensure logs are streamed even if no new logs arrive
|
||||||
|
logsMutex.Lock()
|
||||||
|
if len(streamedLogs) > 0 && time.Since(lastUpdate) >= 1*time.Second {
|
||||||
|
logsCopy := make([]string, len(streamedLogs))
|
||||||
|
copy(logsCopy, streamedLogs)
|
||||||
|
logCount := len(logsCopy)
|
||||||
|
logsMutex.Unlock()
|
||||||
|
|
||||||
|
s.logger.Debug("Periodic callback update triggered",
|
||||||
|
zap.String("container_id", containerID),
|
||||||
|
zap.Int("log_count", logCount))
|
||||||
|
|
||||||
|
// Call the callback with the current logs
|
||||||
|
if err := logCallback(logsCopy); err != nil {
|
||||||
|
s.logger.Error("Failed to stream logs to callback (periodic)",
|
||||||
|
zap.String("container_id", containerID),
|
||||||
|
zap.Error(err))
|
||||||
|
}
|
||||||
|
lastUpdate = time.Now()
|
||||||
|
} else {
|
||||||
|
logsMutex.Unlock()
|
||||||
|
s.logger.Debug("Skipping periodic callback (no logs or too frequent)",
|
||||||
|
zap.String("container_id", containerID),
|
||||||
|
zap.Duration("time_since_last_update", time.Since(lastUpdate)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
} else {
|
||||||
|
s.logger.Debug("No log callback provided, logs will be collected at the end",
|
||||||
|
zap.String("container_id", containerID))
|
||||||
|
}
|
||||||
|
|
||||||
// Wait for container to finish with timeout
|
// Wait for container to finish with timeout
|
||||||
statusCh, errCh := s.client.ContainerWait(timeoutCtx, containerID, container.WaitConditionNotRunning)
|
statusCh, errCh := s.client.ContainerWait(timeoutCtx, containerID, container.WaitConditionNotRunning)
|
||||||
|
|
||||||
var timedOut bool
|
var timedOut bool
|
||||||
select {
|
select {
|
||||||
case err := <-errCh:
|
case err := <-errCh:
|
||||||
|
close(doneChan) // Stop log streaming
|
||||||
s.cleanupContainer(ctx, containerID)
|
s.cleanupContainer(ctx, containerID)
|
||||||
return nil, fmt.Errorf("error waiting for container: %w", err)
|
return nil, fmt.Errorf("error waiting for container: %w", err)
|
||||||
case <-statusCh:
|
case <-statusCh:
|
||||||
@ -121,19 +241,20 @@ func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.Func
|
|||||||
case <-timeoutCtx.Done():
|
case <-timeoutCtx.Done():
|
||||||
// Timeout occurred
|
// Timeout occurred
|
||||||
timedOut = true
|
timedOut = true
|
||||||
|
// doneChan will be closed below in the common cleanup
|
||||||
|
|
||||||
// Stop the container in the background - don't wait for it to complete
|
// Stop the container in the background - don't wait for it to complete
|
||||||
go func() {
|
go func() {
|
||||||
// Use a very short timeout for stopping, then kill if needed
|
// Use a very short timeout for stopping, then kill if needed
|
||||||
if err := s.client.ContainerStop(context.Background(), containerID, container.StopOptions{
|
if err := s.client.ContainerStop(context.Background(), containerID, container.StopOptions{
|
||||||
Timeout: &[]int{1}[0], // Only 1 second grace period for stop
|
Timeout: &[]int{1}[0], // Only 1 second grace period for stop
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
s.logger.Warn("Failed to stop timed out container gracefully, attempting to kill",
|
s.logger.Warn("Failed to stop timed out container gracefully, attempting to kill",
|
||||||
zap.String("container_id", containerID),
|
zap.String("container_id", containerID),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
// If stop fails, try to kill it immediately
|
// If stop fails, try to kill it immediately
|
||||||
if killErr := s.client.ContainerKill(context.Background(), containerID, "SIGKILL"); killErr != nil {
|
if killErr := s.client.ContainerKill(context.Background(), containerID, "SIGKILL"); killErr != nil {
|
||||||
s.logger.Error("Failed to kill timed out container",
|
s.logger.Error("Failed to kill timed out container",
|
||||||
zap.String("container_id", containerID),
|
zap.String("container_id", containerID),
|
||||||
zap.Error(killErr))
|
zap.Error(killErr))
|
||||||
}
|
}
|
||||||
@ -141,21 +262,67 @@ func (s *SimpleDockerRuntime) Execute(ctx context.Context, function *domain.Func
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Collect all streamed logs
|
||||||
var logs []string
|
var logs []string
|
||||||
var stats *container.InspectResponse
|
if !timedOut {
|
||||||
|
// Collect any remaining logs from the channel
|
||||||
// For timed-out containers, skip log retrieval and inspection to return quickly
|
close(doneChan) // Stop log streaming
|
||||||
if timedOut {
|
|
||||||
logs = []string{"Container execution timed out"}
|
// Give a moment for final logs to be processed
|
||||||
} else {
|
time.Sleep(100 * time.Millisecond)
|
||||||
// Get container logs
|
|
||||||
var err error
|
if logCallback == nil {
|
||||||
logs, err = s.getContainerLogs(ctx, containerID)
|
// If no callback, collect all logs at the end
|
||||||
if err != nil {
|
for log := range logChan {
|
||||||
s.logger.Warn("Failed to get container logs", zap.Error(err))
|
logs = append(logs, log)
|
||||||
logs = []string{"Failed to retrieve logs"}
|
}
|
||||||
|
} else {
|
||||||
|
// If we have a callback, use the streamed logs plus any remaining in channel
|
||||||
|
logsMutex.Lock()
|
||||||
|
logs = make([]string, len(streamedLogs))
|
||||||
|
copy(logs, streamedLogs)
|
||||||
|
logsMutex.Unlock()
|
||||||
|
|
||||||
|
// Collect any remaining logs in the channel
|
||||||
|
remainingLogs := make([]string, 0)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case log := <-logChan:
|
||||||
|
remainingLogs = append(remainingLogs, log)
|
||||||
|
default:
|
||||||
|
goto done
|
||||||
|
}
|
||||||
|
}
|
||||||
|
done:
|
||||||
|
logs = append(logs, remainingLogs...)
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
logs = []string{"Container execution timed out"}
|
||||||
|
}
|
||||||
|
|
||||||
|
var stats *container.InspectResponse
|
||||||
|
|
||||||
|
// For timed-out containers, still try to collect logs but with a short timeout
|
||||||
|
if timedOut {
|
||||||
|
// Collect any remaining logs from the channel before adding timeout message
|
||||||
|
// doneChan was already closed above
|
||||||
|
if logCallback == nil {
|
||||||
|
// If no callback was used, try to collect logs directly but with short timeout
|
||||||
|
logCtx, logCancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
|
finalLogs, err := s.getContainerLogs(logCtx, containerID)
|
||||||
|
logCancel()
|
||||||
|
if err == nil {
|
||||||
|
logs = finalLogs
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// If callback was used, use the streamed logs
|
||||||
|
logsMutex.Lock()
|
||||||
|
logs = make([]string, len(streamedLogs))
|
||||||
|
copy(logs, streamedLogs)
|
||||||
|
logsMutex.Unlock()
|
||||||
|
}
|
||||||
|
logs = append(logs, "Container execution timed out")
|
||||||
|
} else {
|
||||||
// Get container stats
|
// Get container stats
|
||||||
statsResponse, err := s.client.ContainerInspect(ctx, containerID)
|
statsResponse, err := s.client.ContainerInspect(ctx, containerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -418,7 +585,7 @@ func (s *SimpleDockerRuntime) getContainerLogs(ctx context.Context, containerID
|
|||||||
|
|
||||||
// Parse Docker logs to remove binary headers
|
// Parse Docker logs to remove binary headers
|
||||||
rawOutput := parseDockerLogs(logData)
|
rawOutput := parseDockerLogs(logData)
|
||||||
|
|
||||||
// Parse the XML-tagged output to extract logs
|
// Parse the XML-tagged output to extract logs
|
||||||
parsedLogs, _, err := s.parseContainerOutput(rawOutput)
|
parsedLogs, _, err := s.parseContainerOutput(rawOutput)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -467,7 +634,7 @@ func (s *SimpleDockerRuntime) getContainerOutput(ctx context.Context, containerI
|
|||||||
|
|
||||||
// Parse Docker logs to remove binary headers
|
// Parse Docker logs to remove binary headers
|
||||||
rawOutput := parseDockerLogs(logData)
|
rawOutput := parseDockerLogs(logData)
|
||||||
|
|
||||||
// Parse the XML-tagged output to extract the result
|
// Parse the XML-tagged output to extract the result
|
||||||
_, result, err := s.parseContainerOutput(rawOutput)
|
_, result, err := s.parseContainerOutput(rawOutput)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -479,8 +646,8 @@ func (s *SimpleDockerRuntime) getContainerOutput(ctx context.Context, containerI
|
|||||||
} else {
|
} else {
|
||||||
// Return the output wrapped in a JSON object
|
// Return the output wrapped in a JSON object
|
||||||
fallbackResult := map[string]interface{}{
|
fallbackResult := map[string]interface{}{
|
||||||
"result": "Function executed successfully",
|
"result": "Function executed successfully",
|
||||||
"output": logContent,
|
"output": logContent,
|
||||||
"timestamp": time.Now().UTC(),
|
"timestamp": time.Now().UTC(),
|
||||||
}
|
}
|
||||||
resultJSON, _ := json.Marshal(fallbackResult)
|
resultJSON, _ := json.Marshal(fallbackResult)
|
||||||
@ -491,8 +658,8 @@ func (s *SimpleDockerRuntime) getContainerOutput(ctx context.Context, containerI
|
|||||||
// If no result was found in XML tags, provide a default success result
|
// If no result was found in XML tags, provide a default success result
|
||||||
if result == nil {
|
if result == nil {
|
||||||
defaultResult := map[string]interface{}{
|
defaultResult := map[string]interface{}{
|
||||||
"result": "Function executed successfully",
|
"result": "Function executed successfully",
|
||||||
"message": "No result output found",
|
"message": "No result output found",
|
||||||
"timestamp": time.Now().UTC(),
|
"timestamp": time.Now().UTC(),
|
||||||
}
|
}
|
||||||
resultJSON, _ := json.Marshal(defaultResult)
|
resultJSON, _ := json.Marshal(defaultResult)
|
||||||
@ -505,7 +672,7 @@ func (s *SimpleDockerRuntime) getContainerOutput(ctx context.Context, containerI
|
|||||||
// parseDockerLogs parses Docker log output which includes 8-byte headers
|
// parseDockerLogs parses Docker log output which includes 8-byte headers
|
||||||
func parseDockerLogs(logData []byte) string {
|
func parseDockerLogs(logData []byte) string {
|
||||||
var cleanOutput strings.Builder
|
var cleanOutput strings.Builder
|
||||||
|
|
||||||
for len(logData) > 8 {
|
for len(logData) > 8 {
|
||||||
// Docker log header: [STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4]
|
// Docker log header: [STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4]
|
||||||
// Skip the first 8 bytes (header)
|
// Skip the first 8 bytes (header)
|
||||||
@ -513,25 +680,25 @@ func parseDockerLogs(logData []byte) string {
|
|||||||
if len(logData) < headerSize {
|
if len(logData) < headerSize {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// Extract size from bytes 4-7 (big endian)
|
// Extract size from bytes 4-7 (big endian)
|
||||||
size := int(logData[4])<<24 + int(logData[5])<<16 + int(logData[6])<<8 + int(logData[7])
|
size := int(logData[4])<<24 + int(logData[5])<<16 + int(logData[6])<<8 + int(logData[7])
|
||||||
|
|
||||||
if len(logData) < headerSize+size {
|
if len(logData) < headerSize+size {
|
||||||
// If the remaining data is less than expected size, take what we have
|
// If the remaining data is less than expected size, take what we have
|
||||||
size = len(logData) - headerSize
|
size = len(logData) - headerSize
|
||||||
}
|
}
|
||||||
|
|
||||||
if size > 0 {
|
if size > 0 {
|
||||||
// Extract the actual log content
|
// Extract the actual log content
|
||||||
content := string(logData[headerSize : headerSize+size])
|
content := string(logData[headerSize : headerSize+size])
|
||||||
cleanOutput.WriteString(content)
|
cleanOutput.WriteString(content)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Move to next log entry
|
// Move to next log entry
|
||||||
logData = logData[headerSize+size:]
|
logData = logData[headerSize+size:]
|
||||||
}
|
}
|
||||||
|
|
||||||
return cleanOutput.String()
|
return cleanOutput.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -581,7 +748,7 @@ func (s *SimpleDockerRuntime) parseContainerOutput(rawOutput string) (logs []str
|
|||||||
// Remove any XML tags from the output for fallback
|
// Remove any XML tags from the output for fallback
|
||||||
cleanOutput := regexp.MustCompile(`(?s)<[^>]*>`).ReplaceAllString(rawOutput, "")
|
cleanOutput := regexp.MustCompile(`(?s)<[^>]*>`).ReplaceAllString(rawOutput, "")
|
||||||
cleanOutput = strings.TrimSpace(cleanOutput)
|
cleanOutput = strings.TrimSpace(cleanOutput)
|
||||||
|
|
||||||
if cleanOutput != "" {
|
if cleanOutput != "" {
|
||||||
if json.Valid([]byte(cleanOutput)) {
|
if json.Valid([]byte(cleanOutput)) {
|
||||||
result = json.RawMessage(cleanOutput)
|
result = json.RawMessage(cleanOutput)
|
||||||
@ -599,6 +766,130 @@ func (s *SimpleDockerRuntime) parseContainerOutput(rawOutput string) (logs []str
|
|||||||
return logs, result, nil
|
return logs, result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// streamContainerLogs streams logs from a running container and sends them to a channel
|
||||||
|
func (s *SimpleDockerRuntime) streamContainerLogs(ctx context.Context, containerID string, logChan chan<- string, doneChan <-chan struct{}) {
|
||||||
|
defer close(logChan)
|
||||||
|
|
||||||
|
s.logger.Info("Starting container log streaming",
|
||||||
|
zap.String("container_id", containerID))
|
||||||
|
|
||||||
|
// Get container logs with follow option
|
||||||
|
logs, err := s.client.ContainerLogs(ctx, containerID, container.LogsOptions{
|
||||||
|
ShowStdout: true,
|
||||||
|
ShowStderr: true,
|
||||||
|
Follow: true,
|
||||||
|
Timestamps: false,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
s.logger.Error("Failed to get container logs for streaming",
|
||||||
|
zap.String("container_id", containerID),
|
||||||
|
zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer logs.Close()
|
||||||
|
|
||||||
|
s.logger.Debug("Successfully got container logs stream",
|
||||||
|
zap.String("container_id", containerID))
|
||||||
|
|
||||||
|
// Create a context that cancels when doneChan receives a signal
|
||||||
|
streamCtx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Goroutine to listen for done signal
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case <-doneChan:
|
||||||
|
cancel()
|
||||||
|
case <-streamCtx.Done():
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Buffer for reading log data
|
||||||
|
buf := make([]byte, 4096)
|
||||||
|
|
||||||
|
// Continue reading until context is cancelled or EOF
|
||||||
|
totalLogLines := 0
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-streamCtx.Done():
|
||||||
|
s.logger.Debug("Stream context cancelled, stopping log streaming",
|
||||||
|
zap.String("container_id", containerID),
|
||||||
|
zap.Int("total_lines_streamed", totalLogLines))
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
n, err := logs.Read(buf)
|
||||||
|
if n > 0 {
|
||||||
|
s.logger.Debug("Read log data from container",
|
||||||
|
zap.String("container_id", containerID),
|
||||||
|
zap.Int("bytes_read", n))
|
||||||
|
|
||||||
|
// Parse Docker logs to remove binary headers
|
||||||
|
logData := buf[:n]
|
||||||
|
rawOutput := parseDockerLogs(logData)
|
||||||
|
|
||||||
|
// Send each line to the log channel, filtering out XML tags
|
||||||
|
lines := strings.Split(rawOutput, "\n")
|
||||||
|
for _, line := range lines {
|
||||||
|
trimmedLine := strings.TrimSpace(line)
|
||||||
|
// Skip empty lines and XML tags
|
||||||
|
if trimmedLine != "" &&
|
||||||
|
!strings.HasPrefix(trimmedLine, "<stdout>") &&
|
||||||
|
!strings.HasPrefix(trimmedLine, "</stdout>") &&
|
||||||
|
!strings.HasPrefix(trimmedLine, "<result>") &&
|
||||||
|
!strings.HasPrefix(trimmedLine, "</result>") &&
|
||||||
|
trimmedLine != "<stdout>" &&
|
||||||
|
trimmedLine != "</stdout>" &&
|
||||||
|
trimmedLine != "<result>" &&
|
||||||
|
trimmedLine != "</result>" {
|
||||||
|
|
||||||
|
totalLogLines++
|
||||||
|
s.logger.Debug("Sending filtered log line to channel",
|
||||||
|
zap.String("container_id", containerID),
|
||||||
|
zap.String("log_line", trimmedLine),
|
||||||
|
zap.Int("total_lines", totalLogLines))
|
||||||
|
|
||||||
|
select {
|
||||||
|
case logChan <- trimmedLine:
|
||||||
|
s.logger.Debug("Successfully sent filtered log line to channel",
|
||||||
|
zap.String("container_id", containerID))
|
||||||
|
case <-streamCtx.Done():
|
||||||
|
s.logger.Debug("Stream context cancelled while sending log line",
|
||||||
|
zap.String("container_id", containerID))
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
// Log buffer is full, warn but continue reading to avoid blocking
|
||||||
|
s.logger.Warn("Log buffer full, dropping log line",
|
||||||
|
zap.String("container_id", containerID),
|
||||||
|
zap.String("dropped_line", trimmedLine))
|
||||||
|
}
|
||||||
|
} else if trimmedLine != "" {
|
||||||
|
s.logger.Debug("Filtered out XML tag",
|
||||||
|
zap.String("container_id", containerID),
|
||||||
|
zap.String("filtered_line", trimmedLine))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
s.logger.Debug("Got EOF from container logs, container might still be running",
|
||||||
|
zap.String("container_id", containerID),
|
||||||
|
zap.Int("total_lines_streamed", totalLogLines))
|
||||||
|
// Container might still be running, continue reading
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
continue
|
||||||
|
} else {
|
||||||
|
s.logger.Error("Error reading container logs",
|
||||||
|
zap.String("container_id", containerID),
|
||||||
|
zap.Error(err),
|
||||||
|
zap.Int("total_lines_streamed", totalLogLines))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *SimpleDockerRuntime) cleanupContainer(ctx context.Context, containerID string) {
|
func (s *SimpleDockerRuntime) cleanupContainer(ctx context.Context, containerID string) {
|
||||||
// Remove container
|
// Remove container
|
||||||
if err := s.client.ContainerRemove(ctx, containerID, container.RemoveOptions{
|
if err := s.client.ContainerRemove(ctx, containerID, container.RemoveOptions{
|
||||||
|
|||||||
@ -8,20 +8,26 @@ import (
|
|||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// LogStreamCallback is a function that can be called to stream logs during execution
|
||||||
|
type LogStreamCallback func(logs []string) error
|
||||||
|
|
||||||
// RuntimeBackend provides function execution capabilities
|
// RuntimeBackend provides function execution capabilities
|
||||||
type RuntimeBackend interface {
|
type RuntimeBackend interface {
|
||||||
// Execute runs a function with given input
|
// Execute runs a function with given input
|
||||||
Execute(ctx context.Context, function *domain.FunctionDefinition, input json.RawMessage) (*domain.ExecutionResult, error)
|
Execute(ctx context.Context, function *domain.FunctionDefinition, input json.RawMessage) (*domain.ExecutionResult, error)
|
||||||
|
|
||||||
|
// ExecuteWithLogStreaming runs a function with given input and streams logs during execution
|
||||||
|
ExecuteWithLogStreaming(ctx context.Context, function *domain.FunctionDefinition, input json.RawMessage, logCallback LogStreamCallback) (*domain.ExecutionResult, error)
|
||||||
|
|
||||||
// Deploy prepares function for execution
|
// Deploy prepares function for execution
|
||||||
Deploy(ctx context.Context, function *domain.FunctionDefinition) error
|
Deploy(ctx context.Context, function *domain.FunctionDefinition) error
|
||||||
|
|
||||||
// Remove cleans up function resources
|
// Remove cleans up function resources
|
||||||
Remove(ctx context.Context, functionID uuid.UUID) error
|
Remove(ctx context.Context, functionID uuid.UUID) error
|
||||||
|
|
||||||
// GetLogs retrieves execution logs
|
// GetLogs retrieves execution logs
|
||||||
GetLogs(ctx context.Context, executionID uuid.UUID) ([]string, error)
|
GetLogs(ctx context.Context, executionID uuid.UUID) ([]string, error)
|
||||||
|
|
||||||
// HealthCheck verifies runtime availability
|
// HealthCheck verifies runtime availability
|
||||||
HealthCheck(ctx context.Context) error
|
HealthCheck(ctx context.Context) error
|
||||||
|
|
||||||
@ -37,11 +43,11 @@ type RuntimeBackend interface {
|
|||||||
|
|
||||||
// RuntimeInfo contains runtime backend information
|
// RuntimeInfo contains runtime backend information
|
||||||
type RuntimeInfo struct {
|
type RuntimeInfo struct {
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
Version string `json:"version"`
|
Version string `json:"version"`
|
||||||
Available bool `json:"available"`
|
Available bool `json:"available"`
|
||||||
Endpoint string `json:"endpoint,omitempty"`
|
Endpoint string `json:"endpoint,omitempty"`
|
||||||
Metadata map[string]string `json:"metadata,omitempty"`
|
Metadata map[string]string `json:"metadata,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// ContainerInfo contains information about a running container
|
// ContainerInfo contains information about a running container
|
||||||
@ -59,4 +65,4 @@ type RuntimeFactory interface {
|
|||||||
CreateRuntime(ctx context.Context, runtimeType string, config map[string]interface{}) (RuntimeBackend, error)
|
CreateRuntime(ctx context.Context, runtimeType string, config map[string]interface{}) (RuntimeBackend, error)
|
||||||
GetSupportedRuntimes() []string
|
GetSupportedRuntimes() []string
|
||||||
GetDefaultConfig(runtimeType string) map[string]interface{}
|
GetDefaultConfig(runtimeType string) map[string]interface{}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -11,6 +11,7 @@ import (
|
|||||||
|
|
||||||
"github.com/RyanCopley/skybridge/faas/internal/domain"
|
"github.com/RyanCopley/skybridge/faas/internal/domain"
|
||||||
"github.com/RyanCopley/skybridge/faas/internal/repository"
|
"github.com/RyanCopley/skybridge/faas/internal/repository"
|
||||||
|
"github.com/RyanCopley/skybridge/faas/internal/runtime"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -42,13 +43,13 @@ func (s *executionService) Execute(ctx context.Context, req *domain.ExecuteFunct
|
|||||||
return nil, fmt.Errorf("function not found: %w", err)
|
return nil, fmt.Errorf("function not found: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create execution record
|
// Create execution record
|
||||||
// Initialize input with empty JSON if nil or empty
|
// Initialize input with empty JSON if nil or empty
|
||||||
input := req.Input
|
input := req.Input
|
||||||
if input == nil || len(input) == 0 {
|
if input == nil || len(input) == 0 {
|
||||||
input = json.RawMessage(`{}`)
|
input = json.RawMessage(`{}`)
|
||||||
}
|
}
|
||||||
|
|
||||||
execution := &domain.FunctionExecution{
|
execution := &domain.FunctionExecution{
|
||||||
ID: uuid.New(),
|
ID: uuid.New(),
|
||||||
FunctionID: req.FunctionID,
|
FunctionID: req.FunctionID,
|
||||||
@ -112,8 +113,53 @@ func (s *executionService) executeSync(ctx context.Context, execution *domain.Fu
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Execute function
|
// Define log streaming callback
|
||||||
result, err := backend.Execute(execCtx, function, execution.Input)
|
logCallback := func(logs []string) error {
|
||||||
|
s.logger.Info("Log streaming callback called",
|
||||||
|
zap.String("execution_id", execution.ID.String()),
|
||||||
|
zap.Int("log_count", len(logs)),
|
||||||
|
zap.Strings("logs_preview", logs))
|
||||||
|
|
||||||
|
// Update execution with current logs using background context
|
||||||
|
// to ensure updates continue even after HTTP request completes
|
||||||
|
// Create a copy of the execution to avoid race conditions
|
||||||
|
execCopy := *execution
|
||||||
|
execCopy.Logs = logs
|
||||||
|
_, err := s.executionRepo.Update(context.Background(), execution.ID, &execCopy)
|
||||||
|
if err == nil {
|
||||||
|
// Only update the original if database update succeeds
|
||||||
|
execution.Logs = logs
|
||||||
|
s.logger.Info("Successfully updated execution with logs in database",
|
||||||
|
zap.String("execution_id", execution.ID.String()),
|
||||||
|
zap.Int("log_count", len(logs)))
|
||||||
|
} else {
|
||||||
|
s.logger.Error("Failed to update execution with logs in database",
|
||||||
|
zap.String("execution_id", execution.ID.String()),
|
||||||
|
zap.Error(err))
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if backend supports log streaming
|
||||||
|
type logStreamingBackend interface {
|
||||||
|
ExecuteWithLogStreaming(ctx context.Context, function *domain.FunctionDefinition, input json.RawMessage, logCallback runtime.LogStreamCallback) (*domain.ExecutionResult, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
var result *domain.ExecutionResult
|
||||||
|
if lsBackend, ok := backend.(logStreamingBackend); ok {
|
||||||
|
s.logger.Info("Backend supports log streaming, using ExecuteWithLogStreaming",
|
||||||
|
zap.String("execution_id", execution.ID.String()),
|
||||||
|
zap.String("function_id", function.ID.String()))
|
||||||
|
// Execute function with log streaming
|
||||||
|
result, err = lsBackend.ExecuteWithLogStreaming(execCtx, function, execution.Input, logCallback)
|
||||||
|
} else {
|
||||||
|
s.logger.Info("Backend does not support log streaming, using regular Execute",
|
||||||
|
zap.String("execution_id", execution.ID.String()),
|
||||||
|
zap.String("function_id", function.ID.String()))
|
||||||
|
// Fallback to regular execute
|
||||||
|
result, err = backend.Execute(execCtx, function, execution.Input)
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Check if this was a timeout error
|
// Check if this was a timeout error
|
||||||
if execCtx.Err() == context.DeadlineExceeded {
|
if execCtx.Err() == context.DeadlineExceeded {
|
||||||
@ -194,8 +240,53 @@ func (s *executionService) executeAsync(ctx context.Context, execution *domain.F
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Execute function
|
// Define log streaming callback
|
||||||
result, err := backend.Execute(execCtx, function, execution.Input)
|
logCallback := func(logs []string) error {
|
||||||
|
s.logger.Info("Log streaming callback called",
|
||||||
|
zap.String("execution_id", execution.ID.String()),
|
||||||
|
zap.Int("log_count", len(logs)),
|
||||||
|
zap.Strings("logs_preview", logs))
|
||||||
|
|
||||||
|
// Update execution with current logs using background context
|
||||||
|
// to ensure updates continue even after HTTP request completes
|
||||||
|
// Create a copy of the execution to avoid race conditions
|
||||||
|
execCopy := *execution
|
||||||
|
execCopy.Logs = logs
|
||||||
|
_, err := s.executionRepo.Update(context.Background(), execution.ID, &execCopy)
|
||||||
|
if err == nil {
|
||||||
|
// Only update the original if database update succeeds
|
||||||
|
execution.Logs = logs
|
||||||
|
s.logger.Info("Successfully updated execution with logs in database",
|
||||||
|
zap.String("execution_id", execution.ID.String()),
|
||||||
|
zap.Int("log_count", len(logs)))
|
||||||
|
} else {
|
||||||
|
s.logger.Error("Failed to update execution with logs in database",
|
||||||
|
zap.String("execution_id", execution.ID.String()),
|
||||||
|
zap.Error(err))
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if backend supports log streaming
|
||||||
|
type logStreamingBackend interface {
|
||||||
|
ExecuteWithLogStreaming(ctx context.Context, function *domain.FunctionDefinition, input json.RawMessage, logCallback runtime.LogStreamCallback) (*domain.ExecutionResult, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
var result *domain.ExecutionResult
|
||||||
|
if lsBackend, ok := backend.(logStreamingBackend); ok {
|
||||||
|
s.logger.Info("Backend supports log streaming, using ExecuteWithLogStreaming",
|
||||||
|
zap.String("execution_id", execution.ID.String()),
|
||||||
|
zap.String("function_id", function.ID.String()))
|
||||||
|
// Execute function with log streaming
|
||||||
|
result, err = lsBackend.ExecuteWithLogStreaming(execCtx, function, execution.Input, logCallback)
|
||||||
|
} else {
|
||||||
|
s.logger.Info("Backend does not support log streaming, using regular Execute",
|
||||||
|
zap.String("execution_id", execution.ID.String()),
|
||||||
|
zap.String("function_id", function.ID.String()))
|
||||||
|
// Fallback to regular execute
|
||||||
|
result, err = backend.Execute(execCtx, function, execution.Input)
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Check if this was a timeout error
|
// Check if this was a timeout error
|
||||||
if execCtx.Err() == context.DeadlineExceeded {
|
if execCtx.Err() == context.DeadlineExceeded {
|
||||||
@ -329,17 +420,35 @@ func (s *executionService) Cancel(ctx context.Context, id uuid.UUID, userID stri
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *executionService) GetLogs(ctx context.Context, id uuid.UUID) ([]string, error) {
|
func (s *executionService) GetLogs(ctx context.Context, id uuid.UUID) ([]string, error) {
|
||||||
|
s.logger.Debug("GetLogs called in execution service",
|
||||||
|
zap.String("execution_id", id.String()))
|
||||||
|
|
||||||
// Get execution with logs from database
|
// Get execution with logs from database
|
||||||
execution, err := s.executionRepo.GetByID(ctx, id)
|
execution, err := s.executionRepo.GetByID(ctx, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.logger.Error("Failed to get execution from database in GetLogs",
|
||||||
|
zap.String("execution_id", id.String()),
|
||||||
|
zap.Error(err))
|
||||||
return nil, fmt.Errorf("execution not found: %w", err)
|
return nil, fmt.Errorf("execution not found: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.logger.Info("Retrieved execution from database",
|
||||||
|
zap.String("execution_id", id.String()),
|
||||||
|
zap.String("status", string(execution.Status)),
|
||||||
|
zap.Int("log_count", len(execution.Logs)),
|
||||||
|
zap.Bool("logs_nil", execution.Logs == nil))
|
||||||
|
|
||||||
// Return logs from execution record
|
// Return logs from execution record
|
||||||
if execution.Logs == nil {
|
if execution.Logs == nil {
|
||||||
|
s.logger.Debug("Execution has nil logs, returning empty slice",
|
||||||
|
zap.String("execution_id", id.String()))
|
||||||
return []string{}, nil
|
return []string{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.logger.Debug("Returning logs from execution",
|
||||||
|
zap.String("execution_id", id.String()),
|
||||||
|
zap.Int("log_count", len(execution.Logs)))
|
||||||
|
|
||||||
return execution.Logs, nil
|
return execution.Logs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
16
faas/test_functions/streaming_logs.js
Normal file
16
faas/test_functions/streaming_logs.js
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
module.exports.handler = async (input, context) => {
|
||||||
|
console.log("Starting function execution");
|
||||||
|
|
||||||
|
for (let i = 1; i <= 10; i++) {
|
||||||
|
console.log(`Processing step ${i}`);
|
||||||
|
// Wait 1 second between log outputs
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 1000));
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log("Function execution completed");
|
||||||
|
|
||||||
|
return {
|
||||||
|
message: "Function executed successfully",
|
||||||
|
steps: 10
|
||||||
|
};
|
||||||
|
};
|
||||||
16
faas/test_functions/streaming_logs.py
Normal file
16
faas/test_functions/streaming_logs.py
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
import time
|
||||||
|
|
||||||
|
def handler(input, context):
|
||||||
|
print("Starting function execution")
|
||||||
|
|
||||||
|
for i in range(1, 11):
|
||||||
|
print(f"Processing step {i}")
|
||||||
|
# Wait 1 second between log outputs
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
print("Function execution completed")
|
||||||
|
|
||||||
|
return {
|
||||||
|
"message": "Function executed successfully",
|
||||||
|
"steps": 10
|
||||||
|
}
|
||||||
@ -156,17 +156,24 @@ export const ExecutionModal: React.FC<ExecutionModalProps> = ({
|
|||||||
|
|
||||||
const loadLogs = async (executionId: string) => {
|
const loadLogs = async (executionId: string) => {
|
||||||
try {
|
try {
|
||||||
|
console.debug(`[ExecutionModal] Loading logs for execution ${executionId}`);
|
||||||
setLoadingLogs(true);
|
setLoadingLogs(true);
|
||||||
const response = await executionApi.getLogs(executionId);
|
const response = await executionApi.getLogs(executionId);
|
||||||
|
console.debug(`[ExecutionModal] Loaded logs for execution ${executionId}:`, {
|
||||||
|
logCount: response.data.logs?.length || 0,
|
||||||
|
logs: response.data.logs
|
||||||
|
});
|
||||||
setLogs(response.data.logs || []);
|
setLogs(response.data.logs || []);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error loading logs:', error);
|
console.error(`[ExecutionModal] Error loading logs for execution ${executionId}:`, error);
|
||||||
} finally {
|
} finally {
|
||||||
setLoadingLogs(false);
|
setLoadingLogs(false);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
const startLogsAutoRefresh = (executionId: string) => {
|
const startLogsAutoRefresh = (executionId: string) => {
|
||||||
|
console.debug(`[ExecutionModal] Starting auto-refresh for execution ${executionId}`);
|
||||||
|
|
||||||
// Clear any existing interval
|
// Clear any existing interval
|
||||||
if (logsPollIntervalRef.current) {
|
if (logsPollIntervalRef.current) {
|
||||||
clearInterval(logsPollIntervalRef.current);
|
clearInterval(logsPollIntervalRef.current);
|
||||||
@ -180,10 +187,15 @@ export const ExecutionModal: React.FC<ExecutionModalProps> = ({
|
|||||||
// Set up auto-refresh every 2 seconds
|
// Set up auto-refresh every 2 seconds
|
||||||
logsPollIntervalRef.current = setInterval(async () => {
|
logsPollIntervalRef.current = setInterval(async () => {
|
||||||
try {
|
try {
|
||||||
|
console.debug(`[ExecutionModal] Auto-refreshing logs for execution ${executionId}`);
|
||||||
const response = await executionApi.getLogs(executionId);
|
const response = await executionApi.getLogs(executionId);
|
||||||
|
console.debug(`[ExecutionModal] Auto-refresh got logs for execution ${executionId}:`, {
|
||||||
|
logCount: response.data.logs?.length || 0,
|
||||||
|
logs: response.data.logs
|
||||||
|
});
|
||||||
setLogs(response.data.logs || []);
|
setLogs(response.data.logs || []);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error auto-refreshing logs:', error);
|
console.error(`[ExecutionModal] Error auto-refreshing logs for execution ${executionId}:`, error);
|
||||||
}
|
}
|
||||||
}, 2000);
|
}, 2000);
|
||||||
};
|
};
|
||||||
|
|||||||
@ -75,8 +75,21 @@ export const executionApi = {
|
|||||||
cancel: (id: string) =>
|
cancel: (id: string) =>
|
||||||
api.delete(`/executions/${id}`),
|
api.delete(`/executions/${id}`),
|
||||||
|
|
||||||
getLogs: (id: string) =>
|
getLogs: (id: string) => {
|
||||||
api.get<{ logs: string[] }>(`/executions/${id}/logs`),
|
console.debug(`[API] Fetching logs for execution ${id}`);
|
||||||
|
return api.get<{ logs: string[] }>(`/executions/${id}/logs`)
|
||||||
|
.then(response => {
|
||||||
|
console.debug(`[API] Successfully fetched logs for execution ${id}:`, {
|
||||||
|
logCount: response.data.logs?.length || 0,
|
||||||
|
logs: response.data.logs
|
||||||
|
});
|
||||||
|
return response;
|
||||||
|
})
|
||||||
|
.catch(error => {
|
||||||
|
console.error(`[API] Failed to fetch logs for execution ${id}:`, error);
|
||||||
|
throw error;
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
getRunning: () =>
|
getRunning: () =>
|
||||||
api.get<{ executions: FunctionExecution[]; count: number }>('/executions/running'),
|
api.get<{ executions: FunctionExecution[]; count: number }>('/executions/running'),
|
||||||
|
|||||||
Reference in New Issue
Block a user