How to Build a Stateless, Secure, and Asynchronous MCP-Style Protocol for Scalable Agent Workflows
In this tutorial, we take an advanced look at modern MCP design by focusing on three core ideas: stateless communication, strict SDK-level validation, and asynchronous, long-running operations. We implement a minimal MCP-like protocol using structured envelopes, signed requests, and Pydantic validation to show how agents and services can interact securely without relying on persistent sessions.
import asyncio, time, json, uuid, hmac, hashlib
from dataclasses import dataclass
from typing import Any, Dict, Optional, Literal, List
from pydantic import BaseModel, Field, ValidationError, ConfigDict


def _now_ms():
    return int(time.time() * 1000)


def _uuid():
    return str(uuid.uuid4())


def _canonical_json(obj):
    return json.dumps(obj, separators=(",", ":"), sort_keys=True).encode()


def _hmac_hex(secret, payload):
    return hmac.new(secret, _canonical_json(payload), hashlib.sha256).hexdigest()
We set up the basic utilities required across the entire system: timestamps, UUID generation, canonical JSON serialization, and cryptographic signing. Canonical JSON guarantees a deterministic byte representation, so every request and response can be signed and verified cryptographically with HMAC-SHA256.
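As a quick sanity check, here is a minimal sketch (assuming the helpers above are in scope; the secret and payload values are made up for illustration) showing that canonical JSON makes the signature independent of key order and that verification is just a digest comparison:

# Minimal sketch: sign a payload with the helpers above and verify it.
demo_secret = b"demo_secret"  # illustrative shared secret, not part of the tutorial
payload = {"tool": "batch_sum", "args": {"numbers": [1, 2, 3]}}
sig = _hmac_hex(demo_secret, payload)

# Key order does not matter: _canonical_json sorts keys before hashing.
reordered = {"args": {"numbers": [1, 2, 3]}, "tool": "batch_sum"}
assert _hmac_hex(demo_secret, reordered) == sig

# Verification recomputes the digest and compares it in constant time.
assert hmac.compare_digest(_hmac_hex(demo_secret, payload), sig)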
class MCPEnvelope(BaseModel):
    model_config = ConfigDict(extra="forbid")
    v: Literal["mcp/0.1"] = "mcp/0.1"
    request_id: str = Field(default_factory=_uuid)
    ts_ms: int = Field(default_factory=_now_ms)
    client_id: str
    server_id: str
    tool: str
    args: Dict[str, Any] = Field(default_factory=dict)
    nonce: str = Field(default_factory=_uuid)
    signature: str


class MCPResponse(BaseModel):
    model_config = ConfigDict(extra="forbid")
    v: Literal["mcp/0.1"] = "mcp/0.1"
    request_id: str
    ts_ms: int = Field(default_factory=_now_ms)
    ok: bool
    server_id: str
    status: Literal["ok", "accepted", "running", "done", "error"]
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    signature: str
We define the structured MCP envelope and response formats that every interaction follows. We enforce strict schemas with Pydantic so that malformed or unexpected fields are rejected early. This keeps the contract between clients and servers consistent, which is critical for SDK-level standardization.
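To illustrate the strictness, here is a small sketch (assuming the models above; the field name injected_field is hypothetical): passing an unknown field to MCPEnvelope raises a ValidationError instead of being silently ignored.

# Minimal sketch: extra="forbid" rejects unexpected fields at parse time.
try:
    MCPEnvelope(
        client_id="client-001",
        server_id="mcp-server-001",
        tool="batch_sum",
        args={"numbers": [1, 2]},
        signature="",
        injected_field="not allowed",  # hypothetical extra key
    )
except ValidationError as e:
    print("rejected:", e.errors()[0]["type"])  # -> extra_forbidden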
class ServerIdentityOut(BaseModel):
    model_config = ConfigDict(extra="forbid")
    server_id: str
    fingerprint: str
    capabilities: Dict[str, Any]


class BatchSumIn(BaseModel):
    model_config = ConfigDict(extra="forbid")
    numbers: List[float] = Field(min_length=1)


class BatchSumOut(BaseModel):
    model_config = ConfigDict(extra="forbid")
    count: int
    total: float


class StartLongTaskIn(BaseModel):
    model_config = ConfigDict(extra="forbid")
    seconds: int = Field(ge=1, le=20)
    payload: Dict[str, Any] = Field(default_factory=dict)


class PollJobIn(BaseModel):
    model_config = ConfigDict(extra="forbid")
    job_id: str
We declare validated input and output models for each tool that the server exposes. Pydantic constraints state explicitly what each tool accepts and returns, which keeps tool behavior predictable and safe, even when the caller is an LLM-driven agent.
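A short sketch of how these constraints behave (assuming the models above; the example values are arbitrary): an empty batch and an out-of-range duration are both rejected before any tool logic runs.

# Minimal sketch: constraint violations surface as ValidationError.
print(BatchSumIn(numbers=[1.5, 2.5]).numbers)  # valid input -> [1.5, 2.5]

try:
    BatchSumIn(numbers=[])  # violates min_length=1
except ValidationError as e:
    print("empty batch rejected:", e.errors()[0]["type"])  # -> too_short

try:
    StartLongTaskIn(seconds=60)  # violates le=20
except ValidationError as e:
    print("duration rejected:", e.errors()[0]["type"])  # -> less_than_equal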
@dataclass
class JobState:
    job_id: str
    status: str
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None


class MCPServer:
    def __init__(self, server_id, secret):
        self.server_id = server_id
        self.secret = secret
        self.jobs = {}
        self.tasks = {}

    def _fingerprint(self):
        return hashlib.sha256(self.secret).hexdigest()[:16]

    async def handle(self, env_dict, client_secret):
        env = MCPEnvelope(**env_dict)
        # Verify the client signature over the canonical payload (signature field excluded).
        payload = env.model_dump()
        sig = payload.pop("signature")
        if _hmac_hex(client_secret, payload) != sig:
            return {"error": "bad signature"}
        if env.tool == "server_identity":
            out = ServerIdentityOut(
                server_id=self.server_id,
                fingerprint=self._fingerprint(),
                capabilities={"async": True, "stateless": True},
            )
            resp = MCPResponse(
                request_id=env.request_id,
                ok=True,
                server_id=self.server_id,
                status="ok",
                result=out.model_dump(),
                signature="",
            )
        elif env.tool == "batch_sum":
            args = BatchSumIn(**env.args)
            out = BatchSumOut(count=len(args.numbers), total=sum(args.numbers))
            resp = MCPResponse(
                request_id=env.request_id,
                ok=True,
                server_id=self.server_id,
                status="ok",
                result=out.model_dump(),
                signature="",
            )
        elif env.tool == "start_long_task":
            args = StartLongTaskIn(**env.args)
            jid = _uuid()
            self.jobs[jid] = JobState(jid, "running")

            async def run():
                await asyncio.sleep(args.seconds)
                self.jobs[jid].status = "done"
                self.jobs[jid].result = args.payload

            self.tasks[jid] = asyncio.create_task(run())
            resp = MCPResponse(
                request_id=env.request_id,
                ok=True,
                server_id=self.server_id,
                status="accepted",
                result={"job_id": jid},
                signature="",
            )
        elif env.tool == "poll_job":
            args = PollJobIn(**env.args)
            job = self.jobs[args.job_id]
            resp = MCPResponse(
                request_id=env.request_id,
                ok=True,
                server_id=self.server_id,
                status=job.status,
                result=job.result,
                signature="",
            )
        else:
            # Unknown tools return a signed error response instead of raising.
            resp = MCPResponse(
                request_id=env.request_id,
                ok=False,
                server_id=self.server_id,
                status="error",
                error=f"unknown tool: {env.tool}",
                signature="",
            )
        # Sign the response; the digest is computed while the signature field is still empty.
        payload = resp.model_dump()
        resp.signature = _hmac_hex(self.secret, payload)
        return resp.model_dump()
We implement a stateless MCP server along with its asynchronous task-management logic. We handle request verification, tool dispatch, and long-running task execution without relying on any session state. By returning job IDs and allowing polling, the server offers scalable, non-blocking job execution.
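To see the signature check in action, here is a minimal sketch (assuming the classes above are defined; the ids and secrets are illustrative) in which an envelope is tampered with after signing and the server rejects it:

# Minimal sketch: a payload modified after signing fails verification.
async def _tamper_check():
    shared_secret = b"client_secret"  # illustrative shared secret
    srv = MCPServer("mcp-server-001", b"server_secret")
    env = MCPEnvelope(
        client_id="client-001",
        server_id=srv.server_id,
        tool="batch_sum",
        args={"numbers": [1, 2, 3]},
        signature="",
    ).model_dump()
    env["signature"] = _hmac_hex(shared_secret, {k: v for k, v in env.items() if k != "signature"})
    env["args"] = {"numbers": [999]}  # tamper with the payload after signing
    print(await srv.handle(env, shared_secret))  # -> {'error': 'bad signature'}

await _tamper_check()  # top-level await works in a notebook; use asyncio.run(_tamper_check()) in a script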
class MCPClient:
    def __init__(self, client_id, secret, server):
        self.client_id = client_id
        self.secret = secret
        self.server = server

    async def call(self, tool, args=None):
        env = MCPEnvelope(
            client_id=self.client_id,
            server_id=self.server.server_id,
            tool=tool,
            args=args or {},
            signature="",
        ).model_dump()
        # Sign every request over the canonical payload, excluding the signature field.
        env["signature"] = _hmac_hex(self.secret, {k: v for k, v in env.items() if k != "signature"})
        return await self.server.handle(env, self.secret)


async def demo():
    server_secret = b"server_secret"
    client_secret = b"client_secret"
    server = MCPServer("mcp-server-001", server_secret)
    client = MCPClient("client-001", client_secret, server)
    print(await client.call("server_identity"))
    print(await client.call("batch_sum", {"numbers": [1, 2, 3]}))
    start = await client.call("start_long_task", {"seconds": 2, "payload": {"task": "demo"}})
    jid = start["result"]["job_id"]
    while True:
        poll = await client.call("poll_job", {"job_id": jid})
        if poll["status"] == "done":
            print(poll)
            break
        await asyncio.sleep(0.5)


await demo()  # top-level await works in a notebook; use asyncio.run(demo()) in a script
We build a lightweight, stateless client that signs every request and talks to the server through structured envelopes. The demo exercises simple tool calls, validated inputs, and asynchronous task polling in a single flow (a sketch of how a validation failure surfaces follows below), showing how clients can reliably consume MCP-style services in real agent pipelines.
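As a final sketch (assuming everything above is defined; the ids and secrets here are illustrative), this is how a validation failure surfaces in this in-process setup: the server-side schema raises a ValidationError that the caller can catch.

# Minimal sketch: malformed tool arguments are rejected by the server-side schema.
async def _bad_call():
    srv = MCPServer("mcp-server-002", b"server_secret")
    cli = MCPClient("client-002", b"client_secret", srv)
    try:
        await cli.call("batch_sum", {"numbers": []})  # violates min_length=1
    except ValidationError as e:
        print("call rejected:", e.errors()[0]["type"])  # -> too_short

await _bad_call()  # top-level await works in a notebook; use asyncio.run(_bad_call()) in a script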
In conclusion, we have shown how MCP evolves from a simple interface for tool calls to a powerful protocol suitable for real-world systems. We start tasks asynchronously and poll results without blocking execution, enforce clear contracts through schema validation, and rely on stateless signed messages to maintain security and resiliency. Together, these patterns demonstrate how modern MCP-style systems support reliable, enterprise-ready agent workflows while maintaining simplicity, transparency, and ease of scaling.