Skip to content

API Reference

This page is automatically generated from the source code docstrings.

open_auto_loader.main.OpenAutoLoader

The main orchestrator for incremental data loading.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `source` | `str` | The source path (Local or Cloud URL). |
| `target` | `str` | The target Delta Lake path. |
| `format_type` | `str` | Format of source files (csv, parquet, ndjson). |

Source code in src/open_auto_loader/main.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
class OpenAutoLoader:
    """
    The main orchestrator for incremental data loading.

    Attributes:
        source (str): The source path (Local or Cloud URL).
        target (str): The target Delta Lake path.
        format_type (str): Format of source files (csv, parquet, ndjson).
    """

    # Audit columns injected by the engine; user metadata must not shadow them.
    _RESERVED_KEYS: set[str] = {"_file_path", "_processed_at", "_batch_id"}

    def __init__(
        self,
        source: str,
        target: str,
        check_point: str,
        schema_path: str,
        format_type: str = "csv",
        table_type: str = "delta",
        storage_config: StorageConfig | dict | None = None,
        metadata: dict | None = None,
        evolution_mode: SchemaEvolutionMode = SchemaEvolutionMode.FAIL_ON_NEW_COLUMNS,
    ):
        self.source = source
        self.target = target
        self.check_point = check_point
        self.schema_path = schema_path
        self.format_type = format_type
        self.evolution_mode = evolution_mode

        # 1. Metadata setup & Validation
        self.metadata = metadata or {}
        self._validate_metadata()

        # 2. Standardize Storage Config (dicts are resolved against the
        # source URL; StorageConfig instances and None pass through as-is).
        if isinstance(storage_config, dict):
            self.config = get_storage_config(source, storage_config)
        else:
            self.config = storage_config

        # 3. Extract options for internal modules
        # (hasattr guard also covers the storage_config=None case).
        self.storage_options = (
            self.config.get_options() if hasattr(self.config, "get_options") else {}
        )

        # 4. Initialize Managers
        self.check_point_manager = CheckPointManager(check_point)
        self.schema_manager = SchemaManager(schema_path)

        # 5. Initialize Cloud-Aware Modules
        self.engine = PolarsEngine(
            target_path=target,
            format_type=format_type,
            table_type=table_type,
            storage_options=self.storage_options,
        )
        self.file_scanner = FileScanner(
            source, format_type, storage_options=self.storage_options
        )

    def _validate_metadata(self) -> None:
        """Ensures user metadata doesn't collide with internal audit columns.

        Raises:
            ValueError: If any metadata key shadows a reserved audit column.
        """
        # Report every colliding key at once (mirrors the engine's own
        # set-intersection guard) instead of failing on the first offender.
        if forbidden := self._RESERVED_KEYS & self.metadata.keys():
            raise ValueError(
                f"Forbidden keys {sorted(forbidden)} found in metadata. "
                f"Reserved keys are: {list(self._RESERVED_KEYS)}"
            )

    def run(self, batch_id: str) -> None:
        """
        Executes the ingestion loop for all new files.

        Args:
            batch_id: A unique identifier for this processing run.

        Raises:
            SchemaMismatchError: In FAIL_ON_NEW_COLUMNS mode, when a file
                contains columns absent from the locked schema.
        """
        logger.info("Starting OpenAutoLoader batch", extra={"batch_id": batch_id})

        new_files = self.file_scanner.get_eligible_files(self.check_point_manager)

        if not new_files:
            logger.info("No new files found to process. Exiting gracefully.")
            return

        # Bootstrap the schema contract from the first eligible file.
        if not self.schema_manager.schema_exists():
            logger.info("No schema found. Bootstrapping...")
            inferred_schema = self.engine.get_inferred_schema(new_files[0])
            self.schema_manager.save_schema(inferred_schema)

        locked_schema = self.schema_manager.load_schema()

        for file_path in new_files:
            current_file_schema = self.engine.get_inferred_schema(file_path)
            diffs = self.schema_manager.check_differences(current_file_schema)

            # In NONE mode, writes are pinned to the locked contract columns.
            columns_to_write = None
            if self.evolution_mode == SchemaEvolutionMode.NONE:
                columns_to_write = list(locked_schema.keys())

            if diffs["extra"]:
                if self.evolution_mode == SchemaEvolutionMode.FAIL_ON_NEW_COLUMNS:
                    raise SchemaMismatchError(
                        "New Columns detected",
                        extra_columns=diffs["extra"],
                        file_path=str(file_path),
                    )

                elif self.evolution_mode == SchemaEvolutionMode.ADD_NEW_COLUMNS:
                    logger.info(f"Evolving schema: adding {diffs['extra']}")
                    self.schema_manager.evolve_schema(current_file_schema)
                    # Reload so subsequent files see the evolved contract.
                    locked_schema = self.schema_manager.load_schema()
                    columns_to_write = None

                elif self.evolution_mode == SchemaEvolutionMode.NONE:
                    logger.warning(f"Ignoring extra columns: {diffs['extra']}")

            self.schema_manager.validate(
                current_schema=current_file_schema,
                evolution_mode=self.evolution_mode,
                file_path=str(file_path),
            )

            try:
                self.engine.process_single_file(
                    file_path=file_path,
                    schema_dict=locked_schema,
                    batch_id=batch_id,
                    metadata=self.metadata,
                    evolution_mode=self.evolution_mode,
                    selected_columns=columns_to_write,
                )

                # Only mark the checkpoint after a successful sink, so a
                # failed file is retried on the next run.
                self.check_point_manager.mark_processed(file_path, batch_id=batch_id)
                logger.info("Processed successfully", extra={"file_path": file_path})

            except Exception as e:
                logger.exception(
                    "Operational failure during ingestion",
                    extra={"file_path": file_path, "error": str(e)},
                )
                raise

        logger.info("Batch completed.", extra={"batch_id": batch_id})

