kemono2/src/pages/api/v1/account.py
2025-04-02 16:32:47 +02:00

223 lines
6.7 KiB
Python

from typing import TypedDict, List, Literal, Optional
from flask import Blueprint, g, make_response, jsonify, abort, request, current_app
from src.config import Configuration
from src.lib.account import (
is_logged_in,
load_account,
get_saved_key_import_ids,
get_saved_keys,
revoke_saved_keys,
change_password as db_change_password,
)
from src.lib.notification import count_account_notifications, get_account_notifications, set_notifications_as_seen
from src.lib.security import is_password_compromised
from src.lib.api import create_client_error_response
from src.lib.dms import approve_dms, cleanup_unapproved_dms, get_unapproved_dms, clean_dms_already_approved
from src.pages.account.types import AccountPageProps, NotificationsProps, ServiceKeysProps
from src.types.props import SuccessProps
from src.types.account.account import Account
from src.types.kemono import Unapproved_DM
from . import v1api_bp
from .moderator import moderator_bp
# Blueprint that groups every account-related endpoint defined in this module.
account_bp = Blueprint("account", __name__)
# Authenticate once per request for the whole blueprint so that the
# individual handlers do not have to repeat the check themselves.
@account_bp.before_request
def check_auth():
    """Abort with 401 unless the session is logged in and its account loads."""
    # Short-circuiting keeps the account lookup restricted to logged-in sessions.
    if not is_logged_in() or not load_account():
        abort(401)
@account_bp.get("/account")
def get_account_info():
    """Return the current account together with its notification count.

    The count is reported as 0 when notifications are disabled in the
    configuration.
    """
    current: Account = g.get("account")
    notif_total = count_account_notifications(current.id) if Configuration().enable_notifications else 0
    page_props = AccountPageProps(account=current, notifications_count=notif_total)
    return make_response(jsonify(props=page_props), 200)
# JSON payload read by the change-password endpoint. The functional TypedDict
# form is needed because the hyphenated keys are not valid Python identifiers.
TDChangePasswordBody = TypedDict(
    "TDChangePasswordBody",
    {
        "current-password": str,
        "new-password": str,
        "new-password-confirmation": str,
    },
)
@account_bp.post("/account/change_password")
def post_change_password():
    """Change the password of the current account.

    Expects a JSON object with the string fields ``current-password``,
    ``new-password`` and ``new-password-confirmation``. The new password is
    stripped of surrounding whitespace and must be at least 5 characters long.

    Returns:
        A 200 response with JSON ``true`` on success; otherwise the response
        built by ``create_client_error_response`` for the first failed check.
    """
    body: TDChangePasswordBody = request.get_json()
    # A body that is valid JSON but not an object (``null``, a list, ...) has
    # no ``.get``; reject it explicitly instead of failing with AttributeError.
    if not isinstance(body, dict):
        return create_client_error_response("Request body must be a JSON object.")

    current_password = body.get("current-password", "")
    new_password = body.get("new-password", "")
    new_password_conf = body.get("new-password-confirmation", "")
    # Non-string values (numbers, null, ...) would break ``.strip()`` below.
    if not all(isinstance(value, str) for value in (current_password, new_password, new_password_conf)):
        return create_client_error_response("Password fields must be strings.")

    new_password = new_password.strip()
    new_password_conf = new_password_conf.strip()
    account: Account = load_account()

    if not new_password:
        return create_client_error_response("New password cannot be empty.")
    if len(new_password) < 5:
        return create_client_error_response("New password must have at least 5 characters.")
    if new_password != new_password_conf:
        return create_client_error_response("New password and confirmation do not match.")
    # The breach lookup only runs when the validator is enabled in the app config.
    if current_app.config.get("ENABLE_PASSWORD_VALIDATOR") and is_password_compromised(new_password):
        return create_client_error_response(
            "We've detected that password was compromised in a data breach on another site. Please choose a different password."
        )
    # A falsy result from the DB layer is reported as a wrong current password.
    if not db_change_password(account.id, current_password, new_password):
        return create_client_error_response("Current password is invalid.")

    return make_response(jsonify(True), 200)
@account_bp.get("/account/notifications")
def get_notifications():
    """Return the account's notifications and flag the unseen ones as seen.

    An empty list is returned when notifications are disabled in the
    configuration.
    """
    current: Account = g.get("account")
    items = get_account_notifications(current.id) if Configuration().enable_notifications else []
    page_props = NotificationsProps(notifications=items)
    # Ids of the notifications not yet flagged as seen; these are the ones
    # handed to `set_notifications_as_seen`.
    unseen_ids = [item.id for item in items if not item.is_seen]
    set_notifications_as_seen(unseen_ids)
    return make_response(jsonify(props=page_props), 200)
@account_bp.get("/account/keys")
def get_account_keys():
    """Return the account's saved service keys and the import-id lookup for each key."""
    current: Account = g.get("account")
    service_keys = get_saved_keys(current.id)
    page_props = ServiceKeysProps(service_keys=service_keys)
    # One `get_saved_key_import_ids` result per key, in the order of `service_keys`.
    import_ids = [get_saved_key_import_ids(saved.id) for saved in service_keys]
    result = make_response(jsonify(props=page_props, import_ids=import_ids), 200)
    # NOTE(review): `s-maxage` targets shared caches, yet this payload is
    # account-specific - confirm no intermediary can serve it across users.
    result.headers["Cache-Control"] = "s-maxage=60"
    return result
