coop-scala Overview
Scala/Spark mono-repository for Path2Response’s core data processing infrastructure, including audience selection (preselect), householding, fulfillment, and analytics.
Purpose
coop-scala is the heart of P2R’s data processing. It contains all the Scala/Spark applications that run on AWS EMR (Elastic MapReduce) to:
- Build audiences (Preselect) - Score and select prospects from the cooperative data using machine learning models
- Process households (Households) - Match, deduplicate, and manage household data across the cooperative
- Fulfill orders (Fulfillment) - Process keycoding and list preparation for direct mail campaigns
- Generate statistics (Generate-Stats) - Create analytics and reports from customer transaction data
- Handle third-party data (Third-Party, 4Cite) - Process external data sources including BigDBM and browse behavior data
- Extract data (Extract) - Export addresses and identities for CASS/NCOA processing
Architecture
Repository Structure
coop-scala/ # Root POM - Version 335.0.0-SNAPSHOT
├── scala-parent-pom/ # Core libraries (non-Spark)
│ ├── common/ # Shared utilities, collections
│ ├── cli/ # Command-line interface utilities
│ ├── emr-core/ # EMR launcher framework
│ │ └── emr-api/ # EMR API, configuration, logging
│ ├── json/ # JSON serialization
│ │ ├── json-definitions/ # Core JSON types
│ │ ├── json-definitions-4js/ # JSON for JavaScript compatibility
│ │ └── json-preselect/ # Preselect-specific JSON
│ ├── helpers/ # Utility scripts (copy-mongo-to-s3)
│ ├── another-name-parser/ # Name parsing library
│ ├── modeling-support/ # ML/modeling utilities
│ ├── operations/ # Ops automation tools
│ ├── spark-core/ # Spark API wrappers
│ └── databricks-core/ # Databricks API (legacy)
│
├── spark-parent-pom/ # Spark applications
│ ├── spark-common/ # Shared Spark utilities
│ ├── spark-common-test/ # Spark testing framework
│ ├── spark-launcher/ # EMR job submission framework
│ │ ├── emr-launcher/ # Core EMR launcher
│ │ ├── emr-launcher-ext/ # Extended EMR functionality
│ │ ├── emr-revive/ # Retry/recovery (EmrReanimator)
│ │ └── spark-driver/ # Driver utilities
│ │
│ ├── preselect/ # Audience selection (core logic)
│ │ ├── preselect-app/ # Main application
│ │ ├── preselect-core/ # Core algorithms
│ │ ├── preselect-classifiers/ # Household classification
│ │ ├── preselect-population/ # Population handling
│ │ ├── preselect-variables/ # Variable generation
│ │ ├── preselect-configuration/ # Configuration management
│ │ ├── preselect-callbacks/ # Monitoring hooks
│ │ ├── preselect-collections/ # Data structures
│ │ └── preselect-dates/ # Date handling
│ ├── preselect-emr/ # EMR deployment (main CLI)
│ ├── preselect-cli/ # Cloudera deployment (legacy)
│ ├── preselect-databricks/ # Databricks deployment (legacy)
│ │
│ ├── households/ # Household processing (core)
│ │ ├── households-capture/ # Data capture processing
│ │ ├── households-common/ # Shared utilities
│ │ ├── households-reports/ # Reporting
│ │ └── households-stages-emr/ # EMR stages
│ ├── households-emr/ # EMR deployment
│ │
│ ├── fulfillment/ # Order fulfillment (core)
│ ├── fulfillment-emr/ # EMR deployment
│ │
│ ├── generate-stats/ # Statistics generation
│ │ ├── generate-stats-core/ # Core logic
│ │ ├── generate-stats-emr/ # EMR deployment
│ │ └── docs/ # Documentation
│ │
│ ├── 4cite/ # Browse data (4Cite partner)
│ ├── 4cite-emr/ # EMR deployment
│ │
│ ├── third-party/ # Third-party integrations
│ ├── third-party-emr/ # EMR deployment (BigDBM, etc.)
│ │
│ ├── extract-data/ # Data extraction core
│ ├── extract-emr/ # EMR deployment
│ ├── extract-ncoa/ # CASS/NCOA extraction
│ │
│ ├── convert-emr/ # Data conversion
│ ├── digital-audience/ # Digital audience building
│ ├── digital-audience-emr/ # EMR deployment
│ ├── reseller-append/ # Reseller data processing
│ ├── reseller-emr/ # EMR deployment
│ │
│ ├── databricks/ # Databricks integration (legacy)
│ ├── coop-spark-automation/ # Automated Spark jobs
│ ├── integration-tests/ # Integration test suite
│ └── ops/ # Operational automation
│
├── docu/ # Legacy documentation
├── preselect-book/ # Preselect mdbook docs
└── examples/ # Example code
Technology Stack
| Component | Version | Notes |
|---|---|---|
| Scala | 2.12.18 | Locked to Spark version |
| Spark | 3.5.2 | Core processing engine |
| Hadoop | 3.4.0 | EMR container version |
| Java | 17 | Compilation target |
| Jackson | 2.15.2 | JSON serialization |
| ScalaTest | 3.2.19 | Testing framework |
Build Tools:
- Maven with custom parent POMs
- Scalastyle for code quality
- Scalariform for code formatting
- Maven Shade Plugin for fat JARs (EMR deployment)
EMR Launcher Architecture
The EMR launcher system provides a sophisticated framework for submitting and monitoring Spark jobs:
Local Machine S3 EMR Cluster
───────────── ── ───────────
Script (.sc)
│
Launcher4Emr
│ upload files
├─────────────────────────────→ input files
│ serialize options
├─────────────────────────────→ options.json
│
EmrSimpleLauncher
│
EmrLauncher
│ upload JAR
├─────────────────────────────→ job JAR
│ submit cluster
├──────────────────────────────────────────→ Cluster Created
│ │
EmrProvider.waitForJob() │
│ poll status (30s) │
├────────────────────────────────────────→ get status
│ write log │
├─────────────────────────────→ job log │
│ │ download JAR
│ Driver4Emr runs
│ │
│ Execute Spark Job
│ │
│ results ←────────────┤
│ detect completion │
├────────────────────────────────────────→ Cluster Terminated
│
onAfterJobComplete()
│ download results
Key Components:
- Script Layer (`*.sc`) - Entry-point bash/Scala scripts
- Launcher Layer (`*Launcher4Emr`) - Parse options, upload files, submit jobs
- Simple Launcher (`EmrSimpleLauncher`) - Configure cluster specs
- EMR Launcher Core (`EmrLauncher`) - Builder pattern for configuration
- EMR Provider (`EmrProvider`) - Polling, monitoring, status logging
- Driver Layer (`*Driver4Emr`) - Executes on the EMR cluster
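A condensed sketch of how these layers compose. The types and method names below are stand-ins for illustration, not the real emr-core/spark-launcher API:

```scala
// Illustrative only: stand-in types showing the builder-pattern configuration and
// the 30-second polling loop that the launcher and provider layers wrap around EMR.
final case class JobSpec(jarUri: String = "", clusterSize: String = "S", optionsUri: String = "")

final class ExampleEmrLauncher(val spec: JobSpec = JobSpec()) {
  def withJar(uri: String): ExampleEmrLauncher          = new ExampleEmrLauncher(spec.copy(jarUri = uri))
  def withClusterSize(size: String): ExampleEmrLauncher = new ExampleEmrLauncher(spec.copy(clusterSize = size))
  def withOptions(uri: String): ExampleEmrLauncher      = new ExampleEmrLauncher(spec.copy(optionsUri = uri))
  // The real launcher uploads the fat JAR, serializes options.json, and creates the cluster/step.
  def submit(): String = { println(s"submitting $spec"); "j-EXAMPLE" }
}

object ExampleEmrProvider {
  // Poll cluster state until it terminates, logging each pass
  // (the real EmrProvider also writes the job-status log described below).
  def waitForJob(clusterId: String, pollState: String => String): Unit =
    Iterator.continually(pollState(clusterId))
      .takeWhile(s => s != "TERMINATED" && s != "TERMINATED_WITH_ERRORS")
      .foreach { s => println(s"$clusterId: $s"); Thread.sleep(30000) }
}
```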
Spark Jobs
Preselect (Audience Selection)
Purpose: Score and select prospects from cooperative data for direct mail campaigns.
Key Scripts:
| Script | Purpose |
|---|---|
| preselect-emr.sc | Main audience selection job |
| variables-emr.sc | Generate variables for household IDs |
| emailmatch-emr.sc | Email matching operations |
| pdp-emr.sc | PDP (data protection) operations |
| pickbrowse-emr.sc | Pick and browse operations |
| selectattributes-emr.sc | Attribute selection |
| title-transaction-counts-emr.sc | Transaction counting by title |
| dump-transactions.sc | Transaction data export |
Drivers:
- `PreselectDriver4Emr` - Main preselect execution
- `VariablesDriver` - Variable computation
- `EmailMatchDriver4Emr` - Email matching
- `PDPDriver4Emr` - Data protection
- `PickBrowseDriver4Emr` - Browse data selection
- `SelectAttributesDriver4Emr` - Attribute selection
- `TitleTransactionCountsDriver4Emr` - Transaction counts
- `DumpTransactionsDriver4Emr` - Transaction export
Buyer Types (Responder Logic):
| Type | Description |
|---|---|
| New-Legacy | Original logic - responder start date after prospect end date |
| New | Modified for simulation modeling - dates can overlap |
| All | No house-file restrictions - one purchase required |
| Best | Like All but requires two purchases in responder window |
Training Data Process:
- Step 1: Generate large sample with binary variables (1M prospects, 100K responders)
- Step 2: Select optimal households (Python/external)
- Step 3: Extract full training data for the optimal households using the `training-ids/` folder
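A minimal Spark sketch of step 3, assuming parquet inputs and a householdId column (the real paths and schema come from the preselect configuration):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: paths and the column name are assumptions for illustration.
val spark = SparkSession.builder().appName("training-extract-sketch").getOrCreate()

val trainingIds = spark.read.parquet("s3://example-bucket/training-ids/")   // step 2 output
val variables   = spark.read.parquet("s3://example-bucket/variables/")      // step 1 output

// Keep only the rows whose household ID was selected as optimal in step 2.
variables.join(trainingIds, Seq("householdId"), "left_semi")
  .write.parquet("s3://example-bucket/training-data/")
```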
Households
Purpose: Match, deduplicate, and manage household data across the cooperative.
Modules:
- `households-capture/` - Data capture processing
- `households-common/` - Shared utilities
- `households-reports/` - Reporting
- `households-stages-emr/` - EMR stage processing
Integration:
- Households processing runs automatically
- Triggers generate-stats workflow upon completion
- Data stored in dated folders: `extract/YYYY-MM-DD/clientData`
Fulfillment
Purpose: Process keycoding and list preparation for direct mail campaigns.
Key Drivers:
- `FulfillmentInputAnalysisDriver4Emr` - Analyze input data, build match files
- `KeycodingDriver4Emr` - Main fulfillment/keycoding processing
Matching Types:
| Type | Format | Example |
|---|---|---|
| States | 2-letter codes | CA, NY, TX |
| ZIP3 | 3-digit | 900, 100 |
| ZIP5 | 5-digit | 90210, 10001 |
| ZIP9 | 9-digit | 902101234 |
Workflow:
- JIRA Attachment - State/zipcode files attached to ORDER tickets
- Match File Processing - Convert to importId format, upload to S3
- Fulfillment Input Analysis - Pre-compute matches, output “line-by-line” files
- Keycoding - Filter based on pre-computed matches
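A simplified sketch of what the pre-computed matching amounts to, under an assumed key format; FULFILLMENT_MATCHING_GUIDE.md documents the real behavior:

```scala
// Illustrative only: the real matches are pre-computed by FulfillmentInputAnalysisDriver4Emr
// and consumed as matchingKeys by the keycoding step; this just shows the prefix idea.
final case class Address(state: String, zip9: String) // zip9 = ZIP+4 digits, no dash

def matches(address: Address, matchKeys: Set[String]): Boolean =
  matchKeys.contains(address.state) ||          // 2-letter state, e.g. "CA"
  matchKeys.contains(address.zip9.take(3)) ||   // ZIP3, e.g. "900"
  matchKeys.contains(address.zip9.take(5)) ||   // ZIP5, e.g. "90210"
  matchKeys.contains(address.zip9)              // ZIP9, e.g. "902101234"

matches(Address("CA", "902101234"), Set("900")) // true via the ZIP3 prefix
```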
Generate-Stats
Purpose: Create customer statistics and reports from transaction data.
Tools:
| Tool | Purpose |
|---|---|
| generate-stats-emr | Main statistics generation |
| sku-report-emr | Product (SKU) reports |
| keycode-report-emr | Marketing keycode reports |
| response-analysis-emr | Customer behavior analysis |
Production Integration:
- Runs automatically after households processing completes
- Uses large, memory-optimized EMR clusters
- Standard statistics enabled by default (titleStats, transRecency, repeatPurchaseHistory, etc.)
- Results stored in MongoDB
4Cite (Browse Data)
Purpose: Process second-party browse/website behavior data from 4Cite partner.
Key Scripts:
- `extractmd5-emr.sc` / `extractmd5-databricks.sc` - MD5 extraction
- Monthly browse reporting
Integration:
- `allowSecondParty` flag controls 4Cite data inclusion in preselect
- `--override-4cite-exclusion` parameter to include blocked data
Third-Party (BigDBM)
Purpose: Process external data enrichment from BigDBM partner.
Drivers:
- `ConvertBigDBMFullDriver4Emr` - Convert BigDBM data
- `ConvertBigDBMNameAddressDriver4Emr` - Name/address conversion
- `ExtractBigDBMAddressIdentityDriver4Emr` - Extract identities
- `GatherBigDBMFullDriver4Emr` - Gather BigDBM data
- `VerifyBigDBMFullDriver4Emr` - Verify data integrity
Extract (CASS/NCOA)
Purpose: Export addresses and identities for CASS (Coding Accuracy Support System) and NCOA (National Change of Address) processing.
Drivers:
- `ExtractNCOAAddressIdentityDriver4Emr` - Extract for NCOA
- `ProcessNcoaResultsDriver4Emr` - Process NCOA results
- `ReportNcoaResultsDriver4Emr` - Report on NCOA
- `MatchDataIdentityDriver4Emr` - Match identities
- `MatchPrisonIdentityDriver4Emr` - Prison database matching
Core Libraries
emr-core (EMR Framework)
Location: scala-parent-pom/emr-core/emr-api/
Core EMR launcher infrastructure:
- `EmrLauncher` - Builder pattern for job configuration
- `EmrProvider` - Cluster monitoring and status
- `EmrBehaviors` - Launch/monitoring logic
- `EmrAccessors` - Configuration defaults
- `EmrExclusionConfig` - Instance/zone exclusion management
- `EmrReanimator` - Retry/recovery strategies
Retry Strategies:
- OPTIMISTIC - Cheapest, prefers ARM/spot
- CAREFUL - Balanced approach
- PESSIMISTIC - Faster failover to x86
- PARANOID - Quickest on-demand failover
common
Location: scala-parent-pom/common/
Shared utilities and collections:
- `HhIdSet` / `LongSet` - Memory-efficient household ID storage (~8 bytes per ID); see the sketch below
- General utilities for all modules
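A sketch of the idea behind the ~8-bytes-per-ID figure: store IDs in a sorted primitive Array[Long] and binary-search for membership (the actual HhIdSet/LongSet implementations may differ):

```scala
// Illustrative only: a sorted Array[Long] costs one 8-byte primitive per ID,
// versus the much larger per-element overhead of a boxed Set[java.lang.Long].
final class ExampleLongSet(ids: Array[Long]) {
  private val sorted: Array[Long] = ids.sorted
  def contains(id: Long): Boolean = java.util.Arrays.binarySearch(sorted, id) >= 0
  def size: Int = sorted.length
}

val hhIds = new ExampleLongSet(Array(42L, 7L, 1001L))
hhIds.contains(7L)   // true
hhIds.contains(8L)   // false
```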
json
Location: scala-parent-pom/json/
JSON serialization:
- `json-definitions` - Core JSON types
- `json-definitions-4js` - JavaScript compatibility
- `json-preselect` - Preselect-specific types
spark-core
Location: scala-parent-pom/spark-core/spark-api/
Spark API wrappers and utilities.
another-name-parser
Location: scala-parent-pom/another-name-parser/
Name parsing library for identity matching.
helpers
Location: scala-parent-pom/helpers/
Utility scripts:
- `copy-mongo-to-s3` - Sync MongoDB collections to S3 as JSON lines
Data Flow
Overall Data Pipeline
┌──────────────────┐
│ Raw Data │ (Transaction data from cooperative members)
└────────┬─────────┘
│
▼
┌──────────────────┐
│ Households │ (Match, dedupe, manage household data)
└────────┬─────────┘
│
├─────────────────────────────────────────────┐
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ Generate-Stats │ (Automatic) │ Preselect │ (On-demand)
│ (Analytics) │ │ (Audience) │
└──────────────────┘ └────────┬─────────┘
│
▼
┌──────────────────┐
│ Fulfillment │
│ (Keycoding) │
└────────┬─────────┘
│
▼
┌──────────────────┐
│ Extract │
│ (CASS/NCOA) │
└──────────────────┘
Preselect Data Flow
- Input: Preselect configuration (JSON) + households data
- Classification: Identify responders vs. prospects
- Variable Generation: Compute features for scoring
- Model Scoring: Apply ML models
- Selection: Choose top prospects
- Output: Selected audience files
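A condensed Spark sketch of the flow above, with assumed paths, columns, and a stand-in scoring expression; the real job is PreselectDriver4Emr driven by its JSON configuration and trained models:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Sketch only: paths, columns, and the "score" expression are assumptions.
val spark      = SparkSession.builder().appName("preselect-sketch").getOrCreate()
val households = spark.read.parquet("s3://example-bucket/households/")

val scored   = households.withColumn("score", col("recencyDays") * -1.0)  // stand-in for model scoring
val selected = scored.orderBy(col("score").desc).limit(1000000)           // choose top prospects

selected.write.parquet("s3://example-bucket/preselect-output/")
```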
Fulfillment Data Flow
- Input: fulfillment_input2.tsv + match files (states, zips)
- Match File Processing: Convert to importId format
- Fulfillment Input Analysis: Pre-compute all matches
- Output: Line-by-line files with `matchingKeys`
- Keycoding: Filter based on match keys
Development
Building
# Full build with tests
mvn clean install
# Skip tests
mvn clean install -DskipTests
# Build specific module
mvn clean install -pl spark-parent-pom/preselect-emr -am
Testing
# Run all tests
mvn test
# Run specific test
mvn test -pl spark-parent-pom/preselect-emr -Dtest=MyTestClass
Code Quality
# Check style
mvn scalastyle:check
# Format code
./format.sh
Deployment
Each EMR module has a build.sh script that creates deployment packages:
# Build EMR deployment
cd spark-parent-pom/preselect-emr
./build.sh
# Install all EMR modules
./install-all.sh
Cluster Sizing
Default cluster sizes (compute nodes):
| Size | Nodes | Use Case |
|---|---|---|
| S | 25 | Small jobs, testing |
| M | 47 | Medium workloads |
| L | 95 | Production (full households) |
Sizing Theory:
- Partition count / 8 executors per node = ideal compute nodes
- Add 5-10% fudge factor for EMR partition variance
- Full households file: 2,880 partitions / 8 = 360 executors = 90 nodes + 5% = 95 nodes
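A minimal arithmetic sketch of the rule above; the slots-per-node figure is an assumption chosen so the example reproduces the documented 2,880-partition case, and docu/how-to-size-a-cluster.md has the real per-instance numbers:

```scala
// Sketch only: slotsPerNode is an assumed figure, not a documented constant.
def idealComputeNodes(partitions: Int, slotsPerNode: Int, fudge: Double = 0.05): Int = {
  val base = math.ceil(partitions.toDouble / slotsPerNode).toInt
  math.ceil(base * (1 + fudge)).toInt   // 5-10% fudge factor for EMR partition variance
}

idealComputeNodes(partitions = 2880, slotsPerNode = 32)  // 90 base nodes -> 95 with 5% fudge
```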
Environment Variables
| Variable | Purpose |
|---|---|
| P2R_RUNTIME_ENVIRONMENT | SANDBOX, DEVELOPMENT, STAGING, RC, PRODUCTION |
| P2R_EMR_CONFIG | Override default EMR configuration |
| P2R_EMR_PROFILE | AWS profile for credentials |
| P2R_INSTANCE_TYPES | Filter allowed instance types |
| P2R_DEFAULT_ZONE | Filter availability zones (regex) |
| P2R_FORCE_ON_DEMAND | Force on-demand vs spot |
| P2R_DEFAULT_NODE_TYPE | Override default compute node type |
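A small sketch of how a launcher can consult these variables; the defaults shown here are illustrative assumptions, not the framework's:

```scala
// Sketch only: default values and boolean parsing are illustrative assumptions.
val runtimeEnv    = sys.env.getOrElse("P2R_RUNTIME_ENVIRONMENT", "SANDBOX")
val forceOnDemand = sys.env.get("P2R_FORCE_ON_DEMAND").exists(_.equalsIgnoreCase("true"))
val zonePattern   = sys.env.get("P2R_DEFAULT_ZONE").map(_.r)   // documented as a regex filter
```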
EMR Exclusions
Centralized instance exclusion via S3:
# Add exclusion
emr-exclusions --command add --zone us-east-1a --instance-type m7g.2xlarge --reason "Capacity issues"
# Remove exclusion
emr-exclusions --command remove --zone us-east-1a --instance-type m7g.2xlarge
# List all exclusions
emr-exclusions --command list
Monitoring Scripts
# Watch job progress
emr-watch.sc --cluster-id j-XXXXXXXXXXXXX
# Watch state transitions
emr-watch-states.sc --cluster-id j-XXXXXXXXXXXXX
# View EMR configuration
emr-config
Job Status Logs
Location: s3://bucket/job/[dev/]YYYY/MM/DD/{clusterId}.json
Updated every 30 seconds with:
- Cluster ID and state
- Step ID and state
- Worker counts and fleet type
- Timestamps
- Error information
- Configuration details
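A sketch of the fields above as a case class; the names are assumptions, not the exact JSON schema written by EmrProvider:

```scala
import java.time.Instant

// Sketch only: field names are illustrative, not the actual status-log schema.
final case class JobStatusLog(
  clusterId: String,
  clusterState: String,
  stepId: String,
  stepState: String,
  workerCount: Int,
  fleetType: String,                  // e.g. spot vs on-demand
  updatedAt: Instant,
  error: Option[String],
  configuration: Map[String, String]
)
```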
Related Documentation
| Document | Location | Description |
|---|---|---|
| Preselect EMR Docs | spark-parent-pom/preselect-emr/docs/ | mdbook documentation |
| Generate-Stats Docs | spark-parent-pom/generate-stats/docs/ | Tool documentation |
| Fulfillment Guide | spark-parent-pom/fulfillment-emr/FULFILLMENT_MATCHING_GUIDE.md | Matching details |
| EMR Architecture | .amazonq/docs/EMR_ARCHITECTURE.md | Architecture details |
| Build Patterns | .amazonq/docs/BUILD_PATTERNS.md | Build conventions |
| Dev Guidelines | .amazonq/docs/DEVELOPMENT_GUIDELINES.md | Coding standards |
| Cluster Sizing | docu/how-to-size-a-cluster.md | EMR sizing guide |
| Path2Acquisition Flow | product-management/docs/path2acquisition-flow.md | Business process |
Key Integration Points
BERT (Base Environment for Re-tooled Technology)
BERT orchestrates coop-scala jobs for:
- Campaign processing workflows
- Automated statistics generation
- Fulfillment operations
Order App
Triggers fulfillment jobs from campaign orders.
Data Tools
Coordinates data processing and extract operations.
MongoDB
- Statistics results storage
- Configuration sync via `copy-mongo-to-s3`
Source: README.md, .amazonq/docs/, spark-parent-pom/*/README.md, spark-parent-pom/preselect-emr/docs/, spark-parent-pom/generate-stats/docs/, spark-parent-pom/fulfillment-emr/FULFILLMENT_MATCHING_GUIDE.md, docu/*, preselect-book/*
Documentation created: 2026-01-24