import io
import logging
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import TextIO
from typing import Union
from urllib.parse import urlparse
from clarin.sru.exception import SRUConfigException
from lxml import etree
from clarin.sru.fcs.constants import ED_NS
from clarin.sru.fcs.constants import ED_PREFIX
from clarin.sru.fcs.constants import LANG_EN
from clarin.sru.fcs.constants import LAYER_TYPE_EXTENSION_PREFIX
from clarin.sru.fcs.constants import RI_NS_LEGACY
from clarin.sru.fcs.constants import XML_NS_URI
from clarin.sru.fcs.constants import Capabilities
from clarin.sru.fcs.constants import FCSDataViewNamespaces
from clarin.sru.fcs.constants import FCSLayerType
from clarin.sru.fcs.server.search import DataView
from clarin.sru.fcs.server.search import EndpointDescription
from clarin.sru.fcs.server.search import Layer
from clarin.sru.fcs.server.search import ResourceInfo
from clarin.sru.fcs.server.search import SimpleEndpointDescription
# ---------------------------------------------------------------------------
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
[docs]class SimpleEndpointDescriptionParser:
"""A parser, that parses an XML file and produces a endpoint
description with static list of resource info records. The XML
file has the same format as the result format defined for
endpoint description of the CLARIN-FCS specification. The `parse`
returns a `SimpleEndpointDescription` instance."""
@staticmethod
def parse(url: Union[str, TextIO]) -> EndpointDescription:
    """Read an endpoint description XML document and build its object model.

    Args:
        url: location of the XML document, or an already opened file
            object; it is handed to `etree.parse` unchanged

    Returns:
        EndpointDescription: the endpoint description built from the document

    Raises:
        SRUConfigException: if the document cannot be read or parsed, uses
            an unsupported format or fails one of the consistency checks
        TypeError: if url is None
    """
    if url is None:
        raise TypeError("url is None")

    LOGGER.debug("Parsing endpoint description from: %s", url)
    try:
        xml_parser = etree.XMLParser(
            ns_clean=False, remove_comments=True, strip_cdata=True
        )
        document: etree._ElementTree = etree.parse(url, xml_parser)
        # TODO: validate with schema? (like fcs-sru-server config?)

        # Deprecated resource-info catalog files are rejected before any
        # further parsing is attempted.
        # url.name if isinstance(url, io.TextIOWrapper) else url ?
        SimpleEndpointDescriptionParser._check_legacy_mode(document, url)

        endpoint_description = (
            SimpleEndpointDescriptionParser._parse_EndpointDescription(document)
        )
    except etree.XPathEvalError as ex:
        raise SRUConfigException("internal error") from ex
    except etree.XMLSyntaxError as ex:
        raise SRUConfigException("parsing error") from ex
    except SRUConfigException:
        # already the exception type callers expect, pass it on untouched
        raise
    except OSError as ex:
        raise SRUConfigException("error reading file") from ex
    except Exception as ex:
        # anything unexpected is reported as a configuration problem, too
        raise SRUConfigException("internal error") from ex
    else:
        return endpoint_description
    # lxml ParserConfigurationException ?
# ----------------------------------------------------
@staticmethod
def _parse_EndpointDescription(doc: etree._ElementTree) -> EndpointDescription:
    """Build a `SimpleEndpointDescription` from a parsed XML document.

    Version, capabilities, data views, layers and resources are parsed in
    that order; each parsed part is passed to its `_check_*` method before
    the next part is read.

    Args:
        doc: the parsed endpoint description document

    Returns:
        EndpointDescription: a `SimpleEndpointDescription` instance

    Raises:
        SRUConfigException: if a part is invalid or no resources are defined
    """
    parser = SimpleEndpointDescriptionParser

    version = parser._parse_version(doc)
    LOGGER.debug("Endpoint description version is %s", version)

    capabilities = parser._parse_Capabilities(doc)
    LOGGER.debug("CAPS: %s", capabilities)
    parser._check_Capabilities(capabilities, version)

    # One id pool for data views and layers: the same set is handed to both
    # parse steps, so an id must be unique across the two element kinds.
    seen_ids: Set[str] = set()

    data_views = parser._parse_DataViews(doc, seen_ids)
    LOGGER.debug("DV: %s", data_views)
    parser._check_DataViews(data_views, capabilities)

    layers = parser._parse_Layers(doc, seen_ids)
    LOGGER.debug("L: %s", layers)
    parser._check_Layers(layers, capabilities)

    resources = parser._parse_ResourceInfos(doc, data_views, layers, version)
    if not resources:
        raise SRUConfigException(
            "No resources where defined in endpoint description"
        )

    # Walking the resource tree is skipped unless debug output is enabled.
    if LOGGER.isEnabledFor(logging.DEBUG):
        LOGGER.debug("Dumping ResourceInfo:")
        parser._dump_ResourceInfo(resources, 1)

    return SimpleEndpointDescription(
        version, capabilities, data_views, layers, resources, False
    )
