Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
name: CI

on:
push:
branches: [main]
pull_request:
branches: [main]

jobs:
test:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
go-version: ['1.21', '1.22']
steps:
- uses: actions/checkout@v4
- name: Set up Go ${{ matrix.go-version }}
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go-version }}
- name: Build
run: go build ./...
- name: Test
run: go test ./... -race -count=1
- name: Format check
run: |
if [ -n "$(gofmt -l .)" ]; then
echo "::error::gofmt found unformatted files:"
gofmt -l .
exit 1
fi
- name: Vet
run: go vet ./...
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,44 @@ client.Permits.Lineage(ctx, id) // Get lineage
client.Permits.Bundle(ctx, id) // Full audit bundle
```

### Workflows

Declare workflow intent before a multi-call run, then carry the workflow ID through `context.Context`. The SDK automatically injects `X-Keel-Workflow-Id` from the context on outbound Keel API requests.

```go
expectedCalls := 10000
maxCalls := 12000

workflow, err := client.Workflows.Declare(ctx, keel.WorkflowDeclareRequest{
WorkflowID: "invoice-batch-2027-01-05",
Intent: keel.WorkflowIntent{
ExpectedCalls: &expectedCalls,
MaxCalls: &maxCalls,
},
})
if err != nil {
log.Fatal(err)
}

