From Model to HDB App

- 23 mins read

Author: See Hiong

Building on my previous post, this article outlines how to move the selected features into a Kubeflow Pipeline (KFP) for a full model training and deployment workflow. The pipeline fetches input data from S3-compatible MinIO, trains a model using XGBoost with optimized feature selection, and deploys it via KServe. At the end of this journey, a Streamlit app enables users to make resale price predictions by entering flat attributes.

Deploying the InferenceService

Following the official KServe documentation, let’s begin by setting up the necessary components to enable model inference from an S3 backend.

1. Create Service Account

Define a service account that references the secret used to access MinIO.

kubectl apply -f s3-secret.yaml
# s3-secret.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: sa
  namespace: kubeflow-user-example-com
secrets:
- name: minio-creds

2. Create S3 Secret

Configure your secret to include the MinIO credentials and required annotations for KServe to read from the S3 bucket.

kubectl apply -f secret-minio.yaml
# secret-minio.yaml
apiVersion: v1
kind: Secret
metadata:
  name: minio-creds
  namespace: kubeflow-user-example-com
  annotations:
     serving.kserve.io/s3-endpoint: "minio-service.kubeflow:9000"
     serving.kserve.io/s3-usehttps: "0"
     serving.kserve.io/s3-useanoncredential: "false"
type: Opaque
stringData:
  AWS_ACCESS_KEY_ID: "minio"
  AWS_SECRET_ACCESS_KEY: "minio123"

3. Deploy the InferenceService

Using the XGBoost serving guide, deploy the model. This step will also be handled automatically within the pipeline’s Step 6.

Note: Ensure the storageUri points to the correct S3 path that contains the XGBoost .bst model file.

kubectl apply -f kserve-inference.yaml --server-side=true

# Sample S3 minio url
# storageUri: "s3://mlpipeline/v2/artifacts/hdb-xgboost-retraining-pipeline-multi-step/053453f4-8e52-4e4a-b2b1-1f8658b92b54/train-model/15ee1507-42e0-4a05-b06f-bd0fa12a7a42/output_model"
# kserve-inference.yaml
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: hdb-resale-xgb
  namespace: kubeflow-user-example-com
  annotations:
    autoscaling.knative.dev/maxScale: '1'
    autoscaling.knative.dev/minScale: '0'
spec:
  predictor:
    serviceAccountName: sa
    model:
      modelFormat:
        name: xgboost
      protocolVersion: v2
      runtime: kserve-xgbserver
      storageUri: "s3://mlpipeline/hdb-resale-data/xgb-model-test/model.bst"

4. Grant Istio Access

To allow prediction requests, create an Istio AuthorizationPolicy that explicitly allows POST and GET operations on the deployed model endpoint.

kubectl apply -f allow-hdb-xgb-policy.yaml
# allow-hdb-xgb-policy.yaml
apiVersion: security.istio.io/v1
kind: AuthorizationPolicy
metadata:
  name: allow-hdb-resale-xgb-access
  namespace: kubeflow-user-example-com
spec:
  selector:
    matchLabels:
      serving.kserve.io/inferenceservice: hdb-resale-xgb
  action: ALLOW
  rules:
  - to:
    - operation:
        methods: ["POST","GET"]

5. Run a Prediction

Once the pipeline completes successfully, prepare an inference input JSON file and use curl to invoke the model endpoint. The output will contain the predicted resale prices.

import json

payload = {
    "inputs": [{
        "name": "input-0",
        "shape": [5, 1],
        "datatype": "FP32",
        "data": [
            0.4311926605504587,
            0.2844036697247706,
            0.27522935779816515,
            0.15137614678899083,
            0.4357798165137615
          ]
    }]
}

with open("inference_input.json", "w") as f:
    json.dump(payload, f, indent=2)
SERVICE_URL="http://hdb-resale-xgb.kubeflow-user-example-com.svc.cluster.local"
MODEL_NAME="hdb-resale-xgb"

# Construct the V2 inference endpoint path
INFERENCE_ENDPOINT="${SERVICE_URL}/v2/models/${MODEL_NAME}/infer"

echo "Sending request to: ${INFERENCE_ENDPOINT}"

curl -v -H "Content-Type: application/json" "${INFERENCE_ENDPOINT}" -d @inference_input.json

# Sample Output
# *   Trying 10.98.223.247:80...
# * Connected to hdb-resale-xgb.kubeflow-user-example-com.svc.cluster.local (10.98.223.247) port 80 (#0)
# > POST /v2/models/hdb-resale-xgb/infer HTTP/1.1
# > Host: hdb-resale-xgb.kubeflow-user-example-com.svc.cluster.local
# ...
# < 
# * Connection #0 to host hdb-resale-xgb.kubeflow-user-example-com.svc.cluster.local left intact
# {"model_name":"hdb-resale-xgb","model_version":null,"id":"6d902b7b-a172-4d14-84dd-cc8a004f1200","parameters":null,"outputs":[{ "name":"output-0","shape":[5],"datatype":"FP32","parameters":null,"data":[600095.875,579441.5625,479052.34375,336679.75,586596

hdb-kserve-hdb-resale-xgb

6. Download Scaler File

For accurate predictions, use the same scaler from the training process. Port-forward your MinIO service and download the scaler from the designated path after the pipeline run.

hdb-download-scaler-path


Multistep pipeline

This section outlines the custom Kubeflow pipeline built using KFP DSL. The pipeline handles everything from data ingestion to model deployment. While Kubeflow’s artifact passing is recommended, I encountered limitations and resorted to using OutputPath() and native types for smooth data transfer between steps.

Step 1 - Load & Prepare

This component loads the dataset from MinIO, parses the JSON containing selected features, and splits the data into X (features) and y (target resale price).

Component 1 - python code
import kfp
from kfp import dsl
from kfp.compiler import Compiler
from kfp.dsl import Input, Output, InputPath, OutputPath, Dataset, Metrics, Model, Artifact
import json
from typing import NamedTuple

# ==============================================================================
# Component 1: Load and Prepare Data
# ==============================================================================
@dsl.component(
    base_image='python:3.10',
    packages_to_install=[
        'pandas==2.1.4',
        'numpy==1.24.4',
        's3fs==2025.3.2',
        'pyarrow==19.0.1',
        'kfp==2.12.1',
    ]
)
def load_and_prep_data(
    s3_data_uri: str,
    target_column: str,
    features_json: str,
    x_data_path: OutputPath('Dataset'),
    y_data_path: OutputPath('Dataset'),
) -> NamedTuple('LoadDataOutput', [
    ('feature_names_json_out', str) 
]):
    # Define the NamedTuple
    LoadDataOutput = NamedTuple('LoadDataOutput', [
        ('feature_names_json_out', str)
    ])
    
    import pandas as pd
    import numpy as np
    import os
    import json
    import s3fs

    print("--- Step 1: Load and Prep Data ---")
    print(f"Input S3 URI: {s3_data_uri}")
    print(f"Target Column: {target_column}")

    # Load Data
    minio_endpoint_url = os.environ.get('AWS_ENDPOINT_URL', 'http://minio-service.kubeflow:9000')
    minio_access_key = os.environ.get('AWS_ACCESS_KEY_ID', 'minio')
    minio_secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY', 'minio123')

    s3_storage_options = {
        'key': minio_access_key,
        'secret': minio_secret_key,
        'client_kwargs': {
            'endpoint_url': minio_endpoint_url
        }
    }
    print("Reading parquet from S3...")
    df_processed = pd.read_parquet(s3_data_uri, storage_options=s3_storage_options)
    print(f"Loaded data shape: {df_processed.shape}")

    # Parse Features & Split X/y
    feature_names = json.loads(features_json)
    print(f"Using pre-defined fixed feature set ({len(feature_names)} features).")

    missing_features = [f for f in feature_names if f not in df_processed.columns]
    if missing_features:
         raise ValueError(f"Fixed features not found in dataset: {missing_features}")

    X_data = df_processed[feature_names]
    y_data = df_processed[[target_column]]

    print(f"X_data shape: {X_data.shape}")
    print(f"y_data shape: {y_data.shape}")

    # Save Outputs
    X_data.to_parquet(x_data_path)
    y_data.to_parquet(y_data_path)

    print("--- Step 1 Complete ---")
    return LoadDataOutput(feature_names_json_out=features_json)

