Skip to content

Improve verification results #2551

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 41 additions & 8 deletions examples/repository/_simplerepo.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import logging
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List
from typing import Dict, List, Union

from securesystemslib import keys
from securesystemslib.signer import Key, Signer, SSlibKey, SSlibSigner
Expand All @@ -20,10 +20,13 @@
Metadata,
MetaFile,
Root,
RootVerificationResult,
Signed,
Snapshot,
TargetFile,
Targets,
Timestamp,
VerificationResult,
)
from tuf.repository import Repository

Expand Down Expand Up @@ -89,6 +92,25 @@ def targets_infos(self) -> Dict[str, MetaFile]:
def snapshot_info(self) -> MetaFile:
return self._snapshot_info

def _get_verification_result(
self, role: str, md: Metadata
) -> Union[VerificationResult, RootVerificationResult]:
"""Verify roles metadata using the existing repository metadata"""
if role == Root.type:
assert isinstance(md.signed, Root)
root = self.root()
previous = root if root.version > 0 else None
return md.signed.get_root_verification_result(
previous, md.signed_bytes, md.signatures
)
if role in [Timestamp.type, Snapshot.type, Targets.type]:
delegator: Signed = self.root()
else:
delegator = self.targets()
return delegator.get_verification_result(
role, md.signed_bytes, md.signatures
)

def open(self, role: str) -> Metadata:
"""Return current Metadata for role from 'storage' (or create a new one)"""

Expand All @@ -112,6 +134,14 @@ def close(self, role: str, md: Metadata) -> None:
for signer in self.signer_cache[role]:
md.sign(signer, append=True)

# Double check that we only write verified metadata
vr = self._get_verification_result(role, md)
if not vr:
raise ValueError(f"Role {role} failed to verify")
keyids = [keyid[:7] for keyid in vr.signed]
verify_str = f"verified with keys [{', '.join(keyids)}]"
logger.debug("Role %s v%d: %s", role, md.signed.version, verify_str)

# store new metadata version, update version caches
self.role_cache[role].append(md)
if role == "snapshot":
Expand All @@ -130,8 +160,6 @@ def add_target(self, path: str, content: str) -> None:
with self.edit_targets() as targets:
targets.targets[path] = TargetFile.from_data(path, data)

logger.debug("Targets v%d", targets.version)

# update snapshot, timestamp
self.do_snapshot()
self.do_timestamp()
Expand All @@ -157,8 +185,6 @@ def submit_delegation(self, rolename: str, data: bytes) -> bool:
logger.info("Failed to add delegation for %s: %s", rolename, e)
return False

logger.debug("Targets v%d", targets.version)

# update snapshot, timestamp
self.do_snapshot()
self.do_timestamp()
Expand All @@ -177,19 +203,26 @@ def submit_role(self, role: str, data: bytes) -> bool:
if not targetpath.startswith(f"{role}/"):
raise ValueError(f"targets allowed under {role}/ only")

self.targets().verify_delegate(role, md.signed_bytes, md.signatures)

if md.signed.version != self.targets(role).version + 1:
raise ValueError("Invalid version {md.signed.version}")

except (RepositoryError, ValueError) as e:
logger.info("Failed to add new version for %s: %s", role, e)
return False

# Check that we only write verified metadata
vr = self._get_verification_result(role, md)
if not vr:
logger.info("Role %s failed to verify", role)
return False

keyids = [keyid[:7] for keyid in vr.signed]
verify_str = f"verified with keys [{', '.join(keyids)}]"
logger.debug("Role %s v%d: %s", role, md.signed.version, verify_str)

# Checks passed: Add new delegated role version
self.role_cache[role].append(md)
self._targets_infos[f"{role}.json"].version = md.signed.version
logger.debug("%s v%d", role, md.signed.version)

# To keep it simple, target content is generated from targetpath
for targetpath in md.signed.targets:
Expand Down
213 changes: 162 additions & 51 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import sys
import tempfile
import unittest
from copy import copy
from copy import copy, deepcopy
from datetime import datetime, timedelta
from typing import Any, ClassVar, Dict, Optional

Expand Down Expand Up @@ -41,6 +41,7 @@
Metadata,
MetaFile,
Root,
RootVerificationResult,
Signature,
Snapshot,
SuccinctRoles,
Expand All @@ -55,7 +56,7 @@
logger = logging.getLogger(__name__)


# pylint: disable=too-many-public-methods
# pylint: disable=too-many-public-methods,too-many-statements
class TestMetadata(unittest.TestCase):
"""Tests for public API of all classes in 'tuf/api/metadata.py'."""

Expand Down Expand Up @@ -471,95 +472,205 @@ def test_signed_verify_delegate(self) -> None:
Snapshot.type, snapshot_md.signed_bytes, snapshot_md.signatures
)

