From ca7272802b877d8f5485ad64d5bf4d0583ad52c7 Mon Sep 17 00:00:00 2001 From: Jvst Me Date: Thu, 12 Mar 2026 16:54:35 +0100 Subject: [PATCH] Fix error submitting run to empty imported fleet Avoid `fleet_model_to_fleet()`, since it requires the fleet project to be loaded from the database, which is not always the case for imported fleets. Now, `fleet_model_to_fleet()` is only used in API-related services. Also stop loading the fleet project model where it was previously necessary only for the redundant `fleet_model_to_fleet()` call. --- .../background/scheduled_tasks/instances.py | 12 +++--- .../_internal/server/services/runs/plan.py | 40 ++++++++--------- .../scheduled_tasks/test_submitted_jobs.py | 36 ++++++++++++++++ .../_internal/server/routers/test_runs.py | 43 +++++++++++++++++++ 4 files changed, 103 insertions(+), 28 deletions(-) diff --git a/src/dstack/_internal/server/background/scheduled_tasks/instances.py b/src/dstack/_internal/server/background/scheduled_tasks/instances.py index 1857e0ad0..e60f81190 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/instances.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/instances.py @@ -77,8 +77,8 @@ from dstack._internal.server.services import backends as backends_services from dstack._internal.server.services import events from dstack._internal.server.services.fleets import ( - fleet_model_to_fleet, get_create_instance_offers, + get_fleet_spec, is_cloud_cluster, ) from dstack._internal.server.services.instances import ( @@ -310,12 +310,12 @@ def _can_terminate_fleet_instances_on_idle_duration(fleet_model: FleetModel) -> # There may be race conditions since we don't take the fleet lock. # That's ok: in the worst case we go below `nodes.min`, but # the fleet consolidation logic will provision new nodes. - fleet = fleet_model_to_fleet(fleet_model) - if fleet.spec.configuration.nodes is None or fleet.spec.autocreated: + fleet_spec = get_fleet_spec(fleet_model) + if fleet_spec.configuration.nodes is None or fleet_spec.autocreated: return True active_instances = [i for i in fleet_model.instances if i.status.is_active()] active_instances_num = len(active_instances) - return active_instances_num > fleet.spec.configuration.nodes.min + return active_instances_num > fleet_spec.configuration.nodes.min async def _add_remote(session: AsyncSession, instance: InstanceModel) -> None: @@ -1223,8 +1223,8 @@ def _get_instance_offer_for_instance( ) -> InstanceOfferWithAvailability: if instance.fleet is None: return instance_offer - fleet = fleet_model_to_fleet(instance.fleet) - if fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER: + fleet_spec = get_fleet_spec(instance.fleet) + if fleet_spec.configuration.placement == InstanceGroupPlacement.CLUSTER: master_job_provisioning_data = get_instance_provisioning_data(master_instance) return get_instance_offer_with_restricted_az( instance_offer=instance_offer, diff --git a/src/dstack/_internal/server/services/runs/plan.py b/src/dstack/_internal/server/services/runs/plan.py index a0b6ee3c7..06e7c6c5a 100644 --- a/src/dstack/_internal/server/services/runs/plan.py +++ b/src/dstack/_internal/server/services/runs/plan.py @@ -3,10 +3,10 @@ from sqlalchemy import and_, exists, not_, or_, select from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import contains_eager, joinedload, noload +from sqlalchemy.orm import contains_eager, noload from dstack._internal.core.backends.base.backend import Backend -from dstack._internal.core.models.fleets import Fleet, FleetSpec, InstanceGroupPlacement +from dstack._internal.core.models.fleets import FleetSpec, InstanceGroupPlacement from dstack._internal.core.models.instances import ( InstanceAvailability, InstanceOfferWithAvailability, @@ -31,7 +31,6 @@ ) from dstack._internal.server.services.fleets import ( check_can_create_new_cloud_instance_in_fleet, - fleet_model_to_fleet, get_fleet_master_instance_provisioning_data, get_fleet_requirements, get_fleet_spec, @@ -251,12 +250,7 @@ async def select_run_candidate_fleet_models_with_filters( .join(FleetModel.instances) .where(*fleet_filters) .where(*instance_filters) - .options( - contains_eager(FleetModel.instances), - joinedload(FleetModel.project) - .load_only(ProjectModel.name) - .joinedload(ProjectModel.backends), - ) + .options(contains_eager(FleetModel.instances)) .execution_options(populate_existing=True) ) if lock_instances: @@ -341,18 +335,18 @@ async def find_optimal_fleet_with_offers( ] ] = [] for candidate_fleet_model in fleet_models: - candidate_fleet = fleet_model_to_fleet(candidate_fleet_model) + candidate_fleet_spec = get_fleet_spec(candidate_fleet_model) if ( is_multinode_job(job) - and candidate_fleet.spec.configuration.placement != InstanceGroupPlacement.CLUSTER + and candidate_fleet_spec.configuration.placement != InstanceGroupPlacement.CLUSTER ): # Limit multinode runs to cluster fleets to guarantee best connectivity. continue - if not _run_can_fit_into_fleet(run_spec, candidate_fleet): + if not _run_can_fit_into_fleet(run_spec, candidate_fleet_model, candidate_fleet_spec): logger.debug( "Skipping fleet %s from consideration: run cannot fit into fleet", - candidate_fleet.name, + candidate_fleet_model.name, ) continue @@ -376,7 +370,7 @@ async def find_optimal_fleet_with_offers( backend_offers = await _get_backend_offers_in_fleet( project=project, fleet_model=candidate_fleet_model, - fleet_spec=candidate_fleet.spec, + fleet_spec=candidate_fleet_spec, run_spec=run_spec, job=job, volumes=volumes, @@ -509,7 +503,9 @@ def _get_instance_offers_in_fleet( return instances_with_offers -def _run_can_fit_into_fleet(run_spec: RunSpec, fleet: Fleet) -> bool: +def _run_can_fit_into_fleet( + run_spec: RunSpec, fleet_model: FleetModel, fleet_spec: FleetSpec +) -> bool: """ Returns `False` if the run cannot fit into fleet for sure. This is helpful heuristic to avoid even considering fleets too small for a run. @@ -522,19 +518,19 @@ def _run_can_fit_into_fleet(run_spec: RunSpec, fleet: Fleet) -> bool: # how many jobs such fleets can accommodate. nodes_required_num = get_nodes_required_num(run_spec) if ( - fleet.spec.configuration.nodes is not None - and fleet.spec.configuration.blocks == 1 - and fleet.spec.configuration.nodes.max is not None + fleet_spec.configuration.nodes is not None + and fleet_spec.configuration.blocks == 1 + and fleet_spec.configuration.nodes.max is not None ): - busy_instances = [i for i in fleet.instances if i.busy_blocks > 0] - fleet_available_capacity = fleet.spec.configuration.nodes.max - len(busy_instances) + busy_instances = [i for i in fleet_model.instances if i.busy_blocks > 0] + fleet_available_capacity = fleet_spec.configuration.nodes.max - len(busy_instances) if fleet_available_capacity < nodes_required_num: return False - elif fleet.spec.configuration.ssh_config is not None: + elif fleet_spec.configuration.ssh_config is not None: # Currently assume that each idle block can run a job. # TODO: Take resources / eligible offers into account. total_idle_blocks = 0 - for instance in fleet.instances: + for instance in fleet_model.instances: total_blocks = instance.total_blocks or 1 total_idle_blocks += total_blocks - instance.busy_blocks if total_idle_blocks < nodes_required_num: diff --git a/src/tests/_internal/server/background/scheduled_tasks/test_submitted_jobs.py b/src/tests/_internal/server/background/scheduled_tasks/test_submitted_jobs.py index b0481ad4e..c6aa644dd 100644 --- a/src/tests/_internal/server/background/scheduled_tasks/test_submitted_jobs.py +++ b/src/tests/_internal/server/background/scheduled_tasks/test_submitted_jobs.py @@ -470,6 +470,42 @@ async def test_not_assigns_job_to_foreign_fleet_if_not_imported( assert not job.instance_assigned assert job.instance is None + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_job_fails_if_imported_ssh_fleet_is_empty(self, test_db, session: AsyncSession): + user = await create_user(session, global_role=GlobalRole.USER) + exporter_project = await create_project(session, name="exporter-project", owner=user) + importer_project = await create_project(session, name="importer-project", owner=user) + fleet = await create_fleet( + session=session, + project=exporter_project, + spec=get_fleet_spec(get_ssh_fleet_configuration()), + name="exported-fleet", + ) + await create_export( + session=session, + exporter_project=exporter_project, + importer_projects=[importer_project], + exported_fleets=[fleet], + ) + repo = await create_repo(session=session, project_id=importer_project.id) + run = await create_run( + session=session, + project=importer_project, + repo=repo, + user=user, + ) + job = await create_job( + session=session, + run=run, + instance_assigned=False, + status=JobStatus.SUBMITTED, + ) + await process_submitted_jobs() + await session.refresh(job) + assert job.status == JobStatus.TERMINATING + assert job.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY + @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_assigns_second_replica_to_same_imported_fleet( diff --git a/src/tests/_internal/server/routers/test_runs.py b/src/tests/_internal/server/routers/test_runs.py index ea829d569..1d72ba552 100644 --- a/src/tests/_internal/server/routers/test_runs.py +++ b/src/tests/_internal/server/routers/test_runs.py @@ -1436,6 +1436,49 @@ async def test_returns_run_plan_with_offer_from_imported_fleet( assert response_json["project_name"] == "importer-project" assert response_json["job_plans"][0]["offers"][0]["backend"] == "remote" + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_returns_no_offers_if_imported_ssh_fleet_is_empty( + self, + test_db, + session: AsyncSession, + client: AsyncClient, + ) -> None: + importer_user = await create_user(session, global_role=GlobalRole.USER) + exporter_project = await create_project(session, name="exporter-project") + importer_project = await create_project( + session, name="importer-project", owner=importer_user + ) + await add_project_member( + session=session, + project=importer_project, + user=importer_user, + project_role=ProjectRole.USER, + ) + fleet = await create_fleet( + session=session, + project=exporter_project, + spec=get_fleet_spec(get_ssh_fleet_configuration()), + ) + await create_export( + session=session, + exporter_project=exporter_project, + importer_projects=[importer_project], + exported_fleets=[fleet], + ) + + run_spec = {"configuration": {"type": "dev-environment", "ide": "vscode"}} + body = {"run_spec": run_spec} + response = await client.post( + "/api/project/importer-project/runs/get_plan", + headers=get_auth_headers(importer_user.token), + json=body, + ) + assert response.status_code == 200, response.json() + response_json = response.json() + assert response_json["project_name"] == "importer-project" + assert len(response_json["job_plans"][0]["offers"]) == 0 + @pytest.mark.parametrize( ("client_version", "expected_availability"), [