Source code for clld.web.app

"""Common functionality of clld Apps is cobbled together here."""
import re
import uuid
import pathlib
import datetime
import functools
import importlib
import collections
import urllib.parse

from sqlalchemy import engine_from_config
from sqlalchemy.orm import joinedload, undefer
from sqlalchemy.exc import NoResultFound

from webob.request import Request as WebobRequest
from zope.interface import implementer, implementedBy
from pyramid.httpexceptions import HTTPNotFound, HTTPMovedPermanently, HTTPGone
from pyramid import events
from pyramid.request import Request, reify
from pyramid.interfaces import IRoutesMapper
from pyramid.asset import abspath_from_asset_spec
from pyramid.renderers import JSON, JSONP
from pyramid.settings import asbool
from clldutils.path import md5, git_describe

import clld
from clld.config import get_config
from clld.db.meta import DBSession, Base
from clld.db.models import common
from clld import Resource, RESOURCES
from clld import interfaces
from clld.web.adapters import get_adapters
from clld.web.adapters import geojson, register_resource_adapters
from clld.web.adapters.base import adapter_factory
from clld.web.views import (
    index_view, resource_view, _raise, _ping, js, unapi, xpartial, redirect, gone,
    select_combination,
)
from clld.web.views.olac import olac, OlacConfig
from clld.web.views.sitemap import robots, sitemapindex, sitemap, resourcemap
from clld.web.subscribers import add_renderer_globals, add_localizer, init_map
from clld.web.datatables.base import DataTable
from clld.web import datatables
from clld.web.maps import Map, ParameterMap, LanguageMap, CombinationMap
from clld.web.icon import ICONS, MapMarker
from clld.web import assets

assert clld
assert assets


