@@ -102,9 +102,9 @@ class ReplicaParam:
102
102
env : Dict [str , str ]
103
103
104
104
# IO stream files
105
- stdout : Optional [str ]
106
- stderr : Optional [str ]
107
- combined : Optional [str ]
105
+ stdout : Optional [str ] = None
106
+ stderr : Optional [str ] = None
107
+ combined : Optional [str ] = None
108
108
109
109
cwd : Optional [str ] = None
110
110
@@ -592,6 +592,15 @@ def run_opts(self) -> runopts:
592
592
help = "if set, prepends CWD to replica's PATH env var"
593
593
" making any binaries in CWD take precedence over those in PATH" ,
594
594
)
595
opts.add(
    "auto_set_cuda_devices",
    type_=bool,
    default=False,
    # NOTE: the env var actually set by the scheduler is CUDA_VISIBLE_DEVICES
    # (see _set_cuda_devices_for_role_replica); the previous help text
    # referred to a nonexistent `CUDA_AVAILABLE_DEVICES` variable.
    help="if set, sets the `CUDA_VISIBLE_DEVICES` env var for roles that request GPU resources."
    " The parameter will try to assign available GPUs to all role-replicas"
    " that requested GPUs. If device count is less than total requested"
    " GPUs across role-replicas, the `CUDA_VISIBLE_DEVICES` will not be set",
)
595
604
return opts
596
605
597
606
def _validate (self , app : AppDef , scheduler : SchedulerBackend ) -> None :
@@ -768,6 +777,69 @@ def _submit_dryrun(
768
777
request = self ._to_popen_request (app , cfg )
769
778
return AppDryRunInfo (request , lambda p : pprint .pformat (p , indent = 2 , width = 80 ))
770
779
780
def _get_gpu_count(self) -> int:
    """Return the number of GPUs on this host as reported by ``nvidia-smi -L``.

    ``nvidia-smi -L`` prints one line per GPU, so the count is the number of
    non-empty output lines. This is deliberately best-effort: if the binary
    is missing, fails, or output parsing errors out, 0 is returned so the
    caller can skip CUDA device assignment instead of crashing.
    """
    gpu_cmd = "nvidia-smi -L"
    try:
        # Lazy %-style args so the message is only formatted if debug is on.
        log.debug("Running %s", gpu_cmd)
        # argv list (shell=False) — no shell string to quote or inject into.
        result = subprocess.run(
            gpu_cmd.split(), capture_output=True, text=True, check=True
        )
        log.debug("Cmd %s returned: %s", gpu_cmd, result)
        # Count non-empty lines without materializing an intermediate list.
        return sum(1 for gpu_info in result.stdout.split("\n") if gpu_info)
    except subprocess.CalledProcessError as e:
        # Non-zero exit from nvidia-smi: log its stderr for diagnosis.
        log.exception("Got exception while getting GPUs %s", e.stderr)
        return 0
    except Exception:
        # e.g. FileNotFoundError when nvidia-smi is not installed.
        log.exception("Got exception while getting GPUs")
        return 0
797
def _set_cuda_devices_for_role_replica(
    self,
    replica: ReplicaParam,
    replica_id: int,
    requested_gpus: int,
    role_gpu_start_idx: int,
) -> None:
    """Assign a contiguous block of GPU ids to a single replica.

    Writes ``CUDA_VISIBLE_DEVICES`` into the replica's env as a
    comma-separated id list; replica ``k`` of a role gets the k-th
    ``requested_gpus``-sized slice starting at ``role_gpu_start_idx``.
    No-op when the role requests no GPUs.
    """
    if requested_gpus <= 0:
        return
    first_device = role_gpu_start_idx + requested_gpus * replica_id
    assigned = range(first_device, first_device + requested_gpus)
    replica.env["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, assigned))
812
def _update_env_cuda_visible_devices(
    self,
    role_params: Dict[str, List[ReplicaParam]],
    app: AppDef,
    cfg: Mapping[str, CfgVal],
) -> None:
    """Populate ``CUDA_VISIBLE_DEVICES`` for every replica when enabled.

    Honors the ``auto_set_cuda_devices`` run option. If the host has fewer
    GPUs than the roles collectively request, a warning is logged and no
    env vars are set (the app still runs, just without pinned devices).
    """
    total_requested_gpus = self._get_total_requested_gpus(app.roles)
    auto_set_cuda_devices = cfg.get("auto_set_cuda_devices", False)
    device_count = 0
    if auto_set_cuda_devices and total_requested_gpus > 0:
        device_count = self._get_gpu_count()
    if auto_set_cuda_devices and total_requested_gpus > device_count:
        # Not enough physical GPUs to satisfy every replica: bail out.
        auto_set_cuda_devices = False
        log.warning(
            "Cannot set `CUDA_VISIBLE_DEVICES` due to "
            f"Available GPUs {device_count} less than requested {total_requested_gpus}"
        )
    if not auto_set_cuda_devices:
        return
    # Hand out device ids role by role, replica by replica, in order.
    next_gpu_idx = 0
    for role in app.roles:
        for idx, replica in enumerate(role_params[role.name]):
            self._set_cuda_devices_for_role_replica(
                replica, idx, role.resource.gpu, next_gpu_idx
            )
        next_gpu_idx += role.resource.gpu * role.num_replicas
840
+ def _get_total_requested_gpus (self , roles : List [Role ]) -> int :
841
+ return sum ([role .resource .gpu * role .num_replicas for role in roles ])
842
+
771
843
def _to_popen_request (
772
844
self ,
773
845
app : AppDef ,
@@ -783,6 +855,7 @@ def _to_popen_request(
783
855
784
856
role_params : Dict [str , List [ReplicaParam ]] = {}
785
857
role_log_dirs : Dict [str , List [str ]] = {}
858
+
786
859
for role in app .roles :
787
860
replica_params = role_params .setdefault (role .name , [])
788
861
replica_log_dirs = role_log_dirs .setdefault (role .name , [])
@@ -796,8 +869,8 @@ def _to_popen_request(
796
869
replica_id = str (replica_id ),
797
870
)
798
871
replica_role = values .apply (role )
799
- replica_log_dir = os .path .join (app_log_dir , role .name , str (replica_id ))
800
872
873
+ replica_log_dir = os .path .join (app_log_dir , role .name , str (replica_id ))
801
874
if "TORCHELASTIC_ERROR_FILE" not in replica_role .env :
802
875
# this is the top level (agent if using elastic role) error file
803
876
# a.k.a scheduler reply file
@@ -816,7 +889,7 @@ def _to_popen_request(
816
889
)
817
890
)
818
891
replica_log_dirs .append (replica_log_dir )
819
-
892
+ self . _update_env_cuda_visible_devices ( role_params , app , cfg )
820
893
return PopenRequest (app_id , app_log_dir , role_params , role_log_dirs )
821
894
822
895
def describe (self , app_id : str ) -> Optional [DescribeAppResponse ]:
0 commit comments