Mogador: enterprise-grade scraping with one beefy workstation
tl;dr summary
Mogador is a Rust/Python scraping pipeline running on formidable: RabbitMQ queues, Rust workers, MinIO, Elasticsearch, ClickHouse, GPU tagging, embeddings, dedupe, Kafka logging, a friendly osu! ingestion path for ozut, and a compact extension model for new data formats.
Mogador is my media and text ingestion system.
That sentence is technically true and also deeply rude to the amount of machinery involved. Mogador has processed more than 13 million files, currently keeps about 9.18 million indexed documents, stores 11.77 million embedded text rows in ClickHouse, and now sits underneath a bunch of ML experiments that began as “what if I just collected the data locally?” and then quietly became actual infrastructure.
This post documents the system end-to-end: architecture, queues, storage, deduplication, models, concurrency, hardware, observability, the Rust rewrite, osu! ingestion for ozut, and the approximate AWS bill that would appear if I tried to host the same system in the cloud and gave Jeff Bezos my debit card as a little enrichment toy.
A note before we open the box: I am intentionally omitting most upstream data-source names. The article is about system design rather than a source directory. The osu! path is discussed because it is a friendly integration built around public beatmap metadata/download flows and because it directly supports ozut.
Current snapshot
Measured on formidable on 2026-04-29:
| Component | Snapshot |
|---|---|
| Indexed Elasticsearch documents (posts) | 9,183,737 |
| Elasticsearch deleted docs awaiting merge | 496,766 |
| Elasticsearch primary store | 330.7 GB |
| Tags index documents | 231,226 |
| ClickHouse lake.docs rows | 11,767,862 |
| ClickHouse lake.embeddings rows | 11,768,211 |
| BGE-M3 vector raw footprint | 44.89 GiB |
| ClickHouse embedding table on disk | 34.20 GiB |
| ClickHouse document table on disk | 824.98 MiB |
| Current media disk usage | 6.47 TiB used of 7.28 TiB usable |
| Current MinIO staging disk | 100 GB used |
The main flow is:
- Scrapers discover records.
- Records are inserted or claimed idempotently.
- Download jobs go into RabbitMQ.
- Downloader streams media to object storage.
- Optimizer normalizes media and deduplicates it.
- Tagger, embedder, thumbnailer, and indexer enrich it.
- Elasticsearch serves search and retrieval.
- ClickHouse keeps the analytical/text lake.
- Kafka receives structured logs for audit and replay.
Hardware: formidable
Mogador runs on formidable, the same machine described in Everything I Self-Host at Home.
Current host:
| Resource | Value |
|---|---|
| Hostname | formidable |
| CPU | AMD Ryzen 9 7950X, 16 cores / 32 threads |
| RAM | 61 GiB |
| GPU | NVIDIA GeForce RTX 4090, 24 GiB VRAM |
| Driver | 595.58.03 |
| OS/root | encrypted btrfs on 1 TB NVMe |
| Fast work disk | 1.8 TiB SSD (javelin) |
| Media tier | RAID1 of 8 TB disks, ~7.28 TiB usable |
| Current media tier usage | 6.47 TiB used, 0.81 TiB free, 88.9% full |
The important architectural choice is treating the GPU as a shared accelerator across the whole pipeline. The same RTX 4090 handles image tagging, text embeddings, thumbnail/transcode work, comment generation experiments, and other local model workloads. The system therefore uses queues and conservative concurrency instead of letting every worker sprint into CUDA like a toddler into traffic.
Storage layout
Mogador uses several storage layers with clearly separated responsibilities:
| Layer | Role |
|---|---|
| MinIO | S3-compatible staging bucket for raw and intermediate objects |
| Local filesystem | Final media and thumbnail storage |
| PostgreSQL | scraper state, claims, old post metadata, idempotency tables |
| Elasticsearch | search, filtering, ranking, dense-vector retrieval |
| ClickHouse | text lake, embeddings, analytics, long-term feature work |
| RocksDB | optimizer dedupe state and publish reservations |
| Kafka | structured event log for audit, replay, and downstream consumers |
| RabbitMQ | operational task queues with quorum durability |
| etcd | live config snapshots and service-specific overrides |
This split sounds like a lot until you look at the responsibilities. RabbitMQ decides what should happen next. Kafka records what happened. PostgreSQL answers have we seen this scraper item before? RocksDB answers is this media duplicate or currently being processed? Elasticsearch answers can the app search this fast? ClickHouse answers can I aggregate this without crying?
Databases are happiest when they are not being forced to do each other’s hobbies.
The high-level pipeline
The major queue names are direct:
| Queue | Purpose |
|---|---|
| to_download | fetch original media or external artifacts |
| to_optimize | normalize, transcode, deduplicate |
| to_tag | generate tags with the vision tagger |
| to_embed | embed media/search documents |
| to_embed_text | embed standalone text/comment documents into ClickHouse |
| to_thumbnail | generate preview thumbnails |
| to_index | write final documents to Elasticsearch |
| to_hash | backfill hash/dedupe state |
| to_comment | generate synthetic comments for controlled experiments |
RabbitMQ queues are declared as quorum queues. That is mildly more expensive than classic queues, but it gives the task layer enough durability that a worker crash is a retry event rather than data loss.
The Rust AMQP helper keeps the publish path explicit:
pub async fn declare_quorum_queue(&self, name: &str) -> BoxResult<()> {
let mut arguments = FieldTable::default();
arguments.insert(
ShortString::from("x-queue-type"),
AMQPValue::LongString(LongString::from("quorum")),
);
self.publish_channel
.queue_declare(
name.into(),
QueueDeclareOptions {
durable: true,
auto_delete: false,
exclusive: false,
passive: false,
nowait: false,
},
arguments,
)
.await?;
Ok(())
}
The contract is durable queues, JSON payloads, delivery mode 2, message IDs, and workers that know how to retry or dead-letter.
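That contract can be sketched in Python. This is illustrative only, not an actual Mogador API; the field names simply mirror the conventions above:

```python
import json
import uuid


def build_publish_args(queue: str, payload: dict) -> dict:
    """Assemble a contract-conforming publish: JSON body,
    delivery mode 2 (persistent), and a unique message ID."""
    return {
        "routing_key": queue,
        "body": json.dumps(payload).encode("utf-8"),
        "properties": {
            "delivery_mode": 2,  # persist the message to disk
            "content_type": "application/json",
            "message_id": str(uuid.uuid4()),
        },
    }


args = build_publish_args("to_download", {"source": "example", "post_id": "123"})
```

The point is that every publish carries the same durability and identity guarantees, so consumers never have to guess whether a message can be safely retried.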
Why the Rust rewrite happened
The early pipeline was mostly Python. That was good for getting the system moving, but bad for long-running workers that need predictable memory, clean cancellation, backpressure, and explicit concurrency.
The rewrite moved the hot operational path to Rust:
| Service | Runtime | Role |
|---|---|---|
| scraper kernel | Rust | common traits for adapters, checkpoints, claims, publishing |
| downloader | Rust | stream downloads into object storage, retry, dead-letter |
| optimizer | Rust | transcode, hash, dedupe, publish ready artifacts |
| thumbnailer | Rust | generate preview images/videos |
| indexer | Rust | batch Elasticsearch writes and finalize media moves |
| source adapters | Rust | source-specific discovery workers |
| embedding | Python | BGE-M3 + GPU inference + ClickHouse/Postgres writers |
| tagger | Python | ONNXRuntime GPU image/video tagger |
| commenter | Python | local LLM comment generation experiments |
The Rust services share a kernel built around a few traits:
pub trait NativeAdapter: Send + Sync {
type Item;
type Checkpoint: Checkpoint;
type Record;
type Message;
fn item_key(&self, item: &Self::Item) -> String;
fn checkpoint(&self, item: &Self::Item) -> Self::Checkpoint;
fn classify(&self, item: Self::Item) -> BoxResult<NativeAction<Self::Record, Self::Message>>;
}
That abstraction is the main reason adding sources became fast. A source adapter only has to answer:
- How do I page through upstream records?
- What is the stable item key?
- What checkpoint advances monotonically?
- Should the item be skipped, persisted only, or persisted and published?
- Which queue message should be emitted?
Everything else is shared: claims, checkpoints, duplicate handling, publishing, retries, logging, and metrics.
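A Python analogue of that adapter contract makes the division of labor concrete. This is a toy, with illustrative names, not the actual kernel trait:

```python
from dataclasses import dataclass
from enum import Enum


class Action(Enum):
    SKIP = "skip"
    PERSIST = "persist"
    PERSIST_AND_PUBLISH = "persist_and_publish"


@dataclass
class Item:
    id: int
    has_media: bool


class ToyAdapter:
    """Minimal analogue of the adapter contract: a stable key,
    a monotonic checkpoint, and a classification decision."""

    def item_key(self, item: Item) -> str:
        return f"toy:{item.id}"

    def checkpoint(self, item: Item) -> int:
        return item.id  # upstream IDs advance monotonically

    def classify(self, item: Item) -> Action:
        return Action.PERSIST_AND_PUBLISH if item.has_media else Action.PERSIST
```

Everything this class does not mention — claims, retries, publishing, metrics — is exactly what the shared kernel owns.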
Idempotency: the tiny feature that saves your weekends
Scraping systems mostly die from duplicate work, partial work, or weird half-success states. Mogador treats every item as something that must be claimed before it can be processed.
The generic batch loop does three important things:
match completion_store.claim(&key).await? {
ClaimOutcome::Completed => {
report.duplicates += 1;
advance_checkpoint(checkpoint_store, checkpoint_key, current_checkpoint, checkpoint).await?;
continue;
}
ClaimOutcome::Busy => {
return Err(KernelError::Contention(
"item is already being processed by another worker",
));
}
ClaimOutcome::Claimed => {}
}
A record can be:
| State | Meaning |
|---|---|
| completed | already handled; skip and advance if safe |
| busy | another worker owns it; stop before producing duplicate side effects |
| claimed | this worker may proceed |
The effect is operationally important: workers can restart, scrapers can overlap, pages can repeat, and duplicate media still collapses to stable state.
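The claim semantics fit in a few lines. Here is an in-memory sketch of the three outcomes; the real completion store sits in PostgreSQL, not a dict:

```python
from enum import Enum


class Claim(Enum):
    COMPLETED = "completed"
    BUSY = "busy"
    CLAIMED = "claimed"


class InMemoryClaims:
    """Toy completion store mirroring the claim state table above."""

    def __init__(self):
        self._state = {}

    def claim(self, key: str) -> Claim:
        current = self._state.get(key)
        if current == "done":
            return Claim.COMPLETED
        if current == "in_progress":
            return Claim.BUSY
        self._state[key] = "in_progress"  # this worker now owns the item
        return Claim.CLAIMED

    def complete(self, key: str) -> None:
        self._state[key] = "done"
```

The real version also needs claim expiry so a crashed worker's in-progress items eventually become claimable again, but the three-outcome shape is the same.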
Downloading: stream, upload, publish
downloader-rs consumes to_download, checks policy, streams media, uploads to MinIO, and emits to_optimize.
The worker streams and counts bytes instead of loading whole objects into memory. It rejects blacklisted extensions, respects max-size limits, applies per-source cookies when configured, and treats 404s as dead-letter events rather than eternal retry confetti.
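The stream-and-count idea is simple enough to sketch. This toy generator enforces a byte cap without ever buffering the whole object, which is the property the downloader cares about:

```python
def bounded_stream(chunks, max_bytes: int):
    """Yield chunks while counting bytes; abort if the stream
    exceeds max_bytes instead of buffering the whole object."""
    total = 0
    for chunk in chunks:
        total += len(chunk)
        if total > max_bytes:
            raise ValueError(f"stream exceeded {max_bytes} bytes")
        yield chunk
```

In the real worker a violation becomes a classified failure rather than an exception, but the shape is the same: count as you go, never trust Content-Length alone.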
The downloader path is:
let downloaded = self
.http
.stream(
&payload.media_url,
payload.referer.as_deref(),
&cookies,
max_bytes,
&self.metrics,
)
.await?;
let s3_key = format!("{}/{}{}", payload.source, payload.post_id, ext);
self.storage
.upload_stream(
&s3_key,
downloaded.body_stream,
downloaded.content_length,
downloaded.mime.as_deref(),
)
.await?;
self.amqp
.publish_json(&self.queue_to_optimize, &optimize_payload, None, None)
.await?;
Downloading stays deliberately mechanical: stream, upload, publish, ack. If it fails, classify the reason and retry or dead-letter it.
Optimizer: dedupe, normalization, and publish safety
The optimizer is the busiest and most important worker. It consumes staged objects, downloads them locally into a temp directory, computes exact and perceptual hashes, reserves a dedupe record, transcodes the media, uploads the normalized result, then publishes downstream messages.
For images:
- compute BLAKE3 for exact content identity
- compute perceptual image hashes (aHash, dHash, pHash)
- normalize still images with libvips into WebP/AVIF-friendly derivatives
For videos:
- compute BLAKE3
- sample frames
- compute perceptual hashes per sampled frame
- normalize moving media with ffmpeg, including AV1-oriented encodes where that storage/perceptual-quality tradeoff makes sense
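The perceptual side can be illustrated with a toy average hash (aHash). This is a sketch on a tiny matrix, not the pipeline's actual implementation, which resizes to a small grayscale grid first:

```python
def ahash(pixels):
    """Toy average hash: 1 bit per pixel, set when the pixel is
    brighter than the mean of the whole matrix."""
    flat = [p for row in pixels for p in row]
    mean = sum(flat) / len(flat)
    bits = 0
    for p in flat:
        bits = (bits << 1) | (1 if p > mean else 0)
    return bits


def hamming(a: int, b: int) -> int:
    """Bit distance between two hashes; small distance means
    perceptually similar content."""
    return bin(a ^ b).count("1")
```

BLAKE3 answers "is this byte-for-byte the same file?"; the perceptual hashes answer "is this visually the same picture after re-encoding?", and dedupe needs both questions.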
The dedupe layer uses RocksDB column families for exact and perceptual states:
const CF_EXACT_PENDING: &str = "exact_pending";
const CF_EXACT_COMMITTED: &str = "exact_committed";
const CF_IMAGE_PENDING: &str = "image_pending";
const CF_IMAGE_COMMITTED: &str = "image_committed";
const CF_VIDEO_PENDING: &str = "video_pending";
const CF_VIDEO_COMMITTED: &str = "video_committed";
A media object can be Pending, ReadyToPublish, Publishing, Published, or Aborted. That state machine matters because dedupe and publishing must be coupled. If two workers see the same media, one of them may reuse the already prepared publish plan rather than recomputing and re-uploading the same artifact.
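That coupling is easiest to see as an explicit transition table. The state names come from the article; the table itself is an illustrative sketch, not the RocksDB layer's exact rules:

```python
# Allowed transitions in the publish state machine. An attempt to
# move any other way is a bug to surface, not something to retry.
TRANSITIONS = {
    "Pending": {"ReadyToPublish", "Aborted"},
    "ReadyToPublish": {"Publishing", "Aborted"},
    "Publishing": {"Published", "Aborted"},
    "Published": set(),   # terminal
    "Aborted": set(),     # terminal
}


def advance(state: str, target: str) -> str:
    if target not in TRANSITIONS[state]:
        raise ValueError(f"illegal transition {state} -> {target}")
    return target
```

Making illegal transitions loud is the point: a duplicate worker that finds a ReadyToPublish record reuses it instead of trying to drag it back to Pending.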
The optimizer logs stage timings separately:
| Stage | Meaning |
|---|---|
| download | fetch staged object from MinIO |
| hash | exact/perceptual hashing |
| dedupe | reserve or detect duplicate in RocksDB |
| transcode | image/video normalization |
| upload | write normalized output |
| publish | emit downstream messages |
That separation is practical. When the system slows down, I can tell whether the villain is disk IO, ffmpeg AV1/video work, libvips image normalization, GPU saturation, MinIO, RocksDB, or a queue that has started making little haunted noises.
Filesystem layout: Git-style fan-out sharding
Final media files and thumbnails use hash fan-out / Git-style directory sharding. A UUID or content-derived key is split into prefix directories so each directory stays small enough for normal filesystem tooling.
Example:
e9/c8/e9c86a66-597c-42da-86ed-977af1438195.mp4
The first two characters become the top-level directory, the next two characters become the second-level directory, and the full filename sits under that. Git uses the same broad idea for object storage: prefix directories keep directory lookups sane and make filesystem tooling less grumpy.
The helper is:
fn fanout_path(root: &Path, id: Uuid, extension: &str) -> PathBuf {
let name = id.hyphenated().to_string();
let a = &name[0..2];
let b = &name[2..4];
root.join(a).join(b).join(format!("{name}.{extension}"))
}
This optimization matters once the corpus reaches millions of files; directory fan-out keeps lookup and maintenance tooling predictable.
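The back-of-envelope arithmetic shows why two levels are enough here. With hex characters, each level fans out 256 ways:

```python
# Two levels of two hex characters each: 16**2 = 256 dirs per level.
leaf_dirs = (16 ** 2) * (16 ** 2)   # 65,536 leaf directories
files = 13_000_000                  # processed-file count from this post
per_dir = files / leaf_dirs         # roughly 200 files per leaf directory
```

A couple hundred entries per directory is comfortably inside the range where `ls`, rsync, and backup tooling stay fast; a single flat directory of 13 million files is not.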
Tagging: WD EVA02 on the 4090
The tagger is Python because the model ecosystem is Python-shaped and I have chosen peace in this tiny corner of the universe.
Current tagger model:
| Field | Value |
|---|---|
| Model | SmilingWolf/wd-eva02-large-tagger-v3 |
| Runtime | ONNXRuntime GPU |
| Image size | 448 |
| Video sampling | 0.5 fps |
| Batch size | 8 |
| General tag threshold | 0.35 |
| Character tag threshold | 0.75 |
The service consumes to_tag, pulls the media, extracts image/video frames, runs batched inference, writes tag results, and publishes to indexing. The model is a WaifuDiffusion-family visual tagger: it turns an image/frame into ranked semantic tags, characters, style descriptors, and safety-ish categories that are useful for filtering, search facets, dataset export, and later ML training. It exposes Prometheus metrics for model lifecycle, batch size, latency, output tag counts, dead letters, and HTTP admin endpoints.
Model loading is lazy and TTL-based. That lets the GPU memory return to the rest of the workstation when the tagger is idle.
The tagger uses a short microbatching window:
MODEL_REPO_ID = "SmilingWolf/wd-eva02-large-tagger-v3"
IMAGE_SIZE = 448
EXTRACT_FPS = 0.5
BATCH_SIZE = 8
INFERENCE_BATCH_WAIT_MS = 8.0
GENERAL_THRESHOLD = 0.35
CHARACTER_THRESHOLD = 0.75
DEFAULT_MODEL_TTL_SECONDS = 600
Eight milliseconds sounds silly until it saves you from launching one GPU inference per message like a tiny CUDA woodpecker.
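The collection window can be simulated without a GPU in sight. This sketch flushes a batch when it fills up or when the next message arrives after the wait window has closed; timestamps are in milliseconds and the function is illustrative, not the tagger's actual scheduler:

```python
def microbatch(messages, batch_size: int, wait_ms: float):
    """Group (timestamp_ms, message) pairs into batches, flushing
    on either the size limit or the wait window."""
    batches, current, window_start = [], [], None
    for ts, msg in messages:
        if current and (len(current) >= batch_size or ts - window_start > wait_ms):
            batches.append(current)
            current, window_start = [], None
        if not current:
            window_start = ts  # window opens with the first queued message
        current.append(msg)
    if current:
        batches.append(current)
    return batches
```

Messages that arrive within the window share one inference call; a straggler starts a new window instead of being glued to a stale batch.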
Embeddings: BGE-M3 and ClickHouse
The embedding service consumes two streams:
| Queue | Role |
|---|---|
| to_embed | embeds media-derived search documents: normalized metadata, tags, captions/comments when present, and source-local fields into retrieval text |
| to_embed_text | text/comment embedding path into ClickHouse |
Current embedding model:
| Field | Value |
|---|---|
| Model | BAAI/bge-m3 |
| Library | FlagEmbedding |
| Vector dimensions | 1024 |
| Rows in ClickHouse | 11,768,211 |
| Raw vector footprint | 44.89 GiB |
| Compressed table footprint | 34.20 GiB |
ClickHouse schema:
CREATE TABLE IF NOT EXISTS lake.docs
(
source LowCardinality(String),
source_id String,
doc_uuid UUID,
updated_at DateTime64(3),
ingest_at DateTime64(3),
author String,
text String CODEC(ZSTD(3)),
meta_json String CODEC(ZSTD(3))
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(ingest_at)
ORDER BY (source, source_id, updated_at);
CREATE TABLE IF NOT EXISTS lake.embeddings
(
model LowCardinality(String),
doc_uuid UUID,
updated_at DateTime64(3),
embedding Array(Float32)
)
ENGINE = MergeTree
PARTITION BY (model, toYYYYMM(updated_at))
ORDER BY (model, doc_uuid);
The text lake is currently compact:
| Source class | Rows | Text characters |
|---|---|---|
| media-comment corpus A | 5,263,380 | 394,130,823 |
| media-comment corpus B | 2,401,321 | 306,184,712 |
| chat/comment corpus | 1,306,044 | 37,656,367 |
| smaller corpora | 81,457 | 5,772,478 |
Again: source names omitted deliberately. The numbers are the interesting part.
The embedder has Prometheus metrics for:
- per-pipeline message outcomes
- end-to-end latency
- per-stage latency
- cache hit/miss
- inference batch size
- microbatch flush reason
- writer queue depth
- ClickHouse inserted rows
- model load events
The internal lesson from running BGE-M3 locally is that embeddings are only cheap if you care about batching. A single 4090 can chew through a lot of text, but if every message calls model.encode() alone, the GPU spends too much time paying overhead. Mogador batches by queue pressure and writes to ClickHouse in batches as well.
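The write side follows the same rule. A minimal sketch of the row-chunking helper, assuming nothing about the actual writer beyond "few large inserts beat millions of small ones":

```python
def chunked(rows, batch_size: int):
    """Group rows so ClickHouse receives a handful of large inserts
    instead of one insert per message."""
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # flush the final partial batch
```

ClickHouse's MergeTree engine is built around large sorted inserts; feeding it single rows wastes the engine and your CPU in equal measure.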
Elasticsearch: nine million searchable documents
Elasticsearch currently holds:
| Index | Docs | Store |
|---|---|---|
| posts | 9,183,737 | 330.7 GB |
| tags | 231,226 | 32.4 MB |
The posts index contains dense vectors as well as metadata fields. Cluster stats currently report 45 indices total, 231 segments, and dense-vector mappings with 1024 and 2560 dimensions. The AWS estimate later in this post treats the live corpus as a roughly 30% larger operating target, so the numbers have room for growth instead of pretending disks stop filling themselves.
The indexing service is Rust and uses a batcher between AMQP consumption and Elasticsearch bulk writes:
fn should_flush(batch: &[PreparedMessage], batch_bytes: usize, config: &AppConfig) -> bool {
batch.len() >= config.batch_size || batch_bytes >= config.batch_max_bytes
}
async fn flush_batch(
batch: &mut Vec<PreparedMessage>,
batch_bytes: &mut usize,
ctx: &BatcherContext,
) {
if batch.is_empty() {
*batch_bytes = 0;
return;
}
match ctx.es.bulk_index(batch).await {
Ok(results) => {
for (item, result) in batch.drain(..).zip(results) {
match result {
Ok(()) => complete_success(item, ctx).await,
Err(error) => retry_prepared(item, ctx, &error).await,
}
}
}
Err(error) => {
let reason = error.to_string();
for item in batch.drain(..) {
retry_prepared(item, ctx, &reason).await;
}
}
}
*batch_bytes = 0;
}
Bulk writes keep Elasticsearch throughput predictable. The indexer prepares documents concurrently, then flushes them through the bulk API instead of issuing one request per document.
Kafka logging: the implemented audit spine
Kafka is the structured event log for Mogador. RabbitMQ is for work scheduling. Kafka is for facts.
Every service emits compact JSON events into Kafka topics grouped by domain:
| Topic | Example events |
|---|---|
| mogador.scraper.events | page_fetched, item_claimed, item_skipped, checkpoint_advanced |
| mogador.download.events | download_started, download_completed, download_failed, dead_lettered |
| mogador.optimize.events | hash_computed, duplicate_detected, transcode_completed, publish_ready |
| mogador.ml.events | tagger_loaded, embedding_batch_flushed, model_evicted, inference_failed |
| mogador.index.events | bulk_index_started, bulk_index_partial, bulk_index_completed |
| mogador.storage.events | object_written, object_deleted, fanout_path_assigned |
Event shape:
{
"event_id": "01JZ...",
"trace_id": "optimizer:queued-posts/example-key",
"service": "optimizer-rs",
"event": "duplicate_detected",
"source_class": "image-board",
"post_id": "9f0b9e6d-...",
"media_kind": "image",
"match_kind": "perceptual",
"duration_ms": 12.7,
"queue": "to_optimize",
"attempt": 0,
"timestamp": "2026-04-29T21:17:00.000Z"
}
Kafka gives Mogador a replayable operational history without forcing Elasticsearch or ClickHouse to become the canonical event stream. It also makes retrospective debugging possible: if a media item disappeared, duplicated, failed tagging, or got indexed with stale metadata, the timeline is reconstructable.
This is useful because “I swear the worker did something weird yesterday” makes a terrible observability strategy.
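The replay idea itself is almost embarrassingly small once the events exist. A sketch, assuming events have already been consumed from Kafka into a list:

```python
def timeline(events, trace_id: str):
    """Reconstruct one item's history from the structured event log:
    filter by trace ID, then order by timestamp."""
    matching = [e for e in events if e["trace_id"] == trace_id]
    return sorted(matching, key=lambda e: e["timestamp"])


events = [
    {"trace_id": "t1", "event": "publish_ready", "timestamp": "2026-04-29T21:17:02Z"},
    {"trace_id": "t2", "event": "download_started", "timestamp": "2026-04-29T21:16:00Z"},
    {"trace_id": "t1", "event": "duplicate_detected", "timestamp": "2026-04-29T21:17:00Z"},
]
```

All the hard work is in emitting consistent trace IDs at every hop; the query side is a filter and a sort.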
Scraper design: adding sources
A new source adapter generally needs:
- A config file.
- A worker implementing the Rust kernel traits.
- A stable item key.
- A checkpoint strategy.
- A classifier that creates a DownloadMessage or TextEmbedMessage.
- A compose entry.
The source-specific code stays focused because ownership stops at discovery. Tagging, embedding, transcoding, dedupe, thumbnailing, and indexing remain shared pipeline concerns; the scraper only discovers records and publishes work.
The common message types are shared:
pub use model::{
DownloadMessage,
MogadorPostRecord,
OptimizeMessage,
TextEmbedMessage,
stable_post_id,
};
That design choice keeps new source work local: add the adapter, emit the shared message, and let the existing pipeline handle the expensive stages.
Custom formats and transformers
The optimizer treats formats as transformable artifacts. A source can emit a generic download job, and the downstream pipeline decides how to normalize it.
Current transformer classes:
| Input class | Output |
|---|---|
| static image | WebP |
| animated/video media | MP4 preview or normalized MP4 |
| text/comment documents | ClickHouse row + BGE-M3 embedding |
| osu! beatmap archives | cleaned archive + Opus audio + patched .osu metadata |
The transformer contract is explicit:
trait Transformer {
type Input;
type Output;
fn inspect(&self, input: &Self::Input) -> Result<MediaKind>;
fn transform(&self, input: &Self::Input, output: &Path) -> Result<Self::Output>;
fn downstream_messages(&self, output: &Self::Output) -> Result<Vec<QueueMessage>>;
}
That keeps new-format support away from scraper rewrites. The scraper emits an artifact; the pipeline either recognizes its transformer or sends it to dead-letter with a reason.
osu! ingestion for ozut
The osu! path is the friendliest scraper in the system and supports ozut, my osu! beatmap generation project.
There are two pieces:
- Metadata sync from osu! beatmapset search.
- Beatmap archive ingestion into the Mogador queue pipeline.
The metadata sync keeps a local SQLite database of beatmapsets, beatmaps, cursor state, download status, and hashes. It has two modes:
| Mode | Purpose |
|---|---|
| cursor sync | walk ranked beatmapsets by cursor |
| incremental sync | fetch until the last known head ID |
The current sync loop is defensive about cursors:
if page.cursor_string and page.cursor_string == previous_cursor:
raise RuntimeError("cursor did not advance, aborting to avoid an infinite loop")
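That check generalizes to the whole paged loop. A sketch with an injected fetch function (names and the page shape are illustrative): fetch_page takes the current cursor and returns (items, next_cursor), with None meaning the walk is done:

```python
def cursor_sync(fetch_page, max_pages: int = 1000):
    """Walk pages by cursor defensively: stop on an exhausted
    cursor, abort loudly if the cursor fails to advance."""
    items, cursor = [], None
    for _ in range(max_pages):
        page_items, next_cursor = fetch_page(cursor)
        items.extend(page_items)
        if not next_cursor:
            break  # upstream says there is nothing past this page
        if next_cursor == cursor:
            raise RuntimeError("cursor did not advance, aborting to avoid an infinite loop")
        cursor = next_cursor
    return items
```

The max_pages cap is the second belt-and-suspenders guard: even a cursor that mutates on every page cannot spin forever.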
After persistence, the integration publishes beatmap archives directly to Mogador’s to_download queue. From that point onward, beatmaps behave like any other artifact class, with a custom Rust transformer.
The beatmap transformer does four things:
- Extract the .osz archive.
- Convert the primary audio track to Opus 128 kbps.
- Patch every .osu file so AudioFilename: references the new Opus track.
- Drop non-essential assets such as custom hitsounds, storyboards, videos, backgrounds, and other training-irrelevant baggage.
The actual transformer patcher is direct:
fn patch_audio_filename(osu_text: &str, opus_name: &str) -> String {
osu_text
.lines()
.map(|line| {
if line.starts_with("AudioFilename:") {
format!("AudioFilename: {opus_name}")
} else {
line.to_owned()
}
})
.collect::<Vec<_>>()
.join("\n")
}
The result is a cleaner training corpus: timing, hit objects, difficulty metadata, and audio remain; decorative or source-specific assets disappear. Every cleaned .osu file points at the normalized Opus track through AudioFilename: audio.opus, which is the invariant the transformer enforces. This matters because ozut needs structure and rhythm rather than unrelated storyboard and asset baggage.
The current ozut training corpus spans 301,374 beatmaps totalling more than 200 GB, and the cleaned archive format keeps the pipeline reproducible. The data path from public beatmap metadata to model-ready map artifacts is now the same queue/worker flow as everything else.
Comment corpus and downstream ML projects
Mogador has already powered several projects:
Comment style classification and topic modeling
The comment corpus fed a DeBERTa-based classifier project. Embeddings also made it practical to cluster and label comment styles with topic-modeling workflows.
The important part is that comments are stored twice:
- Raw-ish text and metadata in ClickHouse.
- Dense BGE-M3 vectors in lake.embeddings.
That lets me ask both analytical questions and retrieval questions:
- Which sources/classes produce which style distributions?
- Which comments cluster together semantically?
- Which topics correlate with tag families?
- Which comment styles are suitable for synthetic generation?
LLM fine-tuning with Unsloth
The comment corpus also supports local LLM fine-tuning via Unsloth. The commenter service uses a local causal language model with LoRA checkpoints to generate comments conditioned on style and tags.
Current service defaults:
| Field | Value |
|---|---|
| Base model | HuggingFaceTB/SmolLM3-3B |
| Adapter type | LoRA checkpoint |
| Runtime | PyTorch + PEFT + Transformers |
| GPU dtype | bfloat16 on CUDA |
| Queue | to_comment |
That service stays outside the public-facing search path as a trust signal. It is an experimental generation service for controlled data work.
SDXL / SD3 LoRA fine-tuning
The media side of Mogador has been used to build image datasets for SDXL and SD3 LoRA experiments. The tagger and search index make it possible to filter datasets by visual concepts, style tags, dimensions, and dedupe status before exporting training manifests.
The practical win: the dataset can be selected from local metadata instead of hand-curated one file at a time.
ozut training
ozut uses the osu! side of the pipeline. The cleaned beatmap archives, audio conversion, and asset stripping make it practical to train on hundreds of gigabytes of maps without dragging unrelated assets through every experiment.
See: https://ozut.moe/
Recommendations and reranking
Mogador has also been the dataset substrate for recommendation and reranking experiments:
- dense-vector retrieval with BGE-M3
- tag-based candidate generation
- Elasticsearch ranking features
- ClickHouse analytics for exposure/click/favorite events
- reranking candidate logs
- local evaluation loops
There is already a separate set of ClickHouse runtime tables for user events, exposures, favorites, list impressions, and ranking candidates. The scraper corpus became a retrieval playground because it had the thing recommender systems always want and rarely get cheaply: lots of structured items with tags, vectors, and metadata.
Concurrency model
The concurrency strategy is intentionally uneven. Every service gets the concurrency shape it deserves.
| Service | Concurrency pattern |
|---|---|
| Rust scrapers | periodic/batch processing with checkpoint order |
| downloader | async streaming + bounded prefetch |
| optimizer | RabbitMQ consumer + blocking hash/transcode tasks + RocksDB reservations |
| tagger | async queue + GPU microbatching |
| embedder | async queues + microbatch inference + batched DB writers |
| thumbnailer | per-message temp dirs + blocking transcode tasks |
| indexer | concurrent preparation + single bulk batcher |
The indexer shows the pattern clearly: concurrent workers prepare items, then a batcher flushes them to Elasticsearch by size, byte estimate, or timeout. This avoids both extremes: single-message indexing and unlimited memory growth.
The optimizer uses spawn_blocking for CPU-heavy work like hashing and transcoding. That keeps the async runtime responsive while still using the 7950X properly.
The GPU services use tiny wait windows to collect batches. The goal is avoiding the saddest possible batch size: 1 forever, while keeping latency tiny.
Retry and dead-letter policy
Retries are explicit and counted. Failures are classified into reason labels such as:
http_404, unsupported_media, max_retries_exceeded, pending_duplicate, publish_error, clickhouse_error, db_error, model_error, invalid_payload, missing_field
Dead-letter queues are labeled parking lots. A dead-letter queue full of unclassified payloads is an operational blind spot.
Observability
Mogador has three observability layers:
- Prometheus metrics from services and exporters.
- Kafka structured event logs.
- Store-native diagnostics from RabbitMQ, Elasticsearch, ClickHouse, and PostgreSQL.
The compose stack includes:
- RabbitMQ exporter
- Elasticsearch exporter
- node-exporter
- cAdvisor
- Prometheus with 30-day retention
- Grafana dashboards
- custom Mogador metrics scraper
The custom metrics service scrapes Elasticsearch index stats and emits domain-specific gauges. That exists because generic exporters are good at telling you a database exists, but less good at answering “is my corpus growing in the way I expect, or has it become a mushroom?”
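The domain-gauge idea boils down to a small transform. This sketch derives gauges from an index-stats payload; the input shape only loosely mirrors what Elasticsearch's stats API returns, and the field names are illustrative:

```python
def corpus_gauges(index_stats: dict) -> dict:
    """Turn raw per-index stats into the gauges dashboards want,
    e.g. deleted-doc ratio as an early merge-pressure signal."""
    gauges = {}
    for index, stats in index_stats.items():
        docs = stats["docs_count"]
        deleted = stats["docs_deleted"]
        total = docs + deleted
        gauges[f"{index}_docs"] = docs
        gauges[f"{index}_deleted_ratio"] = deleted / total if total else 0.0
    return gauges
```

A generic exporter reports the raw counts; the custom scraper exists to compute the ratios and growth deltas that actually answer "is the corpus healthy?".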
Why ClickHouse and Elasticsearch both exist
Elasticsearch is for online search. ClickHouse is for analytical scans.
Elasticsearch is good at:
- full-text-ish search
- filter-heavy user queries
- dense vector retrieval
- document lookup
- ranking experiments
ClickHouse is good at:
- counting millions of rows quickly
- compressing text columns
- storing embedding history
- grouping by source/model/month
- powering offline analysis
Trying to make one of them do both jobs would be possible in the same way eating soup with a fork is possible. You can prove your dedication; you cannot prove your judgment.
What it costs locally
Local recurring cost is mostly electricity and disks. The hardware already exists because formidable is my workstation. The marginal cost of Mogador is therefore much lower than the cloud equivalent.
Current storage reality:
| Mount | Capacity | Used | Free | Usage |
|---|---|---|---|---|
| media RAID1 | 7.28 TiB | 6.47 TiB | 0.81 TiB | 88.9% |
| fast SSD (javelin) | 1.82 TiB | 1.30 TiB | 0.52 TiB | 71.5% |
| MinIO staging SSD | 119 GiB | 99 GiB | 20 GiB | 83.1% |
The RAID1 setup uses two 8 TB disks to expose roughly one disk of usable capacity with redundancy. Backups are still separate; RAID1 protects against a single disk failure and keeps the hot corpus available.
What it would cost on AWS
This is a rough estimate rather than a procurement-grade quote. AWS pricing moves by region, reserved commitments, storage class, egress, and whether you accept managed-service minimums. I used public pricing models and conservative assumptions:
- 9.46 TiB object corpus in S3 Standard
- two 8 TB disks worth of gp3-equivalent block storage for RAID1-like durability
- 13M processed files
- 6 queue hops per file
- send + receive + delete per queue hop
- Elasticsearch/OpenSearch with production-ish redundancy
- ClickHouse Cloud or equivalent managed analytical storage
- one CPU-heavy instance comparable to the 7950X host
- one 24 GB GPU instance comparable to the 4090 class for ML services
- MediaConvert as a separate one-time/backfill or monthly processing cost, depending on workload
Approximate monthly baseline, excluding MediaConvert:
| Component | Estimate |
|---|---|
| S3 Standard for 9.46 TiB | ~$223/mo |
| S3 requests for 13M objects, rough | ~$211 per large processing pass |
| SQS for ~234M billed requests | ~$93 per large processing pass |
| gp3 for 16 TB raw RAID1-equivalent block storage | ~$1,311/mo |
| OpenSearch compute | ~$366-$731/mo |
| OpenSearch storage, 3x 331 GB | ~$121/mo |
| ClickHouse Cloud / managed analytical tier | ~$390-$1,560/mo |
| CPU instance comparable to host workload | ~$894-$1,340/mo |
| 24 GB GPU instance | ~$734/mo |
| 1 TB internet egress | ~$92/mo |
Baseline total:
| Scenario | Estimate |
|---|---|
| no egress | ~$4,300-$6,300/mo |
| 1 TB egress | ~$4,400-$6,400/mo |
| 5 TB egress | ~$4,800-$6,800/mo |
MediaConvert is the jump scare. AWS Elemental MediaConvert is billed per output minute with normalization multipliers. A 1.3-million-video pass can range from ~$4,875 for very short/simple outputs to ~$78,000 for longer/higher-quality processing assumptions.
At that point the cloud version mostly becomes a budgeting exercise: every queue hop, transcode minute, vector table, replica, and egress path turns into a small line item with sharp little teeth.
Why this works
Mogador works because it follows a few rules:
- Every item has a stable key.
- Every worker can retry safely.
- Every expensive stage is isolated behind a queue.
- Every storage layer has one main job.
- Deduplication happens before long-term storage.
- GPU inference is batched.
- Bulk writes are batched.
- Config is centralized.
- Logs are structured.
- New sources implement a tiny adapter instead of a new platform.