
Base job

execution.models.base_job

Base ETL Job Model

This module provides the abstract base class for all ETL jobs in the Owl-Watch pipeline. It standardizes job initialization, Spark/Glue context management, and common operations.

BaseGlueETLJob

Abstract base class for all ETL jobs in the Owl-Watch pipeline.

This class provides common functionality for AWS Glue ETL jobs, including:

- Spark/Glue context initialization
- Argument parsing and validation
- Schema validation
- Common data operations (ID generation, field addition)
- Job lifecycle management (commit, cleanup)
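The split between shared plumbing in the base class and job-specific logic in subclasses can be illustrated with a minimal, dependency-free sketch. It assumes only what this page states, namely that run() and get_expected_schema() are abstract. The class names SketchETLJob, ExampleJob and IncompleteJob are hypothetical, plain lists and dicts stand in for StructType and DataFrame, and the Glue/Spark context setup is left out entirely.

```python
from abc import ABC, abstractmethod
from typing import Dict, List


class SketchETLJob(ABC):
    """Hypothetical stand-in for BaseGlueETLJob (no Glue/Spark dependencies)."""

    @abstractmethod
    def get_expected_schema(self) -> List[str]:
        """Return the expected output columns (a StructType in the real class)."""

    @abstractmethod
    def run(self) -> List[Dict[str, str]]:
        """Produce the job output (a DataFrame in the real class)."""


class ExampleJob(SketchETLJob):
    """Hypothetical concrete job; the schema and rows are placeholders."""

    def get_expected_schema(self) -> List[str]:
        return ["record_id", "dataset_name"]

    def run(self) -> List[Dict[str, str]]:
        return [{"record_id": "1", "dataset_name": "example"}]


class IncompleteJob(SketchETLJob):
    """Implements only one abstract method, so Python refuses to instantiate it."""

    def get_expected_schema(self) -> List[str]:
        return []


job = ExampleJob()
print(job.run())
try:
    IncompleteJob()
except TypeError as exc:
    # ABC blocks instantiation until every abstract method is implemented.
    print(f"cannot instantiate: {exc}")
```

Declaring the hooks with abstractmethod moves the "forgot to implement run()" mistake from job runtime to object construction.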

Attributes:

- args_to_extract: List of command line arguments to extract
- args: Dictionary of parsed job arguments
- sc: SparkContext for Spark operations
- glue_context: GlueContext for AWS Glue operations
- spark: SparkSession for DataFrame operations
- job: Glue Job instance for job management
- schema_validator: Validator for DataFrame schemas

args_to_extract = args_to_extract + ['JOB_NAME'] (instance attribute)

args = self._get_args() (instance attribute)

schema_validator = SchemaValidator(check_nullable=False) (instance attribute)

__init__(args_to_extract: List[str])

Initialize the base ETL job with argument parsing and context setup.

Parameters:

- args_to_extract (List[str], required): List of command line arguments to extract and parse
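Because args_to_extract always has JOB_NAME appended, every job receives its name alongside its own arguments. AWS Glue passes job parameters as `--NAME value` pairs, and Glue scripts commonly resolve them with `awsglue.utils.getResolvedOptions(sys.argv, names)`. Whether `_get_args()` does exactly that is not shown on this page. The sketch below is a hypothetical, stdlib-only stand-in (`resolve_job_args` is not part of this module) that mimics the behavior with argparse so it can run outside Glue.

```python
import argparse
from typing import Dict, List


def resolve_job_args(argv: List[str], args_to_extract: List[str]) -> Dict[str, str]:
    """Hypothetical stand-in for BaseGlueETLJob._get_args().

    Requires a ``--NAME value`` pair for every requested name plus JOB_NAME,
    and ignores any other arguments (Glue adds several of its own).
    """
    # Append JOB_NAME as the base class does; dict.fromkeys drops duplicates
    # so argparse does not reject a name that was requested twice.
    names = list(dict.fromkeys(args_to_extract + ["JOB_NAME"]))
    parser = argparse.ArgumentParser(add_help=False, allow_abbrev=False)
    for name in names:
        parser.add_argument(f"--{name}", dest=name, required=True)
    # parse_known_args skips unrecognized arguments instead of failing;
    # a missing required name still exits with SystemExit (status 2).
    known, _unrecognized = parser.parse_known_args(argv)
    return {name: getattr(known, name) for name in names}


argv = [
    "--JOB_NAME", "example-job",
    "--INPUT_PATH", "s3://example-bucket/input/",
    "--job-bookmark-option", "job-bookmark-enable",
]
print(resolve_job_args(argv, ["INPUT_PATH"]))
# {'INPUT_PATH': 's3://example-bucket/input/', 'JOB_NAME': 'example-job'}
```

INPUT_PATH and the bucket name are placeholders. Failing fast on a missing required argument surfaces misconfigured job parameters before any Spark work starts.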

commit()

Commit the Glue job, persisting its state (for example, job bookmarks).

cleanup()

Clean up resources by stopping the Spark context.
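A driver will typically call run(), then commit(), then cleanup(). A minimal sketch of that ordering, assuming commit should happen only after run() succeeds and cleanup should happen regardless: `RecordingJob` and `execute` are hypothetical names, and the job records its calls instead of touching Glue or Spark.

```python
from typing import List


class RecordingJob:
    """Hypothetical job that records lifecycle calls instead of using Glue/Spark."""

    def __init__(self, fail: bool = False) -> None:
        self.calls: List[str] = []
        self.fail = fail

    def run(self) -> str:
        self.calls.append("run")
        if self.fail:
            raise RuntimeError("transformation failed")
        return "output"

    def commit(self) -> None:
        self.calls.append("commit")

    def cleanup(self) -> None:
        self.calls.append("cleanup")


def execute(job: RecordingJob) -> str:
    """Run the job, commit only on success, and always release resources."""
    try:
        result = job.run()
        job.commit()  # only reached if run() did not raise
        return result
    finally:
        job.cleanup()  # runs on success and on failure


ok_job = RecordingJob()
print(execute(ok_job), ok_job.calls)
```

Committing only on success means that, when job bookmarks are enabled, a failed run should not mark its input as processed. Placing cleanup() in `finally` stops the Spark context even when run() raises.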

get_expected_schema() -> StructType (abstract method)

Return the expected schema for this job's data.

Must be implemented by subclasses to define their expected data structure.

Returns:

- StructType: StructType defining the expected DataFrame schema
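The schema_validator attribute is constructed with check_nullable=False, which suggests that validation compares field names and types but not nullability. The sketch below illustrates that idea under explicit assumptions: schemas are modeled as plain (name, type, nullable) tuples instead of a Spark StructType, extra columns are ignored, and `validate` and `SketchValidationResult` are hypothetical names, not this module's SchemaValidator or ValidationResult.

```python
from dataclasses import dataclass, field
from typing import List, Tuple

# Hypothetical stand-in for a StructType: (field name, type name, nullable).
Schema = List[Tuple[str, str, bool]]


@dataclass
class SketchValidationResult:
    is_valid: bool
    errors: List[str] = field(default_factory=list)


def validate(actual: Schema, expected: Schema, check_nullable: bool = False) -> SketchValidationResult:
    """Check every expected field exists with the expected type.

    Nullability is compared only when check_nullable is True; fields present
    in `actual` but absent from `expected` are ignored in this sketch.
    """
    actual_by_name = {name: (dtype, nullable) for name, dtype, nullable in actual}
    errors: List[str] = []
    for name, dtype, nullable in expected:
        if name not in actual_by_name:
            errors.append(f"missing field: {name}")
            continue
        actual_type, actual_nullable = actual_by_name[name]
        if actual_type != dtype:
            errors.append(f"type mismatch for {name}: expected {dtype}, got {actual_type}")
        elif check_nullable and actual_nullable != nullable:
            errors.append(f"nullability mismatch for {name}")
    return SketchValidationResult(is_valid=not errors, errors=errors)


expected = [("record_id", "string", False), ("created_at", "timestamp", True)]
actual = [("record_id", "string", True), ("created_at", "string", True), ("extra", "int", True)]
print(validate(actual, expected).errors)
```

With check_nullable=False, only the type mismatch on created_at is reported; the nullable difference on record_id is tolerated, which is convenient because Spark readers often mark columns nullable regardless of the source data.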

run() -> DataFrame (abstract method)

Must be implemented by subclasses.

add_base_fields(df: DataFrame, data_source: DataSource, dataset_name: str) -> DataFrame

generate_record_id(df: DataFrame, prefix: str = 'REC') -> DataFrame

validate_schema(df: DataFrame, expected_schema: StructType) -> ValidationResult

add_quality_flags(df: DataFrame) -> DataFrame

add_partitions_to_dataframe(df: DataFrame, partitions: Dict[str, str], source_key: str) -> DataFrame

select_expected_fields(df: DataFrame) -> DataFrame