Source code for rdc.etl.extra.db.extract
# -*- coding: utf-8 -*-
#
# Copyright 2012-2014 Romain Dorgueil
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from rdc.etl.transform.extract import Extract
class DatabaseExtract(Extract):
    """
    Extract data from a database using some raw SQL and yield one output line per query result.

    .. attribute:: engine

        The sqlalchemy engine to use for extraction.

    .. attribute:: query

        The database query that will be used to extract data from database. Should not contain OFFSET/LIMIT. A single
        trailing ";" is tolerated and stripped before the paging clauses are appended.

    .. attribute:: pack_size

        The number of records to retrieve at a time (will be used to add OFFSET/LIMIT clauses to SQL).

    .. attribute:: limit

        Maximum total number of records to extract. A falsy value (``None`` or ``0``) means "no limit".

    """

    query = 'SELECT 1'
    pack_size = 1000

    def __init__(self, engine, query=None, limit=None):
        super(DatabaseExtract, self).__init__()
        self.engine = engine
        try:
            # Fall back to the class-level default when no query is given.
            self.query = query or self.query
        except AttributeError:
            # A subclass may expose ``query`` as a read-only property; in that
            # case its computed value is used and the argument is ignored.
            pass
        self.limit = limit

    def extract(self):
        """
        Run ``self.query`` in pages of at most ``pack_size`` rows and yield every row.

        Paging is done by appending ``LIMIT``/``OFFSET`` clauses to the query. When ``limit`` is set,
        at most ``limit`` rows are yielded in total, and the last page is shrunk accordingly.

        Note: LIMIT/OFFSET paging only gives consistent, non-overlapping pages when the query has an
        ``ORDER BY`` clause; without one the database may return rows in any order between pages.

        :raises ValueError: if the query is empty once whitespace and a trailing ";" are removed.
        """
        query = self.query.strip()
        if query.endswith(';'):
            query = query[:-1].rstrip()
        if not query:
            raise ValueError('DatabaseExtract requires a non-empty query.')

        fetched = 0
        while not self.limit or fetched < self.limit:
            page_size = self.pack_size
            if self.limit:
                # Never request more rows than the remaining limit allows.
                page_size = min(page_size, self.limit - fetched)

            paged_query = '%s LIMIT %d OFFSET %d;' % (query, page_size, fetched)
            # NOTE(review): ``use_labels`` is forwarded to ``engine.execute`` as-is; with a raw-string
            # statement it is presumably treated as an execution parameter rather than a compile
            # option -- confirm against the SQLAlchemy version in use.
            results = self.engine.execute(paged_query, use_labels=True).fetchall()

            for row in results:
                yield row
            fetched += len(results)

            # A short (or empty) page means the result set is exhausted; stopping here avoids an
            # extra round-trip and guards against looping forever when pack_size is 0.
            if not results or len(results) < page_size:
                break