Files
BeReal-Export-Manager/bereal_exporter.py

660 lines
22 KiB
Python

import argparse
import curses
import json
import os
import sys
from datetime import datetime as dt
from datetime import timezone
from shutil import copy2 as cp
from typing import Optional
import pytz
from exiftool import ExifToolHelper as et
from PIL import Image, ImageDraw
# Additional libraries for dynamic time zone lookup
from timezonefinder import TimezoneFinder
def init_parser() -> argparse.Namespace:
"""
Initializes the argparse module.
"""
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument(
"-v", "--verbose", action="store_true", help="Explain what is being done."
)
parser.add_argument(
"--exiftool-path", dest="exiftool_path", help="Path to ExifTool executable."
)
parser.add_argument(
"-t",
"--timespan",
type=str,
help="DD.MM.YYYY-DD.MM.YYYY or wildcards with '*'.",
)
parser.add_argument("-y", "--year", type=int, help="Exports the given year.")
parser.add_argument(
"-p",
"--out-path",
dest="out_path",
default="./out",
help="Export output path (default ./out).",
)
parser.add_argument(
"--bereal-path",
dest="bereal_path",
default=".",
help="Path to BeReal data (default ./).",
)
parser.add_argument(
"--no-memories",
dest="memories",
default=True,
action="store_false",
help="Don't export memories.",
)
parser.add_argument(
"--no-realmojis",
dest="realmojis",
default=True,
action="store_false",
help="Don't export realmojis.",
)
parser.add_argument(
"--no-composites",
dest="composites",
default=True,
action="store_false",
help="Don't create a composite image front-on-back for each memory.",
)
parser.add_argument(
"--default-timezone",
dest="default_tz",
type=str,
default=None,
help="If no lat/lon or time zone lookup fails, fall back to this time zone (e.g. 'America/New_York').",
)
return parser.parse_args()
class BasicLogger:
"""
A fallback / minimal logger that simply prints lines.
If not verbose, prints almost nothing (only error messages).
"""
def __init__(self, verbose: bool):
self.verbose = verbose
def print_log(self, text: str, force: bool = False):
if self.verbose or force:
print(text)
def show_progress(self, iteration: int, total: int, prefix="", date_str=""):
"""
Overwrites a single line each time to show a basic progress bar.
If not verbose, we still do a minimal inline bar.
"""
if total == 0:
percent = 100
else:
percent = int(100 * iteration / total)
bar_length = 40
filled_len = bar_length * iteration // max(1, total)
bar = "=" * filled_len + "-" * (bar_length - filled_len)
line_str = f"{prefix} |{bar}| {percent}% - {date_str}"
sys.stdout.write("\r" + line_str)
sys.stdout.flush()
if iteration == total:
print() # final newline
class CursesLogger:
"""
When verbose is True and curses is available, we keep a multi-line log above
and a pinned progress bar at the bottom.
"""
def __init__(self, stdscr):
self.stdscr = stdscr
curses.curs_set(0) # hide cursor
self.max_y, self.max_x = self.stdscr.getmaxyx()
self.log_height = self.max_y - 2 # keep bottom line(s) for progress bar
# create log window
self.logwin = curses.newwin(self.log_height, self.max_x, 0, 0)
self.logwin.scrollok(True)
# create progress bar window
self.pbwin = curses.newwin(1, self.max_x, self.log_height, 0)
self.log_count = 0
def print_log(self, text: str, force: bool = False):
self.logwin.addstr(self.log_count, 0, text)
self.logwin.clrtoeol()
self.log_count += 1
if self.log_count >= (self.log_height):
self.logwin.scroll(1)
self.log_count -= 1
self.logwin.refresh()
def show_progress(self, iteration: int, total: int, prefix="", date_str=""):
if total == 0:
percent = 100
else:
percent = int(100 * iteration / total)
bar_length = self.max_x - 30
if bar_length < 10:
bar_length = 10
filled_len = bar_length * iteration // max(1, total)
bar = "" * filled_len + "-" * (bar_length - filled_len)
line_str = f"{prefix} |{bar}| {percent}% - {date_str}"
self.pbwin.clear()
self.pbwin.addstr(0, 0, line_str[: self.max_x - 1])
self.pbwin.refresh()
class ExporterBase:
"""
Common logic for BeRealExporter, including date range parsing, etc.
"""
def __init__(self, args: argparse.Namespace):
self.args = args
self.verbose = args.verbose
self.exiftool_path = args.exiftool_path
self.out_path = args.out_path.rstrip("/")
self.bereal_path = args.bereal_path.rstrip("/")
self.create_composites = args.composites
self.default_tz = args.default_tz
self.time_span = self.init_time_span(args)
def init_time_span(self, args: argparse.Namespace) -> tuple:
"""
Parse --timespan or --year to define (start_dt, end_dt).
If neither is provided, use entire history: 1970 -> now.
"""
if args.timespan:
try:
start_str, end_str = args.timespan.strip().split("-")
if start_str == "*":
start = dt(1970, 1, 1, tzinfo=timezone.utc)
else:
naive_start = dt.strptime(start_str, "%d.%m.%Y")
start = naive_start.replace(tzinfo=timezone.utc)
if end_str == "*":
end = dt.now(tz=timezone.utc)
else:
naive_end = dt.strptime(end_str, "%d.%m.%Y")
naive_end = naive_end.replace(hour=23, minute=59, second=59)
end = naive_end.replace(tzinfo=timezone.utc)
return start, end
except ValueError:
raise ValueError(
"Invalid timespan format. Use 'DD.MM.YYYY-DD.MM.YYYY' or '*' wildcard."
)
elif args.year:
naive_start = dt(args.year, 1, 1)
naive_end = dt(args.year, 12, 31, 23, 59, 59)
return (
naive_start.replace(tzinfo=timezone.utc),
naive_end.replace(tzinfo=timezone.utc),
)
else:
return (
dt(1970, 1, 1, tzinfo=timezone.utc),
dt.now(tz=timezone.utc),
)
def resolve_img_path(self, path_str: str) -> Optional[str]:
"""
Resolves local path for front/back images in Photos/post or Photos/bereal
by checking the basename. If it doesn't exist, returns None.
"""
# If path includes /post/ or /bereal/ specifically, try that
if "/post/" in path_str:
c = os.path.join(
self.bereal_path, "Photos/post", os.path.basename(path_str)
)
if os.path.isfile(c):
return c
elif "/bereal/" in path_str:
c = os.path.join(
self.bereal_path, "Photos/bereal", os.path.basename(path_str)
)
if os.path.isfile(c):
return c
# fallback check both subfolders
p1 = os.path.join(self.bereal_path, "Photos/post", os.path.basename(path_str))
p2 = os.path.join(self.bereal_path, "Photos/bereal", os.path.basename(path_str))
if os.path.isfile(p1):
return p1
if os.path.isfile(p2):
return p2
return None
class BeRealExporter(ExporterBase):
"""
Handles exporting memories & realmojis, along with local time zone logic.
Composites optionally with rounded corners for the front image.
"""
def __init__(self, args: argparse.Namespace, logger):
super().__init__(args)
self.logger = logger
self.tf = TimezoneFinder()
def verbose_msg(self, text: str):
if self.verbose:
self.logger.print_log(text)
def log(self, text: str, force: bool = False):
self.logger.print_log(text, force=force)
def show_progress(self, iteration: int, total: int, prefix="", date_str=""):
self.logger.show_progress(iteration, total, prefix, date_str)
def localize_datetime(self, dt_utc: dt, lat: float, lon: float) -> dt:
"""
1) If lat/lon is missing, attempt fallback time zone from --default-timezone,
else remain UTC.
2) If lat/lon exist, use closest_timezone_at(...) for best results; if that fails,
we fallback to basic timezone_at(...).
3) If still no zone found or error, use --default-timezone if given, else UTC.
"""
# 1) lat/lon missing => fallback or UTC
if lat is None or lon is None:
if self.default_tz:
try:
fallback_zone = pytz.timezone(self.default_tz)
return dt_utc.astimezone(fallback_zone)
except Exception as e:
self.log(
f"Warning: fallback time zone '{self.default_tz}' invalid: {e}",
force=True,
)
# remain UTC
return dt_utc
# 2) lat/lon exist => attempt closest time zone
try:
tz_name = self.tf.closest_timezone_at(lng=lon, lat=lat)
# if we want to try forceTZ=True, we can:
# tz_name = self.tf.closest_timezone_at(lng=lon, lat=lat, forceTZ=True)
if tz_name is None:
# fallback to normal polygon-based
tz_name = self.tf.timezone_at(lng=lon, lat=lat)
if tz_name:
local_zone = pytz.timezone(tz_name)
return dt_utc.astimezone(local_zone)
else:
# fallback => default tz or remain UTC
if self.default_tz:
try:
fallback_zone = pytz.timezone(self.default_tz)
return dt_utc.astimezone(fallback_zone)
except Exception as e:
self.log(
f"Warning: fallback time zone '{self.default_tz}' invalid: {e}",
force=True,
)
return dt_utc
except Exception as e:
# timezone lookup failed => fallback => default tz or UTC
self.log(
f"Warning: Time zone lookup failed for lat={lat}, lon={lon}: {e}",
force=True,
)
if self.default_tz:
try:
fallback_zone = pytz.timezone(self.default_tz)
return dt_utc.astimezone(fallback_zone)
except Exception as e2:
self.log(
f"Warning: fallback time zone '{self.default_tz}' invalid: {e2}",
force=True,
)
return dt_utc
def embed_exif(
self, file_name: str, dt_utc: dt, lat: float = None, lon: float = None
):
"""
Convert dt_utc -> local time zone, then store that naive time in EXIF:DateTimeOriginal.
If lat/lon exist, store them as well in GPS tags.
"""
final_dt = self.localize_datetime(dt_utc, lat, lon)
naive_local = final_dt.replace(tzinfo=None)
tags = {
"EXIF:DateTimeOriginal": naive_local.strftime("%Y:%m:%d %H:%M:%S"),
}
if lat is not None and lon is not None:
lat_ref = "N" if lat >= 0 else "S"
lon_ref = "E" if lon >= 0 else "W"
tags.update(
{
"EXIF:GPSLatitude": abs(lat),
"EXIF:GPSLatitudeRef": lat_ref,
"EXIF:GPSLongitude": abs(lon),
"EXIF:GPSLongitudeRef": lon_ref,
}
)
try:
with (
et(executable=self.exiftool_path) if self.exiftool_path else et()
) as ex:
ex.set_tags(file_name, tags=tags, params=["-P", "-overwrite_original"])
except Exception as e:
self.log(f"Error embedding EXIF to {file_name}: {e}", force=True)
def copy_and_embed(
self, old_path: str, new_path: str, dt_utc: dt, lat=None, lon=None
) -> Optional[str]:
"""
Copy file from old_path to new_path (keeping extension),
then embed date/time + GPS data in the new file.
"""
if not old_path or not os.path.isfile(old_path):
self.log(f"File not found: {old_path}", force=True)
return None
ext = os.path.splitext(old_path)[1] or ".webp"
new_path = os.path.splitext(new_path)[0] + ext
os.makedirs(os.path.dirname(new_path), exist_ok=True)
cp(old_path, new_path)
self.embed_exif(new_path, dt_utc, lat, lon)
self.verbose_msg(f"Copied & embedded {old_path} -> {new_path}")
return new_path
def create_composite(
self,
front_path: str,
back_path: str,
out_path: str,
dt_utc: dt,
lat=None,
lon=None,
) -> Optional[str]:
"""
1) Scale front to 1/4 of back's width.
2) Round corners on front.
3) Composite front at (0,0) on back.
4) Flatten to RGB => out_path
5) Then embed EXIF
"""
ext = os.path.splitext(out_path)[1] or ".webp"
out_path = os.path.splitext(out_path)[0] + ext
try:
with Image.open(back_path) as b_img, Image.open(front_path) as f_img:
b_img = b_img.convert("RGBA")
f_img = f_img.convert("RGBA")
b_w, b_h = b_img.size
f_w, f_h = f_img.size
scale_factor = max(1, b_w // 4)
new_f_h = int((scale_factor / f_w) * f_h)
front_resized = f_img.resize((scale_factor, new_f_h), Image.LANCZOS)
# Rounded corners
mask = Image.new("L", front_resized.size, 0)
draw = ImageDraw.Draw(mask)
radius = min(front_resized.size) // 8
draw.rounded_rectangle(
[(0, 0), front_resized.size], radius=radius, fill=255
)
front_resized.putalpha(mask)
# alpha_composite onto back
b_img.alpha_composite(front_resized, (0, 0))
# Flatten to RGB
final = b_img.convert("RGB")
final.save(out_path)
except Exception as e:
self.log(
f"Error creating composite for {front_path} & {back_path}: {e}",
force=True,
)
return None
# embed EXIF
self.embed_exif(out_path, dt_utc, lat, lon)
self.verbose_msg(f"Composite saved: {out_path}")
return out_path
def filter_memories_in_timespan(self, memories):
"""
Return only memories whose takenTime is within time_span.
"""
valid = []
start_dt, end_dt = self.time_span
for m in memories:
try:
raw = m["takenTime"].replace("Z", "+00:00")
d = dt.fromisoformat(raw).astimezone(timezone.utc)
except:
continue
if start_dt <= d <= end_dt:
valid.append(m)
return valid
def export_memories(self, memories):
"""
Copies front/back images, embed EXIF, optionally create composites.
"""
memories = self.filter_memories_in_timespan(memories)
def dt_key(x):
try:
return dt.fromisoformat(x["takenTime"].replace("Z", "+00:00"))
except:
return dt.min.replace(tzinfo=timezone.utc)
memories.sort(key=dt_key)
out_mem = os.path.join(self.out_path, "memories")
out_cmp = os.path.join(self.out_path, "composites")
os.makedirs(out_mem, exist_ok=True)
os.makedirs(out_cmp, exist_ok=True)
total = len(memories)
for i, mem in enumerate(memories, start=1):
raw = mem["takenTime"].replace("Z", "+00:00")
m_dt_utc = dt.fromisoformat(raw).astimezone(timezone.utc)
loc = mem.get("location", {})
lat = loc.get("latitude")
lon = loc.get("longitude")
front_src = self.resolve_img_path(mem["frontImage"]["path"])
back_src = self.resolve_img_path(mem["backImage"]["path"])
if not front_src or not back_src:
self.log(
"Skipping memory due to missing front/back images.", force=True
)
self.show_progress(i, total, prefix="Exporting Memories")
continue
base_ts = m_dt_utc.strftime("%Y-%m-%d_%H-%M-%S")
front_out = os.path.join(out_mem, f"{base_ts}_front")
back_out = os.path.join(out_mem, f"{base_ts}_back")
final_front = self.copy_and_embed(front_src, front_out, m_dt_utc, lat, lon)
final_back = self.copy_and_embed(back_src, back_out, m_dt_utc, lat, lon)
# if user didn't say --no-composites
if self.create_composites and final_front and final_back:
comp_out = os.path.join(out_cmp, f"{base_ts}_composite")
self.create_composite(
final_front, final_back, comp_out, m_dt_utc, lat, lon
)
self.show_progress(
i,
total,
prefix="Exporting Memories",
date_str=m_dt_utc.strftime("%Y-%m-%d"),
)
def filter_realmojis_in_timespan(self, realmojis):
valid = []
start_dt, end_dt = self.time_span
for r in realmojis:
try:
raw = r["postedAt"].replace("Z", "+00:00")
d = dt.fromisoformat(raw).astimezone(timezone.utc)
except:
continue
if start_dt <= d <= end_dt:
valid.append(r)
return valid
def export_realmojis(self, realmojis):
"""
Copies realmoji files from Photos/Realmoji, embed EXIF date/time
(GPS not typically used).
"""
realmojis = self.filter_realmojis_in_timespan(realmojis)
def dt_key(x):
try:
return dt.fromisoformat(x["postedAt"].replace("Z", "+00:00"))
except:
return dt.min.replace(tzinfo=timezone.utc)
realmojis.sort(key=dt_key)
out_rm = os.path.join(self.out_path, "realmojis")
os.makedirs(out_rm, exist_ok=True)
total = len(realmojis)
for i, rm in enumerate(realmojis, start=1):
raw = rm["postedAt"].replace("Z", "+00:00")
rm_dt_utc = dt.fromisoformat(raw).astimezone(timezone.utc)
media_path = os.path.basename(rm["media"]["path"])
old_path = os.path.join(self.bereal_path, "Photos", "Realmoji", media_path)
base_ts = rm_dt_utc.strftime("%Y-%m-%d_%H-%M-%S")
out_file = os.path.join(out_rm, base_ts)
self.copy_and_embed(old_path, out_file, rm_dt_utc)
self.show_progress(
i,
total,
prefix="Exporting Realmojis",
date_str=rm_dt_utc.strftime("%Y-%m-%d"),
)
def run_in_curses():
"""
Attempts to run with curses-based logs/progress if verbose is True.
Otherwise fallback to a non-curses run.
"""
args = init_parser()
if not args.verbose:
# skip curses if not verbose
run_no_curses(args)
return
def main_curses(stdscr):
cl = CursesLogger(stdscr)
exporter = BeRealExporter(args, logger=cl)
# Export memories
if args.memories:
try:
with open(
os.path.join(args.bereal_path, "memories.json"), encoding="utf-8"
) as f:
mems = json.load(f)
exporter.export_memories(mems)
except FileNotFoundError:
cl.print_log("Error: memories.json file not found.", force=True)
except json.JSONDecodeError:
cl.print_log("Error decoding memories.json file.", force=True)
# Export realmojis
if args.realmojis:
try:
with open(
os.path.join(args.bereal_path, "realmojis.json"), encoding="utf-8"
) as f:
rms = json.load(f)
exporter.export_realmojis(rms)
except FileNotFoundError:
cl.print_log("Error: realmojis.json file not found.", force=True)
except json.JSONDecodeError:
cl.print_log("Error decoding realmojis.json file.", force=True)
cl.print_log("\nAll done. Press any key to exit.")
stdscr.getch()
try:
curses.wrapper(main_curses)
except curses.error:
print("Curses failed. Fallback to non-curses run.")
run_no_curses(args)
def run_no_curses(args: argparse.Namespace):
"""
Minimal run without curses-based logging.
"""
logger = BasicLogger(verbose=args.verbose)
exporter = BeRealExporter(args, logger=logger)
if args.memories:
try:
with open(
os.path.join(args.bereal_path, "memories.json"), encoding="utf-8"
) as f:
mems = json.load(f)
exporter.export_memories(mems)
except FileNotFoundError:
logger.print_log("Error: memories.json file not found.", force=True)
except json.JSONDecodeError:
logger.print_log("Error decoding memories.json file.", force=True)
if args.realmojis:
try:
with open(
os.path.join(args.bereal_path, "realmojis.json"), encoding="utf-8"
) as f:
rms = json.load(f)
exporter.export_realmojis(rms)
except FileNotFoundError:
logger.print_log("Error: realmojis.json file not found.", force=True)
except json.JSONDecodeError:
logger.print_log("Error decoding realmojis.json file.", force=True)
if __name__ == "__main__":
run_in_curses()