基本設計原則
1. 非同期を前提に
AdCP プロトコルは本質的に非同期です。処理は秒〜日単位でかかることがあります。
DO:
- すべてのオペレーションを async/await で設計
- オペレーション状態を永続化
- オーケストレーター再起動に耐える
- 適切なタイムアウトを実装
- 即時完了を前提にしない
- 同期ブロッキング呼び出しを使わない
- 状態をメモリだけに置かない
- バックオフなしに無限リトライしない
2. ステータス駆動ロジック
オペレーションは、標準化されたステータス値に沿って進行します:
# Every status an AdCP task can report, grouped by what the orchestrator
# has to do next. The spellings (hyphenated) are the ones sent on the wire.
TASK_STATUSES = {
    # Still in flight
    "working",         # Processing (< 120 seconds) - poll frequently
    "submitted",       # Long-running (hours to days) - provide webhook or poll

    # Blocked on a human or on credentials
    "input-required",  # Need user input/approval - continue conversation
    "auth-required",   # Need authentication

    # Finished
    "completed",       # Success - process results
    "failed",          # Error - handle appropriately
    "canceled",        # User canceled
}
3. ステートマシン設計
AdCP のタスクステータスに対応した適切なステートマシンを実装します:
class OperationState(Enum):
    """States an orchestrator tracks for one operation.

    The first two members exist only locally; the remaining members mirror
    AdCP task statuses. Member values use underscores, while the server
    reports hyphenated statuses such as ``"input-required"`` and
    ``"auth-required"``. Lookups by value therefore accept either spelling:
    ``OperationState("input-required") is OperationState.INPUT_REQUIRED``.
    """

    # Local orchestrator states
    REQUESTED = "requested"
    CALLING_ADCP = "calling_adcp"

    # AdCP task states (match server responses, modulo "-" vs "_")
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input_required"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"
    AUTH_REQUIRED = "auth_required"

    @classmethod
    def _missing_(cls, value):
        """Resolve hyphenated server statuses to their underscore member.

        Called by ``Enum`` only when a by-value lookup fails. Returning
        ``None`` lets ``Enum`` raise its usual ``ValueError``.
        """
        if isinstance(value, str):
            normalized = value.replace("-", "_")
            for member in cls:
                if member.value == normalized:
                    return member
        return None
