A Step-by-Step Guide to Build an Automated Knowledge Graph Pipeline Using LangGraph and NetworkX

In this tutorial, we explain how to create an automatic knowledge pipeline (KG) using Langgraph and NetworkX. The pipeline simulates a series of smart factors that carry out cooperative tasks such as data collection, entity extraction, identification of the relationship, entity solution, and verify the validity of the graph. Starting with the subject of the user, such as “artificial intelligence”, the system systematically extracts related entities and relationships, repetition, and merges information into a coherent graphic structure. Through the perception of the final graph of knowledge, developers and data scientists acquire clear visions about complex mutual relationships between concepts, which makes this approach very useful for applications in semantic analysis, natural language processing, and knowledge management.
!pip install langgraph langchain_core
We install two basic libraries of Python: Langgraph, which are used to create and coordinate the course of the agent -based arithmetic, and Langchain Core, which provides basic classes and auxiliary tools for building applications that make language models. These libraries allow a smooth integration of agents in smart data pipelines.
import re
import networkx as nx
import matplotlib.pyplot as plt
from typing import TypedDict, List, Tuple, Dict, Any
from langchain_core.messages import HumanMessage, AIMessage
from langgraph.graph import StateGraph, END
We import the basic libraries to create an automatic drawing pipeline for knowledge. Repels include normal expression texts, NetworkX and MATPLOTLIB to create and visualize graphics, technology and writing illustrations to process organized data processing, and Langgraph along with Langchain_Core to regulate the interaction between artificial intelligence factors within the work flow.
class KGState(TypedDict):
topic: str
raw_text: str
entities: List[str]
relations: List[Tuple[str, str, str]]
resolved_relations: List[Tuple[str, str, str]]
graph: Any
validation: Dict[str, Any]
messages: List[Any]
current_agent: str
We define the type of regulator data, KGSTATE, using Typeeddict Python. The status management plan defines different steps from the graphic pipeline knowledge. It includes details such as the chosen topic, the combined text, the specified entities and relationships, the resolved repetitions, the built -in graph object, the results of health verification, the reaction messages, and the current active agent.
def data_gatherer(state: KGState) -> KGState:
topic = state["topic"]
print(f" Data Gatherer: Searching for information about '{topic}'")
collected_text = f"{topic} is an important concept. It relates to various entities like EntityA, EntityB, and EntityC. EntityA influences EntityB. EntityC is a type of EntityB."
state["messages"].append(AIMessage(content=f"Collected raw text about {topic}"))
state["raw_text"] = collected_text
state["current_agent"] = "entity_extractor"
return state
This function, Data_GATHERER, works as a first step in the pipeline. It simulates the collection of raw text data on a available topic (stored in the case[“topic”]). Then this data stores simulation in case[“raw_text”],, A message indicating the completion of the data collection, and updating the pipeline by appointing the following agent (Entity_Extractor) as active.
def entity_extractor(state: KGState) -> KGState:
print("
Entity Extractor: Identifying entities in the text")
text = state["raw_text"]
entities = re.findall(r"Entity[A-Z]", text)
entities = [state["topic"]] + entities
state["entities"] = list(set(entities))
state["messages"].append(AIMessage(content=f"Extracted entities: {state['entities']}"))
print(f" Found entities: {state['entities']}")
state["current_agent"] = "relation_extractor"
return state
Entity_Extractor function is determined by entities from the raw text collected using a simple ordinary expression style that matches terms such as “Entitya”, “Entityb”, etc., it also includes the main theme as an entity and ensures uniqueness by converting the list into a group. The extracted entities are stored in the case, the Amnesty International’s message is recorded, and the pipeline developed into Cleant_Extreator agent.
def relation_extractor(state: KGState) -> KGState:
print("
Relation Extractor: Identifying relationships between entities")
text = state["raw_text"]
entities = state["entities"]
relations = []
relation_patterns = [
(r"([A-Za-z]+) relates to ([A-Za-z]+)", "relates_to"),
(r"([A-Za-z]+) influences ([A-Za-z]+)", "influences"),
(r"([A-Za-z]+) is a type of ([A-Za-z]+)", "is_type_of")
]
for e1 in entities:
for e2 in entities:
if e1 != e2:
for pattern, rel_type in relation_patterns:
if re.search(f"{e1}.*{rel_type}.*{e2}", text.replace("_", " "), re.IGNORECASE) or \
re.search(f"{e1}.*{e2}", text, re.IGNORECASE):
relations.append((e1, rel_type, e2))
state["relations"] = relations
state["messages"].append(AIMessage(content=f"Extracted relations: {relations}"))
print(f" Found relations: {relations}")
state["current_agent"] = "entity_resolver"
return state
The relationship function _Extractor discovers semantic relationships between entities within the raw text. ReGEX patterns are used in advance to determine phrases such as “effects” or “type” between the pairs of the entity. When finding a match, the corresponding relationship adds as a triple (subject, Musnad, object) to the list of relationships. These extracted relationships are stored in the case, a message is recorded to connect the agent, and the control is transmitted to the following agent: Entity_Resolver.
def entity_resolver(state: KGState) -> KGState:
print("
Entity Resolver: Resolving duplicate entities")
entity_map = {}
for entity in state["entities"]:
canonical_name = entity.lower().replace(" ", "_")
entity_map[entity] = canonical_name
resolved_relations = []
for s, p, o in state["relations"]:
s_resolved = entity_map.get(s, s)
o_resolved = entity_map.get(o, o)
resolved_relations.append((s_resolved, p, o_resolved))
state["resolved_relations"] = resolved_relations
state["messages"].append(AIMessage(content=f"Resolved relations: {resolved_relations}"))
state["current_agent"] = "graph_integrator"
return state
Entity_resolver function unifies the names of entities to avoid duplication and contradictions. It creates the Entity_map by converting each entity into small letters and replacing the spaces with the journey. Then, this appointment is applied to all topics and beings in the relations extracted to produce the relationships that have been resolved. These natural trilogies are added to the condition, a confirmation message is recorded, and the control is transferred to the Graph_integrator agent.
def graph_integrator(state: KGState) -> KGState:
print("
Graph Integrator: Building the knowledge graph")
G = nx.DiGraph()
for s, p, o in state["resolved_relations"]:
if not G.has_node(s):
G.add_node(s)
if not G.has_node(o):
G.add_node(o)
G.add_edge(s, o, relation=p)
state["graph"] = G
state["messages"].append(AIMessage(content=f"Built graph with {len(G.nodes)} nodes and {len(G.edges)} edges"))
state["current_agent"] = "graph_validator"
return state
Graph_integator function creates actual knowledge chart using NetworkX.Digraph () that supports mental relationships. Repeat three times (theme, the Musnad, the object), ensures the presence of the two nodes, then adds an edge directed with the relationship such as descriptive data. The resulting graph is saved in the case, a brief message is attached, and the pipeline is transferred to the Graph_validator agent for the final verification.
def graph_validator(state: KGState) -> KGState:
print("
Graph Validator: Validating knowledge graph")
G = state["graph"]
validation_report = {
"num_nodes": len(G.nodes),
"num_edges": len(G.edges),
"is_connected": nx.is_weakly_connected(G) if G.nodes else False,
"has_cycles": not nx.is_directed_acyclic_graph(G) if G.nodes else False
}
state["validation"] = validation_report
state["messages"].append(AIMessage(content=f"Validation report: {validation_report}"))
print(f" Validation report: {validation_report}")
state["current_agent"] = END
return state
Graph_validator function is a basic health examination on the knowledgeable chart. It collects health verification report that contains the number of nodes and edges, whether the graph is weakly connected (i.e., each knot can be reached if the direction is ignored), and whether the graph contains courses. This report is added to the state and records as Amnesty International. Once health verification is complete, the pipeline is distinguished as it ended by setting Current_gent to the end.
def router(state: KGState) -> str:
return state["current_agent"]
def visualize_graph(graph):
plt.figure(figsize=(10, 6))
pos = nx.spring_layout(graph)
nx.draw(graph, pos, with_labels=True, node_color="skyblue", node_size=1500, font_size=10)
edge_labels = nx.get_edge_attributes(graph, 'relation')
nx.draw_networkx_edge_labels(graph, pos, edge_labels=edge_labels)
plt.title("Knowledge Graph")
plt.tight_layout()
plt.show()
The router function is directed to the next agent based on the Current_gent field in the case. Meanwhile, the function of Visualize_GRAPH MATPLOTLIB and Networkx is used to display final knowledge, and the contract appears, edges, and relationships for axiomatic visual understanding.
def build_kg_graph():
workflow = StateGraph(KGState)
workflow.add_node("data_gatherer", data_gatherer)
workflow.add_node("entity_extractor", entity_extractor)
workflow.add_node("relation_extractor", relation_extractor)
workflow.add_node("entity_resolver", entity_resolver)
workflow.add_node("graph_integrator", graph_integrator)
workflow.add_node("graph_validator", graph_validator)
workflow.add_conditional_edges("data_gatherer", router,
{"entity_extractor": "entity_extractor"})
workflow.add_conditional_edges("entity_extractor", router,
{"relation_extractor": "relation_extractor"})
workflow.add_conditional_edges("relation_extractor", router,
{"entity_resolver": "entity_resolver"})
workflow.add_conditional_edges("entity_resolver", router,
{"graph_integrator": "graph_integrator"})
workflow.add_conditional_edges("graph_integrator", router,
{"graph_validator": "graph_validator"})
workflow.add_conditional_edges("graph_validator", router,
{END: END})
workflow.set_entry_point("data_gatherer")
return workflow.compile()
Build_kg_graph function determines the function of the full chart using Langgraph. It continues successively, each worker, as a knot, from collecting data to verifying the authenticity of the graph, and connecting it through police transformations based on the current factor. The entry point is set on Data_GATHERER, and the chart is assembled in an implementable workflow directing the automatic pipeline from start to finish.
def run_knowledge_graph_pipeline(topic):
print(f"
Starting knowledge graph pipeline for: {topic}")
initial_state = {
"topic": topic,
"raw_text": "",
"entities": [],
"relations": [],
"resolved_relations": [],
"graph": None,
"validation": {},
"messages": [HumanMessage(content=f"Build a knowledge graph about {topic}")],
"current_agent": "data_gatherer"
}
kg_app = build_kg_graph()
final_state = kg_app.invoke(initial_state)
print(f"
بناء الرسم البياني المعرفة كاملة لـ: {topic} ") إرجاع Final_state
Run_knowledge_GRAPH_PIPELINE creates the pipeline by preparing an empty state dictionary with the subject presented. It creates the workflow using Build_kg_GRAPH (), then runs it by calling the translated chart with the initial state. Since each agent treats data, the condition develops, and the end result includes full knowledge fee, validation and ready to use.
if __name__ == "__main__":
topic = "Artificial Intelligence"
result = run_knowledge_graph_pipeline(topic)
visualize_graph(result["graph"])
Finally, this mass acts as a script entry point. When implemented directly, it leads to the operation of the graphic pipeline knowledge of the topic of “artificial intelligence”, across all stages of the agent, and finally imagines the resulting graph using the Visualize_Graph function (). It provides a comprehensive offer to generate automated graphs.


In conclusion, we have learned how to integrate many specialized agents smoothly into a firm drawing pipeline through this organized approach, and take advantage of Langgraph and NetworkX. This workflow works to automate entity extraction, relationships relationships and depict complex relationships, providing clear and implemented representation of collected information. By controlling and strengthening individual agents, such as using the methods of identifying the most advanced entities or integrating data sources in actual time, this founding framework can be limited and customized for the tasks of building advanced graph across various fields.
verify Clap notebook. All the credit for this research goes to researchers in this project. Also, do not hesitate to follow us twitter And do not forget to join 90k+ ml subreddit.
Publish a step -by -step guide to create an automatic drawing pipeline for knowledge using Langgraph and NetworkX first appeared on Marktechpost.
Don’t miss more hot News like this! Click here to discover the latest in AI news!
2025-05-15 07:38:00