timezone fixes

This commit is contained in:
Josh Patra
2024-12-25 18:37:22 -05:00
parent f0cf3e1e9c
commit c537a84a2e

View File

@@ -11,8 +11,6 @@ from typing import Optional
import pytz
from exiftool import ExifToolHelper as et
from PIL import Image, ImageDraw
# Additional libraries for dynamic time zone lookup
from timezonefinder import TimezoneFinder
@@ -78,45 +76,10 @@ def init_parser() -> argparse.Namespace:
return parser.parse_args()
class BasicLogger:
"""
A fallback / minimal logger that simply prints lines.
If not verbose, prints almost nothing (only error messages).
"""
def __init__(self, verbose: bool):
self.verbose = verbose
def print_log(self, text: str, force: bool = False):
if self.verbose or force:
print(text)
def show_progress(self, iteration: int, total: int, prefix="", date_str=""):
"""
Overwrites a single line each time to show a basic progress bar.
If not verbose, we still do a minimal inline bar.
"""
if total == 0:
percent = 100
else:
percent = int(100 * iteration / total)
bar_length = 40
filled_len = bar_length * iteration // max(1, total)
bar = "=" * filled_len + "-" * (bar_length - filled_len)
line_str = f"{prefix} |{bar}| {percent}% - {date_str}"
sys.stdout.write("\r" + line_str)
sys.stdout.flush()
if iteration == total:
print() # final newline
class CursesLogger:
"""
When verbose is True and curses is available, we keep a multi-line log above
and a pinned progress bar at the bottom.
and a pinned progress bar at the bottom. This might restart if the window is resized too small.
"""
def __init__(self, stdscr):
@@ -124,7 +87,7 @@ class CursesLogger:
curses.curs_set(0) # hide cursor
self.max_y, self.max_x = self.stdscr.getmaxyx()
self.log_height = self.max_y - 2 # keep bottom line(s) for progress bar
self.log_height = self.max_y - 2 # keep bottom line(s) for the progress bar
# create log window
self.logwin = curses.newwin(self.log_height, self.max_x, 0, 0)
@@ -136,10 +99,11 @@ class CursesLogger:
self.log_count = 0
def print_log(self, text: str, force: bool = False):
# Force doesn't matter in curses; we always show
self.logwin.addstr(self.log_count, 0, text)
self.logwin.clrtoeol()
self.log_count += 1
if self.log_count >= (self.log_height):
if self.log_count >= self.log_height:
self.logwin.scroll(1)
self.log_count -= 1
self.logwin.refresh()
@@ -159,30 +123,65 @@ class CursesLogger:
line_str = f"{prefix} |{bar}| {percent}% - {date_str}"
self.pbwin.clear()
# Clip if the line is longer than the terminal
self.pbwin.addstr(0, 0, line_str[: self.max_x - 1])
self.pbwin.refresh()
class ExporterBase:
class BasicLogger:
"""
Common logic for BeRealExporter, including date range parsing, etc.
A fallback / minimal logger if curses fails or if verbose isn't set.
"""
def __init__(self, args: argparse.Namespace):
def __init__(self, verbose: bool):
self.verbose = verbose
def print_log(self, text: str, force: bool = False):
if self.verbose or force:
print(text)
def show_progress(self, iteration: int, total: int, prefix="", date_str=""):
# Overwrites one line with a simple bar
if total == 0:
percent = 100
else:
percent = int(100 * iteration / total)
bar_length = 40
filled_len = bar_length * iteration // max(1, total)
bar = "=" * filled_len + "-" * (bar_length - filled_len)
line_str = f"{prefix} |{bar}| {percent}% - {date_str}"
sys.stdout.write("\r" + line_str)
sys.stdout.flush()
if iteration == total:
print() # newline after finishing
class BeRealExporter:
"""
Main exporter logic, with curses or fallback for logs.
Using timezone_at only (not closest_timezone_at).
"""
def __init__(self, args: argparse.Namespace, logger):
self.args = args
self.logger = logger
self.verbose = args.verbose
self.exiftool_path = args.exiftool_path
self.out_path = args.out_path.rstrip("/")
self.bereal_path = args.bereal_path.rstrip("/")
self.create_composites = args.composites
self.default_tz = args.default_tz
# parse timespan/year
self.time_span = self.init_time_span(args)
# For lat/lon lookups
self.tf = TimezoneFinder()
def init_time_span(self, args: argparse.Namespace) -> tuple:
"""
Parse --timespan or --year to define (start_dt, end_dt).
If neither is provided, use entire history: 1970 -> now.
"""
if args.timespan:
try:
start_str, end_str = args.timespan.strip().split("-")
@@ -191,21 +190,17 @@ class ExporterBase:
else:
naive_start = dt.strptime(start_str, "%d.%m.%Y")
start = naive_start.replace(tzinfo=timezone.utc)
if end_str == "*":
end = dt.now(tz=timezone.utc)
else:
naive_end = dt.strptime(end_str, "%d.%m.%Y")
naive_end = naive_end.replace(hour=23, minute=59, second=59)
end = naive_end.replace(tzinfo=timezone.utc)
return start, end
except ValueError:
raise ValueError(
"Invalid timespan format. Use 'DD.MM.YYYY-DD.MM.YYYY' or '*' wildcard."
)
elif args.year:
naive_start = dt(args.year, 1, 1)
naive_end = dt(args.year, 12, 31, 23, 59, 59)
@@ -213,33 +208,37 @@ class ExporterBase:
naive_start.replace(tzinfo=timezone.utc),
naive_end.replace(tzinfo=timezone.utc),
)
else:
return (
dt(1970, 1, 1, tzinfo=timezone.utc),
dt.now(tz=timezone.utc),
)
def verbose_msg(self, msg: str):
if self.verbose:
self.logger.print_log(msg)
def log(self, text: str, force: bool = False):
self.logger.print_log(text, force=force)
def show_progress(self, i: int, total: int, prefix="", date_str=""):
self.logger.show_progress(i, total, prefix, date_str)
def resolve_img_path(self, path_str: str) -> Optional[str]:
"""
Resolves local path for front/back images in Photos/post or Photos/bereal
by checking the basename. If it doesn't exist, returns None.
"""
# If path includes /post/ or /bereal/ specifically, try that
if "/post/" in path_str:
c = os.path.join(
candidate = os.path.join(
self.bereal_path, "Photos/post", os.path.basename(path_str)
)
if os.path.isfile(c):
return c
if os.path.isfile(candidate):
return candidate
elif "/bereal/" in path_str:
c = os.path.join(
candidate = os.path.join(
self.bereal_path, "Photos/bereal", os.path.basename(path_str)
)
if os.path.isfile(c):
return c
if os.path.isfile(candidate):
return candidate
# fallback check both subfolders
# fallback
p1 = os.path.join(self.bereal_path, "Photos/post", os.path.basename(path_str))
p2 = os.path.join(self.bereal_path, "Photos/bereal", os.path.basename(path_str))
if os.path.isfile(p1):
@@ -248,100 +247,55 @@ class ExporterBase:
return p2
return None
class BeRealExporter(ExporterBase):
"""
Handles exporting memories & realmojis, along with local time zone logic.
Composites optionally with rounded corners for the front image.
"""
def __init__(self, args: argparse.Namespace, logger):
super().__init__(args)
self.logger = logger
self.tf = TimezoneFinder()
def verbose_msg(self, text: str):
if self.verbose:
self.logger.print_log(text)
def log(self, text: str, force: bool = False):
self.logger.print_log(text, force=force)
def show_progress(self, iteration: int, total: int, prefix="", date_str=""):
self.logger.show_progress(iteration, total, prefix, date_str)
def localize_datetime(self, dt_utc: dt, lat: float, lon: float) -> dt:
"""
1) If lat/lon is missing, attempt fallback time zone from --default-timezone,
else remain UTC.
2) If lat/lon exist, use closest_timezone_at(...) for best results; if that fails,
we fallback to basic timezone_at(...).
3) If still no zone found or error, use --default-timezone if given, else UTC.
Use tf.timezone_at(...) only. If lat/lon missing or fails, fallback to default tz or stay UTC.
"""
# 1) lat/lon missing => fallback or UTC
if lat is None or lon is None:
# fallback
if self.default_tz:
try:
fallback_zone = pytz.timezone(self.default_tz)
return dt_utc.astimezone(fallback_zone)
except Exception as e:
self.log(
f"Warning: fallback time zone '{self.default_tz}' invalid: {e}",
force=True,
self.verbose_msg(
f"Warning: fallback time zone '{self.default_tz}' invalid: {e}"
)
# remain UTC
return dt_utc
# 2) lat/lon exist => attempt closest time zone
try:
tz_name = self.tf.closest_timezone_at(lng=lon, lat=lat)
# if we want to try forceTZ=True, we can:
# tz_name = self.tf.closest_timezone_at(lng=lon, lat=lat, forceTZ=True)
if tz_name is None:
# fallback to normal polygon-based
tz_name = self.tf.timezone_at(lng=lon, lat=lat)
tz_name = self.tf.timezone_at(lng=lon, lat=lat)
if tz_name:
local_zone = pytz.timezone(tz_name)
return dt_utc.astimezone(local_zone)
else:
# fallback => default tz or remain UTC
# fallback
if self.default_tz:
try:
fallback_zone = pytz.timezone(self.default_tz)
return dt_utc.astimezone(fallback_zone)
except Exception as e:
self.log(
f"Warning: fallback time zone '{self.default_tz}' invalid: {e}",
force=True,
self.verbose_msg(
f"Warning: fallback time zone '{self.default_tz}' invalid: {e}"
)
return dt_utc
except Exception as e:
# timezone lookup failed => fallback => default tz or UTC
self.log(
f"Warning: Time zone lookup failed for lat={lat}, lon={lon}: {e}",
force=True,
self.verbose_msg(
f"Warning: Time zone lookup failed for lat={lat}, lon={lon}: {e}"
)
if self.default_tz:
try:
fallback_zone = pytz.timezone(self.default_tz)
return dt_utc.astimezone(fallback_zone)
except Exception as e2:
self.log(
f"Warning: fallback time zone '{self.default_tz}' invalid: {e2}",
force=True,
self.verbose_msg(
f"Warning: fallback time zone '{self.default_tz}' invalid: {e2}"
)
return dt_utc
def embed_exif(
self, file_name: str, dt_utc: dt, lat: float = None, lon: float = None
):
"""
Convert dt_utc -> local time zone, then store that naive time in EXIF:DateTimeOriginal.
If lat/lon exist, store them as well in GPS tags.
"""
final_dt = self.localize_datetime(dt_utc, lat, lon)
naive_local = final_dt.replace(tzinfo=None)
@@ -371,17 +325,12 @@ class BeRealExporter(ExporterBase):
def copy_and_embed(
self, old_path: str, new_path: str, dt_utc: dt, lat=None, lon=None
) -> Optional[str]:
"""
Copy file from old_path to new_path (keeping extension),
then embed date/time + GPS data in the new file.
"""
if not old_path or not os.path.isfile(old_path):
self.log(f"File not found: {old_path}", force=True)
return None
ext = os.path.splitext(old_path)[1] or ".webp"
new_path = os.path.splitext(new_path)[0] + ext
os.makedirs(os.path.dirname(new_path), exist_ok=True)
cp(old_path, new_path)
@@ -398,13 +347,6 @@ class BeRealExporter(ExporterBase):
lat=None,
lon=None,
) -> Optional[str]:
"""
1) Scale front to 1/4 of back's width.
2) Round corners on front.
3) Composite front at (0,0) on back.
4) Flatten to RGB => out_path
5) Then embed EXIF
"""
ext = os.path.splitext(out_path)[1] or ".webp"
out_path = os.path.splitext(out_path)[0] + ext
try:
@@ -414,13 +356,11 @@ class BeRealExporter(ExporterBase):
b_w, b_h = b_img.size
f_w, f_h = f_img.size
scale_factor = max(1, b_w // 4)
new_f_h = int((scale_factor / f_w) * f_h)
front_resized = f_img.resize((scale_factor, new_f_h), Image.LANCZOS)
# Rounded corners
# Round corners
mask = Image.new("L", front_resized.size, 0)
draw = ImageDraw.Draw(mask)
radius = min(front_resized.size) // 8
@@ -429,12 +369,11 @@ class BeRealExporter(ExporterBase):
)
front_resized.putalpha(mask)
# alpha_composite onto back
b_img.alpha_composite(front_resized, (0, 0))
# Flatten to RGB
final = b_img.convert("RGB")
final.save(out_path)
except Exception as e:
self.log(
f"Error creating composite for {front_path} & {back_path}: {e}",
@@ -442,15 +381,12 @@ class BeRealExporter(ExporterBase):
)
return None
# embed EXIF
# embed exif
self.embed_exif(out_path, dt_utc, lat, lon)
self.verbose_msg(f"Composite saved: {out_path}")
return out_path
def filter_memories_in_timespan(self, memories):
"""
Return only memories whose takenTime is within time_span.
"""
valid = []
start_dt, end_dt = self.time_span
for m in memories:
@@ -464,9 +400,6 @@ class BeRealExporter(ExporterBase):
return valid
def export_memories(self, memories):
"""
Copies front/back images, embed EXIF, optionally create composites.
"""
memories = self.filter_memories_in_timespan(memories)
def dt_key(x):
@@ -506,7 +439,6 @@ class BeRealExporter(ExporterBase):
final_front = self.copy_and_embed(front_src, front_out, m_dt_utc, lat, lon)
final_back = self.copy_and_embed(back_src, back_out, m_dt_utc, lat, lon)
# if user didn't say --no-composites
if self.create_composites and final_front and final_back:
comp_out = os.path.join(out_cmp, f"{base_ts}_composite")
self.create_composite(
@@ -534,10 +466,6 @@ class BeRealExporter(ExporterBase):
return valid
def export_realmojis(self, realmojis):
"""
Copies realmoji files from Photos/Realmoji, embed EXIF date/time
(GPS not typically used).
"""
realmojis = self.filter_realmojis_in_timespan(realmojis)
def dt_key(x):
@@ -574,20 +502,19 @@ class BeRealExporter(ExporterBase):
def run_in_curses():
"""
Attempts to run with curses-based logs/progress if verbose is True.
Otherwise fallback to a non-curses run.
If window is too small, might restart or fallback if curses.error triggers.
"""
args = init_parser()
if not args.verbose:
# skip curses if not verbose
# if not verbose, skip curses
run_no_curses(args)
return
def main_curses(stdscr):
cl = CursesLogger(stdscr)
exporter = BeRealExporter(args, logger=cl)
logger = CursesLogger(stdscr)
exporter = BeRealExporter(args, logger=logger)
# Export memories
if args.memories:
try:
with open(
@@ -596,11 +523,10 @@ def run_in_curses():
mems = json.load(f)
exporter.export_memories(mems)
except FileNotFoundError:
cl.print_log("Error: memories.json file not found.", force=True)
logger.print_log("Error: memories.json file not found.", force=True)
except json.JSONDecodeError:
cl.print_log("Error decoding memories.json file.", force=True)
logger.print_log("Error decoding memories.json file.", force=True)
# Export realmojis
if args.realmojis:
try:
with open(
@@ -609,11 +535,11 @@ def run_in_curses():
rms = json.load(f)
exporter.export_realmojis(rms)
except FileNotFoundError:
cl.print_log("Error: realmojis.json file not found.", force=True)
logger.print_log("Error: realmojis.json file not found.", force=True)
except json.JSONDecodeError:
cl.print_log("Error decoding realmojis.json file.", force=True)
logger.print_log("Error decoding realmojis.json file.", force=True)
cl.print_log("\nAll done. Press any key to exit.")
logger.print_log("\nAll done. Press any key to exit.")
stdscr.getch()
try:
@@ -625,7 +551,7 @@ def run_in_curses():
def run_no_curses(args: argparse.Namespace):
"""
Minimal run without curses-based logging.
Basic logger with inline progress bar, no curses.
"""
logger = BasicLogger(verbose=args.verbose)
exporter = BeRealExporter(args, logger=logger)