Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Excerpt

This tutorial walks step by step through using the Data Loader. It starts by uploading a CSV file to Squirro. In the last step, a custom source script is written to handle a proprietary data format.

Table of Contents

Table of Contents
outlinetrue
excludeTable of Contents

...

Code Block
languagepy
titlemedline.py
# -*- coding: utf-8 -*-
"""Data source implementation for PubMed Medline data.

Data is expected to be on disk, hierarchically stored in the `source_path`.
"""
import codecs
import collections
import logging
import os

from squirro.dataloader.data_source import DataSource

log = logging.getLogger(__name__)


# Field tags that make up the schema of this source. `getSchema` returns this
# list as-is and `_parse_file` sets any tag that is absent from a file to None.
KEYS = """
    AB CI AD IRAD AID AU AUID FAU BTI CTI
    CN CRDT DCOM DA LR DEP DP EN ED FED
    EDAT GS GN GR IR FIR ISBN IS IP TA JT
    LA LID MID MHDA MH JID RF OAB OABL OCI
    OID OT OTO OWN PG PS FPS PL PHST PST
    PT PUBM PMC PMCR PMID RN NM SI SO SFM
    STAT SB TI TT VI VTI
""".split()


class MedLineSource(DataSource):
    """Data source that reads PubMed Medline text files from a folder.

    Every file found below ``args.source_path`` is parsed into one item: a
    dictionary keyed by the field tags listed in `KEYS`.
    """

    def __init__(self):
        # Every other method reads `self.args.source_path`, so `args` has to
        # be assigned before the source is used.
        # NOTE(review): presumably the Data Loader framework sets this after
        # construction - confirm against the DataSource base class.
        self.args = None

    def connect(self, inc_column=None, max_inc_value=None):
        """Check that the source folder exists.

        :param inc_column: not used by this source
        :param max_inc_value: not used by this source
        :raises IOError: if `source_path` is not an existing directory
        """
        if not os.path.isdir(self.args.source_path):
            raise IOError("Folder {} does not exist".format(self.args.source_path))

    def disconnect(self):
        """Disconnect from the source (nothing to release for plain files)."""
        pass

    def getDataBatch(self, batch_size):
        """
        Generator - Get data from source on batches.

        One batch is produced per directory below `source_path` that holds
        at least one usable file.

        :param batch_size: currently ignored; batch boundaries follow the
                           directory layout instead
        :returns a list of dictionaries
        """
        for root, _dirs, files in os.walk(self.args.source_path):
            items = []
            for fname in files:
                item = self._parse_file(os.path.join(root, fname))
                if not item:
                    # `_parse_file` already logged why the file was rejected.
                    continue
                if not item.get('TI') or not item.get('OWN'):
                    # Records without the TI or OWN field are skipped.
                    log.warning('Missing data %r', fname)
                    continue
                items.append(item)
            if items:
                yield items

    def getJobId(self):
        """
        Return a unique string for each different select
        :returns a string
        """
        return os.path.basename(self.args.source_path)

    def getSchema(self):
        """
        Return the schema of the data set
        :returns a List containing the names of the columns retrieved from the source
        """
        return KEYS

    def getArguments(self, parser=None):
        """
        Return the arguments this source accepts.

        :param parser: unused. Accepted only so that a caller which still
                       passes the argument parser (as the former `add_args`
                       hook required) does not fail.
        :returns a list of dictionaries, one per argument, each holding the
                 argument `name` and its `help` text
        """
        return [
            {
                "name": "source_path",
                "help": "Path of MedLine data folder.",
            },
        ]

    def _parse_file(self, file_name):
        """
        Parse a single Medline text file.

        A parsing problem is logged and whatever was collected up to that
        point is still returned.

        :param file_name: Path of the Medline text file
        :return: Dictionary with all the key/value pairs from the file.
                 Multi-value keys are joined with a pipe (`|`). Keys from
                 `KEYS` that do not occur in the file are set to None.
                 None is returned if the file contains an error marker.
        """
        ret = collections.defaultdict(list)
        key = None
        value = None

        try:
            with codecs.open(file_name, encoding='utf8') as handle:
                for line in handle:
                    if 'Error occurred:' in line:
                        log.warning("Encountered error in file: %s", file_name)
                        return None
                    if line[0] == '<':
                        # Ignore the XML lines at the beginning and end.
                        continue
                    elif line[0:4].strip():
                        # This introduces a new key / value, so the previous
                        # pair (if any) is complete and can be stored.
                        if key:
                            ret[key].append(value)
                        key = line[0:4].strip()
                        value = line[6:].strip()
                    elif line.strip():
                        # No new key, this is a continuation of the value from
                        # the last key.
                        value += ' ' + line.strip()
                # Store the pair that was still open when the file ended.
                if key:
                    ret[key].append(value)

        except Exception as err:
            log.error("Problem parsing file: %s with error %r", file_name, err)

        # Start with every schema key set to None, then overwrite the ones
        # that were actually found. `items()` works on Python 2 and 3.
        item = dict.fromkeys(KEYS)
        for tag, values in ret.items():
            item[tag] = '|'.join(values)
        return item

...

...