Data Loader Plugins are implemented by creating a Python class that inherits from the built-in DataSource
class. This section documents all the methods that should be implemented to create such a plugin.
Methods
connect
connect(self, inc_column=None, max_inc_value=None)
This method is used to create the connection to the source. This is where you would, for example, connect to a database, open file pointers, or initiate network connections.
The parameters inc_column and max_inc_value are used for incremental loading. They contain the name of the column on which incremental loading is done (specified on the command line with --incremental-column) and the maximum value of that column that was received in the previous load. Well-behaved data loader plugins should implement this behavior, so that only results are returned where inc_column >= max_inc_value. The data loader takes care of all the required book-keeping.
Data loader plugins that do not support incremental loading should raise an error when this option is specified:
```python
def connect(self, inc_column=None, max_inc_value=None):
    if inc_column:
        raise ValueError('Incremental loading not supported.')
```
Sometimes incremental loading is only sensible and supported on one specific column. In that case, it is recommended to enforce that as well:
```python
def connect(self, inc_column=None, max_inc_value=None):
    if inc_column and inc_column != 'updated_at':
        raise ValueError(
            'Incremental loading is only supported on the updated_at column.')
```
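As a sketch, a plugin that does support incremental loading on updated_at might translate the two parameters into a query filter at connect time. The class, the in-memory SQLite connection, and the WHERE-clause details below are illustrative assumptions, not part of the Squirro API; a real plugin would inherit from DataSource:

```python
import sqlite3


class UpdatedAtSource:
    """Illustrative sketch only - a real plugin inherits from DataSource."""

    def connect(self, inc_column=None, max_inc_value=None):
        # Reject incremental loading on anything but the supported column.
        if inc_column and inc_column != 'updated_at':
            raise ValueError(
                'Incremental loading is only supported on the updated_at '
                'column.')
        self._connection = sqlite3.connect(':memory:')
        # Translate the incremental parameters into a filter, so that later
        # fetches only return rows at or above the last seen value.
        if inc_column and max_inc_value is not None:
            self._where = 'WHERE updated_at >= ?'
            self._params = (max_inc_value,)
        else:
            self._where = ''
            self._params = ()
```

The stored filter would then be applied by the data-fetching methods; the plugin itself never needs to persist max_inc_value, as the data loader handles that book-keeping.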
disconnect
disconnect(self)
Disconnect from the source if needed: for a database, close the connection; for a file, close the file handle; and so on. If nothing needs to be disconnected, this can be implemented as a simple pass:
```python
def disconnect(self):
    pass
```
getDataBatch
getDataBatch(self, batch_size)
This method returns the data in batches from the source. Use the yield keyword to iteratively "return" batches to the data loader. Return from the function when all the data has been returned.
Each batch is a list of dictionaries, where each dictionary is one record from the source and each key is the column name. This is the input used by the data loader in the various mapping options, facets and templates.
The batch size value is either determined by the data loader automatically, or set using the command line argument --source-batch-size.
A typical implementation of getDataBatch looks like this:
```python
def getDataBatch(self, batch_size):
    start = 0
    while True:
        ret = self.getRecords(start, batch_size)
        if ret:
            yield ret
            start = start + batch_size
        else:
            return
```
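Because getRecords above is a custom helper rather than part of the plugin API, here is a self-contained sketch of the same batching pattern over an in-memory record list (the function name and sample data are illustrative):

```python
def batch_records(records, batch_size):
    """Yield lists of flat dictionaries, at most batch_size per batch."""
    batch = []
    for record in records:
        batch.append(record)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:  # emit the final, possibly smaller, batch
        yield batch


# Each record is a flat dictionary, keyed by source column name.
records = [{'id': i, 'title': 'Item %d' % i} for i in range(5)]
batches = list(batch_records(records, batch_size=2))
# batches now holds three lists, of sizes 2, 2 and 1
```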
getJobId
getJobId(self)
Return a unique identifier for the current job. This should take into account all the arguments. The job ID is used for locking (to prevent multiple runs of the same load) as well as for storing the incremental value.
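A common approach, sketched here outside the plugin class, is to hash all the arguments that define the load. The helper name and argument values are hypothetical; inside a plugin this logic would live in getJobId and draw its values from self.args:

```python
import hashlib


def make_job_id(*arg_values):
    """Derive a stable job ID from the arguments that define the load."""
    m = hashlib.sha256()
    for value in arg_values:
        # hashlib works on bytes, so stringify and encode each argument.
        m.update(str(value).encode('utf-8'))
    return m.hexdigest()
```

The same set of argument values always yields the same ID, so repeated runs of one load share a lock and incremental state, while a different invocation gets its own.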
getSchema
getSchema(self)
Returns the header of the data source (a list containing the names of the source columns). This is used to decide the valid mapping options and to expand the wildcards inside the facets configuration file.
If this is dynamically decided, it may make sense to return all the keys from the first result of getDataBatch. In the example above (where a custom method getRecords does the actual work), this could be implemented like this:
```python
def getSchema(self):
    items = self.getRecords(0, 1)
    if items:
        return list(items[0].keys())
    else:
        return []
```
getArguments
getArguments(self)
Return the list of arguments that the plugin accepts.
The result of this parsing is made available to the data loader plugin as the self.args object.
Each list item is a dictionary with the following options:
| Parameter | Description |
|---|---|
| name | Mandatory - the name of the argument. Recommended naming convention is to keep it all lower case, and separate words with an underscore. An option with the name |
| flag | A short flag for the argument. Can be used to keep invocations of the data loader shorter, but this is used very sparingly. For example: |
| help | The help string, output with --help. |
| required | True if this argument is mandatory. |
| default | The default value, if the argument has not been specified. |
| type | The data type that is expected. Defaults to string; valid values are string, int, float and bool. |
| action | The argparse action for this option. Valid options are store, store_true and store_false. store expects a value to be specified, whereas store_true and store_false will always set the value to either True or False. |
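For instance, a boolean toggle would typically combine action with a default. The argument name skip_header below is hypothetical, chosen only to illustrate the options from the table:

```python
# Hypothetical argument definition: a boolean flag using `store_true`.
skip_header_arg = {
    'name': 'skip_header',
    'action': 'store_true',   # presence of the flag sets the value to True
    'default': False,         # value used when the flag is not given
    'help': 'Skip the first row of the file.',
}
```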
Examples
```python
def getArguments(self):
    return [
        {
            "name": "file",
            "flag": "f",
            "help": "Excel file to load",
            "required": True,
        },
        {
            "name": "excel_sheet",
            "default": 0,
            "type": "int",
            "help": "Excel sheet name. Default: get first sheet.",
        },
    ]

def connect(self, inc_column=None, max_inc_value=None):
    # Just an example for how to access the options
    self._file = open(self.args.file)
```
Empty Plugin
This is a boilerplate template for a data loader plugin.
""" Data loader Plugin Template """ import hashlib import logging from squirro.dataloader.data_source import DataSource log = logging.getLogger(__name__) class TemplateSource(DataSource): """ A Custom data loader Plugin """ def __init__(self): pass def connect(self, inc_column=None, max_inc_value=None): log.debug('Incremental Column: %r', inc_column) log.debug('Incremental Last Value: %r', max_inc_value) def disconnect(self): """Disconnect from the source.""" # Nothing to do pass def getDataBatch(self, batch_size): """ Generator - Get data from source on batches. :returns a list of dictionaries """ rows = [] # This call should ideally `yield` and not return all items directly content = get_content_from_somewhere() for row in content: # Emit a `row` here that's flat dictionary. If that's not the case # yet, transform it here. # But do not return a Squirro item - that's the job of the data # loader configuration (facets and mapping). rows.append(row) if len(rows) >= batch_size: yield rows rows = [] if rows: yield rows def getSchema(self): """ Return the schema of the dataset :returns a List containing the names of the columns retrieved from the source """ schema = [ 'title', 'body', 'created_at', 'id', 'summary', 'abstract', 'keywords' ] return schema def getJobId(self): """ Return a unique string for each different select :returns a string """ # Generate a stable id that changes with the main parameters m = hashlib.sha256() m.update(self.args.first_custom_param) m.update(self.args.second_custom_param) job_id = m.hexdigest() log.debug("Job ID: %s", job_id) return job_id def getArguments(self, parser): """ Add source arguments to the main arguments parser """ return [ { 'name': 'first_custom_param', 'help': 'Custom data Loader Plugin Argument 1', }, { 'name': 'second_custom_param', 'help': 'Custom Data Loader Plugin Argument 2', }, ]