workflowCtx := keel.WithWorkflow(ctx, workflow.WorkflowID)
_, err = client.Permits.Create(workflowCtx, permitReq)
if err != nil {
log.Fatal(err)
}
```

`RunInWorkflow` is a convenience wrapper when you want to scope several calls to the same workflow context.

Workflow APIs mirror the rest of the SDK:

```go
client.Workflows.Declare(ctx, req)
client.Workflows.Amend(ctx, workflowID, req)
client.Workflows.Complete(ctx, workflowID)
client.Workflows.Get(ctx, workflowID)
client.Workflows.List(ctx, params)
```

### Executions

```go
Expand Down Expand Up @@ -225,6 +263,7 @@ source .env && export KEEL_BASE_URL KEEL_API_KEY KEEL_PROJECT_ID
go run ./examples/quickstart
go run ./examples/provider-swap
go run ./examples/end-to-end
go run ./examples/workflows
```

## Error Handling
Expand Down
1 change: 1 addition & 0 deletions apikeys.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (c *ApiKeysClient) List(ctx context.Context, params ApiKeyListParams) (*Api
}
return &resp, nil
}

// Revoke revokes an API key.
func (c *ApiKeysClient) Revoke(ctx context.Context, keyID string) error {
_, err := c.t.post(ctx, "/v1/api-keys/"+keyID+"/revoke", nil, nil)
Expand Down
65 changes: 54 additions & 11 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,52 @@ import (

// Reason code constants for permit denials and throttles (Shape D).
const (
ReasonBudgetRequestCapExceeded = "budget.request_cap_exceeded"
ReasonBudgetDailyCapExceeded = "budget.daily_cap_exceeded"
ReasonBudgetMonthlyCapExceeded = "budget.monthly_cap_exceeded"
ReasonBudgetMonthlyThresholdExceeded = "budget.monthly_threshold_exceeded"
ReasonBudgetDailySpikeDetected = "budget.daily_spike_detected"
ReasonBudgetRateLimitExceeded = "budget.rate_limit_exceeded"
ReasonBudgetRateLimitThrottled = "budget.rate_limit_throttled"
ReasonBudgetPricingUnavailable = "budget.pricing_unavailable"
ReasonPolicyModelNotAllowed = "policy.model_not_allowed"
ReasonPolicyRuleDenied = "policy.rule_denied"
ReasonPolicyReviewRequired = "policy.review_required"
ReasonBudgetRequestCapExceeded = "budget.request_cap_exceeded"
ReasonBudgetDailyCapExceeded = "budget.daily_cap_exceeded"
ReasonBudgetMonthlyCapExceeded = "budget.monthly_cap_exceeded"
ReasonBudgetMonthlyThresholdExceeded = "budget.monthly_threshold_exceeded"
ReasonBudgetDailySpikeDetected = "budget.daily_spike_detected"
ReasonBudgetRateLimitExceeded = "budget.rate_limit_exceeded"
ReasonBudgetRateLimitThrottled = "budget.rate_limit_throttled"
ReasonBudgetPricingUnavailable = "budget.pricing_unavailable"
ReasonPolicyModelNotAllowed = "policy.model_not_allowed"
ReasonPolicyRuleDenied = "policy.rule_denied"
ReasonPolicyReviewRequired = "policy.review_required"
ReasonWorkflowDeclarationExceedsBudgetCap = "workflow_intent.declaration_exceeds_budget_cap"
ReasonWorkflowMaxCallsExceeded = "workflow_intent.max_calls_exceeded"
ReasonWorkflowExpectedCallsExceeded = "workflow_intent.expected_calls_exceeded"
ReasonWorkflowUnknownOrInactive = "workflow_intent.unknown_or_inactive"
ReasonWorkflowIdempotencyConflict = "workflow_intent.idempotency_conflict"
ReasonWorkflowAmendmentVersionConflict = "workflow_intent.amendment_version_conflict"
)

type reasonCodeError string

func (e reasonCodeError) Error() string {
return string(e)
}

func (e reasonCodeError) reasonCode() string {
return string(e)
}

type reasonCodeMatcher interface {
reasonCode() string
}

var (
ErrWorkflowMaxCallsExceeded = reasonCodeError(ReasonWorkflowMaxCallsExceeded)
ErrWorkflowUnknownOrInactive = reasonCodeError(ReasonWorkflowUnknownOrInactive)
ErrWorkflowDeclarationExceedsBudgetCap = reasonCodeError(ReasonWorkflowDeclarationExceedsBudgetCap)
ErrWorkflowIdempotencyConflict = reasonCodeError(ReasonWorkflowIdempotencyConflict)
ErrWorkflowAmendmentVersionConflict = reasonCodeError(ReasonWorkflowAmendmentVersionConflict)
)

func matchesReasonCode(reasonCode string, target error) bool {
matcher, ok := target.(reasonCodeMatcher)
return ok && reasonCode != "" && reasonCode == matcher.reasonCode()
}

// KeelError represents an error response from the Keel API.
type KeelError struct {
Status int `json:"status"`
Expand All @@ -37,6 +70,11 @@ func (e *KeelError) Error() string {
return fmt.Sprintf("keel: %d %s: %s", e.Status, e.Code, e.Message)
}

// Is allows workflow reason-code sentinels to match Keel API errors.
func (e *KeelError) Is(target error) bool {
return matchesReasonCode(e.Code, target)
}

// IsRetryable returns true if the error status code indicates the request can be retried.
func (e *KeelError) IsRetryable() bool {
switch e.Status {
Expand Down Expand Up @@ -65,6 +103,11 @@ func (e *ThrottledError) Error() string {
return fmt.Sprintf("keel: 429 throttled: %s (retry after %ds)", msg, e.RetryAfterSeconds)
}

// Is allows workflow reason-code sentinels to match throttled API errors.
func (e *ThrottledError) Is(target error) bool {
return matchesReasonCode(e.ReasonCode, target)
}

// IsRetryable returns true. A throttled error is always retryable after the indicated delay.
func (e *ThrottledError) IsRetryable() bool {
return true
Expand Down
102 changes: 102 additions & 0 deletions examples/workflows/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package main

import (
"context"
"fmt"
"log"
"os"
"time"

keel "github.com/keelapi/keel-go"
)

func intPtr(v int) *int {
return &v
}

func stringPtr(v string) *string {
return &v
}

func main() {
client := keel.NewClient(keel.ClientConfig{
BaseURL: os.Getenv("KEEL_BASE_URL"),
APIKey: os.Getenv("KEEL_API_KEY"),
})

ctx := context.Background()
workflowID := fmt.Sprintf("invoice-batch-%d", time.Now().Unix())

declaration, err := client.Workflows.Declare(ctx, keel.WorkflowDeclareRequest{
WorkflowID: workflowID,
Intent: keel.WorkflowIntent{
ExpectedCalls: intPtr(2),
MaxCalls: intPtr(4),
ExpectedModel: stringPtr("gpt-4o-mini"),
ExpectedInputTokensPerCall: intPtr(500),
ExpectedOutputTokensPerCall: intPtr(200),
MaxDurationSeconds: intPtr(3600),
},
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("Workflow %s declared: %s\n", declaration.WorkflowID, declaration.Decision)

err = keel.RunInWorkflow(ctx, workflowID, func(ctx context.Context) error {
for i := 0; i < 2; i++ {
permit, err := client.Permits.Create(ctx, keel.PermitRequest{
ProjectID: os.Getenv("KEEL_PROJECT_ID"),
IdempotencyKey: fmt.Sprintf("%s-permit-%d", workflowID, i),
Subject: keel.Subject{Type: "service", ID: "invoice-worker"},
Action: keel.Action{Name: string(keel.OpGenerateText)},
Resource: keel.Resource{
Type: "ai_model",
ID: "gpt-4o-mini",
Attributes: keel.ResourceAttributes{
Provider: string(keel.ProviderOpenAI),
Model: "gpt-4o-mini",
Operation: keel.OpGenerateText,
EstimatedInputTokens: 500,
EstimatedOutputTokens: 200,
},
},
})
if err != nil {
return err
}
fmt.Printf("Permit %s: %s\n", permit.PermitID, permit.Decision)
}
return nil
})
if err != nil {
log.Fatal(err)
}

version := 1
if declaration.Version != nil {
version = *declaration.Version
}
amended, err := client.Workflows.Amend(ctx, workflowID, keel.WorkflowAmendRequest{
IfMatchVersion: version,
NewMaxCalls: intPtr(6),
ReasonProvided: stringPtr("invoice volume increased"),
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("Workflow %s amended to version %d\n", amended.WorkflowID, valueOrZero(amended.Version))

completed, err := client.Workflows.Complete(ctx, workflowID)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Workflow %s completed with %d calls\n", completed.WorkflowID, completed.AuthoritativeActualCalls)
}

func valueOrZero(v *int) int {
if v == nil {
return 0
}
return *v
}
4 changes: 4 additions & 0 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ func (t *httpTransport) newRequest(ctx context.Context, method, path string, bod
req.Header.Set("Authorization", "Bearer "+t.apiKey)
req.Header.Set("Content-Type", "application/json")

if workflowID, ok := WorkflowFromContext(ctx); ok {
req.Header.Set(workflowIDHeader, workflowID)
}

if t.freshness {
req.Header.Set("X-Keel-Timestamp", strconv.FormatInt(time.Now().Unix(), 10))
nonce := make([]byte, 16)
Expand Down
4 changes: 4 additions & 0 deletions keel.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type Client struct {

// Requests provides request timeline inspection.
Requests *RequestsClient

// Workflows manages caller-declared workflow intent.
Workflows *WorkflowsClient
}

// NewClient creates a new Keel client with the given configuration.
Expand Down Expand Up @@ -92,6 +95,7 @@ func NewClient(config ClientConfig) *Client {
c.Jobs = &JobsClient{t: t}
c.ApiKeys = &ApiKeysClient{t: t}
c.Requests = &RequestsClient{t: t}
c.Workflows = &WorkflowsClient{t: t}

return c
}
1 change: 0 additions & 1 deletion keel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,6 @@ func TestJobsCreateAndGet(t *testing.T) {
}
}


func TestRequestsTimeline(t *testing.T) {
c, _ := testServer(t, func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/v1/requests/req_123/timeline" {
Expand Down
24 changes: 12 additions & 12 deletions providers/anthropic/anthropic.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import (

// Config configures the Anthropic-compatible Keel client.
type Config struct {
APIKey string
KeelBaseURL string
KeelAPIKey string
KeelProjectID string
KeelSubject *keel.PermitSubject
Timeout time.Duration
MaxRetries int
APIKey string
KeelBaseURL string
KeelAPIKey string
KeelProjectID string
KeelSubject *keel.PermitSubject
Timeout time.Duration
MaxRetries int
}

func (c *Config) resolve() {
Expand Down Expand Up @@ -81,11 +81,11 @@ func NewClient(cfg Config) *Client {

// MessageCreateParams are the parameters for creating a message.
type MessageCreateParams struct {
Model string `json:"model"`
MaxTokens int `json:"max_tokens"`
Messages []MessageParam `json:"messages"`
System *string `json:"system,omitempty"`
Extra map[string]any `json:"extra,omitempty"`
Model string `json:"model"`
MaxTokens int `json:"max_tokens"`
Messages []MessageParam `json:"messages"`
System *string `json:"system,omitempty"`
Extra map[string]any `json:"extra,omitempty"`
}

// MessageParam represents a message in the conversation.
Expand Down
15 changes: 7 additions & 8 deletions providers/google/google.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,18 @@ import (
"os"
"time"


keel "github.com/keelapi/keel-go"
)

// Config configures the Google-compatible Keel client.
type Config struct {
APIKey string
KeelBaseURL string
KeelAPIKey string
KeelProjectID string
KeelSubject *keel.PermitSubject
Timeout time.Duration
MaxRetries int
APIKey string
KeelBaseURL string
KeelAPIKey string
KeelProjectID string
KeelSubject *keel.PermitSubject
Timeout time.Duration
MaxRetries int
}

func (c *Config) resolve() {
Expand Down
Loading
Loading