Data Loader Plugins are implemented by creating a Python class that inherits from the built-in DataSource class. This section documents all the methods that should be implemented to create such a plugin.

Methods

connect

connect(self, inc_column=None, max_inc_value=None)

This method creates the connection to the source. It can be used, for example, to connect to a database, open file pointers, or initiate network connections.

The parameters inc_column and max_inc_value are used for incremental loading. inc_column contains the name of the column on which incremental loading is done (specified on the command line with --incremental-column), and max_inc_value contains the maximum value of that column that was received in the previous load. Well-behaved data loader plugins should honor these parameters, so that only records where inc_column >= max_inc_value are returned. The data loader takes care of all the required book-keeping.

Data loader plugins that do not support incremental loading should raise an error when this option is specified:

def connect(self, inc_column=None, max_inc_value=None):
    if inc_column:
        raise ValueError('Incremental loading not supported.')

Sometimes incremental loading is only sensible and supported on one specific column. In that case, it is recommended to enforce that as well:

def connect(self, inc_column=None, max_inc_value=None):
    if inc_column and inc_column != 'updated_at':
        raise ValueError('Incremental loading is only supported on the updated_at column.')
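
The checks above only reject unsupported options. As a minimal sketch of how a plugin might actually apply the two parameters, the example below filters an in-memory list so that only records with inc_column >= max_inc_value remain. The DataSource stand-in, the InMemorySource class, the RECORDS sample data and the _records attribute are all assumptions made so that the sketch runs on its own; a real plugin would inherit from the built-in DataSource class and would typically push the filter down into its query.

```python
class DataSource:
    """Stand-in for the built-in DataSource class (assumption, sketch only)."""


class InMemorySource(DataSource):
    # Hypothetical sample data; a real plugin would query its source instead.
    RECORDS = [
        {"id": 1, "updated_at": "2020-01-01"},
        {"id": 2, "updated_at": "2020-02-01"},
        {"id": 3, "updated_at": "2020-03-01"},
    ]

    def connect(self, inc_column=None, max_inc_value=None):
        if inc_column and inc_column != "updated_at":
            raise ValueError(
                "Incremental loading is only supported on the updated_at column."
            )
        if inc_column and max_inc_value is not None:
            # Keep only records at or after the value seen in the previous load.
            self._records = [
                r for r in self.RECORDS if r[inc_column] >= max_inc_value
            ]
        else:
            # First load, or no incremental column: keep everything.
            self._records = list(self.RECORDS)


source = InMemorySource()
source.connect(inc_column="updated_at", max_inc_value="2020-02-01")
incremental_ids = [r["id"] for r in source._records]
```

Because the comparison is >=, the record that produced the previous maximum value is included again in the next load.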

disconnect

disconnect(self)

Disconnect from the source if needed: for a database, close the connection; for a file, close the file; and so on. If nothing needs to be disconnected, this can be implemented as a simple pass:

def disconnect(self):
    pass
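
When connect does open a resource, disconnect is the place to release it. The following is a minimal sketch for a file-based source. The DataSource stand-in, the FileSource class and its constructor argument are assumptions for illustration; a real plugin would read the path from self.args.

```python
import os
import tempfile


class DataSource:
    """Stand-in for the built-in DataSource class (assumption, sketch only)."""


class FileSource(DataSource):
    def __init__(self, path):
        # Hypothetical constructor; a real plugin would use self.args instead.
        self._path = path
        self._file = None

    def connect(self, inc_column=None, max_inc_value=None):
        self._file = open(self._path)

    def disconnect(self):
        # Safe to call even if connect() was never called or has failed.
        if self._file is not None:
            self._file.close()
            self._file = None


# Usage: create a temporary file, connect, then disconnect.
fd, path = tempfile.mkstemp()
os.close(fd)
source = FileSource(path)
source.connect()
handle = source._file
source.disconnect()
source.disconnect()  # a second call is a harmless no-op
os.remove(path)
```

Resetting the attribute to None keeps disconnect idempotent, which helps if it ends up being called more than once.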

getDataBatch

getDataBatch(self, batch_size)

This method returns the data in batches from the source. Use the yield keyword to iteratively "return" batches to the data loader. Return from the function when all the data has been returned.

Each batch is a list of dictionaries, where each dictionary is one record from the source and each key is the column name. This is the input used by the data loader in the various mapping options, facets and templates.

The batch size value is either determined by the data loader automatically, or set using the command line argument --source-batch-size.

A typical implementation of getDataBatch looks like this:

def getDataBatch(self, batch_size):
    start = 0
    while True:
        ret = self.getRecords(start, batch_size)
        if ret:
            yield ret
            start = start + batch_size
        else:
            return
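
To make the shape of the batches concrete, here is a self-contained sketch of the same pattern. The DataSource stand-in, the ListSource class and the slice-based getRecords helper are assumptions for illustration; in a real plugin getRecords would fetch from the actual source.

```python
class DataSource:
    """Stand-in for the built-in DataSource class (assumption, sketch only)."""


class ListSource(DataSource):
    def __init__(self, records):
        # Hypothetical constructor holding the records in memory.
        self._records = records

    def getRecords(self, start, batch_size):
        # Custom helper: return at most batch_size records from offset start.
        return self._records[start:start + batch_size]

    def getDataBatch(self, batch_size):
        start = 0
        while True:
            ret = self.getRecords(start, batch_size)
            if not ret:
                # An empty result means the source is exhausted.
                return
            yield ret
            start += batch_size


records = [{"id": i, "title": "Item %d" % i} for i in range(5)]
batches = list(ListSource(records).getDataBatch(2))
batch_sizes = [len(batch) for batch in batches]
```

With five records and a batch size of two, the generator yields two full batches followed by a final batch holding the single remaining record.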

getJobId

getJobId(self)

Return a unique identifier for the current job. The identifier should take all the arguments into account, so that loads with different arguments get different job IDs. The job ID is used for locking (to prevent multiple runs of the same load) as well as for storing the incremental value.
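
One possible way to derive such an identifier is to hash every argument in a fixed order. The sketch below is an illustration, not the data loader's own logic: the make_job_id helper is hypothetical, and argparse.Namespace is used as a stand-in for self.args on the assumption that the arguments can be enumerated with vars().

```python
import hashlib
from argparse import Namespace


def make_job_id(args):
    """Return a stable hex digest that changes whenever any argument changes."""
    m = hashlib.sha256()
    # Sort the names so the ID does not depend on the argument order.
    for key in sorted(vars(args)):
        # hashlib works on bytes, so encode each name=value pair.
        m.update(("%s=%r;" % (key, getattr(args, key))).encode("utf-8"))
    return m.hexdigest()


id_a = make_job_id(Namespace(file="a.xlsx", excel_sheet=0))
id_a_again = make_job_id(Namespace(excel_sheet=0, file="a.xlsx"))
id_b = make_job_id(Namespace(file="a.xlsx", excel_sheet=1))
```

Inside a plugin, getJobId could then simply return make_job_id(self.args). Here id_a and id_a_again are identical, while id_b differs because one argument value changed.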

getSchema

getSchema(self)

Returns the header of the data source (a list containing the names of the source columns). This is used to decide the valid mapping options and to expand the wildcards inside the facets configuration file.

If this is dynamically decided, it may make sense to return all the keys from the first result from getDataBatch. In the example above (where a custom method getRecords does the actual work), this could be implemented like this:

def getSchema(self):
    items = self.getRecords(0, 1)
    if items:
        # Convert to a list: on Python 3, keys() returns a view, not a list
        return list(items[0].keys())
    else:
        return []

getArguments

getArguments(self)

Return the list of arguments that the plugin accepts.

The data loader parses these arguments from the command line and makes the result available to the data loader plugin as the self.args object.

Each list item is a dictionary with the following options:

| Parameter | Description |
|---|---|
| name | Mandatory. The name of the argument. The recommended naming convention is to keep it all lower case and to separate words with an underscore. An option with the name mysource_password can be passed in from the command line as --mysource-password. |
| flag | A short flag for the argument. Can be used to keep invocations of the data loader shorter, but this is used very sparingly. For example, the file argument in the Examples section below defines the flag f. |
| help | The help string, output with --help. |
| required | True if this argument is mandatory. |
| default | The default value, if the argument has not been specified. |
| type | The data type that is expected. Defaults to string; valid values are string, int, float and bool. |
| action | The argparse action for this option. Valid options are store, store_true and store_false. store expects a value to be specified, whereas store_true and store_false will always set the value to either True or False. |
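
To illustrate how these options relate to command line flags, the sketch below translates argument dictionaries into argparse options. This is not the data loader's actual implementation: the build_parser helper, the TYPES mapping and the skip_header argument are hypothetical, and the bool type is left out because this page does not describe how it is parsed.

```python
import argparse

# Assumed mapping from the documented type names to Python callables.
TYPES = {"string": str, "int": int, "float": float}


def build_parser(arguments):
    """Translate getArguments()-style dictionaries into argparse options."""
    parser = argparse.ArgumentParser()
    for arg in arguments:
        # mysource_password becomes --mysource-password
        option_strings = ["--" + arg["name"].replace("_", "-")]
        if "flag" in arg:
            option_strings.insert(0, "-" + arg["flag"])
        kwargs = {
            "dest": arg["name"],
            "help": arg.get("help"),
            "required": arg.get("required", False),
            "action": arg.get("action", "store"),
        }
        if "default" in arg:
            kwargs["default"] = arg["default"]
        if kwargs["action"] == "store":
            # store_true and store_false take no value, so no type applies.
            kwargs["type"] = TYPES[arg.get("type", "string")]
        parser.add_argument(*option_strings, **kwargs)
    return parser


arguments = [
    {"name": "file", "flag": "f", "help": "Excel file to load", "required": True},
    {"name": "excel_sheet", "default": 0, "type": "int"},
    {"name": "skip_header", "action": "store_true"},  # hypothetical argument
]
args = build_parser(arguments).parse_args(["-f", "data.xlsx", "--excel-sheet", "2"])
```

After parsing, the values are available as attributes named after each argument (args.file, args.excel_sheet, args.skip_header), which mirrors how a plugin reads them from self.args.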

Examples

def getArguments(self):
    return [
        {
            "name": "file",
            "flag": "f",
            "help": "Excel file to load",
            "required": True,
        },
        {
            "name": "excel_sheet",
            "default": 0,
            "type": "int",
            "help": "Excel sheet name. Default: get first sheet.",
        },
    ]


def connect(self, inc_column=None, max_inc_value=None):
    # Just an example for how to access the options
    self._file = open(self.args.file)

Empty Plugin

This is a boilerplate template for a data loader plugin.

loader_plugin.py
"""
Data loader Plugin Template
"""
import hashlib
import logging

from squirro.dataloader.data_source import DataSource

log = logging.getLogger(__name__)


class TemplateSource(DataSource):
    """
    A Custom data loader Plugin
    """

    def __init__(self):
        pass

    def connect(self, inc_column=None, max_inc_value=None):
        log.debug('Incremental Column: %r', inc_column)
        log.debug('Incremental Last Value: %r', max_inc_value)

    def disconnect(self):
        """Disconnect from the source."""
        # Nothing to do
        pass

    def getDataBatch(self, batch_size):
        """
        Generator - Get data from source in batches.

        :returns a list of dictionaries
        """

        rows = []

        # This call should ideally `yield` and not return all items directly
        content = get_content_from_somewhere()

        for row in content:
            # Emit a `row` here that's flat dictionary. If that's not the case
            # yet, transform it here.
            # But do not return a Squirro item - that's the job of the data
            # loader configuration (facets and mapping).
            rows.append(row)
            if len(rows) >= batch_size:
                yield rows
                rows = []

        if rows:
            yield rows

    def getSchema(self):
        """
        Return the schema of the dataset
        :returns a List containing the names of the columns retrieved from the
        source
        """

        schema = [
            'title',
            'body',
            'created_at',
            'id',
            'summary',
            'abstract',
            'keywords'
        ]

        return schema

    def getJobId(self):
        """
        Return a unique string for each different select
        :returns a string
        """
        # Generate a stable id that changes with the main parameters
        m = hashlib.sha256()
        # hashlib requires bytes, so encode the string arguments first
        m.update(self.args.first_custom_param.encode('utf-8'))
        m.update(self.args.second_custom_param.encode('utf-8'))
        job_id = m.hexdigest()
        log.debug("Job ID: %s", job_id)
        return job_id

    def getArguments(self):
        """
        Return the list of arguments that the plugin accepts
        """
        return [
            {
                'name': 'first_custom_param',
                'help': 'Custom data Loader Plugin Argument 1',
            },
            {
                'name': 'second_custom_param',
                'help': 'Custom Data Loader Plugin Argument 2',
            },
        ]