I am trying to build a custom transformer and use it in a PySpark pipeline, but I don't know how.
My goal is to create this transformer to estimate a probability. Here is the code that I use without the transformer:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Aggregate pass/fail counts for all softbins
softbins_df = (
    counts_df
    .groupBy('softbin_first_test', 'time_window')
    .agg(F.sum('PASS').alias('total_pass'), F.sum('FAIL').alias('total_fail'))
)
# Calculate the prior pass probability for each softbin
softbins_total_counts_df = softbins_df.withColumn('total', F.col('total_pass') + F.col('total_fail'))
prior_pass_prob = F.col('total_pass') / F.col('total')
prior_fail_prob = F.col('total_fail') / F.col('total')

# Calculate probabilities using Bayesian estimation with an empirical prior
rho = 0.3
prior_window = (
    Window.partitionBy('softbin_first_test')
    .orderBy(F.col('time_window'))
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
prior_pass = F.mean(prior_pass_prob).over(prior_window)
prior_fail = F.mean(prior_fail_prob).over(prior_window)
alpha = prior_pass * (1 - rho) / rho
beta = prior_fail * (1 - rho) / rho
pass_prob = alpha / (alpha + beta)
I want to create this transformer to avoid data leakage during the train/test split: right now the probability is calculated over the whole dataset, so after splitting, the training set contains information from the future (the calculated probability uses the whole dataset). That is why I want to wrap this logic in a transformer and incorporate it into a pipeline.
Creating a custom transformer is a good choice for dealing with data leakage.
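One way to do it (a minimal sketch rather than production code: rho is kept as a plain attribute instead of a proper Param, and persistence via DefaultParamsWritable is omitted) is to split the logic into an Estimator that learns the probability table in fit() and a Model that joins it back in transform(). The column names and rho come from your snippet; PassProbEstimator and PassProbModel are just illustrative names:

from pyspark.ml import Estimator, Model
from pyspark.sql import Window
from pyspark.sql import functions as F


class PassProbEstimator(Estimator):
    # Learns the Bayesian pass-probability table from whatever data it is fitted on,
    # so fitting it on the training split alone keeps test-period information out.
    def __init__(self, rho=0.3):
        super().__init__()
        self.rho = rho

    def _fit(self, dataset):
        # Aggregate pass/fail counts per softbin and time window
        counts = (
            dataset
            .groupBy('softbin_first_test', 'time_window')
            .agg(F.sum('PASS').alias('total_pass'), F.sum('FAIL').alias('total_fail'))
            .withColumn('total', F.col('total_pass') + F.col('total_fail'))
        )
        # Same empirical-prior construction as in the question
        w = (
            Window.partitionBy('softbin_first_test')
            .orderBy('time_window')
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        )
        prior_pass = F.mean(F.col('total_pass') / F.col('total')).over(w)
        prior_fail = F.mean(F.col('total_fail') / F.col('total')).over(w)
        alpha = prior_pass * (1 - self.rho) / self.rho
        beta = prior_fail * (1 - self.rho) / self.rho
        # Lookup table that the fitted model will join onto new data
        probs = counts.select(
            'softbin_first_test',
            'time_window',
            (alpha / (alpha + beta)).alias('pass_prob'),
        )
        return PassProbModel(probs=probs)


class PassProbModel(Model):
    # Transformer returned by fit(); it only joins the learned table, it computes nothing new.
    def __init__(self, probs=None):
        super().__init__()
        self.probs = probs

    def _transform(self, dataset):
        return dataset.join(self.probs, on=['softbin_first_test', 'time_window'], how='left')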
The output of transform() is the input DataFrame with the learned pass_prob column joined on, so the probabilities always come from the data the estimator was fitted on.
Make appropriate changes as per your data; this is just to showcase a custom transformer function.
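A usage sketch, assuming train_df and test_df are your already-split DataFrames:

from pyspark.ml import Pipeline

# Fit the priors on the training window only, then score the held-out window
pipeline = Pipeline(stages=[PassProbEstimator(rho=0.3)])
model = pipeline.fit(train_df)             # probabilities computed from train_df alone
scored_test_df = model.transform(test_df)  # adds the learned pass_prob column, no leakage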