Step 2 - Split & Scale

Performs train/test splitting and applies scaling. The fitted scaler is saved for later use in inference.

Component 2 - python code
# ==============================================================================
# Component 2: Split and Scale Data
# ==============================================================================
@dsl.component(
    base_image='python:3.10',
    packages_to_install=[
        'pandas==2.1.4',
        'numpy==1.24.4',
        'scikit-learn==1.6.1',
        'joblib==1.4.2',
        'pyarrow==19.0.1',
        'kfp==2.12.1',
    ]
)
def split_and_scale_data(
    x_data_path: InputPath('Dataset'),
    y_data_path: InputPath('Dataset'),
    feature_names_json: str,
    use_scaling: bool,
    # Outputs
    x_train_path: OutputPath('Dataset'),
    x_test_path: OutputPath('Dataset'),
    y_train_path: OutputPath('Dataset'),
    y_test_path: OutputPath('Dataset'),
    scaler_path: OutputPath('Artifact')
):
    import pandas as pd
    import numpy as np
    import joblib
    import json
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import MinMaxScaler
    import os

    print("--- Step 2: Split and Scale Data ---")
    print(f"Use Scaling: {use_scaling}")

    # Load Inputs
    X_data = pd.read_parquet(x_data_path)
    y_data = pd.read_parquet(y_data_path)
    y_data = y_data.iloc[:, 0]
    print(f"Loaded X shape: {X_data.shape}, y shape: {y_data.shape}")
    feature_names = json.loads(feature_names_json)
    print(f"Loaded feature names list ({len(feature_names)} features).")
    
    # Split
    X_train, X_test, y_train, y_test = train_test_split(X_data, y_data, test_size=0.2, random_state=42)
    print(f"X_train: {X_train.shape}, X_test: {X_test.shape}, y_train: {y_train.shape}, y_test: {y_test.shape}")

    # Scale
    X_scaler = None
    scaler_saved = False
    if use_scaling:
        print("Applying MinMaxScaler.")
        X_scaler = MinMaxScaler()
        # Fit on Training, Transform Train and Test
        X_train_scaled_np = X_scaler.fit_transform(X_train)
        X_test_scaled_np = X_scaler.transform(X_test)
        # Convert back to DataFrame with original index and columns
        X_train = pd.DataFrame(X_train_scaled_np, columns=feature_names, index=X_train.index)
        X_test = pd.DataFrame(X_test_scaled_np, columns=feature_names, index=X_test.index)
        print(f"X_train range (scaled): {X_train.values.min():.4f} to {X_train.values.max():.4f}")

        # Save the scaler
        print(f"Saving scaler to: {scaler_path}")
        joblib.dump(X_scaler, scaler_path)
        scaler_saved = True
    else:
        print("Using Original Features (No Scaling).")
        print(f"No scaler used, creating empty placeholder artifact at: {scaler_path}")
        with open(scaler_path, 'w') as f:
            f.write('')

    # Save Outputs
    X_train.to_parquet(x_train_path)
    X_test.to_parquet(x_test_path)
    pd.DataFrame(y_train).to_parquet(y_train_path)
    pd.DataFrame(y_test).to_parquet(y_test_path)

    print(f"--- Step 2 Complete (Scaler Saved: {scaler_saved}) ---")

Step 3 - Train Model

An XGBoost model is trained using the core API, with the run tracked via MLflow. The model is saved in .bst format as required by KServe.

