Commit 6cb455c5 authored by David Read's avatar David Read
Browse files

Celeryd2 command added, since the one in ckan core does not allow different...

Celeryd2 command added, since the one in ckan core does not allow different queues. Document how to set-up celery daemons.
parent 9a70e1a2
......@@ -63,7 +63,9 @@ To install ckanext-archiver:
config file (by default the config file is located at
``/etc/ckan/default/production.ini``).
5. Restart CKAN. For example if you've deployed CKAN with Apache on Ubuntu::
5. Install a Celery queue backend - see later section.
6. Restart CKAN. For example if you've deployed CKAN with Apache on Ubuntu::
sudo service apache2 reload
......@@ -109,6 +111,46 @@ NB Previously you needed both ckanext-archiver and ckanext-qa to see the broken
python ckanext/archiver/bin/migrate_task_status.py --write production.ini
Installing a Celery queue backend
---------------------------------
Archiver uses Celery to manage its 'queues'. You need to install a queue back-end, such as Redis or RabbitMQ.
Redis backend
-------------
Redis can be installed like this::
sudo apt-get install redis-server
Install the python library into your python environment::
/usr/lib/ckan/default/bin/pip install redis==2.10.1
It must then be configured in your CKAN config (e.g. production.ini) by inserting a new section, e.g. before ``[app:main]``::
[app:celery]
BROKER_BACKEND = redis
BROKER_HOST = redis://localhost/1
CELERY_RESULT_BACKEND = redis
REDIS_HOST = 127.0.0.1
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_CONNECT_RETRY = True
Number of items in the queue 'bulk'::
redis-cli -n 1 LLEN bulk
See item 0 in the queue (which is the last to go on the queue & last to be processed)::
redis-cli -n 1 LINDEX bulk 0
To delete all the items on the queue::
redis-cli -n 1 DEL bulk
Config settings
---------------
......@@ -183,11 +225,12 @@ Using Archiver
First, make sure that Celery is running for each queue. For test/local use, you can run::
paster --plugin=ckan celeryd --queue=priority -c production.ini
paster --plugin=ckanext-archiver celeryd2 run all -c development.ini
and in a separate terminal::
However, in production you'd run the priority and bulk queues separately, or else the priority queue will not have any priority over the bulk queue. This can be done by running these two commands in separate terminals::
paster --plugin=ckan celeryd --queue=bulk -c production.ini
paster --plugin=ckanext-archiver celeryd2 run priority -c production.ini
paster --plugin=ckanext-archiver celeryd2 run bulk -c production.ini
For production use, we recommend setting up Celery to run with supervisord.
For more information see:
......
......@@ -59,7 +59,7 @@ def migrate(options):
or res.hash
or res.size
or res.mimetype):
print add_stat('No archive data', res, stats)
add_stat('No archive data', res, stats)
continue
for field_name in ('status_id', 'is_broken', 'reason',
'last_success', 'first_failure',
......
import sys
import os
from pkg_resources import iter_entry_points, VersionConflict
import ConfigParser
from celery import Celery
from ckan.lib.cli import CkanCommand
class CeleryCmd(CkanCommand):
    '''Manages the Celery daemons.

    This is an improved version of CKAN core's 'celeryd' command.

    Usage:

        paster celeryd2 run [all|bulk|priority]
           - Runs a celery daemon to run tasks on the bulk or priority queue
    '''
    # The docstring starts on its opening line so that the first line (used
    # as the summary) is the description rather than an empty string.
    summary = __doc__.split('\n')[0]
    usage = __doc__
    min_args = 0
    max_args = 2

    # Celery settings whose ini value is a whitespace-separated list. Kept in
    # upper case; ConfigParser lower-cases option names, so keys must be
    # upper-cased before being compared against this.
    LIST_PARAMS = frozenset(['CELERY_IMPORTS', 'ADMINS', 'ROUTES'])

    def __init__(self, name):
        """Register the celery-specific command line options."""
        super(CeleryCmd, self).__init__(name)
        self.parser.add_option(
            '--loglevel',
            action='store',
            dest='loglevel',
            default='INFO',
            help='Celery logging - choose between DEBUG, INFO, WARNING, '
                 'ERROR, CRITICAL or FATAL')
        self.parser.add_option(
            '--concurrency',
            action='store',
            dest='concurrency',
            default='1',
            help='Number of concurrent processes to run')
        self.parser.add_option(
            '-n', '--hostname',
            action='store',
            dest='hostname',
            help="Set custom hostname")

    def command(self):
        """
        Parse command line arguments and call appropriate method.

        Prints the usage and exits with status 1 when no sub-command (or a
        help flag) is given, when the sub-command is not recognized, or when
        'run' is given without a queue name.
        """
        if not self.args or self.args[0] in ['--help', '-h', 'help']:
            print(self.usage)
            sys.exit(1)

        cmd = self.args[0]
        # The full config is deliberately not loaded (the db is generally not
        # needed), but the filename of the ini is still wanted.
        self._get_config()

        if cmd != 'run':
            print('Command %s not recognized' % cmd)
            sys.exit(1)

        # 'run' needs a queue argument; report that rather than letting
        # self.args[1] raise an IndexError.
        if len(self.args) < 2:
            print('Please specify the queue to run: all, bulk or priority')
            print(self.usage)
            sys.exit(1)

        queue = self.args[1]
        if queue == 'all':
            # One daemon serving both queues.
            queue = 'priority,bulk'
        self.run_(loglevel=self.options.loglevel,
                  queue=queue,
                  concurrency=int(self.options.concurrency),
                  hostname=self.options.hostname)

    def run_(self, loglevel='INFO', queue=None, concurrency=None,
             hostname=None):
        """Start a celery worker in this process.

        :param loglevel: value passed to celery's --loglevel
        :param queue: comma-separated queue names passed to --queues
            (omitted when empty)
        :param concurrency: integer passed to --concurrency (omitted when
            falsy)
        :param hostname: value passed to --hostname (omitted when empty)

        Sets the CKAN_CONFIG environment variable to the ini given with the
        config option or, failing that, to ./development.ini if it exists.
        Exits with status 1 if neither is available.
        """
        default_ini = os.path.join(os.getcwd(), 'development.ini')

        if self.options.config:
            os.environ['CKAN_CONFIG'] = os.path.abspath(self.options.config)
        elif os.path.isfile(default_ini):
            os.environ['CKAN_CONFIG'] = default_ini
        else:
            print('No .ini specified and none was found in current directory')
            sys.exit(1)

        celery_args = []
        if concurrency:
            celery_args.append('--concurrency=%d' % concurrency)
        if queue:
            celery_args.append('--queues=%s' % queue)
        # Test the parameter that is actually formatted into the argument,
        # not self.options.hostname.
        if hostname:
            celery_args.append('--hostname=%s' % hostname)
        celery_args.append('--loglevel=%s' % loglevel)

        argv = ['celeryd'] + celery_args
        print('Running: %s' % ' '.join(argv))
        celery_app = self._celery_app()
        celery_app.worker_main(argv=argv)

    def _celery_app(self):
        """Build a Celery app configured from the CKAN ini file.

        The configuration is assembled from, in order: JSON serializer
        defaults, the task modules returned by every 'ckan.celery_task'
        entry point, and the options of the ini's [app:celery] section
        (which override the former). Exits with status 1 if the ini has no
        [app:celery] section.

        :returns: the configured Celery application
        """
        # Reread the ckan ini using ConfigParser so that we can get at the
        # non-pylons sections.
        config = ConfigParser.ConfigParser()
        config.read(self.filename)

        celery_config = dict(
            CELERY_RESULT_SERIALIZER='json',
            CELERY_TASK_SERIALIZER='json',
            CELERY_IMPORTS=[],
        )

        for entry_point in iter_entry_points(group='ckan.celery_task'):
            try:
                # Each entry point resolves to a callable returning the
                # module names to import.
                celery_config['CELERY_IMPORTS'].extend(entry_point.load()())
            except VersionConflict as e:
                # Best effort: report the broken entry point and carry on.
                print('ERROR in entry point load: %s %s' % (entry_point, e))

        try:
            celery_options = config.items('app:celery')
        except ConfigParser.NoSectionError:
            print('Could not find celery config in your ckan ini file '
                  '(a section headed "[app:celery]").')
            sys.exit(1)

        for key, value in celery_options:
            setting = key.upper()
            if setting in self.LIST_PARAMS:
                value = value.split()
            celery_config[setting] = value

        celery_app = Celery()
        # This update of configuration means it is only possible to set each
        # key once so this is done once all of the options have been decided.
        celery_app.conf.update(celery_config)
        celery_app.loader.conf.update(celery_config)
        return celery_app
......@@ -82,6 +82,7 @@ setup(
entry_points='''
[paste.paster_command]
archiver = ckanext.archiver.commands:Archiver
celeryd2 = ckanext.archiver.command_celery:CeleryCmd
[ckan.plugins]
archiver = ckanext.archiver.plugin:ArchiverPlugin
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment