Skip to content

Commit a2fc49d

Browse files
committed
refactor: decompose func_metadata() into smaller focused helpers
Extract the monolithic func_metadata() function into smaller, well-documented helper functions for improved readability, testability, and extensibility. Changes: - Extract _get_function_signature() for signature introspection - Extract _build_arg_model() for parameter model construction - Extract _resolve_return_annotation() for return type analysis - Extract _create_output_model() for type-based model dispatch - Extract _try_generate_strict_schema() for schema generation with error handling - Simplify _try_create_model_and_schema() to delegate to new helpers All existing tests pass without modification. No behavioral changes.
1 parent 98f8ef2 commit a2fc49d

File tree

1 file changed

+153
-76
lines changed

1 file changed

+153
-76
lines changed

src/mcp/server/mcpserver/utilities/func_metadata.py

Lines changed: 153 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ def pre_parse_json(self, data: dict[str, Any]) -> dict[str, Any]:
155155
continue # Not JSON - skip
156156
if isinstance(pre_parsed, str | int | float):
157157
# This is likely that the raw value is e.g. `"hello"` which we
158-
# Should really be parsed as '"hello"' in Python - but if we parse
158+
# Should really be parsed as '"'hello'"' in Python - but if we parse
159159
# it as JSON it'll turn into just 'hello'. So we skip it.
160160
continue
161161
new_data[data_key] = pre_parsed
@@ -206,19 +206,69 @@ def func_metadata(
206206
A FuncMetadata object containing:
207207
- arg_model: A Pydantic model representing the function's arguments
208208
- output_model: A Pydantic model for the return type if the output is structured
209-
- wrap_output: Whether the function result needs to be wrapped in `{"result": ...}` for structured output.
209+
- wrap_output: Whether the function result needs to be wrapped in {"result": ...}
210+
for structured output.
210211
"""
212+
sig = _get_function_signature(func)
213+
arguments_model = _build_arg_model(sig, func.__name__, skip_names)
214+
215+
if structured_output is False:
216+
return FuncMetadata(arg_model=arguments_model)
217+
218+
resolved = _resolve_return_annotation(sig, structured_output, func.__name__)
219+
if resolved is None:
220+
return FuncMetadata(arg_model=arguments_model)
221+
222+
original_annotation, return_type_expr = resolved
223+
224+
output_model, output_schema, wrap_output = _try_create_model_and_schema(
225+
original_annotation, return_type_expr, func.__name__
226+
)
227+
228+
if output_model is None and structured_output is True:
229+
raise InvalidSignature(
230+
f"Function {func.__name__}: return type {return_type_expr} is not serializable for structured output"
231+
)
232+
233+
return FuncMetadata(
234+
arg_model=arguments_model,
235+
output_schema=output_schema,
236+
output_model=output_model,
237+
wrap_output=wrap_output,
238+
)
239+
240+
241+
def _get_function_signature(func: Callable[..., Any]) -> inspect.Signature:
242+
"""Get the signature of a function, raising InvalidSignature on failure."""
211243
try:
212-
sig = inspect.signature(func, eval_str=True)
244+
return inspect.signature(func, eval_str=True)
213245
except NameError as e: # pragma: no cover
214-
# This raise could perhaps be skipped, and we (MCPServer) just call
215-
# model_rebuild right before using it 🤷
216246
raise InvalidSignature(f"Unable to evaluate type annotations for callable {func.__name__!r}") from e
247+
248+
249+
def _build_arg_model(
250+
sig: inspect.Signature,
251+
func_name: str,
252+
skip_names: Sequence[str] = (),
253+
) -> type[ArgModelBase]:
254+
"""Build a Pydantic model representing the function's arguments.
255+
256+
Iterates over the function's parameters, handling type annotations, defaults,
257+
and BaseModel attribute name conflicts (via aliasing).
258+
259+
Args:
260+
sig: The function's inspect.Signature.
261+
func_name: The function's name (used for the model name).
262+
skip_names: Parameter names to exclude from the model.
263+
264+
Returns:
265+
A dynamically created Pydantic model class.
266+
"""
217267
params = sig.parameters
218268
dynamic_pydantic_model_params: dict[str, Any] = {}
219269
for param in params.values():
220270
if param.name.startswith("_"): # pragma: no cover
221-
raise InvalidSignature(f"Parameter {param.name} of {func.__name__} cannot start with '_'")
271+
raise InvalidSignature(f"Parameter {param.name} of {func_name} cannot start with '_'")
222272
if param.name in skip_names:
223273
continue
224274

@@ -245,24 +295,38 @@ def func_metadata(
245295
else:
246296
dynamic_pydantic_model_params[field_name] = Annotated[(annotation, *field_metadata, Field(**field_kwargs))]
247297

248-
arguments_model = create_model(
249-
f"{func.__name__}Arguments",
298+
return create_model(
299+
f"{func_name}Arguments",
250300
__base__=ArgModelBase,
251301
**dynamic_pydantic_model_params,
252302
)
253303

254-
if structured_output is False:
255-
return FuncMetadata(arg_model=arguments_model)
256304

257-
# set up structured output support based on return type annotation
305+
def _resolve_return_annotation(
306+
sig: inspect.Signature,
307+
structured_output: bool | None,
308+
func_name: str,
309+
) -> tuple[Any, Any] | None:
310+
"""Resolve and validate the function's return type annotation for structured output.
311+
312+
Handles special cases including CallToolResult, Annotated metadata, and Union types.
258313
314+
Args:
315+
sig: The function's inspect.Signature.
316+
structured_output: Whether structured output is requested (None for auto-detect).
317+
func_name: The function's name (used for error messages).
318+
319+
Returns:
320+
A tuple of (original_annotation, type_expr) if structured output should be
321+
attempted, or None if no structured output is needed.
322+
"""
259323
if sig.return_annotation is inspect.Parameter.empty and structured_output is True:
260-
raise InvalidSignature(f"Function {func.__name__}: return annotation required for structured output")
324+
raise InvalidSignature(f"Function {func_name}: return annotation required for structured output")
261325

262326
try:
263327
inspected_return_ann = inspect_annotation(sig.return_annotation, annotation_source=AnnotationSource.FUNCTION)
264328
except ForbiddenQualifier as e:
265-
raise InvalidSignature(f"Function {func.__name__}: return annotation contains an invalid type qualifier") from e
329+
raise InvalidSignature(f"Function {func_name}: return annotation contains an invalid type qualifier") from e
266330

267331
return_type_expr = inspected_return_ann.type
268332

@@ -275,7 +339,7 @@ def func_metadata(
275339
# Check if CallToolResult appears in the union (excluding None for Optional check)
276340
if any(isinstance(arg, type) and issubclass(arg, CallToolResult) for arg in args if arg is not type(None)):
277341
raise InvalidSignature(
278-
f"Function {func.__name__}: CallToolResult cannot be used in Union or Optional types. "
342+
f"Function {func_name}: CallToolResult cannot be used in Union or Optional types. "
279343
"To return empty results, use: CallToolResult(content=[])"
280344
)
281345

@@ -297,26 +361,11 @@ def func_metadata(
297361
# as being `ReturnType`:
298362
original_annotation = return_type_expr
299363
else:
300-
return FuncMetadata(arg_model=arguments_model)
364+
return None
301365
else:
302366
original_annotation = sig.return_annotation
303367

304-
output_model, output_schema, wrap_output = _try_create_model_and_schema(
305-
original_annotation, return_type_expr, func.__name__
306-
)
307-
308-
if output_model is None and structured_output is True:
309-
# Model creation failed or produced warnings - no structured output
310-
raise InvalidSignature(
311-
f"Function {func.__name__}: return type {return_type_expr} is not serializable for structured output"
312-
)
313-
314-
return FuncMetadata(
315-
arg_model=arguments_model,
316-
output_schema=output_schema,
317-
output_model=output_model,
318-
wrap_output=wrap_output,
319-
)
368+
return original_annotation, cast(Any, return_type_expr)
320369

321370

322371
def _try_create_model_and_schema(
@@ -337,16 +386,46 @@ def _try_create_model_and_schema(
337386
Model and schema are None if warnings occur or creation fails.
338387
wrap_output is True if the result needs to be wrapped in {"result": ...}
339388
"""
340-
model = None
341-
wrap_output = False
389+
model, wrap_output = _create_output_model(original_annotation, type_expr, func_name)
390+
391+
if model is not None:
392+
schema = _try_generate_strict_schema(model, type_expr, func_name)
393+
if schema is None:
394+
return None, None, False
395+
return model, schema, wrap_output
396+
397+
return None, None, False
342398

343-
# First handle special case: None
399+
400+
def _create_output_model(
401+
original_annotation: Any,
402+
type_expr: Any,
403+
func_name: str,
404+
) -> tuple[type[BaseModel] | None, bool]:
405+
"""Create a Pydantic model for the function's return type.
406+
407+
Dispatches to the appropriate model creation strategy based on the type:
408+
- None -> wrapped model
409+
- GenericAlias (list, dict, Union, etc.) -> wrapped or dict model
410+
- BaseModel subclasses -> used directly
411+
- TypedDict -> converted to Pydantic model
412+
- Primitive types -> wrapped model
413+
- Classes with type hints -> converted to Pydantic model
414+
415+
Args:
416+
original_annotation: The original return annotation.
417+
type_expr: The underlying type expression.
418+
func_name: The function's name.
419+
420+
Returns:
421+
A tuple of (model or None, wrap_output).
422+
"""
423+
# Special case: None
344424
if type_expr is None:
345-
model = _create_wrapped_model(func_name, original_annotation)
346-
wrap_output = True
425+
return _create_wrapped_model(func_name, original_annotation), True
347426

348427
# Handle GenericAlias types (list[str], dict[str, int], Union[str, int], etc.)
349-
elif isinstance(type_expr, GenericAlias):
428+
if isinstance(type_expr, GenericAlias):
350429
origin = get_origin(type_expr)
351430

352431
# Special case: dict with string keys can use RootModel
@@ -355,65 +434,63 @@ def _try_create_model_and_schema(
355434
if len(args) == 2 and args[0] is str:
356435
# TODO: should we use the original annotation? We are losing any potential `Annotated`
357436
# metadata for Pydantic here:
358-
model = _create_dict_model(func_name, type_expr)
437+
return _create_dict_model(func_name, type_expr), False
359438
else:
360439
# dict with non-str keys needs wrapping
361-
model = _create_wrapped_model(func_name, original_annotation)
362-
wrap_output = True
440+
return _create_wrapped_model(func_name, original_annotation), True
363441
else:
364442
# All other generic types need wrapping (list, tuple, Union, Optional, etc.)
365-
model = _create_wrapped_model(func_name, original_annotation)
366-
wrap_output = True
443+
return _create_wrapped_model(func_name, original_annotation), True
367444

368445
# Handle regular type objects
369-
elif isinstance(type_expr, type):
446+
if isinstance(type_expr, type):
370447
type_annotation = cast(type[Any], type_expr)
371448

372449
# Case 1: BaseModel subclasses (can be used directly)
373450
if issubclass(type_annotation, BaseModel):
374-
model = type_annotation
451+
return type_annotation, False
375452

376453
# Case 2: TypedDicts:
377-
elif is_typeddict(type_annotation):
378-
model = _create_model_from_typeddict(type_annotation)
454+
if is_typeddict(type_annotation):
455+
return _create_model_from_typeddict(type_annotation), False
379456

380457
# Case 3: Primitive types that need wrapping
381-
elif type_annotation in (str, int, float, bool, bytes, type(None)):
382-
model = _create_wrapped_model(func_name, original_annotation)
383-
wrap_output = True
458+
if type_annotation in (str, int, float, bool, bytes, type(None)):
459+
return _create_wrapped_model(func_name, original_annotation), True
384460

385461
# Case 4: Other class types (dataclasses, regular classes with annotations)
386-
else:
387-
type_hints = get_type_hints(type_annotation)
388-
if type_hints:
389-
# Classes with type hints can be converted to Pydantic models
390-
model = _create_model_from_class(type_annotation, type_hints)
391-
# Classes without type hints are not serializable - model remains None
462+
type_hints = get_type_hints(type_annotation)
463+
if type_hints:
464+
# Classes with type hints can be converted to Pydantic models
465+
return _create_model_from_class(type_annotation, type_hints), False
466+
# Classes without type hints are not serializable
467+
return None, False
392468

393469
# Handle any other types not covered above
394-
else:
395-
# This includes typing constructs that aren't GenericAlias in Python 3.10
396-
# (e.g., Union, Optional in some Python versions)
397-
model = _create_wrapped_model(func_name, original_annotation)
398-
wrap_output = True
399-
400-
if model:
401-
# If we successfully created a model, try to get its schema
402-
# Use StrictJsonSchema to raise exceptions instead of warnings
403-
try:
404-
schema = model.model_json_schema(schema_generator=StrictJsonSchema)
405-
except (TypeError, ValueError, pydantic_core.SchemaError, pydantic_core.ValidationError) as e:
406-
# These are expected errors when a type can't be converted to a Pydantic schema
407-
# TypeError: When Pydantic can't handle the type
408-
# ValueError: When there are issues with the type definition (including our custom warnings)
409-
# SchemaError: When Pydantic can't build a schema
410-
# ValidationError: When validation fails
411-
logger.info(f"Cannot create schema for type {type_expr} in {func_name}: {type(e).__name__}: {e}")
412-
return None, None, False
470+
# This includes typing constructs that aren't GenericAlias in Python 3.10
471+
# (e.g., Union, Optional in some Python versions)
472+
return _create_wrapped_model(func_name, original_annotation), True
413473

414-
return model, schema, wrap_output
415474

416-
return None, None, False
475+
def _try_generate_strict_schema(
476+
model: type[BaseModel],
477+
type_expr: Any,
478+
func_name: str,
479+
) -> dict[str, Any] | None:
480+
"""Try to generate a JSON schema using StrictJsonSchema.
481+
482+
Returns the schema dict on success, or None if the type cannot be serialized.
483+
"""
484+
try:
485+
return model.model_json_schema(schema_generator=StrictJsonSchema)
486+
except (TypeError, ValueError, pydantic_core.SchemaError, pydantic_core.ValidationError) as e:
487+
# These are expected errors when a type can't be converted to a Pydantic schema
488+
# TypeError: When Pydantic can't handle the type
489+
# ValueError: When there are issues with the type definition (including our custom warnings)
490+
# SchemaError: When Pydantic can't build a schema
491+
# ValidationError: When validation fails
492+
logger.info(f"Cannot create schema for type {type_expr} in {func_name}: {type(e).__name__}: {e}")
493+
return None
417494

418495

419496
_no_default = object()

0 commit comments

Comments
 (0)