# Load required libraries
cat("[CTF-DEBUG] Loading libraries: data.table, tidyverse, furrr, future.callr\n")
library(data.table)
library(tidyverse)
library(furrr)
library(future.callr)
cat("[CTF-DEBUG] Libraries loaded successfully\n")

# Memory profiling helper
log_memory <- function(label) {
  mem_info <- gc(reset = FALSE, verbose = FALSE)
  used_mb <- sum(mem_info[, 2])  # Total used memory in MB
  cat(sprintf("[KNS-MEM] %s: %.1f MB used\n", label, used_mb))
  flush(stdout())
}

prepare_data <- function(chars, features) {
  cat("[CTF-DEBUG] prepare_data: ENTERED - chars dim =", paste(dim(chars), collapse="x"), "features =", length(features), "\n")
  flush(stdout())

  # Convert to data.table (infrastructure loads as tibble via arrow::read_parquet)
  cat("[CTF-DEBUG] prepare_data: chars class BEFORE setDT =", class(chars)[1], "\n")
  flush(stdout())
  chars <- setDT(chars)
  cat("[CTF-DEBUG] prepare_data: chars class AFTER setDT =", class(chars)[1], "\n")
  flush(stdout())

  # Create 'me' column from 'market_equity' if it doesn't exist
  # (new data files only have market_equity, old files had both)
  # This preserves market_equity for use as a feature while me is used for weighting
  if (!"me" %in% names(chars)) {
    cat("[CTF-DEBUG] prepare_data: Creating 'me' column from 'market_equity'\n")
    flush(stdout())
    chars[, me := market_equity]
  }

  # Convert feature columns to double to avoid losing precision
  cat("[CTF-DEBUG] prepare_data: Converting", length(features), "features to double\n")
  flush(stdout())
  chars[, (features) := lapply(.SD, as.double), .SDcols = features]
  cat("[CTF-DEBUG] prepare_data: All features converted to double\n")
  flush(stdout())

  # BATCHED parallel ECDF transformation - process 50 features at a time with 4 workers
  # This avoids overwhelming worker communication pipes with 402 features
  log_memory("Before BATCHED parallel ECDF transformation")
  cat("[CTF-DEBUG] prepare_data: Starting BATCHED parallel ECDF transformation for", length(features), "features\n")
  flush(stdout())

  batch_size <- 50
  n_workers <- 4
  n_batches <- ceiling(length(features) / batch_size)

  cat(sprintf("[CTF-DEBUG] prepare_data: Processing %d batches of %d features with %d workers each\n",
              n_batches, batch_size, n_workers))
  flush(stdout())

  # Set up plan with limited workers for batched processing
  old_plan <- plan()
  plan(callr, workers = n_workers)

  # Process features in batches
  for (batch_idx in 1:n_batches) {
    start_idx <- (batch_idx - 1) * batch_size + 1
    end_idx <- min(batch_idx * batch_size, length(features))
    batch_features <- features[start_idx:end_idx]

    cat(sprintf("[CTF-DEBUG] prepare_data: Batch %d/%d - processing features %d to %d (%d features)\n",
                batch_idx, n_batches, start_idx, end_idx, length(batch_features)))
    flush(stdout())

    # CRITICAL: Extract data BEFORE future_map to avoid capturing full chars table
    # Create a list of data for each feature to avoid closure capture
    cat(sprintf("[CTF-DEBUG] prepare_data: Extracting data for batch %d features\n", batch_idx))
    flush(stdout())

    batch_data_list <- lapply(batch_features, function(f) {
      list(
        feature_name = f,
        data = chars[, .(row_idx = .I, id, eom, value = get(f))]
      )
    })

    cat(sprintf("[CTF-DEBUG] prepare_data: Starting parallel ECDF for batch %d\n", batch_idx))
    flush(stdout())

    # Process this batch in parallel with error handling
    transformed_batch <- tryCatch({
      future_map(batch_data_list, .progress = TRUE,
                 .options = furrr_options(seed = TRUE, packages = "data.table"),
                 function(item) {
        tryCatch({
          f <- item$feature_name
          temp <- copy(item$data)  # Make a copy to avoid any reference issues

          # Mark zeros
          temp[, zero := (value == 0 & !is.na(value))]

          # ECDF transformation by eom group (only for non-NA values)
          temp[!is.na(value), transformed := ecdf(value)(value), by = eom]

          # Restore exact zeros (ECDF always returns >0)
          temp[zero == TRUE, transformed := 0]

          # Sort back to original row order
          setorder(temp, row_idx)

          # Return the transformed column (NAs preserved, order correct)
          return(temp$transformed)
        }, error = function(e) {
          cat(sprintf("[CTF-ERROR] ECDF failed for feature '%s': %s\n", item$feature_name, conditionMessage(e)))
          flush(stdout())
          stop(sprintf("Feature %s failed: %s", item$feature_name, conditionMessage(e)))
        })
      })
    }, error = function(e) {
      cat(sprintf("[CTF-ERROR] Batch %d/%d FAILED: %s\n", batch_idx, n_batches, conditionMessage(e)))
      cat(sprintf("[CTF-ERROR] Error class: %s\n", class(e)))
      cat(sprintf("[CTF-ERROR] Error trace:\n"))
      flush(stdout())
      print(e)
      flush(stdout())
      stop(sprintf("Batch %d failed: %s", batch_idx, conditionMessage(e)))
    })

    # Assign batch results back to chars
    for (i in seq_along(batch_features)) {
      set(chars, j = batch_features[i], value = transformed_batch[[i]])
    }

    cat(sprintf("[CTF-DEBUG] prepare_data: Batch %d/%d completed\n", batch_idx, n_batches))
    flush(stdout())
  }

  # Restore original plan
  plan(old_plan)

  log_memory("After BATCHED parallel ECDF transformation")
  cat("[CTF-DEBUG] prepare_data: BATCHED parallel ECDF transformation completed - all", length(features), "features processed\n")
  flush(stdout())

  # Print sample of transformed data to verify
  cat("[CTF-DEBUG] prepare_data: SAMPLE of first 3 features after ECDF:\n")
  flush(stdout())
  for (i in 1:min(3, length(features))) {
    f <- features[i]
    sample_vals <- chars[1:5, get(f)]
    cat(sprintf("[CTF-DEBUG]   %s: %s\n", f, paste(round(sample_vals, 4), collapse=", ")))
    flush(stdout())
  }

  # Feature Imputation
  cat("[CTF-DEBUG] prepare_data: Starting feature imputation (NA -> 0.5)\n")
  flush(stdout())
  chars[, (features) := lapply(.SD, function(x) if_else(is.na(x), 0.5, x)), .SDcols=features, by=eom]
  cat("[CTF-DEBUG] prepare_data: Feature imputation completed\n")
  flush(stdout())

  # L1 normalization
  cat("[CTF-DEBUG] prepare_data: Starting L1 normalization by eom\n")
  flush(stdout())
  chars[, (features) := lapply(.SD, function(r) (r - mean(r)) / sum(abs(r - mean(r)))), .SDcols=features, by = eom]
  chars[, (features) := lapply(.SD, function(w) ifelse(is.na(w), 0, w)), .SDcols=features]
  cat("[CTF-DEBUG] prepare_data: L1 normalization completed\n")
  flush(stdout())

  # Weight shares per month (normalize me, not market_equity)
  cat("[CTF-DEBUG] prepare_data: Normalizing me (market cap weighting) by eom\n")
  flush(stdout())
  chars[, me := me / sum(me), by = eom]
  cat("[CTF-DEBUG] prepare_data: COMPLETED - returning normalized chars\n")
  flush(stdout())

  # Print final sample to verify full pipeline
  cat("[CTF-DEBUG] prepare_data: FINAL SAMPLE of first 3 features:\n")
  flush(stdout())
  for (i in 1:min(3, length(features))) {
    f <- features[i]
    sample_vals <- chars[1:5, get(f)]
    cat(sprintf("[CTF-DEBUG]   %s: %s\n", f, paste(round(sample_vals, 4), collapse=", ")))
    flush(stdout())
  }

  chars
}

