coop-scala Overview

Scala/Spark mono-repository for Path2Response’s core data processing infrastructure, including audience selection (preselect), householding, fulfillment, and analytics.

Purpose

coop-scala is the heart of P2R’s data processing. It contains all the Scala/Spark applications that run on AWS EMR (Elastic MapReduce) to:

  • Build audiences (Preselect) - Score and select prospects from the cooperative data using machine learning models
  • Process households (Households) - Match, deduplicate, and manage household data across the cooperative
  • Fulfill orders (Fulfillment) - Process keycoding and list preparation for direct mail campaigns
  • Generate statistics (Generate-Stats) - Create analytics and reports from customer transaction data
  • Handle third-party data (Third-Party, 4Cite) - Process external data sources including BigDBM and browse behavior data
  • Extract data (Extract) - Export addresses and identities for CASS/NCOA processing

Architecture

Repository Structure

coop-scala/                           # Root POM - Version 335.0.0-SNAPSHOT
├── scala-parent-pom/                 # Core libraries (non-Spark)
│   ├── common/                       # Shared utilities, collections
│   ├── cli/                          # Command-line interface utilities
│   ├── emr-core/                     # EMR launcher framework
│   │   └── emr-api/                  # EMR API, configuration, logging
│   ├── json/                         # JSON serialization
│   │   ├── json-definitions/         # Core JSON types
│   │   ├── json-definitions-4js/     # JSON for JavaScript compatibility
│   │   └── json-preselect/           # Preselect-specific JSON
│   ├── helpers/                      # Utility scripts (copy-mongo-to-s3)
│   ├── another-name-parser/          # Name parsing library
│   ├── modeling-support/             # ML/modeling utilities
│   ├── operations/                   # Ops automation tools
│   ├── spark-core/                   # Spark API wrappers
│   └── databricks-core/              # Databricks API (legacy)
│
├── spark-parent-pom/                 # Spark applications
│   ├── spark-common/                 # Shared Spark utilities
│   ├── spark-common-test/            # Spark testing framework
│   ├── spark-launcher/               # EMR job submission framework
│   │   ├── emr-launcher/             # Core EMR launcher
│   │   ├── emr-launcher-ext/         # Extended EMR functionality
│   │   ├── emr-revive/               # Retry/recovery (EmrReanimator)
│   │   └── spark-driver/             # Driver utilities
│   │
│   ├── preselect/                    # Audience selection (core logic)
│   │   ├── preselect-app/            # Main application
│   │   ├── preselect-core/           # Core algorithms
│   │   ├── preselect-classifiers/    # Household classification
│   │   ├── preselect-population/     # Population handling
│   │   ├── preselect-variables/      # Variable generation
│   │   ├── preselect-configuration/  # Configuration management
│   │   ├── preselect-callbacks/      # Monitoring hooks
│   │   ├── preselect-collections/    # Data structures
│   │   └── preselect-dates/          # Date handling
│   ├── preselect-emr/                # EMR deployment (main CLI)
│   ├── preselect-cli/                # Cloudera deployment (legacy)
│   ├── preselect-databricks/         # Databricks deployment (legacy)
│   │
│   ├── households/                   # Household processing (core)
│   │   ├── households-capture/       # Data capture processing
│   │   ├── households-common/        # Shared utilities
│   │   ├── households-reports/       # Reporting
│   │   └── households-stages-emr/    # EMR stages
│   ├── households-emr/               # EMR deployment
│   │
│   ├── fulfillment/                  # Order fulfillment (core)
│   ├── fulfillment-emr/              # EMR deployment
│   │
│   ├── generate-stats/               # Statistics generation
│   │   ├── generate-stats-core/      # Core logic
│   │   ├── generate-stats-emr/       # EMR deployment
│   │   └── docs/                     # Documentation
│   │
│   ├── 4cite/                        # Browse data (4Cite partner)
│   ├── 4cite-emr/                    # EMR deployment
│   │
│   ├── third-party/                  # Third-party integrations
│   ├── third-party-emr/              # EMR deployment (BigDBM, etc.)
│   │
│   ├── extract-data/                 # Data extraction core
│   ├── extract-emr/                  # EMR deployment
│   ├── extract-ncoa/                 # CASS/NCOA extraction
│   │
│   ├── convert-emr/                  # Data conversion
│   ├── digital-audience/             # Digital audience building
│   ├── digital-audience-emr/         # EMR deployment
│   ├── reseller-append/              # Reseller data processing
│   ├── reseller-emr/                 # EMR deployment
│   │
│   ├── databricks/                   # Databricks integration (legacy)
│   ├── coop-spark-automation/        # Automated Spark jobs
│   ├── integration-tests/            # Integration test suite
│   └── ops/                          # Operational automation
│
├── docu/                             # Legacy documentation
├── preselect-book/                   # Preselect mdbook docs
└── examples/                         # Example code

Technology Stack

| Component | Version | Notes |
|-----------|---------|-------|
| Scala     | 2.12.18 | Locked to Spark version |
| Spark     | 3.5.2   | Core processing engine |
| Hadoop    | 3.4.0   | EMR container version |
| Java      | 17      | Compilation target |
| Jackson   | 2.15.2  | JSON serialization |
| ScalaTest | 3.2.19  | Testing framework |

Build Tools:

  • Maven with custom parent POMs
  • Scalastyle for code quality
  • Scalariform for code formatting
  • Maven Shade Plugin for fat JARs (EMR deployment)

EMR Launcher Architecture

The EMR launcher system provides a sophisticated framework for submitting and monitoring Spark jobs:

Local Machine                          S3                          EMR Cluster
─────────────                          ──                          ───────────
Script (.sc)
    │
Launcher4Emr
    │ upload files
    ├─────────────────────────────→ input files
    │ serialize options
    ├─────────────────────────────→ options.json
    │
EmrSimpleLauncher
    │
EmrLauncher
    │ upload JAR
    ├─────────────────────────────→ job JAR
    │ submit cluster
    ├──────────────────────────────────────────→ Cluster Created
    │                                                    │
EmrProvider.waitForJob()                                │
    │ poll status (30s)                                 │
    ├────────────────────────────────────────→ get status
    │ write log                                         │
    ├─────────────────────────────→ job log             │
    │                                                    │ download JAR
    │                                                Driver4Emr runs
    │                                                    │
    │                                                Execute Spark Job
    │                                                    │
    │                              results ←────────────┤
    │ detect completion                                 │
    ├────────────────────────────────────────→ Cluster Terminated
    │
onAfterJobComplete()
    │ download results

Key Components:

  • Script Layer (*.sc) - Entry point bash/scala scripts
  • Launcher Layer (*Launcher4Emr) - Parse options, upload files, submit jobs
  • Simple Launcher (EmrSimpleLauncher) - Configure cluster specs
  • EMR Launcher Core (EmrLauncher) - Builder pattern for configuration
  • EMR Provider (EmrProvider) - Polling, monitoring, status logging
  • Driver Layer (*Driver4Emr) - Executes on EMR cluster
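
The end-to-end flow can be read as a short sequence of steps. The sketch below mirrors the diagram using placeholder types and method names (JobOptions, uploadToS3, submitCluster, and pollUntilDone are illustrative stand-ins, not the actual emr-core API):

// Illustrative sketch of the launch flow; names are placeholders, not the real API.
object LaunchFlowSketch {

  final case class JobOptions(name: String, inputPaths: Seq[String])
  final case class ClusterHandle(clusterId: String)

  def uploadToS3(localPath: String): String =
    s"s3://example-bucket/inputs/$localPath"           // placeholder upload

  def submitCluster(jarS3: String, optionsS3: String): ClusterHandle =
    ClusterHandle("j-XXXXXXXXXXXXX")                    // placeholder cluster submit

  def pollUntilDone(cluster: ClusterHandle): Unit =
    // The real EmrProvider polls status every 30 seconds and writes a job log.
    println(s"waiting for ${cluster.clusterId} to terminate")

  def main(args: Array[String]): Unit = {
    val options   = JobOptions("preselect", Seq("input.tsv"))
    options.inputPaths.foreach(uploadToS3)              // 1. upload input files
    val optionsS3 = uploadToS3("options.json")          // 2. serialize and upload options
    val jarS3     = uploadToS3("preselect-emr.jar")     // 3. upload the fat JAR
    val cluster   = submitCluster(jarS3, optionsS3)     // 4. submit the cluster
    pollUntilDone(cluster)                              // 5. monitor to completion
    println("downloading results")                      // 6. onAfterJobComplete work
  }
}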

Spark Jobs

Preselect (Audience Selection)

Purpose: Score and select prospects from cooperative data for direct mail campaigns.

Key Scripts:

| Script | Purpose |
|--------|---------|
| preselect-emr.sc | Main audience selection job |
| variables-emr.sc | Generate variables for household IDs |
| emailmatch-emr.sc | Email matching operations |
| pdp-emr.sc | PDP (data protection) operations |
| pickbrowse-emr.sc | Pick and browse operations |
| selectattributes-emr.sc | Attribute selection |
| title-transaction-counts-emr.sc | Transaction counting by title |
| dump-transactions.sc | Transaction data export |

Drivers:

  • PreselectDriver4Emr - Main preselect execution
  • VariablesDriver - Variable computation
  • EmailMatchDriver4Emr - Email matching
  • PDPDriver4Emr - Data protection
  • PickBrowseDriver4Emr - Browse data selection
  • SelectAttributesDriver4Emr - Attribute selection
  • TitleTransactionCountsDriver4Emr - Transaction counts
  • DumpTransactionsDriver4Emr - Transaction export

Buyer Types (Responder Logic):

| Type | Description |
|------|-------------|
| New-Legacy | Original logic - responder start date after prospect end date |
| New | Modified for simulation modeling - dates can overlap |
| All | No house-file restrictions - one purchase required |
| Best | Like All but requires two purchases in responder window |
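
One way to picture these rules is as a small enumeration. The sketch below is an illustrative reading of the table, not the actual preselect classes; the overlap and restriction flags for All and Best are assumptions:

// Hypothetical encoding of the buyer types; flag values for All and Best
// are an illustrative reading of the table, not confirmed behavior.
sealed abstract class BuyerType(
    val requiredPurchases: Int,       // purchases needed in the responder window
    val datesMayOverlap: Boolean,     // responder/prospect windows may overlap
    val houseFileRestricted: Boolean  // subject to house-file restrictions
)

case object NewLegacy extends BuyerType(1, datesMayOverlap = false, houseFileRestricted = true)
case object New       extends BuyerType(1, datesMayOverlap = true,  houseFileRestricted = true)
case object All       extends BuyerType(1, datesMayOverlap = true,  houseFileRestricted = false)
case object Best      extends BuyerType(2, datesMayOverlap = true,  houseFileRestricted = false)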

Training Data Process:

  1. Step 1: Generate large sample with binary variables (1M prospects, 100K responders)
  2. Step 2: Select optimal households (Python/external)
  3. Step 3: Extract full training data for optimal households using training-ids/ folder

Households

Purpose: Match, deduplicate, and manage household data across the cooperative.

Modules:

  • households-capture/ - Data capture processing
  • households-common/ - Shared utilities
  • households-reports/ - Reporting
  • households-stages-emr/ - EMR stage processing

Integration:

  • Households processing runs automatically
  • Triggers generate-stats workflow upon completion
  • Data stored in dated folders: extract/YYYY-MM-DD/clientData

Fulfillment

Purpose: Process keycoding and list preparation for direct mail campaigns.

Key Drivers:

  • FulfillmentInputAnalysisDriver4Emr - Analyze input data, build match files
  • KeycodingDriver4Emr - Main fulfillment/keycoding processing

Matching Types:

| Type | Format | Example |
|------|--------|---------|
| States | 2-letter codes | CA, NY, TX |
| ZIP3 | 3-digit | 900, 100 |
| ZIP5 | 5-digit | 90210, 10001 |
| ZIP9 | 9-digit | 902101234 |
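
A match key's type can be inferred from its format alone. The classifier below is a minimal sketch of that idea; MatchKeyType and classify are hypothetical names, not the fulfillment code:

// Hypothetical format-based classifier for geographic match keys.
sealed trait MatchKeyType
case object StateKey extends MatchKeyType   // "CA", "NY", "TX"
case object Zip3Key  extends MatchKeyType   // "900", "100"
case object Zip5Key  extends MatchKeyType   // "90210", "10001"
case object Zip9Key  extends MatchKeyType   // "902101234"

def classify(key: String): Option[MatchKeyType] = key.trim match {
  case s if s.matches("[A-Z]{2}") => Some(StateKey)
  case s if s.matches("\\d{3}")   => Some(Zip3Key)
  case s if s.matches("\\d{5}")   => Some(Zip5Key)
  case s if s.matches("\\d{9}")   => Some(Zip9Key)
  case _                          => None
}

// classify("CA") == Some(StateKey); classify("902101234") == Some(Zip9Key)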

Workflow:

  1. JIRA Attachment - State/zipcode files attached to ORDER tickets
  2. Match File Processing - Convert to importId format, upload to S3
  3. Fulfillment Input Analysis - Pre-compute matches, output “line-by-line” files
  4. Keycoding - Filter based on pre-computed matches

Generate-Stats

Purpose: Create customer statistics and reports from transaction data.

Tools:

| Tool | Purpose |
|------|---------|
| generate-stats-emr | Main statistics generation |
| sku-report-emr | Product (SKU) reports |
| keycode-report-emr | Marketing keycode reports |
| response-analysis-emr | Customer behavior analysis |

Production Integration:

  • Runs automatically after households processing completes
  • Uses large, memory-optimized EMR clusters
  • Standard statistics enabled by default (titleStats, transRecency, repeatPurchaseHistory, etc.)
  • Results stored in MongoDB

4Cite (Browse Data)

Purpose: Process second-party browse/website behavior data from 4Cite partner.

Key Scripts:

  • extractmd5-emr.sc / extractmd5-databricks.sc - MD5 extraction
  • Monthly browse reporting

Integration:

  • allowSecondParty flag controls 4Cite data inclusion in preselect
  • --override-4cite-exclusion parameter to include blocked data

Third-Party (BigDBM)

Purpose: Process external data enrichment from BigDBM partner.

Drivers:

  • ConvertBigDBMFullDriver4Emr - Convert BigDBM data
  • ConvertBigDBMNameAddressDriver4Emr - Name/address conversion
  • ExtractBigDBMAddressIdentityDriver4Emr - Extract identities
  • GatherBigDBMFullDriver4Emr - Gather BigDBM data
  • VerifyBigDBMFullDriver4Emr - Verify data integrity

Extract (CASS/NCOA)

Purpose: Export addresses and identities for CASS (Coding Accuracy Support System) and NCOA (National Change of Address) processing.

Drivers:

  • ExtractNCOAAddressIdentityDriver4Emr - Extract for NCOA
  • ProcessNcoaResultsDriver4Emr - Process NCOA results
  • ReportNcoaResultsDriver4Emr - Report on NCOA
  • MatchDataIdentityDriver4Emr - Match identities
  • MatchPrisonIdentityDriver4Emr - Prison database matching

Core Libraries

emr-core (EMR Framework)

Location: scala-parent-pom/emr-core/emr-api/

Core EMR launcher infrastructure:

  • EmrLauncher - Builder pattern for job configuration
  • EmrProvider - Cluster monitoring and status
  • EmrBehaviors - Launch/monitoring logic
  • EmrAccessors - Configuration defaults
  • EmrExclusionConfig - Instance/zone exclusion management
  • EmrReanimator - Retry/recovery strategies

Retry Strategies:

  • OPTIMISTIC - Cheapest, prefers ARM/spot
  • CAREFUL - Balanced approach
  • PESSIMISTIC - Faster failover to x86
  • PARANOID - Quickest on-demand failover
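
The strategies sit on a cost-versus-reliability scale. The sketch below is a hypothetical rendering of that trade-off; the flag values and retry counts are assumptions, not the actual EmrReanimator configuration:

// Hypothetical view of the retry strategies; values are illustrative only.
sealed abstract class RetryStrategy(
    val preferArm: Boolean,         // start on ARM (Graviton) instances
    val preferSpot: Boolean,        // start on spot capacity
    val retriesBeforeFailover: Int  // attempts before falling back to x86 / on-demand
)

case object Optimistic  extends RetryStrategy(preferArm = true,  preferSpot = true,  retriesBeforeFailover = 3)
case object Careful     extends RetryStrategy(preferArm = true,  preferSpot = true,  retriesBeforeFailover = 2)
case object Pessimistic extends RetryStrategy(preferArm = false, preferSpot = true,  retriesBeforeFailover = 1)
case object Paranoid    extends RetryStrategy(preferArm = false, preferSpot = false, retriesBeforeFailover = 0)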

common

Location: scala-parent-pom/common/

Shared utilities and collections:

  • HhIdSet / LongSet - Memory-efficient household ID storage (~8 bytes per ID)
  • General utilities for all modules
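
The space saving comes from storing IDs in a sorted primitive array rather than a boxed Set[Long]. The sketch below shows the general idea; PackedLongSet is a hypothetical stand-in, not the actual HhIdSet/LongSet implementation:

// Minimal sketch: ~8 bytes per ID via a sorted Array[Long] plus binary search.
final class PackedLongSet private (private val sorted: Array[Long]) {
  def contains(id: Long): Boolean =
    java.util.Arrays.binarySearch(sorted, id) >= 0
  def size: Int = sorted.length
}

object PackedLongSet {
  def apply(ids: Iterable[Long]): PackedLongSet = {
    val arr = ids.toArray.distinct      // drop duplicates
    java.util.Arrays.sort(arr)          // sort so binary search works
    new PackedLongSet(arr)
  }
}

// PackedLongSet(Seq(42L, 7L, 42L)).contains(42L) == true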

json

Location: scala-parent-pom/json/

JSON serialization:

  • json-definitions - Core JSON types
  • json-definitions-4js - JavaScript compatibility
  • json-preselect - Preselect-specific types

spark-core

Location: scala-parent-pom/spark-core/spark-api/

Spark API wrappers and utilities.

another-name-parser

Location: scala-parent-pom/another-name-parser/

Name parsing library for identity matching.

helpers

Location: scala-parent-pom/helpers/

Utility scripts:

  • copy-mongo-to-s3 - Sync MongoDB collections to S3 as JSON lines

Data Flow

Overall Data Pipeline

┌──────────────────┐
│   Raw Data       │  (Transaction data from cooperative members)
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│   Households     │  (Match, dedupe, manage household data)
└────────┬─────────┘
         │
         ├─────────────────────────────────────────────┐
         │                                             │
         ▼                                             ▼
┌──────────────────┐                       ┌──────────────────┐
│  Generate-Stats  │ (Automatic)           │   Preselect      │ (On-demand)
│  (Analytics)     │                       │   (Audience)     │
└──────────────────┘                       └────────┬─────────┘
                                                    │
                                                    ▼
                                           ┌──────────────────┐
                                           │   Fulfillment    │
                                           │   (Keycoding)    │
                                           └────────┬─────────┘
                                                    │
                                                    ▼
                                           ┌──────────────────┐
                                           │   Extract        │
                                           │   (CASS/NCOA)    │
                                           └──────────────────┘

Preselect Data Flow

  1. Input: Preselect configuration (JSON) + households data
  2. Classification: Identify responders vs. prospects
  3. Variable Generation: Compute features for scoring
  4. Model Scoring: Apply ML models
  5. Selection: Choose top prospects
  6. Output: Selected audience files
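
In DataFrame terms the flow looks roughly like the sketch below; column names, the scoring formula, and configuration keys are placeholders rather than the actual preselect implementation:

// Illustrative Spark pipeline for the six steps above; names are placeholders.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

def runPreselect(households: DataFrame, config: Map[String, String]): DataFrame = {
  val classified = households.withColumn(      // 2. responders vs. prospects
    "isResponder", col("lastPurchaseDate") >= lit(config("responderStart")))

  val withVariables = classified.withColumn(   // 3. variable generation
    "recencyDays", datediff(current_date(), col("lastPurchaseDate")))

  val scored = withVariables.withColumn(       // 4. model scoring (placeholder formula)
    "score", lit(1.0) / (lit(1.0) + col("recencyDays")))

  scored                                       // 5-6. select top prospects and output
    .where(!col("isResponder"))
    .orderBy(desc("score"))
    .limit(config("audienceSize").toInt)
}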

Fulfillment Data Flow

  1. Input: fulfillment_input2.tsv + match files (states, zips)
  2. Match File Processing: Convert to importId format
  3. Fulfillment Input Analysis: Pre-compute all matches
  4. Output: Line-by-line files with matchingKeys
  5. Keycoding: Filter based on match keys

Development

Building

# Full build with tests
mvn clean install

# Skip tests
mvn clean install -DskipTests

# Build specific module
mvn clean install -pl spark-parent-pom/preselect-emr -am

Testing

# Run all tests
mvn test

# Run specific test
mvn test -pl spark-parent-pom/preselect-emr -Dtest=MyTestClass

Code Quality

# Check style
mvn scalastyle:check

# Format code
./format.sh

Deployment

Each EMR module has a build.sh script that creates deployment packages:

# Build EMR deployment
cd spark-parent-pom/preselect-emr
./build.sh

# Install all EMR modules
./install-all.sh

Cluster Sizing

Default cluster sizes (compute nodes):

| Size | Nodes | Use Case |
|------|-------|----------|
| S | 25 | Small jobs, testing |
| M | 47 | Medium workloads |
| L | 95 | Production (full households) |

Sizing Theory:

  • Partition count / 8 = ideal executor count; executor count / executors per node = compute nodes
  • Add 5-10% fudge factor for EMR partition variance
  • Full households file: 2,880 partitions / 8 = 360 executors = 90 nodes + 5% = 95 nodes
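
Expressed as a helper, the rule looks like the sketch below; the executorsPerNode value of 4 is inferred from the 360-executor / 90-node example and will vary by instance type:

// Worked version of the sizing rule; executorsPerNode = 4 is an assumption.
def idealComputeNodes(partitions: Int,
                      partitionsPerExecutor: Int = 8,
                      executorsPerNode: Int = 4,
                      fudgeFactor: Double = 0.05): Int = {
  val executors = math.ceil(partitions.toDouble / partitionsPerExecutor)  // 2880 / 8 = 360
  val nodes     = math.ceil(executors / executorsPerNode)                 // 360 / 4  = 90
  math.ceil(nodes * (1.0 + fudgeFactor)).toInt                            // 90 * 1.05 -> 95
}

// idealComputeNodes(2880) == 95, the "L" production cluster size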

Environment Variables

| Variable | Purpose |
|----------|---------|
| P2R_RUNTIME_ENVIRONMENT | SANDBOX, DEVELOPMENT, STAGING, RC, PRODUCTION |
| P2R_EMR_CONFIG | Override default EMR configuration |
| P2R_EMR_PROFILE | AWS profile for credentials |
| P2R_INSTANCE_TYPES | Filter allowed instance types |
| P2R_DEFAULT_ZONE | Filter availability zones (regex) |
| P2R_FORCE_ON_DEMAND | Force on-demand vs spot |
| P2R_DEFAULT_NODE_TYPE | Override default compute node type |
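
A launcher might pick these up along the lines of the sketch below; the defaults and parsing are assumptions, not the actual emr-core configuration code:

// Hypothetical reading of the P2R_* environment variables.
val runtimeEnvironment = sys.env.getOrElse("P2R_RUNTIME_ENVIRONMENT", "SANDBOX")
val emrProfile         = sys.env.get("P2R_EMR_PROFILE")                            // AWS credentials profile
val forceOnDemand      = sys.env.get("P2R_FORCE_ON_DEMAND").exists(_.equalsIgnoreCase("true"))
val zoneFilter         = sys.env.get("P2R_DEFAULT_ZONE").map(_.r)                  // regex over availability zones
val instanceTypes      = sys.env.get("P2R_INSTANCE_TYPES").map(_.split(",").toSeq) // allowed instance types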

EMR Exclusions

Centralized instance exclusion via S3:

# Add exclusion
emr-exclusions --command add --zone us-east-1a --instance-type m7g.2xlarge --reason "Capacity issues"

# Remove exclusion
emr-exclusions --command remove --zone us-east-1a --instance-type m7g.2xlarge

# List all exclusions
emr-exclusions --command list

Monitoring Scripts

# Watch job progress
emr-watch.sc --cluster-id j-XXXXXXXXXXXXX

# Watch state transitions
emr-watch-states.sc --cluster-id j-XXXXXXXXXXXXX

# View EMR configuration
emr-config

Job Status Logs

Location: s3://bucket/job/[dev/]YYYY/MM/DD/{clusterId}.json

Updated every 30 seconds with:

  • Cluster ID and state
  • Step ID and state
  • Worker counts and fleet type
  • Timestamps
  • Error information
  • Configuration details
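
A hypothetical case class capturing those fields might look like the sketch below; the actual JSON field names and types may differ:

// Hypothetical shape of a job status log entry; field names are assumptions.
final case class JobStatusLog(
    clusterId: String,                  // e.g. "j-XXXXXXXXXXXXX"
    clusterState: String,               // e.g. "RUNNING", "TERMINATED"
    stepId: String,
    stepState: String,
    workerCount: Int,
    fleetType: String,                  // spot vs. on-demand fleet
    updatedAt: java.time.Instant,       // refreshed every 30 seconds
    error: Option[String],
    configuration: Map[String, String]
)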

Related Documentation

| Document | Location | Description |
|----------|----------|-------------|
| Preselect EMR Docs | spark-parent-pom/preselect-emr/docs/ | mdbook documentation |
| Generate-Stats Docs | spark-parent-pom/generate-stats/docs/ | Tool documentation |
| Fulfillment Guide | spark-parent-pom/fulfillment-emr/FULFILLMENT_MATCHING_GUIDE.md | Matching details |
| EMR Architecture | .amazonq/docs/EMR_ARCHITECTURE.md | Architecture details |
| Build Patterns | .amazonq/docs/BUILD_PATTERNS.md | Build conventions |
| Dev Guidelines | .amazonq/docs/DEVELOPMENT_GUIDELINES.md | Coding standards |
| Cluster Sizing | docu/how-to-size-a-cluster.md | EMR sizing guide |
| Path2Acquisition Flow | product-management/docs/path2acquisition-flow.md | Business process |

Key Integration Points

BERT (Base Environment for Re-tooled Technology)

BERT orchestrates coop-scala jobs for:

  • Campaign processing workflows
  • Automated statistics generation
  • Fulfillment operations

Order App

Triggers fulfillment jobs from campaign orders.

Data Tools

Coordinates data processing and extract operations.

MongoDB

  • Statistics results storage
  • Configuration sync via copy-mongo-to-s3

Source: README.md, .amazonq/docs/, spark-parent-pom/*/README.md, spark-parent-pom/preselect-emr/docs/, spark-parent-pom/generate-stats/docs/, spark-parent-pom/fulfillment-emr/FULFILLMENT_MATCHING_GUIDE.md, docu/, preselect-book/*

Documentation created: 2026-01-24