In this tutorial, we design an end-to-end, production-style analytics and modeling pipeline using Vaex to operate efficiently on millions of rows without materializing data in memory. We generate a realistic, large-scale dataset, engineer rich behavioral and city-level features using lazy expressions and approximate statistics, and aggregate insights at scale. We then integrate Vaex with scikit-learn to train and evaluate a predictive model, demonstrating how Vaex can act as the backbone for high-performance exploratory analysis and machine-learning workflows.
!pip -q install "vaex==4.19.0" "vaex-core==4.19.0" "vaex-ml==0.19.0" "vaex-viz==0.6.0" "vaex-hdf5==0.15.0" "pyarrow>=14" "scikit-learn>=1.3"
import os, time, json, numpy as np, pandas as pd
import vaex
import vaex.ml
from vaex.ml.sklearn import Predictor
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score, average_precision_score
print("Python:", __import__("sys").version.split()[0])
print("vaex:", vaex.__version__)
print("numpy:", np.__version__)
print("pandas:", pd.__version__)
rng = np.random.default_rng(7)
n = 2_000_000
cities = np.array(["Montreal","Toronto","Vancouver","Calgary","Ottawa","Edmonton","Quebec City","Winnipeg"], dtype=object)
# Sample a city per row with fixed probabilities (they sum to 1.0)
city = rng.choice(cities, size=n, replace=True, p=np.array([0.16,0.18,0.12,0.10,0.10,0.10,0.10,0.14]))
age = rng.integers(18, 75, size=n, endpoint=False).astype("int32")
tenure_m = rng.integers(0, 180, size=n, endpoint=False).astype("int32")
tx = rng.poisson(lam=22, size=n).astype("int32")
base_income = rng.lognormal(mean=10.6, sigma=0.45, size=n).astype("float64")
city_mult = pd.Series({"Montreal":0.92,"Toronto":1.05,"Vancouver":1.10,"Calgary":1.02,"Ottawa":1.00,"Edmonton":0.98,"Quebec City":0.88,"Winnipeg":0.90}).reindex(city).to_numpy()
income = (base_income * city_mult * (1.0 + 0.004*(age-35)) * (1.0 + 0.0025*np.minimum(tenure_m,120))).astype("float64")
income = np.clip(income, 18_000, 420_000)
noise = rng.normal(0, 1, size=n).astype("float64")
# Latent score that drives the binary target
score_latent = (
    0.55*np.log1p(income/1000.0)
    + 0.28*np.log1p(tx)
    + 0.18*np.sqrt(np.maximum(tenure_m,0)/12.0 + 1e-9)
    - 0.012*(age-40)
    + 0.22*(city == "Vancouver").astype("float64")
    + 0.15*(city == "Toronto").astype("float64")
    + 0.10*(city == "Ottawa").astype("float64")
    + 0.65*noise
)
p = 1.0/(1.0 + np.exp(-(score_latent - np.quantile(score_latent, 0.70))))
target = (rng.random(n) < p).astype("int8")
df = vaex.from_arrays(city=city, age=age, tenure_m=tenure_m, tx=tx, income=income, target=target)
# Virtual columns: stored as expressions, evaluated lazily
df["income_k"] = df.income / 1000.0
df["tenure_y"] = df.tenure_m / 12.0
df["log_income"] = df.income.log1p()
df["tx_per_year"] = df.tx / (df.tenure_y + 0.25)
df["value_score"] = (0.35*df.log_income + 0.20*df.tx_per_year + 0.10*df.tenure_y - 0.015*df.age)
df["is_new"] = df.tenure_m < 6
df["is_senior"] = df.age >= 60
print("\nRows:", len(df), "Columns:", len(df.get_column_names()))
print(df[["city","age","tenure_m","tx","income","income_k","value_score","target"]].head(5))
We generate a large, realistic synthetic dataset and initialize a Vaex DataFrame to work lazily on millions of rows. We engineer core numerical features directly as expressions so no intermediate data is materialized. We validate the setup by inspecting the schema, row count, and a small sample of computed values.
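To make the idea of a lazily evaluated column concrete, here is a minimal plain-Python sketch. It is not how Vaex is implemented; `LazyColumn` and `chunked_mean` are hypothetical names introduced only for illustration. The column stores a recipe rather than values, and an aggregate streams over it in chunks so the full derived column never exists in memory at once.

```python
# Toy illustration of a "virtual column": it stores a function, not data.
# LazyColumn and chunked_mean are hypothetical helpers, not Vaex APIs.

class LazyColumn:
    def __init__(self, func, *sources):
        self.func = func          # element-wise function to apply
        self.sources = sources    # the real (materialized) input columns

    def evaluate(self, start, stop):
        """Compute values only for rows in [start, stop)."""
        slices = [src[start:stop] for src in self.sources]
        return [self.func(*row) for row in zip(*slices)]


def chunked_mean(column, n_rows, chunk_size=4):
    """Stream over the column chunk by chunk and accumulate a running sum."""
    total = 0.0
    for start in range(0, n_rows, chunk_size):
        stop = min(start + chunk_size, n_rows)
        total += sum(column.evaluate(start, stop))
    return total / n_rows


income = [30_000.0, 45_000.0, 60_000.0, 75_000.0, 90_000.0]
income_k = LazyColumn(lambda x: x / 1000.0, income)  # nothing is computed here
mean_income_k = chunked_mean(income_k, len(income), chunk_size=2)
print(mean_income_k)
```

Only `chunk_size` derived values are alive at any moment, which is the property that lets expression-based features scale to inputs much larger than the toy list used here.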
encoder = vaex.ml.LabelEncoder(features=["city"])
df = encoder.fit_transform(df)
city_map = encoder.labels_["city"]
inv_city_map = {v: k for k, v in city_map.items()}
n_cities = len(city_map)
# One bin per encoded city: limits of [-0.5, n_cities-0.5] center each integer code in its own bin
p95_income_k_by_city = df.percentile_approx("income_k", 95, binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
p50_value_by_city = df.percentile_approx("value_score", 50, binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
avg_income_k_by_city = df.mean("income_k", binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
target_rate_by_city = df.mean("target", binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
n_by_city = df.count(binby="label_encoded_city", shape=n_cities, limits=[-0.5, n_cities-0.5])
p95_income_k_by_city = np.asarray(p95_income_k_by_city).reshape(-1)
p50_value_by_city = np.asarray(p50_value_by_city).reshape(-1)
avg_income_k_by_city = np.asarray(avg_income_k_by_city).reshape(-1)
target_rate_by_city = np.asarray(target_rate_by_city).reshape(-1)
n_by_city = np.asarray(n_by_city).reshape(-1)
city_table = pd.DataFrame({
    "city_id": np.arange(n_cities),
    "city": [inv_city_map[i] for i in range(n_cities)],
    "n": n_by_city.astype("int64"),
    "avg_income_k": avg_income_k_by_city,
    "p95_income_k": p95_income_k_by_city,
    "median_value_score": p50_value_by_city,
    "target_rate": target_rate_by_city
}).sort_values(["target_rate","p95_income_k"], ascending=False)
print("\nCity summary:")
print(city_table.to_string(index=False))
df_city_features = vaex.from_pandas(city_table[["city","p95_income_k","avg_income_k","median_value_score","target_rate"]], copy_index=False)
df = df.join(df_city_features, on="city", rsuffix="_city")
df["income_vs_city_p95"] = df.income_k / (df.p95_income_k + 1e-9)
df["value_vs_city_median"] = df.value_score - df.median_value_score
We encode the categorical city data and compute scalable, approximate per-city statistics using binning-based operations. We assemble these aggregates into a city-level table and join them back to the main dataset. We then derive relative features that compare each record against its city context.
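The binning arguments used above (`shape=n_cities`, `limits=[-0.5, n_cities-0.5]`) can look arbitrary, so here is a small plain-Python sketch of the arithmetic behind them. It assumes equal-width bins over the limits, and `bin_index` and `binned_mean` are illustrative helpers rather than Vaex functions. With these limits every bin has width 1 and each integer city code sits at the center of its own bin.

```python
import math

def bin_index(value, lo, hi, shape):
    """Map a value to one of `shape` equal-width bins spanning [lo, hi)."""
    width = (hi - lo) / shape
    return math.floor((value - lo) / width)

def binned_mean(codes, values, n_bins):
    """Per-bin mean of `values`, grouped by integer `codes` in 0..n_bins-1."""
    lo, hi = -0.5, n_bins - 0.5
    sums = [0.0] * n_bins
    counts = [0] * n_bins
    for code, v in zip(codes, values):
        b = bin_index(code, lo, hi, n_bins)
        sums[b] += v
        counts[b] += 1
    means = [s / c if c else float("nan") for s, c in zip(sums, counts)]
    return means, counts

codes = [0, 1, 1, 2, 2, 2]                 # stand-in for label_encoded_city
values = [10.0, 20.0, 40.0, 3.0, 6.0, 9.0]  # stand-in for a numeric column
means, counts = binned_mean(codes, values, n_bins=3)
print(means, counts)
```

Because the result is a dense array indexed by city code, it lines up directly with the inverse label map when the per-city table is assembled.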
features_num = [
"age","tenure_y","tx","income_k","log_income","tx_per_year","value_score",
"p95_income_k","avg_income_k","median_value_score","target_rate",
"income_vs_city_p95","value_vs_city_median"
]
scaler = vaex.ml.StandardScaler(features=features_num, with_mean=True, with_std=True, prefix="z_")
df = scaler.fit_transform(df)
features = ["z_"+f for f in features_num] + ["label_encoded_city"]
df_train, df_test = df.split_random([0.80, 0.20], random_state=42)
model = LogisticRegression(max_iter=250, solver="lbfgs", n_jobs=None)
vaex_model = Predictor(model=model, features=features, target="target", prediction_name="pred")
t0 = time.time()
vaex_model.fit(df=df_train)
fit_s = time.time() - t0
df_test = vaex_model.transform(df_test)
y_true = df_test["target"].to_numpy()
y_pred = df_test["pred"].to_numpy()
auc = roc_auc_score(y_true, y_pred)
ap = average_precision_score(y_true, y_pred)
print("\nModel:")
print("fit_seconds:", round(fit_s, 3))
print("test_auc:", round(float(auc), 4))
print("test_avg_precision:", round(float(ap), 4))
We standardize all numeric features using Vaex’s ML utilities and prepare a consistent feature vector for modeling. We split the dataset without loading the full dataset into memory. We train a logistic regression model through Vaex’s sklearn wrapper and evaluate its predictive performance.
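As a reminder of what the AUC figure measures, the sketch below computes it from its pairwise definition: the probability that a randomly chosen positive receives a higher score than a randomly chosen negative, with ties counted as half. The function name `roc_auc_pairwise` is made up for this example, and the quadratic loop is only suitable for tiny inputs. For real data, `roc_auc_score` is the right tool.

```python
def roc_auc_pairwise(y_true, y_score):
    """ROC AUC as the fraction of (positive, negative) pairs ranked correctly."""
    pos = [s for t, s in zip(y_true, y_score) if t == 1]
    neg = [s for t, s in zip(y_true, y_score) if t == 0]
    if not pos or not neg:
        raise ValueError("AUC needs at least one positive and one negative")
    wins = 0.0
    for p_score in pos:
        for n_score in neg:
            if p_score > n_score:
                wins += 1.0      # positive ranked above negative
            elif p_score == n_score:
                wins += 0.5      # ties count as half a win
    return wins / (len(pos) * len(neg))

y_true_demo = [0, 0, 1, 1]
y_score_demo = [0.1, 0.4, 0.35, 0.8]
auc_demo = roc_auc_pairwise(y_true_demo, y_score_demo)
print(auc_demo)
```

Since the metric depends only on how rows are ordered, it is most informative when the predictions passed in are continuous scores rather than a small number of distinct values.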
deciles = np.linspace(0, 1, 11)
cuts = np.quantile(y_pred, deciles)
cuts[0] = -np.inf
cuts[-1] = np.inf
bucket = np.digitize(y_pred, cuts[1:-1], right=True).astype("int32")
df_test_local = vaex.from_arrays(y_true=y_true.astype("int8"), y_pred=y_pred.astype("float64"), bucket=bucket)
lift = df_test_local.groupby(by="bucket", agg={"n": vaex.agg.count(), "rate": vaex.agg.mean("y_true"), "avg_pred": vaex.agg.mean("y_pred")}).sort("bucket")
lift_pd = lift.to_pandas_df()
baseline = float(df_test_local["y_true"].mean())
lift_pd["lift"] = lift_pd["rate"] / (baseline + 1e-12)
print("\nDecile lift table:")
print(lift_pd.to_string(index=False))
We analyze model behavior by segmenting predictions into deciles and computing lift metrics. We calculate baseline rates and compare them across score buckets to assess ranking quality. We summarize the results in a compact lift table that reflects real-world model diagnostics.
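The lift calculation itself is simple enough to show on a handful of rows. The sketch below is a plain-Python variant, with a hypothetical `lift_table` helper, that assigns buckets by rank (equal-sized groups after sorting by score) instead of by quantile cut points as in the code above. The two approaches can differ when many scores tie at a cut. Lift is each bucket's positive rate divided by the overall positive rate.

```python
def lift_table(y_true, y_score, n_buckets=10):
    """Sort rows by score, split into equal-sized buckets, compare rates to baseline."""
    n = len(y_score)
    baseline = sum(y_true) / n
    if baseline == 0:
        raise ValueError("lift is undefined when there are no positives")
    order = sorted(range(n), key=lambda i: y_score[i])  # lowest scores first
    rows = []
    for b in range(n_buckets):
        idx = order[b * n // n_buckets:(b + 1) * n // n_buckets]
        if not idx:
            continue  # more buckets than rows
        rate = sum(y_true[i] for i in idx) / len(idx)
        rows.append({"bucket": b, "n": len(idx), "rate": rate, "lift": rate / baseline})
    return baseline, rows

scores = [0.05, 0.15, 0.25, 0.35, 0.45, 0.55, 0.65, 0.75, 0.85, 0.95]
labels = [0, 0, 0, 0, 0, 0, 1, 0, 1, 1]
base_rate, table = lift_table(labels, scores, n_buckets=5)
for row in table:
    print(row)
```

In this toy data the top bucket contains only positives, so its lift is the reciprocal of the baseline rate, while the lowest buckets have a lift of zero.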
out_dir = "/content/vaex_artifacts"
os.makedirs(out_dir, exist_ok=True)
parquet_path = os.path.join(out_dir, "customers_vaex.parquet")
state_path = os.path.join(out_dir, "vaex_pipeline.json")
base_cols = ["city","label_encoded_city","age","tenure_m","tenure_y","tx","income","income_k","value_score","target"]
export_cols = base_cols + ["z_"+f for f in features_num]
df_export = df[export_cols].sample(n=500_000, random_state=1)
if os.path.exists(parquet_path):
    os.remove(parquet_path)
df_export.export_parquet(parquet_path)
# Persist everything needed to rebuild the preprocessing without refitting
pipeline_state = {
    "vaex_version": vaex.__version__,
    "encoder_labels": {k: {str(kk): int(vv) for kk, vv in v.items()} for k, v in encoder.labels_.items()},
    "scaler_mean": [float(x) for x in scaler.mean_],
    "scaler_std": [float(x) for x in scaler.std_],
    "features_num": features_num,
    "export_cols": export_cols,
}
with open(state_path, "w") as f:
    json.dump(pipeline_state, f)
df_reopen = vaex.open(parquet_path)
df_reopen["income_k"] = df_reopen.income / 1000.0
df_reopen["tenure_y"] = df_reopen.tenure_m / 12.0
df_reopen["log_income"] = df_reopen.income.log1p()
df_reopen["tx_per_year"] = df_reopen.tx / (df_reopen.tenure_y + 0.25)
df_reopen["value_score"] = (0.35*df_reopen.log_income + 0.20*df_reopen.tx_per_year + 0.10*df_reopen.tenure_y - 0.015*df_reopen.age)
df_city_features = vaex.from_pandas(city_table[["city","p95_income_k","avg_income_k","median_value_score","target_rate"]], copy_index=False)
df_reopen = df_reopen.join(df_city_features, on="city", rsuffix="_city")
df_reopen["income_vs_city_p95"] = df_reopen.income_k / (df_reopen.p95_income_k + 1e-9)
df_reopen["value_vs_city_median"] = df_reopen.value_score - df_reopen.median_value_score
with open(state_path, "r") as f:
    st = json.load(f)
labels_city = {k: int(v) for k, v in st["encoder_labels"]["city"].items()}
# Cities missing from the saved label map are encoded as -1
df_reopen["label_encoded_city"] = df_reopen.city.map(labels_city, default_value=-1)
for i, feat in enumerate(st["features_num"]):
    mean_i = st["scaler_mean"][i]
    # Guard against division by zero for constant features
    std_i = st["scaler_std"][i] if st["scaler_std"][i] != 0 else 1.0
    df_reopen["z_"+feat] = (df_reopen[feat] - mean_i) / std_i
df_reopen = vaex_model.transform(df_reopen)
print("\nArtifacts written:")
print(parquet_path)
print(state_path)
print("\nReopened parquet predictions (head):")
print(df_reopen[["city","income_k","value_score","pred","target"]].head(8))
print("\nDone.")
We export a large, feature-complete sample to Parquet and persist the full preprocessing state for reproducibility. We reload the data and deterministically rebuild all engineered features from saved metadata. We run inference on the reloaded dataset to confirm that the pipeline remains stable and deployable end-to-end.
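The save-and-rebuild pattern does not depend on Vaex, so it can be shown on a single record in plain Python. The sketch below uses made-up values and a hypothetical `apply_state` helper. It writes a label map and scaler parameters to JSON, reads them back, and reapplies them with the same two safeguards used above: unknown categories map to -1, and a zero standard deviation is replaced by 1.0.

```python
import json
import os
import tempfile

# Made-up preprocessing state for illustration only
state = {
    "labels": {"Toronto": 0, "Montreal": 1},
    "features": ["age", "income_k"],
    "mean": [40.0, 60.0],
    "std": [10.0, 0.0],   # a zero std simulates a constant feature
}

def apply_state(row, saved):
    """Encode and standardize one record using only the saved parameters."""
    out = {"label_encoded_city": saved["labels"].get(row["city"], -1)}
    for i, feat in enumerate(saved["features"]):
        std = saved["std"][i] if saved["std"][i] != 0 else 1.0
        out["z_" + feat] = (row[feat] - saved["mean"][i]) / std
    return out

# Round-trip the state through a JSON file, as a deployed pipeline would
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "state.json")
    with open(path, "w") as f:
        json.dump(state, f)
    with open(path) as f:
        restored = json.load(f)

encoded = apply_state({"city": "Toronto", "age": 50, "income_k": 75.0}, restored)
unknown = apply_state({"city": "Halifax", "age": 40, "income_k": 60.0}, restored)
print(encoded)
print(unknown)
```

Keeping the parameters in a plain JSON file means inference code only needs the file and the feature recipe, not the original training data or fitted transformer objects.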
In conclusion, we demonstrated how Vaex enables fast, memory-efficient data processing while still supporting advanced feature engineering, aggregation, and model integration. We showed that approximate statistics, lazy computation, and out-of-core execution allow us to scale cleanly from analysis to deployment-ready artifacts. By exporting reproducible features and persisting the full pipeline state, we closed the loop from raw data to inference, illustrating how Vaex fits naturally into modern large-data Python workflows.

