Skip to content

Python API reference

Generated with mkdocstrings from package docstrings. Install the package and docs extras: pip install -e ".[docs]".

Runner

lrdbench.runner.BenchmarkRunner

Orchestrate a complete benchmark run.

The runner implements the full benchmark loop:

  1. Load and validate the manifest.
  2. Materialise records from generators or observational sources.
  3. Optionally train data-driven estimators.
  4. Fit every enrolled estimator to every record.
  5. Evaluate mode-appropriate metrics.
  6. Build leaderboards.
  7. Persist results to a CsvResultStore.
  8. Generate HTML/CSV/LaTeX report artefacts.

Example::

from lrdbench.runner import BenchmarkRunner
from lrdbench.manifest import load_manifest

runner = BenchmarkRunner()
manifest = load_manifest("my_suite.yaml")
output = runner.run(manifest)
print(output.run_id)
Source code in src/lrdbench/runner.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
class BenchmarkRunner:
    """Orchestrate a complete benchmark run.

    :meth:`run` performs these steps for an already-built manifest:

    1. Check that the manifest mode is supported.
    2. Materialise records from generators or observational sources.
    3. Prepare data-driven estimators.
    4. Fit every enrolled estimator to every record.
    5. Evaluate metrics with the evaluator matching the mode.
    6. Build leaderboards.
    7. Persist results to a :class:`CsvResultStore`.
    8. Build the report bundle and record its artefacts.

    :meth:`preview` stops after step 2 and only reports the size of the grid.

    Example::

        from lrdbench.runner import BenchmarkRunner
        from lrdbench.manifest import load_manifest

        runner = BenchmarkRunner()
        manifest = load_manifest("my_suite.yaml")
        output = runner.run(manifest)
        print(output.run_id)
    """

    def __init__(
        self,
        *,
        generators: GeneratorRegistry | None = None,
        estimators: EstimatorRegistry | None = None,
        contaminations: ContaminationRegistry | None = None,
        discover_plugins: bool = True,
    ) -> None:
        """Wire up registries, evaluator, leaderboard builder and reporter.

        Args:
            generators: Generator registry to use; ``None`` selects the
                default registry.
            estimators: Estimator registry to use. When given, plugin
                discovery is skipped regardless of ``discover_plugins``.
            contaminations: Contamination registry to use; ``None`` selects
                the default registry.
            discover_plugins: When no ``estimators`` registry is supplied,
                build the registry through the plugin loader and keep one
                provenance record per loader result.
        """
        # Compare against None (as already done for ``estimators``) so that an
        # explicitly supplied registry is honoured even if it evaluates falsy,
        # e.g. an empty container-like registry.
        self.generators = (
            generators if generators is not None else build_default_generator_registry()
        )
        self.contaminations = (
            contaminations
            if contaminations is not None
            else build_default_contamination_registry()
        )
        self._plugin_provenance: list[PluginProvenanceRecord] = []
        if estimators is not None:
            self.estimators = estimators
        elif discover_plugins:
            # Imported lazily so the plugin loader is only touched when needed.
            from lrdbench.plugin_loader import build_estimator_registry_with_plugins

            reg, results = build_estimator_registry_with_plugins()
            self.estimators = reg
            self._plugin_provenance = [
                PluginProvenanceRecord(
                    plugin_name=r.plugin_name,
                    module_name_or_path=r.module_name_or_path,
                    entry_point_name=r.entry_point_name,
                    version=r.version,
                    status=r.status,
                    failure_reason=r.failure_reason,
                    source_hash=r.source_hash,
                )
                for r in results
            ]
        else:
            self.estimators = build_default_estimator_registry()
        self._gt_evaluator = GroundTruthEvaluator()
        self._leaderboard = WeightedRankLeaderboardBuilder()
        self._reporter = SimpleHtmlCsvReporter()

    def run(
        self,
        manifest: BenchmarkManifest,
        *,
        manifest_path: Path | None = None,
        base_dir: Path | None = None,
    ) -> BenchmarkRunOutput:
        """Execute the full benchmark loop for ``manifest``.

        Args:
            manifest: A validated :class:`BenchmarkManifest`.
            manifest_path: Path to the manifest file, used to resolve
                relative paths (e.g. observational CSV files).
            base_dir: Alternative directory for relative path resolution.
                If ``None``, defaults to ``manifest_path.parent`` or
                :obj:`Path.cwd()`.

        Returns:
            BenchmarkRunOutput: The run ID, records, estimates, metrics,
                leaderboards, report bundle, result-store path and plugin
                provenance of the completed run.

        Raises:
            NotImplementedError: If the manifest mode is not supported.
            ValueError: If the manifest requests unsupported generator
                parameters (e.g. ARFIMA with ``p != 0`` or ``q != 0``).
        """
        self._require_supported_mode(manifest)
        run_id = str(uuid.uuid4())
        global_seed = int(manifest.seed_spec.get("global_seed", 0))
        resolve_dir = self._resolve_dir(manifest_path, base_dir)

        records = self._materialise_records(manifest, global_seed, resolve_dir)
        # Both synthetic modes share the ground-truth evaluator; observational
        # runs get an evaluator built around the estimator registry.
        is_synthetic = (
            manifest.mode is BenchmarkMode.GROUND_TRUTH
            or manifest.mode is BenchmarkMode.STRESS_TEST
        )
        evaluator: BaseEvaluator = (
            self._gt_evaluator if is_synthetic else ObservationalEvaluator(self.estimators)
        )

        report_spec = manifest.report_spec or ReportSpec(
            formats=("html", "csv"),
            leaderboards=tuple(manifest.leaderboard_specs),
        )
        # A report spec without leaderboards inherits the manifest's.
        if not report_spec.leaderboards and manifest.leaderboard_specs:
            report_spec = replace(report_spec, leaderboards=tuple(manifest.leaderboard_specs))
        export_root = Path(report_spec.export_root)

        # From here on the manifest returned by the preparation step is used.
        manifest = prepare_data_driven_estimators(
            manifest,
            generators=self.generators,
            contaminations=self.contaminations,
            run_id=run_id,
            artefact_root=export_root,
            global_seed=global_seed,
        )

        estimates = run_fit_jobs(
            collect_fit_jobs(records, manifest.estimator_specs),
            estimators=self.estimators,
            execution_spec=dict(manifest.execution_spec),
            cwd=resolve_dir,
        )

        metrics = evaluator.evaluate(manifest, records, estimates)
        boards = self._leaderboard.build(manifest, metrics)

        # Each run is stored in its own ``<export_root>/<run_id>`` directory.
        store_root = export_root / run_id
        store = CsvResultStore(store_root)
        store.write_run_metadata(manifest, run_id)
        store.write_records(records)
        store.write_estimates(estimates)
        store.write_metrics(metrics)
        store.write_leaderboards(boards)

        bundle = self._reporter.build(
            manifest,
            metrics,
            boards,
            report_spec=report_spec,
            run_id=run_id,
        )
        model_artefacts = _ml_model_artefacts(run_id, export_root)
        if model_artefacts:
            bundle = replace(bundle, artefacts=tuple(bundle.artefacts) + model_artefacts)

        store.write_plugin_provenance(self._plugin_provenance)
        store.write_artefacts(bundle.artefacts)
        store_path = store.finalise()
        bundle = replace(bundle, result_store_path=store_path)

        return BenchmarkRunOutput(
            run_id=run_id,
            records=tuple(records),
            estimates=tuple(estimates),
            metrics=metrics,
            leaderboards=boards,
            report_bundle=bundle,
            result_store_path=store_path,
            plugin_provenance=tuple(self._plugin_provenance),
        )

    def preview(
        self,
        manifest: BenchmarkManifest,
        *,
        manifest_path: Path | None = None,
        base_dir: Path | None = None,
    ) -> dict[str, object]:
        """Dry-run preview: materialise records and report grid size without fitting.

        Args:
            manifest: The manifest to preview.
            manifest_path: Path to the manifest file, used to resolve
                relative paths.
            base_dir: Alternative directory for relative path resolution.
                If ``None``, defaults to ``manifest_path.parent`` or
                :obj:`Path.cwd()`.

        Returns:
            dict[str, object]: Summary with the keys ``mode``, ``n_records``,
                ``n_estimators``, ``n_fit_jobs``, ``n_clean``,
                ``n_contaminated`` and ``global_seed``.

        Raises:
            NotImplementedError: If the manifest mode is not supported.
        """
        self._require_supported_mode(manifest)
        global_seed = int(manifest.seed_spec.get("global_seed", 0))
        resolve_dir = self._resolve_dir(manifest_path, base_dir)
        records = self._materialise_records(manifest, global_seed, resolve_dir)

        n_records = len(records)
        n_estimators = len(manifest.estimator_specs)
        # Anything not explicitly annotated as contaminated counts as clean,
        # which includes records that carry no ``stress_role`` at all.
        n_clean = sum(1 for r in records if r.annotations.get("stress_role") != "contaminated")
        return {
            "mode": manifest.mode.value,
            "n_records": n_records,
            "n_estimators": n_estimators,
            "n_fit_jobs": n_records * n_estimators,
            "n_clean": n_clean,
            "n_contaminated": n_records - n_clean,
            "global_seed": global_seed,
        }

    @staticmethod
    def _require_supported_mode(manifest: BenchmarkManifest) -> None:
        """Raise ``NotImplementedError`` unless the manifest mode is supported."""
        if manifest.mode not in (
            BenchmarkMode.GROUND_TRUTH,
            BenchmarkMode.STRESS_TEST,
            BenchmarkMode.OBSERVATIONAL,
        ):
            raise NotImplementedError(
                f"mode {manifest.mode.value!r} is not implemented in this release "
                f"(supported: ground_truth, stress_test, observational)"
            )

    @staticmethod
    def _resolve_dir(manifest_path: Path | None, base_dir: Path | None) -> Path:
        """Pick the directory for relative paths: base_dir, manifest folder, then cwd."""
        if base_dir is not None:
            return base_dir
        if manifest_path is not None:
            return manifest_path.parent
        return Path.cwd()

    def _materialise_records(
        self, manifest: BenchmarkManifest, global_seed: int, resolve_dir: Path
    ) -> list[SeriesRecord]:
        """Produce the records for the manifest's mode (generated or loaded)."""
        if manifest.mode is BenchmarkMode.GROUND_TRUTH:
            return self._generate_records_ground_truth(manifest, global_seed)
        if manifest.mode is BenchmarkMode.STRESS_TEST:
            return self._generate_records_stress_test(manifest, global_seed)
        return load_observational_records(
            manifest, base_dir=resolve_dir, global_seed=global_seed
        )

    def _generate_base_record(
        self,
        manifest: BenchmarkManifest,
        global_seed: int,
        family: str,
        params: Any,
        rep: Any,
    ) -> SeriesRecord:
        """Generate the record for one ``(family, params, rep)`` grid cell."""
        gen = self.generators.get(family)
        rid = _record_id(manifest.manifest_id, family, params, rep)
        # The seed is derived from the cell's identity via ``_stable_seed``.
        seed = _stable_seed(global_seed, manifest.manifest_id, family, params, rep)
        return gen.generate(
            record_id=rid,
            params=params,
            seed=seed,
            manifest_id=manifest.manifest_id,
        )

    def _generate_records_ground_truth(
        self, manifest: BenchmarkManifest, global_seed: int
    ) -> list[SeriesRecord]:
        """Generate one record per cell of the expanded generator grid."""
        triples = _expand_generator_grid(dict(manifest.source_spec))
        return [
            self._generate_base_record(manifest, global_seed, family, params, rep)
            for family, params, rep in triples
        ]

    def _generate_records_stress_test(
        self, manifest: BenchmarkManifest, global_seed: int
    ) -> list[SeriesRecord]:
        """Generate each clean record followed by its contaminated variants.

        Every grid cell yields one record annotated as ``clean`` and then one
        record per contamination scenario, in scenario order.
        """
        triples = _expand_generator_grid(dict(manifest.source_spec))
        scenarios = _expand_contamination_grid(dict(manifest.contamination_spec))
        records: list[SeriesRecord] = []
        for family, params, rep in triples:
            rec = self._generate_base_record(manifest, global_seed, family, params, rep)
            clean = replace(
                rec,
                annotations={
                    **dict(rec.annotations),
                    "stress_role": "clean",
                    "pair_group_id": rec.record_id,
                    "contamination_operator": "clean",
                    "contamination_family": "clean",
                    "contamination_severity": "clean",
                },
            )
            records.append(clean)
            for op_name, op_params in scenarios:
                op = self.contaminations.get(op_name)
                nid = _contam_record_id(manifest.manifest_id, clean.record_id, op_name, op_params)
                # Parameters are sorted so the seed does not depend on the
                # insertion order of the scenario's parameter mapping.
                cseed = _stable_seed(
                    global_seed,
                    manifest.manifest_id,
                    "contam",
                    clean.record_id,
                    op_name,
                    tuple(sorted(op_params.items())),
                )
                contaminated = op.apply(
                    clean,
                    params=op_params,
                    seed=cseed,
                    manifest_id=manifest.manifest_id,
                    new_record_id=nid,
                )
                records.append(contaminated)
        return records

run(manifest, *, manifest_path=None, base_dir=None)

Execute the full benchmark loop for manifest.

Parameters:

Name Type Description Default
manifest BenchmarkManifest

A validated BenchmarkManifest.

required
manifest_path Path | None

Path to the manifest file, used to resolve relative paths (e.g. observational CSV files).

None
base_dir Path | None

Alternative directory for relative path resolution. If None, defaults to manifest_path.parent or Path.cwd().

None

Returns:

Type Description
BenchmarkRunOutput

A BenchmarkRunOutput containing the run ID, records, estimates, metrics, leaderboards, and report bundle.

Raises:

Type Description
NotImplementedError

If the manifest mode is not supported.

ValueError

If the manifest requests unsupported generator parameters (e.g. ARFIMA with p != 0 or q != 0).

