Skip to content

Schema Algebra

Compose schemas from smaller pieces using inheritance or the + operator. This is useful for join results, enriched views, or any DataFrame that combines columns from multiple sources.

Schema inheritance

from typedframes import BaseSchema, Column

class OrderSchema(BaseSchema):
    """Example schema for order data: an integer ``order_id`` and a float ``amount``."""
    order_id   = Column(type=int)
    amount     = Column(type=float)

class CustomerSchema(BaseSchema):
    """Example schema for customer data: an integer ``customer_id`` and a string ``name``."""
    customer_id = Column(type=int)
    name        = Column(type=str)

class ReportSchema(OrderSchema, CustomerSchema):
    """Inherits all columns from both parents and adds ``region``."""
    region = Column(type=str)  # extra column declared on top of the inherited ones

The + operator

SchemaA + SchemaB creates a merged schema with all columns from both:

from typedframes import combine_schemas

ReportSchema = OrderSchema + CustomerSchema
# equivalent to multiple inheritance with no extra columns

# Functional form of the same merge; the optional `name` sets the new class's name.
ReportSchema = combine_schemas(OrderSchema, CustomerSchema, name="ReportSchema")

The + operator raises SchemaConflictError if the same column name appears in both schemas with different types.

Merge key access

Use a column's .s attribute to pass that column as the merge key when joining DataFrames:

from typing import Annotated
import pandas as pd

# Schema describing the merged frame: all columns of both inputs.
JoinedSchema = OrderSchema + CustomerSchema

left:   Annotated[pd.DataFrame, OrderSchema]   = pd.read_csv("orders.csv")
right:  Annotated[pd.DataFrame, CustomerSchema] = pd.read_csv("customers.csv")
# NOTE(review): pandas needs the `on=` key to be present in both frames, but
# CustomerSchema as defined in this page declares only customer_id and name —
# confirm the intended join key (or use left_on=/right_on=).
merged: Annotated[pd.DataFrame, JoinedSchema]  = left.merge(
    right, on=OrderSchema.order_id.s
)

typedframes.combine_schemas(schema_a, schema_b, name=None)

Combine two schemas into a new schema with all columns from both.

This is the functional equivalent of SchemaA + SchemaB.

Parameters:

Name Type Description Default
schema_a type[BaseSchema]

First schema class.

required
schema_b type[BaseSchema]

Second schema class.

required
name str | None

Optional name for the combined schema. Defaults to "SchemaA_SchemaB".

None

Returns:

Type Description
type[BaseSchema]

A new schema class with columns from both schemas.

Raises:

Type Description
SchemaConflictError

If both schemas have a column with the same name but different types.

Source code in src/typedframes/schema_algebra.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def combine_schemas(
    schema_a: type[BaseSchema],
    schema_b: type[BaseSchema],
    name: str | None = None,
) -> type[BaseSchema]:
    """
    Build a new schema class holding the columns and column sets of two schemas.

    Functional counterpart of ``SchemaA + SchemaB``. Each column and column set is
    re-created as a fresh ``Column`` / ``ColumnSet`` instance on the result.
    When a name exists in both schemas, the definition from ``schema_a`` is kept.

    Args:
        schema_a: First schema class; its definitions win on duplicate names.
        schema_b: Second schema class, merged in after ``schema_a``.
        name: Class name for the result. When omitted or empty, the two schema
            class names joined by an underscore are used.

    Returns:
        A new ``BaseSchema`` subclass containing the merged definitions.

    Raises:
        SchemaConflictError: If a column name occurs in both schemas with different types.

    """
    from .base_schema import BaseSchema

    def _clone_column(source):
        # Fresh Column carrying over type, alias, nullability and description.
        return Column(
            type=source.type,
            alias=source.alias,
            nullable=source.nullable,
            description=source.description,
        )

    def _clone_column_set(source):
        # Copy list-valued members so the new ColumnSet does not share the list object.
        members = source.members
        if isinstance(members, list):
            members = members.copy()
        return ColumnSet(
            members=members,
            type=source.type,
            regex=source.regex,
            description=source.description,
        )

    class_name = name if name else f"{schema_a.__name__}_{schema_b.__name__}"

    # Start from schema_a's columns.
    namespace: dict = {
        key: _clone_column(column) for key, column in schema_a.columns().items()
    }

    # Add schema_b's columns; a shared name is only allowed when the types agree.
    for key, column in schema_b.columns().items():
        if key not in namespace:
            namespace[key] = _clone_column(column)
            continue
        kept = namespace[key]
        if kept.type != column.type:
            raise SchemaConflictError(key, kept.type, schema_a.__name__, column.type, schema_b.__name__)

    # schema_a's column sets are always stored under their own names.
    for key, group in schema_a.column_sets().items():
        namespace[key] = _clone_column_set(group)

    # schema_b's column sets only fill names that are still free.
    for key, group in schema_b.column_sets().items():
        if key in namespace:
            continue
        namespace[key] = _clone_column_set(group)

    merged = type(class_name, (BaseSchema,), namespace)

    # Bind every descriptor to the new class under its attribute name.
    for key, descriptor in namespace.items():
        descriptor.__set_name__(merged, key)

    return merged

typedframes.SchemaConflictError(column_name, type_a, schema_a, type_b, schema_b)

Bases: TypeError

Raised when combining schemas with conflicting column types.

Initialize with details about the conflicting column and types.

Source code in src/typedframes/schema_algebra.py
17
18
19
20
21
22
23
24
25
def __init__(self, column_name: str, type_a: type, schema_a: str, type_b: type, schema_b: str) -> None:
    """
    Initialize with details about the conflicting column and types.

    Args:
        column_name: Name of the column declared with different types.
        type_a: Type reported for the column alongside ``schema_a``.
        schema_a: Name of the first schema; used only in the message.
        type_b: Type reported for the column alongside ``schema_b``.
        schema_b: Name of the second schema; used only in the message.

    """
    self.column_name = column_name
    self.type_a = type_a
    self.type_b = type_b
    # Not every type object has __name__ (e.g. `int | None` is a types.UnionType).
    # Fall back to repr() so building the message cannot raise AttributeError
    # and hide the actual schema conflict.
    type_a_label = getattr(type_a, "__name__", repr(type_a))
    type_b_label = getattr(type_b, "__name__", repr(type_b))
    super().__init__(
        f"Column '{column_name}' has conflicting types: "
        f"{type_a_label} ({schema_a}) vs {type_b_label} ({schema_b})"
    )

Functions