Enterprise data teams run their analytics on Spark, Hadoop, and lakehouse stacks (Iceberg, Delta). There are two ways to feed that data into eomer:
  • Push to R2 (recommended): the upstream job writes Parquet directly to your tenant’s R2 bucket via the s3a:// connector.
  • Pull via cloud_fs (fallback): eomer reads directly from the customer’s Azure, GCS, WebHDFS, or S3-compatible store. This path is meant for regulated or air-gapped environments where a cross-cloud copy is not an option.
Both paths use the same Spark/Hadoop _SUCCESS marker contract, so the consumer never sees a half-written prefix. This guide covers the producer configuration for push, the eomer-side config for both directions, and how to decide which one fits.

Why push to R2 instead of pulling

  • No VPN peering, no firewall holes. The customer’s cluster writes outbound to an HTTPS endpoint.
  • One auth surface. The same R2 token the customer already has covers both direct uploads and Spark pushes.
  • Works with any S3-compatible producer. Spark, Flink, Trino, DuckDB, hadoop distcp, Databricks, and EMR can all write to an S3-compatible endpoint out of the box (Hadoop-based tools through the s3a:// connector).
  • Chronos-2 is batch anyway. A coherent history snapshot per job run is the right semantic for forecasting; streaming event feeds would need to be materialized into the same shape regardless.

Producer configuration

Apache Spark 3.x

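from datetime import date

from pyspark.sql import SparkSession

# s3a:// needs the hadoop-aws module (plus its AWS SDK dependency) on the Spark classpath.
spark = (
    SparkSession.builder
        .appName("eomer-forecast-export")
        .config("spark.hadoop.fs.s3a.endpoint",
                "https://<ACCOUNT_ID>.r2.cloudflarestorage.com")
        # Placeholders: load the real R2 keys from your secret store, not from source code.
        .config("spark.hadoop.fs.s3a.access.key", "<R2_ACCESS_KEY_ID>")
        .config("spark.hadoop.fs.s3a.secret.key", "<R2_SECRET_ACCESS_KEY>")
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .config("spark.hadoop.fs.s3a.endpoint.region", "auto")  # R2 uses region "auto"
        .getOrCreate()
)

# One dated prefix per run, e.g. data/2026-04-17/; use your scheduler's run date if it differs.
run_date = date.today().isoformat()
snapshot_prefix = f"s3a://eomer-production-tenant-acme/data/{run_date}/"

# history_df is the job's history DataFrame, built earlier in the pipeline.
(history_df
    .select("item_id", "timestamp", "target")
    .write
    .mode("overwrite")
    .parquet(snapshot_prefix))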
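When the write job commits successfully, Spark’s Hadoop output committer writes a _SUCCESS marker after all part-files are in place (controlled by the Hadoop setting mapreduce.fileoutputcommitter.marksuccessfuljobs, which defaults to true). That marker is all eomer needs to know the snapshot is safe to read.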

Hadoop distcp

hadoop distcp \
    -Dfs.s3a.endpoint=https://<ACCOUNT_ID>.r2.cloudflarestorage.com \
    -Dfs.s3a.access.key=<R2_ACCESS_KEY_ID> \
    -Dfs.s3a.secret.key=<R2_SECRET_ACCESS_KEY> \
    -Dfs.s3a.path.style.access=true \
    -Dfs.s3a.endpoint.region=auto \
    hdfs:///user/analytics/forecast_input/2026-04-17/ \
    s3a://eomer-production-tenant-acme/data/2026-04-17/
distcp writes _SUCCESS at the destination once all files copy cleanly.

Other producers

Any tool that speaks S3 works: Flink, Trino, DuckDB (COPY ... TO 's3://...'), or pandas + s3fs. Make sure the job writes a _SUCCESS marker only after all data files are durable; an empty object is enough. If the tool doesn’t emit one, add a final step: aws s3 cp /dev/null s3://bucket/prefix/_SUCCESS --endpoint-url https://....
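For producers without an output committer, ordering is what matters: upload every data file first, and write the zero-byte _SUCCESS object only after the last upload has returned. The sketch below is storage-agnostic and assumes a caller-supplied put_object(key, body) function (a hypothetical name); with boto3 it could wrap s3_client.put_object(Bucket=..., Key=key, Body=body) on a client pointed at the R2 endpoint.

```python
from typing import Callable, List, Mapping

# put_object(key, body) is a hypothetical, caller-supplied uploader that raises on failure.
PutObject = Callable[[str, bytes], None]


def publish_snapshot(put_object: PutObject, prefix: str,
                     parts: Mapping[str, bytes]) -> List[str]:
    """Upload every part file under `prefix`, then write the _SUCCESS marker last.

    Returns the keys in the order they were written.
    """
    if not prefix.endswith("/"):
        raise ValueError("snapshot prefix must end with '/'")
    if not parts:
        # Design choice: a marker over zero part-files would pass the gate but carry no rows.
        raise ValueError("refusing to publish a snapshot with no part files")
    written: List[str] = []
    for name, body in parts.items():
        key = prefix + name
        put_object(key, body)  # an exception here stops us before the marker is written
        written.append(key)
    marker = prefix + "_SUCCESS"
    put_object(marker, b"")  # zero-byte marker, written only after every part upload returned
    written.append(marker)
    return written
```

Because a failed upload raises before the marker step, a partially uploaded prefix never carries _SUCCESS, which is exactly what the consumer gate relies on.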

Consumer contract (eomer side)

Minimum: require _SUCCESS before reading

ingestion:
  source:
    type: r2
    r2_options:
      bucket: eomer-production-tenant-acme
      key: "data/2026-04-17/"
      file_format: parquet
      require_success_marker: true   # default; fails fast if missing
If the marker is absent, the run aborts with a clear error, so eomer never reads a half-written snapshot. require_success_marker is ignored when key points at a single object (a key with no trailing slash).
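The gate can be pictured as a check over the object listing under the prefix. The sketch below illustrates the contract and is not eomer's implementation: the function names and status strings are hypothetical, it treats a key without a trailing slash as a single object, and it skips names beginning with _ or . the way Hadoop-style readers do. It also separates the "marker present but no part-files" case listed under Troubleshooting.

```python
from typing import Iterable


def _is_data_key(prefix: str, key: str) -> bool:
    """True for objects readers treat as data; path segments starting with '_' or '.' are skipped."""
    rel = key[len(prefix):]
    if not key.startswith(prefix) or not rel or rel.endswith("/"):
        return False
    return not any(part.startswith(("_", ".")) for part in rel.split("/"))


def snapshot_status(key: str, listed_keys: Iterable[str]) -> str:
    """Classify a configured key against the _SUCCESS contract.

    Returns "single_object", "missing_marker", "empty", or "ready".
    """
    if not key.endswith("/"):
        return "single_object"  # assumption: no trailing slash means one object, no gate
    keys = set(listed_keys)
    if key + "_SUCCESS" not in keys:
        return "missing_marker"  # producer still running or failed: abort the run
    if not any(_is_data_key(key, k) for k in keys):
        return "empty"  # marker present but no part-files
    return "ready"
```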

Rolling pointer: watermark_file

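Producers that write dated snapshots and keep history should update a small pointer file once each snapshot’s _SUCCESS marker is in place. A single PUT replaces an object atomically, so readers see either the old pointer or the new one, never a partial write: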
r2://bucket/data/2026-04-16/   (old, still retained)
r2://bucket/data/2026-04-17/   (new, with _SUCCESS)
r2://bucket/latest.txt         ← contents: "data/2026-04-17/"
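On the producer side, the pointer should only move once the new prefix is complete. A minimal sketch, assuming caller-supplied exists(key) and put_object(key, body) functions (hypothetical names, for example thin wrappers over boto3's head_object and put_object); the refusal to advance without a marker is a design choice, not something eomer enforces on the producer.

```python
from typing import Callable


def advance_pointer(exists: Callable[[str], bool],
                    put_object: Callable[[str, bytes], None],
                    new_prefix: str,
                    pointer_key: str = "latest.txt") -> None:
    """Point `pointer_key` at `new_prefix`, but only if its _SUCCESS marker exists."""
    if not new_prefix.endswith("/"):
        raise ValueError("prefix must end with '/'")
    if not exists(new_prefix + "_SUCCESS"):
        raise RuntimeError(f"{new_prefix} has no _SUCCESS marker; pointer left unchanged")
    # One PUT of a small object: readers see the old pointer or the new one, never a mix.
    put_object(pointer_key, new_prefix.encode("utf-8"))
```

Older dated prefixes are left untouched, so rolling back is just another call with the previous prefix.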
Point eomer at the pointer:
ingestion:
  source:
    type: r2
    r2_options:
      bucket: eomer-production-tenant-acme
      key: ""                         # ignored when watermark_file is set
      file_format: parquet
      watermark_file: "latest.txt"
      require_success_marker: true
eomer reads latest.txt, strips surrounding whitespace from its contents, and uses the result as the effective prefix. This decouples the eomer config from snapshot rotation: the customer’s scheduler owns the pointer, and eomer always reads whatever snapshot the pointer names as latest. A runnable example is in configs/example_r2_spark_push.yaml.
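The resolution step amounts to a few lines. This sketch mirrors the documented behavior (read the pointer, strip whitespace, use the result as the prefix); the function name, the read_text callable, and the empty-pointer check are illustrative assumptions rather than eomer's API.

```python
from typing import Callable, Optional


def resolve_prefix(key: str,
                   watermark_file: Optional[str],
                   read_text: Callable[[str], str]) -> str:
    """Return the effective prefix: the pointer's contents if configured, else `key`."""
    if not watermark_file:
        return key  # no pointer configured: use the static key
    try:
        contents = read_text(watermark_file)
    except Exception as exc:  # e.g. missing object or wrong bucket
        raise RuntimeError(f"Watermark file {watermark_file!r} could not be read") from exc
    prefix = contents.strip()  # tolerate a trailing newline written by `echo`
    if not prefix:
        raise RuntimeError(f"Watermark file {watermark_file!r} is empty")
    return prefix
```

The resolved prefix would then go through the same _SUCCESS gate as a static key.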

Pull fallback: read directly from the customer’s cloud

Push to R2 is the recommended primary path: one auth surface, no VPN peering, and a producer-side handoff contract. When it isn’t viable (regulated environments that disallow cross-cloud copies, customers whose fresh data already lives in their own Azure, GCS, or WebHDFS store, or short pilots where setting up a Spark job is overkill), eomer can pull directly through the cloud_fs connector. A single connector covers the fsspec-backed stores below:
| Backend | URI scheme | Extras to install |
|---|---|---|
| Azure Data Lake / Blob | abfs://, abfss://, az:// | pip install 'eomer-forecasting[azure]' |
| Google Cloud Storage | gs://, gcs:// | pip install 'eomer-forecasting[gcp]' |
| S3-compatible (MinIO, Wasabi, AWS S3 in a tenant account) | s3://, s3a:// | pip install 'eomer-forecasting[s3]' |
| Hadoop WebHDFS | webhdfs://host:port/path | pip install 'eomer-forecasting[s3]' (fsspec ships the WebHDFS backend) |
For Cloudflare R2, keep using type: r2. The dedicated connector has stricter EOMER_R2_* credential handling and a simpler config, and the cloud_fs validator rejects r2:// URIs with a pointer back to the R2 connector.
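The scheme check can be sketched with the standard library. The mapping restates the backend table above, including its note that WebHDFS uses the [s3] extra; the function name and error wording are hypothetical, and eomer's actual validator may behave differently.

```python
from urllib.parse import urlsplit

# URI scheme -> eomer-forecasting extra, restating the backend table.
EXTRAS = {
    "abfs": "azure", "abfss": "azure", "az": "azure",
    "gs": "gcp", "gcs": "gcp",
    "s3": "s3", "s3a": "s3",
    "webhdfs": "s3",
}


def required_extra(uri: str) -> str:
    """Return the extra a cloud_fs URI needs, or raise ValueError for unsupported schemes."""
    scheme = urlsplit(uri).scheme.lower()
    if scheme == "r2":
        raise ValueError(f"cloud_fs uri {uri!r} uses unsupported protocol 'r2'; use type: r2")
    if scheme not in EXTRAS:
        raise ValueError(f"cloud_fs uri {uri!r} uses unsupported protocol {scheme!r}")
    return EXTRAS[scheme]
```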

Azure Data Lake example

ingestion:
  source:
    type: cloud_fs
    cloud_fs_options:
      uri: "abfs://forecast-inputs@tenantaccount.dfs.core.windows.net/data/2026-04-17/"
      file_format: parquet
      require_success_marker: true
      storage_options:
        account_name: "tenantaccount"
        # account_key / sas_token / client_id etc. resolved from env vars
        # — never embed secrets in the config itself.
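One way to keep the Azure secret out of the YAML is to merge it into storage_options at runtime. In this sketch the environment variable names are illustrative assumptions (which names eomer actually reads is not specified here); the option names on the right are keyword arguments that adlfs's AzureBlobFileSystem accepts, though you should check them against your adlfs version.

```python
import os
from typing import Dict, Mapping, Optional

# Hypothetical env-var names -> adlfs keyword arguments. Adjust to your deployment.
AZURE_ENV_TO_OPTION = {
    "AZURE_STORAGE_ACCOUNT_KEY": "account_key",
    "AZURE_STORAGE_SAS_TOKEN": "sas_token",
    "AZURE_CLIENT_ID": "client_id",
    "AZURE_CLIENT_SECRET": "client_secret",
    "AZURE_TENANT_ID": "tenant_id",
}


def with_env_credentials(storage_options: Mapping[str, str],
                         env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Return a copy of storage_options with credentials filled in from the environment.

    Keys already present in the config are kept; unset or empty variables are ignored.
    """
    env = os.environ if env is None else env
    merged = dict(storage_options)
    for var, option in AZURE_ENV_TO_OPTION.items():
        value = env.get(var)
        if value and option not in merged:
            merged[option] = value
    return merged
```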

GCS example

ingestion:
  source:
    type: cloud_fs
    cloud_fs_options:
      uri: "gs://tenant-forecast-inputs/data/2026-04-17/"
      file_format: parquet
      require_success_marker: true
      # GCS uses Application Default Credentials by default
      # (GOOGLE_APPLICATION_CREDENTIALS pointing at a service-account JSON).
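Before a pull run, a quick local check can confirm that GOOGLE_APPLICATION_CREDENTIALS points at a service-account key. This sketch only inspects the JSON file (service-account keys carry "type": "service_account" and a client_email) and does not contact Google; the function name is hypothetical. Application Default Credentials can also come from other sources, such as gcloud user credentials or the metadata server, which this check does not cover.

```python
import json
import os
from typing import Mapping, Optional


def service_account_email(env: Optional[Mapping[str, str]] = None) -> str:
    """Return the client_email of the key file named by GOOGLE_APPLICATION_CREDENTIALS.

    Raises RuntimeError if the variable is unset, the file is unreadable,
    or the JSON is not a service-account key.
    """
    env = os.environ if env is None else env
    path = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not path:
        raise RuntimeError("GOOGLE_APPLICATION_CREDENTIALS is not set")
    try:
        with open(path, encoding="utf-8") as fh:
            info = json.load(fh)
    except (OSError, ValueError) as exc:  # json.JSONDecodeError subclasses ValueError
        raise RuntimeError(f"cannot read credentials file {path!r}") from exc
    if (not isinstance(info, dict) or info.get("type") != "service_account"
            or "client_email" not in info):
        raise RuntimeError(f"{path!r} is not a service-account key")
    return info["client_email"]
```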

WebHDFS example

ingestion:
  source:
    type: cloud_fs
    cloud_fs_options:
      uri: "webhdfs://hadoop-edge.internal:50070/user/analytics/forecast_input/2026-04-17/"
      file_format: parquet
      require_success_marker: true
      storage_options:
        user: "eomer-service-account"
The _SUCCESS marker contract is identical across all backends: the connector uses the same Spark/Hadoop convention as the R2 push path. Everything under storage_options is passed verbatim to fsspec, so keep secrets out of it and supply them through environment variables instead. A runnable example is in configs/example_cloud_fs.yaml.
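WebHDFS is a plain HTTP REST API, so the marker can be checked by hand from any host that reaches the NameNode. The sketch below only builds the request URL for the WebHDFS GETFILESTATUS operation (GET http://host:port/webhdfs/v1/path?op=GETFILESTATUS, with user.name for simple authentication); the function name is hypothetical, and clusters that use Kerberos (SPNEGO) need an authenticated client instead.

```python
from urllib.parse import quote, urlencode, urlsplit


def success_marker_status_url(uri: str, user: str) -> str:
    """Map a webhdfs:// snapshot prefix to the GETFILESTATUS URL of its _SUCCESS marker."""
    parts = urlsplit(uri)
    if parts.scheme != "webhdfs":
        raise ValueError(f"expected a webhdfs:// URI, got {uri!r}")
    marker_path = parts.path.rstrip("/") + "/_SUCCESS"
    query = urlencode({"op": "GETFILESTATUS", "user.name": user})
    # webhdfs:// is plain HTTP; the TLS variant, swebhdfs://, would use https instead.
    return f"http://{parts.netloc}/webhdfs/v1{quote(marker_path)}?{query}"
```

Fetching the URL with urllib.request.urlopen returns a FileStatus JSON body when the marker exists, while a missing marker surfaces as an HTTP 404 error.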

Choosing between push and pull

| | Push to R2 | Pull via cloud_fs |
|---|---|---|
| Network direction | Customer → R2 (HTTPS) | eomer → customer’s store (HTTPS) |
| Works across clouds | Yes | Yes (but their firewall must allow eomer’s egress IP) |
| Handoff contract | _SUCCESS marker, producer-owned | _SUCCESS marker, producer-owned |
| Credential scope | One R2 token, issued by us | Customer-issued read-only token/role |
| Best for | Most enterprise customers | Air-gapped / regulated / “already in our cloud, don’t move it” |

Troubleshooting

| Symptom | Likely cause | Fix |
|---|---|---|
| No '_SUCCESS' marker found at r2://.../data/2026-04-17/_SUCCESS | Spark job still running, or failed partway through | Check the upstream job’s state. If the marker is intentionally absent, set require_success_marker: false. |
| AccessDenied from boto3 | R2 token missing Object Read on the bucket | Recreate the token at Cloudflare dashboard → R2 → Manage R2 API Tokens with both read and write. |
| SignatureDoesNotMatch | Wrong region or endpoint | R2 must use region: auto and the endpoint https://<ACCOUNT_ID>.r2.cloudflarestorage.com. Spark needs spark.hadoop.fs.s3a.path.style.access=true. |
| Watermark file ... could not be read | Pointer file missing or wrong bucket | Verify the object exists: aws s3 ls s3://bucket/latest.txt --endpoint-url https://.... |
| No objects found under r2://.../data/... | Prefix empty, or marker present but no part-files (Spark wrote an empty partition) | Verify the upstream job produced data rows; check Spark’s output plan. |
| cloud_fs uri '...' uses unsupported protocol 'r2' | Tried to use cloud_fs for Cloudflare R2 | Switch to type: r2 with r2_options; the dedicated connector is intended for R2. |
| The fsspec backend for 'abfs' is not installed (or gs, etc.) | Missing optional extra | pip install 'eomer-forecasting[azure]' (Azure) or [gcp] (GCS). Both are included in [all]. |
See also Tenant-Isolated Storage (R2) for bucket naming and credential setup.