def test_verification_result(self) -> None:
vr = VerificationResult(3, {"a": None}, {"b": None})
self.assertEqual(vr.missing, 2)
self.assertFalse(vr.verified)
self.assertFalse(vr)

# Add a signature
vr.signed["c"] = None
self.assertEqual(vr.missing, 1)
self.assertFalse(vr.verified)
self.assertFalse(vr)

# Add last missing signature
vr.signed["d"] = None
self.assertEqual(vr.missing, 0)
self.assertTrue(vr.verified)
self.assertTrue(vr)

# Add one more signature
vr.signed["e"] = None
self.assertEqual(vr.missing, 0)
self.assertTrue(vr.verified)
self.assertTrue(vr)

def test_root_verification_result(self) -> None:
vr1 = VerificationResult(3, {"a": None}, {"b": None})
vr2 = VerificationResult(1, {"c": None}, {"b": None})

vr = RootVerificationResult(vr1, vr2)
self.assertEqual(vr.signed, {"a": None, "c": None})
self.assertEqual(vr.unsigned, {"b": None})
self.assertFalse(vr.verified)
self.assertFalse(vr)

vr1.signed["c"] = None
vr1.signed["f"] = None
self.assertEqual(vr.signed, {"a": None, "c": None, "f": None})
self.assertEqual(vr.unsigned, {"b": None})
self.assertTrue(vr.verified)
self.assertTrue(vr)

def test_signed_get_verification_result(self) -> None:
# Setup: Load test metadata and keys
root_path = os.path.join(self.repo_dir, "metadata", "root.json")
root = Metadata[Root].from_file(root_path)
initial_root_keyids = root.signed.roles[Root.type].keyids
self.assertEqual(len(initial_root_keyids), 1)
key1_id = initial_root_keyids[0]
key2 = self.keystore[Timestamp.type]
key2_id = key2["keyid"]

key1_id = root.signed.roles[Root.type].keyids[0]
key1 = root.signed.get_key(key1_id)

key2_id = root.signed.roles[Timestamp.type].keyids[0]
key2 = root.signed.get_key(key2_id)
priv_key2 = self.keystore[Timestamp.type]

key3_id = "123456789abcdefg"
key4 = self.keystore[Snapshot.type]
key4_id = key4["keyid"]
priv_key4 = self.keystore[Snapshot.type]
key4_id = priv_key4["keyid"]

# Test: 1 authorized key, 1 valid signature
result = root.signed.get_verification_result(
Root.type, root.signed_bytes, root.signatures
)
self.assertTrue(result.verified)
self.assertEqual(result.signed, {key1_id})
self.assertEqual(result.unsigned, set())
self.assertTrue(result)
self.assertEqual(result.signed, {key1_id: key1})
self.assertEqual(result.unsigned, {})

# Test: 2 authorized keys, 1 invalid signature
# Adding a key, i.e. metadata change, invalidates existing signature
root.signed.add_key(
SSlibKey.from_securesystemslib_key(key2),
Root.type,
)
root.signed.add_key(key2, Root.type)
result = root.signed.get_verification_result(
Root.type, root.signed_bytes, root.signatures
)
self.assertFalse(result.verified)
self.assertEqual(result.signed, set())
self.assertEqual(result.unsigned, {key1_id, key2_id})
self.assertFalse(result)
self.assertEqual(result.signed, {})
self.assertEqual(result.unsigned, {key1_id: key1, key2_id: key2})

# Test: 3 authorized keys, 1 invalid signature, 1 key missing key data
# Adding a keyid w/o key, fails verification the same as no signature
# or an invalid signature for that key
# Adding a keyid w/o key, fails verification but this key is not listed
# in unsigned
root.signed.roles[Root.type].keyids.append(key3_id)
result = root.signed.get_verification_result(
Root.type, root.signed_bytes, root.signatures
)
self.assertFalse(result.verified)
self.assertEqual(result.signed, set())
self.assertEqual(result.unsigned, {key1_id, key2_id, key3_id})
self.assertFalse(result)
self.assertEqual(result.signed, {})
self.assertEqual(result.unsigned, {key1_id: key1, key2_id: key2})

# Test: 3 authorized keys, 1 valid signature, 1 invalid signature, 1
# key missing key data
root.sign(SSlibSigner(key2), append=True)
root.sign(SSlibSigner(priv_key2), append=True)
result = root.signed.get_verification_result(
Root.type, root.signed_bytes, root.signatures
)
self.assertTrue(result.verified)
self.assertEqual(result.signed, {key2_id})
self.assertEqual(result.unsigned, {key1_id, key3_id})
self.assertTrue(result)
self.assertEqual(result.signed, {key2_id: key2})
self.assertEqual(result.unsigned, {key1_id: key1})