Source code in src/lrdbench/runner.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
def run(
    self,
    manifest: BenchmarkManifest,
    *,
    manifest_path: Path | None = None,
    base_dir: Path | None = None,
) -> BenchmarkRunOutput:
    """Run every benchmark stage for ``manifest`` and persist the results.

    Args:
        manifest: A validated :class:`BenchmarkManifest`.
        manifest_path: Location of the manifest file; its parent directory
            anchors relative paths (e.g. observational CSV files) when
            ``base_dir`` is not given.
        base_dir: Directory that anchors relative paths. Takes precedence
            over ``manifest_path``; when both are ``None`` the current
            working directory is used.

    Returns:
        BenchmarkRunOutput: The run ID, records, estimates, metrics,
            leaderboards, report bundle, result-store path and plugin
            provenance of the completed run.

    Raises:
        NotImplementedError: If the manifest mode is not supported.
        ValueError: If the manifest requests unsupported generator
            parameters (e.g. ARFIMA with ``p != 0`` or ``q != 0``).
    """
    supported_modes = (
        BenchmarkMode.GROUND_TRUTH,
        BenchmarkMode.STRESS_TEST,
        BenchmarkMode.OBSERVATIONAL,
    )
    if manifest.mode not in supported_modes:
        raise NotImplementedError(
            f"mode {manifest.mode.value!r} is not implemented in this release "
            f"(supported: ground_truth, stress_test, observational)"
        )

    run_id = str(uuid.uuid4())
    seed = int(manifest.seed_spec.get("global_seed", 0))

    # Anchor for relative paths: explicit base_dir, else manifest folder, else cwd.
    if base_dir is not None:
        anchor_dir = base_dir
    elif manifest_path is not None:
        anchor_dir = manifest_path.parent
    else:
        anchor_dir = Path.cwd()

    # Synthetic modes share the ground-truth evaluator; anything else that
    # passed the mode check is observational.
    evaluator: BaseEvaluator
    if manifest.mode is BenchmarkMode.GROUND_TRUTH:
        records = self._generate_records_ground_truth(manifest, seed)
        evaluator = self._gt_evaluator
    elif manifest.mode is BenchmarkMode.STRESS_TEST:
        records = self._generate_records_stress_test(manifest, seed)
        evaluator = self._gt_evaluator
    else:
        records = load_observational_records(manifest, base_dir=anchor_dir, global_seed=seed)
        evaluator = ObservationalEvaluator(self.estimators)

    # Fall back to an HTML+CSV report when the manifest declares none, and
    # let a report without leaderboards inherit the manifest's.
    spec = manifest.report_spec
    if not spec:
        spec = ReportSpec(
            formats=("html", "csv"),
            leaderboards=tuple(manifest.leaderboard_specs),
        )
    if not spec.leaderboards and manifest.leaderboard_specs:
        spec = replace(spec, leaderboards=tuple(manifest.leaderboard_specs))
    export_root = Path(spec.export_root)

    # All later stages work on the manifest returned by the preparation step.
    prepared = prepare_data_driven_estimators(
        manifest,
        generators=self.generators,
        contaminations=self.contaminations,
        run_id=run_id,
        artefact_root=export_root,
        global_seed=seed,
    )

    fit_jobs = collect_fit_jobs(records, prepared.estimator_specs)
    estimates = run_fit_jobs(
        fit_jobs,
        estimators=self.estimators,
        execution_spec=dict(prepared.execution_spec),
        cwd=anchor_dir,
    )

    metrics = evaluator.evaluate(prepared, records, estimates)
    leaderboards = self._leaderboard.build(prepared, metrics)

    # Each run is stored in its own ``<export_root>/<run_id>`` directory.
    store = CsvResultStore(export_root / run_id)
    store.write_run_metadata(prepared, run_id)
    store.write_records(records)
    store.write_estimates(estimates)
    store.write_metrics(metrics)
    store.write_leaderboards(leaderboards)

    report = self._reporter.build(
        prepared,
        metrics,
        leaderboards,
        report_spec=spec,
        run_id=run_id,
    )
    extra_artefacts = _ml_model_artefacts(run_id, export_root)
    if extra_artefacts:
        report = replace(report, artefacts=tuple(report.artefacts) + extra_artefacts)

    store.write_plugin_provenance(self._plugin_provenance)
    store.write_artefacts(report.artefacts)
    final_path = store.finalise()
    report = replace(report, result_store_path=final_path)

    return BenchmarkRunOutput(
        run_id=run_id,
        records=tuple(records),
        estimates=tuple(estimates),
        metrics=metrics,
        leaderboards=leaderboards,
        report_bundle=report,
        result_store_path=final_path,
        plugin_provenance=tuple(self._plugin_provenance),
    )

preview(manifest, *, manifest_path=None, base_dir=None)

Dry-run preview: materialise records and report grid size without fitting.

Returns:

Type Description
dict[str, object]

Dictionary with mode, n_records, n_estimators, n_fit_jobs, n_clean, n_contaminated, and global_seed.

Source code in src/lrdbench/runner.py
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
def preview(
    self,
    manifest: BenchmarkManifest,
    *,
    manifest_path: Path | None = None,
    base_dir: Path | None = None,
) -> dict[str, object]:
    """Materialise the records and summarise the job grid without fitting.

    Args:
        manifest: The manifest to preview.
        manifest_path: Location of the manifest file; its parent directory
            anchors relative paths when ``base_dir`` is not given.
        base_dir: Directory that anchors relative paths; falls back to the
            manifest's folder and then to the current working directory.

    Returns:
        dict[str, object]: Summary with the keys ``mode``, ``n_records``,
            ``n_estimators``, ``n_fit_jobs``, ``n_clean``,
            ``n_contaminated`` and ``global_seed``.

    Raises:
        NotImplementedError: If the manifest mode is not supported.
    """
    supported_modes = (
        BenchmarkMode.GROUND_TRUTH,
        BenchmarkMode.STRESS_TEST,
        BenchmarkMode.OBSERVATIONAL,
    )
    if manifest.mode not in supported_modes:
        raise NotImplementedError(
            f"mode {manifest.mode.value!r} is not implemented in this release "
            f"(supported: ground_truth, stress_test, observational)"
        )

    seed = int(manifest.seed_spec.get("global_seed", 0))

    if base_dir is not None:
        anchor_dir = base_dir
    elif manifest_path is not None:
        anchor_dir = manifest_path.parent
    else:
        anchor_dir = Path.cwd()

    if manifest.mode is BenchmarkMode.GROUND_TRUTH:
        records = self._generate_records_ground_truth(manifest, seed)
    elif manifest.mode is BenchmarkMode.STRESS_TEST:
        records = self._generate_records_stress_test(manifest, seed)
    else:
        records = load_observational_records(manifest, base_dir=anchor_dir, global_seed=seed)

    total = len(records)
    estimator_count = len(manifest.estimator_specs)
    # Records count as clean unless annotated ``stress_role == "contaminated"``,
    # so records without that annotation are clean too.
    clean_count = 0
    for record in records:
        if record.annotations.get("stress_role") != "contaminated":
            clean_count += 1

    summary: dict[str, object] = {
        "mode": manifest.mode.value,
        "n_records": total,
        "n_estimators": estimator_count,
        "n_fit_jobs": total * estimator_count,
        "n_clean": clean_count,
        "n_contaminated": total - clean_count,
        "global_seed": seed,
    }
    return summary

__init__(*, generators=None, estimators=None, contaminations=None, discover_plugins=True)

Source code in src/lrdbench/runner.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def __init__(
    self,
    *,
    generators: GeneratorRegistry | None = None,
    estimators: EstimatorRegistry | None = None,
    contaminations: ContaminationRegistry | None = None,
    discover_plugins: bool = True,
) -> None:
    """Wire up registries, evaluator, leaderboard builder and reporter.

    Args:
        generators: Generator registry to use; ``None`` selects the default
            registry.
        estimators: Estimator registry to use. When given, plugin discovery
            is skipped regardless of ``discover_plugins``.
        contaminations: Contamination registry to use; ``None`` selects the
            default registry.
        discover_plugins: When no ``estimators`` registry is supplied, build
            the registry through the plugin loader and keep one provenance
            record per loader result.
    """
    # Compare against None (as already done for ``estimators``) so that an
    # explicitly supplied registry is honoured even if it evaluates falsy,
    # e.g. an empty container-like registry.
    self.generators = (
        generators if generators is not None else build_default_generator_registry()
    )
    self.contaminations = (
        contaminations
        if contaminations is not None
        else build_default_contamination_registry()
    )
    self._plugin_provenance: list[PluginProvenanceRecord] = []
    if estimators is not None:
        self.estimators = estimators
    elif discover_plugins:
        # Imported lazily so the plugin loader is only touched when needed.
        from lrdbench.plugin_loader import build_estimator_registry_with_plugins

        reg, results = build_estimator_registry_with_plugins()
        self.estimators = reg
        # Copy each loader result into a provenance record for the run output.
        self._plugin_provenance = [
            PluginProvenanceRecord(
                plugin_name=r.plugin_name,
                module_name_or_path=r.module_name_or_path,
                entry_point_name=r.entry_point_name,
                version=r.version,
                status=r.status,
                failure_reason=r.failure_reason,
                source_hash=r.source_hash,
            )
            for r in results
        ]
    else:
        self.estimators = build_default_estimator_registry()
    self._gt_evaluator = GroundTruthEvaluator()
    self._leaderboard = WeightedRankLeaderboardBuilder()
    self._reporter = SimpleHtmlCsvReporter()

lrdbench.runner.run_manifest_path(path, *, discover_plugins=True)

Convenience entry-point: load a manifest from disk and run it.

Parameters:

Name Type Description Default
path str | Path

Filesystem path to a YAML manifest.

required
discover_plugins bool

Whether to auto-discover third-party estimator plugins via environment variables.

True

Returns:

Type Description
BenchmarkRunOutput

The completed benchmark run output.

Source code in src/lrdbench/runner.py
420
421
422
423
424
425
426
427
428
429
430
431
432
433
def run_manifest_path(path: str | Path, *, discover_plugins: bool = True) -> BenchmarkRunOutput:
    """Load the manifest stored at ``path`` and run it with a fresh runner.

    Args:
        path: Filesystem location of the manifest file.
        discover_plugins: Forwarded to :class:`BenchmarkRunner`.

    Returns:
        BenchmarkRunOutput: The completed benchmark run output.
    """
    manifest_file = Path(path)
    # Load first so a broken manifest fails before a runner is constructed.
    loaded = load_manifest(manifest_file)
    runner = BenchmarkRunner(discover_plugins=discover_plugins)
    return runner.run(loaded, manifest_path=manifest_file)

lrdbench.runner.run_manifest_mapping(data, *, base_dir=None, discover_plugins=True)

Convenience entry-point: run a benchmark from an in-memory dictionary.

This is useful for programmatic benchmark construction or testing.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary matching the manifest schema.

required
base_dir Path | None

Directory used to resolve relative paths (e.g. CSV files).

None
discover_plugins bool

Whether to auto-discover third-party estimator plugins via environment variables.

True

Returns:

Type Description
BenchmarkRunOutput

The completed benchmark run output.

Source code in src/lrdbench/runner.py
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
def run_manifest_mapping(
    data: dict[str, Any], *, base_dir: Path | None = None, discover_plugins: bool = True
) -> BenchmarkRunOutput:
    """Build a manifest from an in-memory dictionary and run it.

    Handy when a benchmark is assembled programmatically or inside a test.

    Args:
        data: Dictionary matching the manifest schema.
        base_dir: Directory used to resolve relative paths (e.g. CSV files);
            the current working directory is used when it is not given.
        discover_plugins: Forwarded to :class:`BenchmarkRunner`.

    Returns:
        BenchmarkRunOutput: The completed benchmark run output.
    """
    # Build the manifest first so an invalid mapping fails before a runner exists.
    built = manifest_from_mapping(data)
    runner = BenchmarkRunner(discover_plugins=discover_plugins)
    anchor_dir = base_dir or Path.cwd()
    return runner.run(built, base_dir=anchor_dir)

Manifest

lrdbench.manifest.load_manifest(path)

Source code in src/lrdbench/manifest.py
110
111
def load_manifest(path: str | Path) -> BenchmarkManifest:
    """Read the manifest file at ``path`` and build a manifest from it.

    The file is parsed by ``load_manifest_yaml`` and the resulting mapping is
    handed to :func:`manifest_from_mapping`.
    """
    raw_mapping = load_manifest_yaml(path)
    return manifest_from_mapping(raw_mapping)

lrdbench.manifest.manifest_from_mapping(data)

Source code in src/lrdbench/manifest.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def manifest_from_mapping(data: Mapping[str, Any]) -> BenchmarkManifest:
    """Build and validate a :class:`BenchmarkManifest` from a plain mapping.

    The keys ``mode``, ``estimators``, ``metrics``, ``manifest_id``, ``name``
    and ``source`` are read by subscript, so a missing one surfaces as the
    mapping's own lookup error (``KeyError`` for a ``dict``). All other
    sections are optional; an absent or null section becomes an empty
    container.

    Args:
        data: Mapping following the manifest schema.

    Returns:
        BenchmarkManifest: The manifest, after ``validate_manifest`` has
            accepted it.

    Raises:
        ManifestValidationError: If parsing the ``metrics`` entries raises
            ``TypeError`` or ``ValueError``.
    """

    def section(key: str) -> dict[str, Any]:
        # Optional blocks may be missing or explicitly null; both yield {}.
        return dict(data.get(key) or {})

    mode = BenchmarkMode(str(data["mode"]))
    estimator_specs = _estimator_specs_from_manifest_entries(list(data["estimators"]))
    raw_metrics: list[Any] = list(data["metrics"])
    try:
        metric_specs = metric_specs_from_manifest_entries(raw_metrics)
    except (TypeError, ValueError) as exc:
        raise ManifestValidationError(str(exc)) from exc

    raw_leaderboards = data.get("leaderboards") or []
    leaderboard_specs = tuple(leaderboard_spec_from_mapping(entry) for entry in raw_leaderboards)

    # A missing ``report`` key and an explicit null both mean "no report spec".
    raw_report = data["report"] if "report" in data else None
    report_spec = None if raw_report is None else report_spec_from_mapping(raw_report)

    manifest = BenchmarkManifest(
        manifest_id=str(data["manifest_id"]),
        name=str(data["name"]),
        mode=mode,
        source_spec=dict(data["source"]),
        contamination_spec=section("contamination"),
        segmentation_spec=section("segmentation"),
        preprocessing_spec=section("preprocessing"),
        estimator_specs=estimator_specs,
        metric_specs=metric_specs,
        leaderboard_specs=leaderboard_specs,
        report_spec=report_spec,
        execution_spec=section("execution"),
        uncertainty_spec=section("uncertainty"),
        ml_training_spec=section("ml_training"),
        seed_spec=section("seeds"),
        raw_yaml=dict(data),
    )

    # Unknown keys are rejected unless ``validation.reject_unknown_keys`` is falsy.
    validation_cfg = data.get("validation") or {}
    strict = bool(validation_cfg.get("reject_unknown_keys", True))
    validate_manifest(manifest, strict_unknown_keys=strict)
    return manifest

Estimator interface

lrdbench.interfaces.BaseEstimator

Bases: ABC

Abstract base for long-range dependence estimators.

All estimators enrolled in a benchmark must implement this interface. The fit method receives a SeriesRecord and must return an EstimateResult containing at minimum a point estimate and a validity flag.

Source code in src/lrdbench/interfaces.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class BaseEstimator(ABC):
    """Interface every long-range dependence estimator has to implement.

    Concrete estimators provide a :attr:`spec` describing themselves and a
    :meth:`fit` method that turns one :class:`SeriesRecord` into an
    :class:`EstimateResult` holding at least a point estimate and a
    validity flag.
    """

    @property
    @abstractmethod
    def spec(self) -> Any:
        """Specification of this estimator (normally an :class:`EstimatorSpec`)."""
        raise NotImplementedError

    @abstractmethod
    def fit(self, record: SeriesRecord) -> EstimateResult:
        """Estimate from a single record.

        Args:
            record: The time-series record to analyse.

        Returns:
            EstimateResult: Result carrying ``point`` and ``valid`` plus
                optional confidence intervals, diagnostics, and runtime.
        """
        raise NotImplementedError

spec abstractmethod property

The estimator's specification (normally an EstimatorSpec).

fit(record) abstractmethod

Compute an estimate for the given record.

Parameters:

Name Type Description Default
record SeriesRecord

The time-series record to analyse.

required

Returns:

Type Description
EstimateResult

An EstimateResult with point, valid, and optional confidence intervals, diagnostics, and runtime.

Source code in src/lrdbench/interfaces.py
128
129
130
131
132
133
134
135
136
137
138
139
@abstractmethod
def fit(self, record: SeriesRecord) -> EstimateResult:
    """Estimate from a single record.

    Args:
        record: The time-series record to analyse.

    Returns:
        EstimateResult: Result carrying ``point`` and ``valid`` plus
            optional confidence intervals, diagnostics, and runtime.
    """
    raise NotImplementedError

lrdbench.interfaces.BaseGenerator

Bases: ABC

Abstract base for synthetic time-series generators.

Each generator produces a SeriesRecord from a parameter dictionary and an optional seed. The family property is the registry key used in manifest source blocks (e.g. fGn, ARFIMA).