# Allowed moves of the operation state machine: each key is a current state,
# each value lists the states that may follow it. States with no entry
# ("completed", "failed", "canceled") have no outgoing transitions here.
VALID_TRANSITIONS = {
    # Local bookkeeping before the AdCP call is made
    "requested": ["calling_adcp"],

    # First server response decides where the task starts
    "calling_adcp": [
        "submitted",
        "working",
        "input_required",
        "completed",
        "failed",
    ],

    # Server-side progress
    "submitted": ["working", "completed", "failed", "canceled"],
    "working": ["completed", "failed", "input_required"],

    # Resumes once the requested input has been supplied
    "input_required": ["submitted", "working", "completed", "failed"],
}
オペレーションの追跡
永続ストレージ
すべてのオペレーションを詳細に追跡しながら保存します:
class OperationTracker:
    """Persist orchestrator operations in ``db.operations``.

    ``db`` is expected to expose an ``operations`` collection with awaitable
    ``insert_one`` / ``find_one`` / ``update_one`` and a ``find(...)`` whose
    result has an awaitable ``to_list`` (the calls made below).
    """

    def __init__(self, db):
        self.db = db

    async def create_operation(self, operation_type, request_data,
                               webhook_config=None, status="requested"):
        """Insert a new operation record and return its generated id.

        Args:
            operation_type: Stored under ``"type"``.
            request_data: Stored under ``"request"``.
            webhook_config: Stored under ``"webhook_config"``; may be None.
            status: Initial status. Defaults to ``"requested"``; a caller
                adopting a task that already exists on the server can pass
                its current status instead.
        """
        from datetime import timezone

        # One timezone-aware timestamp for both fields, so that a fresh
        # record has created_at == updated_at and no local-time ambiguity.
        now = datetime.now(timezone.utc)
        operation = {
            "id": str(uuid.uuid4()),
            "type": operation_type,
            "status": status,
            "request": request_data,
            "webhook_config": webhook_config,
            "created_at": now,
            "updated_at": now,
            "task_id": None,
            "context_id": None,
            "result": None,
            "error": None,
        }
        await self.db.operations.insert_one(operation)
        return operation["id"]

    async def get_operation(self, operation_id):
        """Return the stored record whose ``"id"`` is ``operation_id``.

        The return value is whatever ``find_one`` yields for no match.
        """
        return await self.db.operations.find_one({"id": operation_id})

    async def update_status(self, operation_id, status, **kwargs):
        """Set ``status`` and ``updated_at``, plus any extra fields given.

        Every keyword argument is written verbatim as a field of the record
        (for example ``task_id=...`` or ``error=...``).
        """
        from datetime import timezone

        update = {
            "status": status,
            "updated_at": datetime.now(timezone.utc),
        }
        update.update(kwargs)
        await self.db.operations.update_one(
            {"id": operation_id},
            {"$set": update},
        )

    async def get_pending_operations(self):
        """Get all operations that need monitoring."""
        # Both spellings of "input required" are matched: records may hold
        # the local underscore form or the hyphenated form sent by the server.
        return await self.db.operations.find({
            "status": {"$in": ["submitted", "working",
                               "input_required", "input-required"]}
        }).to_list(length=None)
状態の再同期
起動時にローカル状態をサーバーと同期します:
async def reconcile_with_server(self, adcp_client):
    """Sync local state with server using tasks/list.

    Asks the server for its non-terminal tasks, loads the locally pending
    operations via ``self.get_pending_operations()``, and reports the
    differences. Nothing is modified; the caller decides how to repair.

    Returns:
        dict with
        ``orphaned_on_server``: task ids the server lists but no local
        pending operation references,
        ``missing_from_server``: task ids held locally that the server
        did not list,
        ``total_pending_server`` / ``total_pending_local``: raw counts.
    """
    # The filter is sent to the server, so it uses the server's hyphenated
    # spelling of the status ("input-required"), not the local underscore one.
    server_tasks = await adcp_client.call('tasks/list', {
        'filters': {'statuses': ['submitted', 'working', 'input-required']}
    })
    server_task_ids = {task['task_id'] for task in server_tasks['tasks']}

    local_operations = await self.get_pending_operations()
    # Operations that never received a task id (missing key or None) cannot
    # be matched against the server and are left out of the id comparison.
    local_task_ids = {
        op.get('task_id') for op in local_operations if op.get('task_id')
    }

    return {
        'orphaned_on_server': server_task_ids - local_task_ids,
        'missing_from_server': local_task_ids - server_task_ids,
        'total_pending_server': len(server_tasks['tasks']),
        'total_pending_local': len(local_operations)
    }