Component 3 - python code
# ==============================================================================
# Component 3: Train Model
# ==============================================================================
@dsl.component(
    base_image='python:3.10',
    packages_to_install=[
        'pandas==2.1.4',
        'numpy==1.24.4',
        'xgboost==2.1.1',
        'mlflow==2.21.3',
        'pyarrow==19.0.1',
        'joblib==1.4.2',
        'kfp==2.12.1',
        'scikit-learn==1.6.1'
    ]
)
def train_model(
    # Data inputs
    x_train_path: InputPath('Dataset'),
    x_test_path: InputPath('Dataset'),
    y_train_path: InputPath('Dataset'),
    y_test_path: InputPath('Dataset'),
    scaler_path: InputPath('Artifact'),
    use_scaling: bool, 
    feature_names_json: str,
    # Hyperparameters
    n_estimators: int,
    learning_rate: float,
    max_depth: int,
    early_stopping_rounds: int,
    colsample_bytree: float,
    min_child_weight: int,
    subsample: float,
    random_state: int,
    # MLflow info
    run_name: str,
    mlflow_tracking_uri: str,
    mlflow_experiment_name: str,
    # Outputs
    output_model: Output[Model],
) -> NamedTuple('TrainOutput', [
    ('mlflow_run_id_out', str)
]):
    # Define NamedTuple
    TrainOutput = NamedTuple('TrainOutput', [('mlflow_run_id_out', str)])
    
    import pandas as pd
    import numpy as np
    import xgboost as xgb
    import mlflow
    import mlflow.xgboost
    import os
    import joblib
    import json
    from pathlib import Path
    import warnings

    # Ignore the UBJSON format warning during saving
    warnings.filterwarnings('ignore', category=UserWarning, module='xgboost.core')
    
    print("--- Step 3: Train Model ---")
    print(f"MLflow Tracking URI: {mlflow_tracking_uri}")
    print(f"MLflow Experiment Name: {mlflow_experiment_name}")
    print(f"Using XGBoost version: {xgb.__version__}")
    print(f"Hyperparameters: n_est={n_estimators}, lr={learning_rate}, depth={max_depth}, early_stop={early_stopping_rounds}, colsample={colsample_bytree}, min_child={min_child_weight}, subsample={subsample}")

    # Load Data
    print("Loading training/testing data")
    X_train_df = pd.read_parquet(x_train_path)
    X_test_df = pd.read_parquet(x_test_path)
    y_train_df = pd.read_parquet(y_train_path)
    y_test_df = pd.read_parquet(y_test_path)

    # Ensure y is a 1D array
    y_train = y_train_df.iloc[:, 0].values
    y_test = y_test_df.iloc[:, 0].values
    
    # Get feature names and enforce order in DataFrame
    feature_names = json.loads(feature_names_json)
    print(f"Loaded feature names list ({len(feature_names)} features).")

    # Ensure feature order matches the expected order
    X_train = X_train_df[feature_names]
    X_test = X_test_df[feature_names]
    print(f"Verified feature order in datasets. X_train: {X_train.shape}, X_test: {X_test.shape}")

    # Convert to NumPy FOR TRAINING - crucial step
    print("Converting data to NumPy arrays for DMatrix creation...")
    X_train_np = X_train_df.values
    X_test_np = X_test_df.values
    
    # MLflow Setup and Start Run
    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment(mlflow_experiment_name)

    # Configure MLflow autologging - DISABLE model logging
    mlflow.xgboost.autolog(
        log_input_examples=False,
        log_model_signatures=False,
        log_models=False,
        log_datasets=False,
        disable=False,
        registered_model_name=None
    )

    # Start MLflow Run
    kfp_run_id = os.environ.get('KFP_RUN_ID', 'local')
    final_run_name = f"{run_name}_{kfp_run_id}"
    print(f"Starting MLflow run with name: {final_run_name}")
    
    with mlflow.start_run(run_name=final_run_name) as run:
        current_run_id = run.info.run_id
        print(f"Started MLflow Run ID: {current_run_id}")

        # Log parameters
        mlflow.log_params({
            "feature_set_name": final_run_name,
            "num_features": len(feature_names),
            "use_scaling": use_scaling,
            "n_estimators": n_estimators,
            "learning_rate": learning_rate,
            "max_depth": max_depth,
            "early_stopping_rounds": early_stopping_rounds,
            "colsample_bytree": colsample_bytree,
            "min_child_weight": min_child_weight,
            "subsample": subsample,
            "random_state": random_state,
            "model_type": "XGBoost",
            "training_api": "xgb.train",
            "framework": f"xgboost=={xgb.__version__}"
        })

        # Log feature names as artifact
        try:
            temp_feat_path = "feature_names.json" 
            with open(temp_feat_path, "w") as f:
                json.dump(feature_names, f)
            mlflow.log_artifact(temp_feat_path, artifact_path="features")
            print("Feature names list logged as artifact.")
        except Exception as e:
            print(f"Warning: Could not log feature names artifact: {e}")

        # Log scaler artifact if used
        if use_scaling and os.path.exists(scaler_path) and os.path.getsize(scaler_path) > 0:
            try:
                mlflow.log_artifact(scaler_path, artifact_path="scaler")
                print("Scaler logged as artifact to MLflow.")
            except Exception as e:
                print(f"Warning: Could not log scaler artifact: {e}")

        # Model Training using Core API
        print("Creating DMatrix WITHOUT feature names...")
        dtrain = xgb.DMatrix(X_train_np, label=y_train)
        dtest = xgb.DMatrix(X_test_np, label=y_test)
        print("DMatrix created.")
        
        print("Training XGBoost model using xgb.train...")
        params = {
            'objective': 'reg:squarederror',
            'learning_rate': learning_rate,
            'max_depth': max_depth,
            'subsample': subsample,
            'colsample_bytree': colsample_bytree,
            'min_child_weight': min_child_weight,
            'random_state': random_state,
            'eval_metric': 'rmse',
        }
        evals = [(dtrain, 'train'), (dtest, 'eval')]

        # Train the model - returns the booster object
        bst_model = xgb.train(
            params,
            dtrain,
            num_boost_round=n_estimators,
            evals=evals,
            early_stopping_rounds=early_stopping_rounds,
            verbose_eval=False
        )
        print(f"Training complete. Best iteration: {bst_model.best_iteration}, Best score (RMSE): {bst_model.best_score}")
        
        # Save Model Manually (Binary Format)
        try:
            model_dir = Path(output_model.path)
            model_dir.mkdir(parents=True, exist_ok=True)
            model_file_path = model_dir / "model.bst"

            print(f"Saving model in binary format to: {model_file_path}")
            bst_model.save_model(model_file_path)
            
            # Verify the model was saved
            if not model_file_path.exists() or model_file_path.stat().st_size == 0:
                raise RuntimeError(f"Failed to save model or model file is empty at {model_file_path}")
            print(f"Model successfully saved locally to {model_file_path}")
                            
            # Log Model Manually to MLflow
            print("Logging model artifact to MLflow...")
            mlflow.log_artifact(model_file_path, artifact_path="model")

            if 'temp_feat_path' in locals() and Path(temp_feat_path).exists(): 
                mlflow.log_artifact(temp_feat_path, artifact_path="model")
                os.remove(temp_feat_path)
            print("Model artifacts logged to MLflow run.")
            
            # Set KFP output model metadata 
            output_model.metadata.update({
                "framework": "xgboost",
                "format": "bst",
                "feature_order": json.dumps(feature_names),
                "feature_count": str(len(feature_names)),
                "best_iteration": str(bst_model.best_iteration),
                "best_score": str(bst_model.best_score),
                "training_samples": str(len(X_train_np)),
                "validation_samples": str(len(X_test_np)),
                "mlflow_run_id": current_run_id
            })

        except Exception as e:
            print(f"Error saving/logging model artifacts: {str(e)}")
            raise

    print(f"--- Step 3 Complete (MLflow Run ID: {current_run_id}) ---")
    return TrainOutput(mlflow_run_id_out=current_run_id)

Step 4 - Predict & Evaluate

The trained model is used to predict on test data, and performance metrics are logged to MLflow.

