From c537a84a2eb3caf1f1ac4232f48c9eadce9d714e Mon Sep 17 00:00:00 2001 From: Josh Patra <30350506+SoPat712@users.noreply.github.com> Date: Wed, 25 Dec 2024 18:37:22 -0500 Subject: [PATCH] timezone fixes --- bereal_exporter.py | 250 ++++++++++++++++----------------------------- 1 file changed, 88 insertions(+), 162 deletions(-) diff --git a/bereal_exporter.py b/bereal_exporter.py index 0f80e23..b509b91 100644 --- a/bereal_exporter.py +++ b/bereal_exporter.py @@ -11,8 +11,6 @@ from typing import Optional import pytz from exiftool import ExifToolHelper as et from PIL import Image, ImageDraw - -# Additional libraries for dynamic time zone lookup from timezonefinder import TimezoneFinder @@ -78,45 +76,10 @@ def init_parser() -> argparse.Namespace: return parser.parse_args() -class BasicLogger: - """ - A fallback / minimal logger that simply prints lines. - If not verbose, prints almost nothing (only error messages). - """ - - def __init__(self, verbose: bool): - self.verbose = verbose - - def print_log(self, text: str, force: bool = False): - if self.verbose or force: - print(text) - - def show_progress(self, iteration: int, total: int, prefix="", date_str=""): - """ - Overwrites a single line each time to show a basic progress bar. - If not verbose, we still do a minimal inline bar. - """ - if total == 0: - percent = 100 - else: - percent = int(100 * iteration / total) - - bar_length = 40 - filled_len = bar_length * iteration // max(1, total) - bar = "=" * filled_len + "-" * (bar_length - filled_len) - - line_str = f"{prefix} |{bar}| {percent}% - {date_str}" - sys.stdout.write("\r" + line_str) - sys.stdout.flush() - - if iteration == total: - print() # final newline - - class CursesLogger: """ When verbose is True and curses is available, we keep a multi-line log above - and a pinned progress bar at the bottom. + and a pinned progress bar at the bottom. This might restart if the window is resized too small. """ def __init__(self, stdscr): @@ -124,7 +87,7 @@ class CursesLogger: curses.curs_set(0) # hide cursor self.max_y, self.max_x = self.stdscr.getmaxyx() - self.log_height = self.max_y - 2 # keep bottom line(s) for progress bar + self.log_height = self.max_y - 2 # keep bottom line(s) for the progress bar # create log window self.logwin = curses.newwin(self.log_height, self.max_x, 0, 0) @@ -136,10 +99,11 @@ class CursesLogger: self.log_count = 0 def print_log(self, text: str, force: bool = False): + # Force doesn't matter in curses; we always show self.logwin.addstr(self.log_count, 0, text) self.logwin.clrtoeol() self.log_count += 1 - if self.log_count >= (self.log_height): + if self.log_count >= self.log_height: self.logwin.scroll(1) self.log_count -= 1 self.logwin.refresh() @@ -159,30 +123,65 @@ class CursesLogger: line_str = f"{prefix} |{bar}| {percent}% - {date_str}" self.pbwin.clear() + # Clip if the line is longer than the terminal self.pbwin.addstr(0, 0, line_str[: self.max_x - 1]) self.pbwin.refresh() -class ExporterBase: +class BasicLogger: """ - Common logic for BeRealExporter, including date range parsing, etc. + A fallback / minimal logger if curses fails or if verbose isn't set. """ - def __init__(self, args: argparse.Namespace): + def __init__(self, verbose: bool): + self.verbose = verbose + + def print_log(self, text: str, force: bool = False): + if self.verbose or force: + print(text) + + def show_progress(self, iteration: int, total: int, prefix="", date_str=""): + # Overwrites one line with a simple bar + if total == 0: + percent = 100 + else: + percent = int(100 * iteration / total) + + bar_length = 40 + filled_len = bar_length * iteration // max(1, total) + bar = "=" * filled_len + "-" * (bar_length - filled_len) + + line_str = f"{prefix} |{bar}| {percent}% - {date_str}" + sys.stdout.write("\r" + line_str) + sys.stdout.flush() + + if iteration == total: + print() # newline after finishing + + +class BeRealExporter: + """ + Main exporter logic, with curses or fallback for logs. + Using timezone_at only (not closest_timezone_at). + """ + + def __init__(self, args: argparse.Namespace, logger): self.args = args + self.logger = logger self.verbose = args.verbose self.exiftool_path = args.exiftool_path self.out_path = args.out_path.rstrip("/") self.bereal_path = args.bereal_path.rstrip("/") self.create_composites = args.composites self.default_tz = args.default_tz + + # parse timespan/year self.time_span = self.init_time_span(args) + # For lat/lon lookups + self.tf = TimezoneFinder() + def init_time_span(self, args: argparse.Namespace) -> tuple: - """ - Parse --timespan or --year to define (start_dt, end_dt). - If neither is provided, use entire history: 1970 -> now. - """ if args.timespan: try: start_str, end_str = args.timespan.strip().split("-") @@ -191,21 +190,17 @@ class ExporterBase: else: naive_start = dt.strptime(start_str, "%d.%m.%Y") start = naive_start.replace(tzinfo=timezone.utc) - if end_str == "*": end = dt.now(tz=timezone.utc) else: naive_end = dt.strptime(end_str, "%d.%m.%Y") naive_end = naive_end.replace(hour=23, minute=59, second=59) end = naive_end.replace(tzinfo=timezone.utc) - return start, end - except ValueError: raise ValueError( "Invalid timespan format. Use 'DD.MM.YYYY-DD.MM.YYYY' or '*' wildcard." ) - elif args.year: naive_start = dt(args.year, 1, 1) naive_end = dt(args.year, 12, 31, 23, 59, 59) @@ -213,33 +208,37 @@ class ExporterBase: naive_start.replace(tzinfo=timezone.utc), naive_end.replace(tzinfo=timezone.utc), ) - else: return ( dt(1970, 1, 1, tzinfo=timezone.utc), dt.now(tz=timezone.utc), ) + def verbose_msg(self, msg: str): + if self.verbose: + self.logger.print_log(msg) + + def log(self, text: str, force: bool = False): + self.logger.print_log(text, force=force) + + def show_progress(self, i: int, total: int, prefix="", date_str=""): + self.logger.show_progress(i, total, prefix, date_str) + def resolve_img_path(self, path_str: str) -> Optional[str]: - """ - Resolves local path for front/back images in Photos/post or Photos/bereal - by checking the basename. If it doesn't exist, returns None. - """ - # If path includes /post/ or /bereal/ specifically, try that if "/post/" in path_str: - c = os.path.join( + candidate = os.path.join( self.bereal_path, "Photos/post", os.path.basename(path_str) ) - if os.path.isfile(c): - return c + if os.path.isfile(candidate): + return candidate elif "/bereal/" in path_str: - c = os.path.join( + candidate = os.path.join( self.bereal_path, "Photos/bereal", os.path.basename(path_str) ) - if os.path.isfile(c): - return c + if os.path.isfile(candidate): + return candidate - # fallback check both subfolders + # fallback p1 = os.path.join(self.bereal_path, "Photos/post", os.path.basename(path_str)) p2 = os.path.join(self.bereal_path, "Photos/bereal", os.path.basename(path_str)) if os.path.isfile(p1): @@ -248,100 +247,55 @@ class ExporterBase: return p2 return None - -class BeRealExporter(ExporterBase): - """ - Handles exporting memories & realmojis, along with local time zone logic. - Composites optionally with rounded corners for the front image. - """ - - def __init__(self, args: argparse.Namespace, logger): - super().__init__(args) - self.logger = logger - self.tf = TimezoneFinder() - - def verbose_msg(self, text: str): - if self.verbose: - self.logger.print_log(text) - - def log(self, text: str, force: bool = False): - self.logger.print_log(text, force=force) - - def show_progress(self, iteration: int, total: int, prefix="", date_str=""): - self.logger.show_progress(iteration, total, prefix, date_str) - def localize_datetime(self, dt_utc: dt, lat: float, lon: float) -> dt: """ - 1) If lat/lon is missing, attempt fallback time zone from --default-timezone, - else remain UTC. - 2) If lat/lon exist, use closest_timezone_at(...) for best results; if that fails, - we fallback to basic timezone_at(...). - 3) If still no zone found or error, use --default-timezone if given, else UTC. + Use tf.timezone_at(...) only. If lat/lon missing or fails, fallback to default tz or stay UTC. """ - - # 1) lat/lon missing => fallback or UTC if lat is None or lon is None: + # fallback if self.default_tz: try: fallback_zone = pytz.timezone(self.default_tz) return dt_utc.astimezone(fallback_zone) except Exception as e: - self.log( - f"Warning: fallback time zone '{self.default_tz}' invalid: {e}", - force=True, + self.verbose_msg( + f"Warning: fallback time zone '{self.default_tz}' invalid: {e}" ) - # remain UTC return dt_utc - # 2) lat/lon exist => attempt closest time zone try: - tz_name = self.tf.closest_timezone_at(lng=lon, lat=lat) - # if we want to try forceTZ=True, we can: - # tz_name = self.tf.closest_timezone_at(lng=lon, lat=lat, forceTZ=True) - if tz_name is None: - # fallback to normal polygon-based - tz_name = self.tf.timezone_at(lng=lon, lat=lat) - + tz_name = self.tf.timezone_at(lng=lon, lat=lat) if tz_name: local_zone = pytz.timezone(tz_name) return dt_utc.astimezone(local_zone) else: - # fallback => default tz or remain UTC + # fallback if self.default_tz: try: fallback_zone = pytz.timezone(self.default_tz) return dt_utc.astimezone(fallback_zone) except Exception as e: - self.log( - f"Warning: fallback time zone '{self.default_tz}' invalid: {e}", - force=True, + self.verbose_msg( + f"Warning: fallback time zone '{self.default_tz}' invalid: {e}" ) return dt_utc - except Exception as e: - # timezone lookup failed => fallback => default tz or UTC - self.log( - f"Warning: Time zone lookup failed for lat={lat}, lon={lon}: {e}", - force=True, + self.verbose_msg( + f"Warning: Time zone lookup failed for lat={lat}, lon={lon}: {e}" ) if self.default_tz: try: fallback_zone = pytz.timezone(self.default_tz) return dt_utc.astimezone(fallback_zone) except Exception as e2: - self.log( - f"Warning: fallback time zone '{self.default_tz}' invalid: {e2}", - force=True, + self.verbose_msg( + f"Warning: fallback time zone '{self.default_tz}' invalid: {e2}" ) return dt_utc def embed_exif( self, file_name: str, dt_utc: dt, lat: float = None, lon: float = None ): - """ - Convert dt_utc -> local time zone, then store that naive time in EXIF:DateTimeOriginal. - If lat/lon exist, store them as well in GPS tags. - """ final_dt = self.localize_datetime(dt_utc, lat, lon) naive_local = final_dt.replace(tzinfo=None) @@ -371,17 +325,12 @@ class BeRealExporter(ExporterBase): def copy_and_embed( self, old_path: str, new_path: str, dt_utc: dt, lat=None, lon=None ) -> Optional[str]: - """ - Copy file from old_path to new_path (keeping extension), - then embed date/time + GPS data in the new file. - """ if not old_path or not os.path.isfile(old_path): self.log(f"File not found: {old_path}", force=True) return None ext = os.path.splitext(old_path)[1] or ".webp" new_path = os.path.splitext(new_path)[0] + ext - os.makedirs(os.path.dirname(new_path), exist_ok=True) cp(old_path, new_path) @@ -398,13 +347,6 @@ class BeRealExporter(ExporterBase): lat=None, lon=None, ) -> Optional[str]: - """ - 1) Scale front to 1/4 of back's width. - 2) Round corners on front. - 3) Composite front at (0,0) on back. - 4) Flatten to RGB => out_path - 5) Then embed EXIF - """ ext = os.path.splitext(out_path)[1] or ".webp" out_path = os.path.splitext(out_path)[0] + ext try: @@ -414,13 +356,11 @@ class BeRealExporter(ExporterBase): b_w, b_h = b_img.size f_w, f_h = f_img.size - scale_factor = max(1, b_w // 4) new_f_h = int((scale_factor / f_w) * f_h) - front_resized = f_img.resize((scale_factor, new_f_h), Image.LANCZOS) - # Rounded corners + # Round corners mask = Image.new("L", front_resized.size, 0) draw = ImageDraw.Draw(mask) radius = min(front_resized.size) // 8 @@ -429,12 +369,11 @@ class BeRealExporter(ExporterBase): ) front_resized.putalpha(mask) - # alpha_composite onto back b_img.alpha_composite(front_resized, (0, 0)) - # Flatten to RGB final = b_img.convert("RGB") final.save(out_path) + except Exception as e: self.log( f"Error creating composite for {front_path} & {back_path}: {e}", @@ -442,15 +381,12 @@ class BeRealExporter(ExporterBase): ) return None - # embed EXIF + # embed exif self.embed_exif(out_path, dt_utc, lat, lon) self.verbose_msg(f"Composite saved: {out_path}") return out_path def filter_memories_in_timespan(self, memories): - """ - Return only memories whose takenTime is within time_span. - """ valid = [] start_dt, end_dt = self.time_span for m in memories: @@ -464,9 +400,6 @@ class BeRealExporter(ExporterBase): return valid def export_memories(self, memories): - """ - Copies front/back images, embed EXIF, optionally create composites. - """ memories = self.filter_memories_in_timespan(memories) def dt_key(x): @@ -506,7 +439,6 @@ class BeRealExporter(ExporterBase): final_front = self.copy_and_embed(front_src, front_out, m_dt_utc, lat, lon) final_back = self.copy_and_embed(back_src, back_out, m_dt_utc, lat, lon) - # if user didn't say --no-composites if self.create_composites and final_front and final_back: comp_out = os.path.join(out_cmp, f"{base_ts}_composite") self.create_composite( @@ -534,10 +466,6 @@ class BeRealExporter(ExporterBase): return valid def export_realmojis(self, realmojis): - """ - Copies realmoji files from Photos/Realmoji, embed EXIF date/time - (GPS not typically used). - """ realmojis = self.filter_realmojis_in_timespan(realmojis) def dt_key(x): @@ -574,20 +502,19 @@ class BeRealExporter(ExporterBase): def run_in_curses(): """ Attempts to run with curses-based logs/progress if verbose is True. - Otherwise fallback to a non-curses run. + If window is too small, might restart or fallback if curses.error triggers. """ args = init_parser() if not args.verbose: - # skip curses if not verbose + # if not verbose, skip curses run_no_curses(args) return def main_curses(stdscr): - cl = CursesLogger(stdscr) - exporter = BeRealExporter(args, logger=cl) + logger = CursesLogger(stdscr) + exporter = BeRealExporter(args, logger=logger) - # Export memories if args.memories: try: with open( @@ -596,11 +523,10 @@ def run_in_curses(): mems = json.load(f) exporter.export_memories(mems) except FileNotFoundError: - cl.print_log("Error: memories.json file not found.", force=True) + logger.print_log("Error: memories.json file not found.", force=True) except json.JSONDecodeError: - cl.print_log("Error decoding memories.json file.", force=True) + logger.print_log("Error decoding memories.json file.", force=True) - # Export realmojis if args.realmojis: try: with open( @@ -609,11 +535,11 @@ def run_in_curses(): rms = json.load(f) exporter.export_realmojis(rms) except FileNotFoundError: - cl.print_log("Error: realmojis.json file not found.", force=True) + logger.print_log("Error: realmojis.json file not found.", force=True) except json.JSONDecodeError: - cl.print_log("Error decoding realmojis.json file.", force=True) + logger.print_log("Error decoding realmojis.json file.", force=True) - cl.print_log("\nAll done. Press any key to exit.") + logger.print_log("\nAll done. Press any key to exit.") stdscr.getch() try: @@ -625,7 +551,7 @@ def run_in_curses(): def run_no_curses(args: argparse.Namespace): """ - Minimal run without curses-based logging. + Basic logger with inline progress bar, no curses. """ logger = BasicLogger(verbose=args.verbose) exporter = BeRealExporter(args, logger=logger)