run(batch_id)

Executes the ingestion loop for all new files.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `batch_id` | `str` | A unique identifier for this processing run. | *required* |
Source code in src/open_auto_loader/main.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def run(self, batch_id: str) -> None:
    """
    Executes the ingestion loop for all new files.

    Args:
        batch_id: A unique identifier for this processing run.
    """
    logger.info("Starting OpenAutoLoader batch", extra={"batch_id": batch_id})

    pending = self.file_scanner.get_eligible_files(self.check_point_manager)
    if not pending:
        logger.info("No new files found to process. Exiting gracefully.")
        return

    # Bootstrap the schema contract from the first eligible file.
    if not self.schema_manager.schema_exists():
        logger.info("No schema found. Bootstrapping...")
        self.schema_manager.save_schema(self.engine.get_inferred_schema(pending[0]))

    locked_schema = self.schema_manager.load_schema()

    for path in pending:
        file_schema = self.engine.get_inferred_schema(path)
        diffs = self.schema_manager.check_differences(file_schema)

        # In NONE mode, writes are pinned to the locked contract columns.
        columns_to_write = (
            list(locked_schema.keys())
            if self.evolution_mode == SchemaEvolutionMode.NONE
            else None
        )

        if diffs["extra"]:
            if self.evolution_mode == SchemaEvolutionMode.FAIL_ON_NEW_COLUMNS:
                raise SchemaMismatchError(
                    "New Columns detected",
                    extra_columns=diffs["extra"],
                    file_path=str(path),
                )
            if self.evolution_mode == SchemaEvolutionMode.ADD_NEW_COLUMNS:
                logger.info(f"Evolving schema: adding {diffs['extra']}")
                self.schema_manager.evolve_schema(file_schema)
                # Reload so subsequent files see the evolved contract.
                locked_schema = self.schema_manager.load_schema()
                columns_to_write = None
            elif self.evolution_mode == SchemaEvolutionMode.NONE:
                logger.warning(f"Ignoring extra columns: {diffs['extra']}")

        self.schema_manager.validate(
            current_schema=file_schema,
            evolution_mode=self.evolution_mode,
            file_path=str(path),
        )

        try:
            self.engine.process_single_file(
                file_path=path,
                schema_dict=locked_schema,
                batch_id=batch_id,
                metadata=self.metadata,
                evolution_mode=self.evolution_mode,
                selected_columns=columns_to_write,
            )
            # Checkpoint only after a successful sink so failures retry.
            self.check_point_manager.mark_processed(path, batch_id=batch_id)
            logger.info("Processed successfully", extra={"file_path": path})
        except Exception as err:
            logger.exception(
                "Operational failure during ingestion",
                extra={"file_path": path, "error": str(err)},
            )
            raise

    logger.info("Batch completed.", extra={"batch_id": batch_id})

open_auto_loader.core.engine.PolarsEngine

