# Before cleaning
print("Original Columns:")
print(df.columns.tolist())
# Output: ['Passenger ID', 'Name ', 'Age', 'Fare (£)', ' Cabin', 'Survived?', 'Embarked', 'Pclass']

# Clean the headers
df = clean_column_headers(df)

# After cleaning
print("\nCleaned Columns:")
print(df.columns.tolist())
# Output: ['passenger_id', 'name', 'age', 'fare', 'cabin', 'survived', 'embarked', 'pclass']
# Inspect the problem
print("Age column before cleaning:")
print(df['age'].head(10))
print(f"\nData type: {df['age'].dtype}")
print(f"Unique values: {df['age'].unique()}")

# Clean the age column
df = clean_numeric_column(
    df,
    column='age',
    fill_method='median',     # Use median (robust to outliers)
    remove_negatives=False,   # Ages shouldn't be negative anyway
    round_decimals=0          # Ages are whole numbers
)

print("\nAge column after cleaning:")
print(df['age'].head(10))
print(f"\nData type: {df['age'].dtype}")
print(f"Statistics: Mean={df['age'].mean():.1f}, Median={df['age'].median():.1f}")
# Fare has different requirements than age
print("Fare column before cleaning:")
print(df['fare'].unique())

df = clean_numeric_column(
    df,
    column='fare',
    fill_method='median',
    remove_negatives=True,   # Negative fares don't make sense
    round_decimals=2         # Money typically has 2 decimal places
)

print("\nFare column after cleaning:")
print(df['fare'].describe())
# ❌ DON'T clean non-numeric columns as numeric
# df = clean_numeric_column(df, 'name')  # This will fail!

# ❌ DON'T use mean for data with outliers
# Outliers will skew the mean and create unrealistic imputations (see the small example below)

# ✅ DO check the data type first
print(f"Is 'fare' numeric? {pd.api.types.is_numeric_dtype(df['fare'])}")

# ✅ DO inspect unique values before cleaning
print(f"Problematic values in age: {df[df['age'] == '#REF!']['age'].unique()}")
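# Why median and not mean? A tiny illustration (made-up numbers, not from the
# Titanic file): a single extreme value drags the mean but not the median.
example = pd.Series([22, 25, 28, 30, 31, 400])  # 400 is an outlier
print(f"Mean: {example.mean():.1f}, Median: {example.median():.1f}")
# Mean ~89.3 would be an unrealistic fill value; the median (29.0) would not.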
# Tip 1: Always check BEFORE and AFTER
def clean_with_validation(df, column, **kwargs):
    """Clean numeric column with before/after validation"""
    print(f"Cleaning '{column}'...")
    print(f"Before: {df[column].isna().sum()} missing, {df[column].dtype} dtype")
    df = clean_numeric_column(df, column, **kwargs)
    print(f"After: {df[column].isna().sum()} missing, {df[column].dtype} dtype")
    print(f"Range: {df[column].min():.2f} to {df[column].max():.2f}\n")
    return df

# Tip 2: Handle multiple numeric columns efficiently
numeric_columns = ['age', 'fare', 'pclass']
for col in numeric_columns:
    df = clean_with_validation(df, col)

# Tip 3: Create a cleaning report
cleaning_report = {
    'age': {'method': 'median', 'filled': 2, 'reason': 'Robust to outliers'},
    'fare': {'method': 'median', 'filled': 1, 'reason': 'Contains outliers'},
}
print("Cleaning Report:", cleaning_report)
# Compare distributions before and after
import matplotlib.pyplot as plt

fig, axes = plt.subplots(1, 2, figsize=(12, 4))

# Before (load original data)
df_original = pd.read_csv('titanic_dirty.csv')
df_original['Age'] = pd.to_numeric(df_original['Age'], errors='coerce')

# Plot
axes[0].hist(df_original['Age'].dropna(), bins=20, edgecolor='black')
axes[0].set_title('Age Distribution - Before Cleaning')
axes[0].set_xlabel('Age')

axes[1].hist(df['age'], bins=20, edgecolor='black')
axes[1].set_title('Age Distribution - After Cleaning')
axes[1].set_xlabel('Age')

plt.tight_layout()
plt.savefig('numeric_cleaning_comparison.png')
print("✅ Saved comparison plot")
# ❌ DON'T remove duplicates without investigating first
# Some "duplicates" might be legitimate (e.g., same name, different person)

# ❌ DON'T forget to specify subset when needed
# remove_duplicates(df)                   # Removes only if ALL columns match
# remove_duplicates(df, subset=['name'])  # More flexible

# ✅ DO investigate duplicates before removing
duplicates = df[df.duplicated(subset=['name'], keep=False)]
print(f"Found {len(duplicates)} duplicate names")
print(duplicates[['name', 'age', 'fare']])  # Compare the records

# ✅ DO document your duplicate removal strategy
duplicate_removal_notes = """
Duplicate Removal Strategy:
- Remove complete duplicate rows (all columns identical)
- Keep first occurrence when passenger_id matches
- Investigate name duplicates manually (might be family members)
"""
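# A minimal sketch of the strategy documented above using plain pandas (an
# assumption for illustration; remove_duplicates() with subset/keep does the same):
df_dedup = df.drop_duplicates()                                             # all columns identical
df_dedup = df_dedup.drop_duplicates(subset=['passenger_id'], keep='first')  # same passenger_id
name_dups = df_dedup[df_dedup.duplicated(subset=['name'], keep=False)]      # review these manually
print(name_dups[['passenger_id', 'name', 'age']])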
# Tip 1: Create a duplicate analysis function
def analyze_duplicates(df, subset_columns=None):
    """Analyze duplicate patterns in the dataset"""
    if subset_columns:
        dups = df[df.duplicated(subset=subset_columns, keep=False)]
        print(f"Duplicates based on {subset_columns}:")
    else:
        dups = df[df.duplicated(keep=False)]
        print("Complete duplicates:")
    print(f"  Total duplicate rows: {len(dups)}")
    print(f"  Unique duplicate sets: {dups.duplicated(subset=subset_columns).sum()}")
    if len(dups) > 0:
        print("\nExample duplicates:")
        print(dups.head().to_string())
    return dups
# Tip 2: Use duplicates for data quality assessment
dup_analysis = analyze_duplicates(df, subset_columns=['name'])

# Tip 3: Create a duplicate removal report
def remove_duplicates_with_report(df, **kwargs):
    """Remove duplicates and generate detailed report"""
    before_count = len(df)
    before_duplicates = df.duplicated().sum()
    df_clean = remove_duplicates(df, **kwargs)
    after_count = len(df_clean)
    removed = before_count - after_count
    report = {
        'rows_before': before_count,
        'rows_after': after_count,
        'duplicates_found': before_duplicates,
        'rows_removed': removed,
        'percentage_removed': (removed / before_count * 100) if before_count > 0 else 0
    }
    print("\nDuplicate Removal Report:")
    for key, value in report.items():
        print(f"  {key}: {value}")
    return df_clean, report
# Advanced: Find columns with high duplicate rates
def find_high_duplicate_columns(df, threshold=0.5):
    """Find columns whose share of unique values is below `threshold`"""
    results = {}
    for col in df.columns:
        unique_ratio = df[col].nunique() / len(df)
        if unique_ratio < threshold:
            results[col] = {
                'unique_values': df[col].nunique(),
                'total_values': len(df),
                'duplicate_rate': 1 - unique_ratio
            }
    return pd.DataFrame(results).T

print("Columns with high duplicate rates:")
print(find_high_duplicate_columns(df))
# First, understand your missing data
print("Missing Values Analysis:")
print("=" * 50)
missing_stats = pd.DataFrame({
    'Column': df.columns,
    'Missing_Count': df.isnull().sum().values,
    'Missing_Percent': (df.isnull().sum() / len(df) * 100).round(2).values,
    'Data_Type': df.dtypes.values
})
print(missing_stats[missing_stats['Missing_Count'] > 0].to_string(index=False))
# Visualize missing data pattern
import matplotlib.pyplot as plt

missing_data = df.isnull()
plt.figure(figsize=(10, 6))
plt.imshow(missing_data, cmap='binary', aspect='auto')
plt.xlabel('Columns')
plt.ylabel('Rows')
plt.title('Missing Data Pattern (Black = Missing)')
plt.xticks(range(len(df.columns)), df.columns, rotation=45)
plt.tight_layout()
plt.savefig('missing_data_pattern.png')
print("✅ Saved missing data visualization")
# ❌ DON'T fill all columns with the same method
# for col in df.columns:
#     df = fill_missing_values(df, col, method='mean')  # BAD!

# ❌ DON'T fill missing values without understanding WHY they're missing
# Missing at random? Missing not at random? Systematic bias?

# ❌ DON'T use mean for skewed data
skewed_data = df['fare']
print(f"Fare - Mean: {skewed_data.mean():.2f}, Median: {skewed_data.median():.2f}")
# If these differ significantly, use median!

# ✅ DO analyze missing data patterns first
def missing_data_report(df):
    """Generate comprehensive missing data report"""
    report = []
    for col in df.columns:
        missing_count = df[col].isnull().sum()
        if missing_count > 0:
            missing_pct = (missing_count / len(df)) * 100
            dtype = df[col].dtype
            # Recommended strategy
            if pd.api.types.is_numeric_dtype(df[col]):
                recommended = 'median (or mean if normally distributed)'
            else:
                recommended = 'mode (or custom value with domain knowledge)'
            report.append({
                'Column': col,
                'Missing': missing_count,
                'Percent': f'{missing_pct:.1f}%',
                'Type': dtype,
                'Recommended': recommended
            })
    return pd.DataFrame(report)

print("\nMissing Data Strategy Guide:")
print(missing_data_report(df).to_string(index=False))
# ✅ DO consider dropping columns with >50% missing data
high_missing = [col for col in df.columns if df[col].isnull().sum() / len(df) > 0.5]
if high_missing:
    print(f"\n⚠️ Columns with >50% missing data (consider dropping): {high_missing}")
# Tip 1: Create a flexible filling function
def smart_fill_missing(df, column, strategy='auto'):
    """Intelligently fill missing values based on data type and distribution"""
    if strategy == 'auto':
        if pd.api.types.is_numeric_dtype(df[column]):
            # Check for skewness
            skewness = df[column].skew()
            if abs(skewness) > 1:  # Highly skewed
                method = 'median'
                print(f"  {column}: Using median (skewed data)")
            else:
                method = 'mean'
                print(f"  {column}: Using mean (normally distributed)")
        else:
            method = 'mode'
            print(f"  {column}: Using mode (categorical)")
    else:
        method = strategy
    return fill_missing_values(df, column, method=method)
# Tip 2: Track all imputation decisions
imputation_log = {
    'age': {'method': 'median', 'value': df['age'].median(), 'reason': 'Robust to outliers'},
    'cabin': {'method': 'mode', 'value': df['cabin'].mode()[0], 'reason': 'Most common cabin'},
    'embarked': {'method': 'custom', 'value': 'Unknown', 'reason': 'Domain knowledge'}
}
print("\nImputation Log:")
for col, info in imputation_log.items():
    print(f"  {col}: {info['method']} = {info['value']} ({info['reason']})")

# Tip 3: Create indicator variables for missing data
# Sometimes "missingness" is informative!
df['cabin_was_missing'] = df['cabin'].isnull().astype(int)
df['age_was_missing'] = df['age'].isnull().astype(int)
print("\nMissing data indicators created - useful for ML models!")
# Compare statistics before and after imputation
def compare_imputation_impact(df_before, df_after, column):
    """Compare statistics before and after imputation"""
    before_stats = df_before[column].describe()
    after_stats = df_after[column].describe()
    comparison = pd.DataFrame({
        'Before': before_stats,
        'After': after_stats,
        'Change': after_stats - before_stats
    })
    print(f"\nImputation Impact on '{column}':")
    print(comparison)
    return comparison

# Example usage:
# compare_imputation_impact(df_original, df, 'age')
# For critical analyses, consider multiple imputation
def multiple_imputation_comparison(df, column, methods=['mean', 'median', 'mode']):
    """Compare multiple imputation methods"""
    results = {}
    for method in methods:
        df_temp = df.copy()
        df_temp = fill_missing_values(df_temp, column, method=method)
        results[method] = {
            'mean': df_temp[column].mean(),
            'median': df_temp[column].median(),
            'std': df_temp[column].std(),
            'min': df_temp[column].min(),
            'max': df_temp[column].max()
        }
    comparison_df = pd.DataFrame(results).T
    print(f"\nMultiple Imputation Comparison for '{column}':")
    print(comparison_df)
    return comparison_df

# Example:
# multiple_imputation_comparison(df, 'age')
# Step 1: Detect outliers first (don't remove yet!)
outlier_mask = detect_outliers_iqr(df, 'fare', factor=1.5)
print(f"Total passengers: {len(df)}")
print(f"Outliers detected: {outlier_mask.sum()}")
print(f"Percentage: {(outlier_mask.sum() / len(df) * 100):.1f}%")

# Step 2: Examine the outliers
print("\nOutlier Fares:")
outlier_fares = df[outlier_mask][['name', 'fare', 'pclass']]
print(outlier_fares.sort_values('fare', ascending=False))

# Step 3: Visualize the outliers
import matplotlib.pyplot as plt

fig, axes = plt.subplots(1, 2, figsize=(12, 4))

# Box plot
axes[0].boxplot(df['fare'])
axes[0].set_title('Fare Distribution with Outliers')
axes[0].set_ylabel('Fare (£)')

# Histogram
axes[1].hist(df['fare'], bins=20, edgecolor='black')
axes[1].axvline(df['fare'].median(), color='red', linestyle='--', label='Median')
axes[1].axvline(df['fare'].mean(), color='blue', linestyle='--', label='Mean')
axes[1].set_title('Fare Distribution')
axes[1].set_xlabel('Fare (£)')
axes[1].legend()

plt.tight_layout()
plt.savefig('outlier_detection.png')
print("✅ Saved outlier visualization")
# Check outliers in multiple columns
numeric_cols = ['age', 'fare']
outlier_summary = {}
for col in numeric_cols:
    outliers = detect_outliers_iqr(df, col, factor=1.5)
    outlier_summary[col] = {
        'count': outliers.sum(),
        'percentage': (outliers.sum() / len(df) * 100),
        'min_outlier': df[outliers][col].min() if outliers.sum() > 0 else None,
        'max_outlier': df[outliers][col].max() if outliers.sum() > 0 else None
    }

print("\nOutlier Summary Across Columns:")
summary_df = pd.DataFrame(outlier_summary).T
print(summary_df)
# ❌ DON'T remove outliers blindly
# remove_outliers_iqr(df, 'fare')  # Without understanding WHY they're outliers

# ❌ DON'T remove outliers from the target variable in ML
# y_train = remove_outliers_iqr(y_train, 'survival')  # NO! This biases predictions

# ❌ DON'T use the same factor for all columns
# Different columns have different distributions

# ✅ DO investigate outliers first
def investigate_outliers(df, column, factor=1.5):
    """Detailed outlier investigation"""
    outlier_mask = detect_outliers_iqr(df, column, factor)
    outliers = df[outlier_mask]
    print(f"\nOutlier Investigation: '{column}'")
    print(f"{'='*50}")
    print(f"Number of outliers: {len(outliers)}")
    print(f"Percentage: {(len(outliers)/len(df)*100):.2f}%")
    if len(outliers) > 0:
        print(f"\nOutlier Statistics:")
        print(f"  Min: {outliers[column].min():.2f}")
        print(f"  Max: {outliers[column].max():.2f}")
        print(f"  Mean: {outliers[column].mean():.2f}")
        print(f"\nNormal Range:")
        Q1 = df[column].quantile(0.25)
        Q3 = df[column].quantile(0.75)
        IQR = Q3 - Q1
        print(f"  Lower bound: {Q1 - factor * IQR:.2f}")
        print(f"  Upper bound: {Q3 + factor * IQR:.2f}")
        print(f"\nOutlier Examples:")
        print(outliers[[column] + [c for c in df.columns if c != column]].head())
    return outliers
# ✅ DO compare different factor values
outliers_1_5 = detect_outliers_iqr(df, 'fare', factor=1.5)
outliers_3_0 = detect_outliers_iqr(df, 'fare', factor=3.0)
print(f"Outliers with factor 1.5: {outliers_1_5.sum()}")
print(f"Outliers with factor 3.0: {outliers_3_0.sum()}")
# Tip 1: Create an outlier analysis report
def outlier_analysis_report(df, numeric_columns, factor=1.5):
    """Generate comprehensive outlier report"""
    report = []
    for col in numeric_columns:
        if col in df.columns:
            outliers = detect_outliers_iqr(df, col, factor)
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            report.append({
                'Column': col,
                'Outliers': outliers.sum(),
                'Percent': f'{(outliers.sum()/len(df)*100):.1f}%',
                'Q1': f'{Q1:.2f}',
                'Q3': f'{Q3:.2f}',
                'IQR': f'{IQR:.2f}',
                'Lower_Bound': f'{Q1 - factor * IQR:.2f}',
                'Upper_Bound': f'{Q3 + factor * IQR:.2f}'
            })
    return pd.DataFrame(report)

print("\nOutlier Analysis Report:")
print(outlier_analysis_report(df, ['age', 'fare']).to_string(index=False))
# Tip 2: Capping instead of removing
def cap_outliers(df, column, factor=1.5):
    """Cap outliers instead of removing them"""
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - factor * IQR
    upper_bound = Q3 + factor * IQR
    df_capped = df.copy()
    df_capped[column] = df_capped[column].clip(lower_bound, upper_bound)
    capped_count = ((df[column] < lower_bound) | (df[column] > upper_bound)).sum()
    print(f"✅ Capped {capped_count} outliers in '{column}'")
    return df_capped
# Tip 3: Z-score method (alternative to IQR)
def detect_outliers_zscore(df, column, threshold=3):
    """Detect outliers using Z-score method"""
    z_scores = np.abs((df[column] - df[column].mean()) / df[column].std())
    return z_scores > threshold

# Compare methods
iqr_outliers = detect_outliers_iqr(df, 'fare', factor=1.5)
z_outliers = detect_outliers_zscore(df, 'fare', threshold=3)
print(f"\nOutlier Detection Comparison:")
print(f"  IQR method: {iqr_outliers.sum()} outliers")
print(f"  Z-score method: {z_outliers.sum()} outliers")
print(f"  Both methods agree on: {(iqr_outliers & z_outliers).sum()} outliers")
def compare_outlier_removal(df_before, df_after, column):
    """Visualize impact of outlier removal"""
    fig, axes = plt.subplots(2, 2, figsize=(12, 10))

    # Before - Box plot
    axes[0, 0].boxplot(df_before[column])
    axes[0, 0].set_title(f'{column} - Before (with outliers)')
    axes[0, 0].set_ylabel('Value')

    # After - Box plot
    axes[0, 1].boxplot(df_after[column])
    axes[0, 1].set_title(f'{column} - After (outliers removed)')
    axes[0, 1].set_ylabel('Value')

    # Before - Histogram
    axes[1, 0].hist(df_before[column], bins=20, edgecolor='black')
    axes[1, 0].set_title('Distribution - Before')
    axes[1, 0].set_xlabel(column)

    # After - Histogram
    axes[1, 1].hist(df_after[column], bins=20, edgecolor='black', color='green')
    axes[1, 1].set_title('Distribution - After')
    axes[1, 1].set_xlabel(column)

    plt.tight_layout()
    plt.savefig(f'{column}_outlier_comparison.png')
    print(f"✅ Saved comparison for {column}")

# Usage:
# compare_outlier_removal(df, df_clean, 'fare')
# Before cleaningprint("Embarked - Before:")
print(df['embarked'].value_counts())
# Clean with all optionsdf = clean_text_column(
df,
column='embarked',
lowercase=True, # 'Southampton' β 'southampton' remove_extra_spaces=True, # ' Cherbourg' β 'cherbourg' standardize_missing=True # 'unknown' β NaN)
print("\nEmbarked - After:")
print(df['embarked'].value_counts())
# ❌ DON'T lowercase everything blindly
# df = clean_text_column(df, 'cabin', lowercase=True)
# Result: 'C85' → 'c85' (loses readability)

# ❌ DON'T clean numeric columns as text
# df = clean_text_column(df, 'age')  # This will cause problems!

# ❌ DON'T forget that standardize_missing removes many values
before_count = df['cabin'].notna().sum()
df = clean_text_column(df, 'cabin', standardize_missing=True)
after_count = df['cabin'].notna().sum()
print(f"Values changed to NaN: {before_count - after_count}")
# ✅ DO check data type first
text_cols = df.select_dtypes(include=['object']).columns
print(f"Text columns: {text_cols.tolist()}")

# ✅ DO inspect unique values before and after
def clean_with_comparison(df, column, **kwargs):
    """Clean text column and show comparison"""
    print(f"\nCleaning '{column}'...")
    unique_before = df[column].nunique()
    values_before = df[column].unique()[:5]  # Show first 5
    df = clean_text_column(df, column, **kwargs)
    unique_after = df[column].nunique()
    values_after = df[column].unique()[:5]
    print(f"  Unique values: {unique_before} → {unique_after}")
    print(f"  Sample before: {values_before}")
    print(f"  Sample after: {values_after}")
    return df
# Tip 1: Advanced text cleaning with regex
def advanced_text_clean(df, column):
    """Advanced text cleaning with custom rules"""
    df = df.copy()
    # Remove special characters (keep letters, numbers, spaces)
    df[column] = df[column].str.replace(r'[^a-zA-Z0-9\s]', '', regex=True)
    # Remove multiple spaces
    df[column] = df[column].str.replace(r'\s+', ' ', regex=True)
    # Strip leading/trailing spaces
    df[column] = df[column].str.strip()
    # Convert to title case (First Letter Capitalized)
    df[column] = df[column].str.title()
    print(f"✅ Advanced cleaning completed on '{column}'")
    return df
# Tip 2: Standardize categorical values
def standardize_categories(df, column, mapping=None):
    """Standardize category names using a mapping"""
    if mapping is None:
        # Auto-create mapping from lowercase
        mapping = {val: val.lower() for val in df[column].unique() if pd.notna(val)}
    df[column] = df[column].map(mapping).fillna(df[column])
    print(f"✅ Standardized categories in '{column}'")
    print(f"  Unique values: {df[column].nunique()}")
    return df

# Example: Standardize embarkation ports
port_mapping = {
    'Southampton': 'southampton',
    'Cherbourg': 'cherbourg',
    'Queenstown': 'queenstown',
    'Unknown': np.nan
}
df = standardize_categories(df, 'embarked', port_mapping)
# Tip 3: Create a text cleaning report
def text_cleaning_report(df, column, before_values):
    """Generate report showing text cleaning impact"""
    after_values = df[column].unique()
    report = {
        'Unique_Before': len(before_values),
        'Unique_After': len(after_values),
        'Values_Merged': len(before_values) - len(after_values),
        'Missing_Before': pd.isna(before_values).sum(),
        'Missing_After': df[column].isna().sum()
    }
    print(f"\nText Cleaning Report: '{column}'")
    for key, value in report.items():
        print(f"  {key}: {value}")
    return report
# Tip 4: Handle common text variations
def handle_common_variations(df, column):
    """Handle common text variations (y/n, yes/no, etc.)"""
    df = df.copy()
    # Standardize yes/no variations
    yes_variations = ['yes', 'y', 'true', '1', 'yeah', 'yep']
    no_variations = ['no', 'n', 'false', '0', 'nah', 'nope']
    df[column] = df[column].str.lower()
    df.loc[df[column].isin(yes_variations), column] = 'yes'
    df.loc[df[column].isin(no_variations), column] = 'no'
    return df
def assess_text_quality(df, column):
    """Assess the quality of text data in a column"""
    quality_metrics = {
        'Total_Values': len(df[column]),
        'Unique_Values': df[column].nunique(),
        'Missing_Values': df[column].isna().sum(),
        'Empty_Strings': (df[column] == '').sum(),
        'Whitespace_Only': df[column].str.strip().eq('').sum(),
        'Mixed_Case': df[column].str.contains(r'[a-z].*[A-Z]|[A-Z].*[a-z]', regex=True).sum(),
        'Extra_Spaces': df[column].str.contains(r'\s{2,}', regex=True).sum(),
        'Leading_Trailing_Spaces': df[column].str.strip().ne(df[column]).sum()
    }
    print(f"\nText Quality Assessment: '{column}'")
    print("="*50)
    for metric, value in quality_metrics.items():
        percentage = (value / len(df) * 100) if len(df) > 0 else 0
        print(f"  {metric}: {value} ({percentage:.1f}%)")

    # Identify issues
    issues = []
    if quality_metrics['Empty_Strings'] > 0:
        issues.append("Empty strings detected")
    if quality_metrics['Extra_Spaces'] > 0:
        issues.append("Extra whitespace detected")
    if quality_metrics['Leading_Trailing_Spaces'] > 0:
        issues.append("Leading/trailing spaces detected")
    if issues:
        print("\n⚠️ Issues Found:")
        for issue in issues:
            print(f"  • {issue}")
    else:
        print("\n✅ No text quality issues detected!")
    return quality_metrics

# Example usage:
# assess_text_quality(df, 'name')
def normalize_text_pipeline(df, column,
                            lowercase=True,
                            remove_special_chars=True,
                            remove_numbers=False,
                            standardize_whitespace=True):
    """Comprehensive text normalization pipeline"""
    df = df.copy()
    original_column = df[column].copy()
    print(f"\nText Normalization Pipeline: '{column}'")
    print("="*50)

    # Step 1: Lowercase
    if lowercase:
        df[column] = df[column].str.lower()
        print("  ✅ Converted to lowercase")

    # Step 2: Remove special characters
    if remove_special_chars:
        df[column] = df[column].str.replace(r'[^\w\s]', '', regex=True)
        print("  ✅ Removed special characters")

    # Step 3: Remove numbers
    if remove_numbers:
        df[column] = df[column].str.replace(r'\d+', '', regex=True)
        print("  ✅ Removed numbers")

    # Step 4: Standardize whitespace
    if standardize_whitespace:
        df[column] = df[column].str.replace(r'\s+', ' ', regex=True).str.strip()
        print("  ✅ Standardized whitespace")

    # Summary
    changed = (original_column != df[column]).sum()
    print(f"\n  Total values changed: {changed} ({changed/len(df)*100:.1f}%)")
    return df

# Example:
# df = normalize_text_pipeline(df, 'name',
#                              lowercase=True,
#                              remove_special_chars=False,
#                              remove_numbers=False,
#                              standardize_whitespace=True)
# After loading your data, ALWAYS do this first!
print_data_summary(df)

# Output:
# DATA SUMMARY
# ==================================================
# Shape: (15, 8)
# Total Rows: 15
# Total Columns: 8
# Memory Usage: 0.01 MB
# Duplicate Rows: 2
#
# COLUMNS (8)
#   1. passenger_id | int64   | Missing: 0 ( 0.0%)
#   2. name         | object  | Missing: 0 ( 0.0%)
#   3. age          | float64 | Missing: 2 (13.3%)
#   4. fare         | float64 | Missing: 1 ( 6.7%)
#   5. cabin        | object  | Missing: 4 (26.7%)
#   6. survived     | float64 | Missing: 1 ( 6.7%)
#   7. embarked     | object  | Missing: 0 ( 0.0%)
#   8. pclass       | int64   | Missing: 0 ( 0.0%)
# Get summary as a dictionary for programmatic use
summary = get_data_summary(df)

# Access specific information
print(f"Dataset has {summary['total_rows']} rows")
print(f"Memory usage: {summary['memory_usage_mb']:.3f} MB")

# Find columns with high missing data
high_missing = {
    col: pct
    for col, pct in summary['missing_percentage'].items()
    if pct > 20
}
print(f"\nColumns with >20% missing data: {high_missing}")
# Create comparison report
def compare_datasets(df_before, df_after, label_before="Before", label_after="After"):
    """Compare two versions of a dataset"""
    summary_before = get_data_summary(df_before)
    summary_after = get_data_summary(df_after)
    print(f"\nDATASET COMPARISON: {label_before} vs {label_after}")
    print("="*60)

    # Row comparison
    print(f"\nRows:")
    print(f"  {label_before}: {summary_before['total_rows']}")
    print(f"  {label_after}: {summary_after['total_rows']}")
    print(f"  Change: {summary_after['total_rows'] - summary_before['total_rows']}")

    # Duplicates comparison
    print(f"\nDuplicates:")
    print(f"  {label_before}: {summary_before['duplicates']}")
    print(f"  {label_after}: {summary_after['duplicates']}")

    # Missing data comparison
    print(f"\nTotal Missing Values:")
    missing_before = sum(summary_before['missing_values'].values())
    missing_after = sum(summary_after['missing_values'].values())
    print(f"  {label_before}: {missing_before}")
    print(f"  {label_after}: {missing_after}")
    print(f"  Reduced by: {missing_before - missing_after}")

    # Memory comparison
    print(f"\nMemory Usage:")
    print(f"  {label_before}: {summary_before['memory_usage_mb']:.3f} MB")
    print(f"  {label_after}: {summary_after['memory_usage_mb']:.3f} MB")

    return {
        'before': summary_before,
        'after': summary_after
    }

# Usage:
# compare_datasets(df_original, df_cleaned, "Original", "Cleaned")
def missing_data_dashboard(df):
    """Create a comprehensive missing data visualization"""
    import matplotlib.pyplot as plt

    summary = get_data_summary(df)
    missing_pct = summary['missing_percentage']

    # Filter columns with missing data
    cols_with_missing = {k: v for k, v in missing_pct.items() if v > 0}
    if not cols_with_missing:
        print("✅ No missing data found!")
        return

    # Create visualization
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))

    # Bar chart
    cols = list(cols_with_missing.keys())
    values = list(cols_with_missing.values())
    axes[0].barh(cols, values, color='coral')
    axes[0].set_xlabel('Missing Data (%)')
    axes[0].set_title('Missing Data by Column')
    axes[0].axvline(50, color='red', linestyle='--', label='50% threshold')
    axes[0].legend()

    # Heatmap-style visualization
    missing_matrix = df[cols].isnull().astype(int)
    axes[1].imshow(missing_matrix.T, cmap='RdYlGn_r', aspect='auto')
    axes[1].set_yticks(range(len(cols)))
    axes[1].set_yticklabels(cols)
    axes[1].set_xlabel('Row Index')
    axes[1].set_title('Missing Data Pattern')

    plt.tight_layout()
    plt.savefig('missing_data_dashboard.png', dpi=300, bbox_inches='tight')
    print("✅ Saved missing data dashboard")

    # Print summary
    print(f"\nMissing Data Summary:")
    print(f"  Columns with missing data: {len(cols_with_missing)}")
    print(f"  Highest missing %: {max(values):.1f}% ({cols[values.index(max(values))]})")
    print(f"  Average missing %: {np.mean(values):.1f}%")

# Usage:
# missing_data_dashboard(df)
# Numeric summary contains these statistics:
summary = get_data_summary(df)
if 'numeric_summary' in summary:
    age_stats = summary['numeric_summary']['age']
    print("\nUnderstanding Numeric Statistics:")
    print(f"  count: Number of non-missing values ({age_stats['count']})")
    print(f"  mean:  Average value ({age_stats['mean']:.2f})")
    print(f"  std:   Standard deviation ({age_stats['std']:.2f})")
    print(f"  min:   Minimum value ({age_stats['min']:.2f})")
    print(f"  25%:   First quartile (Q1) ({age_stats['25%']:.2f})")
    print(f"  50%:   Median (Q2) ({age_stats['50%']:.2f})")
    print(f"  75%:   Third quartile (Q3) ({age_stats['75%']:.2f})")
    print(f"  max:   Maximum value ({age_stats['max']:.2f})")
# Tip 1: Create a data quality score
def calculate_data_quality_score(df):
    """Calculate overall data quality score (0-100)"""
    summary = get_data_summary(df)

    # Factors affecting quality
    total_cells = summary['total_rows'] * summary['total_columns']
    missing_cells = sum(summary['missing_values'].values())
    missing_ratio = missing_cells / total_cells if total_cells > 0 else 0
    duplicate_ratio = summary['duplicates'] / summary['total_rows'] if summary['total_rows'] > 0 else 0

    # Calculate score (100 = perfect)
    quality_score = 100 * (1 - missing_ratio) * (1 - duplicate_ratio)
    print(f"\nData Quality Score: {quality_score:.1f}/100")
    print(f"  Missing data penalty: {missing_ratio * 100:.1f}%")
    print(f"  Duplicate penalty: {duplicate_ratio * 100:.1f}%")

    if quality_score >= 90:
        grade = "A - Excellent"
    elif quality_score >= 80:
        grade = "B - Good"
    elif quality_score >= 70:
        grade = "C - Fair"
    elif quality_score >= 60:
        grade = "D - Poor"
    else:
        grade = "F - Needs Major Cleaning"
    print(f"  Grade: {grade}")
    return quality_score
# Tip 2: Export summary to file
def export_summary_report(df, filename='data_summary_report.txt'):
    """Export data summary to text file"""
    import sys
    from io import StringIO

    # Capture printed output
    old_stdout = sys.stdout
    sys.stdout = captured_output = StringIO()
    print_data_summary(df)
    summary_text = captured_output.getvalue()
    sys.stdout = old_stdout

    # Write to file
    with open(filename, 'w') as f:
        f.write(summary_text)
    print(f"✅ Summary exported to {filename}")
# Tip 3: Column-specific deep dive
def deep_dive_column(df, column):
    """Detailed analysis of a specific column"""
    print(f"\nDeep Dive: '{column}'")
    print("="*60)

    # Basic info
    print(f"Data type: {df[column].dtype}")
    print(f"Total values: {len(df[column])}")
    print(f"Missing values: {df[column].isna().sum()} ({df[column].isna().sum()/len(df)*100:.1f}%)")
    print(f"Unique values: {df[column].nunique()}")

    # Type-specific analysis
    if pd.api.types.is_numeric_dtype(df[column]):
        print(f"\nNumeric Statistics:")
        print(df[column].describe())
        # Check for outliers
        outliers = detect_outliers_iqr(df, column, factor=1.5)
        print(f"\n⚠️ Potential outliers: {outliers.sum()}")
    else:  # Categorical/Text
        print(f"\nValue Counts:")
        print(df[column].value_counts().head(10))
        if df[column].nunique() < 20:
            print(f"\nAll unique values:")
            print(df[column].unique())
# Tip 4: Generate HTML report
def generate_html_report(df, filename='data_report.html'):
    """Generate an HTML report with styling"""
    summary = get_data_summary(df)
    html = f"""
    <html>
    <head>
    <title>Data Cleaning Report</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 20px; }}
        h1 {{ color: #2c3e50; }}
        h2 {{ color: #34495e; }}
        table {{ border-collapse: collapse; width: 100%; margin: 20px 0; }}
        th, td {{ border: 1px solid #ddd; padding: 12px; text-align: left; }}
        th {{ background-color: #3498db; color: white; }}
        tr:nth-child(even) {{ background-color: #f2f2f2; }}
        .metric {{ display: inline-block; margin: 10px; padding: 15px;
                   background: #ecf0f1; border-radius: 5px; }}
    </style>
    </head>
    <body>
    <h1>Data Cleaning Report</h1>
    <h2>Overview</h2>
    <div class="metric"><strong>Rows:</strong> {summary['total_rows']}</div>
    <div class="metric"><strong>Columns:</strong> {summary['total_columns']}</div>
    <div class="metric"><strong>Duplicates:</strong> {summary['duplicates']}</div>
    <div class="metric"><strong>Memory:</strong> {summary['memory_usage_mb']:.2f} MB</div>
    <h2>Column Details</h2>
    <table>
        <tr>
            <th>Column</th>
            <th>Data Type</th>
            <th>Missing Count</th>
            <th>Missing %</th>
        </tr>
    """
    for col in summary['columns']:
        html += f"""
        <tr>
            <td>{col}</td>
            <td>{summary['dtypes'][col]}</td>
            <td>{summary['missing_values'][col]}</td>
            <td>{summary['missing_percentage'][col]:.1f}%</td>
        </tr>
        """
    html += """
    </table>
    </body>
    </html>
    """
    with open(filename, 'w') as f:
        f.write(html)
    print(f"✅ HTML report saved to {filename}")

# Usage examples:
# calculate_data_quality_score(df)
# export_summary_report(df)
# deep_dive_column(df, 'age')
# generate_html_report(df)
class DataCleaningTracker:
    """Track data cleaning progress across multiple steps"""

    def __init__(self, df, name="Dataset"):
        self.name = name
        self.checkpoints = []
        self.add_checkpoint(df, "Initial State")

    def add_checkpoint(self, df, description):
        """Add a checkpoint with current state"""
        summary = get_data_summary(df)
        checkpoint = {
            'step': len(self.checkpoints) + 1,
            'description': description,
            'rows': summary['total_rows'],
            'missing_total': sum(summary['missing_values'].values()),
            'duplicates': summary['duplicates'],
            'memory_mb': summary['memory_usage_mb']
        }
        self.checkpoints.append(checkpoint)
        print(f"✅ Checkpoint {checkpoint['step']}: {description}")

    def print_progress(self):
        """Print cleaning progress summary"""
        print(f"\nCleaning Progress for '{self.name}'")
        print("="*80)
        for cp in self.checkpoints:
            print(f"\nStep {cp['step']}: {cp['description']}")
            print(f"  Rows: {cp['rows']}")
            print(f"  Missing values: {cp['missing_total']}")
            print(f"  Duplicates: {cp['duplicates']}")
            print(f"  Memory: {cp['memory_mb']:.2f} MB")

        # Calculate improvements
        if len(self.checkpoints) > 1:
            initial = self.checkpoints[0]
            final = self.checkpoints[-1]
            print(f"\nOverall Improvements:")
            print(f"  Rows removed: {initial['rows'] - final['rows']}")
            print(f"  Missing values reduced: {initial['missing_total'] - final['missing_total']}")
            print(f"  Duplicates removed: {initial['duplicates'] - final['duplicates']}")
            print(f"  Memory saved: {initial['memory_mb'] - final['memory_mb']:.2f} MB")

# Usage:
# tracker = DataCleaningTracker(df, "Titanic Dataset")
#
# df = clean_column_headers(df)
# tracker.add_checkpoint(df, "Cleaned headers")
#
# df = remove_duplicates(df)
# tracker.add_checkpoint(df, "Removed duplicates")
#
# tracker.print_progress()
# Load dirty data
df = pd.read_csv('titanic_dirty.csv')

# One-line cleaning!
df_clean = quick_clean(df)

# Output:
# Starting quick clean...
# ✅ Cleaned column headers
# ✅ Removed 2 duplicate rows
# ✅ Cleaned numeric column: age
# ✅ Cleaned numeric column: fare
# ✅ Cleaned numeric column: pclass
# ✅ Cleaned text column: name
# ✅ Cleaned text column: cabin
# ✅ Cleaned text column: embarked
# Quick clean completed!
# Save original for comparison
df_original = df.copy()

# Quick clean
df_clean = quick_clean(df)

# Compare
print("\nCleaning Impact:")
print(f"Rows: {len(df_original)} → {len(df_clean)}")
print(f"Missing values: {df_original.isnull().sum().sum()} → {df_clean.isnull().sum().sum()}")
print(f"Memory: {df_original.memory_usage(deep=True).sum()/1024**2:.2f} MB → {df_clean.memory_usage(deep=True).sum()/1024**2:.2f} MB")
# ❌ DON'T use quick_clean for production without testing
# df = quick_clean(df)  # What if something goes wrong?

# ❌ DON'T use quick_clean when you need custom parameters
# quick_clean() uses default median/mode/lowercase
# But maybe you need mean, or want to preserve case!

# ❌ DON'T use quick_clean on data with complex requirements
# Complex business logic? Use individual functions!

# ✅ DO use quick_clean for initial exploration
print("Original data:")
print_data_summary(df)
df_quick = quick_clean(df.copy())
print("\nAfter quick clean:")
print_data_summary(df_quick)
# ✅ DO create a custom quick clean for your domain
def quick_clean_medical(df):
    """Custom quick clean for medical data"""
    print("Starting medical data quick clean...")
    df = clean_column_headers(df)
    df = remove_duplicates(df)
    # Medical-specific cleaning
    for col in ['patient_id', 'age', 'weight', 'height']:
        if col in df.columns:
            df = clean_numeric_column(df, col, fill_method='median')
    for col in ['diagnosis', 'medication', 'doctor']:
        if col in df.columns:
            df = clean_text_column(df, col, lowercase=False)  # Keep case!
    print("Medical quick clean completed!")
    return df
# ✅ DO validate results after quick clean
def validate_quick_clean(df_before, df_after):
    """Validate quick clean results"""
    issues = []

    # Check for unexpected data loss
    rows_lost = len(df_before) - len(df_after)
    if rows_lost > len(df_before) * 0.1:  # Lost >10% of rows
        issues.append(f"⚠️ Lost {rows_lost} rows ({rows_lost/len(df_before)*100:.1f}%)")

    # Check for unexpected column removal
    cols_lost = set(df_before.columns) - set(df_after.columns)
    if cols_lost:
        issues.append(f"⚠️ Lost columns: {cols_lost}")

    # Check for data type changes
    for col in df_before.columns:
        if col in df_after.columns:
            if df_before[col].dtype != df_after[col].dtype:
                issues.append(f"⚠️ {col}: {df_before[col].dtype} → {df_after[col].dtype}")

    if issues:
        print("\n⚠️ Validation Issues:")
        for issue in issues:
            print(f"  {issue}")
    else:
        print("\n✅ Validation passed!")
    return len(issues) == 0
# Tip 1: Create a configurable quick clean
def configurable_quick_clean(df, config=None):
    """Quick clean with custom configuration"""
    # Default configuration
    default_config = {
        'clean_headers': True,
        'remove_dupes': True,
        'numeric_fill_method': 'median',
        'text_lowercase': True,
        'remove_outliers': False,
        'outlier_factor': 1.5
    }
    # Merge with user config
    if config:
        default_config.update(config)
    cfg = default_config
    print(f"Configurable quick clean with settings: {cfg}")

    # Apply cleaning steps
    if cfg['clean_headers']:
        df = clean_column_headers(df)
    if cfg['remove_dupes']:
        df = remove_duplicates(df)

    # Clean numeric columns
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    for col in numeric_cols:
        df = clean_numeric_column(df, col, fill_method=cfg['numeric_fill_method'])
        if cfg['remove_outliers']:
            df = remove_outliers_iqr(df, col, factor=cfg['outlier_factor'])

    # Clean text columns
    text_cols = df.select_dtypes(include=['object']).columns
    for col in text_cols:
        df = clean_text_column(df, col, lowercase=cfg['text_lowercase'])

    print("Configurable quick clean completed!")
    return df

# Usage:
custom_config = {
    'numeric_fill_method': 'mean',
    'text_lowercase': False,
    'remove_outliers': True
}
# df_clean = configurable_quick_clean(df, custom_config)

# Tip 2: Quick clean with detailed report
def quick_clean_with_report(df):
    """Quick clean with comprehensive before/after report"""
    # Capture before state
    before = {
        'shape': df.shape,
        'missing': df.isnull().sum().sum(),
        'duplicates': df.duplicated().sum(),
        'memory': df.memory_usage(deep=True).sum() / 1024**2
    }

    # Apply quick clean
    df_clean = quick_clean(df)

    # Capture after state
    after = {
        'shape': df_clean.shape,
        'missing': df_clean.isnull().sum().sum(),
        'duplicates': df_clean.duplicated().sum(),
        'memory': df_clean.memory_usage(deep=True).sum() / 1024**2
    }

    # Generate report
    print("\n" + "="*60)
    print("QUICK CLEAN REPORT")
    print("="*60)
    print(f"\nRows:       {before['shape'][0]:6d} → {after['shape'][0]:6d} ({before['shape'][0] - after['shape'][0]:+d})")
    print(f"Columns:    {before['shape'][1]:6d} → {after['shape'][1]:6d} ({before['shape'][1] - after['shape'][1]:+d})")
    print(f"Missing:    {before['missing']:6d} → {after['missing']:6d} ({before['missing'] - after['missing']:+d})")
    print(f"Duplicates: {before['duplicates']:6d} → {after['duplicates']:6d} ({before['duplicates'] - after['duplicates']:+d})")
    print(f"Memory:     {before['memory']:6.2f} → {after['memory']:6.2f} MB ({before['memory'] - after['memory']:+.2f} MB)")
    print("="*60)
    return df_clean
# Tip 3: Timed quick clean
def timed_quick_clean(df):
    """Quick clean with execution time"""
    import time
    print("Starting timed quick clean...")
    start_time = time.time()
    df_clean = quick_clean(df)
    elapsed = time.time() - start_time
    rows_per_second = len(df) / elapsed if elapsed > 0 else 0
    print(f"\nCompleted in {elapsed:.3f} seconds")
    print(f"  ({rows_per_second:.0f} rows/second)")
    return df_clean
# Tip 4: Quick clean with rollback
def quick_clean_with_rollback(df):
    """Quick clean with the ability to roll back on error"""
    df_backup = df.copy()
    try:
        df_clean = quick_clean(df)
        print("✅ Quick clean succeeded!")
        return df_clean
    except Exception as e:
        print(f"❌ Quick clean failed: {e}")
        print("Rolling back to original data...")
        return df_backup

# Usage examples:
# df_clean = configurable_quick_clean(df, custom_config)
# df_clean = quick_clean_with_report(df)
# df_clean = timed_quick_clean(df)
# df_clean = quick_clean_with_rollback(df)
"""Complete Titanic Data Cleaning Workflow========================================This example demonstrates a full professional data cleaning pipeline."""import pandas as pd
import numpy as np
from pandas_data_cleaning import *import matplotlib.pyplot as plt
# ============================================================================# STEP 1: LOAD AND INSPECT DATA# ============================================================================print("="*80)
print("STEP 1: LOAD AND INSPECT DATA")
print("="*80)
# Load the dirty datadf = pd.read_csv('titanic_dirty.csv')
# Initial inspectionprint("\nπ Initial Data Inspection:")
print_data_summary(df)
# Calculate initial data quality scoreinitial_quality = calculate_data_quality_score(df)
# Save original for comparisondf_original = df.copy()
# Initialize progress trackertracker = DataCleaningTracker(df, "Titanic Dataset")
# ============================================================================
# STEP 2: CLEAN COLUMN HEADERS
# ============================================================================
print("\n" + "="*80)
print("STEP 2: CLEAN COLUMN HEADERS")
print("="*80)

# Show original column names
print("\nOriginal columns:", df.columns.tolist())

# Clean headers
df = clean_column_headers(df)

# Show cleaned column names
print("Cleaned columns:", df.columns.tolist())

tracker.add_checkpoint(df, "Cleaned column headers")

# ============================================================================
# STEP 3: HANDLE DUPLICATES
# ============================================================================
print("\n" + "="*80)
print("STEP 3: HANDLE DUPLICATES")
print("="*80)

# Analyze duplicates before removing
print("\nAnalyzing duplicates...")
duplicates = df[df.duplicated(keep=False)]
if len(duplicates) > 0:
    print(f"Found {len(duplicates)} duplicate entries")
    print(duplicates[['passenger_id', 'name', 'age']].sort_values('name'))

# Remove duplicates
df = remove_duplicates(df)
tracker.add_checkpoint(df, "Removed duplicates")
# ============================================================================
# STEP 4: CLEAN NUMERIC COLUMNS
# ============================================================================
print("\n" + "="*80)
print("STEP 4: CLEAN NUMERIC COLUMNS")
print("="*80)

# Clean age column
print("\nCleaning 'age' column...")
print(f"Before: {df['age'].dtype}, Missing: {df['age'].isna().sum()}")
df = clean_numeric_column(
    df,
    column='age',
    fill_method='median',
    remove_negatives=False,
    round_decimals=0
)
print(f"After: {df['age'].dtype}, Missing: {df['age'].isna().sum()}")
print(f"Age range: {df['age'].min():.0f} to {df['age'].max():.0f}")

# Clean fare column
print("\nCleaning 'fare' column...")
print(f"Before: {df['fare'].dtype}, Missing: {df['fare'].isna().sum()}")
df = clean_numeric_column(
    df,
    column='fare',
    fill_method='median',
    remove_negatives=True,
    round_decimals=2
)
print(f"After: {df['fare'].dtype}, Missing: {df['fare'].isna().sum()}")
print(f"Fare range: £{df['fare'].min():.2f} to £{df['fare'].max():.2f}")

tracker.add_checkpoint(df, "Cleaned numeric columns")
# ============================================================================
# STEP 5: HANDLE OUTLIERS
# ============================================================================
print("\n" + "="*80)
print("STEP 5: HANDLE OUTLIERS")
print("="*80)

# Detect outliers in fare
print("\nAnalyzing outliers in 'fare'...")
outliers = detect_outliers_iqr(df, 'fare', factor=1.5)
print(f"Outliers detected: {outliers.sum()} ({outliers.sum()/len(df)*100:.1f}%)")

if outliers.sum() > 0:
    print("\nOutlier fares:")
    print(df[outliers][['name', 'fare', 'pclass']].sort_values('fare', ascending=False))

    # For this example, we'll cap outliers instead of removing
    # (preserving passenger records is important!)
    Q1 = df['fare'].quantile(0.25)
    Q3 = df['fare'].quantile(0.75)
    IQR = Q3 - Q1
    upper_bound = Q3 + 1.5 * IQR
    df['fare'] = df['fare'].clip(upper=upper_bound)
    print(f"\n✅ Capped {outliers.sum()} outlier fares at £{upper_bound:.2f}")

tracker.add_checkpoint(df, "Handled outliers")
# ============================================================================
# STEP 6: CLEAN TEXT COLUMNS
# ============================================================================
print("\n" + "="*80)
print("STEP 6: CLEAN TEXT COLUMNS")
print("="*80)

# Clean name column
print("\nCleaning 'name' column...")
print(f"Sample before: {df['name'].head(3).tolist()}")
df = clean_text_column(
    df,
    column='name',
    lowercase=True,
    remove_extra_spaces=True,
    standardize_missing=True
)
print(f"Sample after: {df['name'].head(3).tolist()}")

# Clean cabin column
print("\nCleaning 'cabin' column...")
print(f"Unique values before: {df['cabin'].nunique()}")
df = clean_text_column(
    df,
    column='cabin',
    lowercase=False,  # Keep cabin codes in original case
    remove_extra_spaces=True,
    standardize_missing=True
)
print(f"Unique values after: {df['cabin'].nunique()}")

# Clean embarked column
print("\nCleaning 'embarked' column...")
print(f"Values before: {df['embarked'].unique()}")
df = clean_text_column(
    df,
    column='embarked',
    lowercase=True,
    remove_extra_spaces=True,
    standardize_missing=True
)
print(f"Values after: {df['embarked'].unique()}")

tracker.add_checkpoint(df, "Cleaned text columns")
# ============================================================================
# STEP 7: HANDLE REMAINING MISSING VALUES
# ============================================================================
print("\n" + "="*80)
print("STEP 7: HANDLE REMAINING MISSING VALUES")
print("="*80)

# Check remaining missing values
missing_summary = df.isnull().sum()
missing_summary = missing_summary[missing_summary > 0]

if len(missing_summary) > 0:
    print("\nRemaining missing values:")
    for col, count in missing_summary.items():
        pct = count / len(df) * 100
        print(f"  {col}: {count} ({pct:.1f}%)")

    # Fill cabin with 'Unknown' (many passengers didn't have cabin assignments)
    if 'cabin' in missing_summary.index:
        df = fill_missing_values(df, 'cabin', method='custom', custom_value='Unknown')

    # Fill embarked with mode
    if 'embarked' in missing_summary.index:
        df = fill_missing_values(df, 'embarked', method='mode')

    # Fill survived with mode (if missing)
    if 'survived' in missing_summary.index:
        df = fill_missing_values(df, 'survived', method='mode')
else:
    print("\n✅ No missing values remaining!")

tracker.add_checkpoint(df, "Filled remaining missing values")
# ============================================================================
# STEP 8: FINAL VALIDATION AND REPORT
# ============================================================================
print("\n" + "="*80)
print("STEP 8: FINAL VALIDATION AND REPORT")
print("="*80)

# Final data summary
print("\nFinal Data Summary:")
print_data_summary(df)

# Calculate final data quality score
final_quality = calculate_data_quality_score(df)

# Show improvement
print(f"\nQuality Improvement:")
print(f"  Initial score: {initial_quality:.1f}/100")
print(f"  Final score: {final_quality:.1f}/100")
print(f"  Improvement: +{final_quality - initial_quality:.1f} points")

# Show cleaning progress
tracker.print_progress()

# Compare before and after
print("\n" + "="*80)
print("BEFORE vs AFTER COMPARISON")
print("="*80)
compare_datasets(df_original, df, "Original", "Cleaned")
# ============================================================================
# STEP 9: SAVE CLEANED DATA
# ============================================================================
print("\n" + "="*80)
print("STEP 9: SAVE CLEANED DATA")
print("="*80)

# Save to CSV
df.to_csv('titanic_cleaned.csv', index=False)
print("\n✅ Saved cleaned data to 'titanic_cleaned.csv'")

# Save cleaning report
with open('cleaning_report.txt', 'w') as f:
    f.write("TITANIC DATASET CLEANING REPORT\n")
    f.write("="*80 + "\n\n")
    f.write(f"Initial rows: {len(df_original)}\n")
    f.write(f"Final rows: {len(df)}\n")
    f.write(f"Rows removed: {len(df_original) - len(df)}\n\n")
    f.write(f"Initial missing values: {df_original.isnull().sum().sum()}\n")
    f.write(f"Final missing values: {df.isnull().sum().sum()}\n\n")
    f.write(f"Initial duplicates: {df_original.duplicated().sum()}\n")
    f.write(f"Final duplicates: {df.duplicated().sum()}\n\n")
    f.write(f"Quality score: {initial_quality:.1f} → {final_quality:.1f}\n")

print("✅ Saved cleaning report to 'cleaning_report.txt'")
print("\nDATA CLEANING COMPLETE!")
print("="*80)
================================================================================
TITANIC DATASET CLEANING COMPLETE!
================================================================================

Summary of Changes:
  • Column headers: Standardized to lowercase with underscores
  • Duplicates: Removed 2 duplicate rows
  • Numeric columns: Cleaned age, fare (filled missing, removed problematic values)
  • Outliers: Capped extreme fare values
  • Text columns: Standardized name, cabin, embarked
  • Missing values: Filled using appropriate methods
  • Quality score: Improved from 67.3 to 94.8

Files Created:
  ✅ titanic_cleaned.csv - Clean dataset ready for analysis
  ✅ cleaning_report.txt - Detailed cleaning documentation
  ✅ Multiple visualization files showing before/after comparisons

Next Steps:
  1. Review the cleaned data: pd.read_csv('titanic_cleaned.csv')
  2. Proceed with exploratory data analysis (EDA) (see the sketch below)
  3. Build predictive models
  4. Generate insights and visualizations
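# A minimal sketch of next steps 1-2 above (an assumption for illustration, not
# part of the library): load the cleaned file and look at survival rate by
# passenger class, using the cleaned column names produced by this workflow.
import pandas as pd

df_clean = pd.read_csv('titanic_cleaned.csv')
print(df_clean.head())
print(df_clean.groupby('pclass')['survived'].mean())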
# Create a cleaning log
cleaning_log = {
    'age': {
        'action': 'filled_missing',
        'method': 'median',
        'value': 28.0,
        'reason': 'Median is robust to outliers in age distribution',
        'affected_rows': 177
    },
    'cabin': {
        'action': 'filled_missing',
        'method': 'custom',
        'value': 'Unknown',
        'reason': 'Many passengers did not have cabin assignments',
        'affected_rows': 687
    }
}

# Save to file
import json
with open('cleaning_decisions.json', 'w') as f:
    json.dump(cleaning_log, f, indent=2)
def validate_cleaning_step(df, step_name):
    """Validate data after each cleaning step"""
    issues = []

    # Check for NaNs in unexpected places
    if df.isnull().all().any():
        cols = df.columns[df.isnull().all()].tolist()
        issues.append(f"Columns with all NaNs: {cols}")

    # Check for empty dataframe
    if len(df) == 0:
        issues.append("DataFrame is empty!")

    # Check for duplicate columns
    if len(df.columns) != len(set(df.columns)):
        issues.append("Duplicate column names detected")

    if issues:
        print(f"\n⚠️ Issues after '{step_name}':")
        for issue in issues:
            print(f"  • {issue}")
        return False
    else:
        print(f"✅ Validation passed for '{step_name}'")
        return True

# Usage:
df = clean_column_headers(df)
validate_cleaning_step(df, "Header cleaning")
from typing import Dict, List, Optional

def custom_cleaning_function(
    df: pd.DataFrame,
    numeric_cols: List[str],
    text_cols: List[str],
    config: Optional[Dict] = None) -> pd.DataFrame:
    """
    Custom cleaning function with clear documentation.

    Args:
        df: Input DataFrame to clean
        numeric_cols: List of numeric column names
        text_cols: List of text column names
        config: Optional configuration dictionary

    Returns:
        Cleaned DataFrame

    Example:
        >>> df_clean = custom_cleaning_function(
        ...     df,
        ...     numeric_cols=['age', 'fare'],
        ...     text_cols=['name', 'cabin']
        ... )
    """
    # Implementation here
    pass
def clean_financial_data(df):
    """Best practices for cleaning financial data"""
    # 1. Preserve precision
    for col in ['price', 'revenue', 'profit']:
        if col in df.columns:
            df = clean_numeric_column(df, col, round_decimals=4)

    # 2. Handle negative values carefully (they might be valid!)
    # Don't use remove_negatives=True for profit/loss columns

    # 3. Check for currency consistency
    # Make sure all values are in the same currency

    # 4. Handle missing trading days (use forward fill)
    if 'date' in df.columns:
        df = df.sort_values('date')
        for col in df.select_dtypes(include=[np.number]).columns:
            df[col] = df[col].ffill()  # fillna(method='ffill') is deprecated in newer pandas
    return df
def clean_medical_data(df):
    """Best practices for cleaning medical data"""
    # 1. Preserve case in medical codes (ICD, CPT)
    for col in ['icd_code', 'cpt_code', 'diagnosis']:
        if col in df.columns:
            df = clean_text_column(df, col, lowercase=False)

    # 2. Handle sensitive data carefully
    # Don't log or display patient identifiable information

    # 3. Be conservative with outliers
    # Extreme values might be real medical conditions, not errors
    # Use factor=3.0 instead of 1.5 for outlier detection

    # 4. Standardize date formats
    # Medical data often has multiple date columns
    date_cols = ['admission_date', 'discharge_date', 'birth_date']
    for col in date_cols:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors='coerce')
    return df
def clean_ecommerce_data(df):
    """Best practices for cleaning e-commerce data"""
    # 1. Standardize product names
    if 'product_name' in df.columns:
        df = clean_text_column(df, 'product_name', lowercase=True)

    # 2. Clean price data
    if 'price' in df.columns:
        df = clean_numeric_column(df, 'price', fill_method='median',
                                  remove_negatives=True, round_decimals=2)

    # 3. Handle quantity (must be a positive integer)
    if 'quantity' in df.columns:
        df = clean_numeric_column(df, 'quantity', fill_method='zero',
                                  remove_negatives=True, round_decimals=0)

    # 4. Standardize categories
    if 'category' in df.columns:
        df['category'] = df['category'].str.lower().str.strip()
        df['category'] = df['category'].str.replace('_', ' ')
    return df
# 1. Use appropriate data types
def optimize_dtypes(df):
    """Reduce memory usage by optimizing data types"""
    # Convert integer columns to a smaller int type
    # (assumes values fit in 32 bits; check min/max first for very large IDs)
    for col in df.select_dtypes(include=['int64']).columns:
        if df[col].min() >= 0:
            df[col] = df[col].astype('uint32')  # Unsigned if no negatives
        else:
            df[col] = df[col].astype('int32')

    # Convert float columns to float32
    for col in df.select_dtypes(include=['float64']).columns:
        df[col] = df[col].astype('float32')

    # Convert object columns with few unique values to category
    for col in df.select_dtypes(include=['object']).columns:
        if df[col].nunique() / len(df) < 0.5:  # Less than 50% unique
            df[col] = df[col].astype('category')
    return df
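# Usage sketch (assumption: measuring the effect on this dataframe):
before_mb = df.memory_usage(deep=True).sum() / 1024**2
df_optimized = optimize_dtypes(df.copy())
after_mb = df_optimized.memory_usage(deep=True).sum() / 1024**2
print(f"Memory usage: {before_mb:.2f} MB → {after_mb:.2f} MB")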
# 2. Process in chunks for very large files
def clean_large_csv(filename, chunksize=10000):
    """Clean a large CSV file in chunks"""
    cleaned_chunks = []
    for chunk in pd.read_csv(filename, chunksize=chunksize):
        chunk = clean_column_headers(chunk)
        chunk = remove_duplicates(chunk)
        # Add other cleaning steps...
        cleaned_chunks.append(chunk)
    df_clean = pd.concat(cleaned_chunks, ignore_index=True)
    return df_clean
def create_cleaning_pipeline(*cleaning_functions):
    """
    Create a reusable cleaning pipeline from multiple functions.

    Example:
        pipeline = create_cleaning_pipeline(
            clean_column_headers,
            remove_duplicates,
            lambda df: clean_numeric_column(df, 'age'),
            lambda df: clean_text_column(df, 'name')
        )
        df_clean = pipeline(df)
    """
    def pipeline(df):
        for func in cleaning_functions:
            df = func(df)
        return df
    return pipeline

# Usage:
my_pipeline = create_cleaning_pipeline(
    clean_column_headers,
    remove_duplicates,
    lambda df: clean_numeric_column(df, 'age', fill_method='median'),
    lambda df: clean_text_column(df, 'name', lowercase=True)
)
df_clean = my_pipeline(df)
───────────────────────────────────────────────────────────
 PANDAS DATA CLEANING QUICK REFERENCE
───────────────────────────────────────────────────────────

 COLUMN HEADERS
 ├─ clean_column_headers(df)
 └─ Standardize column names

 NUMERIC DATA
 ├─ clean_numeric_column(df, 'col', fill_method='median')
 └─ Handle missing values, negatives, decimals

 DUPLICATES
 ├─ remove_duplicates(df, subset=['col'], keep='first')
 └─ Remove duplicate rows

 MISSING VALUES
 ├─ fill_missing_values(df, 'col', method='median')
 └─ Fill using statistical methods

 OUTLIERS
 ├─ detect_outliers_iqr(df, 'col', factor=1.5)
 ├─ remove_outliers_iqr(df, 'col', factor=1.5)
 └─ Identify and remove outliers

 TEXT DATA
 ├─ clean_text_column(df, 'col', lowercase=True)
 └─ Standardize text formatting

 INSPECTION
 ├─ print_data_summary(df)
 ├─ get_data_summary(df)
 └─ Comprehensive data overview

 QUICK CLEAN
 ├─ quick_clean(df)
 └─ Automated cleaning pipeline

───────────────────────────────────────────────────────────