data-digestor (1.0.0)
Data Digestor
Data Digestor is a library designed to facilitate the building of highly configurable pipelines that can ingest data from an outside source and perform a series of processing steps on the data.
I initially intended this package for ingesting huge amounts of data from the NOAA data sets into a Postgres database. However, I think it is sufficiently generic and flexible to be adapted to other scenarios.
Installation
The data_digestor package can be installed from my private PyPI registry with the following command:

```shell
pip install --index-url https://{username}:{password}@git.luchuktech.space/api/packages/Andrew/pypi/simple \
    --no-deps data_digestor
```

In the command above, replace the {username} and {password} fields with login credentials such as those
contained in the .pypirc file.
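For reference, a .pypirc entry for a private registry typically looks like the following. The index name `luchuktech` is an invented placeholder; substitute your own credentials:

```ini
[distutils]
index-servers =
    luchuktech

[luchuktech]
repository = https://git.luchuktech.space/api/packages/Andrew/pypi
username = your-username
password = your-password
```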
Usage
There are three main categories of development you will need to do in order to work with this project:
- Implementing a data retriever.
- Building several ProcessStep objects.
- Creating a configuration file.
I will walk through each step below.
Data retriever
A data retriever is any Iterator that returns DataChunk objects. DataChunk is a small wrapper around the
raw contents; it exists to generate the initial ProcessFrame object that will be passed to subsequent ProcessStep
objects.
You can use either a generator function or a callable object, as long as the __call__ method returns an
Iterator[DataChunk].
Generator Example:
```python
from data_digestor import DataChunk

def data_retriever():
    filenames = ['data1.json', 'data2.json']
    for file in filenames:
        with open(file, 'r') as f:
            contents = f.read()
        yield DataChunk(contents)
```
A callable class can also be used:
```python
import requests

from data_digestor import DataChunk

class DataRetriever:
    def __init__(self, url, request_num):
        # Store the endpoint and the number of requests to make.
        self.url = url
        self.request_num = request_num

    def __call__(self):
        for _ in range(self.request_num):
            response = requests.get(self.url)
            yield DataChunk(response.text)
```
Once you have a data retriever implementation, you should set the value of the RETRIEVER property in your config
to the data retriever you want to use:
```python
# pipeline_conf.py
from my_package.data_retriever import data_retriever

RETRIEVER = data_retriever
```
ProcessStep
To use your pipeline, you will need one or more ProcessStep classes. Each process step should inherit from the
abstract ProcessStep class and implement an execute() method that must return a ProcessFrame instance.
ProcessFrame is the medium of data exchange between ProcessStep instances and provides a get_data()
method to retrieve the data contained at that step. Data retrieval for a given ProcessFrame instance is performed
by the instance's Accessor. You can create your own Accessor if no existing one provides the
functionality you need to retrieve the data for a given ProcessFrame.
```python
from data_digestor import ProcessStep, ProcessFrame

class PassThruStep(ProcessStep):
    def execute(self) -> ProcessFrame:
        # Perform work
        return ProcessFrame(self.initial_frame.metadata, self.initial_frame.data_type)
```
Getting the data from the ProcessFrame should be straightforward if you are using a built-in Accessor or if your
custom Accessor is working properly: simply call get_data(). The return type of this method depends entirely
on the type of data your system imports with the data retriever.
For example:
```python
from data_digestor import ProcessStep, ProcessFrame

class SimpleTransformStep(ProcessStep):
    def transform_data(self, some_data):
        # Transform the data as needed.
        pass

    def execute(self) -> ProcessFrame:
        data = self.initial_frame.get_data()
        metadata = self.initial_frame.metadata
        updated_data = self.transform_data(data)
        metadata['data'] = updated_data
        return ProcessFrame(metadata, type(updated_data).__name__)
```
Configuration
To build a pipeline, you need a configuration file. The format is inspired by Django's settings module. There are only three required properties, but because the file is ordinary Python, you can use it to initialize data as needed or generate the properties dynamically.
For example:
```python
# Configuration file
from my_pkg.data_retriever import data_retriever
from my_pkg.steps import StepOne, StepTwo, ProcessStepOne, ProcessStepTwo, ProcessStepThree

RETRIEVER = data_retriever

RETRIEVAL_STEPS = [
    StepOne,
    StepTwo,
    # ...
]

PROCESSING_STEPS = [
    ProcessStepOne,
    ProcessStepTwo,
    ProcessStepThree,
    # ...
]
```
There are two phases of a pipeline run: the retrieval phase and the processing phase. The retrieval phase runs each step
listed in the RETRIEVAL_STEPS property for every data chunk that is returned by the data retriever. It is intended to
perform the bare minimum processing needed before handing the data to the processing steps. If you do not need to
perform any pre-processing steps on data chunks (for example, if you have only a single chunk of data) you can leave
RETRIEVAL_STEPS as an empty list.
The second phase is the processing phase, which is where the bulk of the work should happen. You configure the steps of the
processing phase by adding classes that inherit from ProcessStep to the PROCESSING_STEPS property.
There is no essential difference between a ProcessStep that can be used in the RETRIEVAL_STEPS property and a
ProcessStep that can be used in the PROCESSING_STEPS property, meaning they are technically interchangeable. In
practice, however, it is probably not a good idea to use them interchangeably: the processing phase is intended to
do the heavy lifting and should run on the entire dataset, whereas the retrieval steps run for each chunk of data and are
intended to be very lightweight.
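The sequencing of the two phases can be sketched roughly as follows. This is illustrative plain Python, not data_digestor's actual run loop; the function and variable names are invented stand-ins:

```python
def run_pipeline(retriever, retrieval_steps, processing_steps):
    """Illustrative two-phase run loop (not the library's real implementation)."""
    frames = []

    # Retrieval phase: every retrieval step runs once per data chunk,
    # doing only lightweight pre-processing.
    for chunk in retriever():
        frame = chunk  # stand-in for the initial ProcessFrame
        for step in retrieval_steps:
            frame = step(frame)
        frames.append(frame)

    # Processing phase: each processing step runs over the whole dataset.
    result = frames
    for step in processing_steps:
        result = step(result)
    return result
```

Note how an empty `retrieval_steps` list simply passes each chunk straight through to the processing phase, which is why `RETRIEVAL_STEPS = []` is a valid configuration.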
Running your Pipeline
Once you have a pipeline configuration file and any steps you need, you can run your pipeline in a couple of ways.
- Run the included CLI script
- Run the module directly
- Run the pipeline programmatically.
1: Running the included CLI script

```shell
data-digestor pipeline_conf
```

2: Running the module directly

```shell
python -m data_digestor pipeline_conf
```
3: Run the pipeline programmatically
This is the most complex method to run your pipeline because it does not provide an out-of-the-box experience, but it is the most flexible and may be more convenient in some automation scenarios.
To do this, you will need to instantiate a Configuration object and build a Pipeline instance from it. Once you have a Pipeline
instance, it provides a simple run() method to run the pipeline:
```python
from data_digestor.config.configuration import Configuration
from data_digestor.pipeline.pipeline import build_pipeline

config = Configuration('my_pkg.pipeline_config')  # Must be importable by `importlib`.
pipeline = build_pipeline(config)
pipeline.run()
```
The purpose of the Configuration object is to supply the RETRIEVER, RETRIEVAL_STEPS, and PROCESSING_STEPS
properties to the Pipeline object. This means that you are free to dynamically generate some or all of these
properties in any way you choose.
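For instance, since the configuration file is ordinary Python, the properties can be assembled at import time. In this sketch, the my_pkg module, its step classes, and the DIGESTOR_DEBUG environment variable are all hypothetical examples, not part of data_digestor:

```python
# pipeline_conf.py -- hypothetical sketch of a dynamically built configuration
import os

from my_pkg.data_retriever import data_retriever      # hypothetical module
from my_pkg.steps import ProcessStepOne, DebugStep    # hypothetical steps

RETRIEVER = data_retriever
RETRIEVAL_STEPS = []

PROCESSING_STEPS = [ProcessStepOne]

# Append an extra step only when a (hypothetical) debug flag is set.
if os.environ.get("DIGESTOR_DEBUG") == "1":
    PROCESSING_STEPS.append(DebugStep)
```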
Developing New Accessors
Accessors are the means by which a ProcessFrame retrieves data. The data can be stored in almost any manner including
in the ProcessFrame's metadata, so Accessors are needed to take the info contained in the ProcessFrame's metadata
and use that to extract the data in question.
There are two key methods that any Accessor implementation needs:
- initialization_config() returns a dictionary containing any info that the Accessor needs to run the get_data() method.
- get_data() is the method that actually returns the data.
Depending on the complexity of the Accessor, you may not need a significant implementation for initialization_config
and could simply return an empty dictionary.
get_data(), on the other hand, is much more significant: it performs the majority of the work of pulling
data from the ProcessFrame. You should rely on the ProcessFrame's metadata for initialization. The metadata
dictionary can contain any keys and any type of data. The keys and contents are set by the previous ProcessStep, so
good documentation is important if you make significant modifications to the ProcessFrame's metadata.
You will also need to add an __init__ method that accepts a ProcessFrame instance as its only parameter.
Example Accessor:
```python
import json

from data_digestor.pipeline.process_frame import Accessor, ProcessFrame

class CustomAccessor(Accessor):
    def __init__(self, frame: ProcessFrame):
        self.frame = frame
        self.required_keys = ['file_name', 'columns']
        config = self.initialization_config(frame)
        self.file_name = config['file_name']
        self.columns = config['columns']

    def initialization_config(self, frame: ProcessFrame) -> dict:
        config = {}
        for key in self.required_keys:
            if key not in frame.metadata:
                raise ValueError(f"Required key {key} is missing")
            config[key] = frame.metadata[key]
        return config

    def get_data(self):
        # Read the file line by line, keeping the keys of each JSON line
        # that match the configured columns.
        output_data = []
        with open(self.file_name, 'r') as f:
            for line in f:
                line_data = json.loads(line)
                output_data.append([c for c in line_data if c in self.columns])
        return output_data

# You can return any data by any mechanism that fits your purpose best.
```