alinaqi

azure-cosmosdb

457
37
# Install this skill:
npx skills add alinaqi/claude-bootstrap --skill "azure-cosmosdb"

Install specific skill from multi-skill repository

# Description

Azure Cosmos DB partition keys, consistency levels, change feed, SDK patterns

# SKILL.md


name: azure-cosmosdb
description: Azure Cosmos DB partition keys, consistency levels, change feed, SDK patterns


Azure Cosmos DB Skill

Load with: base.md + [typescript.md | python.md]

Azure Cosmos DB is a globally distributed, multi-model database with guaranteed low latency, elastic scalability, and multiple consistency models.

Sources: Cosmos DB Docs | Partitioning | SDK


Core Principle

Choose partition key wisely, design for your access patterns, understand consistency tradeoffs.

Cosmos DB distributes data across partitions. Your partition key choice determines scalability, performance, and cost. Design for even distribution and query efficiency.


Cosmos DB APIs

API Use Case
NoSQL (Core) Document database, most flexible
MongoDB MongoDB wire protocol compatible
PostgreSQL Distributed PostgreSQL (Citus)
Apache Cassandra Wide-column store
Apache Gremlin Graph database
Table Key-value (Azure Table Storage compatible)

This skill focuses on NoSQL (Core) API - the most common choice.


Key Concepts

Concept Description
Container Collection of items (like a table)
Item Single document/record (JSON)
Partition Key Determines data distribution
Logical Partition Items with same partition key
Physical Partition Storage unit (max 50GB, 10K RU/s)
RU (Request Unit) Throughput currency

Partition Key Design

Good Partition Keys

// High cardinality, even distribution, used in queries

// E-commerce: userId for user data
{ "id": "order-123", "userId": "user-456", ... }  // PK: /userId

// Multi-tenant: tenantId
{ "id": "doc-1", "tenantId": "tenant-abc", ... }  // PK: /tenantId

// IoT: deviceId for telemetry
{ "id": "reading-1", "deviceId": "device-789", ... }  // PK: /deviceId

// Logs: synthetic key (date + category)
{ "id": "log-1", "partitionKey": "2024-01-15_errors", ... }  // PK: /partitionKey

Hierarchical Partition Keys

// For multi-level distribution (e.g., tenant β†’ user)
// Container created with: /tenantId, /userId

{
  "id": "order-123",
  "tenantId": "acme-corp",
  "userId": "user-456",
  "items": [...]
}

// Query within tenant and user efficiently

Bad Partition Keys

// Avoid:
// - Low cardinality (status, type, boolean)
// - Monotonically increasing (timestamp, auto-increment)
// - Frequently updated fields
// - Fields not used in queries

// Bad: Only 3 values β†’ hot partitions
{ "status": "pending" | "completed" | "cancelled" }

// Bad: All writes go to latest partition
{ "timestamp": "2024-01-15T10:30:00Z" }

SDK Setup (TypeScript)

Install

npm install @azure/cosmos

Initialize Client

// lib/cosmosdb.ts
import { CosmosClient, Database, Container } from '@azure/cosmos';

const endpoint = process.env.COSMOS_ENDPOINT!;
const key = process.env.COSMOS_KEY!;
const databaseId = process.env.COSMOS_DATABASE!;

const client = new CosmosClient({ endpoint, key });

// Or with connection string
// const client = new CosmosClient(process.env.COSMOS_CONNECTION_STRING!);

export const database: Database = client.database(databaseId);

export function getContainer(containerId: string): Container {
  return database.container(containerId);
}

Type Definitions

// types/cosmos.ts
export interface BaseItem {
  id: string;
  _ts?: number;      // Auto-generated timestamp
  _etag?: string;    // For optimistic concurrency
}

export interface User extends BaseItem {
  userId: string;    // Partition key
  email: string;
  name: string;
  createdAt: string;
  updatedAt: string;
}

export interface Order extends BaseItem {
  userId: string;    // Partition key
  orderId: string;
  items: OrderItem[];
  total: number;
  status: 'pending' | 'paid' | 'shipped' | 'delivered';
  createdAt: string;
}

export interface OrderItem {
  productId: string;
  name: string;
  quantity: number;
  price: number;
}

CRUD Operations

Create Item

import { getContainer } from './cosmosdb';
import { User } from './types';

const usersContainer = getContainer('users');

