Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 263 additions & 2 deletions pygeometa/schemas/dcat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,86 @@
#
# =================================================================

import os
from typing import Union
import os, sys, yaml, json
from typing import Dict, Any, List, Optional, Union
from rdflib import Graph, URIRef, Namespace, Literal
from rdflib.namespace import RDF

from pygeometa.helpers import json_dumps
from pygeometa.schemas.base import BaseOutputSchema


# RDF namespaces used throughout this schema
DCT = Namespace('http://purl.org/dc/terms/')
DCAT = Namespace('http://www.w3.org/ns/dcat#')
SKOS = Namespace('http://www.w3.org/2004/02/skos/core#')
PROV = Namespace('http://www.w3.org/ns/prov#')
FOAF = Namespace('http://xmlns.com/foaf/0.1/')
ADMS = Namespace('http://www.w3.org/ns/adms#')
LOCN = Namespace('http://www.w3.org/ns/locn#')
VCARD = Namespace('http://www.w3.org/2006/vcard/ns#')
OWL = Namespace('http://www.w3.org/2002/07/owl#')
# NOTE: intentionally shadows the RDF imported from rdflib.namespace;
# attribute access (e.g. RDF.type) resolves to the same URIs either way
RDF = Namespace('http://www.w3.org/1999/02/22-rdf-syntax-ns#')
RDFS = Namespace('http://www.w3.org/2000/01/rdf-schema#')
SCHEMA = Namespace('http://schema.org/')
# NOTE(review): unlike the other namespaces this URI has no trailing
# '#' or '/' separator -- confirm intended
TIME = Namespace('http://www.w3.org/2006/time')
XSD = Namespace('http://www.w3.org/2001/XMLSchema#')

# Default DCAT-AP 3.1 context URL for missing JSON-LD contexts
_DCAT_AP_CONTEXT_URL = "https://github.com/SEMICeu/DCAT-AP/raw/refs/heads/master/releases/3.0.1/context/dcat-ap.jsonld"

# default mapping aligning common DCAT/DCT terms to MCF paths
DEFAULT_MAPPING = {
    'dct:title': 'identification.title',
    'dct:description': 'identification.abstract',
    'dct:abstract': 'identification.abstract',
    'dct:subject': 'identification.subjects',
    'dct:temporal': 'identification.temporal',
    'dct:spatial': 'identification.geographic',
    'dct:license': 'identification.licence',
    'dcat:keyword': 'identification.subjects',
    'dct:language': 'metadata.language',
    'dct:modified': 'identification.modified',
    'dct:source': 'identification.source',
    'dct:accessRights': 'identification.rights',
    'dct:conformsTo': 'identification.conformsto',
    'dcat:contactPoint': 'identification.contactpoint',
    'dcat:endpointUrl': 'identification.endpointurl',
    'dct:format': 'identification.format',
    'dcat:landingPage': 'identification.landingpage',
    'dct:publisher': 'identification.publisher',
    'dct:creator': 'identification.creator',
    'dcat:distribution': 'identification.distribution',
    'dct:accrualPeriodicity': 'identification.accrualPeriodicity',
    'dcat:hasVersion': 'identification.hasVersion',
    'dct:identifier': 'metadata.identifier',
    'dcat:inSeries': 'identification.inSeries',
    'dct:isReferencedBy': 'identification.isReferencedBy',
    'dct:provenance': 'identification.provenance',
    'dct:relation': 'identification.relation',
    'dct:issued': 'identification.issued',
    'adms:sample': 'identification.sample',
    'dcat:spatialResolutionInMeters': 'identification.spatialResolutionInMeters',
    'dcat:temporalResolution': 'identification.temporalResolution',
    'dcat:theme': 'identification.subjects',
    'dct:type': 'metadata.hierarchylevel',
    'adms:versionNotes': 'identification.versionnotes',
    'prov:wasGeneratedBy': 'identification.wasgeneratedby'
}

# MCF paths whose values are language-keyed (internationalized) scalars.
# FIX: was a single comma-joined string
# 'identification.abstract,identification.title', so membership tests
# against individual target paths could never match and titles/abstracts
# silently bypassed language handling
INTL_MCF_FIELDS = ['identification.abstract', 'identification.title']

