-
Notifications
You must be signed in to change notification settings - Fork 262
Expand file tree
/
Copy pathtest_media_manager.py
More file actions
80 lines (62 loc) · 2.31 KB
/
test_media_manager.py
File metadata and controls
80 lines (62 loc) · 2.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
from queue import Queue
from types import SimpleNamespace
from unittest.mock import Mock
import httpx
import pytest
from langfuse._task_manager.media_manager import MediaManager
def _upload_response(status_code: int, text: str = "") -> httpx.Response:
request = httpx.Request("PUT", "https://example.com/upload")
return httpx.Response(status_code=status_code, request=request, text=text)
def _upload_job() -> dict:
return {
"media_id": "media-id",
"content_bytes": b"payload",
"content_type": "image/jpeg",
"content_length": 7,
"content_sha256_hash": "sha256hash",
"trace_id": "trace-id",
"observation_id": None,
"field": "input",
}
def test_media_upload_retries_on_retryable_http_status():
media_api = Mock()
media_api.get_upload_url.return_value = SimpleNamespace(
upload_url="https://example.com/upload",
media_id="media-id",
)
media_api.patch.return_value = None
httpx_client = Mock()
httpx_client.put.side_effect = [
_upload_response(503, "temporary failure"),
_upload_response(200, "ok"),
]
manager = MediaManager(
api_client=SimpleNamespace(media=media_api),
httpx_client=httpx_client,
media_upload_queue=Queue(),
max_retries=3,
)
manager._process_upload_media_job(data=_upload_job())
assert httpx_client.put.call_count == 2
media_api.patch.assert_called_once()
assert media_api.patch.call_args.kwargs["upload_http_status"] == 200
def test_media_upload_gives_up_on_non_retryable_http_status():
media_api = Mock()
media_api.get_upload_url.return_value = SimpleNamespace(
upload_url="https://example.com/upload",
media_id="media-id",
)
media_api.patch.return_value = None
httpx_client = Mock()
httpx_client.put.return_value = _upload_response(403, "forbidden")
manager = MediaManager(
api_client=SimpleNamespace(media=media_api),
httpx_client=httpx_client,
media_upload_queue=Queue(),
max_retries=3,
)
with pytest.raises(httpx.HTTPStatusError):
manager._process_upload_media_job(data=_upload_job())
assert httpx_client.put.call_count == 1
media_api.patch.assert_called_once()
assert media_api.patch.call_args.kwargs["upload_http_status"] == 403