Component 4 - python code
# ==============================================================================
# Component 4: Predict and Evaluate
# ==============================================================================
@dsl.component(
    base_image='python:3.10',
    packages_to_install=[
        'pandas==2.1.4',
        'numpy==1.24.4',
        'xgboost==2.1.1',
        'scikit-learn==1.6.1',
        'mlflow==2.21.3',
        'pyarrow==19.0.1',
        'joblib==1.4.2',
        'kfp==2.12.1',
    ]
)
def predict_and_evaluate(
    input_model: Input[Model],
    x_test_path: InputPath('Dataset'),
    y_test_path: InputPath('Dataset'),
    mlflow_run_id: str,
    mlflow_tracking_uri: str,
    feature_names_json: str,
    # Outputs
    y_pred_path: OutputPath('Dataset'),
    test_rmse_output: OutputPath(float),
    test_mae_output: OutputPath(float),
    test_r2_output: OutputPath(float),
):
    import pandas as pd
    import numpy as np
    import xgboost as xgb
    import mlflow
    import joblib
    import os
    from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
    import json
    from pathlib import Path
    
    print("--- Step 4: Predict and Evaluate ---")
    print(f"Using XGBoost version: {xgb.__version__}")

    # Load Inputs
    print("Loading model...")
    model_dir = Path(input_model.path)

    # Look for model.bst
    model_file_bst = model_dir / "model.bst"
    if not model_file_bst.exists():
        raise FileNotFoundError(f"Model file 'model.bst' not found at {model_dir}")

    print(f"Loading XGBoost model from {model_file_bst}")

    # Load model using Booster interface
    booster = xgb.Booster()
    booster.load_model(model_file_bst)
    print("Model loaded successfully.")

    print("Loading test data...")
    X_test_df = pd.read_parquet(x_test_path)
    y_test_df = pd.read_parquet(y_test_path)
    y_test = y_test_df.iloc[:, 0].values

    # Load feature names to ensure correct column order in X_test_df
    feature_names = json.loads(feature_names_json)
    print(f"Ensuring test data feature order using: {feature_names}")
    try:
        X_test_df = X_test_df[feature_names]
    except KeyError as e:
        print(f"Error: Feature mismatch when ordering X_test columns: {e}")
        missing = set(feature_names) - set(X_test_df.columns)
        if missing:
            print(f"Features expected but missing in X_test data: {missing}")
        raise
    
    # Configure MLflow Client
    mlflow.set_tracking_uri(mlflow_tracking_uri)
    print(f"Using MLflow Run ID: {mlflow_run_id}")

    print("Attempting to log metrics to existing MLflow run...")
    with mlflow.start_run(run_id=mlflow_run_id):
        # Prediction
        print("Generating predictions...")
     
        print(f"Creating DMatrix from X_test data with shape: {X_test_df.shape}")
        dtest = xgb.DMatrix(X_test_df)
        
        y_pred = booster.predict(dtest)        
        y_pred = np.array(y_pred).ravel() # Ensure y_pred is a flat NumPy array
        print(f"Predictions generated with shape: {y_pred.shape}")

        # Evaluation
        print("Calculating evaluation metrics...")

        # Handle potential NaN/inf
        valid_indices = ~np.isnan(y_test) & ~np.isinf(y_test) & ~np.isnan(y_pred) & ~np.isinf(y_pred)
        if not np.all(valid_indices):
            print(f"Warning: NaN or Inf detected... ")
            y_test_valid = y_test[valid_indices]
            y_pred_valid = y_pred[valid_indices]
        else:
            y_test_valid = y_test
            y_pred_valid = y_pred

        # Check if any valid data remains after filtering
        if len(y_test_valid) == 0:
             print("Error: No valid data points remain after filtering NaN/Inf. Cannot calculate metrics.")
             rmse, mae, r2 = np.nan, np.nan, np.nan
        else:
            mse = mean_squared_error(y_test_valid, y_pred_valid)
            rmse = np.sqrt(mse)
            mae = mean_absolute_error(y_test_valid, y_pred_valid)
            r2 = r2_score(y_test_valid, y_pred_valid)

        # Get best iteration from metadata if available
        best_iteration = int(input_model.metadata.get('best_iteration', -1)) 

        # Log Metrics to MLflow
        print("Logging metrics to MLflow...")
        mlflow.log_metrics({
            "eval_mse": mse if not np.isnan(mse) else 0,
            "eval_rmse": rmse if not np.isnan(rmse) else 0,
            "eval_mae": mae if not np.isnan(mae) else 0,
            "eval_r2": r2 if not np.isnan(r2) else 0,
            # Log best_iteration if valid
            **({"best_iteration_eval": best_iteration} if best_iteration != -1 else {})
        })

        print("\n--- Evaluation Results ---")
        print(f"Test Set MSE: {mse:,.4f}")
        print(f"Test Set RMSE: {rmse:,.4f}")
        print(f"Test Set MAE: {mae:,.4f}")
        print(f"Test Set R²: {r2:.4f}")
        if best_iteration != -1:
            print(f"Best Iteration (from training): {best_iteration}")

        # Save predictions as DataFrame/Parquet
        print(f"Saving predictions DataFrame ({len(y_pred)} rows) to: {y_pred_path}")
        
        pred_df = pd.DataFrame({'predictions': y_pred}, index=X_test_df.index)
        pred_df.to_parquet(y_pred_path)

         # Save metrics to KFP output paths
        with open(test_rmse_output, 'w') as f: f.write(str(rmse if not np.isnan(rmse) else 0.0))
        with open(test_mae_output, 'w') as f: f.write(str(mae if not np.isnan(mae) else 0.0))
        with open(test_r2_output, 'w') as f: f.write(str(r2 if not np.isnan(r2) else 0.0))

    print("--- Step 4 Complete ---")

Step 5 - Generate & Log Plot

This component visualizes key performance metrics and logs the resulting plots to the MLflow run.

Component 5 - python code
# ==============================================================================
# Component 5: Generate and Log Plots
# ==============================================================================
@dsl.component(
    base_image='python:3.10',
    packages_to_install=[
        'pandas==2.1.4',
        'numpy==1.24.4',
        'xgboost==2.1.1',
        'matplotlib==3.10.0',
        'mlflow==2.21.3',
        'joblib==1.4.2', 
        'pyarrow==19.0.1',
        'kfp==2.12.1',
    ]
)
def generate_and_log_plots(
    input_model: Input[Model],
    y_test_path: InputPath('Dataset'),
    y_pred_path: InputPath('Dataset'),
    feature_names_json: str,
    mlflow_run_id: str,
    mlflow_tracking_uri: str,
    run_name: str, 
):
    import pandas as pd
    import numpy as np
    import xgboost as xgb
    from xgboost import plot_importance
    import matplotlib.pyplot as plt
    import mlflow
    import os
    import json
    import traceback
    from pathlib import Path

    print("--- Step 5: Generate and Log Plots ---")
    print(f"Using XGBoost version: {xgb.__version__}")

    # Load Inputs
    print("Loading model...")
    model_dir = Path(input_model.path)

    # Check for model file
    model_file_bst = model_dir / "model.bst"
    if not model_file_bst.exists():
        raise FileNotFoundError(f"Model file 'model.bst' not found at {model_dir}")
    
    print(f"Loading XGBoost model from {model_file_bst}")
    booster = xgb.Booster()
    booster.load_model(model_file_bst)
    print("Model loaded successfully.")

    print("Loading y_test data...")
    y_test = pd.read_parquet(y_test_path).iloc[:, 0].values # Load as numpy array

    print("Loading feature names...")
    feature_names = json.loads(feature_names_json)
    print(f"Loaded {len(feature_names)} feature names.")

    if feature_names:
        print("Setting feature names on loaded booster for plotting...")
        booster.feature_names = feature_names
    else:
        print("Warning: Empty feature names list received. Importance plots might use default 'fN' names.")

    print(f"Using MLflow Run ID: {mlflow_run_id}")

    # Load predictions
    print(f"Loading predictions from: {y_pred_path}")
    try:
        y_pred_df = pd.read_parquet(y_pred_path)
        y_pred = y_pred_df.iloc[:, 0].values # Load as numpy array
        print(f"Loaded predictions, shape: {y_pred.shape}")
    except Exception as e:
        print(f"Error loading predictions: {e}")
        raise

    # Configure MLflow Client
    mlflow.set_tracking_uri(mlflow_tracking_uri)
    
    # Log plots to the existing MLflow run
    with mlflow.start_run(run_id=mlflow_run_id):
        # Helper Functions
        def get_dynamic_figsize(feature_count, max_features=20):
            display_count = min(feature_count, max_features)
            height = max(6, display_count * 0.4 + 1)
            width = 10
            return (width, height)

        # Generate and Log Plots
        kfp_run_id = os.environ.get('KFP_RUN_ID', 'local') 
        final_run_name = f"{run_name}_{kfp_run_id}"

        print("Generating and logging plots to MLflow...")

        # Actual vs Predicted Plot
        try:
            fig_pred, ax_pred = plt.subplots(figsize=(10, 6))
            valid_indices = ~np.isnan(y_test_np) & ~np.isinf(y_test_np) & ~np.isnan(y_pred) & ~np.isinf(y_pred)
            y_test_plt = y_test_np[valid_indices]
            y_pred_plt = y_pred[valid_indices]

            ax_pred.scatter(y_test_plt, y_pred_plt, alpha=0.5, label="Predictions")
            min_val = min(y_test_plt.min(), y_pred_plt.min()) * 0.98
            max_val = max(y_test_plt.max(), y_pred_plt.max()) * 1.02
            ax_pred.plot([min_val, max_val], [min_val, max_val], '--', color='red', linewidth=2, label="Ideal Fit")
            ax_pred.set_xlabel('Actual Target')
            ax_pred.set_ylabel('Predicted Target')
            ax_pred.set_title(f'Actual vs. Predicted ({final_run_name})')
            ax_pred.legend()
            ax_pred.grid(True)
            plt.tight_layout()
            mlflow.log_figure(fig_pred, "plots/actual_vs_predicted.png")
            plt.close(fig_pred)
            print("Logged actual_vs_predicted.png")
        except Exception as e:
            print(f"Warning: Could not plot Actual vs Predicted: {e}\n{traceback.format_exc()}")

        # Feature Importance Plots
        try:
            num_features = len(booster.feature_names) if booster.feature_names else 0
            if num_features == 0:
                 print("Skipping importance plots: No feature names set on booster.")
            else:
                max_plot_features = 20
                # Weight
                fig_imp_w, ax_imp_w = plt.subplots(figsize=get_dynamic_figsize(num_features, max_plot_features))
                plot_importance(booster, ax=ax_imp_w, max_num_features=max_plot_features, importance_type='weight', show_values=False, title=f'Importance (Weight) - {final_run_name}')
                plt.tight_layout()
                mlflow.log_figure(fig_imp_w, "plots/feature_importance_weight.png")
                plt.close(fig_imp_w)
                print("Logged feature_importance_weight.png")

                # Gain
                fig_imp_g, ax_imp_g = plt.subplots(figsize=get_dynamic_figsize(num_features, max_plot_features))
                plot_importance(booster, ax=ax_imp_g, max_num_features=max_plot_features, importance_type='gain', show_values=False, title=f'Importance (Gain) - {final_run_name}')
                plt.tight_layout()
                mlflow.log_figure(fig_imp_g, "plots/feature_importance_gain.png")
                plt.close(fig_imp_g)
                print("Logged feature_importance_gain.png")

                # Cover
                fig_imp_c, ax_imp_c = plt.subplots(figsize=get_dynamic_figsize(num_features, max_plot_features))
                plot_importance(booster, ax=ax_imp_c, max_num_features=max_plot_features, importance_type='cover', show_values=False, title=f'Importance (Cover) - {final_run_name}')
                plt.tight_layout()
                mlflow.log_figure(fig_imp_c, "plots/feature_importance_cover.png")
                plt.close(fig_imp_c)
                print("Logged feature_importance_cover.png")
                
        except Exception as e:
            if isinstance(e, xgb.core.XGBoostError) and 'feature_names' in str(e):
                 print(f"Error plotting importance (likely missing feature names internally): {e}")
            else:
                 print(f"Warning: Could not plot importance plots: {e}\n{traceback.format_exc()}")

    print("--- Step 5 Complete ---")

