
Mogador: enterprise-grade scraping with one beefy workstation

tl;dr summary

Mogador is a Rust/Python scraping pipeline running on formidable, a single workstation: RabbitMQ queues, Rust workers, MinIO, Elasticsearch, ClickHouse, GPU tagging, embeddings, dedupe, Kafka logging, a friendly osu! ingestion path for ozut, and a compact extension model for new data formats.

Mogador is my media and text ingestion system. It runs locally on formidable, my workstation, and has become the data layer underneath several ML projects. The current planning snapshot is large enough that it has to behave like infrastructure: more than 13 million files processed, about 9.18 million indexed documents in Elasticsearch, and 11.77 million embedded text rows in ClickHouse.

The machine is a Ryzen 9 7950X with 61 GiB RAM, an RTX 4090, fast SSD working storage, and a RAID1 media tier built from two 8 TB disks. The media tier exposes around 7.28 TiB usable, with roughly 6.47 TiB currently used. formidable is also my daily workstation, so the pipeline has to share resources with everything else that runs there. That constraint shaped the architecture more than any whiteboard diagram did.

The high-level idea is straightforward: source adapters discover records, then the rest of the system handles downloading, normalization, deduplication, tagging, embedding, thumbnailing, indexing, logging, and analytics. Source-specific code stops at discovery. The expensive and failure-prone stages live behind queues.

That separation is what made the system grow without turning every new dataset into a separate maintenance problem. A source adapter can be messy at the edges because upstream pagination, metadata, and file availability are always a little messy. Once it emits a shared message, the rest of the pipeline behaves the same way as every other source.

Current snapshot

Component                                    Snapshot
Indexed Elasticsearch documents (posts)      9,183,737
Elasticsearch deleted docs awaiting merge    496,766
Elasticsearch primary store                  330.7 GB
Tags index documents                         231,226
ClickHouse lake.docs rows                    11,767,862
ClickHouse lake.embeddings rows              11,768,211
BGE-M3 vector raw footprint                  44.89 GiB
ClickHouse embedding table on disk           34.20 GiB
ClickHouse document table on disk            824.98 MiB
Current media disk usage                     6.47 TiB used of 7.28 TiB usable
Current MinIO staging disk                   100 GB used

The main flow is:

  1. Scrapers discover records.
  2. PostgreSQL stores scraper state, claims, checkpoints, and idempotency keys.
  3. RabbitMQ dispatches download jobs.
  4. downloader-rs streams media into MinIO staging.
  5. optimizer-rs normalizes media and deduplicates it.
  6. Tagger, embedder, thumbnailer, and indexer workers enrich the artifact.
  7. Elasticsearch serves search, filtering, ranking, and dense-vector retrieval.
  8. ClickHouse stores text rows, embeddings, and analytical history.
  9. Kafka records structured service events for audit, replay, and debugging.

Storage responsibilities

Mogador uses several storage systems because the workload asks different questions at different stages.

Layer           Role
RabbitMQ        task dispatch and retry boundaries
Kafka           structured event log
PostgreSQL      scraper state, claims, old metadata, idempotency
MinIO           staged raw/intermediate objects
RocksDB         optimizer dedupe state and publish reservations
Elasticsearch   search, filtering, ranking, dense-vector retrieval
ClickHouse      text lake, embeddings, analytics, feature history
filesystem      final media and thumbnail storage

RabbitMQ decides what should happen next. Kafka records what happened. PostgreSQL answers whether a scraper item has been seen before. RocksDB answers whether a piece of media is duplicate or currently being processed. Elasticsearch handles interactive search. ClickHouse handles scans over millions of rows.

Final media and thumbnails use hash fan-out / Git-style directory sharding. A UUID or content-derived key is split into prefix directories, for example e9/c8/..., so each directory stays within a reasonable size. Once a corpus reaches millions of files, this stops being a neat trick and becomes basic hygiene.
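
As a rough Python sketch of the fan-out (the real workers derive the key from a UUID or content hash; the depth and width below are illustrative):

    import os

    def shard_path(root: str, key: str, depth: int = 2, width: int = 2) -> str:
        # Split the leading characters of a content hash or UUID into prefix
        # directories, e.g. "e9c81f..." -> root/e9/c8/e9c81f...
        parts = [key[i * width:(i + 1) * width] for i in range(depth)]
        return os.path.join(root, *parts, key)

    # shard_path("/media", "e9c81f0a") -> "/media/e9/c8/e9c81f0a"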

Queueing and the Rust rewrite

The first versions of the system were more Python-heavy. Python is still the right tool for model work, but the queue workers that stream, hash, transcode, claim, dedupe, and publish need predictable memory behavior and clean concurrency. Those pieces moved to Rust.

The Rust services share a kernel for AMQP, message models, pipeline state, retries, and source adapters. The common model exports messages such as DownloadMessage, OptimizeMessage, TextEmbedMessage, and stable post IDs. A new source adapter mainly needs a config file, a worker implementing the kernel traits, a stable item key, a checkpoint strategy, and a classifier that creates the shared message type.
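
The kernel itself is Rust, but the shape of the contract is easy to sketch in Python; the fields below are illustrative stand-ins, not the real schema of DownloadMessage:

    from dataclasses import dataclass, field

    @dataclass
    class DownloadMessage:
        # Illustrative subset of the shared message: stable post ID,
        # source class, upstream URL, and free-form metadata.
        post_id: str
        source: str
        url: str
        metadata: dict = field(default_factory=dict)

    class SourceAdapter:
        # An adapter only discovers records and classifies them into the
        # shared message type; download, optimize, tag, and index stay generic.
        def discover(self, checkpoint: str | None) -> list[dict]:
            raise NotImplementedError

        def classify(self, record: dict) -> DownloadMessage:
            raise NotImplementedError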

The kernel also keeps operational details consistent: queue names, durable publish properties, retry accounting, dead-letter routing, and message IDs. Those details are exactly where ingestion systems tend to get weird if each scraper grows its own private habits.

That design keeps new source work local: add the adapter, emit the shared message, and let the existing workers handle the expensive stages. It also keeps failures easier to reason about. If a source emits malformed work, the failure is at the adapter or classifier boundary. If media cannot be transformed, the failure is in the optimizer. If Elasticsearch rejects a bulk request, the failure is in the indexer. The boundaries are visible.

RabbitMQ queues are declared as quorum queues. They cost more than classic queues, but they give the task layer enough durability that worker restarts are ordinary retry events. Messages use stable IDs and durable delivery, and workers decide explicitly whether to ack, retry, or dead-letter.

That retry decision is a real part of the design. A transient HTTP failure should be retried. A missing upstream item should be dead-lettered with http_404. An unsupported file should be classified as unsupported_media. A model failure should keep enough context to re-run the item once the service is healthy again. When failure classes are explicit, cleanup becomes operational work instead of guesswork.
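
A sketch of that decision as a small pure function; http_404 and unsupported_media are the reason labels mentioned above, and the rest of the policy is illustrative:

    from enum import Enum

    class Outcome(Enum):
        ACK = "ack"
        RETRY = "retry"
        DEAD_LETTER = "dead_letter"

    def classify_failure(status: int | None, media_supported: bool,
                         attempt: int, max_attempts: int = 5) -> tuple[Outcome, str]:
        # Missing upstream items are terminal: dead-letter with http_404.
        if status == 404:
            return Outcome.DEAD_LETTER, "http_404"
        # Files the pipeline cannot handle are terminal too.
        if not media_supported:
            return Outcome.DEAD_LETTER, "unsupported_media"
        # Transient failures retry until the attempt budget runs out.
        if attempt < max_attempts:
            return Outcome.RETRY, "transient"
        return Outcome.DEAD_LETTER, "retry_exhausted"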

Downloader and optimizer

The downloader is deliberately mechanical. It streams bytes, counts them, enforces max-size limits, blocks unwanted extensions, applies per-source cookies when configured, uploads to MinIO, and publishes an optimization message. Decoding, tagging, and search-document writes belong to later workers.

That restraint is useful because downloading has its own failure modes: timeouts, 404s, redirects, oversized files, broken TLS, missing content types, and temporary upstream weirdness. Keeping the worker focused makes those outcomes easier to classify and retry safely.
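
A compressed Python sketch of the stream-count-and-cap idea using requests (the real worker is Rust and streams into MinIO; the cap and blocklist are placeholders):

    from os.path import splitext
    from urllib.parse import urlparse

    import requests

    MAX_BYTES = 200 * 1024 * 1024      # per-file size cap (placeholder)
    BLOCKED_EXTENSIONS = {".exe"}      # placeholder blocklist

    def download_to_file(url: str, dest: str) -> int:
        ext = splitext(urlparse(url).path)[1].lower()
        if ext in BLOCKED_EXTENSIONS:
            raise ValueError("unsupported_media")
        written = 0
        with requests.get(url, stream=True, timeout=60) as resp:
            resp.raise_for_status()
            with open(dest, "wb") as out:
                for chunk in resp.iter_content(chunk_size=1 << 20):
                    written += len(chunk)
                    if written > MAX_BYTES:
                        # An oversized file stops here instead of filling staging.
                        raise ValueError("max_size_exceeded")
                    out.write(chunk)
        return written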

The optimizer is where raw artifacts become normalized outputs. It downloads staged objects into a temp directory, computes exact hashes, computes perceptual hashes, reserves dedupe state, transcodes or converts the media, uploads the normalized output, and publishes downstream messages.

For images, it computes BLAKE3 for exact identity, then image perceptual hashes such as aHash, dHash, and pHash. Image normalization goes through libvips, which is fast and memory-efficient enough for this workload.
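
For intuition, an average hash fits in a few lines of Python with Pillow; the pipeline itself normalizes through libvips and also computes dHash and pHash:

    from PIL import Image

    def average_hash(path: str, size: int = 8) -> int:
        # Downscale to 8x8 grayscale, then set one bit per pixel that is
        # brighter than the mean. Near-duplicates end up a few bits apart.
        img = Image.open(path).convert("L").resize((size, size))
        pixels = list(img.getdata())
        mean = sum(pixels) / len(pixels)
        bits = 0
        for p in pixels:
            bits = (bits << 1) | (1 if p > mean else 0)
        return bits

    def hamming(a: int, b: int) -> int:
        # Distance between two hashes; small distances suggest duplicates.
        return bin(a ^ b).count("1")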

For videos, it computes BLAKE3, samples frames, computes perceptual hashes for sampled frames, and uses ffmpeg for normalization. AV1-oriented encodes are part of the toolbox where the storage and quality tradeoff makes sense. Video processing is where CPU, disk IO, and codec settings all quietly conspire to make benchmarks look rude.

Deduplication is backed by RocksDB column families for exact, image, and video states. The publish state matters as much as the match itself. A duplicate result with unpublished downstream messages is a different operational state from one that is fully committed. The optimizer therefore tracks items through pending, ready-to-publish, publishing, published, and aborted states.
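
A sketch of those reservation states as a small state machine; the real state lives in RocksDB column families, and the allowed transitions below are my reading of the flow, not the exact implementation:

    from enum import Enum

    class PublishState(Enum):
        PENDING = "pending"
        READY_TO_PUBLISH = "ready_to_publish"
        PUBLISHING = "publishing"
        PUBLISHED = "published"
        ABORTED = "aborted"

    # Allowed transitions: anything not listed is a bug, not a retry.
    TRANSITIONS = {
        PublishState.PENDING: {PublishState.READY_TO_PUBLISH, PublishState.ABORTED},
        PublishState.READY_TO_PUBLISH: {PublishState.PUBLISHING, PublishState.ABORTED},
        PublishState.PUBLISHING: {PublishState.PUBLISHED, PublishState.ABORTED},
        PublishState.PUBLISHED: set(),
        PublishState.ABORTED: set(),
    }

    def advance(current: PublishState, target: PublishState) -> PublishState:
        if target not in TRANSITIONS[current]:
            raise ValueError(f"illegal transition {current} -> {target}")
        return target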

This is also where the pipeline protects storage. Keeping duplicate detection close to normalization means downstream workers avoid repeating work on the same media. Tagging, embeddings, thumbnails, and indexing can reuse the canonical artifact instead of treating every duplicate URL as a new universe. At this scale, dedupe is a cost-control feature as much as a cleanliness feature.

Tagging and embeddings

The tagger uses SmilingWolf/wd-eva02-large-tagger-v3 through ONNXRuntime GPU. It consumes to_tag, pulls the media, extracts frames for videos, runs batched inference, writes tag results, and publishes indexing work. The output is ranked semantic tags, characters, style descriptors, and safety-ish categories. Those fields are useful for search facets, dataset filtering, training manifest exports, and later ML experiments.
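
A stripped-down sketch of batched ONNXRuntime inference on GPU; preprocessing, the tag vocabulary, and thresholds for wd-eva02-large-tagger-v3 are omitted, and the model path, input layout, and single-output assumption are placeholders:

    import numpy as np
    import onnxruntime as ort

    session = ort.InferenceSession(
        "wd-eva02-large-tagger-v3.onnx",  # placeholder path
        providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
    )
    input_name = session.get_inputs()[0].name

    def tag_batch(images: np.ndarray) -> np.ndarray:
        # images: one preprocessed batch, e.g. (N, H, W, 3) float32.
        # Assumes a single output head of per-image tag probabilities,
        # to be thresholded and mapped to tag names downstream.
        (probs,) = session.run(None, {input_name: images})
        return probs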

The embedder uses BAAI/bge-m3 through FlagEmbedding. It consumes two streams:

Queue           Role
to_embed        media-derived search documents
to_embed_text   text/comment rows for ClickHouse

Both services run on the 4090 and use short microbatch windows. That sounds like a minor implementation detail until you watch a GPU sit underfed because every handler called inference with one item. The service waits a few milliseconds, collects compatible work, runs a batch, and returns per-message results. The goal is steady throughput without adding meaningful latency.
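
A minimal microbatcher looks something like this; the window and batch size are placeholders, and the actual encode call (BGE-M3 through FlagEmbedding) is left out:

    import queue
    import time

    def microbatch(work: "queue.Queue[str]", window_ms: int = 5,
                   max_items: int = 64) -> list[str]:
        # Block for the first item, then keep collecting compatible work
        # until the window closes or the batch is full.
        batch = [work.get()]
        deadline = time.monotonic() + window_ms / 1000.0
        while len(batch) < max_items:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                batch.append(work.get(timeout=remaining))
            except queue.Empty:
                break
        return batch

    # Each batch then goes through one encode call, and results are
    # returned per message so every consumer can ack independently.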

The ClickHouse side stores raw text and embeddings separately. lake.docs stores source, source ID, UUID, timestamps, author, text, and metadata JSON. lake.embeddings stores model, document UUID, timestamp, and the Array(Float32) embedding. Compression keeps the footprint reasonable: the planning snapshot is 44.89 GiB raw vector data compressed to 34.20 GiB on disk.

That split is useful for research work. Raw text keeps the original analytical surface: lengths, authors, source classes, timestamps, and metadata fields. Vectors add semantic neighborhoods for clustering, retrieval, style grouping, and dataset construction. The same item can be counted like a row and retrieved like an embedding, which is exactly what the downstream projects need.
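
A hypothetical rendering of that split as ClickHouse DDL, reconstructed only from the columns named above; the real engines, codecs, and ordering keys may differ:

    # Hypothetical DDL; column names come from the fields listed above,
    # everything else (types, engines, ORDER BY) is an assumption.
    DOCS_DDL = """
    CREATE TABLE IF NOT EXISTS lake.docs (
        source      LowCardinality(String),
        source_id   String,
        uuid        UUID,
        created_at  DateTime,
        author      String,
        text        String,
        metadata    String  -- JSON kept as a string
    ) ENGINE = MergeTree ORDER BY (source, created_at)
    """

    EMBEDDINGS_DDL = """
    CREATE TABLE IF NOT EXISTS lake.embeddings (
        model       LowCardinality(String),
        doc_uuid    UUID,
        created_at  DateTime,
        embedding   Array(Float32)
    ) ENGINE = MergeTree ORDER BY (model, doc_uuid)
    """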

Elasticsearch and indexing

Elasticsearch is the online search store. It holds metadata, tags, dense vectors, and the fields needed for filtering and ranking experiments. The indexer prepares messages concurrently, estimates batch size, and flushes through the Elasticsearch bulk API. Bulk writes keep throughput predictable and reduce per-document overhead.

This stage is also where metadata consistency matters. Tags, embedding references, thumbnail paths, dedupe status, source-local IDs, and normalized media paths have to arrive as one coherent document. If Elasticsearch rejects part of a bulk request, the indexer retries the affected messages instead of pretending the batch succeeded.
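
The partial-failure handling is the important part. A sketch with the Python client's streaming_bulk helper (the real indexer is Rust; the endpoint, index name, and document shape are placeholders):

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import streaming_bulk

    es = Elasticsearch("http://localhost:9200")  # placeholder endpoint

    def index_documents(docs: list[dict]) -> list[dict]:
        actions = (
            {"_index": "posts", "_id": doc["post_id"], "_source": doc}
            for doc in docs
        )
        failed = []
        # raise_on_error=False surfaces per-document rejections instead of
        # treating the whole bulk request as all-or-nothing.
        for ok, item in streaming_bulk(es, actions, raise_on_error=False):
            if not ok:
                failed.append(item)
        return failed  # candidates for retry or dead-lettering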

The reason Elasticsearch and ClickHouse both exist is practical. Elasticsearch is better for interactive search, dense-vector retrieval, and filter-heavy user queries. ClickHouse is better for analytical scans, grouping, compression, and feature history. Forcing either one to replace the other would make the whole system harder to operate.

Kafka and observability

Kafka is the structured event log. Services emit events such as:

  • download_succeeded
  • download_failed
  • duplicate_detected
  • optimizer_stage_completed
  • tag_batch_completed
  • embedding_batch_inserted
  • index_bulk_flushed
  • dead_lettered

Each event carries fields like event ID, trace ID, service, event name, source class, media kind, duration, queue, attempt, and timestamp. That gives Mogador a replayable operational history without forcing Elasticsearch or ClickHouse to become the canonical timeline.
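
Emitting one of those events could look like this with confluent-kafka; the topic name and delivery handling are placeholders, and the field names mirror the list above:

    import json
    import time
    import uuid

    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": "localhost:9092"})  # placeholder

    def emit_event(name: str, service: str, **fields) -> None:
        event = {
            "event_id": str(uuid.uuid4()),
            "trace_id": fields.pop("trace_id", str(uuid.uuid4())),
            "service": service,
            "event": name,
            "ts": time.time(),
            **fields,  # source class, media kind, duration, queue, attempt, ...
        }
        producer.produce("mogador.events", value=json.dumps(event).encode("utf-8"))
        producer.poll(0)  # serve delivery callbacks without blocking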

Prometheus and Grafana cover service metrics. RabbitMQ, Elasticsearch, node, and cAdvisor exporters cover the infrastructure layer. The services also expose their own counters and histograms for message outcomes, queue latency, per-stage timings, batch sizes, model load events, insert counts, and dead-letter reasons.

Dead-letter queues need reason labels, retry counts, and a path back into the pipeline. A useful dead-letter message should be something I can inspect, repair, and requeue. Otherwise it becomes a storage system for unresolved anxiety.

Observability also keeps resource sharing sane. The workstation is doing normal workstation things while the pipeline runs, so service metrics need to show whether pressure comes from disk IO, ffmpeg transcodes, RocksDB reservations, RabbitMQ backlog, ClickHouse inserts, Elasticsearch bulk rejections, or GPU saturation. Without that separation, every slowdown becomes “the computer is haunted,” which is emotionally satisfying and operationally useless.

Custom formats and transformers

Mogador treats formats as transformable artifacts. A source emits a generic download job, and the downstream pipeline decides how to normalize it.

Input class              Output
static image             WebP
animated/video media     MP4 preview or normalized MP4
text/comment documents   ClickHouse row + BGE-M3 embedding
osu! beatmap archives    cleaned archive + Opus audio + patched .osu metadata

The transformer contract is explicit: inspect the input, transform it into a normalized output, and emit downstream messages. That keeps new-format support away from scraper rewrites.
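
The contract is small enough to sketch as one interface (names here are illustrative, not the real types):

    from dataclasses import dataclass

    @dataclass
    class TransformResult:
        output_path: str        # normalized artifact in staging or final storage
        downstream: list[dict]  # messages for tagger, embedder, indexer, ...

    class Transformer:
        def accepts(self, content_type: str, extension: str) -> bool:
            # Inspect the input and decide whether this transformer owns it.
            raise NotImplementedError

        def transform(self, input_path: str) -> TransformResult:
            # Normalize the artifact and describe the follow-up work.
            raise NotImplementedError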

This matters because formats have different failure modes. Images can be corrupt. Videos can have odd containers. Text documents can be empty or too long for a particular model window. Beatmap archives have their own internal references. The transformer layer is where those format-specific rules live, while the rest of the pipeline still sees stable messages and normalized artifacts.

osu! ingestion for ozut

The osu! path supports ozut, my osu! beatmap generation project. A metadata sync keeps a local SQLite database of beatmapsets, beatmaps, cursor state, download status, and hashes. Beatmap archives are then pushed into Mogador’s to_download queue.

The beatmap transformer extracts the .osz archive, converts the primary audio track to Opus 128 kbps, patches every .osu file so AudioFilename: references the new Opus track, and removes unrelated assets such as custom hitsounds, storyboards, videos, and backgrounds. The cleaned archive keeps timing, hit objects, difficulty metadata, and audio. That is the part ozut needs.
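
A condensed sketch of that transform with zipfile and ffmpeg. An .osz is a zip archive, but the audio source name, asset rules, and re-packing details below are simplified stand-ins; the real transformer reads the actual audio filename out of the .osu metadata rather than assuming it:

    import os
    import subprocess
    import zipfile

    KEEP_EXTENSIONS = {".osu"}  # plus the converted audio track

    def clean_beatmapset(osz_path: str, workdir: str, out_path: str) -> None:
        with zipfile.ZipFile(osz_path) as archive:
            archive.extractall(workdir)

        # Convert the primary audio track to Opus 128 kbps (source name simplified).
        src_audio = os.path.join(workdir, "audio.mp3")
        opus_audio = os.path.join(workdir, "audio.opus")
        subprocess.run(
            ["ffmpeg", "-y", "-i", src_audio, "-c:a", "libopus", "-b:a", "128k", opus_audio],
            check=True,
        )

        with zipfile.ZipFile(out_path, "w", zipfile.ZIP_DEFLATED) as cleaned:
            cleaned.write(opus_audio, "audio.opus")
            for name in os.listdir(workdir):
                if os.path.splitext(name)[1].lower() not in KEEP_EXTENSIONS:
                    continue  # hitsounds, storyboards, videos, backgrounds are dropped
                with open(os.path.join(workdir, name), encoding="utf-8", errors="ignore") as f:
                    text = f.read()
                # Patch every .osu file to reference the normalized track.
                patched = "\n".join(
                    "AudioFilename: audio.opus" if line.startswith("AudioFilename:") else line
                    for line in text.splitlines()
                )
                cleaned.writestr(name, patched)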

The current ozut training corpus is over 200 GB across 301,374 beatmaps. Having it flow through the same queue/worker model means it benefits from the existing retry, storage, and transformation machinery. The useful invariant is that cleaned .osu files point at the normalized Opus track through AudioFilename: audio.opus, while unrelated assets are removed before the archive becomes model input.

That makes the corpus reproducible. If the audio conversion rules, asset stripping rules, or metadata sync rules change, the transformer boundary is the place to version and re-run the work. The training side can then consume prepared artifacts instead of encoding ingestion assumptions into every experiment.

Downstream projects

Mogador has already supported several projects. The value is cumulative: once a corpus has stable IDs, dedupe, tags, embeddings, and metadata, each new experiment starts from a much higher floor.

The comment corpus fed a DeBERTa-based style classifier. The embeddings also made it practical to cluster comment styles and build topic-modeling workflows over millions of rows. ClickHouse handles the scans; BGE-M3 provides semantic neighborhoods.

The same text corpus supports local LLM fine-tuning via Unsloth. The commenter service uses a local causal language model with LoRA checkpoints to generate comments conditioned on style and tags. That work stays in the experimental data path, separate from public search signals.

The media side has been used for SDXL and SD3 LoRA dataset construction. Tags, dimensions, dedupe status, and search filters make it possible to select training sets by concept instead of hand-curating folders. This is where a well-maintained index turns into fewer hours of staring at manifests.

ozut uses the cleaned osu! archive path. Recommender and reranking experiments use the Elasticsearch and ClickHouse side: dense vectors, tags, metadata, and analytical aggregates.

AWS estimate

A rough AWS version of this system becomes expensive because every local boundary maps to a managed service or instance class. The estimate assumes:

  • 9.46 TiB object corpus in S3 Standard
  • two 8 TB disks worth of gp3-equivalent block storage for RAID1-like durability
  • 13M processed files
  • 6 queue hops per file
  • send + receive + delete per queue hop
  • OpenSearch with production-ish redundancy
  • ClickHouse Cloud or equivalent managed analytical storage
  • one CPU-heavy instance comparable to the 7950X host
  • one 24 GB GPU instance for ML services
  • MediaConvert as a separate processing/backfill cost
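
Two of the line items below fall straight out of those assumptions. A quick back-of-the-envelope check, using the on-demand rates I assumed (~$0.023 per GB-month for S3 Standard, ~$0.40 per million SQS requests):

    # Treat the TiB figure as 1024 GB/TiB, matching how the line item was rounded.
    s3_storage = 9.46 * 1024 * 0.023             # ~= $222.8/mo

    # 13M files * 6 queue hops * (send + receive + delete) = 234M requests.
    sqs_requests = 13_000_000 * 6 * 3
    sqs_cost = sqs_requests / 1_000_000 * 0.40    # ~= $93.6 per processing pass

    print(f"S3 storage ~${s3_storage:.0f}/mo, SQS ~${sqs_cost:.0f} per pass")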

Approximate monthly baseline, excluding MediaConvert:

Component                                           Estimate
S3 Standard for 9.46 TiB                            ~$223/mo
S3 requests for 13M objects (rough)                 ~$211 per large processing pass
SQS for ~234M billed requests                       ~$93 per large processing pass
gp3 for 16 TB raw RAID1-equivalent block storage    ~$1,311/mo
OpenSearch compute                                  ~$366-$731/mo
OpenSearch storage, 3x 331 GB                       ~$121/mo
ClickHouse Cloud / managed analytical tier          ~$390-$1,560/mo
CPU instance comparable to host workload            ~$894-$1,340/mo
24 GB GPU instance                                  ~$734/mo
1 TB internet egress                                ~$92/mo

Baseline total:

Scenario       Estimate
no egress      ~$4,300-$6,300/mo
1 TB egress    ~$4,400-$6,400/mo
5 TB egress    ~$4,800-$6,800/mo

MediaConvert is the jump scare. A 1.3M-video processing pass can range from ~$4,875 for very short outputs to ~$78,000 for longer or higher-quality processing assumptions.

Those numbers are directional, but they explain why the local design exists. The cloud version is possible; the cost model turns every queue hop, replica, transcode minute, vector table, and egress path into a line item. Locally, the costs are disks, electricity, hardware wear, and maintenance time. That tradeoff fits this workload.

The local version also makes experimentation cheaper psychologically. Reprocessing a subset, adding a transformer, or testing a new model can happen without opening a billing dashboard first. The constraint becomes hardware scheduling and disk space, which is much easier for me to reason about than a pile of managed-service meters.

Why it works

Mogador works because each expensive stage has a boundary. Items have stable keys. Workers retry safely. Dedupe happens before long-term storage. GPU inference is batched. Bulk writes are batched. Search and analytics are separate stores. Kafka gives the system a replayable timeline. New source adapters and new formats plug into existing contracts rather than creating new ingestion islands.

That is the useful part: the system is large, but the operational model stays legible. When something fails, there is a queue, a state record, an event, a metric, or a dead-letter reason that points to the next action.

The result is a pipeline that can support several different research directions without changing shape every time. Comment classification, topic modeling, LLM fine-tuning, LoRA dataset construction, ozut, retrieval, and reranking all reuse the same foundation: stable IDs, normalized artifacts, dedupe, tags, embeddings, and search/analytics storage.