Source code in src/open_auto_loader/core/engine.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
class PolarsEngine:
    """Lazily reads source files with Polars and sinks them to a Delta table,
    enriching each batch with audit and user-supplied metadata columns."""

    # Fixed, documented order for the internal audit columns. Using an
    # ordered tuple (rather than unpacking a set) keeps the output column
    # layout deterministic across interpreter runs — set iteration order
    # varies with hash randomization.
    _AUDIT_COLS: tuple[str, ...] = ("_batch_id", "_processed_at", "_file_path")

    def __init__(
        self,
        target_path: str,
        format_type: str = "csv",
        table_type: str = "delta",
        storage_options: dict | None = None,
    ):
        self.target_path = target_path
        self.table_type = table_type
        self.storage_options = storage_options or {}

        # Determine if target is cloud or local (e.g. "s3" vs "file").
        self.target_protocol = (
            target_path.split("://")[0] if "://" in target_path else "file"
        )

        self.reader = ReaderFactory.get_reader_by_format(format_type)

    def get_inferred_schema(self, file_path: str):
        """Helper for the SchemaManager to bootstrap the JSON contract."""
        return self.reader.get_schema(file_path, storage_options=self.storage_options)

    def process_single_file(
        self,
        file_path: str,
        schema_dict: dict,
        batch_id: str,
        metadata: dict,
        evolution_mode: SchemaEvolutionMode,
        selected_columns: list[str] | None = None,
    ):
        """The core ETL step: Read -> Enrich -> Sink with Rescue logic.

        Args:
            file_path: Source file to ingest.
            schema_dict: The locked schema contract (column -> dtype).
            batch_id: Value stamped into the `_batch_id` audit column.
            metadata: Constant columns attached to every row.
            evolution_mode: Controls how unknown columns are handled.
            selected_columns: Optional subset of contract columns to write.

        Raises:
            ValueError: If a metadata key collides with an audit column.
        """
        reserved = set(self._AUDIT_COLS)
        if overlap := reserved & metadata.keys():
            raise ValueError(f"Metadata keys conflict with audit columns: {overlap}")

        # 1. Build scan kwargs
        # If mode is RESCUE, we DON'T pass 'columns' to the reader because
        # we need to read 'extra' columns to bundle them into JSON.
        is_rescue = evolution_mode == SchemaEvolutionMode.RESCUE

        scan_kwargs: dict = {
            "schema": (
                {k: v for k, v in schema_dict.items() if k in selected_columns}
                if (selected_columns and not is_rescue)
                else schema_dict
            )
        }

        # Prune only if NOT rescuing
        if selected_columns and not is_rescue:
            scan_kwargs["columns"] = selected_columns

        # 2. Scan lazily, then cast known columns to the contract dtypes.
        lf = self.reader.scan(file_path, **scan_kwargs)
        # Hoisted: collect_schema() once — casts below don't change names.
        file_cols = lf.collect_schema().names()
        cast_exprs = [
            pl.col(c).cast(schema_dict[c]) for c in schema_dict if c in file_cols
        ]
        if cast_exprs:
            lf = lf.with_columns(cast_exprs)

        # 3. Handle Rescue Logic: bundle unknown columns into one JSON column.
        rescue_col_name = "_rescued_data"
        active_selection = selected_columns.copy() if selected_columns else None

        if is_rescue:
            known_cols = set(selected_columns or schema_dict.keys())
            extra_cols = [c for c in file_cols if c not in known_cols]

            if extra_cols:
                lf = lf.with_columns(
                    pl.struct(extra_cols)
                    .map_elements(json.dumps, return_dtype=pl.String)
                    .alias(rescue_col_name)
                )
                lf = lf.drop(extra_cols)  # drop raw extras, keep only _rescued_data
                if active_selection is not None:
                    active_selection.append(rescue_col_name)

        # 4. Audit + metadata columns (constant per file)
        processed_at = datetime.now(UTC)
        audit_cols = [
            pl.lit(batch_id).alias("_batch_id"),
            pl.lit(processed_at).alias("_processed_at"),
            pl.lit(file_path).alias("_file_path"),
            *[pl.lit(v).alias(k) for k, v in metadata.items()],
        ]
        lf = lf.with_columns(audit_cols)

        # 5. Final column selection — audit columns in a fixed, stable order
        # (previously unpacked from a set, which is nondeterministic).
        audit_col_names = [*self._AUDIT_COLS, *metadata.keys()]
        if active_selection:
            lf = lf.select([*active_selection, *audit_col_names])

        # 6. Sink (append; schema_mode="merge" lets evolved schemas land)
        lf.sink_delta(
            self.target_path,
            mode="append",
            delta_write_options={"schema_mode": "merge"},
            storage_options=self.storage_options or None,
        )

