Last active
June 14, 2019 09:35
-
-
Save amercader/836c48ff0a1b141dc69bef17f1c9e82d to your computer and use it in GitHub Desktop.
Example plugin with some commonly used interfaces implemented
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def collection_show(context, data_dict):
    """Return the dictionary representation of a single collection.

    :param context: action context supplied by the caller (not read here)
    :param data_dict: request parameters; must contain an ``id`` key
    :returns: the result of ``as_dict()`` on the matching collection
    :raises toolkit.ObjectNotFound: if ``Collection.get`` finds nothing
        for the given id
    """
    # The key has to be the string 'id'; the bare name ``id`` is the
    # builtin function and would never be found in data_dict.
    # get_or_bust raises if the key is missing.
    collection_id = toolkit.get_or_bust(data_dict, 'id')

    collection = Collection.get(collection_id)
    if not collection:
        raise toolkit.ObjectNotFound()

    return collection.as_dict()
@toolkit.chained_action
def user_show(next_action, context, data_dict):
    """Wrap the next ``user_show`` action and extend its result.

    :param next_action: the next ``user_show`` implementation in the chain
    :param context: action context, passed through unchanged
    :param data_dict: request parameters, passed through unchanged
    :returns: the dict returned by ``next_action`` with an extra
        ``collections_admin`` key
    """
    augmented = next_action(context, data_dict)

    # Custom logic deciding the extra properties would go here; this
    # example always sets the flag.
    augmented['collections_admin'] = True

    return augmented
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@toolkit.allow_anonymous_access
def collection_show(context, data_dict):
    """Auth function for ``collection_show``: always grants access.

    :param context: auth context (not read here)
    :param data_dict: request parameters (not read here)
    :returns: ``{'success': True}`` unconditionally
    """
    return dict(success=True)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def collection_read(collection_id):
    """Render the read page for a single collection.

    Calls the ``collection_show`` action and renders
    ``collection/read.html`` with the result.

    :param collection_id: id taken from the URL
    :returns: the rendered template, or the result of ``toolkit.abort``
        (401 when the action raises ``NotAuthorized``, 404 when it raises
        ``ObjectNotFound``)
    """
    context = {u'model': model, u'user': toolkit.c.user}
    data_dict = {'id': collection_id}

    try:
        collection = toolkit.get_action('collection_show')(context, data_dict)
    except toolkit.NotAuthorized:
        # Translate the constant template first and interpolate afterwards:
        # a string that already contains the id can never match an entry
        # in the message catalog.
        message = toolkit._('Unauthorized to read collection {0}').format(
            collection_id)
        return toolkit.abort(401, message)
    except toolkit.ObjectNotFound:
        return toolkit.abort(404, toolkit._(u'Collection not found'))

    return toolkit.render('collection/read.html', {'collection': collection})
def collection_delete(collection_id):
    """Placeholder view for deleting a collection.

    :param collection_id: id taken from the URL (currently unused)
    :returns: ``None``; the view is not implemented yet
    """
    # Similar to collection_read: call the action, handle its errors.
    return None
class CollectionsCreateView(MethodView):
    """Class-based view for the collection creation form (placeholder)."""

    def get(self):
        """Handle GET: meant to render the form. Not implemented yet."""
        return None

    def post(self):
        """Handle POST: meant to call the create action. Not implemented yet."""
        return None
# Blueprint exposing the collection views under /collection/...
collections = Blueprint('collections', __name__)

# Read page for a single collection.
collections.add_url_rule(
    rule=u'/collection/<collection_id>',
    endpoint='read',
    view_func=collection_read,
    methods=['GET'],
)

# Creation form (GET) and submission (POST). The view class defined for
# this is CollectionsCreateView; the endpoint name comes from as_view().
collections.add_url_rule(
    rule=u'/collection/new',
    view_func=CollectionsCreateView.as_view('new'),
    methods=['GET', 'POST'],
)

