Commit 397f982b authored by David Read
Browse files

Merge commit '5492858f' into fix-travis

parents 14ba731c 5492858f
......@@ -7,6 +7,7 @@ from sqlalchemy import types
from sqlalchemy.ext.declarative import declarative_base
import ckan.model as model
from ckan.lib import dictization
log = __import__('logging').getLogger(__name__)
......@@ -74,6 +75,10 @@ class Status:
def is_ok(cls, status_id):
    """Return True when the given status id denotes a successful download.

    Status id 0 is the 'Archived successfully' status; anything else is
    some form of failure.
    """
    ok = (status_id == 0)
    return ok
# Human-readable labels for the tri-state Archival.is_broken flag
# (True / False / None = unknown).
broken_enum = {
    True: 'Broken',
    None: 'Not sure if broken',
    False: 'Downloaded OK',
}
class Archival(Base):
"""
......@@ -109,14 +114,12 @@ class Archival(Base):
updated = Column(types.DateTime)
def __repr__(self):
    """Debug representation, e.g.
    <Archival Broken /dataset/name/resource/id 3 failures>

    NOTE(review): the merged diff left both the old local ``broken_or_not``
    dict and the new module-level ``broken_enum`` lookup in place (two
    alternative versions of the same tuple line); this is the reconstructed
    post-merge version, which uses only ``broken_enum``.
    """
    broken_details = '' if not self.is_broken else \
        ('%d failures' % self.failure_count)
    package = model.Package.get(self.package_id)
    # Fall back to showing the raw id when the package cannot be loaded.
    package_name = package.name if package else '?%s?' % self.package_id
    return '<Archival %s /dataset/%s/resource/%s %s>' % \
        (broken_enum[self.is_broken], package_name, self.resource_id,
         broken_details)
@classmethod
......@@ -159,6 +162,43 @@ class Archival(Base):
return None
return Status.by_id(self.status_id)
def as_dict(self):
    """Return this archival row as a dict (via table_dictize), augmented
    with the Status label and a printable broken/ok string."""
    ctx = {'model': model}
    result = dictization.table_dictize(self, ctx)
    result['status'] = self.status
    result['is_broken_printable'] = broken_enum[self.is_broken]
    return result
def aggregate_archivals_for_a_dataset(archivals):
    '''Returns aggregated archival info for a dataset, given the archivals for
    its resources (returned by get_for_package).

    :param archivals: A list of the archivals for a dataset's resources
    :type archivals: A list of Archival objects
    :returns: Archival dict about the dataset, with keys:
                status_id
                status
                reason
                is_broken
    '''
    result = {'status_id': None, 'status': None,
              'reason': None, 'is_broken': None}
    # Pessimistic aggregation: keep the highest status_id seen across the
    # resources, together with the reason belonging to that archival.
    for archival in archivals:
        worst_so_far = result['status_id']
        if worst_so_far is None or archival.status_id > worst_so_far:
            result['status_id'] = archival.status_id
            result['reason'] = archival.reason
    if archivals:
        result['status'] = Status.by_id(result['status_id'])
        result['is_broken'] = Status.is_status_broken(result['status_id'])
    return result
def init_tables(engine):
    """Create the archiver tables on the given SQLAlchemy engine if they
    do not already exist."""
    metadata = Base.metadata
    metadata.create_all(engine)
    log.info('Archiver database tables are set-up')
import logging
import os
import logging
from ckan import model
from ckan.model.types import make_uuid
from ckan import plugins as p
import ckan.plugins.toolkit as toolkit
from ckan.lib.celery_app import celery
from ckanext.report.interfaces import IReport
from ckanext.archiver.interfaces import IPipe
import ckan.plugins.toolkit as toolkit
from ckanext.archiver.logic import action, auth
from ckanext.archiver import helpers
from ckanext.archiver.model import Archival, aggregate_archivals_for_a_dataset
log = logging.getLogger(__name__)
class ArchiverPlugin(p.SingletonPlugin):
class ArchiverPlugin(p.SingletonPlugin, toolkit.DefaultDatasetForm):
"""
Registers to be notified whenever CKAN resources are created or their URLs
change, and will create a new ckanext.archiver celery task to archive the
......@@ -21,6 +25,10 @@ class ArchiverPlugin(p.SingletonPlugin):
p.implements(p.IDomainObjectModification, inherit=True)
p.implements(IReport)
p.implements(p.IConfigurer, inherit=True)
p.implements(p.IActions)
p.implements(p.IAuthFunctions)
p.implements(p.ITemplateHelpers)
p.implements(p.IDatasetForm, inherit=True)
# IDomainObjectModification
......@@ -45,6 +53,83 @@ class ArchiverPlugin(p.SingletonPlugin):
def update_config(self, config):
    """IConfigurer: register this extension's template directory."""
    toolkit.add_template_directory(config, 'templates')
