15
15
import time
16
16
import unittest
17
17
from contextlib import contextmanager
18
+ from dataclasses import dataclass
18
19
from datetime import datetime
19
20
from os .path import join
20
21
from typing import Callable , Generator , Optional
32
33
join_PATH ,
33
34
make_unique ,
34
35
)
35
- from torchx .specs .api import AppDef , AppState , Role , is_terminal , macros
36
+ from torchx .specs .api import AppDef , AppState , Role , is_terminal , macros , Resource
36
37
37
38
from .test_util import write_shell_script
38
39
@@ -828,6 +829,102 @@ def test_close_twice(self) -> None:
828
829
self .scheduler .close ()
829
830
# nothing to validate just make sure no errors are raised
830
831
832
def test_get_gpu_count(self) -> None:
    """Expect ``_get_gpu_count`` to return 4 for a four-GPU listing.

    ``subprocess.run`` is patched to return an object whose ``stdout``
    holds four ``GPU <n>: ...`` lines, so no real command is executed.
    """

    @dataclass
    class FakeProcess:
        # Only ``stdout`` is populated on the stand-in for the result of
        # ``subprocess.run``.
        stdout: str

    gpu_listing = "".join(
        [
            "GPU 0: Tesla V100-SXM2-16GB (UUID: GPU-196a22c5-717b-66db-0acc-58cde6f3df85)\n ",
            "GPU 1: Tesla V100-SXM2-16GB (UUID: GPU-45e9165d-4f7e-d954-7ff5-481bc2c0ec7b)\n ",
            "GPU 2: Tesla V100-SXM2-16GB (UUID: GPU-26e22503-5fd5-8f55-d068-e1714fbb6fd6)\n ",
            "GPU 3: Tesla V100-SXM2-16GB (UUID: GPU-ebfc20c7-5f1a-1bc9-0d98-601cbe21fc2d)\n ",
        ]
    )
    fake_proc = FakeProcess(stdout=gpu_listing)

    with patch("subprocess.run", return_value=fake_proc):
        detected = self.scheduler._get_gpu_count()
        self.assertEqual(4, detected)
849
+
850
def test_get_gpu_count_error(self) -> None:
    """Expect ``_get_gpu_count`` to return 0 when ``subprocess.run`` raises."""
    failing_run = patch("subprocess.run", side_effect=Exception("test error"))
    with failing_run:
        detected = self.scheduler._get_gpu_count()
        self.assertEqual(0, detected)
854
+
855
def test_set_cuda_devices(self) -> None:
    """Check the ``CUDA_VISIBLE_DEVICES`` value written by ``_set_cuda_devices``.

    The same 4-GPU role is passed three times with different trailing
    arguments; after each call the role's env must hold the expected
    comma-separated device list.
    """
    sleeper = Role(
        name="sleep",
        image=self.test_dir,
        entrypoint="sleep.sh",
        args=["60"],
        num_replicas=4,
        resource=Resource(gpu=4, cpu=0, memMB=0),
    )

    # (second arg, third arg, expected env value).
    # NOTE(review): the expectations suggest the second argument is a
    # replica index and the third a starting device offset -- confirm
    # against the scheduler's signature.
    cases = (
        (0, 0, "0,1,2,3"),
        (1, 0, "4,5,6,7"),
        (1, 10, "14,15,16,17"),
    )
    for second_arg, third_arg, expected_devices in cases:
        self.scheduler._set_cuda_devices(sleeper, second_arg, third_arg)
        self.assertEqual(expected_devices, sleeper.env["CUDA_VISIBLE_DEVICES"])
870
+
871
def test_get_cuda_devices_is_set(self) -> None:
    """Check per-replica device lists when ``auto_set_cuda_devices`` is on.

    With ``_get_gpu_count`` patched to return 16, an app with two roles
    (2 GPUs x 2 replicas, then 3 GPUs x 2 replicas) is converted to a
    popen request; each replica's ``CUDA_VISIBLE_DEVICES`` must hold the
    expected device ids, with the second role continuing after the first.
    """
    with patch.object(self.scheduler, "_get_gpu_count", return_value=16):
        two_gpu_role = Role(
            name="role1",
            image=self.test_dir,
            entrypoint="train",
            resource=Resource(gpu=2, cpu=0, memMB=0),
            num_replicas=2,
        )
        three_gpu_role = Role(
            name="role2",
            image=self.test_dir,
            entrypoint="train",
            resource=Resource(gpu=3, cpu=0, memMB=0),
            num_replicas=2,
        )
        app = AppDef(name="role1", roles=[two_gpu_role, three_gpu_role])
        run_cfg = {"auto_set_cuda_devices": True}

        request = self.scheduler._to_popen_request(app, run_cfg)

        # Ordered (role name, expected device list per replica) pairs.
        expectations = (
            ("role1", ("0,1", "2,3")),
            ("role2", ("4,5,6", "7,8,9")),
        )
        for role_name, expected_per_replica in expectations:
            replica_params = request.role_params[role_name]
            self.assertEqual(2, len(replica_params))
            for replica_idx, expected_devices in enumerate(expected_per_replica):
                self.assertEqual(
                    expected_devices,
                    replica_params[replica_idx].env["CUDA_VISIBLE_DEVICES"],
                )
904
+
905
def test_get_cuda_devices_not_set(self) -> None:
    """Check that no replica gets ``CUDA_VISIBLE_DEVICES`` by default.

    With ``_get_gpu_count`` patched to return 8 and an empty run config
    (no ``auto_set_cuda_devices`` key), the popen request for a 4-replica
    role must contain four replica params, none of which defines
    ``CUDA_VISIBLE_DEVICES`` in its env.
    """
    with patch.object(self.scheduler, "_get_gpu_count", return_value=8):
        trainer1 = AppDef(
            name="trainer1",
            roles=[
                Role(
                    name="trainer1",
                    image=self.test_dir,
                    entrypoint="trainer1.sh",
                    resource=Resource(gpu=4, cpu=0, memMB=0),
                    num_replicas=4,
                )
            ],
        )

        popen_req = self.scheduler._to_popen_request(trainer1, {})
        role_params = popen_req.role_params["trainer1"]
        self.assertEqual(4, len(role_params))
        # assertNotIn (rather than assertFalse(x in y)) makes a failure show
        # the offending env, and subTest identifies which replica failed.
        for replica_idx, replica in enumerate(role_params):
            with self.subTest(replica=replica_idx):
                self.assertNotIn("CUDA_VISIBLE_DEVICES", replica.env)
927
+
831
928
def test_no_orphan_process_function(self) -> None:
    """Run the checks implemented by ``_test_orphan_workflow``."""
    self._test_orphan_workflow()
833
930
@@ -839,6 +936,9 @@ def _test_orphan_workflow(self) -> None:
839
936
target = start_sleep_processes , args = (self .test_dir , mp_queue , child_nproc )
840
937
)
841
938
proc .start ()
939
+ # Before querying the queue we need to wait
940
+ # Otherwise we will get `FileNotFoundError: [Errno 2] No such file or directory` error
941
+ time .sleep (10 )
842
942
total_processes = child_nproc + 1
843
943
pids = []
844
944
for _ in range (total_processes ):
0 commit comments