Skip to content

Presigned URL Examples

Presigned URLs allow you to grant temporary access to S3 objects without exposing your AWS credentials. This is essential for web applications, mobile apps, and third-party integrations.

What are Presigned URLs?

Presigned URLs are time-limited URLs that include authentication information, allowing anyone with the URL to perform specific S3 operations (GET, PUT, etc.) without needing AWS credentials.

Common Use Cases

  • Direct uploads from web browsers: Let users upload files directly to S3
  • Secure file sharing: Share files temporarily without making them public
  • Mobile app file access: Allow mobile apps to access S3 without embedded credentials
  • Third-party integrations: Grant temporary access to external services
  • Content delivery: Serve private content through CDNs

Basic Presigned URL Generation

Download URLs (GET)

import asyncio
from s3_asyncio_client import S3Client

async def create_download_urls():
    client = S3Client(
        access_key="your-access-key",
        secret_key="your-secret-key",
        region="us-east-1"
    )

    # Create a presigned URL for downloading a file
    download_url = client.generate_presigned_url(
        method="GET",
        bucket="my-private-bucket",
        key="documents/secret-report.pdf",
        expires_in=3600  # 1 hour
    )

    print(f"Download URL (valid for 1 hour):")
    print(download_url)

    # URL can now be used by anyone to download the file
    # Example: curl "https://my-private-bucket.s3.amazonaws.com/documents/secret-report.pdf?X-Amz-Algorithm=..."

    return download_url

# Generate download URL
asyncio.run(create_download_urls())

Upload URLs (PUT)

async def create_upload_urls():
    client = S3Client(
        access_key="your-access-key",
        secret_key="your-secret-key",
        region="us-east-1"
    )

    # Create a presigned URL for uploading a file
    upload_url = client.generate_presigned_url(
        method="PUT",
        bucket="user-uploads",
        key="uploads/user-123/profile-photo.jpg",
        expires_in=1800,  # 30 minutes
        params={
            "Content-Type": "image/jpeg"
        }
    )

    print(f"Upload URL (valid for 30 minutes):")
    print(upload_url)

    # Client can upload using:
    # curl -X PUT "upload_url" -H "Content-Type: image/jpeg" --data-binary "@photo.jpg"

    return upload_url

Web Application Integration

Backend API for File Uploads

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uuid
from datetime import datetime

app = FastAPI()

class UploadRequest(BaseModel):
    filename: str
    content_type: str
    file_size: int

class UploadResponse(BaseModel):
    upload_url: str
    key: str
    expires_in: int

@app.post("/api/get-upload-url", response_model=UploadResponse)
async def get_upload_url(request: UploadRequest):
    """Generate presigned URL for file upload."""

    # Validate file type
    allowed_types = ["image/jpeg", "image/png", "application/pdf", "text/plain"]
    if request.content_type not in allowed_types:
        raise HTTPException(status_code=400, detail="File type not allowed")

    # Validate file size (10MB limit)
    if request.file_size > 10 * 1024 * 1024:
        raise HTTPException(status_code=400, detail="File too large")

    # Create unique key
    file_extension = request.filename.split('.')[-1]
    unique_key = f"uploads/{datetime.now().strftime('%Y/%m/%d')}/{uuid.uuid4()}.{file_extension}"

    # Generate presigned URL
    client = S3Client(
        access_key="your-access-key",
        secret_key="your-secret-key"
    )

    upload_url = client.generate_presigned_url(
        method="PUT",
        bucket="user-uploads",
        key=unique_key,
        expires_in=3600,  # 1 hour
        params={
            "Content-Type": request.content_type,
            "Content-Length": str(request.file_size)
        }
    )

    return UploadResponse(
        upload_url=upload_url,
        key=unique_key,
        expires_in=3600
    )

@app.post("/api/confirm-upload")
async def confirm_upload(key: str):
    """Confirm that upload was completed successfully."""
    client = S3Client(
        access_key="your-access-key",
        secret_key="your-secret-key"
    )

    try:
        # Verify file exists
        async with client:
            metadata = await client.head_object(
                bucket="user-uploads",
                key=key
            )

        return {
            "status": "success",
            "file_size": metadata["content_length"],
            "upload_time": metadata["last_modified"]
        }
    except Exception as e:
        raise HTTPException(status_code=404, detail="Upload not found or failed")

