Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 57 additions & 11 deletions src/conductor/client/automator/async_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from conductor.client.http.models.task_result import TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.http.models.schema_def import SchemaDef, SchemaType
from conductor.client.http.rest import AuthorizationException
from conductor.client.http.rest import ApiException, AuthorizationException
from conductor.client.orkes.orkes_metadata_client import OrkesMetadataClient
from conductor.client.orkes.orkes_schema_client import OrkesSchemaClient
from conductor.client.telemetry.metrics_collector import MetricsCollector
Expand Down Expand Up @@ -111,6 +111,7 @@ def __init__(
# Semaphore will be created in run() within the event loop
self._semaphore = None
self._shutdown = False # Flag to indicate graceful shutdown
self._v2_available = True # Tracks whether server supports update-task-v2

async def run(self) -> None:
"""Main async loop - runs continuously in single event loop."""
Expand Down Expand Up @@ -566,11 +567,15 @@ async def __async_execute_and_update_task(self, task: Task) -> None:
"""Execute task and update result in a tight loop (async version).

Uses the v2 update endpoint which returns the next task to process.
Loops: execute -> update_v2 (get next task) -> execute -> ...
The semaphore is held for the entire loop duration, keeping the slot occupied.
Loops: execute → update_v2 (get next task) → execute → …

The semaphore is held for the entire loop. This is correct because
``_running_tasks`` (which gates polling) tracks the *coroutine*, not
individual tasks — releasing the semaphore mid-loop would not allow
new coroutines to be created (the capacity gate would still block).
For async workers a slow ``await`` naturally yields to the event loop,
so other coroutines make progress regardless.
"""
# Acquire semaphore for entire task lifecycle (execution + update)
# This ensures we never exceed thread_count tasks in any stage of processing
async with self._semaphore:
try:
while task is not None and not self._shutdown:
Expand Down Expand Up @@ -793,7 +798,10 @@ def __merge_context_modifications(self, task_result: TaskResult, context_result:
task_result.output_data = context_result.output_data

async def __async_update_task(self, task_result: TaskResult):
"""Async update task result using v2 endpoint. Returns the next Task to process, or None."""
"""Update task result. Uses v2 endpoint if available, falls back to v1 otherwise.

v2 returns the next Task to process (tight loop). v1 returns None (poll-based).
"""
if not isinstance(task_result, TaskResult):
return None
task_definition_name = self.worker.get_task_definition_name()
Expand All @@ -815,15 +823,53 @@ async def __async_update_task(self, task_result: TaskResult):
# Linear backoff: [10s, 20s, 30s] before retry (sleep grows as attempt * 10)
await asyncio.sleep(attempt * 10)
try:
next_task = await self.async_task_client.update_task_v2(body=task_result)
logger.debug(
"Updated async task (v2), id: %s, workflow_instance_id: %s, task_definition_name: %s, next_task: %s",
if self._v2_available:
next_task = await self.async_task_client.update_task_v2(body=task_result)
logger.debug(
"Updated async task (v2), id: %s, workflow_instance_id: %s, task_definition_name: %s, next_task: %s",
task_result.task_id,
task_result.workflow_instance_id,
task_definition_name,
next_task.task_id if next_task else None
)
return next_task
else:
await self.async_task_client.update_task(body=task_result)
logger.debug(
"Updated async task (v1), id: %s, workflow_instance_id: %s, task_definition_name: %s",
task_result.task_id,
task_result.workflow_instance_id,
task_definition_name,
)
return None
except ApiException as e:
if self._v2_available and e.status in (404, 501):
logger.warning(
"update-task-v2 not supported by server (HTTP %s), falling back to v1 for task_definition: %s",
e.status, task_definition_name
)
self._v2_available = False
# Immediately retry this attempt with v1
try:
await self.async_task_client.update_task(body=task_result)
return None
except Exception as fallback_err:
last_exception = fallback_err
continue
last_exception = e
if self.metrics_collector is not None:
self.metrics_collector.increment_task_update_error(
task_definition_name, type(e)
)
logger.error(
"Failed to update async task (attempt %d/%d), id: %s, workflow_instance_id: %s, task_definition_name: %s, reason: %s",
attempt + 1,
retry_count,
task_result.task_id,
task_result.workflow_instance_id,
task_definition_name,
next_task.task_id if next_task else None
traceback.format_exc()
)
return next_task
except Exception as e:
last_exception = e
if self.metrics_collector is not None:
Expand Down
66 changes: 54 additions & 12 deletions src/conductor/client/automator/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from conductor.client.http.models.task_result import TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.http.models.schema_def import SchemaDef, SchemaType
from conductor.client.http.rest import AuthorizationException
from conductor.client.http.rest import ApiException, AuthorizationException
from conductor.client.orkes.orkes_metadata_client import OrkesMetadataClient
from conductor.client.orkes.orkes_schema_client import OrkesSchemaClient
from conductor.client.telemetry.metrics_collector import MetricsCollector
Expand Down Expand Up @@ -92,6 +92,7 @@ def __init__(
self._last_poll_time = 0 # Track last poll to avoid excessive polling when queue is empty
self._consecutive_empty_polls = 0 # Track empty polls to implement backoff
self._shutdown = False # Flag to indicate graceful shutdown
self._v2_available = True # Tracks whether server supports update-task-v2

def run(self) -> None:
if self.configuration is not None:
Expand Down Expand Up @@ -506,18 +507,18 @@ def __execute_and_update_task(self, task: Task) -> None:
"""Execute task and update result in a tight loop (runs in thread pool).

Uses the v2 update endpoint which returns the next task to process.
Loops: execute -> update_v2 (get next task) -> execute -> ...
The loop breaks when no next task is available, the task is async/in-progress,
or shutdown is requested.
Loops: execute → update_v2 (get next task) → execute → …
The loop breaks when no next task is available, the task is async /
in-progress, or shutdown is requested.
"""
try:
while task is not None and not self._shutdown:
task_result = self.__execute_task(task)
# If task returned None, it's an async task running in background - don't update yet
# If task returned None, it's an async task running in background
if task_result is None:
logger.debug("Task %s is running async, will update when complete", task.task_id)
return
# If task returned TaskInProgress, it's running async - don't update yet
# If task returned TaskInProgress, it's running async
if isinstance(task_result, TaskInProgress):
logger.debug("Task %s is in progress, will update when complete", task.task_id)
return
Expand Down Expand Up @@ -824,7 +825,10 @@ def __merge_context_modifications(self, task_result: TaskResult, context_result:
task_result.output_data = context_result.output_data

def __update_task(self, task_result: TaskResult):
"""Update task result using v2 endpoint. Returns the next Task to process, or None."""
"""Update task result. Uses v2 endpoint if available, falls back to v1 otherwise.

v2 returns the next Task to process (tight loop). v1 returns None (poll-based).
"""
if not isinstance(task_result, TaskResult):
return None
task_definition_name = self.worker.get_task_definition_name()
Expand All @@ -845,15 +849,53 @@ def __update_task(self, task_result: TaskResult):
# Linear backoff: [10s, 20s, 30s] before retry (sleep grows as attempt * 10)
time.sleep(attempt * 10)
try:
next_task = self.task_client.update_task_v2(body=task_result)
logger.debug(
"Updated task (v2), id: %s, workflow_instance_id: %s, task_definition_name: %s, next_task: %s",
if self._v2_available:
next_task = self.task_client.update_task_v2(body=task_result)
logger.debug(
"Updated task (v2), id: %s, workflow_instance_id: %s, task_definition_name: %s, next_task: %s",
task_result.task_id,
task_result.workflow_instance_id,
task_definition_name,
next_task.task_id if next_task else None
)
return next_task
else:
self.task_client.update_task(body=task_result)
logger.debug(
"Updated task (v1), id: %s, workflow_instance_id: %s, task_definition_name: %s",
task_result.task_id,
task_result.workflow_instance_id,
task_definition_name,
)
return None
except ApiException as e:
if self._v2_available and e.status in (404, 501):
logger.warning(
"update-task-v2 not supported by server (HTTP %s), falling back to v1 for task_definition: %s",
e.status, task_definition_name
)
self._v2_available = False
# Immediately retry this attempt with v1
try:
self.task_client.update_task(body=task_result)
return None
except Exception as fallback_err:
last_exception = fallback_err
continue
last_exception = e
if self.metrics_collector is not None:
self.metrics_collector.increment_task_update_error(
task_definition_name, type(e)
)
logger.error(
"Failed to update task (attempt %d/%d), id: %s, workflow_instance_id: %s, task_definition_name: %s, reason: %s",
attempt + 1,
retry_count,
task_result.task_id,
task_result.workflow_instance_id,
task_definition_name,
next_task.task_id if next_task else None
traceback.format_exc()
)
return next_task
except Exception as e:
last_exception = e
if self.metrics_collector is not None:
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/test_update_task_v2_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,17 @@
# ---------------------------------------------------------------------------

@worker_task(task_definition_name="perf_type_a", thread_count=WORKER_THREADS, register_task_def=True)
def perf_worker_a(task_index: int = 0) -> dict:
async def perf_worker_a(task_index: int = 0) -> dict:
return {"worker": "perf_type_a", "task_index": task_index}


@worker_task(task_definition_name="perf_type_b", thread_count=WORKER_THREADS, register_task_def=True)
def perf_worker_b(task_index: int = 0) -> dict:
async def perf_worker_b(task_index: int = 0) -> dict:
return {"worker": "perf_type_b", "task_index": task_index}


@worker_task(task_definition_name="perf_type_c", thread_count=WORKER_THREADS, register_task_def=True)
def perf_worker_c(task_index: int = 0) -> dict:
async def perf_worker_c(task_index: int = 0) -> dict:
return {"worker": "perf_type_c", "task_index": task_index}


Expand Down
Loading
Loading