Skip to content

Commit 16f91eb

Browse files
committed
examples: Use verification results in repo example
This is an example of using the verification resutls in a repository. The only remaining tricky part is in _get_verification_result(): * has to figure out the delegating metadata (something we currently cannot provide in repository.Repository for the general case) * Needs a special case for first root Signed-off-by: Jussi Kukkonen <[email protected]>
1 parent 26bdbbe commit 16f91eb

File tree

1 file changed

+42
-7
lines changed

1 file changed

+42
-7
lines changed

examples/repository/_simplerepo.py

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,13 @@
2020
Metadata,
2121
MetaFile,
2222
Root,
23+
RootVerificationResult,
24+
Signed,
2325
Snapshot,
2426
TargetFile,
2527
Targets,
2628
Timestamp,
29+
VerificationResult,
2730
)
2831
from tuf.repository import Repository
2932

@@ -89,6 +92,27 @@ def targets_infos(self) -> Dict[str, MetaFile]:
8992
def snapshot_info(self) -> MetaFile:
9093
return self._snapshot_info
9194

95+
def _get_verification_result(
96+
self, role: str, md: Metadata
97+
) -> VerificationResult | RootVerificationResult:
98+
"""Verify roles metadata using the existing repository metadata"""
99+
if role == Root.type:
100+
assert isinstance(md.signed, Root)
101+
root = self.root()
102+
if root.version == 0:
103+
# special case first root
104+
root = md.signed
105+
return md.signed.get_root_verification_result(
106+
root, md.signed_bytes, md.signatures
107+
)
108+
if role in [Timestamp.type, Snapshot.type, Targets.type]:
109+
delegator: Signed = self.root()
110+
else:
111+
delegator = self.targets()
112+
return delegator.get_verification_result(
113+
role, md.signed_bytes, md.signatures
114+
)
115+
92116
def open(self, role: str) -> Metadata:
93117
"""Return current Metadata for role from 'storage' (or create a new one)"""
94118

@@ -112,6 +136,14 @@ def close(self, role: str, md: Metadata) -> None:
112136
for signer in self.signer_cache[role]:
113137
md.sign(signer, append=True)
114138

139+
# Double check that we only write verified metadata
140+
vr = self._get_verification_result(role, md)
141+
if not vr:
142+
raise ValueError(f"Role {role} failed to verify")
143+
keyids = [keyid[:7] for keyid in vr.signed]
144+
verify_str = f"verified with keys [{', '.join(keyids)}]"
145+
logger.debug("Role %s v%d: %s", role, md.signed.version, verify_str)
146+
115147
# store new metadata version, update version caches
116148
self.role_cache[role].append(md)
117149
if role == "snapshot":
@@ -130,8 +162,6 @@ def add_target(self, path: str, content: str) -> None:
130162
with self.edit_targets() as targets:
131163
targets.targets[path] = TargetFile.from_data(path, data)
132164

133-
logger.debug("Targets v%d", targets.version)
134-
135165
# update snapshot, timestamp
136166
self.do_snapshot()
137167
self.do_timestamp()
@@ -157,8 +187,6 @@ def submit_delegation(self, rolename: str, data: bytes) -> bool:
157187
logger.info("Failed to add delegation for %s: %s", rolename, e)
158188
return False
159189

160-
logger.debug("Targets v%d", targets.version)
161-
162190
# update snapshot, timestamp
163191
self.do_snapshot()
164192
self.do_timestamp()
@@ -177,19 +205,26 @@ def submit_role(self, role: str, data: bytes) -> bool:
177205
if not targetpath.startswith(f"{role}/"):
178206
raise ValueError(f"targets allowed under {role}/ only")
179207

180-
self.targets().verify_delegate(role, md.signed_bytes, md.signatures)
181-
182208
if md.signed.version != self.targets(role).version + 1:
183209
raise ValueError("Invalid version {md.signed.version}")
184210

185211
except (RepositoryError, ValueError) as e:
186212
logger.info("Failed to add new version for %s: %s", role, e)
187213
return False
188214

215+
# Check that we only write verified metadata
216+
vr = self._get_verification_result(role, md)
217+
if not vr:
218+
logger.info("Role %s failed to verify", role)
219+
return False
220+
221+
keyids = [keyid[:7] for keyid in vr.signed]
222+
verify_str = f"verified with keys [{', '.join(keyids)}]"
223+
logger.debug("Role %s v%d: %s", role, md.signed.version, verify_str)
224+
189225
# Checks passed: Add new delegated role version
190226
self.role_cache[role].append(md)
191227
self._targets_infos[f"{role}.json"].version = md.signed.version
192-
logger.debug("%s v%d", role, md.signed.version)
193228

194229
# To keep it simple, target content is generated from targetpath
195230
for targetpath in md.signed.targets:

0 commit comments

Comments
 (0)