# -*- coding: utf-8 -*-
"""Data source implementation for PubMed Medline data.
Data is expected to be on disk, hierarchically stored in the `source_path`.
"""
import codecs
import collections
import logging
import os
from squirro.dataloader.data_source import DataSource
# Module-level logger shared by everything in this file.
log = logging.getLogger(__name__)

# Field tags looked for in a Medline record. The list doubles as the schema
# reported by the data source, and every parsed item is padded so that each
# of these tags is present (with `None` when the file did not contain it).
KEYS = [
    'AB', 'CI', 'AD', 'IRAD', 'AID', 'AU', 'AUID', 'FAU',
    'BTI', 'CTI', 'CN', 'CRDT', 'DCOM', 'DA', 'LR', 'DEP',
    'DP', 'EN', 'ED', 'FED', 'EDAT', 'GS', 'GN', 'GR',
    'IR', 'FIR', 'ISBN', 'IS', 'IP', 'TA', 'JT', 'LA',
    'LID', 'MID', 'MHDA', 'MH', 'JID', 'RF', 'OAB', 'OABL',
    'OCI', 'OID', 'OT', 'OTO', 'OWN', 'PG', 'PS', 'FPS',
    'PL', 'PHST', 'PST', 'PT', 'PUBM', 'PMC', 'PMCR', 'PMID',
    'RN', 'NM', 'SI', 'SO', 'SFM', 'STAT', 'SB', 'TI',
    'TT', 'VI', 'VTI',
]
class MedLineSource(DataSource):
    """Data source that reads Medline text files from a folder tree.

    Every file found below ``args.source_path`` is parsed into one item: a
    dictionary keyed by Medline field tag (see ``KEYS``).
    """

    def __init__(self):
        # `connect`, `getDataBatch` and `getJobId` all read
        # `self.args.source_path`, so `args` has to be assigned before they
        # are called (presumably by the data loader framework - TODO confirm).
        self.args = None

    def connect(self, inc_column=None, max_inc_value=None):
        """Create connection with the source.

        For this source that only means checking that the folder exists.

        :param inc_column: not used by this source
        :param max_inc_value: not used by this source
        :raises IOError: if `source_path` is not an existing directory
        """
        if not os.path.isdir(self.args.source_path):
            raise IOError(
                "Folder {} does not exist".format(self.args.source_path))

    def disconnect(self):
        """Disconnect from the source.

        Nothing to release: each file is opened and closed while it is parsed.
        """

    def getDataBatch(self, batch_size):
        """
        Generator - Get data from source on batches.

        The folder tree is walked and every file is parsed. Files that could
        not be parsed, or that lack a title (`TI`) or owner (`OWN`), are
        skipped with a warning. A batch never spans two folders, and it is
        additionally capped at `batch_size` items when `batch_size` is a
        positive integer (otherwise one batch per folder is produced).

        :param batch_size: maximum number of items per yielded batch
        :returns a list of dictionaries
        """
        try:
            limit = int(batch_size)
        except (TypeError, ValueError):
            # No usable size given: fall back to one batch per folder.
            limit = 0

        for root, _dirs, files in os.walk(self.args.source_path):
            items = []
            for fname in files:
                item = self._parse_file(os.path.join(root, fname))
                if not item:
                    # The parser already logged why the file was rejected.
                    continue
                if not item.get('TI') or not item.get('OWN'):
                    log.warning('Missing data %r', fname)
                    continue
                items.append(item)
                if limit > 0 and len(items) >= limit:
                    yield items
                    items = []
            if items:
                yield items

    def getJobId(self):
        """
        Return a unique string for each different select

        The id is the last component of `source_path`. The path is normalised
        first so that a trailing separator does not produce an empty id.

        :returns a string
        """
        return os.path.basename(os.path.normpath(self.args.source_path))

    def getSchema(self):
        """
        Return the schema of the data set

        :returns a List containing the names of the columns retrieved from the
            source (a copy, so callers cannot alter the module-level `KEYS`)
        """
        return list(KEYS)

    def getArguments(self, parser=None):
        """
        Return source arguments.

        NOTE(review): this method was reconstructed from a corrupted merge of
        an `add_args(self, parser)` variant and a `getArguments` variant.
        `parser` is accepted for compatibility but is not used - confirm the
        expected signature against the DataSource base class.

        :param parser: ignored
        :returns a list with one dictionary per accepted argument
        """
        return [
            {
                "name": "source_path",
                "help": "Path of MedLine data folder.",
            },
        ]

    def _parse_file(self, file_name):
        """
        Parse one Medline text file.

        A line whose first four characters are not blank starts a new field:
        those characters are the tag and the text from the seventh character
        on is the value. Any other non-blank line continues the value of the
        current field. Lines starting with `<` are ignored.

        Problems while reading or parsing are logged, not raised; whatever was
        collected up to that point is still returned.

        :param file_name: path of the Medline text file
        :return: Dictionary with all the key/value pairs from the file, or
            `None` if the file contains an `Error occurred:` marker.
            Multi-value keys are joined with a pipe (`|`). Tags listed in
            `KEYS` that are absent from the file map to `None`.
        """
        fields = collections.defaultdict(list)
        tag = None
        text = None
        try:
            with codecs.open(file_name, encoding='utf8') as handle:
                for line in handle:
                    if 'Error occurred:' in line:
                        log.warning("Encountered error in file: %s", file_name)
                        return None
                    if line.startswith('<'):
                        # Ignore the XML lines at the beginning and end.
                        continue
                    new_tag = line[0:4].strip()
                    if new_tag:
                        # This introduces a new key / value, so store the
                        # field collected so far.
                        if tag:
                            fields[tag].append(text)
                        tag = new_tag
                        text = line[6:].strip()
                        continue
                    continuation = line.strip()
                    if not continuation:
                        continue
                    if tag is None:
                        # Continuation text before any tag has nothing to
                        # attach to; skip it instead of aborting the file.
                        continue
                    # No new key, this is a continuation of the value from
                    # the last key.
                    text += ' ' + continuation
                if tag:
                    fields[tag].append(text)
        except Exception as err:
            # Deliberately best-effort: one unreadable file must not stop the
            # whole load.
            log.error("Problem parsing file: %s with error %r", file_name, err)

        # Start with every known tag set to None, then fill in what was found.
        item = dict.fromkeys(KEYS)
        for found_tag, values in fields.items():
            item[found_tag] = '|'.join(values)
        return item