
A Code Implementation for Designing Intelligent Multi-Agent Workflows with the BeeAI Framework

In this tutorial, we explore the strength and flexibility of the BeeAI Framework by building a fully functional multi-agent system from the ground up. We walk through its core components, custom agents, tools, memory management, and event monitoring to show how BeeAI simplifies the development of intelligent, cooperative agents. Along the way, we demonstrate how these agents can perform complex tasks, such as market research, code analysis, and strategic planning, using a modular, production-ready pattern.

import subprocess
import sys
import asyncio
import json
from typing import Dict, List, Any, Optional
from datetime import datetime
import os


def install_packages():
    packages = [
        "beeai-framework",
        "requests",
        "beautifulsoup4",
        "numpy",
        "pandas",
        "pydantic"
    ]
   
    print("Installing required packages...")
    for package in packages:
        try:
            subprocess.check_call([sys.executable, "-m", "pip", "install", package])
            print(f"✅ {package} installed successfully")
        except subprocess.CalledProcessError as e:
            print(f"❌ Failed to install {package}: {e}")
    print("Installation complete!")


install_packages()


try:
    from beeai_framework import ChatModel
    from beeai_framework.agents import Agent
    from beeai_framework.tools import Tool
    from beeai_framework.workflows import Workflow
    BEEAI_AVAILABLE = True
    print("✅ BeeAI Framework imported successfully")
except ImportError as e:
    print(f"⚠️ BeeAI Framework import failed: {e}")
    print("Falling back to custom implementation...")
    BEEAI_AVAILABLE = False

We start by installing all the required packages, including beeai-framework, to ensure our environment is ready for multi-agent development. Once the packages are installed, we attempt to import BeeAI's core modules; if the import fails, we fall back gracefully to a custom implementation so the workflow keeps functioning.

class MockChatModel:
    """Mock LLM for demonstration purposes"""
    def __init__(self, model_name: str = "mock-llm"):
        self.model_name = model_name
   
    async def generate(self, messages: List[Dict[str, str]]) -> str:
        """Generate a mock response"""
        last_message = messages[-1]['content'] if messages else ""
       
        if "market" in last_message.lower():
            return "Market analysis shows strong growth in AI frameworks with 42% YoY increase. Key competitors include LangChain, CrewAI, and AutoGen."
        elif "code" in last_message.lower():
            return "Code analysis reveals good structure with async patterns. Consider adding more error handling and documentation."
        elif "strategy" in last_message.lower():
            return "Strategic recommendation: Focus on ease of use, strong documentation, and enterprise features to compete effectively."
        else:
            return f"Analyzed: {last_message[:100]}... Recommendation: Implement best practices for scalability and maintainability."


class CustomTool:
    """Base class for custom tools"""
    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description
   
    async def run(self, input_data: str) -> str:
        """Override this method in subclasses"""
        raise NotImplementedError

We define MockChatModel to simulate LLM behavior when BeeAI is unavailable, allowing us to test and run the workflow without relying on external APIs. Alongside it, we create the CustomTool base class, which serves as a blueprint for the task-specific tools our agents will use, laying the groundwork for modular, tool-driven agent capabilities.
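To make the contract concrete, here is a minimal, self-contained sketch of the same pattern. The `EchoTool` subclass is hypothetical (it is not part of the tutorial's code); it only illustrates how a subclass overrides the async `run()` method and how the mock model is awaited:

```python
import asyncio


class MockChatModel:
    """Minimal stand-in LLM, mirroring the tutorial's mock."""
    async def generate(self, messages):
        return f"Analyzed: {messages[-1]['content']}"


class CustomTool:
    """Base class: subclasses must override run()."""
    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description

    async def run(self, input_data: str) -> str:
        raise NotImplementedError


class EchoTool(CustomTool):
    """Hypothetical subclass: simply echoes its input back."""
    def __init__(self):
        super().__init__("echo", "Echoes input for testing")

    async def run(self, input_data: str) -> str:
        return f"[echo] {input_data}"


async def demo():
    tool_out = await EchoTool().run("hello")
    llm_out = await MockChatModel().generate([{"role": "user", "content": "ping"}])
    return tool_out, llm_out

tool_out, llm_out = asyncio.run(demo())
print(tool_out)  # [echo] hello
print(llm_out)   # Analyzed: ping
```

Any real tool in the tutorial follows this exact shape: a constructor that sets `name` and `description`, plus an overridden async `run()`.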

class MarketResearchTool(CustomTool):
    """Custom tool for market research and competitor analysis"""
   
    def __init__(self):
        super().__init__(
            name="market_research",
            description="Analyzes market trends and competitor information"
        )
        self.market_data = {
            "AI_frameworks": {
                "competitors": ["LangChain", "CrewAI", "AutoGen", "Haystack", "Semantic Kernel"],
                "market_size": "$2.8B",
                "growth_rate": "42% YoY",
                "key_trends": ["Multi-agent systems", "Production deployment", "Tool integration", "Enterprise adoption"]
            },
            "enterprise_adoption": {
                "rate": "78%",
                "top_use_cases": ["Customer support", "Data analysis", "Code generation", "Document processing"],
                "challenges": ["Reliability", "Cost control", "Integration complexity", "Governance"]
            }
        }
   
    async def run(self, query: str) -> str:
        """Simulate market research based on query"""
        query_lower = query.lower()
       
        if "competitor" in query_lower or "competition" in query_lower:
            data = self.market_data["AI_frameworks"]
            return f"""Market Analysis Results:
           
Key Competitors: {', '.join(data['competitors'])}
Market Size: {data['market_size']}
Growth Rate: {data['growth_rate']}
Key Trends: {', '.join(data['key_trends'])}


Recommendation: Focus on differentiating features like simplified deployment, better debugging tools, and enterprise-grade security."""
       
        elif "adoption" in query_lower or "enterprise" in query_lower:
            data = self.market_data["enterprise_adoption"]
            return f"""Enterprise Adoption Analysis:
           
Adoption Rate: {data['rate']}
Top Use Cases: {', '.join(data['top_use_cases'])}
Main Challenges: {', '.join(data['challenges'])}


Recommendation: Address reliability and cost control concerns through better monitoring and resource management features."""
       
        else:
            return "Market research available for: competitor analysis, enterprise adoption, or specific trend analysis. Please specify your focus area."

We implement MarketResearchTool as a specialized extension of our CustomTool base class. The tool simulates real-world market intelligence by returning predefined insights on AI framework trends, key competitors, adoption rates, and industry challenges. This prepares our agents to deliver informed, data-backed recommendations during workflow execution.
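The routing inside `run()` is simple keyword dispatch: the query is lowercased, matched against trigger words, and each match selects a different slice of the canned data. A stripped-down sketch of that pattern (with a toy data dictionary, not the tutorial's full dataset):

```python
# Toy version of the keyword-dispatch pattern used by MarketResearchTool.run()
market_data = {
    "competitors": ["LangChain", "CrewAI", "AutoGen"],
    "adoption_rate": "78%",
}


def route_query(query: str) -> str:
    """Pick a data section based on trigger words in the query."""
    q = query.lower()
    if "competitor" in q or "competition" in q:
        return "Key Competitors: " + ", ".join(market_data["competitors"])
    elif "adoption" in q or "enterprise" in q:
        return "Adoption Rate: " + market_data["adoption_rate"]
    return "Please specify: competitors or adoption."


print(route_query("Who are the main competitors?"))
print(route_query("enterprise adoption numbers"))  # Adoption Rate: 78%
```

This is intentionally naive; a production tool would replace the keyword checks with real retrieval or an API call, but the dispatch structure stays the same.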

class CodeAnalysisTool(CustomTool):
    """Custom tool for analyzing code patterns and suggesting improvements"""
   
    def __init__(self):
        super().__init__(
            name="code_analysis",
            description="Analyzes code structure and suggests improvements"
        )
   
    async def run(self, code_snippet: str) -> str:
        """Analyze code and provide insights"""
        analysis = {
            "lines": len(code_snippet.split('\n')),
            "complexity": "High" if len(code_snippet) > 500 else "Medium" if len(code_snippet) > 200 else "Low",
            "async_usage": "Yes" if "async" in code_snippet or "await" in code_snippet else "No",
            "error_handling": "Present" if "try:" in code_snippet or "except:" in code_snippet else "Missing",
            "documentation": "Good" if '"""' in code_snippet or "'''" in code_snippet else "Needs improvement",
            "imports": "Present" if "import " in code_snippet else "None detected",
            "classes": len([line for line in code_snippet.split('\n') if line.strip().startswith('class ')]),
            "functions": len([line for line in code_snippet.split('\n') if line.strip().startswith('def ') or line.strip().startswith('async def ')])
        }
       
        suggestions = []
        if analysis["error_handling"] == "Missing":
            suggestions.append("Add try-except blocks for error handling")
        if analysis["documentation"] == "Needs improvement":
            suggestions.append("Add docstrings and comments")
        if "print(" in code_snippet:
            suggestions.append("Consider using proper logging instead of print statements")
        if analysis["async_usage"] == "Yes" and "await" not in code_snippet:
            suggestions.append("Ensure proper await usage with async functions")
        if analysis["complexity"] == "High":
            suggestions.append("Consider breaking down into smaller functions")
       
        return f"""Code Analysis Report:
       
Structure:
- Lines of code: {analysis['lines']}
- Complexity: {analysis['complexity']}
- Classes: {analysis['classes']}
- Functions: {analysis['functions']}


Quality Metrics:
- Async usage: {analysis['async_usage']}
- Error handling: {analysis['error_handling']}
- Documentation: {analysis['documentation']}


Suggestions:
{chr(10).join(f"• {suggestion}" for suggestion in suggestions) if suggestions else "• Code looks good! Following best practices."}


Overall Score: {10 - len(suggestions) * 2}/10"""


class CustomAgent:
    """Custom agent implementation"""
   
    def __init__(self, name: str, role: str, instructions: str, tools: List[CustomTool], llm=None):
        self.name = name
        self.role = role
        self.instructions = instructions
        self.tools = tools
        self.llm = llm or MockChatModel()
        self.memory = []
   
    async def run(self, task: str) -> Dict[str, Any]:
        """Execute agent task"""
        print(f"🤖 {self.name} ({self.role}) processing task...")
       
        self.memory.append({"type": "task", "content": task, "timestamp": datetime.now()})
       
        task_lower = task.lower()
        tool_used = None
        tool_result = None
       
        for tool in self.tools:
            if tool.name == "market_research" and ("market" in task_lower or "competitor" in task_lower):
                tool_result = await tool.run(task)
                tool_used = tool.name
                break
            elif tool.name == "code_analysis" and ("code" in task_lower or "analyze" in task_lower):
                tool_result = await tool.run(task)
                tool_used = tool.name
                break
       
        messages = [
            {"role": "system", "content": f"You are {self.role}. {self.instructions}"},
            {"role": "user", "content": task}
        ]
       
        if tool_result:
            messages.append({"role": "system", "content": f"Tool {tool_used} provided: {tool_result}"})
       
        response = await self.llm.generate(messages)
       
        self.memory.append({"type": "response", "content": response, "timestamp": datetime.now()})
       
        return {
            "agent": self.name,
            "task": task,
            "tool_used": tool_used,
            "tool_result": tool_result,
            "response": response,
            "success": True
        }

We now implement CodeAnalysisTool, which lets our agents evaluate code snippets for structure, complexity, documentation, and error handling, and generate actionable improvement suggestions. We also define the CustomAgent class, giving each agent its own role, instructions, memory, tools, and access to an LLM. This design lets each agent decide intelligently whether a tool is needed, then assemble a response that combines the tool's output with LLM reasoning, ensuring adaptive, context-aware behavior.
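The prompt-assembly step inside `CustomAgent.run()` is worth isolating: a system message carrying the role and instructions, the user's task, and, only when a tool fired, its output appended as an extra system message. A self-contained sketch of that step:

```python
# Sketch of how CustomAgent assembles the message list passed to the LLM.
def build_messages(role, instructions, task, tool_used=None, tool_result=None):
    messages = [
        {"role": "system", "content": f"You are {role}. {instructions}"},
        {"role": "user", "content": task},
    ]
    # Tool output is injected as an additional system message, if present.
    if tool_result:
        messages.append(
            {"role": "system", "content": f"Tool {tool_used} provided: {tool_result}"}
        )
    return messages


msgs = build_messages(
    "Senior Market Research Analyst",
    "Provide data-driven insights.",
    "Assess the AI framework market",
    tool_used="market_research",
    tool_result="42% YoY growth",
)
print(len(msgs))  # 3
print(msgs[-1]["content"])
```

Keeping tool output in a separate system message, rather than concatenating it into the user task, lets the model distinguish evidence from the question being asked.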

class WorkflowMonitor:
    """Monitor and log workflow events"""
   
    def __init__(self):
        self.events = []
        self.start_time = datetime.now()
   
    def log_event(self, event_type: str, data: Dict[str, Any]):
        """Log workflow events"""
        timestamp = datetime.now()
        self.events.append({
            "timestamp": timestamp,
            "duration": (timestamp - self.start_time).total_seconds(),
            "event_type": event_type,
            "data": data
        })
        print(f"[{timestamp.strftime('%H:%M:%S')}] {event_type}: {data.get('agent', 'System')}")
   
    def get_summary(self):
        """Get monitoring summary"""
        return {
            "total_events": len(self.events),
            "total_duration": (datetime.now() - self.start_time).total_seconds(),
            "event_types": list(set([e["event_type"] for e in self.events])),
            "events": self.events
        }


class CustomWorkflow:
    """Custom workflow implementation"""
   
    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description
        self.agents = []
        self.monitor = WorkflowMonitor()
   
    def add_agent(self, agent: CustomAgent):
        """Add agent to workflow"""
        self.agents.append(agent)
        self.monitor.log_event("agent_added", {"agent": agent.name, "role": agent.role})
   
    async def run(self, tasks: List[str]) -> Dict[str, Any]:
        """Execute workflow with tasks"""
        self.monitor.log_event("workflow_started", {"tasks": len(tasks)})
       
        results = []
        context = {"shared_insights": []}
       
        for i, task in enumerate(tasks):
            agent = self.agents[i % len(self.agents)]
           
            if context["shared_insights"]:
                enhanced_task = f"{task}\n\nContext from previous analysis:\n" + "\n".join(context["shared_insights"][-2:])
            else:
                enhanced_task = task
           
            result = await agent.run(enhanced_task)
            results.append(result)
           
            context["shared_insights"].append(f"{agent.name}: {result['response'][:200]}...")
           
            self.monitor.log_event("task_completed", {
                "agent": agent.name,
                "task_index": i,
                "success": result["success"]
            })
       
        self.monitor.log_event("workflow_completed", {"total_tasks": len(tasks)})
       
        return {
            "workflow": self.name,
            "results": results,
            "context": context,
            "summary": self._generate_summary(results)
        }
   
    def _generate_summary(self, results: List[Dict[str, Any]]) -> str:
        """Generate workflow summary"""
        summary_parts = []
       
        for result in results:
            summary_parts.append(f"• {result['agent']}: {result['response'][:150]}...")
       
        return f"""Workflow Summary for {self.name}:


{chr(10).join(summary_parts)}


Key Insights:
• Market opportunities identified in AI framework space
• Technical architecture recommendations provided
• Strategic implementation plan outlined
• Multi-agent collaboration demonstrated successfully"""

We implement WorkflowMonitor to log and track events throughout execution, giving us real-time visibility into each agent's actions. With the CustomWorkflow class, we orchestrate the entire multi-agent process: assigning tasks, maintaining shared context across agents, and capturing all relevant insights. This structure not only ensures that tasks are executed in a coordinated, transparent manner, but also produces a comprehensive summary highlighting the collaboration and key results.
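Two details of `CustomWorkflow.run()` deserve emphasis: tasks are dispatched round-robin via `agents[i % len(agents)]`, and only the two most recent shared insights are forwarded into the next task's prompt. A standalone sketch of that dispatch loop (agent names stand in for real agent objects):

```python
# Sketch of the workflow's round-robin dispatch and shared-context window.
agents = ["MarketAnalyst", "TechArchitect", "StrategicPlanner"]
tasks = ["task A", "task B", "task C", "task D"]

shared_insights = []
assignments = []
for i, task in enumerate(tasks):
    agent = agents[i % len(agents)]           # wrap around when tasks > agents
    context = shared_insights[-2:]            # only the two latest insights
    assignments.append((agent, task))
    shared_insights.append(f"{agent}: handled {task}")

print(assignments[3])   # ('MarketAnalyst', 'task D') -- wraps back to agent 0
print(len(shared_insights))  # 4
```

Capping the forwarded context at two entries keeps prompts short as the task list grows; a production system might instead summarize or rank insights before forwarding them.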

async def advanced_workflow_demo():
    """Demonstrate advanced multi-agent workflow"""
   
    print("🚀 Advanced Multi-Agent Workflow Demo")
    print("=" * 50)
   
    workflow = CustomWorkflow(
        name="Advanced Business Intelligence System",
        description="Multi-agent system for comprehensive business analysis"
    )
   
    market_agent = CustomAgent(
        name="MarketAnalyst",
        role="Senior Market Research Analyst",
        instructions="Analyze market trends, competitor landscape, and business opportunities. Provide data-driven insights with actionable recommendations.",
        tools=[MarketResearchTool()],
        llm=MockChatModel()
    )
   
    tech_agent = CustomAgent(
        name="TechArchitect",
        role="Technical Architecture Specialist",
        instructions="Evaluate technical solutions, code quality, and architectural decisions. Focus on scalability, maintainability, and best practices.",
        tools=[CodeAnalysisTool()],
        llm=MockChatModel()
    )
   
    strategy_agent = CustomAgent(
        name="StrategicPlanner",
        role="Strategic Business Planner",
        instructions="Synthesize market and technical insights into comprehensive strategic recommendations. Focus on ROI, risk assessment, and implementation roadmaps.",
        tools=[],
        llm=MockChatModel()
    )
   
    workflow.add_agent(market_agent)
    workflow.add_agent(tech_agent)
    workflow.add_agent(strategy_agent)
   
    tasks = [
        "Analyze the current AI framework market landscape and identify key opportunities for a new multi-agent framework targeting enterprise users.",
        """Analyze this code architecture pattern and provide technical assessment:


async def multi_agent_workflow():
    agents = [ResearchAgent(), AnalysisAgent(), SynthesisAgent()]
    context = SharedContext()
   
    for agent in agents:
        try:
            result = await agent.run(context.get_task())
            if result.success:
                context.add_insight(result.data)
            else:
                context.add_error(result.error)
        except Exception as e:
            logger.error(f"Agent {agent.name} failed: {e}")
           
    return context.synthesize_recommendations()""",
        "Based on the market analysis and technical assessment, create a comprehensive strategic plan for launching a competitive AI framework with focus on multi-agent capabilities and enterprise adoption."
    ]
   
    print("\n🔄 Executing Advanced Workflow...")
    result = await workflow.run(tasks)
   
    print("\n✅ Workflow Completed Successfully!")
    print("=" * 50)
    print("📊 COMPREHENSIVE ANALYSIS RESULTS")
    print("=" * 50)
    print(result["summary"])
   
    print("\n📈 WORKFLOW MONITORING SUMMARY")
    print("=" * 30)
    summary = workflow.monitor.get_summary()
    print(f"Total Events: {summary['total_events']}")
    print(f"Total Duration: {summary['total_duration']:.2f} seconds")
    print(f"Event Types: {', '.join(summary['event_types'])}")
   
    return workflow, result


async def simple_tool_demo():
    """Demonstrate individual tool functionality"""
   
    print("\n🛠️ Individual Tool Demo")
    print("=" * 30)
   
    market_tool = MarketResearchTool()
    code_tool = CodeAnalysisTool()
   
    print("Available Tools:")
    print(f"• {market_tool.name}: {market_tool.description}")
    print(f"• {code_tool.name}: {code_tool.description}")
   
    print("\n🔍 Market Research Analysis:")
    market_result = await market_tool.run("competitor analysis in AI frameworks")
    print(market_result)
   
    print("\n🔍 Code Analysis:")
    sample_code = '''
import asyncio
from typing import List, Dict


class AgentManager:
    """Manages multiple AI agents"""
   
    def __init__(self):
        self.agents = []
        self.results = []
   
    async def add_agent(self, agent):
        """Add agent to manager"""
        self.agents.append(agent)
   
    async def run_all(self, task: str) -> List[Dict]:
        """Run task on all agents"""
        results = []
        for agent in self.agents:
            try:
                result = await agent.execute(task)
                results.append(result)
            except Exception as e:
                print(f"Agent failed: {e}")
                results.append({"error": str(e)})
        return results
'''
   
    code_result = await code_tool.run(sample_code)
    print(code_result)

We showcase two demos. First, in the individual tool demo, we directly test the capabilities of MarketResearchTool and CodeAnalysisTool, verifying that each generates relevant insights independently. Then, we bring everything together in the advanced workflow demo, where we deploy three specialized agents, MarketAnalyst, TechArchitect, and StrategicPlanner, to tackle a sequence of business-analysis tasks.

async def main():
    """Main demo function"""
   
    print("🐝 Advanced BeeAI Framework Tutorial")
    print("=" * 40)
    print("This tutorial demonstrates:")
    print("• Multi-agent workflows")
    print("• Custom tool development")
    print("• Memory management")
    print("• Event monitoring")
    print("• Production-ready patterns")
   
    if BEEAI_AVAILABLE:
        print("• Using real BeeAI Framework")
    else:
        print("• Using custom implementation (BeeAI not available)")
   
    print("=" * 40)
   
    await simple_tool_demo()
   
    print("\n" + "="*50)
    await advanced_workflow_demo()
   
    print("\n🎉 Tutorial Complete!")
    print("\nNext Steps:")
    print("1. Install BeeAI Framework properly: pip install beeai-framework")
    print("2. Configure your preferred LLM (OpenAI, Anthropic, local models)")
    print("3. Explore the official BeeAI documentation")
    print("4. Build custom agents for your specific use case")
    print("5. Deploy to production with proper monitoring")


if __name__ == "__main__":
    try:
        import nest_asyncio
        nest_asyncio.apply()
        print("✅ Applied nest_asyncio for Colab compatibility")
    except ImportError:
        print("⚠️ nest_asyncio not available - may not work in some environments")
   
    asyncio.run(main())

We conclude the tutorial with the main() function, which ties together everything we built, demonstrating both the tool-level capabilities and the full multi-agent business intelligence workflow. Whether we run BeeAI natively or with the fallback setup, we ensure compatibility with environments such as Google Colab by applying nest_asyncio. With this structure in place, we are ready to scale our agent systems, explore deeper use cases, and deploy production-ready AI workflows.

In conclusion, we built and executed a robust multi-agent workflow using the BeeAI Framework (or a custom equivalent), demonstrating its potential in real-world business intelligence applications. We saw how easy it is to create agents with distinct roles, attach tools to handle specialized tasks, and monitor execution transparently.




Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of artificial intelligence for social good. His most recent endeavor is the launch of the AI media platform Marktechpost, which stands out for its in-depth coverage of machine learning and deep learning news that is technically sound yet easily understandable to a broad audience. The platform boasts over 2 million monthly views, reflecting its popularity among readers.


2025-07-08 06:51:00

