import os
import hashlib
import httplib
import json
import urllib
import urlparse
import tempfile
import shutil
import datetime
import copy
import re
import time

import requests
from requests.packages import urllib3

from ckan.lib.celery_app import celery
from ckan import plugins as p

try:
    from ckanext.archiver import settings
except ImportError:
    from ckanext.archiver import default_settings as settings
from ckanext.archiver import interfaces as archiver_interfaces

toolkit = p.toolkit

ALLOWED_SCHEMES = set(('http', 'https', 'ftp'))

USER_AGENT = 'ckanext-archiver'


def load_config(ckan_ini_filepath):
    import paste.deploy
    config_abs_path = os.path.abspath(ckan_ini_filepath)
    conf = paste.deploy.appconfig('config:' + config_abs_path)
    import ckan
    ckan.config.environment.load_environment(conf.global_conf,
                                             conf.local_conf)

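# Illustrative usage of load_config above (a sketch, not part of the original
# module): a worker process calls this before touching CKAN model or logic
# code, e.g.
#     load_config('/etc/ckan/default/production.ini')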

def register_translator():
    # Register a translator in this thread so that
    # the _() functions in logic layer can work
    from paste.registry import Registry
    from pylons import translator
    from ckan.lib.cli import MockTranslator
    global registry
    registry = Registry()
    registry.prepare()
    global translator_obj
    translator_obj = MockTranslator()
    registry.register(translator, translator_obj)

class ArchiverError(Exception):
    pass
class ArchiverErrorBeforeDownloadStarted(ArchiverError):
    pass
class DownloadException(ArchiverError):
    pass
class ArchiverErrorAfterDownloadStarted(ArchiverError):
    def __init__(self, msg, url_redirected_to=None):
        super(ArchiverErrorAfterDownloadStarted, self).__init__(msg)
        self.url_redirected_to = url_redirected_to
class DownloadError(ArchiverErrorAfterDownloadStarted):
    pass
class ArchiveError(ArchiverErrorAfterDownloadStarted):
    pass
class ChooseNotToDownload(ArchiverErrorAfterDownloadStarted):
    pass
class NotChanged(ArchiverErrorAfterDownloadStarted):
    pass
class LinkCheckerError(ArchiverError):
    pass
class LinkInvalidError(LinkCheckerError):
    pass
class LinkHeadRequestError(LinkCheckerError):
    pass
class LinkHeadMethodNotSupported(LinkCheckerError):
    pass
class CkanError(ArchiverError):
    pass


@celery.task(name="archiver.update_resource")
def update_resource(ckan_ini_filepath, resource_id, queue='bulk'):
    '''
    Archive a resource.
    '''
    log = update_resource.get_logger()
    log.info('Starting update_resource task: res_id=%r queue=%s', resource_id, queue)

    # HACK because of race condition #1481
    time.sleep(2)

    # Do all work in a sub-routine since it can then be tested without celery.
    # Also put try/except around it, as it is easier to monitor ckan's log
    # rather than celery's task status.
    try:
        result = _update_resource(ckan_ini_filepath, resource_id, queue)
        return result
    except Exception, e:
        if os.environ.get('DEBUG'):
            raise
        # Any problem at all is logged and reraised so that celery can log it too
        log.error('Error occurred during archiving resource: %s\nResource: %r',
                  e, resource_id)
        raise

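# Illustrative dispatch of the update_resource task above (a sketch, not part
# of the original module); callers typically enqueue it via the celery app:
#     celery.send_task('archiver.update_resource',
#                      args=[ckan_ini_filepath, resource_id, 'priority'],
#                      queue='priority')
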
@celery.task(name="archiver.update_package")
def update_package(ckan_ini_filepath, package_id, queue='bulk'):
    '''
    Archive a package.
    '''
    from ckan import model

    get_action = toolkit.get_action

    load_config(ckan_ini_filepath)
    register_translator()

    log = update_package.get_logger()
    log.info('Starting update_package task: package_id=%r queue=%s', package_id, queue)

    num_archived = 0

    # Do all work in a sub-routine since it can then be tested without celery.
    # Also put try/except around it, as it is easier to monitor ckan's log
    # rather than celery's task status.
    try:
        context_ = {'model': model, 'ignore_auth': True, 'session': model.Session}
        package = get_action('package_show')(context_, {'id': package_id})

        for resource in package['resources']:
            resource_id = resource['id']
            res = _update_resource(ckan_ini_filepath, resource_id, queue)
            if res:
                num_archived += 1
    except Exception, e:
        if os.environ.get('DEBUG'):
            raise
        # Any problem at all is logged and reraised so that celery can log it too
        log.error('Error occurred during archiving package: %s\nPackage: %r %r',
                  e, package_id, package['name'] if 'package' in dir() else '')
        raise

    if num_archived > 0:
        log.info("Notifying package as %d items were archived", num_archived)
        notify_package(package, queue, ckan_ini_filepath)
    else:
        log.info("Not notifying package as 0 items were archived")

    # Refresh the index for this dataset, so that it contains the latest
    # archive info. However skip it if there are downstream plugins that will
    # do this anyway, since it is an expensive step to duplicate.
    if 'qa' not in get_plugins_waiting_on_ipipe():
        _update_search_index(package_id, log)
    else:
        log.info('Search index skipped %s', package['name'])


def _update_search_index(package_id, log):
    '''
    Tells CKAN to update its search index for a given package.
    '''
    from ckan import model
    from ckan.lib.search.index import PackageSearchIndex
    package_index = PackageSearchIndex()
    context_ = {'model': model, 'ignore_auth': True, 'session': model.Session,
                'use_cache': False, 'validate': False}
    package = toolkit.get_action('package_show')(context_, {'id': package_id})
    package_index.index_package(package, defer_commit=False)
    log.info('Search indexed %s', package['name'])


def _update_resource(ckan_ini_filepath, resource_id, queue):
    """
    Link check and archive the given resource.
    If successful, updates the archival table with the cache_url & hash etc.
    Finally, a notification of the archival is broadcast.

    Params:
      resource_id - id of the resource
      queue - name of the celery queue

    Should only raise on a fundamental error:
      ArchiverError
      CkanError

    Returns a JSON dict, ready to be returned from the celery task giving a
    success status:
        {
            'resource': the updated resource dict,
            'file_path': path to archived file (if archive successful), or None
        }
    If not successful, returns None.
    """
    log = update_resource.get_logger()

    load_config(ckan_ini_filepath)
    register_translator()

    from ckan import model
    from pylons import config
    from ckan.plugins import toolkit

    get_action = toolkit.get_action

    assert is_id(resource_id), resource_id
    context_ = {'model': model, 'ignore_auth': True, 'session': model.Session}
    resource = get_action('resource_show')(context_, {'id': resource_id})

    if not os.path.exists(settings.ARCHIVE_DIR):
        log.info("Creating archive directory: %s" % settings.ARCHIVE_DIR)
        os.mkdir(settings.ARCHIVE_DIR)

    def _save(status_id, exception, resource, url_redirected_to=None,
              download_result=None, archive_result=None):
        reason = '%s' % exception
        save_archival(resource, status_id,
                      reason, url_redirected_to,
                      download_result, archive_result,
                      log)
        notify_resource(
            resource,
            queue,
            archive_result.get('cache_filepath') if archive_result else None)

    # Download
    try_as_api = False
    requires_archive = True

    log.info("Attempting to download resource: %s" % resource['url'])
    download_result = None
    from ckanext.archiver.model import Status, Archival
    download_status_id = Status.by_text('Archived successfully')
    context = {
        'site_url': config.get('ckan.site_url_internally') or config['ckan.site_url'],
        'cache_url_root': config.get('ckanext-archiver.cache_url_root'),
        'previous': Archival.get_for_resource(resource_id)
        }
    try:
        download_result = download(context, resource)
    except NotChanged, e:
        download_status_id = Status.by_text('Content has not changed')
        try_as_api = False
        requires_archive = False
    except LinkInvalidError, e:
        download_status_id = Status.by_text('URL invalid')
        try_as_api = False
    except DownloadException, e:
        download_status_id = Status.by_text('Download error')
        try_as_api = True
    except DownloadError, e:
        download_status_id = Status.by_text('Download error')
        try_as_api = True
    except ChooseNotToDownload, e:
        download_status_id = Status.by_text('Chose not to download')
        try_as_api = False
    except Exception, e:
        if os.environ.get('DEBUG'):
            raise
        log.error('Uncaught download failure: %r, %r', e, e.args)
        _save(Status.by_text('Download failure'), e, resource)
        return

    if not Status.is_ok(download_status_id):
        log.info('GET error: %s - %r, %r "%s"',
                 Status.by_id(download_status_id), e, e.args,
                 resource.get('url'))

        if try_as_api:
            download_result = api_request(context, resource)
            if download_result:
                download_status_id = Status.by_text('Archived successfully')
            # else the download_status_id (i.e. an error) is left what it was
            # from the previous download (i.e. not when we tried it as an API)

        if not try_as_api or not Status.is_ok(download_status_id):
            extra_args = [e.url_redirected_to] \
                if hasattr(e, 'url_redirected_to') else []
            _save(download_status_id, e, resource, *extra_args)
            return

    if not requires_archive:
        # We don't need to archive if the remote content has not changed
        return None

    # Archival
    log.info('Attempting to archive resource')
    try:
        archive_result = archive_resource(context, resource, log, download_result)
    except ArchiveError, e:
        log.error('System error during archival: %r, %r', e, e.args)
        _save(Status.by_text('System error during archival'), e, resource,
              download_result['url_redirected_to'])
        return

    # Success
    _save(Status.by_text('Archived successfully'), '', resource,
          download_result['url_redirected_to'], download_result, archive_result)

    # The return value is only used by tests. Serialized for Celery.
    return json.dumps(dict(download_result, **archive_result))

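# For illustration (not part of the original module), the JSON string returned
# by a successful _update_resource run looks roughly like:
#     {"mimetype": "text/csv", "size": 1024, "hash": "<sha1 hex>",
#      "headers": {...}, "saved_file": "/tmp/...", "url_redirected_to": null,
#      "request_type": "GET",
#      "cache_filepath": "<ARCHIVE_DIR>/ab/<resource-id>/data.csv",
#      "cache_url": "<cache_url_root>/ab/<resource-id>/data.csv"}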

def download(context, resource, url_timeout=30,
             max_content_length=settings.MAX_CONTENT_LENGTH,
             method='GET'):
    '''Given a resource, tries to download it.

    Params:
      resource - dict of the resource

    Exceptions from tidy_url may be propagated:
       LinkInvalidError if the URL is invalid

    If there is an error performing the download, raises:
       DownloadException - connection problems etc.
       DownloadError - HTTP status code is an error or 0 length

    If download is not suitable (e.g. too large), raises:
       ChooseNotToDownload

    If the basic GET fails then it will try it with common API
    parameters (SPARQL, WMS etc) to get a better response.

    Returns a dict of results of a successful download:
      mimetype, size, hash, headers, saved_file, url_redirected_to
    '''
    log = update_resource.get_logger()

    url = resource['url']

    url = tidy_url(url)

    if (resource.get('resource_type') == 'file.upload' and
            not url.startswith('http')):
        url = context['site_url'].rstrip('/') + url

    headers = _set_user_agent_string({})

    # start the download - just get the headers
    # May raise DownloadException
    method_func = {'GET': requests.get, 'POST': requests.post}[method]
    res = requests_wrapper(log, method_func, url, timeout=url_timeout,
                           stream=True, headers=headers)
    url_redirected_to = res.url if url != res.url else None

    if context.get('previous') and ('etag' in res.headers):
        if context.get('previous').etag == res.headers['etag']:
            log.info("ETAG matches, not downloading content")
            raise NotChanged("etag suggests content has not changed")

    if not res.ok:  # i.e. 404 or something
        raise DownloadError('Server reported status error: %s %s' %
                            (res.status_code, res.reason),
                            url_redirected_to)
    log.info('GET started successfully. Content headers: %r', res.headers)

    # record headers
    mimetype = _clean_content_type(res.headers.get('content-type', '').lower())

    # make sure resource content-length does not exceed our maximum
    content_length = res.headers.get('content-length')

    if content_length:
        try:
            content_length = int(content_length)
        except ValueError:
            # if there are multiple Content-Length headers, requests
            # will return all the values, comma separated
            if ',' in content_length:
                try:
                    content_length = int(content_length.split(',')[0])
                except ValueError:
                    pass
    if isinstance(content_length, int) and \
       content_length >= max_content_length:
        # record the fact that the resource is too large to archive
        log.warning('Resource too large to download: %s > max (%s). '
                    'Resource: %s %r', content_length,
                    max_content_length, resource['id'], url)
        raise ChooseNotToDownload('Content-length %s exceeds maximum '
                                  'allowed value %s' %
                                  (content_length, max_content_length),
                                  url_redirected_to)
    # content_length in the headers is useful but can be unreliable, so when
    # we download we check that it doesn't go over the max.

    # continue the download - stream the response body
    def get_content():
        return res.content
    log.info('Downloading the body')
    content = requests_wrapper(log, get_content)

    # APIs can return status 200, but contain an error message in the body
    if response_is_an_api_error(content):
        raise DownloadError('Server content contained an API error message: %s' %
                            content[:250],
                            url_redirected_to)

    content_length = len(content)
    if content_length > max_content_length:
        raise ChooseNotToDownload("Content-length %s exceeds maximum allowed value %s" %
                                  (content_length, max_content_length),
                                  url_redirected_to)

    log.info('Saving resource')
    try:
        length, hash, saved_file_path = _save_resource(resource, res, max_content_length)
    except ChooseNotToDownload, e:
        raise ChooseNotToDownload(str(e), url_redirected_to)
    log.info('Resource saved. Length: %s File: %s', length, saved_file_path)

    # zero length (or just one byte) indicates a problem
    if length < 2:
        # record the fact that the resource is zero length
        log.warning('Resource found was length %i - not archiving. '
                    'Resource: %s %r', length, resource['id'], url)
        raise DownloadError("Content-length after streaming was %i" % length,
                            url_redirected_to)

    log.info('Resource downloaded: id=%s url=%r cache_filename=%s length=%s hash=%s',
             resource['id'], url, saved_file_path, length, hash)

    return {'mimetype': mimetype,
            'size': length,
            'hash': hash,
            'headers': dict(res.headers),
            'saved_file': saved_file_path,
            'url_redirected_to': url_redirected_to,
            'request_type': method}

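# Illustrative call of download() above (a sketch, not part of the original
# module):
#     context = {'site_url': 'http://ckan.example.org',
#                'cache_url_root': 'http://data.example.org/cache/',
#                'previous': None}
#     result = download(context, resource, url_timeout=30)
#     # result['saved_file'] is a temp file; archive_resource() moves it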

def archive_resource(context, resource, log, result=None, url_timeout=30):
    """
    Archive the given resource. Moves the file from the temporary location
    given in download().

    Params:
       result - result of the download(), containing keys: length, saved_file

    If there is a failure, raises ArchiveError.

    Returns: {cache_filepath, cache_url}
    """
    relative_archive_path = os.path.join(resource['id'][:2], resource['id'])
    archive_dir = os.path.join(settings.ARCHIVE_DIR, relative_archive_path)
    if not os.path.exists(archive_dir):
        os.makedirs(archive_dir)
    # try to get a file name from the url
    parsed_url = urlparse.urlparse(resource.get('url'))
    try:
        file_name = parsed_url.path.split('/')[-1] or 'resource'
        file_name = file_name.strip()  # trailing spaces cause problems
        file_name = file_name.encode('ascii', 'ignore')  # e.g. u'\xa3' signs
    except:
        file_name = "resource"

    # move the temp file to the resource's archival directory
    saved_file = os.path.join(archive_dir, file_name)
    shutil.move(result['saved_file'], saved_file)
    log.info('Going to do chmod: %s', saved_file)
    try:
        os.chmod(saved_file, 0644)  # allow other users to read it
    except Exception, e:
        log.error('chmod failed %s: %s', saved_file, e)
        raise
    log.info('Archived resource as: %s', saved_file)

    # calculate the cache_url
    if not context.get('cache_url_root'):
        log.warning('Not saving cache_url because there is no value for '
                    'ckanext-archiver.cache_url_root in the config')
        raise ArchiveError('No value for ckanext-archiver.cache_url_root in config')

    cache_url = urlparse.urljoin(context['cache_url_root'],
                                 '%s/%s' % (relative_archive_path, file_name))
    return {'cache_filepath': saved_file,
            'cache_url': cache_url}

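# For illustration (not part of the original module): with
# settings.ARCHIVE_DIR = '/var/lib/archive', a resource id starting 'abcd1234'
# and a URL ending '/data.csv', the archived file lands at
#     /var/lib/archive/ab/abcd1234-.../data.csv
# and cache_url is cache_url_root + 'ab/abcd1234-.../data.csv'.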

def notify_resource(resource, queue, cache_filepath):
    '''
    Broadcasts an IPipe notification that a resource archival has taken place
    (or at least the archival object is changed somehow).
    '''
    archiver_interfaces.IPipe.send_data('archived',
                                        resource_id=resource['id'],
                                        queue=queue,
                                        cache_filepath=cache_filepath)


def notify_package(package, queue, cache_filepath):
    '''
    Broadcasts an IPipe notification that a package archival has taken place
    (or at least the archival object is changed somehow). e.g.
    ckanext-packagezip listens for this
    '''
    archiver_interfaces.IPipe.send_data('package-archived',
                                        package_id=package['id'],
                                        queue=queue,
                                        cache_filepath=cache_filepath)


def get_plugins_waiting_on_ipipe():
    return [observer.name for observer in
            p.PluginImplementations(archiver_interfaces.IPipe)]


def _clean_content_type(ct):
    # For now we should remove the charset from the content type and
    # handle it better, differently, later on.
    if 'charset' in ct:
        return ct[:ct.index(';')]
    return ct

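# Examples for _clean_content_type above (illustrative):
#     _clean_content_type('text/csv; charset=utf-8')  ->  'text/csv'
#     _clean_content_type('application/json')         ->  'application/json'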

def _set_user_agent_string(headers):
    '''
    Update the passed headers object with a `User-Agent` key, if there is a
    USER_AGENT_STRING option in settings.
    '''
    ua_str = settings.USER_AGENT_STRING
    if ua_str is not None:
        headers['User-Agent'] = ua_str
    return headers

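# Example for _set_user_agent_string above (illustrative): with
# settings.USER_AGENT_STRING = 'ckanext-archiver', it returns
#     {'User-Agent': 'ckanext-archiver'}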

def tidy_url(url):
    '''
    Given a URL it does various checks before returning a tidied version
    suitable for calling.

    It may raise LinkInvalidError if the URL has a problem.
    '''

    # Find out if it has unicode characters, and if it does, quote them
    # so we are left with an ascii string
    try:
        url = url.decode('ascii')
    except:
        parts = list(urlparse.urlparse(url))
        parts[2] = urllib.quote(parts[2].encode('utf-8'))
        parts[1] = urllib.quote(parts[1].encode('utf-8'))
        url = urlparse.urlunparse(parts)
    url = str(url)

    # strip whitespace from url
    # (browsers appear to do this)
    url = url.strip()

    # Use urllib3 to parse the url ahead of time, since that is what
    # requests uses, but when it does it during a GET, errors are not
    # caught well
    try:
        parsed_url = urllib3.util.parse_url(url)
    except urllib3.exceptions.LocationParseError, e:
        raise LinkInvalidError('URL parsing failure: %s' % e)

    # Check we aren't using any schemes we shouldn't be
    if parsed_url.scheme not in ALLOWED_SCHEMES:
        raise LinkInvalidError('Invalid url scheme. Please use one of: %s' %
                               ' '.join(ALLOWED_SCHEMES))

    if not parsed_url.host:
        raise LinkInvalidError('URL parsing failure - did not find a host name')

    return url

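# Examples for tidy_url above (illustrative):
#     tidy_url('http://example.com/data.csv ')  ->  'http://example.com/data.csv'
#     tidy_url('file:///etc/passwd')            raises LinkInvalidError (scheme)
#     tidy_url('http://')                       raises LinkInvalidError (no host)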

def _save_resource(resource, response, max_file_size, chunk_size=1024*16):
    """
    Write the response content to disk.

    Returns a tuple:

        (file length: int, content hash: string, saved file path: string)
    """
    resource_hash = hashlib.sha1()
    length = 0

    fd, tmp_resource_file_path = tempfile.mkstemp()

    with open(tmp_resource_file_path, 'wb') as fp:
        for chunk in response.iter_content(chunk_size=chunk_size,
                                           decode_unicode=False):
            fp.write(chunk)
            length += len(chunk)
            resource_hash.update(chunk)

            if length >= max_file_size:
                raise ChooseNotToDownload(
                    "Content length %s exceeds maximum allowed value %s" %
                    (length, max_file_size))

    os.close(fd)

    content_hash = unicode(resource_hash.hexdigest())
    return length, content_hash, tmp_resource_file_path

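# Illustrative use of _save_resource above (a sketch): given a streamed
# requests response `res`,
#     length, hash, path = _save_resource(resource, res, 50 * 1024 * 1024)
# writes the body to a temp file in 16 KB chunks and returns its length,
# sha1 hex digest and path.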

def save_archival(resource, status_id, reason, url_redirected_to,
                  download_result, archive_result, log):
    '''Writes to the archival table the result of an attempt to download
    the resource.

    May propagate a CkanError.
    '''
    now = datetime.datetime.now()

    from ckanext.archiver.model import Archival, Status
    from ckan import model

    archival = Archival.get_for_resource(resource['id'])
    first_archival = not archival
    previous_archival_was_broken = None
    if not archival:
        archival = Archival.create(resource['id'])
        model.Session.add(archival)
    else:
        log.info('Archival from before: %r', archival)
        previous_archival_was_broken = archival.is_broken

    revision = model.Session.query(model.Revision).get(resource['revision_id'])
    archival.resource_timestamp = revision.timestamp

    # Details of the latest archival attempt
    archival.status_id = status_id
    archival.is_broken = Status.is_status_broken(status_id)
    archival.reason = reason
    archival.url_redirected_to = url_redirected_to

    # Details of successful archival
    if archival.is_broken is False:
        archival.cache_filepath = archive_result['cache_filepath']
        archival.cache_url = archive_result['cache_url']
        archival.size = download_result['size']
        archival.mimetype = download_result['mimetype']
        archival.hash = download_result['hash']
        archival.etag = download_result['headers'].get('etag')
        archival.last_modified = download_result['headers'].get('last-modified')

    # History
    if archival.is_broken is False:
        archival.last_success = now
        archival.first_failure = None
        archival.failure_count = 0
    else:
        log.info('First_archival=%r Previous_broken=%r Failure_count=%r' %
                 (first_archival, previous_archival_was_broken,
                  archival.failure_count))
        if first_archival or previous_archival_was_broken is False:
            # i.e. this is the first failure (or the first archival)
            archival.first_failure = now
            archival.failure_count = 1
        else:
            archival.failure_count += 1

    archival.updated = now
    log.info('Archival saved: %r', archival)
    model.repo.commit_and_remove()


def requests_wrapper(log, func, *args, **kwargs):
    '''
    Run a requests command, catching exceptions and reraising them as
    DownloadException. Status errors, such as 404 or 500, do not cause
    exceptions; they are instead exposed as `not response.ok`.
    e.g.
    >>> requests_wrapper(log, requests.get, url, timeout=url_timeout)
    runs:
        res = requests.get(url, timeout=url_timeout)
    '''
    from requests_ssl import SSLv3Adapter
    try:
        try:
            response = func(*args, **kwargs)
        except requests.exceptions.ConnectionError, e:
            if 'SSL23_GET_SERVER_HELLO' not in str(e):
                raise
            log.info('SSLv23 failed so trying again using SSLv3: %r', args)
            requests_session = requests.Session()
            requests_session.mount('https://', SSLv3Adapter())
            func = {requests.get: requests_session.get,
                    requests.post: requests_session.post}[func]
            response = func(*args, **kwargs)

    except requests.exceptions.ConnectionError, e:
        raise DownloadException('Connection error: %s' % e)
    except requests.exceptions.HTTPError, e:
        raise DownloadException('Invalid HTTP response: %s' % e)
    except requests.exceptions.Timeout, e:
        raise DownloadException('Connection timed out after %ss' % kwargs.get('timeout', '?'))
    except requests.exceptions.TooManyRedirects, e:
        raise DownloadException('Too many redirects')
    except requests.exceptions.RequestException, e:
        raise DownloadException('Error downloading: %s' % e)
    except Exception, e:
        if os.environ.get('DEBUG'):
            raise
        raise DownloadException('Error with the download: %s' % e)
    return response


def ogc_request(context, resource, service, wms_version):
    original_url = url = resource['url']
    # Remove parameters
    url = url.split('?')[0]
    # Add WMS GetCapabilities parameters
    url += '?service=%s&request=GetCapabilities&version=%s' % \
           (service, wms_version)
    resource['url'] = url
    # Make the request
    response = download(context, resource)
    # Restore the URL so that it doesn't get saved in the actual resource
    resource['url'] = original_url
    return response

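# Example for ogc_request above (illustrative): for a resource url
# 'http://example.com/geo?foo=1',
#     ogc_request(context, resource, 'WMS', '1.3')
# downloads http://example.com/geo?service=WMS&request=GetCapabilities&version=1.3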

def wms_1_3_request(context, resource):
    res = ogc_request(context, resource, 'WMS', '1.3')
    res['request_type'] = 'WMS 1.3'
    return res


def wms_1_1_1_request(context, resource):
    res = ogc_request(context, resource, 'WMS', '1.1.1')
    res['request_type'] = 'WMS 1.1.1'
    return res


def wfs_request(context, resource):
    res = ogc_request(context, resource, 'WFS', '2.0')
    res['request_type'] = 'WFS 2.0'
    return res


def api_request(context, resource):
    '''
    Tries making requests as if the resource is a well-known sort of API, to
    try and get a valid response. If it gets one, returns the download result
    (which records what sort of request elicited it); otherwise returns None.
    '''
    log = update_resource.get_logger()
    # 'resource' holds the results of the download and will get saved. Only if
    # an API request is successful do we want to save the details of it.
    # However download() gets altered for these API requests. So only give
    # download() a copy of 'resource'.
    for api_request_func in wms_1_3_request, wms_1_1_1_request, wfs_request:
        resource_copy = copy.deepcopy(resource)
        try:
            download_dict = api_request_func(context, resource_copy)
        except ArchiverError, e:
            log.info('API %s error: %r, %r "%s"', api_request_func,
                     e, e.args, resource.get('url'))
            continue
        except Exception, e:
            if os.environ.get('DEBUG'):
                raise
            log.error('Uncaught API %s failure: %r, %r', api_request_func,
                      e, e.args)
            continue

        return download_dict


def is_id(id_string):
    '''Tells the client if the string looks like a revision id or not'''
    reg_ex = '^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
    return bool(re.match(reg_ex, id_string))

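# Examples for is_id above (illustrative):
#     is_id('01234567-89ab-cdef-0123-456789abcdef')  ->  True
#     is_id('not-a-uuid')                            ->  False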

def response_is_an_api_error(response_body):
    '''Some APIs return errors as the response body, but HTTP status 200. So we
    need to check response bodies for these error messages.
    '''
    response_sample = response_body[:250]  # to allow for <?xml> and <!DOCTYPE> lines

    # WMS spec
    # e.g. https://map.bgs.ac.uk/ArcGIS/services/BGS_Detailed_Geology/MapServer/WMSServer?service=abc
    # <?xml version="1.0" encoding="UTF-8" standalone="yes" ?>
    # <ServiceExceptionReport version="1.3.0"
    if '<ServiceExceptionReport' in response_sample:
        return True

    # This appears to be an alternative - I can't find the spec.
    # e.g. http://sedsh13.sedsh.gov.uk/ArcGIS/services/HS/Historic_Scotland/MapServer/WFSServer?service=abc
    # <ows:ExceptionReport version='1.1.0' language='en' xmlns:ows='http://www.opengis.net/ows'><ows:Exception exceptionCode='NoApplicableCode'><ows:ExceptionText>Wrong service type.</ows:ExceptionText></ows:Exception></ows:ExceptionReport>
    if '<ows:ExceptionReport' in response_sample:
        return True


@celery.task(name="archiver.clean")
def clean():
    """
    Remove all archived resources.
    """
    log = clean.get_logger()
    log.error("clean task not implemented yet")


@celery.task(name="archiver.link_checker")
def link_checker(context, data):
    """
    Check that the resource's url is valid, and accepts a HEAD request.

    Redirects are not followed - they simply return 'location' in the headers.

    data is a JSON dict describing the link:
        { 'url': url,
          'url_timeout': url_timeout }

    Raises LinkInvalidError if the URL is invalid
    Raises LinkHeadRequestError if HEAD request fails
    Raises LinkHeadMethodNotSupported if server says HEAD is not supported

    Returns a json dict of the headers of the request
    """
    log = link_checker.get_logger()
    data = json.loads(data)
    url_timeout = data.get('url_timeout', 30)

    error_message = ''
    headers = {'User-Agent': USER_AGENT}

    url = tidy_url(data['url'])

    # Send a head request
    try:
        res = requests.head(url, timeout=url_timeout, headers=headers)
        headers = res.headers
    except httplib.InvalidURL, ve:
        log.error("Could not make a head request to %r, error is: %s. Package is: %r. This sometimes happens when using an old version of requests on a URL which issues a 301 redirect. Version=%s", url, ve, data.get('package'), requests.__version__)
        raise LinkHeadRequestError("Invalid URL or Redirect Link")
    except ValueError, ve:
        log.error("Could not make a head request to %r, error is: %s. Package is: %r.", url, ve, data.get('package'))
        raise LinkHeadRequestError("Could not make HEAD request")
    except requests.exceptions.ConnectionError, e:
        raise LinkHeadRequestError('Connection error: %s' % e)
    except requests.exceptions.HTTPError, e:
        raise LinkHeadRequestError('Invalid HTTP response: %s' % e)
    except requests.exceptions.Timeout, e:
        raise LinkHeadRequestError('Connection timed out after %ss' % url_timeout)
    except requests.exceptions.TooManyRedirects, e:
        raise LinkHeadRequestError('Too many redirects')
    except requests.exceptions.RequestException, e:
        raise LinkHeadRequestError('Error during request: %s' % e)
    except Exception, e:
        raise LinkHeadRequestError('Error with the request: %s' % e)
    else:
        if res.status_code == 405:
            # this suggests a GET request may be ok, so proceed to that
            # in the download
            raise LinkHeadMethodNotSupported()
        if not res.ok or res.status_code >= 400:
            error_message = 'Server returned HTTP error status: %s %s' % \
                (res.status_code, res.reason)
            raise LinkHeadRequestError(error_message)
    return json.dumps(dict(headers))
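# Illustrative payload for link_checker above (a sketch, not part of the
# original module):
#     link_checker({}, json.dumps({'url': 'http://example.com/data.csv',
#                                  'url_timeout': 10}))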