Frontend JavaScript Integration

<!DOCTYPE html>
<html>
<head>
    <title>Direct S3 Upload</title>
</head>
<body>
    <input type="file" id="fileInput" />
    <button onclick="uploadFile()">Upload to S3</button>
    <div id="progress"></div>

    <script>
    async function uploadFile() {
        const fileInput = document.getElementById('fileInput');
        const progressDiv = document.getElementById('progress');

        if (!fileInput.files[0]) {
            alert('Please select a file');
            return;
        }

        const file = fileInput.files[0];

        try {
            // Step 1: Get presigned URL from backend
            progressDiv.innerHTML = 'Getting upload URL...';

            const response = await fetch('/api/get-upload-url', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                },
                body: JSON.stringify({
                    filename: file.name,
                    content_type: file.type,
                    file_size: file.size
                })
            });

            if (!response.ok) {
                throw new Error('Failed to get upload URL');
            }

            const { upload_url, key } = await response.json();

            // Step 2: Upload directly to S3
            progressDiv.innerHTML = 'Uploading to S3...';

            const uploadResponse = await fetch(upload_url, {
                method: 'PUT',
                headers: {
                    'Content-Type': file.type,
                },
                body: file
            });

            if (!uploadResponse.ok) {
                throw new Error('Upload failed');
            }

            // Step 3: Confirm upload with backend
            progressDiv.innerHTML = 'Confirming upload...';

            const confirmResponse = await fetch('/api/confirm-upload', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                },
                body: JSON.stringify({ key })
            });

            if (confirmResponse.ok) {
                progressDiv.innerHTML = 'Upload successful!';
            } else {
                throw new Error('Upload confirmation failed');
            }

        } catch (error) {
            progressDiv.innerHTML = `Error: ${error.message}`;
            console.error('Upload error:', error);
        }
    }
    </script>
</body>
</html>

Advanced Presigned URL Scenarios

Secure File Sharing with Expiration

async def create_secure_share_link():
    """Create secure, time-limited sharing links."""
    client = S3Client(
        access_key="your-access-key",
        secret_key="your-secret-key"
    )

    # Create different expiration times based on file sensitivity
    file_configs = [
        {
            "key": "public-documents/newsletter.pdf",
            "expires_in": 7 * 24 * 3600,  # 7 days
            "description": "Newsletter (public)"
        },
        {
            "key": "confidential/financial-report.pdf", 
            "expires_in": 2 * 3600,  # 2 hours
            "description": "Financial report (confidential)"
        },
        {
            "key": "temp-files/meeting-recording.mp4",
            "expires_in": 24 * 3600,  # 24 hours
            "description": "Meeting recording (temporary)"
        }
    ]

    share_links = []

    for config in file_configs:
        # Generate download URL with custom expiration
        download_url = client.generate_presigned_url(
            method="GET",
            bucket="secure-documents",
            key=config["key"],
            expires_in=config["expires_in"],
            params={
                "response-content-disposition": f"attachment; filename=\"{config['key'].split('/')[-1]}\""
            }
        )

        share_links.append({
            "description": config["description"],
            "url": download_url,
            "expires_hours": config["expires_in"] // 3600
        })

        print(f"{config['description']}:")
        print(f"  URL: {download_url}")
        print(f"  Expires in: {config['expires_in'] // 3600} hours")
        print()

    return share_links

Batch Presigned URL Generation