async function createUser(data: Omit<User, 'id' | 'createdAt' | 'updatedAt'>): Promise<User> {
  const now = new Date().toISOString();
  const user: User = {
    id: crypto.randomUUID(),
    ...data,
    createdAt: now,
    updatedAt: now
  };

  const { resource } = await usersContainer.items.create(user);
  return resource as User;
}

Read Item (Point Read)

// Most efficient read - requires id AND partition key
async function getUser(userId: string, id: string): Promise<User | null> {
  try {
    const { resource } = await usersContainer.item(id, userId).read<User>();
    return resource || null;
  } catch (error: any) {
    if (error.code === 404) return null;
    throw error;
  }
}

// If id equals partition key value
async function getUserById(userId: string): Promise<User | null> {
  try {
    const { resource } = await usersContainer.item(userId, userId).read<User>();
    return resource || null;
  } catch (error: any) {
    if (error.code === 404) return null;
    throw error;
  }
}

Query Items

// Query within partition (efficient)
async function getUserOrders(userId: string): Promise<Order[]> {
  const ordersContainer = getContainer('orders');

  const { resources } = await ordersContainer.items
    .query<Order>({
      query: 'SELECT * FROM c WHERE c.userId = @userId ORDER BY c.createdAt DESC',
      parameters: [{ name: '@userId', value: userId }]
    })
    .fetchAll();

  return resources;
}

// Cross-partition query (use sparingly)
async function getOrdersByStatus(status: string): Promise<Order[]> {
  const ordersContainer = getContainer('orders');

  const { resources } = await ordersContainer.items
    .query<Order>({
      query: 'SELECT * FROM c WHERE c.status = @status',
      parameters: [{ name: '@status', value: status }]
    })
    .fetchAll();

  return resources;
}

// Paginated query
async function getOrdersPaginated(
  userId: string,
  pageSize: number = 10,
  continuationToken?: string
): Promise<{ items: Order[]; continuationToken?: string }> {
  const ordersContainer = getContainer('orders');

  const queryIterator = ordersContainer.items.query<Order>(
    {
      query: 'SELECT * FROM c WHERE c.userId = @userId ORDER BY c.createdAt DESC',
      parameters: [{ name: '@userId', value: userId }]
    },
    {
      maxItemCount: pageSize,
      continuationToken
    }
  );

  const { resources, continuationToken: nextToken } = await queryIterator.fetchNext();

  return {
    items: resources,
    continuationToken: nextToken
  };
}

Update Item

// Replace entire item
async function updateUser(userId: string, id: string, updates: Partial<User>): Promise<User> {
  const existing = await getUser(userId, id);
  if (!existing) throw new Error('User not found');

  const updated: User = {
    ...existing,
    ...updates,
    updatedAt: new Date().toISOString()
  };

  const { resource } = await usersContainer.item(id, userId).replace(updated);
  return resource as User;
}

// Partial update (patch operations)
async function patchUser(userId: string, id: string, operations: any[]): Promise<User> {
  const { resource } = await usersContainer.item(id, userId).patch(operations);
  return resource as User;
}

// Usage:
await patchUser('user-123', 'user-123', [
  { op: 'set', path: '/name', value: 'New Name' },
  { op: 'set', path: '/updatedAt', value: new Date().toISOString() },
  { op: 'incr', path: '/loginCount', value: 1 }
]);

Delete Item

async function deleteUser(userId: string, id: string): Promise<void> {
  await usersContainer.item(id, userId).delete();
}

Optimistic Concurrency (ETags)

async function updateUserWithETag(
  userId: string,
  id: string,
  updates: Partial<User>,
  etag: string
): Promise<User> {
  const existing = await getUser(userId, id);
  if (!existing) throw new Error('User not found');

  const updated: User = {
    ...existing,
    ...updates,
    updatedAt: new Date().toISOString()
  };

  try {
    const { resource } = await usersContainer.item(id, userId).replace(updated, {
      accessCondition: { type: 'IfMatch', condition: etag }
    });
    return resource as User;
  } catch (error: any) {
    if (error.code === 412) {
      throw new Error('Document was modified by another process');
    }
    throw error;
  }
}

Consistency Levels

Level Guarantees Latency Use Case
Strong Linearizable reads Highest Financial, inventory
Bounded Staleness Consistent within bounds High Leaderboards, counters
Session Read your writes Medium User sessions (default)
Consistent Prefix Ordered reads Low Social feeds
Eventual No ordering guarantee Lowest Analytics, logs

