Documentation

CloudStorageResource

Access via client.cloud_storage

Auth & Connections

initiate_auth()

Initiate OAuth flow for a cloud storage provider

```python
async def initiate_auth(
    provider: str, # e.g. "google_drive"
    *,
    redirect_uri: Optional[str] = None, # Custom OAuth callback URI
) -> InitiateAuthResult
```
Returns: InitiateAuthResult - Contains authorization_url and state for CSRF verification

complete_auth()

Complete OAuth flow after user grants permissions

```python
async def complete_auth(
    provider: str, # e.g. "google_drive"
    *,
    code: str, # Authorization code from callback
    state: str, # State parameter for CSRF verification
    redirect_uri: Optional[str] = None, # Must match initiate_auth redirect_uri
) -> CompleteAuthResult
```
Returns: CompleteAuthResult - Contains connection and is_new flag

list_connections()

List cloud storage connections

```python
async def list_connections(
    *,
    provider: Optional[str] = None, # Filter by provider type
    active_only: bool = True, # Only return active connections
) -> ConnectionList
```

disconnect()

Disconnect a cloud storage account

```python
async def disconnect(connection_id: str) -> DisconnectResult
```

Import & Export

start_import()

Start importing files from cloud storage

```python
async def start_import(
    connection_id: str,
    files: list[Union[CloudFileInput, dict[str, Any]]],
    *,
    auto_describe: bool = True, # Auto-generate AI descriptions
    tags: Optional[list[str]] = None, # Tags to apply to imported files
    folder_id: Optional[str] = None, # Destination folder ID
) -> ImportResult
```
Returns: ImportResult - Contains job_id for async imports or image_ids for sync imports

start_export()

Start exporting files to cloud storage

```python
async def start_export(
    connection_id: str,
    image_ids: list[str],
    *,
    folder_id: Optional[str] = None, # Target folder ID in cloud storage
    folder_name: Optional[str] = None, # Create a new folder with this name
) -> ExportResult
```
Returns: ExportResult

Job Tracking

get_job()

Get the status of a cloud storage job

```python
async def get_job(job_id: str) -> CloudStorageJob
```
Returns: CloudStorageJob - Current status and progress

wait_for_job()

Poll until a job reaches a terminal state

```python
async def wait_for_job(
    job_id: str,
    *,
    timeout: Optional[float] = None, # Default: config polling_timeout
    poll_interval: Optional[float] = None, # Default: config polling_interval
    on_progress: Optional[Callable[[CloudStorageJobProgressEvent], None]] = None,
) -> CloudStorageJob
```
Returns: CloudStorageJob - Final job status

Error Behavior

Raises CloudStorageError if the job fails completely. Returns normally when the job finishes with partial status (some files succeeded, some failed). Raises ScopixTimeoutError if the timeout is exceeded.

Convenience Methods

import_and_wait()

Import files and wait for completion (combines start_import + wait_for_job)

```python
async def import_and_wait(
    connection_id: str,
    files: list[Union[CloudFileInput, dict[str, Any]]],
    *,
    auto_describe: bool = True,
    tags: Optional[list[str]] = None,
    folder_id: Optional[str] = None,
    timeout: Optional[float] = None,
    poll_interval: Optional[float] = None,
    on_progress: Optional[Callable[[CloudStorageJobProgressEvent], None]] = None,
) -> CloudStorageJob
```
Returns: CloudStorageJob - Final job status

export_and_wait()

Export files and wait for completion (combines start_export + wait_for_job)

```python
async def export_and_wait(
    connection_id: str,
    image_ids: list[str],
    *,
    folder_id: Optional[str] = None,
    folder_name: Optional[str] = None,
    timeout: Optional[float] = None,
    poll_interval: Optional[float] = None,
    on_progress: Optional[Callable[[CloudStorageJobProgressEvent], None]] = None,
) -> CloudStorageJob
```
Returns: CloudStorageJob - Final job status