Businesses need to track how their key metrics evolve over time to understand trends, make informed decisions, and comply with reporting requirements. However, maintaining accurate historical records presents several challenges:
Capturing point-in-time snapshots of metrics
Tracking changes in dimension attributes over time
Managing slowly changing dimensions (SCD)
Ensuring data consistency across time periods
Creating reliable trend analysis
With Nekt, companies can implement proper historical tracking using SCD Type 2 methodology, maintaining a complete history of changes while enabling powerful trend analysis and historical reporting. This approach ensures data integrity and provides valuable insights into business performance over time.
Teams can now build comprehensive historical records of their KPIs, tracking
changes in both metrics and dimension attributes. This enables accurate trend
analysis, year-over-year comparisons, and reliable historical reporting that
supports better business decisions.
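To make this concrete, here is a minimal sketch, using plain Python and toy, made-up data, of what an SCD Type 2 history looks like for a single record: instead of overwriting a changed row, the previous version is closed with an end timestamp and a new open version is appended. The column names match the template used later in this guide.

```python
# Illustrative only: a hypothetical customer whose "plan" changed on 2024-03-01.
# SCD Type 2 keeps both versions instead of overwriting the row.
history = [
    {"id": 42, "plan": "basic", "_available_from": "2024-01-01",
     "_available_until": "2024-03-01", "_deleted": False},   # old version, now closed
    {"id": 42, "plan": "pro", "_available_from": "2024-03-01",
     "_available_until": None, "_deleted": False},           # current version, still open
]

# The current state of the table is every open, non-deleted version.
current = [row for row in history
           if row["_available_until"] is None and not row["_deleted"]]
print(current)  # -> only the "pro" row
```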
Data Source Integration
Connect to any data source, from CRMs to custom systems.
SCD Implementation
Implement SCD Type 2 to track historical changes in dimensions and
metrics.
Trend Analysis
Create powerful visualizations to analyze trends and patterns over time.
Here’s how teams are using Nekt to implement SCD Type 2 for dimension tracking:
1. Connect your data source and make sure the table you want to build history for is available in your catalog.
2. Create your SCD transformation using the template below.
```python
def user_transformation(delta_dict):
    from typing import List, Optional
    from datetime import datetime

    from pyspark.sql import DataFrame
    from pyspark.sql import functions as F
    from pyspark.sql import types as T
    from delta.tables import DeltaTable

    # SCD columns
    AVAILABLE_FROM = "_available_from"
    AVAILABLE_UNTIL = "_available_until"
    DELETED = "_deleted"
    # Sync timestamp column
    NEKT_SYNC_AT = "_nekt_sync_at"

    def calculate_diffs(
        *,
        input_df: DataFrame,
        output_df: DataFrame,
        primary_keys: List[str],
        sync_timestamp: datetime,
    ):
        # Compare the latest input snapshot with the currently open SCD rows
        # and identify added, deleted, and modified rows.
        output_current_rows_df = output_df.filter(
            (F.col(AVAILABLE_UNTIL).isNull()) & (~F.col(DELETED))
        )
        added_rows_df = input_df.join(
            output_current_rows_df, on=primary_keys, how="left_anti"
        ).select(input_df["*"])
        deleted_rows_df = output_current_rows_df.join(
            input_df, on=primary_keys, how="left_anti"
        ).select(output_df["*"])
        common_rows_df = output_current_rows_df.join(
            input_df, on=primary_keys, how="inner"
        ).select(output_df["*"])

        # Exclude the SCD bookkeeping columns from the comparison
        exclude_columns = [AVAILABLE_FROM, AVAILABLE_UNTIL, DELETED, NEKT_SYNC_AT]
        all_columns = [col for col in input_df.columns if col not in exclude_columns]

        # Dynamically build the condition that detects modified rows
        modification_condition = None
        for col in all_columns:
            col_condition = output_df[col] != input_df[col]
            modification_condition = (
                col_condition
                if modification_condition is None
                else modification_condition | col_condition
            )
        modified_rows_df = common_rows_df.filter(modification_condition).select(output_df["*"])

        # Close the old version of modified rows and append the new version
        primary_keys_with_sync_key = primary_keys + [NEKT_SYNC_AT]
        if modified_rows_df.limit(1).count() > 0:
            updated_rows = (
                output_df.join(modified_rows_df, primary_keys_with_sync_key, how="inner")
                .select(output_df["*"])
                .withColumn(AVAILABLE_UNTIL, F.lit(sync_timestamp).cast(T.TimestampType()))
            )
            non_updated_rows = output_df.join(
                modified_rows_df, primary_keys_with_sync_key, how="left_anti"
            )
            new_rows = (
                input_df.join(modified_rows_df.select(*primary_keys), on=primary_keys)
                .withColumn(AVAILABLE_FROM, F.lit(sync_timestamp))
                .withColumn(AVAILABLE_UNTIL, F.lit(None).cast(T.TimestampType()))
                .withColumn(DELETED, F.lit(False))
            )
            output_df = updated_rows.unionByName(non_updated_rows).unionByName(new_rows)

        # Handle added rows: insert them as open versions
        if added_rows_df.limit(1).count() > 0:
            added_rows_with_timestamp = (
                added_rows_df.withColumn(AVAILABLE_FROM, F.lit(sync_timestamp))
                .withColumn(AVAILABLE_UNTIL, F.lit(None).cast(T.TimestampType()))
                .withColumn(DELETED, F.lit(False))
            )
            output_df = output_df.unionByName(added_rows_with_timestamp)

        # Handle deleted rows: close them and flag them as deleted
        if deleted_rows_df.limit(1).count() > 0:
            deleted_rows_with_timestamp = (
                deleted_rows_df.withColumn(AVAILABLE_UNTIL, F.lit(sync_timestamp))
                .withColumn(DELETED, F.lit(True))
            )
            non_updated_rows = output_df.join(
                deleted_rows_df, primary_keys_with_sync_key, how="left_anti"
            )
            output_df = non_updated_rows.unionByName(deleted_rows_with_timestamp)

        # Make sure _available_until is a proper timestamp
        output_df = output_df.withColumn(
            AVAILABLE_UNTIL, output_df[AVAILABLE_UNTIL].cast(T.TimestampType())
        )

        record_count = output_df.count()
        # transformation_user_logger is available in the transformation environment
        transformation_user_logger.info(f"Record count: {record_count}")
        return output_df

    def apply_scd_type2(
        *,
        input_delta_table,
        output_delta_table,
        primary_keys: List[str],
        input_delta_version: Optional[int] = None,
    ):
        output_df = output_delta_table.toDF() if output_delta_table else None
        history_df = input_delta_table.history()
        delta_path = input_delta_table.detail().select("location").first()[0]

        # Read the latest version of the input table and its commit timestamp
        latest_history_version = (
            history_df.orderBy("version", ascending=False)
            .select("version", "timestamp")
            .collect()[0]
        )
        latest_version = latest_history_version["version"]
        latest_version_timestamp = latest_history_version["timestamp"]
        latest_version_df = (
            spark.read.format("delta")
            .option("versionAsOf", latest_version)
            .load(delta_path)
        )

        # First run: no output table yet, so every row starts as an open version
        if output_df is None:
            output_df = (
                latest_version_df.withColumn(AVAILABLE_FROM, F.lit(latest_version_timestamp))
                .withColumn(AVAILABLE_UNTIL, F.lit(None).cast(T.TimestampType()))
                .withColumn(DELETED, F.lit(False))
            )
            return output_df

        output_df = calculate_diffs(
            input_df=latest_version_df,
            output_df=output_df,
            primary_keys=primary_keys,
            sync_timestamp=latest_version_timestamp,
        )
        return output_df

    # Load delta tables
    output_table_delta: DeltaTable = delta_dict.get("trusted").get("output_table")
    input_table_delta: DeltaTable = delta_dict.get("raw").get("input_table")

    # SCD Type 2
    new_df = apply_scd_type2(
        input_delta_table=input_table_delta,
        output_delta_table=output_table_delta,
        primary_keys=["id"],
    )
    return new_df
```
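The heart of the template is the diff step: it compares the latest version of the input table with the currently open rows of the output table and classifies records as added, deleted, or modified using anti-joins. The standalone sketch below shows that classification logic in isolation, with hypothetical id/plan columns and toy data that are not part of the template.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("scd-diff-sketch").getOrCreate()

# Two toy snapshots of the same table (hypothetical data).
old = spark.createDataFrame([(1, "basic"), (2, "pro"), (3, "trial")], ["id", "plan"])
new = spark.createDataFrame([(1, "basic"), (2, "enterprise"), (4, "trial")], ["id", "plan"])

# In the new snapshot but not in the old one -> added rows.
added = new.join(old, on=["id"], how="left_anti")

# In the old snapshot but not in the new one -> deleted rows.
deleted = old.join(new, on=["id"], how="left_anti")

# In both snapshots with at least one differing column -> modified rows.
common = new.alias("n").join(old.alias("o"), on="id", how="inner")
modified = common.filter(F.col("n.plan") != F.col("o.plan")).select("id", F.col("n.plan").alias("plan"))

added.show()     # id 4
deleted.show()   # id 3
modified.show()  # id 2, now on "enterprise"
```

The template then turns each class into an SCD action: added rows are inserted as open versions, modified rows have their old version closed (_available_until stamped) and a new open version appended, and deleted rows are closed and flagged with _deleted.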
Change the primary_keys list to match your table’s primary key column(s)
Update the input_table_delta and output_table_delta lookups so they point to your input and output tables, as shown in the snippet below
Don’t forget to check the Use output table as input box when creating your transformation on our platform
We suggest setting the transformation trigger to ‘event’ and choosing the pipeline that extracts your input table, so the history is updated every time new data lands.
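For instance, if your extraction lands a raw table called customers and you write the history to a trusted table called customers_history keyed by a composite primary key, the final lines of the template might look like this (all names here are hypothetical placeholders; use the layer and table names from your own catalog):

```python
# Hypothetical layer and table names -- replace them with the ones in your catalog.
output_table_delta: DeltaTable = delta_dict.get("trusted").get("customers_history")
input_table_delta: DeltaTable = delta_dict.get("raw").get("customers")

new_df = apply_scd_type2(
    input_delta_table=input_table_delta,
    output_delta_table=output_table_delta,
    primary_keys=["customer_id", "region"],  # example of a composite primary key
)
return new_df
```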
3. Check your output table over the following days and watch your history grow 🙌
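If you load the output table into a DataFrame (called history_df below; the table name and the id value are illustrative), a few filters are enough to inspect the history:

```python
from pyspark.sql import functions as F

# Current, non-deleted version of every record.
current_df = history_df.filter(F.col("_available_until").isNull() & ~F.col("_deleted"))

# Point-in-time snapshot: how the table looked on a given date.
as_of = "2024-06-01"
snapshot_df = history_df.filter(
    (F.col("_available_from") <= as_of)
    & (F.col("_available_until").isNull() | (F.col("_available_until") > as_of))
)

# Full change history for one hypothetical record.
history_df.filter(F.col("id") == 42).orderBy("_available_from").show()
```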
Here’s a video tutorial to guide you through the process!