async def generate_batch_urls():
    """Generate presigned URLs for multiple files efficiently."""
    client = S3Client(
        access_key="your-access-key",
        secret_key="your-secret-key"
    )

    # List of files to create URLs for
    files_to_share = [
        "reports/2024/january.pdf",
        "reports/2024/february.pdf", 
        "reports/2024/march.pdf",
        "reports/2024/april.pdf",
        "images/chart1.png",
        "images/chart2.png",
        "data/raw-data.csv"
    ]

    # Generate URLs for all files
    batch_urls = {}

    for file_key in files_to_share:
        # Different expiration based on file type
        if file_key.endswith('.pdf'):
            expires_in = 48 * 3600  # 48 hours for PDFs
        elif file_key.endswith('.png'):
            expires_in = 7 * 24 * 3600  # 7 days for images
        else:
            expires_in = 24 * 3600  # 24 hours for others

        url = client.generate_presigned_url(
            method="GET",
            bucket="report-sharing",
            key=file_key,
            expires_in=expires_in
        )

        batch_urls[file_key] = {
            "url": url,
            "expires_in_hours": expires_in // 3600
        }

    # Output as JSON for API response
    import json
    print(json.dumps(batch_urls, indent=2))

    return batch_urls

Mobile App Integration

from typing import Dict, List
import jwt
from datetime import datetime, timedelta

class MobileAppURLService:
    """Service for generating presigned URLs for mobile apps."""

    def __init__(self):
        self.client = S3Client(
            access_key="your-access-key",
            secret_key="your-secret-key"
        )
        self.jwt_secret = "your-jwt-secret"

    def verify_mobile_token(self, token: str) -> Dict:
        """Verify mobile app JWT token."""
        try:
            payload = jwt.decode(token, self.jwt_secret, algorithms=["HS256"])
            return payload
        except jwt.ExpiredSignatureError:
            raise Exception("Token expired")
        except jwt.InvalidTokenError:
            raise Exception("Invalid token")

    async def get_user_upload_urls(self, auth_token: str, file_requests: List[Dict]) -> Dict:
        """Generate presigned URLs for authenticated mobile user."""

        # Verify user authentication
        user_data = self.verify_mobile_token(auth_token)
        user_id = user_data["user_id"]

        upload_urls = []

        for file_request in file_requests:
            filename = file_request["filename"]
            content_type = file_request["content_type"]
            file_size = file_request.get("file_size", 0)

            # Validate file size (mobile apps often have limits)
            if file_size > 50 * 1024 * 1024:  # 50MB limit
                continue

            # Create user-specific key
            file_extension = filename.split('.')[-1] if '.' in filename else 'bin'
            unique_key = f"mobile-uploads/{user_id}/{datetime.now().strftime('%Y/%m/%d')}/{uuid.uuid4()}.{file_extension}"

            # Generate presigned URL
            upload_url = self.client.generate_presigned_url(
                method="PUT",
                bucket="mobile-app-uploads",
                key=unique_key,
                expires_in=1800,  # 30 minutes
                params={
                    "Content-Type": content_type
                }
            )

            upload_urls.append({
                "original_filename": filename,
                "upload_url": upload_url,
                "key": unique_key,
                "expires_at": (datetime.now() + timedelta(minutes=30)).isoformat()
            })

        return {
            "user_id": user_id,
            "upload_urls": upload_urls,
            "expires_in": 1800
        }

    async def get_user_download_urls(self, auth_token: str, file_keys: List[str]) -> Dict:
        """Generate presigned URLs for user to download their files."""

        user_data = self.verify_mobile_token(auth_token)
        user_id = user_data["user_id"]

        download_urls = []

        for key in file_keys:
            # Verify user owns this file
            if not key.startswith(f"mobile-uploads/{user_id}/"):
                continue  # Skip files not owned by user

            # Check if file exists
            try:
                async with self.client:
                    await self.client.head_object("mobile-app-uploads", key)
            except:
                continue  # Skip non-existent files

            # Generate download URL
            download_url = self.client.generate_presigned_url(
                method="GET",
                bucket="mobile-app-uploads",
                key=key,
                expires_in=3600  # 1 hour
            )

            download_urls.append({
                "key": key,
                "download_url": download_url,
                "expires_at": (datetime.now() + timedelta(hours=1)).isoformat()
            })

        return {
            "user_id": user_id,
            "download_urls": download_urls
        }

# Example usage in mobile API
mobile_service = MobileAppURLService()

@app.post("/mobile/upload-urls")
async def mobile_upload_urls(auth_token: str, files: List[Dict]):
    try:
        result = await mobile_service.get_user_upload_urls(auth_token, files)
        return result
    except Exception as e:
        raise HTTPException(status_code=401, detail=str(e))

