Python Guide
Installation
```
pip install protomcp
```
The @tool() decorator
Decorate any function with @tool("description") to register it as an MCP tool. The function name becomes the tool name.
```python
from protomcp import tool, ToolResult

@tool("Add two integers")
def add(a: int, b: int) -> ToolResult:
    return ToolResult(result=str(a + b))
```
Tool metadata
Pass additional keyword arguments to @tool() to provide metadata hints to the MCP host:
```python
import os

@tool(
    "Delete a file from disk",
    title="Delete File",
    destructive=True,
    idempotent=False,
    read_only=False,
    open_world=False,
    task_support=False,
)
def delete_file(path: str) -> ToolResult:
    os.remove(path)
    return ToolResult(result=f"Deleted {path}")
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| description | str | required | Human-readable description of what the tool does |
| title | str | "" | Display name shown in the MCP host UI |
| destructive | bool | False | Hint: the tool has destructive side effects |
| idempotent | bool | False | Hint: calling the tool multiple times has the same effect as once |
| read_only | bool | False | Hint: the tool does not modify state |
| open_world | bool | False | Hint: the tool may access resources outside the current context |
| task_support | bool | False | Hint: the tool supports long-running async task semantics |
| output_type | dataclass type | None | Dataclass type for structured output schema generation |
Type hints and schema generation
protomcp reads Python type hints to generate the JSON Schema for tool inputs automatically.
| Python type | JSON Schema type |
|---|---|
| str | "string" |
| int | "integer" |
| float | "number" |
| bool | "boolean" |
| list | "array" |
| dict | "object" |
| list[T] | {"type": "array", "items": <T schema>} |
| dict[K, V] | {"type": "object", "additionalProperties": <V schema>} |
| str \| int / Union[str, int] | {"anyOf": [...]} |
| Optional[T] | type of T, not required |
| Literal["a", "b"] | {"type": "string", "enum": ["a", "b"]} |
```python
@tool("Search for documents")
def search(query: str, limit: int = 10, include_archived: bool = False) -> ToolResult:
    # limit and include_archived are optional because they have defaults
    results = do_search(query, limit, include_archived)
    return ToolResult(result=str(results))
```
Optional parameters
Use Optional[T] or a default value to mark a parameter as optional:
```python
from typing import Optional

@tool("Send a notification")
def notify(message: str, channel: Optional[str] = None) -> ToolResult:
    ch = channel or "default"
    send(ch, message)
    return ToolResult(result=f"Sent to {ch}")
```
Parameters with Optional[T] or a default are not added to the required array in the JSON Schema.
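The schema rules above can be modeled with a toy builder. This is an illustrative sketch only, not protomcp's implementation (which also handles list[T], dict[K, V], unions, and Literal); it shows how type hints plus defaults determine the properties and required fields:

```python
import inspect
import typing

# Toy mapping from Python primitives to JSON Schema type names (illustrative).
PRIMITIVES = {str: "string", int: "integer", float: "number",
              bool: "boolean", list: "array", dict: "object"}

def toy_schema(func):
    hints = typing.get_type_hints(func)
    hints.pop("return", None)
    params = inspect.signature(func).parameters
    properties, required = {}, []
    for name, hint in hints.items():
        # A default value makes the parameter optional
        optional = params[name].default is not inspect.Parameter.empty
        # Optional[T] is Union[T, None]: unwrap T and mark as not required
        if typing.get_origin(hint) is typing.Union and type(None) in typing.get_args(hint):
            optional = True
            hint = next(a for a in typing.get_args(hint) if a is not type(None))
        properties[name] = {"type": PRIMITIVES.get(hint, "string")}
        if not optional:
            required.append(name)
    return {"type": "object", "properties": properties, "required": required}
```

Running toy_schema over the notify handler above would yield string properties for message and channel, with only message in required.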
ToolResult
All tool handlers must return a ToolResult.
```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class ToolResult:
    result: str = ""
    is_error: bool = False
    enable_tools: Optional[list[str]] = None
    disable_tools: Optional[list[str]] = None
    error_code: Optional[str] = None
    message: Optional[str] = None
    suggestion: Optional[str] = None
    retryable: bool = False
```
Success
```python
return ToolResult(result="done")
```
Structured error
```python
return ToolResult(
    is_error=True,
    error_code="NOT_FOUND",
    message="The file /tmp/data.csv does not exist",
    suggestion="Check the path and try again",
    retryable=False,
)
```
Enabling / disabling tools from a result
Return enable_tools or disable_tools to change the active tool list after this call completes:
```python
@tool("Log in and unlock tools")
def login(username: str, password: str) -> ToolResult:
    if not authenticate(username, password):
        return ToolResult(is_error=True, message="Authentication failed")
    return ToolResult(
        result="Logged in",
        enable_tools=["delete_file", "write_file"],
    )
```
tool_manager
Use tool_manager to modify the active tool list during a tool call (not just at return time).
```python
from protomcp import tool, ToolResult, tool_manager

@tool("Enable debug tools")
def enable_debug() -> ToolResult:
    active = tool_manager.enable(["debug_dump", "trace_calls"])
    return ToolResult(result=f"Active tools: {active}")

@tool("Disable debug tools")
def disable_debug() -> ToolResult:
    active = tool_manager.disable(["debug_dump", "trace_calls"])
    return ToolResult(result=f"Active tools: {active}")
```
tool_manager API
All functions return a list[str] of the currently active tool names after the operation.
```python
tool_manager.enable(tool_names: list[str]) -> list[str]
tool_manager.disable(tool_names: list[str]) -> list[str]
tool_manager.set_allowed(tool_names: list[str]) -> list[str]
tool_manager.set_blocked(tool_names: list[str]) -> list[str]
tool_manager.get_active_tools() -> list[str]
tool_manager.batch(enable=None, disable=None, allow=None, block=None) -> list[str]
```
set_allowed and set_blocked switch the tool list to allowlist/blocklist mode respectively. See Tool List Modes.
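To build intuition for how enable/disable interact with the two modes, here is a toy model of the semantics. It is an illustrative sketch under the assumption that enable/disable operate on whichever mode is active; protomcp ships its own tool_manager singleton:

```python
class ToyToolManager:
    """Toy model of allowlist/blocklist semantics (illustrative only)."""

    def __init__(self, all_tools):
        self.all_tools = set(all_tools)
        self.blocked = set()   # blocklist mode: active = all_tools - blocked
        self.allowed = None    # allowlist mode when not None: active = allowed

    def enable(self, tool_names):
        if self.allowed is not None:
            self.allowed |= set(tool_names)
        else:
            self.blocked -= set(tool_names)
        return self.get_active_tools()

    def disable(self, tool_names):
        if self.allowed is not None:
            self.allowed -= set(tool_names)
        else:
            self.blocked |= set(tool_names)
        return self.get_active_tools()

    def set_allowed(self, tool_names):
        self.allowed = set(tool_names)   # switch to allowlist mode
        return self.get_active_tools()

    def set_blocked(self, tool_names):
        self.allowed = None              # switch back to blocklist mode
        self.blocked = set(tool_names)
        return self.get_active_tools()

    def get_active_tools(self):
        if self.allowed is not None:
            return sorted(self.all_tools & self.allowed)
        return sorted(self.all_tools - self.blocked)
```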
Batch operations
Use batch to perform multiple operations atomically:
```python
active = tool_manager.batch(
    enable=["write_file"],
    disable=["read_only_mode"],
)
```
Progress Reporting
Use ToolContext to report progress during a long-running tool call. Declare a ctx: ToolContext parameter in your handler — protomcp injects it automatically and skips it during schema generation.
```python
from protomcp import tool, ToolResult
from protomcp.context import ToolContext

@tool("Process a large dataset")
def process_data(file_path: str, ctx: ToolContext) -> ToolResult:
    rows = load_rows(file_path)
    total = len(rows)
    for i, row in enumerate(rows):
        if ctx.is_cancelled():
            return ToolResult(is_error=True, message="Cancelled")
        process_row(row)
        ctx.report_progress(i + 1, total, f"Processing row {i + 1}/{total}")
    return ToolResult(result=f"Processed {total} rows")
```
ToolContext API
| Method | Signature | Description |
|---|---|---|
| report_progress | (progress: int, total: int = 0, message: str = "") -> None | Send a progress notification to the MCP host |
| is_cancelled | () -> bool | Returns True if the MCP host has cancelled this call |
report_progress is a no-op if no progress_token was provided by the host (i.e. the host does not support progress notifications).
Server Logging
Use ServerLogger to send structured log messages back to the MCP host (not to stderr). The host can display or filter these messages.
```python
import protomcp
from protomcp import tool, ToolResult

@tool("Fetch remote data")
def fetch_data(url: str) -> ToolResult:
    protomcp.log.info("Fetching URL", data={"url": url})
    try:
        data = download(url)
        protomcp.log.debug("Fetch complete", data={"bytes": len(data)})
        return ToolResult(result=data)
    except Exception as e:
        protomcp.log.error("Fetch failed", data={"url": url, "error": str(e)})
        return ToolResult(is_error=True, message=str(e))
```
ServerLogger API
All methods accept a message string and an optional data keyword argument (a dict serialized to JSON).
| Method | Level |
|---|---|
| debug(message, *, data=None) | debug |
| info(message, *, data=None) | info |
| notice(message, *, data=None) | notice |
| warning(message, *, data=None) | warning |
| error(message, *, data=None) | error |
| critical(message, *, data=None) | critical |
| alert(message, *, data=None) | alert |
| emergency(message, *, data=None) | emergency |
Structured Output
Use output_type in @tool() to declare a structured output schema. Pass a dataclass type — protomcp generates the JSON Schema automatically.
```python
from dataclasses import dataclass
from protomcp import tool, ToolResult

@dataclass
class SearchResult:
    title: str
    url: str
    score: float

@tool("Search the web", output_type=SearchResult)
def search(query: str) -> ToolResult:
    results = run_search(query)
    # Return the structured result serialized to JSON
    import json, dataclasses
    return ToolResult(result=json.dumps([dataclasses.asdict(r) for r in results]))
```
The output_type dataclass must use field types that map to JSON Schema primitives (str, int, float, bool, list, dict, Optional[T]).
Cancellation
Check ctx.is_cancelled() periodically in long-running tools to stop early when the MCP host cancels the request.
```python
from protomcp import tool, ToolResult
from protomcp.context import ToolContext

@tool("Run a slow computation")
def slow_compute(n: int, ctx: ToolContext) -> ToolResult:
    result = 0
    for i in range(n):
        if ctx.is_cancelled():
            return ToolResult(is_error=True, message="Cancelled by host")
        result += expensive_step(i)
    return ToolResult(result=str(result))
```
Cancellation is cooperative — protomcp sets the cancelled flag and your tool is responsible for checking it. In-flight calls are not interrupted forcibly.
Testing tools
Since @tool() registers handlers in a global registry, test files should use clear_registry() between tests:
```python
from protomcp.tool import clear_registry, get_registered_tools
from protomcp import tool, ToolResult

def test_add():
    clear_registry()

    @tool("Add two numbers")
    def add(a: int, b: int) -> ToolResult:
        return ToolResult(result=str(a + b))

    tools = get_registered_tools()
    assert len(tools) == 1
    assert tools[0].name == "add"

    result = tools[0].handler(a=2, b=3)
    assert result.result == "5"
    assert not result.is_error
```
Call the handler directly — no need to run protomcp for unit tests.
Resources
Resources expose data that MCP clients can read. See the Resources guide for the full pattern.
```python
import json

from protomcp import resource, resource_template, ResourceContent

@resource(uri="config://app", description="App configuration")
def app_config(uri: str) -> ResourceContent:
    return ResourceContent(uri=uri, text='{"debug": false}', mime_type="application/json")

@resource_template(uri_template="db://users/{user_id}", description="Read a user by ID")
def read_user(uri: str) -> ResourceContent:
    user_id = uri.replace("db://users/", "")
    return ResourceContent(uri=uri, text=json.dumps(get_user(user_id)))
```
Prompts
Prompts define reusable message templates. See the Prompts guide for the full pattern.
```python
from protomcp import prompt, PromptArg, PromptMessage

@prompt(
    description="Summarize a topic",
    arguments=[PromptArg(name="topic", required=True)],
)
def summarize(topic: str) -> list[PromptMessage]:
    return [PromptMessage(role="user", content=f"Summarize {topic} briefly.")]
```
Completions
Provide autocomplete for prompt and resource arguments. See Prompts guide — Completions.
```python
from protomcp import completion, CompletionResult

@completion("ref/prompt", "summarize", "topic")
def complete_topic(value: str) -> CompletionResult:
    topics = ["architecture", "performance", "security"]
    return CompletionResult(values=[t for t in topics if t.startswith(value)])
```
Sampling
Request LLM calls from the MCP client. See the Sampling guide.
```python
@tool("Translate text")
def translate(ctx: ToolContext, text: str, language: str) -> ToolResult:
    response = ctx.sample(
        messages=[{"role": "user", "content": f"Translate to {language}: {text}"}],
        max_tokens=500,
    )
    return ToolResult(result=response.get("content", ""))
```
Tool Groups
Group related actions under a single tool using @tool_group on a class and @action on its methods.
```python
from protomcp.group import tool_group, action
from protomcp import ToolResult

@tool_group("files", description="File operations", strategy="union")
class FileTools:
    @action("read", description="Read a file")
    def read(self, path: str) -> ToolResult:
        return ToolResult(result=open(path).read())

    @action("write", description="Write a file")
    def write(self, path: str, content: str) -> ToolResult:
        open(path, "w").write(content)
        return ToolResult(result=f"Wrote {path}")
```
Strategy: union (default)
With strategy="union", the group registers as a single tool with a discriminated oneOf schema. The caller passes an action field to select the action.
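As a rough model of union-strategy dispatch (an illustrative sketch, not the library's internals; the action table below is hypothetical), the caller's arguments carry an action field that selects the handler:

```python
def dispatch_union(actions, args):
    """Toy union-strategy dispatcher: the 'action' field selects the handler
    and the remaining arguments are passed through (illustrative sketch)."""
    args = dict(args)                  # do not mutate the caller's dict
    name = args.pop("action", None)
    handler = actions.get(name)
    if handler is None:
        return {"is_error": True, "message": f"Unknown action: {name!r}"}
    return handler(**args)

# Hypothetical action table for a "files" group
actions = {
    "read": lambda path: {"result": f"read {path}"},
    "write": lambda path, content: {"result": f"wrote {len(content)} bytes to {path}"},
}
```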
Strategy: separate
With strategy="separate", each action becomes its own tool, namespaced as group.action (e.g. files.read, files.write).
```python
@tool_group("files", strategy="separate")
class FileTools:
    ...
```
Dispatch and fuzzy matching
When an unknown action is passed, the dispatcher returns an error with a fuzzy “Did you mean?” suggestion based on close matches.
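The suggestion mechanism can be approximated with difflib from the standard library. This is a sketch; protomcp's actual matching algorithm is not documented here:

```python
import difflib

def did_you_mean(unknown, valid_actions):
    # Find the closest known action name; the cutoff filters out weak matches
    matches = difflib.get_close_matches(unknown, valid_actions, n=1, cutoff=0.6)
    if matches:
        return f"Unknown action '{unknown}'. Did you mean '{matches[0]}'?"
    return f"Unknown action '{unknown}'."
```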
Per-action schema
Each @action generates its own JSON Schema from the method’s type hints, following the same rules as @tool().
Declarative Validation
Declare validation rules on @action() to validate input before your handler runs.
requires
Fail if a required field is missing or empty.
```python
@action("deploy", requires=["env", "version"])
def deploy(self, env: str, version: str) -> ToolResult:
    return ToolResult(result=f"Deployed {version} to {env}")
```
enum_fields
Restrict a field to a set of valid values. Invalid values trigger a “Did you mean?” suggestion.
```python
@action("set_env", enum_fields={"env": ["dev", "staging", "prod"]})
def set_env(self, env: str) -> ToolResult:
    return ToolResult(result=f"Set to {env}")
```
cross_rules
Validate relationships between parameters. Each rule is a (condition_fn, error_message) tuple — if the condition returns True, the error is raised.
```python
@action("scale", cross_rules=[
    (lambda args: args.get("min", 0) > args.get("max", 0), "min must be <= max"),
])
def scale(self, min: int, max: int) -> ToolResult:
    return ToolResult(result=f"Scaled {min}-{max}")
```
hints
Non-blocking advisory messages appended to the result when a condition is met.
```python
@action("query", hints={
    "slow_warning": {
        "condition": lambda args: args.get("limit", 0) > 1000,
        "message": "Large limit may cause slow queries",
    },
})
def query(self, table: str, limit: int = 100) -> ToolResult:
    return ToolResult(result=f"Queried {table}")
```
Server Context
Register resolvers that inject shared parameters into tool handlers automatically.
```python
import os

from protomcp.server_context import server_context

@server_context("project_dir", expose=False)
def resolve_project_dir(args: dict) -> str:
    return os.getcwd()
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| param_name | str | required | Name of the parameter to inject |
| expose | bool | True | If False, the parameter is hidden from the tool schema |
When a tool handler declares a parameter matching param_name, protomcp calls the resolver and injects the returned value. With expose=False, the parameter does not appear in the JSON Schema sent to the MCP host.
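The injection flow can be sketched with inspect. This is a toy registry, not protomcp's implementation, and resolve_project_dir and list_files below are hypothetical names:

```python
import inspect

_resolvers = {}  # param_name -> resolver; toy stand-in for protomcp's registry

def toy_server_context(param_name):
    def register(fn):
        _resolvers[param_name] = fn
        return fn
    return register

def call_with_context(handler, args):
    # Inject resolver-provided values for declared parameters the caller omitted
    args = dict(args)
    declared = inspect.signature(handler).parameters
    for name, resolver in _resolvers.items():
        if name in declared and name not in args:
            args[name] = resolver(args)
    return handler(**args)

@toy_server_context("project_dir")
def resolve_project_dir(args):
    return "/srv/app"  # hypothetical fixed value for illustration

def list_files(pattern: str, project_dir: str):
    return f"{project_dir}/{pattern}"
```

A caller-supplied value wins over the resolver in this sketch, since only omitted parameters are filled in.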
Local Middleware
Wrap tool handlers with in-process middleware for cross-cutting concerns like logging, error formatting, or timing.
```python
import time

from protomcp.local_middleware import local_middleware

@local_middleware(priority=10)
def timing_middleware(ctx, tool_name, args, next_handler):
    start = time.monotonic()
    result = next_handler(ctx, args)
    elapsed = time.monotonic() - start
    print(f"{tool_name} took {elapsed:.3f}s")
    return result
```
Priority chain
Middleware is sorted by priority (lowest first = outermost). A priority-10 middleware wraps a priority-100 middleware, which wraps the tool handler.
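The ordering can be modeled like this. It is a simplified sketch: the toy middleware signature here is (ctx, args, next_handler), without the tool_name argument protomcp passes:

```python
def build_chain(middlewares, handler):
    """Compose (priority, middleware) pairs around a handler.
    Lowest priority ends up outermost, so it runs first and returns last."""
    chain = handler
    for priority, mw in sorted(middlewares, key=lambda p: p[0], reverse=True):
        # Bind loop variables so each closure wraps the chain built so far
        chain = (lambda m, nxt: lambda ctx, args: m(ctx, args, nxt))(mw, chain)
    return chain
```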
Short-circuit
Return a ToolResult directly from middleware to skip the handler entirely.
```python
@local_middleware(priority=5)
def auth_gate(ctx, tool_name, args, next_handler):
    if not is_authenticated():
        return ToolResult(is_error=True, message="Not authenticated")
    return next_handler(ctx, args)
```
Local vs Go-bridge middleware
Local middleware runs in-process in Python. Go-bridge middleware runs cross-process via the Go transport layer. Use local middleware for Python-only concerns; use Go-bridge middleware for transport-level concerns.
Telemetry
Observe tool calls with fail-safe telemetry sinks. Sinks receive events but cannot affect tool execution — exceptions in sinks are silently swallowed.
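The fail-safe dispatch can be sketched as follows (illustrative only, not the library's internals):

```python
def emit_event(sinks, event):
    """Fan an event out to every sink; a crashing sink is swallowed so
    telemetry can never affect tool execution (fail-safe by design)."""
    for sink in sinks:
        try:
            sink(event)
        except Exception:
            pass  # deliberately ignore sink failures
```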
```python
from protomcp.telemetry import telemetry_sink, ToolCallEvent

@telemetry_sink
def log_events(event: ToolCallEvent):
    print(f"[{event.phase}] {event.tool_name}: {event.message}")
```
ToolCallEvent phases
| Phase | When |
|---|---|
| "start" | Before the handler runs |
| "success" | After the handler returns successfully |
| "error" | After the handler raises or returns an error |
| "progress" | When report_progress is called |
Example: SQLite event store
```python
import sqlite3

@telemetry_sink
def sqlite_store(event: ToolCallEvent):
    conn = sqlite3.connect("telemetry.db")
    conn.execute(
        "INSERT INTO events (tool, phase, duration_ms) VALUES (?, ?, ?)",
        (event.tool_name, event.phase, event.duration_ms),
    )
    conn.commit()
```
Sidecar Management
Declare companion processes that protomcp manages alongside your server.
```python
from protomcp.sidecar import sidecar

@sidecar(
    name="redis",
    command=["redis-server", "--port", "6380"],
    health_check="http://localhost:6380/ping",
    start_on="server_start",
    health_timeout=30.0,
)
def redis_sidecar():
    pass
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | required | Unique sidecar identifier |
| command | list[str] | required | Process command and arguments |
| health_check | str | "" | URL to poll for health (HTTP 200 = healthy) |
| start_on | str | "first_tool_call" | "server_start" or "first_tool_call" |
| health_timeout | float | 30.0 | Seconds to wait for health check to pass |
Lifecycle
- PID management: PID files are stored in ~/.protomcp/sidecars/.
- Graceful shutdown: On exit, processes receive SIGTERM followed by SIGKILL if they do not stop within the shutdown timeout.
- Health checks: If health_check is set, protomcp polls the URL until it returns 200 or the timeout expires.
Workflows
Workflows are server-defined state machines that guide an agent through a multi-step process. At each point in the workflow, the agent only sees the valid next steps — all other tools are hidden. This prevents the agent from skipping ahead or calling steps out of order.
Defining a workflow
Use the @workflow class decorator and @step method decorators to define a workflow. Each step is a method on the class.
```python
from protomcp import workflow, step, StepResult, tool, ToolResult

@workflow("deploy", allow_during=["status"])
class DeployWorkflow:
    def __init__(self):
        self.pr_url = None

    @step(initial=True, next=["approve", "reject"], description="Review changes before deployment")
    def review(self, pr_url: str) -> StepResult:
        self.pr_url = pr_url
        return StepResult(result=f"Reviewing {pr_url}: 5 files changed")

    @step(next=["run_tests"], description="Approve the changes for deployment")
    def approve(self, reason: str) -> StepResult:
        return StepResult(result=f"Approved: {reason}")

    @step(terminal=True, description="Reject the changes")
    def reject(self, reason: str) -> StepResult:
        return StepResult(result=f"Rejected: {reason}")

    @step(next=["promote", "rollback"], no_cancel=True, description="Run test suite against staging")
    def run_tests(self) -> StepResult:
        return StepResult(result="All 42 tests passed", next=["promote"])

    @step(terminal=True, no_cancel=True, description="Deploy to production")
    def promote(self) -> StepResult:
        return StepResult(result=f"Deployed {self.pr_url} to production")

    @step(terminal=True, description="Roll back staging deployment")
    def rollback(self) -> StepResult:
        return StepResult(result="Rolled back staging")
```
@workflow parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | required | Unique workflow identifier. Steps are registered as name.step_name tools |
| description | str | "" | Human-readable description |
| allow_during | list[str] | None | Glob patterns for external tools visible during the workflow |
| block_during | list[str] | None | Glob patterns for external tools hidden during the workflow |
@step parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | method name | Step name. Defaults to the decorated method’s name |
| description | str | "" | Human-readable description |
| initial | bool | False | Mark as the entry point. Exactly one step must be initial |
| next | list[str] | None | Valid next step names. Required for non-terminal steps |
| terminal | bool | False | Mark as an exit point. Terminal steps must not have next |
| no_cancel | bool | False | Prevent cancellation while at this step |
| allow_during | list[str] | None | Step-level visibility override (replaces workflow-level, does not merge) |
| block_during | list[str] | None | Step-level block override (replaces workflow-level, does not merge) |
| on_error | dict[type, str] | None | Map exception types to step names for error-driven transitions |
| requires | list[str] | None | Required field names — validation fails if missing or empty |
| enum_fields | dict[str, list] | None | Map of field name to valid values |
Step lifecycle
Every workflow must have exactly one initial=True step — this is the only step visible to the agent before the workflow starts. When the initial step is called, protomcp saves the current tool list, then restricts visibility to the declared next steps. This continues until a terminal=True step completes, at which point the original tool list is restored.
Dynamic next narrowing with StepResult
A step handler returns a StepResult. The next field on the result can narrow the set of valid next steps at runtime, but it can only be a subset of the statically declared next on the @step decorator.
```python
@step(next=["promote", "rollback"], description="Run tests")
def run_tests(self) -> StepResult:
    if all_tests_passed():
        return StepResult(result="Tests passed", next=["promote"])
    return StepResult(result="Tests failed", next=["rollback"])
```
no_cancel for committed steps
Set no_cancel=True on a step to hide the cancel tool while the agent is choosing the next step. This is useful for steps that represent committed operations (e.g. a production deploy) where cancellation would leave the system in an inconsistent state.
Lifecycle hooks
Define on_cancel and on_complete methods on the workflow class to run cleanup or audit logic.
```python
@workflow("deploy")
class DeployWorkflow:
    # ... steps ...

    def on_cancel(self, current_step, history):
        return f"Deploy cancelled at step '{current_step}'"

    def on_complete(self, history):
        steps = " -> ".join(s[0] for s in history)
        print(f"[audit] Deploy complete: {steps}")
```
Tool visibility: allow_during and block_during
By default, all external tools are hidden during a workflow. Use allow_during and block_during with glob patterns to control which external tools remain visible.
```python
@workflow("deploy", allow_during=["status", "log_*"])
class DeployWorkflow:
    ...
```
Step-level allow_during / block_during replaces the workflow-level setting entirely (it does not merge). This lets individual steps expose a different set of external tools.
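A sketch of the allow_during filter, assuming fnmatch-style glob semantics (illustrative only; protomcp's exact pattern syntax is not documented here):

```python
from fnmatch import fnmatch

def allowed_externals(external_tools, allow_during):
    """Toy allow_during filter: keep only tools matching at least one
    pattern; everything else stays hidden during the workflow."""
    return [t for t in external_tools
            if any(fnmatch(t, pattern) for pattern in allow_during)]
```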
Error handling
By default, if a step handler raises an exception, the workflow stays in the current state and the agent can retry. Use on_error to define exception-driven transitions to other steps:
```python
@step(next=["deploy"], on_error={TimeoutError: "rollback"}, description="Run smoke tests")
def smoke_test(self) -> StepResult:
    run_smoke_tests()  # may raise TimeoutError
    return StepResult(result="Smoke tests passed")
```
Handler Discovery
Auto-discover handler files from a directory instead of importing them manually.
```python
from protomcp.discovery import configure

configure(handlers_dir="./handlers", hot_reload=True)
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| handlers_dir | str | "" | Path to the directory containing handler files |
| hot_reload | bool | False | Re-import handlers on each discovery pass |
Behavior
- All .py files in handlers_dir are imported automatically.
- Files prefixed with _ (e.g. _helpers.py) are skipped.
- With hot_reload=True, previously loaded modules are cleared and re-imported on each discovery pass.
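The discovery pass can be approximated with importlib. This is a sketch under the rules listed above, not protomcp's internals:

```python
import importlib.util
from pathlib import Path

def toy_discover(handlers_dir):
    """Import every top-level .py file in handlers_dir, skipping files whose
    name starts with '_'; returns the loaded module names in sorted order."""
    loaded = []
    for path in sorted(Path(handlers_dir).glob("*.py")):
        if path.name.startswith("_"):
            continue  # _helpers.py and friends are skipped
        spec = importlib.util.spec_from_file_location(path.stem, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)  # runs the file, firing any decorators
        loaded.append(path.stem)
    return loaded
```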