Source code in src/lrdbench/interfaces.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class BaseGenerator(ABC):
    """Interface that every synthetic time-series generator implements.

    A generator turns a parameter mapping plus an optional seed into a
    :class:`SeriesRecord`. Its ``family`` is the key under which it is
    registered and by which manifest ``source`` blocks refer to it
    (e.g. ``fGn``, ``ARFIMA``).

    All three members are abstract; a subclass must provide ``family``,
    ``version`` and ``generate`` before it can be instantiated.
    """

    @property
    @abstractmethod
    def family(self) -> str:
        """Return the registry key of this generator (e.g. ``'fGn'``, ``'ARFIMA'``)."""
        raise NotImplementedError

    @property
    @abstractmethod
    def version(self) -> str:
        """Return a human-readable version string used for provenance tracking."""
        raise NotImplementedError

    @abstractmethod
    def generate(
        self,
        *,
        record_id: str,
        params: Mapping[str, Any],
        seed: int | None,
        manifest_id: str | None,
    ) -> SeriesRecord:
        """Produce one synthetic record.

        All arguments are keyword-only and must be passed explicitly.

        Args:
            record_id: Stable identifier for the record.
            params: Generator-specific parameters (e.g. ``{'H': 0.75, 'n': 1024}``).
            seed: RNG seed for reproducibility, or ``None``.
            manifest_id: Manifest identifier to embed in provenance, or ``None``.

        Returns:
            SeriesRecord: A fully populated record including truth and provenance.
        """
        raise NotImplementedError

family abstractmethod property

Registry key for this generator (e.g. 'fGn', 'ARFIMA').

version abstractmethod property

Human-readable version string for provenance tracking.

generate(*, record_id, params, seed, manifest_id) abstractmethod

Generate a single synthetic record.

Parameters:

Name Type Description Default
record_id str

Stable identifier for the record.

required
params Mapping[str, Any]

Generator-specific parameters (e.g. {'H': 0.75, 'n': 1024}).

required
seed int | None

Optional RNG seed for reproducibility.

required
manifest_id str | None

Manifest identifier to embed in provenance.

required

Returns:

Type Description
SeriesRecord

A fully populated SeriesRecord including truth and provenance.

Source code in src/lrdbench/interfaces.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@abstractmethod
def generate(
    self,
    *,
    record_id: str,
    params: Mapping[str, Any],
    seed: int | None,
    manifest_id: str | None,
) -> SeriesRecord:
    """Generate a single synthetic record.

    Abstract: concrete generators must override this method. All arguments
    are keyword-only and must be passed explicitly.

    Args:
        record_id: Stable identifier for the record.
        params: Generator-specific parameters (e.g. ``{'H': 0.75, 'n': 1024}``).
        seed: RNG seed for reproducibility, or ``None``.
        manifest_id: Manifest identifier to embed in provenance, or ``None``.

    Returns:
        SeriesRecord: A fully populated record including truth and provenance.
    """
    raise NotImplementedError

Bundled temporal estimators

lrdbench.estimators.temporal.RSEstimator

Bases: BaseEstimator

Rescaled-range Hurst proxy with optional block-bootstrap CIs.