# Parser formats to try in order
_PARSER_FORMATS = [
    'json-ld',
    'xml',
    'turtle',
    'n3',
    'trig',
]


THISDIR = os.path.dirname(os.path.realpath(__file__))


Expand All @@ -65,6 +139,193 @@ def __init__(self):
description = 'DCAT'
super().__init__('dcat', description, 'json', THISDIR)


def _inject_jsonld_context(self, content: str) -> str:
"""
Inject DCAT-AP context into JSON content if missing.


Returns modified JSON string if '@context' not found.
If parsing fails, returns original content.
"""
try:
data = json.loads(content)
if isinstance(data, dict) and '@context' not in data:
data['@context'] = _DCAT_AP_CONTEXT_URL
return json.dumps(data)
except Exception:
pass
return content

def parse_dcat_content(self, content: str, base: Optional[str] = None) -> Graph:
    """
    Parse content into an rdflib.Graph, trying a set of common RDF
    serialisations in the order given by ``_PARSER_FORMATS``.

    :param content: string of RDF content (JSON-LD, RDF/XML, Turtle, ...)
    :param base: optional base URI passed to the parser as ``publicID``

    :returns: rdflib.Graph containing the parsed triples

    :raises ValueError: if none of the attempted formats succeed
    """

    # If the payload looks like JSON, ensure it carries a JSON-LD
    # '@context' so the 'json-ld' parser can interpret it.  The explicit
    # str guard replaces a blanket try/except: a bytes payload (whose
    # startswith would raise TypeError) skips injection, as before
    if isinstance(content, str):
        sample = content.lstrip()[:100]
        if sample.startswith(('{', '[')):
            content = self._inject_jsonld_context(content)

    last_exc = None
    for fmt in _PARSER_FORMATS:
        try:
            g = Graph()
            g.parse(data=content, format=fmt, publicID=base)
            return g
        except Exception as exc:
            # parser exception types vary per serialisation; remember the
            # last failure for the final error message
            last_exc = exc

    raise ValueError(f"Unable to parse content as a known RDF serialisation. Last error: {last_exc}")


def _to_uriref(self, key: str) -> URIRef:
    """Convert a mapping key into a URIRef.

    Accepts full URIs or qnames like 'dct:title' or 'dcat:keyword'.
    Keys with an unrecognised prefix (and bare keys) fall back to the
    DCT namespace, using the full key as the local name.
    """
    # Already a full URI: wrap it directly
    if key.startswith(('http://', 'https://')):
        return URIRef(key)

    # qname form: resolve the prefix when it is one we know about
    if ':' in key:
        prefix, _, local = key.partition(':')
        namespace = {
            'dct': DCT,
            'dcat': DCAT,
            'skos': SKOS,
            'prov': PROV,
            'foaf': FOAF,
            'adms': ADMS,
            'locn': LOCN,
            'vcard': VCARD,
            'schema': SCHEMA
        }.get(prefix)
        if namespace is not None:
            return namespace[local]

    # Fallback: treat the whole key as a DCT term
    return DCT[key]


def _collect_literals_by_lang(self, values: List[Any], deflang='eng') -> Dict[str, List[str]]:
"""
Given a list of rdflib nodes, return dict lang -> list(strings).

Literals without language tags go under the empty-string key ''.
Non-literals (URIRefs) are converted to their string representation
and put under ''.
"""
out: Dict[str, List[str]] = {}
for v in values:
if isinstance(v, Literal):
s = str(v)
lang = v.language or deflang
else:
s = str(v)
lang = deflang
if s not in (None,''):
out.setdefault(lang, []).append(s)
return out


def _join_lang_values(self, values: List[str]) -> str:
"""Join multiple values for the same language into a single scalar.

The join token is ' | '. This keeps values readable while producing a
single scalar as required by the MCF core schema.
"""
if not values:
return ''
if len(values) == 1:
return values[0]
return ' | '.join(values)