Step 6 - Deploy to KServe

Deploys the final model using KServe with configuration compatible with XGBoost serving. Uses the same storage path and service account setup detailed earlier.

Component 6 - python code
# ==============================================================================
# Component 6: Deploy to KServe
# ==============================================================================
@dsl.component(
    base_image='python:3.10',
    packages_to_install=[
        'kfp==2.12.1',
        'kubernetes==29.0.0',
        'PyYAML==6.0.1'
    ]
)
def deploy_to_kserve(
    # Inputs
    model_name: str,
    model_uri: Input[Model],
    serving_namespace: str,
    # Outputs
    endpoint_url_output: OutputPath(str),
    # Optional Args
    model_format: str = 'xgboost',
    kserve_api_group: str = 'serving.kserve.io',
    kserve_api_version: str = 'v1beta1',
    service_account_name: str = None,
    min_replicas: int = 0,
    max_replicas: int = 1,
    cpu_request: str = "100m",
    cpu_limit: str = "1",
    memory_request: str = "256Mi",
    memory_limit: str = "2Gi",
):
    from kubernetes import client, config
    from kubernetes.client.rest import ApiException
    import time
    import yaml
    import os
    import json
    from urllib.parse import urlparse, urlunparse

    print(f"--- Step 6: Deploy Model to KServe ---")
    print(f"InferenceService Name: {model_name}")
    print(f"Deployment Namespace: {serving_namespace}")
    print(f"Input Model Artifact URI (directory): {model_uri.uri}")
    print(f"Input Model metadata: {json.dumps(model_uri.metadata, indent=2)}")

    expected_metadata_format = "bst"
    actual_metadata_format = model_uri.metadata.get('format')

    if not actual_metadata_format:
         print(f"Warning: Model metadata does not contain 'format' key.")
    elif actual_metadata_format != expected_metadata_format:
        print(f"Warning: Model metadata format ('{actual_metadata_format}') does not match expected format ('{expected_metadata_format}').")
    else:
        print(f"Model format verified from metadata: '{actual_metadata_format}'")
            
    # Adjust storageUri scheme AND append filename
    storage_uri = model_uri.uri
    
    # Ensure the scheme is s3:// for MinIO/S3 compatibility
    parsed_uri = urlparse(storage_uri)
    if parsed_uri.scheme == 'minio':
        storage_uri = urlunparse(parsed_uri._replace(scheme='s3'))
        print(f"Converted storage URI scheme to s3://: {storage_uri}")
    elif parsed_uri.scheme != 's3':
         print(f"Warning: Unexpected storage URI scheme '{parsed_uri.scheme}'. KServe might require 's3://'.")
    
    print(f"Using final storageUri (directory): {storage_uri}")
    
    # Construct InferenceService Object
    isvc_manifest = {
        "apiVersion": f"{kserve_api_group}/{kserve_api_version}",
        "kind": "InferenceService",
        "metadata": {
            "name": model_name,
            "namespace": serving_namespace,
            "annotations": {
                "autoscaling.knative.dev/minScale": str(min_replicas),
                "autoscaling.knative.dev/maxScale": str(max_replicas)
            }
        },
        "spec": {
            "predictor": {
                "model": {
                    "modelFormat": {
                        "name": model_format
                    },
                    "runtime": "kserve-xgbserver", # Use the standard XGBoost runtime
                    "protocolVersion": "v2",                    
                    "storageUri": storage_uri,
                    "resources": {
                        "requests": {
                            "cpu": cpu_request,
                            "memory": memory_request
                        },
                        "limits": {
                            "cpu": cpu_limit,
                            "memory": memory_limit
                        }
                    }
                }
            }
        }
    }

    # Add service account if specified
    if service_account_name:
        isvc_manifest["spec"]["predictor"]["serviceAccountName"] = service_account_name        

    print("\nConstructed InferenceService Manifest:")
    print(yaml.dump(isvc_manifest, indent=2))

    # Authenticate Kubernetes Client
    try:
        config.load_incluster_config()
        print("Loaded in-cluster Kubernetes config.")
    except config.ConfigException:
        try:
            config.load_kube_config()
            print("Loaded local Kubernetes config.")
        except config.ConfigException as e:
             print(f"Error: Could not load any Kubernetes config: {e}")
             raise
            
    api_client = client.ApiClient()
    custom_api = client.CustomObjectsApi(api_client)

    # Create or Update InferenceService
    try:
        print(f"Checking if InferenceService '{model_name}' exists in namespace '{serving_namespace}'...")
        existing_isvc = custom_api.get_namespaced_custom_object(
            group=kserve_api_group,
            version=kserve_api_version,
            namespace=serving_namespace,
            plural="inferenceservices",
            name=model_name
        )

        print("InferenceService exists. Patching...")
        # Update only the relevant parts if needed, or replace the whole spec
        # For simplicity, patching the whole body ensures config drift is fixed
        resp = custom_api.patch_namespaced_custom_object(
            group=kserve_api_group,
            version=kserve_api_version,
            namespace=serving_namespace,
            plural="inferenceservices",
            name=model_name,
            body=isvc_manifest # Apply the desired state
        )
        print("Patch successful.")
    except ApiException as e:
        if e.status == 404:
            print("Creating new InferenceService...")
            resp = custom_api.create_namespaced_custom_object(
                group=kserve_api_group,
                version=kserve_api_version,
                namespace=serving_namespace,
                plural="inferenceservices",
                body=isvc_manifest
            )
            print("Create successful.")
        else:
            # Log the error details properly
            print(f"Error interacting with Kubernetes API: Status={e.status}, Reason={e.reason}")
            print(f"Body: {e.body}")
            raise

    # Wait for InferenceService to be Ready
    print("Waiting for InferenceService to become Ready...")
    timeout_seconds = 300
    start_time = time.time()
    isvc_url = None
    last_message = ""

    while time.time() - start_time < timeout_seconds:
        try:
            isvc = custom_api.get_namespaced_custom_object(
                group=kserve_api_group,
                version=kserve_api_version,
                namespace=serving_namespace,
                plural="inferenceservices",
                name=model_name
            )

            status = isvc.get("status", {})
            url = status.get("url")
            conditions = status.get("conditions", [])

            ready_condition = next(
                (c for c in conditions if c.get("type") == "Ready"),
                None
            )

            current_message = "No Ready condition found"
            if ready_condition:
                current_message = f"Ready={ready_condition.get('status')}, Reason={ready_condition.get('reason', 'N/A')}, Msg={ready_condition.get('message', 'N/A')}"
                if ready_condition.get("status") == "True":
                    if url:
                        isvc_url = url
                        print(f"\nInferenceService is Ready at: {isvc_url}")
                        break
                    else:
                        current_message += " (URL not available yet)"
            # Only print status if it changes
            if current_message != last_message:
                print(f"Current status: {current_message}")
                last_message = current_message

        except ApiException as e:
            error_message = f"Status check error: {e.status} - {e.reason}"
            if error_message != last_message:
                print(error_message)
                last_message = error_message
        except Exception as e:
            # Catch other potential errors during status check
            error_message = f"Unexpected error during status check: {str(e)}"
            if error_message != last_message:
                 print(error_message)
                 last_message = error_message


        time.sleep(15) # Increase sleep interval slightly

    if not isvc_url:
        print(f"Warning: Timeout after {timeout_seconds} seconds waiting for InferenceService '{model_name}' to become ready.")
        # Try to get last known status for debugging
        try:
             isvc = custom_api.get_namespaced_custom_object(group=kserve_api_group, version=kserve_api_version, namespace=serving_namespace, plural="inferenceservices", name=model_name)
             print(f"Final status check: {json.dumps(isvc.get('status'), indent=2)}")
        except Exception as final_e:
             print(f"Could not get final status: {final_e}")
        isvc_url = "ERROR: Deployment timeout or failure"

    # Save Endpoint URL Output
    with open(endpoint_url_output, 'w') as f:
        f.write(isvc_url)

    print("--- Step 6 Complete ---")