rank_weighted_factor_ws <- function(chars, features) {
  cat("[CTF-DEBUG] rank_weighted_factor_ws: ENTERED - chars dim =", paste(dim(chars), collapse="x"), "features =", length(features), "\n")
  flush(stdout())

  # Use data.table syntax to avoid tibble conversion
  cols_to_keep <- c("id", "eom", "eom_ret", features)
  cat("[CTF-DEBUG] rank_weighted_factor_ws: Melting chars to long format with", length(cols_to_keep), "columns\n")
  flush(stdout())

  result <- melt(
    chars[, ..cols_to_keep],
    id.vars = c("id", "eom", "eom_ret"),
    variable.name = "feature",
    value.name = "w"
  )

  cat("[CTF-DEBUG] rank_weighted_factor_ws: COMPLETED - result dim =", paste(dim(result), collapse="x"), "\n")
  flush(stdout())
  result
}

factor_monthly_returns <- function(factor_ws, chars, features) {
  cat("[CTF-DEBUG] factor_monthly_returns: ENTERED - factor_ws dim =", paste(dim(factor_ws), collapse="x"), "chars dim =", paste(dim(chars), collapse="x"), "\n")
  flush(stdout())

  # Single-pass data.table join-then-group (NO pre-slicing, NO parallelism needed)
  # This is MUCH faster than 864 sequential filter operations
  cat("[CTF-DEBUG] factor_monthly_returns: Extracting needed columns from chars\n")
  flush(stdout())
  sub_chars <- as.data.table(chars)[, .(eom_ret, id, ret_exc_lead1m)]
  cat("[CTF-DEBUG] factor_monthly_returns: sub_chars dim =", paste(dim(sub_chars), collapse="x"), "\n")
  flush(stdout())

  cat("[CTF-DEBUG] factor_monthly_returns: Converting factor_ws to data.table\n")
  flush(stdout())
  factor_ws <- as.data.table(factor_ws)
  cat("[CTF-DEBUG] factor_monthly_returns: factor_ws converted, dim =", paste(dim(factor_ws), collapse="x"), "\n")
  flush(stdout())

  # Set keys for fast join
  cat("[CTF-DEBUG] factor_monthly_returns: Setting keys on factor_ws (eom_ret, id)\n")
  flush(stdout())
  setkey(factor_ws, eom_ret, id)
  cat("[CTF-DEBUG] factor_monthly_returns: Setting keys on sub_chars (eom_ret, id)\n")
  flush(stdout())
  setkey(sub_chars, eom_ret, id)
  cat("[CTF-DEBUG] factor_monthly_returns: Keys set successfully\n")
  flush(stdout())

  # Single join operation (replaces 864 filter+join operations)
  m <- factor_ws[sub_chars, nomatch = 0L]

  # Single grouped aggregation, then dcast to wide format
  cat("[CTF-DEBUG] factor_monthly_returns: Aggregating by (eom_ret, feature)\n")
  flush(stdout())
  res <- m[, .(ret = sum(w * ret_exc_lead1m)), by = .(eom_ret, feature)]
  cat("[CTF-DEBUG] factor_monthly_returns: Aggregation completed - res dim =", paste(dim(res), collapse="x"), "\n")
  flush(stdout())

  cat("[CTF-DEBUG] factor_monthly_returns: Casting to wide format (eom_ret ~ feature)\n")
  flush(stdout())
  result <- dcast(res, eom_ret ~ feature, value.var = "ret")
  cat("[CTF-DEBUG] factor_monthly_returns: COMPLETED - result dim =", paste(dim(result), collapse="x"), "\n")
  flush(stdout())

  return(result)
}

