Skip to content

S3-Compatible Services Examples

The S3 Asyncio Client works with any S3-compatible storage service. This guide demonstrates how to connect to and use various popular S3-compatible services including MinIO, DigitalOcean Spaces, Wasabi, Backblaze B2, and more.

MinIO

MinIO is a popular open-source object storage server that's S3-compatible and often used for local development and private cloud deployments.

Basic MinIO Setup

import asyncio
from s3_asyncio_client import S3Client

async def minio_example():
    # MinIO client configuration
    client = S3Client(
        access_key="minio-access-key",
        secret_key="minio-secret-key",
        region="us-east-1",  # MinIO region (can be any string)
        endpoint_url="http://localhost:9000"  # Default MinIO endpoint
    )

    # Test connection
    async with client:
        try:
            # List buckets to test connection
            result = await client.list_objects("test-bucket", max_keys=1)
            print("✓ Connected to MinIO successfully")
        except Exception as e:
            print(f"✗ MinIO connection failed: {e}")

    return client

# Run MinIO test
asyncio.run(minio_example())

MinIO with Custom Configuration

async def minio_custom_setup():
    """Connect to MinIO with custom configuration."""

    # Production MinIO setup with HTTPS
    client = S3Client(
        access_key="prod-minio-key",
        secret_key="prod-minio-secret",
        region="minio-region",
        endpoint_url="https://minio.yourcompany.com"
    )

    async with client:
        # Create a bucket if it doesn't exist
        try:
            await client.head_object("company-data", "test")
        except:
            print("Bucket 'company-data' might not exist")

        # Upload a test file
        test_data = b"Hello from MinIO!"
        result = await client.put_object(
            bucket="company-data",
            key="test/hello.txt",
            data=test_data,
            content_type="text/plain",
            metadata={
                "source": "s3-asyncio-client",
                "environment": "production"
            }
        )

        print(f"Uploaded to MinIO: {result['etag']}")

        # Download and verify
        download_result = await client.get_object(
            bucket="company-data",
            key="test/hello.txt"
        )

        print(f"Downloaded: {download_result['body'].decode()}")

        return result

MinIO Deployment Example

import os
from pathlib import Path

async def backup_to_minio():
    """Backup local files to MinIO server."""

    # MinIO server running in Docker
    # docker run -p 9000:9000 -p 9090:9090 --name minio \
    #   -e "MINIO_ROOT_USER=admin" \
    #   -e "MINIO_ROOT_PASSWORD=password123" \
    #   minio/minio server /data --console-address ":9090"

    client = S3Client(
        access_key="admin",
        secret_key="password123",
        region="us-east-1",
        endpoint_url="http://localhost:9000"
    )

    # Directory to backup
    backup_dir = Path("./documents")
    bucket_name = "file-backups"

    async with client:
        uploaded_files = []

        for file_path in backup_dir.rglob("*"):
            if file_path.is_file():
                try:
                    with open(file_path, "rb") as f:
                        file_data = f.read()

                    # Create MinIO key
                    relative_path = file_path.relative_to(backup_dir)
                    minio_key = f"backups/{relative_path}"

                    # Upload to MinIO
                    result = await client.put_object(
                        bucket=bucket_name,
                        key=minio_key,
                        data=file_data,
                        metadata={
                            "backup-date": datetime.now().isoformat(),
                            "original-path": str(file_path)
                        }
                    )

                    uploaded_files.append({
                        "local_path": str(file_path),
                        "minio_key": minio_key,
                        "etag": result["etag"]
                    })

                    print(f"✓ Backed up: {file_path} -> {minio_key}")

                except Exception as e:
                    print(f"✗ Failed to backup {file_path}: {e}")

        print(f"Backup completed: {len(uploaded_files)} files")
        return uploaded_files

DigitalOcean Spaces

DigitalOcean Spaces is a simple, reliable, and affordable object storage service.

DigitalOcean Spaces Setup

