npx skills add cosmix/loom --skill "code-migration"
Install a specific skill from a multi-skill repository.
# Description
Strategies and patterns for safe code migrations and upgrades. Use when upgrading frameworks, migrating between technologies, handling deprecations, or planning incremental migrations. Triggers: migration, migrate, upgrade, version upgrade, breaking change, deprecation, codemod, codemods, AST transformation, jscodeshift, ts-morph, framework migration, database migration, schema migration, legacy code, modernize, modernization, refactoring, feature flags, rollback, strangler fig, blue-green deployment, canary release, shadow mode, parallel run.
# SKILL.md
name: code-migration
description: Strategies and patterns for safe code migrations and upgrades. Use when upgrading frameworks, migrating between technologies, handling deprecations, or planning incremental migrations. Triggers: migration, migrate, upgrade, version upgrade, breaking change, deprecation, codemod, codemods, AST transformation, jscodeshift, ts-morph, framework migration, database migration, schema migration, legacy code, modernize, modernization, refactoring, feature flags, rollback, strangler fig, blue-green deployment, canary release, shadow mode, parallel run.
Code Migration
Overview
Code migration involves moving codebases between versions, frameworks, or technologies while maintaining stability. This skill covers version upgrade strategies, framework migrations, deprecation handling, automated codemods, incremental migration patterns, testing during migration, feature flags, and rollback strategies.
Instructions
1. Version Upgrade Strategies
Dependency Audit and Planning
# dependency_audit.py
import subprocess
import json
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
class RiskLevel(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class DependencyChange:
name: str
current_version: str
target_version: str
risk_level: RiskLevel
breaking_changes: List[str]
migration_notes: str
class DependencyAuditor:
def __init__(self, package_manager: str = "npm"):
self.package_manager = package_manager
def get_outdated(self) -> Dict[str, dict]:
"""Get list of outdated dependencies."""
if self.package_manager == "npm":
result = subprocess.run(
["npm", "outdated", "--json"],
capture_output=True,
text=True
)
return json.loads(result.stdout) if result.stdout else {}
elif self.package_manager == "pip":
result = subprocess.run(
["pip", "list", "--outdated", "--format=json"],
capture_output=True,
text=True
)
return {
pkg["name"]: {
"current": pkg["version"],
"wanted": pkg["latest_version"]
}
                for pkg in json.loads(result.stdout)
            }
        raise ValueError(f"Unsupported package manager: {self.package_manager}")
def assess_risk(self, change: DependencyChange) -> RiskLevel:
"""Assess risk level of dependency upgrade."""
current_parts = change.current_version.split(".")
target_parts = change.target_version.split(".")
if current_parts[0] != target_parts[0]:
return RiskLevel.HIGH # Major version change
elif current_parts[1] != target_parts[1]:
return RiskLevel.MEDIUM # Minor version change
else:
return RiskLevel.LOW # Patch version change
def create_migration_plan(dependencies: List[DependencyChange]) -> str:
"""Generate a migration plan document."""
plan = ["# Dependency Migration Plan\n"]
# Group by risk level
for risk in [RiskLevel.LOW, RiskLevel.MEDIUM, RiskLevel.HIGH, RiskLevel.CRITICAL]:
deps = [d for d in dependencies if d.risk_level == risk]
if deps:
plan.append(f"\n## {risk.value.title()} Risk Updates\n")
for dep in deps:
plan.append(f"### {dep.name}: {dep.current_version} -> {dep.target_version}\n")
if dep.breaking_changes:
plan.append("**Breaking Changes:**\n")
for change in dep.breaking_changes:
plan.append(f"- {change}\n")
plan.append(f"**Notes:** {dep.migration_notes}\n")
return "".join(plan)
Staged Upgrade Process
# staged_upgrade.py
from dataclasses import dataclass
from typing import List, Callable
import subprocess
@dataclass
class UpgradeStage:
name: str
description: str
dependencies: List[str]
pre_checks: List[Callable[[], bool]]
post_checks: List[Callable[[], bool]]
rollback_steps: List[str]
class StagedUpgrader:
def __init__(self):
self.stages: List[UpgradeStage] = []
self.completed_stages: List[str] = []
def add_stage(self, stage: UpgradeStage):
self.stages.append(stage)
def execute(self, dry_run: bool = False) -> bool:
for stage in self.stages:
print(f"Executing stage: {stage.name}")
# Run pre-checks
for check in stage.pre_checks:
if not check():
print(f"Pre-check failed for stage: {stage.name}")
return False
if not dry_run:
# Execute upgrade
for dep in stage.dependencies:
self._upgrade_dependency(dep)
# Run post-checks
for check in stage.post_checks:
if not check():
print(f"Post-check failed for stage: {stage.name}")
self._rollback(stage)
return False
self.completed_stages.append(stage.name)
print(f"Completed stage: {stage.name}")
return True
def _upgrade_dependency(self, dep: str):
subprocess.run(["npm", "install", dep], check=True)
def _rollback(self, failed_stage: UpgradeStage):
print(f"Rolling back stage: {failed_stage.name}")
for step in failed_stage.rollback_steps:
subprocess.run(step, shell=True)
# Usage example
def check_tests_pass() -> bool:
result = subprocess.run(["npm", "test"], capture_output=True)
return result.returncode == 0
def check_build_succeeds() -> bool:
result = subprocess.run(["npm", "run", "build"], capture_output=True)
return result.returncode == 0
upgrader = StagedUpgrader()
upgrader.add_stage(UpgradeStage(
name="Core Dependencies",
description="Upgrade React and related packages",
dependencies=["react@18", "react-dom@18"],
pre_checks=[check_tests_pass],
post_checks=[check_tests_pass, check_build_succeeds],
rollback_steps=["git checkout package.json", "npm install"]
))
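A dry run first exercises the pre- and post-checks without installing anything; one way to invoke it:
# Validate the stage checks before performing the real upgrade
if upgrader.execute(dry_run=True):
    upgrader.execute()  # installs dependencies, rolls back on post-check failure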
2. Framework Migrations
Adapter Pattern for Gradual Migration
// Migrating from Express to Fastify example
// adapter.ts - Common interface
interface RequestAdapter {
params: Record<string, string>;
query: Record<string, string>;
body: unknown;
headers: Record<string, string>;
}
interface ResponseAdapter {
status(code: number): ResponseAdapter;
json(data: unknown): void;
send(data: string): void;
}
type RouteHandler = (
req: RequestAdapter,
res: ResponseAdapter,
) => Promise<void>;
// express-adapter.ts
import { Request, Response } from "express";
function adaptExpressRequest(req: Request): RequestAdapter {
return {
params: req.params,
query: req.query as Record<string, string>,
body: req.body,
headers: req.headers as Record<string, string>,
};
}
function adaptExpressResponse(res: Response): ResponseAdapter {
return {
status: (code: number) => {
res.status(code);
return adaptExpressResponse(res);
},
json: (data: unknown) => res.json(data),
send: (data: string) => res.send(data),
};
}
export function wrapForExpress(handler: RouteHandler) {
return async (req: Request, res: Response) => {
await handler(adaptExpressRequest(req), adaptExpressResponse(res));
};
}
// fastify-adapter.ts
import { FastifyRequest, FastifyReply } from "fastify";
function adaptFastifyRequest(req: FastifyRequest): RequestAdapter {
return {
params: req.params as Record<string, string>,
query: req.query as Record<string, string>,
body: req.body,
headers: req.headers as Record<string, string>,
};
}
function adaptFastifyResponse(reply: FastifyReply): ResponseAdapter {
return {
status: (code: number) => {
reply.status(code);
return adaptFastifyResponse(reply);
},
json: (data: unknown) => reply.send(data),
send: (data: string) => reply.send(data),
};
}
export function wrapForFastify(handler: RouteHandler) {
return async (req: FastifyRequest, reply: FastifyReply) => {
await handler(adaptFastifyRequest(req), adaptFastifyResponse(reply));
};
}
// handlers/users.ts - Framework-agnostic handlers
export const getUser: RouteHandler = async (req, res) => {
const { id } = req.params;
const user = await userService.findById(id);
if (!user) {
return res.status(404).json({ error: "User not found" });
}
res.json(user);
};
// routes-express.ts (existing)
import express from "express";
import { wrapForExpress } from "./express-adapter";
import { getUser } from "./handlers/users";
const router = express.Router();
router.get("/users/:id", wrapForExpress(getUser));
// routes-fastify.ts (new)
import fastify from "fastify";
import { wrapForFastify } from "./fastify-adapter";
import { getUser } from "./handlers/users";
const app = fastify();
app.get("/users/:id", wrapForFastify(getUser));
Strangler Fig Pattern
# strangler_fig.py
from fastapi import FastAPI, HTTPException, Request, Response
from httpx import AsyncClient
from typing import Optional, Set
class StranglerProxy:
"""Route requests between legacy and new system."""
def __init__(
self,
legacy_url: str,
        migrated_routes: Optional[Set[str]] = None
):
self.legacy_url = legacy_url
self.migrated_routes = migrated_routes or set()
self.client = AsyncClient()
def is_migrated(self, path: str) -> bool:
"""Check if route has been migrated to new system."""
for route in self.migrated_routes:
if path.startswith(route):
return True
return False
async def proxy_to_legacy(self, request: Request) -> Response:
"""Forward request to legacy system."""
url = f"{self.legacy_url}{request.url.path}"
response = await self.client.request(
method=request.method,
url=url,
headers=dict(request.headers),
content=await request.body(),
params=request.query_params,
)
return Response(
content=response.content,
status_code=response.status_code,
headers=dict(response.headers),
)
# Usage
app = FastAPI()
strangler = StranglerProxy(
legacy_url="http://legacy-service:8080",
migrated_routes={"/api/v2/users", "/api/v2/products"}
)
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def route_request(request: Request, path: str):
    if strangler.is_migrated(f"/{path}"):
        # Prefix is migrated but no explicit FastAPI route matched,
        # so surface a 404 instead of proxying to the legacy system
        raise HTTPException(status_code=404)
else:
# Proxy to legacy
return await strangler.proxy_to_legacy(request)
# Migrated endpoint in new system
@app.get("/api/v2/users/{user_id}")
async def get_user(user_id: str):
# New implementation
return await user_service.get(user_id)
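Cutover then proceeds one route at a time: as each endpoint is reimplemented, its prefix joins the migrated set so the proxy stops forwarding it. A minimal sketch (in practice the set would come from configuration or a flag service rather than code):
# Claim another route prefix for the new system
strangler.migrated_routes.add("/api/v2/orders")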
3. Deprecation Handling
# deprecation.py
import warnings
import functools
from typing import Any, Callable, Dict, Optional
from datetime import date
def deprecated(
reason: str,
replacement: Optional[str] = None,
removal_version: Optional[str] = None,
removal_date: Optional[date] = None
):
"""Mark a function as deprecated with migration guidance."""
def decorator(func: Callable) -> Callable:
message = f"{func.__name__} is deprecated: {reason}"
if replacement:
message += f" Use {replacement} instead."
if removal_version:
message += f" Will be removed in version {removal_version}."
elif removal_date:
message += f" Will be removed after {removal_date}."
@functools.wraps(func)
def wrapper(*args, **kwargs):
warnings.warn(message, DeprecationWarning, stacklevel=2)
return func(*args, **kwargs)
# Add deprecation info to docstring
wrapper.__doc__ = f"DEPRECATED: {message}\n\n{func.__doc__ or ''}"
wrapper._deprecated = True
wrapper._deprecation_info = {
"reason": reason,
"replacement": replacement,
"removal_version": removal_version,
"removal_date": removal_date,
}
return wrapper
return decorator
# Usage
@deprecated(
reason="Legacy authentication method",
replacement="authenticate_v2()",
removal_version="3.0.0"
)
def authenticate(username: str, password: str) -> bool:
"""Old authentication method."""
return legacy_auth(username, password)
def authenticate_v2(credentials: Credentials) -> AuthResult:
"""New authentication method with improved security."""
return modern_auth(credentials)
# Deprecation tracking
class DeprecationTracker:
"""Track deprecated code usage in production."""
def __init__(self):
self.usage_counts: Dict[str, int] = {}
def track(self, func_name: str, caller_info: str):
key = f"{func_name}:{caller_info}"
self.usage_counts[key] = self.usage_counts.get(key, 0) + 1
    def get_report(self) -> Dict[str, Any]:
return {
"total_deprecated_calls": sum(self.usage_counts.values()),
"by_function": self.usage_counts,
}
tracker = DeprecationTracker()
def tracked_deprecated(tracker: DeprecationTracker):
"""Decorator that tracks deprecated function usage."""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs):
import traceback
caller = traceback.extract_stack()[-2]
tracker.track(func.__name__, f"{caller.filename}:{caller.lineno}")
return func(*args, **kwargs)
return wrapper
return decorator
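The two decorators compose: the outer one records call sites while the inner one warns callers. A minimal sketch with an illustrative function:
@tracked_deprecated(tracker)
@deprecated(
    reason="Superseded by the v2 lookup",
    replacement="fetch_user_v2()",
    removal_version="3.0.0"
)
def fetch_user(user_id: str) -> dict:
    return {"id": user_id}

fetch_user("42")
print(tracker.get_report())  # {'total_deprecated_calls': 1, 'by_function': {...}}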
4. Incremental Migration Patterns
# incremental_migration.py
from typing import TypeVar, Generic, Callable, Optional
from dataclasses import dataclass
from enum import Enum
import random
T = TypeVar('T')
class MigrationPhase(Enum):
LEGACY_ONLY = "legacy_only"
SHADOW_MODE = "shadow_mode" # Run both, compare results
CANARY = "canary" # Small % to new
GRADUAL_ROLLOUT = "gradual" # Increasing % to new
NEW_ONLY = "new_only"
@dataclass
class MigrationConfig:
phase: MigrationPhase
new_traffic_percentage: float = 0.0
shadow_mode_enabled: bool = False
comparison_enabled: bool = False
class IncrementalMigrator(Generic[T]):
"""Manage incremental migration between implementations."""
def __init__(
self,
legacy_impl: Callable[..., T],
new_impl: Callable[..., T],
config: MigrationConfig,
        comparator: Optional[Callable[[T, T], bool]] = None
):
self.legacy = legacy_impl
self.new = new_impl
self.config = config
self.comparator = comparator or (lambda a, b: a == b)
self.metrics = MigrationMetrics()
async def execute(self, *args, **kwargs) -> T:
phase = self.config.phase
if phase == MigrationPhase.LEGACY_ONLY:
return await self.legacy(*args, **kwargs)
elif phase == MigrationPhase.SHADOW_MODE:
return await self._execute_shadow(*args, **kwargs)
elif phase in [MigrationPhase.CANARY, MigrationPhase.GRADUAL_ROLLOUT]:
return await self._execute_percentage(*args, **kwargs)
elif phase == MigrationPhase.NEW_ONLY:
return await self.new(*args, **kwargs)
async def _execute_shadow(self, *args, **kwargs) -> T:
"""Run both implementations, return legacy result."""
legacy_result = await self.legacy(*args, **kwargs)
try:
new_result = await self.new(*args, **kwargs)
# Compare results
if self.config.comparison_enabled:
matches = self.comparator(legacy_result, new_result)
self.metrics.record_comparison(matches)
if not matches:
self.metrics.record_mismatch(
args, kwargs, legacy_result, new_result
)
except Exception as e:
self.metrics.record_new_impl_error(e)
return legacy_result
async def _execute_percentage(self, *args, **kwargs) -> T:
"""Route traffic based on percentage."""
use_new = random.random() < self.config.new_traffic_percentage
if use_new:
try:
result = await self.new(*args, **kwargs)
self.metrics.record_new_success()
return result
except Exception as e:
self.metrics.record_new_impl_error(e)
# Fallback to legacy
return await self.legacy(*args, **kwargs)
else:
return await self.legacy(*args, **kwargs)
@dataclass
class MigrationMetrics:
comparisons: int = 0
matches: int = 0
mismatches: int = 0
new_impl_errors: int = 0
new_impl_successes: int = 0
def record_comparison(self, matched: bool):
self.comparisons += 1
if matched:
self.matches += 1
else:
self.mismatches += 1
def record_mismatch(self, args, kwargs, legacy, new):
# Log for debugging
print(f"Mismatch: legacy={legacy}, new={new}")
def record_new_impl_error(self, error: Exception):
self.new_impl_errors += 1
def record_new_success(self):
self.new_impl_successes += 1
@property
def match_rate(self) -> float:
if self.comparisons == 0:
return 0.0
return self.matches / self.comparisons
# Usage
migrator = IncrementalMigrator(
legacy_impl=old_search_service.search,
new_impl=new_search_service.search,
config=MigrationConfig(
phase=MigrationPhase.SHADOW_MODE,
comparison_enabled=True
),
comparator=lambda a, b: set(r.id for r in a) == set(r.id for r in b)
)
results = await migrator.execute(query="test")
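Phase transitions can then be gated on observed metrics rather than on a calendar; a hedged sketch with illustrative thresholds:
# Promote from shadow mode to canary only once results agree closely
if migrator.metrics.comparisons >= 1000 and migrator.metrics.match_rate >= 0.999:
    migrator.config = MigrationConfig(
        phase=MigrationPhase.CANARY,
        new_traffic_percentage=0.01,  # start with 1% of live traffic
    )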
5. Feature Flags for Migrations
# feature_flags.py
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
import hashlib
class FlagState(Enum):
OFF = "off"
ON = "on"
PERCENTAGE = "percentage"
USER_SEGMENT = "user_segment"
@dataclass
class FeatureFlag:
name: str
state: FlagState
percentage: float = 0.0
    segments: Optional[List[str]] = None
default: bool = False
class FeatureFlagService:
def __init__(self):
self.flags: Dict[str, FeatureFlag] = {}
self._overrides: Dict[str, Dict[str, bool]] = {}
def register(self, flag: FeatureFlag):
self.flags[flag.name] = flag
def is_enabled(
self,
flag_name: str,
user_id: Optional[str] = None,
context: Dict[str, Any] = None
) -> bool:
flag = self.flags.get(flag_name)
if not flag:
return False
# Check user override
if user_id and user_id in self._overrides.get(flag_name, {}):
return self._overrides[flag_name][user_id]
if flag.state == FlagState.OFF:
return False
elif flag.state == FlagState.ON:
return True
elif flag.state == FlagState.PERCENTAGE:
return self._check_percentage(flag_name, user_id, flag.percentage)
elif flag.state == FlagState.USER_SEGMENT:
return self._check_segment(user_id, flag.segments, context)
return flag.default
def _check_percentage(
self,
flag_name: str,
user_id: Optional[str],
percentage: float
) -> bool:
"""Deterministic percentage check based on user ID."""
if not user_id:
return False
hash_input = f"{flag_name}:{user_id}"
hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
return (hash_value % 100) < (percentage * 100)
def _check_segment(
self,
user_id: Optional[str],
segments: list,
context: Dict[str, Any]
) -> bool:
"""Check if user belongs to enabled segment."""
if not context:
return False
user_segment = context.get("segment")
return user_segment in (segments or [])
def set_override(self, flag_name: str, user_id: str, enabled: bool):
"""Set per-user override for testing."""
if flag_name not in self._overrides:
self._overrides[flag_name] = {}
self._overrides[flag_name][user_id] = enabled
def update_percentage(self, flag_name: str, percentage: float):
"""Update rollout percentage."""
if flag_name in self.flags:
self.flags[flag_name].percentage = percentage
# Migration-specific flags
flags = FeatureFlagService()
flags.register(FeatureFlag(
name="use_new_search_api",
state=FlagState.PERCENTAGE,
percentage=0.1 # 10% rollout
))
flags.register(FeatureFlag(
name="use_new_payment_processor",
state=FlagState.USER_SEGMENT,
segments=["beta_testers", "employees"]
))
# Usage in code
async def search(query: str, user_id: str):
if flags.is_enabled("use_new_search_api", user_id):
return await new_search_service.search(query)
else:
return await legacy_search_service.search(query)
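Rollout then reduces to nudging the percentage upward while watching dashboards; a hedged sketch of the ramp (step sizes and soak times are illustrative):
import asyncio

async def ramp_rollout(flag_name: str, steps=(0.1, 0.25, 0.5, 1.0), soak_seconds=3600):
    """Raise the rollout percentage step by step, pausing between steps."""
    for pct in steps:
        flags.update_percentage(flag_name, pct)
        # Soak: watch error rates and latency before taking the next step
        await asyncio.sleep(soak_seconds)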
6. Automated Codemods
// codemod-example.ts - Using jscodeshift for automated refactoring
import { API, FileInfo, Options } from 'jscodeshift';
/**
* Codemod: Migrate from class components to function components
*
* Before:
* class MyComponent extends React.Component {
* render() { return <div>Hello</div>; }
* }
*
* After:
* function MyComponent() {
* return <div>Hello</div>;
* }
*/
export default function transformer(file: FileInfo, api: API, options: Options) {
const j = api.jscodeshift;
const root = j(file.source);
// Find all class components with only render method
root
.find(j.ClassDeclaration)
.filter((path) => {
const superClass = path.value.superClass;
if (!superClass) return false;
// Check if extends React.Component
const extendsReact =
(j.MemberExpression.check(superClass) &&
j.Identifier.check(superClass.object) &&
superClass.object.name === 'React' &&
j.Identifier.check(superClass.property) &&
superClass.property.name === 'Component') ||
(j.Identifier.check(superClass) && superClass.name === 'Component');
if (!extendsReact) return false;
// Check if only has render method
const methods = path.value.body.body.filter((node) =>
j.MethodDefinition.check(node)
);
return (
methods.length === 1 &&
j.Identifier.check(methods[0].key) &&
methods[0].key.name === 'render'
);
})
.replaceWith((path) => {
const className = path.value.id?.name || 'Component';
const renderMethod = path.value.body.body.find(
(node) =>
j.MethodDefinition.check(node) &&
j.Identifier.check(node.key) &&
node.key.name === 'render'
) as any;
const returnStatement = renderMethod.value.body.body.find((stmt: any) =>
j.ReturnStatement.check(stmt)
);
// Create function component
return j.functionDeclaration(
j.identifier(className),
[],
j.blockStatement([returnStatement])
);
});
return root.toSource();
}
// Run with: jscodeshift -t codemod-example.ts src/**/*.tsx
# ast_codemod.py - Using Python AST for refactoring
import ast
import astor
from typing import List
class FunctionRenamer(ast.NodeTransformer):
"""Codemod: Rename deprecated function calls."""
def __init__(self, old_name: str, new_name: str):
self.old_name = old_name
self.new_name = new_name
self.changes: List[str] = []
def visit_Call(self, node: ast.Call) -> ast.Call:
if isinstance(node.func, ast.Name) and node.func.id == self.old_name:
old_lineno = node.lineno
node.func.id = self.new_name
self.changes.append(f"Line {old_lineno}: {self.old_name} -> {self.new_name}")
self.generic_visit(node)
return node
def apply_codemod(source_code: str, old_name: str, new_name: str) -> tuple[str, List[str]]:
"""Apply function renaming codemod to source code."""
tree = ast.parse(source_code)
renamer = FunctionRenamer(old_name, new_name)
new_tree = renamer.visit(tree)
new_source = astor.to_source(new_tree)
return new_source, renamer.changes
# Example: Migrate authenticate() to authenticate_v2()
with open("auth.py") as f:
source = f.read()
new_source, changes = apply_codemod(source, "authenticate", "authenticate_v2")
print("Changes made:")
for change in changes:
print(f" {change}")
with open("auth.py", "w") as f:
f.write(new_source)
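Codemods deserve tests of their own before they touch a codebase; a minimal round-trip check for the renamer above (the snippet under test is made up):
def test_apply_codemod_renames_calls():
    source = "result = authenticate('alice', 'secret')\n"
    new_source, changes = apply_codemod(source, "authenticate", "authenticate_v2")
    assert "authenticate_v2(" in new_source
    assert changes == ["Line 1: authenticate -> authenticate_v2"]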
// rust_codemod_example.rs - Using syn for Rust AST manipulation
use syn::{visit_mut::VisitMut, ExprMethodCall};
use quote::ToTokens;
/// Codemod: Replace .unwrap() with proper error handling
struct UnwrapReplacer {
changes: Vec<String>,
}
impl VisitMut for UnwrapReplacer {
fn visit_expr_method_call_mut(&mut self, node: &mut ExprMethodCall) {
if node.method == "unwrap" {
// Convert .unwrap() to ?
self.changes.push(format!(
"Replaced .unwrap() with ? operator"
));
// Note: This is simplified - real implementation would
// need to handle the expression transformation properly
}
syn::visit_mut::visit_expr_method_call_mut(self, node);
}
}
fn apply_codemod(source: &str) -> Result<(String, Vec<String>), syn::Error> {
let mut file = syn::parse_file(source)?;
let mut replacer = UnwrapReplacer { changes: Vec::new() };
replacer.visit_file_mut(&mut file);
let new_source = file.to_token_stream().to_string();
Ok((new_source, replacer.changes))
}
7. Incremental Migration Strategies
Dark Launching
# dark_launch.py
from typing import Callable, TypeVar, Generic
import asyncio
import logging
import random
T = TypeVar('T')
class DarkLaunch(Generic[T]):
"""Execute new code path but ignore results (validation only)."""
def __init__(
self,
production_impl: Callable[..., T],
dark_impl: Callable[..., T],
enabled: bool = False,
sample_rate: float = 1.0
):
self.production = production_impl
self.dark = dark_impl
self.enabled = enabled
self.sample_rate = sample_rate
self.logger = logging.getLogger(__name__)
async def execute(self, *args, **kwargs) -> T:
"""Execute production code, optionally run dark code in background."""
# Always run production
result = await self.production(*args, **kwargs)
# Conditionally run dark code (don't block on it)
        if self.enabled and random.random() < self.sample_rate:
            asyncio.create_task(self._run_dark(*args, **kwargs))
return result
async def _run_dark(self, *args, **kwargs):
"""Run dark implementation and log results/errors."""
try:
dark_result = await self.dark(*args, **kwargs)
self.logger.info(f"Dark launch succeeded: {dark_result}")
except Exception as e:
self.logger.error(f"Dark launch failed: {e}", exc_info=True)
# Usage
dark_search = DarkLaunch(
production_impl=old_search_service.search,
dark_impl=new_search_service.search,
enabled=True,
sample_rate=0.1 # 10% sampling
)
results = await dark_search.execute(query="test")
Branch by Abstraction
// branch-by-abstraction.ts
// Step 1: Create abstraction layer
interface PaymentProcessor {
processPayment(amount: number, token: string): Promise<PaymentResult>;
refund(transactionId: string): Promise<void>;
}
// Step 2: Implement abstraction for legacy system
class StripeAdapter implements PaymentProcessor {
constructor(private stripe: StripeAPI) {}
async processPayment(amount: number, token: string): Promise<PaymentResult> {
const charge = await this.stripe.charges.create({
amount,
source: token,
});
return { transactionId: charge.id, status: 'success' };
}
async refund(transactionId: string): Promise<void> {
await this.stripe.refunds.create({ charge: transactionId });
}
}
// Step 3: Implement abstraction for new system
class BraintreeAdapter implements PaymentProcessor {
constructor(private braintree: BraintreeAPI) {}
async processPayment(amount: number, token: string): Promise<PaymentResult> {
const result = await this.braintree.transaction.sale({
amount: amount.toString(),
paymentMethodNonce: token,
});
return { transactionId: result.transaction.id, status: 'success' };
}
async refund(transactionId: string): Promise<void> {
await this.braintree.transaction.refund(transactionId);
}
}
// Step 4: Route to implementations based on feature flag
class PaymentService {
constructor(
private stripeAdapter: StripeAdapter,
private braintreeAdapter: BraintreeAdapter,
private featureFlags: FeatureFlagService
) {}
private getProcessor(userId: string): PaymentProcessor {
if (this.featureFlags.isEnabled('use_braintree', userId)) {
return this.braintreeAdapter;
}
return this.stripeAdapter;
}
async processPayment(
userId: string,
amount: number,
token: string
): Promise<PaymentResult> {
const processor = this.getProcessor(userId);
return processor.processPayment(amount, token);
}
}
// Step 5: After migration complete, remove abstraction if desired
8. Testing During Migration
# migration_testing.py
import pytest
from typing import Callable, Any, List
from dataclasses import dataclass
@dataclass
class MigrationTestCase:
name: str
input_data: Any
expected_output: Any
legacy_impl: Callable
new_impl: Callable
class MigrationTester:
"""Test framework for validating migration correctness."""
def __init__(self):
self.test_cases: List[MigrationTestCase] = []
self.failures: List[dict] = []
def add_test_case(self, test_case: MigrationTestCase):
self.test_cases.append(test_case)
async def run_all(self) -> bool:
"""Run all test cases comparing legacy vs new implementation."""
all_passed = True
for test_case in self.test_cases:
try:
legacy_result = await test_case.legacy_impl(test_case.input_data)
new_result = await test_case.new_impl(test_case.input_data)
if legacy_result != new_result:
all_passed = False
self.failures.append({
'test': test_case.name,
'input': test_case.input_data,
'legacy': legacy_result,
'new': new_result,
'expected': test_case.expected_output
})
print(f"FAIL: {test_case.name}")
print(f" Legacy: {legacy_result}")
print(f" New: {new_result}")
else:
print(f"PASS: {test_case.name}")
except Exception as e:
all_passed = False
self.failures.append({
'test': test_case.name,
'error': str(e)
})
print(f"ERROR: {test_case.name} - {e}")
return all_passed
def generate_report(self) -> str:
"""Generate migration test report."""
total = len(self.test_cases)
passed = total - len(self.failures)
report = [
"# Migration Test Report",
f"Total: {total} | Passed: {passed} | Failed: {len(self.failures)}",
""
]
if self.failures:
report.append("## Failures")
for failure in self.failures:
report.append(f"- {failure.get('test', 'Unknown')}")
if 'error' in failure:
report.append(f" Error: {failure['error']}")
else:
report.append(f" Legacy: {failure.get('legacy')}")
report.append(f" New: {failure.get('new')}")
return "\n".join(report)
# Usage
tester = MigrationTester()
tester.add_test_case(MigrationTestCase(
name="search_basic_query",
input_data="test query",
expected_output=SearchResults(...),
legacy_impl=old_search.search,
new_impl=new_search.search
))
await tester.run_all()
print(tester.generate_report())
# snapshot_testing.py
import json
import hashlib
from pathlib import Path
from typing import Any
class SnapshotTester:
"""Snapshot testing for migration validation."""
def __init__(self, snapshot_dir: str = ".snapshots"):
self.snapshot_dir = Path(snapshot_dir)
self.snapshot_dir.mkdir(exist_ok=True)
def capture_snapshot(self, name: str, data: Any):
"""Capture snapshot of current output."""
snapshot_path = self.snapshot_dir / f"{name}.json"
serialized = json.dumps(data, sort_keys=True, indent=2)
snapshot_path.write_text(serialized)
# Also save hash for quick comparison
hash_path = self.snapshot_dir / f"{name}.hash"
data_hash = hashlib.sha256(serialized.encode()).hexdigest()
hash_path.write_text(data_hash)
def verify_snapshot(self, name: str, data: Any) -> bool:
"""Verify data matches snapshot."""
snapshot_path = self.snapshot_dir / f"{name}.json"
if not snapshot_path.exists():
raise FileNotFoundError(f"Snapshot {name} not found")
expected = snapshot_path.read_text()
actual = json.dumps(data, sort_keys=True, indent=2)
if expected != actual:
# Save diff for debugging
diff_path = self.snapshot_dir / f"{name}.diff"
diff_path.write_text(f"Expected:\n{expected}\n\nActual:\n{actual}")
return False
return True
# Migration workflow with snapshots
snapshot = SnapshotTester()
# 1. Capture baseline from legacy system
legacy_data = await legacy_search.search("test")
snapshot.capture_snapshot("search_test_legacy", legacy_data)
# 2. Run migration
# 3. Verify new system matches legacy snapshots
new_data = await new_search.search("test")
matches = snapshot.verify_snapshot("search_test_legacy", new_data)
assert matches, "New implementation doesn't match legacy behavior"
9. Rollback Strategies
# rollback.py
from dataclasses import dataclass
from typing import List, Callable, Optional
from datetime import datetime
from pathlib import Path
import asyncio
import subprocess
import json
@dataclass
class RollbackPoint:
id: str
timestamp: datetime
description: str
git_commit: str
db_migration_version: str
config_snapshot: dict
rollback_commands: List[str]
class RollbackManager:
def __init__(self):
self.rollback_points: List[RollbackPoint] = []
self.current_point: Optional[RollbackPoint] = None
def create_rollback_point(
self,
description: str,
        rollback_commands: Optional[List[str]] = None
) -> RollbackPoint:
"""Create a rollback point before migration."""
point = RollbackPoint(
id=f"rbp_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
timestamp=datetime.now(),
description=description,
git_commit=self._get_current_commit(),
db_migration_version=self._get_db_version(),
config_snapshot=self._capture_config(),
rollback_commands=rollback_commands or []
)
self.rollback_points.append(point)
self._save_point(point)
return point
def rollback_to(self, point_id: str) -> bool:
"""Execute rollback to specified point."""
point = self._find_point(point_id)
if not point:
raise ValueError(f"Rollback point {point_id} not found")
print(f"Rolling back to: {point.description}")
try:
# 1. Rollback code
self._rollback_code(point.git_commit)
# 2. Rollback database
self._rollback_database(point.db_migration_version)
# 3. Restore config
self._restore_config(point.config_snapshot)
# 4. Execute custom rollback commands
for cmd in point.rollback_commands:
subprocess.run(cmd, shell=True, check=True)
self.current_point = point
return True
except Exception as e:
print(f"Rollback failed: {e}")
return False
def _get_current_commit(self) -> str:
result = subprocess.run(
["git", "rev-parse", "HEAD"],
capture_output=True,
text=True
)
return result.stdout.strip()
def _get_db_version(self) -> str:
# Implementation depends on migration tool
return "migration_version"
def _capture_config(self) -> dict:
# Capture current configuration
return {}
def _rollback_code(self, commit: str):
subprocess.run(["git", "checkout", commit], check=True)
def _rollback_database(self, version: str):
# Example for Alembic
subprocess.run(["alembic", "downgrade", version], check=True)
def _restore_config(self, config: dict):
# Restore configuration
pass
    def _save_point(self, point: RollbackPoint):
        # Persist rollback point to disk (ensure the directory exists)
        Path("rollback_points").mkdir(exist_ok=True)
        with open(f"rollback_points/{point.id}.json", "w") as f:
json.dump({
"id": point.id,
"timestamp": point.timestamp.isoformat(),
"description": point.description,
"git_commit": point.git_commit,
"db_migration_version": point.db_migration_version,
"rollback_commands": point.rollback_commands
}, f)
def _find_point(self, point_id: str) -> Optional[RollbackPoint]:
return next(
(p for p in self.rollback_points if p.id == point_id),
None
)
# Automated rollback trigger
class AutoRollback:
def __init__(
self,
rollback_manager: RollbackManager,
health_check: Callable[[], bool],
error_threshold: int = 10,
check_interval: int = 60
):
self.manager = rollback_manager
self.health_check = health_check
self.error_threshold = error_threshold
self.check_interval = check_interval
self.error_count = 0
async def monitor(self, rollback_point_id: str):
"""Monitor health and auto-rollback if needed."""
while True:
try:
if not self.health_check():
self.error_count += 1
print(f"Health check failed ({self.error_count}/{self.error_threshold})")
if self.error_count >= self.error_threshold:
print("Error threshold reached, initiating rollback")
self.manager.rollback_to(rollback_point_id)
break
else:
self.error_count = 0
await asyncio.sleep(self.check_interval)
except Exception as e:
print(f"Monitor error: {e}")
self.error_count += 1
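Wiring the two classes together might look like the following; the curl-based health check is a placeholder for a real probe:
def service_healthy() -> bool:
    # Placeholder probe; substitute a real health endpoint or metrics query
    result = subprocess.run(
        ["curl", "-sf", "http://localhost:8000/health"],
        capture_output=True
    )
    return result.returncode == 0

async def main():
    manager = RollbackManager()
    point = manager.create_rollback_point(description="Before search migration")
    watcher = AutoRollback(
        manager,
        health_check=service_healthy,
        error_threshold=5,
        check_interval=30
    )
    await watcher.monitor(point.id)

asyncio.run(main())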
Best Practices
- **Plan Thoroughly**: Document migration steps, dependencies, and rollback procedures before starting.
- **Migrate Incrementally**: Use strangler fig, feature flags, dark launches, and percentage rollouts to reduce risk.
- **Automate Where Possible**: Use codemods (jscodeshift, ts-morph, AST tools) to automate repetitive code transformations.
- **Shadow Test**: Run new and old systems in parallel to validate behavior before switching. Compare results and log discrepancies.
- **Test Continuously**: Use migration-specific testing (snapshot tests, comparison tests) to validate correctness at each step.
- **Monitor Closely**: Track error rates, latency, and business metrics during migration. Set up automated rollback triggers.
- **Branch by Abstraction**: Create abstraction layers to decouple migration from feature development.
- **Maintain Backward Compatibility**: Keep APIs compatible during transition periods. Support both old and new interfaces.
- **Document Deprecations**: Clear deprecation notices with migration guides help consumers. Track deprecated usage.
- **Always Have Rollback**: Never deploy a migration without a tested rollback plan. Create rollback points before each phase.
Examples
Complete Migration Workflow
class MigrationWorkflow:
def __init__(self):
self.rollback_manager = RollbackManager()
self.feature_flags = FeatureFlagService()
self.metrics = MigrationMetrics()
async def execute_migration(
self,
name: str,
legacy_impl: Callable,
new_impl: Callable
):
# 1. Create rollback point
rollback_point = self.rollback_manager.create_rollback_point(
description=f"Before {name} migration"
)
# 2. Register feature flag
self.feature_flags.register(FeatureFlag(
name=f"migration_{name}",
state=FlagState.PERCENTAGE,
percentage=0.0
))
# 3. Shadow mode testing
print("Phase 1: Shadow mode testing")
migrator = IncrementalMigrator(
legacy_impl=legacy_impl,
new_impl=new_impl,
config=MigrationConfig(
phase=MigrationPhase.SHADOW_MODE,
comparison_enabled=True
)
)
# Run shadow tests...
if migrator.metrics.match_rate < 0.99:
print("Shadow mode match rate too low, aborting")
return False
# 4. Canary deployment
print("Phase 2: Canary deployment (1%)")
self.feature_flags.update_percentage(f"migration_{name}", 0.01)
# Monitor for issues...
# 5. Gradual rollout
for percentage in [0.05, 0.1, 0.25, 0.5, 0.75, 1.0]:
print(f"Phase 3: Rolling out to {percentage*100}%")
self.feature_flags.update_percentage(f"migration_{name}", percentage)
# Monitor and validate...
# 6. Complete migration
print("Migration complete")
return True
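Driving the workflow for a single capability might look like this, reusing the placeholder services from earlier sections:
workflow = MigrationWorkflow()
succeeded = await workflow.execute_migration(
    name="search",
    legacy_impl=old_search_service.search,
    new_impl=new_search_service.search,
)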
# Supported AI Coding Agents
This skill is compatible with the SKILL.md standard and works with all major AI coding agents.
Learn more about the SKILL.md standard and how to use these skills with your preferred AI coding agent.