#!/usr/bin/env python3
|
|
|
|
# Admin log dump script
|
|
# Dumps existing logs, optionally compressed, and optionally deletes them
|
|
|
|
import argparse
|
|
import os
|
|
import psycopg2
|
|
import gzip
|
|
import datetime
|
|
import calendar
|
|
import json
|
|
|
|
LATEST_DB_MIGRATION = "20250211131517_LoadoutNames"
|
|
|
|
def main():
    """Dump admin logs to monthly JSON files and optionally delete them from the DB."""
    parser = argparse.ArgumentParser(description="Dumps admin logs into files by months and optionally deletes them from a postgres DB.")
    parser.add_argument("out_dir", help="Directory to output data dumps into.")
    parser.add_argument("--date", help="Date to save/remove info until, must be in ISO format - time zone if unspecified will be UTC. Defaults to midnight, UTC, on the beginning of the month, 6 calendar months ago.")
    parser.add_argument("--compress", action="store_true", help="If set, compresses the contents of the file in .gzip format.")
    parser.add_argument("--delete", action="store_true", help="If set, deletes the contents of the tables after writing the output.")
    parser.add_argument("--ignore-schema-mismatch", action="store_true", help="If set, ignores that the DB does not match the expected schema.")
    parser.add_argument("--connection-string", required=True, help="Database connection string to use. See https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING")

    args = parser.parse_args()

    arg_output: str = args.out_dir

    if not os.path.exists(arg_output):
        print(f"Creating output directory {arg_output} (doesn't exist yet)")
        os.mkdir(arg_output)

    # Work out the cutoff: user-supplied --date, or the documented default of
    # midnight UTC on the first of the month, 6 calendar months ago.
    if args.date is None:
        today = datetime.datetime.now(datetime.timezone.utc)
        if today.month > 6:
            month = today.month - 6
            year = today.year
        else:
            month = today.month + 6
            year = today.year - 1
        end_date: "datetime.datetime" = datetime.datetime(year, month, 1, tzinfo=datetime.timezone.utc)
    else:
        end_date: "datetime.datetime" = datetime.datetime.fromisoformat(args.date)
        if end_date.tzinfo is None:
            # BUG FIX: astimezone() interprets a naive datetime as *local* time,
            # but the --date help promises UTC when no zone is given. Attach UTC directly.
            end_date = end_date.replace(tzinfo=datetime.timezone.utc)

    compressed_string = "compressed" if args.compress else "uncompressed"
    print(f"Exporting {compressed_string} admin logs until {end_date}.")

    conn = psycopg2.connect(args.connection_string)
    cur = conn.cursor()

    # BUG FIX: --ignore-schema-mismatch existed but check_schema_version was never invoked.
    check_schema_version(cur, args.ignore_schema_mismatch)

    # Find oldest dated entry - hack: discard time zone info
    oldest_record = get_oldest_admin_log(cur)
    oldest_record = oldest_record.astimezone(None)

    # From this, create your intervals up to the deleted time.
    if oldest_record > end_date:
        # BUG FIX: message was inverted -- this branch fires when the oldest record
        # is NEWER than the requested cutoff, i.e. there is nothing old enough to dump.
        print(f"Nothing to export. Oldest record {oldest_record} is newer than given date {end_date}.")
        conn.close()
        return

    first_record_time = datetime.datetime(oldest_record.year, oldest_record.month, oldest_record.day, tzinfo=datetime.timezone.utc)
    old_date = first_record_time
    months_to_add = 1

    while old_date < end_date:
        new_date = add_months(first_record_time, months_to_add)
        if new_date > end_date:
            new_date = end_date

        dump_admin_in_range(cur, old_date, new_date, arg_output, args.compress, args.delete)

        # Ensure modifications go through (or if not deleting, that temp table is destroyed)
        conn.commit()

        old_date = new_date
        months_to_add += 1

    conn.close()
|
|
|
|
# Taken from https://stackoverflow.com/questions/4130922/ (thank you, David Webb)
def add_months(date_in: "datetime.datetime", months: int) -> datetime.datetime:
    """Return date_in shifted forward by `months` calendar months, in UTC.

    The day is clamped to the last day of the target month (e.g. Jan 31 + 1
    month -> Feb 28/29).
    """
    # BUG FIX: the original recomputed year/month from date_in.month instead of
    # the shifted `month` value, so the result never actually moved by `months`.
    month = date_in.month - 1 + months
    year = date_in.year + month // 12
    month = month % 12 + 1
    day = min(date_in.day, calendar.monthrange(year, month)[1])
    return datetime.datetime(year, month, day, tzinfo=datetime.timezone.utc)
|
|
|
|
def check_schema_version(cur: "psycopg2.cursor", ignore_mismatch: bool):
    """Verify the database is at the EF migration this script was written against.

    Exits the process with status 1 on a missing or mismatched schema version,
    unless ignore_mismatch is set (then it only warns).
    """
    # BUG FIX: ORDER BY named the table (a composite row value in Postgres) rather
    # than the MigrationId column; order explicitly by the column we select.
    cur.execute('SELECT "MigrationId" FROM "__EFMigrationsHistory" ORDER BY "MigrationId" DESC LIMIT 1')
    schema_version = cur.fetchone()
    if schema_version is None:
        print("Unable to read database schema version.")
        exit(1)

    if schema_version[0] != LATEST_DB_MIGRATION:
        print(f"Unsupported schema version of DB: '{schema_version[0]}'. Supported: {LATEST_DB_MIGRATION}")
        if ignore_mismatch:
            return
        exit(1)
