From 26bb9fcf5d9685360f7d506fd8ab954d265e28b4 Mon Sep 17 00:00:00 2001 From: Jvst Me Date: Wed, 11 Mar 2026 16:13:04 +0100 Subject: [PATCH] Fix error when imported fleet has no capacity Avoid `fleet_model_to_fleet()`, since it requires the fleet project to be loaded from the database, which is not always the case for imported fleets. --- .../scheduled_tasks/submitted_jobs.py | 15 +- .../_internal/server/services/fleets.py | 16 +-- .../_internal/server/services/runs/plan.py | 23 +-- .../scheduled_tasks/test_submitted_jobs.py | 131 ++++++++++++++++++ 4 files changed, 158 insertions(+), 27 deletions(-) diff --git a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py index 729ded205..fc67e8024 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py @@ -81,9 +81,9 @@ from dstack._internal.server.services.backends import get_project_backend_by_type_or_error from dstack._internal.server.services.fleets import ( check_can_create_new_cloud_instance_in_fleet, - fleet_model_to_fleet, generate_fleet_name, get_fleet_master_instance_provisioning_data, + get_fleet_spec, get_next_instance_num, is_cloud_cluster, ) @@ -580,8 +580,8 @@ async def _fetch_fleet_with_master_instance_provisioning_data( # as FleetPipeline/InstancePipeline. master_instance_provisioning_data = None if is_master_job(job) and fleet_model is not None: - fleet = fleet_model_to_fleet(fleet_model) - if fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER: + fleet_spec = get_fleet_spec(fleet_model) + if fleet_spec.configuration.placement == InstanceGroupPlacement.CLUSTER: # To avoid violating fleet placement cluster during master provisioning, # we must lock empty fleets and respect existing instances in non-empty fleets. # On SQLite always take the lock during master provisioning for simplicity. @@ -624,7 +624,7 @@ async def _fetch_fleet_with_master_instance_provisioning_data( fleet_model = res.unique().scalar_one() master_instance_provisioning_data = get_fleet_master_instance_provisioning_data( fleet_model=fleet_model, - fleet_spec=fleet.spec, + fleet_spec=fleet_spec, ) return master_instance_provisioning_data @@ -730,15 +730,14 @@ async def _run_jobs_on_new_instances( job = jobs[0] profile = run.run_spec.merged_profile requirements = job.job_spec.requirements - fleet = None if fleet_model is not None: - fleet = fleet_model_to_fleet(fleet_model) + fleet_spec = get_fleet_spec(fleet_model) try: - check_can_create_new_cloud_instance_in_fleet(fleet) + check_can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec) profile, requirements = get_run_profile_and_requirements_in_fleet( job=job, run_spec=run.run_spec, - fleet=fleet, + fleet_spec=fleet_spec, ) except ValueError as e: logger.debug("%s: %s", fmt(job_model), e.args[0]) diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index 969ef1ace..8fb93d08d 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -953,23 +953,23 @@ def get_fleet_master_instance_provisioning_data( return master_instance_provisioning_data -def can_create_new_cloud_instance_in_fleet(fleet: Fleet) -> bool: - if fleet.spec.configuration.ssh_config is not None: +def can_create_new_cloud_instance_in_fleet(fleet_model: FleetModel, fleet_spec: FleetSpec) -> bool: + if fleet_spec.configuration.ssh_config is not None: return False - active_instances = [i for i in fleet.instances if i.status.is_active()] + active_instances = [i for i in fleet_model.instances if i.status.is_active()] # nodes.max is a soft limit that can be exceeded when provisioning concurrently. # The fleet consolidation logic will remove redundant nodes eventually. if ( - fleet.spec.configuration.nodes is not None - and fleet.spec.configuration.nodes.max is not None - and len(active_instances) >= fleet.spec.configuration.nodes.max + fleet_spec.configuration.nodes is not None + and fleet_spec.configuration.nodes.max is not None + and len(active_instances) >= fleet_spec.configuration.nodes.max ): return False return True -def check_can_create_new_cloud_instance_in_fleet(fleet: Fleet): - if not can_create_new_cloud_instance_in_fleet(fleet): +def check_can_create_new_cloud_instance_in_fleet(fleet_model: FleetModel, fleet_spec: FleetSpec): + if not can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec): raise ValueError("Cannot fit new cloud instance into fleet") diff --git a/src/dstack/_internal/server/services/runs/plan.py b/src/dstack/_internal/server/services/runs/plan.py index 9694cccd7..a0b6ee3c7 100644 --- a/src/dstack/_internal/server/services/runs/plan.py +++ b/src/dstack/_internal/server/services/runs/plan.py @@ -6,7 +6,7 @@ from sqlalchemy.orm import contains_eager, joinedload, noload from dstack._internal.core.backends.base.backend import Backend -from dstack._internal.core.models.fleets import Fleet, InstanceGroupPlacement +from dstack._internal.core.models.fleets import Fleet, FleetSpec, InstanceGroupPlacement from dstack._internal.core.models.instances import ( InstanceAvailability, InstanceOfferWithAvailability, @@ -34,6 +34,7 @@ fleet_model_to_fleet, get_fleet_master_instance_provisioning_data, get_fleet_requirements, + get_fleet_spec, ) from dstack._internal.server.services.instances import ( filter_pool_instances, @@ -375,7 +376,7 @@ async def find_optimal_fleet_with_offers( backend_offers = await _get_backend_offers_in_fleet( project=project, fleet_model=candidate_fleet_model, - fleet=candidate_fleet, + fleet_spec=candidate_fleet.spec, run_spec=run_spec, job=job, volumes=volumes, @@ -436,12 +437,12 @@ async def find_optimal_fleet_with_offers( def get_run_profile_and_requirements_in_fleet( job: Job, run_spec: RunSpec, - fleet: Fleet, + fleet_spec: FleetSpec, ) -> tuple[Profile, Requirements]: - profile = combine_fleet_and_run_profiles(fleet.spec.merged_profile, run_spec.merged_profile) + profile = combine_fleet_and_run_profiles(fleet_spec.merged_profile, run_spec.merged_profile) if profile is None: raise ValueError("Cannot combine fleet profile") - fleet_requirements = get_fleet_requirements(fleet.spec) + fleet_requirements = get_fleet_requirements(fleet_spec) requirements = combine_fleet_and_run_requirements( fleet_requirements, job.job_spec.requirements ) @@ -547,17 +548,17 @@ async def _get_backend_offers_in_fleet( run_spec: RunSpec, job: Job, volumes: Optional[list[list[Volume]]], - fleet: Optional[Fleet] = None, + fleet_spec: Optional[FleetSpec] = None, max_offers: Optional[int] = None, ) -> list[tuple[Backend, InstanceOfferWithAvailability]]: - if fleet is None: - fleet = fleet_model_to_fleet(fleet_model) + if fleet_spec is None: + fleet_spec = get_fleet_spec(fleet_model) try: - check_can_create_new_cloud_instance_in_fleet(fleet) + check_can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec) profile, requirements = get_run_profile_and_requirements_in_fleet( job=job, run_spec=run_spec, - fleet=fleet, + fleet_spec=fleet_spec, ) except ValueError: backend_offers = [] @@ -565,7 +566,7 @@ async def _get_backend_offers_in_fleet( # Master job offers must be in the same cluster as existing instances. master_instance_provisioning_data = get_fleet_master_instance_provisioning_data( fleet_model=fleet_model, - fleet_spec=fleet.spec, + fleet_spec=fleet_spec, ) # Handle multinode for old jobs that don't have requirements.multinode set. # TODO: Drop multinode param. diff --git a/src/tests/_internal/server/background/scheduled_tasks/test_submitted_jobs.py b/src/tests/_internal/server/background/scheduled_tasks/test_submitted_jobs.py index db75bbf53..b0481ad4e 100644 --- a/src/tests/_internal/server/background/scheduled_tasks/test_submitted_jobs.py +++ b/src/tests/_internal/server/background/scheduled_tasks/test_submitted_jobs.py @@ -470,6 +470,137 @@ async def test_not_assigns_job_to_foreign_fleet_if_not_imported( assert not job.instance_assigned assert job.instance is None + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_assigns_second_replica_to_same_imported_fleet( + self, test_db, session: AsyncSession + ): + user = await create_user(session, global_role=GlobalRole.USER) + exporter_project = await create_project(session, name="exporter-project", owner=user) + importer_project = await create_project(session, name="importer-project", owner=user) + fleet = await create_fleet( + session=session, + project=exporter_project, + spec=get_fleet_spec(get_ssh_fleet_configuration()), + name="exported-fleet", + ) + instance_0 = await create_instance( + session=session, + project=exporter_project, + fleet=fleet, + name="exported-fleet-0", + status=InstanceStatus.BUSY, + ) + instance_1 = await create_instance( + session=session, + project=exporter_project, + fleet=fleet, + name="exported-fleet-1", + status=InstanceStatus.IDLE, + ) + await create_export( + session=session, + exporter_project=exporter_project, + importer_projects=[importer_project], + exported_fleets=[fleet], + ) + repo = await create_repo(session=session, project_id=importer_project.id) + run = await create_run( + session=session, + project=importer_project, + repo=repo, + user=user, + fleet=fleet, + ) + await create_job( + session=session, + run=run, + fleet=fleet, + instance=instance_0, + instance_assigned=True, + status=JobStatus.RUNNING, + job_num=0, + replica_num=0, + ) + job_1 = await create_job( + session=session, + run=run, + instance_assigned=False, + status=JobStatus.SUBMITTED, + job_num=0, + replica_num=1, + ) + await process_submitted_jobs() + await session.refresh(job_1) + assert job_1.status == JobStatus.SUBMITTED + assert job_1.instance_assigned + assert job_1.instance_id == instance_1.id + assert job_1.fleet_id == fleet.id + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_second_job_fails_if_imported_fleet_has_no_capacity( + self, test_db, session: AsyncSession + ): + user = await create_user(session, global_role=GlobalRole.USER) + exporter_project = await create_project(session, name="exporter-project", owner=user) + importer_project = await create_project(session, name="importer-project", owner=user) + fleet = await create_fleet( + session=session, + project=exporter_project, + spec=get_fleet_spec(get_ssh_fleet_configuration()), + name="exported-fleet", + ) + instance_0 = await create_instance( + session=session, + project=exporter_project, + fleet=fleet, + name="exported-fleet-0", + status=InstanceStatus.BUSY, + ) + await create_export( + session=session, + exporter_project=exporter_project, + importer_projects=[importer_project], + exported_fleets=[fleet], + ) + repo = await create_repo(session=session, project_id=importer_project.id) + run = await create_run( + session=session, + project=importer_project, + repo=repo, + user=user, + fleet=fleet, + ) + await create_job( + session=session, + run=run, + fleet=fleet, + instance=instance_0, + instance_assigned=True, + status=JobStatus.RUNNING, + job_num=0, + replica_num=0, + ) + job_1 = await create_job( + session=session, + run=run, + instance_assigned=False, + status=JobStatus.SUBMITTED, + job_num=0, + replica_num=1, + ) + await process_submitted_jobs() + await session.refresh(job_1) + assert job_1.status == JobStatus.SUBMITTED + assert job_1.instance_assigned + assert job_1.instance_id is None + + await process_submitted_jobs() + await session.refresh(job_1) + assert job_1.status == JobStatus.TERMINATING + assert job_1.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY + @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_does_no_reuse_unavailable_instances(self, test_db, session: AsyncSession):