# ----------------------------------------------------
@staticmethod
def _parse_version(doc: etree._ElementTree) -> int:
    """Read the `version` attribute of the `<EndpointDescription>` element.

    Args:
        doc: the parsed endpoint description document

    Returns:
        int: the declared version, either 1 or 2

    Raises:
        SRUConfigException: if the attribute is missing, is not an integer
            or has a value other than 1 or 2
    """
    matches = doc.xpath(
        "//ed:EndpointDescription/@version", namespaces={ED_PREFIX: ED_NS}
    )
    if not matches:
        raise SRUConfigException(
            "Attribute @version missing on element <EndpointDescription>"
        )

    # only the first match is looked at
    try:
        version = int(matches[0])
    except ValueError as ex:
        raise SRUConfigException("Cannot parse version number") from ex

    if version not in (1, 2):
        raise SRUConfigException(
            "Attribute @version element <EndpointDescription> must have a value of either '1' or '2'"
        )
    return version
@staticmethod
def _parse_Capabilities(doc: etree._ElementTree) -> List[str]:
    """Collect the capability URIs declared in the endpoint description.

    Duplicate entries are dropped with a warning, so each URI appears at
    most once and in document order.

    Args:
        doc: the parsed endpoint description document

    Returns:
        List[str]: the declared capability URIs; empty (after a warning) if
            the document declares none

    Raises:
        SRUConfigException: if a `<Capability>` element is empty or its
            content cannot be parsed as a URI
    """
    capabilities: List[str] = list()
    nodes = doc.xpath(
        "//ed:Capabilities/ed:Capability", namespaces={ED_PREFIX: ED_NS}
    )
    if not nodes:
        LOGGER.warning("No capabilities where defined in endpoint configuration.")
        return capabilities

    LOGGER.debug("Parsing capabilities")
    for node in nodes:
        # node.text is None for an empty element (<Capability/>); treat
        # that like whitespace-only content instead of failing on .strip()
        uri = (node.text or "").strip()
        if not uri:
            raise SRUConfigException(
                "Element <Capability> must contain a capability URI"
            )
        try:
            urlparse(uri)
        except Exception as ex:
            raise SRUConfigException(
                f"capability is not encoded as a proper URI: {uri}"
            ) from ex
        if uri in capabilities:
            LOGGER.warning("Ignoring duplicate capability entry for '%s'.", uri)
        else:
            capabilities.append(uri)
    return capabilities
@staticmethod
def _check_Capabilities(capabilities: List[str], version: int):
    """Sanity-check the capability list against the declared version.

    The list is modified in place: the Basic Search capability is appended
    (with a warning) when it is missing. Advanced Search in a version 1
    description only triggers a warning.

    Args:
        capabilities: the parsed capability URIs; may be extended in place
        version: the endpoint description version
    """
    basic_search = Capabilities.BASIC_SEARCH
    if basic_search not in capabilities:
        LOGGER.warning(
            "Capability '%s' was not defined in endpoint description;"
            " it was added to meet the specification. Please update your"
            " endpoint description!",
            basic_search,
        )
        capabilities.append(basic_search)

    declares_advanced_search = Capabilities.ADVANCED_SEARCH in capabilities
    if declares_advanced_search and version < 2:
        LOGGER.warning(
            "Endpoint description is declared as version FCS 1.0 (@version = 1),"
            " but contains support for Advanced Search in capabilities list!"
            " FCS 1.0 only supports Basic Search."
        )
