Skip to content

Base

execution.models.base

RecordType

EMAIL = 'email' class-attribute instance-attribute

COMMUNICATION = 'communication' class-attribute instance-attribute

USER_PROFILE = 'user_profile' class-attribute instance-attribute

ANALYTICS = 'analytics' class-attribute instance-attribute

DATA_QUALITY = 'data_quality' class-attribute instance-attribute

METADATA = 'metadata' class-attribute instance-attribute

ErrorInfo dataclass

error_type: str instance-attribute

error_message: str instance-attribute

error_code: Optional[str] = None class-attribute instance-attribute

timestamp: datetime = field(default_factory=(datetime.now)) class-attribute instance-attribute

stacktrace: Optional[str] = None class-attribute instance-attribute

context: Dict[str, Any] = field(default_factory=dict) class-attribute instance-attribute

__init__(error_type: str, error_message: str, error_code: Optional[str] = None, timestamp: datetime = field(default_factory=datetime.now), stacktrace: Optional[str] = None, context: Dict[str, Any] = field(default_factory=dict)) -> None

ProcessingStats dataclass

records_processed: int = 0 class-attribute instance-attribute

records_successful: int = 0 class-attribute instance-attribute

records_failed: int = 0 class-attribute instance-attribute

processing_time_seconds: float = 0.0 class-attribute instance-attribute

memory_usage_mb: Optional[float] = None class-attribute instance-attribute

cpu_usage_percent: Optional[float] = None class-attribute instance-attribute

success_rate: float property

failure_rate: float property

__init__(records_processed: int = 0, records_successful: int = 0, records_failed: int = 0, processing_time_seconds: float = 0.0, memory_usage_mb: Optional[float] = None, cpu_usage_percent: Optional[float] = None) -> None

BaseRecord dataclass

id: str instance-attribute

record_type: RecordType instance-attribute

created_at: datetime = field(default_factory=(datetime.now)) class-attribute instance-attribute

updated_at: datetime = field(default_factory=(datetime.now)) class-attribute instance-attribute

metadata: Dict[str, Any] = field(default_factory=dict) class-attribute instance-attribute

processing_stats: Optional[ProcessingStats] = None class-attribute instance-attribute

errors: List[ErrorInfo] = field(default_factory=list) class-attribute instance-attribute

__init__(id: str, record_type: RecordType, created_at: datetime = field(default_factory=datetime.now), updated_at: datetime = field(default_factory=datetime.now), metadata: Dict[str, Any] = field(default_factory=dict), processing_stats: Optional[ProcessingStats] = None, errors: List[ErrorInfo] = field(default_factory=list)) -> None

__post_init__()

add_error(error_type: str, error_message: str, error_code: Optional[str] = None, context: Optional[Dict[str, Any]] = None) -> None

has_errors() -> bool

update_metadata(key: str, value: Any) -> None