A comprehensive Python SDK for performing data quality validations on streaming data records (arrays), Pandas DataFrames, and PySpark DataFrames with complete REST API integration for IBM Cloud Pak for Data.
- Array-based Records: Optimized for streaming data where records are arrays of values
- Metadata-driven: Define table structure and column mappings once
- Fluent API: Chainable method calls for intuitive rule definition
- Score-based Results: Each validation returns detailed scores and pass rates
- Data Quality Dimensions: Track validation checks by 8 standard DQ dimensions (Accuracy, Completeness, Conformity, Consistency, Coverage, Timeliness, Uniqueness, Validity)
- Nine Validation Checks: Comprehensive validation coverage
- LengthCheck: Validates length of any value (converts to string)
- ValidValuesCheck: Validates against allowed list with case-insensitive option
- ComparisonCheck: Compares values using operators, supports all types
- CaseCheck: Validates character case (upper, lower, name, sentence)
- CompletenessCheck: Validates presence (non-null) of values
- RangeCheck: Validates values within min/max range
- RegexCheck: Validates values match regular expression patterns
- FormatCheck: Validates value formats using intelligent format detection
- DataTypeCheck: Validates data types with intelligent type inference
- Type Safety: Full type hints throughout
- Extensible: Easy to add new checks via BaseCheck (a hypothetical subclass sketch follows these feature lists)
- Pandas Support: Memory-efficient chunked processing for large DataFrames
- PySpark Support: Distributed validation using Spark UDFs
- Consistent API: Same interface for both Pandas and PySpark
- Struct Column Output: Single validation result column containing all metrics
- Scalable: Handles DataFrames from thousands to billions of rows
- GlossaryProvider: Fetch glossary terms and data quality constraints from IBM Cloud Pak for Data
- CamsProvider: Fetch data assets from CAMS (Catalog Asset Management System)
- IssuesProvider: Manage data quality issues (occurrences, tested records, ignored status)
- DQSearchProvider: Search for DQ checks and assets by native ID
- Thread-Safe: Concurrent access support with thread-local sessions
- Multi-Environment Support: IBM Cloud, AWS Cloud, Government Cloud, and On-Premises
- Automatic Protocol Handling: Environment-specific authentication methods
- Type-Safe Configuration: Full type hints and validation
- SSL Control: Configurable SSL verification for on-premises
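Custom checks plug in by subclassing BaseCheck (see base.py in the project structure below). The SDK's abstract hook is not documented in this README, so the method name and signature in this sketch are hypothetical placeholders, not confirmed API — consult base.py for the real contract:

```python
from wxdi.dq_validator.base import BaseCheck

class NotEmptyStringCheck(BaseCheck):
    """Hypothetical custom check: fails on empty or whitespace-only strings."""

    # NOTE: 'check' is an assumed hook name for illustration only; the actual
    # abstract method and its error-reporting mechanism live in base.py.
    def check(self, value):
        return isinstance(value, str) and value.strip() != ""
```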
git clone https://github.com/IBM/data-intelligence-sdk.git
cd data-intelligence-sdk
pip install -e .

# Install with Pandas support
pip install -e ".[pandas]"
# Install with PySpark support
pip install -e ".[spark]"
# Install with both Pandas and PySpark
pip install -e ".[dataframes]"
# Install everything (including dev dependencies)
pip install -e ".[all]"pip install -e ".[dev]"from wxdi.dq_validator import (
AssetMetadata, ColumnMetadata, DataType,
Validator, ValidationRule,
ComparisonCheck, ComparisonOperator, ValidValuesCheck, LengthCheck
)
# 1. Define asset metadata
metadata = AssetMetadata(
table_name='employee_data',
columns=[
ColumnMetadata('emp_id', DataType.INTEGER),
ColumnMetadata('name', DataType.STRING, length=100),
ColumnMetadata('age', DataType.INTEGER),
ColumnMetadata('department', DataType.STRING, length=50),
ColumnMetadata('salary', DataType.DECIMAL, precision=10, scale=2),
]
)
# 2. Create validator with rules
validator = Validator(metadata)
# Add validation rules
validator.add_rule(
ValidationRule('name')
.add_check(LengthCheck(min_length=2, max_length=100))
)
validator.add_rule(
ValidationRule('department')
.add_check(ValidValuesCheck(
['Engineering', 'Sales', 'HR', 'Finance'],
case_sensitive=False # Default is False
))
)
validator.add_rule(
ValidationRule('age')
.add_check(ComparisonCheck(
operator=ComparisonOperator.GREATER_THAN_OR_EQUAL,
target_value=18
))
)
# 3. Validate records (arrays)
records = [
[1001, 'John Doe', 30, 'Engineering', 75000.00],
[1002, 'J', 25, 'SALES', 65000.00], # Will fail: name too short
[1003, 'Bob Smith', 17, 'HR', 50000.00], # Will fail: age < 18
]
results = validator.validate_batch(records)
# 4. Check results
for idx, result in enumerate(results):
    if result.is_valid:
        print(f"Record {idx}: ✓ PASS (Score: {result.score})")
    else:
        print(f"Record {idx}: ✗ FAIL (Score: {result.score})")
        for error in result.errors:
            print(f"  - {error.column_name}: {error.message}")

import pandas as pd
from wxdi.dq_validator import AssetMetadata, ColumnMetadata, DataType, Validator, ValidationRule
from wxdi.dq_validator.checks import LengthCheck, ValidValuesCheck
from wxdi.dq_validator.integrations import PandasValidator
# Define metadata and validator (same as array-based validation)
metadata = AssetMetadata(
table_name='employees',
columns=[
ColumnMetadata('emp_id', DataType.INTEGER),
ColumnMetadata('name', DataType.STRING, length=100),
ColumnMetadata('department', DataType.STRING, length=50),
]
)
validator = Validator(metadata)
validator.add_rule(ValidationRule('name').add_check(LengthCheck(min_length=2)))
validator.add_rule(ValidationRule('department').add_check(
ValidValuesCheck(['Engineering', 'Sales', 'HR'], case_sensitive=False)
))
# Create DataFrame
df = pd.DataFrame({
'emp_id': [1001, 1002, 1003],
'name': ['John Doe', 'J', 'Alice'],
'department': ['Engineering', 'SALES', 'Marketing']
})
# Create Pandas validator
pandas_validator = PandasValidator(validator, chunk_size=10000)
# Get summary statistics (memory efficient)
summary = pandas_validator.get_summary_statistics(df)
print(f"Pass Rate: {summary['pass_rate']:.2f}%")
# Add validation column (returns DataFrame with struct column)
df_validated = pandas_validator.add_validation_column(df)
print(df_validated['dq_validation_result'])
# Get invalid rows
invalid_df = pandas_validator.get_invalid_rows(df)
print(f"Found {len(invalid_df)} invalid rows")
# Expand validation column into separate columns
df_expanded = pandas_validator.expand_validation_column(df_validated)
print(df_expanded[['name', 'dq_is_valid', 'dq_score', 'dq_pass_rate']])

from pyspark.sql import SparkSession
from wxdi.dq_validator import AssetMetadata, ColumnMetadata, DataType, Validator, ValidationRule
from wxdi.dq_validator.checks import LengthCheck, ValidValuesCheck
from wxdi.dq_validator.integrations import SparkValidator
# Initialize Spark
spark = SparkSession.builder.appName("DataQuality").getOrCreate()
# Define metadata and validator (same as above)
metadata = AssetMetadata(
table_name='employees',
columns=[
ColumnMetadata('emp_id', DataType.INTEGER),
ColumnMetadata('name', DataType.STRING, length=100),
ColumnMetadata('department', DataType.STRING, length=50),
]
)
validator = Validator(metadata)
validator.add_rule(ValidationRule('name').add_check(LengthCheck(min_length=2)))
validator.add_rule(ValidationRule('department').add_check(
ValidValuesCheck(['Engineering', 'Sales', 'HR'], case_sensitive=False)
))
# Create DataFrame
df = spark.createDataFrame([
(1001, 'John Doe', 'Engineering'),
(1002, 'J', 'SALES'),
(1003, 'Alice', 'Marketing')
], ['emp_id', 'name', 'department'])
# Create Spark validator
spark_validator = SparkValidator(validator)
# Get summary statistics (distributed aggregation)
summary = spark_validator.get_summary_statistics(df)
print(f"Pass Rate: {summary['pass_rate']:.2f}%")
# Add validation column (returns DataFrame with struct column)
df_validated = spark_validator.add_validation_column(df)
df_validated.select('name', 'dq_validation_result').show()
# Get invalid rows (distributed filtering)
invalid_df = spark_validator.get_invalid_rows(df)
print(f"Found {invalid_df.count()} invalid rows")
# Expand validation column
df_expanded = spark_validator.expand_validation_column(df_validated)
df_expanded.select('name', 'dq_is_valid', 'dq_score', 'dq_pass_rate').show()
# Write validation report
spark_validator.write_validation_report(df, output_path='validation_report', format='parquet')

AssetMetadata defines the structure of your data asset (table) with column information:
metadata = AssetMetadata(
table_name='my_table',
columns=[
ColumnMetadata('id', DataType.INTEGER),
ColumnMetadata('name', DataType.STRING, length=100),
ColumnMetadata('amount', DataType.DECIMAL, precision=10, scale=2),
]
)

ValidationRule defines validation rules for a specific column:
rule = ValidationRule('column_name')
rule.add_check(LengthCheck(min_length=5, max_length=50))
rule.add_check(ValidValuesCheck(['value1', 'value2']))

Validator orchestrates validation across all rules:
validator = Validator(metadata)
validator.add_rule(rule1)
validator.add_rule(rule2)
result = validator.validate(record) # Single record
results = validator.validate_batch(records) # Multiple records

Each validation check is associated with a Data Quality Dimension that categorizes the type of quality issue it addresses. The SDK supports 8 standard data quality dimensions:
from wxdi.dq_validator.data_quality_dimension import DataQualityDimension
# Available dimensions:
DataQualityDimension.ACCURACY # Data correctly represents real-world values
DataQualityDimension.COMPLETENESS # All required data is present
DataQualityDimension.CONFORMITY # Data conforms to specified formats
DataQualityDimension.CONSISTENCY # Data is consistent across systems
DataQualityDimension.COVERAGE # Data covers the required scope
DataQualityDimension.TIMELINESS # Data is available when needed
DataQualityDimension.UNIQUENESS # No duplicate records exist
DataQualityDimension.VALIDITY # Data values are valid and reasonable

Default Dimensions by Check:
- LengthCheck → VALIDITY
- ValidValuesCheck → VALIDITY
- ComparisonCheck → VALIDITY
- CaseCheck → CONSISTENCY
- CompletenessCheck → COMPLETENESS
- RangeCheck → VALIDITY
- RegexCheck → VALIDITY
- FormatCheck → VALIDITY
- DataTypeCheck → VALIDITY
Getting and Setting Dimensions:
from wxdi.dq_validator.checks import LengthCheck
from wxdi.dq_validator.data_quality_dimension import DataQualityDimension
# Create a check (uses default dimension)
check = LengthCheck(min_length=5, max_length=50)
# Get the current dimension
dimension = check.get_dimension()
print(dimension) # DataQualityDimension.VALIDITY
# Change the dimension
check.set_dimension(DataQualityDimension.CONFORMITY)
# Verify the change
print(check.get_dimension()) # DataQualityDimension.CONFORMITY

Use Cases:
- Categorize validation failures by dimension for better reporting
- Track dimension-specific metrics (e.g., completeness rate, validity rate)
- Prioritize remediation efforts based on dimension criticality
- Align with data governance frameworks that use dimension-based quality metrics
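As a concrete example of dimension-based reporting, the sketch below tallies configured checks by dimension using only the get_dimension() getter shown above. It assumes CompletenessCheck is exported from the checks package the same way LengthCheck is:

```python
from collections import Counter
from wxdi.dq_validator.checks import CompletenessCheck, LengthCheck

# Tally configured checks by their data quality dimension.
checks = [
    LengthCheck(min_length=2, max_length=100),        # VALIDITY by default
    CompletenessCheck(missing_values_allowed=False),  # COMPLETENESS by default
]
by_dimension = Counter(check.get_dimension() for check in checks)
for dimension, count in by_dimension.items():
    print(f"{dimension}: {count} check(s)")
```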
LengthCheck validates the length of any value (converted to string).
# String length
LengthCheck(min_length=3, max_length=20)
# Works with any type (converts to string)
LengthCheck(min_length=5) # Integer 12345 → "12345" (length=5)

Parameters:
- min_length (int, optional): Minimum allowed length (inclusive)
- max_length (int, optional): Maximum allowed length (inclusive)
Edge Cases:
- None values: Returns error
- Any type: Converts to string using str(value)
- At least one of min_length or max_length must be specified
ValidValuesCheck validates that a value is in a predefined list of allowed values.
# Case-insensitive (default)
ValidValuesCheck(['active', 'inactive', 'pending'], case_sensitive=False)
# Case-sensitive
ValidValuesCheck(['Active', 'Inactive'], case_sensitive=True)

Parameters:
- valid_values (list): List of allowed values
- case_sensitive (bool, default=False): If False, string comparisons are case-insensitive
Edge Cases:
- None values: Returns error
- Case-insensitive: 'ACTIVE' matches 'active' when case_sensitive=False
- Non-string types: Always exact match (case_sensitive ignored)
ComparisonCheck validates that a value satisfies a comparison operation.
# Column vs constant
ComparisonCheck(
operator=ComparisonOperator.GREATER_THAN,
target_value=18
)
# Column vs column
ComparisonCheck(
operator=ComparisonOperator.GREATER_THAN,
target_column='min_salary'
)
# Using string operator
ComparisonCheck(operator='>=', target_value=0)

Operators:
- ComparisonOperator.GREATER_THAN or '>'
- ComparisonOperator.LESS_THAN or '<'
- ComparisonOperator.GREATER_THAN_OR_EQUAL or '>='
- ComparisonOperator.LESS_THAN_OR_EQUAL or '<='
- ComparisonOperator.EQUAL or '=='
- ComparisonOperator.NOT_EQUAL or '!='
Parameters:
- operator (ComparisonOperator or str): Comparison operator
- target_column (str, optional): Column name to compare against
- target_value (any, optional): Constant value to compare against
Supported Types:
- Numbers (int, float, Decimal)
- Strings (lexicographic comparison)
- Dates and datetimes
- Booleans
- Any comparable type
CaseCheck validates the character case of string values.
from wxdi.dq_validator import CaseCheck, ColumnCaseEnum
# Upper case
CaseCheck(case_type=ColumnCaseEnum.UPPER_CASE)
# Lower case
CaseCheck(case_type=ColumnCaseEnum.LOWER_CASE)
# Name case (Title Case)
CaseCheck(case_type=ColumnCaseEnum.NAME_CASE)
# Sentence case
CaseCheck(case_type=ColumnCaseEnum.SENTENCE_CASE)

Parameters:
- case_type (ColumnCaseEnum): Type of case validation
Case Types:
- ANY_CASE: Any case is valid
- UPPER_CASE: All uppercase (ABC)
- LOWER_CASE: All lowercase (abc)
- NAME_CASE: Title case (John Doe)
- SENTENCE_CASE: First letter uppercase (Hello world)
CompletenessCheck validates the presence (non-null) of values.
# Require non-null values
CompletenessCheck(missing_values_allowed=False)
# Allow null values
CompletenessCheck(missing_values_allowed=True)

Parameters:
- missing_values_allowed (bool): Whether None/null values are allowed
RangeCheck validates that values fall within a min/max range.
# Numeric range
RangeCheck(min_value=0, max_value=100)
# Date range
from datetime import date
RangeCheck(min_value=date(2020, 1, 1), max_value=date(2025, 12, 31))
# String range (lexicographic)
RangeCheck(min_value='A', max_value='Z')

Parameters:
- min_value (any, optional): Minimum allowed value (inclusive)
- max_value (any, optional): Maximum allowed value (inclusive)
Supported Types:
- Numeric types (int, float, Decimal)
- Date and datetime
- Strings (lexicographic comparison)
RegexCheck validates that values match regular expression patterns.
# Phone number pattern
RegexCheck(pattern=r'^\d{3}-\d{3}-\d{4}$')
# Email pattern (case-insensitive)
RegexCheck(pattern=r'^[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}$', case_sensitive=False)

Parameters:
- pattern (str): Regular expression pattern
- case_sensitive (bool, default=True): Whether pattern matching is case-sensitive
FormatCheck validates value formats using intelligent format detection.
from wxdi.dq_validator import FormatCheck, FormatConstraintType
# Valid formats
FormatCheck(
constraint_type=FormatConstraintType.ValidFormats,
formats={'%Y-%m-%d', '%d/%m/%Y', '%m-%d-%Y'}
)
# Invalid formats
FormatCheck(
constraint_type=FormatConstraintType.InvalidFormats,
formats={'%Y%m%d'} # Reject this format
)

Parameters:
- constraint_type (FormatConstraintType): ValidFormats or InvalidFormats
- formats (set): Set of format strings
Features:
- Intelligent format detection using InferredTypeEngine
- Supports date, time, and timestamp formats
- UTF-16 compatible format matching
DataTypeCheck validates data types with intelligent type inference.
from wxdi.dq_validator import DataTypeCheck, DataType
# Integer type
DataTypeCheck(expected_type=DataType.INTEGER)
# Date type
DataTypeCheck(expected_type=DataType.DATE)
# Decimal type
DataTypeCheck(expected_type=DataType.DECIMAL)

Parameters:
- expected_type (DataType): Expected data type
Supported Types:
- INTEGER, FLOAT, DECIMAL
- STRING, BOOLEAN
- DATE, TIME, DATETIME, TIMESTAMP
Features:
- Intelligent type inference
- Handles numeric formats (US and DE)
- Date/time format detection
- Pandas Support: Memory-efficient chunked processing for large DataFrames
- PySpark Support: Distributed validation using Spark UDFs
- Consistent API: Same interface for both Pandas and PySpark
- Struct Column Output: Single validation result column containing all metrics
- Column Prefix: Configurable dq_ prefix to prevent column name conflicts
- Summary Statistics: Aggregated validation metrics without collecting data
- Invalid Row Filtering: Extract rows that failed validation
- Column Expansion: Expand struct column into individual columns
PandasValidator(validator: Validator, chunk_size: int = 10000, column_prefix: str = "dq_")

Methods:
- get_summary_statistics(df: pd.DataFrame) -> Dict[str, Any]: Get aggregated validation metrics
- add_validation_column(df: pd.DataFrame) -> pd.DataFrame: Add struct column with validation results
- get_invalid_rows(df: pd.DataFrame) -> pd.DataFrame: Filter rows that failed validation
- get_valid_rows(df: pd.DataFrame) -> pd.DataFrame: Filter rows that passed validation
- expand_validation_column(df: pd.DataFrame) -> pd.DataFrame: Expand struct into separate columns
Memory Efficiency:
- Processes data in chunks (default: 10,000 rows)
- O(chunk_size) memory complexity
- Suitable for DataFrames larger than available RAM
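The get_valid_rows method (listed above) mirrors get_invalid_rows from the earlier example; a brief sketch reusing the validator and df from the Pandas quick start, with a smaller chunk size to cap peak memory:

```python
# Smaller chunks lower the peak-memory footprint at some throughput cost.
small_chunk_validator = PandasValidator(validator, chunk_size=1000)
valid_df = small_chunk_validator.get_valid_rows(df)
print(f"{len(valid_df)} of {len(df)} rows passed all checks")
```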
SparkValidator(validator: Validator, column_prefix: str = "dq_")

Methods:
- get_summary_statistics(df: DataFrame) -> Dict[str, Any]: Distributed aggregation of validation metrics
- add_validation_column(df: DataFrame) -> DataFrame: Add struct column using UDF
- get_invalid_rows(df: DataFrame) -> DataFrame: Distributed filtering of invalid rows
- get_valid_rows(df: DataFrame) -> DataFrame: Distributed filtering of valid rows
- expand_validation_column(df: DataFrame) -> DataFrame: Expand struct into separate columns
- write_validation_report(df: DataFrame, output_path: str, format: str = 'parquet', mode: str = 'overwrite'): Write validation results
- get_error_sample(df: DataFrame, limit: int = 100) -> List[Dict]: Collect sample of errors
Distributed Processing:
- All operations use Spark's distributed computing
- O(1) driver memory for aggregations
- Scales to billions of rows
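The two reporting methods above can be combined; a short sketch reusing spark_validator and df from the PySpark quick start (the output path is illustrative):

```python
# Persist the full validation output for downstream analysis.
spark_validator.write_validation_report(
    df, output_path='reports/employees_dq', format='parquet', mode='overwrite'
)

# Collect a bounded sample of errors to the driver for quick inspection.
for error in spark_validator.get_error_sample(df, limit=10):
    print(error)
```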
The dq_validation_result struct column contains:
{
'is_valid': bool, # True if all checks passed
'score': str, # "5/5" format (passed/total)
'pass_rate': float, # Percentage (0-100)
'total_checks': int, # Total number of checks
'passed_checks': int, # Number of passed checks
'failed_checks': int, # Number of failed checks
'error_count': int, # Number of errors
'errors': List[str] # List of error messages
}

ProviderConfig supports two authentication methods:
Option 1: Static Auth Token
from wxdi.dq_validator.provider import ProviderConfig
config = ProviderConfig(
url="https://your-instance.cloud.ibm.com",
auth_token="Bearer your-token",
project_id="your-project-id" # or catalog_id
)

Option 2: AuthConfig (Recommended for automatic token management)
from wxdi.dq_validator.provider import ProviderConfig
from wxdi.common.auth import AuthConfig, EnvironmentType
# Create AuthConfig for your environment
auth_config = AuthConfig(
environment_type=EnvironmentType.IBM_CLOUD,
api_key="your-api-key"
)
# Pass AuthConfig to ProviderConfig
config = ProviderConfig(
url="https://your-instance.cloud.ibm.com",
auth_config=auth_config,
project_id="your-project-id"
)
# Token is automatically retrieved when needed
token = config.auth_token # Calls AuthProvider.get_token() internally

The auth_config parameter enables automatic token management across all providers. When both auth_token and auth_config are provided, auth_config takes precedence for token retrieval.
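A minimal sketch of that precedence rule, combining the two options above (the static token here is a deliberate placeholder):

```python
# Both credentials supplied: per the precedence rule, token retrieval
# goes through auth_config, not the static auth_token.
config = ProviderConfig(
    url="https://your-instance.cloud.ibm.com",
    auth_token="Bearer stale-static-token",
    auth_config=auth_config,
    project_id="your-project-id",
)
token = config.auth_token  # refreshed via AuthProvider.get_token()
```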
GlossaryProvider fetches glossary terms and data quality constraints from IBM Cloud Pak for Data.
from wxdi.dq_validator.provider import GlossaryProvider
glossary = GlossaryProvider(config)
# Get published artifact by ID
term = glossary.get_published_artifact_by_id("term-id")
# Get term by version ID
term = glossary.get_term_by_version_id("version-id")

CamsProvider fetches data assets from CAMS (Catalog Asset Management System).
from wxdi.dq_validator.provider import CamsProvider
cams = CamsProvider(config)
# Get asset by ID
asset = cams.get_asset_by_id(
asset_id="asset-id",
options={"hide_deprecated_response_fields": "false"}
)
# Access column information
for column in asset.column_info:
    print(f"Column: {column.name}, Type: {column.data_type}")

IssuesProvider manages data quality issues (occurrences, tested records, ignored status).
from wxdi.dq_validator.provider import IssuesProvider
issues = IssuesProvider(config)
# Update issue occurrences
issues.update_issue_occurrences(issue_id="issue-123", occurrences=767)
# Update tested records
issues.update_tested_records(issue_id="issue-123", tested_records=1000)
# Set ignored status
issues.set_issue_ignored(issue_id="issue-123", ignored=True)
# Update multiple metrics at once
issues.update_issue_metrics(
issue_id="issue-123",
occurrences=767,
tested_records=1000
)

DQSearchProvider searches for DQ checks and assets by native ID.
from wxdi.dq_validator.provider import DQSearchProvider
dq_search = DQSearchProvider(config)
# Search DQ check
check = dq_search.search_dq_check(
native_id="asset-id/check-id",
check_type="format",
project_id="project-id"
)
# Search DQ asset
asset = dq_search.search_dq_asset(
native_id="asset-id/column-name",
asset_type="column",
project_id="project-id"
)

DQAssetsProvider retrieves data assets from CAMS with filtering and pagination support.
from wxdi.dq_validator.provider import DQAssetsProvider
assets = DQAssetsProvider(config)
# Get assets by project ID
assets_list = assets.get_assets(
project_id="project-id",
include_children=True,
asset_type="table"
)
# Get assets by catalog ID
assets_list = assets.get_assets(
catalog_id="catalog-id",
limit=100,
start_token="next-page-token"
)

ChecksProvider creates and manages data quality checks in CAMS.
from wxdi.dq_validator.provider import ChecksProvider
checks = ChecksProvider(config)
# Create a new check
check_id = checks.create_check(
native_id="asset-id/column-name",
check_type="format",
dimension_id="dimension-id",
project_id="project-id"
)
# Get existing checks
checks_list = checks.get_checks(
native_id="asset-id/column-name",
project_id="project-id",
include_children=True
)

DimensionsProvider searches for data quality dimensions by name.
from wxdi.dq_validator.provider import DimensionsProvider
dimensions = DimensionsProvider(config)
# Search for a dimension by name (case-insensitive)
dimension_id = dimensions.search_dimension("Completeness")

The SDK includes a comprehensive authentication module for generating Bearer tokens across different IBM Cloud environments and on-premises installations.
| Environment | Enum Value | Auth Method | Required Credentials |
|---|---|---|---|
| IBM Cloud Standard | EnvironmentType.IBM_CLOUD | POST (form-encoded) | API Key |
| AWS Cloud (MCSP) | EnvironmentType.AWS_CLOUD | POST (header-based) | API Key |
| IBM Government Cloud | EnvironmentType.GOV_CLOUD | POST (JSON) | API Key |
| On-Premises | EnvironmentType.ON_PREM | GET (headers) | User ID + Password |
from wxdi.dq_validator import EnvironmentType, AuthConfig, TokenGenerator
# IBM Cloud Standard (Production)
config = AuthConfig(
url="https://iam.cloud.ibm.com/identity/token",
environment=EnvironmentType.IBM_CLOUD,
api_key="your-api-key-here"
)
generator = TokenGenerator(config)
token = generator.generate_token()
print(token) # Bearer eyJhbGc...

# IBM Cloud Standard
config = AuthConfig(
url="https://iam.cloud.ibm.com/identity/token",
environment=EnvironmentType.IBM_CLOUD,
api_key="your-api-key"
)
generator = TokenGenerator(config)
token = generator.generate_token()
# Returns: "Bearer {access_token}"config = AuthConfig(
url="https://account-iam.platform.test.saas.ibm.com/api/2.0/accounts/your-account-id/apikeys/token",
environment=EnvironmentType.AWS_CLOUD,
api_key="your-aws-cloud-api-key"
)
generator = TokenGenerator(config)
token = generator.generate_token()

# IBM Government Cloud
config = AuthConfig(
url="https://dai.ibmforusgov.com/api/rest/mcsp/apikeys/token",
environment=EnvironmentType.GOV_CLOUD,
api_key="your-gov-api-key"
)
generator = TokenGenerator(config)
token = generator.generate_token()

# On-Premises
config = AuthConfig(
url="https://localhost:8443/v1/preauth/validateAuth",
environment=EnvironmentType.ON_PREM,
user_id="admin",
password="your-password"
)
generator = TokenGenerator(config)
token = generator.generate_token()

import requests
# Generate token
config = AuthConfig(
url="https://iam.cloud.ibm.com/identity/token",
environment=EnvironmentType.IBM_CLOUD,
api_key="your-api-key"
)
generator = TokenGenerator(config)
token = generator.generate_token()
# Use token in API requests
headers = {
'Authorization': token, # Already in "Bearer {token}" format
'Content-Type': 'application/json'
}
response = requests.get('https://api.example.com/endpoint', headers=headers)

Each validation returns a ValidationResult object:
result = validator.validate(record)
# Properties
result.is_valid # bool: True if no errors
result.score # str: "5/5" (passed/total)
result.pass_rate # float: 100.0 (percentage)
result.total_checks # int: Total number of checks
result.passed_checks # int: Number of passed checks
result.failed_checks # int: Number of failed checks
result.errors # List[ValidationError]: List of errors
# Convert to dictionary
result_dict = result.to_dict()

Each error contains detailed information:
error = result.errors[0]
error.column_name # str: Name of the column
error.check_name # str: Type of check that failed
error.message # str: Human-readable error message
error.value # any: The value that failed
error.expected # any: Expected value/constraint
# Convert to dictionary
error_dict = error.to_dict()
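Since both objects expose to_dict(), a whole batch can be serialized for reporting; a small sketch using only the converters shown above (reusing validator and records from the quick start):

```python
import json

# Serialize a batch of validation results to JSON for reporting.
results = validator.validate_batch(records)
report = [result.to_dict() for result in results]
print(json.dumps(report, indent=2, default=str))  # default=str covers dates/Decimals
```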
See complete working examples in the examples/ directory:

- basic_usage.py - Array-based validation example
- pandas_dataframe_usage.py - Pandas DataFrame validation example
- spark_dataframe_usage.py - PySpark DataFrame validation example
- consolidation_usage.py - Consolidated statistics and dimension-based reporting
- auth_usage.py - Authentication examples (296 lines)
- assets_usage.py - DQAssetsProvider usage examples (210 lines)
- glossary_usage.py - GlossaryProvider usage examples (250 lines)
- checks_usage.py - ChecksProvider usage examples (272 lines)
- dimensions_usage.py - DimensionsProvider usage examples (146 lines)
- issues_usage.py - IssuesProvider usage examples (124 lines)
- dq_workflow_usage.py - Complete DQ workflow (317 lines)
data-intelligence-sdk/
├── src/
│ └── wxdi/
│ ├── __init__.py
│ ├── common/
│ │ ├── __init__.py
│ │ └── auth/
│ │ ├── __init__.py
│ │ ├── auth_config.py
│ │ ├── auth_provider.py
│ │ ├── gov_cloud_authenticator.py
│ │ └── gov_cloud_token_manager.py
│ └── dq_validator/
│ ├── __init__.py
│ ├── metadata.py # DataType, ColumnMetadata, AssetMetadata
│ ├── datatypes.py # DataType enum
│ ├── data_quality_dimension.py # DataQualityDimension enum
│ ├── base.py # BaseCheck, ValidationError
│ ├── result.py # ValidationResult
│ ├── rule.py # ValidationRule
│ ├── validator.py # Validator
│ ├── rule_loader.py # RuleLoader for external providers
│ ├── inferred_engine.py # InferredTypeEngine
│ ├── format_engine.py # FormatEngine
│ ├── issue_reporting.py # Issue reporter utility
│ ├── datetime_formats.py # Date/time format definitions
│ ├── utils.py # Utility functions
│ ├── version.py # Version information
│ ├── result_consolidator.py # Result consolidation
│ ├── checks/
│ │ ├── __init__.py
│ │ ├── length_check.py
│ │ ├── valid_values_check.py
│ │ ├── comparison_check.py
│ │ ├── case_check.py
│ │ ├── completeness_check.py
│ │ ├── range_check.py
│ │ ├── regex_check.py
│ │ ├── format_check.py
│ │ └── datatype_check.py
│ ├── integrations/
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── pandas_validator.py
│ │ └── spark_validator.py
│ └── provider/
│ ├── __init__.py
│ ├── base_provider.py
│ ├── config.py
│ ├── glossary.py
│ ├── cams.py
│ ├── assets.py
│ ├── checks.py
│ ├── dimensions.py
│ ├── issues.py
│ ├── dq_search.py
│ ├── constraint_model.py
│ ├── data_asset_model.py
│ └── response_model.py
├── examples/
│ ├── basic_usage.py
│ ├── pandas_dataframe_usage.py
│ ├── spark_dataframe_usage.py
│ ├── auth_usage.py
│ ├── assets_usage.py
│ ├── checks_usage.py
│ ├── dimensions_usage.py
│ ├── issues_usage.py
│ └── dq_workflow_usage.py
├── setup.py
├── requirements.txt
├── pyproject.toml
└── README.md
Contributions are welcome! Please follow these guidelines:
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass
- Submit a pull request
Apache License 2.0 - see LICENSE file for details
For issues, questions, or contributions, please open an issue on GitHub.
- README.md: This file - comprehensive user guide
✅ 9 Validation Checks - Comprehensive validation coverage
✅ DataFrame Support - Pandas and PySpark integration
✅ REST API Integration - Complete provider module
✅ Multi-Environment Auth - 4 cloud environments supported
✅ Memory Efficient - Chunked processing for Pandas
✅ Distributed Processing - Spark UDF-based validation
✅ Thread-Safe - Concurrent access support
✅ Type-Safe - Full type hints throughout
✅ Extensible - Easy to add new checks
✅ Production Ready - 400+ tests, fully documented
- Python 3.8
- Python 3.9
- Python 3.10
- Python 3.11
- Python 3.12
Core:
- pydantic >= 2.12.0
- requests >= 2.28.0
- regex >= 2023.0.0
Optional:
- pandas >= 1.3.0 (for Pandas support)
- pyspark >= 3.0.0 (for PySpark support)
Development:
- pytest >= 7.0.0
- pytest-cov >= 4.0.0
- pytest-mock >= 3.7.0
- black >= 23.0.0
- mypy >= 1.0.0
- flake8 >= 6.0.0