Callback Types
Type-safe event objects passed to upload callbacks
Batch progress tracking
client.files.upload() accepts an on_progress callback for per-file read progress. For batch uploads, poll client.files.get_processing_status(file_id) or use client.files.wait_for_session(session_id, on_progress=...) after calling upload_batch(). Cloud storage, video, and document upload flows have their own dedicated callback surfaces.
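The polling option for batch uploads could be sketched as a small generic helper. This is only an illustration under stated assumptions: `poll_until` is a hypothetical name and not part of the SDK, and because the shape of the value returned by client.files.get_processing_status(file_id) is not described on this page, the completion check is passed in as a parameter rather than hard-coded. The demo at the bottom uses a fake status source, not a real client.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def poll_until(
    fetch: Callable[[], Awaitable[T]],
    is_done: Callable[[T], bool],
    *,
    interval: float = 2.0,
    timeout: float = 300.0,
    on_progress: Optional[Callable[[T], None]] = None,
) -> T:
    """Hypothetical helper: call `fetch` repeatedly until `is_done` accepts the result."""
    deadline = time.monotonic() + timeout
    while True:
        status = await fetch()
        if on_progress is not None:
            on_progress(status)  # report every snapshot, including the final one
        if is_done(status):
            return status
        # Give up if the next sleep would run past the deadline.
        if time.monotonic() + interval > deadline:
            raise TimeoutError("status did not reach a terminal state in time")
        await asyncio.sleep(interval)


# Demo with a fake status source standing in for the real client call.
_demo_states = iter(["queued", "processing", "completed"])


async def _fake_fetch() -> str:
    return next(_demo_states)


final_status = asyncio.run(
    poll_until(_fake_fetch, lambda s: s == "completed", interval=0.0, on_progress=print)
)
```

With a real client, `fetch` might be `lambda: client.files.get_processing_status(file_id)`, with `is_done` written against whatever terminal state the returned status actually exposes. When a session ID is available, wait_for_session() already covers this loop.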
UploadProgressEvent
Event fired once per file after it is read and prepared for upload
@dataclass(frozen=True)
class UploadProgressEvent:
    file_index: int          # Zero-based index of file being uploaded
    uploaded_bytes: int      # File size in bytes (always equals total_bytes)
    total_bytes: int         # Total file size in bytes
    filename: Optional[str]  # Name of file that was prepared

    # Properties
    @property
    def progress_percent(self) -> float: ...  # Always 100.0

    # Serialization
    def to_dict(self, exclude_none: bool = True) -> dict[str, Any]: ...

Document Upload Callbacks
DocumentUploadProgressEvent
Event fired once per document file after it has been uploaded to storage
@dataclass(frozen=True)
class DocumentUploadProgressEvent:
    file_index: int          # Zero-based index of the file that was uploaded
    uploaded_bytes: int      # Size of the uploaded file in bytes (always equals total_bytes)
    total_bytes: int         # Total size of the file in bytes
    filename: Optional[str]  # Name of the file that was uploaded

    # Properties
    @property
    def progress_percent(self) -> float: ...  # Always 100.0

    # Serialization
    def to_dict(self, exclude_none: bool = True) -> dict[str, Any]: ...

DocumentFileCompleteEvent
Event fired when a single document file upload completes
@dataclass(frozen=True)
class DocumentFileCompleteEvent:
    file_index: int               # Zero-based index of completed file
    result: DocumentUploadResult  # Document upload result with document_id, filename, etc.

    def to_dict(self, exclude_none: bool = True) -> dict[str, Any]: ...

DocumentProcessingProgressEvent
Event fired during polling each time a document finishes processing (text extraction)
@dataclass(frozen=True)
class DocumentProcessingProgressEvent:
    completed_count: int                 # Documents completed so far
    total_count: int                     # Total documents awaiting processing
    latest_result: DocumentUploadResult  # Most recently completed result

    # Properties
    @property
    def progress_percent(self) -> float: ...  # 0-100

    @property
    def remaining_count(self) -> int: ...

    def to_dict(self, exclude_none: bool = True) -> dict[str, Any]: ...

DocumentProcessingFailedEvent
Event fired when document processing fails
@dataclass(frozen=True)
class DocumentProcessingFailedEvent:
    file_index: int               # Zero-based index of failed file
    result: DocumentUploadResult  # Result with error details populated

    # Properties
    @property
    def error_message(self) -> Optional[str]: ...

    @property
    def is_retryable(self) -> bool: ...

    def to_dict(self, exclude_none: bool = True) -> dict[str, Any]: ...

Video Analysis Callbacks
Video uploads go through the unified client.files.upload() path and fire the same UploadProgressEvent as other media types. The video-specific callback below is for monitoring AI scene analysis progress after upload completes.
VideoAnalysisProgressEvent
Event fired on each poll during client.video_analysis.wait_for_completion().
@dataclass(frozen=True)
class VideoAnalysisProgressEvent:
    job_status: VideoAnalysisJobStatus  # Current job status snapshot

    # Properties
    @property
    def progress_percent(self) -> float: ...  # Delegates to job_status.progress_percent

    @property
    def is_terminal(self) -> bool: ...  # Delegates to job_status.is_terminal

    def to_dict(self, exclude_none: bool = True) -> dict[str, Any]: ...

Example
def on_progress(status: VideoAnalysisJobStatus) -> None:
    print(f"{status.progress_percent:.0f}% — {status.current_step}")

final = await client.video_analysis.wait_for_completion(
    job_id,
    timeout=600,
    on_progress=on_progress,
)

Cloud Storage Callbacks
CloudStorageJobProgressEvent
Event fired on each poll during wait_for_job, import_and_wait, or export_and_wait
@dataclass(frozen=True)
class CloudStorageJobProgressEvent:
    job: CloudStorageJob  # Current job status snapshot

    # Properties
    @property
    def progress_percent(self) -> float: ...  # Delegates to job.progress_percent

    @property
    def is_terminal(self) -> bool: ...  # Delegates to job.is_terminal

    def to_dict(self, exclude_none: bool = True) -> dict[str, Any]: ...

Cloud Storage Callback Example
Using progress callbacks with cloud storage import
from scopix import Scopix
from scopix.types.callbacks import CloudStorageJobProgressEvent

def on_import_progress(event: CloudStorageJobProgressEvent) -> None:
    job = event.job
    print(f"Import progress: {event.progress_percent:.0f}% "
          f"({job.completed_files}/{job.total_files} files, "
          f"{job.failed_files} failed)")

async def main(files) -> None:
    # `files` is the selection to import; it is not defined in this snippet.
    # Run with asyncio.run(main(files)).
    async with Scopix(api_key="scopix_...") as client:
        job = await client.cloud_storage.import_and_wait(
            connection_id="conn_abc123",
            files=files,
            on_progress=on_import_progress,
        )

Usage Example
Using the progress callback on a single file upload
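import asyncio

from scopix import Scopix, UploadProgressEvent

def on_progress(event: UploadProgressEvent) -> None:
    print(f"Prepared: {event.filename} ({event.total_bytes} bytes)")

async def main() -> None:
    # `async with` must run inside a coroutine, so the upload is wrapped in main()
    async with Scopix(api_key="scopix_...") as client:
        result = await client.files.upload("photo.jpg", on_progress=on_progress)

asyncio.run(main())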
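A callback like the one above can be exercised without a live client or network access. The sketch below assumes a local stand-in class, `FakeUploadProgressEvent`, which is hypothetical and only mirrors the fields documented for UploadProgressEvent. `UploadLog` is likewise an illustrative name, and the `progress_percent` formula is an assumption, since the reference only states that the value is always 100.0.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class FakeUploadProgressEvent:
    """Hypothetical stand-in mirroring the documented UploadProgressEvent fields."""
    file_index: int
    uploaded_bytes: int
    total_bytes: int
    filename: Optional[str] = None

    @property
    def progress_percent(self) -> float:
        # Assumed formula; the reference only says the real value is always 100.0.
        if self.total_bytes == 0:
            return 100.0
        return 100.0 * self.uploaded_bytes / self.total_bytes


class UploadLog:
    """Callable that records each event it receives."""

    def __init__(self) -> None:
        self.filenames: list[str] = []
        self.total_bytes = 0

    def __call__(self, event) -> None:
        # Fall back to the index when no filename is available.
        self.filenames.append(event.filename or f"<file {event.file_index}>")
        self.total_bytes += event.total_bytes


log = UploadLog()
log(FakeUploadProgressEvent(0, 2048, 2048, "photo.jpg"))
log(FakeUploadProgressEvent(1, 512, 512, None))
print(log.filenames, log.total_bytes)
```

Because `UploadLog` reads only attributes that the reference documents, the same instance could plausibly be passed as `on_progress` to client.files.upload(), though that has not been verified against the SDK here.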