CDN Integration with Presigned URLs

async def create_cdn_compatible_urls():
    """Create presigned URLs that work with CloudFront CDN."""
    client = S3Client(
        access_key="your-access-key",
        secret_key="your-secret-key"
    )

    # For files that need CDN caching
    media_files = [
        "media/images/hero-banner.jpg",
        "media/videos/product-demo.mp4",
        "media/documents/user-manual.pdf"
    ]

    cdn_urls = []

    for file_key in media_files:
        # Create presigned URL with cache-friendly parameters
        presigned_url = client.generate_presigned_url(
            method="GET",
            bucket="cdn-content",
            key=file_key,
            expires_in=24 * 3600,  # 24 hours
            params={
                "response-cache-control": "public, max-age=3600",  # 1 hour cache
                "response-content-type": "application/octet-stream"
            }
        )

        # Convert S3 URL to CloudFront URL
        cloudfront_domain = "d1234567890.cloudfront.net"
        s3_domain = "cdn-content.s3.amazonaws.com"

        cdn_url = presigned_url.replace(s3_domain, cloudfront_domain)

        cdn_urls.append({
            "file": file_key,
            "s3_url": presigned_url,
            "cdn_url": cdn_url
        })

        print(f"File: {file_key}")
        print(f"  S3 URL: {presigned_url}")
        print(f"  CDN URL: {cdn_url}")
        print()

    return cdn_urls

Security Best Practices

URL Validation and Sanitization

import re
from urllib.parse import urlparse, parse_qs

class SecurePresignedURLGenerator:
    """Secure presigned URL generator with validation."""

    def __init__(self):
        self.client = S3Client(
            access_key="your-access-key",
            secret_key="your-secret-key"
        )

        # Allowed file extensions
        self.allowed_extensions = {
            'images': ['.jpg', '.jpeg', '.png', '.gif', '.webp'],
            'documents': ['.pdf', '.doc', '.docx', '.txt'],
            'videos': ['.mp4', '.mov', '.avi'],
            'data': ['.csv', '.json', '.xml']
        }

    def validate_file_key(self, key: str, category: str) -> bool:
        """Validate file key for security."""

        # Check for path traversal attempts
        if '..' in key or key.startswith('/'):
            return False

        # Check file extension
        file_ext = '.' + key.split('.')[-1].lower() if '.' in key else ''
        if file_ext not in self.allowed_extensions.get(category, []):
            return False

        # Check key length
        if len(key) > 1000:
            return False

        # Check for suspicious characters
        if re.search(r'[<>:"|?*]', key):
            return False

        return True

    def sanitize_filename(self, filename: str) -> str:
        """Sanitize filename for S3 key."""
        # Remove or replace dangerous characters
        sanitized = re.sub(r'[<>:"|?*\\]', '_', filename)
        sanitized = re.sub(r'\.\.+', '.', sanitized)  # Remove multiple dots
        sanitized = sanitized.strip('. ')  # Remove leading/trailing dots and spaces

        return sanitized[:255]  # Limit length

    async def create_secure_upload_url(
        self, 
        user_id: str, 
        filename: str, 
        category: str,
        max_file_size: int = 10 * 1024 * 1024  # 10MB default
    ) -> Dict:
        """Create secure presigned upload URL."""

        # Sanitize filename
        safe_filename = self.sanitize_filename(filename)

        # Create secure key
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        unique_id = str(uuid.uuid4())[:8]
        secure_key = f"uploads/{category}/{user_id}/{timestamp}_{unique_id}_{safe_filename}"

        # Validate key
        if not self.validate_file_key(secure_key, category):
            raise ValueError("Invalid file for upload")

        # Generate presigned URL with size limit
        upload_url = self.client.generate_presigned_url(
            method="PUT",
            bucket="secure-uploads",
            key=secure_key,
            expires_in=1800,  # 30 minutes
            params={
                "Content-Length": str(max_file_size)  # Enforce size limit
            }
        )

        return {
            "upload_url": upload_url,
            "key": secure_key,
            "max_size": max_file_size,
            "expires_in": 1800,
            "original_filename": filename,
            "sanitized_filename": safe_filename
        }

