Merged
22 commits
d4585ff
Add running jobs pipeline fetcher scaffold
r4victor Mar 11, 2026
88a7de1
Prototype job pipeline for provisioning and pullings states
r4victor Mar 11, 2026
6ae8dfc
Treat job_model as read-only
r4victor Mar 11, 2026
22e724d
Finish running jobs pipeline worker
r4victor Mar 12, 2026
d5c3551
Wire pipeline
r4victor Mar 12, 2026
f392d53
Restore TODOs and simplify code
r4victor Mar 12, 2026
0d1e4d9
Set TERMINATED_DUE_TO_UTILIZATION_POLICY
r4victor Mar 12, 2026
8053e59
Set INACTIVITY_DURATION_EXCEEDED
r4victor Mar 12, 2026
0a0f09e
Extract _handle_instance_unreachable
r4victor Mar 12, 2026
0e6c123
Unify jobs pipelines patterns
r4victor Mar 12, 2026
748077c
Add context and apply to fleet pipeline
r4victor Mar 12, 2026
0fdafde
Describe Typical worker structure
r4victor Mar 12, 2026
c54c002
Move unlock/processed updates inside _apply_process_result
r4victor Mar 12, 2026
f6987ec
Add FIXME: Race condition when checking len(fleet_model.instances) == 0
r4victor Mar 12, 2026
9b0b298
Fix stale fleet_model read
r4victor Mar 12, 2026
8bdba43
Clean up pipeline tests
r4victor Mar 12, 2026
b29dda0
Fix empty fleet select
r4victor Mar 12, 2026
444c5fa
Fix missing az restriction for clusters in submitted_jobs
r4victor Mar 12, 2026
0923340
Add deprecated note
r4victor Mar 12, 2026
4f9eb93
Merge branch 'master' into issue_3551_running_jobs_pipeline_2
r4victor Mar 12, 2026
9d8c6f2
Pass instance_project_ssh_private_key
r4victor Mar 12, 2026
fdd38dd
Fix missing pipeline tests
r4victor Mar 12, 2026
49 changes: 49 additions & 0 deletions contributing/PIPELINES.md
@@ -37,6 +37,55 @@ Brief checklist for implementing a new pipeline:
8. Register the pipeline in `PipelineManager` and hint fetch from services after commit via `pipeline_hinter.hint_fetch(Model.__name__)`.
9. Add minimum tests: fetch eligibility/order, successful unlock path, stale lock token path, and related lock contention retry path.

## Typical worker structure

Most workers are easiest to reason about when `process()` is split into three phases:

1. Load/refetch: open a short DB session, refetch the locked main row by `id + lock_token`, lock any required related rows, and gather any extra data needed for processing.
2. Process: do the heavy work outside DB sessions and build result objects or update maps instead of mutating detached ORM models.
3. Apply: open a short DB session, guard the main update by `id + lock_token`, resolve time placeholders, apply related updates, emit events, and unlock rows.
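The guard in the apply phase is a compare-and-set on the lock token. A minimal in-memory sketch of that semantics (all names here are hypothetical; the real workers issue an `UPDATE ... WHERE id = :id AND lock_token = :token` against the database):

```python
from dataclasses import dataclass


# Hypothetical stand-in for a locked table row; the real code works with
# ORM models and a SQL UPDATE guarded by id + lock_token.
@dataclass
class Row:
    id: int
    lock_token: str
    status: str


def guarded_apply(rows: dict[int, Row], item_id: int, lock_token: str,
                  update_map: dict) -> bool:
    """Apply update_map only if the row still holds our lock token."""
    row = rows.get(item_id)
    if row is None or row.lock_token != lock_token:
        # Another worker re-locked the row after we loaded it: drop our write
        # instead of clobbering its state.
        return False
    for field_name, value in update_map.items():
        setattr(row, field_name, value)
    return True
```

If the guard fails, the worker logs the token mismatch and returns without writing, exactly as in the stale-lock-token path above.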

A dedicated context object is often useful for the load step when the worker needs multiple loaded models, related lock metadata, or derived values that should be passed cleanly into processing and apply. For very small pipelines, a direct load -> process -> apply flow may still be clearer.
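Such a context object can be a plain dataclass; the field names below are illustrative (the fleet worker's `_ProcessContext` in this PR follows the same shape with fleet-specific fields):

```python
import uuid
from dataclasses import dataclass, field
from typing import Optional


# Hypothetical context carried from the load phase into process and apply.
@dataclass
class ProcessContext:
    main_model: object                                  # refetched, locked main row
    related_models: list = field(default_factory=list)  # rows locked alongside it
    locked_related_ids: set[uuid.UUID] = field(default_factory=set)
    derived_value: Optional[str] = None                 # anything precomputed for processing
```

Returning `None` from the load helper when the lock token no longer matches lets `process()` bail out early, as in the examples below.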

Workers can share one context type and one apply function across all states even if the processing logic differs by state:

```python
async def process(item):
context = await _load_process_context(item)
if context is None:
return
result = await _process_item(context)
await _apply_process_result(item, context, result)
```

Sometimes state-specific loading helpers are the cleanest option; they can still share a common apply phase if all states write results in the same general shape:

```python
async def process(item):
if item.status == Status.PENDING:
context = await _load_pending_context(item)
elif item.status == Status.RUNNING:
context = await _load_running_context(item)
else:
return
if context is None:
return
result = await _process_item(context)
await _apply_process_result(item, context, result)
```

If different states have materially different write-side behavior, different apply paths are fine as well. This commonly happens when one state does a normal guarded update while another does delete-or-cleanup work with different related updates:

```python
async def process(item):
if item.to_be_deleted:
await _process_to_be_deleted_item(item)
elif item.status == Status.SUBMITTED:
await _process_submitted_item(item)
```

It's OK not to force every pipeline into one exact shape.

## Implementation patterns

**Guarded apply by lock token**
@@ -5,6 +5,7 @@
from dstack._internal.server.background.pipeline_tasks.fleets import FleetPipeline
from dstack._internal.server.background.pipeline_tasks.gateways import GatewayPipeline
from dstack._internal.server.background.pipeline_tasks.instances import InstancePipeline
from dstack._internal.server.background.pipeline_tasks.jobs_running import JobRunningPipeline
from dstack._internal.server.background.pipeline_tasks.jobs_terminating import (
JobTerminatingPipeline,
)
@@ -23,6 +24,7 @@ def __init__(self) -> None:
ComputeGroupPipeline(),
FleetPipeline(),
GatewayPipeline(),
JobRunningPipeline(),
JobTerminatingPipeline(),
InstancePipeline(),
PlacementGroupPipeline(),
272 changes: 155 additions & 117 deletions src/dstack/_internal/server/background/pipeline_tasks/fleets.py
@@ -196,106 +196,23 @@ def __init__(

@sentry_utils.instrument_named_task("pipeline_tasks.FleetWorker.process")
async def process(self, item: PipelineItem):
async with get_session_ctx() as session:
res = await session.execute(
select(FleetModel)
.where(
FleetModel.id == item.id,
FleetModel.lock_token == item.lock_token,
)
.options(joinedload(FleetModel.project))
.options(
selectinload(FleetModel.instances.and_(InstanceModel.deleted == False))
.joinedload(InstanceModel.jobs)
.load_only(JobModel.id),
)
.options(
selectinload(
FleetModel.runs.and_(RunModel.status.not_in(RunStatus.finished_statuses()))
).load_only(RunModel.status)
)
)
fleet_model = res.unique().scalar_one_or_none()
if fleet_model is None:
log_lock_token_mismatch(logger, item)
return

# Lock instance only if consolidation is needed.
locked_instance_ids: set[uuid.UUID] = set()
consolidation_fleet_spec = _get_fleet_spec_if_ready_for_consolidation(fleet_model)
consolidation_instances = None
if consolidation_fleet_spec is not None:
consolidation_instances = await _lock_fleet_instances_for_consolidation(
session=session,
item=item,
)
if consolidation_instances is None:
return
locked_instance_ids = {instance.id for instance in consolidation_instances}

process_context = await _load_process_context(item)
if process_context is None:
return
result = await _process_fleet(
fleet_model,
consolidation_fleet_spec=consolidation_fleet_spec,
consolidation_instances=consolidation_instances,
)
fleet_update_map = _FleetUpdateMap()
fleet_update_map.update(result.fleet_update_map)
set_processed_update_map_fields(fleet_update_map)
set_unlock_update_map_fields(fleet_update_map)
instance_update_rows = _build_instance_update_rows(
result.instance_id_to_update_map,
unlock_instance_ids=locked_instance_ids,
process_context.fleet_model,
consolidation_fleet_spec=process_context.consolidation_fleet_spec,
consolidation_instances=process_context.consolidation_instances,
)
await _apply_process_result(item, process_context, result)

async with get_session_ctx() as session:
now = get_current_datetime()
resolve_now_placeholders(fleet_update_map, now=now)
resolve_now_placeholders(instance_update_rows, now=now)
res = await session.execute(
update(FleetModel)
.where(
FleetModel.id == fleet_model.id,
FleetModel.lock_token == fleet_model.lock_token,
)
.values(**fleet_update_map)
.returning(FleetModel.id)
)
updated_ids = list(res.scalars().all())
if len(updated_ids) == 0:
log_lock_token_changed_after_processing(logger, item)
if locked_instance_ids:
await _unlock_fleet_locked_instances(
session=session,
item=item,
locked_instance_ids=locked_instance_ids,
)
# TODO: Clean up fleet.
return

if fleet_update_map.get("deleted"):
await session.execute(
update(PlacementGroupModel)
.where(PlacementGroupModel.fleet_id == item.id)
.values(fleet_deleted=True)
)
if instance_update_rows:
await session.execute(
update(InstanceModel),
instance_update_rows,
)
if len(result.new_instance_creates) > 0:
await _create_missing_fleet_instances(
session=session,
fleet_model=fleet_model,
new_instance_creates=result.new_instance_creates,
)
emit_fleet_status_change_event(
session=session,
fleet_model=fleet_model,
old_status=fleet_model.status,
new_status=fleet_update_map.get("status", fleet_model.status),
status_message=fleet_update_map.get("status_message", fleet_model.status_message),
)

@dataclass
class _ProcessContext:
fleet_model: FleetModel
consolidation_fleet_spec: Optional[FleetSpec]
consolidation_instances: Optional[list[InstanceModel]]
locked_instance_ids: set[uuid.UUID] = field(default_factory=set)


class _FleetUpdateMap(ItemUpdateMap, total=False):
@@ -318,6 +235,83 @@ class _InstanceUpdateMap(ItemUpdateMap, total=False):
id: uuid.UUID


@dataclass
class _ProcessResult:
fleet_update_map: _FleetUpdateMap = field(default_factory=_FleetUpdateMap)
instance_id_to_update_map: dict[uuid.UUID, _InstanceUpdateMap] = field(default_factory=dict)
new_instance_creates: list["_NewInstanceCreate"] = field(default_factory=list)


class _NewInstanceCreate(TypedDict):
id: uuid.UUID
instance_num: int


@dataclass
class _MaintainNodesResult:
instance_id_to_update_map: dict[uuid.UUID, _InstanceUpdateMap] = field(default_factory=dict)
new_instance_creates: list[_NewInstanceCreate] = field(default_factory=list)
changes_required: bool = False

@property
def has_changes(self) -> bool:
return len(self.instance_id_to_update_map) > 0 or len(self.new_instance_creates) > 0


async def _load_process_context(item: PipelineItem) -> Optional[_ProcessContext]:
async with get_session_ctx() as session:
fleet_model = await _refetch_locked_fleet(session=session, item=item)
if fleet_model is None:
log_lock_token_mismatch(logger, item)
return None

consolidation_fleet_spec = _get_fleet_spec_if_ready_for_consolidation(fleet_model)
consolidation_instances = None
if consolidation_fleet_spec is not None:
consolidation_instances = await _lock_fleet_instances_for_consolidation(
session=session,
item=item,
)
if consolidation_instances is None:
return None

return _ProcessContext(
fleet_model=fleet_model,
consolidation_fleet_spec=consolidation_fleet_spec,
consolidation_instances=consolidation_instances,
locked_instance_ids=(
set()
if consolidation_instances is None
else {i.id for i in consolidation_instances}
),
)


async def _refetch_locked_fleet(
session: AsyncSession,
item: PipelineItem,
) -> Optional[FleetModel]:
res = await session.execute(
select(FleetModel)
.where(
FleetModel.id == item.id,
FleetModel.lock_token == item.lock_token,
)
.options(joinedload(FleetModel.project))
.options(
selectinload(FleetModel.instances.and_(InstanceModel.deleted == False))
.joinedload(InstanceModel.jobs)
.load_only(JobModel.id),
)
.options(
selectinload(
FleetModel.runs.and_(RunModel.status.not_in(RunStatus.finished_statuses()))
).load_only(RunModel.status)
)
)
return res.unique().scalar_one_or_none()


def _get_fleet_spec_if_ready_for_consolidation(fleet_model: FleetModel) -> Optional[FleetSpec]:
if fleet_model.status == FleetStatus.TERMINATING:
return None
@@ -398,27 +392,71 @@ async def _lock_fleet_instances_for_consolidation(
return locked_instance_models


@dataclass
class _ProcessResult:
fleet_update_map: _FleetUpdateMap = field(default_factory=_FleetUpdateMap)
instance_id_to_update_map: dict[uuid.UUID, _InstanceUpdateMap] = field(default_factory=dict)
new_instance_creates: list["_NewInstanceCreate"] = field(default_factory=list)


class _NewInstanceCreate(TypedDict):
id: uuid.UUID
instance_num: int


@dataclass
class _MaintainNodesResult:
instance_id_to_update_map: dict[uuid.UUID, _InstanceUpdateMap] = field(default_factory=dict)
new_instance_creates: list[_NewInstanceCreate] = field(default_factory=list)
changes_required: bool = False
async def _apply_process_result(
item: PipelineItem,
context: _ProcessContext,
result: "_ProcessResult",
) -> None:
fleet_update_map = _FleetUpdateMap()
fleet_update_map.update(result.fleet_update_map)
set_processed_update_map_fields(fleet_update_map)
set_unlock_update_map_fields(fleet_update_map)
instance_update_rows = _build_instance_update_rows(
result.instance_id_to_update_map,
unlock_instance_ids=context.locked_instance_ids,
)

@property
def has_changes(self) -> bool:
return len(self.instance_id_to_update_map) > 0 or len(self.new_instance_creates) > 0
async with get_session_ctx() as session:
now = get_current_datetime()
resolve_now_placeholders(fleet_update_map, now=now)
resolve_now_placeholders(instance_update_rows, now=now)
res = await session.execute(
update(FleetModel)
.where(
FleetModel.id == context.fleet_model.id,
FleetModel.lock_token == context.fleet_model.lock_token,
)
.values(**fleet_update_map)
.returning(FleetModel.id)
)
updated_ids = list(res.scalars().all())
if len(updated_ids) == 0:
log_lock_token_changed_after_processing(logger, item)
if context.locked_instance_ids:
await _unlock_fleet_locked_instances(
session=session,
item=item,
locked_instance_ids=context.locked_instance_ids,
)
# TODO: Clean up fleet.
return

if fleet_update_map.get("deleted"):
await session.execute(
update(PlacementGroupModel)
.where(PlacementGroupModel.fleet_id == context.fleet_model.id)
.values(fleet_deleted=True)
)
if instance_update_rows:
await session.execute(
update(InstanceModel),
instance_update_rows,
)
if len(result.new_instance_creates) > 0:
await _create_missing_fleet_instances(
session=session,
fleet_model=context.fleet_model,
new_instance_creates=result.new_instance_creates,
)
emit_fleet_status_change_event(
session=session,
fleet_model=context.fleet_model,
old_status=context.fleet_model.status,
new_status=fleet_update_map.get("status", context.fleet_model.status),
status_message=fleet_update_map.get(
"status_message", context.fleet_model.status_message
),
)


async def _process_fleet(
@@ -238,8 +238,7 @@ async def process(self, item: InstancePipelineItem):
process_context = await _process_terminating_item(item)
if process_context is None:
return
set_processed_update_map_fields(process_context.result.instance_update_map)
set_unlock_update_map_fields(process_context.result.instance_update_map)

await _apply_process_result(
item=item,
instance_model=process_context.instance_model,
@@ -376,6 +375,9 @@ async def _apply_process_result(
instance_model: InstanceModel,
result: ProcessResult,
) -> None:
set_processed_update_map_fields(result.instance_update_map)
set_unlock_update_map_fields(result.instance_update_map)

async with get_session_ctx() as session:
if result.health_check_create is not None:
session.add(InstanceHealthCheckModel(**result.health_check_create))