# Test: 3 authorized keys, 1 valid signature, 1 invalid signature, 1
# key missing key data, 1 ignored unrelated signature
root.sign(SSlibSigner(key4), append=True)
root.sign(SSlibSigner(priv_key4), append=True)
self.assertEqual(
set(root.signatures.keys()), {key1_id, key2_id, key4_id}
)
self.assertTrue(result.verified)
self.assertEqual(result.signed, {key2_id})
self.assertEqual(result.unsigned, {key1_id, key3_id})
self.assertTrue(result)
self.assertEqual(result.signed, {key2_id: key2})
self.assertEqual(result.unsigned, {key1_id: key1})

# See test_signed_verify_delegate for more related tests ...

def test_signed_verification_result_union(self) -> None:
# Test all possible "unions" (AND) of "verified" field
data = [
(True, True, True),
(True, False, False),
(False, True, False),
(False, False, False),
]
def test_root_get_root_verification_result(self) -> None:
# Setup: Load test metadata and keys
root_path = os.path.join(self.repo_dir, "metadata", "root.json")
root = Metadata[Root].from_file(root_path)

key1_id = root.signed.roles[Root.type].keyids[0]
key1 = root.signed.get_key(key1_id)

key2_id = root.signed.roles[Timestamp.type].keyids[0]
key2 = root.signed.get_key(key2_id)
priv_key2 = self.keystore[Timestamp.type]

for a_part, b_part, ab_part in data:
self.assertEqual(
VerificationResult(a_part, set(), set()).union(
VerificationResult(b_part, set(), set())
),
VerificationResult(ab_part, set(), set()),
priv_key4 = self.keystore[Snapshot.type]

# Test: Verify with no previous root version
result = root.signed.get_root_verification_result(
None, root.signed_bytes, root.signatures
)
self.assertTrue(result)
self.assertEqual(result.signed, {key1_id: key1})
self.assertEqual(result.unsigned, {})

# Test: Verify with other root that is not version N-1
prev_root: Metadata[Root] = deepcopy(root)
with self.assertRaises(ValueError):
result = root.signed.get_root_verification_result(
prev_root.signed, root.signed_bytes, root.signatures
)

# Test exemplary union (|) of "signed" and "unsigned" fields
a = VerificationResult(True, {"1"}, {"2"})
b = VerificationResult(True, {"3"}, {"4"})
ab = VerificationResult(True, {"1", "3"}, {"2", "4"})
self.assertEqual(a.union(b), ab)
# Test: Verify with previous root
prev_root.signed.version -= 1
result = root.signed.get_root_verification_result(
prev_root.signed, root.signed_bytes, root.signatures
)
self.assertTrue(result)
self.assertEqual(result.signed, {key1_id: key1})
self.assertEqual(result.unsigned, {})

# Test: Add a signer to previous root (threshold still 1)
prev_root.signed.add_key(key2, Root.type)
result = root.signed.get_root_verification_result(
prev_root.signed, root.signed_bytes, root.signatures
)
self.assertTrue(result)
self.assertEqual(result.signed, {key1_id: key1})
self.assertEqual(result.unsigned, {key2_id: key2})

# Test: Increase threshold in previous root
prev_root.signed.roles[Root.type].threshold += 1
result = root.signed.get_root_verification_result(
prev_root.signed, root.signed_bytes, root.signatures
)
self.assertFalse(result)
self.assertEqual(result.signed, {key1_id: key1})
self.assertEqual(result.unsigned, {key2_id: key2})

# Test: Sign root with both keys
root.sign(SSlibSigner(priv_key2), append=True)
result = root.signed.get_root_verification_result(
prev_root.signed, root.signed_bytes, root.signatures
)
self.assertTrue(result)
self.assertEqual(result.signed, {key1_id: key1, key2_id: key2})
self.assertEqual(result.unsigned, {})

# Test: Sign root with an unrelated key
root.sign(SSlibSigner(priv_key4), append=True)
result = root.signed.get_root_verification_result(
prev_root.signed, root.signed_bytes, root.signatures
)
self.assertTrue(result)
self.assertEqual(result.signed, {key1_id: key1, key2_id: key2})
self.assertEqual(result.unsigned, {})

# Test: Remove key1 from previous root
prev_root.signed.revoke_key(key1_id, Root.type)
result = root.signed.get_root_verification_result(
prev_root.signed, root.signed_bytes, root.signatures
)
self.assertFalse(result)
self.assertEqual(result.signed, {key1_id: key1, key2_id: key2})
self.assertEqual(result.unsigned, {})

# Test: Lower threshold in previous root
prev_root.signed.roles[Root.type].threshold -= 1
result = root.signed.get_root_verification_result(
prev_root.signed, root.signed_bytes, root.signatures
)
self.assertTrue(result)
self.assertEqual(result.signed, {key1_id: key1, key2_id: key2})
self.assertEqual(result.unsigned, {})

def test_key_class(self) -> None:
# Test if from_securesystemslib_key removes the private key from keyval
Expand Down
Loading