Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 165 additions & 0 deletions api/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
assert_permission,
close_cursor,
close_raw_connection,
describe_columns,
describe_constraints,
load_cursor_from_context,
load_session_from_context,
open_cursor,
Expand Down Expand Up @@ -392,3 +394,166 @@ def get_request_data_dict(request: Request) -> dict:
if isinstance(request.data, dict):
return request.data
raise TypeError(type(request.data))


def get_column_description(table_obj: Table):
    """Describe the physical columns of *table_obj*.

    Returns a list of dicts, ordered by ordinal position:
    [{
        "name": str,
        "data_type": str,
        "is_nullable": bool,
        "is_pk": bool,
        "unit": None,
        "description": None
    }]
    """

    def _sql_type_string(col_def):
        """Collapse the separate type fields into one sql type string.

        We want a simple string such as decimal(10, 6) or varchar(128),
        so data_type is combined with the size fields (numeric_precision,
        numeric_scale, character_maximum_length, ...).
        For reverse validation, see also api.parser.parse_type(dt_string)
        """
        type_name = col_def["data_type"].lower()
        size_parts = None
        if type_name.startswith("character"):
            type_name = "varchar" if type_name == "character varying" else "char"
            size_parts = [col_def["character_maximum_length"]]
        elif type_name.endswith(" without time zone"):
            # "without time zone" is the default, so drop the suffix
            type_name = type_name.replace(" without time zone", "")
        elif type_name.startswith(("numeric", "decimal")):
            size_parts = [col_def["numeric_precision"], col_def["numeric_scale"]]
        elif type_name == "interval":
            size_parts = [col_def["interval_precision"]]
        elif "int" in type_name and (
            col_def.get("column_default") or ""
        ).startswith("nextval"):
            # serial-like column: keep the plain int type name
            pass
        elif type_name.startswith("double"):
            type_name = "float"
        if size_parts:  # remove None entries
            size_parts = [p for p in size_parts if p is not None]
        if size_parts:
            type_name += "(%s)" % ", ".join(str(p) for p in size_parts)
        return type_name

    def _primary_key_columns(constraints):
        """Get the column names that make up the primary key
        from the constraints definitions.

        NOTE: Currently, the wizard to create tables only supports
        single fields primary keys (which is advisable anyways)
        """
        fields = []
        for constraint in constraints.values():
            if constraint.get("constraint_type") != "PRIMARY KEY":
                continue
            match = re.match(
                r"PRIMARY KEY[ ]*\(([^)]+)", constraint.get("definition") or ""
            )
            if match:
                # "f1, f2" -> ["f1", "f2"]
                fields = [part.strip() for part in match.groups()[0].split(",")]
        return fields

    column_defs = describe_columns(table_obj)
    pk_fields = _primary_key_columns(describe_constraints(table_obj))
    # order by ordinal_position
    ordered = sorted(
        column_defs.items(), key=lambda item: int(item[1]["ordinal_position"])
    )
    return [
        {
            "name": name,
            "data_type": _sql_type_string(col),
            "is_nullable": col["is_nullable"],
            "is_pk": name in pk_fields,
            "unit": None,
            "description": None,
        }
        for name, col in ordered
    ]


def sync_api_metadata_columns(metadata: dict, table_obj: Table) -> dict:
    """
    Enforce that the metadata 'fields' exactly match the physical database
    columns, while preserving human annotations (isAbout, valueReference,
    description, unit, ...).

    The physical columns of *table_obj* are the source of truth: fields are
    emitted in column order, their "name"/"type"/"nullable" entries are
    overwritten from the database, and fields with no matching physical
    column ("phantom" columns) are dropped.

    Returns the (mutated) *metadata* dict.
    """

    def _scrub_ui_artifacts(items):
        """Remove the UI-only 'openModalButton' key from annotation dicts.

        Users sometimes copy-paste raw JSON from the UI into an API tool;
        non-dict entries are left untouched (previously they crashed the
        sync with AttributeError).
        """
        for item in items:
            if isinstance(item, dict):
                item.pop("openModalButton", None)

    # 1. Get the physical truths from the database
    physical_columns = get_column_description(table_obj)

    # Ensure resources array and schema dict exist safely
    if not metadata.get("resources"):
        metadata["resources"] = [{}]
    resource = metadata["resources"][0]
    resource.setdefault("schema", {})

    # 2. Extract incoming fields from the API payload, indexed by name
    incoming_fields = resource["schema"].get("fields", [])
    incoming_by_name = {
        field.get("name"): field
        for field in incoming_fields
        if isinstance(field, dict) and field.get("name")
    }

    updated_fields = []

    # 3. Iterate through physical database columns (the source of truth)
    for db_col in physical_columns:
        col_name = db_col["name"]

        # 'id' is strictly not nullable, regardless of what the DB reports
        is_nullable = False if col_name == "id" else db_col["is_nullable"]

        # Start with a copy of the incoming column (if any) to preserve all
        # human annotations, then overwrite the physical facts strictly
        # based on the database.
        merged_col = incoming_by_name.get(col_name, {}).copy()
        merged_col.update(
            {
                "name": col_name,
                "type": db_col["data_type"],
                "nullable": is_nullable,
            }
        )

        # Ensure default keys exist if they weren't in the incoming payload
        merged_col.setdefault("description", None)
        merged_col.setdefault("unit", None)

        # Scrub UI artifacts from annotation lists
        for key in ("isAbout", "valueReference"):
            if isinstance(merged_col.get(key), list):
                _scrub_ui_artifacts(merged_col[key])

        updated_fields.append(merged_col)

    # 4. Overwrite the payload's fields with the reconciled list; any
    # phantom columns that were in the JSON but not in the DB are dropped.
    resource["schema"]["fields"] = updated_fields

    return metadata
