faas-clinet
This commit is contained in:
396
faas-client/client.go
Normal file
396
faas-client/client.go
Normal file
@ -0,0 +1,396 @@
|
||||
package faasclient
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// Client represents the FaaS API client
|
||||
type Client struct {
|
||||
baseURL string
|
||||
httpClient *http.Client
|
||||
authHeader map[string]string
|
||||
}
|
||||
|
||||
// NewClient creates a new FaaS client
|
||||
func NewClient(baseURL string, options ...ClientOption) *Client {
|
||||
client := &Client{
|
||||
baseURL: baseURL,
|
||||
httpClient: http.DefaultClient,
|
||||
authHeader: make(map[string]string),
|
||||
}
|
||||
|
||||
for _, option := range options {
|
||||
option(client)
|
||||
}
|
||||
|
||||
return client
|
||||
}
|
||||
|
||||
// ClientOption represents a configuration option for the client
|
||||
type ClientOption func(*Client)
|
||||
|
||||
// WithHTTPClient sets a custom HTTP client
|
||||
func WithHTTPClient(httpClient *http.Client) ClientOption {
|
||||
return func(c *Client) {
|
||||
c.httpClient = httpClient
|
||||
}
|
||||
}
|
||||
|
||||
// WithAuth sets authentication headers
|
||||
func WithAuth(headers map[string]string) ClientOption {
|
||||
return func(c *Client) {
|
||||
for k, v := range headers {
|
||||
c.authHeader[k] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WithUserEmail sets the X-User-Email header for authentication
|
||||
func WithUserEmail(email string) ClientOption {
|
||||
return func(c *Client) {
|
||||
c.authHeader["X-User-Email"] = email
|
||||
}
|
||||
}
|
||||
|
||||
// doRequest performs an HTTP request
|
||||
func (c *Client) doRequest(ctx context.Context, method, path string, body interface{}) (*http.Response, error) {
|
||||
var reqBody io.Reader
|
||||
if body != nil {
|
||||
jsonData, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal request body: %w", err)
|
||||
}
|
||||
reqBody = bytes.NewBuffer(jsonData)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, method, c.baseURL+path, reqBody)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
|
||||
if body != nil {
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
}
|
||||
|
||||
// Add authentication headers
|
||||
for k, v := range c.authHeader {
|
||||
req.Header.Set(k, v)
|
||||
}
|
||||
|
||||
return c.httpClient.Do(req)
|
||||
}
|
||||
|
||||
// CreateFunction creates a new function
|
||||
func (c *Client) CreateFunction(ctx context.Context, req *CreateFunctionRequest) (*FunctionDefinition, error) {
|
||||
resp, err := c.doRequest(ctx, "POST", "/api/v1/functions", req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusCreated {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("create function failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var function FunctionDefinition
|
||||
if err := json.NewDecoder(resp.Body).Decode(&function); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode response: %w", err)
|
||||
}
|
||||
|
||||
return &function, nil
|
||||
}
|
||||
|
||||
// GetFunction retrieves a function by ID
|
||||
func (c *Client) GetFunction(ctx context.Context, id uuid.UUID) (*FunctionDefinition, error) {
|
||||
resp, err := c.doRequest(ctx, "GET", "/api/v1/functions/"+id.String(), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("get function failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var function FunctionDefinition
|
||||
if err := json.NewDecoder(resp.Body).Decode(&function); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode response: %w", err)
|
||||
}
|
||||
|
||||
return &function, nil
|
||||
}
|
||||
|
||||
// UpdateFunction updates an existing function
|
||||
func (c *Client) UpdateFunction(ctx context.Context, id uuid.UUID, req *UpdateFunctionRequest) (*FunctionDefinition, error) {
|
||||
resp, err := c.doRequest(ctx, "PUT", "/api/v1/functions/"+id.String(), req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("update function failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var function FunctionDefinition
|
||||
if err := json.NewDecoder(resp.Body).Decode(&function); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode response: %w", err)
|
||||
}
|
||||
|
||||
return &function, nil
|
||||
}
|
||||
|
||||
// DeleteFunction deletes a function
|
||||
func (c *Client) DeleteFunction(ctx context.Context, id uuid.UUID) error {
|
||||
resp, err := c.doRequest(ctx, "DELETE", "/api/v1/functions/"+id.String(), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return fmt.Errorf("delete function failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListFunctions lists functions with optional filtering
|
||||
func (c *Client) ListFunctions(ctx context.Context, appID string, limit, offset int) (*ListFunctionsResponse, error) {
|
||||
params := url.Values{}
|
||||
if appID != "" {
|
||||
params.Set("app_id", appID)
|
||||
}
|
||||
if limit > 0 {
|
||||
params.Set("limit", strconv.Itoa(limit))
|
||||
}
|
||||
if offset > 0 {
|
||||
params.Set("offset", strconv.Itoa(offset))
|
||||
}
|
||||
|
||||
path := "/api/v1/functions"
|
||||
if len(params) > 0 {
|
||||
path += "?" + params.Encode()
|
||||
}
|
||||
|
||||
resp, err := c.doRequest(ctx, "GET", path, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("list functions failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var response ListFunctionsResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode response: %w", err)
|
||||
}
|
||||
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
// DeployFunction deploys a function
|
||||
func (c *Client) DeployFunction(ctx context.Context, id uuid.UUID, req *DeployFunctionRequest) (*DeployFunctionResponse, error) {
|
||||
if req == nil {
|
||||
req = &DeployFunctionRequest{FunctionID: id}
|
||||
}
|
||||
req.FunctionID = id
|
||||
|
||||
resp, err := c.doRequest(ctx, "POST", "/api/v1/functions/"+id.String()+"/deploy", req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("deploy function failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var response DeployFunctionResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode response: %w", err)
|
||||
}
|
||||
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
// ExecuteFunction executes a function synchronously or asynchronously
|
||||
func (c *Client) ExecuteFunction(ctx context.Context, req *ExecuteFunctionRequest) (*ExecuteFunctionResponse, error) {
|
||||
resp, err := c.doRequest(ctx, "POST", "/api/v1/functions/"+req.FunctionID.String()+"/execute", req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("execute function failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var response ExecuteFunctionResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode response: %w", err)
|
||||
}
|
||||
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
// InvokeFunction invokes a function asynchronously
|
||||
func (c *Client) InvokeFunction(ctx context.Context, functionID uuid.UUID, input json.RawMessage) (*ExecuteFunctionResponse, error) {
|
||||
req := &ExecuteFunctionRequest{
|
||||
FunctionID: functionID,
|
||||
Input: input,
|
||||
Async: true,
|
||||
}
|
||||
|
||||
resp, err := c.doRequest(ctx, "POST", "/api/v1/functions/"+functionID.String()+"/invoke", req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusAccepted {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("invoke function failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var response ExecuteFunctionResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode response: %w", err)
|
||||
}
|
||||
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
// GetExecution retrieves an execution by ID
|
||||
func (c *Client) GetExecution(ctx context.Context, id uuid.UUID) (*FunctionExecution, error) {
|
||||
resp, err := c.doRequest(ctx, "GET", "/api/v1/executions/"+id.String(), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("get execution failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var execution FunctionExecution
|
||||
if err := json.NewDecoder(resp.Body).Decode(&execution); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode response: %w", err)
|
||||
}
|
||||
|
||||
return &execution, nil
|
||||
}
|
||||
|
||||
// ListExecutions lists executions with optional filtering
|
||||
func (c *Client) ListExecutions(ctx context.Context, functionID *uuid.UUID, limit, offset int) (*ListExecutionsResponse, error) {
|
||||
params := url.Values{}
|
||||
if functionID != nil {
|
||||
params.Set("function_id", functionID.String())
|
||||
}
|
||||
if limit > 0 {
|
||||
params.Set("limit", strconv.Itoa(limit))
|
||||
}
|
||||
if offset > 0 {
|
||||
params.Set("offset", strconv.Itoa(offset))
|
||||
}
|
||||
|
||||
path := "/api/v1/executions"
|
||||
if len(params) > 0 {
|
||||
path += "?" + params.Encode()
|
||||
}
|
||||
|
||||
resp, err := c.doRequest(ctx, "GET", path, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("list executions failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var response ListExecutionsResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode response: %w", err)
|
||||
}
|
||||
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
// CancelExecution cancels a running execution
|
||||
func (c *Client) CancelExecution(ctx context.Context, id uuid.UUID) error {
|
||||
resp, err := c.doRequest(ctx, "POST", "/api/v1/executions/"+id.String()+"/cancel", nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return fmt.Errorf("cancel execution failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetExecutionLogs retrieves logs for an execution
|
||||
func (c *Client) GetExecutionLogs(ctx context.Context, id uuid.UUID) (*GetLogsResponse, error) {
|
||||
resp, err := c.doRequest(ctx, "GET", "/api/v1/executions/"+id.String()+"/logs", nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("get execution logs failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var response GetLogsResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode response: %w", err)
|
||||
}
|
||||
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
// GetRunningExecutions retrieves all currently running executions
|
||||
func (c *Client) GetRunningExecutions(ctx context.Context) (*GetRunningExecutionsResponse, error) {
|
||||
resp, err := c.doRequest(ctx, "GET", "/api/v1/executions/running", nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("get running executions failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var response GetRunningExecutionsResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode response: %w", err)
|
||||
}
|
||||
|
||||
return &response, nil
|
||||
}
|
||||
Reference in New Issue
Block a user