Skip to content

Remove component versions in a single passs #2803

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Simplify the `Template` class
- Simplify `Component` lifecycle by removing `Component.pre_create`
- Renamed `Component.init` to `Component.setup`
- Require `.upstream` when setting upstream dependencies
- Disable ability to remove components verion-by-version

#### New Features & Functionality

Expand Down
11 changes: 6 additions & 5 deletions superduper/base/apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@
from rich.console import Console

from superduper import Component, logging
from superduper.base.datatype import Blob
from superduper.base.document import Document
from superduper.base.event import Create, Signal, Update
from superduper.base.metadata import NonExistentMetadataError
from superduper.base.query import Query
from superduper.components.component import Status
from superduper.misc.tree import dict_to_tree

Expand Down Expand Up @@ -190,6 +188,7 @@ def _apply(
object.db = db

create_events = {}
children = []

def wrapper(child):
nonlocal create_events
Expand All @@ -199,14 +198,14 @@ def wrapper(child):
object=child,
context=context,
job_events=job_events,
parent=[object.component, object.uuid],
parent=[object.component, object.identifier, object.uuid],
global_diff=global_diff,
non_breaking_changes=non_breaking_changes,
)

job_events.update(j)
create_events.update(c)

children.append((child.component, child.identifier, child.uuid))
return f'&:component:{child.huuid}'

try:
Expand Down Expand Up @@ -246,7 +245,7 @@ def wrapper(child):
# instances.
serialized = serialized.map(wrapper, lambda x: isinstance(x, Component))
Document(object.metadata).map(wrapper, lambda x: isinstance(x, Component))
serialized = db._save_artifact(object.uuid, serialized.encode())
serialized = db._save_artifact(serialized.encode())

if apply_status == 'same':
return create_events, job_events
Expand All @@ -258,6 +257,7 @@ def wrapper(child):
path=object.__module__ + '.' + object.__class__.__name__,
data=serialized,
parent=parent,
children=children,
)

these_job_events = object.create_jobs(
Expand All @@ -272,6 +272,7 @@ def wrapper(child):
path=object.__module__ + '.' + object.__class__.__name__,
data=serialized,
parent=parent,
children=children,
)