class TDKeyRevokeBody(TypedDict):
    """JSON payload read by the service-key revocation endpoint."""

    # Values the handler forwards to `revoke_saved_keys`.
    revoke: list[int]
@account_bp.post("/account/keys")
def revoke_service_keys():
    """Revoke the saved service keys listed in the request body.

    Expects a JSON object of the form ``{"revoke": [<key id>, ...]}``.

    Returns:
        A 200 response carrying ``SuccessProps`` that redirect to
        ``/account/keys``; or the response built by
        ``create_client_error_response`` when the body is malformed.
    """
    account: Account = g.get("account")
    body: TDKeyRevokeBody = request.get_json()
    # Indexing the body blindly raises TypeError/KeyError (and thus a 500) for
    # payloads such as `null`, `[]` or `{}`; answer with a client error instead.
    keys_for_revocation = body.get("revoke") if isinstance(body, dict) else None
    if not isinstance(keys_for_revocation, list):
        return create_client_error_response('Request body must be a JSON object with a "revoke" list.')

    revoke_saved_keys(keys_for_revocation, account.id)
    props = SuccessProps(currentPage="account", redirect="/account/keys")
    return make_response(jsonify(props=props), 200)
@account_bp.get("/account/posts/upload")
def upload_post():
    """Report whether the current account passes the Filehaus role check.

    Responds with a client error when roles are configured and the account's
    role is not among them; otherwise returns the page props.
    """
    current: Account = g.get("account")
    allowed_roles = Configuration().filehaus["required_roles"]
    # With no configured roles the membership test is skipped entirely.
    if len(allowed_roles) > 0 and current.role not in allowed_roles:
        return create_client_error_response(
            "Filehaus uploading requires elevated permissions. Please contact the administrator to change your role."
        )
    result = make_response(jsonify({"currentPage": "posts"}), 200)
    # NOTE(review): `s-maxage` targets shared caches, yet the outcome depends
    # on the account's role - confirm no intermediary can serve it across users.
    result.headers["Cache-Control"] = "s-maxage=60"
    return result
class TDDMPageProps(TypedDict):
    """Props returned by the DM review listing endpoint."""

    # Id of the account the listing was produced for.
    account_id: int
    # Result of `get_unapproved_dms` for that account.
    dms: List[Unapproved_DM]
    # "ignored" or "pending", as set by the handler.
    status: str
    currentPage: Literal["import"]
@account_bp.get("/account/review_dms")
def importer_dms():
    """Return the account's unapproved DMs for review.

    The ``status`` query parameter selects the ignored view when it equals
    ``"ignored"``; any other value (or none) is reported as ``"pending"``.
    """
    current: Account = g.get("account")
    # Evaluate the query flag once; it drives both the label and the lookup.
    wants_ignored = request.args.get("status") == "ignored"
    status_label = "ignored" if wants_ignored else "pending"
    unapproved = get_unapproved_dms(current.id, wants_ignored)
    page_props = TDDMPageProps(currentPage="import", account_id=current.id, dms=unapproved, status=status_label)
    result = make_response(jsonify(page_props), 200)
    result.headers["Cache-Control"] = "max-age=0, private, must-revalidate"
    return result
class TDApproveDMsBody(TypedDict):
    """JSON payload read by the DM approval endpoint."""

    # Values the handler forwards to `approve_dms`.
    approved_hashes: list[str]
    # When truthy the handler runs an extra cleanup pass with `delete=True`.
    delete_ignored: Optional[bool]
@account_bp.post("/account/review_dms")
def approve_importer_dms():
    """Approve the selected DMs of the current account and run the cleanup passes.

    Expects a JSON object with an optional ``approved_hashes`` list and an
    optional ``delete_ignored`` flag.

    Returns:
        A non-cacheable 200 response with JSON ``true``; or the response built
        by ``create_client_error_response`` when the body is malformed.
    """
    account: Account = g.get("account")
    body: TDApproveDMsBody = request.get_json()
    # A body that is valid JSON but not an object (``null``, a list, ...) has
    # no ``.get``; reject it explicitly instead of failing with AttributeError.
    if not isinstance(body, dict):
        return create_client_error_response("Request body must be a JSON object.")

    approved_hashes = body.get("approved_hashes")
    if approved_hashes is None:
        # Missing or null means nothing was selected for approval.
        approved_hashes = []
    elif not isinstance(approved_hashes, list):
        return create_client_error_response('"approved_hashes" must be a list.')
    delete_ignored = bool(body.get("delete_ignored", False))

    account_id = int(account.id)
    approve_dms(account_id, approved_hashes)
    clean_dms_already_approved(account_id)
    cleanup_unapproved_dms(account_id)
    if delete_ignored:
        # Second pass, this time with deletion enabled.
        cleanup_unapproved_dms(account_id, delete=True)

    response = make_response(jsonify(True), 200)
    response.headers["Cache-Control"] = "max-age=0, private, must-revalidate"
    return response
@account_bp.errorhandler(401)
def not_authorized_error(error):
    """Render 401 errors handled by this blueprint as a JSON payload."""
    payload = jsonify(error="Not Authorized")
    return payload, 401
# Mount the moderator blueprint under the "/account" prefix of this blueprint.
account_bp.register_blueprint(moderator_bp, url_prefix="/account")
# The effect of registration order on route resolution has not been verified,
# so the account blueprint is attached to the v1 API only at the very end.
v1api_bp.register_blueprint(account_bp)