async def digitalocean_spaces_example():
    """Connect to DigitalOcean Spaces."""

    # DigitalOcean Spaces configuration
    client = S3Client(
        access_key="DO_SPACES_ACCESS_KEY",
        secret_key="DO_SPACES_SECRET_KEY",
        region="nyc3",  # DigitalOcean region
        endpoint_url="https://nyc3.digitaloceanspaces.com"
    )

    async with client:
        # Upload a file to Spaces
        image_data = open("logo.png", "rb").read()

        result = await client.put_object(
            bucket="my-app-assets",
            key="images/logo.png",
            data=image_data,
            content_type="image/png",
            metadata={
                "uploaded-from": "production-server",
                "category": "branding"
            }
        )

        print(f"Uploaded to DigitalOcean Spaces: {result['etag']}")

        # Generate public URL (if bucket is public)
        public_url = f"https://my-app-assets.nyc3.digitaloceanspaces.com/images/logo.png"
        print(f"Public URL: {public_url}")

        # Or generate presigned URL for private access
        private_url = client.generate_presigned_url(
            method="GET",
            bucket="my-app-assets",
            key="images/logo.png",
            expires_in=3600
        )
        print(f"Private URL: {private_url}")

        return result

DigitalOcean CDN Integration

async def digitalocean_cdn_setup():
    """Upload files optimized for DigitalOcean CDN."""

    client = S3Client(
        access_key=os.getenv("DO_SPACES_KEY"),
        secret_key=os.getenv("DO_SPACES_SECRET"),
        region="fra1",
        endpoint_url="https://fra1.digitaloceanspaces.com"
    )

    # Files for CDN distribution
    cdn_files = [
        {
            "local_path": "assets/css/style.css",
            "spaces_key": "cdn/css/style.css",
            "content_type": "text/css",
            "cache_control": "public, max-age=31536000"  # 1 year
        },
        {
            "local_path": "assets/js/app.js",
            "spaces_key": "cdn/js/app.js", 
            "content_type": "application/javascript",
            "cache_control": "public, max-age=86400"  # 1 day
        },
        {
            "local_path": "assets/images/hero.jpg",
            "spaces_key": "cdn/images/hero.jpg",
            "content_type": "image/jpeg",
            "cache_control": "public, max-age=604800"  # 1 week
        }
    ]

    async with client:
        for file_config in cdn_files:
            with open(file_config["local_path"], "rb") as f:
                file_data = f.read()

            # Upload with CDN-optimized headers
            result = await client.put_object(
                bucket="my-cdn-bucket",
                key=file_config["spaces_key"],
                data=file_data,
                content_type=file_config["content_type"],
                metadata={
                    "cache-control": file_config["cache_control"],
                    "cdn-optimized": "true"
                }
            )

            # CDN URL format
            cdn_url = f"https://my-cdn-bucket.fra1.cdn.digitaloceanspaces.com/{file_config['spaces_key']}"

            print(f"✓ CDN Asset: {file_config['local_path']} -> {cdn_url}")

Wasabi Hot Cloud Storage

Wasabi offers affordable S3-compatible cloud storage with no egress fees.

Wasabi Configuration

async def wasabi_example():
    """Connect to Wasabi Hot Cloud Storage."""

    # Wasabi configuration
    client = S3Client(
        access_key="WASABI_ACCESS_KEY",
        secret_key="WASABI_SECRET_KEY",
        region="us-east-1",  # Wasabi region
        endpoint_url="https://s3.wasabisys.com"
    )

    async with client:
        # Upload large files efficiently (Wasabi's strength)
        large_file_path = "database-backup.sql.gz"

        with open(large_file_path, "rb") as f:
            file_data = f.read()

        print(f"Uploading {len(file_data):,} bytes to Wasabi...")

        # Use multipart upload for large files
        result = await client.upload_file_multipart(
            bucket="database-backups",
            key=f"daily/{datetime.now().strftime('%Y-%m-%d')}/backup.sql.gz",
            data=file_data,
            part_size=50 * 1024 * 1024,  # 50MB parts
            content_type="application/gzip",
            metadata={
                "backup-type": "daily",
                "database": "production",
                "compressed": "true"
            }
        )

        print(f"✓ Uploaded to Wasabi: {result['etag']}")

        # List recent backups
        backups = await client.list_objects(
            bucket="database-backups",
            prefix="daily/",
            max_keys=10
        )

        print(f"Recent backups ({len(backups['objects'])}):")
        for backup in backups["objects"]:
            size_mb = backup["size"] / 1024 / 1024
            print(f"  {backup['key']} ({size_mb:.1f} MB)")

        return result