非同期オペレーションハンドラー
レスポンスの振り分け
ステータスに応じてレスポンスを処理します:
class AsyncOperationHandler:
    """Record AdCP responses and route them to a per-status handler.

    The ``_handle_*`` coroutines named in ``_STATUS_HANDLERS`` are looked up
    on ``self`` at dispatch time; they are expected to be defined alongside
    this class (they are not part of this snippet).
    """

    # Local status -> name of the coroutine method that handles it.
    # Statuses without an entry (e.g. "canceled") are recorded but not routed.
    _STATUS_HANDLERS = {
        "completed": "_handle_completed",
        "failed": "_handle_failed",
        "submitted": "_handle_submitted",
        "working": "_handle_working",
        "input_required": "_handle_input_required",
    }

    def __init__(self, adcp_client, tracker, notifier):
        self.adcp = adcp_client
        self.tracker = tracker
        self.notifier = notifier
        self.polling_tasks = {}  # task_id -> asyncio.Task polling that task

    async def handle_operation_response(self, operation_id, response):
        """Handle any AdCP response with proper status routing.

        Args:
            operation_id: Id of the locally tracked operation.
            response: Mapping returned by the AdCP call; ``status`` selects
                the handler, ``task_id`` / ``context_id`` / ``result`` /
                ``error`` are persisted when present.
        """
        raw_status = response.get("status")
        # The server sends hyphenated statuses ("input-required"); the local
        # vocabulary uses underscores. Normalize once so both are routed.
        if isinstance(raw_status, str):
            status = raw_status.replace("-", "_")
        else:
            status = raw_status

        # Update the operation with the response details. Only fields the
        # response actually carries are written, so a status poll that omits
        # e.g. context_id does not erase the value stored earlier.
        details = {}
        for key in ("task_id", "context_id"):
            value = response.get(key)
            if value is not None:
                details[key] = value
        if status == "completed":
            details["result"] = response.get("result")
        elif status == "failed":
            details["error"] = response.get("error")
        await self.tracker.update_status(operation_id, status, **details)

        # Route to the handler for this status, if there is one.
        handler_name = self._STATUS_HANDLERS.get(status)
        if handler_name is not None:
            await getattr(self, handler_name)(operation_id, response)
Submitted(長時間)オペレーション
長時間の処理を扱います:
async def _handle_submitted(self, operation_id, response):
    """Handle long-running operations.

    If the operation was created with a webhook configuration, the notifier
    is told that completion will arrive by webhook. Otherwise a background
    task polling every 60 seconds is started and registered in
    ``self.polling_tasks`` under the task id.

    Args:
        operation_id: Id of the locally tracked operation.
        response: AdCP response; must contain ``"task_id"``.
    """
    task_id = response["task_id"]

    # The polling loop feeds every poll result back through the response
    # handler, so a task that is still "submitted" lands here again on each
    # poll. Without this guard every poll would spawn one more poller.
    running = self.polling_tasks.get(task_id)
    if running is not None and not running.done():
        return

    # Check if webhook is configured
    operation = await self.tracker.get_operation(operation_id)
    webhook_config = operation.get("webhook_config")

    if webhook_config:
        # Webhook will handle completion notification
        await self.notifier.notify_submitted_with_webhook(operation_id, task_id)
    else:
        # Start polling for completion; keeping the task in the registry
        # also holds the reference that keeps it alive.
        polling_task = asyncio.create_task(
            self._poll_for_completion(operation_id, task_id, interval=60)
        )
        self.polling_tasks[task_id] = polling_task
バックオフ付きポーリング
効率的なポーリングを実装します:
async def _poll_for_completion(self, operation_id, task_id, interval=60,
                               max_polls=None):
    """Poll task status until completion.

    Sleeps ``interval`` seconds, fetches the task with ``tasks/get``, and
    passes the response to ``self.handle_operation_response``. Stops on a
    terminal status, on the first exception (the operation is then marked
    failed with a "Polling error" message), or when the poll budget is
    exhausted (marked failed with "Task polling timeout").

    Args:
        operation_id: Id of the locally tracked operation.
        task_id: Server-side task id to poll.
        interval: Seconds to wait before each poll.
        max_polls: Maximum number of polls. When None, defaults to 1440 for
            a 60-second interval and to 24 otherwise.
    """
    if max_polls is None:
        max_polls = 1440 if interval == 60 else 24  # 24 hours or 2 minutes

    # Stays True only if the loop runs out of polls. A task that reaches a
    # terminal status, or fails, on the very last permitted poll must not be
    # overwritten with a timeout afterwards.
    timed_out = True
    try:
        for _ in range(max_polls):
            try:
                await asyncio.sleep(interval)
                task_response = await self.adcp.call('tasks/get', {
                    'task_id': task_id,
                    'include_result': True
                })
                await self.handle_operation_response(operation_id, task_response)
                if task_response["status"] in ["completed", "failed", "canceled"]:
                    timed_out = False
                    break
            except Exception as e:
                await self.tracker.update_status(
                    operation_id,
                    "failed",
                    error=f"Polling error: {str(e)}"
                )
                timed_out = False
                break
    finally:
        # Runs on cancellation too, so the registry never keeps a dead entry.
        self.polling_tasks.pop(task_id, None)

    if timed_out:
        await self.tracker.update_status(
            operation_id,
            "failed",
            error="Task polling timeout"
        )
