Bet Angel Excel Cleaner

The Bet Angel API makes it easy for you to further enhance your betting and trading by integrating your own code into Bet Angel
Post Reply
Diginet
Posts: 130
Joined: Tue Jul 09, 2013 1:43 pm

Hi,

I worked on this code to clean up Excel files extracted by Guardian to make them easier to read. I don't really use it any more, as I have gathered enough data for what I need. I have always found these forums really helpful, and I know some people like to collect data. You will need to adjust the headers to suit what you need. I have included my .baf file, which should work as a default. My Excel files are too big to post, but you should end up with something like the Excel screenshot posted below.


Tips
  • Prefix the Form with ' (an apostrophe), or Excel can confuse the form with dates.
  • Bet Angel only writes around 13 or 14 columns before it starts a new row, so it will mess up the file if you try to add more columns/data points.
  • Set the parameters as shown in the screenshot.
  • You need to change the folder in the .py file where it says YOUR EXPORTS FOLDER HERE.
# Cleaner.py
import os
import glob
import shutil
import pandas as pd

# --- Configuration ---------------------------------------------------------
# Folder that holds the exported CSV files; replace the placeholder text.
INPUT_FOLDER = r"YOUR EXPORTS FOLDER HERE"

# When True, each original file is copied into INPUT_FOLDER/backup before the
# cleaned version is written over it.
MAKE_BACKUP = True

# Positional column template: 47 entries, applied left to right to the raw
# columns. Every column labelled "DELETE" is dropped by the cleaner, which
# leaves the 24 named columns.
HEADERS = [
    "Time", "Market", "DELETE",
    "Runner", "DELETE", "DELETE",
    "Claiming", "DELETE",
    "Days Since Last Ran", "DELETE",
    "Form", "DELETE",
    "Jockey", "DELETE",
    "Position In Range", "DELETE",
    "Trainer", "DELETE",
    "Wearing", "DELETE",
    "Last Traded Price", "DELETE",
    "Volume Traded At Price", "DELETE",
    "Volume Of Selection", "DELETE",
    "Volume Of The Market", "DELETE",
    "Volume% Of Selection", "DELETE",
    "Highest Traded Price", "DELETE",
    "Lowest Traded Price", "DELETE",
    "Best Lay Price", "DELETE",
    "2nd Best Lay Price", "DELETE",
    "3rd Best Lay Price", "DELETE",
    "Best Back Price", "DELETE",
    "2nd Best Back Price", "DELETE",
    "3rd Best Back Price", "DELETE",
    "VWAP",
]

def process_file(path):
    """Clean one raw export in place.

    The first row of the file is discarded, the positional ``HEADERS``
    template is applied (extended with "DELETE" or truncated to match the
    number of columns found), every "DELETE" column is dropped, all cells are
    whitespace-trimmed, and the result is written back over ``path`` as UTF-8.
    When ``MAKE_BACKUP`` is true the original is first copied to
    ``INPUT_FOLDER/backup``.

    A file whose first row already is the cleaned header is left untouched.
    Without that guard a second run would throw away the real header, apply
    the raw-column template to already-compacted columns (deleting real data)
    and overwrite the backup of the original.

    Args:
        path: Path of the CSV file to clean.

    Returns:
        None. Progress and errors are reported with ``print``.
    """
    fname = os.path.basename(path)
    print(f"Processing: {fname}")

    # Idempotency guard: the cleaner writes its header as the kept names joined
    # by commas, so a first line that matches a prefix of those names (at least
    # three cells, which reaches past the first "DELETE" slot of the template)
    # means this file has been cleaned before.
    kept_headers = [h for h in HEADERS if h != "DELETE"]
    try:
        with open(path, "r", encoding="utf-8", errors="replace", newline="") as fh:
            first_cells = [cell.strip() for cell in fh.readline().split(",")]
    except OSError:
        first_cells = []  # unreadable here: let the real read below report it
    if len(first_cells) >= 3 and first_cells == kept_headers[:len(first_cells)]:
        print(f" Skipped: {fname} is already cleaned.")
        return

    # Read file skipping the first row (the original header row); try UTF-8
    # first and fall back to latin-1.
    df = None
    read_error = None
    for encoding in ('utf-8', 'latin-1'):
        try:
            df = pd.read_csv(path, header=None, skiprows=1, encoding=encoding, engine='python', dtype=str)
            break
        except Exception as e:
            read_error = e
    if df is None:
        print(f" ERROR reading {fname}: {read_error}")
        return

    actual_col_count = df.shape[1]
    print(f" Columns found in file: {actual_col_count}")

    # Create a dynamic header list to match the file exactly
    if actual_col_count > len(HEADERS):
        extra_count = actual_col_count - len(HEADERS)
        headers_to_use = HEADERS + (["DELETE"] * extra_count)
        print(f" HEADERS extended by {extra_count} 'DELETE' entries.")
    else:
        headers_to_use = HEADERS[:actual_col_count]
        if len(HEADERS) > actual_col_count:
            print(f" HEADERS trimmed to {actual_col_count} entries.")

    df.columns = headers_to_use

    # Drop all columns labeled "DELETE"
    df_clean = df.loc[:, df.columns != "DELETE"].copy()

    # Every column was read with dtype=str, so trim them all; this does not
    # depend on which dtype name pandas reports for text columns.
    for col in df_clean.columns:
        df_clean[col] = df_clean[col].fillna("").astype(str).str.strip()

    # Backup original
    if MAKE_BACKUP:
        backup_dir = os.path.join(INPUT_FOLDER, "backup")
        os.makedirs(backup_dir, exist_ok=True)
        shutil.copy2(path, os.path.join(backup_dir, fname))
        print(f" Backup created in: backup/{fname}")

    # Save the cleaned version back to the original location
    try:
        df_clean.to_csv(path, index=False, encoding='utf-8')
        print(f" Success: {fname} cleaned. (Columns kept: {len(df_clean.columns)})")
    except Exception as e:
        print(f" ERROR saving {fname}: {e}")

