Files
skybridge/faas/internal/runtime/docker/docker.go.bak
2025-08-30 21:17:23 -04:00

431 lines
12 KiB
Go

package docker
import (
"context"
"encoding/json"
"fmt"
"io"
"strings"
"time"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/image"
"github.com/docker/docker/client"
"github.com/google/uuid"
"go.uber.org/zap"
"github.com/RyanCopley/skybridge/faas/internal/domain"
"github.com/RyanCopley/skybridge/faas/internal/runtime"
)
type DockerRuntime struct {
client *client.Client
logger *zap.Logger
config *Config
}
type Config struct {
DockerHost string `json:"docker_host"`
NetworkMode string `json:"network_mode"`
SecurityOpts []string `json:"security_opts"`
DefaultLabels map[string]string `json:"default_labels"`
MaxCPUs float64 `json:"max_cpus"`
MaxMemory int64 `json:"max_memory"`
TimeoutSeconds int `json:"timeout_seconds"`
}
func NewDockerRuntime(logger *zap.Logger, cfg *Config) (*DockerRuntime, error) {
if cfg == nil {
cfg = &Config{
NetworkMode: "bridge",
SecurityOpts: []string{"no-new-privileges:true"},
DefaultLabels: map[string]string{"service": "faas"},
MaxCPUs: 2.0,
MaxMemory: 512 * 1024 * 1024, // 512MB
TimeoutSeconds: 300,
}
}
var cli *client.Client
var err error
if cfg.DockerHost != "" {
cli, err = client.NewClientWithOpts(client.WithHost(cfg.DockerHost))
} else {
cli, err = client.NewClientWithOpts(client.FromEnv)
}
if err != nil {
return nil, fmt.Errorf("failed to create Docker client: %w", err)
}
return &DockerRuntime{
client: cli,
logger: logger,
config: cfg,
}, nil
}
func (d *DockerRuntime) Execute(ctx context.Context, function *domain.FunctionDefinition, input json.RawMessage) (*domain.ExecutionResult, error) {
executionID := uuid.New()
startTime := time.Now()
d.logger.Info("Starting function execution",
zap.String("function_id", function.ID.String()),
zap.String("execution_id", executionID.String()),
zap.String("image", function.Image))
// Create container configuration
containerConfig := &container.Config{
Image: function.Image,
Env: d.buildEnvironment(function, input),
Labels: map[string]string{
"faas.function_id": function.ID.String(),
"faas.execution_id": executionID.String(),
"faas.function_name": function.Name,
},
WorkingDir: "/app",
Cmd: []string{function.Handler},
}
// Add default labels
for k, v := range d.config.DefaultLabels {
containerConfig.Labels[k] = v
}
// Create host configuration with resource limits
hostConfig := &container.HostConfig{
Resources: container.Resources{
Memory: int64(function.Memory) * 1024 * 1024, // Convert MB to bytes
CPUQuota: int64(d.config.MaxCPUs * 100000), // CPU quota in microseconds
CPUPeriod: 100000, // CPU period in microseconds
},
NetworkMode: container.NetworkMode(d.config.NetworkMode),
SecurityOpt: d.config.SecurityOpts,
AutoRemove: true,
}
// Create container
resp, err := d.client.ContainerCreate(ctx, containerConfig, hostConfig, nil, nil, "")
if err != nil {
return &domain.ExecutionResult{
Error: fmt.Sprintf("failed to create container: %v", err),
Duration: time.Since(startTime),
}, nil
}
containerID := resp.ID
// Start container
if err := d.client.ContainerStart(ctx, containerID, container.StartOptions{}); err != nil {
return &domain.ExecutionResult{
Error: fmt.Sprintf("failed to start container: %v", err),
Duration: time.Since(startTime),
}, nil
}
// Wait for container to finish with timeout
timeoutCtx, cancel := context.WithTimeout(ctx, function.Timeout.Duration)
defer cancel()
statusCh, errCh := d.client.ContainerWait(timeoutCtx, containerID, container.WaitConditionNotRunning)
var waitResult container.WaitResponse
select {
case result := <-statusCh:
waitResult = result
case err := <-errCh:
d.client.ContainerKill(ctx, containerID, "SIGTERM")
return &domain.ExecutionResult{
Error: fmt.Sprintf("container wait error: %v", err),
Duration: time.Since(startTime),
}, nil
case <-timeoutCtx.Done():
d.client.ContainerKill(ctx, containerID, "SIGTERM")
return &domain.ExecutionResult{
Error: "execution timeout",
Duration: time.Since(startTime),
}, nil
}
// Get container logs
logs, err := d.getContainerLogs(ctx, containerID)
if err != nil {
d.logger.Warn("Failed to get container logs", zap.Error(err))
}
// Get container stats for memory usage
memoryUsed := d.getMemoryUsage(ctx, containerID)
duration := time.Since(startTime)
// Parse output from logs if successful
var output json.RawMessage
var execError string
if waitResult.StatusCode == 0 {
// Extract output from logs (assuming last line contains JSON output)
if len(logs) > 0 {
lastLog := logs[len(logs)-1]
if json.Valid([]byte(lastLog)) {
output = json.RawMessage(lastLog)
} else {
output = json.RawMessage(fmt.Sprintf(`{"result": "%s"}`, lastLog))
}
}
} else {
execError = fmt.Sprintf("container exited with code %d", waitResult.StatusCode)
if len(logs) > 0 {
execError += ": " + strings.Join(logs, "\n")
}
}
d.logger.Info("Function execution completed",
zap.String("function_id", function.ID.String()),
zap.String("execution_id", executionID.String()),
zap.Duration("duration", duration),
zap.Int64("status_code", waitResult.StatusCode),
zap.Int("memory_used", memoryUsed))
return &domain.ExecutionResult{
Output: output,
Error: execError,
Duration: duration,
MemoryUsed: memoryUsed,
Logs: logs,
}, nil
}
func (d *DockerRuntime) Deploy(ctx context.Context, function *domain.FunctionDefinition) error {
d.logger.Info("Deploying function",
zap.String("function_id", function.ID.String()),
zap.String("image", function.Image))
// Pull image
reader, err := d.client.ImagePull(ctx, function.Image, image.PullOptions{})
if err != nil {
return fmt.Errorf("failed to pull image %s: %w", function.Image, err)
}
defer reader.Close()
// Read the pull response to ensure it completes
_, err = io.ReadAll(reader)
if err != nil {
return fmt.Errorf("failed to complete image pull: %w", err)
}
d.logger.Info("Function deployed successfully",
zap.String("function_id", function.ID.String()),
zap.String("image", function.Image))
return nil
}
func (d *DockerRuntime) Remove(ctx context.Context, functionID uuid.UUID) error {
d.logger.Info("Removing function containers", zap.String("function_id", functionID.String()))
// List containers with the function label
filters := filters.NewArgs()
filters.Add("label", fmt.Sprintf("faas.function_id=%s", functionID.String()))
containers, err := d.client.ContainerList(ctx, container.ListOptions{
All: true,
Filters: filters,
})
if err != nil {
return fmt.Errorf("failed to list containers: %w", err)
}
// Remove containers
for _, container := range containers {
if err := d.client.ContainerRemove(ctx, container.ID, struct {
Force bool
}{Force: true}); err != nil {
d.logger.Warn("Failed to remove container",
zap.String("container_id", container.ID),
zap.Error(err))
}
}
return nil
}
func (d *DockerRuntime) GetLogs(ctx context.Context, executionID uuid.UUID) ([]string, error) {
// Find container by execution ID
filters := filters.NewArgs()
filters.Add("label", fmt.Sprintf("faas.execution_id=%s", executionID.String()))
containers, err := d.client.ContainerList(ctx, container.ListOptions{
All: true,
Filters: filters,
})
if err != nil {
return nil, fmt.Errorf("failed to list containers: %w", err)
}
if len(containers) == 0 {
return nil, fmt.Errorf("no container found for execution %s", executionID.String())
}
return d.getContainerLogs(ctx, containers[0].ID)
}
func (d *DockerRuntime) HealthCheck(ctx context.Context) error {
_, err := d.client.Ping(ctx)
if err != nil {
return fmt.Errorf("Docker daemon not accessible: %w", err)
}
return nil
}
func (d *DockerRuntime) GetInfo(ctx context.Context) (*runtime.RuntimeInfo, error) {
info, err := d.client.Info(ctx)
if err != nil {
return &runtime.RuntimeInfo{
Type: "docker",
Available: false,
}, nil
}
return &runtime.RuntimeInfo{
Type: "docker",
Version: info.ServerVersion,
Available: true,
Endpoint: d.client.DaemonHost(),
Metadata: map[string]string{
"containers": fmt.Sprintf("%d", info.Containers),
"images": fmt.Sprintf("%d", info.Images),
},
}, nil
}
func (d *DockerRuntime) ListContainers(ctx context.Context) ([]runtime.ContainerInfo, error) {
filters := filters.NewArgs()
filters.Add("label", "service=faas")
containers, err := d.client.ContainerList(ctx, container.ListOptions{
All: true,
Filters: filters,
})
if err != nil {
return nil, fmt.Errorf("failed to list containers: %w", err)
}
var result []runtime.ContainerInfo
for _, container := range containers {
functionIDStr, exists := container.Labels["faas.function_id"]
if !exists {
continue
}
functionID, err := uuid.Parse(functionIDStr)
if err != nil {
continue
}
result = append(result, runtime.ContainerInfo{
ID: container.ID,
FunctionID: functionID,
Status: container.Status,
Image: container.Image,
CreatedAt: time.Unix(container.Created, 0).Format(time.RFC3339),
Labels: container.Labels,
})
}
return result, nil
}
func (d *DockerRuntime) StopExecution(ctx context.Context, executionID uuid.UUID) error {
// Find container by execution ID
filters := filters.NewArgs()
filters.Add("label", fmt.Sprintf("faas.execution_id=%s", executionID.String()))
containers, err := d.client.ContainerList(ctx, container.ListOptions{
All: true,
Filters: filters,
})
if err != nil {
return fmt.Errorf("failed to list containers: %w", err)
}
if len(containers) == 0 {
return fmt.Errorf("no container found for execution %s", executionID.String())
}
// Stop container
timeout := 10
return d.client.ContainerStop(ctx, containers[0].ID, container.StopOptions{Timeout: &timeout})
}
func (d *DockerRuntime) buildEnvironment(function *domain.FunctionDefinition, input json.RawMessage) []string {
env := []string{
fmt.Sprintf("FAAS_FUNCTION_ID=%s", function.ID.String()),
fmt.Sprintf("FAAS_FUNCTION_NAME=%s", function.Name),
fmt.Sprintf("FAAS_RUNTIME=%s", function.Runtime),
fmt.Sprintf("FAAS_HANDLER=%s", function.Handler),
fmt.Sprintf("FAAS_MEMORY=%d", function.Memory),
fmt.Sprintf("FAAS_TIMEOUT=%s", function.Timeout.String()),
}
// Add function-specific environment variables
for key, value := range function.Environment {
env = append(env, fmt.Sprintf("%s=%s", key, value))
}
// Add input as environment variable if provided
if input != nil {
env = append(env, fmt.Sprintf("FAAS_INPUT=%s", string(input)))
}
return env
}
func (d *DockerRuntime) getContainerLogs(ctx context.Context, containerID string) ([]string, error) {
options := container.LogsOptions{
ShowStdout: true,
ShowStderr: true,
Timestamps: false,
}
reader, err := d.client.ContainerLogs(ctx, containerID, options)
if err != nil {
return nil, fmt.Errorf("failed to get container logs: %w", err)
}
defer reader.Close()
logs, err := io.ReadAll(reader)
if err != nil {
return nil, fmt.Errorf("failed to read container logs: %w", err)
}
// Split logs into lines and remove empty lines
lines := strings.Split(string(logs), "\n")
var result []string
for _, line := range lines {
if strings.TrimSpace(line) != "" {
result = append(result, line)
}
}
return result, nil
}
func (d *DockerRuntime) getMemoryUsage(ctx context.Context, containerID string) int {
stats, err := d.client.ContainerStats(ctx, containerID, false)
if err != nil {
d.logger.Warn("Failed to get container stats", zap.Error(err))
return 0
}
defer stats.Body.Close()
var containerStats struct {
MemoryStats struct {
Usage uint64 `json:"usage"`
} `json:"memory_stats"`
}
if err := json.NewDecoder(stats.Body).Decode(&containerStats); err != nil {
d.logger.Warn("Failed to decode container stats", zap.Error(err))
return 0
}
// Return memory usage in MB
return int(containerStats.MemoryStats.Usage / 1024 / 1024)
}