Mogador: enterprise-grade scraping with one beefy workstation

24 min read
4639 words
tl;dr summary

Mogador is a Rust/Python scraping pipeline running on formidable: RabbitMQ queues, Rust workers, MinIO, Elasticsearch, ClickHouse, GPU tagging, embeddings, dedupe, Kafka logging, a friendly osu! ingestion path for ozut, and a compact extension model for new data formats.

Mogador is my media and text ingestion system.

That sentence is technically true and also deeply rude to the amount of machinery involved. Mogador has processed more than 13 million files, currently keeps about 9.18 million indexed documents, stores 11.77 million embedded text rows in ClickHouse, and now sits underneath a bunch of ML experiments that began as “what if I just collected the data locally?” and then quietly became actual infrastructure.

This post documents the system end-to-end: architecture, queues, storage, deduplication, models, concurrency, hardware, observability, the Rust rewrite, osu! ingestion for ozut, and the approximate AWS bill that would appear if I tried to host the same system in the cloud and gave Jeff Bezos my debit card as a little enrichment toy.

A note before we open the box: I am intentionally omitting most upstream data-source names. The article is about system design rather than a source directory. The osu! path is discussed because it is a friendly integration built around public beatmap metadata/download flows and because it directly supports ozut.

Current snapshot

Measured on formidable on 2026-04-29:

Component                                    Snapshot
Indexed Elasticsearch documents (posts)      9,183,737
Elasticsearch deleted docs awaiting merge    496,766
Elasticsearch primary store                  330.7 GB
Tags index documents                         231,226
ClickHouse lake.docs rows                    11,767,862
ClickHouse lake.embeddings rows              11,768,211
BGE-M3 vector raw footprint                  44.89 GiB
ClickHouse embedding table on disk           34.20 GiB
ClickHouse document table on disk            824.98 MiB
Current media disk usage                     6.47 TiB used of 7.28 TiB usable
Current MinIO staging disk                   100 GB used

The main flow is:

  1. Scrapers discover records.
  2. Records are inserted or claimed idempotently.
  3. Download jobs go into RabbitMQ.
  4. Downloader streams media to object storage.
  5. Optimizer normalizes media and deduplicates it.
  6. Tagger, embedder, thumbnailer, and indexer enrich it.
  7. Elasticsearch serves search and retrieval.
  8. ClickHouse keeps the analytical/text lake.
  9. Kafka receives structured logs for audit and replay.
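
The payloads that move between these stages are small JSON messages. As a minimal sketch, assuming serde and using only the fields visible in the downloader snippet later in this post (the real struct carries more):

#[derive(serde::Serialize, serde::Deserialize)]
struct DownloadMessage {
    source: String,          // adapter name; becomes the MinIO key prefix
    post_id: String,         // stable item id; becomes the MinIO key body
    media_url: String,       // upstream URL the downloader streams from
    referer: Option<String>, // optional referer some sources require
}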

Hardware: formidable

Mogador runs on formidable, the same machine described in Everything I Self-Host at Home.

Current host:

Resource                  Value
Hostname                  formidable
CPU                       AMD Ryzen 9 7950X, 16 cores / 32 threads
RAM                       61 GiB
GPU                       NVIDIA GeForce RTX 4090, 24 GiB VRAM
Driver                    595.58.03
OS/root                   encrypted btrfs on 1 TB NVMe
Fast work disk            1.8 TiB SSD (javelin)
Media tier                RAID1 of 8 TB disks, ~7.28 TiB usable
Current media tier usage  6.47 TiB used, 0.81 TiB free, 88.9% full

The important architectural choice is treating the GPU as a shared accelerator across the whole pipeline. The same RTX 4090 handles image tagging, text embeddings, thumbnail/transcode work, comment generation experiments, and other local model workloads. The system therefore uses queues and conservative concurrency instead of letting every worker sprint into CUDA like a toddler into traffic.

Storage layout

Mogador uses several storage layers with clearly separated responsibilities:

Layer             Role
MinIO             S3-compatible staging bucket for raw and intermediate objects
Local filesystem  final media and thumbnail storage
PostgreSQL        scraper state, claims, old post metadata, idempotency tables
Elasticsearch     search, filtering, ranking, dense-vector retrieval
ClickHouse        text lake, embeddings, analytics, long-term feature work
RocksDB           optimizer dedupe state and publish reservations
Kafka             structured event log for audit, replay, and downstream consumers
RabbitMQ          operational task queues with quorum durability
etcd              live config snapshots and service-specific overrides

This split sounds like a lot until you look at the responsibilities. RabbitMQ decides what should happen next. Kafka records what happened. PostgreSQL answers “have we seen this scraper item before?” RocksDB answers “is this media a duplicate or currently being processed?” Elasticsearch answers “can the app search this fast?” ClickHouse answers “can I aggregate this without crying?”

Databases are happiest when they are not being forced to do each other’s hobbies.

The high-level pipeline

The flow, roughly (the embedding service also writes the ClickHouse lake; every service emits structured events to Kafka):

  Source adapters
  -> Postgres claims + checkpoints
  -> RabbitMQ: to_download    (the osu! metadata sync publishes here too)
  -> downloader-rs
  -> MinIO staging
  -> RabbitMQ: to_optimize
  -> optimizer-rs             (RocksDB dedupe, RAID1 media store)
  -> RabbitMQ topic exchange
  -> tagger: WD EVA02 / embedding: BGE-M3 / thumbnailer-rs
  -> RabbitMQ: to_index
  -> indexer-rs bulk writer
  -> Elasticsearch posts      (final media lands on sharded filesystem paths)

The major queue names are direct:

Queue          Purpose
to_download    fetch original media or external artifacts
to_optimize    normalize, transcode, deduplicate
to_tag         generate tags with the vision tagger
to_embed       embed media/search documents
to_embed_text  embed standalone text/comment documents into ClickHouse
to_thumbnail   generate preview thumbnails
to_index       write final documents to Elasticsearch
to_hash        backfill hash/dedupe state
to_comment     generate synthetic comments for controlled experiments

RabbitMQ queues are declared as quorum queues. That is mildly more expensive than classic queues, but it gives the task layer enough durability that a worker crash is a retry event rather than data loss.

The Rust AMQP helper keeps the publish path explicit:

pub async fn declare_quorum_queue(&self, name: &str) -> BoxResult<()> {
    let mut arguments = FieldTable::default();
    arguments.insert(
        ShortString::from("x-queue-type"),
        AMQPValue::LongString(LongString::from("quorum")),
    );

    self.publish_channel
        .queue_declare(
            name.into(),
            QueueDeclareOptions {
                durable: true,
                auto_delete: false,
                exclusive: false,
                passive: false,
                nowait: false,
            },
            arguments,
        )
        .await?;

    Ok(())
}

The contract is durable queues, JSON payloads, delivery mode 2, message IDs, and workers that know how to retry or dead-letter.
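
A minimal sketch of that contract on the publish side with lapin (not Mogador's actual publish_json, which also carries retry and metrics plumbing):

pub async fn publish_json<T: serde::Serialize>(
    channel: &lapin::Channel,
    queue: &str,
    payload: &T,
    message_id: &str,
) -> BoxResult<()> {
    let body = serde_json::to_vec(payload)?;

    channel
        .basic_publish(
            "", // default exchange: route straight to the named queue
            queue,
            lapin::options::BasicPublishOptions::default(),
            &body,
            lapin::BasicProperties::default()
                .with_delivery_mode(2) // persistent
                .with_message_id(message_id.into())
                .with_content_type("application/json".into()),
        )
        .await? // broker accepted the publish
        .await?; // confirmation, when publisher confirms are enabled

    Ok(())
}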

Why the Rust rewrite happened

The early pipeline was mostly Python. That was good for getting the system moving, but bad for long-running workers that need predictable memory, clean cancellation, backpressure, and explicit concurrency.

The rewrite moved the hot operational path to Rust:

Service          Runtime  Role
scraper kernel   Rust     common traits for adapters, checkpoints, claims, publishing
downloader       Rust     stream downloads into object storage, retry, dead-letter
optimizer        Rust     transcode, hash, dedupe, publish ready artifacts
thumbnailer      Rust     generate preview images/videos
indexer          Rust     batch Elasticsearch writes and finalize media moves
source adapters  Rust     source-specific discovery workers
embedding        Python   BGE-M3 + GPU inference + ClickHouse/Postgres writers
tagger           Python   ONNXRuntime GPU image/video tagger
commenter        Python   local LLM comment generation experiments

The Rust services share a kernel built around a few traits:

pub trait NativeAdapter: Send + Sync {
    type Item;
    type Checkpoint: Checkpoint;
    type Record;
    type Message;

    fn item_key(&self, item: &Self::Item) -> String;
    fn checkpoint(&self, item: &Self::Item) -> Self::Checkpoint;
    fn classify(&self, item: Self::Item) -> BoxResult<NativeAction<Self::Record, Self::Message>>;
}

That abstraction is the main reason adding sources became fast. A source adapter only has to answer:

  1. How do I page through upstream records?
  2. What is the stable item key?
  3. What checkpoint advances monotonically?
  4. Should the item be skipped, persisted only, or persisted and published?
  5. Which queue message should be emitted?

Everything else is shared: claims, checkpoints, duplicate handling, publishing, retries, logging, and metrics.

Idempotency: the tiny feature that saves your weekends

Scraping systems mostly die from duplicate work, partial work, or weird half-success states. Mogador treats every item as something that must be claimed before it can be processed.

The generic batch loop does three important things:

match completion_store.claim(&key).await? {
    ClaimOutcome::Completed => {
        report.duplicates += 1;
        advance_checkpoint(checkpoint_store, checkpoint_key, current_checkpoint, checkpoint).await?;
        continue;
    }
    ClaimOutcome::Busy => {
        return Err(KernelError::Contention(
            "item is already being processed by another worker",
        ));
    }
    ClaimOutcome::Claimed => {}
}

A record can be:

State      Meaning
completed  already handled; skip and advance if safe
busy       another worker owns it; stop before producing duplicate side effects
claimed    this worker may proceed

The effect is operationally important: workers can restart, scrapers can overlap, pages can repeat, and duplicate media still collapses to stable state.
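
The ClaimOutcome values map naturally onto a single Postgres upsert. A minimal sketch, assuming sqlx and a hypothetical item_claims table rather than Mogador's actual schema:

// Mirrors the kernel's claim outcomes.
enum ClaimOutcome {
    Completed,
    Busy,
    Claimed,
}

async fn claim(pool: &sqlx::PgPool, key: &str) -> Result<ClaimOutcome, sqlx::Error> {
    // Try to take ownership; ON CONFLICT DO NOTHING leaves rows_affected at 0
    // when a previous or concurrent claim already holds the key.
    let inserted = sqlx::query(
        "INSERT INTO item_claims (item_key, state, claimed_at) \
         VALUES ($1, 'claimed', now()) \
         ON CONFLICT (item_key) DO NOTHING",
    )
    .bind(key)
    .execute(pool)
    .await?
    .rows_affected();

    if inserted == 1 {
        return Ok(ClaimOutcome::Claimed);
    }

    // The key already exists: its stored state decides Completed vs Busy.
    let (state,): (String,) =
        sqlx::query_as("SELECT state FROM item_claims WHERE item_key = $1")
            .bind(key)
            .fetch_one(pool)
            .await?;

    Ok(if state == "completed" {
        ClaimOutcome::Completed
    } else {
        ClaimOutcome::Busy
    })
}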

Downloading: stream, upload, publish

downloader-rs consumes to_download, checks policy, streams media, uploads to MinIO, and emits to_optimize.

The worker streams and counts bytes instead of loading whole objects into memory. It rejects blacklisted extensions, respects max-size limits, applies per-source cookies when configured, and treats 404s as dead-letter events rather than eternal retry confetti.

The downloader path is:

let downloaded = self
    .http
    .stream(
        &payload.media_url,
        payload.referer.as_deref(),
        &cookies,
        max_bytes,
        &self.metrics,
    )
    .await?;

let s3_key = format!("{}/{}{}", payload.source, payload.post_id, ext);

self.storage
    .upload_stream(
        &s3_key,
        downloaded.body_stream,
        downloaded.content_length,
        downloaded.mime.as_deref(),
    )
    .await?;

self.amqp
    .publish_json(&self.queue_to_optimize, &optimize_payload, None, None)
    .await?;

Downloading stays deliberately mechanical: stream, upload, publish, ack. If it fails, classify the reason and retry or dead-letter it.

Optimizer: dedupe, normalization, and publish safety

The optimizer is the busiest and most important worker. It consumes staged objects, downloads them locally into a temp directory, computes exact and perceptual hashes, reserves a dedupe record, transcodes the media, uploads the normalized result, then publishes downstream messages.

For images:

  • compute BLAKE3 for exact content identity
  • compute perceptual image hashes (aHash, dHash, pHash)
  • normalize still images with libvips into WebP/AVIF-friendly derivatives

For videos:

  • compute BLAKE3
  • sample frames
  • compute perceptual hashes per sampled frame
  • normalize moving media with ffmpeg, including AV1-oriented encodes where that storage/perceptual-quality tradeoff makes sense

The dedupe layer uses RocksDB column families for exact and perceptual states:

const CF_EXACT_PENDING: &str = "exact_pending";
const CF_EXACT_COMMITTED: &str = "exact_committed";
const CF_IMAGE_PENDING: &str = "image_pending";
const CF_IMAGE_COMMITTED: &str = "image_committed";
const CF_VIDEO_PENDING: &str = "video_pending";
const CF_VIDEO_COMMITTED: &str = "video_committed";

A media object can be Pending, ReadyToPublish, Publishing, Published, or Aborted. That state machine matters because dedupe and publishing must be coupled. If two workers see the same media, one of them may reuse the already prepared publish plan rather than recomputing and re-uploading the same artifact.
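
A sketch of that coupling; the state names come from the paragraph above, while the duplicate-handling decisions are illustrative rather than the optimizer's exact behavior:

// States a media object moves through in the RocksDB reservation.
enum PublishState {
    Pending,
    ReadyToPublish,
    Publishing,
    Published,
    Aborted,
}

// What a worker does when the dedupe lookup finds an existing reservation.
enum DuplicateAction {
    ReuseExistingPlan, // point the new post at the already prepared/published artifact
    WaitOrRequeue,     // another worker is mid-flight; retry later
    ProcessFresh,      // the previous attempt aborted; do the work again
}

fn on_duplicate(existing: &PublishState) -> DuplicateAction {
    match existing {
        PublishState::ReadyToPublish | PublishState::Published => DuplicateAction::ReuseExistingPlan,
        PublishState::Pending | PublishState::Publishing => DuplicateAction::WaitOrRequeue,
        PublishState::Aborted => DuplicateAction::ProcessFresh,
    }
}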

The optimizer logs stage timings separately:

Stage      Meaning
download   fetch staged object from MinIO
hash       exact/perceptual hashing
dedupe     reserve or detect duplicate in RocksDB
transcode  image/video normalization
upload     write normalized output
publish    emit downstream messages

That separation is practical. When the system slows down, I can tell whether the villain is disk IO, ffmpeg AV1/video work, libvips image normalization, GPU saturation, MinIO, RocksDB, or a queue that has started making little haunted noises.
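
The timings land in per-stage Prometheus histograms. A minimal sketch of the pattern with the prometheus crate; the metric name and the compute_hashes helper are illustrative:

let stage_seconds = prometheus::register_histogram_vec!(
    "optimizer_stage_seconds",
    "Wall-clock seconds spent per optimizer stage",
    &["stage"]
)?;

// Around each stage:
let started = std::time::Instant::now();
let hashes = compute_hashes(&local_path)?;
stage_seconds
    .with_label_values(&["hash"])
    .observe(started.elapsed().as_secs_f64());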

Filesystem layout: Git-style fan-out sharding

Final media files and thumbnails use hash fan-out / Git-style directory sharding. A UUID or content-derived key is split into prefix directories so each directory stays small enough for normal filesystem tooling.

Example:

e9/c8/e9c86a66-597c-42da-86ed-977af1438195.mp4

The first two characters become the top-level directory, the next two characters become the second-level directory, and the full filename sits under that. Git uses the same broad idea for object storage: prefix directories keep directory lookups sane and make filesystem tooling less grumpy.

The helper is:

fn fanout_path(root: &Path, id: Uuid, extension: &str) -> PathBuf {
    let name = id.hyphenated().to_string();
    let a = &name[0..2];
    let b = &name[2..4];
    root.join(a).join(b).join(format!("{name}.{extension}"))
}
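
Plugging the example id from above through the helper (the root path here is hypothetical):

let id = Uuid::parse_str("e9c86a66-597c-42da-86ed-977af1438195")?;
let path = fanout_path(Path::new("/srv/media"), id, "mp4");
assert_eq!(
    path,
    PathBuf::from("/srv/media/e9/c8/e9c86a66-597c-42da-86ed-977af1438195.mp4")
);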

This optimization matters once the corpus reaches millions of files; directory fan-out keeps lookup and maintenance tooling predictable.

Tagging: WD EVA02 on the 4090

The tagger is Python because the model ecosystem is Python-shaped and I have chosen peace in this tiny corner of the universe.

Current tagger model:

Field                    Value
Model                    SmilingWolf/wd-eva02-large-tagger-v3
Runtime                  ONNXRuntime GPU
Image size               448
Video sampling           0.5 fps
Batch size               8
General tag threshold    0.35
Character tag threshold  0.75

The service consumes to_tag, pulls the media, extracts image/video frames, runs batched inference, writes tag results, and publishes to indexing. The model is a WaifuDiffusion-family visual tagger: it turns an image/frame into ranked semantic tags, characters, style descriptors, and safety-ish categories that are useful for filtering, search facets, dataset export, and later ML training. It exposes Prometheus metrics for model lifecycle, batch size, latency, output tag counts, dead letters, and HTTP admin endpoints.

Model loading is lazy and TTL-based. That lets the GPU memory return to the rest of the workstation when the tagger is idle.

The tagger uses a short microbatching window:

MODEL_REPO_ID = "SmilingWolf/wd-eva02-large-tagger-v3"
IMAGE_SIZE = 448
EXTRACT_FPS = 0.5
BATCH_SIZE = 8
INFERENCE_BATCH_WAIT_MS = 8.0
GENERAL_THRESHOLD = 0.35
CHARACTER_THRESHOLD = 0.75
DEFAULT_MODEL_TTL_SECONDS = 600

Eight milliseconds sounds silly until it saves you from launching one GPU inference per message like a tiny CUDA woodpecker.

Embeddings: BGE-M3 and ClickHouse

The embedding service consumes two streams:

Queue          Role
to_embed       embeds media-derived search documents: normalized metadata, tags, captions/comments when present, and source-local fields into retrieval text
to_embed_text  text/comment embedding path into ClickHouse

Current embedding model:

Field                       Value
Model                       BAAI/bge-m3
Library                     FlagEmbedding
Vector dimensions           1024
Rows in ClickHouse          11,768,211
Raw vector footprint        44.89 GiB
Compressed table footprint  34.20 GiB

ClickHouse schema:

CREATE TABLE IF NOT EXISTS lake.docs
(
  source     LowCardinality(String),
  source_id  String,
  doc_uuid   UUID,
  updated_at DateTime64(3),
  ingest_at  DateTime64(3),
  author     String,
  text       String CODEC(ZSTD(3)),
  meta_json  String CODEC(ZSTD(3))
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(ingest_at)
ORDER BY (source, source_id, updated_at);

CREATE TABLE IF NOT EXISTS lake.embeddings
(
  model      LowCardinality(String),
  doc_uuid   UUID,
  updated_at DateTime64(3),
  embedding  Array(Float32)
)
ENGINE = MergeTree
PARTITION BY (model, toYYYYMM(updated_at))
ORDER BY (model, doc_uuid);

The text lake is currently compact:

Source class              Rows       Text characters
media-comment corpus A    5,263,380  394,130,823
media-comment corpus B    2,401,321  306,184,712
chat/comment corpus       1,306,044  37,656,367
smaller corpora           81,457     5,772,478

Again: source names omitted deliberately. The numbers are the interesting part.

The embedder has Prometheus metrics for:

  • per-pipeline message outcomes
  • end-to-end latency
  • per-stage latency
  • cache hit/miss
  • inference batch size
  • microbatch flush reason
  • writer queue depth
  • ClickHouse inserted rows
  • model load events

The internal lesson from running BGE-M3 locally is that embeddings are only cheap if you care about batching. A single 4090 can chew through a lot of text, but if every message calls model.encode() alone, the GPU spends too much time paying overhead. Mogador batches by queue pressure and writes to ClickHouse in batches as well.

Elasticsearch: nine million searchable documents

Elasticsearch currently holds:

Index  Docs       Store
posts  9,183,737  330.7 GB
tags   231,226    32.4 MB

The posts index contains dense vectors as well as metadata fields. Cluster stats currently report 45 indices total, 231 segments, and dense-vector mappings with 1024 and 2560 dimensions. The AWS estimate later in this post treats the live corpus as a roughly 30% larger operating target, so the numbers have room for growth instead of pretending disks stop filling themselves.

The indexing service is Rust and uses a batcher between AMQP consumption and Elasticsearch bulk writes:

fn should_flush(batch: &[PreparedMessage], batch_bytes: usize, config: &AppConfig) -> bool {
    batch.len() >= config.batch_size || batch_bytes >= config.batch_max_bytes
}

async fn flush_batch(
    batch: &mut Vec<PreparedMessage>,
    batch_bytes: &mut usize,
    ctx: &BatcherContext,
) {
    if batch.is_empty() {
        *batch_bytes = 0;
        return;
    }

    match ctx.es.bulk_index(batch).await {
        Ok(results) => {
            for (item, result) in batch.drain(..).zip(results) {
                match result {
                    Ok(()) => complete_success(item, ctx).await,
                    Err(error) => retry_prepared(item, ctx, &error).await,
                }
            }
        }
        Err(error) => {
            let reason = error.to_string();
            for item in batch.drain(..) {
                retry_prepared(item, ctx, &reason).await;
            }
        }
    }

    *batch_bytes = 0;
}

Bulk writes keep Elasticsearch throughput predictable. The indexer prepares documents concurrently, then flushes them through the bulk API instead of issuing one request per document.
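
For reference, the bulk API body is newline-delimited JSON: one action line, then one source line per document. A sketch of assembling it (not the indexer's actual code):

fn bulk_body(index: &str, docs: &[(String, serde_json::Value)]) -> String {
    let mut body = String::new();
    for (id, doc) in docs {
        // Action line: which index and _id the next line targets.
        body.push_str(&serde_json::json!({ "index": { "_index": index, "_id": id } }).to_string());
        body.push('\n');
        // Source line: the document itself.
        body.push_str(&doc.to_string());
        body.push('\n');
    }
    body
}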

Kafka logging: the implemented audit spine

Kafka is the structured event log for Mogador. RabbitMQ is for work scheduling. Kafka is for facts.

Every service emits compact JSON events into Kafka topics grouped by domain:

Topic                    Example events
mogador.scraper.events   page_fetched, item_claimed, item_skipped, checkpoint_advanced
mogador.download.events  download_started, download_completed, download_failed, dead_lettered
mogador.optimize.events  hash_computed, duplicate_detected, transcode_completed, publish_ready
mogador.ml.events        tagger_loaded, embedding_batch_flushed, model_evicted, inference_failed
mogador.index.events     bulk_index_started, bulk_index_partial, bulk_index_completed
mogador.storage.events   object_written, object_deleted, fanout_path_assigned

Event shape:

{
  "event_id": "01JZ...",
  "trace_id": "optimizer:queued-posts/example-key",
  "service": "optimizer-rs",
  "event": "duplicate_detected",
  "source_class": "image-board",
  "post_id": "9f0b9e6d-...",
  "media_kind": "image",
  "match_kind": "perceptual",
  "duration_ms": 12.7,
  "queue": "to_optimize",
  "attempt": 0,
  "timestamp": "2026-04-29T21:17:00.000Z"
}

Kafka gives Mogador a replayable operational history without forcing Elasticsearch or ClickHouse to become the canonical event stream. It also makes retrospective debugging possible: if a media item disappeared, duplicated, failed tagging, or got indexed with stale metadata, the timeline is reconstructable.

This is useful because “I swear the worker did something weird yesterday” makes a terrible observability strategy.
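
Emitting one of these events is a single produce call. A minimal sketch with rdkafka; the helper, error handling, and tracing call are illustrative:

use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

async fn emit_event(producer: &FutureProducer, topic: &str, trace_id: &str, event_json: &str) {
    let record = FutureRecord::to(topic)
        .key(trace_id) // keying by trace_id keeps one item's events ordered within a partition
        .payload(event_json);

    // Fire-and-forget with a zero enqueue timeout: a full local queue gets logged
    // instead of blocking the worker's hot path.
    if let Err((error, _message)) = producer.send(record, Duration::from_secs(0)).await {
        tracing::warn!(%error, topic, "failed to emit kafka event");
    }
}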

Scraper design: adding sources

A new source adapter generally needs:

  1. A config file.
  2. A worker implementing the Rust kernel traits.
  3. A stable item key.
  4. A checkpoint strategy.
  5. A classifier that creates a DownloadMessage or TextEmbedMessage.
  6. A compose entry.

The source-specific code stays focused because ownership stops at discovery. Tagging, embedding, transcoding, dedupe, thumbnailing, and indexing remain shared pipeline concerns; the scraper only discovers records and publishes work.

The common message types are shared:

pub use model::{
    DownloadMessage,
    MogadorPostRecord,
    OptimizeMessage,
    TextEmbedMessage,
    stable_post_id,
};

That design choice keeps new source work local: add the adapter, emit the shared message, and let the existing pipeline handle the expensive stages.

Custom formats and transformers

The optimizer treats formats as transformable artifacts. A source can emit a generic download job, and the downstream pipeline decides how to normalize it.

Current transformer classes:

Input class             Output
static image            WebP
animated/video media    MP4 preview or normalized MP4
text/comment documents  ClickHouse row + BGE-M3 embedding
osu! beatmap archives   cleaned archive + Opus audio + patched .osu metadata

The transformer contract is explicit:

trait Transformer {
    type Input;
    type Output;

    fn inspect(&self, input: &Self::Input) -> Result<MediaKind>;
    fn transform(&self, input: &Self::Input, output: &Path) -> Result<Self::Output>;
    fn downstream_messages(&self, output: &Self::Output) -> Result<Vec<QueueMessage>>;
}

That keeps new-format support away from scraper rewrites. The scraper emits an artifact; the pipeline either recognizes its transformer or sends it to dead-letter with a reason.

osu! ingestion for ozut

The osu! path is the friendliest scraper in the system and supports ozut, my osu! beatmap generation project.

There are two pieces:

  1. Metadata sync from osu! beatmapset search.
  2. Beatmap archive ingestion into the Mogador queue pipeline.

The metadata sync keeps a local SQLite database of beatmapsets, beatmaps, cursor state, download status, and hashes. It has two modes:

Mode              Purpose
cursor sync       walk ranked beatmapsets by cursor
incremental sync  fetch until the last known head ID

The current sync loop is defensive about cursors:

if page.cursor_string and page.cursor_string == previous_cursor:
    raise RuntimeError("cursor did not advance, aborting to avoid an infinite loop")

After persistence, the integration publishes beatmap archives directly to Mogador’s to_download queue. From that point onward, beatmaps behave like any other artifact class, with a custom Rust transformer.

The beatmap transformer does four things:

  1. Extract the .osz archive.
  2. Convert the primary audio track to Opus 128 kbps.
  3. Patch every .osu file so AudioFilename: references the new Opus track.
  4. Drop non-essential assets such as custom hitsounds, storyboards, videos, backgrounds, and other training-irrelevant baggage.

The actual transformer patcher is direct:

fn patch_audio_filename(osu_text: &str, opus_name: &str) -> String {
    osu_text
        .lines()
        .map(|line| {
            if line.starts_with("AudioFilename:") {
                format!("AudioFilename: {opus_name}")
            } else {
                line.to_owned()
            }
        })
        .collect::<Vec<_>>()
        .join("\n")
}
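
A quick example of what the patcher does to the [General] section of a .osu file (content abbreviated):

let patched = patch_audio_filename("AudioFilename: song.mp3\nAudioLeadIn: 0", "audio.opus");
assert_eq!(patched, "AudioFilename: audio.opus\nAudioLeadIn: 0");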

The result is a cleaner training corpus: timing, hit objects, difficulty metadata, and audio remain; decorative or source-specific assets disappear. Every cleaned .osu file points at the normalized Opus track through AudioFilename: audio.opus, which is the invariant the transformer enforces. This matters because ozut needs structure and rhythm rather than unrelated storyboard and asset baggage.

The current ozut training corpus is 301,374 beatmaps totalling over 200 GB, and the cleaned archive format keeps the pipeline reproducible. The data path from public beatmap metadata to model-ready map artifacts is now the same queue/worker flow as everything else.

Comment corpus and downstream ML projects

Mogador has already powered several projects:

Comment style classification and topic modeling

The comment corpus fed a DeBERTa-based classifier project. Embeddings also made it practical to cluster and label comment styles with topic-modeling workflows.

The important part is that comments are stored twice:

  1. Raw-ish text and metadata in ClickHouse.
  2. Dense BGE-M3 vectors in lake.embeddings.

That lets me ask both analytical questions and retrieval questions:

  • Which sources/classes produce which style distributions?
  • Which comments cluster together semantically?
  • Which topics correlate with tag families?
  • Which comment styles are suitable for synthetic generation?

LLM fine-tuning with Unsloth

The comment corpus also supports local LLM fine-tuning via Unsloth. The commenter service uses a local causal language model with LoRA checkpoints to generate comments conditioned on style and tags.

Current service defaults:

Field         Value
Base model    HuggingFaceTB/SmolLM3-3B
Adapter type  LoRA checkpoint
Runtime       PyTorch + PEFT + Transformers
GPU dtype     bfloat16 on CUDA
Queue         to_comment

That service stays outside the public-facing search path as a trust signal. It is an experimental generation service for controlled data work.

SDXL / SD3 LoRA fine-tuning

The media side of Mogador has been used to build image datasets for SDXL and SD3 LoRA experiments. The tagger and search index make it possible to filter datasets by visual concepts, style tags, dimensions, and dedupe status before exporting training manifests.

The practical win: the dataset can be selected from local metadata instead of hand-curated one file at a time.

ozut training

ozut uses the osu! side of the pipeline. The cleaned beatmap archives, audio conversion, and asset stripping make it practical to train on hundreds of gigabytes of maps without dragging unrelated assets through every experiment.

See: https://ozut.moe/

Recommendations and reranking

Mogador has also been the dataset substrate for recommendation and reranking experiments:

  • dense-vector retrieval with BGE-M3
  • tag-based candidate generation
  • Elasticsearch ranking features
  • ClickHouse analytics for exposure/click/favorite events
  • reranking candidate logs
  • local evaluation loops

There is already a separate set of ClickHouse runtime tables for user events, exposures, favorites, list impressions, and ranking candidates. The scraper corpus became a retrieval playground because it had the thing recommender systems always want and rarely get cheaply: lots of structured items with tags, vectors, and metadata.

Concurrency model

The concurrency strategy is intentionally uneven. Every service gets the concurrency shape it deserves.

Service        Concurrency pattern
Rust scrapers  periodic/batch processing with checkpoint order
downloader     async streaming + bounded prefetch
optimizer      RabbitMQ consumer + blocking hash/transcode tasks + RocksDB reservations
tagger         async queue + GPU microbatching
embedder       async queues + microbatch inference + batched DB writers
thumbnailer    per-message temp dirs + blocking transcode tasks
indexer        concurrent preparation + single bulk batcher

The indexer shows the pattern clearly: concurrent workers prepare items, then a batcher flushes them to Elasticsearch by size, byte estimate, or timeout. This avoids both extremes: single-message indexing and unlimited memory growth.

The optimizer uses spawn_blocking for CPU-heavy work like hashing and transcoding. That keeps the async runtime responsive while still using the 7950X properly.
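
In code that is the usual tokio pattern of handing the CPU-bound piece to the blocking pool (a sketch; compute_hashes is a stand-in for the real hashing code):

// Hashing runs on the blocking pool so the AMQP consumer and timers stay responsive.
let local_path = staged_file.clone();
let hashes = tokio::task::spawn_blocking(move || compute_hashes(&local_path)).await??;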

The GPU services use tiny wait windows to collect batches. The goal is to avoid the saddest possible batch size (1, forever) while keeping the added latency tiny.
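
The shape of that wait window, as a sketch with a tokio mpsc channel; the constants would mirror the tagger's BATCH_SIZE and INFERENCE_BATCH_WAIT_MS:

async fn collect_batch<T>(
    rx: &mut tokio::sync::mpsc::Receiver<T>,
    max_batch: usize,
    window: std::time::Duration,
) -> Vec<T> {
    let mut batch = Vec::with_capacity(max_batch);

    // Wait as long as it takes for the first item; an idle queue costs nothing.
    match rx.recv().await {
        Some(item) => batch.push(item),
        None => return batch, // channel closed
    }

    // Then give stragglers a tiny window (e.g. 8 ms) to join the batch.
    let deadline = tokio::time::Instant::now() + window;
    while batch.len() < max_batch {
        match tokio::time::timeout_at(deadline, rx.recv()).await {
            Ok(Some(item)) => batch.push(item),
            _ => break, // window expired or channel closed
        }
    }

    batch
}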

Retry and dead-letter policy

Retries are explicit and counted. Failures are classified into reason labels such as:

  • http_404
  • unsupported_media
  • max_retries_exceeded
  • pending_duplicate
  • publish_error
  • clickhouse_error
  • db_error
  • model_error
  • invalid_payload
  • missing_field

Dead-letter queues are labeled parking lots. A dead-letter queue full of unclassified payloads is an operational blind spot.
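
A sketch of how failures could collapse into those labels; the error type is hypothetical, the label strings are the ones listed above:

enum WorkerError {
    HttpNotFound,
    UnsupportedMedia,
    PendingDuplicate,
    Publish(String),
    ClickHouse(String),
    Db(String),
    Model(String),
    InvalidPayload,
    MissingField(&'static str),
}

fn failure_reason(error: &WorkerError, attempt: u32, max_attempts: u32) -> &'static str {
    // Exhausted retries win over the specific cause, so the dead-letter label
    // says why the message finally stopped moving.
    if attempt >= max_attempts {
        return "max_retries_exceeded";
    }
    match error {
        WorkerError::HttpNotFound => "http_404",
        WorkerError::UnsupportedMedia => "unsupported_media",
        WorkerError::PendingDuplicate => "pending_duplicate",
        WorkerError::Publish(_) => "publish_error",
        WorkerError::ClickHouse(_) => "clickhouse_error",
        WorkerError::Db(_) => "db_error",
        WorkerError::Model(_) => "model_error",
        WorkerError::InvalidPayload => "invalid_payload",
        WorkerError::MissingField(_) => "missing_field",
    }
}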

Observability

Mogador has three observability layers:

  1. Prometheus metrics from services and exporters.
  2. Kafka structured event logs.
  3. Store-native diagnostics from RabbitMQ, Elasticsearch, ClickHouse, and PostgreSQL.

The compose stack includes:

  • RabbitMQ exporter
  • Elasticsearch exporter
  • node-exporter
  • cAdvisor
  • Prometheus with 30-day retention
  • Grafana dashboards
  • custom Mogador metrics scraper

The custom metrics service scrapes Elasticsearch index stats and emits domain-specific gauges. That exists because generic exporters are good at telling you a database exists, but less good at answering “is my corpus growing in the way I expect, or has it become a mushroom?”

Why ClickHouse and Elasticsearch both exist

Elasticsearch is for online search. ClickHouse is for analytical scans.

Elasticsearch is good at:

  • full-text-ish search
  • filter-heavy user queries
  • dense vector retrieval
  • document lookup
  • ranking experiments

ClickHouse is good at:

  • counting millions of rows quickly
  • compressing text columns
  • storing embedding history
  • grouping by source/model/month
  • powering offline analysis

Trying to make one of them do both jobs would be possible in the same way eating soup with a fork is possible. You can prove your dedication; you cannot prove your judgment.

What it costs locally

Local recurring cost is mostly electricity and disks. The hardware already exists because formidable is my workstation. The marginal cost of Mogador is therefore much lower than the cloud equivalent.

Current storage reality:

Mount               Capacity  Used      Free      Usage
media RAID1         7.28 TiB  6.47 TiB  0.81 TiB  88.9%
fast SSD (javelin)  1.82 TiB  1.30 TiB  0.52 TiB  71.5%
MinIO staging SSD   119 GiB   99 GiB    20 GiB    83.1%

The RAID1 setup uses two 8 TB disks to expose roughly one disk of usable capacity with redundancy. Backups are still separate; RAID1 protects against a single disk failure and keeps the hot corpus available.

What it would cost on AWS

This is a rough estimate rather than a procurement-grade quote. AWS pricing moves by region, reserved commitments, storage class, egress, and whether you accept managed-service minimums. I used public pricing models and conservative assumptions:

  • 9.46 TiB object corpus in S3 Standard
  • two 8 TB disks worth of gp3-equivalent block storage for RAID1-like durability
  • 13M processed files
  • 6 queue hops per file
  • send + receive + delete per queue hop
  • Elasticsearch/OpenSearch with production-ish redundancy
  • ClickHouse Cloud or equivalent managed analytical storage
  • one CPU-heavy instance comparable to the 7950X host
  • one 24 GB GPU instance comparable to the 4090 class for ML services
  • MediaConvert as a separate one-time/backfill or monthly processing cost, depending on workload

Approximate monthly baseline, excluding MediaConvert:

Component                                           Estimate
S3 Standard for 9.46 TiB                            ~$223/mo
S3 requests for 13M objects, rough                  ~$211 per large processing pass
SQS for ~234M billed requests                       ~$93 per large processing pass
gp3 for 16 TB raw RAID1-equivalent block storage    ~$1,311/mo
OpenSearch compute                                  ~$366-$731/mo
OpenSearch storage, 3x 331 GB                       ~$121/mo
ClickHouse Cloud / managed analytical tier          ~$390-$1,560/mo
CPU instance comparable to host workload            ~$894-$1,340/mo
24 GB GPU instance                                  ~$734/mo
1 TB internet egress                                ~$92/mo

Baseline total:

Scenario     Estimate
no egress    ~$4,300-$6,300/mo
1 TB egress  ~$4,400-$6,400/mo
5 TB egress  ~$4,800-$6,800/mo

MediaConvert is the jump scare. AWS Elemental MediaConvert is billed per output minute with normalization multipliers. A 1.3-million-video pass can range from ~$4,875 for very short/simple outputs to ~$78,000 for longer/higher-quality processing assumptions; per video, that works out to roughly $0.004 at the low end and $0.06 at the high end.

At that point the cloud version mostly becomes a budgeting exercise: every queue hop, transcode minute, vector table, replica, and egress path turns into a small line item with sharp little teeth.

Why this works

Mogador works because it follows a few rules:

  1. Every item has a stable key.
  2. Every worker can retry safely.
  3. Every expensive stage is isolated behind a queue.
  4. Every storage layer has one main job.
  5. Deduplication happens before long-term storage.
  6. GPU inference is batched.
  7. Bulk writes are batched.
  8. Config is centralized.
  9. Logs are structured.
  10. New sources implement a tiny adapter instead of a new platform.