Job metadata

execution.models.job_metadata

Job Metadata and Configuration Models

This module defines the data structures and enumerations used for job metadata, configuration, status tracking, and resource monitoring throughout the ETL pipeline.

ResourceUsage dataclass

Container for tracking job resource consumption metrics.

Used for monitoring and optimizing job performance across different execution environments (local, Glue, etc.).

cpu_usage_percent: Optional[float] = None class-attribute instance-attribute

memory_usage_mb: Optional[float] = None class-attribute instance-attribute

disk_usage_mb: Optional[float] = None class-attribute instance-attribute

network_io_mb: Optional[float] = None class-attribute instance-attribute

execution_time_seconds: Optional[float] = None class-attribute instance-attribute

__init__(cpu_usage_percent: Optional[float] = None, memory_usage_mb: Optional[float] = None, disk_usage_mb: Optional[float] = None, network_io_mb: Optional[float] = None, execution_time_seconds: Optional[float] = None) -> None
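All fields are optional and default to None, so a collector can fill in whichever metrics its environment exposes. A minimal sketch, using a simplified stand-in for the documented dataclass (the real class lives in execution.models.job_metadata):

```python
from dataclasses import dataclass
from typing import Optional

# Simplified stand-in mirroring the documented ResourceUsage fields.
@dataclass
class ResourceUsage:
    cpu_usage_percent: Optional[float] = None
    memory_usage_mb: Optional[float] = None
    disk_usage_mb: Optional[float] = None
    network_io_mb: Optional[float] = None
    execution_time_seconds: Optional[float] = None

# Populate only the metrics the environment can report;
# the rest stay None.
usage = ResourceUsage(cpu_usage_percent=72.5, memory_usage_mb=2048.0)
```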

JobStatus

Enumeration of possible job execution states.

Used for tracking job lifecycle and enabling monitoring, alerting, and recovery mechanisms.

PENDING = 'pending' class-attribute instance-attribute

RUNNING = 'running' class-attribute instance-attribute

COMPLETED = 'completed' class-attribute instance-attribute

FAILED = 'failed' class-attribute instance-attribute

CANCELLED = 'cancelled' class-attribute instance-attribute

RETRYING = 'retrying' class-attribute instance-attribute
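Three of these states are terminal (JobMetadata exposes this via its is_final_state property). A sketch of how the lifecycle check plausibly works; the FINAL_STATES set and is_final helper here are illustrative, not part of the module's API:

```python
from enum import Enum

# Stand-in for the documented JobStatus enumeration.
class JobStatus(Enum):
    PENDING = 'pending'
    RUNNING = 'running'
    COMPLETED = 'completed'
    FAILED = 'failed'
    CANCELLED = 'cancelled'
    RETRYING = 'retrying'

# Hypothetical helper: states from which a job will not transition further.
FINAL_STATES = {JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED}

def is_final(status: JobStatus) -> bool:
    return status in FINAL_STATES

print(is_final(JobStatus.RUNNING))    # False
print(is_final(JobStatus.COMPLETED))  # True
```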

JobType

Enumeration of supported ETL job types.

Defines the different categories of processing jobs available in the Owl-Watch pipeline.

ETL = 'etl' class-attribute instance-attribute

CLEANING = 'cleaned' class-attribute instance-attribute

SENTIMENT = 'sentiment' class-attribute instance-attribute

NLP = 'nlp' class-attribute instance-attribute

ANALYTICS = 'analytics' class-attribute instance-attribute

COMMUNICATION_ETL = 'communication_etl' class-attribute instance-attribute

DataSource

Enumeration of supported data sources.

Identifies the origin systems and datasets that can be processed by the ETL pipeline.

ENRON = 'enron' class-attribute instance-attribute

WHATSAPP = 'whatsapp' class-attribute instance-attribute

CLINTON_EMAILS = 'clinton_emails' class-attribute instance-attribute

BIDEN_EMAILS = 'biden_emails' class-attribute instance-attribute

JobConfiguration dataclass

Configuration for a single job run, capturing the job type, job name, and an optional incoming file to process.

job_type: JobType instance-attribute

job_name: str instance-attribute

incoming_file: Optional[str] = None class-attribute instance-attribute

__init__(job_type: JobType, job_name: str, incoming_file: Optional[str] = None) -> None

ProcessingContext dataclass

Execution context for a job run, capturing environment details (AWS region, Glue job identifiers, Spark application ID), input and output paths, tuning parameters, and lifecycle timestamps.

execution_id: str instance-attribute

job_type: JobType instance-attribute

environment: str = 'production' class-attribute instance-attribute

aws_region: Optional[str] = None class-attribute instance-attribute

glue_job_name: Optional[str] = None class-attribute instance-attribute

glue_job_run_id: Optional[str] = None class-attribute instance-attribute

spark_application_id: Optional[str] = None class-attribute instance-attribute

input_paths: List[str] = field(default_factory=list) class-attribute instance-attribute

output_paths: List[str] = field(default_factory=list) class-attribute instance-attribute

schema_version: Optional[str] = None class-attribute instance-attribute

batch_size: Optional[int] = None class-attribute instance-attribute

parallelism_level: Optional[int] = None class-attribute instance-attribute

memory_allocation: Optional[str] = None class-attribute instance-attribute

created_at: datetime = field(default_factory=(lambda: datetime.now(UTC))) class-attribute instance-attribute

started_at: Optional[datetime] = None class-attribute instance-attribute

completed_at: Optional[datetime] = None class-attribute instance-attribute

triggered_by: Optional[str] = None class-attribute instance-attribute

trigger_type: str = 'manual' class-attribute instance-attribute

processing_duration: Optional[timedelta] property

is_running: bool property

__init__(execution_id: str, job_type: JobType, environment: str = 'production', aws_region: Optional[str] = None, glue_job_name: Optional[str] = None, glue_job_run_id: Optional[str] = None, spark_application_id: Optional[str] = None, input_paths: List[str] = list(), output_paths: List[str] = list(), schema_version: Optional[str] = None, batch_size: Optional[int] = None, parallelism_level: Optional[int] = None, memory_allocation: Optional[str] = None, created_at: datetime = (lambda: datetime.now(UTC))(), started_at: Optional[datetime] = None, completed_at: Optional[datetime] = None, triggered_by: Optional[str] = None, trigger_type: str = 'manual') -> None

start_processing()

complete_processing()
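The start_processing/complete_processing pair brackets a run and drives the processing_duration and is_running properties. A simplified stand-in (real class has many more fields) showing how these plausibly fit together:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

# Simplified stand-in for ProcessingContext; timestamps are UTC,
# matching the documented datetime.now(UTC) default for created_at.
@dataclass
class ProcessingContext:
    execution_id: str
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

    def start_processing(self) -> None:
        self.started_at = datetime.now(timezone.utc)

    def complete_processing(self) -> None:
        self.completed_at = datetime.now(timezone.utc)

    @property
    def processing_duration(self) -> Optional[timedelta]:
        # Duration is only known once both endpoints are set.
        if self.started_at is None or self.completed_at is None:
            return None
        return self.completed_at - self.started_at

    @property
    def is_running(self) -> bool:
        return self.started_at is not None and self.completed_at is None

ctx = ProcessingContext(execution_id="run-001")
ctx.start_processing()
assert ctx.is_running
ctx.complete_processing()
assert ctx.processing_duration is not None
```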

JobMetadata dataclass

Complete metadata record for a job, combining identity, status, retry tracking (attempt_number against max_attempts), configuration, processing context, timing, statistics, outputs, and error information.

job_type: JobType = JobType.ETL class-attribute instance-attribute

job_id: str = '' class-attribute instance-attribute

job_name: str = '' class-attribute instance-attribute

job_version: str = '1.0' class-attribute instance-attribute

status: JobStatus = JobStatus.PENDING class-attribute instance-attribute

execution_id: Optional[str] = None class-attribute instance-attribute

attempt_number: int = 1 class-attribute instance-attribute

max_attempts: int = 3 class-attribute instance-attribute

configuration: Optional[JobConfiguration] = None class-attribute instance-attribute

processing_context: Optional[ProcessingContext] = None class-attribute instance-attribute

scheduled_at: Optional[datetime] = None class-attribute instance-attribute

started_at: Optional[datetime] = None class-attribute instance-attribute

completed_at: Optional[datetime] = None class-attribute instance-attribute

processing_stats: Optional[ProcessingStats] = None class-attribute instance-attribute

output_locations: List[str] = field(default_factory=list) class-attribute instance-attribute

errors: List[ErrorInfo] = field(default_factory=list) class-attribute instance-attribute

last_error: Optional[str] = None class-attribute instance-attribute

execution_duration: Optional[timedelta] property

is_final_state: bool property

success_rate: float property

__init__(id: str, record_type: RecordType, created_at: datetime = datetime.now(), updated_at: datetime = datetime.now(), metadata: Dict[str, Any] = dict(), processing_stats: Optional[ProcessingStats] = None, errors: List[ErrorInfo] = list(), job_type: JobType = JobType.ETL, job_id: str = '', job_name: str = '', job_version: str = '1.0', status: JobStatus = JobStatus.PENDING, execution_id: Optional[str] = None, attempt_number: int = 1, max_attempts: int = 3, configuration: Optional[JobConfiguration] = None, processing_context: Optional[ProcessingContext] = None, scheduled_at: Optional[datetime] = None, started_at: Optional[datetime] = None, completed_at: Optional[datetime] = None, output_locations: List[str] = list(), last_error: Optional[str] = None) -> None

__post_init__()

validate() -> List[str]

start_job(execution_id: str)

complete_job(stats: ProcessingStats, output_locations: Optional[List[str]] = None)

fail_job(error: ErrorInfo)

retry_job()
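The lifecycle methods move a job through its states while attempt_number tracks retries against max_attempts. A simplified stand-in sketching the flow (the real fail_job takes an ErrorInfo object; a plain string is used here for illustration, and the retry guard is an assumption):

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional

class JobStatus(Enum):
    PENDING = 'pending'
    RUNNING = 'running'
    FAILED = 'failed'
    RETRYING = 'retrying'

# Simplified stand-in for JobMetadata's lifecycle surface.
@dataclass
class JobMetadata:
    job_id: str = ''
    status: JobStatus = JobStatus.PENDING
    execution_id: Optional[str] = None
    attempt_number: int = 1
    max_attempts: int = 3
    last_error: Optional[str] = None

    def start_job(self, execution_id: str) -> None:
        self.execution_id = execution_id
        self.status = JobStatus.RUNNING

    def fail_job(self, error_message: str) -> None:
        self.last_error = error_message
        self.status = JobStatus.FAILED

    def retry_job(self) -> None:
        # Assumed guard: refuse once the retry budget is spent.
        if self.attempt_number >= self.max_attempts:
            raise RuntimeError("max_attempts exhausted")
        self.attempt_number += 1
        self.status = JobStatus.RETRYING

meta = JobMetadata(job_id="job-42")
meta.start_job("exec-001")
meta.fail_job("transient read timeout")
meta.retry_job()
```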

JobResult dataclass

Outcome of a job execution: final status, outputs (files, tables, reports), quality and validation results, errors, warnings, and timing.

job_type: JobType instance-attribute

job_id: str = '' class-attribute instance-attribute

execution_id: str = '' class-attribute instance-attribute

status: JobStatus = JobStatus.PENDING class-attribute instance-attribute

success: bool = False class-attribute instance-attribute

message: str = '' class-attribute instance-attribute

processing_stats: Optional[ProcessingStats] = None class-attribute instance-attribute

output_files: List[str] = field(default_factory=list) class-attribute instance-attribute

output_tables: List[str] = field(default_factory=list) class-attribute instance-attribute

reports: Dict[str, str] = field(default_factory=dict) class-attribute instance-attribute

quality_score: Optional[float] = None class-attribute instance-attribute

validation_results: List[Dict[str, Any]] = field(default_factory=list) class-attribute instance-attribute

errors: List[ErrorInfo] = field(default_factory=list) class-attribute instance-attribute

warnings: List[str] = field(default_factory=list) class-attribute instance-attribute

started_at: Optional[datetime] = None class-attribute instance-attribute

completed_at: Optional[datetime] = None class-attribute instance-attribute

execution_duration: Optional[timedelta] property

__init__(job_type: JobType, job_id: str = '', execution_id: str = '', status: JobStatus = JobStatus.PENDING, success: bool = False, message: str = '', processing_stats: Optional[ProcessingStats] = None, output_files: List[str] = list(), output_tables: List[str] = list(), reports: Dict[str, str] = dict(), quality_score: Optional[float] = None, validation_results: List[Dict[str, Any]] = list(), errors: List[ErrorInfo] = list(), warnings: List[str] = list(), started_at: Optional[datetime] = None, completed_at: Optional[datetime] = None) -> None

add_output_file(file_path: str, file_type: str = 'data')

add_error(error: ErrorInfo)

add_warning(warning: str)

to_summary_dict() -> Dict[str, Any]
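The add_* helpers accumulate outputs and warnings, and to_summary_dict condenses the result for logging or notifications. A simplified stand-in; the exact keys of the summary dict are assumptions for illustration:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List

# Simplified stand-in for JobResult's accumulation/summary surface.
@dataclass
class JobResult:
    job_id: str = ''
    success: bool = False
    message: str = ''
    output_files: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)

    def add_output_file(self, file_path: str, file_type: str = 'data') -> None:
        self.output_files.append(file_path)

    def add_warning(self, warning: str) -> None:
        self.warnings.append(warning)

    def to_summary_dict(self) -> Dict[str, Any]:
        # Hypothetical key names; the real method may differ.
        return {
            'job_id': self.job_id,
            'success': self.success,
            'message': self.message,
            'output_file_count': len(self.output_files),
            'warning_count': len(self.warnings),
        }

result = JobResult(job_id="job-42", success=True, message="done")
result.add_output_file("s3://bucket/out.parquet")
summary = result.to_summary_dict()
```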