90 changes: 85 additions & 5 deletions api/tests/test_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,31 @@ def metadata_roundtrip(self, meta):
self.api_req("post", path="meta/", data=meta)
omi_meta_return = self.api_req("get", path="meta/")

omi_meta = meta
# Use deepcopy so we don't mutate the original example constant
omi_meta = deepcopy(meta)

# ignore diff in keywords (by setting resulting keywords == input keywords)
# REASON: the test re-uses the same test table,
# but does not delete the table tags in between
# if we want to synchronize tagsand keywords, the roundtrip would otherwise fail
# if we want to synchronize tags and keywords, the roundtrip would otherwise
# fail
omi_meta["resources"][0]["keywords"] = omi_meta["resources"][0].get(
"keywords", []
)
omi_meta_return["resources"][0]["keywords"] = omi_meta["resources"][0][
"keywords"
]

# ignore diff in schema (by setting resulting schema == input schema)
# REASON: The backend now actively synchronizes the metadata 'schema.fields'
# with the physical database columns. Since the test table's physical columns
# differ from the OEMETADATA_V20_EXAMPLE dummy columns, they will naturally
# differ.
if "schema" in omi_meta_return["resources"][0]:
omi_meta["resources"][0]["schema"] = omi_meta_return["resources"][0][
"schema"
]

self.assertDictEqualKeywise(
omi_meta_return["resources"][0], omi_meta["resources"][0]
)
Expand All @@ -44,10 +56,78 @@ def test_nonexistent_key(self):
self.api_req("post", path="meta/", data=meta)

def test_set_meta(self):
    """Round-trip the v2 example metadata through the meta API."""
    example = deepcopy(OEMETADATA_V20_EXAMPLE)
    self.metadata_roundtrip(example)

def test_complete_metadata(self):
    """Round-trip a complete v2 metadata document through the meta API."""
    self.metadata_roundtrip(deepcopy(OEMETADATA_V20_EXAMPLE))

def test_column_sync_preserves_annotations(self):
    """
    The backend sync must enforce physical DB constraints on each column
    while strictly preserving human-entered ontology annotations
    (isAbout, valueReference, ...), and must drop phantom columns.
    """
    payload = deepcopy(OEMETADATA_V20_EXAMPLE)

    # 1. Column payload with deliberately wrong physical facts plus a
    # column that does not exist in the physical test table at all.
    payload["resources"][0]["schema"]["fields"] = [
        {
            "name": "id",
            "type": "text",  # wrong on purpose: sync must replace it
            "nullable": True,  # wrong on purpose: sync must force False
            "isAbout": [
                {
                    "@id": "http://openenergy-platform.org/ontology/oeo/OEO_00000001",  # noqa: E501
                    "name": "test identifier",
                }
            ],
            "valueReference": [
                {"@id": "http://example.com/ref", "name": "ref", "value": "val"}
            ],
        },
        {
            "name": "fake_ghost_column",  # not present in the physical test DB
            "type": "varchar",
            "isAbout": [
                {
                    "@id": "http://openenergy-platform.org/ontology/oeo/OEO_00000002",  # noqa: E501
                    "name": "ghost",
                }
            ],
        },
    ]

    # 2./3. POST the payload, then GET the synced metadata back
    self.api_req("post", path="meta/", data=payload)
    synced = self.api_req("get", path="meta/")
    fields = synced["resources"][0]["schema"].get("fields", [])
    names = [f.get("name") for f in fields]

    # 4. Assertions
    # A: the phantom column was wiped out by the sync
    self.assertNotIn("fake_ghost_column", names)
    # B: the 'id' column must exist
    self.assertIn("id", names)

    id_field = next(f for f in fields if f.get("name") == "id")

    # C: physical constraints are strictly enforced by the backend sync
    self.assertFalse(id_field.get("nullable"))  # enforced to False
    self.assertNotEqual(
        id_field.get("type"), "text"
    )  # enforced to the actual DB type (e.g. bigint)

    # D: human annotations survive the sync untouched
    self.assertEqual(len(id_field.get("isAbout", [])), 1)
    self.assertEqual(id_field["isAbout"][0]["name"], "test identifier")

    self.assertEqual(len(id_field.get("valueReference", [])), 1)
    self.assertEqual(id_field["valueReference"][0]["value"], "val")
12 changes: 10 additions & 2 deletions api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
require_delete_permission,
require_write_permission,
stream,
sync_api_metadata_columns,
update_tags_from_keywords,
)
from api.parser import (
Expand Down Expand Up @@ -205,11 +206,15 @@ def post(self, request: Request, table: str) -> JsonLikeResponse:

if not error and metadata is not None:
metadata = try_convert_metadata_to_v2(metadata)

# Enforce database schema and clean artifacts
metadata = sync_api_metadata_columns(metadata, table_obj)

# Now validate the beautifully cleaned and synced metadata
metadata, error = try_validate_metadata(metadata)

if metadata is not None:
# update/sync keywords with tags before saving metadata
# TODO make this iter over all resources
keywords = metadata["resources"][0].get("keywords", []) or []
metadata["resources"][0]["keywords"] = update_tags_from_keywords(
table=table_obj.name, keywords=keywords
Expand All @@ -219,8 +224,11 @@ def post(self, request: Request, table: str) -> JsonLikeResponse:
metadata.pop("connection_id", None)
metadata.pop("cursor_id", None)

# Save the reconciled metadata to the database
set_table_metadata(table=table_obj.name, metadata=metadata)
return JsonResponse(raw_input)

# Return the cleaned metadata
return JsonResponse(metadata)
else:
raise APIError(error)

Expand Down
Loading
Loading