Skip to content

Support Nebius InfiniBand clusters #2604

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion src/dstack/_internal/core/backends/aws/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def create_instance(
self,
instance_offer: InstanceOfferWithAvailability,
instance_config: InstanceConfiguration,
placement_group: Optional[PlacementGroup],
) -> JobProvisioningData:
project_name = instance_config.project_name
ec2_resource = self.session.resource("ec2", region_name=instance_offer.region)
Expand Down Expand Up @@ -248,7 +249,7 @@ def create_instance(
spot=instance_offer.instance.resources.spot,
subnet_id=subnet_id,
allocate_public_ip=allocate_public_ip,
placement_group_name=instance_config.placement_group_name,
placement_group_name=placement_group.name if placement_group else None,
enable_efa=enable_efa,
max_efa_interfaces=max_efa_interfaces,
reservation_id=instance_config.reservation,
Expand Down Expand Up @@ -291,6 +292,7 @@ def create_instance(
def create_placement_group(
self,
placement_group: PlacementGroup,
master_instance_offer: InstanceOffer,
) -> PlacementGroupProvisioningData:
ec2_client = self.session.client("ec2", region_name=placement_group.configuration.region)
logger.debug("Creating placement group %s...", placement_group.name)
Expand Down Expand Up @@ -323,6 +325,16 @@ def delete_placement_group(
raise e
logger.debug("Deleted placement group %s", placement_group.name)

def is_suitable_placement_group(
self,
placement_group: PlacementGroup,
instance_offer: InstanceOffer,
) -> bool:
return (
placement_group.configuration.backend == BackendType.AWS
and placement_group.configuration.region == instance_offer.region
)

def create_gateway(
self,
configuration: GatewayComputeConfiguration,
Expand Down
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/backends/azure/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
InstanceOfferWithAvailability,
InstanceType,
)
from dstack._internal.core.models.placement import PlacementGroup
from dstack._internal.core.models.resources import Memory, Range
from dstack._internal.core.models.runs import JobProvisioningData, Requirements
from dstack._internal.utils.logging import get_logger
Expand Down Expand Up @@ -109,6 +110,7 @@ def create_instance(
self,
instance_offer: InstanceOfferWithAvailability,
instance_config: InstanceConfiguration,
placement_group: Optional[PlacementGroup],
) -> JobProvisioningData:
instance_name = generate_unique_instance_name(
instance_config, max_length=azure_resources.MAX_RESOURCE_NAME_LEN
Expand Down
44 changes: 42 additions & 2 deletions src/dstack/_internal/core/backends/base/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
)
from dstack._internal.core.models.instances import (
InstanceConfiguration,
InstanceOffer,
InstanceOfferWithAvailability,
SSHKey,
)
Expand Down Expand Up @@ -144,6 +145,7 @@ def create_instance(
self,
instance_offer: InstanceOfferWithAvailability,
instance_config: InstanceConfiguration,
placement_group: Optional[PlacementGroup],
) -> JobProvisioningData:
"""
Launches a new instance. It should return `JobProvisioningData` ASAP.
Expand Down Expand Up @@ -176,7 +178,7 @@ def run_job(
)
instance_offer = instance_offer.copy()
self._restrict_instance_offer_az_to_volumes_az(instance_offer, volumes)
return self.create_instance(instance_offer, instance_config)
return self.create_instance(instance_offer, instance_config, placement_group=None)

def _restrict_instance_offer_az_to_volumes_az(
self,
Expand Down Expand Up @@ -225,9 +227,15 @@ class ComputeWithPlacementGroupSupport(ABC):
def create_placement_group(
self,
placement_group: PlacementGroup,
master_instance_offer: InstanceOffer,
) -> PlacementGroupProvisioningData:
"""
Creates a placement group.

Args:
placement_group: details about the placement group to be created
master_instance_offer: the first instance dstack will attempt to add
to the placement group
"""
pass

Expand All @@ -242,10 +250,27 @@ def delete_placement_group(
"""
pass

@abstractmethod
def is_suitable_placement_group(
self,
placement_group: PlacementGroup,
instance_offer: InstanceOffer,
) -> bool:
"""
Checks if the instance offer can be provisioned in the placement group.

Should return immediately, without performing API calls.

Can be called with an offer originating from a different backend, because some backends
(BackendType.DSTACK) produce offers on behalf of other backends. Should return `False`
in that case.
"""
pass


class ComputeWithGatewaySupport(ABC):
"""
Must be subclassed and imlemented to support gateways.
Must be subclassed and implemented to support gateways.
"""

@abstractmethod
Expand Down Expand Up @@ -418,6 +443,21 @@ def generate_unique_volume_name(
)


def generate_unique_placement_group_name(
project_name: str,
fleet_name: str,
max_length: int = _DEFAULT_MAX_RESOURCE_NAME_LEN,
) -> str:
"""
Generates a unique placement group name valid across all backends.
"""
return generate_unique_backend_name(
resource_name=fleet_name,
project_name=project_name,
max_length=max_length,
)


def generate_unique_backend_name(
resource_name: str,
project_name: Optional[str],
Expand Down
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/backends/cudo/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
InstanceConfiguration,
InstanceOfferWithAvailability,
)
from dstack._internal.core.models.placement import PlacementGroup
from dstack._internal.core.models.runs import JobProvisioningData, Requirements
from dstack._internal.utils.logging import get_logger

Expand Down Expand Up @@ -58,6 +59,7 @@ def create_instance(
self,
instance_offer: InstanceOfferWithAvailability,
instance_config: InstanceConfiguration,
placement_group: Optional[PlacementGroup],
) -> JobProvisioningData:
vm_id = generate_unique_instance_name(instance_config, max_length=MAX_RESOURCE_NAME_LEN)
public_keys = instance_config.get_public_keys()
Expand Down
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/backends/datacrunch/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
InstanceOffer,
InstanceOfferWithAvailability,
)
from dstack._internal.core.models.placement import PlacementGroup
from dstack._internal.core.models.resources import Memory, Range
from dstack._internal.core.models.runs import JobProvisioningData, Requirements
from dstack._internal.utils.logging import get_logger
Expand Down Expand Up @@ -85,6 +86,7 @@ def create_instance(
self,
instance_offer: InstanceOfferWithAvailability,
instance_config: InstanceConfiguration,
placement_group: Optional[PlacementGroup],
) -> JobProvisioningData:
instance_name = generate_unique_instance_name(
instance_config, max_length=MAX_INSTANCE_NAME_LEN
Expand Down
2 changes: 1 addition & 1 deletion src/dstack/_internal/core/backends/gcp/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def authenticate(creds: AnyGCPCreds, project_id: Optional[str] = None) -> Tuple[
credentials, credentials_project_id = get_credentials(creds)
if project_id is None:
# If project_id is not specified explicitly, try using credentials' project_id.
# Explicit project_id takes precedence bacause credentials' project_id may be irrelevant.
# Explicit project_id takes precedence because credentials' project_id may be irrelevant.
# For example, with Workload Identity Federation for GKE, it's cluster project_id.
project_id = credentials_project_id
if project_id is None:
Expand Down
16 changes: 14 additions & 2 deletions src/dstack/_internal/core/backends/gcp/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def create_instance(
self,
instance_offer: InstanceOfferWithAvailability,
instance_config: InstanceConfiguration,
placement_group: Optional[PlacementGroup],
) -> JobProvisioningData:
instance_name = generate_unique_instance_name(
instance_config, max_length=gcp_resources.MAX_RESOURCE_NAME_LEN
Expand Down Expand Up @@ -199,11 +200,11 @@ def create_instance(
instance_type_name=instance_offer.instance.name,
)
placement_policy = None
if instance_config.placement_group_name is not None:
if placement_group is not None:
placement_policy = gcp_resources.get_placement_policy_resource_name(
project_id=self.config.project_id,
region=instance_offer.region,
placement_policy=instance_config.placement_group_name,
placement_policy=placement_group.name,
)
labels = {
"owner": "dstack",
Expand Down Expand Up @@ -406,6 +407,7 @@ def update_provisioning_data(
def create_placement_group(
self,
placement_group: PlacementGroup,
master_instance_offer: InstanceOffer,
) -> PlacementGroupProvisioningData:
policy = compute_v1.ResourcePolicy(
name=placement_group.name,
Expand Down Expand Up @@ -440,6 +442,16 @@ def delete_placement_group(
raise PlacementGroupInUseError()
raise

def is_suitable_placement_group(
self,
placement_group: PlacementGroup,
instance_offer: InstanceOffer,
) -> bool:
return (
placement_group.configuration.backend == BackendType.GCP
and placement_group.configuration.region == instance_offer.region
)

def create_gateway(
self,
configuration: GatewayComputeConfiguration,
Expand Down
6 changes: 5 additions & 1 deletion src/dstack/_internal/core/backends/lambdalabs/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
InstanceOffer,
InstanceOfferWithAvailability,
)
from dstack._internal.core.models.placement import PlacementGroup
from dstack._internal.core.models.runs import JobProvisioningData, Requirements

MAX_INSTANCE_NAME_LEN = 60
Expand All @@ -46,7 +47,10 @@ def get_offers(
return offers_with_availability

def create_instance(
self, instance_offer: InstanceOfferWithAvailability, instance_config: InstanceConfiguration
self,
instance_offer: InstanceOfferWithAvailability,
instance_config: InstanceConfiguration,
placement_group: Optional[PlacementGroup],
) -> JobProvisioningData:
instance_name = generate_unique_instance_name(
instance_config, max_length=MAX_INSTANCE_NAME_LEN
Expand Down
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/backends/local/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
InstanceType,
Resources,
)
from dstack._internal.core.models.placement import PlacementGroup
from dstack._internal.core.models.runs import Job, JobProvisioningData, Requirements, Run
from dstack._internal.core.models.volumes import Volume, VolumeProvisioningData
from dstack._internal.utils.logging import get_logger
Expand Down Expand Up @@ -53,6 +54,7 @@ def create_instance(
self,
instance_offer: InstanceOfferWithAvailability,
instance_config: InstanceConfiguration,
placement_group: Optional[PlacementGroup],
) -> JobProvisioningData:
return JobProvisioningData(
backend=instance_offer.backend,
Expand Down
Loading
Loading