def main():
    """Clean every ``*.csv`` file that sits directly inside ``INPUT_FOLDER``.

    Prints a message and returns early when the folder does not exist or
    contains no CSV files.
    """
    if not os.path.isdir(INPUT_FOLDER):
        print(f"Folder not found: {INPUT_FOLDER}")
        return

    files = glob.glob(os.path.join(INPUT_FOLDER, "*.csv"))
    if not files:
        print("No CSV files found.")
        return

    backup_dir = os.path.normcase(os.path.abspath(os.path.join(INPUT_FOLDER, "backup")))
    for f in files:
        # Don't process files already in the backup folder. Compare the file's
        # parent directory rather than searching the path for a substring,
        # otherwise a file such as "backup_jan.csv" would be skipped too.
        parent_dir = os.path.normcase(os.path.abspath(os.path.dirname(f)))
        if parent_dir == backup_dir:
            continue
        process_file(f)


if __name__ == "__main__":
    main()


THIS BIT WILL TAKE SOME SETTING UP AND KNOWLEDGE OF SQL

This code cleans the data and uploads it to PostgreSQL. It will take some setting up on the SQL side, but I have put ****** wherever you need to enter your own passcodes, files, etc.

#!/usr/bin/env python3
"""
Merged Cleaner + Sync script with dropped-rows reporting, robust CSV diagnostics,
and validation that allows negative `position_in_range` values.

Save as: run_all_with_reports.py
Run:
python run_all_with_reports.py # clean then sync (default)
python run_all_with_reports.py --clean-only
python run_all_with_reports.py --sync-only
"""

import argparse
import csv
import datetime
import glob
import os
import shutil
import traceback
from collections import Counter
from urllib.parse import quote

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.dialects.postgresql import insert

# ---------------------
# CONFIGURATION
# ---------------------
# PostgreSQL connection settings: replace every placeholder with your own value.
DB_NAME = "*******"
DB_USER = "*******"
DB_PASS = "*******"
DB_HOST = "*******"
DB_PORT = "*******"

# Folder holding the exported CSV files, plus the sub-folders this script uses.
CSV_FOLDER = r"YOUR EXPORTS FOLDER HERE"
PROCESSED_FOLDER = os.path.join(CSV_FOLDER, "Processed")
BACKUP_FOLDER = os.path.join(CSV_FOLDER, "backup")
REPORTS_FOLDER = os.path.join(CSV_FOLDER, "reports")

# When True, the cleaner copies each original file into BACKUP_FOLDER before
# overwriting it.
MAKE_BACKUP = True