Webhook サポート
信頼性の高い Webhook ハンドラー
信頼性パターンを使って Webhook を実装します:
class WebhookHandler:
    """Authenticate incoming webhooks and screen them for replays."""

    def __init__(self, tracker, notifier, secret_key):
        self.tracker = tracker
        self.notifier = notifier
        self.secret_key = secret_key
        # Event ids already handled. This snippet only reads it; whoever
        # processes an event is expected to record the id here.
        self.processed_events = {}

    def verify_webhook_signature(self, payload: bytes, signature: str) -> bool:
        """Verify webhook authenticity.

        Args:
            payload: Raw request body exactly as received.
            signature: Header value of the form ``sha256=<hex digest>``.

        Returns:
            True only if ``signature`` equals the HMAC-SHA256 of ``payload``
            under ``self.secret_key``. A missing or non-string signature is
            rejected rather than raising.
        """
        if not isinstance(signature, str):
            return False
        expected_signature = hmac.new(
            self.secret_key.encode(),
            payload,
            hashlib.sha256
        ).hexdigest()
        # Constant-time comparison: a plain == can leak, through timing, how
        # many leading characters of a forged signature were correct.
        return hmac.compare_digest(
            signature.encode(),
            f"sha256={expected_signature}".encode()
        )

    async def is_replay_attack(self, timestamp: str, event_id: str) -> bool:
        """Prevent replay attacks using timestamp and event ID.

        Args:
            timestamp: ISO 8601 time of the event; a trailing ``Z`` is
                accepted. A timestamp without an offset is taken as UTC.
            event_id: Unique id of the event.

        Returns:
            True if the event is older than five minutes or its id is
            already in ``self.processed_events``.
        """
        from datetime import timezone

        event_time = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
        if event_time.tzinfo is None:
            event_time = event_time.replace(tzinfo=timezone.utc)

        # Both operands must be timezone-aware: subtracting an aware
        # timestamp from a naive datetime.now() raises TypeError.
        now = datetime.now(timezone.utc)
        if now - event_time > timedelta(minutes=5):
            return True
        return event_id in self.processed_events
Webhook とポーリングの併用(バックアップ)
Webhook のみに依存しないようにします:
class ReliableWebhookOrchestrator:
    """Pair webhook delivery with a delayed polling fallback.

    NOTE(review): ``tracker`` and ``logger`` are read as module-level names
    and ``self._poll_for_completion`` is not defined in this snippet; all
    three are presumably supplied by the surrounding code. Confirm before
    reuse.
    """

    def __init__(self):
        self.webhook_timeout = timedelta(minutes=10)
        self.backup_polling_delay = timedelta(minutes=2)
        # Strong references to in-flight fallback tasks. The event loop only
        # holds weak references, so an unreferenced task can be
        # garbage-collected before it finishes.
        self._background_tasks = set()

    async def _handle_submitted_with_webhook(self, operation_id, task_id):
        """Handle submitted task with webhook + backup polling.

        Schedules a background task that waits ``backup_polling_delay`` and
        then starts polling, unless the operation has already reached a
        terminal status by then (i.e. the webhook arrived). Returns without
        waiting for that task.
        """
        async def backup_polling():
            await asyncio.sleep(self.backup_polling_delay.total_seconds())
            operation = await tracker.get_operation(operation_id)
            if operation["status"] not in ["completed", "failed", "canceled"]:
                logger.info(f"Starting backup polling for task {task_id}")
                await self._poll_for_completion(operation_id, task_id, interval=60)

        fallback = asyncio.create_task(backup_polling())
        self._background_tasks.add(fallback)
        # Drop the reference once the task is done so the set does not grow.
        fallback.add_done_callback(self._background_tasks.discard)