Set Consistency Per Request

// Override default consistency
const { resource } = await usersContainer.item(id, userId).read<User>({
  consistencyLevel: 'Strong'
});

// For queries
const { resources } = await container.items.query(
  { query: 'SELECT * FROM c' },
  { consistencyLevel: 'BoundedStaleness' }
).fetchAll();

Batch Operations

Transactional Batch (Same Partition)

async function createOrderWithItems(userId: string, order: Order, items: any[]): Promise<void> {
  const ordersContainer = getContainer('orders');

  const operations = [
    { operationType: 'Create' as const, resourceBody: order },
    ...items.map(item => ({
      operationType: 'Create' as const,
      resourceBody: { ...item, userId, orderId: order.orderId }
    }))
  ];

  const { result } = await ordersContainer.items.batch(operations, userId);

  // Check if any operation failed
  if (result.some(r => r.statusCode >= 400)) {
    throw new Error('Batch operation failed');
  }
}

Bulk Operations

// For large-scale imports (not transactional)
async function bulkImportUsers(users: User[]): Promise<void> {
  const operations = users.map(user => ({
    operationType: 'Create' as const,
    resourceBody: user,
    partitionKey: user.userId
  }));

  // Process in chunks
  const chunkSize = 100;
  for (let i = 0; i < operations.length; i += chunkSize) {
    const chunk = operations.slice(i, i + chunkSize);
    await usersContainer.items.bulk(chunk);
  }
}

Change Feed

Process Changes

import { ChangeFeedStartFrom } from '@azure/cosmos';

async function processChangeFeed(): Promise<void> {
  const container = getContainer('orders');

  const changeFeedIterator = container.items.changeFeed({
    changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
  });

  while (changeFeedIterator.hasMoreResults) {
    const { result: items, statusCode } = await changeFeedIterator.fetchNext();

    if (statusCode === 304) {
      // No new changes
      await sleep(1000);
      continue;
    }

    for (const item of items) {
      console.log('Changed item:', item);
      // Process the change...
    }
  }
}

// For production, use Change Feed Processor with lease container

Change Feed Processor Pattern

async function startChangeFeedProcessor(): Promise<void> {
  const sourceContainer = getContainer('orders');
  const leaseContainer = getContainer('leases');

  const changeFeedProcessor = sourceContainer.items.changeFeed
    .for(item => {
      // Process each change
      console.log('Processing:', item);
    })
    .withLeaseContainer(leaseContainer)
    .build();

  await changeFeedProcessor.start();
}

Python SDK

Install

pip install azure-cosmos

Setup and Operations

# cosmos_db.py
import os
from azure.cosmos import CosmosClient, PartitionKey
from azure.cosmos.exceptions import CosmosResourceNotFoundError
from typing import Optional, List
from datetime import datetime
import uuid

# Initialize client
endpoint = os.environ['COSMOS_ENDPOINT']
key = os.environ['COSMOS_KEY']
database_name = os.environ['COSMOS_DATABASE']

client = CosmosClient(endpoint, key)
database = client.get_database_client(database_name)


def get_container(container_name: str):
    return database.get_container_client(container_name)


# CRUD Operations
users_container = get_container('users')


def create_user(email: str, name: str, user_id: str = None) -> dict:
    user_id = user_id or str(uuid.uuid4())
    now = datetime.utcnow().isoformat()

    user = {
        'id': user_id,
        'userId': user_id,  # Partition key
        'email': email,
        'name': name,
        'createdAt': now,
        'updatedAt': now
    }

    return users_container.create_item(user)


def get_user(user_id: str) -> Optional[dict]:
    try:
        return users_container.read_item(item=user_id, partition_key=user_id)
    except CosmosResourceNotFoundError:
        return None


def query_users(email_domain: str) -> List[dict]:
    query = "SELECT * FROM c WHERE CONTAINS(c.email, @domain)"
    parameters = [{'name': '@domain', 'value': email_domain}]

    return list(users_container.query_items(
        query=query,
        parameters=parameters,
        enable_cross_partition_query=True
    ))


def update_user(user_id: str, **updates) -> dict:
    user = get_user(user_id)
    if not user:
        raise ValueError('User not found')

    user.update(updates)
    user['updatedAt'] = datetime.utcnow().isoformat()

    return users_container.replace_item(item=user_id, body=user)