# Original header mapping used by the cleaner (keeps only non-DELETE columns).
# It is positional: one entry per raw column, applied left to right.
HEADERS = [
    "Time", "Market", "DELETE", "Runner", "DELETE", "DELETE", "Claiming", "DELETE",
    "Days Since Last Ran", "DELETE", "Form", "DELETE", "Jockey", "DELETE",
    "Position In Range", "DELETE", "Trainer", "DELETE", "Wearing", "DELETE",
    "Last Traded Price", "DELETE", "Volume Traded At Price", "DELETE", "Volume Of Selection",
    "DELETE", "Volume Of The Market", "DELETE", "Volume% Of Selection", "DELETE",
    "Highest Traded Price", "DELETE", "Lowest Traded Price", "DELETE", "Best Lay Price",
    "DELETE", "2nd Best Lay Price", "DELETE", "3rd Best Lay Price", "DELETE",
    "Best Back Price", "DELETE", "2nd Best Back Price", "DELETE", "pnl",
    "DELETE", "VWAP"
]

# Ensure folders exist
os.makedirs(PROCESSED_FOLDER, exist_ok=True)
os.makedirs(REPORTS_FOLDER, exist_ok=True)
if MAKE_BACKUP:
    os.makedirs(BACKUP_FOLDER, exist_ok=True)

# SQLAlchemy engine. The user name and password are percent-encoded because
# characters such as "@", ":", "/" or "#" in a raw password would otherwise be
# read as URL syntax. Enter DB_USER / DB_PASS exactly as they are, unescaped.
engine = create_engine(
    f'postgresql://{quote(DB_USER, safe="")}:{quote(DB_PASS, safe="")}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
)