Parameters read from params:

  • n_bootstrap (int, default 200) – number of bootstrap replicates.
  • bootstrap_block_len (int, default max(4, n//10)) – block length.
  • ci_levels (list, default [0.95]) – nominal coverage levels.
  • min_scale (int, default 8) – minimum R/S subseries length.
  • max_scale (int, optional) – maximum R/S subseries length.
  • scale_ratio (float, default 1.5) – geometric scale spacing.
  • use_anis_lloyd_correction (bool, default False) – if True, divide each scale's average R/S value by the Anis-Lloyd white-noise expectation before fitting the slope, then add the 0.5 white-noise baseline back to the fitted slope.
Source code in src/lrdbench/estimators/temporal.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
class RSEstimator(BaseEstimator):
    """Hurst proxy from rescaled-range (R/S) analysis, with block-bootstrap CIs.

    Options are looked up in the spec's ``parameter_schema``:

    - ``n_bootstrap`` (int, default 200) – number of bootstrap replicates.
    - ``bootstrap_block_len`` (int, default ``max(4, n//10)``) – block length;
      a value of 0 selects the default.
    - ``ci_levels`` (list, default ``[0.95]``) – nominal coverage levels.
    - ``min_scale`` (int, default 8) – minimum R/S subseries length.
    - ``max_scale`` (int, optional) – maximum R/S subseries length.
    - ``scale_ratio`` (float, default 1.5) – geometric scale spacing.
    - ``use_anis_lloyd_correction`` (bool, default ``False``) – if ``True``,
      divide each scale's average R/S value by the Anis-Lloyd white-noise
      expectation before fitting the slope, then add the 0.5 white-noise
      baseline back to the fitted slope.
    """

    VERSION = "0.3.0"

    def __init__(self, spec: EstimatorSpec) -> None:
        """Keep a reference to the estimator specification."""
        self._spec = spec

    @property
    def spec(self) -> EstimatorSpec:
        """The specification supplied at construction."""
        return self._spec

    def fit(self, record: SeriesRecord) -> EstimateResult:
        """Estimate the R/S Hurst proxy for ``record`` and bootstrap its CIs.

        The bootstrap options and the RNG seed are read before the guarded
        section, so a malformed value there propagates to the caller. Anything
        raised afterwards is turned into an invalid result whose
        ``failure_reason`` starts with ``exception:``.

        ``runtime_seconds`` covers the work up to and including the point
        estimate; the bootstrap that follows is not included.

        Args:
            record: The time-series record to analyse.

        Returns:
            EstimateResult: A valid result with the point estimate, the
                interval at the 0.95 level (or, if that level was not
                requested, the last computed interval) and diagnostics; or an
                invalid result when the statistic returns ``None`` or an
                exception occurs.
        """
        started = time.perf_counter()
        cfg = dict(self._spec.parameter_schema)
        replicates = int(cfg.get("n_bootstrap", 200))
        block = int(cfg.get("bootstrap_block_len", 0))
        if not block:
            # 0 (or a missing key) means "derive the block length from n".
            block = max(4, record.values.size // 10)
        raw_levels = cfg.get("ci_levels")
        if raw_levels is None:
            levels = (0.95,)
        else:
            levels = tuple(float(level) for level in raw_levels)
        base_seed = 0
        if record.provenance is not None and record.provenance.seed is not None:
            base_seed = int(record.provenance.seed)
        # Mask to 32 bits so negative or oversized seeds are still accepted.
        rng = np.random.default_rng(base_seed & (2**32 - 1))

        def _failed(reason: str, elapsed: float) -> EstimateResult:
            # Shared shape of every invalid result returned by this method.
            return EstimateResult(
                record_id=record.record_id,
                estimator_name=self._spec.name,
                point=None,
                runtime_seconds=elapsed,
                valid=False,
                failure_reason=reason,
                estimator_version=self.VERSION,
            )

        try:
            corrected = bool(cfg.get("use_anis_lloyd_correction", False))
            lower = int(cfg.get("min_scale", 8))
            raw_upper = cfg.get("max_scale")
            upper = None if raw_upper is None else int(raw_upper)
            ratio = float(cfg.get("scale_ratio", 1.5))

            def _rs_stat(z: np.ndarray) -> float | None:
                # Same statistic for the original series and every replicate.
                return _rs_hurst_proxy(
                    z,
                    use_correction=corrected,
                    min_scale=lower,
                    max_scale=upper,
                    scale_ratio=ratio,
                )

            point = _rs_stat(record.values)
            elapsed = time.perf_counter() - started
            if point is None:
                return _failed("insufficient_signal_for_rs", elapsed)

            draws = bootstrap_statistic_distribution(
                record.values,
                rng,
                _rs_stat,
                n_boot=replicates,
                block_len=block,
            )
            if draws.size >= 5:
                intervals = symmetric_percentile_cis(draws, levels)
            else:
                # Too few usable replicates for percentile intervals.
                intervals = ()
            spread = float(np.std(draws)) if draws.size >= 2 else None

            # Prefer the 0.95 interval for the headline ci_low / ci_high.
            headline = next(
                ((lo, hi) for a, lo, hi in intervals if abs(a - 0.95) < 1e-9),
                None,
            )
            ci_low, ci_high = headline if headline is not None else (None, None)
            if intervals and (ci_low is None):
                # Otherwise fall back to the last interval that was computed.
                ci_low, ci_high = intervals[-1][1], intervals[-1][2]

            diagnostics: dict[str, object] = {
                "ci_method": "circular_block_bootstrap",
                "n_bootstrap": replicates,
                "bootstrap_block_len": block,
                "bootstrap_replicates_used": int(draws.size),
                "bootstrap_point_std": spread,
                "min_scale": lower,
                "max_scale": upper,
                "scale_ratio": ratio,
                "use_anis_lloyd_correction": corrected,
            }
            return EstimateResult(
                record_id=record.record_id,
                estimator_name=self._spec.name,
                point=point,
                ci_low=ci_low,
                ci_high=ci_high,
                runtime_seconds=elapsed,
                valid=True,
                estimator_version=self.VERSION,
                diagnostics=diagnostics,
                bootstrap_cis=intervals,
            )
        except Exception as exc:  # noqa: BLE001
            elapsed = time.perf_counter() - started
            return _failed(f"exception:{type(exc).__name__}:{exc}", elapsed)

lrdbench.estimators.temporal.DFAEstimator

Bases: BaseEstimator

Detrended fluctuation analysis (DFA) scaling exponent as a Hurst proxy.

Source code in src/lrdbench/estimators/temporal.py
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
class DFAEstimator(BaseEstimator):
    """Hurst proxy from the detrended fluctuation analysis (DFA) scaling exponent.

    Options read from the spec's ``parameter_schema``: ``detrend_order``
    (default 1), ``min_scale`` (default 16) and the optional ``max_scale``.
    """

    VERSION = "0.1.0"

    def __init__(self, spec: EstimatorSpec) -> None:
        """Keep a reference to the estimator specification."""
        self._spec = spec

    @property
    def spec(self) -> EstimatorSpec:
        """The specification supplied at construction."""
        return self._spec

    def fit(self, record: SeriesRecord) -> EstimateResult:
        """Run the DFA statistic on ``record`` through ``fit_with_block_bootstrap``."""
        cfg = dict(self._spec.parameter_schema)

        def _dfa_stat(series: np.ndarray) -> float | None:
            # Options are parsed on every call, exactly where the statistic runs.
            order = int(cfg.get("detrend_order", 1))
            lower = int(cfg.get("min_scale", 16))
            raw_upper = cfg.get("max_scale")
            upper = None if raw_upper is None else int(raw_upper)
            return _dfa_hurst(
                series,
                detrend_order=order,
                min_scale=lower,
                max_scale=upper,
            )

        return fit_with_block_bootstrap(
            record,
            self._spec,
            statistic=_dfa_stat,
            estimator_version=self.VERSION,
            failure_reason="insufficient_signal_for_dfa",
            seed_offset=0,
        )

lrdbench.estimators.temporal.DMAEstimator

Bases: BaseEstimator

Detrended moving-average fluctuation scaling (Hurst proxy).

Source code in src/lrdbench/estimators/temporal.py
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
class DMAEstimator(BaseEstimator):
    """Hurst proxy from detrended moving-average fluctuation scaling.

    Options read from the spec's ``parameter_schema``: ``min_scale``
    (default 8) and the optional ``max_scale``.
    """

    VERSION = "0.1.0"

    def __init__(self, spec: EstimatorSpec) -> None:
        """Keep a reference to the estimator specification."""
        self._spec = spec

    @property
    def spec(self) -> EstimatorSpec:
        """The specification supplied at construction."""
        return self._spec

    def fit(self, record: SeriesRecord) -> EstimateResult:
        """Run the DMA statistic on ``record`` through ``fit_with_block_bootstrap``."""
        cfg = dict(self._spec.parameter_schema)

        def _dma_stat(series: np.ndarray) -> float | None:
            # Options are parsed on every call, exactly where the statistic runs.
            lower = int(cfg.get("min_scale", 8))
            raw_upper = cfg.get("max_scale")
            upper = None if raw_upper is None else int(raw_upper)
            return _dma_hurst(series, min_scale=lower, max_scale=upper)

        return fit_with_block_bootstrap(
            record,
            self._spec,
            statistic=_dma_stat,
            estimator_version=self.VERSION,
            failure_reason="insufficient_signal_for_dma",
            seed_offset=17,
        )

lrdbench.estimators.temporal.AbsoluteMomentEstimator

Bases: BaseEstimator

Absolute first moment of aggregated series as a Hurst proxy.

Source code in src/lrdbench/estimators/temporal.py
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
class AbsoluteMomentEstimator(BaseEstimator):
    """Hurst proxy from the absolute first moment of the aggregated series.

    Options read from the spec's ``parameter_schema``: ``min_scale``
    (default 2), the optional ``max_scale`` and ``scale_ratio`` (default 1.5).
    """

    VERSION = "0.1.0"

    def __init__(self, spec: EstimatorSpec) -> None:
        """Keep a reference to the estimator specification."""
        self._spec = spec

    @property
    def spec(self) -> EstimatorSpec:
        """The specification supplied at construction."""
        return self._spec

    def fit(self, record: SeriesRecord) -> EstimateResult:
        """Run the absolute-moment statistic through ``fit_with_block_bootstrap``."""
        cfg = dict(self._spec.parameter_schema)

        def _abs_moment_stat(series: np.ndarray) -> float | None:
            # Options are parsed on every call, exactly where the statistic runs.
            lower = int(cfg.get("min_scale", 2))
            raw_upper = cfg.get("max_scale")
            upper = None if raw_upper is None else int(raw_upper)
            ratio = float(cfg.get("scale_ratio", 1.5))
            return _absolute_moment_hurst(
                series,
                min_scale=lower,
                max_scale=upper,
                scale_ratio=ratio,
            )

        return fit_with_block_bootstrap(
            record,
            self._spec,
            statistic=_abs_moment_stat,
            estimator_version=self.VERSION,
            failure_reason="insufficient_signal_for_absolute_moment",
            seed_offset=29,
        )

lrdbench.estimators.temporal.VarianceEstimator

Bases: BaseEstimator

Variance of aggregated series as a Hurst proxy.

Source code in src/lrdbench/estimators/temporal.py
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
class VarianceEstimator(BaseEstimator):
    """Hurst proxy from the variance of the aggregated series.

    Options read from the spec's ``parameter_schema``: ``min_scale``
    (default 2), the optional ``max_scale`` and ``scale_ratio`` (default 1.5).
    """

    VERSION = "0.1.0"

    def __init__(self, spec: EstimatorSpec) -> None:
        """Keep a reference to the estimator specification."""
        self._spec = spec

    @property
    def spec(self) -> EstimatorSpec:
        """The specification supplied at construction."""
        return self._spec

    def fit(self, record: SeriesRecord) -> EstimateResult:
        """Run the aggregated-variance statistic through ``fit_with_block_bootstrap``."""
        cfg = dict(self._spec.parameter_schema)

        def _variance_stat(series: np.ndarray) -> float | None:
            # Options are parsed on every call, exactly where the statistic runs.
            lower = int(cfg.get("min_scale", 2))
            raw_upper = cfg.get("max_scale")
            upper = None if raw_upper is None else int(raw_upper)
            ratio = float(cfg.get("scale_ratio", 1.5))
            return _variance_aggregation_hurst(
                series,
                min_scale=lower,
                max_scale=upper,
                scale_ratio=ratio,
            )

        return fit_with_block_bootstrap(
            record,
            self._spec,
            statistic=_variance_stat,
            estimator_version=self.VERSION,
            failure_reason="insufficient_signal_for_variance",
            seed_offset=31,
        )

lrdbench.estimators.temporal.VarianceResidualEstimator

Bases: BaseEstimator

Variance of block residuals as a Hurst proxy.

Source code in src/lrdbench/estimators/temporal.py
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
class VarianceResidualEstimator(BaseEstimator):
    """Hurst proxy from the variance of block residuals.

    Options read from the spec's ``parameter_schema``: ``min_scale``
    (default 8), the optional ``max_scale``, ``scale_ratio`` (default 1.5)
    and ``detrend_order`` (default 1).
    """

    VERSION = "0.1.0"

    def __init__(self, spec: EstimatorSpec) -> None:
        """Keep a reference to the estimator specification."""
        self._spec = spec

    @property
    def spec(self) -> EstimatorSpec:
        """The specification supplied at construction."""
        return self._spec

    def fit(self, record: SeriesRecord) -> EstimateResult:
        """Run the residual-variance statistic through ``fit_with_block_bootstrap``."""
        cfg = dict(self._spec.parameter_schema)

        def _var_residual_stat(series: np.ndarray) -> float | None:
            # Options are parsed on every call, exactly where the statistic runs.
            lower = int(cfg.get("min_scale", 8))
            raw_upper = cfg.get("max_scale")
            upper = None if raw_upper is None else int(raw_upper)
            ratio = float(cfg.get("scale_ratio", 1.5))
            order = int(cfg.get("detrend_order", 1))
            return _variance_residual_hurst(
                series,
                min_scale=lower,
                max_scale=upper,
                scale_ratio=ratio,
                detrend_order=order,
            )

        return fit_with_block_bootstrap(
            record,
            self._spec,
            statistic=_var_residual_stat,
            estimator_version=self.VERSION,
            failure_reason="insufficient_signal_for_variance_residual",
            seed_offset=37,
        )

Bundled spectral estimators

lrdbench.estimators.spectral.GPHEstimator

Bases: BaseEstimator

Geweke–Porter–Hudak log-periodogram regression for long-memory parameter d.

Source code in src/lrdbench/estimators/spectral.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
class GPHEstimator(BaseEstimator):
    """Geweke–Porter–Hudak log-periodogram regression for the memory parameter d.

    Options are looked up in the spec's ``parameter_schema``:

    - ``n_bootstrap`` (default 200) – number of bootstrap replicates.
    - ``bootstrap_block_len`` (default ``max(4, n//10)``) – block length; a
      value of 0 selects the default.
    - ``ci_levels`` (default ``[0.95]``) – nominal coverage levels.
    - ``taper`` (default ``"none"``) – forwarded to the regression; an empty
      string is treated as ``"none"``.
    - ``m`` (int, optional) – forwarded to the regression.
    """

    VERSION = "0.3.0"

    def __init__(self, spec: EstimatorSpec) -> None:
        """Keep a reference to the estimator specification."""
        self._spec = spec

    @property
    def spec(self) -> EstimatorSpec:
        """The specification supplied at construction."""
        return self._spec

    def fit(self, record: SeriesRecord) -> EstimateResult:
        """Estimate d for ``record`` and bootstrap its confidence intervals.

        The bootstrap options and the RNG seed are read before the guarded
        section, so a malformed value there propagates to the caller. Anything
        raised afterwards is turned into an invalid result whose
        ``failure_reason`` starts with ``exception:``.

        ``runtime_seconds`` covers the work up to and including the point
        estimate; the bootstrap that follows is not included.

        Args:
            record: The time-series record to analyse.

        Returns:
            EstimateResult: A valid result with the point estimate, the
                interval at the 0.95 level (or, if that level was not
                requested, the last computed interval) and diagnostics; or an
                invalid result when the regression returns ``None`` or an
                exception occurs.
        """
        started = time.perf_counter()
        cfg = dict(self._spec.parameter_schema)
        replicates = int(cfg.get("n_bootstrap", 200))
        block = int(cfg.get("bootstrap_block_len", 0))
        if not block:
            # 0 (or a missing key) means "derive the block length from n".
            block = max(4, record.values.size // 10)
        raw_levels = cfg.get("ci_levels")
        if raw_levels is None:
            levels = (0.95,)
        else:
            levels = tuple(float(level) for level in raw_levels)
        base_seed = 0
        if record.provenance is not None and record.provenance.seed is not None:
            # The fixed offset is applied only when the record carries a seed.
            base_seed = int(record.provenance.seed) + 7919
        # Mask to 32 bits so negative or oversized seeds are still accepted.
        rng = np.random.default_rng(base_seed & (2**32 - 1))

        def _failed(reason: str, elapsed: float) -> EstimateResult:
            # Shared shape of every invalid result returned by this method.
            return EstimateResult(
                record_id=record.record_id,
                estimator_name=self._spec.name,
                point=None,
                runtime_seconds=elapsed,
                valid=False,
                failure_reason=reason,
                estimator_version=self.VERSION,
            )

        try:
            taper = str(cfg.get("taper", "none")) or "none"
            raw_m = cfg.get("m")
            m = None if raw_m is None else int(raw_m)

            def _gph_stat(z: np.ndarray) -> float | None:
                # Same statistic for the original series and every replicate.
                return _log_periodogram_regression_d(z, m=m, taper=taper)

            point = _gph_stat(record.values)
            elapsed = time.perf_counter() - started
            if point is None:
                return _failed("insufficient_signal_for_gph", elapsed)

            draws = bootstrap_statistic_distribution(
                record.values,
                rng,
                _gph_stat,
                n_boot=replicates,
                block_len=block,
            )
            if draws.size >= 5:
                intervals = symmetric_percentile_cis(draws, levels)
            else:
                # Too few usable replicates for percentile intervals.
                intervals = ()
            spread = float(np.std(draws)) if draws.size >= 2 else None

            # Prefer the 0.95 interval for the headline ci_low / ci_high.
            headline = next(
                ((lo, hi) for a, lo, hi in intervals if abs(a - 0.95) < 1e-9),
                None,
            )
            ci_low, ci_high = headline if headline is not None else (None, None)
            if intervals and ci_low is None:
                # Otherwise fall back to the last interval that was computed.
                ci_low, ci_high = intervals[-1][1], intervals[-1][2]

            diagnostics: dict[str, object] = {
                "ci_method": "circular_block_bootstrap",
                "n_bootstrap": replicates,
                "bootstrap_block_len": block,
                "bootstrap_replicates_used": int(draws.size),
                "bootstrap_point_std": spread,
                "m": m,
                "taper": taper,
            }
            return EstimateResult(
                record_id=record.record_id,
                estimator_name=self._spec.name,
                point=point,
                ci_low=ci_low,
                ci_high=ci_high,
                runtime_seconds=elapsed,
                valid=True,
                estimator_version=self.VERSION,
                diagnostics=diagnostics,
                bootstrap_cis=intervals,
            )
        except Exception as exc:  # noqa: BLE001
            elapsed = time.perf_counter() - started
            return _failed(f"exception:{type(exc).__name__}:{exc}", elapsed)

lrdbench.estimators.spectral.PeriodogramRegressionEstimator

Bases: BaseEstimator

Log-periodogram regression (memory parameter d, GPH-type).

Source code in src/lrdbench/estimators/spectral.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
class PeriodogramRegressionEstimator(BaseEstimator):
    """GPH-type log-periodogram regression for the memory parameter d.

    Options read from the spec's ``parameter_schema``: ``taper`` (default
    ``"none"``; an empty string is treated as ``"none"``) and the optional
    integer ``m``.
    """

    VERSION = "0.2.0"

    def __init__(self, spec: EstimatorSpec) -> None:
        """Keep a reference to the estimator specification."""
        self._spec = spec

    @property
    def spec(self) -> EstimatorSpec:
        """The specification supplied at construction."""
        return self._spec

    def fit(self, record: SeriesRecord) -> EstimateResult:
        """Run the log-periodogram regression through ``fit_with_block_bootstrap``."""
        cfg = dict(self._spec.parameter_schema)
        taper_name = str(cfg.get("taper", "none")) or "none"

        def _periodogram_stat(series: np.ndarray) -> float | None:
            # ``m`` is parsed on every call, exactly where the statistic runs.
            raw_m = cfg.get("m")
            bandwidth = None if raw_m is None else int(raw_m)
            return _log_periodogram_regression_d(series, m=bandwidth, taper=taper_name)

        return fit_with_block_bootstrap(
            record,
            self._spec,
            statistic=_periodogram_stat,
            estimator_version=self.VERSION,
            failure_reason="insufficient_signal_for_periodogram",
            seed_offset=101,
        )

lrdbench.estimators.spectral.WhittleMLEEstimator

Bases: BaseEstimator

Gaussian Whittle likelihood for ARFIMA(0,d,0) spectral density.

Source code in src/lrdbench/estimators/spectral.py
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
class WhittleMLEEstimator(BaseEstimator):
    """Gaussian Whittle likelihood fit of the ARFIMA(0,d,0) spectral density.

    The only option read from the spec's ``parameter_schema`` is the optional
    integer ``m``.
    """

    VERSION = "0.1.0"

    def __init__(self, spec: EstimatorSpec) -> None:
        """Keep a reference to the estimator specification."""
        self._spec = spec

    @property
    def spec(self) -> EstimatorSpec:
        """The specification supplied at construction."""
        return self._spec

    def fit(self, record: SeriesRecord) -> EstimateResult:
        """Run the Whittle statistic on ``record`` through ``fit_with_block_bootstrap``."""
        cfg = dict(self._spec.parameter_schema)

        def _whittle_stat(series: np.ndarray) -> float | None:
            # ``m`` is parsed on every call, exactly where the statistic runs.
            raw_m = cfg.get("m")
            bandwidth = None if raw_m is None else int(raw_m)
            return _whittle_arfima_d(series, m=bandwidth)

        return fit_with_block_bootstrap(
            record,
            self._spec,
            statistic=_whittle_stat,
            estimator_version=self.VERSION,
            failure_reason="insufficient_signal_for_whittle",
            seed_offset=203,
        )

lrdbench.estimators.spectral.ModifiedLocalWhittleEstimator

Bases: BaseEstimator

Modified (Gaussian) local Whittle estimator of long-memory parameter d.

Source code in src/lrdbench/estimators/spectral.py
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
class ModifiedLocalWhittleEstimator(BaseEstimator):
    """Modified (Gaussian) local Whittle estimator of the memory parameter d.

    The only option read from the spec's ``parameter_schema`` is the optional
    integer ``m``.
    """

    VERSION = "0.1.0"

    def __init__(self, spec: EstimatorSpec) -> None:
        """Keep a reference to the estimator specification."""
        self._spec = spec

    @property
    def spec(self) -> EstimatorSpec:
        """The specification supplied at construction."""
        return self._spec

    def fit(self, record: SeriesRecord) -> EstimateResult:
        """Run the modified local Whittle statistic through ``fit_with_block_bootstrap``."""
        cfg = dict(self._spec.parameter_schema)

        def _mlw_stat(series: np.ndarray) -> float | None:
            # ``m`` is parsed on every call, exactly where the statistic runs.
            raw_m = cfg.get("m")
            bandwidth = None if raw_m is None else int(raw_m)
            return _modified_local_whittle_d(series, m=bandwidth)

        return fit_with_block_bootstrap(
            record,
            self._spec,
            statistic=_mlw_stat,
            estimator_version=self.VERSION,
            failure_reason="insufficient_signal_for_mlw",
            seed_offset=307,
        )

Bundled geometric estimators

lrdbench.estimators.geometric.HiguchiEstimator

Bases: BaseEstimator

Higuchi fractal length curve; Hurst proxy H ≈ 2 − D for the time-series graph.

Source code in src/lrdbench/estimators/geometric.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
class HiguchiEstimator(BaseEstimator):
    """Higuchi fractal length curve; Hurst proxy H ≈ 2 − D for the time-series graph.

    The only option read from the spec's ``parameter_schema`` is the optional
    integer ``k_max``.
    """

    VERSION = "0.1.0"

    def __init__(self, spec: EstimatorSpec) -> None:
        """Keep a reference to the estimator specification."""
        self._spec = spec

    @property
    def spec(self) -> EstimatorSpec:
        """The specification supplied at construction."""
        return self._spec

    def fit(self, record: SeriesRecord) -> EstimateResult:
        """Run the Higuchi statistic on ``record`` through ``fit_with_block_bootstrap``."""
        cfg = dict(self._spec.parameter_schema)

        def _higuchi_stat(series: np.ndarray) -> float | None:
            # ``k_max`` is parsed on every call, exactly where the statistic runs.
            raw_k = cfg.get("k_max")
            k_limit = None if raw_k is None else int(raw_k)
            return _higuchi_hurst_proxy(series, k_max=k_limit)

        return fit_with_block_bootstrap(
            record,
            self._spec,
            statistic=_higuchi_stat,
            estimator_version=self.VERSION,
            failure_reason="insufficient_signal_for_higuchi",
            seed_offset=919,
        )

lrdbench.estimators.geometric.GHEEstimator

Bases: BaseEstimator

Geometric Hurst estimator: multiscale variance scaling of lagged increments.

Parameters read from params:

  • n_scales (int, default 16) – number of geometric lags.
  • h_min (int, default 1) – minimum lag in samples.
  • flat_slope_tol (float, default 0.08) – pragmatic threshold below which the log-log slope is treated as flat and the estimate is clamped to 0.5. This is an empirical finite-sample guard, not a theoretically derived bound; set to 0.0 to disable it.
Source code in src/lrdbench/estimators/geometric.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
class GHEEstimator(BaseEstimator):
    """Geometric Hurst estimator: multiscale variance scaling of lagged increments.

    Options are looked up in the spec's ``parameter_schema``:

    - ``n_scales`` (int, default 16) – number of geometric lags.
    - ``h_min`` (int, default 1) – minimum lag in samples.
    - ``flat_slope_tol`` (float, default 0.08) – pragmatic threshold below which
      the log-log slope is treated as flat and the estimate is clamped to ``0.5``.
      This is an empirical finite-sample guard, not a theoretically derived
      bound; set to ``0.0`` to disable it.
    """

    VERSION = "0.1.0"

    def __init__(self, spec: EstimatorSpec) -> None:
        """Keep a reference to the estimator specification."""
        self._spec = spec

    @property
    def spec(self) -> EstimatorSpec:
        """The specification supplied at construction."""
        return self._spec

    def fit(self, record: SeriesRecord) -> EstimateResult:
        """Run the GHE statistic on ``record`` through ``fit_with_block_bootstrap``."""
        cfg = dict(self._spec.parameter_schema)

        def _ghe_stat(series: np.ndarray) -> float | None:
            # Options are parsed on every call, exactly where the statistic runs.
            lag_count = int(cfg.get("n_scales", 16))
            first_lag = int(cfg.get("h_min", 1))
            flat_tol = float(cfg.get("flat_slope_tol", 0.08))
            return _ghe_hurst(
                series,
                n_scales=lag_count,
                h_min=first_lag,
                flat_slope_tol=flat_tol,
            )

        return fit_with_block_bootstrap(
            record,
            self._spec,
            statistic=_ghe_stat,
            estimator_version=self.VERSION,
            failure_reason="insufficient_signal_for_ghe",
            seed_offset=1021,
        )

Bundled wavelet estimators

lrdbench.estimators.wavelet.WaveletOLSEstimator

Bases: BaseEstimator

Plain OLS on log2 wavelet detail variances vs scale index (log-scale regression).

Source code in src/lrdbench/estimators/wavelet.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
class WaveletOLSEstimator(BaseEstimator):
    """Plain OLS on log2 wavelet detail variances vs scale index (log-scale regression).

    Options read from the spec's ``parameter_schema``: ``wavelet`` (default
    ``"db4"``), ``j_drop_high`` (default 1) and ``j_drop_low`` (default 1).
    """

    VERSION = "0.1.0"

    def __init__(self, spec: EstimatorSpec) -> None:
        """Keep a reference to the estimator specification."""
        self._spec = spec

    @property
    def spec(self) -> EstimatorSpec:
        """The specification supplied at construction."""
        return self._spec

    def fit(self, record: SeriesRecord) -> EstimateResult:
        """Run the wavelet OLS statistic through ``fit_with_block_bootstrap``."""
        cfg = dict(self._spec.parameter_schema)
        wavelet_name = str(cfg.get("wavelet", "db4"))
        drop_high = int(cfg.get("j_drop_high", 1))
        drop_low = int(cfg.get("j_drop_low", 1))

        def _wavelet_ols_stat(series: np.ndarray) -> float | None:
            scales = _collect_detail_scales(
                series,
                wavelet=wavelet_name,
                j_drop_high=drop_high,
                j_drop_low=drop_low,
            )
            if scales is None:
                # No usable scales: report "no estimate" for this series.
                return None
            # The third element is not needed for the unweighted fit.
            scale_idx, detail_var, _ = scales
            slope = _ols_slope_log2(scale_idx, detail_var)
            return _hurst_from_log2_slope(slope)

        return fit_with_block_bootstrap(
            record,
            self._spec,
            statistic=_wavelet_ols_stat,
            estimator_version=self.VERSION,
            failure_reason="insufficient_signal_for_wavelet_ols",
            seed_offset=607,
        )

lrdbench.estimators.wavelet.WaveletAbryVeitchEstimator

Bases: BaseEstimator

Abry–Veitch-type log-scale regression on wavelet detail variances (Hurst proxy).

Source code in src/lrdbench/estimators/wavelet.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
class WaveletAbryVeitchEstimator(BaseEstimator):
    """Abry–Veitch-type log-scale regression on wavelet detail variances (Hurst proxy).

    Options read from the spec's ``parameter_schema``: ``wavelet`` (default
    ``"db4"``), ``j_drop_high`` (default 2) and ``j_drop_low`` (default 2).
    """

    VERSION = "0.1.0"

    def __init__(self, spec: EstimatorSpec) -> None:
        """Keep a reference to the estimator specification."""
        self._spec = spec

    @property
    def spec(self) -> EstimatorSpec:
        """The specification supplied at construction."""
        return self._spec

    def fit(self, record: SeriesRecord) -> EstimateResult:
        """Run the Abry–Veitch-type statistic through ``fit_with_block_bootstrap``."""
        cfg = dict(self._spec.parameter_schema)
        wavelet_name = str(cfg.get("wavelet", "db4"))
        drop_high = int(cfg.get("j_drop_high", 2))
        drop_low = int(cfg.get("j_drop_low", 2))

        def _wavelet_av_stat(series: np.ndarray) -> float | None:
            scales = _collect_detail_scales(
                series,
                wavelet=wavelet_name,
                j_drop_high=drop_high,
                j_drop_low=drop_low,
            )
            if scales is None:
                # No usable scales: report "no estimate" for this series.
                return None
            # The third element is not used by this fit.
            scale_idx, detail_var, _ = scales
            slope = _ols_slope_log2(scale_idx, detail_var)
            return _hurst_from_log2_slope(slope)

        return fit_with_block_bootstrap(
            record,
            self._spec,
            statistic=_wavelet_av_stat,
            estimator_version=self.VERSION,
            failure_reason="insufficient_signal_for_wavelet_av",
            seed_offset=401,
        )

lrdbench.estimators.wavelet.WaveletBardetEstimator

Bases: BaseEstimator

Weighted log-scale regression (Bardet-type wavelet Hurst proxy).

Source code in src/lrdbench/estimators/wavelet.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
class WaveletBardetEstimator(BaseEstimator):
    """Weighted log-scale regression (Bardet-type wavelet Hurst proxy).

    Options read from the spec's ``parameter_schema``: ``wavelet`` (default
    ``"db4"``), ``j_drop_high`` (default 1) and ``j_drop_low`` (default 2).
    """

    VERSION = "0.1.0"

    def __init__(self, spec: EstimatorSpec) -> None:
        """Keep a reference to the estimator specification."""
        self._spec = spec

    @property
    def spec(self) -> EstimatorSpec:
        """The specification supplied at construction."""
        return self._spec

    def fit(self, record: SeriesRecord) -> EstimateResult:
        """Run the weighted wavelet statistic through ``fit_with_block_bootstrap``."""
        cfg = dict(self._spec.parameter_schema)
        wavelet_name = str(cfg.get("wavelet", "db4"))
        drop_high = int(cfg.get("j_drop_high", 1))
        drop_low = int(cfg.get("j_drop_low", 2))

        def _wavelet_bardet_stat(series: np.ndarray) -> float | None:
            scales = _collect_detail_scales(
                series,
                wavelet=wavelet_name,
                j_drop_high=drop_high,
                j_drop_low=drop_low,
            )
            if scales is None:
                # No usable scales: report "no estimate" for this series.
                return None
            # All three elements feed the weighted fit. NOTE(review): the third
            # is presumably a per-scale coefficient count — confirm against
            # `_collect_detail_scales`.
            scale_idx, detail_var, third = scales
            slope = _wls_slope_log2(scale_idx, detail_var, third)
            return _hurst_from_log2_slope(slope)

        return fit_with_block_bootstrap(
            record,
            self._spec,
            statistic=_wavelet_bardet_stat,
            estimator_version=self.VERSION,
            failure_reason="insufficient_signal_for_wavelet_bardet",
            seed_offset=503,
        )

lrdbench.estimators.wavelet.WaveletJensenEstimator

Bases: BaseEstimator

Two-band wavelet slope extrapolation (Jensen-style bias reduction).

Source code in src/lrdbench/estimators/wavelet.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
class WaveletJensenEstimator(BaseEstimator):
    """Jensen-style bias reduction via two-band wavelet slope extrapolation.

    Tunables are looked up in ``spec.parameter_schema`` on every ``fit`` call:
    ``wavelet`` (default ``"db4"``), ``fine_band`` (default ``(2, 4)``) and
    ``coarse_band`` (default ``(4, 6)``). Only the first two entries of each
    band are used, each coerced with ``int``.
    """

    VERSION = "0.1.0"

    def __init__(self, spec: EstimatorSpec) -> None:
        self._spec = spec

    @property
    def spec(self) -> EstimatorSpec:
        """Return the specification this estimator was constructed with."""
        return self._spec

    def fit(self, record: SeriesRecord) -> EstimateResult:
        """Fit ``record`` by handing a point statistic to ``fit_with_block_bootstrap``."""
        options = dict(self._spec.parameter_schema)
        wavelet_name = str(options.get("wavelet", "db4"))
        raw_fine = options.get("fine_band", (2, 4))
        raw_coarse = options.get("coarse_band", (4, 6))
        fine = (int(raw_fine[0]), int(raw_fine[1]))
        coarse = (int(raw_coarse[0]), int(raw_coarse[1]))

        def jensen_statistic(series: np.ndarray) -> float | None:
            return _wavelet_jensen_h(
                series,
                wavelet=wavelet_name,
                fine_band=fine,
                coarse_band=coarse,
            )

        return fit_with_block_bootstrap(
            record,
            self._spec,
            statistic=jensen_statistic,
            estimator_version=self.VERSION,
            failure_reason="insufficient_signal_for_wavelet_jensen",
            seed_offset=709,
        )

lrdbench.estimators.wavelet.WaveletWhittleEstimator

Bases: BaseEstimator

Wavelet-domain Gaussian Whittle-type fit to detail variances across scales.

Source code in src/lrdbench/estimators/wavelet.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
class WaveletWhittleEstimator(BaseEstimator):
    """Gaussian Whittle-type fit in the wavelet domain over detail variances.

    Tunables are looked up in ``spec.parameter_schema`` on every ``fit`` call:
    ``wavelet`` (default ``"db4"``), ``j_drop_high`` (default ``1``) and
    ``j_drop_low`` (default ``1``).
    """

    VERSION = "0.1.0"

    def __init__(self, spec: EstimatorSpec) -> None:
        self._spec = spec

    @property
    def spec(self) -> EstimatorSpec:
        """Return the specification this estimator was constructed with."""
        return self._spec

    def fit(self, record: SeriesRecord) -> EstimateResult:
        """Fit ``record`` by handing a point statistic to ``fit_with_block_bootstrap``."""
        options = dict(self._spec.parameter_schema)
        wavelet_name = str(options.get("wavelet", "db4"))
        drop_high = int(options.get("j_drop_high", 1))
        drop_low = int(options.get("j_drop_low", 1))

        def whittle_statistic(series: np.ndarray) -> float | None:
            return _wavelet_whittle_h(
                series,
                wavelet=wavelet_name,
                j_drop_high=drop_high,
                j_drop_low=drop_low,
            )

        return fit_with_block_bootstrap(
            record,
            self._spec,
            statistic=whittle_statistic,
            estimator_version=self.VERSION,
            failure_reason="insufficient_signal_for_wavelet_whittle",
            seed_offset=811,
        )

Bundled data-driven estimators

lrdbench.estimators.data_driven.MLRandomForestEstimator

Bases: _SklearnEstimator

Source code in src/lrdbench/estimators/data_driven.py
265
266
class MLRandomForestEstimator(_SklearnEstimator):
    """``_SklearnEstimator`` subclass that only sets ``MODEL_KIND``.

    NOTE(review): how the base class interprets ``MODEL_KIND`` is not visible
    here; presumably it selects the underlying regressor -- confirm in
    ``_SklearnEstimator``.
    """

    MODEL_KIND = "random_forest_regressor"

lrdbench.estimators.data_driven.MLSVREstimator

Bases: _SklearnEstimator

Source code in src/lrdbench/estimators/data_driven.py
269
270
class MLSVREstimator(_SklearnEstimator):
    """``_SklearnEstimator`` subclass that only sets ``MODEL_KIND``.

    NOTE(review): how the base class interprets ``MODEL_KIND`` is not visible
    here; presumably it selects the underlying regressor -- confirm in
    ``_SklearnEstimator``.
    """

    MODEL_KIND = "support_vector_regressor"

lrdbench.estimators.data_driven.MLCNNEstimator

Bases: _TorchSequenceEstimator

Source code in src/lrdbench/estimators/data_driven.py
356
357
class MLCNNEstimator(_TorchSequenceEstimator):
    """``_TorchSequenceEstimator`` subclass that only sets ``MODEL_KIND``.

    NOTE(review): how the base class interprets ``MODEL_KIND`` is not visible
    here; presumably it selects the network architecture -- confirm in
    ``_TorchSequenceEstimator``.
    """

    MODEL_KIND = "cnn_1d"

lrdbench.estimators.data_driven.MLLSTMEstimator

Bases: _TorchSequenceEstimator

Source code in src/lrdbench/estimators/data_driven.py
360
361
class MLLSTMEstimator(_TorchSequenceEstimator):
    """``_TorchSequenceEstimator`` subclass that only sets ``MODEL_KIND``.

    NOTE(review): how the base class interprets ``MODEL_KIND`` is not visible
    here; presumably it selects the network architecture -- confirm in
    ``_TorchSequenceEstimator``.
    """

    MODEL_KIND = "lstm"

Registries

lrdbench.registries.EstimatorRegistry

Bases: Registry[EstimatorBuilder]

Callable(spec: EstimatorSpec) -> BaseEstimator.

Source code in src/lrdbench/registries.py
44
45
46
47
class EstimatorRegistry(Registry[EstimatorBuilder]):
    """Registry of estimator builders.

    Each registered value is a callable with the shape
    ``Callable(spec: EstimatorSpec) -> BaseEstimator``.
    """

lrdbench.registries.GeneratorRegistry

Bases: Registry[BaseGenerator]

Source code in src/lrdbench/registries.py
33
34
class GeneratorRegistry(Registry[BaseGenerator]):
    """Registry specialised to ``BaseGenerator`` values; adds no behaviour of its own."""

    pass

lrdbench.registries.ContaminationRegistry

Bases: Registry[BaseContamination]

Source code in src/lrdbench/registries.py
37
38
class ContaminationRegistry(Registry[BaseContamination]):
    """Registry specialised to ``BaseContamination`` values; adds no behaviour of its own."""

    pass

Defaults

lrdbench.defaults.build_default_estimator_registry()

Source code in src/lrdbench/defaults.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def build_default_estimator_registry() -> EstimatorRegistry:
    """Create an ``EstimatorRegistry`` pre-populated with the bundled estimators.

    Every entry maps a public estimator name to a builder function that takes
    an ``EstimatorSpec`` and returns a freshly constructed estimator of the
    matching class. Entries are registered in the order listed below.

    Returns:
        A new registry; nothing is shared between successive calls.
    """

    def _as_builder(estimator_cls: type[BaseEstimator]):
        # Wrap the class in a plain function so the registry stores a
        # ``spec -> estimator`` callable rather than the class object itself.
        def build(spec: EstimatorSpec) -> BaseEstimator:
            return estimator_cls(spec)

        return build

    bundled = (
        ("RS", RSEstimator),
        ("GPH", GPHEstimator),
        ("DFA", DFAEstimator),
        ("DMA", DMAEstimator),
        ("AbsoluteMoment", AbsoluteMomentEstimator),
        ("Variance", VarianceEstimator),
        ("VarianceResidual", VarianceResidualEstimator),
        ("Higuchi", HiguchiEstimator),
        ("GHE", GHEEstimator),
        ("Periodogram", PeriodogramRegressionEstimator),
        ("WhittleMLE", WhittleMLEEstimator),
        ("ModifiedLocalWhittle", ModifiedLocalWhittleEstimator),
        ("WaveletAbryVeitch", WaveletAbryVeitchEstimator),
        ("WaveletBardet", WaveletBardetEstimator),
        ("WaveletOLS", WaveletOLSEstimator),
        ("WaveletJensen", WaveletJensenEstimator),
        ("WaveletWhittle", WaveletWhittleEstimator),
        ("MLRandomForest", MLRandomForestEstimator),
        ("MLSVR", MLSVREstimator),
        ("MLCNN", MLCNNEstimator),
        ("MLLSTM", MLLSTMEstimator),
    )

    registry = EstimatorRegistry()
    for public_name, estimator_cls in bundled:
        registry.register(public_name, _as_builder(estimator_cls))
    return registry

lrdbench.defaults.build_default_generator_registry()

Source code in src/lrdbench/defaults.py
47
48
49
50
51
52
53
54
def build_default_generator_registry() -> GeneratorRegistry:
    """Create a ``GeneratorRegistry`` holding one instance of each bundled generator.

    Returns:
        A new registry with the keys ``"fGn"``, ``"fBm"``, ``"ARFIMA"``,
        ``"MRW"`` and ``"fOU"``, registered in that order.
    """
    registry = GeneratorRegistry()
    bundled = (
        ("fGn", FGNGenerator()),
        ("fBm", FBMGenerator()),
        ("ARFIMA", ARFIMAGenerator()),
        ("MRW", MRWGenerator()),
        ("fOU", FOUGenerator()),
    )
    for key, generator in bundled:
        registry.register(key, generator)
    return registry

lrdbench.defaults.build_default_contamination_registry()

Source code in src/lrdbench/defaults.py
57
58
59
60
61
62
63
def build_default_contamination_registry() -> ContaminationRegistry:
    """Create a ``ContaminationRegistry`` holding one instance of each bundled operator.

    Returns:
        A new registry with the keys ``"polynomial_trend"``, ``"outliers"``,
        ``"level_shift"`` and ``"heavy_tail_noise"``, registered in that order.
    """
    registry = ContaminationRegistry()
    bundled = (
        ("polynomial_trend", PolynomialTrendContamination()),
        ("outliers", OutliersContamination()),
        ("level_shift", LevelShiftContamination()),
        ("heavy_tail_noise", HeavyTailNoiseContamination()),
    )
    for key, operator in bundled:
        registry.register(key, operator)
    return registry

Schema dataclasses

lrdbench.schema.BenchmarkManifest dataclass

Source code in src/lrdbench/schema.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
@dataclass(frozen=True)
class BenchmarkManifest:
    """Frozen description of one benchmark suite, as consumed by ``validate_manifest``.

    ``manifest_id``, ``name``, ``mode`` and ``source_spec`` are required; every
    ``*_spec`` mapping defaults to an empty dict and every ``*_specs`` tuple to
    an empty tuple.
    """

    manifest_id: str
    name: str
    mode: BenchmarkMode
    source_spec: Mapping[str, Any]
    # validate_manifest requires this block (with an "operators" entry) in
    # stress-test mode and rejects it in ground-truth / observational modes.
    contamination_spec: Mapping[str, Any] = field(default_factory=dict)
    segmentation_spec: Mapping[str, Any] = field(default_factory=dict)
    preprocessing_spec: Mapping[str, Any] = field(default_factory=dict)
    # validate_manifest rejects manifests where either of these two is empty.
    estimator_specs: tuple[EstimatorSpec, ...] = ()
    metric_specs: tuple[MetricSpec, ...] = ()
    leaderboard_specs: tuple[LeaderboardSpec, ...] = ()
    report_spec: ReportSpec | None = None
    execution_spec: Mapping[str, Any] = field(default_factory=dict)
    uncertainty_spec: Mapping[str, Any] = field(default_factory=dict)
    ml_training_spec: Mapping[str, Any] = field(default_factory=dict)
    seed_spec: Mapping[str, Any] = field(default_factory=dict)
    # Mapping whose top-level keys validate_manifest checks against its
    # allow-list; presumably the parsed YAML document -- confirm with the loader.
    raw_yaml: Mapping[str, Any] | None = None

lrdbench.schema.SeriesRecord dataclass

Source code in src/lrdbench/schema.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@dataclass(frozen=True)
class SeriesRecord:
    """Frozen record holding one series together with its metadata.

    After construction ``values`` is always a float NumPy array, and so is
    ``time_axis`` when it is not ``None`` (see ``__post_init__``).
    """

    record_id: str
    values: np.ndarray
    time_axis: np.ndarray | None
    sampling_rate: float | None
    source_type: SourceType
    source_name: str
    # ``None`` means no ground truth is attached; truth-requiring metrics are
    # rejected for such records by validate_metric_admissibility.
    truth: TruthSpec | None = None
    contamination_history: tuple[TransformationRecord, ...] = ()
    preprocessing_history: tuple[TransformationRecord, ...] = ()
    annotations: Mapping[str, Any] = field(default_factory=dict)
    provenance: ProvenanceRecord | None = None

    def __post_init__(self) -> None:
        """Coerce ``values`` (and ``time_axis`` if present) to float arrays."""
        # object.__setattr__ is needed because the dataclass is frozen.
        object.__setattr__(self, "values", np.asarray(self.values, dtype=float))
        if self.time_axis is not None:
            object.__setattr__(self, "time_axis", np.asarray(self.time_axis, dtype=float))

lrdbench.schema.EstimateResult dataclass

Source code in src/lrdbench/schema.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
@dataclass(frozen=True)
class EstimateResult:
    """Frozen outcome of applying one estimator to one record.

    Only ``record_id``, ``estimator_name`` and ``point`` are required; ``point``
    and all interval / error fields may be ``None``.
    """

    record_id: str
    estimator_name: str
    point: float | None
    ci_low: float | None = None
    ci_high: float | None = None
    stderr: float | None = None
    diagnostics: Mapping[str, Any] = field(default_factory=dict)
    runtime_seconds: float | None = None
    # Defaults to True; NOTE(review): presumably set to False together with
    # ``failure_reason`` when a fit fails -- confirm in fit_with_block_bootstrap.
    valid: bool = True
    warnings: tuple[str, ...] = ()
    failure_reason: str | None = None
    estimator_version: str | None = None
    # Phase 2: symmetric bootstrap CIs per nominal level (alpha, lo, hi), alpha in (0,1)
    bootstrap_cis: tuple[tuple[float, float, float], ...] = ()

lrdbench.schema.EstimatorSpec dataclass

Source code in src/lrdbench/schema.py
76
77
78
79
80
81
82
83
84
85
86
87
@dataclass(frozen=True)
class EstimatorSpec:
    """Frozen declarative description of an estimator.

    The first six fields are required; the remaining ones default to empty
    containers or ``None``.
    """

    name: str
    family: str
    # Must be non-empty (validate_manifest) and must equal the record truth's
    # target_estimand when truth is present (validate_truth_compatibility).
    target_estimand: str
    assumptions: tuple[str, ...]
    supports_ci: bool
    supports_diagnostics: bool
    input_requirements: Mapping[str, Any] = field(default_factory=dict)
    # Read by the estimators' ``fit`` methods via ``.get(key, default)``,
    # e.g. "wavelet", "j_drop_high", "j_drop_low", "fine_band", "coarse_band".
    parameter_schema: Mapping[str, Any] = field(default_factory=dict)
    reference_citations: tuple[str, ...] = ()
    version: str | None = None

lrdbench.schema.MetricSpec dataclass

Definition of a benchmark metric.

Attributes:

Name Type Description
name str

Machine-readable metric identifier (e.g. 'bias').

symbol str

Short symbol for tables (e.g. 'B').

requires_truth bool

Whether the metric needs a ground-truth target value.

admissible_modes tuple[BenchmarkMode, ...]

Benchmark modes where this metric is valid.

aggregation_rule str

How stratum-level values are combined (e.g. 'mean', 'median').

optimisation_direction OptimisationDirection

Whether lower or higher values are better.

unit str | None

Optional unit string for reporting.

null_policy str

How missing values are handled.

nominal_levels tuple[float, ...]

Coverage levels (e.g. (0.95,)) for uncertainty metrics; empty tuple uses evaluator defaults.

parameters Mapping[str, Any]

Extra metric-specific parameters.

Source code in src/lrdbench/schema.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
@dataclass(frozen=True)
class MetricSpec:
    """Definition of a benchmark metric.

    Attributes:
        name: Machine-readable metric identifier (e.g. ``'bias'``).
        symbol: Short symbol for tables (e.g. ``'B'``).
        requires_truth: Whether the metric needs a ground-truth target value.
        admissible_modes: Benchmark modes where this metric is valid.
        aggregation_rule: How stratum-level values are combined (e.g.
            ``'mean'``, ``'median'``).
        optimisation_direction: Whether lower or higher values are better.
        unit: Optional unit string for reporting. Defaults to ``None``.
        null_policy: How missing values are handled. Defaults to
            ``'explicit_null'``.
        nominal_levels: Coverage levels (e.g. ``(0.95,)``) for uncertainty
            metrics; empty tuple (the default) uses evaluator defaults.
        parameters: Extra metric-specific parameters. Defaults to an empty
            mapping.
    """

    name: str
    symbol: str
    requires_truth: bool
    admissible_modes: tuple[BenchmarkMode, ...]
    aggregation_rule: str
    optimisation_direction: OptimisationDirection
    unit: str | None = None
    null_policy: str = "explicit_null"
    # Nominal levels (e.g. 0.95) for coverage / ci_width / coverage_error; empty () uses defaults in evaluator
    nominal_levels: tuple[float, ...] = ()
    parameters: Mapping[str, Any] = field(default_factory=dict)

lrdbench.schema.MetricValue dataclass

Source code in src/lrdbench/schema.py
139
140
141
142
143
144
145
146
147
@dataclass(frozen=True)
class MetricValue:
    """Frozen single metric observation for one estimator within one run."""

    run_id: str
    # May be None; NOTE(review): presumably None marks an aggregate value not
    # tied to a single record -- confirm with the evaluator.
    record_id: str | None
    estimator_name: str
    metric_name: str
    # None is allowed by the type; see MetricSpec.null_policy for the policy name.
    value: float | None
    stratum: Mapping[str, Any] = field(default_factory=dict)
    metadata: Mapping[str, Any] = field(default_factory=dict)

lrdbench.schema.MetricBundle dataclass

Source code in src/lrdbench/schema.py
150
151
152
153
154
155
@dataclass(frozen=True)
class MetricBundle:
    """Frozen container grouping ``MetricValue`` tuples by kind.

    ``per_series`` and ``aggregate`` are required; ``uncertainty`` defaults to
    an empty tuple and ``metadata`` to an empty mapping.
    """

    per_series: tuple[MetricValue, ...]
    aggregate: tuple[MetricValue, ...]
    uncertainty: tuple[MetricValue, ...] = ()
    metadata: Mapping[str, Any] = field(default_factory=dict)

lrdbench.schema.LeaderboardSpec dataclass

Source code in src/lrdbench/schema.py
158
159
160
161
162
163
164
165
@dataclass(frozen=True)
class LeaderboardSpec:
    """Frozen definition of one leaderboard.

    ``validate_manifest`` enforces that ``mode`` matches the manifest mode,
    that the values of ``weights`` sum to 1 (tolerance ``1e-6``) and that each
    entry of ``component_metrics`` is declared in the metrics block
    (``"coverage_error"`` only needs ``"coverage"`` to be declared).
    """

    mode: BenchmarkMode
    component_metrics: tuple[str, ...]
    weights: Mapping[str, float]
    ranking_rule: str = "weighted_rank"
    tie_break_rule: str = "best_primary_metric"
    name: str | None = None

lrdbench.schema.LeaderboardRow dataclass

Source code in src/lrdbench/schema.py
168
169
170
171
172
173
174
175
@dataclass(frozen=True)
class LeaderboardRow:
    """Frozen leaderboard entry: one estimator's rank and score within a run."""

    run_id: str
    estimator_name: str
    rank: int
    score: float
    # Keyed by str; individual values may be None.
    # NOTE(review): keys are presumably metric names -- confirm with the leaderboard builder.
    component_values: Mapping[str, float | None]
    metadata: Mapping[str, Any] = field(default_factory=dict)

lrdbench.schema.ReportSpec dataclass

Source code in src/lrdbench/schema.py
178
179
180
181
182
183
184
185
186
187
188
189
@dataclass(frozen=True)
class ReportSpec:
    """Frozen reporting configuration.

    ``formats`` and ``leaderboards`` are required. By default the three
    ``include_*`` flags are ``True``, ``export_root`` is ``"reports"``,
    ``naming_policy`` is ``"deterministic"`` and no compression policy is set.
    """

    formats: tuple[str, ...]
    leaderboards: tuple[LeaderboardSpec, ...]
    figure_set: tuple[str, ...] = ()
    table_set: tuple[str, ...] = ()
    include_raw_exports: bool = True
    include_provenance: bool = True
    include_environment: bool = True
    export_root: str = "reports"
    naming_policy: str = "deterministic"
    compression_policy: str | None = None

lrdbench.schema.ReportBundle dataclass

Source code in src/lrdbench/schema.py
204
205
206
207
208
209
210
211
212
213
214
215
@dataclass(frozen=True)
class ReportBundle:
    """Frozen index of the report artefact paths recorded for one run.

    Only ``run_id`` is required; every path field defaults to ``None`` or an
    empty tuple.
    """

    run_id: str
    summary_table_path: str | None = None
    leaderboard_paths: tuple[str, ...] = ()
    figure_paths: tuple[str, ...] = ()
    latex_table_paths: tuple[str, ...] = ()
    html_report_path: str | None = None
    markdown_report_path: str | None = None
    manifest_copy_path: str | None = None
    result_store_path: str | None = None
    artefacts: tuple[ArtefactRecord, ...] = ()

lrdbench.schema.BenchmarkRunOutput dataclass

Source code in src/lrdbench/schema.py
238
239
240
241
242
243
244
245
246
247
@dataclass(frozen=True)
class BenchmarkRunOutput:
    """Frozen bundle of the records, estimates, metrics and leaderboards of one run.

    ``report_bundle`` has no default but may be ``None``; ``result_store_path``
    defaults to ``None`` and ``plugin_provenance`` to an empty tuple.
    """

    run_id: str
    records: tuple[SeriesRecord, ...]
    estimates: tuple[EstimateResult, ...]
    metrics: MetricBundle
    leaderboards: tuple[LeaderboardRow, ...]
    report_bundle: ReportBundle | None
    result_store_path: str | None = None
    plugin_provenance: tuple[PluginProvenanceRecord, ...] = ()

lrdbench.schema.PluginProvenanceRecord dataclass

Source code in src/lrdbench/schema.py
45
46
47
48
49
50
51
52
53
@dataclass(frozen=True)
class PluginProvenanceRecord:
    """Frozen provenance entry describing one plugin.

    ``status`` defaults to ``"ok"``; ``version``, ``failure_reason`` and
    ``source_hash`` default to ``None``.
    """

    plugin_name: str
    module_name_or_path: str
    entry_point_name: str
    version: str | None = None
    status: str = "ok"
    failure_reason: str | None = None
    source_hash: str | None = None

Validation and contracts

lrdbench.validation.validate_manifest(manifest, *, strict_unknown_keys=True)

Source code in src/lrdbench/validation.py
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
def validate_manifest(manifest: BenchmarkManifest, *, strict_unknown_keys: bool = True) -> None:
    """Validate ``manifest`` and raise on the first rule it violates.

    Checks run in a fixed order, so the error reported is always the first
    violation encountered in that order: unknown top-level keys, required
    identity fields, mode/contamination consistency, observational source,
    estimator estimands, execution / uncertainty / ML-training blocks, metric
    dependencies and admissibility, and finally leaderboards.

    Args:
        manifest: The manifest to validate.
        strict_unknown_keys: When true, every top-level key of
            ``manifest.raw_yaml`` must belong to the allow-list below.

    Raises:
        ManifestValidationError: If any rule is violated.
    """
    # A missing raw_yaml is treated as an empty mapping, so the key check passes.
    data = manifest.raw_yaml or {}
    if strict_unknown_keys:
        allowed = {
            "manifest_id",
            "name",
            "description",
            "mode",
            "source",
            "contamination",
            "segmentation",
            "preprocessing",
            "estimators",
            "metrics",
            "leaderboards",
            "report",
            "execution",
            "uncertainty",
            "ml_training",
            "seeds",
            "validation",
        }
        bad = set(data) - allowed
        if bad:
            # sorted() keeps the error message deterministic.
            raise ManifestValidationError(f"unknown top-level manifest keys: {sorted(bad)}")

    # MV1: identity fields plus at least one estimator and one metric.
    if not manifest.manifest_id or not manifest.name:
        raise ManifestValidationError("manifest_id and name are required")
    if not manifest.estimator_specs:
        raise ManifestValidationError("at least one estimator is required")
    if not manifest.metric_specs:
        raise ManifestValidationError("at least one metric is required")

    # MV2 / MV3: contamination is mandatory for stress tests and forbidden in
    # the other two modes.
    if manifest.mode is BenchmarkMode.STRESS_TEST and not manifest.contamination_spec:
        raise ManifestValidationError("stress_test mode requires a non-empty contamination block")
    if manifest.mode is BenchmarkMode.STRESS_TEST:
        ops = manifest.contamination_spec.get("operators")
        if not ops:
            raise ManifestValidationError(
                "stress_test mode requires contamination.operators with at least one operator entry"
            )
    if manifest.mode is BenchmarkMode.GROUND_TRUTH and manifest.contamination_spec:
        raise ManifestValidationError(
            "ground_truth mode must not declare contamination unless explicitly permitted"
        )
    if manifest.mode is BenchmarkMode.OBSERVATIONAL and manifest.contamination_spec:
        raise ManifestValidationError(
            "observational mode must not declare a contamination block in this release"
        )

    # MV4: observational manifests get an additional source check.
    if manifest.mode is BenchmarkMode.OBSERVATIONAL:
        _validate_observational_source(manifest)

    # MV5: every estimator must name the quantity it estimates.
    for e in manifest.estimator_specs:
        if not e.target_estimand:
            raise ManifestValidationError(f"estimator {e.name!r} must declare target_estimand")

    # MV5b (Phase 5 execution)
    _validate_execution_block(manifest.execution_spec)
    _validate_uncertainty_block(manifest.uncertainty_spec)
    _validate_ml_training_block(
        manifest.ml_training_spec,
        estimators=manifest.estimator_specs,
    )

    # MV6: derived metrics need their base metric declared; every metric must
    # be admissible in this mode and use nominal levels strictly inside (0, 1).
    metric_names = {x.name for x in manifest.metric_specs}
    if "coverage_error" in metric_names and "coverage" not in metric_names:
        raise ManifestValidationError(
            "coverage_error requires a coverage metric in the metrics block"
        )
    if "coverage_collapse" in metric_names and "coverage" not in metric_names:
        raise ManifestValidationError(
            "coverage_collapse requires a coverage metric in the metrics block"
        )
    if "relative_degradation_ratio" in metric_names and "mae" not in metric_names:
        raise ManifestValidationError(
            "relative_degradation_ratio requires an mae metric in the metrics block"
        )
    for m in manifest.metric_specs:
        validate_metric_admissibility(m, manifest.mode)
        for a in m.nominal_levels:
            if not (0.0 < float(a) < 1.0):
                raise ManifestValidationError(
                    f"nominal level for metric {m.name!r} must lie in (0,1), got {a!r}"
                )

    # MV7 / MV8 leaderboards
    for lb in manifest.leaderboard_specs:
        if lb.mode is not manifest.mode:
            raise ManifestValidationError(
                f"leaderboard {lb.component_metrics!r} mode {lb.mode.value!r} "
                f"does not match manifest mode {manifest.mode.value!r}"
            )
        wsum = sum(lb.weights.values())
        # Tolerance absorbs floating-point rounding in the declared weights.
        if abs(wsum - 1.0) > 1e-6:
            raise ManifestValidationError(f"leaderboard weights must sum to 1, got {wsum}")
        for comp in lb.component_metrics:
            # "coverage_error" may be ranked without being declared itself,
            # as long as the "coverage" metric it depends on is declared.
            if comp == "coverage_error":
                if "coverage" not in metric_names:
                    raise ManifestValidationError("coverage_error requires coverage metric")
                continue
            if comp not in metric_names:
                raise ManifestValidationError(
                    f"leaderboard component {comp!r} is not declared in metrics block"
                )

lrdbench.validation.validate_metric_admissibility(metric_spec, mode, record=None)

Source code in src/lrdbench/validation.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def validate_metric_admissibility(
    metric_spec: MetricSpec,
    mode: BenchmarkMode,
    record: SeriesRecord | None = None,
) -> None:
    """Check that ``metric_spec`` may be used in ``mode`` (and on ``record``).

    Args:
        metric_spec: Metric definition to check.
        mode: Benchmark mode the metric is to be evaluated in.
        record: Optional record; when given, a truth-requiring metric also
            needs ``record.truth`` to be present.

    Raises:
        ManifestValidationError: If the mode is not listed in
            ``metric_spec.admissible_modes``, or the metric requires truth
            while the mode is observational or the record carries no truth.
    """
    if mode not in metric_spec.admissible_modes:
        raise ManifestValidationError(
            f"metric {metric_spec.name!r} is not admissible in mode {mode.value!r}"
        )

    # The remaining rules only concern metrics that need ground truth.
    if not metric_spec.requires_truth:
        return

    if mode is BenchmarkMode.OBSERVATIONAL:
        raise ManifestValidationError(
            f"metric {metric_spec.name!r} requires truth and cannot be used in observational mode"
        )

    record_lacks_truth = record is not None and record.truth is None
    if record_lacks_truth:
        raise ManifestValidationError(
            f"metric {metric_spec.name!r} requires truth but record {record.record_id!r} has none"
        )

lrdbench.validation.validate_truth_compatibility(estimator_spec, record)

Source code in src/lrdbench/validation.py
207
208
209
210
211
212
213
214
def validate_truth_compatibility(estimator_spec: EstimatorSpec, record: SeriesRecord) -> None:
    """Ensure the estimator and the record's truth target the same estimand.

    Records without truth are accepted unconditionally.

    Raises:
        ManifestValidationError: If ``record.truth`` is present and its
            ``target_estimand`` differs from the estimator's.
    """
    truth = record.truth
    if truth is not None and estimator_spec.target_estimand != truth.target_estimand:
        raise ManifestValidationError(
            f"estimator {estimator_spec.name!r} targets {estimator_spec.target_estimand!r} "
            f"but record truth targets {truth.target_estimand!r}"
        )

lrdbench.output_contract.public_output_contract()

Source code in src/lrdbench/output_contract.py
183
184
def public_output_contract() -> dict[str, Any]:
    """Return an independent copy of ``PUBLIC_OUTPUT_CONTRACT``.

    The contract holds nested containers (``"required_files"`` and
    ``"required_columns"`` are indexed as mappings elsewhere in this module),
    so a shallow ``dict(...)`` copy would still share them with the module
    constant. A deep copy lets callers modify the result without altering the
    contract the validators read.

    Returns:
        A new top-level ``dict`` whose nested containers are copies as well.
    """
    import copy  # local import: keeps this block self-contained

    return dict(copy.deepcopy(PUBLIC_OUTPUT_CONTRACT))

lrdbench.output_contract.required_output_files()

Source code in src/lrdbench/output_contract.py
187
188
189
def required_output_files() -> tuple[str, ...]:
    """Return the required output paths: summary files first, then raw result-store files.

    Both groups are read from ``PUBLIC_OUTPUT_CONTRACT["required_files"]``.
    """
    groups = PUBLIC_OUTPUT_CONTRACT["required_files"]
    summary_files = tuple(groups["summary"])
    raw_store_files = tuple(groups["raw_result_store"])
    return summary_files + raw_store_files

lrdbench.output_contract.validate_output_contract(run_root)

Source code in src/lrdbench/output_contract.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def validate_output_contract(run_root: str | Path) -> list[str]:
    """Check a run directory against ``PUBLIC_OUTPUT_CONTRACT``.

    Every required file must exist under ``run_root``; required CSV files are
    additionally passed to ``_validate_csv_columns`` with their required
    columns. Files that have required columns but are not themselves required
    are only checked when they exist.

    Args:
        run_root: Root directory of the run to validate.

    Returns:
        Collected error messages, in check order. An empty list means no
        violation was found.
    """
    root = Path(run_root)
    errors: list[str] = []
    required_columns: dict[str, list[str]] = PUBLIC_OUTPUT_CONTRACT["required_columns"]

    # Resolve the required-file list once; the second loop below only needs
    # membership tests, so a frozenset avoids rebuilding and rescanning the
    # tuple on every iteration.
    required_files = required_output_files()
    required_lookup = frozenset(required_files)

    for rel in required_files:
        path = root / rel
        if not path.is_file():
            errors.append(f"missing required file: {rel}")
            continue
        if rel.endswith(".csv"):
            errors.extend(_validate_csv_columns(path, rel, required_columns.get(rel, [])))

    # Optional files: validate their columns only when they are present.
    for rel, columns in required_columns.items():
        if rel in required_lookup:
            continue
        path = root / rel
        if path.is_file():
            errors.extend(_validate_csv_columns(path, rel, columns))

    return errors

Bootstrap utilities

lrdbench.bootstrap.circular_block_resample(x, rng, block_len)

Circular block bootstrap resample of the same length as x.

The series is treated as circular (wrap-around at the boundaries) to avoid edge artefacts. Blocks are concatenated until the desired length is reached, then truncated.

Parameters:

Name Type Description Default
x ndarray

1-D input array.

required
rng Generator

NumPy random generator instance.

required
block_len int

Block length in samples. Clamped to [1, len(x)].

required

Returns:

Type Description
ndarray

A resampled array of the same shape as x.

Source code in src/lrdbench/bootstrap.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def circular_block_resample(x: np.ndarray, rng: np.random.Generator, block_len: int) -> np.ndarray:
    """Circular block bootstrap resample of the same length as ``x``.

    The series is treated as circular (wrap-around at the boundaries) to
    avoid edge artefacts. Blocks are concatenated until the desired length
    is reached, then truncated.

    Block start positions are drawn one at a time with
    ``rng.integers(0, n)``, exactly ``ceil(n / block_len)`` times, so the
    sequence of random draws per call is the same as a block-by-block loop.
    The samples themselves are gathered with a single vectorised index
    operation rather than a per-sample Python loop.

    Args:
        x: 1-D input array.
        rng: NumPy random generator instance.
        block_len: Block length in samples. Clamped to ``[1, len(x)]``.

    Returns:
        A resampled ``float64`` array of the same shape as ``x``. An empty
        ``x`` is returned unchanged.
    """
    n = int(x.size)
    if n == 0:
        return x
    bl = max(1, min(block_len, n))
    n_blocks = -(-n // bl)  # ceil(n / bl) without going through floats

    # One scalar draw per block keeps the generator's call sequence stable.
    starts = np.fromiter(
        (int(rng.integers(0, n)) for _ in range(n_blocks)),
        dtype=np.int64,
        count=n_blocks,
    )
    # Row b holds the wrapped indices start_b, start_b + 1, ..., start_b + bl - 1.
    offsets = np.arange(bl, dtype=np.int64)
    indices = ((starts[:, None] + offsets[None, :]) % n).reshape(-1)[:n]
    return np.asarray(x[indices], dtype=np.float64)

lrdbench.bootstrap.bootstrap_statistic_distribution(x, rng, statistic, *, n_boot, block_len)

Compute a bootstrap distribution for statistic using circular block resampling.

Only finite replicate values are retained; None or non-finite results are silently dropped. This is important for estimators that may fail on short resampled blocks.

Parameters:

Name Type Description Default
x ndarray

1-D input array (the original time series).

required
rng Generator

NumPy random generator instance.

required
statistic Callable[[ndarray], float | None]

Function that takes a 1-D array and returns a scalar or None.

required
n_boot int

Number of bootstrap replicates.

required
block_len int

Block length in samples. A common pragmatic default is max(4, n // 10).

required

Returns:

Type Description
ndarray

1-D array of finite bootstrap replicates.

Source code in src/lrdbench/bootstrap.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def bootstrap_statistic_distribution(
    x: np.ndarray,
    rng: np.random.Generator,
    statistic: Callable[[np.ndarray], float | None],
    *,
    n_boot: int,
    block_len: int,
) -> np.ndarray:
    """Build a bootstrap distribution of ``statistic`` via circular block resampling.

    Each replicate resamples ``x`` with ``circular_block_resample`` and applies
    ``statistic`` to the result. Replicates that come back as ``None`` or as a
    non-finite number are discarded without notice, which matters for
    estimators that can fail on short resampled blocks. At least one replicate
    is always attempted, even when ``n_boot`` is below one.

    Args:
        x: 1-D input array (the original time series).
        rng: NumPy random generator instance.
        statistic: Function that takes a 1-D array and returns a scalar or ``None``.
        n_boot: Number of bootstrap replicates.
        block_len: Block length in samples. A common pragmatic default is
            ``max(4, n // 10)``.

    Returns:
        1-D ``float64`` array of the finite bootstrap replicates; it may hold
        fewer than ``n_boot`` entries, or none.
    """
    n_rounds = max(1, n_boot)
    kept: list[float] = []
    for _ in range(n_rounds):
        value = statistic(circular_block_resample(x, rng, block_len))
        if value is None or not np.isfinite(value):
            continue
        kept.append(float(value))
    return np.asarray(kept, dtype=np.float64)

lrdbench.bootstrap.symmetric_percentile_cis(samples, alphas)

Symmetric percentile confidence intervals from bootstrap samples.

For each nominal level alpha the interval is [q_{(1-alpha)/2}, q_{1-(1-alpha)/2}] where q denotes the sample quantile of the bootstrap distribution.

Parameters:

Name Type Description Default
samples ndarray

1-D array of bootstrap replicates (e.g. from :func:bootstrap_statistic_distribution).

required
alphas tuple[float, ...]

Nominal coverage levels (e.g. (0.95, 0.99)). Invalid values outside (0, 1) are skipped.

required

Returns:

Type Description
tuple[tuple[float, float, float], ...]

Tuple of (alpha, lower, upper) for each valid, deduplicated alpha. Empty if samples has no elements.

Source code in src/lrdbench/bootstrap.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def symmetric_percentile_cis(
    samples: np.ndarray, alphas: tuple[float, ...]
) -> tuple[tuple[float, float, float], ...]:
    """Equal-tailed percentile confidence intervals from bootstrap samples.

    For a nominal coverage level ``alpha`` the two tails each receive
    probability ``(1 - alpha) / 2``; the interval is
    ``[q_{(1-alpha)/2}, q_{1-(1-alpha)/2}]`` where ``q`` is the sample
    quantile of ``samples``.

    Args:
        samples: 1-D array of bootstrap replicates (e.g. from
            :func:`bootstrap_statistic_distribution`).
        alphas: Nominal coverage levels (e.g. ``(0.95, 0.99)``). Duplicates
            are collapsed and values outside ``(0, 1)`` are skipped.

    Returns:
        One ``(alpha, lower, upper)`` triple per accepted level, ordered by
        increasing ``alpha``. Empty if ``samples`` has no elements.
    """
    if samples.size == 0:
        return ()

    intervals: list[tuple[float, float, float]] = []
    unique_levels = sorted({float(level) for level in alphas})
    for level in unique_levels:
        if 0.0 < level < 1.0:
            half_tail = (1.0 - level) / 2.0
            lower = float(np.quantile(samples, half_tail))
            upper = float(np.quantile(samples, 1.0 - half_tail))
            intervals.append((level, lower, upper))
    return tuple(intervals)

Plugin discovery

lrdbench.plugin_loader.PluginDiscoveryResult

Immutable record produced by a single plugin load attempt.

Source code in src/lrdbench/plugin_loader.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class PluginDiscoveryResult:
    """Record describing the outcome of a single plugin load attempt.

    Instances are plain value holders. ``__slots__`` stops new attributes
    from being added, but nothing in this class prevents reassigning the
    declared ones, so immutability is by convention only.

    Attributes are set verbatim from the keyword-only constructor arguments:
    ``plugin_name``, ``module_name_or_path``, ``entry_point_name``,
    ``builder`` (``None`` when no builder is available), ``status``, and the
    optional ``version``, ``failure_reason`` and ``source_hash``.
    """

    __slots__ = (
        "plugin_name",
        "module_name_or_path",
        "entry_point_name",
        "builder",
        "version",
        "status",
        "failure_reason",
        "source_hash",
    )

    def __init__(
        self,
        *,
        plugin_name: str,
        module_name_or_path: str,
        entry_point_name: str,
        builder: _EstimatorBuilderFn | None,
        status: str,
        version: str | None = None,
        failure_reason: str | None = None,
        source_hash: str | None = None,
    ) -> None:
        # Where the plugin came from.
        self.plugin_name = plugin_name
        self.module_name_or_path = module_name_or_path
        self.entry_point_name = entry_point_name
        self.version = version
        self.source_hash = source_hash
        # What the load attempt produced.
        self.builder = builder
        self.status = status
        self.failure_reason = failure_reason

    def __repr__(self) -> str:
        # Only report whether a builder exists; its own repr is not shown.
        builder_state = "None" if self.builder is None else "present"
        return (
            f"PluginDiscoveryResult({self.plugin_name!r}, {self.status}, "
            f"builder={builder_state})"
        )

lrdbench.plugin_loader.discover_plugins_from_env()

Read LRD_BENCH_ESTIMATOR_PLUGIN and LRD_BENCH_ESTIMATOR_PLUGIN_PATH and return all discovered (success or failure) results.

Source code in src/lrdbench/plugin_loader.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
def _unknown_plugin_failure(
    module_name_or_path: str,
    entry_point_name: str,
    status: str,
    failure_reason: str,
) -> PluginDiscoveryResult:
    """Return a failure record for a source whose plugin names are not known."""
    return PluginDiscoveryResult(
        plugin_name="__unknown__",
        module_name_or_path=module_name_or_path,
        entry_point_name=entry_point_name,
        builder=None,
        status=status,
        failure_reason=failure_reason,
    )


def _entry_points_or_failure(
    mod: object, module_name_or_path: str
) -> tuple[object, PluginDiscoveryResult | None]:
    """Read the entry points of *mod*, returning ``(entry_points, failure)``.

    Exactly one element of the pair is ``None``: ``failure`` is set when
    ``_load_entry_points`` raises (``invalid_entry_points``) or returns
    ``None`` (``no_entry_points``).
    """
    try:
        ep_dict = _load_entry_points(mod)
    except Exception as exc:  # plugin code is untrusted; record, don't crash
        return None, _unknown_plugin_failure(
            module_name_or_path,
            "ENTRY_POINTS",
            "invalid_entry_points",
            f"{type(exc).__name__}: {exc}",
        )
    if ep_dict is None:
        return None, _unknown_plugin_failure(
            module_name_or_path,
            "ENTRY_POINTS",
            "no_entry_points",
            "module does not define ENTRY_POINTS",
        )
    return ep_dict, None


def discover_plugins_from_env() -> tuple[PluginDiscoveryResult, ...]:
    """Read ``LRD_BENCH_ESTIMATOR_PLUGIN`` and ``LRD_BENCH_ESTIMATOR_PLUGIN_PATH``
    and return all discovered (success or failure) results.

    ``LRD_BENCH_ESTIMATOR_PLUGIN`` is a ``:``-separated list of importable
    module names. ``LRD_BENCH_ESTIMATOR_PLUGIN_PATH`` is an
    ``os.pathsep``-separated list of ``.py`` file paths. Blank entries are
    ignored, and import-style plugins are processed before file-path plugins.

    Exceptions raised while loading a module or reading its entry points are
    caught and recorded as a failure result under the plugin name
    ``"__unknown__"``, with status ``"load_failed"``,
    ``"invalid_entry_points"`` or ``"no_entry_points"``.

    Returns:
        Tuple of results in discovery order; empty when neither variable
        names any plugin.
    """
    results: list[PluginDiscoveryResult] = []

    # 1. Import-style plugins
    raw_imports = (os.environ.get("LRD_BENCH_ESTIMATOR_PLUGIN") or "").strip()
    for module_name in (part.strip() for part in raw_imports.split(":")):
        if not module_name:
            continue
        try:
            mod = _load_module_by_import(module_name)
        except Exception as exc:  # importing third-party code can raise anything
            results.append(
                _unknown_plugin_failure(
                    module_name,
                    "__import__",
                    "load_failed",
                    f"{type(exc).__name__}: {exc}",
                )
            )
            continue
        ep_dict, failure = _entry_points_or_failure(mod, module_name)
        if failure is not None:
            results.append(failure)
            continue
        results.extend(
            _plugin_results_from_entry_points(
                ep_dict,
                module_name_or_path=module_name,
                version=_extract_version(mod),
            )
        )

    # 2. File-path plugins
    raw_paths = (os.environ.get("LRD_BENCH_ESTIMATOR_PLUGIN_PATH") or "").strip()
    for p_str in (part.strip() for part in raw_paths.split(os.pathsep)):
        if not p_str:
            continue
        p = Path(p_str)
        # Resolve once, before loading, so every record for this file reports
        # the same absolute path.
        resolved_path = str(p.resolve())
        if not p.is_file() or p.suffix != ".py":
            results.append(
                _unknown_plugin_failure(
                    resolved_path,
                    "__file__",
                    "load_failed",
                    "path does not exist or is not a .py file",
                )
            )
            continue
        try:
            mod = _load_module_by_path(p)
        except Exception as exc:  # executing third-party code can raise anything
            results.append(
                _unknown_plugin_failure(
                    resolved_path,
                    "__file__",
                    "load_failed",
                    f"{type(exc).__name__}: {exc}",
                )
            )
            continue
        ep_dict, failure = _entry_points_or_failure(mod, resolved_path)
        if failure is not None:
            results.append(failure)
            continue
        # Only file-path plugins carry a source hash.
        file_hash = _file_sha256(p)
        results.extend(
            _plugin_results_from_entry_points(
                ep_dict,
                module_name_or_path=resolved_path,
                version=_extract_version(mod),
                source_hash=file_hash,
            )
        )

    return tuple(results)

lrdbench.plugin_loader.build_estimator_registry_with_plugins(*, base_registry=None, plugin_results=None)

Build an estimator registry, optionally including third-party plugins.

Returns the combined registry alongside the final plugin discovery results. Built-in estimators take precedence when names collide.

Source code in src/lrdbench/plugin_loader.py
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
def build_estimator_registry_with_plugins(
    *,
    base_registry: EstimatorRegistry | None = None,
    plugin_results: Sequence[PluginDiscoveryResult] | None = None,
) -> tuple[EstimatorRegistry, tuple[PluginDiscoveryResult, ...]]:
    """Build an estimator registry, optionally including third-party plugins.

    Args:
        base_registry: Registry to extend. When ``None``, the registry from
            ``build_default_estimator_registry()`` is used. A supplied
            registry is passed on as-is (not copied) and is the same object
            that is returned.
        plugin_results: Discovery results to register. When ``None``, they
            are obtained from :func:`discover_plugins_from_env`.

    Returns:
        The combined registry alongside the final plugin discovery results
        as returned by ``_register_plugins_into_registry``. Built-in
        estimators take precedence when names collide.
    """
    from lrdbench.defaults import build_default_estimator_registry

    # Test against None explicitly: a caller-supplied registry that happens
    # to be falsy (for example an empty one, if the registry type defines
    # __len__ or __bool__) must not be silently swapped for the defaults.
    if base_registry is None:
        registry = build_default_estimator_registry()
    else:
        registry = base_registry
    if plugin_results is None:
        plugin_results = discover_plugins_from_env()
    final_results = _register_plugins_into_registry(registry, plugin_results)
    return registry, final_results

Packaged assets and testing helpers

lrdbench.public_assets.list_public_suites()

Source code in src/lrdbench/public_assets.py
23
24
25
26
27
28
29
30
31
32
33
34
35
def list_public_suites() -> tuple[str, ...]:
    """Return the sorted, de-duplicated names of all available public suites.

    Names are collected from the source suite directory and from the
    packaged suite directory; a directory that does not exist contributes
    nothing. Only entries whose file name ends with ``_SUITE_SUFFIX`` are
    considered.
    """
    source_dir = _source_suite_dir()
    from_source: set[str] = (
        {entry.stem for entry in source_dir.glob(f"*{_SUITE_SUFFIX}")}
        if source_dir.is_dir()
        else set()
    )

    packaged_dir = _packaged_suite_dir()
    from_package: set[str] = (
        {
            entry.name.removesuffix(_SUITE_SUFFIX)
            for entry in packaged_dir.iterdir()
            if entry.is_file() and entry.name.endswith(_SUITE_SUFFIX)
        }
        if packaged_dir.is_dir()
        else set()
    )

    # A suite present in both locations is reported once.
    return tuple(sorted(from_source | from_package))

lrdbench.public_assets.resolve_manifest_argument(value)

Source code in src/lrdbench/public_assets.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@contextmanager
def resolve_manifest_argument(value: str | Path) -> Iterator[Path]:
    """Context manager yielding a filesystem path for a manifest argument.

    Resolution order:

    1. ``value`` itself, if it names an existing file.
    2. ``<name><_SUITE_SUFFIX>`` inside the source suite directory.
    3. ``<name><_SUITE_SUFFIX>`` inside the packaged suite directory, made
       available through ``resources.as_file`` for the duration of the
       ``with`` block.

    ``<name>`` is ``str(value)`` with one trailing ``_SUITE_SUFFIX`` removed.

    Raises:
        FileNotFoundError: If none of the candidates is an existing file.
    """
    direct = Path(value)
    if direct.is_file():
        yield direct
        return

    suite_name = str(value)
    if suite_name.endswith(_SUITE_SUFFIX):
        suite_name = suite_name[: -len(_SUITE_SUFFIX)]
    suite_filename = f"{suite_name}{_SUITE_SUFFIX}"

    from_source = _source_suite_dir() / suite_filename
    if from_source.is_file():
        yield from_source
    elif (from_package := _packaged_suite_dir() / suite_filename).is_file():
        # Keep the as_file context open while the caller uses the path.
        with resources.as_file(from_package) as extracted:
            yield extracted
    else:
        raise FileNotFoundError(f"manifest file or public suite not found: {value}")

lrdbench.testing.estimator_spec(*, name='CandidateEstimator', family='external', target_estimand='hurst_scaling_proxy', assumptions=(), supports_ci=False, supports_diagnostics=True, params=None, version=None)

Source code in src/lrdbench/testing.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def estimator_spec(
    *,
    name: str = "CandidateEstimator",
    family: str = "external",
    target_estimand: str = "hurst_scaling_proxy",
    assumptions: tuple[str, ...] = (),
    supports_ci: bool = False,
    supports_diagnostics: bool = True,
    params: dict[str, object] | None = None,
    version: str | None = None,
) -> EstimatorSpec:
    """Build an ``EstimatorSpec`` from keyword arguments with ready-made defaults.

    Every argument is forwarded to the ``EstimatorSpec`` field of the same
    name, except ``params``, which populates ``parameter_schema``: it is
    copied into a new ``dict``, or replaced by an empty ``dict`` when
    ``None``.
    """
    # Copy so the spec never shares the caller's mapping.
    schema: dict[str, object] = dict(params) if params is not None else {}
    return EstimatorSpec(
        name=name,
        family=family,
        target_estimand=target_estimand,
        assumptions=assumptions,
        supports_ci=supports_ci,
        supports_diagnostics=supports_diagnostics,
        parameter_schema=schema,
        version=version,
    )

lrdbench.testing.synthetic_series_record(values, *, record_id='test_record', source_name='test')

Source code in src/lrdbench/testing.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def synthetic_series_record(
    values: np.ndarray | list[float],
    *,
    record_id: str = "test_record",
    source_name: str = "test",
) -> SeriesRecord:
    """Wrap raw values in a ``SeriesRecord`` marked as synthetic.

    ``values`` is converted with ``np.asarray(..., dtype=float)``. The record
    has ``source_type=SourceType.SYNTHETIC`` and carries no time axis,
    sampling rate or truth (all ``None``).
    """
    series = np.asarray(values, dtype=float)
    return SeriesRecord(
        record_id=record_id,
        values=series,
        time_axis=None,
        sampling_rate=None,
        source_type=SourceType.SYNTHETIC,
        source_name=source_name,
        truth=None,
    )

lrdbench.testing.smoke_fit_estimator(estimator, values, *, min_value=None, max_value=None)

Source code in src/lrdbench/testing.py
73
74
75
76
77
78
79
80
81
82
def smoke_fit_estimator(
    estimator: BaseEstimator,
    values: np.ndarray | list[float],
    *,
    min_value: float | None = None,
    max_value: float | None = None,
) -> EstimateResult:
    """Fit ``estimator`` to ``values`` and check that the estimate is valid.

    The values are wrapped with :func:`synthetic_series_record`, passed to
    ``estimator.fit``, and the outcome is checked with
    :func:`assert_valid_estimate` using the optional bounds.

    Returns:
        The estimate produced by ``estimator.fit``, once it has passed the
        checks.
    """
    estimate = estimator.fit(synthetic_series_record(values))
    assert_valid_estimate(estimate, min_value=min_value, max_value=max_value)
    return estimate

lrdbench.testing.assert_valid_estimate(result, *, min_value=None, max_value=None)

Source code in src/lrdbench/testing.py
50
51
52
53
54
55
56
57
58
59
60
61
62
def assert_valid_estimate(
    result: EstimateResult,
    *,
    min_value: float | None = None,
    max_value: float | None = None,
) -> None:
    """Check that ``result`` is a valid estimate, optionally within bounds.

    The checks raise ``AssertionError`` explicitly rather than using
    ``assert`` statements, so they still run when Python is started with
    ``-O`` (which strips ``assert``).

    Args:
        result: Estimate to check; ``valid``, ``point`` and
            ``failure_reason`` are read.
        min_value: Inclusive lower bound for ``float(result.point)``;
            ``None`` disables the check.
        max_value: Inclusive upper bound for ``float(result.point)``;
            ``None`` disables the check.

    Raises:
        AssertionError: If ``result.valid`` is falsy (the message is
            ``result.failure_reason``), if ``result.point`` is ``None``, or
            if the point violates a supplied bound.
    """
    if not result.valid:
        raise AssertionError(result.failure_reason)
    if result.point is None:
        raise AssertionError("valid estimate has no point value")
    value = float(result.point)
    # "not (a >= b)" rather than "a < b" so that a NaN point fails the bound
    # check, exactly as "assert value >= min_value" would.
    if min_value is not None and not (value >= min_value):
        raise AssertionError(f"estimate {value!r} is below min_value {min_value!r}")
    if max_value is not None and not (value <= max_value):
        raise AssertionError(f"estimate {value!r} is above max_value {max_value!r}")

lrdbench.testing.assert_invalid_estimate(result, *, reason_contains=None)

Source code in src/lrdbench/testing.py
65
66
67
68
69
70
def assert_invalid_estimate(result: EstimateResult, *, reason_contains: str | None = None) -> None:
    """Check that ``result`` is a properly reported invalid estimate.

    The checks raise ``AssertionError`` explicitly rather than using
    ``assert`` statements, so they still run when Python is started with
    ``-O`` (which strips ``assert``).

    Args:
        result: Estimate to check; ``valid``, ``point`` and
            ``failure_reason`` are read.
        reason_contains: Substring that must occur in
            ``result.failure_reason``; ``None`` disables the check.

    Raises:
        AssertionError: If ``result.valid`` is truthy, ``result.point`` is
            not ``None``, ``result.failure_reason`` is ``None``, or the
            failure reason does not contain ``reason_contains``.
    """
    if result.valid:
        raise AssertionError("expected an invalid estimate, but result.valid is truthy")
    if result.point is not None:
        raise AssertionError(f"invalid estimate carries a point value: {result.point!r}")
    if result.failure_reason is None:
        raise AssertionError("invalid estimate has no failure_reason")
    if reason_contains is not None and reason_contains not in result.failure_reason:
        raise AssertionError(
            f"failure_reason {result.failure_reason!r} does not contain {reason_contains!r}"
        )