Step 7 - Pipeline Definition

Defines task ordering and parameter passing. Note: I noticed set_display_name() sometimes causes unexpected errors and have commented it out.

# load_task.set_display_name("Load & Prep Data")
Component 7 - python code
# ==============================================================================
# Pipeline Definition
# ==============================================================================
@dsl.pipeline(
    name='HDB XGBoost Retraining Pipeline (Multi-Step BST)',
    description='Retrains XGBoost model (BST format) and deploys to KServe.'
)
def hdb_xgboost_multistep_pipeline(
    # Pipeline Parameters
    fixed_processed_data_s3_uri: str = 's3://mlpipeline/hdb-resale-data/resale-data-etl-final.parquet',
    target_column: str = 'resale_price',
    mlflow_tracking_uri: str = 'http://mlflow-tracking.mlflow',
    mlflow_experiment_name: str = 'XGBoostMultistepBST',
    # Features/HPs
    best_features_json: str = '[]',
    best_use_scaling: bool = True,
    best_n_estimators: int = 500,
    best_learning_rate: float = 0.04,
    best_max_depth: int = 7,
    best_early_stopping_rounds: int = 15,
    best_colsample_bytree: float = 0.7,
    best_min_child_weight: int = 3,
    best_subsample: float = 0.8,
    best_random_state: int = 42,  
    base_run_name: str = "XGBoostMultiStepBST",
    # KServe Deployment Params
    kserve_model_name: str = 'hdb-resale-xgb',
    kserve_namespace: str = 'kubeflow-user-example-com',
    kserve_model_format: str = 'xgboost',
    kserve_predictor_service_account: str = 'sa'
):   
    # Define Helper Function for Adding Env Vars
    def add_env_vars(task):
        task.set_env_variable(name="MLFLOW_TRACKING_USERNAME", value="user")
        task.set_env_variable(name="MLFLOW_TRACKING_PASSWORD", value="qYlTnUDQMe")
        
    # Pipeline Steps

    # Step 1: Load Data
    load_task = load_and_prep_data(
        s3_data_uri=fixed_processed_data_s3_uri,
        target_column=target_column,
        features_json=best_features_json
    )
    load_task.set_caching_options(enable_caching=False)

    # Step 2: Split & Scale
    scale_task = split_and_scale_data(
        x_data_path=load_task.outputs['x_data_path'],
        y_data_path=load_task.outputs['y_data_path'],
        feature_names_json=load_task.outputs['feature_names_json_out'],
        use_scaling=best_use_scaling
    )
    scale_task.set_caching_options(enable_caching=False)

    # Step 3: Train Model
    train_task = train_model(
        x_train_path=scale_task.outputs['x_train_path'],
        x_test_path=scale_task.outputs['x_test_path'],
        y_train_path=scale_task.outputs['y_train_path'],
        y_test_path=scale_task.outputs['y_test_path'],
        scaler_path=scale_task.outputs['scaler_path'],
        use_scaling=best_use_scaling,
        feature_names_json=load_task.outputs['feature_names_json_out'],
        # HPs
        n_estimators=best_n_estimators,
        learning_rate=best_learning_rate,
        max_depth=best_max_depth,
        early_stopping_rounds=best_early_stopping_rounds,
        colsample_bytree=best_colsample_bytree,
        min_child_weight=best_min_child_weight,
        subsample=best_subsample,
        random_state=best_random_state,    
        # MLflow
        run_name=base_run_name,
        mlflow_tracking_uri=mlflow_tracking_uri,
        mlflow_experiment_name=mlflow_experiment_name
    )
    train_task.set_caching_options(enable_caching=False)
    add_env_vars(train_task)

    # Step 4: Predict & Evaluate
    eval_task = predict_and_evaluate(
        input_model=train_task.outputs['output_model'],
        x_test_path=scale_task.outputs['x_test_path'],
        y_test_path=scale_task.outputs['y_test_path'],
        mlflow_run_id=train_task.outputs['mlflow_run_id_out'],
        mlflow_tracking_uri=mlflow_tracking_uri,
        feature_names_json=load_task.outputs['feature_names_json_out']
    )
    eval_task.set_caching_options(enable_caching=False)
    add_env_vars(eval_task)

    # Step 5: Generate & Log Plots
    plot_task = generate_and_log_plots(
        input_model=train_task.outputs['output_model'],
        y_test_path=scale_task.outputs['y_test_path'],
        y_pred_path=eval_task.outputs['y_pred_path'],
        feature_names_json=load_task.outputs['feature_names_json_out'],
        mlflow_run_id=train_task.outputs['mlflow_run_id_out'],
        mlflow_tracking_uri=mlflow_tracking_uri,
        run_name=base_run_name
    )
    plot_task.set_caching_options(enable_caching=False)
    add_env_vars(plot_task)

    # Step 6: Deploy Model
    deploy_task = deploy_to_kserve(
        model_name=kserve_model_name,
        model_uri=train_task.outputs['output_model'],
        serving_namespace=kserve_namespace,
        model_format=kserve_model_format,
        service_account_name=kserve_predictor_service_account
    ).after(train_task)
    deploy_task.set_caching_options(enable_caching=False)

