data-digestor (1.0.2)

Published 2026-03-12 22:22:00 -04:00 by Andrew

Installation

pip install --index-url https://git.luchuktech.space/api/packages/Andrew/pypi/simple --extra-index-url https://pypi.org/simple data-digestor

About this package

A highly configurable pipeline runner designed to facilitate data ingestion and processing on large datasets.

Data Digestor

Data Digestor is a library designed to facilitate the building of highly configurable pipelines that can ingest data from an outside source and perform a series of processing steps on the data.

I initially intended this package to be used for ingesting huge amounts of data from the NOAA data sets into a Postgres database. However, I think it is sufficiently generic and flexible to be adaptable to other scenarios.

Installation

The data_digestor package can be installed from my private PyPI registry by using the following command:

pip install --index-url https://{username}:{password}@git.luchuktech.space/api/packages/Andrew/pypi/simple \
  --no-deps data_digestor

In the command above, the {username} and {password} fields should be replaced with login credentials such as those contained in the .pypirc file.
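For reference, a .pypirc entry for a private index generally looks something like the following sketch (the repository URL matches the one above; leave the placeholder fields as your own credentials):

```ini
[distutils]
index-servers =
    luchuktech

[luchuktech]
repository = https://git.luchuktech.space/api/packages/Andrew/pypi
username = {username}
password = {password}
```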

Usage

There are three main categories of development you will need to do in order to work with this project:

  • Implementing a data retriever.
  • Building one or more ProcessStep objects.
  • Creating a configuration file.

I will walk through each step below.

Data retriever

A data retriever is any Iterator that returns DataChunk objects. DataChunk is a small wrapper around the raw contents; it exists to generate the initial ProcessFrame object that will be passed to subsequent ProcessStep objects.

You can either use a Generator method or a callable object as long as the __call__ method returns an Iterator[DataChunk].

Generator Example:

from data_digestor import DataChunk

def data_retriever():
    filenames = ['data1.json', 'data2.json']
    for file in filenames:
        contents = ""
        with open(file, 'r') as f:
            contents = f.read()

        yield DataChunk(contents)

A callable class can also be used:

import requests
from data_digestor import DataChunk

class DataRetriever:
    def __init__(self, url, request_num):
        # Store the endpoint and the number of requests to make
        self.url = url
        self.request_num = request_num

    def __call__(self):
        for _ in range(self.request_num):
            response = requests.get(self.url)
            # Pass the response body to DataChunk
            yield DataChunk(response.text)

Once you have a data retriever implementation, you should set the value of the RETRIEVER property in your config to the data retriever you want to use:

# pipeline_conf.py
from my_package.data_retriever import data_retriever

RETRIEVER = data_retriever

ProcessStep

To use your pipeline, you will need one or more ProcessStep classes. Each process step should inherit from the abstract ProcessStep class and implement an execute() method that must return a ProcessFrame instance. ProcessFrame is the medium of data exchange between ProcessStep instances and provides a get_data() method to retrieve the data contained at that step. Data retrieval for a given ProcessFrame instance is performed by the instance's Accessor. You can create your own Accessor implementations if no existing one provides the functionality you need to retrieve the data for a given ProcessFrame.

from data_digestor import ProcessStep, ProcessFrame

class PassThruStep(ProcessStep):
    def execute(self) -> ProcessFrame:
        # Perform work
        return ProcessFrame(self.initial_frame.metadata, self.initial_frame.data_type)

Getting the data from the ProcessFrame should be straightforward if you are using a built-in Accessor or if your custom Accessor is working properly: simply call get_data(). The return type of this method depends entirely on the type of data your system imports with the data retriever.

For example:

from data_digestor import ProcessStep, ProcessFrame

class SimpleTransformStep(ProcessStep):
    def transform_data(self, some_data):
        # Apply your transformation here and return the result
        return some_data

    def execute(self) -> ProcessFrame:
        data = self.initial_frame.get_data()
        metadata = self.initial_frame.metadata

        updated_data = self.transform_data(data)
        metadata['data'] = updated_data

        return ProcessFrame(metadata, type(updated_data).__name__)

Configuration

To build a pipeline, you need a configuration file. The configuration file is inspired by the configuration for Django. There are only three required properties (RETRIEVER, RETRIEVAL_STEPS, and PROCESSING_STEPS), but you can also use the file to initialize data as needed or to generate the properties dynamically.

For example:

# Configuration file

from my_pkg.data_retriever import data_retriever
from my_pkg.steps import StepOne, StepTwo, ProcessStepOne, ProcessStepTwo, ProcessStepThree

RETRIEVER = data_retriever

RETRIEVAL_STEPS = [
    StepOne,
    StepTwo,
    # ...
]

PROCESSING_STEPS = [
    ProcessStepOne,
    ProcessStepTwo,
    ProcessStepThree,
    # ...
]

There are two phases of a pipeline run: the retrieval phase and the processing phase. The retrieval phase runs each step listed in the RETRIEVAL_STEPS property for every data chunk that is returned by the data retriever. It is intended to perform the bare minimum processing needed before handing the data to the processing steps. If you do not need to perform any pre-processing steps on data chunks (for example, if you have only a single chunk of data) you can leave RETRIEVAL_STEPS as an empty list.

The second phase is the processing phase, which is where the bulk of the work should happen. You configure the steps of the processing phase by adding classes that inherit from ProcessStep to the PROCESSING_STEPS property.

There is no essential difference between a ProcessStep that can be used in the RETRIEVAL_STEPS property and a ProcessStep that can be used in the PROCESSING_STEPS property, meaning they are technically interchangeable. In practice, however, it is probably not a good idea to use them interchangeably: the processing phase is intended to do the heavy lifting and should run on the entire dataset, whereas the retrieval steps run for each chunk of data and are intended to be very lightweight.
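The two-phase flow described above can be sketched in plain Python. This is an illustrative stand-in, not the actual data_digestor internals; the step functions and the run_pipeline driver are hypothetical:

```python
# Illustrative sketch of the two-phase run loop.
# Frames are modeled as plain dicts instead of ProcessFrame objects.

def retriever():
    # Yields raw "chunks"; a real retriever yields DataChunk objects.
    yield {"data": "chunk-1"}
    yield {"data": "chunk-2"}

def run_pipeline(retriever, retrieval_steps, processing_steps):
    frames = []
    # Phase 1: lightweight retrieval steps run once per chunk.
    for chunk in retriever():
        frame = chunk
        for step in retrieval_steps:
            frame = step(frame)
        frames.append(frame)
    # Phase 2: processing steps do the heavy lifting across all frames.
    for step in processing_steps:
        frames = [step(frame) for frame in frames]
    return frames

def tag_chunk(frame):      # hypothetical lightweight retrieval step
    return {**frame, "tagged": True}

def upper_case(frame):     # hypothetical heavier processing step
    return {**frame, "data": frame["data"].upper()}

result = run_pipeline(retriever, [tag_chunk], [upper_case])
```

Note how tag_chunk runs once per chunk during retrieval, while upper_case runs over the whole collected dataset in the processing phase.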

Running your Pipeline

Once you have a pipeline configuration file and any steps you need, you can run your pipeline in a couple of ways.

  1. Run the included CLI script.
  2. Run the module directly.
  3. Run the pipeline programmatically.

1: Running the included CLI script

data-digestor pipeline_conf

2: Running the module directly

python -m data_digestor pipeline_conf

3: Running the pipeline programmatically

This is the most complex way to run your pipeline because it does not provide an out-of-the-box experience, but it is the most flexible and may be more convenient in some automation scenarios.

To do this, you will need to instantiate a Configuration object and use it to build a Pipeline instance. Once you have a Pipeline instance, it provides a simple run() method to run the pipeline:

from data_digestor.config.configuration import Configuration
from data_digestor.pipeline.pipeline import build_pipeline

config = Configuration('my_pkg.pipeline_config') # Must be importable by `importlib`.

pipeline = build_pipeline(config)

pipeline.run()

The purpose of the Configuration object is to supply the RETRIEVER, RETRIEVAL_STEPS, and PROCESSING_STEPS properties to the Pipeline object. This means that you are free to dynamically generate some or all of these properties in any way you choose.
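Conceptually, loading a Django-style configuration module boils down to an importlib lookup of the upper-case properties. The load_config function below is a hypothetical sketch of that idea, not the real Configuration class; the demo registers an in-memory module so the example is self-contained:

```python
import importlib
import sys
import types

def load_config(module_path: str) -> dict:
    """Import a settings module by dotted path and collect the
    pipeline properties it defines (hypothetical sketch)."""
    mod = importlib.import_module(module_path)
    return {name: getattr(mod, name)
            for name in ("RETRIEVER", "RETRIEVAL_STEPS", "PROCESSING_STEPS")
            if hasattr(mod, name)}

# Demo: build a config module in memory so importlib can find it.
conf = types.ModuleType("demo_pipeline_conf")
conf.RETRIEVER = lambda: iter(())
conf.RETRIEVAL_STEPS = []
conf.PROCESSING_STEPS = []
sys.modules["demo_pipeline_conf"] = conf

settings = load_config("demo_pipeline_conf")
```

Because the properties are read off an ordinary module object, anything that produces module attributes at import time (loops, environment checks, computed lists) works as a way to generate them dynamically.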

Developing New Accessors

Accessors are the means by which a ProcessFrame retrieves data. The underlying data can be stored in almost any manner (including directly in the ProcessFrame's metadata), so an Accessor's job is to use the information contained in the ProcessFrame's metadata to extract the data in question.

There are two key methods that any Accessor implementation needs:

  • initialization_config(): returns a dictionary containing any info that the Accessor needs to run the get_data() method.
  • get_data(): the method that actually returns the data.

Depending on the complexity of the Accessor, you may not need a significant implementation for initialization_config and could simply return an empty dictionary.

get_data(), on the other hand, is much more significant: it performs the majority of the work of pulling data from the ProcessFrame, relying on the ProcessFrame's metadata for its initialization. The metadata dictionary can contain any keys and any type of data. Since the keys and contents are set by the previous ProcessStep, good documentation is important if you make significant modifications to the ProcessFrame's metadata.

You will also need to add an __init__ method that accepts a ProcessFrame instance as its only parameter.

Example Accessor:

import json

from data_digestor.pipeline.process_frame import Accessor, ProcessFrame

class CustomAccessor(Accessor):
    def __init__(self, frame: ProcessFrame):
        self.frame = frame

        self.required_keys = ['file_name', 'columns']

        config = self.initialization_config(frame)

        self.file_name = config['file_name']
        self.columns = config['columns']

    def initialization_config(self, frame: ProcessFrame) -> dict:
        config = {}

        for key in self.required_keys:
            if key not in frame.metadata:
                raise ValueError(f"Required key {key} is missing")
            config[key] = frame.metadata[key]

        return config

    def get_data(self):
        output_data = []
        with open(self.file_name, 'r') as f:
            for line in f:
                line_data = json.loads(line)
                output_data.append(list(filter(lambda c: c in self.columns, line_data)))

        return output_data
        # You can return any data by any mechanism that fits your purpose best

Requirements

Requires Python: >=3.14
Details

PyPI package
2 assets, 20 KiB

Versions (4)

1.0.2 2026-03-12
1.0.1 2026-03-12
1.0.0 2026-03-12
0.1.0 2026-03-08