[docs]class ClldRequest(Request): """Custom Request class.""" @reify def query_params(self): """Convenient access to the query parameters of the current request. :return: dict of the query parameters of the request URL. """ return {k: v[0] for k, v in urllib.parse.parse_qs(urllib.parse.urlparse(self.url).query).items()} @property def db(self): """Convenient access to the db session. We make the db session available as request attribute, so we do not have to import it in templates. """ return DBSession @reify def dataset(self): """Convenient access to the Dataset object. Properties of the :py:class:`clld.db.models.common.Dataset` object an application serves are used in various places, so we want to have a reference to it. """ return self.db.query(common.Dataset).options(undefer('updated')).first() @property def contact_email_address(self): if 'clld.contact' in self.registry.settings: return self.registry.settings['clld.contact'] return self.dataset.contact # pragma: no cover
[docs] def get_datatable(self, name, model, **kw): """Convenient lookup and retrieval of initialized DataTable object. :param name: Name under which the datatable class was registered. :param model: model class to pass as initialization parameter to the datatable. :param kw: Keyword parameters are passed through to the initialization of the datatable. :return: :py:class:`clld.web.datatables.base.DataTable` instance, if a datatable was registered for ``name``. """ dt = self.registry.queryUtility(interfaces.IDataTable, name=name) if dt: return dt(self, model, **kw)
[docs] def get_map(self, name=None, **kw): """Convenient lookup and retrieval of initialized Map object. :param name: Name under which the map was registered. :return: :py:class:`clld.web.maps.Map` instance, if a map was registered else ``None``. """ if name is None and self.matched_route: name = self.matched_route.name if name: map_ = self.registry.queryUtility(interfaces.IMap, name=name) if map_: return map_(self.context, self, **kw)
def _route(self, obj, rsc, **kw): """Determine the name of the canonical route for a resource instance. The resource may be specified as object or as mapper class and id. :return: pair (route_name, kw) suitable as arguments for the Request.route_url method. """ if rsc is None: for _rsc in RESOURCES: if _rsc.interface.providedBy(obj): rsc = _rsc break assert rsc route = rsc.name if 'ext' in kw: route += '_alt' # if rsc is passed explicitely, we allow the object id to be passed in as obj, # to make it possible to create resource URLs without having the "real" object. kw.setdefault('id', getattr(obj, 'id', obj)) return route, kw
[docs] def ctx_for_url(self, url): """Method to reverse URL generation for resources. I.e. given a URL, tries to determine the associated resource. :return: model instance or ``None``. """ mapper = self.registry.getUtility(IRoutesMapper) _path = urllib.parse.urlparse(url).path info = mapper(WebobRequest({'PATH_INFO': _path})) if not info['route']: # FIXME: hack to cater to deployments under a path prefix info = mapper(WebobRequest({'PATH_INFO': re.sub(r'^/[a-z]+', '', _path)})) if info['route']: for rsc in RESOURCES: if rsc.name == info['route'].name: if rsc.name == 'dataset': return self.dataset if info['match']: return rsc.model.get(info['match']['id'], default=None)
[docs] def resource_url(self, obj, rsc=None, **kw): """Get the absolute URL for a resource. :param obj: A resource or the id of a resource; in the latter case ``rsc`` must be passed. :param rsc: A registered :py:class:`clld.Resource`. :param kw: Keyword parameters are passed through to `pyramid.request.Request.route_url <http://docs.pylonsproject.org/projects/\ pyramid/en/1.0-branch/api/request.html#pyramid.request.Request.route_url>`_ :return: URL """ route, kw = self._route(obj, rsc, **kw) return self.route_url(route, **kw)
[docs] def route_url(self, route, *args, **kw): """Facade for Request.route_url.""" if '__locale__' in self.params: if '_query' not in kw: kw['_query'] = {} kw['_query']['__locale__'] = self.params['__locale__'] return Request.route_url(self, route, *args, **kw)
[docs] def resource_path(self, obj, rsc=None, **kw): """Determine the path component of a Resource's URL.""" route, kw = self._route(obj, rsc, **kw) return self.route_path(route, **kw)
def file_ospath(self, file_): if 'clld.files' in self.registry.settings: return str(self.registry.settings['clld.files'].joinpath(file_.relpath)) def file_url(self, file_): if 'url' in file_.jsondata: # just a preparation for full support of non-local files return file_.jsondata['url'] # pragma: no cover if 'clld.files' in self.registry.settings: return self.static_url(self.file_ospath(file_))
def menu_item(route_name, ctx, req, label=None): """Factory function for a menu item specified by route name. :return: A pair (URL, label) to create a menu item. """ return req.route_url(route_name), label or req.translate(route_name.capitalize()) @implementer(interfaces.ICtxFactoryQuery) class CtxFactoryQuery(object): """Implements reasonable default queries to be used in context factories. By reasonable we mean providing good performance for typical data sizes. Applications with a-typical numbers of any resource class may have to implement a custom class for the ICtxFactoryQuery interface. Usually this will be a class derived from CtxFactoryQuery. """ def refined_query(self, query, model, req): """To be overridden. Derived classes may override this method to add model-specific query refinements of their own. """ return query def __call__(self, model, req): query = req.db.query(model).filter(model.id == req.matchdict['id']) custom_query = self.refined_query(query, model, req) if query == custom_query: # no customizations done, apply the defaults f = getattr(model, 'refine_factory_query', None) if f: query = f(query) else: if model == common.Contribution: query = query.options( joinedload( common.Contribution.valuesets ).joinedload( common.ValueSet.parameter ), joinedload( common.Contribution.valuesets ).joinedload( common.ValueSet.values ).joinedload( common.Value.domainelement ), joinedload( common.Contribution.references ).joinedload( common.ContributionReference.source ), joinedload( common.Contribution.data ) ) else: query = custom_query # pragma: no cover return query.one() def ctx_factory(model, type_, req): """Factory function for request contexts. The context of a request is either a single model instance or an instance of DataTable incorporating all information to retrieve an appropriately filtered list of model instances. """ def replacement(id_): raise HTTPMovedPermanently( location=req.route_url(model.__name__.lower(), id=id_)) if type_ == 'index': datatable = req.registry.getUtility( interfaces.IDataTable, name=req.matched_route.name) return datatable(req, model) try: if model == common.Dataset: ctx = req.db.query(model).one() elif model == common.Combination: ctx = common.Combination.get(req.matchdict['id']) else: ctx = req.registry.getUtility(interfaces.ICtxFactoryQuery)(model, req) if ctx.replacement_id: return replacement(ctx.replacement_id) ctx.metadata = get_adapters(interfaces.IMetadata, ctx, req) return ctx except NoResultFound: if req.matchdict.get('id'): replacement_id = common.Config.get_replacement_id(model, req.matchdict['id']) if replacement_id: if replacement_id == common.Config.gone: raise HTTPGone() return replacement(replacement_id) raise HTTPNotFound() def maybe_import(name, pkg_dir=None): exists = False if pkg_dir: rel_path = name.split('.')[1:] if '.' in name else [] rel_path.append('__init__.py') exists = pkg_dir.joinpath(*rel_path).exists() if not exists: rel_path.pop() if rel_path: rel_path[-1] += '.py' exists = pkg_dir.joinpath(*rel_path).exists() try: return importlib.import_module(name) except ImportError: if pkg_dir and exists: print('failed to import existing module {0}'.format(name)) raise # # configurator directives: # def register_utility(config, cls, interface, name='', overwrite=True): if overwrite or not config.registry.queryUtility(interface, name=name): config.registry.registerUtility(cls, provided=interface, name=name) def register_cls(interface, config, route, cls, overwrite=True): register_utility(config, cls, interface, name=route, overwrite=overwrite) if not route.endswith('_alt'): register_utility(config, cls, interface, name=route + '_alt', overwrite=overwrite) def register_adapter(config, cls, from_, to_=None, name=None): if isinstance(cls, dict): cls = adapter_factory(**cls) to_ = to_ or list(implementedBy(cls))[0] name = name or cls.mimetype config.registry.registerAdapter(cls, (from_,), to_, name=name) def register_adapters(config, specs): for interface, base, mimetype, extension, template, extra in specs: extra.update(base=base, mimetype=mimetype, extension=extension, template=template) config.register_adapter(extra, interface, name=mimetype) def register_menu(config, *items): """Register an item for the main menu. :param items: An item may be a (name, factory) pair, where factory is a callable that\ accepts the two parameters (ctx, req) and returns a pair (url, label) to use for the\ menu link; or a route name, or a pair (route name, dict), where dict is used as\ keyword arguments for menu_item. """ menuitems = collections.OrderedDict() for item in items: if isinstance(item, str): item = (item, {}) name, factory = item if isinstance(factory, dict): factory = functools.partial(menu_item, name, **factory) menuitems[name] = factory config.registry.registerUtility(menuitems, interfaces.IMenuItems) def add_route_and_view(config, route_name, route_pattern, view, **kw): """Add a route and a corresponding view and appropriate default routes and views. .. note:: To allow custom route patterns we look them up in a dict in settings. """ route_patterns = config.registry.settings.get('route_patterns', {}) route_pattern = route_patterns.get(route_name, route_pattern) alt_route_pattern = kw.pop('alt_route_pattern', route_pattern + '.{ext}') route_kw = {} factory = kw.pop('factory', None) if factory: route_kw['factory'] = factory config.add_route(route_name, route_pattern, **route_kw) config.add_view(view, route_name=route_name, **kw) config.add_route(route_name + '_alt', alt_route_pattern, **route_kw) config.add_view(view, route_name=route_name + '_alt', **kw) def register_resource_routes_and_views(config, rsc): kw = dict(factory=functools.partial(ctx_factory, rsc.model, 'rsc')) if rsc.model == common.Dataset: pattern = '/' kw['alt_route_pattern'] = '/void.{ext}' else: pattern = r'/%s/{id:[^/\.]+}' % rsc.plural config.add_route_and_view(rsc.name, pattern, resource_view, **kw) if rsc.with_index: config.add_route_and_view( rsc.plural, '/%s' % rsc.plural, index_view, factory=functools.partial(ctx_factory, rsc.model, 'index')) def register_resource(config, name, model, interface, with_index=False, **kw): """Directive to register custom resources. .. note:: The directive accepts arbitrary keyword arguments for backwards compatibility. """ # in case of tests, this method may be called multiple times! if [rsc for rsc in RESOURCES if rsc.name == name]: return rsc = Resource(name, model, interface, with_index=with_index) RESOURCES.append(rsc) config.register_resource_routes_and_views(rsc) if not config.registry.queryUtility(interfaces.IDataTable, name=rsc.plural): config.register_datatable( rsc.plural, getattr(datatables, rsc.plural.capitalize(), DataTable)) register_resource_adapters(config, rsc) def register_download(config, download): config.registry.registerUtility(download, interfaces.IDownload, name=download.name) StaticResource = collections.namedtuple('StaticResource', 'type asset_spec') def register_staticresource(config, type, asset_spec): config.registry.registerUtility( StaticResource(type, asset_spec), interfaces.IStaticResource, name=asset_spec) def add_settings_from_file(config, file_): if file_.exists(): cfg = get_config(file_) if 'mako.directories_list' in cfg: cfg['mako.directories'] = cfg['mako.directories_list'] # pragma: no cover config.add_settings(cfg) def _route_and_view(config, pattern, view, name=None): name = name or str(uuid.uuid4()) config.add_route(name, pattern) config.add_view(view, route_name=name) def add_301(config, pattern, location, name=None): _route_and_view( config, pattern, xpartial(redirect, HTTPMovedPermanently, location), name=name) def add_410(config, pattern, name=None): _route_and_view(config, pattern, gone, name=name) def add_page(config, name, pattern=None, view=None, template=None, views=None): views = views or maybe_import('%s.views' % config.root_package.__name__) config.add_route_and_view( name, pattern or '/' + name, view or getattr(views, name, lambda r: {}), renderer=template or name + '.mako') def includeme(config): """Upgrading: - register utilities "by hand", after config.include('clld.web.app') - add routes by hand (and remove these from the **kw passed to Configurator) :param config: :return: """ # # now we exploit the default package layout as created via the CLLD scaffold: # # note: the following exploits the import time side effect of modifying the webassets # environment! root_package = config.root_package.__name__ pkg_dir = pathlib.Path(config.root_package.__file__).parent.resolve() maybe_import('%s.assets' % root_package, pkg_dir=pkg_dir) json_renderer = JSON() json_renderer.add_adapter(datetime.datetime, lambda obj, req: obj.isoformat()) json_renderer.add_adapter(datetime.date, lambda obj, req: obj.isoformat()) config.add_renderer('json', json_renderer) jsonp_renderer = JSONP(param_name='callback') jsonp_renderer.add_adapter(datetime.datetime, lambda obj, req: obj.isoformat()) jsonp_renderer.add_adapter(datetime.date, lambda obj, req: obj.isoformat()) config.add_renderer('jsonp', jsonp_renderer) config.set_request_factory(ClldRequest) config.registry.registerUtility(CtxFactoryQuery(), interfaces.ICtxFactoryQuery) config.registry.registerUtility(OlacConfig(), interfaces.IOlacConfig) # initialize the db connection engine = engine_from_config(config.registry.settings, 'sqlalchemy.') DBSession.configure(bind=engine) Base.metadata.bind = engine try: git_tag = git_describe(pathlib.Path(pkg_dir).parent) except ValueError: # pragma: no cover git_tag = None config.add_settings({ 'pyramid.default_locale_name': 'en', 'clld.pkg': root_package, 'clld.git_tag': git_tag, 'clld.parameters': {}}) if 'clld.files' in config.registry.settings: # deployment-specific location of static data files abspath = pathlib.Path(config.registry.settings['clld.files']).resolve() config.add_settings({'clld.files': abspath}) config.add_static_view('files', str(abspath)) # event subscribers: config.add_subscriber(add_localizer, events.NewRequest) config.add_subscriber(init_map, events.ContextFound) config.add_subscriber( functools.partial( add_renderer_globals, maybe_import('%s.util' % root_package, pkg_dir=pkg_dir)), events.BeforeRender) # # make it easy to register custom functionality # for name, func in { 'register_utility': register_utility, 'register_datatable': functools.partial(register_cls, interfaces.IDataTable), 'register_map': functools.partial(register_cls, interfaces.IMap), 'register_menu': register_menu, 'register_resource': register_resource, 'register_adapter': register_adapter, 'register_adapters': register_adapters, 'register_download': register_download, 'register_staticresource': register_staticresource, 'add_route_and_view': add_route_and_view, 'add_settings_from_file': add_settings_from_file, 'add_301': add_301, 'add_410': add_410, 'add_page': add_page, 'register_resource_routes_and_views': register_resource_routes_and_views, }.items(): config.add_directive(name, func) # # routes and views # config.add_static_view('clld-static', 'clld:web/static') config.add_static_view('static', '%s:static' % root_package) config.add_route_and_view('_js', '/_js', js, http_cache=3600) # add some maintenance hatches config.add_route_and_view('_raise', '/_raise', _raise) config.add_route_and_view('_ping', '/_ping', _ping, renderer='json') # sitemap support: config.add_route_and_view('robots', '/robots.txt', robots) config.add_route_and_view('sitemapindex', '/sitemap.xml', sitemapindex) config.add_route_and_view('sitemap', '/sitemap.{rsc}.{n}.xml', sitemap) config.add_route('resourcemap', '/resourcemap.json') config.add_view(resourcemap, route_name='resourcemap', renderer='jsonp') config.add_route_and_view( 'select_combination', '/_select_combination', select_combination) config.add_route_and_view('unapi', '/unapi', unapi) config.add_route_and_view('olac', '/olac', olac) config.add_settings_from_file(pkg_dir.joinpath('appconf.ini')) if not config.registry.settings.get('mako.directories'): config.add_settings({'mako.directories': ['clld:web/templates']}) for rsc in RESOURCES: config.register_resource_routes_and_views(rsc) config.register_datatable( rsc.plural, getattr(datatables, rsc.plural.capitalize(), DataTable)) register_resource_adapters(config, rsc) # maps config.register_map('languages', Map) config.register_map('language', LanguageMap) config.register_map('parameter', ParameterMap) config.register_map('combination', CombinationMap) config.include('clld.web.adapters') for icon in ICONS: config.registry.registerUtility(icon, interfaces.IIcon, name=icon.name) config.registry.registerUtility(MapMarker(), interfaces.IMapMarker) # # inspect default locations for views and templates: # home_comp = collections.OrderedDict() for name, template in [ ('introduction', False), ('about', False), ('terms', False), ('glossary', False), ('history', False), ('changes', False), ('credits', False), ('legal', True), ('download', True), ('contact', True), ('help', False), ]: home_comp[name] = template if pkg_dir.joinpath('templates').exists(): for p in pkg_dir.joinpath('templates').iterdir(): if p.stem in home_comp and p.suffix == '.mako': home_comp[p.stem] = True for name, template in home_comp.items(): if template: config.add_page(name) config.add_settings({'home_comp': [k for k in home_comp.keys() if home_comp[k]]}) if 'clld.favicon' not in config.registry.settings: favicon = {'clld.favicon': 'clld:web/static/images/favicon.ico'} # hard to test (in particular on travis) and without too much consequence # (and the consequences faced are easy to spot). if pkg_dir.joinpath('static', 'favicon.ico').exists(): # pragma: no cover favicon['clld.favicon'] = root_package + ':static/favicon.ico' config.add_settings(favicon) config.add_settings({ 'clld.favicon_hash': md5(abspath_from_asset_spec( config.registry.settings['clld.favicon']))}) translation_dirs = ['clld:locale'] if pkg_dir.joinpath('locale').exists(): translation_dirs.append('%s:locale' % root_package) # pragma: no cover config.add_translation_dirs(*translation_dirs) if pkg_dir.joinpath('static/publisher_logo.png').exists(): # pragma: no cover config.add_settings( {'clld.publisher_logo': '%s:static/publisher_logo.png' % root_package}) if asbool(config.registry.settings.get('clld.pacific_centered_maps')): geojson.pacific_centered() v = maybe_import('%s.views' % root_package, pkg_dir=pkg_dir) if v: config.scan(v) # pragma: no cover menuitems = config.registry.settings.get( 'clld.menuitems_list', ['contributions', 'parameters', 'languages', 'contributors']) config.register_menu( ('dataset', lambda ctx, req: (req.resource_url(req.dataset), req.translate('Home'))), *menuitems) config.include('pyramid_mako') for name in ['adapters', 'datatables', 'maps']: mod = maybe_import('%s.%s' % (root_package, name), pkg_dir=pkg_dir) if mod and hasattr(mod, 'includeme'): config.include(mod)