these_job_events = object.create_jobs(
Expand Down
192 changes: 67 additions & 125 deletions superduper/base/datalayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from collections import namedtuple

import click
import pandas

import superduper as s
from superduper import CFG, logging
Expand All @@ -13,6 +14,7 @@
from superduper.base.config import Config
from superduper.base.datatype import BaseType, ComponentType
from superduper.base.document import Document
from superduper.base.event import Delete
from superduper.base.metadata import (
MetaDataStore,
NonExistentMetadataError,
Expand Down Expand Up @@ -261,7 +263,6 @@ def remove(
self,
component: str,
identifier: str,
version: t.Optional[int] = None,
recursive: bool = False,
force: bool = False,
):
Expand All @@ -271,48 +272,77 @@ def remove(
:param component: Cmponent to remove ('Model', 'Listener', etc.)
:param identifier: Identifier of the component (refer to
`container.base.Component`).
:param version: [Optional] Numerical version to remove.
:param recursive: Toggle to remove all descendants of the component.
:param force: Force skip confirmation (use with caution).
:param force: Toggle to force remove the component.
"""
if version is not None:
return self._remove_component_version(
component, identifier, version=version, force=force, recursive=recursive
events: t.List[Delete] = []
failed: t.List[str] = []
self._build_remove(
component=component,
identifier=identifier,
events=events,
failed=failed,
recursive=recursive,
)

if failed and not force:
raise exceptions.ComponentInUseError(
f'Failed to remove {component}:{identifier} because the'
' following components are in use:\n'
'\n'.join(failed) + '\n'
)

versions = self.metadata.show_component_versions(component, identifier)
versions_in_use = []
for v in versions:
if self.metadata.component_version_has_parents(component, identifier, v):
versions_in_use.append(v)

if versions_in_use:
component_versions_in_use = []
for v in versions_in_use:
uuid = self.metadata.get_uuid(component, identifier, v)
component_versions_in_use.append(
f"{uuid} -> "
f"{self.metadata.get_component_version_parents(uuid)}",
)
if not force:
raise exceptions.ComponentInUseError(
f'Component versions: {component_versions_in_use} are in use'
)
for i, e in enumerate(events):
logging.info(
f'Removing component [{i + 1}/{len(events)}] '
f'{e.component}:{e.identifier}'
)
e.execute(self)
logging.info(
f'Removing component [{i + 1}/{len(events)}] '
f'{e.component}:{e.identifier}... DONE'
)

if force or click.confirm(
f'You are about to delete {component}/{identifier}, are you sure?',
default=False,
):
for v in sorted(list(set(versions) - set(versions_in_use))):
self._remove_component_version(
component, identifier, v, recursive=recursive, force=True
)
def _build_remove(
self,
component: str,
identifier: str,
events: t.List,
failed: t.List,
recursive: bool = False,
):

for v in sorted(versions_in_use):
self.metadata.hide_component_version(component, identifier, v)
object = self.load(component=component, identifier=identifier)

else:
logging.warn('aborting.')
previous = [e.huuid for e in events]

parents = self.metadata.get_component_parents(
component=component, identifier=identifier
)
fail = False
if parents:
# Only fail the deletion attempt if the parents aren't in this cascade
for p in parents:
if f'{p[0]}:{p[1]}' not in previous:
failed.append(f'{component}:{identifier} -> {p[0]}:{p[1]}')
fail = True

# If the deletion fails, we need to stop
if fail:
return

events.append(Delete(component=component, identifier=identifier))

if recursive:
children = object.get_children()
for c in children:
self._build_remove(
c.component,
c.identifier,
recursive=True,
events=events,
failed=failed,
)

def load_all(self, component: str, **kwargs) -> t.List[Component]:
"""Load all instances of component.
Expand Down Expand Up @@ -411,102 +441,14 @@ def load(
)
return c

def _remove_component_version(
self,
component: str,
identifier: str,
version: int,
force: bool = False,
recursive: bool = False,
):
# TODO - change this logic for a not version-by-version deletion
if version is None:
return

try:
r = self.metadata.get_component(component, identifier, version=version)
except NonExistentMetadataError:
logging.warn(
f'Component {component}:{identifier}:{version} has already been removed'
)
return

if self.metadata.component_version_has_parents(component, identifier, version):
parents = self.metadata.get_component_version_parents(r['uuid'])
raise exceptions.ComponentInUseError(
f'{r["uuid"]} is involved in other components: {parents}'
)

if not (
force
or click.confirm(
f'You are about to delete {component}/{identifier}{version}, '
'are you sure?',
default=False,
)
):
return

object = self.load(
component,
identifier,
version=version,
)
info = self.metadata.get_component(
component, identifier, version=version, allow_hidden=force
)
object.cleanup()
self._delete_artifacts(r['uuid'], info)
self.metadata.delete_component_version(component, identifier, version=version)
self.metadata.delete_parent_child_relationships(r['uuid'])

if not recursive:
return

children = object.get_children(deep=True)

children = object.sort_components(children)[::-1]
for c in children:
if c.version is None:
logging.warn(f'Found uninitialized component {c.huuid}, skipping...')
continue
try:
self._remove_component_version(
c.component,
c.identifier,
version=c.version,
recursive=False,
force=force,
)
except exceptions.ComponentInUseError as e:
if force:
logging.warn(
f'Component {c.huuid} is in use: {e}\n'
'Skipping since force=True...'
)
else:
raise e

def _save_artifact(self, uuid, info: t.Dict):
def _save_artifact(self, info: t.Dict):
"""
Save an artifact to the artifact store.

:param artifact: The artifact to save.
"""
return self.artifact_store.save_artifact(info)

def _delete_artifacts(self, uuid, info: t.Dict):
artifact_ids, artifacts = self._find_artifacts(info)
for artifact_id in artifact_ids:
relation_uuids = self.metadata.get_artifact_relations(
artifact_id=artifact_id
)
if len(relation_uuids) == 1 and relation_uuids[0] == uuid:
self.artifact_store.delete_artifact([artifact_id])
self.metadata.delete_artifact_relation(
uuid=uuid, artifact_ids=artifact_id
)

def _find_artifacts(self, info: t.Dict):
from superduper.misc.special_dicts import recursive_find

Expand Down
Loading
Loading