@staticmethod
def _parse_DataViews(doc: etree._ElementTree, xml_ids: Set[str]) -> List[DataView]:
    """Parse the `<SupportedDataView>` elements into `DataView` objects.

    Args:
        doc: the parsed endpoint description document
        xml_ids: ids seen so far; every data view id is checked against
            this set and then added to it

    Returns:
        List[DataView]: the declared data views in document order

    Raises:
        SRUConfigException: if no data views are declared, a mandatory
            attribute or the MIME-type content is missing or invalid, or an
            id or MIME-type occurs more than once
    """
    parser = SimpleEndpointDescriptionParser

    nodes = doc.xpath(
        "//ed:SupportedDataViews/ed:SupportedDataView",
        namespaces={ED_PREFIX: ED_NS},
    )
    if not nodes:
        message = "Endpoint configuration contains no valid information about supported data views"
        LOGGER.error(message)
        raise SRUConfigException(message)

    # the two delivery policies an attribute value may be matched against
    known_policies = (
        DataView.DeliveryPolicy.SEND_BY_DEFAULT,
        DataView.DeliveryPolicy.NEED_TO_REQUEST,
    )

    LOGGER.debug("Parsing supported data views")
    dataviews: List[DataView] = list()
    for node in nodes:
        dataview_id = parser._get_attribute(node, "id")
        if dataview_id is None:
            raise SRUConfigException(
                "Element <SupportedDataView> must have a proper 'id' attribute"
            )
        if dataview_id in xml_ids:
            raise SRUConfigException(
                f"The value of attribute 'id' of element <SupportedDataView> must be unique: {dataview_id}"
            )
        xml_ids.add(dataview_id)

        # TODO: refactor as constants?
        policy_value = parser._get_attribute(node, "delivery-policy")
        if policy_value is None:
            raise SRUConfigException(
                "Element <SupportedDataView> must have a 'delivery-policy' attribute"
            )
        policy = next(
            (known for known in known_policies if known == policy_value), None
        )
        if policy is None:
            raise SRUConfigException(
                f"Invalid value '{policy_value}' for attribute 'delivery-policy' on element <SupportedDataView>"
            )

        # element content is the MIME-type; missing or blank content is an error
        mimetype = (node.text or "").strip()
        if not mimetype:
            raise SRUConfigException(
                "Element <SupportedDataView> must contain a MIME-type as content"
            )

        # check for duplicate entries ...
        for known_view in dataviews:
            if known_view.identifier == dataview_id:
                raise SRUConfigException(
                    f"A <SupportedDataView> with the id '{dataview_id}' is already defined!"
                )
            if known_view.mimetype == mimetype:
                raise SRUConfigException(
                    f"A <SupportedDataView> with the MIME-type '{mimetype}' is already defined!"
                )

        dataviews.append(
            DataView(identifier=dataview_id, mimetype=mimetype, deliveryPolicy=policy)
        )
    return dataviews
@staticmethod
def _check_DataViews(dataviews: List[DataView], capabilities: List[str]):
    """Sanity-check the declared data views against the capabilities.

    Args:
        dataviews: the parsed supported data views
        capabilities: the capability URIs of the endpoint

    Raises:
        SRUConfigException: if the Generic Hits data view is not declared,
            or if Advanced Search is among the capabilities but the
            Advanced data view is not declared
    """
    hits_mimetype = FCSDataViewNamespaces.HITS.mimetype
    adv_mimetype = FCSDataViewNamespaces.ADV.mimetype

    has_HITS_view = has_ADV_view = False
    for dataview in dataviews:
        if dataview.mimetype == hits_mimetype:
            has_HITS_view = True
        elif dataview.mimetype == adv_mimetype:
            has_ADV_view = True

    if not has_HITS_view:
        raise SRUConfigException(
            f"Generic Hits Data View ({hits_mimetype})"
            " was not declared in <SupportedDataViews>"
        )
    if Capabilities.ADVANCED_SEARCH in capabilities and not has_ADV_view:
        # the last fragment needs its leading space, otherwise the message
        # reads "...)in <SupportedDataViews>"
        raise SRUConfigException(
            "Endpoint claimes to support Advanced FCS but does not declare"
            f" Advanced Data View ({adv_mimetype})"
            " in <SupportedDataViews>"
        )
