diff --git a/scapy/contrib/j1939.py b/scapy/contrib/j1939.py index e560aadddd0..97021fff813 100644 --- a/scapy/contrib/j1939.py +++ b/scapy/contrib/j1939.py @@ -35,6 +35,7 @@ import struct import logging import time +import traceback from typing import ( Any, @@ -43,9 +44,14 @@ Optional, Tuple, Type, + Union, + cast, + TYPE_CHECKING, ) +from scapy.automaton import ObjectPipe, select_objects from scapy.config import conf +from scapy.consts import LINUX from scapy.data import SO_TIMESTAMPNS from scapy.error import Scapy_Exception, log_runtime from scapy.fields import ( @@ -65,6 +71,10 @@ from scapy.packet import Packet from scapy.supersocket import SuperSocket from scapy.compat import raw +from scapy.utils import EDecimal + +if TYPE_CHECKING: + from scapy.contrib.cansocket import CANSocket log_j1939 = logging.getLogger("scapy.contrib.j1939") @@ -810,3 +820,782 @@ def send(self, x): except OSError as exc: log_j1939.error("Failed to send J1939 packet: %s", exc) return 0 + + +# --------------------------------------------------------------------------- +# J1939 Soft Socket +# --------------------------------------------------------------------------- +# Implements the SAE J1939 Transport Protocol (segmentation and reassembly) +# entirely in Python over any CANSocket, without requiring the Linux kernel +# CAN_J1939 socket module. The design mirrors ISOTPSoftSocket from +# scapy.contrib.isotp.isotp_soft_socket. + +# J1939-21 transport-protocol timing constants (seconds) +_J1939_TP_BAM_DELAY = 0.050 # minimum inter-packet gap for BAM sender (50 ms) +_J1939_TP_T1 = 0.750 # receiver timeout for first DT after BAM/RTS +_J1939_TP_T2 = 1.250 # receiver timeout between consecutive DT frames +_J1939_TP_T3 = 1.250 # sender timeout waiting for CTS after RTS/block +_J1939_TP_T4 = 1.050 # sender timeout waiting for End-of-Message ACK + +# On slow serial interfaces (slcan) the OS serial buffer may hold hundreds of +# background CAN frames that the mux must drain before the TP.DT frames +# arrive. When the inactivity timer fires, the handler checks the total +# elapsed time; if it is below _J1939_TP_T2 × _J1939_TP_DT_TIMEOUT_EXTENSION +# (i.e. 1.25 s × 10 = 12.5 s), the timer is re-armed and the session +# continues. Only after that wall-clock ceiling is exceeded is the transfer +# declared timed-out. +_J1939_TP_DT_TIMEOUT_EXTENSION = 10 + +# Maximum payload / per-frame data constants +_J1939_TP_DT_DATA = 7 # usable data bytes per TP.DT packet +_J1939_TP_MAX_DATA = 1785 # maximum J1939 TP payload (255 × 7 bytes) + +# Internal RX state codes +_J1939_RX_IDLE = 0 +_J1939_RX_WAIT_DT = 1 # waiting for TP.DT frames + +# Internal TX state codes +_J1939_TX_IDLE = 0 +_J1939_TX_BAM = 1 # BAM TP.DT frames are being sent +_J1939_TX_RTS_WAIT_CTS = 2 # RTS sent; waiting for CTS +_J1939_TX_RTS_SENDING = 3 # CTS received; sending TP.DT block + + +class J1939TPImplementation: + """Software implementation of the SAE J1939 Transport Protocol state machine. + + All state is stored here so that the garbage collector can reclaim a + :class:`J1939SoftSocket` even while the background + :class:`~scapy.contrib.isotp.isotp_soft_socket.TimeoutScheduler` thread + holds a reference to this object. + + :param can_socket: a :class:`~scapy.contrib.cansocket.CANSocket` used for + raw CAN I/O + :param src_addr: this node's J1939 source address (0x00–0xFD) + :param listen_only: when ``True`` the implementation never sends CTS, ACK, + or ABORT frames, allowing passive monitoring of TP + sessions without influencing the bus. Received payloads + are still reassembled and delivered via :meth:`recv`. + :param pgn_filter: when non-zero, only messages whose PGN matches this + value are delivered. ``0`` (the default) accepts all + PGNs. Inspired by BenGardiner's ``rx_pgn`` parameter. + """ + + def __init__( + self, + can_socket, # type: "CANSocket" + src_addr, # type: int + listen_only=False, # type: bool + pgn_filter=0, # type: int + ): + # type: (...) -> None + from scapy.contrib.isotp.isotp_soft_socket import TimeoutScheduler + self._TimeoutScheduler = TimeoutScheduler + + self.can_socket = can_socket + self.src_addr = src_addr + self.listen_only = listen_only + self.pgn_filter = pgn_filter # 0 = accept all PGNs + self.closed = False + self.rx_tx_poll_rate = 0.005 + + # ── receive path ────────────────────────────────────────────────────── + self.rx_state = _J1939_RX_IDLE # type: int + # Active RX session fields (valid when rx_state == _J1939_RX_WAIT_DT) + self.rx_pgn = 0 # PGN being received + self.rx_peer_sa = socket.J1939_NO_ADDR # SA of the sending node + self.rx_dst = socket.J1939_NO_ADDR # DA (our SA or 0xFF broadcast) + self.rx_total = 0 # total payload size (bytes) + self.rx_npkts = 0 # total TP.DT packets expected + self.rx_buf = b'' # accumulated payload bytes + self.rx_seq = 1 # next expected DT seq number + self.rx_ts = 0.0 # type: Union[float, EDecimal] + self.rx_is_bam = True # True=BAM; False=RTS/CTS + self.rx_start_time = 0.0 # wall-clock start of current TP rx + self.rx_timeout_handle = None # type: Optional[Any] + + # Delivered received messages: each item is (J1939, timestamp) + self.rx_queue = ObjectPipe() # type: ignore + + # ── transmit path ───────────────────────────────────────────────────── + self.tx_state = _J1939_TX_IDLE # type: int + self.tx_buf = None # type: Optional[bytes] + self.tx_pgn = 0 + self.tx_dst = socket.J1939_NO_ADDR + self.tx_priority = 6 + self.tx_data_page = 0 + self.tx_npkts = 0 # total TP.DT packets to send + self.tx_seq = 1 # next TP.DT sequence number to send + self.tx_peer_sa = socket.J1939_NO_ADDR # peer SA for RTS/CTS sessions + # CTS block management + self.tx_cts_count = 0 # DTs still to send in current CTS block + self.tx_timeout_handle = None # type: Optional[Any] + + # Enqueued outgoing messages: each item is a J1939 packet + self.tx_queue = ObjectPipe() # type: ignore + + # ── background polling ──────────────────────────────────────────────── + self.rx_handle = TimeoutScheduler.schedule(0, self.can_recv) + self.tx_handle = TimeoutScheduler.schedule(0, self._tx_poll) + + # ── lifecycle ───────────────────────────────────────────────────────────── + + def __del__(self): + # type: () -> None + self.close() + + def close(self): + # type: () -> None + if self.closed: + return + self.closed = True + # Brief pause so any in-flight scheduler callback sees the flag. + time.sleep(0.005) + + for handle in (self.rx_handle, self.tx_handle, + self.rx_timeout_handle, self.tx_timeout_handle): + if handle is not None: + try: + handle.cancel() + except Exception: + pass + + try: + self.rx_queue.close() + except Exception: + pass + try: + self.tx_queue.close() + except Exception: + pass + + # ── CAN receive loop ───────────────────────────────────────────────────── + + def can_recv(self): + # type: () -> None + if self.closed: + return + try: + while self.can_socket.select([self.can_socket], 0): + if self.closed: + break + pkt = self.can_socket.recv() + if pkt: + self.on_can_recv(pkt) + else: + break + except Exception: + if not self.closed: + log_j1939.warning( + "J1939TPImplementation.can_recv error: %s", + traceback.format_exc()) + + if not self.closed and not self.can_socket.closed: + self.rx_handle = self._TimeoutScheduler.schedule( + self.rx_tx_poll_rate, self.can_recv) + + def on_can_recv(self, pkt): + # type: (Packet) -> None + """Decode *pkt* as a :class:`J1939_CAN` frame and route it.""" + try: + j = J1939_CAN(bytes(pkt)) + j.time = getattr(pkt, 'time', None) or time.time() + except Exception: + return + + pf = j.pdu_format + ps = j.pdu_specific + sa = j.src + + # Ignore frames sent by this node (CAN loopback echo guard). + if sa == self.src_addr: + return + + # ── TP.CM (PF = 0xEC) ──────────────────────────────────────────────── + if pf == (J1939_PGN_TP_CM >> 8): # 0xEC + # PS must address us or be broadcast. + if ps != self.src_addr and ps != socket.J1939_NO_ADDR: + return + self._on_tp_cm(j) + return + + # ── TP.DT (PF = 0xEB) ──────────────────────────────────────────────── + if pf == (J1939_PGN_TP_DT >> 8): # 0xEB + if ps != self.src_addr and ps != socket.J1939_NO_ADDR: + return + self._on_tp_dt(j) + return + + # ── Short (≤ 8-byte) data frame ────────────────────────────────────── + # PDU1: ps is the destination address. PDU2: always broadcast. + if pf <= J1939_PDU1_MAX_PF: + if ps != self.src_addr and ps != socket.J1939_NO_ADDR: + return + self._on_short_frame(j) + + # ── RX frame handlers ──────────────────────────────────────────────────── + + def _on_short_frame(self, j): + # type: (J1939_CAN) -> None + data = bytes(j.data) + if self.pgn_filter != 0 and j.pgn != self.pgn_filter: + return + msg = J1939(data, pgn=j.pgn, src=j.src, dst=j.dst, priority=j.priority) + self.rx_queue.send((msg, j.time)) + + def _on_tp_cm(self, j): + # type: (J1939_CAN) -> None + data = bytes(j.data) + if not data: + return + ctrl = data[0] + sa = j.src + ts = j.time + + if ctrl == J1939_TP_CTRL_BAM: + if len(data) < 8: + return + cm = J1939_TP_CM_BAM(data) + if self.pgn_filter != 0 and cm.pgn != self.pgn_filter: + return + if self.rx_state != _J1939_RX_IDLE: + log_j1939.debug("J1939 TP: new BAM overwrites active RX session") + self._rx_reset() + self._rx_start(sa=sa, pgn=cm.pgn, dst=socket.J1939_NO_ADDR, + total=cm.total_size, npkts=cm.num_packets, + is_bam=True, ts=ts) + + elif ctrl == J1939_TP_CTRL_RTS: + if len(data) < 8: + return + cm = J1939_TP_CM_RTS(data) + if self.pgn_filter != 0 and cm.pgn != self.pgn_filter: + return + if self.rx_state != _J1939_RX_IDLE: + log_j1939.debug("J1939 TP: new RTS overwrites active RX session") + self._rx_reset() + self._rx_start(sa=sa, pgn=cm.pgn, dst=self.src_addr, + total=cm.total_size, npkts=cm.num_packets, + is_bam=False, ts=ts) + # Respond with CTS authorising all packets starting at seq 1. + if not self.listen_only: + self._can_send_tp_cm( + dst_sa=sa, + data=bytes(J1939_TP_CM_CTS( + num_packets=cm.num_packets, + next_packet=1, + pgn=cm.pgn, + )), + ) + + elif ctrl == J1939_TP_CTRL_CTS: + if (self.tx_state == _J1939_TX_RTS_WAIT_CTS + and sa == self.tx_peer_sa and len(data) >= 8): + self._tx_handle_cts(J1939_TP_CM_CTS(data)) + + elif ctrl == J1939_TP_CTRL_ACK: + if (self.tx_state in (_J1939_TX_RTS_WAIT_CTS, _J1939_TX_RTS_SENDING) + and sa == self.tx_peer_sa): + self._tx_reset() + + elif ctrl == J1939_TP_CTRL_ABORT: + if sa == self.tx_peer_sa: + reason = data[1] if len(data) > 1 else 0 + log_j1939.warning( + "J1939 TP: TX session aborted by peer (reason %d)", reason) + self._tx_reset() + + def _on_tp_dt(self, j): + # type: (J1939_CAN) -> None + if self.rx_state != _J1939_RX_WAIT_DT: + return + sa = j.src + if sa != self.rx_peer_sa: + return + data = bytes(j.data) + if len(data) < 8: + return + + dt = J1939_TP_DT(data) + seq = dt.seq_num + if seq != self.rx_seq: + log_j1939.warning( + "J1939 TP: bad DT seq %d (expected %d)", seq, self.rx_seq) + if not self.rx_is_bam and not self.listen_only: + self._can_send_tp_cm( + dst_sa=sa, + data=bytes(J1939_TP_CM_ABORT(reason=7, pgn=self.rx_pgn)), + ) + self._rx_reset() + return + + self.rx_buf += dt.data + self.rx_seq += 1 + + # Cancel / reschedule the DT timeout. + if self.rx_timeout_handle is not None: + try: + self.rx_timeout_handle.cancel() + except Exception: + pass + self.rx_timeout_handle = None + + if seq >= self.rx_npkts: + # All packets received – finalise the message. + payload = self.rx_buf[:self.rx_total] + if not self.rx_is_bam and not self.listen_only: + self._can_send_tp_cm( + dst_sa=sa, + data=bytes(J1939_TP_CM_ACK( + total_size=self.rx_total, + num_packets=self.rx_npkts, + pgn=self.rx_pgn, + )), + ) + msg = J1939(payload, + pgn=self.rx_pgn, src=self.rx_peer_sa, + dst=self.rx_dst, priority=6) + self.rx_queue.send((msg, self.rx_ts)) + self._rx_reset() + else: + self.rx_timeout_handle = self._TimeoutScheduler.schedule( + _J1939_TP_T2, self._rx_timeout) + + # ── RX session helpers ──────────────────────────────────────────────────── + + def _rx_start(self, sa, pgn, dst, total, npkts, is_bam, ts): + # type: (int, int, int, int, int, bool, Union[float, EDecimal]) -> None + self.rx_state = _J1939_RX_WAIT_DT + self.rx_peer_sa = sa + self.rx_pgn = pgn + self.rx_dst = dst + self.rx_total = total + self.rx_npkts = npkts + self.rx_buf = b'' + self.rx_seq = 1 + self.rx_ts = ts + self.rx_is_bam = is_bam + self.rx_start_time = time.monotonic() + if self.rx_timeout_handle is not None: + try: + self.rx_timeout_handle.cancel() + except Exception: + pass + self.rx_timeout_handle = self._TimeoutScheduler.schedule( + _J1939_TP_T1, self._rx_timeout) + + def _rx_reset(self): + # type: () -> None + self.rx_state = _J1939_RX_IDLE + if self.rx_timeout_handle is not None: + try: + self.rx_timeout_handle.cancel() + except Exception: + pass + self.rx_timeout_handle = None + + def _rx_timeout(self): + # type: () -> None + if self.closed or self.rx_state == _J1939_RX_IDLE: + return + # On slow serial interfaces (slcan) the OS serial buffer may hold many + # background CAN frames queued ahead of TP.DT frames. Re-arm the + # timer as long as the total elapsed time since the session started is + # below _J1939_TP_T2 × _J1939_TP_DT_TIMEOUT_EXTENSION (12.5 s total). + total_wait = time.monotonic() - self.rx_start_time + if total_wait < _J1939_TP_T2 * _J1939_TP_DT_TIMEOUT_EXTENSION: + self.rx_timeout_handle = self._TimeoutScheduler.schedule( + _J1939_TP_T2, self._rx_timeout) + return + log_j1939.warning( + "J1939 TP: RX timeout – discarding incomplete message " + "(PGN=0x%05X SA=0x%02X)", self.rx_pgn, self.rx_peer_sa) + self._rx_reset() + + # ── CAN send helpers ────────────────────────────────────────────────────── + + def _can_send(self, pkt): + # type: (J1939_CAN) -> None + try: + self.can_socket.send(pkt) + except Exception: + log_j1939.warning( + "J1939 CAN send failed: %s", traceback.format_exc()) + + def _can_send_tp_cm(self, dst_sa, data): + # type: (int, bytes) -> None + pkt = J1939_CAN( + priority=6, data_page=0, + pdu_format=J1939_PGN_TP_CM >> 8, # 0xEC + pdu_specific=dst_sa, + src=self.src_addr, + data=data, + ) + self._can_send(pkt) + + def _can_send_tp_dt(self, dst_sa, seq_num, chunk): + # type: (int, int, bytes) -> None + padded = chunk + b'\xff' * (_J1939_TP_DT_DATA - len(chunk)) + dt = J1939_TP_DT(seq_num=seq_num, data=padded[:_J1939_TP_DT_DATA]) + pkt = J1939_CAN( + priority=7, data_page=0, + pdu_format=J1939_PGN_TP_DT >> 8, # 0xEB + pdu_specific=dst_sa, + src=self.src_addr, + data=bytes(dt), + ) + self._can_send(pkt) + + # ── TX state machine ────────────────────────────────────────────────────── + + def _tx_poll(self): + # type: () -> None + """Dequeue and start transmitting the next J1939 message.""" + if self.closed: + return + try: + if self.tx_state == _J1939_TX_IDLE: + if select_objects([self.tx_queue], 0): + msg = self.tx_queue.recv() + if msg is not None: + self._begin_send(msg) + except Exception: + if not self.closed: + log_j1939.warning( + "J1939 _tx_poll error: %s", traceback.format_exc()) + if not self.closed: + self.tx_handle = self._TimeoutScheduler.schedule( + self.rx_tx_poll_rate, self._tx_poll) + + def _begin_send(self, msg): + # type: (Packet) -> None + """Start transmitting *msg*. Called from _tx_poll in the scheduler thread.""" + if isinstance(msg, J1939): + data = msg.data + if not isinstance(data, (bytes, bytearray)): + data = bytes(msg) + data = bytes(data) + pgn = msg.pgn + dst = msg.dst + priority = msg.priority + else: + data = bytes(msg) + pgn = 0 + dst = socket.J1939_NO_ADDR + priority = 6 + + data_page = (pgn >> 16) & 0x1 + pf = (pgn >> 8) & 0xFF + + if len(data) <= 8: + # Single CAN frame – no TP needed. + if pf <= J1939_PDU1_MAX_PF: + ps = dst & 0xFF + else: + ps = pgn & 0xFF + pkt = J1939_CAN( + priority=priority, data_page=data_page, + pdu_format=pf, pdu_specific=ps, + src=self.src_addr, data=data, + ) + self._can_send(pkt) + + elif dst == socket.J1939_NO_ADDR or dst == 0xFF: + # Broadcast multi-packet message via BAM. + self._tx_start_bam(data, pgn, dst, priority, data_page) + + else: + # Unicast multi-packet message via RTS/CTS. + self._tx_start_rts(data, pgn, dst, priority, data_page) + + # ── BAM TX ─────────────────────────────────────────────────────────────── + + def _tx_start_bam(self, data, pgn, dst, priority, data_page): + # type: (bytes, int, int, int, int) -> None + npkts = (len(data) + _J1939_TP_DT_DATA - 1) // _J1939_TP_DT_DATA + bam = J1939_TP_CM_BAM(total_size=len(data), num_packets=npkts, pgn=pgn) + self._can_send_tp_cm(socket.J1939_NO_ADDR, bytes(bam)) + + self.tx_state = _J1939_TX_BAM + self.tx_buf = data + self.tx_pgn = pgn + self.tx_dst = dst + self.tx_priority = priority + self.tx_data_page = data_page + self.tx_npkts = npkts + self.tx_seq = 1 + self.tx_timeout_handle = self._TimeoutScheduler.schedule( + _J1939_TP_BAM_DELAY, self._tx_bam_next_dt) + + def _tx_bam_next_dt(self): + # type: () -> None + if self.closed or self.tx_state != _J1939_TX_BAM or self.tx_buf is None: + self._tx_reset() + return + seq = self.tx_seq + start = (seq - 1) * _J1939_TP_DT_DATA + chunk = self.tx_buf[start:start + _J1939_TP_DT_DATA] + self._can_send_tp_dt(socket.J1939_NO_ADDR, seq, chunk) + self.tx_seq += 1 + if self.tx_seq > self.tx_npkts: + self._tx_reset() + else: + self.tx_timeout_handle = self._TimeoutScheduler.schedule( + _J1939_TP_BAM_DELAY, self._tx_bam_next_dt) + + # ── RTS/CTS TX ─────────────────────────────────────────────────────────── + + def _tx_start_rts(self, data, pgn, dst, priority, data_page): + # type: (bytes, int, int, int, int) -> None + npkts = (len(data) + _J1939_TP_DT_DATA - 1) // _J1939_TP_DT_DATA + rts = J1939_TP_CM_RTS( + total_size=len(data), num_packets=npkts, + max_packets=0xFF, pgn=pgn, + ) + self._can_send_tp_cm(dst, bytes(rts)) + + self.tx_state = _J1939_TX_RTS_WAIT_CTS + self.tx_buf = data + self.tx_pgn = pgn + self.tx_dst = dst + self.tx_priority = priority + self.tx_data_page = data_page + self.tx_npkts = npkts + self.tx_seq = 1 + self.tx_peer_sa = dst + self.tx_timeout_handle = self._TimeoutScheduler.schedule( + _J1939_TP_T3, self._tx_timeout) + + def _tx_handle_cts(self, cts): + # type: (J1939_TP_CM_CTS) -> None + if self.tx_timeout_handle is not None: + try: + self.tx_timeout_handle.cancel() + except Exception: + pass + self.tx_timeout_handle = None + + if cts.num_packets == 0: + # Receiver requested a hold; wait for another CTS. + self.tx_state = _J1939_TX_RTS_WAIT_CTS + self.tx_timeout_handle = self._TimeoutScheduler.schedule( + _J1939_TP_T3, self._tx_timeout) + return + + self.tx_cts_count = cts.num_packets + self.tx_seq = cts.next_packet + self.tx_state = _J1939_TX_RTS_SENDING + self._tx_rts_send_block() + + def _tx_rts_send_block(self): + # type: () -> None + """Send the block of TP.DT frames authorised by the most recent CTS.""" + if self.closed or self.tx_state != _J1939_TX_RTS_SENDING \ + or self.tx_buf is None: + self._tx_reset() + return + + sent = 0 + while sent < self.tx_cts_count: + seq = self.tx_seq + if seq > self.tx_npkts: + break + start = (seq - 1) * _J1939_TP_DT_DATA + chunk = self.tx_buf[start:start + _J1939_TP_DT_DATA] + self._can_send_tp_dt(self.tx_dst, seq, chunk) + self.tx_seq += 1 + sent += 1 + + # After the block, wait for the next CTS (or ACK if all data sent). + self.tx_state = _J1939_TX_RTS_WAIT_CTS + timeout = _J1939_TP_T4 if self.tx_seq > self.tx_npkts else _J1939_TP_T3 + self.tx_timeout_handle = self._TimeoutScheduler.schedule( + timeout, self._tx_timeout) + + def _tx_timeout(self): + # type: () -> None + if self.closed or self.tx_state == _J1939_TX_IDLE: + return + log_j1939.warning( + "J1939 TP: TX timeout (PGN=0x%05X DA=0x%02X)", + self.tx_pgn, self.tx_dst) + self._tx_reset() + + def _tx_reset(self): + # type: () -> None + self.tx_state = _J1939_TX_IDLE + self.tx_buf = None + if self.tx_timeout_handle is not None: + try: + self.tx_timeout_handle.cancel() + except Exception: + pass + self.tx_timeout_handle = None + + # ── public interface ───────────────────────────────────────────────────── + + def send(self, msg): + # type: (Packet) -> None + """Enqueue *msg* for transmission.""" + self.tx_queue.send(msg) + + def recv(self): + # type: () -> Optional[Tuple[J1939, Union[float, EDecimal]]] + """Return the next received :class:`J1939` message from the queue.""" + return self.rx_queue.recv() # type: ignore + + +class J1939SoftSocket(SuperSocket): + """Software J1939 application-layer socket over a :class:`CANSocket`. + + Implements the SAE J1939 Transport Protocol (segmentation and + reassembly) entirely in Python, without requiring the Linux kernel + ``CAN_J1939`` socket module. It is API-compatible with + :class:`NativeJ1939Socket` and works on any platform that has a CAN + socket layer (Linux SocketCAN via + :class:`~scapy.contrib.cansocket_native.NativeCANSocket`, or any platform + via :class:`~scapy.contrib.cansocket_python_can.PythonCANSocket`). + + The implementation mirrors :class:`~scapy.contrib.isotp.ISOTPSoftSocket`: + a background thread driven by + :class:`~scapy.contrib.isotp.isotp_soft_socket.TimeoutScheduler` polls the + CAN socket and advances the TP state machine, so + :class:`J1939SoftSocket` can send Flow-Control (CTS / ACK / ABORT) frames + even before :meth:`recv` is called. + + Example – broadcast receive:: + + >>> cansock = NativeCANSocket("vcan0") + >>> with J1939SoftSocket(cansock, src_addr=0x00) as s: + ... pkt = s.recv() + + Example – broadcast send:: + + >>> cansock = NativeCANSocket("vcan0") + >>> with J1939SoftSocket(cansock, src_addr=0x00) as s: + ... s.send(J1939(b'\\x01\\x02', pgn=0xFECA, dst=0xFF)) + + :param can_socket: a :class:`~scapy.contrib.cansocket.CANSocket` instance + *or* a CAN interface name string (Linux only) + :param src_addr: this node's J1939 source address (0x00–0xFD); + defaults to :data:`socket.J1939_NO_ADDR` (0xFE = no address) + :param basecls: packet class for received messages + (default: :class:`J1939`) + :param listen_only: when ``True``, never send CTS / ACK / ABORT frames; + all received TP sessions are still reassembled and + delivered. Useful for passive bus monitoring. + :param pgn: when non-zero, only messages whose PGN matches this + value are delivered; ``0`` (the default) accepts every + PGN. Inspired by BenGardiner's ``rx_pgn`` parameter. + """ + + desc = ("read/write J1939 messages using a software " + "transport-protocol implementation") + + def __init__( + self, + can_socket=None, # type: Optional["CANSocket"] + src_addr=socket.J1939_NO_ADDR, # type: int + basecls=J1939, # type: Type[Packet] + listen_only=False, # type: bool + pgn=0, # type: int + ): + # type: (...) -> None + if LINUX and isinstance(can_socket, str): + from scapy.contrib.cansocket_native import NativeCANSocket + can_socket = NativeCANSocket(can_socket) + elif isinstance(can_socket, str): + raise Scapy_Exception( + "Provide a CANSocket object instead of an interface name") + + self.src_addr = src_addr + self.basecls = basecls + + impl = J1939TPImplementation( + can_socket, src_addr, + listen_only=listen_only, + pgn_filter=pgn, + ) + # Cast so SuperSocket internals are satisfied (recv/send are overridden). + self.ins = cast(socket.socket, impl) + self.outs = cast(socket.socket, impl) + self.impl = impl + + if basecls is None: + log_j1939.warning("Provide a basecls") + + # ── lifecycle ───────────────────────────────────────────────────────────── + + def close(self): + # type: () -> None + if not self.closed: + if hasattr(self, "impl"): + self.impl.close() + self.closed = True + + # ── recv / send ────────────────────────────────────────────────────────── + + def recv_raw(self, x=0xffff): + # type: (int) -> Tuple[Optional[Type[Packet]], Optional[bytes], Optional[float]] + # Not used for J1939SoftSocket; recv() is overridden directly. + return self.basecls, None, None + + def recv(self, x=0xffff, **kwargs): + # type: (int, **Any) -> Optional[Packet] + """Receive the next :class:`J1939` message. + + Blocks until a complete message is available or the socket is closed. + Returns ``None`` if the socket is closed before a message arrives. + """ + if self.closed: + return None + tup = self.impl.recv() + if tup is None: + return None + msg, ts = tup + msg.time = float(ts) + return msg + + def send(self, x): + # type: (Packet) -> int + """Enqueue *x* for transmission. + + If *x* is a :class:`J1939` packet its ``pgn``, ``dst``, and + ``priority`` attributes are used. Payloads of 8 bytes or fewer are + sent as a single CAN frame; larger payloads use the J1939 Transport + Protocol automatically (BAM for broadcast, RTS/CTS for unicast). + """ + if self.closed: + return 0 + try: + x.sent_time = time.time() + except AttributeError: + pass + self.impl.send(x) + return len(bytes(x)) + + # ── select ──────────────────────────────────────────────────────────────── + + @staticmethod + def select(sockets, remain=None): # type: ignore[override] + # type: (List[Union[SuperSocket, ObjectPipe[Any]]], Optional[float]) -> List[Union[SuperSocket, ObjectPipe[Any]]] + """Support :func:`~scapy.sendrecv.sniff` on :class:`J1939SoftSocket`.""" + obj_pipes = [ + x.impl.rx_queue for x in sockets + if isinstance(x, J1939SoftSocket) and not x.closed + ] + obj_pipes += [ + x for x in sockets + if isinstance(x, ObjectPipe) and not x.closed + ] + ready_pipes = select_objects(obj_pipes, remain) + result = [ + x for x in sockets + if isinstance(x, J1939SoftSocket) and not x.closed + and x.impl.rx_queue in ready_pipes + ] + result += [ + x for x in sockets + if isinstance(x, ObjectPipe) and x in ready_pipes + ] + return result # type: ignore[return-value] diff --git a/test/contrib/j1939.uts b/test/contrib/j1939.uts index 6e0166ba02f..fe686e17bc9 100644 --- a/test/contrib/j1939.uts +++ b/test/contrib/j1939.uts @@ -2072,3 +2072,1291 @@ _r3_sock = NativeJ1939Socket("vcan0", src_addr=0x32, promisc=False) _r3_ret = _r3_sock.send(None) _r3_sock.close() assert _r3_ret == 0, "send(None) must return 0, got %r" % _r3_ret + + +############ +############ ++ J1939SoftSocket tests +~ not_pypy + += J1939SoftSocket imports + +import time +from scapy.contrib.j1939 import ( + J1939SoftSocket, + J1939TPImplementation, + J1939, J1939_CAN, + J1939_TP_CM_BAM, J1939_TP_CM_RTS, J1939_TP_CM_CTS, + J1939_TP_CM_ACK, J1939_TP_CM_ABORT, J1939_TP_DT, + J1939_TP_CTRL_BAM, J1939_TP_CTRL_RTS, J1939_TP_CTRL_CTS, + J1939_TP_CTRL_ACK, J1939_TP_CTRL_ABORT, +) +from scapy.layers.can import CAN +from test.testsocket import TestSocket, cleanup_testsockets +import socket as _socket + += J1939SoftSocket is importable and has the correct type + +assert issubclass(J1939SoftSocket, SuperSocket) + += J1939SoftSocket – context manager and close + +with TestSocket(CAN) as cans: + with J1939SoftSocket(cans, src_addr=0x00) as sock: + assert not sock.closed + assert sock.closed + += J1939SoftSocket – single-frame receive (broadcast PDU2) +# Inject a short J1939_CAN broadcast frame from SA=0x01; the soft socket +# should decode it and deliver a J1939 packet to the application layer. + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + with J1939SoftSocket(cans, src_addr=0x00) as sock: + stim.send(J1939_CAN(priority=6, data_page=0, pdu_format=0xFE, + pdu_specific=0xCA, src=0x01, + data=b'\x11\x22\x33')) + pkts = sock.sniff(count=1, timeout=1) + +assert len(pkts) == 1, "Expected 1 packet, got %d" % len(pkts) +assert pkts[0].pgn == 0xFECA, "PGN mismatch: 0x%05X" % pkts[0].pgn +assert pkts[0].src == 0x01, "SA mismatch: 0x%02X" % pkts[0].src +assert pkts[0].data == b'\x11\x22\x33', "data mismatch: %r" % pkts[0].data + += J1939SoftSocket – single-frame receive (PDU1 unicast to our SA) + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + with J1939SoftSocket(cans, src_addr=0x10) as sock: + # PF=0xEF (239 < 240, PDU1), PS=0x10 (our SA) -> unicast to us + stim.send(J1939_CAN(priority=6, data_page=0, pdu_format=0xEF, + pdu_specific=0x10, src=0x05, + data=b'\xAA\xBB\xCC')) + pkts = sock.sniff(count=1, timeout=1) + +assert len(pkts) == 1, "Expected 1 packet, got %d" % len(pkts) +assert pkts[0].src == 0x05 +assert pkts[0].dst == 0x10 +assert pkts[0].data == b'\xAA\xBB\xCC' + += J1939SoftSocket – single-frame receive ignored (unicast to different SA) +# Frame addressed to SA=0x20 must NOT be delivered when our SA is 0x10. + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + with J1939SoftSocket(cans, src_addr=0x10) as sock: + stim.send(J1939_CAN(priority=6, data_page=0, pdu_format=0xEF, + pdu_specific=0x20, src=0x05, + data=b'\xAA\xBB\xCC')) + pkts = sock.sniff(count=1, timeout=0.3) + +assert len(pkts) == 0, "Frame not addressed to us should be ignored" + += J1939SoftSocket – single-frame send (broadcast PDU2) +# After calling send(), the underlying CAN socket must receive exactly one +# J1939_CAN frame with the correct PGN and source address. + +with TestSocket(CAN) as cans, TestSocket(CAN) as peer: + cans.pair(peer) + with J1939SoftSocket(cans, src_addr=0x00) as sock: + sock.send(J1939(b'\xAA\xBB', pgn=0xFECA, + dst=_socket.J1939_NO_ADDR, priority=6)) + pkts = peer.sniff(count=1, timeout=1) + +assert len(pkts) == 1, "Expected 1 CAN frame, got %d" % len(pkts) +j = J1939_CAN(bytes(pkts[0])) +assert j.pgn == 0xFECA, "PGN mismatch: 0x%05X" % j.pgn +assert j.src == 0x00, "SA mismatch: 0x%02X" % j.src +assert j.data == b'\xAA\xBB', "data mismatch: %r" % j.data + += J1939SoftSocket – single-frame send (PDU1 unicast) +# Unicast to DA=0x10: pdu_format encodes the PGN base, pdu_specific = DA. + +with TestSocket(CAN) as cans, TestSocket(CAN) as peer: + cans.pair(peer) + with J1939SoftSocket(cans, src_addr=0x01) as sock: + sock.send(J1939(b'\x01\x02\x03', pgn=0xEF00, dst=0x10, priority=6)) + pkts = peer.sniff(count=1, timeout=1) + +assert len(pkts) == 1 +j = J1939_CAN(bytes(pkts[0])) +assert j.pdu_format == 0xEF, "pf=0x%02X" % j.pdu_format +assert j.pdu_specific == 0x10, "ps=0x%02X" % j.pdu_specific +assert j.src == 0x01 +assert j.data == b'\x01\x02\x03' + += J1939SoftSocket – BAM multi-packet receive (20-byte payload, 3 TP.DT frames) + +_bam_payload = bytes(range(0x01, 0x15)) # 20 bytes -> 3 TP.DT + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + with J1939SoftSocket(cans, src_addr=0x00) as sock: + bam = J1939_TP_CM_BAM(total_size=20, num_packets=3, pgn=0xFECA) + stim.send(J1939_CAN(priority=6, pdu_format=0xEC, pdu_specific=0xFF, + src=0x01, data=bytes(bam))) + time.sleep(0.05) + for seq in range(1, 4): + start = (seq - 1) * 7 + chunk = _bam_payload[start:start + 7] + chunk += b'\xff' * (7 - len(chunk)) + stim.send(J1939_CAN(priority=7, pdu_format=0xEB, pdu_specific=0xFF, + src=0x01, data=bytes(J1939_TP_DT(seq_num=seq, data=chunk)))) + time.sleep(0.01) + pkts = sock.sniff(count=1, timeout=2) + +assert len(pkts) == 1, "Expected 1 reassembled message, got %d" % len(pkts) +assert pkts[0].pgn == 0xFECA, "PGN mismatch" +assert pkts[0].src == 0x01, "SA mismatch" +assert pkts[0].data == _bam_payload, \ + "Payload mismatch: %r != %r" % (pkts[0].data, _bam_payload) + += J1939SoftSocket – BAM multi-packet send (20-byte payload, 3 TP.DT frames) +# The soft socket must emit: 1 TP.CM BAM + 3 TP.DT frames, with the correct +# wire encoding. + +_bam_tx_payload = bytes(range(0x01, 0x15)) # 20 bytes + +with TestSocket(CAN) as cans, TestSocket(CAN) as peer: + cans.pair(peer) + with J1939SoftSocket(cans, src_addr=0x00) as sock: + sock.send(J1939(_bam_tx_payload, pgn=0xFECA, + dst=_socket.J1939_NO_ADDR, priority=6)) + # 1 BAM + 3 DT frames; 50 ms delay between each -> ~200 ms total. + pkts = peer.sniff(count=4, timeout=3) + +assert len(pkts) == 4, "Expected 4 CAN frames (1 BAM + 3 DT), got %d" % len(pkts) + +j0 = J1939_CAN(bytes(pkts[0])) +assert j0.pdu_format == 0xEC, "Frame 0 must be TP.CM, pf=0x%02X" % j0.pdu_format +assert j0.pdu_specific == 0xFF, "BAM DA must be broadcast (0xFF)" +bam_decoded = J1939_TP_CM_BAM(j0.data) +assert bam_decoded.ctrl == J1939_TP_CTRL_BAM +assert bam_decoded.total_size == 20 +assert bam_decoded.num_packets == 3 +assert bam_decoded.pgn == 0xFECA + +_bam_tx_reassembled = b'' +for _i in range(1, 4): + _ji = J1939_CAN(bytes(pkts[_i])) + assert _ji.pdu_format == 0xEB, "Frame %d must be TP.DT, pf=0x%02X" % (_i, _ji.pdu_format) + assert _ji.pdu_specific == 0xFF, "BAM DT DA must be broadcast" + _dt = J1939_TP_DT(_ji.data) + assert _dt.seq_num == _i, "seq_num=%d expected %d" % (_dt.seq_num, _i) + _bam_tx_reassembled += _dt.data + +assert _bam_tx_reassembled[:20] == _bam_tx_payload, \ + "Reassembled payload mismatch: %r" % _bam_tx_reassembled[:20] + += J1939SoftSocket – BAM large payload (100 bytes, 15 TP.DT frames) + +_large_payload = bytes(range(100)) # 100 bytes -> ceil(100/7) = 15 TP.DT + +with TestSocket(CAN) as cans, TestSocket(CAN) as peer: + cans.pair(peer) + with J1939SoftSocket(cans, src_addr=0x00) as sock: + sock.send(J1939(_large_payload, pgn=0xFECA, + dst=_socket.J1939_NO_ADDR, priority=6)) + # 1 BAM + 15 DT with 50 ms spacing -> up to ~800 ms + pkts = peer.sniff(count=16, timeout=5) + +assert len(pkts) == 16, "Expected 16 frames (1 BAM + 15 DT), got %d" % len(pkts) +j0 = J1939_CAN(bytes(pkts[0])) +bam_large = J1939_TP_CM_BAM(j0.data) +assert bam_large.total_size == 100 +assert bam_large.num_packets == 15 + +_large_reassembled = b'' +for _i in range(1, 16): + _ji = J1939_CAN(bytes(pkts[_i])) + _dt = J1939_TP_DT(_ji.data) + _large_reassembled += _dt.data + +assert _large_reassembled[:100] == _large_payload, \ + "Large payload mismatch: %r" % _large_reassembled[:100] + += J1939SoftSocket – soft-to-soft BAM (sender J1939SoftSocket → receiver J1939SoftSocket) +# Two J1939SoftSocket instances connected through paired TestSockets. + +_s2s_payload = bytes(range(0x01, 0x15)) # 20 bytes + +with TestSocket(CAN) as cans1, TestSocket(CAN) as cans2: + cans1.pair(cans2) + with J1939SoftSocket(cans1, src_addr=0x01) as sender, \ + J1939SoftSocket(cans2, src_addr=0x02) as receiver: + sender.send(J1939(_s2s_payload, pgn=0xFECA, + dst=_socket.J1939_NO_ADDR, priority=6)) + pkts = receiver.sniff(count=1, timeout=3) + +assert len(pkts) == 1, "Expected 1 reassembled message, got %d" % len(pkts) +assert pkts[0].pgn == 0xFECA +assert pkts[0].src == 0x01 +assert pkts[0].data == _s2s_payload, \ + "Payload mismatch: %r != %r" % (pkts[0].data, _s2s_payload) + += J1939SoftSocket – soft-to-soft RTS/CTS unicast + +_rtc_payload = bytes(range(0x01, 0x10)) # 15 bytes -> 3 TP.DT + +with TestSocket(CAN) as cans1, TestSocket(CAN) as cans2: + cans1.pair(cans2) + with J1939SoftSocket(cans1, src_addr=0x01) as sender, \ + J1939SoftSocket(cans2, src_addr=0x02) as receiver: + # Unicast to receiver's SA -> triggers RTS/CTS + sender.send(J1939(_rtc_payload, pgn=0xEF00, dst=0x02, priority=6)) + # RTS -> CTS -> 3xDT -> ACK: allow up to 3 s + pkts = receiver.sniff(count=1, timeout=3) + +assert len(pkts) == 1, "Expected 1 RTS/CTS message, got %d" % len(pkts) +assert pkts[0].pgn == 0xEF00 +assert pkts[0].src == 0x01 +assert pkts[0].dst == 0x02 +assert pkts[0].data == _rtc_payload, \ + "RTS/CTS payload mismatch: %r != %r" % (pkts[0].data, _rtc_payload) + += J1939SoftSocket – RX sequence-number error triggers ABORT +# Deliver TP.DT frames out of sequence; the soft socket must abort. + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + with J1939SoftSocket(cans, src_addr=0x02) as sock: + rts = J1939_TP_CM_RTS(total_size=14, num_packets=2, + max_packets=0xFF, pgn=0xEF00) + stim.send(J1939_CAN(priority=6, pdu_format=0xEC, pdu_specific=0x02, + src=0x01, data=bytes(rts))) + time.sleep(0.05) + stim.send(J1939_CAN(priority=7, pdu_format=0xEB, pdu_specific=0x02, + src=0x01, + data=bytes(J1939_TP_DT(seq_num=2, + data=b'\x01\x02\x03\x04\x05\x06\x07')))) + abort_frames = stim.sniff(count=5, timeout=1) + +_abort_found = False +for _af in abort_frames: + _aj = J1939_CAN(bytes(_af)) + if _aj.pdu_format == 0xEC and _aj.pdu_specific == 0x01: + _d = bytes(_aj.data) + if _d and _d[0] == J1939_TP_CTRL_ABORT: + _abort_found = True + +assert _abort_found, "Expected ABORT frame after bad seq number" + += J1939SoftSocket – RX timeout discards incomplete message +# Start a BAM session but deliver no DT; after T1 (750 ms) the session +# should be silently discarded and rx_state reset to idle. + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + with J1939SoftSocket(cans, src_addr=0x00) as sock: + bam = J1939_TP_CM_BAM(total_size=14, num_packets=2, pgn=0xFECA) + stim.send(J1939_CAN(priority=6, pdu_format=0xEC, pdu_specific=0xFF, + src=0x03, data=bytes(bam))) + # Wait longer than T1 (750 ms); no DT delivered. + pkts = sock.sniff(count=1, timeout=1.5) + +assert len(pkts) == 0, "No message should be delivered after BAM timeout" + += J1939SoftSocket – send minimal valid packet is safe + +_safe_send_exc = None +with TestSocket(CAN) as cans: + with J1939SoftSocket(cans, src_addr=0x00) as sock: + try: + _send_ret = sock.send(J1939(b'\x00', pgn=0xFECA)) + except Exception as _e: + _safe_send_exc = _e + +assert _safe_send_exc is None, "send raised: %s" % _safe_send_exc + + +############ +############ ++ J1939SoftSocket ↔ NativeJ1939Socket interoperability tests +~ vcan_socket needs_root not_pypy + += Setup interoperability environment + +import os +import threading +from time import sleep +from subprocess import call + +_iop_setup_cmd = "/bin/bash -c 'sudo modprobe vcan; sudo ip link add name vcan0 type vcan 2>/dev/null; sudo ip link set dev vcan0 up'" +os.system(_iop_setup_cmd) # best-effort; vcan0 may already be up + +from scapy.contrib.cansocket_native import NativeCANSocket +from scapy.contrib.j1939 import NativeJ1939Socket + += J1939SoftSocket TX (broadcast) → NativeJ1939Socket RX +# Soft socket sends a short broadcast; native socket receives it. + +_iop1_payload = b'\x01\x02\x03\x04' +_iop1_pgn = 0xFECA +_iop1_sa = 0x30 + +_iop1_cansock = NativeCANSocket("vcan0") +_iop1_native_rx = NativeJ1939Socket("vcan0", promisc=True) +_iop1_native_rx.ins.settimeout(3.0) + +def _iop1_send(): + sleep(0.1) + with J1939SoftSocket(_iop1_cansock, src_addr=_iop1_sa) as s: + s.send(J1939(_iop1_payload, pgn=_iop1_pgn, + dst=_socket.J1939_NO_ADDR, priority=6)) + +_iop1_t = threading.Thread(target=_iop1_send) +_iop1_pkts = _iop1_native_rx.sniff(timeout=3.0, started_callback=_iop1_t.start, count=1) +_iop1_t.join(timeout=5) +_iop1_native_rx.close() + +assert _iop1_pkts, "NativeJ1939Socket received no packet from J1939SoftSocket" +_iop1_rx = _iop1_pkts[0] +assert _iop1_rx.data == _iop1_payload, \ + "Payload mismatch: %r != %r" % (_iop1_rx.data, _iop1_payload) +assert _iop1_rx.pgn == _iop1_pgn, "PGN mismatch: 0x%X" % _iop1_rx.pgn +assert _iop1_rx.src == _iop1_sa, "SA mismatch: 0x%X" % _iop1_rx.src + += NativeJ1939Socket TX (broadcast) → J1939SoftSocket RX +# Native socket sends a short broadcast; soft socket receives and decodes it. + +_iop2_payload = b'\x05\x06\x07\x08' +_iop2_pgn = 0xFECA +_iop2_sa = 0x31 + +_iop2_cansock = NativeCANSocket("vcan0") +_iop2_soft_rx = J1939SoftSocket(_iop2_cansock, src_addr=0x00) +_iop2_native_tx = NativeJ1939Socket("vcan0", src_addr=_iop2_sa, promisc=False) + +def _iop2_send(): + sleep(0.1) + _iop2_native_tx.send( + J1939(_iop2_payload, pgn=_iop2_pgn, + src=_iop2_sa, dst=_socket.J1939_NO_ADDR)) + +_iop2_t = threading.Thread(target=_iop2_send) +_iop2_pkts = _iop2_soft_rx.sniff(timeout=3.0, started_callback=_iop2_t.start, count=1) +_iop2_t.join(timeout=5) +_iop2_native_tx.close() +_iop2_soft_rx.close() + +assert _iop2_pkts, "J1939SoftSocket received no packet from NativeJ1939Socket" +_iop2_rx = _iop2_pkts[0] +assert _iop2_rx.data == _iop2_payload, \ + "Payload mismatch: %r != %r" % (_iop2_rx.data, _iop2_payload) +assert _iop2_rx.pgn == _iop2_pgn, "PGN mismatch: 0x%X" % _iop2_rx.pgn +assert _iop2_rx.src == _iop2_sa, "SA mismatch: 0x%X" % _iop2_rx.src + += J1939SoftSocket TX (BAM, long message) → NativeJ1939Socket RX +# Soft socket sends a 20-byte message via BAM; the kernel J1939 stack +# reassembles it and delivers a single complete message to the native socket. + +_iop3_payload = bytes(range(0x01, 0x15)) # 20 bytes -> BAM + 3 TP.DT +_iop3_pgn = 0xFECA +_iop3_sa = 0x32 + +_iop3_cansock = NativeCANSocket("vcan0") +_iop3_native_rx = NativeJ1939Socket("vcan0", promisc=True) +_iop3_native_rx.ins.settimeout(5.0) + +def _iop3_send(): + sleep(0.1) + with J1939SoftSocket(_iop3_cansock, src_addr=_iop3_sa) as s: + s.send(J1939(_iop3_payload, pgn=_iop3_pgn, + dst=_socket.J1939_NO_ADDR, priority=6)) + +_iop3_t = threading.Thread(target=_iop3_send) +# The kernel reassembles BAM; snap until we see the full message. +_iop3_pkts = _iop3_native_rx.sniff(timeout=5.0, started_callback=_iop3_t.start, count=1) +_iop3_t.join(timeout=10) +_iop3_native_rx.close() + +assert _iop3_pkts, "NativeJ1939Socket received no BAM message from J1939SoftSocket" +_iop3_rx = _iop3_pkts[0] +assert _iop3_rx.data == _iop3_payload, \ + "Payload mismatch: %r != %r" % (_iop3_rx.data, _iop3_payload) +assert _iop3_rx.pgn == _iop3_pgn, "PGN mismatch: 0x%X" % _iop3_rx.pgn +assert _iop3_rx.src == _iop3_sa, "SA mismatch: 0x%X" % _iop3_rx.src + += NativeJ1939Socket TX (BAM, long message) → J1939SoftSocket RX +# Native socket sends a 20-byte broadcast; the soft socket must reassemble +# the BAM sequence and deliver the complete payload. + +_iop4_payload = bytes(range(0x14, 0x28)) # 20 bytes +_iop4_pgn = 0xFECA +_iop4_sa = 0x33 + +_iop4_cansock = NativeCANSocket("vcan0") +_iop4_soft_rx = J1939SoftSocket(_iop4_cansock, src_addr=0x00) +_iop4_native_tx = NativeJ1939Socket("vcan0", src_addr=_iop4_sa, promisc=False) + +def _iop4_send(): + sleep(0.1) + _iop4_native_tx.send( + J1939(_iop4_payload, pgn=_iop4_pgn, + src=_iop4_sa, dst=_socket.J1939_NO_ADDR)) + +_iop4_t = threading.Thread(target=_iop4_send) +_iop4_pkts = _iop4_soft_rx.sniff(timeout=5.0, started_callback=_iop4_t.start, count=1) +_iop4_t.join(timeout=10) +_iop4_native_tx.close() +_iop4_soft_rx.close() + +assert _iop4_pkts, "J1939SoftSocket received no reassembled BAM message" +_iop4_rx = _iop4_pkts[0] +assert _iop4_rx.data == _iop4_payload, \ + "Payload mismatch: %r != %r" % (_iop4_rx.data, _iop4_payload) +assert _iop4_rx.pgn == _iop4_pgn, "PGN mismatch: 0x%X" % _iop4_rx.pgn +assert _iop4_rx.src == _iop4_sa, "SA mismatch: 0x%X" % _iop4_rx.src + + +############ +############ ++ J1939SoftSocket – additional edge-case unit tests +~ not_pypy + += J1939SoftSocket – 8-byte payload is sent as single CAN frame (no TP) +# J1939-21: payloads ≤ 8 bytes must use a single CAN frame; no TP.CM/TP.DT. + +with TestSocket(CAN) as cans, TestSocket(CAN) as peer: + cans.pair(peer) + with J1939SoftSocket(cans, src_addr=0x01) as sock: + _sf8_payload = bytes(range(8)) # exactly 8 bytes + sock.send(J1939(_sf8_payload, pgn=0xFECA, + dst=_socket.J1939_NO_ADDR, priority=6)) + _sf8_pkts = peer.sniff(count=2, timeout=1) + +# Exactly 1 CAN frame; no TP.CM preamble. +assert len(_sf8_pkts) == 1, \ + "8-byte payload should be 1 CAN frame, got %d" % len(_sf8_pkts) +_sf8_j = J1939_CAN(bytes(_sf8_pkts[0])) +assert _sf8_j.pdu_format != 0xEC, \ + "No TP.CM should be emitted for an 8-byte payload" +assert _sf8_j.data == _sf8_payload, \ + "Data mismatch: %r != %r" % (_sf8_j.data, _sf8_payload) + += J1939SoftSocket – 9-byte payload triggers BAM with exactly 2 TP.DT frames +# 9 bytes / 7 = 2 DT frames (first full, second has 2 bytes + 5 padding). + +with TestSocket(CAN) as cans, TestSocket(CAN) as peer: + cans.pair(peer) + with J1939SoftSocket(cans, src_addr=0x01) as sock: + _tp9_payload = bytes(range(9)) # 9 bytes → 2 TP.DT + sock.send(J1939(_tp9_payload, pgn=0xFECA, + dst=_socket.J1939_NO_ADDR, priority=6)) + _tp9_pkts = peer.sniff(count=3, timeout=2) + +assert len(_tp9_pkts) == 3, \ + "9-byte payload: expected 3 frames (BAM + 2 DT), got %d" % len(_tp9_pkts) +_tp9_bam = J1939_TP_CM_BAM(J1939_CAN(bytes(_tp9_pkts[0])).data) +assert _tp9_bam.total_size == 9, "BAM.total_size=%d" % _tp9_bam.total_size +assert _tp9_bam.num_packets == 2, "BAM.num_packets=%d" % _tp9_bam.num_packets + +_tp9_dt1 = J1939_TP_DT(J1939_CAN(bytes(_tp9_pkts[1])).data) +_tp9_dt2 = J1939_TP_DT(J1939_CAN(bytes(_tp9_pkts[2])).data) +assert _tp9_dt1.seq_num == 1 +assert _tp9_dt2.seq_num == 2 +_tp9_reassembled = (_tp9_dt1.data + _tp9_dt2.data)[:9] +assert _tp9_reassembled == _tp9_payload, \ + "9-byte reassembly mismatch: %r" % _tp9_reassembled + += J1939SoftSocket – 9-byte BAM receive and reassembly + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + with J1939SoftSocket(cans, src_addr=0x00) as sock: + _rxtp9_payload = bytes(range(9)) + _rxtp9_bam = J1939_TP_CM_BAM(total_size=9, num_packets=2, pgn=0xFECA) + stim.send(J1939_CAN(priority=6, pdu_format=0xEC, pdu_specific=0xFF, + src=0x05, data=bytes(_rxtp9_bam))) + time.sleep(0.05) + stim.send(J1939_CAN(priority=7, pdu_format=0xEB, pdu_specific=0xFF, + src=0x05, + data=bytes(J1939_TP_DT(seq_num=1, + data=_rxtp9_payload[:7])))) + time.sleep(0.01) + _rxtp9_chunk2 = _rxtp9_payload[7:] + b'\xff' * 5 + stim.send(J1939_CAN(priority=7, pdu_format=0xEB, pdu_specific=0xFF, + src=0x05, + data=bytes(J1939_TP_DT(seq_num=2, + data=_rxtp9_chunk2)))) + _rxtp9_pkts = sock.sniff(count=1, timeout=2) + +assert len(_rxtp9_pkts) == 1, \ + "Expected 1 reassembled message, got %d" % len(_rxtp9_pkts) +assert _rxtp9_pkts[0].data == _rxtp9_payload, \ + "9-byte RX mismatch: %r" % _rxtp9_pkts[0].data + += J1939SoftSocket – 14-byte payload: exactly 2 TP.DT frames +# 14 bytes / 7 = 2 full TP.DT frames (no padding needed). + +with TestSocket(CAN) as cans, TestSocket(CAN) as peer: + cans.pair(peer) + with J1939SoftSocket(cans, src_addr=0x01) as sock: + _bam2_payload = bytes(range(14)) # 14 bytes → exactly 2 TP.DT + sock.send(J1939(_bam2_payload, pgn=0xFECA, + dst=_socket.J1939_NO_ADDR, priority=6)) + _bam2_pkts = peer.sniff(count=3, timeout=2) + +assert len(_bam2_pkts) == 3, \ + "14-byte payload: expected 3 frames (BAM + 2 DT), got %d" % len(_bam2_pkts) +_bam2_cm = J1939_TP_CM_BAM(J1939_CAN(bytes(_bam2_pkts[0])).data) +assert _bam2_cm.total_size == 14, "BAM.total_size=%d" % _bam2_cm.total_size +assert _bam2_cm.num_packets == 2, "BAM.num_packets=%d" % _bam2_cm.num_packets +_bam2_dt1 = J1939_TP_DT(J1939_CAN(bytes(_bam2_pkts[1])).data) +_bam2_dt2 = J1939_TP_DT(J1939_CAN(bytes(_bam2_pkts[2])).data) +assert _bam2_dt1.seq_num == 1 +assert _bam2_dt2.seq_num == 2 +# Both DT frames are fully used (no padding bytes needed for 14 bytes) +assert _bam2_dt1.data == _bam2_payload[:7], \ + "DT1 data mismatch: %r" % _bam2_dt1.data +assert _bam2_dt2.data == _bam2_payload[7:], \ + "DT2 data mismatch: %r" % _bam2_dt2.data + += J1939SoftSocket – new BAM from same peer overwrites incomplete session +# J1939-21 allows the sender to restart a BAM session; the receiver resets. + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + with J1939SoftSocket(cans, src_addr=0x00) as sock: + # First BAM (never completed) + stim.send(J1939_CAN(priority=6, pdu_format=0xEC, pdu_specific=0xFF, + src=0x07, + data=bytes(J1939_TP_CM_BAM(total_size=14, + num_packets=2, + pgn=0xFECA)))) + time.sleep(0.02) + # Second BAM (1-DT message) overwrites the first session + _owrt_payload = bytes(range(1, 8)) + stim.send(J1939_CAN(priority=6, pdu_format=0xEC, pdu_specific=0xFF, + src=0x07, + data=bytes(J1939_TP_CM_BAM(total_size=7, + num_packets=1, + pgn=0xFECA)))) + time.sleep(0.02) + # Now deliver the DT for the second BAM session + stim.send(J1939_CAN(priority=7, pdu_format=0xEB, pdu_specific=0xFF, + src=0x07, + data=bytes(J1939_TP_DT(seq_num=1, + data=_owrt_payload)))) + _owrt_pkts = sock.sniff(count=1, timeout=2) + +assert len(_owrt_pkts) == 1, \ + "Expected 1 reassembled message after BAM overwrite, got %d" % len(_owrt_pkts) +assert _owrt_pkts[0].data == _owrt_payload, \ + "Overwrite BAM data mismatch: %r" % _owrt_pkts[0].data + += J1939SoftSocket – TP.DT from wrong SA is ignored during active BAM session +# During a BAM from SA=0x07, TP.DT from SA=0x08 must be dropped. + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + with J1939SoftSocket(cans, src_addr=0x00) as sock: + _wrongsa_payload = bytes(range(1, 8)) + # Start a BAM from SA=0x07 (1 DT needed) + stim.send(J1939_CAN(priority=6, pdu_format=0xEC, pdu_specific=0xFF, + src=0x07, + data=bytes(J1939_TP_CM_BAM(total_size=7, + num_packets=1, + pgn=0xFECA)))) + time.sleep(0.02) + # Inject a DT from a DIFFERENT SA=0x08 (must be ignored) + stim.send(J1939_CAN(priority=7, pdu_format=0xEB, pdu_specific=0xFF, + src=0x08, + data=bytes(J1939_TP_DT(seq_num=1, + data=_wrongsa_payload)))) + # Correct DT from SA=0x07 must still be accepted + stim.send(J1939_CAN(priority=7, pdu_format=0xEB, pdu_specific=0xFF, + src=0x07, + data=bytes(J1939_TP_DT(seq_num=1, + data=_wrongsa_payload)))) + _wrongsa_pkts = sock.sniff(count=1, timeout=2) + +assert len(_wrongsa_pkts) == 1, \ + "Expected 1 message (DT from wrong SA dropped), got %d" % len(_wrongsa_pkts) +assert _wrongsa_pkts[0].data == _wrongsa_payload, \ + "Wrong-SA test data mismatch: %r" % _wrongsa_pkts[0].data +assert _wrongsa_pkts[0].src == 0x07, \ + "Message src should be 0x07, got 0x%02X" % _wrongsa_pkts[0].src + += J1939SoftSocket – priority preserved on single-frame TX +# J1939-21: the priority field in the CAN ID must match the one in J1939.priority. + +for _prio_val in [0, 3, 6, 7]: + with TestSocket(CAN) as cans, TestSocket(CAN) as peer: + cans.pair(peer) + with J1939SoftSocket(cans, src_addr=0x01) as sock: + sock.send(J1939(b'\xAA', pgn=0xFECA, + dst=_socket.J1939_NO_ADDR, priority=_prio_val)) + _prio_pkts = peer.sniff(count=1, timeout=1) + assert len(_prio_pkts) == 1, \ + "Priority %d: expected 1 frame" % _prio_val + _prio_j = J1939_CAN(bytes(_prio_pkts[0])) + assert _prio_j.priority == _prio_val, \ + "Priority mismatch: got %d, expected %d" % (_prio_j.priority, _prio_val) + += J1939SoftSocket – BAM with data_page=1 (PGN in the 0x1xxxx range) +# For PGNs > 0xFFFF the data_page bit is set. The TP.CM (BAM) CAN frame itself +# always uses PGN 0xEC00 (data_page=0 in the CAN ID); the full transported PGN +# including the data_page bit is encoded inside the BAM payload's pgn field. + +_dp1_pgn = 0x1FECA # data_page=1, pf=0xFE, ps=0xCA +_dp1_payload = bytes(range(1, 12)) # 11 bytes → 2 TP.DT + +with TestSocket(CAN) as cans, TestSocket(CAN) as peer: + cans.pair(peer) + with J1939SoftSocket(cans, src_addr=0x01) as sock: + sock.send(J1939(_dp1_payload, pgn=_dp1_pgn, + dst=_socket.J1939_NO_ADDR, priority=6)) + _dp1_pkts = peer.sniff(count=3, timeout=2) + +assert len(_dp1_pkts) == 3, \ + "DP1 BAM: expected 3 frames, got %d" % len(_dp1_pkts) +# The TP.CM frame uses PGN 0xEC00, so its data_page is always 0 in the CAN ID. +_dp1_bam_j = J1939_CAN(bytes(_dp1_pkts[0])) +assert _dp1_bam_j.pdu_format == 0xEC, \ + "Expected TP.CM frame, pf=0x%02X" % _dp1_bam_j.pdu_format +# The transported PGN (with data_page bit) is carried inside the BAM payload. +_dp1_bam_cm = J1939_TP_CM_BAM(_dp1_bam_j.data) +assert _dp1_bam_cm.pgn == _dp1_pgn, \ + "BAM PGN mismatch: 0x%05X != 0x%05X" % (_dp1_bam_cm.pgn, _dp1_pgn) +assert _dp1_bam_cm.total_size == 11, \ + "BAM total_size=%d" % _dp1_bam_cm.total_size +assert _dp1_bam_cm.num_packets == 2, \ + "BAM num_packets=%d" % _dp1_bam_cm.num_packets + +_dp1_reassembled = (J1939_TP_DT(J1939_CAN(bytes(_dp1_pkts[1])).data).data + + J1939_TP_DT(J1939_CAN(bytes(_dp1_pkts[2])).data).data)[:11] +assert _dp1_reassembled == _dp1_payload, \ + "DP1 payload mismatch: %r" % _dp1_reassembled + += J1939SoftSocket – multiple sequential messages through the same socket +# Send three independent messages one after another; all must be received in order. + +_seq_msgs = [ + (b'\x01', 0xFECA, _socket.J1939_NO_ADDR), # 1-byte broadcast + (bytes(range(7)), 0xFECA, _socket.J1939_NO_ADDR), # 7-byte broadcast (still single frame? no, wait 7 > 8? No, 7 <= 8) + (bytes(range(10)), 0xFECA, _socket.J1939_NO_ADDR), # 10-byte -> BAM +] + +with TestSocket(CAN) as cans1, TestSocket(CAN) as cans2: + cans1.pair(cans2) + with J1939SoftSocket(cans1, src_addr=0x01) as sender, \ + J1939SoftSocket(cans2, src_addr=0x02) as receiver: + for _sq_data, _sq_pgn, _sq_dst in _seq_msgs: + sender.send(J1939(_sq_data, pgn=_sq_pgn, dst=_sq_dst, priority=6)) + # Two single frames + 1 BAM (2 DT) = 5 CAN frames, 3 reassembled msgs. + _seq_pkts = receiver.sniff(count=3, timeout=5) + +assert len(_seq_pkts) == 3, \ + "Sequential messages: expected 3, got %d" % len(_seq_pkts) +assert _seq_pkts[0].data == _seq_msgs[0][0], \ + "Msg 0 mismatch: %r" % _seq_pkts[0].data +assert _seq_pkts[1].data == _seq_msgs[1][0], \ + "Msg 1 mismatch: %r" % _seq_pkts[1].data +assert _seq_pkts[2].data == _seq_msgs[2][0], \ + "Msg 2 mismatch: %r" % _seq_pkts[2].data + += J1939SoftSocket – CTS hold (num_packets=0): sender pauses until next CTS +# Simulate a receiver that first sends CTS(0) (hold) then CTS(num_packets). + +import threading as _threading_hold + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + with J1939SoftSocket(cans, src_addr=0x01) as sock: + _hold_payload = bytes(range(14)) + def _hold_bg_send(): + sock.send(J1939(_hold_payload, pgn=0xEF00, dst=0x02, priority=6)) + _hold_t = _threading_hold.Thread(target=_hold_bg_send) + _hold_t.start() + _hold_rts_frames = stim.sniff(count=1, timeout=2) + assert _hold_rts_frames, "No RTS received" + _hold_rts_j = J1939_CAN(bytes(_hold_rts_frames[0])) + assert _hold_rts_j.pdu_format == 0xEC + _hold_rts_cm = J1939_TP_CM_RTS(bytes(_hold_rts_j.data)) + assert _hold_rts_cm.ctrl == J1939_TP_CTRL_RTS + stim.send(J1939_CAN(priority=6, pdu_format=0xEC, pdu_specific=0x01, + src=0x02, + data=bytes(J1939_TP_CM_CTS(num_packets=0, + next_packet=1, + pgn=0xEF00)))) + _hold_dt_early = stim.sniff(count=1, timeout=0.3) + assert len(_hold_dt_early) == 0, \ + "Sender must not send DT during CTS hold, got %d frames" % len(_hold_dt_early) + stim.send(J1939_CAN(priority=6, pdu_format=0xEC, pdu_specific=0x01, + src=0x02, + data=bytes(J1939_TP_CM_CTS(num_packets=2, + next_packet=1, + pgn=0xEF00)))) + _hold_dt_frames = stim.sniff(count=2, timeout=2) + assert len(_hold_dt_frames) == 2, \ + "Expected 2 DT frames after CTS release, got %d" % len(_hold_dt_frames) + for _hdi, _hdf in enumerate(_hold_dt_frames): + _hdj = J1939_CAN(bytes(_hdf)) + assert _hdj.pdu_format == 0xEB, \ + "Frame %d must be TP.DT, got pf=0x%02X" % (_hdi, _hdj.pdu_format) + assert J1939_TP_DT(_hdj.data).seq_num == _hdi + 1 + stim.send(J1939_CAN(priority=6, pdu_format=0xEC, pdu_specific=0x01, + src=0x02, + data=bytes(J1939_TP_CM_ACK(total_size=14, + num_packets=2, + pgn=0xEF00)))) + _hold_t.join(timeout=5) + += J1939SoftSocket – TX timeout: no CTS after RTS → sender resets to IDLE +# After _J1939_TP_T3 (1.25 s) without a CTS the TX state machine must +# discard the session and accept a fresh message. + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + with J1939SoftSocket(cans, src_addr=0x01) as sock: + # Unicast send → triggers RTS + sock.send(J1939(bytes(range(9)), pgn=0xEF00, dst=0x02, priority=6)) + # Consume the RTS; respond with nothing (simulate dead peer) + _to_rts = stim.sniff(count=1, timeout=2) + assert _to_rts, "No RTS received" + # Wait > T3 = 1.25 s for the TX state machine to reset + time.sleep(1.4) + # After timeout, the socket should accept a new single-frame message + sock.send(J1939(b'\xAB', pgn=0xFECA, + dst=_socket.J1939_NO_ADDR, priority=6)) + _to_pkts = stim.sniff(count=1, timeout=2) + +assert len(_to_pkts) == 1, \ + "After TX timeout, new message should be sent; got %d frames" % len(_to_pkts) +_to_j = J1939_CAN(bytes(_to_pkts[0])) +assert _to_j.pdu_format != 0xEC, \ + "Frame after TX timeout must not be a TP.CM (got pf=0x%02X)" % _to_j.pdu_format +assert _to_j.data == b'\xAB', \ + "Data mismatch after TX timeout: %r" % _to_j.data + += J1939SoftSocket – 255-byte payload (maximum non-255-DT single BAM) +# 255 bytes → ceil(255/7) = 37 TP.DT frames. + +_big_payload = bytes(range(255)) +_big_npkts = (255 + 6) // 7 # = 37 + +with TestSocket(CAN) as cans, TestSocket(CAN) as peer: + cans.pair(peer) + with J1939SoftSocket(cans, src_addr=0x01) as sock: + sock.send(J1939(_big_payload, pgn=0xFECA, + dst=_socket.J1939_NO_ADDR, priority=6)) + # 1 BAM + 37 DT frames; 50 ms spacing → up to ~1.9 s + _big_pkts = peer.sniff(count=_big_npkts + 1, timeout=5) + +assert len(_big_pkts) == _big_npkts + 1, \ + "255-byte BAM: expected %d frames, got %d" % (_big_npkts + 1, len(_big_pkts)) +_big_bam = J1939_TP_CM_BAM(J1939_CAN(bytes(_big_pkts[0])).data) +assert _big_bam.total_size == 255 +assert _big_bam.num_packets == _big_npkts + +_big_reassembled = b''.join( + J1939_TP_DT(J1939_CAN(bytes(_big_pkts[i])).data).data + for i in range(1, _big_npkts + 1) +)[:255] +assert _big_reassembled == _big_payload, \ + "255-byte reassembly mismatch at index %d" % next( + (i for i in range(255) if _big_reassembled[i] != _big_payload[i]), -1) + += J1939SoftSocket – receive-after-close returns None without raising + +_rac_cansock = TestSocket(CAN) +_rac_sock = J1939SoftSocket(_rac_cansock, src_addr=0x00) +_rac_sock.close() +_rac_cansock.close() +_rac_result = _rac_sock.recv() +assert _rac_result is None, "recv() on closed socket should return None, got %r" % _rac_result + += J1939SoftSocket – loopback echo suppression: own frames not delivered +# A frame with src == our SA must be silently discarded. + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + with J1939SoftSocket(cans, src_addr=0x05) as sock: + # Inject a frame that looks like it came from our own SA + stim.send(J1939_CAN(priority=6, pdu_format=0xFE, pdu_specific=0xCA, + src=0x05, # == sock.src_addr + data=b'\x11\x22')) + _echo_pkts = sock.sniff(count=1, timeout=0.3) + +assert len(_echo_pkts) == 0, \ + "Own-address frame must not be delivered (loopback suppression)" + += J1939SoftSocket – soft-to-soft RTS/CTS with 49-byte payload (7 DT frames) +# Test a larger RTS/CTS session fully handled between two soft sockets. + +_large_rts_payload = bytes(range(49)) # 49 bytes → ceil(49/7) = 7 TP.DT + +with TestSocket(CAN) as cans1, TestSocket(CAN) as cans2: + cans1.pair(cans2) + with J1939SoftSocket(cans1, src_addr=0x10) as sender, \ + J1939SoftSocket(cans2, src_addr=0x20) as receiver: + sender.send(J1939(_large_rts_payload, pgn=0xEF00, dst=0x20, priority=6)) + # RTS → CTS → 7×DT → ACK; allow enough time + _large_rts_pkts = receiver.sniff(count=1, timeout=5) + +assert len(_large_rts_pkts) == 1, \ + "Large RTS/CTS: expected 1 msg, got %d" % len(_large_rts_pkts) +assert _large_rts_pkts[0].data == _large_rts_payload, \ + "Large RTS/CTS payload mismatch: %r" % _large_rts_pkts[0].data +assert _large_rts_pkts[0].src == 0x10 +assert _large_rts_pkts[0].dst == 0x20 + += J1939SoftSocket – listen_only: RTS does not elicit a CTS response +# When listen_only=True the implementation must not send CTS or ACK frames, +# allowing pure passive capture of TP sessions. + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + with J1939SoftSocket(cans, src_addr=0x00, listen_only=True) as sock: + _lo_pgn = 0xEF00 + stim.send(J1939_CAN(priority=6, pdu_format=0xEC, pdu_specific=0x00, + src=0x07, + data=bytes(J1939_TP_CM_RTS(total_size=9, + num_packets=2, + pgn=_lo_pgn)))) + _lo_cts_frames = stim.sniff(count=1, timeout=0.3) + +assert len(_lo_cts_frames) == 0, \ + "listen_only: RTS must not elicit a CTS, got %d frame(s)" % len(_lo_cts_frames) + += J1939SoftSocket – listen_only: BAM session still reassembled passively +# Even in listen_only mode, received BAM TP.DT frames must be reassembled and +# delivered to the application; the socket just never sends back control frames. + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + with J1939SoftSocket(cans, src_addr=0x00, listen_only=True) as sock: + _lo_bam_payload = bytes(range(1, 10)) + stim.send(J1939_CAN(priority=6, pdu_format=0xEC, pdu_specific=0xFF, + src=0x09, + data=bytes(J1939_TP_CM_BAM(total_size=9, + num_packets=2, + pgn=0xFECA)))) + time.sleep(0.05) + stim.send(J1939_CAN(priority=7, pdu_format=0xEB, pdu_specific=0xFF, + src=0x09, + data=bytes(J1939_TP_DT(seq_num=1, + data=_lo_bam_payload[:7])))) + time.sleep(0.01) + stim.send(J1939_CAN(priority=7, pdu_format=0xEB, pdu_specific=0xFF, + src=0x09, + data=bytes(J1939_TP_DT(seq_num=2, + data=_lo_bam_payload[7:] + b'\xff' * 5)))) + _lo_bam_pkts = sock.sniff(count=1, timeout=2) + +assert len(_lo_bam_pkts) == 1, \ + "listen_only BAM: expected 1 reassembled msg, got %d" % len(_lo_bam_pkts) +assert _lo_bam_pkts[0].data == _lo_bam_payload, \ + "listen_only BAM payload mismatch: %r" % _lo_bam_pkts[0].data +assert _lo_bam_pkts[0].src == 0x09, \ + "listen_only BAM src mismatch: 0x%02X" % _lo_bam_pkts[0].src + += J1939SoftSocket – listen_only: RTS/CTS session reassembled without sending ACK +# listen_only socket passively receives unicast TP.DT frames and reassembles +# without ever sending EndOfMsgACK back to the sender. + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + with J1939SoftSocket(cans, src_addr=0x00, listen_only=True) as sock: + _lo_rts_payload = bytes(range(1, 10)) + stim.send(J1939_CAN(priority=6, pdu_format=0xEC, pdu_specific=0x00, + src=0x0A, + data=bytes(J1939_TP_CM_RTS(total_size=9, + num_packets=2, + pgn=0xEF00)))) + time.sleep(0.05) + stim.send(J1939_CAN(priority=7, pdu_format=0xEB, pdu_specific=0x00, + src=0x0A, + data=bytes(J1939_TP_DT(seq_num=1, + data=_lo_rts_payload[:7])))) + time.sleep(0.01) + stim.send(J1939_CAN(priority=7, pdu_format=0xEB, pdu_specific=0x00, + src=0x0A, + data=bytes(J1939_TP_DT(seq_num=2, + data=_lo_rts_payload[7:] + b'\xff' * 5)))) + _lo_rts_pkts = sock.sniff(count=1, timeout=2) + _lo_ack_frames = stim.sniff(count=1, timeout=0.2) + +assert len(_lo_rts_pkts) == 1, \ + "listen_only RTS/CTS: expected 1 reassembled msg, got %d" % len(_lo_rts_pkts) +assert _lo_rts_pkts[0].data == _lo_rts_payload, \ + "listen_only RTS payload mismatch: %r" % _lo_rts_pkts[0].data +assert len(_lo_ack_frames) == 0, \ + "listen_only: must not send EndOfMsgACK, got %d frame(s)" % len(_lo_ack_frames) + += J1939SoftSocket – inactivity timeout: incomplete BAM resets state machine +# After the TP.DT inactivity timeout (T2 × extension factor) with no DT +# frames arriving, the state machine must reset to IDLE and accept new messages. +# We shorten the timeout extension window to make the test run in < 3 s. +# (T2 = 1.25 s, extension factor = 10 → total default = 12.5 s; we override +# the internal timeout to 0.1 s so total = 1.0 s.) + +import scapy.contrib.j1939 as _j1939_mod +_saved_T1 = _j1939_mod._J1939_TP_T1 +_saved_T2 = _j1939_mod._J1939_TP_T2 +_j1939_mod._J1939_TP_T1 = 0.1 +_j1939_mod._J1939_TP_T2 = 0.1 + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + with J1939SoftSocket(cans, src_addr=0x00) as sock: + stim.send(J1939_CAN(priority=6, pdu_format=0xEC, pdu_specific=0xFF, + src=0x0B, + data=bytes(J1939_TP_CM_BAM(total_size=9, + num_packets=2, + pgn=0xFECA)))) + time.sleep(1.4) + _j1939_mod._J1939_TP_T1 = _saved_T1 + _j1939_mod._J1939_TP_T2 = _saved_T2 + stim.send(J1939_CAN(priority=6, pdu_format=0xFE, pdu_specific=0xCA, + src=0x0B, data=b'\x42')) + _timeout_pkts = sock.sniff(count=1, timeout=1) + +assert len(_timeout_pkts) == 1, \ + "After inactivity timeout state should be IDLE; new msg not received" +assert _timeout_pkts[0].data == b'\x42', \ + "Post-timeout message mismatch: %r" % _timeout_pkts[0].data + += J1939SoftSocket – pgn filter: matching PGN is delivered +# pgn=0xFECA → only 0xFECA frames are delivered; 0xFECB is ignored. + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + with J1939SoftSocket(cans, src_addr=0x00, pgn=0xFECA) as sock: + # Should be delivered (PGN matches) + stim.send(J1939_CAN(priority=6, pdu_format=0xFE, pdu_specific=0xCA, + src=0x01, data=b'\x01\x02\x03')) + # Should be silently dropped (PGN does not match filter) + stim.send(J1939_CAN(priority=6, pdu_format=0xFE, pdu_specific=0xCB, + src=0x01, data=b'\x04\x05\x06')) + _pgn_pkts = sock.sniff(count=2, timeout=0.5) + +assert len(_pgn_pkts) == 1, \ + "pgn filter: expected 1 packet, got %d" % len(_pgn_pkts) +assert _pgn_pkts[0].pgn == 0xFECA, \ + "pgn filter: wrong PGN 0x%05X" % _pgn_pkts[0].pgn + += J1939SoftSocket – pgn filter: BAM with non-matching PGN is silently dropped +# When pgn=0xFECA, a BAM announcing PGN=0xFECB must be completely ignored. + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + with J1939SoftSocket(cans, src_addr=0x00, pgn=0xFECA) as sock: + # BAM for PGN 0xFECB – should NOT be accepted + _bam_bad = J1939_TP_CM_BAM(total_size=9, num_packets=2, pgn=0xFECB) + stim.send(J1939_CAN(priority=6, pdu_format=0xEC, pdu_specific=0xFF, + src=0x02, data=bytes(_bam_bad))) + time.sleep(0.05) + stim.send(J1939_CAN(priority=6, pdu_format=0xEB, pdu_specific=0xFF, + src=0x02, + data=bytes(J1939_TP_DT(seq_num=1, + data=b'\x01' * 7)))) + stim.send(J1939_CAN(priority=6, pdu_format=0xEB, pdu_specific=0xFF, + src=0x02, + data=bytes(J1939_TP_DT(seq_num=2, + data=b'\x02' * 7)))) + _pgn_bam_dropped = sock.sniff(count=1, timeout=0.5) + +assert len(_pgn_bam_dropped) == 0, \ + "pgn filter: BAM for non-matching PGN should be dropped, got %d packet(s)" \ + % len(_pgn_bam_dropped) + += J1939SoftSocket – pgn=0 accepts all PGNs (default accept-all behaviour) +# BenGardiner's rx_pgn=0 means "accept all"; our pgn=0 (the default) must +# behave the same way. + +with TestSocket(CAN) as cans, TestSocket(CAN) as stim: + cans.pair(stim) + with J1939SoftSocket(cans, src_addr=0x00, pgn=0) as sock: + stim.send(J1939_CAN(priority=6, pdu_format=0xFE, pdu_specific=0xCA, + src=0x01, data=b'\xAA')) + stim.send(J1939_CAN(priority=6, pdu_format=0xFE, pdu_specific=0xCB, + src=0x01, data=b'\xBB')) + _pgn0_pkts = sock.sniff(count=2, timeout=0.5) + +assert len(_pgn0_pkts) == 2, \ + "pgn=0: expected both PGNs delivered, got %d" % len(_pgn0_pkts) + + +############ +############ ++ J1939SoftSocket ↔ NativeJ1939Socket – additional interoperability tests +~ vcan_socket needs_root not_pypy + += Setup (already done in earlier section; just import what we need) + +import threading as _threading_iop2 +from time import sleep as _sleep_iop2 +from scapy.contrib.cansocket_native import NativeCANSocket +from scapy.contrib.j1939 import NativeJ1939Socket + += Soft TX PDU1 unicast (≤ 8 bytes) → NativeJ1939Socket RX at specific SA +# Unicast from soft socket (SA=0x41) to native socket bound at SA=0x42. + +_u1_payload = b'\x0A\x0B\x0C\x0D' +_u1_pgn = 0xEF00 # PDU1 (PF=0xEF=239 < 240), unicast +_u1_src = 0x41 +_u1_dst = 0x42 + +_u1_cansock = NativeCANSocket("vcan0") +_u1_native_rx = NativeJ1939Socket("vcan0", src_addr=_u1_dst, + pgn=socket.J1939_NO_PGN, promisc=False) +_u1_native_rx.ins.settimeout(3.0) + +def _u1_send(): + _sleep_iop2(0.1) + with J1939SoftSocket(_u1_cansock, src_addr=_u1_src) as s: + s.send(J1939(_u1_payload, pgn=_u1_pgn, dst=_u1_dst, priority=6)) + +_u1_t = _threading_iop2.Thread(target=_u1_send) +_u1_pkts = _u1_native_rx.sniff(timeout=3.0, started_callback=_u1_t.start, count=1) +_u1_t.join(timeout=5) +_u1_native_rx.close() + +assert _u1_pkts, "NativeJ1939Socket received no unicast from J1939SoftSocket" +_u1_rx = _u1_pkts[0] +assert _u1_rx.data == _u1_payload, \ + "Unicast payload mismatch: %r != %r" % (_u1_rx.data, _u1_payload) +assert _u1_rx.pgn == _u1_pgn, "Unicast PGN mismatch: 0x%X" % _u1_rx.pgn +assert _u1_rx.src == _u1_src, "Unicast SA mismatch: 0x%X" % _u1_rx.src + += NativeJ1939Socket TX PDU1 unicast (≤ 8 bytes) → J1939SoftSocket RX at our SA +# Native socket (SA=0x43) sends a unicast to soft socket at SA=0x44. + +_u2_payload = b'\x10\x20\x30\x40' +_u2_pgn = 0xEF00 +_u2_src = 0x43 +_u2_dst = 0x44 + +_u2_cansock = NativeCANSocket("vcan0") +_u2_soft_rx = J1939SoftSocket(_u2_cansock, src_addr=_u2_dst) +_u2_native_tx = NativeJ1939Socket("vcan0", src_addr=_u2_src, promisc=False) + +def _u2_send(): + _sleep_iop2(0.1) + _u2_native_tx.send(J1939(_u2_payload, pgn=_u2_pgn, src=_u2_src, dst=_u2_dst)) + +_u2_t = _threading_iop2.Thread(target=_u2_send) +_u2_pkts = _u2_soft_rx.sniff(timeout=3.0, started_callback=_u2_t.start, count=1) +_u2_t.join(timeout=5) +_u2_native_tx.close() +_u2_soft_rx.close() + +assert _u2_pkts, "J1939SoftSocket received no unicast from NativeJ1939Socket" +_u2_rx = _u2_pkts[0] +assert _u2_rx.data == _u2_payload, \ + "Unicast payload mismatch: %r != %r" % (_u2_rx.data, _u2_payload) +assert _u2_rx.pgn == _u2_pgn, "Unicast PGN mismatch: 0x%X" % _u2_rx.pgn +assert _u2_rx.src == _u2_src, "Unicast SA mismatch: 0x%X" % _u2_rx.src +assert _u2_rx.dst == _u2_dst, "Unicast DA mismatch: 0x%X" % _u2_rx.dst + += J1939SoftSocket TX RTS/CTS unicast (long) → NativeJ1939Socket RX +# Soft socket sends a 20-byte unicast message (triggers RTS/CTS). +# NativeJ1939Socket bound at the destination SA receives the reassembled payload. + +_rtc_iop_payload = bytes(range(0x20, 0x34)) # 20 bytes +_rtc_iop_pgn = 0xEF00 +_rtc_iop_src = 0x50 +_rtc_iop_dst = 0x51 + +_rtc_cansock = NativeCANSocket("vcan0") +_rtc_native_rx = NativeJ1939Socket("vcan0", src_addr=_rtc_iop_dst, + pgn=socket.J1939_NO_PGN, promisc=False) +_rtc_native_rx.ins.settimeout(5.0) + +def _rtc_iop_send(): + _sleep_iop2(0.1) + with J1939SoftSocket(_rtc_cansock, src_addr=_rtc_iop_src) as s: + s.send(J1939(_rtc_iop_payload, pgn=_rtc_iop_pgn, + dst=_rtc_iop_dst, priority=6)) + +_rtc_iop_t = _threading_iop2.Thread(target=_rtc_iop_send) +_rtc_iop_pkts = _rtc_native_rx.sniff(timeout=5.0, + started_callback=_rtc_iop_t.start, count=1) +_rtc_iop_t.join(timeout=10) +_rtc_native_rx.close() + +assert _rtc_iop_pkts, \ + "NativeJ1939Socket received no RTS/CTS message from J1939SoftSocket" +_rtc_iop_rx = _rtc_iop_pkts[0] +assert _rtc_iop_rx.data == _rtc_iop_payload, \ + "RTS/CTS payload mismatch: %r != %r" % (_rtc_iop_rx.data, _rtc_iop_payload) +assert _rtc_iop_rx.pgn == _rtc_iop_pgn, "RTS/CTS PGN mismatch" +assert _rtc_iop_rx.src == _rtc_iop_src, "RTS/CTS SA mismatch" + += NativeJ1939Socket TX large BAM (100 bytes) → J1939SoftSocket RX +# Native socket sends 100-byte broadcast; soft socket reassembles 15 TP.DT frames. + +_big_iop_payload = bytes(range(100)) +_big_iop_pgn = 0xFECA +_big_iop_sa = 0x60 + +_big_iop_cansock = NativeCANSocket("vcan0") +_big_iop_soft_rx = J1939SoftSocket(_big_iop_cansock, src_addr=0x00) +_big_iop_native_tx = NativeJ1939Socket("vcan0", src_addr=_big_iop_sa, promisc=False) + +def _big_iop_send(): + _sleep_iop2(0.1) + _big_iop_native_tx.send( + J1939(_big_iop_payload, pgn=_big_iop_pgn, + src=_big_iop_sa, dst=_socket.J1939_NO_ADDR)) + +_big_iop_t = _threading_iop2.Thread(target=_big_iop_send) +_big_iop_pkts = _big_iop_soft_rx.sniff(timeout=8.0, + started_callback=_big_iop_t.start, count=1) +_big_iop_t.join(timeout=12) +_big_iop_native_tx.close() +_big_iop_soft_rx.close() + +assert _big_iop_pkts, \ + "J1939SoftSocket received no 100-byte BAM from NativeJ1939Socket" +_big_iop_rx = _big_iop_pkts[0] +assert _big_iop_rx.data == _big_iop_payload, \ + "100-byte BAM payload mismatch: %r != %r" % (_big_iop_rx.data, _big_iop_payload) +assert _big_iop_rx.pgn == _big_iop_pgn +assert _big_iop_rx.src == _big_iop_sa + += J1939SoftSocket TX large BAM (100 bytes) → NativeJ1939Socket RX +# Soft socket sends 100-byte broadcast via BAM; kernel J1939 stack reassembles. + +_bigs_iop_payload = bytes(range(50, 150)) +_bigs_iop_pgn = 0xFECA +_bigs_iop_sa = 0x61 + +_bigs_iop_cansock = NativeCANSocket("vcan0") +_bigs_iop_native_rx = NativeJ1939Socket("vcan0", promisc=True) +_bigs_iop_native_rx.ins.settimeout(8.0) + +def _bigs_iop_send(): + _sleep_iop2(0.1) + with J1939SoftSocket(_bigs_iop_cansock, src_addr=_bigs_iop_sa) as s: + s.send(J1939(_bigs_iop_payload, pgn=_bigs_iop_pgn, + dst=_socket.J1939_NO_ADDR, priority=6)) + +_bigs_iop_t = _threading_iop2.Thread(target=_bigs_iop_send) +_bigs_iop_pkts = _bigs_iop_native_rx.sniff(timeout=8.0, + started_callback=_bigs_iop_t.start, + count=1) +_bigs_iop_t.join(timeout=12) +_bigs_iop_native_rx.close() + +assert _bigs_iop_pkts, \ + "NativeJ1939Socket received no 100-byte BAM from J1939SoftSocket" +_bigs_iop_rx = _bigs_iop_pkts[0] +assert _bigs_iop_rx.data == _bigs_iop_payload, \ + "100-byte soft→native BAM payload mismatch: %r != %r" % \ + (_bigs_iop_rx.data, _bigs_iop_payload) +assert _bigs_iop_rx.pgn == _bigs_iop_pgn +assert _bigs_iop_rx.src == _bigs_iop_sa + += J1939SoftSocket TX → NativeJ1939Socket RX: priority preserved on wire +# Send frames with different J1939 priorities; the native side should receive +# them with a matching source address (the kernel doesn't necessarily expose +# priority to user space, but the CAN frame ID must carry it). +# We verify at the CAN level via a NativeCANSocket sniffer. + +_prio_iop_cansock_tx = NativeCANSocket("vcan0") +_prio_iop_cansock_rx = NativeCANSocket("vcan0", basecls=J1939_CAN) +_prio_iop_cansock_rx.ins.settimeout(3.0) + +_prio_iop_sa = 0x70 +_prio_iop_pgn = 0xFECA + +def _prio_iop_send(): + _sleep_iop2(0.1) + with J1939SoftSocket(_prio_iop_cansock_tx, src_addr=_prio_iop_sa) as s: + for _pv in [3, 6]: + s.send(J1939(b'\xBB', pgn=_prio_iop_pgn, + dst=_socket.J1939_NO_ADDR, priority=_pv)) + _sleep_iop2(0.05) + +_prio_iop_t = _threading_iop2.Thread(target=_prio_iop_send) +_prio_iop_raw = _prio_iop_cansock_rx.sniff(timeout=3.0, + started_callback=_prio_iop_t.start, + count=2) +_prio_iop_t.join(timeout=5) +_prio_iop_cansock_rx.close() + +assert len(_prio_iop_raw) == 2, \ + "Priority test: expected 2 frames, got %d" % len(_prio_iop_raw) +_prio_iop_frames = [J1939_CAN(bytes(f)) for f in _prio_iop_raw] +assert _prio_iop_frames[0].priority == 3, \ + "Frame 0 priority: %d" % _prio_iop_frames[0].priority +assert _prio_iop_frames[1].priority == 6, \ + "Frame 1 priority: %d" % _prio_iop_frames[1].priority + += Multiple consecutive messages Soft → Native and back (ping-pong) +# Soft socket sends 3 messages; native socket sends 3 back; both sides verify. + +_pp_soft_sa = 0x72 +_pp_native_sa = 0x73 +_pp_pgn = 0xFECA + +_pp_soft_msgs = [b'\x01', b'\x02\x03', b'\x04\x05\x06\x07'] +_pp_native_msgs = [b'\xAA', b'\xBB\xCC', b'\xDD\xEE\xFF\x00'] + +_pp_cansock1 = NativeCANSocket("vcan0") +_pp_cansock2 = NativeCANSocket("vcan0") +_pp_soft = J1939SoftSocket(_pp_cansock1, src_addr=_pp_soft_sa) +_pp_native = NativeJ1939Socket("vcan0", src_addr=_pp_native_sa, promisc=True) +_pp_native.ins.settimeout(5.0) + +_pp_soft_rx_results = [] +_pp_native_rx_results = [] + +def _pp_native_send(): + _sleep_iop2(0.3) + for _pp_msg in _pp_native_msgs: + _pp_native.send( + J1939(_pp_msg, pgn=_pp_pgn, src=_pp_native_sa, + dst=_socket.J1939_NO_ADDR)) + _sleep_iop2(0.05) + +def _pp_soft_recv(): + # Collect 3 messages sent by the native socket (filter by src) + _collected = [] + _deadline = time.time() + 5.0 + while len(_collected) < 3 and time.time() < _deadline: + _p = _pp_soft.sniff(count=1, timeout=0.5) + if _p and _p[0].src == _pp_native_sa: + _collected.append(_p[0]) + _pp_soft_rx_results.extend(_collected) + +# Soft sends first; native sends concurrently +def _pp_soft_send(): + for _pp_msg in _pp_soft_msgs: + _pp_soft.send(J1939(_pp_msg, pgn=_pp_pgn, + dst=_socket.J1939_NO_ADDR, priority=6)) + _sleep_iop2(0.05) + +_pp_t_ss = _threading_iop2.Thread(target=_pp_soft_send) +_pp_t_ns = _threading_iop2.Thread(target=_pp_native_send) +_pp_t_sr = _threading_iop2.Thread(target=_pp_soft_recv) + +_pp_native_captured = _pp_native.sniff( + timeout=5.0, + started_callback=lambda: ( + _pp_t_ss.start(), _pp_t_ns.start(), _pp_t_sr.start()), + count=len(_pp_soft_msgs), +) + +_pp_t_ss.join(timeout=5); _pp_t_ns.join(timeout=5); _pp_t_sr.join(timeout=5) +_pp_soft.close(); _pp_native.close() + +# The native socket captured at least the soft-socket messages +_pp_from_soft = [p for p in _pp_native_captured if p.src == _pp_soft_sa] +assert len(_pp_from_soft) == len(_pp_soft_msgs), \ + "Native captured %d msgs from soft, expected %d" % ( + len(_pp_from_soft), len(_pp_soft_msgs)) +for _pi, (_pp_got, _pp_exp) in enumerate(zip(_pp_from_soft, _pp_soft_msgs)): + assert _pp_got.data == _pp_exp, \ + "Ping-pong msg %d: %r != %r" % (_pi, _pp_got.data, _pp_exp) + +# Soft socket received the native messages +assert len(_pp_soft_rx_results) == len(_pp_native_msgs), \ + "Soft received %d msgs from native, expected %d" % ( + len(_pp_soft_rx_results), len(_pp_native_msgs)) +for _pi, (_pp_got, _pp_exp) in enumerate(zip(_pp_soft_rx_results, _pp_native_msgs)): + assert _pp_got.data == _pp_exp, \ + "Soft rx msg %d: %r != %r" % (_pi, _pp_got.data, _pp_exp)