オーケストレーターの例
オーケストレーターの実装例:
class AdCPOrchestrator:
    """Example orchestrator wiring the client, tracker and response handler.

    NOTE(review): ``get_webhook_token`` and ``_format_user_response`` are
    called below but not defined in this snippet; ``db`` and ``logger`` are
    read as module-level names.
    """

    def __init__(self):
        self.adcp = AdCPClient()
        self.tracker = OperationTracker(db)
        self.handler = AsyncOperationHandler(self.adcp, self.tracker, UserNotifier())
        self.webhook_base_url = "https://orchestrator.com/webhooks"

    async def create_media_buy(self, user_id, request, enable_webhook=True):
        """Create a media buy with full async handling.

        Args:
            user_id: Used to build the per-user webhook URL and token.
            request: Payload forwarded to the ``create_media_buy`` call.
            enable_webhook: When True, a webhook configuration is attached.

        Returns:
            Whatever ``self._format_user_response`` builds for the response.

        Raises:
            Any exception from the AdCP call or response handling, after the
            operation has been marked ``"failed"``.
        """
        # 1. Prepare webhook configuration
        webhook_config = None
        if enable_webhook:
            webhook_config = {
                "webhook_url": f"{self.webhook_base_url}/adcp/{user_id}",
                "webhook_auth": {
                    "type": "bearer",
                    "credentials": await self.get_webhook_token(user_id)
                }
            }

        # 2. Create operation record
        operation_id = await self.tracker.create_operation(
            "create_media_buy",
            request,
            webhook_config=webhook_config
        )

        try:
            # 3. Call AdCP
            response = await self.adcp.call("create_media_buy", request, webhook_config)
            # 4. Handle response
            await self.handler.handle_operation_response(operation_id, response)
            # 5. Return appropriate response to user
            return self._format_user_response(operation_id, response)
        except Exception as e:
            # Record the failure, then let the caller see the original error.
            await self.tracker.update_status(operation_id, "failed", error=str(e))
            raise

    async def reconcile_state_on_startup(self):
        """Recover from orchestrator restart.

        For every task the server still lists but no local operation
        references, a placeholder operation is created, marked
        ``"submitted"`` with that task id, and polling is resumed.
        """
        reconciliation = await self.tracker.reconcile_with_server(self.adcp)
        logger.info(f"State reconciliation: {reconciliation}")

        for task_id in reconciliation["orphaned_on_server"]:
            # Resume monitoring orphaned tasks. The record is created with
            # the tracker's default status and moved to "submitted" by the
            # update below, so only arguments create_operation declares are
            # passed.
            operation_id = await self.tracker.create_operation("unknown", {})
            await self.tracker.update_status(operation_id, "submitted", task_id=task_id)

            polling_task = asyncio.create_task(
                self.handler._poll_for_completion(operation_id, task_id)
            )
            # Register the task with the handler, as the handler does for
            # the pollers it starts itself; this also keeps a reference so
            # the task is not garbage-collected mid-flight.
            self.handler.polling_tasks[task_id] = polling_task
