Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/_newsfragments/2337.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Static routes now support ``HEAD`` without opening file streams, and reject unsupported methods with a proper ``405 Method Not Allowed`` response.
130 changes: 87 additions & 43 deletions falcon/routing/static.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pathlib import Path
import re
from re import Pattern
import stat
from typing import Any, ClassVar, IO, TYPE_CHECKING

import falcon
Expand Down Expand Up @@ -39,57 +40,81 @@ def _open_file(file_path: str | Path) -> tuple[io.BufferedReader, os.stat_result
return fh, st


def _set_range(
fh: io.BufferedReader, st: os.stat_result, req_range: tuple[int, int] | None
) -> tuple[ReadableIO, int, tuple[int, int, int] | None]:
"""Process file handle for a ranged request.
def _stat_file(file_path: str | Path) -> os.stat_result:
"""Read file stat for a static file request without opening it."""

Args:
fh (io.BufferedReader): file handle of the file.
st (os.stat_result): fs stat result of the file.
req_range (Optional[Tuple[int, int]]): Request.range value.
Returns:
tuple: Three-member tuple of (stream, content-length, content-range).
If req_range is ``None`` or ignored, content-range will be
``None``; otherwise, the stream will be appropriately seeked and
possibly bounded, and the content-range will be a tuple of
(start, end, size).
"""
try:
st = os.stat(file_path)
if not stat.S_ISREG(st.st_mode):
raise OSError
except OSError:
raise falcon.HTTPNotFound()

return st


def _get_range(
st: os.stat_result, req_range: tuple[int, int] | None
) -> tuple[int, tuple[int, int, int] | None]:
size = st.st_size
if req_range is None:
return fh, size, None
return size, None

start, end = req_range
if size == 0:
# NOTE(tipabu): Ignore Range headers for zero-byte files; just serve
# the empty body since Content-Range can't be used to express a
# zero-byte body.
return fh, 0, None
return 0, None

if start < 0 and end == -1:
# NOTE(tipabu): Special case: only want the last N bytes.
start = max(start, -size)
fh.seek(start, os.SEEK_END)
# NOTE(vytas): Wrap in order to prevent sendfile from being used, as
# its implementation was found to be buggy in many popular WSGI
# servers for open files with a non-zero offset.
return _BoundedFile(fh, -start), -start, (size + start, size - 1, size)
return -start, (size + start, size - 1, size)

if start >= size:
fh.close()
raise falcon.HTTPRangeNotSatisfiable(size)

fh.seek(start)
if end == -1:
# NOTE(vytas): Wrap in order to prevent sendfile from being used, as
# its implementation was found to be buggy in many popular WSGI
# servers for open files with a non-zero offset.
length = size - start
return _BoundedFile(fh, length), length, (start, size - 1, size)
return length, (start, size - 1, size)

end = min(end, size - 1)
length = end - start + 1
return _BoundedFile(fh, length), length, (start, end, size)
return length, (start, end, size)


def _set_range(
fh: io.BufferedReader, st: os.stat_result, req_range: tuple[int, int] | None
) -> tuple[ReadableIO, int, tuple[int, int, int] | None]:
"""Process file handle for a ranged request.

Args:
fh (io.BufferedReader): file handle of the file.
st (os.stat_result): fs stat result of the file.
req_range (Optional[Tuple[int, int]]): Request.range value.
Returns:
tuple: Three-member tuple of (stream, content-length, content-range).
If req_range is ``None`` or ignored, content-range will be
``None``; otherwise, the stream will be appropriately seeked and
possibly bounded, and the content-range will be a tuple of
(start, end, size).
"""
try:
length, content_range = _get_range(st, req_range)
except falcon.HTTPRangeNotSatisfiable:
fh.close()
raise

if content_range is None:
return fh, length, None

fh.seek(content_range[0])

# NOTE(vytas): Wrap in order to prevent sendfile from being used, as
# its implementation was found to be buggy in many popular WSGI
# servers for open files with a non-zero offset.
return _BoundedFile(fh, length), length, content_range


def _is_not_modified(
Expand Down Expand Up @@ -228,10 +253,13 @@ def __call__(self, req: Request, resp: Response, **kw: Any) -> None:
assert not kw
if req.method == 'OPTIONS':
# it's likely a CORS request. Set the allow header to the appropriate value.
resp.set_header('Allow', 'GET')
resp.set_header('Allow', 'GET, HEAD')
resp.set_header('Content-Length', '0')
return

if req.method not in ('GET', 'HEAD'):
raise falcon.HTTPMethodNotAllowed(['GET', 'HEAD'])

without_prefix = req.path[len(self._prefix) :]

# NOTE(kgriffs): Check surrounding whitespace and strip trailing
Expand Down Expand Up @@ -260,14 +288,24 @@ def __call__(self, req: Request, resp: Response, **kw: Any) -> None:
if '..' in file_path or not file_path.startswith(self._directory):
raise falcon.HTTPNotFound()

if self._fallback_filename is None:
fh, st = _open_file(file_path)
if req.method == 'HEAD':
if self._fallback_filename is None:
st = _stat_file(file_path)
else:
try:
st = _stat_file(file_path)
except falcon.HTTPNotFound:
st = _stat_file(self._fallback_filename)
file_path = self._fallback_filename
else:
try:
if self._fallback_filename is None:
fh, st = _open_file(file_path)
except falcon.HTTPNotFound:
fh, st = _open_file(self._fallback_filename)
file_path = self._fallback_filename
else:
try:
fh, st = _open_file(file_path)
except falcon.HTTPNotFound:
fh, st = _open_file(self._fallback_filename)
file_path = self._fallback_filename

etag = f'{int(st.st_mtime):x}-{st.st_size:x}'
resp.etag = etag
Expand All @@ -280,18 +318,24 @@ def __call__(self, req: Request, resp: Response, **kw: Any) -> None:
resp.last_modified = last_modified

if _is_not_modified(req, etag, last_modified):
fh.close()
if req.method != 'HEAD':
fh.close()
resp.status = falcon.HTTP_304
return

req_range = req.range if req.range_unit == 'bytes' else None
try:
stream, length, content_range = _set_range(fh, st, req_range)
except OSError:
fh.close()
raise falcon.HTTPNotFound()
if req.method == 'HEAD':
length, content_range = _get_range(st, req_range)
resp.content_length = length
else:
try:
stream, length, content_range = _set_range(fh, st, req_range)
except OSError:
fh.close()
raise falcon.HTTPNotFound()

resp.set_stream(stream, length)

resp.set_stream(stream, length)
suffix = os.path.splitext(file_path)[1]
resp.content_type = resp.options.static_media_types.get(
suffix, 'application/octet-stream'
Expand Down
58 changes: 57 additions & 1 deletion tests/test_static.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,62 @@ def test_file_closed(client, patch_open):
assert patch_open.current_file.closed


def test_head_request_does_not_open_file(client, monkeypatch, tmp_path):
file_path = tmp_path / 'main.css'
file_content = b'body { color: black; }'
file_path.write_bytes(file_content)

open_mock = mock.Mock(side_effect=AssertionError('HEAD must not open file streams'))
monkeypatch.setattr(io, 'open', open_mock)

client.app.add_static_route('/assets/', tmp_path)

resp = client.simulate_head(path='/assets/main.css')

assert resp.status == falcon.HTTP_200
assert resp.text == ''
assert resp.headers['Content-Type'] == 'text/css'
assert int(resp.headers['Content-Length']) == len(file_content)
assert resp.headers['Accept-Ranges'] == 'bytes'
open_mock.assert_not_called()


def test_head_request_honors_range(client, monkeypatch, tmp_path):
file_path = tmp_path / 'main.css'
file_path.write_bytes(b'0123456789')

open_mock = mock.Mock(side_effect=AssertionError('HEAD must not open file streams'))
monkeypatch.setattr(io, 'open', open_mock)

client.app.add_static_route('/assets/', tmp_path)

resp = client.simulate_head(
path='/assets/main.css',
headers={'Range': 'bytes=2-5'},
)

assert resp.status == falcon.HTTP_206
assert resp.text == ''
assert int(resp.headers['Content-Length']) == 4
assert resp.headers['Content-Range'] == 'bytes 2-5/10'
open_mock.assert_not_called()


def test_static_route_method_not_allowed(client, monkeypatch, tmp_path):
(tmp_path / 'main.css').write_bytes(b'body { color: black; }')

open_mock = mock.Mock(side_effect=AssertionError('unsupported methods must not open files'))
monkeypatch.setattr(io, 'open', open_mock)

client.app.add_static_route('/assets/', tmp_path)

resp = client.simulate_post(path='/assets/main.css')

assert resp.status == falcon.HTTP_405
assert resp.headers['Allow'] == 'GET, HEAD'
open_mock.assert_not_called()


def test_options_request(client, patch_open):
patch_open()

Expand All @@ -634,7 +690,7 @@ def test_options_request(client, patch_open):
assert resp.status_code == 200
assert resp.text == ''
assert int(resp.headers['Content-Length']) == 0
assert resp.headers['Access-Control-Allow-Methods'] == 'GET'
assert resp.headers['Access-Control-Allow-Methods'] == 'GET, HEAD'


def test_last_modified(client, patch_open):
Expand Down
Loading