Documentation

Callback Types

Type-safe event objects passed to upload callbacks

Batch progress tracking

client.files.upload() accepts an on_progress callback for per-file read progress. For batch uploads, poll client.files.get_processing_status(file_id) or use client.files.wait_for_session(session_id, on_progress=...) after calling upload_batch(). Cloud storage, video, and document upload flows have their own dedicated callback surfaces.

UploadProgressEvent

Event fired once per file after it is read and prepared for upload

python
@dataclass(frozen=True)
class UploadProgressEvent:
file_index: int # Zero-based index of file being uploaded
uploaded_bytes: int # File size in bytes (always equals total_bytes)
total_bytes: int # Total file size in bytes
filename: Optional[str] # Name of file that was prepared
# Properties
@property
def progress_percent(self) -> float: ... # Always 100.0
# Serialization
def to_dict(self, exclude_none: bool = True) -> dict[str, Any]: ...

Document Upload Callbacks

DocumentUploadProgressEvent

Event fired once per document file after it has been uploaded to storage

python
@dataclass(frozen=True)
class DocumentUploadProgressEvent:
file_index: int # Zero-based index of the file that was uploaded
uploaded_bytes: int # Size of the uploaded file in bytes (always equals total_bytes)
total_bytes: int # Total size of the file in bytes
filename: Optional[str] # Name of the file that was uploaded
# Properties
@property
def progress_percent(self) -> float: ... # Always 100.0
# Serialization
def to_dict(self, exclude_none: bool = True) -> dict[str, Any]: ...

DocumentFileCompleteEvent

Event fired when a single document file upload completes

python
@dataclass(frozen=True)
class DocumentFileCompleteEvent:
file_index: int # Zero-based index of completed file
result: DocumentUploadResult # Document upload result with document_id, filename, etc.
def to_dict(self, exclude_none: bool = True) -> dict[str, Any]: ...

DocumentProcessingProgressEvent

Event fired as document processing (text extraction) completes during polling

python
@dataclass(frozen=True)
class DocumentProcessingProgressEvent:
completed_count: int # Documents completed so far
total_count: int # Total documents awaiting processing
latest_result: DocumentUploadResult # Most recently completed result
# Properties
@property
def progress_percent(self) -> float: ... # 0-100
@property
def remaining_count(self) -> int: ...
def to_dict(self, exclude_none: bool = True) -> dict[str, Any]: ...

DocumentProcessingFailedEvent

Event fired when document processing fails

python
@dataclass(frozen=True)
class DocumentProcessingFailedEvent:
file_index: int # Zero-based index of failed file
result: DocumentUploadResult # Result with error details populated
# Properties
@property
def error_message(self) -> Optional[str]: ...
@property
def is_retryable(self) -> bool: ...
def to_dict(self, exclude_none: bool = True) -> dict[str, Any]: ...

Video Analysis Callbacks

Video uploads go through the unified client.files.upload() path and fire the same UploadProgressEvent as other media types. The video-specific callback below is for monitoring AI scene analysis progress after upload completes.

VideoAnalysisProgressEvent

Event fired on each poll during client.video_analysis.wait_for_completion().

python
@dataclass(frozen=True)
class VideoAnalysisProgressEvent:
job_status: VideoAnalysisJobStatus # Current job status snapshot
# Properties
@property
def progress_percent(self) -> float: ... # Delegates to job_status.progress_percent
@property
def is_terminal(self) -> bool: ... # Delegates to job_status.is_terminal
def to_dict(self, exclude_none: bool = True) -> dict[str, Any]: ...

Example

python
def on_progress(status: VideoAnalysisJobStatus) -> None:
print(f"{status.progress_percent:.0f}% — {status.current_step}")
final = await client.video_analysis.wait_for_completion(
job_id,
timeout=600,
on_progress=on_progress,
)

Cloud Storage Callbacks

CloudStorageJobProgressEvent

Event fired on each poll during wait_for_job, import_and_wait, or export_and_wait

python
@dataclass(frozen=True)
class CloudStorageJobProgressEvent:
job: CloudStorageJob # Current job status snapshot
# Properties
@property
def progress_percent(self) -> float: ... # Delegates to job.progress_percent
@property
def is_terminal(self) -> bool: ... # Delegates to job.is_terminal
def to_dict(self, exclude_none: bool = True) -> dict[str, Any]: ...

Cloud Storage Callback Example

Using progress callbacks with cloud storage import

python
from scopix import Scopix
from scopix.types.callbacks import CloudStorageJobProgressEvent
def on_import_progress(event: CloudStorageJobProgressEvent):
job = event.job
print(f"Import progress: {event.progress_percent:.0f}% "
f"({job.completed_files}/{job.total_files} files, "
f"{job.failed_files} failed)")
async with Scopix(api_key="scopix_...") as client:
job = await client.cloud_storage.import_and_wait(
connection_id="conn_abc123",
files=files,
on_progress=on_import_progress,
)

Usage Example

Using the progress callback on a single file upload

python
from scopix import Scopix, UploadProgressEvent
def on_progress(event: UploadProgressEvent):
print(f"Prepared: {event.filename} ({event.total_bytes} bytes)")
async with Scopix(api_key="scopix_...") as client:
result = await client.files.upload("photo.jpg", on_progress=on_progress)