Best Practices
1. Persistent Storage
Always use persistent storage for operation state:
- Database (PostgreSQL, MongoDB)
- Message queue (Redis, RabbitMQ)
- Distributed cache (Redis Cluster)
2. Idempotency
Make all operations idempotent:
async def create_media_buy_idempotent(self, request, user_id=None):
    """Create a media buy unless one already exists for the same PO number.

    Looks for a stored ``create_media_buy`` operation whose request carries
    the same ``po_number``; if one is found its stored result is returned
    instead of issuing a second buy.

    Args:
        request: Media buy request; must contain ``"po_number"``.
        user_id: Forwarded to ``self.create_media_buy``, which takes the
            user id as its first argument.

    NOTE(review): reads ``self.db`` directly; confirm the owning class
    exposes it.
    """
    existing = await self.db.operations.find_one({
        "type": "create_media_buy",
        "request.po_number": request["po_number"],
        # "completed" is the status under which a result is stored for an
        # operation; the two original values are kept for records written
        # with them.
        "status": {"$in": ["created", "active", "completed"]}
    })
    if existing:
        return existing["result"]
    # Passing only the request would bind it to the user_id parameter and
    # leave the request itself missing.
    return await self.create_media_buy(user_id, request)
3. Timeout Handling
Implement reasonable timeouts:
# Upper bound on how long each operation type may stay pending,
# keyed by operation type.
OPERATION_TIMEOUTS = {
    "create_media_buy": timedelta(days=1),
    "update_media_buy": timedelta(hours=12),
    "creative_approval": timedelta(days=2),
}
4. Error Recovery
Implement retry logic with exponential backoff, paired with circuit breakers:
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(min=1, max=60),
    retry=retry_if_exception_type(TransientError)
)
async def call_adcp_api(self, tool, params):
    """Call an AdCP tool, retrying failures classified as transient.

    Rate-limit and network errors are re-raised as ``TransientError``, the
    only exception type the retry decorator is configured to retry (up to
    3 attempts, exponential wait between 1 and 60 seconds). Any other
    exception propagates on the first attempt.

    Args:
        tool: Name of the AdCP tool to call.
        params: Parameters passed to the tool.
    """
    try:
        return await self.adcp.call(tool, params)
    except RateLimitError as err:
        # Explicit chaining keeps the original error as __cause__.
        raise TransientError("Rate limited") from err
    except NetworkError as err:
        raise TransientError("Network error") from err
5. Monitoring and Alerting
Track key metrics:
- Pending operation count by type
- Average approval time
- Rejection rate
- Task timeout rate
- API error rate
User Communication
Keep users informed about pending operations:
class UserNotifier:
    """Build user-facing notifications about media buy operations.

    Delivery goes through ``self.send_notification(user_id, message)``,
    which is not defined in this snippet and must be provided by the
    surrounding code or a subclass.
    """

    async def notify_pending_approval(self, user_id, operation,
                                      estimated_time="2-4 hours"):
        """Tell the user that the media buy is waiting for approval.

        Args:
            user_id: Recipient of the notification.
            operation: Operation record; ``operation["id"]`` is included.
            estimated_time: Human-readable wait estimate shown to the user.
        """
        message = {
            "type": "pending_approval",
            "operation_id": operation["id"],
            "message": "Your media buy requires publisher approval",
            "estimated_time": estimated_time,
        }
        await self.send_notification(user_id, message)

    async def notify_approval(self, user_id, operation):
        """Tell the user that the media buy has been approved.

        Args:
            user_id: Recipient of the notification.
            operation: Operation record; reads ``operation["id"]`` and
                ``operation["result"]["media_buy_id"]``.
        """
        message = {
            "type": "operation_approved",
            "operation_id": operation["id"],
            "message": "Your media buy has been approved",
            "media_buy_id": operation["result"]["media_buy_id"],
        }
        await self.send_notification(user_id, message)
Summary
Building a robust AdCP orchestrator requires:
- Asynchronous design throughout
- Proper state management with persistence
- Graceful handling of pending states
- User communication for long-running operations
- Monitoring and observability
Next Steps
- Task Lifecycle: See Task Lifecycle for status handling
- Webhooks: See Webhooks for push notifications
- Security: See Security for multi-tenant security