Skip to content

Refactoring for error messages and security fix for path echoing #636

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions lib/error_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ def end_error(*errors):


def format_error(*errors):
err = 'Error: '
err = ''

for error in errors:
err += str(error)
err += str(error) + "\n"

error_string = f"""
\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< 0_o >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n
{err}
Error: {err}
\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< 0_o >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n
{traceback.format_exc()}
\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< 0_o >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n
Expand All @@ -30,11 +30,15 @@ def format_error(*errors):
def log_error(*errors):
error_log_file = GlobalConfig().config['machine']['error_log_file']

err = ''
for error in errors:
err += str(error) + "\n"

if error_log_file:
try:
with open(error_log_file, 'a', encoding='utf-8') as file:
print('\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< 0_o >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n\n', file=file)
print('Error: ', *errors, file=file)
print('Error: ', err, file=file)
print('\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< 0_o >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n\n', file=file)
print(traceback.format_exc(), file=file)
print('\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< 0_o >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n\n', file=file)
Expand All @@ -46,6 +50,6 @@ def log_error(*errors):
'\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< 0_o >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n\n', file=sys.stderr)
print(traceback.format_exc(), file=sys.stderr)
print('\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< 0_o >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n\n', file=sys.stderr)
print('Error: ', *errors, file=sys.stderr)
print('Error: ', err, file=sys.stderr)
print('\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< 0_o >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n\n',
TerminalColors.ENDC, file=sys.stderr)
35 changes: 19 additions & 16 deletions runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def arrows(text):
# path with `..`, symbolic links or similar.
# We always return the same error message including the path and file parameter, never `filename` as
# otherwise we might disclose if certain files exist or not.
def join_paths(path, path2, mode=None):
def join_paths(path, path2, mode='file'):
filename = os.path.realpath(os.path.join(path, path2))

# If the original path is a symlink we need to resolve it.
Expand All @@ -59,23 +59,23 @@ def join_paths(path, path2, mode=None):
return filename

if not filename.startswith(path):
raise ValueError(f"{path2} not in {path}")
raise ValueError(f"{path2} must not be in folder above {path}")

# To double check we also check if it is in the files allow list

if mode == 'file':
folder_content = [str(item) for item in Path(path).rglob("*") if item.is_file()]
elif mode == 'dir':
elif mode == 'directory':
folder_content = [str(item) for item in Path(path).rglob("*") if item.is_dir()]
else:
folder_content = [str(item) for item in Path(path).rglob("*")]
raise RuntimeError(f"Unknown mode supplied for join_paths: {mode}")

if filename not in folder_content:
raise ValueError(f"{path2} not in {path}")
raise ValueError(f"{mode.capitalize()} '{path2}' not in '{path}'")

# Another way to implement this. This is checking the third time but we want to be extra secure 👾
if Path(path).resolve(strict=True) not in Path(path, path2).resolve(strict=True).parents:
raise ValueError(f"{path2} not in {path}")
raise ValueError(f"{mode.capitalize()} '{path2}' not in folder '{path}'")

if os.path.exists(filename):
return filename
Expand Down Expand Up @@ -574,7 +574,7 @@ def build_docker_images(self):
self.__notes_helper.add_note({'note': f"Building {service['image']}", 'detail_name': '[NOTES]', 'timestamp': int(time.time_ns() / 1_000)})

# Make sure the context docker file exists and is not trying to escape some root. We don't need the returns
context_path = join_paths(self._folder, context, 'dir')
context_path = join_paths(self._folder, context, 'directory')
join_paths(context_path, dockerfile, 'file')

docker_build_command = ['docker', 'run', '--rm',
Expand Down Expand Up @@ -742,16 +742,19 @@ def setup_services(self):
# We always assume the format to be ./dir:dir:[flag] as if we allow none bind mounts people
# could create volumes that would linger on our system.
path = os.path.realpath(os.path.join(self._folder, vol[0]))
if not os.path.exists(path):
raise RuntimeError(f"Service '{service_name}' volume path does not exist: {path}")

# Check that the path starts with self._folder
# We need to do this first, as telling first if path exists or not will tell about our filesystem structure
if not path.startswith(self._folder):
raise RuntimeError(f"Service '{service_name}' trying to escape folder: {path}")
raise RuntimeError(f"Service '{service_name}' volume path ({vol[0]}) is outside allowed folder: {path}")

if not os.path.exists(path):
raise RuntimeError(f"Service '{service_name}' volume path does not exist: {path}")

# To double check we also check if it is in the files allow list
if path not in [str(item) for item in Path(self._folder).rglob("*")]:
raise RuntimeError(f"Service '{service_name}' volume '{path}' not in allowed file list")
allowed_files_list = [str(item) for item in Path(self._folder).rglob("*")]
if path not in allowed_files_list:
raise RuntimeError(f"Service '{service_name}' volume '{path}' not in allowed file list:\n{chr(10).join(allowed_files_list)}\n\nOnly files from the supplied repository are allowed.")

if len(vol) == 3:
if vol[2] != 'ro':
Expand Down Expand Up @@ -1505,13 +1508,13 @@ def run(self):
print('####################################################################################\n\n', TerminalColors.ENDC)

except FileNotFoundError as e:
error_helpers.log_error('File or executable not found', e, runner._run_id)
error_helpers.log_error('File or executable not found', e, "\n", f"Run-ID: {runner._run_id}")
except subprocess.CalledProcessError as e:
error_helpers.log_error('Command failed', 'Stdout:', e.stdout, 'Stderr:', e.stderr, runner._run_id)
error_helpers.log_error('Command failed', 'Stdout:', e.stdout, 'Stderr:', e.stderr, "\n", f"Run-ID: {runner._run_id}")
except RuntimeError as e:
error_helpers.log_error('RuntimeError occured in runner.py: ', e, runner._run_id)
error_helpers.log_error('RuntimeError occured in runner.py: ', e, "\n", f"Run-ID: {runner._run_id}")
except BaseException as e:
error_helpers.log_error('Base exception occured in runner.py: ', e, runner._run_id)
error_helpers.log_error('Base exception occured in runner.py: ', e, "\n", f"Run-ID: {runner._run_id}")
finally:
if args.print_logs:
for container_id_outer, std_out in runner.get_logs().items():
Expand Down
9 changes: 5 additions & 4 deletions tests/test_volume_loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ def test_volume_load_no_escape():
container_running = check_if_container_running('test-container')
runner.cleanup()

expected_error = 'trying to escape folder: /etc/passwd'
assert expected_error in str(e.value), Tests.assertion_info(expected_error, str(e.value))
expected_error = 'Service \'test-container\' volume path (/etc/passwd) is outside allowed folder:'
assert str(e.value).startswith(expected_error), Tests.assertion_info(expected_error, str(e.value))
assert container_running is False, Tests.assertion_info('test-container stopped', 'test-container was still running!')

def create_tmp_dir():
Expand Down Expand Up @@ -106,6 +106,7 @@ def test_load_files_from_within_gmt():
def test_symlinks_should_fail():
tmp_dir, tmp_dir_name = create_tmp_dir()
# make a symlink to /etc/passwords in tmp_dir
symlink = os.path.join(tmp_dir, 'symlink')
os.symlink('/etc/passwd', os.path.join(tmp_dir, 'symlink'))

copy_compose_and_edit_directory('volume_load_symlinks_negative.yml', tmp_dir)
Expand All @@ -120,8 +121,8 @@ def test_symlinks_should_fail():
container_running = check_if_container_running('test-container')
runner.cleanup()

expected_error = 'trying to escape folder: /etc/passwd'
assert expected_error in str(e.value), Tests.assertion_info(expected_error, str(e.value))
expected_error = f"Service 'test-container' volume path ({symlink}) is outside allowed folder:"
assert str(e.value).startswith(expected_error), Tests.assertion_info(expected_error, str(e.value))
assert container_running is False, Tests.assertion_info('test-container stopped', 'test-container was still running!')

def test_non_bind_mounts_should_fail():
Expand Down