In this tutorial, we build an AI-powered cryptographic agent system that combines the strength of classical encryption with adaptive intelligence. We design agents capable of performing hybrid encryption with RSA and AES, generating digital signatures, detecting anomalies in message patterns, and recommending key rotations. As we progress, we watch these autonomous agents securely establish communication channels, exchange encrypted data, and continuously assess security risks in real time, all within a compact, efficient implementation.
import hashlib, hmac, json, time, secrets
import numpy as np
from dataclasses import dataclass
from typing import Dict, List
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend

@dataclass
class SecurityEvent:
    timestamp: float
    event_type: str
    risk_score: float
    particulars: Dict
We begin by importing all the libraries needed for cryptography, AI-based analysis, and data handling. We also define a SecurityEvent dataclass to record and analyze all important events in the cryptographic system.
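As a minimal sketch of how such event records might be persisted, the snippet below redefines the same dataclass and writes one event as a JSON line using only the standard library. The log-line format and the `restored` round trip are assumptions for illustration; the tutorial itself only keeps events in memory.

```python
import json
import time
from dataclasses import dataclass, asdict
from typing import Dict


@dataclass
class SecurityEvent:
    timestamp: float
    event_type: str
    risk_score: float
    particulars: Dict


# Record one event, then turn it into a JSON line (hypothetical log format).
event = SecurityEvent(
    timestamp=time.time(),
    event_type="SESSION_ESTABLISHED",
    risk_score=0.1,
    particulars={"partner": "Bob"},
)
log_line = json.dumps(asdict(event))

# Parsing the line back rebuilds an equal SecurityEvent.
restored = SecurityEvent(**json.loads(log_line))
print(restored.event_type, restored.risk_score)
```

Keeping the record a plain dataclass means it can be serialized without custom encoders, as long as the `particulars` dictionary holds only JSON-friendly values.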
class CryptoAgent:
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        # Long-term RSA key pair used for key exchange and signatures
        self.private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend())
        self.public_key = self.private_key.public_key()
        self.session_keys = {}
        self.security_events = []
        self.encryption_count = 0
        self.key_rotation_threshold = 100

    def get_public_key_bytes(self) -> bytes:
        return self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )

    def establish_session(self, partner_id: str, partner_public_key_bytes: bytes) -> bytes:
        # Fresh 256-bit AES key, wrapped with the partner's RSA public key (OAEP)
        session_key = secrets.token_bytes(32)
        self.session_keys[partner_id] = session_key
        partner_public_key = serialization.load_pem_public_key(partner_public_key_bytes, backend=default_backend())
        encrypted_session_key = partner_public_key.encrypt(
            session_key,
            padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
        )
        self.log_security_event("SESSION_ESTABLISHED", 0.1, {"partner": partner_id})
        return encrypted_session_key

    def receive_session_key(self, partner_id: str, encrypted_session_key: bytes):
        session_key = self.private_key.decrypt(
            encrypted_session_key,
            padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
        )
        self.session_keys[partner_id] = session_key
We define the CryptoAgent class and initialize its keys, session storage, and security monitoring state. We then implement methods that let agents generate and exchange RSA public keys and establish secure hybrid-encrypted sessions.
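The key-wrapping step can be tried in isolation. The sketch below, which assumes the `cryptography` package is installed, wraps a random 32-byte session key with RSA-OAEP and unwraps it again; the names `oaep`, `bob_private`, `wrapped`, and `unwrapped` are illustrative and do not appear in the tutorial code.

```python
import secrets

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa, padding


def oaep() -> padding.OAEP:
    """Same OAEP parameters as in the agent class: SHA-256 for hash and MGF1."""
    return padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )


# Stand-in for Bob's long-term key pair.
bob_private = rsa.generate_private_key(public_exponent=65537, key_size=2048)

# Alice's side: create a 256-bit AES key and wrap it with Bob's public key.
session_key = secrets.token_bytes(32)
wrapped = bob_private.public_key().encrypt(session_key, oaep())

# Bob's side: unwrap with the private key to recover the same session key.
unwrapped = bob_private.decrypt(wrapped, oaep())
print(len(wrapped), unwrapped == session_key)
```

With a 2048-bit RSA key the wrapped value is 256 bytes long, which is the size the demo later prints for the encrypted session key.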
    def encrypt_message(self, partner_id: str, plaintext: str) -> Dict:
        if partner_id not in self.session_keys:
            raise ValueError(f"No session established with {partner_id}")
        self.encryption_count += 1
        if self.encryption_count >= self.key_rotation_threshold:
            self.log_security_event("KEY_ROTATION_NEEDED", 0.3, {"count": self.encryption_count})
        # 96-bit random nonce, the recommended size for AES-GCM
        iv = secrets.token_bytes(12)
        cipher = Cipher(algorithms.AES(self.session_keys[partner_id]), modes.GCM(iv), backend=default_backend())
        encryptor = cipher.encryptor()
        ciphertext = encryptor.update(plaintext.encode()) + encryptor.finalize()
        # Sign nonce, ciphertext, and GCM tag together
        message_data = iv + ciphertext + encryptor.tag
        signature = self.sign_data(message_data)
        risk_score = self.analyze_encryption_pattern(len(plaintext))
        return {
            "sender": self.agent_id,
            "recipient": partner_id,
            "iv": iv.hex(),
            "ciphertext": ciphertext.hex(),
            "tag": encryptor.tag.hex(),
            "signature": signature.hex(),
            "timestamp": time.time(),
            "risk_score": risk_score
        }

    def decrypt_message(self, encrypted_msg: Dict) -> str:
        sender_id = encrypted_msg["sender"]
        if sender_id not in self.session_keys:
            raise ValueError(f"No session established with {sender_id}")
        iv = bytes.fromhex(encrypted_msg["iv"])
        ciphertext = bytes.fromhex(encrypted_msg["ciphertext"])
        tag = bytes.fromhex(encrypted_msg["tag"])
        cipher = Cipher(algorithms.AES(self.session_keys[sender_id]), modes.GCM(iv, tag), backend=default_backend())
        decryptor = cipher.decryptor()
        # finalize() raises InvalidTag if the ciphertext or tag was modified
        plaintext = decryptor.update(ciphertext) + decryptor.finalize()
        if encrypted_msg.get("risk_score", 0) > 0.7:
            self.log_security_event("HIGH_RISK_MESSAGE", 0.8, {"sender": sender_id})
        return plaintext.decode()
We implement the encryption and decryption logic that allows agents to communicate securely using AES-GCM. Each message is encrypted, signed, risk-scored, and then decrypted by the recipient, preserving authenticity and confidentiality. Note that decrypt_message relies on the GCM authentication tag for integrity; it does not itself check the RSA signature attached to the message.
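A receiver that also wants to check the attached signature could do so roughly as follows. This is a sketch under stated assumptions: `verify_message` is a hypothetical helper that is not part of the tutorial's class, the byte values are placeholders rather than real AES-GCM output, and the `cryptography` package is assumed to be installed. It rebuilds the signed bytes in the same order as encrypt_message (iv, then ciphertext, then tag) and verifies them with RSA-PSS.

```python
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa, padding


def pss() -> padding.PSS:
    """PSS parameters matching the signing side: MGF1/SHA-256, maximum salt length."""
    return padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH)


def verify_message(sender_public_key, encrypted_msg: dict) -> bool:
    """Hypothetical helper: check the signature over iv + ciphertext + tag."""
    message_data = (
        bytes.fromhex(encrypted_msg["iv"])
        + bytes.fromhex(encrypted_msg["ciphertext"])
        + bytes.fromhex(encrypted_msg["tag"])
    )
    try:
        sender_public_key.verify(
            bytes.fromhex(encrypted_msg["signature"]), message_data, pss(), hashes.SHA256()
        )
        return True
    except InvalidSignature:
        return False


# Placeholder message fields standing in for real encrypt_message output.
sender_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
iv, ciphertext, tag = b"\x01" * 12, b"example ciphertext", b"\x02" * 16
signature = sender_key.sign(iv + ciphertext + tag, pss(), hashes.SHA256())
msg = {"iv": iv.hex(), "ciphertext": ciphertext.hex(), "tag": tag.hex(), "signature": signature.hex()}

valid = verify_message(sender_key.public_key(), msg)

# Changing the ciphertext invalidates the signature.
tampered = dict(msg, ciphertext=(ciphertext + b"!").hex())
rejected = not verify_message(sender_key.public_key(), tampered)
print(valid, rejected)
```

In the agent setting, the receiver would need the sender's public key, for example the PEM bytes returned by get_public_key_bytes, before it could run this check.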
    def sign_data(self, data: bytes) -> bytes:
        return self.private_key.sign(
            data,
            padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH),
            hashes.SHA256()
        )

    def analyze_encryption_pattern(self, message_length: int) -> float:
        # Average risk over the ten most recent events (0.1 if none yet)
        recent_events = self.security_events[-10:]
        avg_risk = np.mean([e.risk_score for e in recent_events]) if recent_events else 0.1
        risk_score = 0.1
        if message_length > 10000:
            risk_score += 0.3
        if self.encryption_count % 50 == 0 and self.encryption_count > 0:
            risk_score += 0.2
        risk_score = (risk_score + avg_risk) / 2
        self.log_security_event("ENCRYPTION_ANALYSIS", risk_score, {"msg_len": message_length})
        return min(risk_score, 1.0)

    def log_security_event(self, event_type: str, risk_score: float, particulars: Dict):
        event = SecurityEvent(timestamp=time.time(), event_type=event_type, risk_score=risk_score, particulars=particulars)
        self.security_events.append(event)

    def generate_security_report(self) -> Dict:
        if not self.security_events:
            return {"status": "No events recorded"}
        total_events = len(self.security_events)
        high_risk_events = [e for e in self.security_events if e.risk_score > 0.7]
        avg_risk = np.mean([e.risk_score for e in self.security_events])
        event_types = {}
        for event in self.security_events:
            event_types[event.event_type] = event_types.get(event.event_type, 0) + 1
        return {
            "agent_id": self.agent_id,
            "total_events": total_events,
            "high_risk_events": len(high_risk_events),
            "average_risk_score": round(avg_risk, 3),
            "encryption_count": self.encryption_count,
            "key_rotation_needed": self.encryption_count >= self.key_rotation_threshold,
            "event_breakdown": event_types,
            "security_status": "CRITICAL" if avg_risk > 0.7 else "WARNING" if avg_risk > 0.4 else "NORMAL"
        }
We add the AI-driven components that analyze encryption behavior, detect anomalies, and log risk events. The scoring is rule-based: it starts from a baseline, adds penalties for very long messages and for every fiftieth encryption, and averages the result with the risk of recent events. This gives our agent adaptive behavior, the ability to flag unusual patterns, and the ability to generate a complete security report.
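To make the scoring rules easier to follow, here is a minimal sketch that restates them as pure functions with the agent state passed in explicitly. The names `score_message` and `classify` are illustrative, and `statistics.mean` stands in for NumPy so the snippet needs only the standard library.

```python
from statistics import mean
from typing import List


def score_message(message_length: int, encryption_count: int, recent_risks: List[float]) -> float:
    """Restatement of the tutorial's rules with the state passed in as arguments."""
    avg_risk = mean(recent_risks[-10:]) if recent_risks else 0.1
    risk = 0.1                                    # baseline
    if message_length > 10000:
        risk += 0.3                               # unusually long message
    if encryption_count > 0 and encryption_count % 50 == 0:
        risk += 0.2                               # bump on every 50th encryption
    return min((risk + avg_risk) / 2, 1.0)        # blend with recent history


def classify(avg_risk: float) -> str:
    """Same thresholds as the security_status field of the report."""
    return "CRITICAL" if avg_risk > 0.7 else "WARNING" if avg_risk > 0.4 else "NORMAL"


short = score_message(36, 1, [0.1])
long_at_50 = score_message(12000, 50, [0.1, 0.1])
print(round(short, 3), round(long_at_50, 3))
print(classify(short), classify(0.55), classify(0.9))
```

Because each new score is averaged with recent history, a single long message raises the score only moderately; with these rules the blended score stays below 1.0 even when both penalties apply.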
def demo_crypto_agent_system():
    print("🔐 Advanced Cryptographic Agent System Demo\n")
    print("=" * 60)
    alice = CryptoAgent("Alice")
    bob = CryptoAgent("Bob")
    print("\n1. Agents Created")
    print(f"   Alice ID: {alice.agent_id}")
    print(f"   Bob ID: {bob.agent_id}")
    print("\n2. Establishing Secure Session (Hybrid Encryption)")
    alice_public_key = alice.get_public_key_bytes()
    bob_public_key = bob.get_public_key_bytes()
    encrypted_session_key = alice.establish_session("Bob", bob_public_key)
    bob.receive_session_key("Alice", encrypted_session_key)
    print(f"   ✓ Session established with {len(encrypted_session_key)} byte encrypted key")
    print("\n3. Encrypting and Transmitting Messages")
    messages = [
        "Hello Bob! This is a secure message.",
        "The launch codes are: Alpha-7-Charlie-9",
        "Meeting at 3 PM tomorrow.",
        "This is a very long message " * 100
    ]
    for i, msg in enumerate(messages, 1):
        encrypted = alice.encrypt_message("Bob", msg)
        print(f"\n   Message {i}:")
        print(f"   - Plaintext length: {len(msg)} chars")
        print(f"   - Ciphertext: {encrypted['ciphertext'][:60]}...")
        print(f"   - Risk Score: {encrypted['risk_score']:.3f}")
        print(f"   - Signature: {encrypted['signature'][:40]}...")
        decrypted = bob.decrypt_message(encrypted)
        print(f"   - Decrypted: {decrypted[:60]}{'...' if len(decrypted) > 60 else ''}")
        print(f"   - Verification: {'✓ SUCCESS' if decrypted == msg else '✗ FAILED'}")
    print("\n4. AI-Powered Security Analysis")
    print("\n   Alice's Security Report:")
    alice_report = alice.generate_security_report()
    for k, v in alice_report.items():
        print(f"   - {k}: {v}")
    print("\n   Bob's Security Report:")
    bob_report = bob.generate_security_report()
    for k, v in bob_report.items():
        print(f"   - {k}: {v}")
    print("\n" + "=" * 60)
    print("Demo Complete! Key Features Demonstrated:")
    print("✓ Hybrid encryption (RSA + AES-GCM)")
    print("✓ Digital signatures for authentication")
    print("✓ AI-powered anomaly detection")
    print("✓ Intelligent key rotation recommendations")
    print("✓ Real-time security monitoring")

if __name__ == "__main__":
    demo_crypto_agent_system()
We demonstrate the full cryptographic workflow in which two agents securely exchange messages, detect anomalies, and review detailed security reports. We conclude the demo with an overview of all the intelligent features the system performs.
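In the demo, the encrypted dictionary is handed from Alice to Bob inside a single process. Because every binary field is hex-encoded, the same dictionary could also be serialized as JSON for transport over a network. The sketch below assumes such a JSON wire format, which the tutorial does not define, and uses random placeholder bytes with the same keys as the return value of encrypt_message.

```python
import json
import secrets
import time

# Stand-in envelope with the same keys as encrypt_message's return value;
# the byte values are random placeholders, not real ciphertext.
envelope = {
    "sender": "Alice",
    "recipient": "Bob",
    "iv": secrets.token_bytes(12).hex(),
    "ciphertext": secrets.token_bytes(36).hex(),
    "tag": secrets.token_bytes(16).hex(),
    "signature": secrets.token_bytes(256).hex(),
    "timestamp": time.time(),
    "risk_score": 0.1,
}

# Sender side: bytes ready for a socket, queue, or file.
wire = json.dumps(envelope).encode("utf-8")

# Receiver side: parse, then convert hex fields back to bytes as needed.
received = json.loads(wire.decode("utf-8"))
iv = bytes.fromhex(received["iv"])
print(len(wire), len(iv))
```

The parsed dictionary has the same shape that decrypt_message expects, so it could be passed to the receiving agent directly.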
In conclusion, we demonstrate how artificial intelligence can enhance traditional cryptography by introducing adaptability and context awareness. We not only encrypt and authenticate messages securely but also enable our agents to learn from communication behavior and dynamically adjust security measures. By the end, we see how combining AI-driven analytics with hybrid encryption enables a new generation of intelligent, self-monitoring cryptographic systems.
Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of Artificial Intelligence for social good. His most recent endeavor is the launch of an Artificial Intelligence Media Platform, Marktechpost, which stands out for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understandable by a wide audience. The platform boasts over 2 million monthly views, illustrating its popularity among audiences.