Wasabi Archive Solution

async def wasabi_archive_system():
    """Long-term archival system using Wasabi."""

    client = S3Client(
        access_key=os.getenv("WASABI_ACCESS_KEY"),
        secret_key=os.getenv("WASABI_SECRET_KEY"),
        region="us-central-1",
        endpoint_url="https://s3.us-central-1.wasabisys.com"
    )

    # Archive old files by year
    archive_configs = {
        "documents": {
            "local_path": "./old_documents",
            "retention_years": 7
        },
        "media": {
            "local_path": "./old_media",
            "retention_years": 3
        },
        "logs": {
            "local_path": "./old_logs",
            "retention_years": 1
        }
    }

    async with client:
        for archive_type, config in archive_configs.items():
            archive_path = Path(config["local_path"])

            if not archive_path.exists():
                continue

            print(f"Archiving {archive_type}...")

            for file_path in archive_path.rglob("*"):
                if file_path.is_file():
                    # Determine archive date
                    file_date = datetime.fromtimestamp(file_path.stat().st_mtime)
                    archive_key = f"archives/{archive_type}/{file_date.year}/{file_path.name}"

                    try:
                        with open(file_path, "rb") as f:
                            file_data = f.read()

                        await client.put_object(
                            bucket="long-term-archive",
                            key=archive_key,
                            data=file_data,
                            metadata={
                                "archive-date": datetime.now().isoformat(),
                                "original-path": str(file_path),
                                "retention-years": str(config["retention_years"]),
                                "archive-type": archive_type
                            }
                        )

                        print(f"  ✓ Archived: {file_path.name}")

                    except Exception as e:
                        print(f"  ✗ Failed: {file_path.name} - {e}")

Backblaze B2

Backblaze B2 provides affordable S3-compatible storage with a simple pricing model.

Backblaze B2 Setup

async def backblaze_b2_example():
    """Connect to Backblaze B2 storage."""

    # Backblaze B2 configuration
    client = S3Client(
        access_key="B2_APPLICATION_KEY_ID",
        secret_key="B2_APPLICATION_KEY", 
        region="us-west-002",  # Backblaze region
        endpoint_url="https://s3.us-west-002.backblazeb2.com"
    )

    async with client:
        # Upload with lifecycle-aware metadata
        document_data = open("important-contract.pdf", "rb").read()

        result = await client.put_object(
            bucket="legal-documents",
            key="contracts/2024/important-contract.pdf",
            data=document_data,
            content_type="application/pdf",
            metadata={
                "document-type": "contract",
                "retention-period": "7-years",
                "classification": "confidential",
                "uploaded-by": "legal-team"
            }
        )

        print(f"✓ Uploaded to Backblaze B2: {result['etag']}")

        # Generate secure download link
        download_url = client.generate_presigned_url(
            method="GET",
            bucket="legal-documents",
            key="contracts/2024/important-contract.pdf",
            expires_in=24 * 3600  # 24 hours
        )

        print(f"Secure download URL: {download_url}")

        return result

IBM Cloud Object Storage

IBM Cloud Object Storage is enterprise-grade S3-compatible storage.

IBM Cloud Setup