def delete_user(user_id: str) -> None:
    users_container.delete_item(item=user_id, partition_key=user_id)


# Paginated query
def get_users_paginated(page_size: int = 10, continuation_token: str = None):
    query = "SELECT * FROM c ORDER BY c.createdAt DESC"

    items = users_container.query_items(
        query=query,
        enable_cross_partition_query=True,
        max_item_count=page_size,
        continuation_token=continuation_token
    )

    page = items.by_page()
    results = list(next(page))

    return {
        'items': results,
        'continuation_token': page.continuation_token
    }

Indexing

Custom Indexing Policy

{
  "indexingMode": "consistent",
  "automatic": true,
  "includedPaths": [
    { "path": "/userId/?" },
    { "path": "/status/?" },
    { "path": "/createdAt/?" }
  ],
  "excludedPaths": [
    { "path": "/content/*" },
    { "path": "/_etag/?" }
  ],
  "compositeIndexes": [
    [
      { "path": "/userId", "order": "ascending" },
      { "path": "/createdAt", "order": "descending" }
    ]
  ]
}

Create Container with Index

await database.containers.createIfNotExists({
  id: 'orders',
  partitionKey: { paths: ['/userId'] },
  indexingPolicy: {
    indexingMode: 'consistent',
    includedPaths: [
      { path: '/userId/?' },
      { path: '/status/?' },
      { path: '/createdAt/?' }
    ],
    excludedPaths: [
      { path: '/*' }  // Exclude all by default
    ]
  }
});

Throughput Management

Provisioned Throughput

// Container level
await database.containers.createIfNotExists({
  id: 'orders',
  partitionKey: { paths: ['/userId'] },
  throughput: 1000  // RU/s
});

// Scale throughput
const container = database.container('orders');
await container.throughput.replace(2000);

Autoscale

await database.containers.createIfNotExists({
  id: 'orders',
  partitionKey: { paths: ['/userId'] },
  maxThroughput: 10000  // Auto-scales 10% to 100%
});

Serverless

// No throughput configuration needed
// Pay per request (good for dev/test, intermittent workloads)
await database.containers.createIfNotExists({
  id: 'orders',
  partitionKey: { paths: ['/userId'] }
  // No throughput = serverless
});

CLI Quick Reference

# Azure CLI
az cosmosdb create --name myaccount --resource-group mygroup
az cosmosdb sql database create --account-name myaccount --name mydb --resource-group mygroup
az cosmosdb sql container create \
  --account-name myaccount \
  --database-name mydb \
  --name orders \
  --partition-key-path /userId \
  --throughput 400

# Query
az cosmosdb sql query --account-name myaccount --database-name mydb \
  --container-name orders --query "SELECT * FROM c"

# Keys
az cosmosdb keys list --name myaccount --resource-group mygroup
az cosmosdb keys list --name myaccount --resource-group mygroup --type connection-strings

Cost Optimization

Strategy Impact
Right partition key Avoid hot partitions (wasted RUs)
Index only what you query Reduce write RU cost
Use point reads 1 RU vs 3+ RU for queries
Serverless for dev/test Pay per request
Autoscale for production Scale down during low traffic
TTL for temporary data Auto-delete old items

Time-to-Live (TTL)

// Enable TTL on container
await database.containers.createIfNotExists({
  id: 'sessions',
  partitionKey: { paths: ['/userId'] },
  defaultTtl: 3600  // 1 hour
});

// Per-item TTL
const session = {
  id: 'session-123',
  userId: 'user-456',
  ttl: 1800  // Override: 30 minutes
};

Anti-Patterns

  • Bad partition key - Low cardinality causes hot partitions
  • Cross-partition queries - Expensive; design for single-partition queries
  • Over-indexing - Increases write cost; index only queried paths
  • Large items - Max 2MB; store blobs in Azure Blob Storage
  • Ignoring RU cost - Monitor and optimize expensive queries
  • Strong consistency everywhere - Use Session (default) unless required
  • No retry logic - Handle 429 (throttling) with exponential backoff
  • Missing TTL - Set TTL for temporary/session data

# Supported AI Coding Agents

This skill is compatible with the SKILL.md standard and works with all major AI coding agents:

Learn more about the SKILL.md standard and how to use these skills with your preferred AI coding agent.