factor_daily_returns <- function(factor_ws, daily) {
  cat("[CTF-DEBUG] factor_daily_returns: ENTERED - factor_ws dim =", paste(dim(factor_ws), collapse="x"), "daily dim =", paste(dim(daily), collapse="x"), "\n")
  flush(stdout())

  # CHUNKED join approach to avoid 2^31 row limit in data.table
  # Instead of joining all factor_ws × all daily at once (>2B rows),
  # we slice by eom_ret date and join each month separately
  cat("[CTF-DEBUG] factor_daily_returns: Converting to data.table\n")
  flush(stdout())
  factor_ws <- as.data.table(factor_ws)
  daily <- as.data.table(daily)

  # Add eom column to daily if it doesn't exist
  if (!"eom" %in% names(daily)) {
    cat("[CTF-DEBUG] factor_daily_returns: Adding eom column to daily\n")
    flush(stdout())
    daily[, eom := ceiling_date(date, "month") - days(1)]
  }

  log_memory("Before chunked factor_daily_returns join")

  # Get unique eom_ret dates to process
  eom_dates <- sort(unique(factor_ws$eom_ret))
  n_dates <- length(eom_dates)
  cat(sprintf("[CTF-DEBUG] factor_daily_returns: Processing %d unique eom_ret dates sequentially\n", n_dates))
  flush(stdout())

  # Process each eom_ret date separately to avoid cartesian explosion
  # SEQUENTIAL processing (not parallelized) because factor_ws (16GB) and daily (3GB) are too large
  # to serialize to workers (would exceed 5GB future.globals.maxSize limit)
  start_time <- Sys.time()
  result_list <- lapply(seq_along(eom_dates), function(i) {
      e <- eom_dates[i]

      # Progress reporting every 50 iterations
      if (i %% 50 == 0 || i == 1 || i == n_dates) {
        elapsed <- as.numeric(difftime(Sys.time(), start_time, units = "secs"))
        rate <- i / elapsed
        remaining <- (n_dates - i) / rate
        cat(sprintf("[CTF-PROGRESS] factor_daily_returns: %d/%d dates (%.1f%%) - %.1f dates/sec - ETA %.1f min\n",
                    i, n_dates, 100 * i / n_dates, rate, remaining / 60))
        flush(stdout())
      }

      # Filter daily data for this month
      sub_daily <- daily[eom == e]

      # If no daily data for this month, skip
      if (nrow(sub_daily) == 0) {
        return(NULL)
      }

      # Filter factor_ws for this month and only IDs that exist in daily data
      sub_ws <- factor_ws[eom_ret == e & id %in% sub_daily$id]

      # Further filter daily data to only IDs that exist in factor_ws
      sub_daily <- sub_daily[id %in% sub_ws$id]

      # If either subset is empty after filtering, skip
      if (nrow(sub_ws) == 0 || nrow(sub_daily) == 0) {
        return(NULL)
      }

      # Join this month's data (much smaller, won't exceed 2^31 limit)
      m <- sub_ws[sub_daily, on = .(eom_ret = eom, id), nomatch = 0L, allow.cartesian = TRUE]

      # If join produced empty result, skip
      if (nrow(m) == 0) {
        return(NULL)
      }

      # Aggregate by (date, eom_ret, feature)
      res <- m[, .(ret = sum(w * ret_exc)), by = .(date, eom_ret, feature)]

      # Cast to wide format
      dcast(res, date + eom_ret ~ feature, value.var = "ret")
    })

  cat("[CTF-DEBUG] factor_daily_returns: Combining results from all months\n")
  flush(stdout())

  # Combine all monthly results, handling NULL entries and missing features
  result <- rbindlist(result_list, fill = TRUE)
  setorder(result, date)

  log_memory("After chunked factor_daily_returns join")

  cat("[CTF-DEBUG] factor_daily_returns: COMPLETED - result dim =", paste(dim(result), collapse="x"), "\n")
  flush(stdout())

  return(result)
}

