Writing a custom one-click connector

Introduction

One-click connectors are an easy way to fetch data from different sources without any configuration on the user's side. By using OAuth2 authentication and providing pre-defined custom mappings, the whole loading process becomes intuitive and easy: it is almost just ONE CLICK.

Prerequisites

To get started, install the Squirro Toolbox (https://squirro.atlassian.net/wiki/spaces/DOC/pages/2949361). You will also need access to a Squirro server, either by using Squirro in a Box (https://squirro.atlassian.net/wiki/spaces/BOX) or by launching a server on start.squirro.com.

Getting Started

This tutorial uses the code examples from the official OneDrive connector.

Setting up an OAuth app

The first step in creating a one-click connector is setting up an OAuth app. That process depends heavily on the provider. The common steps which are necessary in most cases are:

  • Creating an app

  • Defining scopes

  • Providing redirect URL

At the end of that process, you should have access to client keys.

Note: In most cases the naming convention is Client ID and Client Secret, but it may differ depending on the provider (in Dropbox's case it is App Key and App Secret).

There are some rules which should be followed:

Name of the application

When creating an app only for local testing, don't use names which may later be needed for official apps, e.g. “Squirro”, “Squirro OneDrive Connector” etc. Some providers make it impossible to reuse these names for official apps later.

When creating an official Squirro app follow the naming convention:

  • Squirro <PROVIDER> Connector

NOTE: If the above convention is somehow not possible, use:

  • Squirro <PROVIDER>

Scopes

As a general rule, choose the most narrowly focused scopes possible, and avoid requesting scopes that the app does not actually need.

Redirect URL:

When creating an app for local testing using Squirro in a Box, use a localhost redirect URL, e.g. http://localhost:8300/dataloader/onedrive_plugin/pl/azure/authorized

When creating an app to be used by a dataloader deployed on a server on http://start.squirro.com, please use the following URL:

  • https://start.squirro.com/oauth2_middleman

Example:

  • Squirro OneDrive Connector -> https://start.squirro.com/oauth2_middleman

Configuring INI files

The OAuth2 client ID and secret should be set in the Squirro INI configuration files. To do that, edit the /etc/squirro/common.ini file and add the following lines:

```ini
[dataloader]
<provider>_client_id = CLIENT ID
<provider>_client_secret = CLIENT SECRET
```

Replace <provider> with the name of the related plugin, e.g.:

```ini
onedrive_client_id = CLIENT ID
onedrive_client_secret = CLIENT SECRET
```

Note: Use separate configuration keys, even if multiple plugins use the same OAuth2 provider. Each plugin should be configured to use its own OAuth2 application.

For a localhost setup without HTTPS, you also need to edit /etc/squirro/frontend.ini and add the following lines:

```ini
[dataloader]
OAUTHLIB_INSECURE_TRANSPORT=true
```

NOTE: If the section [dataloader] already exists, add the line under the existing section.


When registering the OAuth2 application for http://start.squirro.com, do not register the plugin-specific URL; instead register the oauth2_middleman endpoint URL.

Redirect URI example: https://start.squirro.com/oauth2_middleman

Add the same URL to the Squirro configuration file /etc/squirro/frontend.ini:

```ini
[dataloader]
oauth2_middleman_url=https://start.squirro.com/oauth2_middleman
```

Squirro needs to be restarted after changes to INI files.

Creating a plugin

Example of the file tree for the OneDrive connector:

| File | Required? | Purpose |
| --- | --- | --- |
| __init__.py | | marks the directory as a Python package |
| README.md | | describes plugin installation steps, like OAuth2 app configuration |
| auth.py | | deals with OAuth2 or other authorization processes |
| dataloader_plugin.json | Yes | contains references to other files and the plugin name |
| facets.json | OCC only | describes a common selection of facets |
| icon.png | Yes | plugin icon |
| mappings.json | Yes | mapping of dataloader items to Squirro fields |
| onedrive_plugin.py | Yes | core code |
| pipeline_workflow.json | OCC only | defines the default pipeline workflow steps |
| requirements.txt | | describes Python dependencies |
| scheduling_options.json | OCC only | defines sane scheduling defaults |

When creating your own plugin, replace onedrive in file and folder names with a name corresponding to the connector's target service. Use lowercase alphanumeric characters.

OCC above stands for One-Click Connector.

Configuration files

To create a one-click connector, you need to provide pre-defined configuration files. These files speed up the plugin setup process and make it much easier for the user to load data from the source. An explanation of each of the listed files can be found here: https://squirro.atlassian.net/wiki/spaces/DOC/pages/2177204350

dataloader_plugin.json

```json
{
    "title": "OneDrive",
    "description": "Index files from Microsoft OneDrive and SharePoint",
    "plugin_file": "onedrive_plugin.py",
    "auth_file": "auth.py",
    "auth_description": "Authenticate with Microsoft Azure for Squirro to get permission to access your data.",
    "category": "enterprise",
    "thumbnail_file": "icon.png",
    "scheduling_options_file": "scheduling_options.json",
    "dataloader_options_file": "mappings.json",
    "pipeline_workflow_file": "pipeline_workflow.json"
}
```

This file specifies general information about the plugin: title, description and category are described here. It also specifies which files should be loaded for authorization, scheduling etc.

mappings.json

```json
{
    "map_id": "id",
    "map_title": "name",
    "map_created_at": "createdDateTime",
    "map_file_name": "name",
    "map_file_mime": "file.mimeType",
    "map_file_data": "content",
    "map_url": "webUrl",
    "facets_file": "facets.json"
}
```

This file sets the mapping of the various fields coming from the source to the corresponding Squirro item fields. A file containing the facets can also be specified here.

facets.json

```json
{
    "createdBy.user.displayName": {
        "name": "creator",
        "display_name": "Creator",
        "visible": true,
        "searchable": true,
        "typeahead": true,
        "analyzed": true
    }
}
```

It creates and specifies which facets should be used in the plugin. More information about facets: https://squirro.atlassian.net/wiki/spaces/DOC/pages/55738382

Pay close attention to how you set up the facets.

  • Each added facet increases the index size and slows down query times.

  • Use only a few that are useful for the end-user, and be consistent across plugins.

  • If a facet is only needed for filtering or simple aggregations, set analyzed to false to reduce the size and performance impact.

  • Check if similar facets are already used in existing plugins and try to use the same naming convention, e.g. an Author or Owner facet in a new plugin could map to Creator in others.

pipeline_workflow.json

```json
{
    "steps": [
        {
            "config": {
                "policy": "replace"
            },
            "id": "deduplication",
            "name": "Deduplication",
            "type": "deduplication"
        },
        {
            "config": {
                "fetch_link_content": false
            },
            "id": "content-augmentation",
            "name": "Content Augmentation",
            "type": "content-augmentation"
        },
        {
            "config": {},
            "id": "content-conversion",
            "name": "Content Extraction",
            "type": "content-conversion"
        },
        {
            "id": "language-detection",
            "name": "Language Detection",
            "type": "language-detection"
        },
        {
            "id": "cleanup",
            "name": "Content Standardization",
            "type": "cleanup"
        },
        {
            "id": "index",
            "name": "Indexing",
            "type": "index"
        },
        {
            "id": "cache",
            "name": "Cache Cleaning",
            "type": "cache"
        }
    ]
}
```

This file describes the steps of the pipeline workflow. More about pipeline workflows: https://squirro.atlassian.net/wiki/spaces/DOC/pages/281772038

NOTE: If your plugin loads binary data such as PDF files, the Content Augmentation and Content Conversion steps are mandatory.

scheduling_options.json

```json
{
    "schedule": true,
    "repeat": "2h"
}
```

The file specifies parameters for scheduling the plugin.

facets_extended.json

This file is optional and not used by the plugin; it only serves to stash some advanced facets for possible future use. If you are able to extract information from the fetched data that could be useful later, you can place it in this file. This makes it possible to simply copy the facets into the facets.json file instead of reading the documentation and code again to extract useful information.

icon.png

Thumbnail of the dataloader displayed on the frontend. Keep the size between 16x16px and 512x512px.

README.md

Each plugin should contain a README file with instructions on how to configure and use the connector. It should describe in detail how to configure the OAuth app on the provider side, which scopes are necessary, and how to set up and upload the dataloader.

requirements.txt

```
flask==2.0.1
flask_dance
requests
```

The file lists all the required packages, with one package dependency per line. More information about data loader dependencies: https://squirro.atlassian.net/wiki/spaces/DOC/pages/1859649682

auth.py

The file starts by loading the client keys from the configuration:

```python
config = get_injected("config")
auth_tools.configure_oauth2_lib(config)

client_id = config.get("dataloader", "onedrive_client_id", fallback=None)
client_secret = config.get("dataloader", "onedrive_client_secret", fallback=None)
if not client_id or not client_secret:
    log.warning("Client keys are missing in %s plugin", target_name)
```

The code above loads the client keys from the INI configuration and configures the necessary OAuth2 options.

NOTE: Make sure to provide fallback values for the keys and log a warning if the keys are not provided. The authorization files of all plugins are loaded when the frontend is initialized, so fallback values prevent errors when some keys have not been configured.

 

```python
plugin = DataloaderFrontendPlugin(__name__)
auth_blueprint = make_azure_blueprint(
    client_id=client_id,
    client_secret=client_secret,
    scope=[
        "offline_access",
        "https://graph.microsoft.com/Files.Read.All",
        "https://graph.microsoft.com/GroupMember.Read.All",
        "https://graph.microsoft.com/Sites.Read.All",
        "https://graph.microsoft.com/User.ReadBasic.All",
        "https://graph.microsoft.com/User.Read",
    ],
    redirect_to="done",
    login_url="/login",
    session_class=auth_tools.flask_dance_session_factory(config),
)
plugin.register_blueprint(auth_blueprint, url_prefix="/pl")
```

The next step is creating and registering the blueprint. To do that, the flask-dance library (https://github.com/singingwolfboy/flask-dance) is used. Necessary information such as the client keys and scopes is passed as arguments to the factory function. Other arguments:

redirect_to - the name of the view to redirect to after the authentication flow is complete.

login_url - the URL path for the login view.

session_class - provides a session based on whether oauth2_middleman_url is set up in the config file.

NOTE: A refresh token is necessary for the plugin to run properly. It is used to acquire a new access token when the old one expires. Make sure to specify the scope or parameter responsible for returning the refresh token from the provider ("offline_access" in the example above).
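To illustrate why the refresh token matters: a client typically checks the access token's expiry before each request and refreshes it proactively. The sketch below is an assumption for illustration (the function name and leeway value are not part of the OneDrive connector):

```python
from datetime import datetime, timedelta
from typing import Optional


def needs_refresh(expires_at: Optional[datetime], leeway_seconds: int = 60) -> bool:
    """Return True when the access token is expired or about to expire.

    An unknown expiry is treated as expired, forcing a refresh attempt.
    """
    if expires_at is None:
        return True
    # Refresh a bit early so a request never fails mid-flight.
    return datetime.utcnow() >= expires_at - timedelta(seconds=leeway_seconds)
```

A client would call this before each API request and, when it returns True, exchange the refresh token for a new access token.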

NOTE: The Flask-Dance library provides blueprints for many well-known providers; however, sometimes you may not find an out-of-the-box solution for your connector. In that case you can easily write a custom blueprint based on the existing ones: https://github.com/singingwolfboy/flask-dance/tree/main/flask_dance/contrib.

 

```python
@plugin.route("/")
def start():
    session["next_url"] = auth_tools.assert_valid_next_url(request.args["back_to"])
    login_url = url_for(f"{auth_blueprint.name}.login")
    log.info("Redirecting to %r with next URL %r", login_url, session["next_url"])
    return redirect(login_url)
```

The whole OAuth flow starts from the / endpoint. The back_to URL is saved in the session in order to know where to return after the flow is done. After saving the URL, the user is redirected to the login page of the provider.

assert_valid_next_url() checks that next_url's hostname is the same as that of the request. This provides minimal protection against a user specifying an external URL as the destination after authorization.
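The implementation of auth_tools is not shown here, but the idea behind such a check can be sketched as a simple hostname comparison (the function name and the handling of relative URLs are assumptions, not the connector's actual code):

```python
from urllib.parse import urlparse


def is_valid_next_url(next_url: str, request_host: str) -> bool:
    """Accept only relative URLs or URLs pointing back at the requesting host."""
    hostname = urlparse(next_url).hostname
    # Relative URLs ("/app/...") carry no hostname and stay on the same host.
    return hostname is None or hostname == request_host
```

A check like this stops an open-redirect: a crafted back_to pointing at a foreign host would be rejected instead of receiving the user (and the token) after authorization.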

 

```python
@plugin.route("/done")
def done():
    next_url = session["next_url"]
    start_url = f"{url_for('start')}?{urllib.parse.urlencode({'back_to': next_url})}"

    token = oauth.token  # populated by flask-dance
    if not token:
        log.warning("OAuth token not acquired, redirecting to OAuth `start`")
        return redirect(start_url)

    log.info("Token scope: %s", sorted(token["scope"]))
    token_as_str = _pack_token(token)
    log.info("Redirecting to %r with token of len %d", next_url, len(token_as_str))
    next_url = f"{next_url}?{urllib.parse.urlencode({'token': token_as_str})}"
    return redirect(next_url)
```

When the authorization process is completed, the view defined by the redirect_to parameter of the blueprint is loaded. In the above case this is the done view.

If the token is populated in the oauth storage, the user is authorized. Otherwise, the user is redirected back to the / endpoint. When authorization succeeds, the token is passed as a query parameter to the next URL.
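_pack_token has to serialize the token dict into a string that survives the trip through a URL query parameter. The connector's actual encoding is not shown above; a JSON-plus-base64 sketch (function names and format are assumptions) could look like:

```python
import base64
import json


def pack_token(token: dict) -> str:
    """Serialize an OAuth2 token dict into a URL-safe string."""
    raw = json.dumps(token, sort_keys=True).encode()
    return base64.urlsafe_b64encode(raw).decode()


def unpack_token(packed: str) -> dict:
    """Inverse of pack_token: recover the original token dict."""
    return json.loads(base64.urlsafe_b64decode(packed.encode()))
```

The receiving side (the dataloader) then unpacks the query parameter back into a dict before use.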

 

```python
@plugin.route("/verify", methods=["POST"])
def verify_token():
    token = request.json["token"]
    try:
        login = get_account_id(token)
    except TokenExpiredError:
        log.warning("Token expired")
        token_delete()
        raise
    log.info("Successful login with %s", target_name)
    return jsonify(
        {"message": f"You are logged into {target_name} as {html.escape(login)}."}
    )
```

The last endpoint to define is /verify. It is used on the frontend to determine and display the logged-in user's identity, and should return a message containing information about the currently logged-in user.

The full code of the auth file:

https://github.com/squirro/dataloader-plugins/blob/master/onedrive_plugin/auth.py

Dataloader file:

The dataloader file must define a custom Source class which inherits from the DataSource class. The dataloader plugin boilerplate can be found here: https://squirro.atlassian.net/wiki/spaces/DOC/pages/1859682364. Details about the DataSource class and its methods can be found here: https://squirro.atlassian.net/wiki/spaces/DOC/pages/30670956

DataSource

connect
```python
def connect(self, inc_column: Optional = None, max_inc_value: Optional = None):
    self._stats_reset()
    if self.args.reset:
        log.info("Resetting key-value stores")
        self.key_value_cache.clear()
        self.key_value_store.clear()

    token = self.args.token
    log.debug("Token keys %r", sorted(token.keys()))

    # see if we have access token from previous run
    access_token, access_token_expires_at = self._load_access_token()
    if access_token:
        log.debug("Using access_token from previous run")
    else:
        log.debug("Using access_token supplied in args")
        access_token = token.get("access_token")
        access_token_expires_at = token.get("expires_at")
    log.debug("access_token expires_at %s", access_token_expires_at)

    config = get_injected("config")
    self.client = OnedriveClient(
        access_token=access_token,
        access_token_expiration=(
            datetime.utcfromtimestamp(int(access_token_expires_at))
            if access_token_expires_at
            else None
        ),
        refresh_token=token.get("refresh_token"),
        client_id=config.get("dataloader", "Onedrive_client_id"),
        client_secret=config.get("dataloader", "Onedrive_client_secret"),
        scope=token.get("scope"),
    )
```

self.args.reset gives the user the possibility to reset the current state of the cache and storage. More about caching and storage: https://squirro.atlassian.net/wiki/spaces/DOC/pages/346488887.

As mentioned before, after authorization is completed the token is passed to the DataSource class via query parameters; the self.args.token variable can be used to retrieve it. However, the access token can be refreshed many times while the data loader runs. Checking first whether a token is already saved in the storage therefore ensures the latest token is used, not the old one obtained during the OAuth flow. If no token is found in the storage, it is loaded from the query parameters.
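The helpers _load_access_token() and _save_access_token() are not shown above; conceptually they are a thin wrapper around the plugin's key-value store. A minimal sketch under that assumption (a plain dict stands in for key_value_store, and the key names are invented for illustration):

```python
from typing import Optional, Tuple


class AccessTokenStore:
    """Persist the freshest access token between dataloader runs."""

    def __init__(self, store: Optional[dict] = None):
        # In the real plugin this would be the plugin's key_value_store.
        self.store = store if store is not None else {}

    def save_access_token(self, access_token: str, expires_at: Optional[float]) -> None:
        self.store["access_token"] = access_token
        self.store["access_token_expires_at"] = expires_at

    def load_access_token(self) -> Tuple[Optional[str], Optional[float]]:
        # Returns (None, None) on the very first run, when nothing is stored yet.
        return (
            self.store.get("access_token"),
            self.store.get("access_token_expires_at"),
        )
```

connect() would call load_access_token() first and fall back to the token from the query parameters, exactly as the code above does.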

The next step is to define a client. Any third-party library or a custom client can be used here. The client has to handle the token refreshing mechanism to be able to fetch a new access token after expiration. Check whether a third-party library implements this; otherwise writing a custom client is recommended.

disconnect
```python
def disconnect(self):
    log.info("Disconnecting; cleaning up session & dumping state")
    if self.client:
        self._save_access_token(
            self.client.access_token,
            self.client.access_token_expiration.timestamp()
            if self.client.access_token_expiration
            else None,
        )
        self.client.close()
        self.client = None
    self._stats_log()
```

When the dataloader finishes its run, or any error occurs, the disconnect method is called. All teardown functions should be called here. In the example above, the access token is saved to the storage, the client is closed and stats are logged.

NOTE: As mentioned before, saving the access token lets us keep the latest token (the token with the longest expiration time). During the run the access token can be refreshed several times, so it's a good idea to save the latest one after the dataloader finishes its run. This ensures that the next run loads the latest token from the storage and not the old one provided in the query parameters (as shown in the connect method).

getDataBatch
```python
def getDataBatch(
    self, batch_size: Optional[int] = None
) -> Generator[List[Dict[str, Any]], None, None]:
    pass
```

This method is the core of the data loading process. It contains the whole logic of how data is fetched and processed, and returns dictionary structures which the data loader later converts into Squirro items.

The getDataBatch method depends heavily on how the provider returns the data, what type of data it is, and what processing is needed to extract useful information.
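A typical shape for such a method is to walk the provider's paginated results and re-chunk them into batches of batch_size items. The sketch below replaces real provider calls with an in-memory list of pages; the helper name is an assumption, not part of the OneDrive connector:

```python
from typing import Any, Dict, Generator, List, Optional


def iter_batches(
    pages: List[List[Dict[str, Any]]], batch_size: Optional[int] = None
) -> Generator[List[Dict[str, Any]], None, None]:
    """Re-chunk paginated source entries into batches of batch_size items."""
    batch: List[Dict[str, Any]] = []
    for page in pages:  # in a real plugin: one HTTP page per provider request
        for entry in page:
            batch.append(entry)
            if batch_size and len(batch) >= batch_size:
                yield batch
                batch = []
    if batch:
        yield batch  # flush the final, possibly partial batch
```

Yielding batches rather than building one big list keeps memory usage flat regardless of how many items the source holds.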

NOTE: Displaying binary data on the preview dashboard is not possible. Because of that, when the dataloader runs in preview mode, the binary content of the files should not be downloaded; this also speeds up loading the preview. Instead of the binary content, a dummy string should be provided, e.g.:

```python
item["file_content"] = "BINARY CONTENT [disabled in Data Preview]" if preview else download_content()
```
More about preview mode: https://squirro.atlassian.net/wiki/spaces/DOC/pages/1859649653

getSchema
```python
def getSchema(self):
    fields = set(get_mapped_fields())
    # fetch a few entries to gather possible fields;
    # to speed things up, do not fetch the file content
    for batch in self._get_data_batch(10, preview=True):
        for entry in batch:
            fields |= set(entry.keys())
    fields = sorted(fields)
    log.info("Fields: %r", fields)
    return fields
```

This method is used to decide on the valid mapping options. The best practice is to call the getDataBatch method there, fetch some example data and return all its keys. In the example above, 10 items are fetched and all their keys are returned. In the simplest cases, where the field list is not dynamic, you can also return a hardcoded Python list.

NOTE: When calling getDataBatch here, the binary content of the files should not be downloaded (as described above). This speeds up gathering the keys to create the schema.

getJobId
```python
def getJobId(self) -> str:
    """Generate a stable ID that changes with the main parameters."""
    m = hashlib.blake2b(digest_size=20)
    for v in (
        __plugin_name__,
        __version__,
        self.arg_index_all,
        self.arg_file_size_limit,
        self.arg_batch_size_limit,
        self.arg_download_media_files,
        self.arg_access_id,
    ):
        m.update(repr(v).encode())
    job_id = base64.urlsafe_b64encode(m.digest()).rstrip(b"=").decode()
    log.debug("Job ID: %r", job_id)
    return job_id
```

Since end users can configure multiple data sources with this plugin, we need to be able to distinguish them; the key reason is to keep the state and caching information of each instance separate. This method therefore defines a unique ID for the current job. To make that ID unique, as many custom parameters as possible should be used; the common approach is to include all arguments which can be set by the user. In the case of a one-click connector, passing the refresh token or access token (arg_access_id in the code above) is one of the best options for providing a unique argument to generate a stable ID.

getArguments
```python
def getArguments(self):
    return [
        {
            "name": "file_size_limit",
            "display_label": "File Size Limit",
            "default": 50,
            "help": "Size limit in megabytes, if a file is bigger than "
            "this limit, the file content won't be downloaded.",
            "type": "int",
            "advanced": True,
        },
        {
            "name": "batch_size_limit",
            "display_label": "Batch Size Limit",
            "default": 50,
            "help": "Size limit in megabytes for a batch of files "
            "(triggering early batch release if necessary).",
            "type": "int",
            "advanced": True,
        },
        {
            "name": "index_all",
            "display_label": "Index All",
            "help": "If set, all files will be indexed, even if content "
            "cannot be retrieved. Those files will be indexed "
            "only with the metadata. These files can still be "
            "found using typeahead search and facet filtering.",
            "type": "bool",
            "default": True,
            "action": "store_true",
            "advanced": True,
        },
        {
            "name": "download_media_files",
            "display_label": "Download media files content",
            "help": "If set, content for media files (image, video, audio as "
            "determined by filename) will be downloaded. "
            "Otherwise it will be skipped.",
            "type": "bool",
            "default": False,
            "action": "store_true",
            "advanced": True,
        },
    ]
```

This method specifies custom arguments that allow the user to customize the plugin.

Wherever possible, the following common arguments should be implemented:

  • file_size_limit - restricts downloading of large files

  • batch_size_limit - allows to implement early batch releasing mechanism

  • download_media_files - gives the possibility to exclude media files
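The early-batch-release mechanic behind batch_size_limit can be sketched as follows: accumulate files into the current batch and release it as soon as the summed content size crosses the limit. The file dicts and their "size" key are assumptions for illustration:

```python
from typing import Any, Dict, Generator, Iterable, List


def batch_by_size(
    files: Iterable[Dict[str, Any]], batch_size_limit_mb: int
) -> Generator[List[Dict[str, Any]], None, None]:
    """Yield a batch early once its accumulated size reaches the limit."""
    limit_bytes = batch_size_limit_mb * 1024 * 1024
    batch: List[Dict[str, Any]] = []
    size = 0
    for f in files:
        batch.append(f)
        size += f.get("size", 0)
        if size >= limit_bytes:
            yield batch  # release early so memory stays bounded
            batch, size = [], 0
    if batch:
        yield batch
```

This keeps the memory held by a single batch roughly at batch_size_limit even when individual files vary wildly in size.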

 

The full code of the dataloader file:

https://github.com/squirro/dataloader-plugins/blob/master/onedrive_plugin/onedrive_plugin.py

Upload the plugin

After creating the plugin, use the following command to upload it to the cluster:

```shell
squirro_asset dataloader_plugin upload --folder FOLDER_WITH_PLUGIN --token YOUR_TOKEN --cluster CLUSTER_IP
```

 

Mentioned links:

Toolbox: https://squirro.atlassian.net/wiki/spaces/DOC/pages/2949361

Squirro in Box: https://squirro.atlassian.net/wiki/spaces/BOX

Dataloader templates: https://squirro.atlassian.net/wiki/spaces/DOC/pages/2177204350

Facets: https://squirro.atlassian.net/wiki/spaces/DOC/pages/55738382

Pipeline workflow: https://squirro.atlassian.net/wiki/spaces/DOC/pages/281772038

Dataloader dependencies: https://squirro.atlassian.net/wiki/spaces/DOC/pages/1859649682

Dataloader boilerplate: https://squirro.atlassian.net/wiki/spaces/DOC/pages/1859682364

Dataloader full reference: https://squirro.atlassian.net/wiki/spaces/DOC/pages/30670944

Caching and Storage: https://squirro.atlassian.net/wiki/spaces/DOC/pages/346488887

Preview mode: https://squirro.atlassian.net/wiki/spaces/DOC/pages/1859649653