In this tutorial, we build an advanced multi-agent communication system using a structured message bus architecture powered by LangGraph and Pydantic. We define a strict ACP-style message schema that lets agents communicate through shared state rather than calling each other directly, which keeps the agents modular and every exchange traceable. We implement three specialized agents, a Planner, an Executor, and a Validator, that coordinate through structured messages, persistent state, and routing logic. We also integrate SQLite-based persistence to provide durable memory across executions, and we visualize the agent communication flow to understand how messages propagate through the system.
!pip -q install -U "pydantic==2.12.3"
!pip -q install -U langgraph langchain-core networkx matplotlib
!pip -q install -U langgraph-checkpoint-sqlite

import os
import json
import uuid
import sqlite3
from datetime import datetime, timezone
from typing import Any, Dict, List, Literal, Optional, Tuple

from pydantic import BaseModel, Field
import networkx as nx
import matplotlib.pyplot as plt

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver

# Closed vocabularies for who may talk and what kind of message is sent.
Role = Literal["planner", "executor", "validator", "user", "system"]
MsgType = Literal["task", "plan", "result", "validation", "error", "control"]

class ACPMessage(BaseModel):
    # Unique id and UTC timestamp are filled in automatically.
    msg_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    ts: str = Field(default_factory=lambda: datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"))
    sender: Role
    receiver: Role
    msg_type: MsgType
    content: str
    meta: Dict[str, Any] = Field(default_factory=dict)
    trace: Dict[str, Any] = Field(default_factory=dict)

def acp_log_path() -> str:
    os.makedirs("acp_logs", exist_ok=True)
    return os.path.join("acp_logs", "acp_messages.jsonl")

def append_acp_log(m: ACPMessage) -> None:
    # Append one JSON object per line (JSONL).
    with open(acp_log_path(), "a", encoding="utf-8") as f:
        f.write(m.model_dump_json() + "\n")
We install and import the libraries needed to build a structured multi-agent communication system. We define the ACP-style message schema with Pydantic, which lets us enforce a strict, structured format for agent communication. We also implement structured logging that appends every message exchanged between agents to a JSONL file, which gives us traceability and observability.
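To make the "strict schema" idea concrete, here is a minimal standard-library sketch of the same kind of check. It is not the tutorial's Pydantic model: `MiniMessage` and `try_build` are hypothetical names introduced only for illustration, and the validation is written by hand instead of being derived from the type hints.

```python
from dataclasses import dataclass
from typing import Literal, get_args

# Same role and message-type vocabularies as the tutorial's schema.
Role = Literal["planner", "executor", "validator", "user", "system"]
MsgType = Literal["task", "plan", "result", "validation", "error", "control"]


@dataclass(frozen=True)
class MiniMessage:
    """Hypothetical stdlib stand-in for ACPMessage (not the Pydantic model)."""
    sender: str
    receiver: str
    msg_type: str
    content: str

    def __post_init__(self) -> None:
        # Reject any value outside the closed vocabularies.
        for name, allowed in (("sender", Role), ("receiver", Role), ("msg_type", MsgType)):
            value = getattr(self, name)
            if value not in get_args(allowed):
                raise ValueError(f"{name}={value!r} is not one of {get_args(allowed)}")


def try_build(**fields: str) -> str:
    """Return 'ok' if the message is well-formed, else the rejection reason."""
    try:
        MiniMessage(**fields)
        return "ok"
    except ValueError as exc:
        return f"rejected: {exc}"


good = try_build(sender="planner", receiver="executor", msg_type="plan", content="1. Do X")
bad = try_build(sender="planner", receiver="reviewer", msg_type="plan", content="1. Do X")
print(good)
print(bad)
```

The second call fails because "reviewer" is not a known role. A malformed message is stopped when it is constructed, before it can reach the mailbox or the log.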
class BusState(BaseModel):
    goal: str = ""
    done: bool = False
    errors: List[str] = Field(default_factory=list)
    mailbox: List[ACPMessage] = Field(default_factory=list)
    edges: List[Tuple[str, str, str]] = Field(default_factory=list)
    active_role: Role = "user"
    step: int = 0

def bus_update(
    state: BusState,
    sender: Role,
    receiver: Role,
    msg_type: MsgType,
    content: str,
    meta: Optional[Dict[str, Any]] = None,
    trace: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    # Build and validate the message, then persist it to the JSONL log.
    m = ACPMessage(
        sender=sender,
        receiver=receiver,
        msg_type=msg_type,
        content=content,
        meta=meta or {},
        trace=trace or {},
    )
    append_acp_log(m)
    # Return a new state dict; the receiver becomes the next active role.
    return {
        "goal": state.goal,
        "done": state.done,
        "errors": state.errors,
        "mailbox": state.mailbox + [m],
        "edges": state.edges + [(sender, receiver, msg_type)],
        "active_role": receiver,
        "step": state.step + 1,
    }
We define the shared state structure that acts as the centralized message bus for all agents. The BusState class stores the goal, the mailbox, routing information, and execution progress. The bus_update function builds a structured message, appends it to the log, and returns an updated copy of the shared state with the receiver set as the next active role.
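The update pattern used by bus_update can be sketched with plain dictionaries. The helper `post` below is a hypothetical analogue that skips the schema and the logging. It only illustrates that each call returns a fresh state and leaves the previous one untouched.

```python
from typing import Any, Dict


def post(state: Dict[str, Any], sender: str, receiver: str, msg_type: str, content: str) -> Dict[str, Any]:
    """Hypothetical dict-only analogue of bus_update: returns a new state."""
    message = {"sender": sender, "receiver": receiver, "msg_type": msg_type, "content": content}
    return {
        **state,
        # `+` builds new lists, so the caller's lists are not mutated.
        "mailbox": state["mailbox"] + [message],
        "edges": state["edges"] + [(sender, receiver, msg_type)],
        "active_role": receiver,
        "step": state["step"] + 1,
    }


s0 = {"goal": "demo", "done": False, "errors": [], "mailbox": [], "edges": [], "active_role": "planner", "step": 0}
s1 = post(s0, "planner", "executor", "plan", "1. Do X")
s2 = post(s1, "executor", "validator", "result", "{}")

print(s0["step"], s1["step"], s2["step"])
print(s2["edges"])
print(s2["active_role"])
```

Because earlier states are never modified, each intermediate state is a self-contained snapshot, which suits step-by-step checkpointing.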
def planner_agent(state_dict: Dict[str, Any]) -> Dict[str, Any]:
    state = BusState.model_validate(state_dict)
    goal = state.goal.strip()
    if not goal:
        # Without a goal there is nothing to plan; report the error to the validator.
        return bus_update(state, "planner", "validator", "error", "No goal provided.", meta={"reason": "empty_goal"})
    plan = [
        "Interpret the goal and extract requirements.",
        "Decide an execution strategy with clear outputs.",
        "Ask Executor to produce the result.",
        "Ask Validator to check correctness + completeness.",
    ]
    plan_text = "\n".join([f"{i+1}. {p}" for i, p in enumerate(plan)])
    return bus_update(
        state,
        "planner",
        "executor",
        "plan",
        plan_text,
        meta={"goal": goal, "plan_steps": len(plan)},
        trace={"policy": "deterministic_planner_v1"},
    )

def executor_agent(state_dict: Dict[str, Any]) -> Dict[str, Any]:
    state = BusState.model_validate(state_dict)
    goal = state.goal.strip()
    # Find the most recent plan addressed to the executor.
    latest_plan = None
    for m in reversed(state.mailbox):
        if m.receiver == "executor" and m.msg_type == "plan":
            latest_plan = m.content
            break
    result = {
        "goal": goal,
        "assumptions": [
            "We can produce a concise, actionable output.",
            "We can validate via rule-based checks.",
        ],
        "output": f"Executed task for goal: {goal}",
        "deliverables": [
            "A clear summary",
            "A step-by-step action list",
            "Any constraints and edge cases",
        ],
        "plan_seen": bool(latest_plan),
    }
    result_text = json.dumps(result, indent=2)
    return bus_update(
        state,
        "executor",
        "validator",
        "result",
        result_text,
        meta={"artifact_type": "json", "bytes": len(result_text.encode("utf-8"))},
        trace={"policy": "deterministic_executor_v1"},
    )
We implement the Planner and Executor agents, which handle task planning and execution. The Planner interprets the goal and generates a structured execution plan, which is passed along through the message bus. The Executor looks up the latest plan addressed to it and produces a structured JSON result artifact that downstream agents can validate. Both agents are deterministic in this demo, so the output is reproducible from run to run.
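The mailbox lookup the Executor performs (walk the mailbox backwards and stop at the first matching message) can be factored into a small helper. The sketch below uses plain dictionaries instead of ACPMessage objects, and `latest_for` is a hypothetical name that does not appear in the tutorial code.

```python
from typing import Any, Dict, Iterable, List, Optional


def latest_for(
    mailbox: List[Dict[str, Any]],
    receiver: str,
    msg_types: Iterable[str],
) -> Optional[Dict[str, Any]]:
    """Return the newest message for `receiver` whose type is in `msg_types`."""
    wanted = set(msg_types)
    # Newest messages are at the end, so scan in reverse and stop early.
    for message in reversed(mailbox):
        if message["receiver"] == receiver and message["msg_type"] in wanted:
            return message
    return None


mailbox = [
    {"sender": "planner", "receiver": "executor", "msg_type": "plan", "content": "plan v1"},
    {"sender": "executor", "receiver": "validator", "msg_type": "result", "content": "{}"},
    {"sender": "planner", "receiver": "executor", "msg_type": "plan", "content": "plan v2"},
]

print(latest_for(mailbox, "executor", ["plan"])["content"])
print(latest_for(mailbox, "validator", ("result", "error"))["content"])
print(latest_for(mailbox, "planner", ["validation"]))
```

Returning None when nothing matches leaves the caller to decide how to handle an empty inbox.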
def validator_agent(state_dict: Dict[str, Any]) -> Dict[str, Any]:
    state = BusState.model_validate(state_dict)
    goal = state.goal.strip()
    # Find the most recent result (or upstream error) addressed to the validator.
    latest_result = None
    for m in reversed(state.mailbox):
        if m.receiver == "validator" and m.msg_type in ("result", "error"):
            latest_result = m
            break
    if latest_result is None:
        upd = bus_update(state, "validator", "planner", "error", "No result to validate.", meta={"reason": "missing_result"})
        upd["done"] = True
        upd["errors"] = state.errors + ["missing_result"]
        return upd
    if latest_result.msg_type == "error":
        upd = bus_update(
            state,
            "validator",
            "planner",
            "validation",
            f"Validation failed because upstream error occurred: {latest_result.content}",
            meta={"status": "fail"},
        )
        upd["done"] = True
        upd["errors"] = state.errors + [latest_result.content]
        return upd
    try:
        parsed = json.loads(latest_result.content)
    except Exception as e:
        upd = bus_update(
            state,
            "validator",
            "planner",
            "validation",
            f"Result is not valid JSON: {e}",
            meta={"status": "fail"},
        )
        upd["done"] = True
        upd["errors"] = state.errors + [f"invalid_json: {e}"]
        return upd
    # Rule-based checks on the parsed artifact.
    issues = []
    if parsed.get("goal") != goal:
        issues.append("Result.goal does not match input goal.")
    if "deliverables" not in parsed or not isinstance(parsed["deliverables"], list) or len(parsed["deliverables"]) == 0:
        issues.append("Missing or empty deliverables list.")
    if issues:
        upd = bus_update(
            state,
            "validator",
            "planner",
            "validation",
            "Validation failed:\n- " + "\n- ".join(issues),
            meta={"status": "fail", "issues": issues},
        )
        upd["done"] = True
        upd["errors"] = state.errors + issues
        return upd
    upd = bus_update(
        state,
        "validator",
        "user",
        "validation",
        "Validation passed ✅ Result looks consistent and complete.",
        meta={"status": "pass"},
    )
    upd["done"] = True
    upd["errors"] = state.errors
    return upd

def route_next(state_dict: Dict[str, Any]) -> str:
    # Stop as soon as any agent marks the run as done.
    if state_dict.get("done", False):
        return END
    role = state_dict.get("active_role", "user")
    if role == "planner":
        return "planner"
    if role == "executor":
        return "executor"
    if role == "validator":
        return "validator"
    return END
We implement the Validator agent and the routing logic that controls the execution flow. The Validator inspects the latest result, runs rule-based checks (valid JSON, matching goal, non-empty deliverables), and emits a pass or fail validation message. The routing function reads the done flag and active_role from the shared state to decide which agent runs next, or whether the run ends.
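To see how routing on active_role drives the whole run, the sketch below replaces LangGraph with a tiny hand-written loop. Everything here is an illustrative assumption: `END_SENTINEL` is a local stand-in for the library's END marker, `route` condenses route_next into a set lookup, and the stub agents only set the routing fields.

```python
from typing import Any, Callable, Dict, List, Tuple

END_SENTINEL = "__end__"  # local stand-in for the graph library's END marker
AGENT_NODES = {"planner", "executor", "validator"}


def route(state: Dict[str, Any]) -> str:
    """Condensed routing rule: stop when done, else go to the active role."""
    if state.get("done", False):
        return END_SENTINEL
    role = state.get("active_role", "user")
    return role if role in AGENT_NODES else END_SENTINEL


def run(
    state: Dict[str, Any],
    nodes: Dict[str, Callable[[Dict[str, Any]], Dict[str, Any]]],
    max_steps: int = 10,  # safety cap, an assumption of this sketch
) -> Tuple[Dict[str, Any], List[str]]:
    """Call nodes in the order chosen by `route` until it returns the sentinel."""
    current, visited = "planner", []
    for _ in range(max_steps):
        visited.append(current)
        state = {**state, **nodes[current](state)}
        current = route(state)
        if current == END_SENTINEL:
            break
    return state, visited


# Stub agents that only hand control to the next role.
stub_nodes = {
    "planner": lambda s: {"active_role": "executor"},
    "executor": lambda s: {"active_role": "validator"},
    "validator": lambda s: {"active_role": "user", "done": True},
}

final, visited = run({"active_role": "planner", "done": False}, stub_nodes)
print(visited)
print(final["done"])
```

An unknown or non-agent role such as "user" falls through to the end sentinel, so a misrouted message ends the run.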
graph = StateGraph(dict)
graph.add_node("planner", planner_agent)
graph.add_node("executor", executor_agent)
graph.add_node("validator", validator_agent)
graph.set_entry_point("planner")
# Every node uses the same router, so any agent can hand off to any other.
graph.add_conditional_edges("planner", route_next, {"planner": "planner", "executor": "executor", "validator": "validator", END: END})
graph.add_conditional_edges("executor", route_next, {"planner": "planner", "executor": "executor", "validator": "validator", END: END})
graph.add_conditional_edges("validator", route_next, {"planner": "planner", "executor": "executor", "validator": "validator", END: END})

# SQLite-backed checkpointer for durable state across runs.
os.makedirs("checkpoints", exist_ok=True)
db_path = "checkpoints/langgraph_bus.sqlite"
conn = sqlite3.connect(db_path, check_same_thread=False)
checkpointer = SqliteSaver(conn)
app = graph.compile(checkpointer=checkpointer)

def run_thread(goal: str, thread_id: str) -> BusState:
    init = BusState(goal=goal, active_role="planner", done=False).model_dump()
    final_state_dict = app.invoke(init, config={"configurable": {"thread_id": thread_id}})
    return BusState.model_validate(final_state_dict)

thread_id = "demo-thread-001"
goal = "Design an ACP-style message bus where planner/executor/validator coordinate through shared state."
final_state = run_thread(goal, thread_id)
print("Done:", final_state.done)
print("Steps:", final_state.step)
print("Errors:", final_state.errors)
print("\nLast 5 messages:")
for m in final_state.mailbox[-5:]:
    print(f"- [{m.msg_type}] {m.sender} -> {m.receiver}: {m.content[:80]}")

# Best-effort inspection of the persisted checkpoint (layout may vary by version).
snapshot = checkpointer.get_tuple({"configurable": {"thread_id": thread_id}})
cp = (snapshot.checkpoint if snapshot else None) or {}
cv = cp.get("channel_values", {}) or {}
sv = cp.get("state", {}) or {}
vals = cv if isinstance(cv, dict) and len(cv) else sv if isinstance(sv, dict) else {}
print("\nCheckpoint keys:", list(cp.keys()))
if isinstance(cv, dict):
    print("channel_values keys:", list(cv.keys())[:30])
if isinstance(sv, dict):
    print("state keys:", list(sv.keys())[:30])
print("\nPersisted step (best-effort):", vals.get("step", "NOT_FOUND"))
print("Persisted active_role (best-effort):", vals.get("active_role", "NOT_FOUND"))
print("\nACP logs:", acp_log_path())
print("Checkpoint DB:", db_path)

# Static orchestration graph.
G = nx.DiGraph()
G.add_edge("planner", "executor")
G.add_edge("executor", "validator")
G.add_edge("validator", "user")
plt.figure(figsize=(6, 4))
pos = nx.spring_layout(G, seed=7)
nx.draw(G, pos, with_labels=True, node_size=1800, font_size=10, arrows=True)
plt.title("Orchestration Graph: Planner → Executor → Validator")
plt.show()

# Communication graph built from the edges recorded at runtime.
comm = nx.MultiDiGraph()
for (s, r, t) in final_state.edges:
    comm.add_edge(s, r, label=t)
plt.figure(figsize=(8, 5))
pos2 = nx.spring_layout(comm, seed=11)
nx.draw(comm, pos2, with_labels=True, node_size=1800, font_size=10, arrows=True)
plt.title("Communication Graph from Structured Message Bus (Runtime Edges)")
plt.show()

def tail_jsonl(path: str, n: int = 8) -> List[Dict[str, Any]]:
    if not os.path.exists(path):
        return []
    with open(path, "r", encoding="utf-8") as f:
        lines = f.readlines()[-n:]
    return [json.loads(x) for x in lines]

print("\nLast ACP log entries:")
for row in tail_jsonl(acp_log_path(), 6):
    print(f"{row['msg_type']:>10} | {row['sender']} -> {row['receiver']} | {row['ts']}")
We build the LangGraph state graph, enable SQLite-based persistence, and execute the multi-agent workflow. We use a thread identifier so that agent state can be saved and recovered reliably across executions. We also visualize the orchestration and communication graphs and inspect the persisted logs to understand how agents interact through the structured message bus.
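Since the message log is opened in append mode, entries from repeated runs accumulate in the same JSONL file. A small post-processing step can summarize it. The sketch below is an assumption about how one might do that, not part of the tutorial: `edge_counts` is a hypothetical helper, and the sample rows are made up and written to a temporary file instead of the real acp_logs directory.

```python
import json
import os
import tempfile
from collections import Counter


def edge_counts(path: str) -> Counter:
    """Count (sender, receiver, msg_type) triples in a JSONL message log."""
    counts: Counter = Counter()
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # tolerate blank lines
            row = json.loads(line)
            counts[(row["sender"], row["receiver"], row["msg_type"])] += 1
    return counts


# Made-up rows imitating two runs of the planner -> executor -> validator flow.
one_run = [
    {"sender": "planner", "receiver": "executor", "msg_type": "plan"},
    {"sender": "executor", "receiver": "validator", "msg_type": "result"},
    {"sender": "validator", "receiver": "user", "msg_type": "validation"},
]
sample_rows = one_run + one_run

with tempfile.TemporaryDirectory() as tmp:
    sample_path = os.path.join(tmp, "sample.jsonl")
    with open(sample_path, "w", encoding="utf-8") as f:
        for row in sample_rows:
            f.write(json.dumps(row) + "\n")
    counts = edge_counts(sample_path)

for edge, n in counts.most_common():
    print(edge, n)
```

The resulting counts could serve as edge weights in the communication graph, so that frequently used channels stand out.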
In this tutorial, we designed and implemented a structured multi-agent communication framework using LangGraph's shared-state architecture and ACP-style message-bus principles. We enabled agents to operate independently while communicating through structured, persistent messages, which improves reliability, observability, and scalability. We logged every interaction, persisted agent state across executions, and visualized communication patterns to see how the agents coordinate. This architecture gives us a modular foundation that can be extended with additional agents, LLM reasoning, memory systems, and more complex routing strategies.