AI

A Coding Guide to Build a Production-Ready Asynchronous Python SDK with Rate Limiting, In-Memory Caching, and Authentication

In this tutorial, we direct users by building a powerful and ready Python SDK production. It begins to show how to install and form an basic innovative HTTP libraries (AIOHTTP, Nest-Easyncio). Then it goes through the implementation of the basic ingredients, including structured response objects, limit the distinctive symbol rate, memory interim storage with TTL, and a clean design, driven by Dataclass. We’ll see how these pieces are wrapped in an advanced category that supports ASYNC context management, automatic behavior of the specified/waiting for a rate, injection of JSON/ATH heads, and comfortable HTTP-Verb methods. Along the way, harnessing the experimental offer against JSONPLALACEHELEDER causes the cache efficiency, bringing the batch with average boundaries, processing errors, and even showing how to extend SDK in a fluent “creative” pattern for the allocated configuration.

import asyncio
import aiohttp
import time
import json
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import hashlib
import logging


!pip install aiohttp nest-asyncio

We install and configure the simultaneous operating time by importing ASYNCIO and AIOHTTP, as well as timing aid, JSON processing, DATACLASS modeling, temporary storage (via Hashlib and Datime), and organized registration. The AIOHTTP Nest-ESYNCIO installation line guarantees! The notebook can run a spying episode inside the colum, which allows strong HTTP requests and the functioning of limited prices.

@dataclass
class APIResponse:
    """Structured response object"""
    data: Any
    status_code: int
    headers: Dict[str, str]
    timestamp: datetime
   
    def to_dict(self) -> Dict:
        return asdict(self)

ApireSponse Dataclass envelops HTTP response details, useful loads (data), status code, heads, and timeline to retrieve in one written object. The assistant turns to_dict () the sympathetic to a normal dictionary for easy registration, sequence, or estuaric treatment.

class RateLimiter:
    """Token bucket rate limiter"""
    def __init__(self, max_calls: int = 100, time_window: int = 60):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = []
   
    def can_proceed(self) -> bool:
        now = time.time()
        self.calls = [call_time for call_time in self.calls if now - call_time < self.time_window]
       
        if len(self.calls) < self.max_calls:
            self.calls.append(now)
            return True
        return False
   
    def wait_time(self) -> float:
        if not self.calls:
            return 0
        return max(0, self.time_window - (time.time() - self.calls[0]))

The Ratelimiter category imposes a simple policy of the distinctive symbol by tracking timelines for modern calls and allowing Max_calls to happen during the trading time. Upon reaching the limit, can_proceed () calculates a mistake, and is calculated by wait_time.

class Cache:
    """Simple in-memory cache with TTL"""
    def __init__(self, default_ttl: int = 300):
        self.cache = {}
        self.default_ttl = default_ttl
   
    def _generate_key(self, method: str, url: str, params: Dict = None) -> str:
        key_data = f"{method}:{url}:{json.dumps(params or {}, sort_keys=True)}"
        return hashlib.md5(key_data.encode()).hexdigest()
   
    def get(self, method: str, url: str, params: Dict = None) -> Optional[APIResponse]:
        key = self._generate_key(method, url, params)
        if key in self.cache:
            response, expiry = self.cache[key]
            if datetime.now() < expiry:
                return response
            del self.cache[key]
        return None
   
    def set(self, method: str, url: str, response: APIResponse, params: Dict = None, ttl: int = None):
        key = self._generate_key(method, url, params)
        expiry = datetime.now() + timedelta(seconds=ttl or self.default_ttl)
        self.cache[key] = (response, expiry)

The category of catering memory provides lightweight TTL storage in memory for API responses by dividing the request signature (method, URL, PARAMS) at a unique key. It restores the hidden APIRESPONSE creatures valid before the expiration and expels old entries automatically after the end of their time.

