This guide collects best practices for AdCP orchestrators that handle asynchronous operations, pending states, and human-in-the-loop workflows.

Core Design Principles

1. Asynchronous First

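The AdCP protocol is inherently asynchronous. Operations can take anywhere from seconds to days.
DO:
  • Design every operation with async/await
  • Persist operation state
  • Survive orchestrator restarts
  • Implement appropriate timeouts
DON’T:
  • Assume immediate completion
  • Use synchronous blocking calls
  • Keep state only in memory
  • Retry indefinitely without backoff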

2. Status-Driven Logic

Operations progress through standardized status values:
TASK_STATUSES = {
    "submitted",      # Long-running (hours to days) - provide webhook or poll
    "working",        # Processing (< 120 seconds) - poll frequently
    "input-required", # Need user input/approval - continue conversation
    "completed",      # Success - process results
    "failed",         # Error - handle appropriately
    "canceled",       # User canceled
    "auth-required"   # Need authentication
}
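
A minimal sketch of how an orchestrator might act on these values. The action names, the `TERMINAL_STATUSES` grouping, and the helpers `next_action` and `is_terminal` are illustrative assumptions, not defined by AdCP.

```python
# Hypothetical mapping from task status to the orchestrator's next step
NEXT_ACTION = {
    "submitted": "wait_for_webhook_or_poll",
    "working": "poll_frequently",
    "input-required": "ask_user",
    "auth-required": "authenticate",
    "completed": "process_result",
    "failed": "handle_error",
    "canceled": "stop",
}

# Statuses after which no further change is expected
TERMINAL_STATUSES = {"completed", "failed", "canceled"}


def next_action(status: str) -> str:
    """Return the next step for a status, rejecting unknown values."""
    try:
        return NEXT_ACTION[status]
    except KeyError:
        raise ValueError(f"Unknown task status: {status!r}") from None


def is_terminal(status: str) -> bool:
    """True once the task has reached a final status."""
    return status in TERMINAL_STATUSES
```

Rejecting unknown values explicitly makes a misspelled status (for example `input_required` instead of `input-required`) fail loudly instead of being silently ignored.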

3. State Machine Design

Implement proper state machines aligned with AdCP task statuses:
from enum import Enum

class OperationState(Enum):
    # Local orchestrator states
    REQUESTED = "requested"
    CALLING_ADCP = "calling_adcp"

    # AdCP task states (match server responses)
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"  # hyphenated, as in TASK_STATUSES
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"

# Valid state transitions
VALID_TRANSITIONS = {
    "requested": ["calling_adcp"],
    "calling_adcp": ["submitted", "working", "input-required", "completed", "failed"],
    "submitted": ["working", "completed", "failed", "canceled"],
    "working": ["completed", "failed", "input-required"],
    "input-required": ["submitted", "working", "completed", "failed"]
}
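
A transition table only helps if it is enforced when a status changes. The sketch below shows one way to do that. `InvalidTransition`, `validate_transition`, and the trimmed `EXAMPLE_TRANSITIONS` table are hypothetical names used for illustration.

```python
class InvalidTransition(Exception):
    """Raised when a status change is not allowed by the transition table."""


def validate_transition(current: str, new: str, transitions: dict) -> None:
    """Raise InvalidTransition unless current -> new is listed in the table.

    States without an entry (for example "completed") are treated as terminal.
    """
    allowed = transitions.get(current, [])
    if new not in allowed:
        raise InvalidTransition(f"{current} -> {new} is not allowed")


# Trimmed copy of a transition table, for illustration only
EXAMPLE_TRANSITIONS = {
    "requested": ["calling_adcp"],
    "calling_adcp": ["submitted", "working", "completed", "failed"],
    "submitted": ["working", "completed", "failed", "canceled"],
    "working": ["completed", "failed"],
}

validate_transition("requested", "calling_adcp", EXAMPLE_TRANSITIONS)  # passes silently
```

Calling such a check before each status update keeps late or duplicate responses from moving a finished operation back into a pending state.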

Operation Tracking

Persistent Storage

Store every operation with enough detail to track it:
import uuid
from datetime import datetime, timezone

class OperationTracker:
    def __init__(self, db):
        self.db = db

    async def create_operation(self, operation_type, request_data, webhook_config=None):
        # Timezone-aware timestamps can be compared safely with parsed ISO 8601 times
        now = datetime.now(timezone.utc)
        operation = {
            "id": str(uuid.uuid4()),
            "type": operation_type,
            "status": "requested",
            "request": request_data,
            "webhook_config": webhook_config,
            "created_at": now,
            "updated_at": now,
            "task_id": None,
            "context_id": None,
            "result": None,
            "error": None
        }
        await self.db.operations.insert_one(operation)
        return operation["id"]

    async def update_status(self, operation_id, status, **kwargs):
        update = {
            "status": status,
            "updated_at": datetime.now(timezone.utc)
        }
        update.update(kwargs)

        await self.db.operations.update_one(
            {"id": operation_id},
            {"$set": update}
        )

    async def get_operation(self, operation_id):
        """Fetch a single operation record by ID"""
        return await self.db.operations.find_one({"id": operation_id})

    async def get_pending_operations(self):
        """Get all operations that need monitoring"""
        return await self.db.operations.find({
            "status": {"$in": ["submitted", "working", "input-required"]}
        }).to_list(length=None)

State Reconciliation

Sync local state with the server on startup:
async def reconcile_with_server(self, adcp_client):
    """Sync local state with server using tasks/list"""
    server_tasks = await adcp_client.call('tasks/list', {
        'filters': {'statuses': ['submitted', 'working', 'input-required']}
    })

    server_task_ids = {task['task_id'] for task in server_tasks['tasks']}
    local_operations = await self.get_pending_operations()
    local_task_ids = {op['task_id'] for op in local_operations if op['task_id']}

    return {
        'orphaned_on_server': server_task_ids - local_task_ids,
        'missing_from_server': local_task_ids - server_task_ids,
        'total_pending_server': len(server_tasks['tasks']),
        'total_pending_local': len(local_operations)
    }
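
The report above only describes the differences. As a sketch of what could follow, the hypothetical `plan_reconciliation` helper below turns it into explicit actions. The action names are assumptions. The choice to re-fetch (rather than fail) locally pending tasks that the server no longer lists as pending reflects that the server query filters on pending statuses, so such tasks may simply have finished.

```python
def plan_reconciliation(report: dict) -> list:
    """Turn a reconciliation report into (action, task_id) pairs."""
    actions = []
    # Tasks the server still considers pending but we have no record of:
    # start tracking them
    for task_id in sorted(report["orphaned_on_server"]):
        actions.append(("adopt", task_id))
    # Tasks we consider pending but the server did not return as pending:
    # fetch their current status instead of assuming failure
    for task_id in sorted(report["missing_from_server"]):
        actions.append(("refetch", task_id))
    return actions


example_report = {
    "orphaned_on_server": {"task_b", "task_a"},
    "missing_from_server": {"task_z"},
    "total_pending_server": 2,
    "total_pending_local": 1,
}
```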

Async Operation Handler

Response Routing

Handle each response according to its status:
class AsyncOperationHandler:
    def __init__(self, adcp_client, tracker, notifier):
        self.adcp = adcp_client
        self.tracker = tracker
        self.notifier = notifier
        self.polling_tasks = {}

    async def handle_operation_response(self, operation_id, response):
        """Handle any AdCP response with proper status routing"""
        status = response.get("status")

        # Update the operation with the response details
        await self.tracker.update_status(
            operation_id,
            status,
            task_id=response.get("task_id"),
            context_id=response.get("context_id"),
            result=response.get("result") if status == "completed" else None,
            error=response.get("error") if status == "failed" else None
        )

        # Route handling by status
        if status == "completed":
            await self._handle_completed(operation_id, response)
        elif status == "failed":
            await self._handle_failed(operation_id, response)
        elif status == "submitted":
            await self._handle_submitted(operation_id, response)
        elif status == "working":
            await self._handle_working(operation_id, response)
        elif status == "input-required":
            await self._handle_input_required(operation_id, response)

Submitted (Long-Running) Operations

Handle long-running operations:
async def _handle_submitted(self, operation_id, response):
    """Handle long-running operations"""
    task_id = response["task_id"]

    # Check if webhook is configured
    operation = await self.tracker.get_operation(operation_id)
    webhook_config = operation.get("webhook_config")

    if webhook_config:
        # Webhook will handle completion notification
        await self.notifier.notify_submitted_with_webhook(operation_id, task_id)
    else:
        # Start polling for completion
        polling_task = asyncio.create_task(
            self._poll_for_completion(operation_id, task_id, interval=60)
        )
        self.polling_tasks[task_id] = polling_task

Polling with Backoff

Implement efficient polling:
async def _poll_for_completion(self, operation_id, task_id, interval=60):
    """Poll task status until completion"""
    # 24 hours at a 60 s interval; otherwise 24 polls (2 minutes at 5 s)
    max_polls = 1440 if interval == 60 else 24
    poll_count = 0

    try:
        while poll_count < max_polls:
            await asyncio.sleep(interval)
            poll_count += 1

            task_response = await self.adcp.call('tasks/get', {
                'task_id': task_id,
                'include_result': True
            })
            status = task_response["status"]

            if status in ("submitted", "working"):
                # Still in progress: record the status only. Routing it through
                # handle_operation_response again would re-run the submitted
                # handler and start another polling task.
                await self.tracker.update_status(operation_id, status)
                continue

            await self.handle_operation_response(operation_id, task_response)

            if status in ("completed", "failed", "canceled"):
                break
        else:
            # The loop ran out of polls without reaching a terminal status
            await self.tracker.update_status(
                operation_id,
                "failed",
                error="Task polling timeout"
            )
    except Exception as e:
        await self.tracker.update_status(
            operation_id,
            "failed",
            error=f"Polling error: {str(e)}"
        )
    finally:
        self.polling_tasks.pop(task_id, None)
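
The polling loop in this section sleeps for a fixed interval. If backoff is wanted, one option is to draw the sleep times from a generator like the sketch below. The function name `backoff_intervals` and its default values are assumptions for illustration, not AdCP recommendations.

```python
import itertools
import random


def backoff_intervals(base=5.0, factor=2.0, cap=300.0, jitter=0.0, rng=random):
    """Yield sleep intervals that grow geometrically up to a cap.

    jitter is a fraction (0.1 means +/-10%) that spreads out clients
    which would otherwise poll in lockstep.
    """
    delay = base
    while True:
        spread = delay * jitter
        yield delay + rng.uniform(-spread, spread)
        delay = min(delay * factor, cap)


# First six intervals without jitter
first = list(itertools.islice(backoff_intervals(), 6))
# → [5.0, 10.0, 20.0, 40.0, 80.0, 160.0]
```

A polling loop would then call `await asyncio.sleep(next(intervals))` on each pass and bound the total wait by elapsed time rather than by a poll count.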

Webhook Support

Reliable Webhook Handler

Implement webhooks using reliability patterns:
import hashlib
import hmac
from datetime import datetime, timedelta, timezone

class WebhookHandler:
    def __init__(self, tracker, notifier, secret_key):
        self.tracker = tracker
        self.notifier = notifier
        self.secret_key = secret_key
        self.processed_events = {}

    def verify_webhook_signature(self, payload: bytes, signature: str) -> bool:
        """Verify webhook authenticity"""
        expected_signature = hmac.new(
            self.secret_key.encode(),
            payload,
            hashlib.sha256
        ).hexdigest()
        # Constant-time comparison avoids leaking the signature through timing
        return hmac.compare_digest(
            signature.encode(),
            f"sha256={expected_signature}".encode()
        )

    async def is_replay_attack(self, timestamp: str, event_id: str) -> bool:
        """Prevent replay attacks using timestamp and event ID"""
        # Assumes the timestamp carries 'Z' or an explicit UTC offset
        event_time = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
        # Must be timezone-aware: a naive datetime.now() cannot be compared
        # with the offset-aware event_time
        now = datetime.now(timezone.utc)

        if now - event_time > timedelta(minutes=5):
            return True

        return event_id in self.processed_events
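
Two pieces are useful when testing a handler like this: a way to produce a signature in the same `sha256=<hex>` form, and a store of seen event IDs that does not grow without bound. The sketch below shows both. `sign_payload` and `SeenEvents` are hypothetical helpers, and the 10-minute retention is an assumption chosen to exceed the 5-minute timestamp window.

```python
import hashlib
import hmac
import time
from typing import Optional


def sign_payload(secret_key: str, payload: bytes) -> str:
    """Produce a signature in the "sha256=<hex>" form."""
    digest = hmac.new(secret_key.encode(), payload, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


class SeenEvents:
    """Remember event IDs for a limited time so the store stays bounded."""

    def __init__(self, ttl_seconds: float = 600.0):
        self.ttl_seconds = ttl_seconds
        self._seen = {}  # event_id -> time first seen

    def check_and_record(self, event_id: str, now: Optional[float] = None) -> bool:
        """Return True if event_id is a duplicate; otherwise record it."""
        now = time.monotonic() if now is None else now
        # Drop entries older than the TTL
        self._seen = {e: t for e, t in self._seen.items() if now - t < self.ttl_seconds}
        if event_id in self._seen:
            return True
        self._seen[event_id] = now
        return False
```

An in-memory store like this is lost on restart. With several orchestrator instances, the seen-event record would need to live in shared storage.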

Webhooks with Backup Polling

Do not rely on webhooks alone:
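class ReliableWebhookOrchestrator:
    def __init__(self, tracker, handler):
        self.tracker = tracker  # OperationTracker
        self.handler = handler  # AsyncOperationHandler, which owns _poll_for_completion
        self.webhook_timeout = timedelta(minutes=10)
        self.backup_polling_delay = timedelta(minutes=2)
        self._background_tasks = set()

    async def _handle_submitted_with_webhook(self, operation_id, task_id):
        """Handle submitted task with webhook + backup polling"""

        async def backup_polling():
            await asyncio.sleep(self.backup_polling_delay.total_seconds())

            operation = await self.tracker.get_operation(operation_id)
            if operation["status"] not in ["completed", "failed", "canceled"]:
                logger.info(f"Starting backup polling for task {task_id}")
                await self.handler._poll_for_completion(operation_id, task_id, interval=60)

        # Keep a reference so the task is not garbage-collected before it finishes
        task = asyncio.create_task(backup_polling())
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)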

Orchestrator Example

An example orchestrator implementation:
class AdCPOrchestrator:
    def __init__(self):
        self.adcp = AdCPClient()
        self.tracker = OperationTracker(db)
        self.handler = AsyncOperationHandler(self.adcp, self.tracker, UserNotifier())
        self.webhook_base_url = "https://orchestrator.com/webhooks"

    async def create_media_buy(self, user_id, request, enable_webhook=True):
        """Create a media buy with full async handling."""

        # 1. Prepare webhook configuration
        webhook_config = None
        if enable_webhook:
            webhook_config = {
                "webhook_url": f"{self.webhook_base_url}/adcp/{user_id}",
                "webhook_auth": {
                    "type": "bearer",
                    "credentials": await self.get_webhook_token(user_id)
                }
            }

        # 2. Create operation record
        operation_id = await self.tracker.create_operation(
            "create_media_buy",
            request,
            webhook_config=webhook_config
        )

        try:
            # 3. Call AdCP
            response = await self.adcp.call("create_media_buy", request, webhook_config)

            # 4. Handle response
            await self.handler.handle_operation_response(operation_id, response)

            # 5. Return appropriate response to user
            return self._format_user_response(operation_id, response)

        except Exception as e:
            await self.tracker.update_status(operation_id, "failed", error=str(e))
            raise

    async def reconcile_state_on_startup(self):
        """Recover from orchestrator restart"""
        reconciliation = await self.tracker.reconcile_with_server(self.adcp)
        logger.info(f"State reconciliation: {reconciliation}")

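        for task_id in reconciliation["orphaned_on_server"]:
            # Resume monitoring orphaned tasks.
            # create_operation() takes no status argument; the status and
            # task_id are set by the update_status() call that follows.
            operation_id = await self.tracker.create_operation("unknown", {})
            await self.tracker.update_status(operation_id, "submitted", task_id=task_id)
            # Register the task so it stays referenced while it runs
            self.handler.polling_tasks[task_id] = asyncio.create_task(
                self.handler._poll_for_completion(operation_id, task_id)
            )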

Best Practices

1. Persistent Storage

Always use persistent storage for operation state:
  • Database (PostgreSQL, MongoDB)
  • Message queue (Redis, RabbitMQ)
  • Distributed cache (Redis Cluster)

2. Idempotency

Make all operations idempotent:
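async def create_media_buy_idempotent(self, user_id, request):
    # Look for an earlier, non-failed operation with the same PO number,
    # using the operation statuses defined in this guide
    existing = await self.db.operations.find_one({
        "type": "create_media_buy",
        "request.po_number": request["po_number"],
        "status": {"$in": ["submitted", "working", "input-required", "completed"]}
    })

    if existing:
        # Reuse the earlier operation instead of creating a duplicate;
        # "result" stays None until that operation completes
        return existing["result"]

    return await self.create_media_buy(user_id, request)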
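
When a request has no natural key such as `po_number`, one option is to derive a key from the request body itself. The sketch below does this with a SHA-256 hash over canonical JSON. The helper name `idempotency_key` is hypothetical, and the approach assumes the request is JSON-serializable.

```python
import hashlib
import json


def idempotency_key(operation_type: str, request: dict) -> str:
    """Derive a stable key from the operation type and request body."""
    # sort_keys makes the encoding independent of dict insertion order
    canonical = json.dumps(request, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(f"{operation_type}:{canonical}".encode()).hexdigest()


key_a = idempotency_key("create_media_buy", {"po_number": "PO-1", "budget": 1000})
key_b = idempotency_key("create_media_buy", {"budget": 1000, "po_number": "PO-1"})
# key_a == key_b: same content, different key order
```

Storing the key under a unique index in the operations collection lets the database reject a duplicate even when two requests race.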

3. Timeout Handling

Implement reasonable timeouts:
OPERATION_TIMEOUTS = {
    "create_media_buy": timedelta(hours=24),
    "update_media_buy": timedelta(hours=12),
    "creative_approval": timedelta(hours=48)
}
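
A table of limits needs a check that applies it. A minimal sketch follows, assuming each operation record has a `type` and a timezone-aware `created_at`. The helper `is_timed_out` and the 24-hour `DEFAULT_TIMEOUT` fallback are assumptions for illustration.

```python
from datetime import datetime, timedelta, timezone

DEFAULT_TIMEOUT = timedelta(hours=24)  # assumed fallback for unlisted types


def is_timed_out(operation: dict, timeouts: dict, now=None) -> bool:
    """True if the operation has been pending longer than its limit."""
    now = now or datetime.now(timezone.utc)
    limit = timeouts.get(operation["type"], DEFAULT_TIMEOUT)
    return now - operation["created_at"] > limit


start = datetime(2025, 1, 1, tzinfo=timezone.utc)
example_timeouts = {"update_media_buy": timedelta(hours=12)}
example_op = {"type": "update_media_buy", "created_at": start}
```

A periodic job could run this check over the pending operations and mark the expired ones, or escalate them to a person, instead of leaving them pending forever.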

4. Error Recovery

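Implement retry logic with exponential backoff, and pair it with a circuit breaker so that a failing server is not called repeatedly. The retry half, using the tenacity library:
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

class TransientError(Exception):
    """Errors that are worth retrying"""

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(min=1, max=60),
    retry=retry_if_exception_type(TransientError)
)
async def call_adcp_api(self, tool, params):
    # RateLimitError and NetworkError stand for the client's own exception types
    try:
        return await self.adcp.call(tool, params)
    except RateLimitError:
        raise TransientError("Rate limited")
    except NetworkError:
        raise TransientError("Network error")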
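
The retry decorator on its own does not act as a circuit breaker. A minimal sketch of one is shown here, written from scratch rather than taken from any library. The class names, the thresholds, and the injectable `clock` parameter are assumptions for illustration.

```python
import time


class CircuitOpenError(Exception):
    """Raised when calls are rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_after=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after  # seconds to wait before a trial call
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def before_call(self):
        """Raise CircuitOpenError while the circuit is open."""
        if self.opened_at is None:
            return
        if self.clock() - self.opened_at >= self.reset_after:
            # Half-open: allow one trial call; a single failure reopens
            self.opened_at = None
            self.failures = self.failure_threshold - 1
            return
        raise CircuitOpenError("Circuit open; skipping call")

    def record_success(self):
        self.failures = 0

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = self.clock()
```

A caller would invoke `before_call()` ahead of each request, then `record_success()` or `record_failure()` depending on the outcome. This sketch is not safe for concurrent use without additional locking.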

5. Monitoring and Alerting

Track key metrics:
  • Pending operation count by type
  • Average approval time
  • Rejection rate
  • Task timeout rate
  • API error rate
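
Some of these metrics can be computed directly from stored operation records. A rough sketch follows, assuming records with `type`, `status`, and `error` fields and a timeout recorded as the error string "Task polling timeout". The function name `summarize_operations` is hypothetical.

```python
from collections import Counter

PENDING = {"submitted", "working", "input-required"}


def summarize_operations(operations):
    """Compute pending counts by type and the task timeout rate."""
    pending_by_type = Counter(
        op["type"] for op in operations if op["status"] in PENDING
    )
    finished = [op for op in operations if op["status"] in {"completed", "failed"}]
    timeouts = [op for op in finished if op.get("error") == "Task polling timeout"]
    return {
        "pending_by_type": dict(pending_by_type),
        # Share of finished operations that ended in a polling timeout
        "timeout_rate": len(timeouts) / len(finished) if finished else 0.0,
    }


sample = [
    {"type": "create_media_buy", "status": "submitted", "error": None},
    {"type": "create_media_buy", "status": "working", "error": None},
    {"type": "update_media_buy", "status": "completed", "error": None},
    {"type": "update_media_buy", "status": "failed", "error": "Task polling timeout"},
]
```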

User Communication

Keep users informed about pending operations:
class UserNotifier:
    async def notify_pending_approval(self, user_id, operation):
        message = {
            "type": "pending_approval",
            "operation_id": operation["id"],
            "message": "Your media buy requires publisher approval",
            "estimated_time": "2-4 hours"
        }
        await self.send_notification(user_id, message)

    async def notify_approval(self, user_id, operation):
        message = {
            "type": "operation_approved",
            "operation_id": operation["id"],
            "message": "Your media buy has been approved",
            "media_buy_id": operation["result"]["media_buy_id"]
        }
        await self.send_notification(user_id, message)

Summary

Building a robust AdCP orchestrator requires:
  1. Asynchronous design throughout
  2. Proper state management with persistence
  3. Graceful handling of pending states
  4. User communication for long-running operations
  5. Monitoring and observability
Remember: Pending states are not errors - they’re a normal part of the advertising workflow.

Next Steps

  • Task Lifecycle: See Task Lifecycle for status handling
  • Webhooks: See Webhooks for push notifications
  • Security: See Security for multi-tenant security