create_rw_factors <- function(chars, daily, features) {
  cat("[CTF-DEBUG] create_rw_factors: ENTERED\n")
  flush(stdout())

  cat("[CTF-DEBUG] create_rw_factors: Calling rank_weighted_factor_ws\n")
  flush(stdout())
  rw_factor_ws <- rank_weighted_factor_ws(chars, features)
  cat("[CTF-DEBUG] create_rw_factors: rank_weighted_factor_ws completed\n")
  flush(stdout())

  cat("[CTF-DEBUG] create_rw_factors: Calling factor_monthly_returns\n")
  flush(stdout())
  rw_factor_m <- factor_monthly_returns(rw_factor_ws, chars, features)
  cat("[CTF-DEBUG] create_rw_factors: factor_monthly_returns completed\n")
  flush(stdout())

  cat("[CTF-DEBUG] create_rw_factors: Calling factor_daily_returns\n")
  flush(stdout())
  rw_factor_d_raw <- factor_daily_returns(rw_factor_ws, daily)
  cat("[CTF-DEBUG] create_rw_factors: factor_daily_returns completed\n")
  flush(stdout())

  cat("[CTF-DEBUG] create_rw_factors: COMPLETED - returning list with daily and monthly factors\n")
  flush(stdout())
  list("daily" = rw_factor_d_raw, "monthly" = rw_factor_m)
}

