In this tutorial, we implement an end-to-end Practical Byzantine Fault Tolerance (PBFT) simulator using asyncio. We model a realistic distributed network with asynchronous message passing, configurable delays, and Byzantine nodes that deliberately deviate from the protocol. By explicitly implementing the pre-prepare, prepare, and commit phases, we explore how PBFT achieves consensus under adversarial conditions while respecting the theoretical 3f+1 bound (at least 3f+1 replicas are needed to tolerate f Byzantine ones). We also instrument the system to measure consensus latency and success rates as the number of malicious nodes increases, allowing us to empirically observe the limits of Byzantine fault tolerance.
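To make the 3f+1 bound concrete before diving into the simulator, the following minimal sketch computes how many faults a cluster of a given size can tolerate and the matching 2f+1 quorum. The helper names `max_faults` and `min_nodes` are illustrative assumptions, not part of the tutorial's code; `max_faults` uses the same formula as the simulator's `pbft_params`.

```python
def max_faults(n: int) -> int:
    """Largest f such that n >= 3f + 1 (hypothetical helper name)."""
    return (n - 1) // 3


def min_nodes(f: int) -> int:
    """Smallest cluster size that tolerates f Byzantine nodes."""
    return 3 * f + 1


# Print the tolerated fault count and quorum size for a few cluster sizes.
for n in (4, 7, 10, 12):
    f = max_faults(n)
    print(f"n={n:2d} tolerates f={f}, quorum 2f+1={2 * f + 1}")
```

Note that adding nodes does not always buy more tolerance: n=10 and n=12 both tolerate only f=3, because the next step up requires n=13.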
import asyncio
import random
import time
import hashlib
from dataclasses import dataclass, field
from typing import Dict, Set, Tuple, Optional, List
import matplotlib.pyplot as plt

# Labels for the three PBFT phases carried in every message.
PREPREPARE = "PREPREPARE"
PREPARE = "PREPARE"
COMMIT = "COMMIT"

@dataclass(frozen=True)
class Msg:
    typ: str      # one of PREPREPARE, PREPARE, COMMIT
    view: int     # current view number
    seq: int      # sequence number assigned by the primary
    digest: str   # SHA-256 digest of the request payload
    sender: int   # node id of the sender

@dataclass
class NetConfig:
    min_delay_ms: int = 5
    max_delay_ms: int = 40
    drop_prob: float = 0.0
    reorder_prob: float = 0.0
We establish the simulator's foundation by importing the required libraries and defining the core PBFT message types. We formalize network messages and parameters using dataclasses to ensure structured, consistent communication. We also define constants representing the three PBFT phases used throughout the system.
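As a small illustration of why a frozen dataclass suits protocol messages, the sketch below re-declares a `Msg` with the same fields and shows that identical messages compare equal, collapse in a set, and cannot be mutated after creation. The standalone `digest_of` function mirrors the SHA-256 digest used in the simulator; the variable names `a`, `b`, `seen`, and `mutated` are only for this example.

```python
import hashlib
from dataclasses import dataclass, FrozenInstanceError


@dataclass(frozen=True)
class Msg:
    typ: str
    view: int
    seq: int
    digest: str
    sender: int


def digest_of(payload: str) -> str:
    # Deterministic SHA-256 digest of the request payload.
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


a = Msg("PREPARE", 0, 1, digest_of("tx"), sender=2)
b = Msg("PREPARE", 0, 1, digest_of("tx"), sender=2)

# Frozen dataclasses are hashable, so duplicate messages collapse in a set.
seen = {a, b}

# Attempting to change a field raises instead of silently altering a vote.
try:
    a.seq = 99
    mutated = True
except FrozenInstanceError:
    mutated = False

print(len(seen), mutated)
```

Immutability matters here because the same message object is delivered to several inboxes; no receiver can alter what another receiver sees.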
class Network:
    def __init__(self, cfg: NetConfig):
        self.cfg = cfg
        self.nodes: Dict[int, "Node"] = {}

    def register(self, node: "Node"):
        self.nodes[node.nid] = node

    async def send(self, dst: int, msg: Msg):
        # Optionally drop the message.
        if random.random() < self.cfg.drop_prob:
            return
        # Base delivery delay, drawn uniformly and converted from ms to seconds.
        d = random.uniform(self.cfg.min_delay_ms, self.cfg.max_delay_ms) / 1000.0
        await asyncio.sleep(d)
        # Occasionally add extra jitter so messages can overtake each other.
        if random.random() < self.cfg.reorder_prob:
            await asyncio.sleep(random.uniform(0.0, 0.02))
        await self.nodes[dst].inbox.put(msg)

    async def broadcast(self, src: int, msg: Msg):
        # Deliver to every registered node (including the sender) concurrently.
        tasks = []
        for nid in self.nodes.keys():
            tasks.append(asyncio.create_task(self.send(nid, msg)))
        await asyncio.gather(*tasks)
We implement an asynchronous network layer that simulates real-world message delivery with delays, reordering, and potential drops. We register nodes dynamically and use asyncio tasks to broadcast messages across the simulated network. We model non-deterministic communication behavior that directly impacts consensus latency and robustness.
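The effect of per-message delays on ordering can be seen in isolation with a minimal sketch. It uses fixed delays instead of random ones so the outcome is predictable; the `deliver` and `demo` coroutines are hypothetical names for this example and are not part of the simulator.

```python
import asyncio


async def deliver(inbox: asyncio.Queue, msg: str, delay_s: float) -> None:
    # Simulate network latency, then place the message in the receiver's inbox.
    await asyncio.sleep(delay_s)
    await inbox.put(msg)


async def demo() -> list:
    inbox: asyncio.Queue = asyncio.Queue()
    # Both deliveries run concurrently; the one with the shorter delay lands first.
    await asyncio.gather(
        deliver(inbox, "first-sent", 0.05),
        deliver(inbox, "second-sent", 0.01),
    )
    # Drain the inbox in arrival order.
    return [inbox.get_nowait() for _ in range(inbox.qsize())]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because each delivery sleeps independently, send order and arrival order are decoupled, which is why the protocol handlers cannot assume that a pre-prepare reaches a node before the corresponding prepares.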
@dataclass
class NodeConfig:
    n: int
    f: int
    primary_id: int = 0
    view: int = 0
    timeout_s: float = 2.0

class Node:
    def __init__(self, nid: int, net: Network, cfg: NodeConfig, byzantine: bool = False):
        self.nid = nid
        self.net = net
        self.cfg = cfg
        self.byzantine = byzantine
        self.inbox: asyncio.Queue[Msg] = asyncio.Queue()
        # seq -> digest accepted from the primary's pre-prepare
        self.preprepare_seen: Dict[int, str] = {}
        # (seq, digest) -> set of node ids that voted for it
        self.prepare_votes: Dict[Tuple[int, str], Set[int]] = {}
        self.commit_votes: Dict[Tuple[int, str], Set[int]] = {}
        # seq -> digest this node has committed
        self.committed: Dict[int, str] = {}
        self.running = True

    @property
    def f(self) -> int:
        return self.cfg.f

    def _q_prepare(self) -> int:
        # Prepare quorum: 2f + 1 matching votes.
        return 2 * self.f + 1

    def _q_commit(self) -> int:
        # Commit quorum: 2f + 1 matching votes.
        return 2 * self.f + 1

    @staticmethod
    def digest_of(payload: str) -> str:
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()
We define the configuration and internal state of each PBFT node participating in the protocol. We initialize data structures for tracking pre-prepare, prepare, and commit votes while supporting both honest and Byzantine behavior. We also implement quorum threshold logic and deterministic digest generation for request validation.
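The choice of 2f+1 as the quorum size follows from a short counting argument, sketched here as arithmetic. Any two quorums of size q drawn from n nodes share at least 2q - n members; with n = 3f+1 and q = 2f+1 that overlap is f+1, so even if f of the shared members are Byzantine, at least one is honest. The helper names below are assumptions made for this illustration.

```python
def min_overlap(n: int, quorum: int) -> int:
    """Smallest possible intersection of two quorums drawn from n nodes."""
    return max(0, 2 * quorum - n)


def honest_in_overlap(n: int, f: int) -> int:
    """Guaranteed honest nodes in the overlap if all f faulty nodes sit inside it."""
    quorum = 2 * f + 1
    return min_overlap(n, quorum) - f


for f in (1, 2, 3):
    n = 3 * f + 1
    print(
        f"f={f}, n={n}: overlap >= {min_overlap(n, 2 * f + 1)}, "
        f"honest in overlap >= {honest_in_overlap(n, f)}"
    )
```

That single guaranteed honest node is what prevents two conflicting digests from both gathering a quorum for the same sequence number.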
    async def propose(self, payload: str, seq: int):
        if self.nid != self.cfg.primary_id:
            raise ValueError("Only the primary can propose in this simplified simulator.")
        if not self.byzantine:
            # Honest primary: the same digest goes to every node.
            dig = self.digest_of(payload)
            msg = Msg(PREPREPARE, self.cfg.view, seq, dig, self.nid)
            await self.net.broadcast(self.nid, msg)
            return
        # Byzantine primary: equivocate by sending a different digest to each node.
        for dst in self.net.nodes.keys():
            variant = f"{payload}::to={dst}::salt={random.randint(0,10**9)}"
            dig = self.digest_of(variant)
            msg = Msg(PREPREPARE, self.cfg.view, seq, dig, self.nid)
            await self.net.send(dst, msg)

    async def handle_preprepare(self, msg: Msg):
        seq = msg.seq
        dig = msg.digest
        if self.byzantine:
            # Byzantine replica: stay silent half the time, otherwise vote
            # for either the real digest or a forged one.
            if random.random() < 0.5:
                return
            fake_dig = dig if random.random() < 0.5 else self.digest_of(dig + "::fake")
            out = Msg(PREPARE, msg.view, seq, fake_dig, self.nid)
            await self.net.broadcast(self.nid, out)
            return
        # Honest replica: accept the first pre-prepare for this sequence number.
        if seq not in self.preprepare_seen:
            self.preprepare_seen[seq] = dig
            out = Msg(PREPARE, msg.view, seq, dig, self.nid)
            await self.net.broadcast(self.nid, out)

    async def handle_prepare(self, msg: Msg):
        seq, dig = msg.seq, msg.digest
        key = (seq, dig)
        voters = self.prepare_votes.setdefault(key, set())
        voters.add(msg.sender)
        if self.byzantine:
            return
        # Only count votes that match the digest accepted in the pre-prepare.
        if self.preprepare_seen.get(seq) != dig:
            return
        # Note: a COMMIT is re-broadcast for every PREPARE that arrives after
        # the quorum is reached; votes are stored in sets, so duplicates are harmless.
        if len(voters) >= self._q_prepare():
            out = Msg(COMMIT, msg.view, seq, dig, self.nid)
            await self.net.broadcast(self.nid, out)

    async def handle_commit(self, msg: Msg):
        seq, dig = msg.seq, msg.digest
        key = (seq, dig)
        voters = self.commit_votes.setdefault(key, set())
        voters.add(msg.sender)
        if self.byzantine:
            return
        if self.preprepare_seen.get(seq) != dig:
            return
        if seq in self.committed:
            return
        # Finalize once 2f + 1 matching COMMIT votes have been collected.
        if len(voters) >= self._q_commit():
            self.committed[seq] = dig
We implement the core PBFT protocol logic, including proposal handling and the pre-prepare and prepare phases. We explicitly model Byzantine equivocation by allowing malicious nodes to send conflicting digests to different peers. We advance the protocol to the commit phase once the required prepare quorum is reached.
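To see why equivocation by the primary stalls the protocol, consider a minimal vote-tally sketch. It assumes every replica simply echoes the digest it received in its PREPARE, and it checks whether any single digest gathers 2f+1 votes. The functions `tally_prepares` and `quorum_digest` are hypothetical helpers written for this example, not part of the simulator.

```python
import hashlib
from collections import defaultdict
from typing import Dict, Optional, Set


def digest_of(payload: str) -> str:
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def tally_prepares(digest_by_sender: Dict[int, str]) -> Dict[str, Set[int]]:
    """Group PREPARE votes by digest, one vote per sender."""
    votes: Dict[str, Set[int]] = defaultdict(set)
    for sender, digest in digest_by_sender.items():
        votes[digest].add(sender)
    return votes


def quorum_digest(votes: Dict[str, Set[int]], f: int) -> Optional[str]:
    """Return a digest with at least 2f + 1 votes, or None if there is none."""
    for digest, voters in votes.items():
        if len(voters) >= 2 * f + 1:
            return digest
    return None


f = 1  # n = 4 replicas
# Honest primary: every replica received, and votes for, the same digest.
honest_round = {i: digest_of("tx") for i in range(4)}
# Equivocating primary: every replica received a different digest.
equivocating_round = {i: digest_of(f"tx::to={i}") for i in range(4)}

print(quorum_digest(tally_prepares(honest_round), f) is not None)
print(quorum_digest(tally_prepares(equivocating_round), f) is None)
```

With the votes split across four digests, no quorum forms and no node commits. Safety is preserved, but without a view change to replace the primary the request never completes.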
    async def run(self):
        # Event loop: dispatch each inbox message to its phase handler.
        while self.running:
            msg = await self.inbox.get()
            if msg.typ == PREPREPARE:
                await self.handle_preprepare(msg)
            elif msg.typ == PREPARE:
                await self.handle_prepare(msg)
            elif msg.typ == COMMIT:
                await self.handle_commit(msg)

    def stop(self):
        self.running = False

def pbft_params(n: int) -> int:
    # Maximum number of Byzantine nodes tolerated by n replicas.
    return (n - 1) // 3

async def run_single_consensus(
    n: int,
    malicious: int,
    net_cfg: NetConfig,
    payload: str = "tx: pay Alice->Bob 5",
    seq: int = 1,
    timeout_s: float = 2.0,
    seed: Optional[int] = None
) -> Dict[str, object]:
    if seed is not None:
        random.seed(seed)
    f_max = pbft_params(n)
    f = f_max
    net = Network(net_cfg)
    cfg = NodeConfig(n=n, f=f, primary_id=0, view=0, timeout_s=timeout_s)
    # Pick the Byzantine nodes at random; the primary may be among them.
    mal_set = set(random.sample(range(n), k=min(malicious, n)))
    nodes: List[Node] = []
    for i in range(n):
        node = Node(i, net, cfg, byzantine=(i in mal_set))
        net.register(node)
        nodes.append(node)
    tasks = [asyncio.create_task(node.run()) for node in nodes]
    t0 = time.perf_counter()
    await nodes[cfg.primary_id].propose(payload, seq)
    honest = [node for node in nodes if not node.byzantine]
    # Success means every honest node has committed the sequence number.
    target = max(1, len(honest))
    committed_honest = 0
    latency = None

    async def poll_commits():
        nonlocal committed_honest, latency
        while True:
            committed_honest = sum(1 for node in honest if seq in node.committed)
            if committed_honest >= target:
                latency = time.perf_counter() - t0
                return
            await asyncio.sleep(0.005)

    try:
        await asyncio.wait_for(poll_commits(), timeout=timeout_s)
        success = True
    except asyncio.TimeoutError:
        success = False
        latency = None
    # Shut down: clear the loop flag, then cancel tasks blocked on their inbox.
    for node in nodes:
        node.stop()
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    digest_set = set(node.committed.get(seq) for node in honest if seq in node.committed)
    agreed = (len(digest_set) == 1) if success else False
    return {
        "n": n,
        "f": f,
        "malicious": malicious,
        "mal_set": mal_set,
        "success": success,
        "latency_s": latency,
        "honest_committed": committed_honest,
        "honest_total": len(honest),
        "agreed_digest": agreed,
    }
We complete the PBFT state machine by processing commit messages and finalizing decisions once commit quorums are satisfied. We run the node event loop to continuously process incoming messages asynchronously. We also include lifecycle controls to safely stop nodes after each experiment run.
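The shutdown sequence deserves a closer look, because clearing a loop flag alone does not wake a coroutine that is blocked on `inbox.get()`. The following minimal sketch, with hypothetical `worker` and `demo` coroutines, shows the cancel-then-gather pattern that ends such a task cleanly.

```python
import asyncio


async def worker(inbox: asyncio.Queue, handled: list) -> None:
    # Block on the inbox forever; only cancellation ends this loop.
    while True:
        msg = await inbox.get()
        handled.append(msg)


async def demo():
    inbox: asyncio.Queue = asyncio.Queue()
    handled: list = []
    task = asyncio.create_task(worker(inbox, handled))

    await inbox.put("COMMIT")
    await asyncio.sleep(0.01)  # give the worker time to process the message

    # Cancel the blocked task and collect the outcome without raising.
    task.cancel()
    results = await asyncio.gather(task, return_exceptions=True)
    return handled, task.cancelled(), results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Passing `return_exceptions=True` to `asyncio.gather` returns the `CancelledError` as a value, so the experiment runner can tear down all node tasks without the cancellation propagating into its own control flow.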
async def latency_sweep(
    n: int = 10,
    max_malicious: Optional[int] = None,
    trials_per_point: int = 5,
    timeout_s: float = 2.0,
    net_cfg: Optional[NetConfig] = None,
    seed: int = 7
):
    if net_cfg is None:
        net_cfg = NetConfig(min_delay_ms=5, max_delay_ms=35, drop_prob=0.0, reorder_prob=0.05)
    if max_malicious is None:
        max_malicious = n
    results = []
    random.seed(seed)
    # Sweep the number of malicious nodes, running several trials per point.
    for m in range(0, max_malicious + 1):
        latencies = []
        successes = 0
        agreements = 0
        for t in range(trials_per_point):
            out = await run_single_consensus(
                n=n,
                malicious=m,
                net_cfg=net_cfg,
                timeout_s=timeout_s,
                seed=seed + 1000*m + t
            )
            results.append(out)
            if out["success"]:
                successes += 1
                latencies.append(out["latency_s"])
                if out["agreed_digest"]:
                    agreements += 1
        avg_lat = sum(latencies)/len(latencies) if latencies else None
        print(
            f"malicious={m:2d} | success={successes}/{trials_per_point} "
            f"| avg_latency={avg_lat if avg_lat is not None else 'NA'} "
            f"| digest_agreement={agreements}/{successes if successes else 1}"
        )
    return results

def plot_latency(results: List[Dict[str, object]], trials_per_point: int):
    # Group trial results by the number of malicious nodes.
    by_m = {}
    for r in results:
        m = r["malicious"]
        by_m.setdefault(m, []).append(r)
    xs, ys = [], []
    success_rate = []
    for m in sorted(by_m.keys()):
        group = by_m[m]
        lats = [g["latency_s"] for g in group if g["latency_s"] is not None]
        succ = sum(1 for g in group if g["success"])
        xs.append(m)
        # NaN leaves a gap in the plot where no trial succeeded.
        ys.append(sum(lats)/len(lats) if lats else float("nan"))
        success_rate.append(succ / len(group))
    plt.figure()
    plt.plot(xs, ys, marker="o")
    plt.xlabel("Number of malicious (Byzantine) nodes")
    plt.ylabel("Consensus latency (seconds), avg over successes")
    plt.title("PBFT Simulator: Latency vs Malicious Nodes")
    plt.grid(True)
    plt.show()
    plt.figure()
    plt.plot(xs, success_rate, marker="o")
    plt.xlabel("Number of malicious (Byzantine) nodes")
    plt.ylabel("Success rate")
    plt.title("PBFT Simulator: Success Rate vs Malicious Nodes")
    plt.ylim(-0.05, 1.05)
    plt.grid(True)
    plt.show()

async def main():
    n = 10
    trials = 6
    f = pbft_params(n)
    print(f"n={n} => PBFT theoretical max f = floor((n-1)/3) = {f}")
    print("Theory: safety/liveness typically assumed when malicious <= f and timing assumptions hold.\n")
    results = await latency_sweep(
        n=n,
        max_malicious=min(n, f + 6),
        trials_per_point=trials,
        timeout_s=2.0,
        net_cfg=NetConfig(min_delay_ms=5, max_delay_ms=35, drop_prob=0.0, reorder_prob=0.05),
        seed=11
    )
    plot_latency(results, trials)

# Top-level await works in a notebook (e.g. Colab or Jupyter);
# in a plain script, use asyncio.run(main()) instead.
await main()
We orchestrate the experiments by sweeping across different numbers of malicious nodes and collecting latency statistics. We aggregate results to analyze consensus success rates and visualize system behavior using plots. We run the full experiment pipeline and observe how PBFT degrades as the number of Byzantine faults approaches and exceeds theoretical limits.
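A back-of-the-envelope check helps interpret where the success rate should fall off. The sketch below assumes that Byzantine nodes never contribute a useful COMMIT vote, so a commit quorum of 2f+1 must come entirely from honest nodes; the function names are hypothetical and the check ignores timing and the identity of the primary.

```python
def max_faults(n: int) -> int:
    return (n - 1) // 3


def commit_reachable(n: int, malicious: int) -> bool:
    """True if honest nodes alone can supply 2f + 1 COMMIT votes."""
    f = max_faults(n)
    honest = n - malicious
    return honest >= 2 * f + 1


n = 10
for m in range(0, 6):
    print(f"malicious={m}: honest={n - m}, quorum reachable={commit_reachable(n, m)}")
```

For n=10 the quorum is 7, so the check flips from reachable to unreachable between 3 and 4 malicious nodes, exactly at the theoretical bound f=3. In the simulator, failures can also appear below that bound whenever the randomly chosen malicious set happens to include the primary, since there is no view change to replace it.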
In conclusion, we gained hands-on insight into how PBFT behaves beyond textbook guarantees and how adversarial pressure affects both latency and liveness in practice. We saw how quorum thresholds enforce safety, why consensus breaks down once Byzantine nodes exceed the tolerated bound, and how asynchronous networks amplify these effects. This implementation provides a practical foundation for experimenting with more advanced distributed-systems concepts, such as view changes, leader rotation, or authenticated messaging. It helps us build intuition for the design trade-offs that underpin modern blockchain and distributed trust systems.