Step 8 - Compilation

Compiles and uploads the pipeline as a .yaml file to Kubeflow Pipelines for execution.

Component 8 - python code
# ==============================================================================
# Pipeline Compilation
# ==============================================================================
if __name__ == '__main__':
    # Define best feature set found during exploration
    BEST_FEATURE_SET = ['floor_area_sqm', 'remaining_lease_years', 'storey_avg', 'sale_year', 'sale_month', 
                        'flat_type_1 ROOM', 'flat_type_2 ROOM', 'flat_type_3 ROOM', 'flat_type_4 ROOM', 'flat_type_5 ROOM', 
                        'flat_type_EXECUTIVE', 'flat_type_MULTI-GENERATION', 'flat_model_2-room', 'flat_model_3Gen', 'flat_model_Adjoined flat', 
                        'flat_model_Apartment', 'flat_model_DBSS', 'flat_model_Improved', 'flat_model_Improved-Maisonette', 'flat_model_Maisonette', 
                        'flat_model_Model A', 'flat_model_Model A-Maisonette', 'flat_model_Model A2', 'flat_model_Multi Generation', 'flat_model_New Generation',
                        'flat_model_Premium Apartment', 'flat_model_Premium Apartment Loft', 'flat_model_Premium Maisonette', 'flat_model_Simplified', 'flat_model_Standard', 
                        'flat_model_Terrace', 'flat_model_Type S1', 'flat_model_Type S2', 'town_ANG MO KIO', 'town_BEDOK', 
                        'town_BISHAN', 'town_BUKIT BATOK', 'town_BUKIT MERAH', 'town_BUKIT PANJANG', 'town_BUKIT TIMAH', 
                        'town_CENTRAL AREA', 'town_CHOA CHU KANG', 'town_CLEMENTI', 'town_GEYLANG', 'town_HOUGANG', 
                        'town_JURONG EAST', 'town_JURONG WEST', 'town_KALLANG/WHAMPOA', 'town_MARINE PARADE', 'town_PASIR RIS', 
                        'town_PUNGGOL', 'town_QUEENSTOWN', 'town_SEMBAWANG', 'town_SENGKANG', 'town_SERANGOON', 
                        'town_TAMPINES', 'town_TOA PAYOH', 'town_WOODLANDS', 'town_YISHUN']
    BEST_FEATURE_SET_JSON = json.dumps(BEST_FEATURE_SET)

    # Define other best parameters if needed for compile-time defaults
    BEST_USE_SCALING = True
    BEST_N_ESTIMATORS = 500
    BEST_LEARNING_RATE = 0.04
    BEST_MAX_DEPTH = 7
    BEST_EARLY_STOPPING_ROUNDS = 15
    BEST_COLSAMPLE_BYTREE = 0.7
    BEST_MIN_CHILD_WEIGHT = 3

    # Set pipeline parameters for compilation
    pipeline_parameters = {
         'best_features_json': BEST_FEATURE_SET_JSON,
         'best_use_scaling': BEST_USE_SCALING,
         'best_n_estimators': BEST_N_ESTIMATORS,
         'best_learning_rate': BEST_LEARNING_RATE,
         'best_max_depth': BEST_MAX_DEPTH,
         'best_early_stopping_rounds': BEST_EARLY_STOPPING_ROUNDS,
         'best_colsample_bytree': BEST_COLSAMPLE_BYTREE,
         'best_min_child_weight': BEST_MIN_CHILD_WEIGHT,
    }

    Compiler().compile(
        pipeline_func=hdb_xgboost_multistep_pipeline,
        package_path='hdb_xgboost_multistep_pipeline.yaml',
        pipeline_parameters=pipeline_parameters
    )
    print("Pipeline compiled to hdb_xgboost_multistep_pipeline.yaml")

hdb-xgboost-multistep-pipeline


HDB Predictor App

Finally, a simple and intuitive Streamlit app lets users interact with the model by entering flat details (e.g., floor area, location, lease year). The app sends the inputs to the KServe endpoint and displays the predicted resale price.

streamlit run app.py
app.py - python code
# app.py
import streamlit as st
import requests
import numpy as np
import pandas as pd
import joblib
import os
import json
from datetime import datetime

# --- SET PAGE CONFIG FIRST ---
st.set_page_config(layout="centered", page_title="HDB Price Predictor")

# --- Configuration ---
KSERVE_URL = os.environ.get("KSERVE_URL", "http://hdb-resale-xgb.kubeflow-user-example-com.svc.cluster.local/v2/models/hdb-resale-xgb/infer")
SCALER_PATH = "scaler.joblib"

# Feature names in the EXACT order the model expects
FEATURE_NAMES = ['floor_area_sqm', 'remaining_lease_years', 'storey_avg', 'sale_year', 'sale_month',
                 'flat_type_1 ROOM', 'flat_type_2 ROOM', 'flat_type_3 ROOM', 'flat_type_4 ROOM', 'flat_type_5 ROOM',
                 'flat_type_EXECUTIVE', 'flat_type_MULTI-GENERATION', 'flat_model_2-room', 'flat_model_3Gen', 'flat_model_Adjoined flat',
                 'flat_model_Apartment', 'flat_model_DBSS', 'flat_model_Improved', 'flat_model_Improved-Maisonette', 'flat_model_Maisonette',
                 'flat_model_Model A', 'flat_model_Model A-Maisonette', 'flat_model_Model A2', 'flat_model_Multi Generation', 'flat_model_New Generation',
                 'flat_model_Premium Apartment', 'flat_model_Premium Apartment Loft', 'flat_model_Premium Maisonette', 'flat_model_Simplified', 'flat_model_Standard',
                 'flat_model_Terrace', 'flat_model_Type S1', 'flat_model_Type S2', 'town_ANG MO KIO', 'town_BEDOK',
                 'town_BISHAN', 'town_BUKIT BATOK', 'town_BUKIT MERAH', 'town_BUKIT PANJANG', 'town_BUKIT TIMAH',
                 'town_CENTRAL AREA', 'town_CHOA CHU KANG', 'town_CLEMENTI', 'town_GEYLANG', 'town_HOUGANG',
                 'town_JURONG EAST', 'town_JURONG WEST', 'town_KALLANG/WHAMPOA', 'town_MARINE PARADE', 'town_PASIR RIS',
                 'town_PUNGGOL', 'town_QUEENSTOWN', 'town_SEMBAWANG', 'town_SENGKANG', 'town_SERANGOON',
                 'town_TAMPINES', 'town_TOA PAYOH', 'town_WOODLANDS', 'town_YISHUN']