@staticmethod
def _parse_Layers(doc: etree._ElementTree, xml_ids: Set[str]) -> List[Layer]:
    """Parse the `<SupportedLayer>` elements into `Layer` objects.

    Args:
        doc: the parsed endpoint description document
        xml_ids: ids seen so far; every layer id is checked against this
            set and then added to it

    Returns:
        List[Layer]: the declared layers in document order; empty if the
            document declares none

    Raises:
        SRUConfigException: if a mandatory attribute or the layer type is
            missing, an id is not unique, a URI attribute cannot be parsed
            or the `type` attribute has an unknown value
    """
    parser = SimpleEndpointDescriptionParser

    layers: List[Layer] = list()
    nodes = doc.xpath(
        "//ed:SupportedLayers/ed:SupportedLayer",
        namespaces={ED_PREFIX: ED_NS},
    )
    if not nodes:
        return layers

    LOGGER.debug("Parsing supported layers")
    # layer types defined by FCSLayerType; built once instead of per element
    known_layer_types = [lt.value for lt in FCSLayerType]

    for node in nodes:
        layer_id = parser._get_attribute(node, "id")
        if layer_id is None:
            raise SRUConfigException(
                "Element <SupportedLayer> must have a proper 'id' attribute"
            )
        if layer_id in xml_ids:
            raise SRUConfigException(
                f"The value of attribute 'id' of element <SupportedLayer> must be unique: {layer_id}"
            )
        xml_ids.add(layer_id)

        # TODO: refactor as constants?
        result_id = parser._get_attribute(node, "result-id")
        if result_id is None:
            raise SRUConfigException(
                "Element <SupportedLayer> must have a 'result-id' attribute"
            )
        try:
            urlparse(result_id)
        except Exception as ex:
            raise SRUConfigException(
                "Attribute 'result-id' on Element <SupportedLayer>"
                f" is not encoded as proper URI: {result_id}"
            ) from ex

        rtype = parser._clean_str(node.text)
        if not rtype:
            raise SRUConfigException(
                "Element <SupportedLayer> does not define a proper layer type"
            )
        # sanity check on layer types: unknown types are only reported,
        # unless they carry the extension prefix
        if rtype not in known_layer_types and not rtype.startswith(
            LAYER_TYPE_EXTENSION_PREFIX
        ):
            LOGGER.warning("layer type '%s' is not defined by specification", rtype)

        qualifier = parser._get_attribute(node, "qualifier")

        # content encoding defaults to VALUE when no 'type' attribute is given
        encoding = Layer.ContentEncoding.VALUE
        encoding_value = parser._get_attribute(node, "type")
        if encoding_value:
            if Layer.ContentEncoding.VALUE == encoding_value:
                encoding = Layer.ContentEncoding.VALUE
            elif Layer.ContentEncoding.EMPTY == encoding_value:
                encoding = Layer.ContentEncoding.EMPTY
            else:
                raise SRUConfigException(
                    f"Invalid layer encoding: {encoding_value}"
                )

        alt_value_info = parser._get_attribute(node, "alt-value-info")
        alt_value_info_uri: Optional[str] = None
        if alt_value_info:
            # the URI is only read when 'alt-value-info' itself is present
            alt_value_info_uri = parser._get_attribute(node, "alt-value-info-uri")
            # the URI attribute is optional; do not hand None to urlparse
            if alt_value_info_uri is not None:
                try:
                    urlparse(alt_value_info_uri)
                except Exception as ex:
                    raise SRUConfigException(
                        "Attribute 'alt-value-info-uri' on Element <SupportedLayer>"
                        f" is not encoded as proper URI: {alt_value_info_uri}"
                    ) from ex

        layers.append(
            Layer(
                id=layer_id,
                result_id=result_id,
                type=rtype,
                encoding=encoding,
                qualifier=qualifier,
                alt_ValueInfo=alt_value_info,
                alt_ValueInfo_url=alt_value_info_uri,
            )
        )
    return layers