# IActions

def get_actions(self):
    """Expose every callable in the action module as a CKAN action."""
    return {name: func
            for name, func in action.__dict__.items()
            if callable(func)}
# IAuthFunctions

def get_auth_functions(self):
    """Expose every callable in the auth module as a CKAN auth function."""
    return {name: func
            for name, func in auth.__dict__.items()
            if callable(func)}
# ITemplateHelpers

def get_helpers(self):
    """Expose the public (non-underscore) callables in the helpers module
    as template helpers."""
    return {name: func
            for name, func in helpers.__dict__.items()
            if callable(func) and not name.startswith('_')}
# IDatasetForm

def package_types(self):
    """This form applies to standard datasets only."""
    handled_types = ['dataset']
    return handled_types
def is_fallback(self):
    # This is just a fallback, so a site-specific extension can have its
    # own IDatasetForm for datasets; but then it will lose the ability to
    # see broken-link info on the dataset and in the API, unless it
    # integrates the schema changes from this IDatasetForm into its own.
    return True
def update_package_schema(self):
    """Schema for dataset create/update.

    Archiver info is never saved onto the dataset itself — it lives in the
    archival table and gets merged into package_show output by
    show_package_schema — so any incoming 'archiver' keys are ignored.
    """
    ignore = toolkit.get_validator('ignore')
    schema = toolkit.DefaultDatasetForm.update_package_schema(self)
    schema['archiver'] = [ignore]
    schema['resources']['archiver'] = [ignore]
    return schema
def show_package_schema(self):
    """Schema for package_show: merge archival info into the output."""
    show_schema = toolkit.DefaultDatasetForm.show_package_schema(self)
    show_schema['archiver'] = [add_archival_information]
    return show_schema
# this is a validator
def add_archival_information(key, data, errors, context):
    """Validator (for show_package_schema) that inserts archival info into
    the flattened package dict: an aggregated dict at the dataset level, and
    a per-resource dict under each resource.

    :param key: flattened-schema key being validated, e.g. ('archiver',)
    :param data: flattened package data dict (mutated in place)
    """
    archivals = Archival.get_for_package(data[('id',)])
    # dataset
    dataset_archival = aggregate_archivals_for_a_dataset(archivals)
    data[key] = dataset_archival
    # resources
    # (insert archival info into resources here, rather than in a separate
    # per-resource validator, because that would mean getting the archival info
    # from the database again separately for each resource)
    archivals_by_res_id = dict((a.resource_id, a) for a in archivals)
    res_index = 0
    while True:
        res_id_key = ('resources', res_index, u'id')
        if res_id_key not in data:
            # no more resources
            break
        res_id = data[res_id_key]
        archival = archivals_by_res_id.get(res_id)
        # BUG FIX: only call as_dict() when an archival row exists for this
        # resource — previously as_dict() was called before the None-check,
        # raising AttributeError for resources with no archival record.
        if archival:
            archival_dict = archival.as_dict()
            # drop internal keys that duplicate the resource's own identity
            del archival_dict['id']
            del archival_dict['package_id']
            del archival_dict['resource_id']
            data[('resources', res_index, key[0])] = archival_dict
        res_index += 1
def create_archiver_resource_task(resource, queue):
from pylons import config
if hasattr(model, 'ResourceGroup'):
......@@ -66,6 +151,7 @@ def create_archiver_package_task(package, queue):
task_id=task_id, queue=queue)
log.debug('Archival of package put into celery queue %s: %s', queue, package.name)
class TestIPipePlugin(p.SingletonPlugin):
"""
"""
......
......@@ -15,6 +15,7 @@ import time
from requests.packages import urllib3
from ckan.lib.celery_app import celery
from ckan.lib.search.index import PackageSearchIndex
try:
from ckanext.archiver import settings
except ImportError:
......@@ -135,6 +136,15 @@ def update_package(ckan_ini_filepath, package_id, queue='bulk'):
notify_package(package, queue, ckan_ini_filepath)
# Refresh the index for this dataset, so that it contains the latest
# archive info
package_index = PackageSearchIndex()
# need to re-get the package to avoid using the cache
context_ = {'model': model, 'ignore_auth': True, 'session': model.Session,
'use_cache': False, 'validate': False}
package = get_action('package_show')(context_, {'id': package_id})
package_index.index_package(package, defer_commit=False)
def _update_resource(ckan_ini_filepath, resource_id, queue):
"""
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment