diff --git a/Cargo.lock b/Cargo.lock index 155a9fc..75e4888 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,18 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.4" @@ -85,6 +97,8 @@ dependencies = [ "clap", "hex", "regex", + "rusqlite", + "serde_json", "sha3", "uuid", ] @@ -95,6 +109,12 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "bitflags" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" + [[package]] name = "block-buffer" version = "0.10.4" @@ -220,6 +240,18 @@ dependencies = [ "crypto-common", ] +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "find-msvc-tools" version = "0.1.4" @@ -248,6 +280,24 @@ dependencies = [ "wasi", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", +] + +[[package]] +name = "hashlink" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +dependencies = [ + "hashbrown", +] + [[package]] name = "heck" version = "0.5.0" @@ -290,6 +340,12 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itoa" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" + [[package]] name = "js-sys" version = "0.3.81" @@ -315,6 +371,17 @@ version = "0.2.177" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976" +[[package]] +name = "libsqlite3-sys" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "log" version = "0.4.28" @@ -348,6 +415,12 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" +[[package]] +name = "pkg-config" +version = "0.3.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" + [[package]] name = "proc-macro2" version = "1.0.101" @@ -401,12 +474,68 @@ version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" +[[package]] +name = "rusqlite" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" +dependencies = [ + "bitflags", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + 
"libsqlite3-sys", + "smallvec", +] + [[package]] name = "rustversion" version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +[[package]] +name = "serde" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.150" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8014e44b4736ed0538adeecded0fce2a272f22dc9578a7eb6b2d9993c74cfb9" +dependencies = [ + "itoa", + "memchr", + "serde", + "serde_core", + "zmij", +] + [[package]] name = "sha3" version = "0.10.8" @@ -423,6 +552,12 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + [[package]] name = "strsim" version = "0.11.1" @@ -469,6 +604,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = 
"version_check" version = "0.9.5" @@ -690,3 +831,29 @@ name = "wit-bindgen" version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" + +[[package]] +name = "zerocopy" +version = "0.8.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eed437bf9d6692032087e337407a86f04cd8d6a16a37199ed57949d415bd68e9" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/Cargo.toml b/Cargo.toml index f40ba88..5b0d0aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,5 +9,7 @@ chrono = "0.4.42" clap = { version = "4.5.48", features = ["derive"] } hex = "0.4.3" regex = "1.12.2" +rusqlite = { version = "0.32.1", features = ["bundled"] } +serde_json = "1.0.132" sha3 = "0.10.8" uuid = { version = "1.18.1", features = ["v4"] } diff --git a/docs/PLAN.md b/docs/PLAN.md new file mode 100644 index 0000000..b61b357 --- /dev/null +++ b/docs/PLAN.md @@ -0,0 +1,111 @@ +# Archivr Database Design Plan + +## Summary +Design the first database as a `SQLite` metadata/index layer for the existing file-based archive store, while making the schema multi-user and public-archive ready from day one. The filesystem remains the source of truth for bytes and rendered archive output; the database becomes the source of truth for users, roles, archive runs, archived entries, visibility, hierarchy, blob reuse, and organization. + +Each successfully archived thing becomes its own archived entry. 
Re-archiving the same source creates a new archived entry row, while deduplicated raw files continue to reuse the same blob rows underneath. + +## Key Changes +### Identity, access, and visibility +- `users` + - Columns: stable public `user_uid`, `username`, `email` nullable, `password_hash`, `status`, `role`, `created_at`, `last_login_at` nullable. + - Roles: `admin`, `user`. +- `instance_settings` + - Global booleans for `public_index_enabled`, `public_entry_content_enabled`, `public_archive_submission_enabled`. + - Defaults all `false`. +- `archived_entries` + - Add `created_by_user_id`, `owned_by_user_id`, `visibility`. + - `visibility` values: `private`, `unlisted`, `public`. +- `archive_runs` + - Add `created_by_user_id`. +- Do not add groups or per-entry ACL tables in v1; keep the schema portable enough to add them later. + +### Core archive model +- `archive_runs` + - One user-started archive operation. + - Columns: stable public `run_uid`, `created_by_user_id`, `started_at`, `finished_at`, `status`, `requested_count`, `discovered_count`, `completed_count`, `failed_count`, `error_summary`. +- `archive_run_items` + - One requested or discovered work item inside an archive run. + - Columns: `run_id`, stable `item_uid`, `parent_item_id` nullable, `ordinal`, `requested_locator`, `canonical_locator` nullable, `source_kind`, `entity_kind`, `status`, `error_text`, `produced_entry_id` nullable. + - Supports batch requests and container expansion with progress like `0/14`. +- `source_identities` + - Canonical identity of the thing being archived across re-archives. + - Columns: `source_kind`, `entity_kind`, `external_id` nullable, `canonical_url` nullable, `normalized_locator`, `identity_key`. + - Unique constraint on `identity_key`. +- `archived_entries` + - One archived thing shown in the archive. 
+ - Columns: stable public `entry_uid`, `source_identity_id`, `archive_run_id`, `parent_entry_id` nullable, `root_entry_id`, `created_by_user_id`, `owned_by_user_id`, `source_kind`, `entity_kind`, `title` nullable, `visibility`, `archived_at`, `original_published_at` nullable, `structured_root_relpath`, `representation_kind`, `source_metadata_json`, `display_metadata_json` nullable. + - `structured_root_relpath` is required and points to one root under `structured/{entry_uid}/`. + - Main archive view queries only rows with `parent_entry_id IS NULL`. + - Child entries remain first-class rows but are nested under the parent in the main view. +- `blobs` + - One deduplicated raw file in `raw/`. + - Columns: `sha256`, `byte_size`, `mime_type` nullable, `extension` nullable, `raw_relpath`, `created_at`. +- `entry_artifacts` + - Selective file pointers attached to an archived entry. + - Columns: `entry_id`, `artifact_role`, `storage_area`, `relpath`, `blob_id` nullable, `logical_path` nullable, `metadata_json` nullable. + - `storage_area`: `raw`, `raw_tweets`, `structured`. + - Store important files only: primary media, raw tweet JSON, avatar, subtitle, thumbnail, manifest, cover image. + +### Organization and extensibility +- `taxonomy_nodes` + - Hierarchical organization tree. + - Columns: stable `node_uid`, `parent_id` nullable, `name`, `slug`, `full_path`. + - `full_path` unique, example `/sciences/computer-science/compilers`. +- `entry_taxonomy_assignments` + - Many-to-many link between archived entries and taxonomy nodes. + - Assign the most specific node; ancestor membership is derived via recursive queries. +- Keep shared fields relational and source-specific details in `source_metadata_json`. + - YouTube examples: `video_id`, `channel_id`, duration, playlist membership. + - Tweet examples: `tweet_id`, `author_handle`, conversation ID, text summary fields. + - Do not create per-source tables in v1.
+ +### Public/archive access behavior implied by schema +- Public archive browsing is controlled by both instance settings and entry visibility. +- `public` entries are eligible for anonymous listing/viewing only when instance-level public settings allow it. +- `unlisted` entries are not shown in public indexes but can be directly served later by URL/token design. +- `private` entries are visible only to authorized users. +- Ownership is recorded now even if the first UI only exposes simple admin/user behavior. + +## Public APIs / Interfaces +- `archivr init` + - Create the SQLite database and schema alongside the existing archive metadata directory. + - Keep existing store directories. +- `archivr archive` + - Start one `archive_run` owned by a user. + - Insert one or more `archive_run_items`. + - On success, create one or more `archived_entries`. + - Link reused raw files through `blobs` and `entry_artifacts`. + - Record the entry’s `structured_root_relpath`, visibility, and source metadata JSON. +- New persisted domain types + - `User` + - `ArchiveRun` + - `ArchiveRunItem` + - `ArchivedEntry` + - `SourceIdentity` + - `Blob` + - `EntryArtifact` + - `TaxonomyNode` + - `InstanceSettings` + +## Test Plan +- Re-archiving the same YouTube video creates two `archived_entries`, one shared `source_identity`, and one shared primary `blob`. +- Archiving a tweet/thread creates one archived entry, records the raw tweet JSON as an `entry_artifact` in `raw_tweets`, and links downloaded media/avatar blobs correctly. +- Archiving a playlist/channel creates one top-level parent entry plus child entries; the main archive query returns only the parent. +- A single archive run with multiple requested locators records multiple run items and correct progress counters. +- A normal user can create entries but cannot manage other users or instance-wide public settings. +- An admin can manage users and instance-wide public settings. 
+- A `public` entry is still hidden from anonymous users when `public_index_enabled` or `public_entry_content_enabled` is disabled at the instance level. +- A `private` entry never appears in anonymous/public queries. +- Assigning `/sciences/computer-science/compilers` makes the item discoverable through ancestor queries for `sciences` and `computer-science`. +- A website-style entry can be represented as one archived entry with one structured root and no per-asset DB explosion. + +## Assumptions +- SQLite is the only target for the first implementation, but the schema should avoid SQLite-only modeling that would block a later Postgres migration. +- The database indexes archive metadata; archive bytes stay on disk. +- Every archived entry gets a stable public ID used for `structured/{entry_uid}/`; timestamps are metadata, not identity. +- `raw_tweets/` remains a valid sibling storage area and is referenced through `entry_artifacts`. +- Titles are optional and nullable. +- Search, FTS, subtitles, transcript indexing, groups, and per-entry ACL sharing are deferred. +- Organization uses hierarchical taxonomy only for now; free-form tags are out of scope. +- The first permissions model matches the simpler ArchiveBox-style shape: admins, normal users, and optional public visibility, without custom group policy in v1.
diff --git a/flake.nix b/flake.nix index a050caa..3978a04 100644 --- a/flake.nix +++ b/flake.nix @@ -63,7 +63,7 @@ pname = "archivr"; version = "0.1.0"; src = pkgs.lib.cleanSource ./.; - cargoHash = "sha256-4m+4SMYA/rJ0eHEOc32zA2VdZI1pqzB5NenD0R0f2zM="; + cargoHash = ""; nativeBuildInputs = [ pkgs.pkg-config ]; }; archivr = pkgs.stdenv.mkDerivation { diff --git a/src/database.rs b/src/database.rs new file mode 100644 index 0000000..d9953bc --- /dev/null +++ b/src/database.rs @@ -0,0 +1,1003 @@ +use anyhow::{Context, Result, bail}; +use chrono::Utc; +use rusqlite::{Connection, OptionalExtension, params}; +use std::path::{Path, PathBuf}; +use uuid::Uuid; + +pub const DATABASE_FILE_NAME: &str = "archivr.sqlite"; +pub const DEFAULT_USERNAME: &str = "local-admin"; + +#[derive(Debug, Clone)] +pub struct ArchiveRun { + pub id: i64, + pub run_uid: String, +} + +#[derive(Debug, Clone)] +pub struct ArchiveRunItem { + pub id: i64, + pub item_uid: String, +} + +#[derive(Debug, Clone)] +pub struct ArchivedEntry { + pub id: i64, + pub entry_uid: String, + pub structured_root_relpath: String, +} + +#[derive(Debug, Clone)] +pub struct BlobRecord { + pub sha256: String, + pub byte_size: i64, + pub mime_type: Option, + pub extension: Option, + pub raw_relpath: String, +} + +#[derive(Debug, Clone)] +pub struct NewEntry { + pub source_identity_id: i64, + pub archive_run_id: i64, + pub parent_entry_id: Option, + pub root_entry_id: Option, + pub created_by_user_id: i64, + pub owned_by_user_id: i64, + pub source_kind: String, + pub entity_kind: String, + pub title: Option, + pub visibility: String, + pub representation_kind: String, + pub source_metadata_json: String, + pub display_metadata_json: Option, +} + +#[derive(Debug, Clone)] +pub struct NewArtifact { + pub entry_id: i64, + pub artifact_role: String, + pub storage_area: String, + pub relpath: String, + pub blob_id: Option, + pub logical_path: Option, + pub metadata_json: Option, +} + +pub fn database_path(archive_path: &Path) -> 
PathBuf { + archive_path.join(DATABASE_FILE_NAME) +} + +pub fn open_or_initialize(archive_path: &Path) -> Result { + let conn = Connection::open(database_path(archive_path)).with_context(|| { + format!( + "failed to open archive database in {}", + archive_path.display() + ) + })?; + initialize_schema(&conn)?; + Ok(conn) +} + +pub fn initialize_schema(conn: &Connection) -> Result<()> { + conn.pragma_update(None, "journal_mode", "WAL")?; + conn.pragma_update(None, "foreign_keys", "ON")?; + conn.execute_batch( + r#" + CREATE TABLE IF NOT EXISTS users ( + id INTEGER PRIMARY KEY, + user_uid TEXT NOT NULL UNIQUE, + username TEXT NOT NULL UNIQUE, + email TEXT UNIQUE, + password_hash TEXT NOT NULL, + status TEXT NOT NULL CHECK (status IN ('active', 'disabled')), + role TEXT NOT NULL CHECK (role IN ('admin', 'user')), + created_at TEXT NOT NULL, + last_login_at TEXT + ); + + CREATE TABLE IF NOT EXISTS instance_settings ( + id INTEGER PRIMARY KEY CHECK (id = 1), + public_index_enabled INTEGER NOT NULL DEFAULT 0 CHECK (public_index_enabled IN (0, 1)), + public_entry_content_enabled INTEGER NOT NULL DEFAULT 0 CHECK (public_entry_content_enabled IN (0, 1)), + public_archive_submission_enabled INTEGER NOT NULL DEFAULT 0 CHECK (public_archive_submission_enabled IN (0, 1)) + ); + + INSERT OR IGNORE INTO instance_settings ( + id, + public_index_enabled, + public_entry_content_enabled, + public_archive_submission_enabled + ) VALUES (1, 0, 0, 0); + + CREATE TABLE IF NOT EXISTS archive_runs ( + id INTEGER PRIMARY KEY, + run_uid TEXT NOT NULL UNIQUE, + created_by_user_id INTEGER NOT NULL REFERENCES users(id), + started_at TEXT NOT NULL, + finished_at TEXT, + status TEXT NOT NULL CHECK (status IN ('in_progress', 'completed', 'failed')), + requested_count INTEGER NOT NULL DEFAULT 0, + discovered_count INTEGER NOT NULL DEFAULT 0, + completed_count INTEGER NOT NULL DEFAULT 0, + failed_count INTEGER NOT NULL DEFAULT 0, + error_summary TEXT + ); + + CREATE TABLE IF NOT EXISTS 
archive_run_items ( + id INTEGER PRIMARY KEY, + run_id INTEGER NOT NULL REFERENCES archive_runs(id) ON DELETE CASCADE, + item_uid TEXT NOT NULL UNIQUE, + parent_item_id INTEGER REFERENCES archive_run_items(id), + ordinal INTEGER NOT NULL, + requested_locator TEXT NOT NULL, + canonical_locator TEXT, + source_kind TEXT NOT NULL, + entity_kind TEXT NOT NULL, + status TEXT NOT NULL CHECK (status IN ('pending', 'in_progress', 'completed', 'failed')), + error_text TEXT, + produced_entry_id INTEGER REFERENCES archived_entries(id) + ); + + CREATE TABLE IF NOT EXISTS source_identities ( + id INTEGER PRIMARY KEY, + source_kind TEXT NOT NULL, + entity_kind TEXT NOT NULL, + external_id TEXT, + canonical_url TEXT, + normalized_locator TEXT NOT NULL, + identity_key TEXT NOT NULL UNIQUE + ); + + CREATE TABLE IF NOT EXISTS archived_entries ( + id INTEGER PRIMARY KEY, + entry_uid TEXT NOT NULL UNIQUE, + source_identity_id INTEGER NOT NULL REFERENCES source_identities(id), + archive_run_id INTEGER NOT NULL REFERENCES archive_runs(id), + parent_entry_id INTEGER REFERENCES archived_entries(id), + root_entry_id INTEGER REFERENCES archived_entries(id), + created_by_user_id INTEGER NOT NULL REFERENCES users(id), + owned_by_user_id INTEGER NOT NULL REFERENCES users(id), + source_kind TEXT NOT NULL, + entity_kind TEXT NOT NULL, + title TEXT, + visibility TEXT NOT NULL CHECK (visibility IN ('private', 'unlisted', 'public')), + archived_at TEXT NOT NULL, + original_published_at TEXT, + structured_root_relpath TEXT NOT NULL, + representation_kind TEXT NOT NULL, + source_metadata_json TEXT NOT NULL DEFAULT '{}', + display_metadata_json TEXT + ); + + CREATE TABLE IF NOT EXISTS blobs ( + id INTEGER PRIMARY KEY, + sha256 TEXT NOT NULL UNIQUE, + byte_size INTEGER NOT NULL, + mime_type TEXT, + extension TEXT, + raw_relpath TEXT NOT NULL, + created_at TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS entry_artifacts ( + id INTEGER PRIMARY KEY, + entry_id INTEGER NOT NULL REFERENCES 
archived_entries(id) ON DELETE CASCADE, + artifact_role TEXT NOT NULL, + storage_area TEXT NOT NULL CHECK (storage_area IN ('raw', 'raw_tweets', 'structured')), + relpath TEXT NOT NULL, + blob_id INTEGER REFERENCES blobs(id), + logical_path TEXT, + metadata_json TEXT + ); + + CREATE TABLE IF NOT EXISTS taxonomy_nodes ( + id INTEGER PRIMARY KEY, + node_uid TEXT NOT NULL UNIQUE, + parent_id INTEGER REFERENCES taxonomy_nodes(id), + name TEXT NOT NULL, + slug TEXT NOT NULL, + full_path TEXT NOT NULL UNIQUE + ); + + CREATE TABLE IF NOT EXISTS entry_taxonomy_assignments ( + entry_id INTEGER NOT NULL REFERENCES archived_entries(id) ON DELETE CASCADE, + node_id INTEGER NOT NULL REFERENCES taxonomy_nodes(id) ON DELETE CASCADE, + PRIMARY KEY (entry_id, node_id) + ); + + CREATE INDEX IF NOT EXISTS idx_archive_run_items_run_id ON archive_run_items(run_id); + CREATE INDEX IF NOT EXISTS idx_archived_entries_source_identity_id ON archived_entries(source_identity_id); + CREATE INDEX IF NOT EXISTS idx_archived_entries_created_by_user_id ON archived_entries(created_by_user_id); + CREATE INDEX IF NOT EXISTS idx_archived_entries_parent_entry_id ON archived_entries(parent_entry_id); + CREATE INDEX IF NOT EXISTS idx_archived_entries_root_entry_id ON archived_entries(root_entry_id); + CREATE INDEX IF NOT EXISTS idx_archived_entries_visibility ON archived_entries(visibility); + CREATE INDEX IF NOT EXISTS idx_entry_artifacts_entry_id ON entry_artifacts(entry_id); + CREATE INDEX IF NOT EXISTS idx_entry_artifacts_blob_id ON entry_artifacts(blob_id); + CREATE INDEX IF NOT EXISTS idx_taxonomy_nodes_parent_id ON taxonomy_nodes(parent_id); + "#, + )?; + Ok(()) +} + +pub fn ensure_default_user(conn: &Connection) -> Result { + if let Some(id) = conn + .query_row( + "SELECT id FROM users WHERE username = ?1", + [DEFAULT_USERNAME], + |row| row.get(0), + ) + .optional()? 
+ { + return Ok(id); + } + + conn.execute( + "INSERT INTO users ( + user_uid, username, email, password_hash, status, role, created_at, last_login_at + ) VALUES (?1, ?2, NULL, ?3, 'active', 'admin', ?4, NULL)", + params![ + public_id("usr"), + DEFAULT_USERNAME, + "disabled-local-password", + now_timestamp() + ], + )?; + + Ok(conn.last_insert_rowid()) +} + +pub fn create_archive_run( + conn: &Connection, + created_by_user_id: i64, + requested_count: i64, +) -> Result { + let run_uid = public_id("run"); + conn.execute( + "INSERT INTO archive_runs ( + run_uid, created_by_user_id, started_at, status, requested_count + ) VALUES (?1, ?2, ?3, 'in_progress', ?4)", + params![ + run_uid, + created_by_user_id, + now_timestamp(), + requested_count + ], + )?; + + Ok(ArchiveRun { + id: conn.last_insert_rowid(), + run_uid, + }) +} + +pub fn create_archive_run_item( + conn: &Connection, + run_id: i64, + parent_item_id: Option, + ordinal: i64, + requested_locator: &str, + canonical_locator: Option<&str>, + source_kind: &str, + entity_kind: &str, +) -> Result { + let item_uid = public_id("item"); + conn.execute( + "INSERT INTO archive_run_items ( + run_id, item_uid, parent_item_id, ordinal, requested_locator, canonical_locator, + source_kind, entity_kind, status + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, 'in_progress')", + params![ + run_id, + item_uid, + parent_item_id, + ordinal, + requested_locator, + canonical_locator, + source_kind, + entity_kind + ], + )?; + + Ok(ArchiveRunItem { + id: conn.last_insert_rowid(), + item_uid, + }) +} + +pub fn complete_archive_run_item( + conn: &Connection, + item_id: i64, + produced_entry_id: i64, +) -> Result<()> { + conn.execute( + "UPDATE archive_run_items + SET status = 'completed', produced_entry_id = ?1, error_text = NULL + WHERE id = ?2", + params![produced_entry_id, item_id], + )?; + refresh_run_counters(conn, run_id_for_item(conn, item_id)?)?; + Ok(()) +} + +pub fn fail_archive_run_item(conn: &Connection, item_id: i64, error_text: 
&str) -> Result<()> { + conn.execute( + "UPDATE archive_run_items + SET status = 'failed', error_text = ?1 + WHERE id = ?2", + params![error_text, item_id], + )?; + refresh_run_counters(conn, run_id_for_item(conn, item_id)?)?; + Ok(()) +} + +pub fn finish_archive_run(conn: &Connection, run_id: i64) -> Result<()> { + refresh_run_counters(conn, run_id)?; + let failed_count: i64 = conn.query_row( + "SELECT failed_count FROM archive_runs WHERE id = ?1", + [run_id], + |row| row.get(0), + )?; + let status = if failed_count > 0 { + "failed" + } else { + "completed" + }; + conn.execute( + "UPDATE archive_runs SET status = ?1, finished_at = ?2 WHERE id = ?3", + params![status, now_timestamp(), run_id], + )?; + Ok(()) +} + +pub fn fail_archive_run(conn: &Connection, run_id: i64, error_summary: &str) -> Result<()> { + refresh_run_counters(conn, run_id)?; + conn.execute( + "UPDATE archive_runs + SET status = 'failed', finished_at = ?1, error_summary = ?2 + WHERE id = ?3", + params![now_timestamp(), error_summary, run_id], + )?; + Ok(()) +} + +pub fn upsert_source_identity( + conn: &Connection, + source_kind: &str, + entity_kind: &str, + external_id: Option<&str>, + canonical_url: Option<&str>, + normalized_locator: &str, +) -> Result { + let identity_key = identity_key( + source_kind, + entity_kind, + external_id, + canonical_url, + normalized_locator, + ); + conn.execute( + "INSERT OR IGNORE INTO source_identities ( + source_kind, entity_kind, external_id, canonical_url, normalized_locator, identity_key + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)", + params![ + source_kind, + entity_kind, + external_id, + canonical_url, + normalized_locator, + identity_key + ], + )?; + + let id = conn.query_row( + "SELECT id FROM source_identities WHERE identity_key = ?1", + [identity_key], + |row| row.get(0), + )?; + Ok(id) +} + +pub fn upsert_blob(conn: &Connection, blob: &BlobRecord) -> Result { + conn.execute( + "INSERT OR IGNORE INTO blobs ( + sha256, byte_size, mime_type, extension, 
raw_relpath, created_at + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)", + params![ + blob.sha256, + blob.byte_size, + blob.mime_type, + blob.extension, + blob.raw_relpath, + now_timestamp() + ], + )?; + + let id = conn.query_row( + "SELECT id FROM blobs WHERE sha256 = ?1", + [blob.sha256.as_str()], + |row| row.get(0), + )?; + Ok(id) +} + +pub fn create_archived_entry(conn: &Connection, entry: &NewEntry) -> Result { + validate_visibility(&entry.visibility)?; + let entry_uid = public_id("entry"); + let structured_root_relpath = format!("structured/{entry_uid}"); + + conn.execute( + "INSERT INTO archived_entries ( + entry_uid, source_identity_id, archive_run_id, parent_entry_id, root_entry_id, + created_by_user_id, owned_by_user_id, source_kind, entity_kind, title, visibility, + archived_at, original_published_at, structured_root_relpath, representation_kind, + source_metadata_json, display_metadata_json + ) VALUES ( + ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, + ?12, NULL, ?13, ?14, ?15, ?16 + )", + params![ + entry_uid, + entry.source_identity_id, + entry.archive_run_id, + entry.parent_entry_id, + entry.root_entry_id, + entry.created_by_user_id, + entry.owned_by_user_id, + entry.source_kind, + entry.entity_kind, + entry.title, + entry.visibility, + now_timestamp(), + structured_root_relpath, + entry.representation_kind, + entry.source_metadata_json, + entry.display_metadata_json + ], + )?; + let id = conn.last_insert_rowid(); + + if entry.root_entry_id.is_none() { + conn.execute( + "UPDATE archived_entries SET root_entry_id = ?1 WHERE id = ?1", + [id], + )?; + } + + Ok(ArchivedEntry { + id, + entry_uid, + structured_root_relpath, + }) +} + +pub fn add_entry_artifact(conn: &Connection, artifact: &NewArtifact) -> Result { + conn.execute( + "INSERT INTO entry_artifacts ( + entry_id, artifact_role, storage_area, relpath, blob_id, logical_path, metadata_json + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", + params![ + artifact.entry_id, + artifact.artifact_role, + 
artifact.storage_area, + artifact.relpath, + artifact.blob_id, + artifact.logical_path, + artifact.metadata_json + ], + )?; + Ok(conn.last_insert_rowid()) +} + +#[cfg(test)] +pub fn set_public_settings( + conn: &Connection, + public_index_enabled: bool, + public_entry_content_enabled: bool, + public_archive_submission_enabled: bool, +) -> Result<()> { + conn.execute( + "UPDATE instance_settings + SET public_index_enabled = ?1, + public_entry_content_enabled = ?2, + public_archive_submission_enabled = ?3 + WHERE id = 1", + params![ + public_index_enabled as i64, + public_entry_content_enabled as i64, + public_archive_submission_enabled as i64 + ], + )?; + Ok(()) +} + +#[cfg(test)] +pub fn public_index_entry_count(conn: &Connection) -> Result { + let count = conn.query_row( + "SELECT COUNT(*) + FROM archived_entries + WHERE parent_entry_id IS NULL + AND visibility = 'public' + AND (SELECT public_index_enabled FROM instance_settings WHERE id = 1) = 1 + AND (SELECT public_entry_content_enabled FROM instance_settings WHERE id = 1) = 1", + [], + |row| row.get(0), + )?; + Ok(count) +} + +#[cfg(test)] +pub fn main_archive_entry_count(conn: &Connection) -> Result { + let count = conn.query_row( + "SELECT COUNT(*) FROM archived_entries WHERE parent_entry_id IS NULL", + [], + |row| row.get(0), + )?; + Ok(count) +} + +#[cfg(test)] +pub fn create_taxonomy_path(conn: &Connection, full_path: &str) -> Result { + let segments = normalized_taxonomy_segments(full_path)?; + let mut parent_id = None; + let mut current_path = String::new(); + let mut current_id = 0; + + for segment in segments { + current_path.push('/'); + current_path.push_str(segment); + + if let Some(id) = conn + .query_row( + "SELECT id FROM taxonomy_nodes WHERE full_path = ?1", + [current_path.as_str()], + |row| row.get(0), + ) + .optional()? 
+ { + current_id = id; + parent_id = Some(id); + continue; + } + + conn.execute( + "INSERT INTO taxonomy_nodes (node_uid, parent_id, name, slug, full_path) + VALUES (?1, ?2, ?3, ?4, ?5)", + params![ + public_id("node"), + parent_id, + humanize_slug(segment), + segment, + current_path + ], + )?; + current_id = conn.last_insert_rowid(); + parent_id = Some(current_id); + } + + Ok(current_id) +} + +#[cfg(test)] +pub fn assign_entry_to_taxonomy(conn: &Connection, entry_id: i64, node_id: i64) -> Result<()> { + conn.execute( + "INSERT OR IGNORE INTO entry_taxonomy_assignments (entry_id, node_id) + VALUES (?1, ?2)", + params![entry_id, node_id], + )?; + Ok(()) +} + +#[cfg(test)] +pub fn entry_count_for_taxonomy_path(conn: &Connection, full_path: &str) -> Result { + let count = conn.query_row( + "WITH RECURSIVE descendants(id) AS ( + SELECT id FROM taxonomy_nodes WHERE full_path = ?1 + UNION ALL + SELECT child.id + FROM taxonomy_nodes child + JOIN descendants parent ON child.parent_id = parent.id + ) + SELECT COUNT(DISTINCT eta.entry_id) + FROM entry_taxonomy_assignments eta + JOIN descendants d ON eta.node_id = d.id", + [full_path], + |row| row.get(0), + )?; + Ok(count) +} + +fn refresh_run_counters(conn: &Connection, run_id: i64) -> Result<()> { + conn.execute( + "UPDATE archive_runs + SET discovered_count = (SELECT COUNT(*) FROM archive_run_items WHERE run_id = ?1), + completed_count = (SELECT COUNT(*) FROM archive_run_items WHERE run_id = ?1 AND status = 'completed'), + failed_count = (SELECT COUNT(*) FROM archive_run_items WHERE run_id = ?1 AND status = 'failed') + WHERE id = ?1", + [run_id], + )?; + Ok(()) +} + +fn run_id_for_item(conn: &Connection, item_id: i64) -> Result { + let run_id = conn.query_row( + "SELECT run_id FROM archive_run_items WHERE id = ?1", + [item_id], + |row| row.get(0), + )?; + Ok(run_id) +} + +fn public_id(prefix: &str) -> String { + format!("{prefix}_{}", Uuid::new_v4().simple()) +} + +fn now_timestamp() -> String { + Utc::now().to_rfc3339() 
+} + +fn identity_key( + source_kind: &str, + entity_kind: &str, + external_id: Option<&str>, + canonical_url: Option<&str>, + normalized_locator: &str, +) -> String { + let stable_locator = external_id.or(canonical_url).unwrap_or(normalized_locator); + format!("{source_kind}:{entity_kind}:{stable_locator}") +} + +fn validate_visibility(visibility: &str) -> Result<()> { + match visibility { + "private" | "unlisted" | "public" => Ok(()), + _ => bail!("invalid archived entry visibility: {visibility}"), + } +} + +#[cfg(test)] +fn normalized_taxonomy_segments(full_path: &str) -> Result> { + let segments = full_path + .trim() + .trim_matches('/') + .split('/') + .filter(|segment| !segment.is_empty()) + .collect::>(); + + if segments.is_empty() { + bail!("taxonomy path must contain at least one segment"); + } + + Ok(segments) +} + +#[cfg(test)] +fn humanize_slug(slug: &str) -> String { + slug.split('-') + .map(|part| { + let mut chars = part.chars(); + match chars.next() { + Some(first) => format!("{}{}", first.to_uppercase(), chars.as_str()), + None => String::new(), + } + }) + .collect::>() + .join(" ") +} + +#[cfg(test)] +mod tests { + use super::*; + use std::{ + env, fs, + time::{SystemTime, UNIX_EPOCH}, + }; + + fn conn() -> Connection { + let conn = Connection::open_in_memory().unwrap(); + initialize_schema(&conn).unwrap(); + conn + } + + fn unique_db_path(prefix: &str) -> PathBuf { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos(); + env::temp_dir().join(format!("{prefix}-{nanos}-{}.sqlite", std::process::id())) + } + + fn create_entry_fixture( + conn: &Connection, + visibility: &str, + parent_entry_id: Option, + root_entry_id: Option, + ) -> ArchivedEntry { + let user_id = ensure_default_user(conn).unwrap(); + let run = create_archive_run(conn, user_id, 1).unwrap(); + let source_id = upsert_source_identity( + conn, + "youtube", + "video", + Some("video-1"), + Some("https://youtube.com/watch?v=video-1"), + 
"https://youtube.com/watch?v=video-1", + ) + .unwrap(); + + create_archived_entry( + conn, + &NewEntry { + source_identity_id: source_id, + archive_run_id: run.id, + parent_entry_id, + root_entry_id, + created_by_user_id: user_id, + owned_by_user_id: user_id, + source_kind: "youtube".to_string(), + entity_kind: "video".to_string(), + title: None, + visibility: visibility.to_string(), + representation_kind: "video".to_string(), + source_metadata_json: "{}".to_string(), + display_metadata_json: None, + }, + ) + .unwrap() + } + + #[test] + fn schema_defaults_public_settings_to_private() { + let conn = conn(); + let defaults: (i64, i64, i64) = conn + .query_row( + "SELECT public_index_enabled, public_entry_content_enabled, public_archive_submission_enabled + FROM instance_settings WHERE id = 1", + [], + |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)), + ) + .unwrap(); + + assert_eq!(defaults, (0, 0, 0)); + } + + #[test] + fn file_database_uses_wal_journal_mode() { + let path = unique_db_path("archivr-wal-test"); + let conn = Connection::open(&path).unwrap(); + initialize_schema(&conn).unwrap(); + + let journal_mode: String = conn + .query_row("PRAGMA journal_mode", [], |row| row.get(0)) + .unwrap(); + + assert_eq!(journal_mode, "wal"); + + drop(conn); + let _ = fs::remove_file(&path); + let _ = fs::remove_file(path.with_extension("sqlite-wal")); + let _ = fs::remove_file(path.with_extension("sqlite-shm")); + } + + #[test] + fn root_entry_sets_root_id_after_insert() { + let conn = conn(); + let entry = create_entry_fixture(&conn, "private", None, None); + let root_entry_id: i64 = conn + .query_row( + "SELECT root_entry_id FROM archived_entries WHERE id = ?1", + [entry.id], + |row| row.get(0), + ) + .unwrap(); + + assert_eq!(root_entry_id, entry.id); + } + + #[test] + fn rearchiving_reuses_source_identity_and_blob_but_creates_entries() { + let conn = conn(); + let user_id = ensure_default_user(&conn).unwrap(); + let blob = BlobRecord { + sha256: "abc123".to_string(), + 
byte_size: 123, + mime_type: Some("video/mp4".to_string()), + extension: Some("mp4".to_string()), + raw_relpath: "raw/a/b/abc123.mp4".to_string(), + }; + let blob_id = upsert_blob(&conn, &blob).unwrap(); + let duplicate_blob_id = upsert_blob(&conn, &blob).unwrap(); + assert_eq!(blob_id, duplicate_blob_id); + + let first_source_id = upsert_source_identity( + &conn, + "youtube", + "video", + Some("video-1"), + Some("https://youtube.com/watch?v=video-1"), + "https://youtube.com/watch?v=video-1", + ) + .unwrap(); + let second_source_id = upsert_source_identity( + &conn, + "youtube", + "video", + Some("video-1"), + Some("https://youtube.com/watch?v=video-1"), + "https://youtube.com/watch?v=video-1", + ) + .unwrap(); + assert_eq!(first_source_id, second_source_id); + + for _ in 0..2 { + let run = create_archive_run(&conn, user_id, 1).unwrap(); + let entry = create_archived_entry( + &conn, + &NewEntry { + source_identity_id: first_source_id, + archive_run_id: run.id, + parent_entry_id: None, + root_entry_id: None, + created_by_user_id: user_id, + owned_by_user_id: user_id, + source_kind: "youtube".to_string(), + entity_kind: "video".to_string(), + title: None, + visibility: "private".to_string(), + representation_kind: "video".to_string(), + source_metadata_json: "{}".to_string(), + display_metadata_json: None, + }, + ) + .unwrap(); + add_entry_artifact( + &conn, + &NewArtifact { + entry_id: entry.id, + artifact_role: "primary_media".to_string(), + storage_area: "raw".to_string(), + relpath: blob.raw_relpath.clone(), + blob_id: Some(blob_id), + logical_path: None, + metadata_json: None, + }, + ) + .unwrap(); + } + + let entry_count: i64 = conn + .query_row("SELECT COUNT(*) FROM archived_entries", [], |row| { + row.get(0) + }) + .unwrap(); + let source_count: i64 = conn + .query_row("SELECT COUNT(*) FROM source_identities", [], |row| { + row.get(0) + }) + .unwrap(); + let blob_count: i64 = conn + .query_row("SELECT COUNT(*) FROM blobs", [], |row| row.get(0)) + .unwrap(); + 
+ assert_eq!(entry_count, 2); + assert_eq!(source_count, 1); + assert_eq!(blob_count, 1); + } + + #[test] + fn source_identity_key_prefers_external_id_over_shared_canonical_url() { + let conn = conn(); + let first_source_id = upsert_source_identity( + &conn, + "x", + "tweet", + Some("tweet-1"), + Some("https://x.com/some-profile"), + "https://x.com/some-profile/status/tweet-1", + ) + .unwrap(); + let second_source_id = upsert_source_identity( + &conn, + "x", + "tweet", + Some("tweet-2"), + Some("https://x.com/some-profile"), + "https://x.com/some-profile/status/tweet-2", + ) + .unwrap(); + + assert_ne!(first_source_id, second_source_id); + } + + #[test] + fn run_items_refresh_progress_counters() { + let conn = conn(); + let user_id = ensure_default_user(&conn).unwrap(); + let run = create_archive_run(&conn, user_id, 2).unwrap(); + let source_id = + upsert_source_identity(&conn, "local", "file", None, None, "file:///a").unwrap(); + let entry = create_archived_entry( + &conn, + &NewEntry { + source_identity_id: source_id, + archive_run_id: run.id, + parent_entry_id: None, + root_entry_id: None, + created_by_user_id: user_id, + owned_by_user_id: user_id, + source_kind: "local".to_string(), + entity_kind: "file".to_string(), + title: None, + visibility: "private".to_string(), + representation_kind: "file".to_string(), + source_metadata_json: "{}".to_string(), + display_metadata_json: None, + }, + ) + .unwrap(); + let first = + create_archive_run_item(&conn, run.id, None, 0, "file:///a", None, "local", "file") + .unwrap(); + let second = + create_archive_run_item(&conn, run.id, None, 1, "file:///b", None, "local", "file") + .unwrap(); + + complete_archive_run_item(&conn, first.id, entry.id).unwrap(); + fail_archive_run_item(&conn, second.id, "copy failed").unwrap(); + finish_archive_run(&conn, run.id).unwrap(); + + let counters: (i64, i64, i64, String) = conn + .query_row( + "SELECT discovered_count, completed_count, failed_count, status + FROM archive_runs WHERE id = 
?1", + [run.id], + |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)), + ) + .unwrap(); + + assert_eq!(counters, (2, 1, 1, "failed".to_string())); + } + + #[test] + fn main_archive_query_only_counts_roots() { + let conn = conn(); + let parent = create_entry_fixture(&conn, "private", None, None); + let _child = create_entry_fixture(&conn, "private", Some(parent.id), Some(parent.id)); + + assert_eq!(main_archive_entry_count(&conn).unwrap(), 1); + } + + #[test] + fn public_entries_require_instance_flags_and_public_visibility() { + let conn = conn(); + let _public = create_entry_fixture(&conn, "public", None, None); + let _private = create_entry_fixture(&conn, "private", None, None); + + assert_eq!(public_index_entry_count(&conn).unwrap(), 0); + + set_public_settings(&conn, true, false, false).unwrap(); + assert_eq!(public_index_entry_count(&conn).unwrap(), 0); + + set_public_settings(&conn, true, true, false).unwrap(); + assert_eq!(public_index_entry_count(&conn).unwrap(), 1); + } + + #[test] + fn taxonomy_assignments_are_discoverable_through_ancestors() { + let conn = conn(); + let entry = create_entry_fixture(&conn, "private", None, None); + let node_id = create_taxonomy_path(&conn, "/sciences/computer-science/compilers").unwrap(); + assign_entry_to_taxonomy(&conn, entry.id, node_id).unwrap(); + + assert_eq!( + entry_count_for_taxonomy_path(&conn, "/sciences/computer-science/compilers").unwrap(), + 1 + ); + assert_eq!( + entry_count_for_taxonomy_path(&conn, "/sciences/computer-science").unwrap(), + 1 + ); + assert_eq!( + entry_count_for_taxonomy_path(&conn, "/sciences").unwrap(), + 1 + ); + } +} diff --git a/src/main.rs b/src/main.rs index 177194b..22ce63d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,15 @@ -use anyhow::Result; +use anyhow::{Context, Result}; use chrono::Local; use clap::{Parser, Subcommand}; +use serde_json::json; use std::{ + collections::HashSet, env, fs, path::{Path, PathBuf}, process, }; +mod database; mod downloader; mod 
hash; mod twitter; @@ -54,17 +57,17 @@ enum Command { }, } -fn get_archive_path() -> Option { - let mut dir = env::current_dir().unwrap(); +fn get_archive_path() -> Result> { + let mut dir = env::current_dir().context("failed to read current working directory")?; loop { if dir.join(".archivr").is_dir() { - return Some(dir.join(".archivr")); + return Ok(Some(dir.join(".archivr"))); } if !dir.pop() { break; } } - None + Ok(None) } #[derive(Debug, PartialEq, Eq, Clone, Copy)] @@ -88,13 +91,9 @@ use crate::twitter::parse_tweet_id; fn expand_shorthand_to_url(path: &str, source: &Source) -> String { if *source == Source::X && (path.starts_with("tweet:media:") || path.starts_with("x:media:")) { - return format!( - "https://x.com/i/status/{}", - path.split(':') - .next_back() - .and_then(parse_tweet_id) - .unwrap() - ); + if let Some(tweet_id) = path.split(':').next_back().and_then(parse_tweet_id) { + return format!("https://x.com/i/status/{tweet_id}"); + } } if let Some(path) = path.strip_prefix("instagram:") { @@ -221,7 +220,8 @@ fn determine_source(path: &str) -> Source { return Source::Local; } else if path.starts_with("http://") || path.starts_with("https://") { // Video URLs (watch, youtu.be, shorts) - let video_re = regex::Regex::new(r"^https?://(?:www\.)?(?:youtu\.be/[0-9A-Za-z_-]+|youtube\.com/watch\?v=[0-9A-Za-z_-]+|youtube\.com/shorts/[0-9A-Za-z_-]+)").unwrap(); + let video_re = regex::Regex::new(r"^https?://(?:www\.)?(?:youtu\.be/[0-9A-Za-z_-]+|youtube\.com/watch\?v=[0-9A-Za-z_-]+|youtube\.com/shorts/[0-9A-Za-z_-]+)") + .expect("YouTube video URL regex literal must be valid"); if video_re.is_match(path) { return Source::YouTubeVideo; } @@ -229,13 +229,14 @@ fn determine_source(path: &str) -> Source { // Playlist URLs let playlist_re = regex::Regex::new(r"^https?://(?:www\.)?youtube\.com/playlist\?list=[0-9A-Za-z_-]+") - .unwrap(); + .expect("YouTube playlist URL regex literal must be valid"); if playlist_re.is_match(path) { return Source::YouTubePlaylist; } // 
Channel or user URLs (channel IDs, /c/, /user/, or @handles) - let channel_re = regex::Regex::new(r"^https?://(?:www\.)?youtube\.com/(?:channel/[0-9A-Za-z_-]+|c/[0-9A-Za-z_-]+|user/[0-9A-Za-z_-]+|@[0-9A-Za-z_-]+)").unwrap(); + let channel_re = regex::Regex::new(r"^https?://(?:www\.)?youtube\.com/(?:channel/[0-9A-Za-z_-]+|c/[0-9A-Za-z_-]+|user/[0-9A-Za-z_-]+|@[0-9A-Za-z_-]+)") + .expect("YouTube channel URL regex literal must be valid"); if channel_re.is_match(path) { return Source::YouTubeChannel; } @@ -291,52 +292,26 @@ fn determine_source(path: &str) -> Source { Source::Other } -fn hash_exists(filename: String, store_path: &Path) -> bool { - let mut chars = filename.chars(); - let first_letter = chars.next().unwrap(); - let second_letter = chars.next().unwrap(); - - let path = store_path - .join("raw") - .join(first_letter.to_string()) - .join(second_letter.to_string()) - .join(filename); +fn hash_exists(hash: &str, file_extension: &str, store_path: &Path) -> Result { + let path = store_path.join(raw_relative_path_from_hash(hash, file_extension)?); println!("Checking {}", path.display()); - path.exists() + Ok(path.exists()) } -fn move_temp_to_raw(file: &Path, hash: &String, store_path: &Path) -> Result<()> { - let mut chars = hash.chars(); - let first_letter = chars.next().unwrap().to_string(); - let second_letter = chars.next().unwrap().to_string(); +fn move_temp_to_raw(file: &Path, hash: &str, store_path: &Path) -> Result<()> { let file_extension = file .extension() .map_or(String::new(), |ext| format!(".{}", ext.to_string_lossy())); + let raw_relpath = raw_relative_path_from_hash(hash, &file_extension)?; + let destination = store_path.join(raw_relpath); - fs::create_dir_all( - store_path - .join("raw") - .join(&first_letter) - .join(&second_letter), - )?; + if let Some(parent) = destination.parent() { + fs::create_dir_all(parent)?; + } - fs::rename( - file, - store_path - .join("raw") - .join(&first_letter) - .join(&second_letter) - .join(format!( - 
"{hash}{}", - if file_extension.is_empty() { - "" - } else { - &file_extension - } - )), - )?; + fs::rename(file, destination)?; Ok(()) } @@ -349,12 +324,298 @@ fn initialize_store_directories(store_path: &Path) -> Result<()> { Ok(()) } +fn raw_relative_path_from_hash(hash: &str, file_extension: &str) -> Result { + let mut chars = hash.chars(); + let first_letter = chars.next().context("hash must not be empty")?; + let second_letter = chars + .next() + .context("hash must be at least two characters")?; + + Ok(PathBuf::from("raw") + .join(first_letter.to_string()) + .join(second_letter.to_string()) + .join(format!("{hash}{file_extension}"))) +} + +fn path_to_store_string(path: &Path) -> String { + path.to_string_lossy().replace('\\', "/") +} + +fn extension_without_dot(file_extension: &str) -> Option { + file_extension + .strip_prefix('.') + .filter(|extension| !extension.is_empty()) + .map(|extension| extension.to_string()) +} + +fn blob_record_for_raw_relpath( + store_path: &Path, + raw_relpath: &Path, +) -> Result { + let absolute_path = store_path.join(raw_relpath); + let file_name = raw_relpath + .file_name() + .and_then(|name| name.to_str()) + .context("raw artifact path must have a UTF-8 file name")?; + let (sha256, extension) = match file_name.rsplit_once('.') { + Some((hash, extension)) => (hash.to_string(), Some(extension.to_string())), + None => (file_name.to_string(), None), + }; + + Ok(database::BlobRecord { + sha256, + byte_size: fs::metadata(&absolute_path) + .with_context(|| format!("failed to stat raw artifact {}", absolute_path.display()))? 
+ .len() as i64, + mime_type: None, + extension, + raw_relpath: path_to_store_string(raw_relpath), + }) +} + +fn source_metadata(source: Source) -> (&'static str, &'static str, &'static str) { + match source { + Source::YouTubeVideo => ("youtube", "video", "video"), + Source::YouTubePlaylist => ("youtube", "playlist", "container"), + Source::YouTubeChannel => ("youtube", "channel", "container"), + Source::X => ("x", "post", "video"), + Source::Tweet => ("x", "tweet", "tweet_json"), + Source::TweetThread => ("x", "tweet_thread", "tweet_json"), + Source::Instagram => ("instagram", "post", "video"), + Source::Facebook => ("facebook", "post", "video"), + Source::TikTok => ("tiktok", "video", "video"), + Source::Reddit => ("reddit", "post", "video"), + Source::Snapchat => ("snapchat", "story", "video"), + Source::Local => ("local", "file", "file"), + Source::Other => ("other", "unknown", "unknown"), + } +} + +fn local_file_extension(path: &str) -> String { + Path::new(path.trim_start_matches("file://")) + .extension() + .map_or(String::new(), |ext| format!(".{}", ext.to_string_lossy())) +} + +fn media_file_extension(source: Source, path: &str) -> String { + match source { + Source::YouTubeVideo + | Source::X + | Source::Instagram + | Source::Facebook + | Source::TikTok + | Source::Reddit + | Source::Snapchat => ".mp4".to_string(), + Source::Local => local_file_extension(path), + _ => String::new(), + } +} + +fn tweet_id_from_archive_path(path: &str) -> Option { + path.split(':').next_back().and_then(parse_tweet_id) +} + +fn create_structured_root(store_path: &Path, entry: &database::ArchivedEntry) -> Result<()> { + debug_assert!(entry.entry_uid.starts_with("entry_")); + fs::create_dir_all(store_path.join(&entry.structured_root_relpath))?; + Ok(()) +} + +fn record_media_entry( + conn: &rusqlite::Connection, + store_path: &Path, + user_id: i64, + run: &database::ArchiveRun, + item: &database::ArchiveRunItem, + requested_locator: &str, + canonical_locator: &str, + source: 
Source, + hash: &str, + file_extension: &str, + byte_size: i64, +) -> Result { + debug_assert!(run.run_uid.starts_with("run_")); + debug_assert!(item.item_uid.starts_with("item_")); + let (source_kind, entity_kind, representation_kind) = source_metadata(source); + let raw_relpath = raw_relative_path_from_hash(hash, file_extension)?; + let blob = database::BlobRecord { + sha256: hash.to_string(), + byte_size, + mime_type: None, + extension: extension_without_dot(file_extension), + raw_relpath: path_to_store_string(&raw_relpath), + }; + let blob_id = database::upsert_blob(conn, &blob)?; + let source_identity_id = database::upsert_source_identity( + conn, + source_kind, + entity_kind, + None, + Some(canonical_locator), + canonical_locator, + )?; + let entry = database::create_archived_entry( + conn, + &database::NewEntry { + source_identity_id, + archive_run_id: run.id, + parent_entry_id: None, + root_entry_id: None, + created_by_user_id: user_id, + owned_by_user_id: user_id, + source_kind: source_kind.to_string(), + entity_kind: entity_kind.to_string(), + title: None, + visibility: "private".to_string(), + representation_kind: representation_kind.to_string(), + source_metadata_json: json!({ + "requested_locator": requested_locator, + "canonical_locator": canonical_locator + }) + .to_string(), + display_metadata_json: None, + }, + )?; + create_structured_root(store_path, &entry)?; + database::add_entry_artifact( + conn, + &database::NewArtifact { + entry_id: entry.id, + artifact_role: "primary_media".to_string(), + storage_area: "raw".to_string(), + relpath: blob.raw_relpath, + blob_id: Some(blob_id), + logical_path: None, + metadata_json: None, + }, + )?; + database::complete_archive_run_item(conn, item.id, entry.id)?; + Ok(entry) +} + +fn record_tweet_entry( + conn: &rusqlite::Connection, + store_path: &Path, + user_id: i64, + run: &database::ArchiveRun, + item: &database::ArchiveRunItem, + requested_locator: &str, + source: Source, + tweet_id: &str, +) -> Result { 
+ debug_assert!(run.run_uid.starts_with("run_")); + debug_assert!(item.item_uid.starts_with("item_")); + let (source_kind, entity_kind, representation_kind) = source_metadata(source); + let canonical_locator = format!("https://x.com/i/status/{tweet_id}"); + let source_identity_id = database::upsert_source_identity( + conn, + source_kind, + entity_kind, + Some(tweet_id), + Some(&canonical_locator), + &canonical_locator, + )?; + let entry = database::create_archived_entry( + conn, + &database::NewEntry { + source_identity_id, + archive_run_id: run.id, + parent_entry_id: None, + root_entry_id: None, + created_by_user_id: user_id, + owned_by_user_id: user_id, + source_kind: source_kind.to_string(), + entity_kind: entity_kind.to_string(), + title: None, + visibility: "private".to_string(), + representation_kind: representation_kind.to_string(), + source_metadata_json: json!({ + "tweet_id": tweet_id, + "requested_locator": requested_locator + }) + .to_string(), + display_metadata_json: None, + }, + )?; + create_structured_root(store_path, &entry)?; + + let tweet_json_relpath = PathBuf::from("raw_tweets").join(format!("tweet-{tweet_id}.json")); + database::add_entry_artifact( + conn, + &database::NewArtifact { + entry_id: entry.id, + artifact_role: "raw_tweet_json".to_string(), + storage_area: "raw_tweets".to_string(), + relpath: path_to_store_string(&tweet_json_relpath), + blob_id: None, + logical_path: None, + metadata_json: None, + }, + )?; + + let tweet_json = fs::read_to_string(store_path.join(&tweet_json_relpath))?; + for (role, raw_relpath) in tweet_raw_artifacts(&tweet_json)? 
{ + let raw_path = PathBuf::from(&raw_relpath); + let blob = blob_record_for_raw_relpath(store_path, &raw_path)?; + let blob_id = database::upsert_blob(conn, &blob)?; + database::add_entry_artifact( + conn, + &database::NewArtifact { + entry_id: entry.id, + artifact_role: role, + storage_area: "raw".to_string(), + relpath: raw_relpath, + blob_id: Some(blob_id), + logical_path: None, + metadata_json: None, + }, + )?; + } + + database::complete_archive_run_item(conn, item.id, entry.id)?; + Ok(entry) +} + +fn tweet_raw_artifacts(tweet_json: &str) -> Result> { + let regex = regex::Regex::new(r#""(avatar_local_path|local_path)": "([^"\n]+)""#)?; + let mut seen = HashSet::new(); + let mut artifacts = Vec::new(); + + for captures in regex.captures_iter(tweet_json) { + let relpath = captures[2].to_string(); + if !relpath.starts_with("raw/") || !seen.insert(relpath.clone()) { + continue; + } + + let role = if &captures[1] == "avatar_local_path" { + "avatar" + } else { + "media" + }; + artifacts.push((role.to_string(), relpath)); + } + + Ok(artifacts) +} + +fn fail_archive_and_exit( + conn: &rusqlite::Connection, + run: &database::ArchiveRun, + item: &database::ArchiveRunItem, + message: &str, +) -> ! { + let _ = database::fail_archive_run_item(conn, item.id, message); + let _ = database::fail_archive_run(conn, run.id, message); + eprintln!("{message}"); + process::exit(1); +} + fn main() -> Result<()> { let args = Args::parse(); match args.command { Command::Archive { ref path } => { - let archive_path = match get_archive_path() { + let archive_path = match get_archive_path()? { Some(path) => path, None => { eprintln!("Not in an archive. 
Use 'archivr init' to create one."); @@ -375,14 +636,42 @@ fn main() -> Result<()> { }; let source = determine_source(path); + let (source_kind, entity_kind, _) = source_metadata(source); + let conn = database::open_or_initialize(&archive_path)?; + let user_id = database::ensure_default_user(&conn)?; + let run = database::create_archive_run(&conn, user_id, 1)?; + let item = database::create_archive_run_item( + &conn, + run.id, + None, + 0, + path, + None, + source_kind, + entity_kind, + )?; // Sources: Tweets or Twitter Threads match source { Source::Other => { - eprintln!("Archiving from this source is not yet implemented."); - process::exit(1); + fail_archive_and_exit( + &conn, + &run, + &item, + "Archiving from this source is not yet implemented.", + ); } Source::Tweet | Source::TweetThread => { + let tweet_id = match tweet_id_from_archive_path(path) { + Some(tweet_id) => tweet_id, + None => fail_archive_and_exit( + &conn, + &run, + &item, + "Failed to archive tweet: invalid tweet ID", + ), + }; + match downloader::tweets::archive( path, source == Source::TweetThread, @@ -390,6 +679,17 @@ fn main() -> Result<()> { ×tamp, ) { Ok(true) => { + record_tweet_entry( + &conn, + &store_path, + user_id, + &run, + &item, + path, + source, + &tweet_id, + )?; + database::finish_archive_run(&conn, run.id)?; println!( "Tweet archived successfully to {}", store_path.join("raw_tweets").display() @@ -397,6 +697,17 @@ fn main() -> Result<()> { return Ok(()); } Ok(false) => { + record_tweet_entry( + &conn, + &store_path, + user_id, + &run, + &item, + path, + source, + &tweet_id, + )?; + database::finish_archive_run(&conn, run.id)?; println!( "Tweet already archived in {}", store_path.join("raw_tweets").display() @@ -404,8 +715,12 @@ fn main() -> Result<()> { return Ok(()); } Err(e) => { - eprintln!("Failed to archive tweet: {e}"); - process::exit(1); + fail_archive_and_exit( + &conn, + &run, + &item, + &format!("Failed to archive tweet: {e}"), + ); } } } @@ -413,6 +728,7 @@ fn 
main() -> Result<()> { } // Sources, for which yt-dlp is needed + let requested_path = path.to_string(); let path = expand_shorthand_to_url(path, &source); let hash = match source { Source::YouTubeVideo @@ -425,8 +741,12 @@ fn main() -> Result<()> { match downloader::ytdlp::download(path.clone(), &store_path, ×tamp) { Ok(h) => h, Err(e) => { - eprintln!("Failed to download from YouTube: {e}"); - process::exit(1); + fail_archive_and_exit( + &conn, + &run, + &item, + &format!("Failed to download media: {e}"), + ); } } } @@ -434,31 +754,36 @@ fn main() -> Result<()> { match downloader::local::save(path.clone(), &store_path, ×tamp) { Ok(h) => h, Err(e) => { - eprintln!("Failed to archive local file: {e}"); - process::exit(1); + fail_archive_and_exit( + &conn, + &run, + &item, + &format!("Failed to archive local file: {e}"), + ); } } } + Source::YouTubePlaylist | Source::YouTubeChannel => { + fail_archive_and_exit( + &conn, + &run, + &item, + "Playlist and channel container expansion are not yet implemented.", + ); + } _ => unreachable!(), }; - let file_extension = match source { - Source::YouTubeVideo - | Source::X - | Source::Instagram - | Source::Facebook - | Source::TikTok - | Source::Reddit - | Source::Snapchat => ".mp4", - Source::Local => { - let p = Path::new(path.trim_start_matches("file://")); - &p.extension() - .map_or(String::new(), |ext| format!(".{}", ext.to_string_lossy())) - } - _ => "", - }; + let file_extension = media_file_extension(source, &path); + let temp_file = store_path + .join("temp") + .join(×tamp) + .join(format!("{timestamp}{file_extension}")); + let byte_size = fs::metadata(&temp_file) + .with_context(|| format!("failed to stat staged file {}", temp_file.display()))? + .len() as i64; - let hash_exists = hash_exists(format!("{hash}{file_extension}"), &store_path); + let hash_exists = hash_exists(&hash, &file_extension, &store_path)?; // TODO: check for repeated archives? 
// There could be one of the following: @@ -490,9 +815,20 @@ fn main() -> Result<()> { println!("File archived successfully."); } - // TODO: DB INSERT, inserting a record - // https://github.com/rusqlite/rusqlite - // Think of the DB schema + record_media_entry( + &conn, + &store_path, + user_id, + &run, + &item, + &requested_path, + &path, + source, + &hash, + &file_extension, + byte_size, + )?; + database::finish_archive_run(&conn, run.id)?; Ok(()) } @@ -505,7 +841,9 @@ fn main() -> Result<()> { } => { let archive_path = Path::new(&archive_path_string).join(".archivr"); let store_path = if Path::new(&store_path_string).is_relative() { - env::current_dir().unwrap().join(store_path_string) + env::current_dir() + .context("failed to read current working directory")? + .join(store_path_string) } else { Path::new(store_path_string).to_path_buf() }; @@ -535,14 +873,20 @@ fn main() -> Result<()> { process::exit(1); } - fs::create_dir_all(&archive_path).unwrap(); - fs::create_dir_all(&store_path).unwrap(); - fs::write(archive_path.join("name"), archive_name).unwrap(); - let _ = fs::write( + fs::create_dir_all(&archive_path)?; + fs::create_dir_all(&store_path)?; + fs::write(archive_path.join("name"), archive_name)?; + fs::write( archive_path.join("store_path"), - store_path.canonicalize().unwrap().to_str().unwrap(), - ); - initialize_store_directories(&store_path).unwrap(); + store_path + .canonicalize() + .with_context(|| format!("failed to canonicalize {}", store_path.display()))? 
+ .to_str() + .context("store path is not valid UTF-8")?, + )?; + initialize_store_directories(&store_path)?; + let conn = database::open_or_initialize(&archive_path)?; + let _ = database::ensure_default_user(&conn)?; println!("Initialized empty archive in {}", archive_path.display()); @@ -926,4 +1270,96 @@ mod tests { fs::remove_dir_all(store_path).unwrap(); } + + #[test] + fn test_record_tweet_entry_links_json_and_raw_artifacts() { + let store_path = env::temp_dir().join(format!( + "archivr-tweet-db-test-{}", + Local::now().format("%Y%m%d%H%M%S%3f") + )); + let _ = fs::remove_dir_all(&store_path); + initialize_store_directories(&store_path).unwrap(); + fs::create_dir_all(store_path.join("raw").join("a").join("b")).unwrap(); + fs::create_dir_all(store_path.join("raw").join("c").join("d")).unwrap(); + fs::write( + store_path + .join("raw") + .join("a") + .join("b") + .join("abcdef.jpg"), + b"avatar", + ) + .unwrap(); + fs::write( + store_path + .join("raw") + .join("c") + .join("d") + .join("cdef01.mp4"), + b"media", + ) + .unwrap(); + fs::write( + store_path.join("raw_tweets").join("tweet-123.json"), + r#"{ + "author": { "avatar_local_path": "raw/a/b/abcdef.jpg" }, + "entities": { "media": [{ "local_path": "raw/c/d/cdef01.mp4" }] } +}"#, + ) + .unwrap(); + + let conn = rusqlite::Connection::open_in_memory().unwrap(); + database::initialize_schema(&conn).unwrap(); + let user_id = database::ensure_default_user(&conn).unwrap(); + let run = database::create_archive_run(&conn, user_id, 1).unwrap(); + let item = database::create_archive_run_item( + &conn, + run.id, + None, + 0, + "tweet:123", + None, + "x", + "tweet", + ) + .unwrap(); + + let entry = record_tweet_entry( + &conn, + &store_path, + user_id, + &run, + &item, + "tweet:123", + Source::Tweet, + "123", + ) + .unwrap(); + database::finish_archive_run(&conn, run.id).unwrap(); + + let artifact_count: i64 = conn + .query_row( + "SELECT COUNT(*) FROM entry_artifacts WHERE entry_id = ?1", + [entry.id], + |row| 
row.get(0), + ) + .unwrap(); + let blob_count: i64 = conn + .query_row("SELECT COUNT(*) FROM blobs", [], |row| row.get(0)) + .unwrap(); + let run_status: String = conn + .query_row( + "SELECT status FROM archive_runs WHERE id = ?1", + [run.id], + |row| row.get(0), + ) + .unwrap(); + + assert_eq!(artifact_count, 3); + assert_eq!(blob_count, 2); + assert_eq!(run_status, "completed"); + assert!(store_path.join(&entry.structured_root_relpath).is_dir()); + + let _ = fs::remove_dir_all(store_path); + } }