async def ibm_cloud_example():
    """Connect to IBM Cloud Object Storage."""

    # IBM Cloud Object Storage configuration
    client = S3Client(
        access_key="IBM_CLOUD_ACCESS_KEY",
        secret_key="IBM_CLOUD_SECRET_KEY",
        region="us-south",
        endpoint_url="https://s3.us-south.cloud-object-storage.appdomain.cloud"
    )

    async with client:
        # Enterprise data upload with compliance metadata
        sensitive_data = b"Confidential business data..."

        result = await client.put_object(
            bucket="enterprise-data",
            key="compliance/financial/q4-2024-report.json",
            data=sensitive_data,
            content_type="application/json",
            metadata={
                "data-classification": "confidential",
                "compliance-requirement": "SOX",
                "retention-period": "7-years",
                "encryption-required": "true",
                "access-level": "executive-only"
            }
        )

        print(f"✓ Uploaded to IBM Cloud: {result['etag']}")

        return result

Oracle Cloud Infrastructure (OCI)

Oracle Cloud Infrastructure Object Storage with S3 compatibility.

OCI Configuration

async def oci_example():
    """Connect to Oracle Cloud Infrastructure Object Storage."""

    # OCI Object Storage configuration
    client = S3Client(
        access_key="OCI_ACCESS_KEY",
        secret_key="OCI_SECRET_KEY",
        region="us-ashburn-1",
        endpoint_url="https://namespace.compat.objectstorage.us-ashburn-1.oraclecloud.com"
    )

    async with client:
        # Upload with OCI-specific optimizations
        database_export = open("oracle_export.dmp", "rb").read()

        result = await client.upload_file_multipart(
            bucket="database-exports",
            key=f"exports/{datetime.now().strftime('%Y/%m/%d')}/oracle_export.dmp",
            data=database_export,
            part_size=100 * 1024 * 1024,  # 100MB parts for OCI
            content_type="application/octet-stream",
            metadata={
                "export-type": "full",
                "database-version": "19c",
                "export-tool": "expdp"
            }
        )

        print(f"✓ Uploaded to OCI: {result['etag']}")

        return result

Multi-Cloud Storage Strategy

Cloud-Agnostic Storage Manager

from enum import Enum
from dataclasses import dataclass
from typing import Dict, List

class CloudProvider(Enum):
    AWS_S3 = "aws_s3"
    MINIO = "minio"
    DIGITALOCEAN = "digitalocean"
    WASABI = "wasabi"
    BACKBLAZE = "backblaze"
    IBM_CLOUD = "ibm_cloud"
    ORACLE_CLOUD = "oracle_cloud"

@dataclass
class CloudConfig:
    provider: CloudProvider
    access_key: str
    secret_key: str
    region: str
    endpoint_url: str
    bucket: str