# Deletion only accepts POST.
collections.add_url_rule(
    rule=u'/collection/<collection_id>/delete',
    endpoint='delete',
    view_func=collection_delete,
    methods=['POST'],
)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def get_publisher_name(publisher_id):
    """Template helper placeholder: return the name for a publisher id.

    :param publisher_id: identifier of the publisher (unused by the
        placeholder body)
    :returns: the value bound to ``publisher_name``
    """
    # The lookup that turns publisher_id into a name is left out of this
    # example ("do some magic").
    # NOTE(review): ``publisher_name`` is not assigned anywhere in this
    # block, so a real implementation has to bind it before returning.
    return publisher_name
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from ckan import plugins as p | |
from ckanext.myext import action, auth, blueprints, jobs, helpers | |
toolkit = p.toolkit | |
class ExamplePlugin(p.SingletonPlugin):
    """Example CKAN plugin implementing several commonly used interfaces.

    Covers configuration (IConfigurer), custom and chained actions
    (IActions), auth functions (IAuthFunctions), dataset lifecycle hooks
    (IPackageController), Flask routes (IBlueprint), template helpers
    (ITemplateHelpers) and search facets (IFacets).
    """

    p.implements(p.IConfigurer, inherit=True)
    p.implements(p.IActions, inherit=True)
    p.implements(p.IAuthFunctions, inherit=True)
    p.implements(p.IPackageController, inherit=True)
    p.implements(p.IBlueprint, inherit=True)
    p.implements(p.ITemplateHelpers, inherit=True)
    p.implements(p.IFacets, inherit=True)

    # IConfigurer

    def update_config(self, config):
        """Register the extension's directories and validate its settings.

        :param config: the CKAN config object
        :raises ValueError: if ``ckanext.mysite.custom_endpoint`` is unset
            or does not start with ``/``
        :raises Exception: if the running CKAN is older than 2.8.0
        """
        # Perform initialization tasks
        toolkit.add_template_directory(config, 'templates')
        toolkit.add_public_directory(config, 'public')
        toolkit.add_resource('fanstatic', 'mysite')

        # Check and validate config options
        if not config.get('ckanext.mysite.custom_endpoint', '').startswith('/'):
            raise ValueError(
                '"ckanext.mysite.custom_endpoint" should start with a '
                'forward slash (/)'
            )

        # Check requirements (eg CKAN version or DB tables).
        # check_ckan_version lives on the toolkit; it is not a module-level
        # name in this file.
        if not toolkit.check_ckan_version(min_version='2.8.0'):
            raise Exception('This plugin requires CKAN 2.8 or higher')

    # IActions

    def get_actions(self):
        """Return the action functions this plugin registers or overrides."""
        return {
            # Custom
            'collection_show': action.collection_show,
            'collection_create': action.collection_create,
            'collection_delete': action.collection_delete,
            # Core (chained override)
            'user_show': action.user_show,
        }

    # IAuthFunctions

    def get_auth_functions(self):
        """Return the auth functions guarding the custom actions."""
        # These must come from the auth module: registering the action
        # functions here would make every access check run the action.
        return {
            # Custom
            'collection_show': auth.collection_show,
            'collection_create': auth.collection_create,
            'collection_delete': auth.collection_delete,
        }

    # IPackageController

    def after_create(self, context, dataset_dict):
        """Hook called after a dataset has been created.

        :param context: the context CKAN passes to the hook
        :param dataset_dict: the validated dataset dict, ie it contains
            any custom fields defined in the extension
        """
        if dataset_dict.get('custom_field') == 'custom_value':
            # Do something
            pass

    def after_update(self, context, dataset_dict):
        """Hook called after a dataset has been updated.

        Enqueues a background job for datasets whose ``pub_type`` is
        ``external``.

        :param context: the context CKAN passes to the hook
        :param dataset_dict: the dataset dict
        """
        if dataset_dict.get('pub_type') == 'external':
            toolkit.enqueue_job(jobs.publish_to_repo, [dataset_dict])

    def before_index(self, dataset_dict):
        """Adjust the dataset dict right before it is sent to the index.

        :param dataset_dict: the dict about to be indexed
        :returns: the same dict, modified in place
        """
        # Index human-friendly field to use in faceting.
        # NOTE(review): get_pub_type_label is not imported in the visible
        # part of this file - confirm where it is defined.
        dataset_dict['pub_type_label'] = get_pub_type_label(
            dataset_dict.get('pub_type'))

        # Remove fields that should not be available for searching
        # or that cause problems in Solr
        for field in ('big_json_blob', 'not_indexed'):
            dataset_dict.pop(field, None)

        return dataset_dict

    # IBlueprint

    def get_blueprint(self):
        """Return the Flask blueprint holding the collection routes."""
        # NOTE(review): assumes the ``collections`` Blueprint object lives
        # in the imported ``blueprints`` module - confirm the attribute name.
        return blueprints.collections

    # ITemplateHelpers

    def get_helpers(self):
        """Return the helper functions exposed to templates."""
        return {
            'publisher_name': helpers.get_publisher_name,
        }

    # IFacets

    def dataset_facets(self, facets_dict, package_type):
        """Customise the facets shown on dataset search pages.

        :param facets_dict: the current facets, keyed by field name
        :param package_type: the dataset type being searched (unused)
        :returns: the updated facets dict
        """
        # Remove a facet we don't need
        facets_dict.pop('groups', None)

        # Add a custom one
        facets_dict['my_custom_field'] = toolkit._('Custom field')

        # The interface expects the facets dict back; without the return
        # CKAN would receive None.
        return facets_dict
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment