Source code for rdc.etl.transform

# -*- coding: utf-8 -*-
#
# Copyright 2012-2014 Romain Dorgueil
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools
import types
from abc import ABCMeta, abstractmethod
from rdc.etl import H
from rdc.etl.error import AbstractError
from rdc.etl.hash import Hash
from rdc.etl.io import STDIN, STDOUT, STDERR, InputMultiplexer, OutputDemultiplexer, End
from rdc.etl.stat import Statisticable
from rdc.etl.util import Timer


class ITransform:
    """Interface that every transformation implements.

    Implementations provide :meth:`transform`, which is fed the input rows
    one at a time.
    """

    # Python 2 metaclass declaration, so that @abstractmethod below is
    # enforced (this attribute form is ignored by Python 3).
    __metaclass__ = ABCMeta

    @abstractmethod
    def transform(self, hash, channel=STDIN):
        """Handle one row that arrived on one of this transform's input channels.

        Every input row, whatever channel it came from, is passed to this
        method. When there is a single input channel the ``channel`` value can
        be ignored, but the parameter must still be part of the signature.
        """
        raise AbstractError(self.transform)
class Transform(ITransform, Statisticable):
    """Base class and decorator for transformations.

    .. automethod:: transform

    .. attribute:: INPUT_CHANNELS

        List of input channel names.

    .. attribute:: OUTPUT_CHANNELS

        List of output channel names.

    Example::

        >>> @Transform
        ... def my_transform(hash, channel=STDIN):
        ...     yield hash.copy({'foo': hash['foo'].upper()})

        >>> print list(my_transform(
        ...         H(('foo', 'bar'), ('bar', 'alpha')),
        ...         H(('foo', 'baz'), ('bar', 'omega')),
        ...     ))
        [H{'foo': 'BAR', 'bar': 'alpha'}, H{'foo': 'BAZ', 'bar': 'omega'}]

    """

    INPUT_CHANNELS = (STDIN, )
    OUTPUT_CHANNELS = (STDOUT, STDERR, )

    _name = None

    def __init__(self, transform=None, input_channels=None, output_channels=None):
        """Set up channels, counters and lifecycle flags.

        :param transform: optional callable taking ``(hash, channel)``. It
            replaces the :meth:`transform` method, which is what lets the
            class be used as a decorator. Its ``__name__`` becomes this
            transformation's name unless a name is already set.
        :param input_channels: replaces ``INPUT_CHANNELS`` when truthy.
        :param output_channels: replaces ``OUTPUT_CHANNELS`` when truthy.
        """
        # Use the callable name if provided
        if transform and not self._name:
            self._name = transform.__name__

        self.INPUT_CHANNELS = input_channels or self.INPUT_CHANNELS
        self.OUTPUT_CHANNELS = output_channels or self.OUTPUT_CHANNELS

        self._input = InputMultiplexer(self.INPUT_CHANNELS)
        self._output = OutputDemultiplexer(self.OUTPUT_CHANNELS)

        # Profiling counters, reported by get_local_stats().
        self._exec_time = 0.0
        self._exec_count = 0

        # Lifecycle flags driven by step().
        self._booted = False
        self._initialized = False
        self._finalized = False

        self.transform = transform or self.transform

    def __call__(self, *stream, **options):
        """Run the transformation over ``stream`` and yield the produced rows.

        This is used in the class docstring example. It calls :meth:`transform`
        directly, without going through the input/output multiplexers or the
        statistics counters. Rows that are not ``Hash`` instances are wrapped
        with ``Hash(...)`` first.

        The ``channel`` keyword option (default ``STDIN``) is passed to every
        :meth:`transform` call. A ``None`` result yields nothing. A single
        ``Hash`` result is yielded as-is. Any other result is iterated and
        each item is yielded.
        """
        channel = options.get('channel', STDIN)
        for hash in stream:
            if not isinstance(hash, Hash):
                hash = Hash(hash)

            results = self.transform(hash, channel)

            # Plain (non-generator) transforms may return one row or nothing,
            # which the step() path already supports. Iterating None would
            # raise TypeError, and iterating a Hash would walk its contents
            # instead of emitting the row.
            if results is None:
                continue
            if isinstance(results, Hash):
                yield results
                continue
            for line in results:
                yield line

    # ITransform implementation

    def transform(self, hash, channel=STDIN):
        """Core transformation method that will be called for each input data row."""
        raise AbstractError(self.transform)

    # IO related

    def step(self, finalize=False):
        """Process one item from the input channels.

        On the first call, :meth:`boot` and then :meth:`initialize` are run.
        Each call then pulls one ``(data, channel)`` pair from the input
        multiplexer (the comment below notes this blocks) and transforms it.
        When ``finalize`` is true, :meth:`finalize` is run once and ``End``
        is sent through ``self._output.put_all``.
        """
        if not self._booted:
            # TODO: find something to make this work
            self.boot()
            self._booted = True

        if not self._initialized:
            self._initialized = True
            self.__execute_and_handle_output(self.initialize)

        try:
            # Pull data from the first available input channel (blocking)
            data, channel = self._input.get()

            # Execute actual transformation
            self.__execute_and_handle_output(self.transform, data, channel)
        finally:
            # Runs even if get() or the transformation raised, so a requested
            # finalization (and the End marker) is still attempted exactly once.
            if finalize and not self._finalized:
                self._finalized = True
                self.__execute_and_handle_output(self.finalize)
                self._output.put_all(End)

    def boot(self):
        """Just before transformation is started, validate everything is ready."""
        pass

    def initialize(self):
        """If you need to execute code before any item is transformed, this is the place."""
        pass

    def finalize(self):
        """If you need to execute code after all items are transformed, this is the place.

        It's especially useful for buffering transformations or other blocking
        types. Its output is routed to the output channels like any transform
        result.
        """
        pass

    @property
    def virgin(self):
        """True while this transformation has been neither initialized nor finalized."""
        return not self._initialized and not self._finalized

    # Name, statistics and representation logic. This is basic but important,
    # as it serves visualisation/debugging purposes.

    @property
    def __name__(self):
        """Explicit name if one was set, otherwise the class name."""
        return self._name or type(self).__name__

    @__name__.setter
    def __name__(self, value):
        self._name = value

    def get_local_stats(self, debug=False, profile=False):
        """Return this transformation's own profiling stats as ``(label, value)`` pairs.

        Returns an empty tuple unless ``profile`` is true. The entries are:
        total time (τ), execution count (ε), average time per execution
        (τ.ε⁻¹) and executions per second (ε.τ⁻¹). ``∞`` replaces a value
        whose divisor is zero.
        """
        if not profile:
            return ()

        exec_time = self._exec_time
        exec_count = self._exec_count
        time_per_exec = u'%.1fms' % (1000 * exec_time / exec_count, ) if exec_count > 0 else u'∞'
        execs_per_second = u'%.1f/s' % (exec_count / exec_time, ) if exec_time > 0 else u'∞'

        return (
            (u'τ', '%.2fs' % (exec_time, ), ),
            (u'ε', exec_count, ),
            (u'τ.ε⁻¹', time_per_exec, ),
            (u'ε.τ⁻¹', execs_per_second, ),
        )

    def get_stats(self, debug=False, profile=False):
        """Lazily chain input, output and local stats as ``(name, stat)`` pairs."""
        stats = itertools.chain(
            self._input.stats,
            self._output.stats,
            self.get_local_stats(debug=debug, profile=profile)
        )
        return ((name, stat, ) for name, stat in stats)

    def __repr__(self):
        return u'<{0} {1}>'.format(self.__name__, self.get_unicode_stats())

    # Private

    def __execute_and_handle_output(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` and push what it produces to the output.

        Time spent inside ``func`` is measured with ``Timer`` and added to
        ``self._exec_time``. For generators, the time of each ``next()`` call
        is measured too.

        * generator result: each yielded item is put on the output and counted;
        * other non-None result: put on the output once and counted once;
        * ``None``: nothing is output, but the call is still counted once.
        """
        timer = Timer()
        with timer:
            results = func(*args, **kwargs)
        self._exec_time += timer.duration

        if isinstance(results, types.GeneratorType):
            while True:
                timer = Timer()
                with timer:
                    try:
                        # next() builtin works on Python 2.6+ and 3, unlike .next().
                        result = next(results)
                    except StopIteration:
                        break
                self._exec_time += timer.duration
                self._exec_count += 1
                self._output.put(result)
        else:
            # A plain call counts as one execution whether or not it returned a value.
            self._exec_count += 1
            if results is not None:
                self._output.put(results)