@staticmethod
def _check_Layers(layers: List[Layer], capabilities: List[str]):
    """Warn when layers are declared without the Advanced Search capability.

    Nothing is raised or changed; the check only logs a warning.

    Args:
        layers: the parsed supported layers
        capabilities: the capability URIs of the endpoint
    """
    if not layers or Capabilities.ADVANCED_SEARCH in capabilities:
        return

    LOGGER.warning(
        "Endpoint description has <SupportedLayer> but does not indicate"
        " support for Advanced Search. Please consider adding capability"
        " (%s) to your endpoint description to make use of layers!",
        Capabilities.ADVANCED_SEARCH,
    )
@staticmethod
def _parse_ResourceInfos(
    doc: etree._ElementTree,
    supported_DataViews: List[DataView],
    supported_Layers: List[Layer],
    version: int,
) -> List[ResourceInfo]:
    """Parse the (possibly nested) `<Resource>` elements into `ResourceInfo` records.

    Args:
        doc: the parsed endpoint description document
        supported_DataViews: the data views that `<AvailableDataViews>`
            references are resolved against
        supported_Layers: the layers that `<AvailableLayers>` references
            are resolved against
        version: the endpoint description version; only used to warn about
            layers in a version 1 description

    Returns:
        List[ResourceInfo]: the top-level resources, each carrying its
            sub-resources; empty if the document declares no resources

    Raises:
        SRUConfigException: on a missing or duplicate pid, invalid title,
            description or language entries, a missing
            `<AvailableDataViews>` element or a reference that cannot be
            resolved
    """
    parser = SimpleEndpointDescriptionParser
    ns = {ED_PREFIX: ED_NS}

    # shared by all recursion levels, so a pid must be unique across the
    # whole resource tree
    pids: Set[str] = set()
    has_ADV_view = any(
        dataview.mimetype == FCSDataViewNamespaces.ADV.mimetype
        for dataview in supported_DataViews
    )

    def _parse_resources(nodes) -> List[ResourceInfo]:
        # Converts one level of <Resource> elements and recurses into
        # their <Resources> children.
        if not nodes:
            return list()

        resources: List[ResourceInfo] = list()
        for node in nodes:
            titles: Dict[str, str] = dict()
            descrs: Dict[str, str] = dict()
            link: Optional[str] = None
            langs: List[str] = list()
            availDataViews: List[DataView] = list()
            availLayers: List[Layer] = list()

            pid = parser._get_attribute(node, "pid")
            if pid is None:
                raise SRUConfigException(
                    "Element <ResourceInfo> must have a proper 'pid' attribute"
                )
            if pid in pids:
                raise SRUConfigException(
                    f"Another element <Resource> with pid '{pid}' already exists"
                )
            pids.add(pid)
            LOGGER.debug("Processing resource with pid '%s'", pid)

            # titles: one per language, the first one wins
            for tnode in node.xpath("ed:Title", namespaces=ns):
                lang = parser._get_lang_attribute(tnode)
                if not lang:
                    raise SRUConfigException(
                        "Element <Title> must have a proper 'xml:lang' attribute"
                    )
                title = parser._clean_str(tnode.text)
                if not title:
                    # NOTE: in java code confusing error message
                    raise SRUConfigException(
                        "Element <Title> must not be non-empty"
                    )
                if lang in titles:
                    LOGGER.warning("Title with language '%s' already exists", lang)
                else:
                    LOGGER.debug("title: '%s' '%s'", lang, title)
                    titles[lang] = title
            if titles and LANG_EN not in titles:
                raise SRUConfigException(
                    "A <Title> with language 'en' is mandatory"
                )

            # descriptions: one per language, empty ones are skipped
            for dnode in node.xpath("ed:Description", namespaces=ns):
                lang = parser._get_lang_attribute(dnode)
                if not lang:
                    raise SRUConfigException(
                        "Element <Description> must have a proper 'xml:lang' attribute"
                    )
                descr = parser._clean_str(dnode.text)
                if lang in descrs:
                    LOGGER.warning(
                        "Description with language '%s' already exists", lang
                    )
                else:
                    LOGGER.debug("description: '%s' '%s'", lang, descr)
                    # NOTE: skip if None? - java impl would allow nulls
                    if not descr:
                        LOGGER.debug("Skip empty description for lang '%s'", lang)
                    else:
                        descrs[lang] = descr
            if descrs and LANG_EN not in descrs:
                raise SRUConfigException(
                    "A <Description> with language 'en' is mandatory"
                )

            for lnode in node.xpath("ed:LandingPageURI", namespaces=ns):
                # TODO: only keep last one? (in java impl)
                link = parser._clean_str(lnode.text)

            for lnode in node.xpath("ed:Languages/ed:Language", namespaces=ns):
                val = (lnode.text or "").strip()
                # enforce three letter codes; an empty element is rejected
                # as well, so the language list never contains None
                if len(val) != 3:
                    raise SRUConfigException(
                        "Element <Language> must use ISO-639-3 three letter language codes"
                    )
                langs.append(val)

            # data views are mandatory for every resource
            dvnodes = node.xpath("ed:AvailableDataViews", namespaces=ns)
            if not dvnodes:
                raise SRUConfigException("Missing element <AvailableDataViews>")
            ref = parser._get_attribute(dvnodes[0], "ref")
            if not ref:
                raise SRUConfigException(
                    "Element <AvailableDataViews> must have a 'ref' attribute"
                )
            refs = ref.split()
            if not refs:
                raise SRUConfigException(
                    "Attribute 'ref' on element <AvailableDataViews> must contain"
                    " a whitespace seperated list of data view references"
                )
            for ref in refs:
                for dataview in supported_DataViews:
                    if ref == dataview.identifier:
                        availDataViews.append(dataview)
                        break
                else:
                    # no supported data view matched this reference
                    raise SRUConfigException(
                        f"A data view with identifier '{ref}' was not"
                        " defined in <SupportedDataViews>"
                    )
            if not availDataViews:
                raise SRUConfigException(
                    f"No available data views were defined for resource"
                    f" with PID '{pid}'"
                )

            # layers are optional
            lnodes = node.xpath("ed:AvailableLayers", namespaces=ns)
            if lnodes:
                ref = parser._get_attribute(lnodes[0], "ref")
                if not ref:
                    raise SRUConfigException(
                        "Element <AvailableLayers> must have a 'ref' attribute"
                    )
                refs = ref.split()
                if not refs:
                    # NOTE: copy-paste error in java
                    raise SRUConfigException(
                        "Attribute 'ref' on element <AvailableLayers> must contain"
                        " a whitespace seperated list of layer references"
                    )
                for ref in refs:
                    for layer in supported_Layers:
                        if ref == layer.id:
                            availLayers.append(layer)
                            break
                    else:
                        # no supported layer matched this reference
                        raise SRUConfigException(
                            f"A layer with identifier '{ref}' was not"
                            " defined in <SupportedLayers>"
                        )
            elif has_ADV_view:
                LOGGER.debug("No <SupportedLayers> for resource '%s'", pid)

            rnodes = node.xpath("ed:Resources/ed:Resource", namespaces=ns)
            sub: List[ResourceInfo] = _parse_resources(rnodes)
            # TODO: None if empty?

            # NOTE: version check in java faulty?
            if availLayers and version <= 1:
                LOGGER.warning(
                    "Endpoint claims to support FCS 1.0, but includes information"
                    " about <AvailableLayers> for resource with pid '%s'",
                    pid,
                )

            resources.append(
                ResourceInfo(
                    pid=pid,
                    title=titles,
                    description=descrs,
                    landing_page_uri=link,
                    languages=langs,
                    available_DataViews=availDataViews,
                    available_Layers=availLayers,
                    sub_Resources=sub,
                )
            )
        return resources

    nodes = doc.xpath(
        "//ed:EndpointDescription/ed:Resources/ed:Resource",
        namespaces=ns,
    )
    resources: List[ResourceInfo] = _parse_resources(nodes)
    return resources