def build_mcf_dict(self, g: Graph, mapping: Dict[str, str], dataset_uri: Optional[str] = None) -> Dict[str, Any]:
    """
    Build an MCF-compatible nested dict from the provided graph.

    :param g: rdflib.Graph containing DCAT metadata
    :param mapping: dict mapping source DCAT/DCT property (qname or URI) to
                    a dot-separated MCF path (e.g. 'identification.title')
    :param dataset_uri: optional URI to focus extraction on a single dataset
    :returns: nested dict suitable for YAML serialization according to pygeometa's MCF
    :raises ValueError: if no subject node at all can be found in the graph
    """
    # Identify dataset node: an explicit URI wins; otherwise take the
    # first dcat:Dataset-typed subject encountered in the graph
    dataset_node = None
    if dataset_uri:
        dataset_node = URIRef(dataset_uri)
    else:
        for s, p, o in g.triples((None, RDF.type, DCAT['Dataset'])):
            dataset_node = s
            break
    if dataset_node is None:
        # fallback to first subject found in the graph
        # NOTE(review): triple iteration order is not guaranteed by
        # rdflib, so this fallback is non-deterministic -- confirm ok
        for s, p, o in g.triples((None, None, None)):
            dataset_node = s
            break

    if dataset_node is None:
        raise ValueError('No dataset node found in the provided graph')

    mcf: Dict[str, Any] = {}

    # For each mapped property, collect its object values off the dataset
    # node and place them at the mapped dot-path inside the MCF dict
    for src_prop, tgt_path in mapping.items():
        prop_ref = self._to_uriref(src_prop)
        values = [o for o in g.objects(subject=dataset_node, predicate=prop_ref)]
        # NOTE(review): `len(values) == 0` is redundant after `not values`
        if not values or len(values) == 0:
            continue

        if tgt_path in INTL_MCF_FIELDS:
            # Internationalized field: group values by language tag
            lang_map = self._collect_literals_by_lang(values)
            # Convert lists to single scalar per language according to MCF core schema
            scalar_lang_map: Dict[str, str] = {}
            for lang, vals in lang_map.items():
                scalar_lang_map[(lang or 'eng')] = self._join_lang_values(vals)
        else:
            # NOTE(review): despite the name, in this branch
            # scalar_lang_map is a single node (qname string for URIRefs,
            # otherwise the raw first value) -- extra values are dropped
            scalar_lang_map = g.qname(values[0]) if isinstance(values[0], URIRef) else values[0]

        # Insert into nested mcf by splitting tgt_path
        parts = tgt_path.split('.')
        cur = mcf
        for part in parts[:-1]:
            cur = cur.setdefault(part, {})
        final_key = parts[-1]

        existing = cur.get(final_key)
        # Non-intl fields always overwrite; intl fields merge per language
        # (several mapping keys may target the same MCF path, e.g.
        # identification.subjects)
        if existing is None or tgt_path not in INTL_MCF_FIELDS:
            # set the language-keyed scalar mapping
            cur[final_key] = scalar_lang_map
        else:
            # merge: preserve existing languages and overwrite/append others
            for lang, val in scalar_lang_map.items():
                if lang in existing and existing[lang]:
                    # if an existing value is present, join with the new one
                    existing[lang] = existing[lang] + ' | ' + val
                else:
                    existing[lang] = val
            cur[final_key] = existing

    return mcf


def import_(self, metadata: str) -> dict:
    """
    Import metadata into MCF

    Parses the incoming content (JSON-LD, RDF/XML, Turtle, ...) into an
    RDF graph, then maps it onto MCF paths via DEFAULT_MAPPING.

    :param metadata: string of metadata content

    :returns: `dict` of MCF content
    """

    graph = self.parse_dcat_content(metadata)

    # dataset_uri is left at its default (None): extraction focuses on
    # the first dataset node found in the graph
    return self.build_mcf_dict(graph, DEFAULT_MAPPING)


def write(self, mcf: dict, stringify: str = True) -> Union[dict, str]:
"""
Write MCF to DCAT
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ jsonschema
lxml
OWSLib
pyyaml
rdflib
Loading
Loading