create_mkt_rets <- function(chars, daily) {
  cat("[CTF-DEBUG] create_mkt_rets: ENTERED\n")
  flush(stdout())

  cat("[CTF-DEBUG] create_mkt_rets: Creating monthly market returns\n")
  flush(stdout())
  mkt_ret <- chars |>
    group_by(eom) |>
    summarize(
      ret_exc = sum(me * ret_exc_lead1m, na.rm = TRUE) / sum(me),
      .groups = "drop"
    ) |>
    mutate(
      eom_ret = eom + 1 + months(1) - 1
    ) |>
    select(-eom) |>
    setDT()
  cat("[CTF-DEBUG] create_mkt_rets: Monthly market returns dim =", paste(dim(mkt_ret), collapse="x"), "\n")
  flush(stdout())

  cat("[CTF-DEBUG] create_mkt_rets: Creating daily market returns\n")
  flush(stdout())
  mkt_ret_daily <- daily |>
    mutate(
      eom_l = eom + 1 - months(1) - 1
    ) |>
    inner_join(chars[,c("id", "eom", "me")], by = c("eom_l" = "eom", "id" = "id")) |>
    group_by(date) |>
    summarize(
      ret_exc = sum(me * ret_exc, na.rm = TRUE) / sum(me),
      .groups = "drop"
    ) |>
    setDT()
  cat("[CTF-DEBUG] create_mkt_rets: Daily market returns dim =", paste(dim(mkt_ret_daily), collapse="x"), "\n")
  flush(stdout())

  cat("[CTF-DEBUG] create_mkt_rets: COMPLETED\n")
  flush(stdout())
  list("daily" = mkt_ret_daily, "monthly" = mkt_ret)
}

