Excerpt |
---|
Pipelets are plugins to the Squirro pipeline, used to customize the data processing. |
Table of Contents
Table of Contents | ||||
---|---|---|---|---|
|
Overview
Items that are processed by Squirro go through a pipeline process before they are indexed. In that process a number of built-in pipeline steps are executed. On top of that, custom enrichment steps can be inserted in the form of pipelets. These pipelets are written in the Python programming language. Pipelets can be uploaded to the Squirro server and then be configured in the user interface (Enrichments tab) or through the API.
This reference documentation covers the basic workflow of working with pipelets and the interface that pipelets need to implement.
...
Writing Pipelets
Pipelets are written in Python. They need to inherit from the squirro.sdk.PipeletV1
class and implement the consume
method. The simplest possible pipelet looks like this:
Code Block | ||
---|---|---|
| ||
from squirro.sdk import PipeletV1
class NoopPipelet(PipeletV1):
def consume(self, item):
return item |
As it name says it does nothing but return the item
unchanged.
Modifying Items
The item
is a Python dict
type and can be modified before it is returned. The available item
fields are documented in the Item Format reference. The following example illustrates modifying an item
:
Code Block | ||
---|---|---|
| ||
from squirro.sdk import PipeletV1
class ModifyTitlePipelet(PipeletV1):
def consume(self, item):
item['title'] = item.get('title', '') + ' - Hello, World!'
return item |
This pipelet will modify each item
it processes, appending the string "Hello, World!" to the title.
Skipping Items
Items can be "skipped" ( i.e. not added to Squirro's search index ) by return None
instead of the item
for example;
Code Block | ||
---|---|---|
| ||
from squirro.sdk import PipeletV1
class SkipItemPipelet(PipeletV1):
def consume(self, item):
if not item.get('title', '').startswith('[IMPORTANT]')
return None
return item |
In this example we discard all items where the title does not start with the string "[IMPORTANT]
".
Returning multiple items
The pipelet is always called for each item individually. But in some use cases the pipelet should not just return one item but multiple ones. In those cases use the Python yield
statement to return each individual item. For example:
Code Block | ||
---|---|---|
| ||
from squirro.sdk import PipeletV1
class ExtendTitlePipelet(PipeletV1):
def consume(self, item):
for i in range(10):
new_item = dict(item)
new_item['title'] = '{0} ({1})'.format(item.get('title', ''), i)
yield new_item |
Dependencies
Pipelets are limited in what you can do. For example the print
statement is disallowed and you can not import any external libraries except squirro.sdk
. If you do need access to external libraries, you need to use the @require
decorator. For example to log some output:
Code Block | ||
---|---|---|
| ||
from squirro.sdk import PipeletV1, require
@require('log')
class LoggingPipelet(PipeletV1):
def consume(self, item):
self.log.debug('Processing item: %r', item['id'])
return item |
As seen from the example, the @require
decorator takes a name of a dependency. That dependency is then made available to the pipelet class.
HTTP requests can be executed by using the requests
dependency. The following pipelet shows an example for sentiment detection:
Code Block | ||
---|---|---|
| ||
from squirro.sdk import PipeletV1, require
@require('requests')
class SentimentPipelet(PipeletV1):
def consume(self, item):
text_content = ' '.join([item.get('title', ''),
item.get('body', '')])
res = self.requests.post('http://example.com/detect',
data={'text': text_content},
headers={'Accept': 'application/json'})
sentiment = res.json()['sentiment']
item.setdefault('keywords', {})['sentiment'] = [sentiment]
return item |
Available Dependencies
The following dependencies can be requested:
...
Dependency
...
Description
...
cache
...
Non-persisted cache.
...
log
...
A logging.Logger
instance from Python's standard logging framework.
...
requests
...
Python requests library for to execute HTTP requests.
...
files
...
Python component which provides access to data files on disk.
Default Configuration
Note |
---|
The default configuration handling is deprecated as of Squirro 2.3.10. For new pipelets please add a |
When adding enrichments in the user interface, the administrator can pass in configuration to the pipelet. For example the title modification pipelet could accept a custom suffix that is to be added to the title.
The configuration for this is provided in as JSON data structure:
...
This full data structure is then passed in to the pipelet constructor, where it can be retrieved. Usually it's then simply stored in a object variable so it can be used in the consume
method again.
Code Block | ||
---|---|---|
| ||
from squirro.sdk import PipeletV1
DEFAULT_SUFFIX = ' - Hello, World!'
class ModifyTitlePipelet(PipeletV1):
def __init__(self, config):
self.config = config
def consume(self, item):
suffix = self.config.get('suffix', DEFAULT_SUFFIX)
item['title'] = item.get('title', '') + suffix
return item |
In this example, when the suffix hasn't been provided, the default suffix is used.
Custom Configuration
Pipelets can define a custom set of configuration properties, which will be exposed in the UI as a form (instead of a JSON input). This makes the configuration much more user friendly, and is the recommended way. To define its configuration, a pipelet should implement the getArguments()
method.
The method is expected to return an array of objects defining each property. Inside each object, the fields name
, display_label
and type
are required. Optional field required
specifies whether the property is required to be filled by the user. Additionally, a property can be placed in an Advanced section of the configuration, by setting advanced
to True
on that property.
The type
should be one of the following: int
, string
, bool
, password
, code
.
Code Block | ||
---|---|---|
| ||
from squirro.sdk import PipeletV1
class ModifyTitlePipelet(PipeletV1):
# def __init__, def consume, etc.
@staticmethod
def getArguments():
return [
{
'name': 'commands',
'display_label': 'Source code',
'type': 'code',
'required': True,
},
{
'name': 'debug',
'display_label': 'Log debug output',
'type': 'bool',
'help': 'Logs debug output to the server log files',
'advanced': True,
},
] |
Documentation
A pipelet class can be documented using doc-strings. The first sentence (separated by period) is used as a summary in the user interface. All the remaining text is used as a description and is often used to document the expected configuration. The description is parsed as Markdown (using the CommonMark dialect). The 60-second overview serves as a good reference.
Code Block | ||
---|---|---|
| ||
from squirro.sdk import PipeletV1
DEFAULT_SUFFIX = ' - Hello, World!'
class ModifyTitlePipelet(PipeletV1):
"""Modify item titles.
This appends a suffix to the title of each item. When no suffix is
provided, it appends the default suffix of "- Hello, World!".
"""
def __init__(self, config):
self.config = config
def consume(self, item):
suffix = self.config.get('suffix', DEFAULT_SUFFIX)
item['title'] = item.get('title', '') + suffix
return item
@staticmethod
def getArguments():
return [
{
'name': 'suffix',
'display_label': 'Suffix',
'type': 'string',
'default': DEFAULT_SUFFIX,
}
] |
Development Workflow
For developing pipelets, Squirro provides the pipelet
command line tool as part of the Toolbox.
Develop
The first step is to create the pipelet. In the following examples the pipelet will have been written to a file called pipelet.py
in the current directory.
pipelet.py
Code Block |
---|
from squirro.sdk import PipeletV1
class ModifyTitlePipelet(PipeletV1):
def consume(self, item):
item['title'] = item.get('title', '') + ' - Hello, World!'
return item |
Validate
On the command line execute the pipelet validate
command to verify that there are no errors in the pipelet code. For example this will ensure that no modules are imported that are disallowed from pipelets. See the section on Dependencies for more information.
Code Block | ||
---|---|---|
| ||
pipelet validate pipelet.py |
Test
The pipelet consume
command can be used to simulate pipelet running. For this purpose, the test items should be present in JSON text files on the disk. In the following example there is a item.json
file in the current directory with this contents:
item.json
Code Block | ||
---|---|---|
| ||
{
"title": "Sample",
"id": "first_item"
} |
To test the pipelet with this test file, use:
Code Block | ||
---|---|---|
| ||
pipelet consume pipelet.py -i item.json |
This command will output the items that have been returned by the pipelet:
Code Block | ||
---|---|---|
| ||
Loading items...
Loading item.json ...
Loaded.
Consuming item first_item
yielded item
{u'id': u'first_item', u'title': u'Sample - Hello, World!'} |
On top of these manual tests, automated tests can be implemented easily using the usual Python tools such as Nose.
Deploy
Once the pipelet is ready, it can be uploaded to the Squirro server. The pipelet upload
command achieves that:
Code Block | ||
---|---|---|
| ||
pipelet upload --token <your_token> --cluster <cluster> pipelet.py "Hello World" |
This will make the pipelet available with the name "Hello World". To update the pipelet code on the server, this command can be re-executed at any time.
To use this in a project use the Pipeline Editor to add the pipelet as an enrichment step to a pipeline.
Using additional files with Pipelets
In many cases, additional files like libraries and pre-trained models must be uploaded and used by a pipelet when it is run.
To accomplish this, additional files can be uploaded and accessed by the pipelet by following these steps:
...
Specify the additional files in the pipelet upload command. For Example:
Pipelet Upload Command
Code Block | ||
---|---|---|
| ||
pipelet upload \
--data-file 'resource.txt' \
--cluster <cluster> \
--token <your_token> \
'pipelet.py' \
'TestPipelet' |
In this example, we are uploading a file "resource.txt" along with the pipelet.
Access the contents of the file(s) from within the pipelet
Pipelet File
Code Block | ||
---|---|---|
| ||
from squirro.sdk import require, PipeletV1
@require('files')
class TestPipelet(PipeletV1):
def consume(self, item):
with self.files.get_file('resource.txt') as f:
data = f.read() |
Processing old items
Pipelets are only run for items that are processed in the system after the enrichment has been configured. For information on how to process old items with a pipelet, see Rerunning a Pipelet.
Adding Enrichments
...
Info |
---|
The pipelets documentation has moved to https://docs.squirro.com/en/latest/pipelets/index.html! You can find a tutorial here. |