class MultiCloudStorageManager:
    """Manage files across multiple S3-compatible cloud providers."""

    def __init__(self):
        self.clients: Dict[CloudProvider, S3Client] = {}
        self.configs: Dict[CloudProvider, CloudConfig] = {}

    def add_provider(self, config: CloudConfig):
        """Add a cloud storage provider."""
        self.configs[config.provider] = config

        client = S3Client(
            access_key=config.access_key,
            secret_key=config.secret_key,
            region=config.region,
            endpoint_url=config.endpoint_url
        )

        self.clients[config.provider] = client

    async def upload_to_multiple(
        self,
        file_data: bytes,
        key: str,
        providers: List[CloudProvider],
        content_type: str = "application/octet-stream",
        metadata: Dict[str, str] = None
    ) -> Dict[CloudProvider, Dict]:
        """Upload file to multiple cloud providers."""

        results = {}

        for provider in providers:
            if provider not in self.clients:
                continue

            try:
                client = self.clients[provider]
                config = self.configs[provider]

                async with client:
                    result = await client.put_object(
                        bucket=config.bucket,
                        key=key,
                        data=file_data,
                        content_type=content_type,
                        metadata={
                            **(metadata or {}),
                            "provider": provider.value,
                            "multi-cloud-upload": "true"
                        }
                    )

                    results[provider] = {
                        "success": True,
                        "etag": result["etag"],
                        "provider": provider.value
                    }

                    print(f"✓ Uploaded to {provider.value}: {key}")

            except Exception as e:
                results[provider] = {
                    "success": False,
                    "error": str(e),
                    "provider": provider.value
                }

                print(f"✗ Failed to upload to {provider.value}: {e}")

        return results

    async def download_from_best_provider(
        self,
        key: str,
        preferred_order: List[CloudProvider] = None
    ) -> tuple[bytes, CloudProvider]:
        """Download file from the first available provider."""

        if not preferred_order:
            preferred_order = list(self.clients.keys())

        for provider in preferred_order:
            if provider not in self.clients:
                continue

            try:
                client = self.clients[provider]
                config = self.configs[provider]

                async with client:
                    result = await client.get_object(
                        bucket=config.bucket,
                        key=key
                    )

                    print(f"✓ Downloaded from {provider.value}: {key}")
                    return result["body"], provider

            except Exception as e:
                print(f"✗ Failed to download from {provider.value}: {e}")
                continue

        raise Exception("File not found in any configured provider")

    async def sync_across_providers(
        self,
        source_provider: CloudProvider,
        target_providers: List[CloudProvider],
        key_prefix: str = ""
    ) -> Dict:
        """Sync files from one provider to others."""

        if source_provider not in self.clients:
            raise ValueError(f"Source provider {source_provider} not configured")

        source_client = self.clients[source_provider]
        source_config = self.configs[source_provider]

        sync_results = {"synced": [], "failed": []}

        async with source_client:
            # List files to sync
            files_result = await source_client.list_objects(
                bucket=source_config.bucket,
                prefix=key_prefix,
                max_keys=1000
            )

            for file_obj in files_result["objects"]:
                key = file_obj["key"]

                try:
                    # Download from source
                    download_result = await source_client.get_object(
                        bucket=source_config.bucket,
                        key=key
                    )

                    # Upload to target providers
                    upload_results = await self.upload_to_multiple(
                        file_data=download_result["body"],
                        key=key,
                        providers=target_providers,
                        content_type=download_result["content_type"],
                        metadata={
                            "synced-from": source_provider.value,
                            "sync-date": datetime.now().isoformat()
                        }
                    )

                    sync_results["synced"].append({
                        "key": key,
                        "results": upload_results
                    })

                except Exception as e:
                    sync_results["failed"].append({
                        "key": key,
                        "error": str(e)
                    })

        return sync_results

# Example usage
async def multi_cloud_example():
    """Example of multi-cloud storage management."""

    manager = MultiCloudStorageManager()

    # Configure multiple providers
    manager.add_provider(CloudConfig(
        provider=CloudProvider.AWS_S3,
        access_key="aws-key",
        secret_key="aws-secret",
        region="us-east-1",
        endpoint_url="https://s3.amazonaws.com",
        bucket="aws-backup"
    ))

    manager.add_provider(CloudConfig(
        provider=CloudProvider.MINIO,
        access_key="minio-key",
        secret_key="minio-secret", 
        region="us-east-1",
        endpoint_url="http://localhost:9000",
        bucket="minio-backup"
    ))

    manager.add_provider(CloudConfig(
        provider=CloudProvider.DIGITALOCEAN,
        access_key="do-key",
        secret_key="do-secret",
        region="nyc3",
        endpoint_url="https://nyc3.digitaloceanspaces.com",
        bucket="do-backup"
    ))

    # Upload to multiple providers for redundancy
    important_data = b"Critical business data that needs multiple backups"

    results = await manager.upload_to_multiple(
        file_data=important_data,
        key="critical/business-data.bin",
        providers=[
            CloudProvider.AWS_S3,
            CloudProvider.MINIO,
            CloudProvider.DIGITALOCEAN
        ],
        metadata={
            "importance": "critical",
            "backup-strategy": "multi-cloud"
        }
    )

    print("Multi-cloud upload results:")
    for provider, result in results.items():
        status = "✓" if result["success"] else "✗"
        print(f"  {status} {provider.value}: {result}")

    # Download from best available provider
    try:
        data, provider = await manager.download_from_best_provider(
            key="critical/business-data.bin",
            preferred_order=[
                CloudProvider.MINIO,  # Try local first
                CloudProvider.DIGITALOCEAN,  # Then regional
                CloudProvider.AWS_S3  # Finally global
            ]
        )
        print(f"Successfully downloaded from {provider.value}")
    except Exception as e:
        print(f"Download failed: {e}")