calc_opt_lambdas <- function(d, rw_factor_d_raw, mkt_ret_daily, features, n_folds, l2s, l1s = c(), use_pc=FALSE) {
  cat(sprintf("[CTF-DEBUG] calc_opt_lambdas: ENTERED for date %s\n", as.character(d)))
  flush(stdout())

  all_months <- sort(unique(rw_factor_d_raw$eom_ret))
  cat(sprintf("[CTF-DEBUG] calc_opt_lambdas: Found %d unique months\n", length(all_months)))
  flush(stdout())

  sub_months <- all_months[all_months < d & all_months >= d + 1 - months(120) - 1]
  cat(sprintf("[CTF-DEBUG] calc_opt_lambdas: Filtered to %d months for 120-month lookback\n", length(sub_months)))
  flush(stdout())

  cut_length <- floor(120 / n_folds)
  cut_idx <- c(0, (1:n_folds)  * cut_length)
  cat(sprintf("[CTF-DEBUG] calc_opt_lambdas: Using %d folds with cut_length=%d\n", n_folds, cut_length))
  flush(stdout())

  lambda_search <- 1:n_folds |>
    map(function(k) {
      val_months <- sub_months[cut_idx[k]:cut_idx[k+1]]
      train_months <- sub_months[! sub_months %in% val_months]

      val_k <- rw_factor_d_raw |> filter(eom_ret %in% val_months)
      train_k <- rw_factor_d_raw |> filter(eom_ret %in% train_months)
      mkt_train <- mkt_ret_daily |> filter(date %in% train_k$date)
      mkt_val <- mkt_ret_daily |> filter(date %in% val_k$date)

      beta <- cov(train_k |> select(all_of(features)), mkt_train$ret_exc) / var(mkt_train$ret_exc)

      X_train <- (train_k |> select(all_of(features))) - mkt_train$ret_exc %*% t(beta)
      X_val <- val_k[, ..features] - mkt_val$ret_exc %*% t(beta)

      fct_train <- cbind(X_train, mkt_train$ret_exc)
      fct_val <- cbind(X_val, mkt_val$ret_exc)

      mu_train <- fct_train |> colMeans()
      Sigma_train <- fct_train |> cov()

      mu_val <- fct_val |> colMeans()
      Sigma_val <- fct_val |> cov()

      if (use_pc) {
        decomp <- eigen(Sigma_train)
        P <- decomp$vectors
      }

      l2s |>
        map(function(l2) {
          if (length(l1s) != 0) {
            if (use_pc) {
              w_tilde <- diag(1/(decomp$values + l2)) %*% t(P) %*% mu_train
            }
            dt <- l1s |>
              map(function(l1) {
                if (use_pc) {
                  w_pc <- sign(w_tilde) * pmax(0, abs(w_tilde) - l1)
                  w <- P %*% w_pc
                } else {
                  print("Warning: L1 regularization without PC is computationally expensive.")
                  model <- glmnet(
                    Sigma_train + diag(l2, ncol(Sigma_train)), mu_train,
                    alpha = 1, lambda = l1,
                    intercept = FALSE,
                    standardize = FALSE
                  )
                  w <- model$beta
                }
                data.table(
                  fold = k,
                  l1 = l1,
                  l2 = l2,
                  r2 = 1 - sum((mu_val - Sigma_val %*% w)^2) / sum(mu_val^2)
                )
              }) |>
              rbindlist()
          } else {
            w <- solve(Sigma_train + diag(l2, ncol(Sigma_train))) %*% mu_train
            dt <- data.table(
              fold = k,
              l1 = NaN,
              l2 = l2,
              r2 = 1 - sum((mu_val - Sigma_val %*% w)^2) / sum(mu_val^2)
            )
          }
          dt
        }) |>
        rbindlist()
    }) |>
    rbindlist() |>
    setDT() |>
    summarize(
      r2 = mean(r2),
      .by = c(l1, l2)
    ) |>
    setDT()

  data.table(
    eom_ret = d,
    l1 = lambda_search[r2 == max(r2)][order(-l1, -l2)][1]$l1,
    l2 = lambda_search[r2 == max(r2)][order(-l1, -l2)][1]$l2,
    r2 = lambda_search[r2 == max(r2)][order(-l1, -l2)][1]$r2
  )
}

kns <- function(d, rw_factor_d_raw, mkt_ret_daily, features, opt_lambdas, use_pc) {
    l1 <- opt_lambdas[eom_ret == d, l1]
    l2 <- opt_lambdas[eom_ret == d, l2]
    hist <- rw_factor_d_raw |> filter(eom_ret < d, eom_ret >= d + 1 - months(120) - 1)
    mkt_hist <- mkt_ret_daily |> filter(date %in% hist$date)

    beta <- cov(hist[, ..features], mkt_hist$ret_exc) / var(mkt_hist$ret_exc)
    X_hist <- hist[, ..features] - mkt_hist$ret_exc %*% t(beta)

    fct_hist <- cbind(X_hist, mkt_hist$ret_exc)

    mu_hist <- fct_hist |> colMeans()
    Sigma_hist <- fct_hist |> cov()

    if (use_pc) {
      decomp <- eigen(Sigma_hist)
      P <- decomp$vectors
      w_tilde <- diag(1/(decomp$values + l2)) %*% t(P) %*% mu_hist
      w_pc <- sign(w_tilde) * pmax(0, abs(w_tilde) - l1)
      w <- P %*% w_pc
      names(w) <- c(features, "me")
    } else {
      inv <- solve(Sigma_hist + diag(l2, ncol(Sigma_hist)))
      w <- inv %*% mu_hist
    }

    # Create data.table with eom_ret and feature weights
    dt <- data.table(
      eom_ret = d,
      t(w[-length(w)])
    )
    # Name columns with features
    names(dt) <- c("eom_ret", features)
    # Set/replace me with the calculated portfolio weight
    # (If me is in features, this replaces it; if not, this adds it)
    dt[, me := sum(t(w) * c(-beta, 1))]
    dt
}


