Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@
from dstack._internal.server.services import backends as backends_services
from dstack._internal.server.services import events
from dstack._internal.server.services.fleets import (
fleet_model_to_fleet,
get_create_instance_offers,
get_fleet_spec,
is_cloud_cluster,
)
from dstack._internal.server.services.instances import (
Expand Down Expand Up @@ -310,12 +310,12 @@ def _can_terminate_fleet_instances_on_idle_duration(fleet_model: FleetModel) ->
# There may be race conditions since we don't take the fleet lock.
# That's ok: in the worst case we go below `nodes.min`, but
# the fleet consolidation logic will provision new nodes.
fleet = fleet_model_to_fleet(fleet_model)
if fleet.spec.configuration.nodes is None or fleet.spec.autocreated:
fleet_spec = get_fleet_spec(fleet_model)
if fleet_spec.configuration.nodes is None or fleet_spec.autocreated:
return True
active_instances = [i for i in fleet_model.instances if i.status.is_active()]
active_instances_num = len(active_instances)
return active_instances_num > fleet.spec.configuration.nodes.min
return active_instances_num > fleet_spec.configuration.nodes.min


async def _add_remote(session: AsyncSession, instance: InstanceModel) -> None:
Expand Down Expand Up @@ -1223,8 +1223,8 @@ def _get_instance_offer_for_instance(
) -> InstanceOfferWithAvailability:
if instance.fleet is None:
return instance_offer
fleet = fleet_model_to_fleet(instance.fleet)
if fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER:
fleet_spec = get_fleet_spec(instance.fleet)
if fleet_spec.configuration.placement == InstanceGroupPlacement.CLUSTER:
master_job_provisioning_data = get_instance_provisioning_data(master_instance)
return get_instance_offer_with_restricted_az(
instance_offer=instance_offer,
Expand Down
40 changes: 18 additions & 22 deletions src/dstack/_internal/server/services/runs/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

from sqlalchemy import and_, exists, not_, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import contains_eager, joinedload, noload
from sqlalchemy.orm import contains_eager, noload

from dstack._internal.core.backends.base.backend import Backend
from dstack._internal.core.models.fleets import Fleet, FleetSpec, InstanceGroupPlacement
from dstack._internal.core.models.fleets import FleetSpec, InstanceGroupPlacement
from dstack._internal.core.models.instances import (
InstanceAvailability,
InstanceOfferWithAvailability,
Expand All @@ -31,7 +31,6 @@
)
from dstack._internal.server.services.fleets import (
check_can_create_new_cloud_instance_in_fleet,
fleet_model_to_fleet,
get_fleet_master_instance_provisioning_data,
get_fleet_requirements,
get_fleet_spec,
Expand Down Expand Up @@ -251,12 +250,7 @@ async def select_run_candidate_fleet_models_with_filters(
.join(FleetModel.instances)
.where(*fleet_filters)
.where(*instance_filters)
.options(
contains_eager(FleetModel.instances),
joinedload(FleetModel.project)
.load_only(ProjectModel.name)
.joinedload(ProjectModel.backends),
)
.options(contains_eager(FleetModel.instances))
.execution_options(populate_existing=True)
)
if lock_instances:
Expand Down Expand Up @@ -341,18 +335,18 @@ async def find_optimal_fleet_with_offers(
]
] = []
for candidate_fleet_model in fleet_models:
candidate_fleet = fleet_model_to_fleet(candidate_fleet_model)
candidate_fleet_spec = get_fleet_spec(candidate_fleet_model)
if (
is_multinode_job(job)
and candidate_fleet.spec.configuration.placement != InstanceGroupPlacement.CLUSTER
and candidate_fleet_spec.configuration.placement != InstanceGroupPlacement.CLUSTER
):
# Limit multinode runs to cluster fleets to guarantee best connectivity.
continue

if not _run_can_fit_into_fleet(run_spec, candidate_fleet):
if not _run_can_fit_into_fleet(run_spec, candidate_fleet_model, candidate_fleet_spec):
logger.debug(
"Skipping fleet %s from consideration: run cannot fit into fleet",
candidate_fleet.name,
candidate_fleet_model.name,
)
continue

Expand All @@ -376,7 +370,7 @@ async def find_optimal_fleet_with_offers(
backend_offers = await _get_backend_offers_in_fleet(
project=project,
fleet_model=candidate_fleet_model,
fleet_spec=candidate_fleet.spec,
fleet_spec=candidate_fleet_spec,
run_spec=run_spec,
job=job,
volumes=volumes,
Expand Down Expand Up @@ -509,7 +503,9 @@ def _get_instance_offers_in_fleet(
return instances_with_offers


def _run_can_fit_into_fleet(run_spec: RunSpec, fleet: Fleet) -> bool:
def _run_can_fit_into_fleet(
run_spec: RunSpec, fleet_model: FleetModel, fleet_spec: FleetSpec
) -> bool:
"""
Returns `False` if the run cannot fit into fleet for sure.
This is helpful heuristic to avoid even considering fleets too small for a run.
Expand All @@ -522,19 +518,19 @@ def _run_can_fit_into_fleet(run_spec: RunSpec, fleet: Fleet) -> bool:
# how many jobs such fleets can accommodate.
nodes_required_num = get_nodes_required_num(run_spec)
if (
fleet.spec.configuration.nodes is not None
and fleet.spec.configuration.blocks == 1
and fleet.spec.configuration.nodes.max is not None
fleet_spec.configuration.nodes is not None
and fleet_spec.configuration.blocks == 1
and fleet_spec.configuration.nodes.max is not None
):
busy_instances = [i for i in fleet.instances if i.busy_blocks > 0]
fleet_available_capacity = fleet.spec.configuration.nodes.max - len(busy_instances)
busy_instances = [i for i in fleet_model.instances if i.busy_blocks > 0]
fleet_available_capacity = fleet_spec.configuration.nodes.max - len(busy_instances)
if fleet_available_capacity < nodes_required_num:
return False
elif fleet.spec.configuration.ssh_config is not None:
elif fleet_spec.configuration.ssh_config is not None:
# Currently assume that each idle block can run a job.
# TODO: Take resources / eligible offers into account.
total_idle_blocks = 0
for instance in fleet.instances:
for instance in fleet_model.instances:
total_blocks = instance.total_blocks or 1
total_idle_blocks += total_blocks - instance.busy_blocks
if total_idle_blocks < nodes_required_num:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,42 @@ async def test_not_assigns_job_to_foreign_fleet_if_not_imported(
assert not job.instance_assigned
assert job.instance is None

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_job_fails_if_imported_ssh_fleet_is_empty(self, test_db, session: AsyncSession):
    """A submitted job terminates with NO_CAPACITY when the only available fleet
    is an imported SSH fleet that has no instances.

    Setup: an exporter project exports an (instance-less) SSH fleet to an
    importer project; a run/job is created in the importer project.
    """
    user = await create_user(session, global_role=GlobalRole.USER)
    exporter_project = await create_project(session, name="exporter-project", owner=user)
    importer_project = await create_project(session, name="importer-project", owner=user)
    # SSH fleet spec only — no instances are created for it, so it has zero capacity.
    fleet = await create_fleet(
        session=session,
        project=exporter_project,
        spec=get_fleet_spec(get_ssh_fleet_configuration()),
        name="exported-fleet",
    )
    # Make the exporter's fleet visible to the importer project.
    await create_export(
        session=session,
        exporter_project=exporter_project,
        importer_projects=[importer_project],
        exported_fleets=[fleet],
    )
    repo = await create_repo(session=session, project_id=importer_project.id)
    run = await create_run(
        session=session,
        project=importer_project,
        repo=repo,
        user=user,
    )
    # Job starts unassigned; the background processor must decide its fate.
    job = await create_job(
        session=session,
        run=run,
        instance_assigned=False,
        status=JobStatus.SUBMITTED,
    )
    await process_submitted_jobs()
    await session.refresh(job)
    # The empty imported SSH fleet offers no capacity, so the job must fail
    # rather than wait for cloud provisioning (SSH fleets cannot autoscale).
    assert job.status == JobStatus.TERMINATING
    assert job.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_assigns_second_replica_to_same_imported_fleet(
Expand Down
43 changes: 43 additions & 0 deletions src/tests/_internal/server/routers/test_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1436,6 +1436,49 @@ async def test_returns_run_plan_with_offer_from_imported_fleet(
assert response_json["project_name"] == "importer-project"
assert response_json["job_plans"][0]["offers"][0]["backend"] == "remote"

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_returns_no_offers_if_imported_ssh_fleet_is_empty(
    self,
    test_db,
    session: AsyncSession,
    client: AsyncClient,
) -> None:
    """get_plan returns an empty offer list when the importer project's only
    fleet is an imported SSH fleet with no instances.

    Setup mirrors the job-processing test: exporter project exports an
    instance-less SSH fleet to the importer project; the plan is requested
    via the HTTP API as a regular project member.
    """
    importer_user = await create_user(session, global_role=GlobalRole.USER)
    exporter_project = await create_project(session, name="exporter-project")
    importer_project = await create_project(
        session, name="importer-project", owner=importer_user
    )
    # The requesting user needs project membership to call get_plan.
    await add_project_member(
        session=session,
        project=importer_project,
        user=importer_user,
        project_role=ProjectRole.USER,
    )
    # SSH fleet spec only — no instances, hence no offers to return.
    fleet = await create_fleet(
        session=session,
        project=exporter_project,
        spec=get_fleet_spec(get_ssh_fleet_configuration()),
    )
    await create_export(
        session=session,
        exporter_project=exporter_project,
        importer_projects=[importer_project],
        exported_fleets=[fleet],
    )

    run_spec = {"configuration": {"type": "dev-environment", "ide": "vscode"}}
    body = {"run_spec": run_spec}
    response = await client.post(
        "/api/project/importer-project/runs/get_plan",
        headers=get_auth_headers(importer_user.token),
        json=body,
    )
    # The request itself succeeds; the plan just carries zero offers.
    assert response.status_code == 200, response.json()
    response_json = response.json()
    assert response_json["project_name"] == "importer-project"
    assert len(response_json["job_plans"][0]["offers"]) == 0

@pytest.mark.parametrize(
("client_version", "expected_availability"),
[
Expand Down
Loading