# ----------------------------------------------------
@staticmethod
def _get_attribute(el: etree._Element, localname: str) -> Optional[str]:
    """Return the stripped value of an attribute, or None if it has no content.

    Args:
        el: the element to read the attribute from
        localname: the attribute name as accepted by `el.get`

    Returns:
        Optional[str]: the attribute value without surrounding whitespace;
            None if the attribute is missing, empty or whitespace-only
    """
    val: Optional[str] = el.get(localname)
    if val:
        val = val.strip()
        # after strip() a whitespace-only value is "", and "".isspace() is
        # False, so truthiness is the test that detects it
        if val:
            return val
    return None
@staticmethod
def _get_lang_attribute(el: etree._Element) -> Optional[str]:
    """Return the value of the element's `lang` attribute in the `XML_NS_URI` namespace.

    The lookup is delegated to `_get_attribute` with a namespace-qualified
    attribute name.

    Args:
        el: the element to read the language attribute from

    Returns:
        Optional[str]: whatever `_get_attribute` returns for the attribute
    """
    return SimpleEndpointDescriptionParser._get_attribute(
        el, etree.QName(XML_NS_URI, "lang")
    )
@staticmethod
def _clean_str(val: str) -> Optional[str]:
    """Normalize whitespace in a string.

    Leading and trailing whitespace is removed and every inner run of
    whitespace is collapsed into a single space.

    Args:
        val: the text to clean; a falsy value (None or "") is accepted

    Returns:
        Optional[str]: the normalized text, or None if nothing is left
    """
    if not val:
        return None
    # split() without arguments drops leading/trailing whitespace and
    # splits on whitespace runs
    collapsed = " ".join(val.split())
    return collapsed or None