get_inferred_schema(file_path)

Helper for the SchemaManager to bootstrap the JSON contract.

Source code in src/open_auto_loader/core/engine.py
31
32
33
def get_inferred_schema(self, file_path: str):
    """Helper for the SchemaManager to bootstrap the JSON contract."""
    # Delegate to the format-specific reader, forwarding storage credentials.
    opts = self.storage_options
    return self.reader.get_schema(file_path, storage_options=opts)

process_single_file(file_path, schema_dict, batch_id, metadata, evolution_mode, selected_columns=None)

The core ETL step: Read -> Enrich -> Sink with Rescue logic.

Source code in src/open_auto_loader/core/engine.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def process_single_file(
    self,
    file_path: str,
    schema_dict: dict,
    batch_id: str,
    metadata: dict,
    evolution_mode: SchemaEvolutionMode,
    selected_columns: list[str] | None = None,
):
    """The core ETL step: Read -> Enrich -> Sink with Rescue logic.

    Args:
        file_path: Source file to ingest.
        schema_dict: The locked schema contract (column -> dtype).
        batch_id: Value stamped into the `_batch_id` audit column.
        metadata: Constant columns attached to every row.
        evolution_mode: Controls how unknown columns are handled.
        selected_columns: Optional subset of contract columns to write.

    Raises:
        ValueError: If a metadata key collides with an audit column.
    """
    # Ordered list (not a set) so the final output column layout is
    # deterministic across interpreter runs; set iteration order varies
    # with hash randomization.
    audit_order = ["_batch_id", "_processed_at", "_file_path"]
    if overlap := set(audit_order) & metadata.keys():
        raise ValueError(f"Metadata keys conflict with audit columns: {overlap}")

    # 1. Build scan kwargs
    # If mode is RESCUE, we DON'T pass 'columns' to the reader because
    # we need to read 'extra' columns to bundle them into JSON.
    is_rescue = evolution_mode == SchemaEvolutionMode.RESCUE

    scan_kwargs: dict = {
        "schema": (
            {k: v for k, v in schema_dict.items() if k in selected_columns}
            if (selected_columns and not is_rescue)
            else schema_dict
        )
    }

    # Prune only if NOT rescuing
    if selected_columns and not is_rescue:
        scan_kwargs["columns"] = selected_columns

    # 2. Scan lazily, then cast known columns to the contract dtypes.
    lf = self.reader.scan(file_path, **scan_kwargs)
    # Hoisted: collect_schema() once — casts below don't change names.
    file_cols = lf.collect_schema().names()
    cast_exprs = [
        pl.col(c).cast(schema_dict[c]) for c in schema_dict if c in file_cols
    ]
    if cast_exprs:
        lf = lf.with_columns(cast_exprs)

    # 3. Handle Rescue Logic: bundle unknown columns into one JSON column.
    rescue_col_name = "_rescued_data"
    active_selection = selected_columns.copy() if selected_columns else None

    if is_rescue:
        known_cols = set(selected_columns or schema_dict.keys())
        extra_cols = [c for c in file_cols if c not in known_cols]

        if extra_cols:
            lf = lf.with_columns(
                pl.struct(extra_cols)
                .map_elements(json.dumps, return_dtype=pl.String)
                .alias(rescue_col_name)
            )
            lf = lf.drop(extra_cols)  # drop raw extras, keep only _rescued_data
            if active_selection is not None:
                active_selection.append(rescue_col_name)

    # 4. Audit + metadata columns (constant per file)
    processed_at = datetime.now(UTC)
    audit_cols = [
        pl.lit(batch_id).alias("_batch_id"),
        pl.lit(processed_at).alias("_processed_at"),
        pl.lit(file_path).alias("_file_path"),
        *[pl.lit(v).alias(k) for k, v in metadata.items()],
    ]
    lf = lf.with_columns(audit_cols)

    # 5. Final column selection — audit columns in a fixed, stable order
    # (previously unpacked from a set, which is nondeterministic).
    audit_col_names = [*audit_order, *metadata.keys()]
    if active_selection:
        lf = lf.select([*active_selection, *audit_col_names])

    # 6. Sink (append; schema_mode="merge" lets evolved schemas land)
    lf.sink_delta(
        self.target_path,
        mode="append",
        delta_write_options={"schema_mode": "merge"},
        storage_options=self.storage_options or None,
    )