# Usage example
secure_generator = SecurePresignedURLGenerator()

@app.post("/secure-upload-url")
async def get_secure_upload_url(
    user_id: str,
    filename: str,
    category: str,
    max_size: int = 10 * 1024 * 1024
):
    try:
        result = await secure_generator.create_secure_upload_url(
            user_id=user_id,
            filename=filename,
            category=category,
            max_file_size=max_size
        )
        return result
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

Rate Limiting for Presigned URLs

import time
from collections import defaultdict, deque

class RateLimitedURLGenerator:
    """Presigned URL generator with rate limiting."""

    def __init__(self):
        self.client = S3Client(
            access_key="your-access-key",
            secret_key="your-secret-key"
        )

        # Rate limiting: max 10 URLs per minute per user
        self.rate_limit = 10
        self.time_window = 60  # seconds
        self.user_requests = defaultdict(deque)

    def check_rate_limit(self, user_id: str) -> bool:
        """Check if user is within rate limits."""
        now = time.time()
        user_queue = self.user_requests[user_id]

        # Remove old requests outside time window
        while user_queue and now - user_queue[0] > self.time_window:
            user_queue.popleft()

        # Check if under limit
        if len(user_queue) >= self.rate_limit:
            return False

        # Add current request
        user_queue.append(now)
        return True

    async def create_rate_limited_url(self, user_id: str, **kwargs) -> Dict:
        """Create presigned URL with rate limiting."""

        if not self.check_rate_limit(user_id):
            raise Exception(f"Rate limit exceeded. Max {self.rate_limit} requests per {self.time_window} seconds")

        # Generate URL normally
        url = self.client.generate_presigned_url(**kwargs)

        return {
            "url": url,
            "user_id": user_id,
            "created_at": datetime.now().isoformat()
        }

# Usage
rate_limited_generator = RateLimitedURLGenerator()

@app.post("/rate-limited-url")
async def get_rate_limited_url(user_id: str, bucket: str, key: str):
    try:
        result = await rate_limited_generator.create_rate_limited_url(
            user_id=user_id,
            method="GET",
            bucket=bucket,
            key=key,
            expires_in=3600
        )
        return result
    except Exception as e:
        raise HTTPException(status_code=429, detail=str(e))

Complete Example: File Sharing Service

import asyncio
import sqlite3
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Optional

@dataclass
class SharedFile:
    id: str
    owner_id: str
    file_key: str
    bucket: str
    expires_at: datetime
    download_count: int
    max_downloads: Optional[int]
    password: Optional[str]

