feat(provider): 添加 StepFun ASR Provider#8362
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces support for the StepFun ASR (Speech-to-Text) API. It adds configuration options, dynamically loads the new provider, implements audio preprocessing (including format detection and conversion of silk/amr to wav), and integrates SSE parsing for the transcription stream. The feedback highlights opportunities to improve robustness and performance, specifically by handling standard \r\n line endings in the SSE parser, adding type checks to prevent crashes on malformed JSON payloads, and offloading synchronous file reading to a separate thread to avoid blocking the asyncio event loop.
|
|
||
|
|
||
| def _iter_sse_payloads(content: str): | ||
| for event in content.split("\n\n"): |
There was a problem hiding this comment.
If the server returns standard HTTP/SSE line endings (\r\n), content.split("\n\n") will fail to split the events because the separator will be \r\n\r\n (which contains \n\r\n instead of \n\n). Normalizing the line endings by replacing \r\n with \n first ensures robust parsing of the SSE stream.
| for event in content.split("\n\n"): | |
| normalized_content = content.replace("\r\n", "\n") | |
| for event in normalized_content.split("\n\n"): |
There was a problem hiding this comment.
已在 604ecd8ac 中修复:_iter_sse_payloads() 解析前会先将 CRLF 规范化为 LF,避免标准 SSE \r\n\r\n 分隔导致事件拆分失败。
| try: | ||
| data = json.loads(payload) | ||
| except json.JSONDecodeError: | ||
| continue | ||
|
|
||
| event_type = str(data.get("type") or data.get("event") or "") |
There was a problem hiding this comment.
If the JSON payload returned by the SSE stream is not a dictionary (e.g., a list, string, or null), calling data.get(...) will raise an AttributeError and crash the transcription process. Adding a type check ensures the parser handles unexpected or malformed payloads gracefully.
try:
data = json.loads(payload)
except json.JSONDecodeError:
continue
if not isinstance(data, dict):
continue
event_type = str(data.get("type") or data.get("event") or "")There was a problem hiding this comment.
已在 604ecd8ac 中修复:json.loads() 后增加 isinstance(data, dict) 检查,非对象 SSE payload 会跳过,避免 .get() 触发异常。
| source_path = converted_path | ||
| audio_type = "wav" | ||
|
|
||
| encoded_audio = base64.b64encode(source_path.read_bytes()).decode("utf-8") |
There was a problem hiding this comment.
Reading the entire audio file synchronously using source_path.read_bytes() inside an async function blocks the single-threaded asyncio event loop. For larger audio files, this can cause significant latency spikes and degrade the performance of other concurrent bot operations. Running the file read operation in a separate thread using asyncio.to_thread keeps the event loop responsive.
| encoded_audio = base64.b64encode(source_path.read_bytes()).decode("utf-8") | |
| import asyncio | |
| raw_bytes = await asyncio.to_thread(source_path.read_bytes) | |
| encoded_audio = base64.b64encode(raw_bytes).decode("utf-8") |
There was a problem hiding this comment.
已在 14baae962 中处理:音频文件读取改为 await asyncio.to_thread(source_path.read_bytes),避免大音频读取阻塞事件循环。
There was a problem hiding this comment.
Hey - I've found 3 issues, and left some high level feedback:
- In
create_http_client,httpx.AsyncClientdoes not accept aproxykeyword argument (it usesproxiesand/ortransportinstead), so the currentclient_kwargs["proxy"] = proxywill be ignored or raise; update this to the correct parameter to ensure the configured proxy is actually used. - The SSE handling in
get_textbuffers the entire response viaresponse.textbefore parsing; if the StepFun ASR endpoint produces longer streams it would be more robust to iterate the response asynchronously (e.g.,async for line in response.aiter_lines()) and feed lines into the SSE parser incrementally.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `create_http_client`, `httpx.AsyncClient` does not accept a `proxy` keyword argument (it uses `proxies` and/or `transport` instead), so the current `client_kwargs["proxy"] = proxy` will be ignored or raise; update this to the correct parameter to ensure the configured proxy is actually used.
- The SSE handling in `get_text` buffers the entire response via `response.text` before parsing; if the StepFun ASR endpoint produces longer streams it would be more robust to iterate the response asynchronously (e.g., `async for line in response.aiter_lines()`) and feed lines into the SSE parser incrementally.
## Individual Comments
### Comment 1
<location path="astrbot/core/provider/sources/stepfun_asr_source.py" line_range="67-72" />
<code_context>
+ return headers
+
+
+def create_http_client(timeout: int | None, proxy: str) -> httpx.AsyncClient:
+ client_kwargs: dict[str, object] = {
+ "timeout": timeout,
+ "follow_redirects": True,
+ }
+ if proxy:
+ logger.info("[StepFun ASR] Using proxy: %s", proxy)
+ client_kwargs["proxy"] = proxy
</code_context>
<issue_to_address>
**issue (bug_risk):** httpx.AsyncClient uses `proxies` rather than `proxy`, so the proxy setting will be silently ignored.
Because `httpx.AsyncClient` only accepts `proxies` (not `proxy`) at the client level, this value will be ignored and requests won’t use the proxy. Please set `client_kwargs["proxies"] = proxy`, or use a full `proxies` dict if you need per-scheme settings.
</issue_to_address>
### Comment 2
<location path="astrbot/core/provider/sources/stepfun_asr_source.py" line_range="72-73" />
<code_context>
+ "timeout": timeout,
+ "follow_redirects": True,
+ }
+ if proxy:
+ logger.info("[StepFun ASR] Using proxy: %s", proxy)
+ client_kwargs["proxy"] = proxy
+ return httpx.AsyncClient(**client_kwargs)
</code_context>
<issue_to_address>
**🚨 suggestion (security):** Logging the full proxy URL can inadvertently leak credentials to logs.
URLs like `http://user:pass@host:port` will print credentials directly into logs here, which is a security/privacy risk in shared or persistent logging systems. Please either avoid logging the proxy value (only log that a proxy is in use) or sanitize it first by removing the userinfo segment before logging.
Suggested implementation:
```python
def create_http_client(timeout: int | None, proxy: str) -> httpx.AsyncClient:
client_kwargs: dict[str, object] = {
"timeout": timeout,
"follow_redirects": True,
}
if proxy:
try:
# Sanitize proxy to avoid logging credentials (user:pass@)
parts = urlsplit(proxy)
netloc = parts.netloc
if "@" in netloc:
# Drop userinfo, keep host:port
netloc = netloc.split("@", 1)[1]
sanitized_proxy = urlunsplit(
(
parts.scheme,
netloc,
parts.path,
parts.query,
parts.fragment,
)
)
except Exception:
# If anything goes wrong during parsing, avoid leaking the raw proxy
sanitized_proxy = "<redacted>"
logger.info("[StepFun ASR] Using proxy: %s", sanitized_proxy)
client_kwargs["proxy"] = proxy
return httpx.AsyncClient(**client_kwargs)
```
To make this compile, ensure the following imports are present near the top of `astrbot/core/provider/sources/stepfun_asr_source.py`:
- `from urllib.parse import urlsplit, urlunsplit`
If those imports are not already in the file, add them alongside the other imports.
</issue_to_address>
### Comment 3
<location path="astrbot/core/provider/sources/stepfun_asr_source.py" line_range="120" />
<code_context>
+ return {"type": audio_type}
+
+
+async def prepare_audio_input(
+ audio_source: str,
+) -> tuple[str, dict[str, object], list[Path]]:
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying this provider by inlining single-use helpers and narrowing boolean normalization to reduce unnecessary abstraction and make config behavior clearer.
You can trim a few abstraction layers without changing behavior:
### 1. Drop the duplicate `prepare_audio_input` indirection
`ProviderStepFunASR.prepare_audio_input` is a thin delegator to the module‑level function. Keeping only one entry point will make it easier to follow.
Inline the logic into the method and remove the top‑level `prepare_audio_input`:
```python
# remove this function
async def prepare_audio_input(
audio_source: str,
) -> tuple[str, dict[str, object], list[Path]]:
...
# in class ProviderStepFunASR:
async def prepare_audio_input(
self,
audio_source: str,
) -> tuple[str, dict[str, object], list[Path]]:
cleanup_paths: list[Path] = []
source_path = Path(audio_source)
is_remote = audio_source.startswith(("http://", "https://"))
is_tencent = "multimedia.nt.qq.com.cn" in audio_source if is_remote else False
if is_remote:
parsed_url = urlparse(audio_source)
suffix = Path(parsed_url.path).suffix or ".input"
download_path = get_temp_dir() / f"stepfun_asr_{uuid.uuid4().hex[:8]}{suffix}"
await download_file(audio_source, str(download_path))
source_path = download_path
cleanup_paths.append(download_path)
if not source_path.exists():
raise FileNotFoundError(f"File does not exist: {source_path}")
audio_type = await detect_audio_format(source_path)
if audio_type is None:
audio_type = source_path.suffix.lower().lstrip(".")
if source_path.suffix.lower() in {".amr", ".silk"} or is_tencent:
file_format = await detect_audio_format(source_path)
if file_format in {"silk", "amr"}:
converted_path = get_temp_dir() / f"stepfun_asr_{uuid.uuid4().hex[:8]}.wav"
cleanup_paths.append(converted_path)
if file_format == "silk":
logger.info("Converting silk file to wav for StepFun ASR...")
await tencent_silk_to_wav(str(source_path), str(converted_path))
else:
logger.info("Converting amr file to wav for StepFun ASR...")
await convert_to_pcm_wav(str(source_path), str(converted_path))
source_path = converted_path
audio_type = "wav"
if audio_type not in SUPPORTED_AUDIO_FORMATS:
converted_path = get_temp_dir() / f"stepfun_asr_{uuid.uuid4().hex[:8]}.wav"
cleanup_paths.append(converted_path)
logger.info("Converting audio file to wav for StepFun ASR...")
await convert_audio_to_wav(str(source_path), str(converted_path))
source_path = converted_path
audio_type = "wav"
encoded_audio = base64.b64encode(source_path.read_bytes()).decode("utf-8")
return encoded_audio, build_audio_format(audio_type), cleanup_paths
```
Then:
```python
# in get_text
encoded_audio, audio_format, cleanup_paths = await self.prepare_audio_input(audio_url)
```
### 2. Inline `create_http_client` into `__init__`
`create_http_client` has a single call site and simple logic. Inlining removes one level of indirection without changing behavior:
```python
# remove this helper
def create_http_client(timeout: int | None, proxy: str) -> httpx.AsyncClient:
client_kwargs: dict[str, object] = {
"timeout": timeout,
"follow_redirects": True,
}
if proxy:
logger.info("[StepFun ASR] Using proxy: %s", proxy)
client_kwargs["proxy"] = proxy
return httpx.AsyncClient(**client_kwargs)
# in __init__
self.timeout = normalize_timeout(provider_config.get("timeout", 20))
...
client_kwargs: dict[str, object] = {
"timeout": self.timeout,
"follow_redirects": True,
}
if self.proxy:
logger.info("[StepFun ASR] Using proxy: %s", self.proxy)
client_kwargs["proxy"] = self.proxy
self.client = httpx.AsyncClient(**client_kwargs)
```
### 3. Narrow `normalize_bool` if the config shape allows it
If `provider_config` is controlled and you don’t actually pass `int`/arbitrary strings, you can simplify `normalize_bool` to be more predictable:
```python
def normalize_bool(value: bool | str | None, default: bool) -> bool:
if value is None or value == "":
return default
if isinstance(value, bool):
return value
value_str = value.strip().lower()
if value_str in {"true", "yes", "on"}:
return True
if value_str in {"false", "no", "off"}:
return False
return default
```
This keeps existing behavior for typical truthy/falsey config strings but removes support for less obvious cases like `"enabled"`, `"1"`, and numeric types, making the config semantics easier to reason about.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
a4c4a7d to
9bd38ca
Compare
概要
新增阶跃星辰语音转文本服务提供商 StepFun StepAudio ASR Provider,用于接入 Step Plan 的语音转文字接口。
解决#7774 提到的需求,阶跃星辰的Coding Plan端点无法通过现有OpanAI接入方式调用。
主要改动
stepfun_asr语音转文字 Provider,调用/audio/asr/sse。stepfun_asr。验证