# Run multi-cloud example
if __name__ == "__main__":
    asyncio.run(multi_cloud_example())

Configuration Management

Environment-Based Configuration

import os
from typing import Optional

class S3CompatibleConfig:
    """Centralized configuration for S3-compatible services."""

    # Service endpoint configurations
    ENDPOINTS = {
        "aws": "https://s3.{region}.amazonaws.com",
        "minio": os.getenv("MINIO_ENDPOINT", "http://localhost:9000"),
        "digitalocean": "https://{region}.digitaloceanspaces.com",
        "wasabi": "https://s3.wasabisys.com",
        "backblaze": "https://s3.{region}.backblazeb2.com",
        "ibm": "https://s3.{region}.cloud-object-storage.appdomain.cloud",
        "oracle": "https://namespace.compat.objectstorage.{region}.oraclecloud.com"
    }

    @classmethod
    def create_client(
        cls,
        service: str,
        access_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        region: str = "us-east-1",
        endpoint_url: Optional[str] = None
    ) -> S3Client:
        """Create S3Client for specific service."""

        # Get credentials from environment if not provided
        if not access_key:
            access_key = os.getenv(f"{service.upper()}_ACCESS_KEY")
        if not secret_key:
            secret_key = os.getenv(f"{service.upper()}_SECRET_KEY")

        if not access_key or not secret_key:
            raise ValueError(f"Missing credentials for {service}")

        # Determine endpoint URL
        if not endpoint_url:
            if service in cls.ENDPOINTS:
                endpoint_url = cls.ENDPOINTS[service].format(region=region)
            else:
                raise ValueError(f"Unknown service: {service}")

        return S3Client(
            access_key=access_key,
            secret_key=secret_key,
            region=region,
            endpoint_url=endpoint_url
        )

# Usage examples
async def config_examples():
    """Examples using centralized configuration."""

    # AWS S3
    aws_client = S3CompatibleConfig.create_client(
        service="aws",
        region="us-west-2"
    )

    # MinIO
    minio_client = S3CompatibleConfig.create_client(
        service="minio"
    )

    # DigitalOcean Spaces
    do_client = S3CompatibleConfig.create_client(
        service="digitalocean",
        region="fra1"
    )

    # Test all clients
    clients = [
        ("AWS S3", aws_client),
        ("MinIO", minio_client),
        ("DigitalOcean", do_client)
    ]

    for name, client in clients:
        try:
            async with client:
                # Simple connectivity test
                result = await client.list_objects("test", max_keys=1)
                print(f"✓ {name}: Connected successfully")
        except Exception as e:
            print(f"✗ {name}: {e}")

# Environment variables setup example
"""
# .env file
AWS_ACCESS_KEY=your-aws-access-key
AWS_SECRET_KEY=your-aws-secret-key

MINIO_ACCESS_KEY=admin
MINIO_SECRET_KEY=password123
MINIO_ENDPOINT=http://localhost:9000

DIGITALOCEAN_ACCESS_KEY=your-do-spaces-key
DIGITALOCEAN_SECRET_KEY=your-do-spaces-secret

WASABI_ACCESS_KEY=your-wasabi-key
WASABI_SECRET_KEY=your-wasabi-secret
"""

This comprehensive guide demonstrates how to use the S3 Asyncio Client with various S3-compatible services. Each service has its own strengths and use cases, and the client works seamlessly with all of them using the same simple interface.