Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@
from dstack._internal.server.services.backends import get_project_backend_by_type_or_error
from dstack._internal.server.services.fleets import (
check_can_create_new_cloud_instance_in_fleet,
fleet_model_to_fleet,
generate_fleet_name,
get_fleet_master_instance_provisioning_data,
get_fleet_spec,
get_next_instance_num,
is_cloud_cluster,
)
Expand Down Expand Up @@ -580,8 +580,8 @@ async def _fetch_fleet_with_master_instance_provisioning_data(
# as FleetPipeline/InstancePipeline.
master_instance_provisioning_data = None
if is_master_job(job) and fleet_model is not None:
fleet = fleet_model_to_fleet(fleet_model)
if fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER:
fleet_spec = get_fleet_spec(fleet_model)
if fleet_spec.configuration.placement == InstanceGroupPlacement.CLUSTER:
# To avoid violating fleet placement cluster during master provisioning,
# we must lock empty fleets and respect existing instances in non-empty fleets.
# On SQLite always take the lock during master provisioning for simplicity.
Expand Down Expand Up @@ -624,7 +624,7 @@ async def _fetch_fleet_with_master_instance_provisioning_data(
fleet_model = res.unique().scalar_one()
master_instance_provisioning_data = get_fleet_master_instance_provisioning_data(
fleet_model=fleet_model,
fleet_spec=fleet.spec,
fleet_spec=fleet_spec,
)
return master_instance_provisioning_data

Expand Down Expand Up @@ -730,15 +730,14 @@ async def _run_jobs_on_new_instances(
job = jobs[0]
profile = run.run_spec.merged_profile
requirements = job.job_spec.requirements
fleet = None
if fleet_model is not None:
fleet = fleet_model_to_fleet(fleet_model)
fleet_spec = get_fleet_spec(fleet_model)
try:
check_can_create_new_cloud_instance_in_fleet(fleet)
check_can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec)
profile, requirements = get_run_profile_and_requirements_in_fleet(
job=job,
run_spec=run.run_spec,
fleet=fleet,
fleet_spec=fleet_spec,
)
except ValueError as e:
logger.debug("%s: %s", fmt(job_model), e.args[0])
Expand Down
16 changes: 8 additions & 8 deletions src/dstack/_internal/server/services/fleets.py
Original file line number Diff line number Diff line change
Expand Up @@ -953,23 +953,23 @@ def get_fleet_master_instance_provisioning_data(
return master_instance_provisioning_data


def can_create_new_cloud_instance_in_fleet(fleet_model: FleetModel, fleet_spec: FleetSpec) -> bool:
    """Return True if a new cloud instance may be provisioned into the fleet.

    SSH fleets (those with an ``ssh_config``) never accept new cloud
    instances. Otherwise the fleet accepts a new instance as long as the
    number of active instances has not reached the configured ``nodes.max``.
    """
    # Fleets backed by pre-existing SSH hosts cannot grow via cloud provisioning.
    if fleet_spec.configuration.ssh_config is not None:
        return False
    nodes = fleet_spec.configuration.nodes
    if nodes is None or nodes.max is None:
        # No upper bound configured — always room for one more.
        return True
    active_count = sum(1 for inst in fleet_model.instances if inst.status.is_active())
    # nodes.max is a soft limit that can be exceeded when provisioning
    # concurrently. The fleet consolidation logic will remove redundant
    # nodes eventually.
    return active_count < nodes.max


def check_can_create_new_cloud_instance_in_fleet(fleet_model: FleetModel, fleet_spec: FleetSpec):
    """Raise ``ValueError`` if the fleet cannot accept another cloud instance.

    Thin guard around :func:`can_create_new_cloud_instance_in_fleet` for call
    sites that prefer an exception over a boolean.
    """
    allowed = can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec)
    if not allowed:
        raise ValueError("Cannot fit new cloud instance into fleet")


Expand Down
23 changes: 12 additions & 11 deletions src/dstack/_internal/server/services/runs/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from sqlalchemy.orm import contains_eager, joinedload, noload

from dstack._internal.core.backends.base.backend import Backend
from dstack._internal.core.models.fleets import Fleet, InstanceGroupPlacement
from dstack._internal.core.models.fleets import Fleet, FleetSpec, InstanceGroupPlacement
from dstack._internal.core.models.instances import (
InstanceAvailability,
InstanceOfferWithAvailability,
Expand Down Expand Up @@ -34,6 +34,7 @@
fleet_model_to_fleet,
get_fleet_master_instance_provisioning_data,
get_fleet_requirements,
get_fleet_spec,
)
from dstack._internal.server.services.instances import (
filter_pool_instances,
Expand Down Expand Up @@ -375,7 +376,7 @@ async def find_optimal_fleet_with_offers(
backend_offers = await _get_backend_offers_in_fleet(
project=project,
fleet_model=candidate_fleet_model,
fleet=candidate_fleet,
fleet_spec=candidate_fleet.spec,
run_spec=run_spec,
job=job,
volumes=volumes,
Expand Down Expand Up @@ -436,12 +437,12 @@ async def find_optimal_fleet_with_offers(
def get_run_profile_and_requirements_in_fleet(
job: Job,
run_spec: RunSpec,
fleet: Fleet,
fleet_spec: FleetSpec,
) -> tuple[Profile, Requirements]:
profile = combine_fleet_and_run_profiles(fleet.spec.merged_profile, run_spec.merged_profile)
profile = combine_fleet_and_run_profiles(fleet_spec.merged_profile, run_spec.merged_profile)
if profile is None:
raise ValueError("Cannot combine fleet profile")
fleet_requirements = get_fleet_requirements(fleet.spec)
fleet_requirements = get_fleet_requirements(fleet_spec)
requirements = combine_fleet_and_run_requirements(
fleet_requirements, job.job_spec.requirements
)
Expand Down Expand Up @@ -547,25 +548,25 @@ async def _get_backend_offers_in_fleet(
run_spec: RunSpec,
job: Job,
volumes: Optional[list[list[Volume]]],
fleet: Optional[Fleet] = None,
fleet_spec: Optional[FleetSpec] = None,
max_offers: Optional[int] = None,
) -> list[tuple[Backend, InstanceOfferWithAvailability]]:
if fleet is None:
fleet = fleet_model_to_fleet(fleet_model)
if fleet_spec is None:
fleet_spec = get_fleet_spec(fleet_model)
try:
check_can_create_new_cloud_instance_in_fleet(fleet)
check_can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec)
profile, requirements = get_run_profile_and_requirements_in_fleet(
job=job,
run_spec=run_spec,
fleet=fleet,
fleet_spec=fleet_spec,
)
except ValueError:
backend_offers = []
else:
# Master job offers must be in the same cluster as existing instances.
master_instance_provisioning_data = get_fleet_master_instance_provisioning_data(
fleet_model=fleet_model,
fleet_spec=fleet.spec,
fleet_spec=fleet_spec,
)
# Handle multinode for old jobs that don't have requirements.multinode set.
# TODO: Drop multinode param.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,137 @@ async def test_not_assigns_job_to_foreign_fleet_if_not_imported(
assert not job.instance_assigned
assert job.instance is None

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_assigns_second_replica_to_same_imported_fleet(
    self, test_db, session: AsyncSession
):
    """A second replica submitted in the importer project gets assigned the
    remaining idle instance of the fleet exported from another project.
    """
    user = await create_user(session, global_role=GlobalRole.USER)
    exporter_project = await create_project(session, name="exporter-project", owner=user)
    importer_project = await create_project(session, name="importer-project", owner=user)
    # The fleet lives in the exporter project and is shared via an export below.
    fleet = await create_fleet(
        session=session,
        project=exporter_project,
        spec=get_fleet_spec(get_ssh_fleet_configuration()),
        name="exported-fleet",
    )
    # instance_0 is occupied by the first replica; instance_1 stays idle.
    instance_0 = await create_instance(
        session=session,
        project=exporter_project,
        fleet=fleet,
        name="exported-fleet-0",
        status=InstanceStatus.BUSY,
    )
    instance_1 = await create_instance(
        session=session,
        project=exporter_project,
        fleet=fleet,
        name="exported-fleet-1",
        status=InstanceStatus.IDLE,
    )
    await create_export(
        session=session,
        exporter_project=exporter_project,
        importer_projects=[importer_project],
        exported_fleets=[fleet],
    )
    repo = await create_repo(session=session, project_id=importer_project.id)
    # The run is created in the importer project but pinned to the shared fleet.
    run = await create_run(
        session=session,
        project=importer_project,
        repo=repo,
        user=user,
        fleet=fleet,
    )
    # First replica: already running on instance_0.
    await create_job(
        session=session,
        run=run,
        fleet=fleet,
        instance=instance_0,
        instance_assigned=True,
        status=JobStatus.RUNNING,
        job_num=0,
        replica_num=0,
    )
    # Second replica: submitted, not yet assigned an instance.
    job_1 = await create_job(
        session=session,
        run=run,
        instance_assigned=False,
        status=JobStatus.SUBMITTED,
        job_num=0,
        replica_num=1,
    )
    await process_submitted_jobs()
    await session.refresh(job_1)
    assert job_1.status == JobStatus.SUBMITTED
    assert job_1.instance_assigned
    # The idle instance of the imported fleet must be the one picked.
    assert job_1.instance_id == instance_1.id
    assert job_1.fleet_id == fleet.id

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_second_job_fails_if_imported_fleet_has_no_capacity(
    self, test_db, session: AsyncSession
):
    """When the imported fleet's only instance is busy, the second replica is
    assigned to the fleet without an instance and, on the next processing
    pass, fails with FAILED_TO_START_DUE_TO_NO_CAPACITY.
    """
    user = await create_user(session, global_role=GlobalRole.USER)
    exporter_project = await create_project(session, name="exporter-project", owner=user)
    importer_project = await create_project(session, name="importer-project", owner=user)
    fleet = await create_fleet(
        session=session,
        project=exporter_project,
        spec=get_fleet_spec(get_ssh_fleet_configuration()),
        name="exported-fleet",
    )
    # The fleet's single instance is busy, so there is no capacity left.
    instance_0 = await create_instance(
        session=session,
        project=exporter_project,
        fleet=fleet,
        name="exported-fleet-0",
        status=InstanceStatus.BUSY,
    )
    await create_export(
        session=session,
        exporter_project=exporter_project,
        importer_projects=[importer_project],
        exported_fleets=[fleet],
    )
    repo = await create_repo(session=session, project_id=importer_project.id)
    run = await create_run(
        session=session,
        project=importer_project,
        repo=repo,
        user=user,
        fleet=fleet,
    )
    # First replica occupies the only instance.
    await create_job(
        session=session,
        run=run,
        fleet=fleet,
        instance=instance_0,
        instance_assigned=True,
        status=JobStatus.RUNNING,
        job_num=0,
        replica_num=0,
    )
    # Second replica has nowhere to go.
    job_1 = await create_job(
        session=session,
        run=run,
        instance_assigned=False,
        status=JobStatus.SUBMITTED,
        job_num=0,
        replica_num=1,
    )
    await process_submitted_jobs()
    await session.refresh(job_1)
    # First pass: job is bound to the fleet but gets no instance.
    assert job_1.status == JobStatus.SUBMITTED
    assert job_1.instance_assigned
    assert job_1.instance_id is None

    # Second pass: no capacity materialized, so the job is terminated.
    await process_submitted_jobs()
    await session.refresh(job_1)
    assert job_1.status == JobStatus.TERMINATING
    assert job_1.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_does_no_reuse_unavailable_instances(self, test_db, session: AsyncSession):
Expand Down
Loading