# ---------------------
# Utilities
# ---------------------
def ts():
    """Return the current UTC time as a ``YYYYMMDD_HHMMSS`` string.

    Used to build report file names.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware "now" in UTC
    # formats to exactly the same text.
    return datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d_%H%M%S")

def safe_report_path(prefix, fname):
    """Build the path of a CSV report inside ``REPORTS_FOLDER``.

    The file name is ``<prefix>_<timestamp>_<fname without extension>.csv``,
    with the timestamp taken from ``ts()``.
    """
    base, _extension = os.path.splitext(fname)
    report_name = f"{prefix}_{ts()}_{base}.csv"
    return os.path.join(REPORTS_FOLDER, report_name)

# ---------------------
# Cleaner (converts "wide" CSVs produced by BetAngel)
# ---------------------
def clean_file(path: str):
    """
    Clean one raw export in place and report whether it is now in cleaned form.

    The first row is discarded, the positional HEADERS template is applied
    (extended with "DELETE" or truncated to the number of columns found), every
    "DELETE" column is dropped, all cells are whitespace-trimmed, and the result
    is written back over `path` as UTF-8. When MAKE_BACKUP is true the original
    is first copied into BACKUP_FOLDER; a failed backup only prints a warning.

    A file whose first row already is the cleaned header is left untouched.
    Without that guard, cleaning a file twice (for example after a failed sync
    left it in the folder) would discard the real header, apply the raw-column
    template to already-compacted columns and overwrite the backup.

    Returns True when the file was cleaned or was already clean, False when it
    could not be read or written.
    """
    fname = os.path.basename(path)
    print(f"[CLEAN] Processing: {fname}")

    # Idempotency guard: the cleaner writes its header as the kept names joined
    # by commas, so a first line matching a prefix of those names (at least
    # three cells, which reaches past the first "DELETE" slot of the template)
    # means this file has been cleaned before.
    kept_headers = [h for h in HEADERS if h != "DELETE"]
    try:
        with open(path, "r", encoding="utf-8", errors="replace", newline="") as fh:
            first_cells = [cell.strip() for cell in fh.readline().split(",")]
    except OSError:
        first_cells = []  # unreadable here: let the real read below report it
    if len(first_cells) >= 3 and first_cells == kept_headers[:len(first_cells)]:
        print(f" [CLEAN] Skipped: {fname} is already cleaned.")
        return True

    # Try UTF-8 first and fall back to latin-1.
    df = None
    read_error = None
    for encoding in ('utf-8', 'latin-1'):
        try:
            df = pd.read_csv(path, header=None, skiprows=1, encoding=encoding, engine='python', dtype=str)
            break
        except Exception as e:
            read_error = e
    if df is None:
        print(f" [CLEAN][ERROR] reading {fname}: {read_error}")
        return False

    actual_col_count = df.shape[1]
    if actual_col_count > len(HEADERS):
        extra_count = actual_col_count - len(HEADERS)
        headers_to_use = HEADERS + (["DELETE"] * extra_count)
        print(f" [CLEAN] HEADERS extended by {extra_count} 'DELETE' entries.")
    else:
        headers_to_use = HEADERS[:actual_col_count]
        if len(HEADERS) > actual_col_count:
            print(f" [CLEAN] HEADERS trimmed to {actual_col_count} entries.")

    df.columns = headers_to_use

    # Keep only non-DELETE columns
    df_clean = df.loc[:, df.columns != "DELETE"].copy()

    # Every column was read with dtype=str, so trim them all; this does not
    # depend on which dtype name pandas reports for text columns.
    for col in df_clean.columns:
        df_clean[col] = df_clean[col].fillna("").astype(str).str.strip()

    # Backup original (best effort: a failure is reported but does not stop
    # the clean)
    if MAKE_BACKUP:
        try:
            shutil.copy2(path, os.path.join(BACKUP_FOLDER, fname))
            print(f" [CLEAN] Backup created: backup/{fname}")
        except Exception as e:
            print(f" [CLEAN][WARN] Could not backup {fname}: {e}")

    # Overwrite cleaned CSV
    try:
        df_clean.to_csv(path, index=False, encoding='utf-8')
        print(f" [CLEAN] Success: {fname} cleaned. (Columns kept: {len(df_clean.columns)})")
        return True
    except Exception as e:
        print(f" [CLEAN][ERROR] saving {fname}: {e}")
        return False

def run_cleaner():
    """Run ``clean_file`` on every ``*.csv`` directly inside ``CSV_FOLDER``.

    Prints a message and returns early when the folder does not exist or
    holds no CSV files.
    """
    if not os.path.isdir(CSV_FOLDER):
        print(f"[CLEAN][ERROR] Folder not found: {CSV_FOLDER}")
        return

    files = glob.glob(os.path.join(CSV_FOLDER, "*.csv"))
    if not files:
        print("[CLEAN] No CSV files found.")
        return

    managed_dirs = {
        os.path.normcase(os.path.abspath(folder))
        for folder in (BACKUP_FOLDER, PROCESSED_FOLDER, REPORTS_FOLDER)
    }
    for f in files:
        # Skip files located in the backup/processed/reports folders. Compare
        # the parent directory instead of searching the path for a substring,
        # otherwise names like "reports_jan.csv" or "backup_old.csv" would be
        # skipped as well.
        parent_dir = os.path.normcase(os.path.abspath(os.path.dirname(f)))
        if parent_dir in managed_dirs:
            continue
        clean_file(f)

# ---------------------
# Upsert helper for pandas.to_sql
# ---------------------
def upsert_method(table, conn, keys, data_iter):
    """Insert a chunk of rows, ignoring rows that conflict on the key columns.

    Written as a ``method=`` callable for ``DataFrame.to_sql``: each row from
    ``data_iter`` is paired with ``keys`` to form one record, and the records
    are sent as a single PostgreSQL ``INSERT ... ON CONFLICT DO NOTHING`` whose
    conflict target is ``(time, market, runner)``.
    """
    records = []
    for values in data_iter:
        records.append(dict(zip(keys, values)))

    insert_stmt = insert(table.table).values(records)
    conn.execute(
        insert_stmt.on_conflict_do_nothing(index_elements=['time', 'market', 'runner'])
    )

# ---------------------
# Validation helpers
# ---------------------
def validate_row(row):
    """
    Boolean row validator.

    Applies the same rules as validate_row_with_reason, so a row is valid here
    exactly when that function returns None:

    - 'time' must be present and not missing;
    - 'market' and 'runner' must be non-blank strings;
    - 'last_traded_price', 'volume_of_selection' and 'vwap', when present,
      must be numeric and not negative;
    - 'position_in_range', when present, must be numeric (negative values are
      allowed);
    - 'form', when present, must be a string or missing.

    `row` is anything offering `.get`, `in` and item access by column name
    (a dict or a pandas Series). Returns True or False and never raises.
    """
    try:
        if pd.isna(row.get('time', None)):
            return False

        for key in ('market', 'runner'):
            text = row.get(key, None)
            if not (isinstance(text, str) and text.strip()):
                return False

        # Non-negative checks for fields that shouldn't be negative
        for col in ('last_traded_price', 'volume_of_selection', 'vwap'):
            if col in row:
                val = row[col]
                if pd.isna(val):
                    return False
                try:
                    if float(val) < 0:
                        return False
                except Exception:
                    return False

        # position_in_range: any sign is fine, but it must exist and be numeric
        if 'position_in_range' in row:
            val = row['position_in_range']
            if pd.isna(val):
                return False
            try:
                float(val)
            except Exception:
                return False

        # form: accept string or missing
        if 'form' in row:
            val = row['form']
            if val is not None and not (isinstance(val, str) or pd.isna(val)):
                return False

        return True
    except Exception:
        return False

def validate_row_with_reason(row):
    """
    Explain why a row is invalid.

    Returns None for a valid row, otherwise a short reason code; the codes are
    what ends up in the dropped_invalid reports. Checks run in this order and
    the first failure wins: time, market, runner, the non-negative numeric
    fields, position_in_range, form. Any unexpected error is reported as
    "validation_exception" rather than raised.
    """
    try:
        if pd.isna(row.get('time', None)):
            return "missing_time"

        # market / runner must be non-blank strings
        text_checks = (('market', "invalid_market"), ('runner', "invalid_runner"))
        for key, reason in text_checks:
            text = row.get(key, None)
            if not isinstance(text, str) or not text.strip():
                return reason

        # Fields that must be numeric and must not be negative
        for col in ('last_traded_price', 'volume_of_selection', 'vwap'):
            if col not in row:
                continue
            val = row[col]
            if pd.isna(val):
                return f"missing_{col}"
            try:
                below_zero = float(val) < 0
            except Exception:
                return f"bad_numeric_{col}"
            if below_zero:
                return f"negative_{col}"

        # position_in_range may be negative or positive but has to be a number
        if 'position_in_range' in row:
            pir = row['position_in_range']
            if pd.isna(pir):
                return "missing_position_in_range"
            try:
                float(pir)
            except Exception:
                return "bad_numeric_position_in_range"

        # form may be a string or missing, nothing else
        if 'form' in row:
            form = row['form']
            if form is not None:
                if not isinstance(form, str) and not pd.isna(form):
                    return "bad_form_type"

        return None
    except Exception:
        return "validation_exception"

# ---------------------
# Diagnostics for malformed CSVs
# ---------------------
def diagnose_file_bad_rows(path, sample_bytes=8192, max_preview_lines=20):
    """
    Write a plain-text diagnostic for a CSV that failed to load.

    The delimiter is sniffed from the first `sample_bytes` characters (one of
    comma, tab, semicolon or pipe; comma when sniffing fails). The file is then
    read row by row to count how many rows have each number of fields, and the
    first `max_preview_lines` rows are kept as a preview. The report goes to
    REPORTS_FOLDER as diagnostic_rows_<timestamp>_<file name>.txt.

    Returns (report path, summary dict) on success, or (None, {"error": text})
    if anything goes wrong; it does not raise.
    """
    try:
        counts = Counter()
        preview = []
        with open(path, "r", encoding="utf-8", errors="replace", newline="") as f:
            sample = f.read(sample_bytes)
            try:
                delim = csv.Sniffer().sniff(sample, delimiters=",\t;|").delimiter
            except Exception:
                delim = ','

            # Rewind: the sample read above consumed the start of the file.
            f.seek(0)
            for line_no, fields in enumerate(csv.reader(f, delimiter=delim), start=1):
                field_count = len(fields)
                counts[field_count] += 1
                if line_no <= max_preview_lines:
                    preview.append((line_no, field_count, fields))

        # write human-readable diagnostic
        base = os.path.splitext(os.path.basename(path))[0]
        diag_path = os.path.join(REPORTS_FOLDER, f"diagnostic_rows_{ts()}_{base}.txt")
        distribution = counts.most_common()
        with open(diag_path, "w", encoding="utf-8") as outf:
            outf.write(f"Detected delimiter: {repr(delim)}\n\n")
            outf.write("First few lines (line_no, cols, sample row):\n")
            for ln, length, row in preview:
                outf.write(f"{ln}\tcols={length}\t{row}\n")
            outf.write("\nRow length distribution (length:count):\n")
            for length, cnt in distribution:
                outf.write(f"{length}: {cnt}\n")

        summary = {
            "detected_delimiter": delim,
            "preview": preview,
            "row_length_distribution": distribution,
        }
        return diag_path, summary
    except Exception as e:
        return None, {"error": str(e)}

# ---------------------
# Robust run_sync with diagnostics
# ---------------------
def run_sync():
    """Validate every cleaned CSV in ``CSV_FOLDER`` and upsert it into the DB.

    For each ``*.csv`` directly inside ``CSV_FOLDER``:

    1. sniff the delimiter and load the file as text, skipping malformed lines;
    2. normalise the column names, parse ``time`` (day first), convert the
       known numeric columns and add ``source_file`` / ``batch_ts``;
    3. drop rows missing ``position_in_range``, ``vwap`` or
       ``volume_pct_of_selection`` and write them to a ``dropped_critical``
       report;
    4. drop rows rejected by ``validate_row_with_reason`` and write them to a
       ``dropped_invalid`` report;
    5. append the remaining rows to the ``marketdata`` table through
       ``upsert_method``;
    6. move the file into ``PROCESSED_FOLDER``, replacing any file of the same
       name already there.

    Any exception while handling a file is printed with its traceback, a
    diagnostic report is attempted with ``diagnose_file_bad_rows``, the file is
    left where it is, and the loop carries on with the next file.
    """
    csv_files = glob.glob(os.path.join(CSV_FOLDER, "*.csv"))
    if not csv_files:
        print("[SYNC] No new CSV files found to sync.")
        return

    managed_dirs = {
        os.path.normcase(os.path.abspath(folder))
        for folder in (BACKUP_FOLDER, PROCESSED_FOLDER, REPORTS_FOLDER)
    }

    for file_path in csv_files:
        fname = os.path.basename(file_path)
        # Skip files located in the processed/backup/reports folders. Compare
        # the parent directory instead of searching the path for a substring,
        # otherwise names like "reports_jan.csv" or "backup_old.csv" would be
        # skipped as well.
        parent_dir = os.path.normcase(os.path.abspath(os.path.dirname(file_path)))
        if parent_dir in managed_dirs:
            continue

        print(f"[SYNC] Processing {fname}...")
        try:
            # detect delimiter heuristically from file sample
            with open(file_path, "r", encoding="utf-8", errors="replace", newline="") as fh:
                sample = fh.read(8192)
            try:
                dialect = csv.Sniffer().sniff(sample, delimiters=",\t;|")
                delim = dialect.delimiter
            except Exception:
                delim = ','

            # Read using the detected delimiter; on_bad_lines='skip' (pandas >= 1.3)
            # avoids exceptions from malformed rows. Everything is loaded as text
            # (dtype=str) and the numeric columns are converted explicitly below:
            # with type inference an all-digit Form, Runner or Market column
            # would become numeric, losing leading zeros and failing the
            # string checks in validation.
            df = pd.read_csv(file_path, sep=delim, engine='python', encoding='utf-8',
                             on_bad_lines='skip', dtype=str)
            original_count = len(df)

            # Standardize column names
            df.columns = [c.strip().lower().replace(" ", "_").replace("%", "_pct") for c in df.columns]

            # The 3rd-best-back-price column is loaded as "pnl", whichever of
            # its two normalised spellings the CSV header produced.
            rename_map = {
                "2nd_best_lay_price": "second_best_lay_price",
                "3rd_best_lay_price": "third_best_lay_price",
                "2nd_best_back_price": "second_best_back_price",
                "third_best_back_price": "pnl",
                "3rd_best_back_price": "pnl"
            }
            df = df.rename(columns=rename_map)

            # Parse timestamps and add batch metadata
            df['time'] = pd.to_datetime(df.get('time'), dayfirst=True, errors='coerce')
            df['source_file'] = fname
            df['batch_ts'] = pd.Timestamp.now()

            # Convert numeric columns where present
            numeric_cols = [
                "claiming", "days_since_last_ran", "last_traded_price",
                "volume_traded_at_price", "volume_of_selection", "volume_of_the_market",
                "volume_pct_of_selection", "highest_traded_price", "lowest_traded_price",
                "best_lay_price", "second_best_lay_price", "third_best_lay_price",
                "best_back_price", "second_best_back_price", "third_best_back_price",
                "vwap", "position_in_range", "pnl"
            ]
            for col in numeric_cols:
                if col in df.columns:
                    df[col] = pd.to_numeric(df[col], errors='coerce')

            # ---------- Critical columns check & report ----------
            critical_cols = ['position_in_range', 'vwap', 'volume_pct_of_selection']
            # Ensure columns exist in df for the check
            for c in critical_cols:
                if c not in df.columns:
                    df[c] = pd.NA

            mask_critical_ok = df[critical_cols].notna().all(axis=1)
            dropped_critical = df[~mask_critical_ok].copy()
            if not dropped_critical.empty:
                report_path = safe_report_path("dropped_critical", fname)
                dropped_critical.to_csv(report_path, index=False)
                print(f" [SYNC] Dropped {len(dropped_critical)} rows due to missing critical columns. See {report_path}")

            # keep only rows with critical cols present
            df = df[mask_critical_ok].copy()

            # ---------- rounding ----------
            for col in ('vwap', 'volume_pct_of_selection', 'position_in_range', 'pnl'):
                if col in df.columns:
                    df[col] = df[col].round(2)

            # ---------- Final validation with detailed reasons ----------
            df['validation_reason'] = df.apply(validate_row_with_reason, axis=1)
            invalid_rows = df[df['validation_reason'].notna()].copy()
            if not invalid_rows.empty:
                report_path = safe_report_path("dropped_invalid", fname)
                # The report keeps every column, including the reason
                cols_to_save = list(invalid_rows.columns)
                invalid_rows.to_csv(report_path, index=False, columns=cols_to_save)
                print(f" [SYNC] Dropped {len(invalid_rows)} rows based on validation rules. See {report_path}")

            # keep only valid rows
            df_valid = df[df['validation_reason'].isna()].copy().drop(columns=['validation_reason'])

            final_count = len(df_valid)
            print(f" [SYNC] Counts for {fname}: original={original_count}, after_critical={len(df)}, final_valid={final_count}")

            if final_count == 0:
                print(f" [SYNC] No valid rows to sync for {fname}.")
            else:
                # Upsert into DB
                print(f" [SYNC] Syncing {final_count} rows to Database...")
                df_valid.to_sql('marketdata', engine, if_exists='append', index=False, method=upsert_method, chunksize=500)

            # Move original CSV to Processed once the file was handled without
            # an exception.
            # NOTE(review): the source this was restored from had lost its
            # indentation; this step is assumed to run for every handled file,
            # including files with no valid rows - confirm.
            dest_path = os.path.join(PROCESSED_FOLDER, fname)
            if os.path.exists(dest_path):
                os.remove(dest_path)
            shutil.move(file_path, dest_path)
            print(f" [SYNC] Done: {fname}")

        except Exception as e:
            # print full traceback for diagnosis
            print(f" [SYNC][ERROR] {fname}: {e}")
            traceback.print_exc()
            # run lightweight diagnostic to help identify malformed CSV lines / delimiter issues
            diag_path, report = diagnose_file_bad_rows(file_path)
            if diag_path:
                print(f" [SYNC][DIAG] Diagnostic written: {diag_path}")
            else:
                print(f" [SYNC][DIAG] Diagnostic failed: {report}")

# ---------------------
# CLI and runner
# ---------------------
def main():
    """Command-line entry point.

    With no flag the cleaner runs and then the sync. ``--clean-only`` runs just
    the cleaner and ``--sync-only`` just the sync; giving both prints a message
    and does nothing.
    """
    parser = argparse.ArgumentParser(description="Cleaner + Sync utility for Bet Angel CSVs (with dropped-row reports and diagnostics).")
    parser.add_argument('--clean-only', action='store_true', help='Run cleaner only.')
    parser.add_argument('--sync-only', action='store_true', help='Run sync only.')
    args = parser.parse_args()

    clean_only = args.clean_only
    sync_only = args.sync_only

    if clean_only and sync_only:
        print("Please choose either --clean-only or --sync-only, not both.")
        return

    # Each phase runs unless the other phase was requested on its own, so the
    # default (no flags) is clean first, then sync.
    if not sync_only:
        run_cleaner()
    if not clean_only:
        run_sync()


if __name__ == "__main__":
    main()

Cheers
You do not have the required permissions to view the files attached to this post.
Post Reply

Return to “Bet Angel - API”