Skip to content

✅ Add tests to raise coverage to at least 90% and fix recover password logic #632

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions backend/app/api/routes/login.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from app.core import security
from app.core.config import settings
from app.core.security import get_password_hash
from app.models import Message, NewPassword, Token, UserOut
from app.models import Message, NewPassword, Token, User, UserOut
from app.utils import (
generate_password_reset_token,
send_reset_password_email,
Expand Down Expand Up @@ -73,10 +73,10 @@ def reset_password(session: SessionDep, body: NewPassword) -> Message:
"""
Reset password
"""
email = verify_password_reset_token(token=body.token)
if not email:
user_id = verify_password_reset_token(token=body.token)
if not user_id:
raise HTTPException(status_code=400, detail="Invalid token")
user = crud.get_user_by_email(session=session, email=email)
user = session.get(User, int(user_id))
if not user:
raise HTTPException(
status_code=404,
Expand Down
5 changes: 2 additions & 3 deletions backend/app/api/routes/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ def read_user_by_id(
return user
if not current_user.is_superuser:
raise HTTPException(
# TODO: Review status code
status_code=400,
status_code=403,
detail="The user doesn't have enough privileges",
)
return user
Expand Down Expand Up @@ -194,5 +193,5 @@ def delete_user(
return Message(message="User deleted successfully")
elif user == current_user and current_user.is_superuser:
raise HTTPException(
status_code=400, detail="Super users are not allowed to delete themselves"
status_code=403, detail="Super users are not allowed to delete themselves"
)
7 changes: 4 additions & 3 deletions backend/app/backend_pre_start.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging

from sqlalchemy import Engine
from sqlmodel import Session, select
from tenacity import after_log, before_log, retry, stop_after_attempt, wait_fixed

Expand All @@ -18,9 +19,9 @@
before=before_log(logger, logging.INFO),
after=after_log(logger, logging.WARN),
)
def init() -> None:
def init(db_engine: Engine) -> None:
try:
with Session(engine) as session:
with Session(db_engine) as session:
# Try to create session to check if DB is awake
session.exec(select(1))
except Exception as e:
Expand All @@ -30,7 +31,7 @@ def init() -> None:

def main() -> None:
logger.info("Initializing service")
init()
init(engine)
logger.info("Service finished initializing")


Expand Down
7 changes: 4 additions & 3 deletions backend/app/celeryworker_pre_start.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging

from sqlalchemy import Engine
from sqlmodel import Session, select
from tenacity import after_log, before_log, retry, stop_after_attempt, wait_fixed

Expand All @@ -18,10 +19,10 @@
before=before_log(logger, logging.INFO),
after=after_log(logger, logging.WARN),
)
def init() -> None:
def init(db_engine: Engine) -> None:
try:
# Try to create session to check if DB is awake
with Session(engine) as session:
with Session(db_engine) as session:
session.exec(select(1))
except Exception as e:
logger.error(e)
Expand All @@ -30,7 +31,7 @@ def init() -> None:

def main() -> None:
logger.info("Initializing service")
init()
init(engine)
logger.info("Service finished initializing")


Expand Down
1 change: 1 addition & 0 deletions backend/app/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def authenticate(*, session: Session, email: str, password: str) -> User | None:
return None
return db_user


def create_item(*, session: Session, item_in: ItemCreate, owner_id: int) -> Item:
db_item = Item.model_validate(item_in, update={"owner_id": owner_id})
session.add(db_item)
Expand Down
124 changes: 124 additions & 0 deletions backend/app/tests/api/api_v1/test_items.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,127 @@ def test_read_item(
assert content["description"] == item.description
assert content["id"] == item.id
assert content["owner_id"] == item.owner_id


def test_read_item_not_found(
client: TestClient, superuser_token_headers: dict, db: Session
) -> None:
response = client.get(
f"{settings.API_V1_STR}/items/999",
headers=superuser_token_headers,
)
assert response.status_code == 404
content = response.json()
assert content["detail"] == "Item not found"


def test_read_item_not_enough_permissions(
client: TestClient, normal_user_token_headers: dict, db: Session
) -> None:
item = create_random_item(db)
response = client.get(
f"{settings.API_V1_STR}/items/{item.id}",
headers=normal_user_token_headers,
)
assert response.status_code == 400
content = response.json()
assert content["detail"] == "Not enough permissions"


def test_read_items(
client: TestClient, superuser_token_headers: dict, db: Session
) -> None:
create_random_item(db)
create_random_item(db)
response = client.get(
f"{settings.API_V1_STR}/items/",
headers=superuser_token_headers,
)
assert response.status_code == 200
content = response.json()
assert len(content["data"]) >= 2


def test_update_item(
client: TestClient, superuser_token_headers: dict, db: Session
) -> None:
item = create_random_item(db)
data = {"title": "Updated title", "description": "Updated description"}
response = client.put(
f"{settings.API_V1_STR}/items/{item.id}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 200
content = response.json()
assert content["title"] == data["title"]
assert content["description"] == data["description"]
assert content["id"] == item.id
assert content["owner_id"] == item.owner_id


def test_update_item_not_found(
client: TestClient, superuser_token_headers: dict, db: Session
) -> None:
data = {"title": "Updated title", "description": "Updated description"}
response = client.put(
f"{settings.API_V1_STR}/items/999",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 404
content = response.json()
assert content["detail"] == "Item not found"


def test_update_item_not_enough_permissions(
client: TestClient, normal_user_token_headers: dict, db: Session
) -> None:
item = create_random_item(db)
data = {"title": "Updated title", "description": "Updated description"}
response = client.put(
f"{settings.API_V1_STR}/items/{item.id}",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == 400
content = response.json()
assert content["detail"] == "Not enough permissions"


def test_delete_item(
client: TestClient, superuser_token_headers: dict, db: Session
) -> None:
item = create_random_item(db)
response = client.delete(
f"{settings.API_V1_STR}/items/{item.id}",
headers=superuser_token_headers,
)
assert response.status_code == 200
content = response.json()
assert content["message"] == "Item deleted successfully"


def test_delete_item_not_found(
client: TestClient, superuser_token_headers: dict, db: Session
) -> None:
response = client.delete(
f"{settings.API_V1_STR}/items/999",
headers=superuser_token_headers,
)
assert response.status_code == 404
content = response.json()
assert content["detail"] == "Item not found"


def test_delete_item_not_enough_permissions(
client: TestClient, normal_user_token_headers: dict, db: Session
) -> None:
item = create_random_item(db)
response = client.delete(
f"{settings.API_V1_STR}/items/{item.id}",
headers=normal_user_token_headers,
)
assert response.status_code == 400
content = response.json()
assert content["detail"] == "Not enough permissions"
70 changes: 70 additions & 0 deletions backend/app/tests/api/api_v1/test_login.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ def test_get_access_token(client: TestClient) -> None:
assert tokens["access_token"]


def test_get_access_token_incorrect_password(client: TestClient) -> None:
login_data = {
"username": settings.FIRST_SUPERUSER,
"password": "incorrect",
}
r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data)
assert r.status_code == 400


def test_use_access_token(
client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
Expand All @@ -25,3 +34,64 @@ def test_use_access_token(
result = r.json()
assert r.status_code == 200
assert "email" in result


def test_recovery_password(
client: TestClient, normal_user_token_headers: dict[str, str], mocker
) -> None:
mocker.patch("app.utils.send_reset_password_email", return_value=None)
mocker.patch("app.utils.send_email", return_value=None)
email = "[email protected]"
r = client.post(
f"{settings.API_V1_STR}/password-recovery/{email}",
headers=normal_user_token_headers,
)
assert r.status_code == 200
assert r.json() == {"message": "Password recovery email sent"}


def test_recovery_password_user_not_exits(
client: TestClient, normal_user_token_headers: dict[str, str]
) -> None:
email = "[email protected]"
r = client.post(
f"{settings.API_V1_STR}/password-recovery/{email}",
headers=normal_user_token_headers,
)
assert r.status_code == 404


def test_reset_password(
client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
login_data = {
"username": settings.FIRST_SUPERUSER,
"password": settings.FIRST_SUPERUSER_PASSWORD,
}
r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data)
token = r.json().get("access_token")

data = {"new_password": "changethis", "token": token}
r = client.post(
f"{settings.API_V1_STR}/reset-password/",
headers=superuser_token_headers,
json=data
)
assert r.status_code == 200
assert r.json() == {"message": "Password updated successfully"}


def test_reset_password_invalid_token(
client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
data = {"new_password": "changethis", "token": "invalid"}
r = client.post(
f"{settings.API_V1_STR}/reset-password/",
headers=superuser_token_headers,
json=data
)
response = r.json()

assert "detail" in response
assert r.status_code == 400
assert response["detail"] == "Invalid token"
Loading