Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/e2e-master.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
cluster_name: kubernetes-python-e2e-master-${{ matrix.python-version }}
# The kind version to be used to spin the cluster up
# this needs to be updated whenever a new Kind version is released
version: v0.17.0
version: v0.31.0
# Update the config here whenever a new client snapshot is performed
# This would eventually point to cluster with the latest Kubernetes version
# as we sync with Kubernetes upstream
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/e2e-release-35.0.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
cluster_name: kubernetes-python-e2e-release-35.0-${{ matrix.python-version }}
# The kind version to be used to spin the cluster up
# this needs to be updated whenever a new Kind version is released
version: v0.17.0
version: v0.31.0
# Update the config here whenever a new client snapshot is performed
# This would eventually point to cluster with the latest Kubernetes version
# as we sync with Kubernetes upstream
Expand Down
60 changes: 53 additions & 7 deletions kubernetes/base/stream/ws_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
STDERR_CHANNEL = 2
ERROR_CHANNEL = 3
RESIZE_CHANNEL = 4
CLOSE_CHANNEL = 255

V4_CHANNEL_PROTOCOL = "v4.channel.k8s.io"
V5_CHANNEL_PROTOCOL = "v5.channel.k8s.io"

class _IgnoredIO:
def write(self, _x):
Expand All @@ -59,26 +63,40 @@ def __init__(self, configuration, url, headers, capture_all, binary=False):
"""
self._connected = False
self._channels = {}
self._closed_channels = set()
self.subprotocol = None
self.binary = binary
self.newline = '\n' if not self.binary else b'\n'
if capture_all:
self._all = StringIO() if not self.binary else BytesIO()
else:
self._all = _IgnoredIO()
self.sock = create_websocket(configuration, url, headers)
self.subprotocol = getattr(self.sock, 'subprotocol', None)
if not self.subprotocol and self.sock:
headers_dict = self.sock.getheaders()
if headers_dict:
for k, v in headers_dict.items():
if k.lower() == 'sec-websocket-protocol':
self.subprotocol = v
break
self._connected = True
self._returncode = None

def peek_channel(self, channel, timeout=0):
"""Peek a channel and return part of the input,
empty string otherwise."""
if channel in self._closed_channels and channel not in self._channels:
return b"" if self.binary else ""
self.update(timeout=timeout)
if channel in self._channels:
return self._channels[channel]
return ""
return b"" if self.binary else ""

def read_channel(self, channel, timeout=0):
"""Read data from a channel."""
if channel in self._closed_channels and channel not in self._channels:
return b"" if self.binary else ""
if channel not in self._channels:
ret = self.peek_channel(channel, timeout)
else:
Expand All @@ -93,6 +111,7 @@ def readline_channel(self, channel, timeout=None):
timeout = float("inf")
start = time.time()
while self.is_open() and time.time() - start < timeout:
# Always try to drain the channel first
if channel in self._channels:
data = self._channels[channel]
if self.newline in data:
Expand All @@ -104,6 +123,14 @@ def readline_channel(self, channel, timeout=None):
else:
del self._channels[channel]
return ret

if channel in self._closed_channels:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this same short-circuiting code in peek_channel and read_channel ?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

if channel in self._channels:
ret = self._channels[channel]
del self._channels[channel]
return ret
return b"" if self.binary else ""

self.update(timeout=(timeout - time.time() + start))

def write_channel(self, channel, data):
Expand All @@ -119,6 +146,14 @@ def write_channel(self, channel, data):
payload = channel_prefix + data
self.sock.send(payload, opcode=opcode)

def close_channel(self, channel):
"""Close a channel (v5 protocol only)."""
if self.subprotocol != V5_CHANNEL_PROTOCOL:
return
data = bytes([CLOSE_CHANNEL, channel])
self.sock.send(data, opcode=ABNF.OPCODE_BINARY)
self._closed_channels.add(channel)

def peek_stdout(self, timeout=0):
"""Same as peek_channel with channel=1."""
return self.peek_channel(STDOUT_CHANNEL, timeout=timeout)
Expand Down Expand Up @@ -200,13 +235,24 @@ def update(self, timeout=0):
return
elif op_code == ABNF.OPCODE_BINARY or op_code == ABNF.OPCODE_TEXT:
data = frame.data
if six.PY3 and not self.binary:
data = data.decode("utf-8", "replace")
if len(data) > 1:
if len(data) > 0:
# Parse channel from raw bytes to support v5 CLOSE signal AND avoid charset issues
channel = data[0]
if six.PY3 and not self.binary:
channel = ord(channel)
# In Py3, iterating bytes gives int, but indexing bytes gives int.
# websocket-client frame.data might be bytes.

if channel == CLOSE_CHANNEL and self.subprotocol == V5_CHANNEL_PROTOCOL: # v5 CLOSE
if len(data) > 1:
# data[1] is already int in Py3 bytes
close_chan = data[1]
self._closed_channels.add(close_chan)
return

data = data[1:]
# Decode data if expected text
if not self.binary:
data = data.decode("utf-8", "replace")

if data:
if channel in [STDOUT_CHANNEL, STDERR_CHANNEL]:
# keeping all messages in the order they received
Expand Down Expand Up @@ -476,7 +522,7 @@ def create_websocket(configuration, url, headers=None):
header.append("sec-websocket-protocol: %s" %
headers['sec-websocket-protocol'])
else:
header.append("sec-websocket-protocol: v4.channel.k8s.io")
header.append("sec-websocket-protocol: %s,%s" % (V5_CHANNEL_PROTOCOL, V4_CHANNEL_PROTOCOL))

if url.startswith('wss://') and configuration.verify_ssl:
ssl_opts = {
Expand Down
223 changes: 222 additions & 1 deletion kubernetes/base/stream/ws_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,18 @@
# limitations under the License.

import unittest
from unittest.mock import MagicMock, patch

from .ws_client import get_websocket_url
from . import ws_client as ws_client_module
from .ws_client import get_websocket_url, WSClient, V5_CHANNEL_PROTOCOL, V4_CHANNEL_PROTOCOL, CLOSE_CHANNEL, STDIN_CHANNEL
from .ws_client import websocket_proxycare
from kubernetes.client.configuration import Configuration
import os
import socket
import threading
import pytest
from kubernetes import stream, client, config
import websocket

try:
import urllib3
Expand Down Expand Up @@ -123,6 +126,224 @@ def test_websocket_proxycare(self):
assert dictval(connect_opts, 'http_proxy_auth') == expect_auth
assert dictval(connect_opts, 'http_no_proxy') == expect_noproxy


class WSClientProtocolTest(unittest.TestCase):
"""Tests for WSClient V5 protocol handling"""

def setUp(self):
# Mock configuration to avoid real connections in WSClient.__init__
self.config_mock = MagicMock()
self.config_mock.assert_hostname = False
self.config_mock.api_key = {}
self.config_mock.proxy = None
self.config_mock.ssl_ca_cert = None
self.config_mock.cert_file = None
self.config_mock.key_file = None
self.config_mock.verify_ssl = True

def test_create_websocket_header(self):
"""Verify sec-websocket-protocol header requests v5 first"""
# Patch WebSocket class in the module
with patch.object(ws_client_module, 'WebSocket', autospec=True) as mock_ws_cls:
mock_ws = mock_ws_cls.return_value

WSClient(self.config_mock, "ws://test", headers=None, capture_all=True)

mock_ws.connect.assert_called_once()
call_args = mock_ws.connect.call_args
# connect(url, **options)
# check kwargs for 'header'
kwargs = call_args[1]
self.assertIn('header', kwargs)
expected_header = f"sec-websocket-protocol: {V5_CHANNEL_PROTOCOL},{V4_CHANNEL_PROTOCOL}"
self.assertIn(expected_header, kwargs['header'])

def test_close_channel_v5(self):
"""Verify close_channel sends correct frame when v5 is negotiated"""
with patch.object(ws_client_module, 'create_websocket') as mock_create:
mock_ws = MagicMock()
mock_ws.subprotocol = V5_CHANNEL_PROTOCOL
mock_ws.connected = True
mock_create.return_value = mock_ws

client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True)
client.close_channel(0)

mock_ws.send.assert_called_with(bytes([CLOSE_CHANNEL, STDIN_CHANNEL]), opcode=websocket.ABNF.OPCODE_BINARY)

def test_close_channel_v4(self):
"""Verify close_channel does nothing when v4 is negotiated"""
with patch.object(ws_client_module, 'create_websocket') as mock_create:
mock_ws = MagicMock()
mock_ws.subprotocol = V4_CHANNEL_PROTOCOL
mock_ws.connected = True
mock_create.return_value = mock_ws

client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True)
client.close_channel(0)

mock_ws.send.assert_not_called()

def test_update_receives_close_v5(self):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add an additional unit test to verify how readline_channel handles a closed channel with leftover data?

While the current unit tests cover the parsing of the close signal itself, the new logic you added inside readline_channel—which flushes the remaining buffer even if it lacks a newline—is currently untested. Having a test that asserts readline_channel successfully flushes leftover data (e.g. "hello") and then returns an empty string on the subsequent call would ensure this specific edge-case logic is protected from future regressions.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added tests also for read_channel and peek_channel to validate channels are drained and follow the expected semantics

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

"""Verify update processes close signal when v5 is negotiated"""
with patch.object(ws_client_module, 'create_websocket') as mock_create, \
patch('select.select') as mock_select:

mock_ws = MagicMock()
mock_ws.subprotocol = V5_CHANNEL_PROTOCOL
mock_ws.connected = True
mock_ws.sock.fileno.return_value = 10

# Setup frame with close signal for channel 0
frame = MagicMock()
frame.data = bytes([CLOSE_CHANNEL, STDIN_CHANNEL])
mock_ws.recv_data_frame.return_value = (websocket.ABNF.OPCODE_BINARY, frame)

mock_create.return_value = mock_ws
# Make select return ready
mock_select.return_value = ([mock_ws.sock], [], [])

client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True)
client.update(timeout=0)

self.assertIn(0, client._closed_channels)

def test_update_ignores_close_signal_v4(self):
"""Verify update treats 0xFF as regular data (or ignores signal interpretation) when v4"""
with patch.object(ws_client_module, 'create_websocket') as mock_create, \
patch('select.select') as mock_select:

mock_ws = MagicMock()
mock_ws.subprotocol = V4_CHANNEL_PROTOCOL
mock_ws.connected = True
mock_ws.sock.fileno.return_value = 10

# Setup frame that looks like close signal but should be treated as data
frame = MagicMock()
frame.data = bytes([CLOSE_CHANNEL, STDIN_CHANNEL])
mock_ws.recv_data_frame.return_value = (websocket.ABNF.OPCODE_BINARY, frame)

mock_create.return_value = mock_ws
mock_select.return_value = ([mock_ws.sock], [], [])

client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True, binary=True) # binary=True to avoid decode errors
client.update(timeout=0)

# Should NOT be in closed channels
self.assertNotIn(0, client._closed_channels)
# Should be in data channels (channel 255 with data \x00)
# Code: channel = data[0] (255), data = data[1:] (\x00)
# if channel (255) not in _channels...
self.assertIn(255, client._channels)
self.assertEqual(client._channels[255], b'\x00')

def test_readline_channel_closed_with_leftover_data(self):
"""Verify readline_channel flushes remaining buffer when channel is closed"""
with patch.object(ws_client_module, 'create_websocket') as mock_create:
mock_ws = MagicMock()
mock_ws.subprotocol = V5_CHANNEL_PROTOCOL
mock_ws.connected = True
mock_create.return_value = mock_ws

client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True, binary=False)

# Simulate some data in the channel buffer, and then close it
client._channels[1] = "hello"
client._closed_channels.add(1)

# First call to readline should flush "hello" even though there is no newline
line1 = client.readline_channel(1)
self.assertEqual(line1, "hello")

# Subsequent call should return empty string
line2 = client.readline_channel(1)
self.assertEqual(line2, "")

def test_readline_channel_closed_with_leftover_data_binary(self):
"""Verify readline_channel flushes remaining buffer when channel is closed in binary mode"""
with patch.object(ws_client_module, 'create_websocket') as mock_create:
mock_ws = MagicMock()
mock_ws.subprotocol = V5_CHANNEL_PROTOCOL
mock_ws.connected = True
mock_create.return_value = mock_ws

client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True, binary=True)

# Simulate some bytes in the channel buffer, and then close it
client._channels[1] = b"hello-binary"
client._closed_channels.add(1)

# First call to readline should flush leftover bytes
line1 = client.readline_channel(1)
self.assertEqual(line1, b"hello-binary")

# Subsequent call should return empty bytes
line2 = client.readline_channel(1)
self.assertEqual(line2, b"")

def test_read_channel_closed_with_leftover_data(self):
"""Verify read_channel drains leftover data and then short-circuits on closed channel"""
with patch.object(ws_client_module, 'create_websocket') as mock_create:
mock_ws = MagicMock()
mock_ws.subprotocol = V5_CHANNEL_PROTOCOL
mock_ws.connected = True
mock_ws.sock.fileno.return_value = 10
mock_create.return_value = mock_ws

client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True, binary=False)

# Simulate leftover data and closed channel
client._channels[1] = "hello"
client._closed_channels.add(1)

# First call should drain data
data1 = client.read_channel(1)
self.assertEqual(data1, "hello")

# Subsequent call should short-circuit and return empty string
# Patch `update` to assert it is NOT called (short-circuit)
with patch.object(client, 'update') as mock_update:
data2 = client.read_channel(1)
self.assertEqual(data2, "")
mock_update.assert_not_called()

def test_peek_channel_closed_with_leftover_data(self):
"""Verify peek_channel allows peeking leftover data and then short-circuits after draining"""
with patch.object(ws_client_module, 'create_websocket') as mock_create, \
patch('select.poll') as mock_poll:
mock_poll.return_value.poll.return_value = []
mock_ws = MagicMock()
mock_ws.subprotocol = V5_CHANNEL_PROTOCOL
mock_ws.connected = True
mock_ws.sock.fileno.return_value = 10
mock_create.return_value = mock_ws

client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True, binary=False)

# Simulate leftover data and closed channel
client._channels[1] = "hello"
client._closed_channels.add(1)

# First peek should return data without draining
data1 = client.peek_channel(1)
self.assertEqual(data1, "hello")

# Second peek should still return data
data2 = client.peek_channel(1)
self.assertEqual(data2, "hello")

# Drain it
client.read_channel(1)

# Now peek should short-circuit and return empty string
# Patch `update` to assert it is NOT called (short-circuit)
with patch.object(client, 'update') as mock_update:
data3 = client.peek_channel(1)
self.assertEqual(data3, "")
mock_update.assert_not_called()



@pytest.fixture(scope="module")
def dummy_proxy():
#Dummy Proxy
Expand Down
Loading