Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ on:
- cron: '0 0 * * *'

env:
FAF_DB_VERSION: v138
FAF_DB_VERSION: v143
FLYWAY_VERSION: 7.5.4

jobs:
Expand Down
2 changes: 1 addition & 1 deletion compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ services:
- "3306:3306"

faf-db-migrations:
image: faforever/faf-db-migrations:v138
image: faforever/faf-db-migrations:v143
command: migrate
environment:
FLYWAY_URL: jdbc:mysql://faf-db/faf?useSSL=false
Expand Down
2 changes: 2 additions & 0 deletions server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import server.metrics as metrics

from .asyncio_extensions import map_suppress, synchronizedmethod
from .avatar_change_queue_service import AvatarChangeQueueService
from .broadcast_service import BroadcastService
from .client_message_queue_service import ClientMessageQueueService
from .config import TRACE, config
Expand Down Expand Up @@ -144,6 +145,7 @@
__copyright__ = "Copyright (c) 2011-2015 " + __author__

__all__ = (
"AvatarChangeQueueService",
"BroadcastService",
"ClientMessageQueueService",
"ConfigurationService",
Expand Down
119 changes: 119 additions & 0 deletions server/avatar_change_queue_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""RabbitMQ consumer that refreshes player avatars from DB on update events."""

import json
import logging
import socket
from typing import Any, ClassVar, Optional

from aio_pika.abc import AbstractIncomingMessage, AbstractQueue

from .config import config
from .core import Service
from .decorators import with_logger
from .message_queue_service import MessageQueueService
from .player_service import PlayerService

PLAYER_AVATAR_UPDATE_ROUTING_KEY = "success.player_avatar.update"


@with_logger
class AvatarChangeQueueService(Service):

"""

Check notice on line 22 in server/avatar_change_queue_service.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

server/avatar_change_queue_service.py#L22

Multi-line docstring summary should start at the first line (D212)

Check notice on line 22 in server/avatar_change_queue_service.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

server/avatar_change_queue_service.py#L22

No blank lines allowed before class docstring (found 1) (D211)
Consume `success.player_avatar.update` messages and refresh players.

Wire contract
-------------
Publishers post to the `MQ_EXCHANGE_NAME` topic exchange with routing
key `success.player_avatar.update`. The body is a UTF-8 JSON object:

- `player_id` (int, required): the player whose selected avatar changed.
- `avatar_id` (int or null, optional): the newly selected avatar id, or
null if the player cleared their avatar. The lobby itself ignores this
field — it always re-reads the DB so it gets the matching url/tooltip
and applies the ownership check. The field is shipped for the benefit
of other subscribers that may want to act on the change without an
extra DB roundtrip.

On receipt the lobby re-reads the affected player's avatar from the DB
and marks them dirty so the existing `BroadcastService` emits a
`player_info` to every connected client on its next tick.
"""

_logger: ClassVar[logging.Logger]

def __init__(
self,
message_queue_service: MessageQueueService,
player_service: PlayerService,
):
"""Wire dependencies; consumer is started in `initialize`."""
self.message_queue_service = message_queue_service
self.player_service = player_service
self._queue: Optional[AbstractQueue] = None
self._consumer_tag: Optional[str] = None

async def initialize(self) -> None:
# Per-instance queue: every lobby pod must process every event so
# whichever pod is hosting the player can refresh its in-memory
# state. Naming follows `<exchange>.<service>.<routing-key>.<host>`,
# matching `ClientMessageQueueService`.
queue_name = (
f"{config.MQ_EXCHANGE_NAME}.lobby.player_avatar.update"
f".{socket.gethostname()}"
)
result = await self.message_queue_service.declare_queue_and_consume(
exchange_name=config.MQ_EXCHANGE_NAME,
routing_key=PLAYER_AVATAR_UPDATE_ROUTING_KEY,
callback=self._on_message,
queue_name=queue_name,
)
if result is not None:
self._queue, self._consumer_tag = result

async def shutdown(self) -> None:
if self._queue is not None and self._consumer_tag is not None:
await self._queue.cancel(self._consumer_tag)
self._queue = None
self._consumer_tag = None

async def _on_message(self, message: AbstractIncomingMessage) -> None:
async with message.process(requeue=False):
try:
payload = json.loads(message.body)
except (ValueError, UnicodeDecodeError):
self._logger.warning(
"Dropping avatar-update message with non-JSON body"
)
return

if not isinstance(payload, dict):
self._logger.warning(
"Dropping avatar-update message: payload is not a JSON object"
)
return

raw_player_id: Any = payload.get("player_id")
# Reject bool explicitly: int(True) == 1 would otherwise sneak
# through and refresh player 1 on every truthy payload.
if isinstance(raw_player_id, bool):
self._logger.warning(
"Dropping avatar-update message: invalid player_id %r",
raw_player_id,
)
return
try:
player_id = int(raw_player_id)
except (TypeError, ValueError):
self._logger.warning(
"Dropping avatar-update message: invalid player_id %r",
raw_player_id,
)
return

refreshed = await self.player_service.refresh_player_avatar(player_id)
if not refreshed:
self._logger.debug(
"avatar-update for player %s ignored: not connected here",
player_id,
)
3 changes: 2 additions & 1 deletion server/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@
Column("create_time", TIMESTAMP, nullable=False),
Column("update_time", TIMESTAMP, nullable=False),
Column("user_agent", String),
Column("last_login", TIMESTAMP)
Column("last_login", TIMESTAMP),
Column("avatar_id", Integer, ForeignKey("avatars_list.id")),
)

leaderboard = Table(
Expand Down
18 changes: 18 additions & 0 deletions server/lobbyconnection.py
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,7 @@ async def command_avatar(self, message):
)
self.player.avatar = None

new_avatar_id = row.id if avatar_url is not None else None
if avatar_url is not None:
await conn.execute(
avatars.update().where(
Expand All @@ -972,6 +973,23 @@ async def command_avatar(self, message):
"url": avatar_url,
"tooltip": row.tooltip
}
# Mirror the selection to login.avatar_id so reads via the new
# authoritative column stay consistent with the legacy flag.
await conn.execute(
t_login.update().where(
t_login.c.id == self.player.id
).values(
avatar_id=new_avatar_id
)
)
avatar_tooltip = (
self.player.avatar["tooltip"] if self.player.avatar else None
)
self._logger.info(
"Player %s changed avatar via client connection: "
"avatar_id=%s tooltip=%s",
self.player.id, new_avatar_id, avatar_tooltip
)
self.player_service.mark_dirty(self.player)
else:
raise KeyError("invalid action")
Expand Down
80 changes: 75 additions & 5 deletions server/player_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing import TYPE_CHECKING, ClassVar, Optional, ValuesView

import aiocron
from sqlalchemy import and_, select
from sqlalchemy import and_, or_, select

import server.metrics as metrics
from server.config import config
Expand Down Expand Up @@ -81,6 +81,25 @@ def pop_dirty_players(self) -> set[Player]:

return dirty_players

@staticmethod
def _avatar_grant_join_onclause():
# ON clause for joining `avatars` against `login` to pick a
# player's currently worn avatar with ownership enforced.
# Prefer the new authoritative `login.avatar_id` column; fall
# back to the legacy `avatars.selected = 1` row only when
# `avatar_id` is null. Either way the row must be a real grant
# in `avatars`, so revoked grants resolve to no avatar.
return and_(
avatars.c.idUser == login.c.id,
or_(
avatars.c.idAvatar == login.c.avatar_id,
and_(
login.c.avatar_id.is_(None),
avatars.c.selected == 1
)
)
)

async def fetch_player_data(self, player: Player) -> None:
async with self._db.acquire() as conn:
result = await conn.execute(
Expand All @@ -100,10 +119,7 @@ async def fetch_player_data(self, player: Player) -> None:
.outerjoin(clan)
.outerjoin(
avatars,
onclause=and_(
avatars.c.idUser == login.c.id,
avatars.c.selected == 1
)
onclause=self._avatar_grant_join_onclause()
)
.outerjoin(avatars_list)
).where(login.c.id == player.id) # yapf: disable
Expand All @@ -129,6 +145,60 @@ async def fetch_player_data(self, player: Player) -> None:

await self._fetch_player_ratings(player, conn)

async def _fetch_player_avatar(
self, player: Player, conn
) -> Optional[int]:
"""Refresh `player.avatar` from DB; return the avatar id, if any."""
sql = select(
avatars_list.c.id,
avatars_list.c.url,
avatars_list.c.tooltip,
).select_from(
login
.outerjoin(
avatars,
onclause=self._avatar_grant_join_onclause()
)
.outerjoin(avatars_list)
).where(login.c.id == player.id)

result = await conn.execute(sql)
row = result.fetchone()
if row is None:
player.avatar = None
return None

row_mapping = row._mapping
avatar_id = row_mapping.get(avatars_list.c.id)
url = row_mapping.get(avatars_list.c.url)
tooltip = row_mapping.get(avatars_list.c.tooltip)
if url and tooltip:
player.avatar = {"url": url, "tooltip": tooltip}
return avatar_id
player.avatar = None
return None

async def refresh_player_avatar(self, player_id: int) -> bool:
"""
Re-read avatar for one player and mark them dirty.

`BroadcastService` emits a `player_info` on the next tick. Returns
True if the player is connected to this instance, False otherwise.
"""
player = self._players.get(player_id)
if player is None:
return False
async with self._db.acquire() as conn:
avatar_id = await self._fetch_player_avatar(player, conn)
avatar_tooltip = player.avatar["tooltip"] if player.avatar else None
self._logger.info(
"Player %s avatar refreshed from RabbitMQ event: "
"avatar_id=%s tooltip=%s",
player_id, avatar_id, avatar_tooltip
)
self.mark_dirty(player)
return True

async def _fetch_player_ratings(self, player: Player, conn):
sql = select(
leaderboard_rating.c.mean,
Expand Down
Loading
Loading