1 change: 1 addition & 0 deletions lmdeploy/pytorch/engine/engine.py
@@ -47,6 +47,7 @@ class InferOutput:
finish: bool = False
logits: torch.Tensor = None
logprobs: torch.Tensor = None
last_hidden_states: torch.Tensor = None

# send cache blocks back for migration in Disaggregated LLM Serving
# when Prefill Engine is Done.
4 changes: 3 additions & 1 deletion lmdeploy/pytorch/engine/engine_instance.py
@@ -240,22 +240,24 @@ async def async_stream_infer(self,
resp_data = resp.data
token_ids = []
logits = None
last_hidden_states = None
if resp_data is not None:
# request might be cancelled before any output
logits = resp_data.get('logits', None)
gen_token_ids = resp_data.get('token_ids', None)
if gen_token_ids is not None:
token_ids = gen_token_ids[output_offset:].tolist()
last_hidden_states = resp_data.get('last_hidden_states', None)

num_ids = len(token_ids)
num_all_ids = prompt_ids_len + output_offset + num_ids
extra_outputs = self._get_extra_outputs(resp, num_all_ids)
routed_experts = extra_outputs.get('routed_experts', None)

logger.debug(f'session[{session_id}] finish: num_out_ids={num_ids}.')
yield EngineOutput(resp.type,
token_ids,
logits=logits,
last_hidden_state=last_hidden_states,
cache_block_ids=cache_block_ids,
req_metrics=req_metrics,
routed_experts=routed_experts,
5 changes: 5 additions & 0 deletions lmdeploy/pytorch/engine/engine_loop.py
@@ -207,6 +207,7 @@ def _send_resp(self, out: InferOutput):
resp_type,
data=dict(token_ids=out.token_ids,
logits=out.logits,
last_hidden_states=out.last_hidden_states,
cache_block_ids=out.cache_block_ids,
req_metrics=out.req_metrics,
routed_experts=out.routed_experts,
@@ -297,6 +298,7 @@ def __get_logprobs(batched_outputs: 'BatchedOutputs'):

logits = batched_outputs.logits
all_routed_experts = batched_outputs.all_routed_experts
all_hidden_states = batched_outputs.last_hidden_states

if model_inputs is not None and (model_inputs.is_chunk and not model_inputs.is_last_chunk):
# chunk long context does not need to update seqs and outputs
@@ -363,6 +365,9 @@ def __get_logprobs(batched_outputs: 'BatchedOutputs'):
if msg.return_logits:
logit = __get_logit(msg, logits, seq_length, idx)
outputs[session_id].logits = logit

if msg.return_last_hidden_states and all_hidden_states is not None:
outputs[session_id].last_hidden_states = all_hidden_states[idx]
return outputs

async def _main_loop_try_send_next_inputs(self):
6 changes: 6 additions & 0 deletions lmdeploy/pytorch/engine/inputs_maker.py
@@ -629,6 +629,10 @@ def __need_routed_experts(seqs: 'SeqList'):
"""Need routed experts."""
return any(seq.return_routed_experts for seq in seqs)

def __need_hidden_states(seqs: 'SeqList'):
"""Need last hidden states."""
return any(seq.return_last_hidden_states for seq in seqs)

def __create_model_inputs(seqs):
"""Createe model inputs."""
inputs = self.create_model_inputs(seqs, True)
@@ -728,6 +732,7 @@ def __create_inputs_prefill():

return_logits = __need_logits(running)
return_routed_experts = __need_routed_experts(running)
return_last_hidden_states = __need_hidden_states(running)

return dict(
running=running,
@@ -740,6 +745,7 @@ def __create_inputs_prefill():
return_logits=return_logits,
extra_inputs=extra_inputs,
return_routed_experts=return_routed_experts,
return_last_hidden_states=return_last_hidden_states,
)

def do_prefill_pnode(self):
27 changes: 27 additions & 0 deletions lmdeploy/pytorch/engine/model_agent/agent.py
@@ -82,6 +82,7 @@ class BatchedOutputs:
new_token_timestamp: int = 0
extra_outputs: ExtraOutputs | None = None
all_routed_experts: torch.Tensor | None = None
last_hidden_states: torch.Tensor | None = None

def to_cpu(self):
"""To cpu."""
@@ -440,18 +441,36 @@ async def _async_model_forward(
self,
inputs: ModelInputs,
return_logits: bool,
return_last_hidden_states: bool = False,
):
"""Model forward."""
origin_inputs = inputs
ret = await self.async_forward(inputs)

# capture full hidden states before postprocessing slices to last token
full_hidden_states = None
if return_last_hidden_states:
raw_hidden = ret['hidden_states']
raw_seq_length = ret.get('seq_length', inputs.seq_length)
# raw_hidden shape: [1, total_tokens, hidden_dim] or [total_tokens, hidden_dim]
if raw_hidden.dim() == 3:
raw_hidden = raw_hidden[0] # [total_tokens, hidden_dim]
# slice per-sequence and mean pool
if raw_seq_length.numel() == 1:
full_hidden_states = raw_hidden.mean(dim=0, keepdim=True) # [1, hidden_dim]
else:
parts = raw_hidden.split(raw_seq_length.tolist(), dim=0)
full_hidden_states = torch.stack([p.mean(dim=0) for p in parts], dim=0) # [bs, hidden_dim]

Comment on lines +450 to +464 (Copilot AI, Apr 28, 2026):
Mean pooling is computed per forward pass using raw_seq_length. For long-context chunking (inputs.is_chunk), intermediate chunks do not emit outputs (EngineLoop skips non-last chunks), so the embedding on the last chunk will only reflect that final chunk rather than the full input sequence. To make embeddings correct for chunked prefill, accumulate a weighted sum/count across chunks (e.g., store partial sums in the sequence state) and finalize the mean on the last chunk.
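Following up on this comment, a minimal sketch of what such an accumulation could look like, assuming each prefill chunk yields a `[chunk_tokens, hidden_dim]` tensor and that the sequence state can carry two scratch fields (`_hidden_sum` and `_hidden_token_count` are hypothetical names, not part of this PR):

```python
import torch


def accumulate_chunk_hidden(seq, chunk_hidden: torch.Tensor, is_last_chunk: bool):
    """Keep a running sum/count across prefill chunks; finalize the mean on the last one.

    `seq._hidden_sum` and `seq._hidden_token_count` are hypothetical scratch fields
    used only for this sketch.
    """
    if getattr(seq, '_hidden_sum', None) is None:
        seq._hidden_sum = torch.zeros_like(chunk_hidden[0])  # [hidden_dim]
        seq._hidden_token_count = 0

    seq._hidden_sum += chunk_hidden.sum(dim=0)
    seq._hidden_token_count += chunk_hidden.size(0)

    if not is_last_chunk:
        return None  # intermediate chunks emit no output

    pooled = seq._hidden_sum / seq._hidden_token_count  # mean over the full input
    seq._hidden_sum = None
    seq._hidden_token_count = 0
    return pooled.unsqueeze(0)  # [1, hidden_dim]
```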
if not return_logits:
ret = self._postprocess_forward_output(ret, origin_inputs)

hidden_states, ret = self.spec_agent.update_main_model_outputs(ret, origin_inputs)

logits = self.get_logits(hidden_states)
ret['logits'] = logits
ret['_hidden_states'] = hidden_states
ret['_full_hidden_states'] = full_hidden_states
return ret

async def async_sampling_logits(self, logits: torch.Tensor, sampling_inputs: SamplingInputs):
@@ -603,6 +622,7 @@ async def _step_postprocess_with_output(self,
need_broadcast_next: bool,
return_logits: bool = False,
all_routed_experts: Any = None,
last_hidden_states: torch.Tensor = None,
extra_inputs: ExtraInputs = None):
"""Step postprocess with output."""
rank = self.rank
@@ -645,6 +665,7 @@
model_metas=model_metas,
logprobs=logprobs,
all_routed_experts=all_routed_experts,
last_hidden_states=last_hidden_states,
extra_outputs=extra_outputs))

return inputs, extra_inputs, stopping_criteria, extra_outputs, next_token_ids
@@ -679,6 +700,7 @@ async def _async_step(
stopping_criteria: StoppingCriteria = None,
return_logits: bool = False,
return_routed_experts: bool = False,
return_last_hidden_states: bool = False,
extra_inputs: ExtraInputs = None,
):
"""Asyc forward task."""
@@ -739,6 +761,7 @@
output = await self._async_model_forward(
inputs,
return_logits=return_logits,
return_last_hidden_states=return_last_hidden_states,
)
# recovery is_decoding
inputs.is_decoding = is_decoding
@@ -747,6 +770,9 @@
# skip dummy forward output
return

# get pre-pooled hidden states for embeddings
last_hidden_states = output.get('_full_hidden_states', None)

logits = output['logits'][0] # [bs, seq, prob] -> [seq, prob]
seq_length = output.get('seq_length', inputs.seq_length)
last_logits = self._slice_outs(logits, seq_length) # [bs, 1, prob] -> [bs, prob]
@@ -778,6 +804,7 @@
need_broadcast_next,
return_logits=return_logits,
all_routed_experts=all_routed_experts,
last_hidden_states=last_hidden_states,
extra_inputs=extra_inputs,
))
else:
12 changes: 10 additions & 2 deletions lmdeploy/pytorch/messages.py
@@ -60,6 +60,7 @@ class SamplingParam:
logits_processors: None | list[LogitsProcessor] = None
out_logits: bool = False
out_last_hidden_states: bool = False
output_last_hidden_state: str = None
num_logprobs: int = -1
return_routed_experts: bool = False

@@ -92,8 +93,10 @@ def from_gen_config(cls, gen_config: GenerationConfig):
output_logits = None
logger.warning('Pytorch Engine only support output_logits="all"'
' with max_new_tokens=0')
if gen_config.output_last_hidden_state is not None:
logger.warning('Pytorch Engine does not support output last hidden states.')
output_last_hidden_state = gen_config.output_last_hidden_state
if output_last_hidden_state and output_last_hidden_state != 'all':
logger.warning('Pytorch Engine only supports output_last_hidden_state="all"')
output_last_hidden_state = None
if top_p < 0 or top_p > 1.0:
logger.warning('`top_p` has to be a float > 0 and < 1'
f' but is {top_p}')
@@ -156,6 +159,7 @@ def from_gen_config(cls, gen_config: GenerationConfig):
min_new_tokens=min_new_tokens,
logits_processors=gen_config.logits_processors,
out_logits=(output_logits is not None),
output_last_hidden_state=output_last_hidden_state,
num_logprobs=logprobs,
return_routed_experts=gen_config.return_routed_experts,
repetition_ngram_size=repetition_ngram_size,
@@ -790,6 +794,10 @@ def status(self):
def return_logits(self):
return self.sampling_param.out_logits

@property
def return_last_hidden_states(self):
return self.sampling_param.output_last_hidden_state is not None

@property
def logits(self):
"""Get logits."""
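With the change above, `output_last_hidden_state='all'` now reaches the PyTorch engine instead of being dropped with a warning. A hedged offline usage sketch, assuming the pooled states surface on the response under the same `last_hidden_state` attribute the `/v1/embeddings` handler below reads (the offline pipeline path is not part of this diff, and the model path is a placeholder):

```python
from lmdeploy import GenerationConfig, PytorchEngineConfig, pipeline

# Placeholder model; any model served by the PyTorch backend would do.
pipe = pipeline('internlm/internlm2_5-7b-chat', backend_config=PytorchEngineConfig())

gen_config = GenerationConfig(
    max_new_tokens=1,                # mirrors the embeddings endpoint: one step suffices
    output_last_hidden_state='all',  # the only value the PyTorch engine accepts
)

resp = pipe(['A sentence to embed.'], gen_config=gen_config)[0]
# Attribute name assumed from the endpoint code; after the mean pooling added in
# _async_model_forward this should be a single [1, hidden_dim] vector per request.
print(resp.last_hidden_state)
```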
75 changes: 73 additions & 2 deletions lmdeploy/serve/openai/api_server.py
@@ -3,9 +3,11 @@
from __future__ import annotations

import asyncio
import base64
import json
import os
import re
import struct
import time
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
@@ -64,6 +66,7 @@
CompletionStreamResponse,
DeltaMessage,
EmbeddingsRequest,
EmbeddingsResponse,
EncodeRequest,
EncodeResponse,
ErrorResponse,
@@ -987,6 +990,78 @@ async def _inner_call():
return response


@router.post('/v1/embeddings', tags=['unsupported'])
@router.post('/v1/embeddings', dependencies=[Depends(validate_json_request)])
async def create_embeddings(request: EmbeddingsRequest, raw_request: Request = None):
"""Creates embeddings for the text."""
return create_error_response(HTTPStatus.BAD_REQUEST, 'Unsupported by turbomind.')
if isinstance(request.input, str):
inputs = [request.input]
else:
inputs = request.input

if not inputs:
return create_error_response(HTTPStatus.BAD_REQUEST, 'Input must not be empty.')

async_engine = VariableInterface.async_engine
embedding_data = []
total_prompt_tokens = 0
for idx, text in enumerate(inputs):
if not text:
return create_error_response(HTTPStatus.BAD_REQUEST, 'Input text must not be empty.')

session = VariableInterface.create_session(-1)
gen_config = GenerationConfig(
max_new_tokens=1,
output_last_hidden_state='all',
)
result_generator = async_engine.generate(
messages=text,
session_id=session.session_id,
gen_config=gen_config,
stream_response=True,
sequence_start=True,
sequence_end=True,
Copilot AI (Apr 28, 2026):
AsyncEngine.generate defaults to do_preprocess=True, which applies the configured chat template even when messages is a plain string (see MultimodalProcessor._get_text_prompt_input). For an OpenAI-compatible embeddings endpoint, this will change the text being embedded (e.g., adding user/assistant wrappers). Pass do_preprocess=False here (similar to how string prompts are handled in /v1/chat/completions).
Suggested change: keep `sequence_end=True,` and add `do_preprocess=False,` after it.
do_preprocess=False,
)

last_hidden_state = None
prompt_tokens = 0
async for res in result_generator:
if res.finish_reason == 'error':
return create_error_response(
HTTPStatus.INTERNAL_SERVER_ERROR,
getattr(res, 'text', 'Internal error during embedding generation.'),
)
if res.last_hidden_state is not None:
last_hidden_state = res.last_hidden_state
prompt_tokens = res.input_token_len

total_prompt_tokens += prompt_tokens

if last_hidden_state is None:
return create_error_response(
HTTPStatus.INTERNAL_SERVER_ERROR,
'Model does not support hidden states output for embeddings.',
)
Comment on lines +1028 to +1044 (Copilot AI, Apr 28, 2026):
The loop ignores res.finish_reason / error frames from AsyncEngine.generate (e.g. when prefix caching is enabled with output_last_hidden_state='all', the generator yields a finish_reason='error' frame with an error message and no hidden states). Currently this falls through to a generic 500. Handle finish_reason == 'error' (and possibly client disconnect) and surface the actual error message/status to the caller.

# Convert to list (hidden states are already mean-pooled per sequence)
if last_hidden_state.dim() > 1:
# multi-token: mean pool across sequence dimension
emb_list = last_hidden_state.mean(dim=0).tolist()
else:
emb_list = last_hidden_state.tolist()

if request.encoding_format == 'base64':
packed = struct.pack(f'<{len(emb_list)}f', *emb_list)
encoded = base64.b64encode(packed).decode('utf-8')
embedding_data.append({'object': 'embedding', 'embedding': encoded, 'index': idx})
else:
embedding_data.append({'object': 'embedding', 'embedding': emb_list, 'index': idx})

return EmbeddingsResponse(
data=embedding_data,
model=request.model or async_engine.model_name,
usage=UsageInfo(prompt_tokens=total_prompt_tokens, total_tokens=total_prompt_tokens, completion_tokens=0),
)
Comment on lines +1060 to +1064 (Copilot AI, Apr 28, 2026):
prompt_tokens is overwritten per input and the final usage only reflects the last item. Also, when request.model is omitted the response currently returns an empty string, unlike other endpoints (e.g. /pooling) which default to async_engine.model_name. Consider summing prompt tokens across all inputs and defaulting model to the server model name when not provided.
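To exercise the new endpoint end to end, a client-side sketch (server address, port, and model name are placeholders; the base64 branch packs little-endian float32 values, matching the `struct.pack(f'<{len(emb_list)}f', ...)` call above):

```python
import base64
import struct

import requests

# Placeholder address: the default api_server port is assumed here.
url = 'http://localhost:23333/v1/embeddings'
payload = {
    'model': 'internlm2',  # placeholder model name
    'input': ['LMDeploy is a toolkit for compressing, deploying and serving LLMs.',
              'hello world'],
    'encoding_format': 'base64',
}

resp = requests.post(url, json=payload).json()
for item in resp['data']:
    raw = base64.b64decode(item['embedding'])
    # Little-endian float32, mirroring the server-side struct.pack format.
    vector = struct.unpack(f'<{len(raw) // 4}f', raw)
    print(item['index'], len(vector))
```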


@router.post('/v1/encode', dependencies=[Depends(validate_json_request)])
1 change: 1 addition & 0 deletions lmdeploy/serve/openai/protocol.py
@@ -370,6 +370,7 @@ class EmbeddingsRequest(BaseModel):
"""Embedding request."""
model: str = None
input: str | list[str]
encoding_format: Literal['float', 'base64'] = 'float'
user: str | None = None


Empty file.