@staticmethod
def _dump_ResourceInfo(ris: List[ResourceInfo], depth: int):
    """Log the resource tree at debug level, one line per resource.

    Each line is prefixed with "--" repeated `depth` times; sub-resources
    are logged recursively with the depth increased by one.

    Args:
        ris: the resources of the current level
        depth: the nesting level of `ris`
    """
    prefix = "--" * depth
    for resource in ris:
        children = resource.sub_Resources
        LOGGER.debug("%s %s (level=%s)", prefix, resource.pid, depth)
        if not children:
            continue
        SimpleEndpointDescriptionParser._dump_ResourceInfo(children, depth + 1)
@staticmethod
def _check_legacy_mode(doc: etree._ElementTree, url: Union[str, TextIO]):
    """Reject documents that are not in the endpoint description namespace.

    Args:
        doc: the parsed document
        url: where the document came from; only used in log and error
            messages

    Raises:
        SRUConfigException: if the document has no root element, the root
            element has no namespace, or the namespace is the legacy
            resource info namespace (`RI_NS_LEGACY`) or anything other than
            `ED_NS`
    """
    # For file objects prefer their name over their repr in messages;
    # plain strings are used as they are.
    source = url
    if isinstance(url, io.IOBase):
        source = getattr(url, "name", None) or url

    root = doc.getroot()
    if root is None:
        raise SRUConfigException("Error retrieving root element")

    ns = root.xpath("namespace-uri()")
    if not ns:
        raise SRUConfigException(
            f"No namespace URI was detected for resource info catalog file '{source}'!"
        )
    if ns == RI_NS_LEGACY:
        LOGGER.error(
            "Detected out-dated resource info catalog file '%s'."
            " Please update to the current version.",
            source,
        )
        raise SRUConfigException(f"unsupport file format: {ns}")
    if ns != ED_NS:
        LOGGER.error(
            "Detected unsupported resource info catalog file '%s'"
            " with namespace '%s'.",
            source,
            ns,
        )
        raise SRUConfigException(f"unsupport file format: {ns}")
# ---------------------------------------------------------------------------