On this tutorial, we construct an ultra-advanced agentic AI workflow that behaves like a production-grade analysis and reasoning system slightly than a single immediate name. We ingest actual internet sources asynchronously, cut up them into provenance-tracked chunks, and run hybrid retrieval utilizing each TF-IDF (sparse) and OpenAI embeddings (dense), then fuse outcomes for greater recall and stability. We orchestrate a number of brokers, planning, synthesis, and restore, whereas imposing strict guardrails so each main declare is grounded in retrieved proof, and we persist episodic reminiscence. Therefore, the system improves its technique over time. Try the FULL CODES right here.
!pip -q set up openai openai-agents pydantic httpx beautifulsoup4 lxml scikit-learn numpy
import os, re, json, time, getpass, asyncio, sqlite3, hashlib
from typing import Listing, Dict, Tuple, Non-compulsory, Any
import numpy as np
import httpx
from bs4 import BeautifulSoup
from pydantic import BaseModel, Subject
from sklearn.feature_extraction.textual content import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from openai import AsyncOpenAI
from brokers import Agent, Runner, SQLiteSession
if not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter your OpenAI API key: ")
if not os.environ.get("OPENAI_API_KEY"):
increase RuntimeError("OPENAI_API_KEY not offered.")
print("✅ OpenAI API key loaded securely.")
oa = AsyncOpenAI(api_key=os.environ["OPENAI_API_KEY"])
def sha1(s: str) -> str:
return hashlib.sha1(s.encode("utf-8", errors="ignore")).hexdigest()
def normalize_url(u: str) -> str:
u = (u or "").strip()
return u.rstrip(").,]"'")
def clean_html_to_text(html: str) -> str:
soup = BeautifulSoup(html, "lxml")
for tag in soup(["script", "style", "noscript"]):
tag.decompose()
txt = soup.get_text("n")
txt = re.sub(r"n{3,}", "nn", txt).strip()
txt = re.sub(r"[ t]+", " ", txt)
return txt
def chunk_text(textual content: str, chunk_chars: int = 1600, overlap_chars: int = 320) -> Listing[str]:
if not textual content:
return []
textual content = re.sub(r"s+", " ", textual content).strip()
n = len(textual content)
step = max(1, chunk_chars - overlap_chars)
chunks = []
i = 0
whereas i < n:
chunks.append(textual content[i:i + chunk_chars])
i += step
return chunks
def canonical_chunk_id(s: str) -> str:
if s is None:
return ""
s = str(s).strip()
s = s.strip("<>"'()[]{}")
s = s.rstrip(".,;:")
return s
def inject_exec_summary_citations(exec_summary: str, citations: Listing[str], allowed_chunk_ids: Listing[str]) -> str:
exec_summary = exec_summary or ""
cset = []
for c in citations:
c = canonical_chunk_id(c)
if c and c in allowed_chunk_ids and c not in cset:
cset.append(c)
if len(cset) >= 2:
break
if len(cset) < 2:
for c in allowed_chunk_ids:
if c not in cset:
cset.append(c)
if len(cset) >= 2:
break
if len(cset) >= 2:
wanted = [c for c in cset if c not in exec_summary]
if wanted:
exec_summary = exec_summary.strip()
if exec_summary and never exec_summary.endswith("."):
exec_summary += "."
exec_summary += f" (cite: {cset[0]}) (cite: {cset[1]})"
return exec_summary
We arrange the surroundings, securely load the OpenAI API key, and initialize core utilities that all the things else is dependent upon. We outline hashing, URL normalization, HTML cleansing, and chunking so all downstream steps function on clear, constant textual content. We additionally add deterministic helpers to normalize and inject citations, making certain guardrails are at all times glad. Try the FULL CODES right here.
async def fetch_many(urls: Listing[str], timeout_s: float = 25.0, per_url_char_limit: int = 60000) -> Dict[str, str]:
headers = {"Consumer-Agent": "Mozilla/5.0 (AgenticAI/4.2)"}
urls = [normalize_url(u) for u in urls]
urls = [u for u in urls if u.startswith("http")]
urls = checklist(dict.fromkeys(urls))
out: Dict[str, str] = {}
async with httpx.AsyncClient(timeout=timeout_s, follow_redirects=True, headers=headers) as consumer:
async def _one(url: str):
strive:
r = await consumer.get(url)
r.raise_for_status()
out[url] = clean_html_to_text(r.textual content)[:per_url_char_limit]
besides Exception as e:
out[url] = f"__FETCH_ERROR__ {sort(e).__name__}: {e}"
await asyncio.collect(*[_one(u) for u in urls])
return out
def dedupe_texts(sources: Dict[str, str]) -> Dict[str, str]:
seen = set()
out = {}
for url, txt in sources.objects():
if not isinstance(txt, str) or txt.startswith("__FETCH_ERROR__"):
proceed
h = sha1(txt[:25000])
if h in seen:
proceed
seen.add(h)
out[url] = txt
return out
class ChunkRecord(BaseModel):
chunk_id: str
url: str
chunk_index: int
textual content: str
class RetrievalHit(BaseModel):
chunk_id: str
url: str
chunk_index: int
score_sparse: float = 0.0
score_dense: float = 0.0
score_fused: float = 0.0
textual content: str
class EvidencePack(BaseModel):
question: str
hits: Listing[RetrievalHit]
We asynchronously fetch a number of internet sources in parallel and aggressively deduplicate content material to keep away from redundant proof. We convert uncooked pages into structured textual content and outline the core information fashions that characterize chunks and retrieval hits. We guarantee every bit of textual content is traceable again to a selected supply and chunk index. Try the FULL CODES right here.
EPISODE_DB = "agentic_episode_memory.db"
def episode_db_init():
con = sqlite3.join(EPISODE_DB)
cur = con.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS episodes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL,
query TEXT NOT NULL,
urls_json TEXT NOT NULL,
retrieval_queries_json TEXT NOT NULL,
useful_sources_json TEXT NOT NULL
)
""")
con.commit()
con.shut()
def episode_store(query: str, urls: Listing[str], retrieval_queries: Listing[str], useful_sources: Listing[str]):
con = sqlite3.join(EPISODE_DB)
cur = con.cursor()
cur.execute(
"INSERT INTO episodes(ts, query, urls_json, retrieval_queries_json, useful_sources_json) VALUES(?,?,?,?,?)",
(int(time.time()), query, json.dumps(urls), json.dumps(retrieval_queries), json.dumps(useful_sources)),
)
con.commit()
con.shut()
def episode_recall(query: str, top_k: int = 2) -> Listing[Dict[str, Any]]:
con = sqlite3.join(EPISODE_DB)
cur = con.cursor()
cur.execute("SELECT ts, query, urls_json, retrieval_queries_json, useful_sources_json FROM episodes ORDER BY ts DESC LIMIT 200")
rows = cur.fetchall()
con.shut()
q_tokens = set(re.findall(r"[A-Za-z]{3,}", (query or "").decrease()))
scored = []
for ts, q2, u, rq, us in rows:
t2 = set(re.findall(r"[A-Za-z]{3,}", (q2 or "").decrease()))
if not t2:
proceed
rating = len(q_tokens & t2) / max(1, len(q_tokens))
if rating > 0:
scored.append((rating, {
"ts": ts,
"query": q2,
"urls": json.hundreds(u),
"retrieval_queries": json.hundreds(rq),
"useful_sources": json.hundreds(us),
}))
scored.type(key=lambda x: x[0], reverse=True)
return [x[1] for x in scored[:top_k]]
episode_db_init()
We introduce episodic reminiscence backed by SQLite so the system can recall what labored in earlier runs. We retailer questions, retrieval methods, and helpful sources to information future planning. We additionally implement light-weight similarity-based recall to bias the system towards traditionally efficient patterns. Try the FULL CODES right here.
class HybridIndex:
def __init__(self):
self.data: Listing[ChunkRecord] = []
self.tfidf: Non-compulsory[TfidfVectorizer] = None
self.tfidf_mat = None
self.emb_mat: Non-compulsory[np.ndarray] = None
def build_sparse(self):
corpus = [r.text for r in self.records] if self.data else [""]
self.tfidf = TfidfVectorizer(stop_words="english", ngram_range=(1, 2), max_features=80000)
self.tfidf_mat = self.tfidf.fit_transform(corpus)
def search_sparse(self, question: str, ok: int) -> Listing[Tuple[int, float]]:
if not self.data or self.tfidf is None or self.tfidf_mat is None:
return []
qv = self.tfidf.remodel([query])
sims = cosine_similarity(qv, self.tfidf_mat).flatten()
prime = np.argsort(-sims)[:k]
return [(int(i), float(sims[i])) for i in prime]
def set_dense(self, mat: np.ndarray):
self.emb_mat = mat.astype(np.float32)
def search_dense(self, q_emb: np.ndarray, ok: int) -> Listing[Tuple[int, float]]:
if self.emb_mat is None or not self.data:
return []
M = self.emb_mat
q = q_emb.astype(np.float32).reshape(1, -1)
M_norm = M / (np.linalg.norm(M, axis=1, keepdims=True) + 1e-9)
q_norm = q / (np.linalg.norm(q) + 1e-9)
sims = (M_norm @ q_norm.T).flatten()
prime = np.argsort(-sims)[:k]
return [(int(i), float(sims[i])) for i in prime]
def rrf_fuse(rankings: Listing[List[int]], ok: int = 60) -> Dict[int, float]:
scores: Dict[int, float] = {}
for r in rankings:
for pos, idx in enumerate(r, begin=1):
scores[idx] = scores.get(idx, 0.0) + 1.0 / (ok + pos)
return scores
HYBRID = HybridIndex()
ALLOWED_URLS: Listing[str] = []
EMBED_MODEL = "text-embedding-3-small"
async def embed_batch(texts: Listing[str]) -> np.ndarray:
resp = await oa.embeddings.create(mannequin=EMBED_MODEL, enter=texts, encoding_format="float")
vecs = [np.array(item.embedding, dtype=np.float32) for item in resp.data]
return np.vstack(vecs) if vecs else np.zeros((0, 0), dtype=np.float32)
async def embed_texts(texts: Listing[str], batch_size: int = 96, max_concurrency: int = 3) -> np.ndarray:
sem = asyncio.Semaphore(max_concurrency)
mats: Listing[Tuple[int, np.ndarray]] = []
async def _one(begin: int, batch: Listing[str]):
async with sem:
m = await embed_batch(batch)
mats.append((begin, m))
duties = []
for begin in vary(0, len(texts), batch_size):
batch = [t[:7000] for t in texts[start:start + batch_size]]
duties.append(_one(begin, batch))
await asyncio.collect(*duties)
mats.type(key=lambda x: x[0])
emb = np.vstack([m for _, m in mats]) if mats else np.zeros((len(texts), 0), dtype=np.float32)
if emb.form[0] != len(texts):
increase RuntimeError(f"Embedding rows mismatch: bought {emb.form[0]} anticipated {len(texts)}")
return emb
async def embed_query(question: str) -> np.ndarray:
m = await embed_batch([query[:7000]])
return m[0] if m.form[0] else np.zeros((0,), dtype=np.float32)
async def build_index(urls: Listing[str], max_chunks_per_url: int = 60):
world ALLOWED_URLS
fetched = await fetch_many(urls)
fetched = dedupe_texts(fetched)
data: Listing[ChunkRecord] = []
allowed: Listing[str] = []
for url, txt in fetched.objects():
if not isinstance(txt, str) or txt.startswith("__FETCH_ERROR__"):
proceed
allowed.append(url)
chunks = chunk_text(txt)[:max_chunks_per_url]
for i, ch in enumerate(chunks):
cid = f"{sha1(url)}:{i}"
data.append(ChunkRecord(chunk_id=cid, url=url, chunk_index=i, textual content=ch))
if not data:
err_view = {normalize_url(u): fetched.get(normalize_url(u), "") for u in urls}
increase RuntimeError("No sources fetched efficiently.n" + json.dumps(err_view, indent=2)[:4000])
ALLOWED_URLS = allowed
HYBRID.data = data
HYBRID.build_sparse()
texts = [r.text for r in HYBRID.records]
emb = await embed_texts(texts, batch_size=96, max_concurrency=3)
HYBRID.set_dense(emb)
We construct a hybrid retrieval index that mixes sparse TF-IDF search with dense OpenAI embeddings. We allow reciprocal rank fusion, in order that sparse and dense indicators complement one another slightly than compete. We assemble the index as soon as per run and reuse it throughout all retrieval queries for effectivity. Try the FULL CODES right here.
def build_evidence_pack(question: str, sparse: Listing[Tuple[int,float]], dense: Listing[Tuple[int,float]], ok: int = 10) -> EvidencePack:
sparse_rank = [i for i,_ in sparse]
dense_rank = [i for i,_ in dense]
sparse_scores = {i:s for i,s in sparse}
dense_scores = {i:s for i,s in dense}
fused = rrf_fuse([sparse_rank, dense_rank], ok=60) if dense_rank else rrf_fuse([sparse_rank], ok=60)
prime = sorted(fused.keys(), key=lambda i: fused[i], reverse=True)[:k]
hits: Listing[RetrievalHit] = []
for idx in prime:
r = HYBRID.data[idx]
hits.append(RetrievalHit(
chunk_id=r.chunk_id, url=r.url, chunk_index=r.chunk_index,
score_sparse=float(sparse_scores.get(idx, 0.0)),
score_dense=float(dense_scores.get(idx, 0.0)),
score_fused=float(fused.get(idx, 0.0)),
textual content=r.textual content
))
return EvidencePack(question=question, hits=hits)
async def gather_evidence(queries: Listing[str], per_query_k: int = 10, sparse_k: int = 60, dense_k: int = 60):
proof: Listing[EvidencePack] = []
useful_sources_count: Dict[str, int] = {}
all_chunk_ids: Listing[str] = []
for q in queries:
sparse = HYBRID.search_sparse(q, ok=sparse_k)
q_emb = await embed_query(q)
dense = HYBRID.search_dense(q_emb, ok=dense_k)
pack = build_evidence_pack(q, sparse, dense, ok=per_query_k)
proof.append(pack)
for h in pack.hits[:6]:
useful_sources_count[h.url] = useful_sources_count.get(h.url, 0) + 1
for h in pack.hits:
all_chunk_ids.append(h.chunk_id)
useful_sources = sorted(useful_sources_count.keys(), key=lambda u: useful_sources_count[u], reverse=True)
all_chunk_ids = sorted(checklist(dict.fromkeys(all_chunk_ids)))
return proof, useful_sources[:8], all_chunk_ids
class Plan(BaseModel):
goal: str
subtasks: Listing[str]
retrieval_queries: Listing[str]
acceptance_checks: Listing[str]
class UltraAnswer(BaseModel):
title: str
executive_summary: str
structure: Listing[str]
retrieval_strategy: Listing[str]
agent_graph: Listing[str]
implementation_notes: Listing[str]
risks_and_limits: Listing[str]
citations: Listing[str]
sources: Listing[str]
def normalize_answer(ans: UltraAnswer, allowed_chunk_ids: Listing[str]) -> UltraAnswer:
information = ans.model_dump()
information["citations"] = [canonical_chunk_id(x) for x in (data.get("citations") or [])]
information["citations"] = [x for x in data["citations"] if x in allowed_chunk_ids]
information["executive_summary"] = inject_exec_summary_citations(information.get("executive_summary",""), information["citations"], allowed_chunk_ids)
return UltraAnswer(**information)
def validate_ultra(ans: UltraAnswer, allowed_chunk_ids: Listing[str]) -> None:
extras = [u for u in ans.sources if u not in ALLOWED_URLS]
if extras:
increase ValueError(f"Non-allowed sources in output: {extras}")
cset = set(ans.citations or [])
lacking = [cid for cid in cset if cid not in set(allowed_chunk_ids)]
if lacking:
increase ValueError(f"Citations reference unknown chunk_ids (not retrieved): {lacking}")
if len(cset) < 6:
increase ValueError("Want a minimum of 6 distinct chunk_id citations in extremely mode.")
es_text = ans.executive_summary or ""
es_count = sum(1 for cid in cset if cid in es_text)
if es_count < 2:
increase ValueError("Govt abstract should embrace a minimum of 2 chunk_id citations verbatim.")
PLANNER = Agent(
identify="Planner",
mannequin="gpt-4o-mini",
directions=(
"Return a technical Plan schema.n"
"Make 10-16 retrieval_queries.n"
"Acceptance should embrace: a minimum of 6 citations and exec_summary comprises a minimum of 2 citations verbatim."
),
output_type=Plan,
)
SYNTHESIZER = Agent(
identify="Synthesizer",
mannequin="gpt-4o-mini",
directions=(
"Return UltraAnswer schema.n"
"Exhausting constraints:n"
"- executive_summary MUST embrace a minimum of TWO citations verbatim as: (cite: ).n"
"- citations have to be chosen ONLY from ALLOWED_CHUNK_IDS checklist.n"
"- citations checklist should embrace a minimum of 6 distinctive chunk_ids.n"
"- sources have to be subset of allowed URLs.n"
),
output_type=UltraAnswer,
)
FIXER = Agent(
identify="Fixer",
mannequin="gpt-4o-mini",
directions=(
"Restore to fulfill guardrails.n"
"Guarantee executive_summary consists of a minimum of TWO citations verbatim.n"
"Select citations ONLY from ALLOWED_CHUNK_IDS checklist.n"
"Return UltraAnswer schema."
),
output_type=UltraAnswer,
)
session = SQLiteSession("ultra_agentic_user", "ultra_agentic_session.db")
We collect proof by working a number of focused queries, fusing sparse and dense outcomes, and assembling proof packs with scores and provenance. We outline strict schemas for plans and ultimate solutions, then normalize and validate citations in opposition to retrieved chunk IDs. We implement arduous guardrails so each reply stays grounded and auditable. Try the FULL CODES right here.
async def run_ultra_agentic(query: str, urls: Listing[str], max_repairs: int = 2) -> UltraAnswer:
await build_index(urls)
recall_hint = json.dumps(episode_recall(query, top_k=2), indent=2)[:2000]
plan_res = await Runner.run(
PLANNER,
f"Query:n{query}nnAllowed URLs:n{json.dumps(ALLOWED_URLS, indent=2)}nnRecall:n{recall_hint}n",
session=session
)
plan: Plan = plan_res.final_output
queries = (plan.retrieval_queries or [])[:16]
evidence_packs, useful_sources, allowed_chunk_ids = await gather_evidence(queries)
evidence_json = json.dumps([p.model_dump() for p in evidence_packs], indent=2)[:16000]
allowed_chunk_ids_json = json.dumps(allowed_chunk_ids[:200], indent=2)
draft_res = await Runner.run(
SYNTHESIZER,
f"Query:n{query}nnAllowed URLs:n{json.dumps(ALLOWED_URLS, indent=2)}nn"
f"ALLOWED_CHUNK_IDS:n{allowed_chunk_ids_json}nn"
f"Proof packs:n{evidence_json}nn"
"Return UltraAnswer.",
session=session
)
draft = normalize_answer(draft_res.final_output, allowed_chunk_ids)
last_err = None
for i in vary(max_repairs + 1):
strive:
validate_ultra(draft, allowed_chunk_ids)
episode_store(query, ALLOWED_URLS, plan.retrieval_queries, useful_sources)
return draft
besides Exception as e:
last_err = str(e)
if i >= max_repairs:
draft = normalize_answer(draft, allowed_chunk_ids)
validate_ultra(draft, allowed_chunk_ids)
return draft
fixer_res = await Runner.run(
FIXER,
f"Query:n{query}nnAllowed URLs:n{json.dumps(ALLOWED_URLS, indent=2)}nn"
f"ALLOWED_CHUNK_IDS:n{allowed_chunk_ids_json}nn"
f"Guardrail error:n{last_err}nn"
f"Draft:n{json.dumps(draft.model_dump(), indent=2)[:12000]}nn"
f"Proof packs:n{evidence_json}nn"
"Return corrected UltraAnswer that passes guardrails.",
session=session
)
draft = normalize_answer(fixer_res.final_output, allowed_chunk_ids)
increase RuntimeError(f"Sudden failure: {last_err}")
query = (
"Design a production-lean however superior agentic AI workflow in Python with hybrid retrieval, "
"provenance-first citations, critique-and-repair loops, and episodic reminiscence. "
"Clarify why every layer issues, failure modes, and analysis."
)
urls = [
"https://openai.github.io/openai-agents-python/",
"https://openai.github.io/openai-agents-python/agents/",
"https://openai.github.io/openai-agents-python/running_agents/",
"https://github.com/openai/openai-agents-python",
]
ans = await run_ultra_agentic(query, urls, max_repairs=2)
print("nTITLE:n", ans.title)
print("nEXECUTIVE SUMMARY:n", ans.executive_summary)
print("nARCHITECTURE:")
for x in ans.structure:
print("-", x)
print("nRETRIEVAL STRATEGY:")
for x in ans.retrieval_strategy:
print("-", x)
print("nAGENT GRAPH:")
for x in ans.agent_graph:
print("-", x)
print("nIMPLEMENTATION NOTES:")
for x in ans.implementation_notes:
print("-", x)
print("nRISKS & LIMITS:")
for x in ans.risks_and_limits:
print("-", x)
print("nCITATIONS (chunk_ids):")
for c in ans.citations:
print("-", c)
print("nSOURCES:")
for s in ans.sources:
print("-", s)
We orchestrate the complete agentic loop by chaining planning, synthesis, validation, and restore in an async-safe pipeline. We robotically retry and repair outputs till they cross all constraints with out human intervention. We end by working a full instance and printing a totally grounded, production-ready agentic response.
In conclusion, we developed a complete agentic pipeline sturdy to frequent failure modes: unstable embedding shapes, quotation drift, and lacking grounding in govt summaries. We validated outputs in opposition to allowlisted sources, retrieved chunk IDs, robotically normalized citations, and injected deterministic citations when wanted to ensure compliance with out sacrificing correctness. By combining hybrid retrieval, critique-and-repair loops, and episodic reminiscence, we created a reusable basis we are able to lengthen with stronger evaluations (claim-to-evidence protection scoring, adversarial red-teaming, and regression checks) to constantly harden the system because it scales to new domains and bigger corpora.
Try the FULL CODES right here. Additionally, be happy to observe us on Twitter and don’t neglect to affix our 100k+ ML SubReddit and Subscribe to our Publication. Wait! are you on telegram? now you may be part of us on telegram as properly.
Elevate your perspective with NextTech Information, the place innovation meets perception.
Uncover the most recent breakthroughs, get unique updates, and join with a worldwide community of future-focused thinkers.
Unlock tomorrow’s traits in the present day: learn extra, subscribe to our publication, and develop into a part of the NextTech group at NextTech-news.com

