Commit 37fdd255 authored by Ross Jones
Browse files

Only download files that have changed

Some servers provide an etag, which is a unique identifier for the
content being served (not always a hash).  This adds two new columns to
the archival table, an etag and a last-modified field.  If the etag is
available in the archival, and we get the same etag from the request,
then we will not download the file.  The last-modified field is saved
for sanity; it isn't yet used with If-Modified-Since in the request.

This requires a migration:

    alter table archival add column etag character varying;
    alter table archival add column last_modified character varying;

Notification of package updates is only done if at least 1 resource was
downloaded, to avoid unnecessarily running QA and Packagezip.
parent 87b3d8e2
......@@ -29,6 +29,7 @@ class Status:
not_broken = {
# is_broken = False
0: 'Archived successfully',
1: 'Content has not changed',
}
broken = {
# is_broken = True
......@@ -72,7 +73,7 @@ class Status:
@classmethod
def is_ok(cls, status_id):
return status_id == 0
return status_id in [0, 1]
class Archival(Base):
......@@ -99,6 +100,8 @@ class Archival(Base):
size = Column(types.BigInteger, default=0)
mimetype = Column(types.UnicodeText)
hash = Column(types.UnicodeText)
etag = Column(types.UnicodeText)
last_modified = Column(types.UnicodeText)
# History
first_failure = Column(types.DateTime)
......
......@@ -64,6 +64,8 @@ class ArchiveError(ArchiverErrorAfterDownloadStarted):
pass
class ChooseNotToDownload(ArchiverErrorAfterDownloadStarted):
pass
class NotChanged(ArchiverErrorAfterDownloadStarted):
pass
class LinkCheckerError(ArchiverError):
pass
class LinkInvalidError(LinkCheckerError):
......@@ -115,6 +117,7 @@ def update_package(ckan_ini_filepath, package_id, queue='bulk'):
log = update_package.get_logger()
log.info('Starting update_package task: package_id=%r queue=%s', package_id, queue)
num_archived = 0
# Do all work in a sub-routine since it can then be tested without celery.
# Also put try/except around it is easier to monitor ckan's log rather than
# celery's task status.
......@@ -124,7 +127,9 @@ def update_package(ckan_ini_filepath, package_id, queue='bulk'):
for resource in package['resources']:
resource_id = resource['id']
_update_resource(ckan_ini_filepath, resource_id, queue)
res = _update_resource(ckan_ini_filepath, resource_id, queue)
if res:
num_archived += 1
except Exception, e:
if os.environ.get('DEBUG'):
raise
......@@ -133,7 +138,11 @@ def update_package(ckan_ini_filepath, package_id, queue='bulk'):
e, package_id, package['name'] if package in dir() else '')
raise
notify_package(package, queue, ckan_ini_filepath)
if num_archived > 0:
log.info("Notifying package as %d items were archived", num_archived)
notify_package(package, queue, ckan_ini_filepath)
else:
log.info("Not notifying package as 0 items were archived")
def _update_resource(ckan_ini_filepath, resource_id, queue):
......@@ -188,16 +197,24 @@ def _update_resource(ckan_ini_filepath, resource_id, queue):
archive_result.get('cache_filename') if archive_result else None)
# Download
try_as_api = False
requires_archive = True
log.info("Attempting to download resource: %s" % resource['url'])
download_result = None
from ckanext.archiver.model import Status
from ckanext.archiver.model import Status, Archival
download_status_id = Status.by_text('Archived successfully')
context = {
'site_url': config.get('ckan.site_url_internally') or config['ckan.site_url'],
'cache_url_root': config.get('ckan.cache_url_root'),
'previous': Archival.get_for_resource(resource_id)
}
try:
download_result = download(context, resource)
except NotChanged, e:
download_status_id = Status.by_text('Content has not changed')
try_as_api = False
requires_archive = False
except LinkInvalidError, e:
download_status_id = Status.by_text('URL invalid')
try_as_api = False
......@@ -234,22 +251,26 @@ def _update_resource(ckan_ini_filepath, resource_id, queue):
_save(download_status_id, e, resource, *extra_args)
return
# Archival
log.info('Attempting to archive resource')
try:
archive_result = archive_resource(context, resource, log, download_result)
except ArchiveError, e:
log.error('System error during archival: %r, %r', e, e.args)
_save(Status.by_text('System error during archival'), e, resource, download_result['url_redirected_to'])
return
# Archival. We don't need this if the remote content has not changed
if requires_archive:
log.info('Attempting to archive resource')
try:
archive_result = archive_resource(context, resource, log, download_result)
except ArchiveError, e:
log.error('System error during archival: %r, %r', e, e.args)
_save(Status.by_text('System error during archival'), e, resource, download_result['url_redirected_to'])
return
# Success
_save(Status.by_text('Archived successfully'), '', resource,
download_result['url_redirected_to'], download_result, archive_result)
# Success
_save(Status.by_text('Archived successfully'), '', resource,
download_result['url_redirected_to'], download_result, archive_result)
# The return value is only used by tests. Serialized for Celery.
print download_result
print archive_result
return json.dumps(dict(download_result, **archive_result))
# The return value is only used by tests. Serialized for Celery.
print download_result
print archive_result
return json.dumps(dict(download_result, **archive_result))
return None
def download(context, resource, url_timeout=30,
......@@ -292,6 +313,11 @@ def download(context, resource, url_timeout=30,
res = convert_requests_exceptions(log, method_func, url, timeout=url_timeout,
stream=True)
url_redirected_to = res.url if url != res.url else None
if context['previous']:
if 'etag' in res.headers and context['previous'].etag == res.headers['etag']:
raise NotChanged("etag suggests content has not changed")
if not res.ok: # i.e. 404 or something
raise DownloadError('Server reported status error: %s %s' %
(res.status_code, res.reason),
......@@ -560,6 +586,8 @@ def save_archival(resource, status_id, reason, url_redirected_to,
archival.size = download_result['size']
archival.mimetype = download_result['mimetype']
archival.hash = download_result['hash']
archival.etag = download_result['headers'].get('etag')
archival.last_modified = download_result['headers'].get('last-modified')
# History
if archival.is_broken is False:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment