Base
execution.models.base
RecordType
EMAIL = 'email'
class-attribute
instance-attribute
COMMUNICATION = 'communication'
class-attribute
instance-attribute
USER_PROFILE = 'user_profile'
class-attribute
instance-attribute
ANALYTICS = 'analytics'
class-attribute
instance-attribute
DATA_QUALITY = 'data_quality'
class-attribute
instance-attribute
ErrorInfo
dataclass
error_type: str
instance-attribute
error_message: str
instance-attribute
error_code: Optional[str] = None
class-attribute
instance-attribute
timestamp: datetime = field(default_factory=(datetime.now))
class-attribute
instance-attribute
stacktrace: Optional[str] = None
class-attribute
instance-attribute
context: Dict[str, Any] = field(default_factory=dict)
class-attribute
instance-attribute
__init__(error_type: str, error_message: str, error_code: Optional[str] = None, timestamp: datetime = datetime.now(), stacktrace: Optional[str] = None, context: Dict[str, Any] = dict()) -> None
ProcessingStats
dataclass
records_processed: int = 0
class-attribute
instance-attribute
records_successful: int = 0
class-attribute
instance-attribute
records_failed: int = 0
class-attribute
instance-attribute
processing_time_seconds: float = 0.0
class-attribute
instance-attribute
memory_usage_mb: Optional[float] = None
class-attribute
instance-attribute
cpu_usage_percent: Optional[float] = None
class-attribute
instance-attribute
success_rate: float
property
failure_rate: float
property
__init__(records_processed: int = 0, records_successful: int = 0, records_failed: int = 0, processing_time_seconds: float = 0.0, memory_usage_mb: Optional[float] = None, cpu_usage_percent: Optional[float] = None) -> None
BaseRecord
dataclass
id: str
instance-attribute
record_type: RecordType
instance-attribute
created_at: datetime = field(default_factory=(datetime.now))
class-attribute
instance-attribute
updated_at: datetime = field(default_factory=(datetime.now))
class-attribute
instance-attribute
metadata: Dict[str, Any] = field(default_factory=dict)
class-attribute
instance-attribute
processing_stats: Optional[ProcessingStats] = None
class-attribute
instance-attribute
errors: List[ErrorInfo] = field(default_factory=list)
class-attribute
instance-attribute
__init__(id: str, record_type: RecordType, created_at: datetime = datetime.now(), updated_at: datetime = datetime.now(), metadata: Dict[str, Any] = dict(), processing_stats: Optional[ProcessingStats] = None, errors: List[ErrorInfo] = list()) -> None
add_error(error_type: str, error_message: str, error_code: Optional[str] = None, context: Optional[Dict[str, Any]] = None) -> None