class FileShareService:
    """Complete file sharing service using presigned URLs."""

    def __init__(self):
        self.client = S3Client(
            access_key="your-access-key",
            secret_key="your-secret-key"
        )
        self.db = sqlite3.connect("file_shares.db")
        self._init_db()

    def _init_db(self):
        """Initialize database tables."""
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS shared_files (
                id TEXT PRIMARY KEY,
                owner_id TEXT NOT NULL,
                file_key TEXT NOT NULL,
                bucket TEXT NOT NULL,
                expires_at TIMESTAMP NOT NULL,
                download_count INTEGER DEFAULT 0,
                max_downloads INTEGER,
                password TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.db.commit()

    async def create_share_link(
        self,
        owner_id: str,
        bucket: str,
        file_key: str,
        expires_hours: int = 24,
        max_downloads: Optional[int] = None,
        password: Optional[str] = None
    ) -> Dict:
        """Create a shareable link for a file."""

        # Verify file exists
        async with self.client:
            try:
                file_info = await self.client.head_object(bucket, file_key)
            except Exception:
                raise ValueError("File not found")

        # Generate unique share ID
        share_id = str(uuid.uuid4())
        expires_at = datetime.now() + timedelta(hours=expires_hours)

        # Store share information
        self.db.execute(
            """INSERT INTO shared_files 
               (id, owner_id, file_key, bucket, expires_at, max_downloads, password)
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (share_id, owner_id, file_key, bucket, expires_at, max_downloads, password)
        )
        self.db.commit()

        # Create share URL (not the actual S3 URL yet)
        share_url = f"https://yourapp.com/share/{share_id}"

        return {
            "share_id": share_id,
            "share_url": share_url,
            "expires_at": expires_at.isoformat(),
            "max_downloads": max_downloads,
            "password_protected": password is not None,
            "file_info": {
                "key": file_key,
                "size": file_info["content_length"],
                "content_type": file_info["content_type"]
            }
        }

    async def get_download_url(
        self,
        share_id: str,
        password: Optional[str] = None
    ) -> Dict:
        """Get actual download URL for a shared file."""

        # Get share information
        cursor = self.db.execute(
            "SELECT * FROM shared_files WHERE id = ?",
            (share_id,)
        )
        row = cursor.fetchone()

        if not row:
            raise ValueError("Share not found")

        # Parse row data
        (id, owner_id, file_key, bucket, expires_at_str, 
         download_count, max_downloads, stored_password) = row[:8]

        expires_at = datetime.fromisoformat(expires_at_str)

        # Check if expired
        if datetime.now() > expires_at:
            raise ValueError("Share has expired")

        # Check download limit
        if max_downloads and download_count >= max_downloads:
            raise ValueError("Download limit reached")

        # Check password
        if stored_password and password != stored_password:
            raise ValueError("Invalid password")

        # Generate presigned URL
        download_url = self.client.generate_presigned_url(
            method="GET",
            bucket=bucket,
            key=file_key,
            expires_in=3600,  # 1 hour
            params={
                "response-content-disposition": f"attachment; filename=\"{file_key.split('/')[-1]}\""
            }
        )

        # Increment download count
        self.db.execute(
            "UPDATE shared_files SET download_count = download_count + 1 WHERE id = ?",
            (share_id,)
        )
        self.db.commit()

        return {
            "download_url": download_url,
            "filename": file_key.split('/')[-1],
            "expires_in": 3600,
            "downloads_remaining": max_downloads - download_count - 1 if max_downloads else None
        }

    async def list_user_shares(self, owner_id: str) -> List[Dict]:
        """List all shares created by a user."""
        cursor = self.db.execute(
            """SELECT id, file_key, expires_at, download_count, max_downloads
               FROM shared_files WHERE owner_id = ?
               ORDER BY created_at DESC""",
            (owner_id,)
        )

        shares = []
        for row in cursor.fetchall():
            share_id, file_key, expires_at_str, download_count, max_downloads = row

            shares.append({
                "share_id": share_id,
                "filename": file_key.split('/')[-1],
                "expires_at": expires_at_str,
                "download_count": download_count,
                "max_downloads": max_downloads,
                "share_url": f"https://yourapp.com/share/{share_id}",
                "active": datetime.fromisoformat(expires_at_str) > datetime.now()
            })

        return shares

    def cleanup_expired_shares(self):
        """Remove expired shares from database."""
        self.db.execute(
            "DELETE FROM shared_files WHERE expires_at < ?",
            (datetime.now(),)
        )
        self.db.commit()

# Example FastAPI integration
share_service = FileShareService()

@app.post("/share/create")
async def create_file_share(
    owner_id: str,
    bucket: str,
    file_key: str,
    expires_hours: int = 24,
    max_downloads: Optional[int] = None,
    password: Optional[str] = None
):
    try:
        result = await share_service.create_share_link(
            owner_id=owner_id,
            bucket=bucket,
            file_key=file_key,
            expires_hours=expires_hours,
            max_downloads=max_downloads,
            password=password
        )
        return result
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/share/{share_id}")
async def get_shared_file(share_id: str, password: Optional[str] = None):
    try:
        result = await share_service.get_download_url(share_id, password)
        return result
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/share/user/{owner_id}")
async def list_user_file_shares(owner_id: str):
    shares = await share_service.list_user_shares(owner_id)
    return {"shares": shares}

# Cleanup job (run periodically)
@app.post("/admin/cleanup-expired")
async def cleanup_expired():
    share_service.cleanup_expired_shares()
    return {"status": "cleanup completed"}