multistage_pipeline_fanout

A high-performance data processing pipeline implementation in Go that provides efficient file processing with parallel compression and encryption.

Overview

multistage_pipeline_fanout implements a multi-stage processing pipeline with concurrent execution of compression and encryption operations for improved performance. The pipeline:

  • Reads data from an input file in configurable chunks
  • Compresses the data using parallel workers
  • Encrypts the compressed data using parallel workers
  • Writes the processed data to an output file
  • Calculates SHA256 checksums on the fly for the input and output files to ensure data integrity
  • Collects detailed statistics about the processing

The application is designed with a focus on performance, reliability, and proper resource management, including graceful shutdown handling.
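
The fan-out at the heart of the pipeline can be pictured with a short, self-contained sketch (illustrative only, not the project's code; it uses gzip from the standard library to stay dependency-free, whereas the project uses Brotli). A stage drains an input channel and feeds an output channel, and fan-out simply means running several identical workers over the same pair of channels:

package main

import (
    "bytes"
    "compress/gzip"
    "fmt"
    "sync"
)

// fanOut starts n workers that all drain the same input channel and send their
// results to a shared output channel, which is closed once every worker is done.
func fanOut(n int, in <-chan []byte, work func([]byte) []byte) <-chan []byte {
    out := make(chan []byte, 16) // buffered, like the pipeline's ChannelBufferSize default
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for chunk := range in {
                out <- work(chunk)
            }
        }()
    }
    go func() { wg.Wait(); close(out) }()
    return out
}

func main() {
    chunks := make(chan []byte)

    // One fanned-out stage: compress each chunk with 4 parallel workers.
    compressed := fanOut(4, chunks, func(b []byte) []byte {
        var buf bytes.Buffer
        zw := gzip.NewWriter(&buf)
        zw.Write(b)
        zw.Close()
        return buf.Bytes()
    })

    // Feed a few chunks in, then close the channel so the stage can drain and finish.
    go func() {
        for i := 0; i < 3; i++ {
            chunks <- []byte(fmt.Sprintf("chunk %d", i))
        }
        close(chunks)
    }()

    for c := range compressed {
        fmt.Println("compressed chunk of", len(c), "bytes")
    }
}

Note that a plain fan-out like this does not preserve chunk ordering, which any pipeline writing a single output file must account for before the write stage.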

Project Structure

The project is organized into two main package hierarchies:

/pkg - Core Functionality

The /pkg directory contains packages that implement core, reusable functionality:

  • /pkg/compression - Provides core compression algorithms and utilities
  • /pkg/dataprocessor - Provides generic data processing with context awareness
  • /pkg/encryption - Provides core encryption algorithms and utilities
  • /pkg/errors - Custom error types and error handling utilities
  • /pkg/logger - Logging utilities
  • /pkg/stats - Statistics collection and reporting
  • /pkg/utils - General utility functions

/pkg/pipeline - Pipeline Integration

The /pkg/pipeline directory contains packages that integrate the core functionality into a processing pipeline:

  • /pkg/pipeline/compressor - Pipeline stage that uses the core compression functionality
  • /pkg/pipeline/encryptor - Pipeline stage that uses the core encryption functionality
  • /pkg/pipeline/processor - Generic pipeline stage processor
  • /pkg/pipeline/reader - Pipeline stage for reading input data
  • /pkg/pipeline/writer - Pipeline stage for writing output data
  • /pkg/pipeline/options - Configuration options for the pipeline

Why Similar Package Names Are Not Redundant

The packages in /pkg and /pkg/pipeline with similar names (e.g., compression vs compressor, encryption vs encryptor) serve different purposes and are not redundant:

  1. Core Packages (/pkg):

    • Implement the fundamental algorithms and utilities
    • Are context-aware but not pipeline-specific
    • Can be used independently outside the pipeline
    • Focus on the core functionality (compression, encryption, etc.)
  2. Pipeline Packages (/pkg/pipeline):

    • Integrate the core functionality into the pipeline architecture
    • Handle pipeline-specific concerns like channel communication
    • Manage concurrency, error handling, and statistics within the pipeline
    • Act as adapters between the core functionality and the pipeline framework

This separation allows for:

  • Better code organization and maintainability
  • Reuse of core functionality in different contexts
  • Independent testing of core algorithms and pipeline integration
  • Clearer separation of concerns
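
To make the split concrete, here is a hypothetical sketch; the function names and signatures are illustrative, not the project's actual APIs. The core package exposes a plain, context-aware function, and the pipeline package adapts it to channel-based stage communication and error reporting:

package sketch

import "context"

// Hypothetical core function in the pkg/compression style: plain data in,
// plain data out, context-aware but with no pipeline types.
func Compress(ctx context.Context, data []byte) ([]byte, error) {
    // A real implementation would apply Brotli here.
    return data, ctx.Err()
}

// Hypothetical adapter in the pkg/pipeline/compressor style: it wires the core
// function into channel-based stage communication and pipeline error reporting.
func CompressStage(ctx context.Context, in <-chan []byte, out chan<- []byte, errs chan<- error) {
    defer close(out)
    for chunk := range in {
        c, err := Compress(ctx, chunk)
        if err != nil {
            errs <- err
            continue
        }
        select {
        case out <- c:
        case <-ctx.Done():
            return
        }
    }
}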

Usage

Prerequisites

  • Go 1.24 or later
  • Make

Building and Running

The project includes a comprehensive Makefile that provides various commands for building, testing, and running the application.

Basic Commands

# Build the application
make build

# Run the application with default input and output files
make run

# Run all tests
make test

# Run short tests (faster)
make test-short

# Run unit tests only (excluding integration tests)
make test-unit

# Run integration tests
make test-integration

# Run tests for a specific package
make test-package PKG=./pkg/compression

# Run tests with race detection
make test-race

# Run tests with coverage analysis
make coverage

# Clean build artifacts
make clean

# Format code
make fmt

# Run linter
make lint

# Run security check
make sec

# Run vulnerability scanning
make vuln

# Check Go version compatibility
make check-go-version

# Generate CHANGELOG
make changelog

# Docker operations
make docker-build  # Build Docker image
make docker-run    # Run in Docker container
make docker        # Build and run in Docker

# Generate and serve documentation
make doc

Advanced Commands

# Install dependencies including linting and security tools
make deps

# Update dependencies
make update-deps

# Run tests with a specific tag
make test-tag TAG=unit

# Run tests for CI environments
make test-ci

# Generate mocks for testing
make mocks

# Install the binary to GOPATH/bin
make install

# Show all available commands
make help

Running the Application Directly

After building, you can run the application directly:

./build/multistage_pipeline_fanout <input_file_path> <output_file_path>

The application requires two command-line arguments:

  1. input_file_path: Path to the file to be processed
  2. output_file_path: Path where the processed data will be written

The application processes the input file through a pipeline that includes compression and encryption, then writes the result to the output file.

Running with Docker

The project includes Docker support for containerized execution:

# Build the Docker image
make docker-build

# Run the application in a Docker container
make docker-run

# Or do both in one command
make docker

You can also use Docker commands directly:

# Build the image
docker build -t multistage_pipeline_fanout:latest .

# Run the container
docker run --rm -v $(pwd)/input_file.txt:/app/input.txt -v $(pwd):/app/output multistage_pipeline_fanout:latest input.txt /app/output/output.bin

Configuration

The pipeline behavior can be configured through the options.DefaultPipelineOptions() function in the pkg/pipeline/options/options.go file. Key configurable parameters include:

  • ChunkSize: Size of data chunks read from the input file (default: 32KB)
  • CompressorCount: Number of parallel compression workers (default: 4)
  • EncryptorCount: Number of parallel encryption workers (default: 4)
  • ChannelBufferSize: Size of the channel buffers between pipeline stages (default: 16)

These options can be modified programmatically if you're using the pipeline as a library.
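
For example, assuming the options are plain struct fields with the names listed above, overriding the defaults might look like the following sketch (how the tuned options are handed to the pipeline depends on the pipeline API and is not shown here):

package sketch

import (
    "github.com/abitofhelp/multistage_pipeline_fanout/pkg/pipeline/options"
)

// configure sketches overriding the default pipeline options. The field names
// come from the list above; everything else is illustrative.
func configure() {
    opts := options.DefaultPipelineOptions()
    opts.ChunkSize = 64 * 1024  // 64KB chunks instead of the 32KB default
    opts.CompressorCount = 8    // more parallel compression workers
    opts.EncryptorCount = 8     // more parallel encryption workers
    opts.ChannelBufferSize = 32 // deeper channel buffers between stages
    _ = opts                    // hand the tuned options to the pipeline here (API call not shown)
}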

Project Architecture

The application follows a modular architecture with clear separation of concerns:

  1. Entry Point: The application entry point is in cmd/main.go, which parses command-line arguments and initializes the pipeline.

  2. Pipeline: The core pipeline implementation in pkg/pipeline/pipeline.go orchestrates the data flow through multiple stages.

  3. Pipeline Stages:

    • Reader (pkg/pipeline/reader): Reads data from the input file in chunks
    • Compressor (pkg/pipeline/compressor): Compresses data using the Brotli algorithm
    • Encryptor (pkg/pipeline/encryptor): Encrypts data using AES-GCM
    • Writer (pkg/pipeline/writer): Writes processed data to the output file
  4. Core Functionality: Implemented in separate packages for reusability:

    • Compression (pkg/compression): Compression algorithms and utilities
    • Encryption (pkg/encryption): Encryption algorithms and utilities
    • Statistics (pkg/stats): Collection and reporting of processing statistics
    • Logging (pkg/logger): Structured logging using Zap
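
As a rough illustration of the Reader stage's two responsibilities from the Overview, chunked reading and on-the-fly SHA256 hashing of the input, here is a simplified sketch (not the project's actual reader):

package sketch

import (
    "crypto/sha256"
    "io"
    "os"
)

// readChunks sketches a reader stage: it reads the input file in fixed-size
// chunks, sends each chunk downstream, and hashes the raw bytes as it goes so
// the input checksum is ready the moment reading completes.
func readChunks(path string, chunkSize int, out chan<- []byte) ([]byte, error) {
    defer close(out)

    f, err := os.Open(path)
    if err != nil {
        return nil, err
    }
    defer f.Close()

    h := sha256.New()
    buf := make([]byte, chunkSize)
    for {
        n, err := f.Read(buf)
        if n > 0 {
            chunk := make([]byte, n)
            copy(chunk, buf[:n])
            h.Write(chunk) // checksum updated while the data streams through
            out <- chunk
        }
        if err == io.EOF {
            return h.Sum(nil), nil
        }
        if err != nil {
            return nil, err
        }
    }
}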

Development Workflow

A typical development workflow might look like:

  1. Make changes to the code
  2. Format the code: make fmt
  3. Run the linter: make lint
  4. Run tests: make test
  5. Build the application: make build
  6. Run the application: make run or ./build/multistage_pipeline_fanout <input> <output>

Performance

multistage_pipeline_fanout is designed for high performance with parallel processing:

  • Multiple compression workers process chunks concurrently
  • Multiple encryption workers process compressed chunks concurrently
  • Buffered channels prevent pipeline stalls
  • Efficient memory management with controlled chunk sizes
  • Optimized Brotli compression

Performance can be tuned by adjusting the configuration parameters in options.DefaultPipelineOptions().

Examples

Basic File Processing

# Build the application
make build

# Process a 10MB JSON Lines file
./build/multistage_pipeline_fanout input_10mb.jsonl output.bin

# Processing statistics are displayed automatically after completion
# No need to run additional commands to view statistics

Using as a Library

package main

import (
    "context"

    "github.com/abitofhelp/multistage_pipeline_fanout/pkg/logger"
    "github.com/abitofhelp/multistage_pipeline_fanout/pkg/pipeline"
)

func main() {
    // Initialize logger
    log := logger.InitLogger()
    defer func() { logger.SafeSync(log) }()

    // Create context
    ctx := context.Background()

    // Process file
    stats, err := pipeline.ProcessFile(ctx, log, "input.txt", "output.bin")
    if err != nil {
        log.Fatal("Failed to process file", err)
    }

    // Use stats as needed
    log.Info("Processing complete", 
        "inputBytes", stats.InputBytes.Load(),
        "outputBytes", stats.OutputBytes.Load(),
        "compressionRatio", float64(stats.InputBytes.Load())/float64(stats.OutputBytes.Load()),
    )
}

Error Handling and Reliability

multistage_pipeline_fanout implements robust error handling and reliability features:

Custom Error Types

The application uses a custom error handling system (pkg/errors) that provides:

  • Categorized error types (I/O errors, timeout errors, cancellation errors, etc.)
  • Rich error context including stage, operation, time, and data size
  • Error aggregation for collecting multiple errors
  • Helper functions for error type checking
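
The overall shape of such an error type might look like the following sketch; the names and fields are hypothetical, and the real pkg/errors types may differ:

package sketch

import (
    "errors"
    "fmt"
    "time"
)

// Kind categorizes pipeline errors, e.g. I/O, timeout, or cancellation.
type Kind int

const (
    KindIO Kind = iota
    KindTimeout
    KindCanceled
)

// PipelineError carries the stage, operation, time, and data size alongside
// the underlying error, mirroring the rich context described above.
type PipelineError struct {
    Kind      Kind
    Stage     string
    Operation string
    At        time.Time
    DataSize  int
    Err       error
}

func (e *PipelineError) Error() string {
    return fmt.Sprintf("%s/%s: %v", e.Stage, e.Operation, e.Err)
}

func (e *PipelineError) Unwrap() error { return e.Err }

// IsTimeout shows the kind of helper used for error type checking.
func IsTimeout(err error) bool {
    var pe *PipelineError
    return errors.As(err, &pe) && pe.Kind == KindTimeout
}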

Signal Handling

The application implements advanced signal handling for graceful shutdown:

  • Handles SIGINT, SIGTERM, SIGHUP, and SIGQUIT
  • Implements a two-phase shutdown (graceful on first signal, forced on second)
  • Includes a 30-second timeout for graceful shutdown
  • Properly cleans up resources during shutdown
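
A minimal standard-library sketch of that two-phase behavior (the project's actual implementation may differ in detail):

package sketch

import (
    "context"
    "os"
    "os/signal"
    "syscall"
    "time"
)

// withTwoPhaseShutdown cancels the returned context on the first signal
// (graceful shutdown) and forces the process to exit on a second signal or
// after a 30-second grace period.
func withTwoPhaseShutdown(parent context.Context) (context.Context, context.CancelFunc) {
    ctx, cancel := context.WithCancel(parent)
    sigs := make(chan os.Signal, 2)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)

    go func() {
        <-sigs // first signal: start graceful shutdown
        cancel()
        select {
        case <-sigs: // second signal: force exit immediately
            os.Exit(1)
        case <-time.After(30 * time.Second): // graceful shutdown took too long
            os.Exit(1)
        }
    }()
    return ctx, cancel
}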

Context Propagation

All operations are context-aware, allowing for:

  • Cancellation propagation throughout the pipeline
  • Timeout handling at all stages
  • Proper resource cleanup on cancellation
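
In practice this means every stage loop selects on ctx.Done() alongside its channel operations. A minimal sketch of the pattern:

package sketch

import "context"

// stage stops as soon as the context is canceled, whether it is waiting to
// receive a chunk from upstream or to send one downstream.
func stage(ctx context.Context, in <-chan []byte, out chan<- []byte) error {
    defer close(out)
    for {
        select {
        case <-ctx.Done():
            return ctx.Err() // cancellation or timeout propagated from the caller
        case chunk, ok := <-in:
            if !ok {
                return nil // upstream finished normally
            }
            select {
            case out <- chunk:
            case <-ctx.Done():
                return ctx.Err()
            }
        }
    }
}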

Troubleshooting

Common Issues

  1. "Failed to open input file": Ensure the input file exists and has proper read permissions.

  2. "Failed to create output file": Ensure the output directory exists and has proper write permissions.

  3. "Context deadline exceeded": The processing took longer than the context timeout. For large files, consider using a context with a longer timeout.

  4. "Out of memory": If processing very large files, try reducing the chunk size in the options to lower memory usage.

  5. "Pipeline stage blocked": A pipeline stage is waiting too long to send data to the next stage. This could indicate a bottleneck in the pipeline. Try adjusting the number of workers or chunk size.

  6. "Operation canceled": The processing was canceled, either by a signal (Ctrl+C) or programmatically. This is a normal part of graceful shutdown.

Performance Issues

If you're experiencing performance issues:

  1. Try adjusting the number of compressor and encryptor workers based on your system's capabilities
  2. Experiment with different chunk sizes
  3. Ensure your storage devices have sufficient I/O performance
  4. Run with GODEBUG=gctrace=1 to monitor garbage collection overhead

Dependencies

This project relies on a small number of external dependencies, most notably a Brotli compression library and the Zap structured logging library.

For a complete list of dependencies and their versions, see the go.mod file.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Copyright (c) 2023 A Bit of Help, Inc.