|
|
|
|
|
|
def get_oldest_admin_log(cur: "psycopg2.cursor") -> "datetime.datetime":
    """Return the timestamp of the oldest admin_log row.

    Exits the process cleanly (status 0) when the table is empty, since there
    is nothing to dump.
    """
    cur.execute('SELECT "date" FROM "admin_log" ORDER BY "date" LIMIT 1')
    admin_date = cur.fetchone()
    # Idiom fix: compare against None with `is`, not `==`.
    if admin_date is None:
        print("No admin logs to read.")
        exit(0)

    return admin_date[0]
|
|
|
|
|
|
def dump_admin_in_range(cur: "psycopg2.cursor", start: "datetime.datetime", end: "datetime.datetime", outdir: str, compress: bool, delete: bool):
    """Export admin_log and admin_log_player rows with start <= date < end into
    JSON files in outdir (gzip-compressed if compress is set), then delete those
    rows from the DB when delete is set.

    A temp table of (admin_log_id, round_id) keys ensures the export and the
    delete cover exactly the same rows; it drops on the caller's commit.
    """
    date_suffix = f"{start.strftime('%Y%m%d')}-{end.strftime('%Y%m%d')}"
    print()  # Newline

    # Create a temp table for our admin log rows of interest, make sure it drops on commit.
    cur.execute("""
        CREATE TEMP TABLE admin_dump
        ON COMMIT DROP
        AS
        (SELECT
            admin_log_id, round_id
        FROM
            admin_log
        WHERE
            date >= %s AND date < %s
        )
        """, (start, end))

    # Export admin_log_player
    print(f"Dumping admin_log_player from {start.date()} to {end.date()}...")

    cur.execute("""
        SELECT
            json_agg(to_jsonb(alp.*))
        FROM
            admin_log_player alp JOIN admin_dump ad
        ON
            alp.log_id = ad.admin_log_id AND alp.round_id = ad.round_id
        GROUP BY alp.round_id, alp.log_id
        ORDER BY alp.round_id, alp.log_id
        """)

    # BUG FIX: dump files are now closed via with-blocks (the originals leaked),
    # and the uncompressed path opens in binary mode -- text mode "w" raised
    # TypeError on the bytes the writer emits.
    with _open_dump_file(outdir, f"admin_log_player-{date_suffix}", compress) as file_obj:
        _write_rows_as_json_array(cur, file_obj)

    # Export admin_log
    print(f"Dumping admin_log from {start.date()} to {end.date()}...")

    cur.execute("""
        SELECT
            json_agg(to_jsonb(al.*))
        FROM
            admin_log al JOIN admin_dump ad
        ON
            al.admin_log_id = ad.admin_log_id AND al.round_id = ad.round_id
        GROUP BY al.round_id, al.admin_log_id
        ORDER BY al.round_id, al.admin_log_id
        """)

    with _open_dump_file(outdir, f"admin_log-{date_suffix}", compress) as file_obj:
        _write_rows_as_json_array(cur, file_obj)

    if delete:
        # Delete admin_log_player first -- its rows reference admin_log entries.
        print(f"Deleting admin_log_player from {start.date()} to {end.date()}...")
        cur.execute("""
            DELETE FROM
                admin_log_player alp
            USING
                admin_dump ad
            WHERE
                alp.log_id = ad.admin_log_id AND alp.round_id = ad.round_id
            """)

        # Delete admin_log
        print(f"Deleting admin_log from {start.date()} to {end.date()}...")
        cur.execute("""
            DELETE FROM
                admin_log
            WHERE
                date < %s
            """, (end,))


def _open_dump_file(outdir: str, basename: str, compress: bool):
    """Open the binary output file for one table dump, gzipped if requested."""
    if compress:
        return gzip.GzipFile(os.path.join(outdir, f"{basename}.json.gz"), "w")
    return open(os.path.join(outdir, f"{basename}.json"), "wb")


def _write_rows_as_json_array(cur: "psycopg2.cursor", file_obj) -> None:
    """Stream the pending json_agg query's batches into file_obj as one JSON array."""
    file_obj.write("[".encode("utf-8"))
    first_row = True
    while True:
        data = cur.fetchmany(500)
        if len(data) <= 0:
            break
        for row in data:
            # Comma-separate objects after the first one.
            if not first_row:
                file_obj.write(", ".encode('utf-8'))
            else:
                first_row = False
            # NOTE(review): only the first element of each json_agg group is written
            # (row[0][0]); if a group can ever aggregate more than one row (e.g.
            # several players per log entry), the rest are silently dropped --
            # confirm against the table's primary key.
            file_obj.write(json.dumps(row[0][0]).encode('utf-8'))
    file_obj.write("]".encode("utf-8"))
|
|
|
|
|
|
# Idiom fix: guard the entry point so importing this module doesn't run the dump.
if __name__ == "__main__":
    main()
|