main <- function(chars, features, daily_ret) {
  # Extract feature names from DataFrame (infrastructure passes 60x1 DataFrame)
  cat("[CTF-DEBUG] main: features class =", class(features), "dim =", paste(dim(features), collapse="x"), "\n")
  features <- features$features
  cat("[CTF-DEBUG] main: After extraction, features class =", class(features), "length =", length(features), "\n")

  # Convert date columns from strings to Date objects (WRDS format: YYYY-MM-DD)
  chars <- chars |> mutate(
    eom = as.Date(eom),
    eom_ret = as.Date(eom_ret)
  )

  daily_ret <- daily_ret |> mutate(
    date = as.Date(date)
  )

  # Add eom (end-of-month) column to daily_ret - required by factor_daily_returns()
  daily_ret <- daily_ret |> mutate(eom = ceiling_date(date, "month") - days(1))

  # Read worker count from environment (infrastructure sets based on vCPU allocation)
  # CTF_N_JOBS: Number of parallel workers (separate from BLAS threading)
  # OMP_NUM_THREADS: BLAS threads per worker (set to 1 to prevent oversubscription)
  # CTF_N_JOBS=-1 means "use all cores" (Python convention), convert to actual CPU count
  n_workers_raw <- Sys.getenv("CTF_N_JOBS", unset = "4")
  n_workers <- as.integer(n_workers_raw)

  # Handle -1 (use all cores) - R's future package requires explicit worker count
  # Cap at 8 workers maximum - parallel computation with future.callr is memory-intensive
  # (observed failures with 16+ workers due to /dev/shm exhaustion and memory pressure)
  if (n_workers == -1) {
    total_cores <- parallel::detectCores()
    n_workers <- min(8, max(1, floor(total_cores * 0.17)))
    cat("[CTF-INFO] CTF_N_JOBS=-1 detected,", total_cores, "cores available, using", n_workers, "workers (capped at 8 for stability)\n")
  } else {
    cat("[CTF-INFO] Configuring parallelism: using", n_workers, "workers from CTF_N_JOBS\n")
  }
  flush(stdout())

  # Use future.callr backend for better process cleanup in containers (prevents zombies)
  # Configure workers with single-threaded BLAS to prevent oversubscription
  cat("[CTF-DEBUG] Setting BLAS environment variables to 1 before spawning workers\n")
  flush(stdout())

  # Set environment variables BEFORE plan() so workers inherit them
  # This is the most reliable way to ensure workers start with single-threaded BLAS
  Sys.setenv(OPENBLAS_NUM_THREADS = 1)
  Sys.setenv(MKL_NUM_THREADS = 1)
  Sys.setenv(OMP_NUM_THREADS = 1)
  Sys.setenv(BLAS_NUM_THREADS = 1)
  Sys.setenv(NUMEXPR_NUM_THREADS = 1)

  cat("[CTF-DEBUG] Environment variables set - spawning callr workers\n")
  flush(stdout())

  plan(future.callr::callr, workers = n_workers)
  options(future.globals.maxSize = 5000*1024^2, future.rng.onMisuse = "ignore")

  cat("[CTF-INFO] Starting data preparation\n")
  flush(stdout())
  log_memory("Before prepare_data")
  chars_normalized <- prepare_data(chars, features)

  # Free original chars data to save memory (~8GB for full dataset)
  rm(chars)
  gc(verbose = FALSE)
  log_memory("After prepare_data (chars freed)")

  cat("[CTF-INFO] Creating rank-weighted factors\n")
  flush(stdout())
  # Single-pass data.table operations (no parallelism needed here)
  rw_factors <- create_rw_factors(chars_normalized, daily_ret, features)
  gc(verbose = FALSE)
  log_memory("After create_rw_factors")

  cat("[CTF-INFO] Creating market returns\n")
  flush(stdout())
  mkt_rets <- create_mkt_rets(chars_normalized, daily_ret)
  gc(verbose = FALSE)
  log_memory("After create_mkt_rets")

  pf_dates <- unique(subset(chars_normalized, ctff_test == TRUE)$eom_ret)
  cat("[CTF-INFO] Found", length(pf_dates), "portfolio dates to process\n")
  flush(stdout())

  l2s <- 10^seq(-10, 0, 0.1)
  l1s <- 10^seq(-10, 4, 0.1)
  n_folds <- 5

  # Don't call plan() again - already set up with callr backend above
  log_memory("Before PARALLEL optimal lambda calculation")
  cat("[KNS-PROGRESS] Starting PARALLEL optimal lambda calculation for", length(pf_dates), "dates with", n_workers, "workers\n")
  flush(stdout())

  # Parallelize across portfolio dates using future_map
  # Workers inherit single-threaded BLAS from plan() env configuration
  opt_lambdas_list <- pf_dates |>
    future_map(function(d) {
      calc_opt_lambdas(d, rw_factors$daily, mkt_rets$daily, features, n_folds, l2s, l1s, use_pc=TRUE)
    }, .progress = TRUE, .options = furrr_options(packages = c("data.table", "tidyverse")))

  opt_lambdas <- rbindlist(opt_lambdas_list) |> setDT()
  rm(opt_lambdas_list)  # Free intermediate list
  gc(verbose = FALSE)
  log_memory("After opt_lambdas calculation")
  cat("[KNS-PROGRESS] Completed optimal lambda calculation for all", length(pf_dates), "dates\n")
  flush(stdout())

  log_memory("Before PARALLEL KNS portfolio weight calculation")
  cat("[KNS-PROGRESS] Starting PARALLEL KNS portfolio weight calculation for", length(pf_dates), "dates with", n_workers, "workers\n")
  flush(stdout())

  # Parallelize across portfolio dates using future_map
  # Workers inherit single-threaded BLAS from plan() env configuration
  w_kns_pc_list <- pf_dates |>
    future_map(function(d) {
      kns(d, rw_factors$daily, mkt_rets$daily, features, opt_lambdas, use_pc=TRUE)
    }, .progress = TRUE, .options = furrr_options(packages = c("data.table", "tidyverse")))

  w_kns_pc <- rbindlist(w_kns_pc_list) |> setDT()
  rm(w_kns_pc_list)  # Free intermediate list
  gc(verbose = FALSE)
  log_memory("After w_kns_pc calculation")
  cat("[KNS-PROGRESS] Completed KNS weight calculation for all", length(pf_dates), "dates\n")
  flush(stdout())

  cat("[KNS-PROGRESS] Creating final portfolio weights\n")
  flush(stdout())

  test_chars <- chars_normalized |>
    filter(ctff_test == TRUE) |>
    select(id, eom, eom_ret, me, all_of(features))

  pf <- unique(test_chars$eom_ret) |>
    future_map(function(d) {
      # Workers inherit single-threaded BLAS from plan() env configuration

      test_chars_d <- test_chars |> filter(eom_ret == d)
      w_kns_pc_d <- w_kns_pc |> filter(eom_ret == d)

      for (f in c(features, "me")) {
        test_chars_d[[f]] <- test_chars_d[[f]] * w_kns_pc_d[[f]]
      }
      test_chars_d
    }, .progress = TRUE, .options = furrr_options(packages = c("data.table", "tidyverse"))) |>
    rbindlist() |>
    setDT()

  pf$w <- pf |> select(all_of(features), "me") |> rowSums()

  cat("[CTF-INFO] KNS computation completed successfully\n")
  flush(stdout())

  # Output
  pf[, .(id, eom, w)]
}


if (interactive()) {
  features <- read_parquet("data/raw/ctff_features.parquet") |> pull(features)
  chars <- read_parquet("data/raw/ctff_chars.parquet")
  daily_ret <- read_parquet("data/raw/ctff_daily_ret.parquet")

  pf <- chars |> main(daily_ret=daily_ret, features=features)
  export_data(pf)
}
