Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,20 @@

All notable changes to cueapi-sdk will be documented here.

## [0.2.0] - 2026-05-01

### Added
- `client.cues.fire(cue_id, payload_override=None, merge_strategy=None)` for ad-hoc one-shot triggers and for using cues as a messaging channel between agents. Wraps `POST /v1/cues/{id}/fire`.
- `client.executions.list_claimable(task=None, agent=None)` for unclaimed worker-transport executions ready for processing. Filters server-side via task / agent query params; required for single-purpose workers.
- `client.executions.claim(execution_id, worker_id=...)` for atomic claim of a specific execution. Returns 409 if already claimed.
- `client.executions.claim_next(worker_id=..., task=None)` for the next available execution. Without task the server picks the oldest pending; with task the SDK fans out internally (list_claimable filtered, pick oldest, claim by ID) since the server's claim endpoint does not accept a task filter today.

### Changed
- `__version__` in `cueapi/__init__.py` had drifted to 0.1.2 while `pyproject.toml` was at 0.1.3. Both now aligned at 0.2.0.

### Pending follow-up
- `client.executions.heartbeat(execution_id)` currently sends an empty body and does NOT include `worker_id` via the `X-Worker-Id` request header that the server reads from. Worker-id is what the server uses to enforce ownership on the heartbeat (returns 403 on mismatch); without it the race-protection check is silently bypassed. A signature change to add `worker_id` is held pending technical review of the deprecation cadence.

## [0.1.0] - 2025-03-28

### Added
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!-- Pipeline validation: 2026-04-04 -->
> **For AI agents:** This is the official Python SDK for CueAPI. Run `pip install cueapi-sdk`. Your API key is in `CUEAPI_API_KEY`. Create a client: `client = CueAPI(os.environ["CUEAPI_API_KEY"])`. Schedule a task: `client.cues.create(name="task-name", cron="0 9 * * *", callback="https://your-handler.com", payload={})`. For agents without a public URL: set `transport="worker"` and run `pip install cueapi-worker`, then define a handler with `@worker.handler("task-name")` returning `{"success": true}` or `{"success": false, "error": "reason"}`. List executions: `client.executions.list()`. Get one: `client.executions.get("exec_id")`. Do not retry on `AuthenticationError` or `CueLimitExceededError`. Wait `e.retry_after` seconds on `RateLimitError`.
> **For AI agents:** This is the official Python SDK for CueAPI. Run `pip install cueapi-sdk`. Your API key is in `CUEAPI_API_KEY`. Create a client: `client = CueAPI(os.environ["CUEAPI_API_KEY"])`. Schedule a task: `client.cues.create(name="task-name", cron="0 9 * * *", callback="https://your-handler.com", payload={})`. For agents without a public URL: set `transport="worker"` and run `pip install cueapi-worker`, then define a handler with `@worker.handler("task-name")` returning `{"success": true}` or `{"success": false, "error": "reason"}`. Fire an existing cue immediately: `client.cues.fire("cue_id", payload_override={"task": "x"})`. List unclaimed worker executions: `client.executions.list_claimable(task="task-name")`. Claim one: `client.executions.claim("exec_id", worker_id="my-worker")`. List executions: `client.executions.list()`. Get one: `client.executions.get("exec_id")`. Do not retry on `AuthenticationError` or `CueLimitExceededError`. Wait `e.retry_after` seconds on `RateLimitError`.

# cueapi-sdk

Expand Down
2 changes: 1 addition & 1 deletion cueapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from cueapi.resources.executions import ExecutionsResource
from cueapi.webhook import verify_webhook

__version__ = "0.1.2"
__version__ = "0.2.0"