class AdvancedSDK:
    """Advanced SDK with modern Python patterns"""
   
    def __init__(self, base_url: str, api_key: str = None, rate_limit: int = 100):
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.session = None
        self.rate_limiter = RateLimiter(max_calls=rate_limit)
        self.cache = Cache()
        self.logger = self._setup_logger()
       
    def _setup_logger(self) -> logging.Logger:
        logger = logging.getLogger(f"SDK-{id(self)}")
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
        return logger
   
    async def __aenter__(self):
        """Async context manager entry"""
        self.session = aiohttp.ClientSession()
        return self
   
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit"""
        if self.session:
            await self.session.close()
   
    def _get_headers(self) -> Dict[str, str]:
        headers = {'Content-Type': 'application/json'}
        if self.api_key:
            headers['Authorization'] = f'Bearer {self.api_key}'
        return headers
   
    async def _make_request(self, method: str, endpoint: str, params: Dict = None,
                          data: Dict = None, use_cache: bool = True) -> APIResponse:
        """Core request method with rate limiting and caching"""
       
        if use_cache and method.upper() == 'GET':
            cached = self.cache.get(method, endpoint, params)
            if cached:
                self.logger.info(f"Cache hit for {method} {endpoint}")
                return cached
       
        if not self.rate_limiter.can_proceed():
            wait_time = self.rate_limiter.wait_time()
            self.logger.warning(f"Rate limit hit, waiting {wait_time:.2f}s")
            await asyncio.sleep(wait_time)
       
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
       
        try:
            async with self.session.request(
                method=method.upper(),
                url=url,
                params=params,
                json=data,
                headers=self._get_headers()
            ) as resp:
                response_data = await resp.json() if resp.content_type == 'application/json' else await resp.text()
               
                api_response = APIResponse(
                    data=response_data,
                    status_code=resp.status,
                    headers=dict(resp.headers),
                    timestamp=datetime.now()
                )
               
                if use_cache and method.upper() == 'GET' and 200 <= resp.status < 300:
                    self.cache.set(method, endpoint, api_response, params)
               
                self.logger.info(f"{method.upper()} {endpoint} - Status: {resp.status}")
                return api_response
               
        except Exception as e:
            self.logger.error(f"Request failed: {str(e)}")
            raise
   
    async def get(self, endpoint: str, params: Dict = None, use_cache: bool = True) -> APIResponse:
        return await self._make_request('GET', endpoint, params=params, use_cache=use_cache)
   
    async def post(self, endpoint: str, data: Dict = None) -> APIResponse:
        return await self._make_request('POST', endpoint, data=data, use_cache=False)
   
    async def put(self, endpoint: str, data: Dict = None) -> APIResponse:
        return await self._make_request('PUT', endpoint, data=data, use_cache=False)
   
    async def delete(self, endpoint: str) -> APIResponse:
        return await self._make_request('DELETE', endpoint, use_cache=False)

The AdvanceDsdk category wraps everything together in a clean, simultaneous customer: it runs the AIOHTTP session via the ASYNC context managers, the JSON and Auth heads, and the Ratelimiter and Cache format under the cap. Its _Make_request method works on Get/Post/PUT/Delete, processing searches for cache, waiting for average, error registration, and filling in response to ApireSpond objects, while the Get/Post/Delete auxiliaries give us comfortable, high -level calls.

async def demo_sdk():
    """Demonstrate SDK capabilities"""
    print("🚀 Advanced SDK Demo")
    print("=" * 50)
   
    async with AdvancedSDK("https://jsonplaceholder.typicode.com") as sdk:
       
        print("\n📥 Testing GET request with caching...")
        response1 = await sdk.get("/posts/1")
        print(f"First request - Status: {response1.status_code}")
        print(f"Title: {response1.data.get('title', 'N/A')}")
       
        response2 = await sdk.get("/posts/1")
        print(f"Second request (cached) - Status: {response2.status_code}")
       
        print("\n📤 Testing POST request...")
        new_post = {
            "title": "Advanced SDK Tutorial",
            "body": "This SDK demonstrates modern Python patterns",
            "userId": 1
        }
        post_response = await sdk.post("/posts", data=new_post)
        print(f"POST Status: {post_response.status_code}")
        print(f"Created post ID: {post_response.data.get('id', 'N/A')}")
       
        print("\n⚡ Testing batch requests with rate limiting...")
        tasks = []
        for i in range(1, 6):
            tasks.append(sdk.get(f"/posts/{i}"))
       
        results = await asyncio.gather(*tasks)
        print(f"Batch completed: {len(results)} requests")
        for i, result in enumerate(results, 1):
            print(f"  Post {i}: {result.data.get('title', 'N/A')[:30]}...")
       
        print("\n❌ Testing error handling...")
        try:
            error_response = await sdk.get("/posts/999999")
            print(f"Error response status: {error_response.status_code}")
        except Exception as e:
            print(f"Handled error: {type(e).__name__}")
   
    print("\n✅ Demo completed successfully!")


async def run_demo():
  """Colab-friendly demo runner"""
  await demo_sdk()

Demo_sdk Coroutine runs through the basic features of SDK, where a request for a simulation was temporarily issued, a publication performance, a set of reduction in price reduction, error processing, for the JSONPLACEHELEDER Application interface, print case symbols and sample data to clarify all the possibility. Helper Run_demo guarantees this Smoothly illustrated presentation within the current event

import nest_asyncio
nest_asyncio.apply()


if __name__ == "__main__":
    try:
        asyncio.run(demo_sdk())
    except RuntimeError:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(demo_sdk())


class SDKBuilder:
    """Builder pattern for SDK configuration"""
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.config = {}
   
    def with_auth(self, api_key: str):
        self.config['api_key'] = api_key
        return self
   
    def with_rate_limit(self, calls_per_minute: int):
        self.config['rate_limit'] = calls_per_minute
        return self
   
    def build(self) -> AdvancedSDK:
        return AdvancedSDK(self.base_url, **self.config)

Finally, we apply Nest_asyncio to enable the interlocking episodes in the colum, then we manage the illustration via Asyncio.run (with a disability of the implementation of the manual loop if necessary). It also offers the SDKBUILDer category that enables the construction pattern fluently to create and create AdvanceDsdk with customized authentication and specific settings of the rate.

In conclusion, this SDK tutorial provides a basis for any comfortable integration, combining modern Python expressions (Dataclasse glasses, ASYNC/AWAIT, context managers) with practical tools (average, cache, organized trees). By adapting the patterns shown here, especially the separation of fears between the coincidence of demand, the temporary storage, and the mixture of response, the teams can accelerate the development of new API customers while ensuring the possibility of prediction, observation, and flexibility.


verify Symbols. All the credit for this research goes to researchers in this project. Also, do not hesitate to follow us twitter And do not forget to join 100K+ ML Subreddit And subscribe to Our newsletter.


SANA Hassan, consultant coach at Marktechpost and a double -class student in Iit Madras, is excited to apply technology and AI to face challenges in the real world. With great interest in solving practical problems, it brings a new perspective to the intersection of artificial intelligence and real life solutions.

Don’t miss more hot News like this! Click here to discover the latest in AI news!

2025-06-23 22:24:00

Related Articles

Back to top button