NUM_FEATURES = len(FEATURE_NAMES)

# --- Extract Categories for Dropdowns ---
def extract_categories(prefix, feature_list):
    categories = []
    prefix_len = len(prefix)
    for feature in feature_list:
        if feature.startswith(prefix):
            categories.append(feature[prefix_len:])
    return sorted(categories)

FLAT_TYPES = extract_categories("flat_type_", FEATURE_NAMES)
FLAT_MODELS = extract_categories("flat_model_", FEATURE_NAMES)
TOWNS = extract_categories("town_", FEATURE_NAMES)

# --- Load Scaler ---
try:
    scaler = joblib.load(SCALER_PATH)
    print("Scaler loaded successfully.")
except FileNotFoundError:
    st.error(f"Error: Scaler file not found at {SCALER_PATH}. Cannot proceed.")
    st.stop()
except Exception as e:
    st.error(f"Error loading scaler: {e}")
    st.stop()

# --- Streamlit UI ---
st.title("HDB Resale Price Predictor")
st.markdown("Enter the details below to get a price prediction.")

# --- Input Fields ---
col1, col2 = st.columns(2) # Arrange inputs in columns

with col1:
    st.header("Key Features")
    floor_area = st.number_input("Floor Area (sqm)", min_value=20.0, max_value=300.0, value=90.0, step=1.0)
    lease_years = st.number_input("Remaining Lease (Years)", min_value=10.0, max_value=99.0, value=70.0, step=0.5)
    storey = st.number_input("Storey (Average)", min_value=1.0, max_value=50.0, value=10.0, step=1.0)

with col2:
    st.header("Location & Type")
    # Set default indices based on common values or simple index 0
    default_town_index = TOWNS.index("TAMPINES") if "TAMPINES" in TOWNS else 0
    default_flat_type_index = FLAT_TYPES.index("4 ROOM") if "4 ROOM" in FLAT_TYPES else 0
    default_flat_model_index = FLAT_MODELS.index("Improved") if "Improved" in FLAT_MODELS else 0

    selected_town = st.selectbox("Town", TOWNS, index=default_town_index)
    selected_flat_type = st.selectbox("Flat Type", FLAT_TYPES, index=default_flat_type_index)
    selected_flat_model = st.selectbox("Flat Model", FLAT_MODELS, index=default_flat_model_index)


# --- Prediction Button ---
if st.button("Predict Resale Price", type="primary"):
    st.markdown("---")
    st.subheader("Processing...")

    # --- Prepare Input Data ---
    # Start with all features as 0.0
    input_dict = {feat: 0.0 for feat in FEATURE_NAMES}

    # Update with numerical user inputs
    input_dict['floor_area_sqm'] = float(floor_area)
    input_dict['remaining_lease_years'] = float(lease_years)
    input_dict['storey_avg'] = float(storey)

    # Update with default sale year/month
    now = datetime.now()
    input_dict['sale_year'] = float(now.year)
    input_dict['sale_month'] = float(now.month)

    # *** Update one-hot encoded features based on dropdown selections ***
    # Town
    town_feature_name = f"town_{selected_town}"
    if town_feature_name in input_dict:
        input_dict[town_feature_name] = 1.0
    else:
        st.warning(f"Selected town feature '{town_feature_name}' not found in model features. Check FEATURE_NAMES.")

    # Flat Type
    flat_type_feature_name = f"flat_type_{selected_flat_type}"
    if flat_type_feature_name in input_dict:
        input_dict[flat_type_feature_name] = 1.0
    else:
        st.warning(f"Selected flat type feature '{flat_type_feature_name}' not found in model features.")

    # Flat Model
    flat_model_feature_name = f"flat_model_{selected_flat_model}"
    if flat_model_feature_name in input_dict:
        input_dict[flat_model_feature_name] = 1.0
    else:
        st.warning(f"Selected flat model feature '{flat_model_feature_name}' not found in model features.")


    # Convert dictionary to ordered list
    try:
        input_list = [input_dict[feature] for feature in FEATURE_NAMES]
        input_array = np.array(input_list).astype(np.float32).reshape(1, -1) # Reshape for scaler
        st.write(f"Input shape before scaling: {input_array.shape}")
        # Optional: Display which one-hot features are set
        # set_features = {k:v for k,v in input_dict.items() if v==1.0 and ('town_' in k or 'flat_type_' in k or 'flat_model_' in k)}
        # st.write(f"One-hot features set: {set_features}")


        # --- Scale the Input Data ---
        input_scaled = scaler.transform(input_array)
        st.write(f"Input shape after scaling: {input_scaled.shape}")
        payload_data = input_scaled.flatten().tolist()

        # --- Construct KServe Payload ---
        payload = {
            "inputs": [{
                "name": "input-0",
                "shape": [1, NUM_FEATURES],
                "datatype": "FP32",
                "data": payload_data
            }]
        }
        st.write("Sending request to KServe...")

        # --- Send Request ---
        try:
            response = requests.post(KSERVE_URL, json=payload, timeout=30)
            response.raise_for_status()
            result = response.json()
            st.write("Response received:")
            prediction = result.get('outputs', [{}])[0].get('data', [None])[0]

            if prediction is not None:
                st.success(f"**Predicted Resale Price:** S$ {prediction:,.2f}")
            else:
                st.error("Prediction data not found in the response.")
                st.json(result)

        # ... (Error handling remains the same) ...
        except requests.exceptions.Timeout:
            st.error(f"Error: Request timed out connecting to {KSERVE_URL}")
        except requests.exceptions.ConnectionError:
            st.error(f"Error: Could not connect to the prediction service at {KSERVE_URL}. Is it running and accessible?")
        except requests.exceptions.HTTPError as err:
            st.error(f"Error: Prediction service returned status code {err.response.status_code}.")
            try:
                error_detail = err.response.json()
                st.json({"error_details": error_detail})
            except json.JSONDecodeError:
                 st.text(f"Response content:\n{err.response.text}")
        except Exception as e:
            st.error(f"An unexpected error occurred: {e}")

    except Exception as e:
        st.error(f"Error preparing data for prediction: {e}")

Since this app is running from within a Jupyter Notebook environment, we need to port-forward the jupyter-tff service to Streamlit’s default port 8501:

hdb-port-forward-jupyter-tff

Once port-forwarding is active, open your browser and navigate to http://localhost:8501 to access the app interface:

hdb-resale-price-predictor


Bonus: Tracking Model Performance with MLflow

To better monitor model performance and track different training experiments, I used MLflow for experiment tracking. Below is a snapshot of the interface showing various model runs and their corresponding metrics:

hdb-xgboost-multistep-bst-local-artifacts

This setup helps keep experimentation organized and reproducible, especially when tuning hyperparameters or trying different data splits.

hdb-xgboost-multistep-bst-local


Conclusion

This walkthrough showcased how easy it is to get started with a full-featured Jupyter environment in Kubeflow using the jupyter-tensorflow-full image. By combining this with Streamlit, I was able to build and test an HDB resale price predictor app directly within my notebook server. With minimal setup and simple port forwarding, Kubeflow proves to be a solid platform not just for machine learning pipelines but also for interactive app development and experimentation.