Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/bot/database/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ class TrustedUser(SQLModel, table=True):
user_id: int = Field(index=True)
group_id: int = Field(default=0, index=True)
trusted_by_admin_id: int
user_full_name: str = Field(default="")
username: str | None = Field(default=None)
trusted_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
notes: str | None = Field(default=None)

Expand Down
29 changes: 29 additions & 0 deletions src/bot/database/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,29 @@ def __init__(self, database_path: str):

SQLModel.metadata.create_all(self._engine)

# Migrate existing tables: add new columns if missing
self._migrate_trusted_users()

def _migrate_trusted_users(self) -> None:
"""Add user_full_name/username columns to trusted_users if missing."""
with self._engine.connect() as conn:
columns = {
row[1] for row in conn.exec_driver_sql(
"PRAGMA table_info(trusted_users)"
).fetchall()
}
if "user_full_name" not in columns:
conn.exec_driver_sql(
"ALTER TABLE trusted_users ADD COLUMN user_full_name TEXT DEFAULT ''"
)
logger.info("Migrated trusted_users: added user_full_name column")
if "username" not in columns:
conn.exec_driver_sql(
"ALTER TABLE trusted_users ADD COLUMN username TEXT"
)
logger.info("Migrated trusted_users: added username column")
conn.commit()

def get_or_create_user_warning(self, user_id: int, group_id: int) -> UserWarning:
"""
Get existing warning record or create a new one.
Expand Down Expand Up @@ -354,6 +377,8 @@ def add_trusted_user(
trusted_by_admin_id: int,
group_id: int = 0,
notes: str | None = None,
user_full_name: str = "",
username: str | None = None,
) -> TrustedUser:
"""
Add a user to trusted list.
Expand All @@ -363,6 +388,8 @@ def add_trusted_user(
trusted_by_admin_id: Telegram user ID of admin granting trust.
group_id: Trust scope ID (0 means global).
notes: Optional admin notes.
user_full_name: Display name of the trusted user.
username: Username of the trusted user.

Returns:
TrustedUser: Created trusted record.
Expand All @@ -385,6 +412,8 @@ def add_trusted_user(
group_id=group_id,
trusted_by_admin_id=trusted_by_admin_id,
notes=notes,
user_full_name=user_full_name,
username=username,
)
session.add(record)
session.commit()
Expand Down
56 changes: 30 additions & 26 deletions src/bot/handlers/trust.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import logging
from datetime import UTC

from telegram import Bot, Update
from telegram import Update
from telegram.ext import ContextTypes
from telegram.helpers import escape_markdown

Expand Down Expand Up @@ -51,25 +51,15 @@ def _remove_trusted_cache(context: ContextTypes.DEFAULT_TYPE, user_id: int) -> N
trusted_ids.discard(user_id)


async def _format_user_display(bot: Bot, user_id: int) -> str:
"""Return a markdown-safe ``Name (@username)`` display for a user.
def _format_stored_user(full_name: str, user_id: int) -> str:
"""Return a markdown-safe display for a stored user.

Falls back to ``User <id>`` if the bot cannot fetch the user profile
(e.g., user has not interacted with the bot or has blocked it).
Uses the cached full_name from the DB. Falls back to ``User <id>``
if the name is empty (e.g. trust granted via callback without name).
"""
try:
chat = await bot.get_chat(user_id)
full_name = chat.full_name or f"User {user_id}"
name = escape_markdown(full_name, version=1)
if chat.username:
username = escape_markdown(chat.username, version=1)
return f"{name} (@{username})"
return name
except Exception:
logger.warning(
f"Failed to fetch user info for {user_id}", exc_info=True
)
return f"User {user_id}"
if full_name:
return escape_markdown(full_name, version=1)
return f"User {user_id}"


def _resolve_target_user_id(
Expand Down Expand Up @@ -102,6 +92,8 @@ async def trust_user(
registry: GroupRegistry,
target_user_id: int,
admin_user_id: int,
target_user_full_name: str = "",
target_username: str | None = None,
) -> tuple[int, int]:
"""Add a trusted user and apply cleanup side effects.

Expand All @@ -119,6 +111,8 @@ async def trust_user(
db.add_trusted_user(
user_id=target_user_id,
trusted_by_admin_id=admin_user_id,
user_full_name=target_user_full_name,
username=target_username,
)

cleared_probation = 0
Expand Down Expand Up @@ -180,12 +174,21 @@ async def handle_trust_command(
await update.message.reply_text(error_message)
return

# Resolve target user's display name
target_full_name = ""
target_username = None
if update.message.forward_from:
target_full_name = update.message.forward_from.full_name
target_username = update.message.forward_from.username

db = get_database()
registry = get_group_registry()

try:
cleared_count, unrestricted_count = await trust_user(
context.bot, db, registry, target_user_id, admin_user_id
context.bot, db, registry, target_user_id, admin_user_id,
target_user_full_name=target_full_name,
target_username=target_username,
)
_add_trusted_cache(context, target_user_id)
await update.message.reply_text(
Expand Down Expand Up @@ -271,16 +274,17 @@ async def handle_trusted_list_command(
if trusted_at.tzinfo is None:
trusted_at = trusted_at.replace(tzinfo=UTC)
trusted_at_display = trusted_at.astimezone(UTC).strftime("%Y-%m-%d %H:%M UTC")
user_display = await _format_user_display(context.bot, record.user_id)
admin_display = await _format_user_display(
context.bot, record.trusted_by_admin_id
)

# Use stored name/username — no API calls
user_display = _format_stored_user(record.user_full_name, record.user_id)
if record.username:
escaped = escape_markdown(record.username, version=1)
user_display += f" (@{escaped})"

trusted_lines.append(
"• {user_display} (`{user_id}`) — oleh {admin_display} (`{admin_id}`) pada `{trusted_at}`".format(
"• {user_display} (`{user_id}`) pada `{trusted_at}`".format(
user_display=user_display,
user_id=record.user_id,
admin_display=admin_display,
admin_id=record.trusted_by_admin_id,
trusted_at=trusted_at_display,
)
)
Expand Down
15 changes: 13 additions & 2 deletions tests/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,14 +421,25 @@ def test_get_trusted_user_ids(self, db_service: DatabaseService):
assert trusted_ids == {1001, 1002}

def test_get_trusted_users_returns_metadata(self, db_service: DatabaseService):
db_service.add_trusted_user(user_id=2001, trusted_by_admin_id=9001)
db_service.add_trusted_user(user_id=2002, trusted_by_admin_id=9002)
db_service.add_trusted_user(
user_id=2001, trusted_by_admin_id=9001,
user_full_name="Alice", username="alice",
)
db_service.add_trusted_user(
user_id=2002, trusted_by_admin_id=9002,
user_full_name="Bob", username=None,
)

trusted_users = db_service.get_trusted_users()

assert len(trusted_users) == 2
assert {u.user_id for u in trusted_users} == {2001, 2002}
assert {u.trusted_by_admin_id for u in trusted_users} == {9001, 9002}
by_id = {u.user_id: u for u in trusted_users}
assert by_id[2001].user_full_name == "Alice"
assert by_id[2001].username == "alice"
assert by_id[2002].user_full_name == "Bob"
assert by_id[2002].username is None

def test_is_user_trusted_non_zero_group_raises(
self, db_service: DatabaseService
Expand Down
31 changes: 11 additions & 20 deletions tests/test_trust_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ async def test_trust_command_success_from_forwarded_message(
forwarded_user = MagicMock()
forwarded_user.id = 4444
forwarded_user.full_name = "Forwarded User"
forwarded_user.username = None
mock_update.message.forward_from = forwarded_user

await handle_trust_command(mock_update, mock_context)
Expand Down Expand Up @@ -318,45 +319,35 @@ async def test_trusted_list_command_empty(self, mock_update, mock_context):

async def test_trusted_list_command(self, mock_update, mock_context):
db = get_database()
db.add_trusted_user(user_id=8001, trusted_by_admin_id=12345)
db.add_trusted_user(user_id=8002, trusted_by_admin_id=54321)

chats = {
8001: MagicMock(full_name="Alice Trusted", username="alice_t"),
8002: MagicMock(full_name="Bob Trusted", username=None),
12345: MagicMock(full_name="Admin One", username="admin_one"),
54321: MagicMock(full_name="Admin Two", username=None),
}
mock_context.bot.get_chat = AsyncMock(side_effect=lambda uid: chats[uid])
db.add_trusted_user(
user_id=8001, trusted_by_admin_id=12345,
user_full_name="Alice Trusted", username="alice_t",
)
db.add_trusted_user(
user_id=8002, trusted_by_admin_id=54321,
user_full_name="Bob Trusted", username=None,
)

await handle_trusted_list_command(mock_update, mock_context)

message = mock_update.message.reply_text.call_args.args[0]
assert "8001" in message
assert "8002" in message
assert "12345" in message
assert "54321" in message
assert "UTC" in message
assert "Alice Trusted" in message
assert "@alice\\_t" in message
assert "@alice\_t" in message
assert "Bob Trusted" in message
assert "Admin One" in message
assert "@admin\\_one" in message
assert "Admin Two" in message

async def test_trusted_list_command_get_chat_failure_fallback(
async def test_trusted_list_command_empty_name_fallback(
self, mock_update, mock_context
):
db = get_database()
db.add_trusted_user(user_id=8001, trusted_by_admin_id=12345)

mock_context.bot.get_chat = AsyncMock(side_effect=Exception("not found"))

await handle_trusted_list_command(mock_update, mock_context)

message = mock_update.message.reply_text.call_args.args[0]
assert "User 8001" in message
assert "User 12345" in message


class TestTrustCallbacks:
Expand Down
Loading