import os
import hashlib
import httplib
import requests
import json
import urllib
import urlparse
import tempfile
import shutil
import datetime
import copy
import re
import time

from requests.packages import urllib3

from ckan.lib.celery_app import celery
from ckan import plugins as p
from ckanext.archiver import interfaces as archiver_interfaces

toolkit = p.toolkit

ALLOWED_SCHEMES = set(('http', 'https', 'ftp'))

USER_AGENT = 'ckanext-archiver'


def load_config(ckan_ini_filepath):
    import paste.deploy
    config_abs_path = os.path.abspath(ckan_ini_filepath)
    conf = paste.deploy.appconfig('config:' + config_abs_path)
    import ckan
    ckan.config.environment.load_environment(conf.global_conf,
                                             conf.local_conf)


def register_translator():
    # Register a translator in this thread so that
    # the _() functions in the logic layer can work
    from paste.registry import Registry
    from pylons import translator
    from ckan.lib.cli import MockTranslator
    global registry
    registry = Registry()
    registry.prepare()
    global translator_obj
    translator_obj = MockTranslator()
    registry.register(translator, translator_obj)

class ArchiverError(Exception):
    pass


class ArchiverErrorBeforeDownloadStarted(ArchiverError):
    pass


class DownloadException(ArchiverError):
    pass


class ArchiverErrorAfterDownloadStarted(ArchiverError):
    def __init__(self, msg, url_redirected_to=None):
        super(ArchiverErrorAfterDownloadStarted, self).__init__(msg)
        self.url_redirected_to = url_redirected_to


class DownloadError(ArchiverErrorAfterDownloadStarted):
    pass


class ArchiveError(ArchiverErrorAfterDownloadStarted):
    pass


class ChooseNotToDownload(ArchiverErrorAfterDownloadStarted):
    pass


class NotChanged(ArchiverErrorAfterDownloadStarted):
    pass


class LinkCheckerError(ArchiverError):
    pass


class LinkInvalidError(LinkCheckerError):
    pass


class LinkHeadRequestError(LinkCheckerError):
    pass


class LinkHeadMethodNotSupported(LinkCheckerError):
    pass


class CkanError(ArchiverError):
    pass


@celery.task(name="archiver.update_resource")
def update_resource(ckan_ini_filepath, resource_id, queue='bulk'):
    '''
    Archive a resource.
    '''
    load_config(ckan_ini_filepath)
    register_translator()

    log = update_resource.get_logger()
    log.info('Starting update_resource task: res_id=%r queue=%s', resource_id, queue)

    # HACK because of race condition #1481
    time.sleep(2)

    # Do all work in a sub-routine since it can then be tested without celery.
    # Also put try/except around it since it is easier to monitor ckan's log
    # rather than celery's task status.
    try:
        result = _update_resource(resource_id, queue, log)
        return result
    except Exception, e:
        if os.environ.get('DEBUG'):
            raise
        # Any problem at all is logged and reraised so that celery can log it too
        log.error('Error occurred during archiving resource: %s\nResource: %r',
                  e, resource_id)
        raise

@celery.task(name="archiver.update_package")
def update_package(ckan_ini_filepath, package_id, queue='bulk'):
    '''
    Archive a package.
    '''
    load_config(ckan_ini_filepath)
    register_translator()

    log = update_package.get_logger()
    log.info('Starting update_package task: package_id=%r queue=%s',
             package_id, queue)

    # Do all work in a sub-routine since it can then be tested without celery.
    # Also put try/except around it since it is easier to monitor ckan's log
    # rather than celery's task status.
    try:
        _update_package(package_id, queue, log)
    except Exception, e:
        if os.environ.get('DEBUG'):
            raise
        # Any problem at all is logged and reraised so that celery can log it
        # too
        log.error('Error occurred during archiving package: %s\nPackage: %s',
                  e, package_id)
        raise


def _update_package(package_id, queue, log):
    from ckan import model

    get_action = toolkit.get_action

    num_archived = 0
    context_ = {'model': model, 'ignore_auth': True, 'session': model.Session}
    package = get_action('package_show')(context_, {'id': package_id})

    for resource in package['resources']:
        resource_id = resource['id']
        res = _update_resource(resource_id, queue, log)
        if res:
            num_archived += 1

    if num_archived > 0:
        log.info("Notifying package as %d items were archived", num_archived)
        notify_package(package, queue)
    else:
        log.info("Not notifying package as 0 items were archived")

    # Refresh the index for this dataset, so that it contains the latest
    # archive info. However, skip it if there are downstream plugins that will
    # do this anyway, since it is an expensive step to duplicate.
    if 'qa' not in get_plugins_waiting_on_ipipe():
        _update_search_index(package_id, log)
    else:
        log.info('Search index skipped %s', package['name'])


def _update_search_index(package_id, log):
    '''
    Tells CKAN to update its search index for a given package.
    '''
    from ckan import model
    from ckan.lib.search.index import PackageSearchIndex
    package_index = PackageSearchIndex()
    context_ = {'model': model, 'ignore_auth': True, 'session': model.Session,
                'use_cache': False, 'validate': False}
    package = toolkit.get_action('package_show')(context_, {'id': package_id})
    package_index.index_package(package, defer_commit=False)
    log.info('Search indexed %s', package['name'])


def _update_resource(resource_id, queue, log):
    """
    Link check and archive the given resource.
    If successful, updates the archival table with the cache_url & hash etc.
    Finally, a notification of the archival is broadcast.

    Params:
      resource_id - id of the resource to archive
      queue - name of the celery queue

    Should only raise on a fundamental error:
      ArchiverError
      CkanError

    Returns a JSON dict, ready to be returned from the celery task giving a
    success status:
        {
            'resource': the updated resource dict,
            'file_path': path to archived file (if archive successful), or None
        }
    If not successful, returns None.
    """
    from ckan import model
    from pylons import config
    from ckan.plugins import toolkit
    from ckanext.archiver import default_settings as settings

    get_action = toolkit.get_action

    assert is_id(resource_id), resource_id
    context_ = {'model': model, 'ignore_auth': True, 'session': model.Session}
    resource = get_action('resource_show')(context_, {'id': resource_id})

    if not os.path.exists(settings.ARCHIVE_DIR):
        log.info("Creating archive directory: %s" % settings.ARCHIVE_DIR)
        os.mkdir(settings.ARCHIVE_DIR)

    def _save(status_id, exception, resource, url_redirected_to=None,
              download_result=None, archive_result=None):
        reason = '%s' % exception
        save_archival(resource, status_id,
                      reason, url_redirected_to,
                      download_result, archive_result,
                      log)
        notify_resource(
            resource,
            queue,
            archive_result.get('cache_filepath') if archive_result else None)

    # Download
    try_as_api = False
    requires_archive = True

    log.info("Attempting to download resource: %s" % resource['url'])
    download_result = None
    from ckanext.archiver.model import Status, Archival
    download_status_id = Status.by_text('Archived successfully')
    context = {
        'site_url': config.get('ckan.site_url_internally') or config['ckan.site_url'],
        'cache_url_root': config.get('ckanext-archiver.cache_url_root'),
        'previous': Archival.get_for_resource(resource_id)
        }
    try:
        download_result = download(context, resource)
    except NotChanged, e:
        download_status_id = Status.by_text('Content has not changed')
        try_as_api = False
        requires_archive = False
    except LinkInvalidError, e:
        download_status_id = Status.by_text('URL invalid')
        try_as_api = False
    except DownloadException, e:
        download_status_id = Status.by_text('Download error')
        try_as_api = True
    except DownloadError, e:
        download_status_id = Status.by_text('Download error')
        try_as_api = True
    except ChooseNotToDownload, e:
        download_status_id = Status.by_text('Chose not to download')
        try_as_api = False
    except Exception, e:
        if os.environ.get('DEBUG'):
            raise
        log.error('Uncaught download failure: %r, %r', e, e.args)
        _save(Status.by_text('Download failure'), e, resource)
        return

    if not Status.is_ok(download_status_id):
        log.info('GET error: %s - %r, %r "%s"',
                 Status.by_id(download_status_id), e, e.args,
                 resource.get('url'))

        if try_as_api:
            download_result = api_request(context, resource)
            if download_result:
                download_status_id = Status.by_text('Archived successfully')
            # else the download_status_id (i.e. an error) is left as it was
            # from the previous download (i.e. not from the API attempt)

        if not try_as_api or not Status.is_ok(download_status_id):
            extra_args = [e.url_redirected_to] if hasattr(e, 'url_redirected_to') else []
            _save(download_status_id, e, resource, *extra_args)
            return

    if not requires_archive:
        # We don't need to archive if the remote content has not changed
        return None

    # Archival
    log.info('Attempting to archive resource')
    try:
        archive_result = archive_resource(context, resource, log, download_result)
    except ArchiveError, e:
        log.error('System error during archival: %r, %r', e, e.args)
        _save(Status.by_text('System error during archival'), e, resource, download_result['url_redirected_to'])
        return

    # Success
    _save(Status.by_text('Archived successfully'), '', resource,
          download_result['url_redirected_to'], download_result, archive_result)

    # The return value is only used by tests. Serialized for Celery.
    return json.dumps(dict(download_result, **archive_result))


def download(context, resource, url_timeout=30,
             max_content_length='default',
             method='GET'):
    '''Given a resource, tries to download it.

    Params:
      resource - dict of the resource

    Exceptions from tidy_url may be propagated:
       LinkInvalidError if the URL is invalid

    If there is an error performing the download, raises:
       DownloadException - connection problems etc.
       DownloadError - HTTP status code is an error or 0 length

    If download is not suitable (e.g. too large), raises:
       ChooseNotToDownload

    If the basic GET fails then it will try it with common API
    parameters (SPARQL, WMS etc) to get a better response.

    Returns a dict of results of a successful download:
      mimetype, size, hash, headers, saved_file, url_redirected_to
    '''
    from ckanext.archiver import default_settings as settings
    log = update_resource.get_logger()

    if max_content_length == 'default':
        max_content_length = settings.MAX_CONTENT_LENGTH

    url = resource['url']

    url = tidy_url(url)

    if (resource.get('resource_type') == 'file.upload' and
            not url.startswith('http')):
        url = context['site_url'].rstrip('/') + url

    headers = _set_user_agent_string({})

    # start the download - just get the headers
    # May raise DownloadException
    method_func = {'GET': requests.get, 'POST': requests.post}[method]
    res = requests_wrapper(log, method_func, url, timeout=url_timeout,
                           stream=True, headers=headers)
    url_redirected_to = res.url if url != res.url else None

    if context.get('previous') and ('etag' in res.headers):
        if context.get('previous').etag == res.headers['etag']:
            log.info("ETAG matches, not downloading content")
            raise NotChanged("etag suggests content has not changed")

    if not res.ok:  # i.e. 404 or something
        raise DownloadError('Server reported status error: %s %s' %
                            (res.status_code, res.reason),
                            url_redirected_to)
    log.info('GET started successfully. Content headers: %r', res.headers)

    # record headers
    mimetype = _clean_content_type(res.headers.get('content-type', '').lower())

    # make sure resource content-length does not exceed our maximum
    content_length = res.headers.get('content-length')

    if content_length:
        try:
            content_length = int(content_length)
        except ValueError:
            # if there are multiple Content-Length headers, requests
            # will return all the values, comma separated
            if ',' in content_length:
                try:
                    content_length = int(content_length.split(',')[0])
                except ValueError:
                    pass
    if isinstance(content_length, int) and \
            content_length >= max_content_length:
        # record the fact that the resource is too large to archive
        log.warning('Resource too large to download: %s > max (%s). '
                    'Resource: %s %r', content_length,
                    max_content_length, resource['id'], url)
        raise ChooseNotToDownload('Content-length %s exceeds maximum '
                                  'allowed value %s' %
                                  (content_length, max_content_length),
                                  url_redirected_to)
    # content-length in the headers is useful but can be unreliable, so when
    # we download, we will monitor that it doesn't go over the max.

    # continue the download - stream the response body
    def get_content():
        return res.content
    log.info('Downloading the body')
    content = requests_wrapper(log, get_content)

    # APIs can return status 200, but contain an error message in the body
    if response_is_an_api_error(content):
        raise DownloadError('Server content contained an API error message: %s' %
                            content[:250],
                            url_redirected_to)

    content_length = len(content)
    if content_length > max_content_length:
        raise ChooseNotToDownload("Content-length %s exceeds maximum allowed value %s" %
                                  (content_length, max_content_length),
                                  url_redirected_to)

    log.info('Saving resource')
    try:
        length, hash, saved_file_path = _save_resource(resource, res, max_content_length)
    except ChooseNotToDownload, e:
        raise ChooseNotToDownload(str(e), url_redirected_to)
    log.info('Resource saved. Length: %s File: %s', length, saved_file_path)

    # zero length (or just one byte) indicates a problem
    if length < 2:
        # record the fact that the resource is zero length
        log.warning('Resource found was length %i - not archiving. '
                    'Resource: %s %r', length, resource['id'], url)
        raise DownloadError("Content-length after streaming was %i" % length,
                            url_redirected_to)

    log.info('Resource downloaded: id=%s url=%r cache_filename=%s length=%s hash=%s',
             resource['id'], url, saved_file_path, length, hash)

    return {'mimetype': mimetype,
            'size': length,
            'hash': hash,
            'headers': dict(res.headers),
            'saved_file': saved_file_path,
            'url_redirected_to': url_redirected_to,
            'request_type': method}
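
# A hedged usage sketch of download() -- all values below are illustrative,
# not taken from this module:
#
#     context = {'site_url': 'http://ckan.example.com',
#                'cache_url_root': 'http://ckan.example.com/archive/',
#                'previous': None}
#     result = download(context, resource)  # resource: dict with 'id' & 'url'
#     print result['mimetype'], result['size'], result['saved_file']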


def archive_resource(context, resource, log, result=None, url_timeout=30):
    """
    Archive the given resource. Moves the file from the temporary location
    given in download().

    Params:
       result - result of the download(), containing keys: length, saved_file

    If there is a failure, raises ArchiveError.

    Returns: {cache_filepath, cache_url}
    """
    from ckanext.archiver import default_settings as settings
    relative_archive_path = os.path.join(resource['id'][:2], resource['id'])
    archive_dir = os.path.join(settings.ARCHIVE_DIR, relative_archive_path)
    if not os.path.exists(archive_dir):
        os.makedirs(archive_dir)
    # try to get a file name from the url
    parsed_url = urlparse.urlparse(resource.get('url'))
    try:
        file_name = parsed_url.path.split('/')[-1] or 'resource'
        file_name = file_name.strip()  # trailing spaces cause problems
        file_name = file_name.encode('ascii', 'ignore')  # e.g. u'\xa3' signs
    except Exception:
        file_name = "resource"

    # move the temp file to the resource's archival directory
    saved_file = os.path.join(archive_dir, file_name)
    shutil.move(result['saved_file'], saved_file)
    log.info('Going to do chmod: %s', saved_file)
    try:
        os.chmod(saved_file, 0644)  # allow other users to read it
    except Exception, e:
        log.error('chmod failed %s: %s', saved_file, e)
        raise
    log.info('Archived resource as: %s', saved_file)

    # calculate the cache_url
    if not context.get('cache_url_root'):
        log.warning('Not saving cache_url because there is no value for '
                    'ckanext-archiver.cache_url_root in the config')
        raise ArchiveError('No value for ckanext-archiver.cache_url_root in config')
    cache_url = urlparse.urljoin(context['cache_url_root'],
                                 '%s/%s' % (relative_archive_path, file_name))
    return {'cache_filepath': saved_file,
            'cache_url': cache_url}
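
# An illustrative example of the layout archive_resource() produces, assuming
# ARCHIVE_DIR is '/var/lib/ckan/archive' and cache_url_root is
# 'http://ckan.example.com/archive/' (both hypothetical values):
#
#     resource id  : 'abcd1234-...'
#     resource url : 'http://example.com/data.csv'
#     saved file   : /var/lib/ckan/archive/ab/abcd1234-.../data.csv
#     cache_url    : http://ckan.example.com/archive/ab/abcd1234-.../data.csv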


def notify_resource(resource, queue, cache_filepath):
    '''
    Broadcasts an IPipe notification that a resource archival has taken place
    (or at least that the archival object has changed in some way).
    '''
    archiver_interfaces.IPipe.send_data('archived',
                                        resource_id=resource['id'],
                                        queue=queue,
                                        cache_filepath=cache_filepath)



def notify_package(package, queue):
    '''
    Broadcasts an IPipe notification that a package archival has taken place
    (or at least that the archival object has changed in some way), e.g.
    ckanext-packagezip listens for this.
    '''
    archiver_interfaces.IPipe.send_data('package-archived',
                                        package_id=package['id'],
                                        queue=queue)


def get_plugins_waiting_on_ipipe():
    return [observer.name for observer in
            p.PluginImplementations(archiver_interfaces.IPipe)]


def _clean_content_type(ct):
    # For now we should remove the charset from the content type and
    # handle it better, differently, later on.
    if 'charset' in ct:
        return ct[:ct.index(';')]
    return ct
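
# For example (values illustrative):
#     _clean_content_type('text/csv; charset=utf-8')  -> 'text/csv'
#     _clean_content_type('application/json')         -> 'application/json'
# Note that the parameter list is only stripped when it mentions a charset.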


def _set_user_agent_string(headers):
    '''
    Update the passed headers object with a `User-Agent` key, if there is a
    USER_AGENT_STRING option in settings.
    '''
    from ckanext.archiver import default_settings as settings
    ua_str = settings.USER_AGENT_STRING
    if ua_str is not None:
        headers['User-Agent'] = ua_str
    return headers
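
# For example, assuming the deployment config sets
# settings.USER_AGENT_STRING = 'ckanext-archiver (+http://ckan.example.com)'
# (a hypothetical value):
#     _set_user_agent_string({})
#     -> {'User-Agent': 'ckanext-archiver (+http://ckan.example.com)'}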


def tidy_url(url):
    '''
    Given a URL it does various checks before returning a tidied version
    suitable for calling.

    It may raise LinkInvalidError if the URL has a problem.
    '''

    # Find out if it has unicode characters, and if it does, quote them
    # so we are left with an ascii string
    try:
        url = url.decode('ascii')
    except UnicodeError:
        parts = list(urlparse.urlparse(url))
        parts[2] = urllib.quote(parts[2].encode('utf-8'))
        parts[1] = urllib.quote(parts[1].encode('utf-8'))
        url = urlparse.urlunparse(parts)
    url = str(url)

    # strip whitespace from url
    # (browsers appear to do this)
    url = url.strip()

    # Use urllib3 to parse the url ahead of time, since that is what
    # requests uses; when requests does the parsing itself during a GET,
    # errors are not caught well
    try:
        parsed_url = urllib3.util.parse_url(url)
    except urllib3.exceptions.LocationParseError, e:
        raise LinkInvalidError('URL parsing failure: %s' % e)

    # Check we aren't using any schemes we shouldn't be.
    # Scheme is case-insensitive.
    if not parsed_url.scheme or parsed_url.scheme.lower() not in ALLOWED_SCHEMES:
        raise LinkInvalidError('Invalid url scheme. Please use one of: %s' %
                               ' '.join(ALLOWED_SCHEMES))

    if not parsed_url.host:
        raise LinkInvalidError('URL parsing failure - did not find a host name')

    return url
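
# A few illustrative cases:
#     tidy_url(u'http://example.com/caf\xe9')   -> 'http://example.com/caf%C3%A9'
#     tidy_url(' http://example.com/data.csv ') -> 'http://example.com/data.csv'
#     tidy_url('mailto:someone@example.com')    -> raises LinkInvalidError
#     tidy_url('http://')                       -> raises LinkInvalidError (no host)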


def _save_resource(resource, response, max_file_size, chunk_size=1024*16):
    """
    Write the response content to disk.

    Returns a tuple:

        (file length: int, content hash: string, saved file path: string)
    """
    resource_hash = hashlib.sha1()
    length = 0

    fd, tmp_resource_file_path = tempfile.mkstemp()
    try:
        with open(tmp_resource_file_path, 'wb') as fp:
            for chunk in response.iter_content(chunk_size=chunk_size,
                                               decode_unicode=False):
                fp.write(chunk)
                length += len(chunk)
                resource_hash.update(chunk)

                if length >= max_file_size:
                    raise ChooseNotToDownload(
                        "Content length %s exceeds maximum allowed value %s" %
                        (length, max_file_size))
    finally:
        # make sure the mkstemp file descriptor is closed, even when
        # ChooseNotToDownload is raised part-way through
        os.close(fd)

    content_hash = unicode(resource_hash.hexdigest())
    return length, content_hash, tmp_resource_file_path
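
# Illustrative call and return value (the hash and temp path are hypothetical):
#     length, hash_, path = _save_resource(resource, res, 50 * 1024 * 1024)
#     # -> (13042, u'da39a3ee5e6b4b0d3255bfef95601890afd80709', '/tmp/tmpXYZ12')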


def save_archival(resource, status_id, reason, url_redirected_to,
                  download_result, archive_result, log):
    '''Writes to the archival table the result of an attempt to download
    the resource.

    May propagate a CkanError.
    '''
    now = datetime.datetime.now()

    from ckanext.archiver.model import Archival, Status
    from ckan import model

    archival = Archival.get_for_resource(resource['id'])
    first_archival = not archival
    previous_archival_was_broken = None
    if not archival:
        archival = Archival.create(resource['id'])
        model.Session.add(archival)
    else:
        log.info('Archival from before: %r', archival)
        previous_archival_was_broken = archival.is_broken

    revision = model.Session.query(model.Revision).get(resource['revision_id'])
    archival.resource_timestamp = revision.timestamp

    # Details of the latest archival attempt
    archival.status_id = status_id
    archival.is_broken = Status.is_status_broken(status_id)
    archival.reason = reason
    archival.url_redirected_to = url_redirected_to

    # Details of successful archival
    if archival.is_broken is False:
        archival.cache_filepath = archive_result['cache_filepath']
        archival.cache_url = archive_result['cache_url']
        archival.size = download_result['size']
        archival.mimetype = download_result['mimetype']
        archival.hash = download_result['hash']
        archival.etag = download_result['headers'].get('etag')
        archival.last_modified = download_result['headers'].get('last-modified')

    # History
    if archival.is_broken is False:
        archival.last_success = now
        archival.first_failure = None
        archival.failure_count = 0
    else:
        log.info('First_archival=%r Previous_broken=%r Failure_count=%r' %
                 (first_archival, previous_archival_was_broken,
                  archival.failure_count))
        if first_archival or previous_archival_was_broken is False:
            # i.e. this is the first failure (or the first archival)
            archival.first_failure = now
            archival.failure_count = 1
        else:
            archival.failure_count += 1

    archival.updated = now
    log.info('Archival saved: %r', archival)
    model.repo.commit_and_remove()
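
# Illustrative bookkeeping across successive archival attempts for one
# resource:
#     attempt 1: broken -> first_failure=now, failure_count=1
#     attempt 2: broken -> failure_count=2 (first_failure unchanged)
#     attempt 3: ok     -> last_success=now, first_failure=None, failure_count=0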


def requests_wrapper(log, func, *args, **kwargs):
    '''
    Run a requests command, catching exceptions and reraising them as
    DownloadException. Status errors, such as 404 or 500, do not cause
    exceptions; instead they are exposed as `not response.ok`.
    e.g.
    >>> requests_wrapper(log, requests.get, url, timeout=url_timeout)
    runs:
        res = requests.get(url, timeout=url_timeout)
    '''
    from requests_ssl import SSLv3Adapter
    try:
        try:
            response = func(*args, **kwargs)
        except requests.exceptions.ConnectionError, e:
            if 'SSL23_GET_SERVER_HELLO' not in str(e):
                raise
            log.info('SSLv23 failed so trying again using SSLv3: %r', args)
            requests_session = requests.Session()
            requests_session.mount('https://', SSLv3Adapter())
            func = {requests.get: requests_session.get,
                    requests.post: requests_session.post}[func]
            response = func(*args, **kwargs)

    except requests.exceptions.ConnectionError, e:
        raise DownloadException('Connection error: %s' % e)
    except requests.exceptions.HTTPError, e:
        raise DownloadException('Invalid HTTP response: %s' % e)
    except requests.exceptions.Timeout, e:
        raise DownloadException('Connection timed out after %ss' % kwargs.get('timeout', '?'))
    except requests.exceptions.TooManyRedirects, e:
        raise DownloadException('Too many redirects')
    except requests.exceptions.RequestException, e:
        raise DownloadException('Error downloading: %s' % e)
    except Exception, e:
        if os.environ.get('DEBUG'):
            raise
        raise DownloadException('Error with the download: %s' % e)
    return response
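
# A sketch of the error mapping (the URL is illustrative):
#     try:
#         res = requests_wrapper(log, requests.get,
#                                'http://no-such-host.invalid/', timeout=5)
#     except DownloadException, e:
#         pass  # e.g. 'Connection error: ...' instead of a raw requests error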


def ogc_request(context, resource, service, wms_version):
    original_url = url = resource['url']
    # Remove parameters
    url = url.split('?')[0]
    # Add GetCapabilities parameters
    url += '?service=%s&request=GetCapabilities&version=%s' % \
           (service, wms_version)
    resource['url'] = url
    # Make the request
    response = download(context, resource)
    # Restore the URL so that it doesn't get saved in the actual resource
    resource['url'] = original_url
    return response
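
# For example (URL illustrative), for a resource whose url is
# 'http://example.com/geoserver/wms?foo=bar', ogc_request(context, resource,
# 'WMS', '1.3') downloads:
#     http://example.com/geoserver/wms?service=WMS&request=GetCapabilities&version=1.3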


def wms_1_3_request(context, resource):
    res = ogc_request(context, resource, 'WMS', '1.3')
    res['request_type'] = 'WMS 1.3'
    return res


def wms_1_1_1_request(context, resource):
    res = ogc_request(context, resource, 'WMS', '1.1.1')
    res['request_type'] = 'WMS 1.1.1'
    return res


def wfs_request(context, resource):
    res = ogc_request(context, resource, 'WFS', '2.0')
    res['request_type'] = 'WFS 2.0'
    return res


def api_request(context, resource):
    '''
    Tries making requests as if the resource is a well-known sort of API, to
    try and get a valid response. If it gets one, it returns the response;
    the sort of request that elicited it is recorded in the result's
    'request_type'.
    '''
    log = update_resource.get_logger()
    # 'resource' holds the results of the download and will get saved. Only if
    # an API request is successful do we want to save the details of it.
    # However 'resource' gets altered during these API requests. So only give
    # download() a copy of 'resource'.
    for api_request_func in wms_1_3_request, wms_1_1_1_request, wfs_request:
        resource_copy = copy.deepcopy(resource)
        try:
            download_dict = api_request_func(context, resource_copy)
        except ArchiverError, e:
            log.info('API %s error: %r, %r "%s"', api_request_func,
                     e, e.args, resource.get('url'))
            continue
        except Exception, e:
            if os.environ.get('DEBUG'):
                raise
            log.error('Uncaught API %s failure: %r, %r', api_request_func,
                      e, e.args)
            continue

        return download_dict


def is_id(id_string):
    '''Tells the client if the string looks like an id (in UUID format) or not'''
    reg_ex = '^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
    return bool(re.match(reg_ex, id_string))
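
# For example:
#     is_id('a1b2c3d4-e5f6-7890-abcd-ef1234567890')  -> True
#     is_id('not-an-id')                             -> False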


def response_is_an_api_error(response_body):
    '''Some APIs return errors as the response body, but HTTP status 200. So we
    need to check response bodies for these error messages.
    '''
    response_sample = response_body[:250]  # to allow for <?xml> and <!DOCTYPE> lines

    # WMS spec
    # e.g. https://map.bgs.ac.uk/ArcGIS/services/BGS_Detailed_Geology/MapServer/WMSServer?service=abc
    # <?xml version="1.0" encoding="UTF-8" standalone="yes" ?>
    # <ServiceExceptionReport version="1.3.0"
    if '<ServiceExceptionReport' in response_sample:
        return True

    # This appears to be an alternative - I can't find the spec.
    # e.g. http://sedsh13.sedsh.gov.uk/ArcGIS/services/HS/Historic_Scotland/MapServer/WFSServer?service=abc
    # <ows:ExceptionReport version='1.1.0' language='en' xmlns:ows='http://www.opengis.net/ows'><ows:Exception exceptionCode='NoApplicableCode'><ows:ExceptionText>Wrong service type.</ows:ExceptionText></ows:Exception></ows:ExceptionReport>
    if '<ows:ExceptionReport' in response_sample:
        return True
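
# For example, a body beginning
#     <?xml version="1.0" ...?><ServiceExceptionReport version="1.3.0" ...
# is treated as an API error, whereas an HTML landing page is not (the
# function falls through and returns None, which is falsy).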


@celery.task(name="archiver.clean")
def clean():
    """
    Remove all archived resources.
    """
    log = clean.get_logger()
    log.error("clean task not implemented yet")


@celery.task(name="archiver.link_checker")
def link_checker(context, data):
    """
    Check that the resource's url is valid, and accepts a HEAD request.

    Redirects are not followed - they simply return the 'location' header.

    data is a JSON dict describing the link:
        { 'url': url,
          'url_timeout': url_timeout }

    Raises LinkInvalidError if the URL is invalid
    Raises LinkHeadRequestError if HEAD request fails
    Raises LinkHeadMethodNotSupported if server says HEAD is not supported

    Returns a json dict of the response headers
    """
    log = update_resource.get_logger()
    data = json.loads(data)
    url_timeout = data.get('url_timeout', 30)

    error_message = ''
    headers = {'User-Agent': USER_AGENT}

    url = tidy_url(data['url'])

    # Send a head request
    try:
        res = requests.head(url, timeout=url_timeout)
        headers = res.headers
    except httplib.InvalidURL, ve:
        log.error("Could not make a head request to %r, error is: %s. Package is: %r. This sometimes happens when using an old version of requests on a URL which issues a 301 redirect. Version=%s", url, ve, data.get('package'), requests.__version__)
        raise LinkHeadRequestError("Invalid URL or Redirect Link")
    except ValueError, ve:
        log.error("Could not make a head request to %r, error is: %s. Package is: %r.", url, ve, data.get('package'))
        raise LinkHeadRequestError("Could not make HEAD request")
    except requests.exceptions.ConnectionError, e:
        raise LinkHeadRequestError('Connection error: %s' % e)
    except requests.exceptions.HTTPError, e:
        raise LinkHeadRequestError('Invalid HTTP response: %s' % e)
    except requests.exceptions.Timeout, e:
        raise LinkHeadRequestError('Connection timed out after %ss' % url_timeout)
    except requests.exceptions.TooManyRedirects, e:
        raise LinkHeadRequestError('Too many redirects')
    except requests.exceptions.RequestException, e:
        raise LinkHeadRequestError('Error during request: %s' % e)
    except Exception, e:
        raise LinkHeadRequestError('Error with the request: %s' % e)
    else:
        if res.status_code == 405:
            # this suggests a GET request may be ok, so proceed to that
            # in the download
            raise LinkHeadMethodNotSupported()
        if not res.ok or res.status_code >= 400:
            error_message = 'Server returned HTTP error status: %s %s' % \
                (res.status_code, res.reason)
            raise LinkHeadRequestError(error_message)
    return json.dumps(dict(headers))
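
# Hedged usage sketch (values illustrative):
#     headers_json = link_checker({}, json.dumps(
#         {'url': 'http://example.com/data.csv', 'url_timeout': 10}))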