__all__ = [
"CueAPI",
Expand Down
33 changes: 33 additions & 0 deletions cueapi/resources/cues.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,36 @@ def resume(self, cue_id: str) -> Cue:
The updated Cue object.
"""
return self.update(cue_id, status="active")

def fire(
self,
cue_id: str,
*,
payload_override: Optional[Dict[str, Any]] = None,
merge_strategy: Optional[str] = None,
) -> Dict[str, Any]:
"""Fire an existing cue immediately, optionally overriding its payload.

For ad-hoc one-shot triggers and for using cues as a messaging channel
between agents (carry message/instruction/task/reply_cue_id in
payload_override).

Args:
cue_id: The cue ID to fire.
payload_override: Override the cue's default payload for this fire
only. Persisted on the resulting execution row, never on the
cue itself.
merge_strategy: How payload_override combines with the cue's stored
payload. "merge" (server default) shallow-merges with override
wins on key collisions. "replace" uses override as the final
payload, ignoring cue.payload.

Returns:
The execution dict (id, scheduled_for, status, etc.).
"""
body: Dict[str, Any] = {}
if payload_override is not None:
body["payload_override"] = payload_override
if merge_strategy is not None:
body["merge_strategy"] = merge_strategy
return self._client._post(f"/v1/cues/{cue_id}/fire", json=body)
90 changes: 90 additions & 0 deletions cueapi/resources/executions.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,96 @@ def get(self, execution_id: str) -> dict:
"""Get a single execution."""
return self._client._get(f"/v1/executions/{execution_id}")

def list_claimable(
self,
*,
task: Optional[str] = None,
agent: Optional[str] = None,
) -> dict:
"""List unclaimed worker-transport executions ready for processing.

Filters server-side via task / agent query params (NOT client-side).
Required for single-purpose workers; without a filter, sibling tasks
ahead in the LIMIT 50 window starve your handler.

Returns:
Dict with "executions" list, each item carrying execution_id,
cue_id, cue_name, task, scheduled_for, payload, attempt.
"""
params: Dict[str, Any] = {}
if task is not None:
params["task"] = task
if agent is not None:
params["agent"] = agent
return self._client._get("/v1/executions/claimable", params=params)

def claim(self, execution_id: str, *, worker_id: str) -> dict:
"""Atomically claim a specific worker-transport execution.

Conditional UPDATE WHERE status IN ('pending', 'retry_ready'); returns
409 if already claimed or not eligible. Response includes lease_seconds
(default 900s = 15 min); send heartbeat well before that to extend.

Args:
execution_id: Execution UUID.
worker_id: Stable identifier for this worker. Caller-defined, not
session/process-scoped. Same value must be used across
claim, heartbeat, and outcome calls so the server can enforce
ownership.

Returns:
Dict with claimed (bool), execution_id, lease_seconds.
"""
return self._client._post(
f"/v1/executions/{execution_id}/claim",
json={"worker_id": worker_id},
)

def claim_next(
self,
*,
worker_id: str,
task: Optional[str] = None,
) -> dict:
"""Claim the next available worker-transport execution.

Without task, the server picks the oldest pending across any of your
worker cues. With task, this method internally fans out (list_claimable
filtered, pick oldest, claim by ID) since the server's claim endpoint
does not accept a task filter today. Tiny race window between list and
claim is bounded by the atomic claim returning 409, in which case the
caller retries.

Args:
worker_id: Stable caller-defined identifier (see claim()).
task: Optional task filter.

Returns:
Dict with claimed (bool), execution_id, lease_seconds. When
task is set and no executions are claimable for that task,
returns {"claimed": False, "reason": "no_executions_for_task",
"task": <task>}.
"""
if task is not None:
listing = self._client._get(
"/v1/executions/claimable", params={"task": task}
)
execs = listing.get("executions") or []
if not execs:
return {
"claimed": False,
"reason": "no_executions_for_task",
"task": task,
}
next_id = execs[0].get("execution_id")
return self._client._post(
f"/v1/executions/{next_id}/claim",
json={"worker_id": worker_id},
)
return self._client._post(
"/v1/executions/claim", json={"worker_id": worker_id}
)

def heartbeat(self, execution_id: str) -> dict:
"""Send heartbeat to extend claim lease."""
return self._client._post(f"/v1/executions/{execution_id}/heartbeat", json={})
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "cueapi-sdk"
version = "0.1.3"
version = "0.2.0"
description = "Official Python SDK for CueAPI — open-source execution accountability primitive for AI agents. Schedule agent work, require evidence-backed outcomes, and gate execution with write-once verification."
readme = "README.md"
license = { text = "Apache-2.0" }
Expand Down
57 changes: 57 additions & 0 deletions tests/test_cues_resource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""Unit tests for CuesResource methods that don't fit the staging-integration pattern."""

from unittest.mock import MagicMock

from cueapi.resources.cues import CuesResource


class TestFire:
def test_fire_no_payload_override(self):
mock_client = MagicMock()
mock_client._post.return_value = {"id": "exec_test", "status": "queued"}
resource = CuesResource(mock_client)

result = resource.fire("cue_abc123")

mock_client._post.assert_called_once_with("/v1/cues/cue_abc123/fire", json={})
assert result["id"] == "exec_test"

def test_fire_with_payload_override_only(self):
mock_client = MagicMock()
mock_client._post.return_value = {"id": "exec_test"}
resource = CuesResource(mock_client)

payload = {"task": "downstream", "scope": "single-row"}
resource.fire("cue_abc123", payload_override=payload)

mock_client._post.assert_called_once_with(
"/v1/cues/cue_abc123/fire",
json={"payload_override": payload},
)

def test_fire_with_payload_override_and_merge_strategy(self):
mock_client = MagicMock()
mock_client._post.return_value = {"id": "exec_test"}
resource = CuesResource(mock_client)

payload = {"run_id": "ad-hoc-001"}
resource.fire("cue_abc123", payload_override=payload, merge_strategy="replace")

mock_client._post.assert_called_once_with(
"/v1/cues/cue_abc123/fire",
json={"payload_override": payload, "merge_strategy": "replace"},
)

def test_fire_omits_merge_strategy_when_not_passed(self):
# When the caller omits merge_strategy, the wrapper must NOT send a
# client-side default. The server's Pydantic default of "merge"
# applies. This pins the contract so a future refactor can't silently
# start sending a strategy that overrides the server's choice.
mock_client = MagicMock()
mock_client._post.return_value = {"id": "exec_test"}
resource = CuesResource(mock_client)

resource.fire("cue_abc123", payload_override={"k": "v"})

sent_body = mock_client._post.call_args.kwargs["json"]
assert "merge_strategy" not in sent_body
140 changes: 140 additions & 0 deletions tests/test_executions_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,143 @@ def test_mark_verified(self):
mock_client._post.assert_called_once_with(
"/v1/executions/exec_123/verify", json={},
)


class TestListClaimable:
# Filtering MUST be server-side via query params, not client-side after
# fetch. Client-side filter hits the LIMIT 50 starvation bug fixed in the
# 2026-04-25 prod incident (see cueapi-core app/routers/executions.py
# docstring at line 122-131).

def test_list_claimable_no_filters_sends_no_params(self):
mock_client = MagicMock()
mock_client._get.return_value = {"executions": []}
resource = ExecutionsResource(mock_client)

resource.list_claimable()

mock_client._get.assert_called_once_with(
"/v1/executions/claimable", params={},
)

def test_list_claimable_passes_task_as_query_param(self):
mock_client = MagicMock()
mock_client._get.return_value = {"executions": []}
resource = ExecutionsResource(mock_client)

resource.list_claimable(task="cowork-workspace")

mock_client._get.assert_called_once_with(
"/v1/executions/claimable",
params={"task": "cowork-workspace"},
)

def test_list_claimable_passes_agent_as_query_param(self):
mock_client = MagicMock()
mock_client._get.return_value = {"executions": []}
resource = ExecutionsResource(mock_client)

resource.list_claimable(agent="writer-bot")

mock_client._get.assert_called_once_with(
"/v1/executions/claimable",
params={"agent": "writer-bot"},
)

def test_list_claimable_passes_both_task_and_agent(self):
mock_client = MagicMock()
mock_client._get.return_value = {"executions": []}
resource = ExecutionsResource(mock_client)

resource.list_claimable(task="t", agent="a")

mock_client._get.assert_called_once_with(
"/v1/executions/claimable",
params={"task": "t", "agent": "a"},
)


class TestClaim:
def test_claim_posts_to_specific_execution_with_worker_id_in_body(self):
mock_client = MagicMock()
mock_client._post.return_value = {
"claimed": True,
"execution_id": "exec_abc123",
"lease_seconds": 900,
}
resource = ExecutionsResource(mock_client)

result = resource.claim("exec_abc123", worker_id="cowork-workspace")

mock_client._post.assert_called_once_with(
"/v1/executions/exec_abc123/claim",
json={"worker_id": "cowork-workspace"},
)
assert result["claimed"] is True


class TestClaimNext:
# Two branches: with task and without. Without task is a single POST.
# With task is a fan-out (list_claimable filtered, pick first, claim by ID)
# because the server's POST /v1/executions/claim does not accept a task
# filter today.

def test_claim_next_without_task_sends_single_post(self):
mock_client = MagicMock()
mock_client._post.return_value = {
"claimed": True,
"execution_id": "exec_test",
"lease_seconds": 900,
}
resource = ExecutionsResource(mock_client)

resource.claim_next(worker_id="cowork-workspace")

mock_client._post.assert_called_once_with(
"/v1/executions/claim",
json={"worker_id": "cowork-workspace"},
)

def test_claim_next_with_task_fans_out_to_list_then_claim(self):
mock_client = MagicMock()
mock_client._get.return_value = {
"executions": [
{"execution_id": "exec_first"},
{"execution_id": "exec_second"},
],
}
mock_client._post.return_value = {
"claimed": True,
"execution_id": "exec_first",
"lease_seconds": 900,
}
resource = ExecutionsResource(mock_client)

result = resource.claim_next(
worker_id="cowork-workspace", task="cowork-workspace"
)

mock_client._get.assert_called_once_with(
"/v1/executions/claimable", params={"task": "cowork-workspace"},
)
mock_client._post.assert_called_once_with(
"/v1/executions/exec_first/claim",
json={"worker_id": "cowork-workspace"},
)
assert result["claimed"] is True

def test_claim_next_with_task_and_empty_list_returns_no_claim(self):
mock_client = MagicMock()
mock_client._get.return_value = {"executions": []}
resource = ExecutionsResource(mock_client)

result = resource.claim_next(
worker_id="cowork-workspace", task="no-such-task"
)

mock_client._post.assert_not_called()
assert result == {
"claimed": False,
"reason": "no_executions_for_task",
"task": "no-such-task",
}
Loading