
# ETL Schema-Driven Service Documentation

## 1. Overview

This service provides a schema-driven ETL (Extract, Transform, Load) layer for Laravel applications. It is designed to import and export nested MongoDB documents through flat spreadsheets.

The core philosophy is to be **schema-driven**. A simple YAML file defines the data structure, validation rules, API visibility, and even frontend behavior for a given data model. This creates a "single source of truth" that simplifies development and ensures consistency across your application.

**Key Features:**

* **Schema-Driven:** Define your data models once in a YAML file.
* **MongoDB-Friendly:** Easily import and export complex documents with nested objects and arrays.
* **User-Friendly Templates:** Uses a simple "parent row" convention in spreadsheets, making it easy for non-technical users to populate data with nested items.
* **Extensible Drivers:** Ships with drivers for `maatwebsite/excel` (feature-rich) and `openspout/openspout` (streaming, with lower memory use on large files), so you can pick features or speed per job.
* **Fluent Export API:** A clean, chainable interface for building and dispatching complex export jobs (e.g., `Etl::export()->from(...)->chunk(...)`).
* **Integrated Validation:** The schema can define Laravel validation rules, which can be used in your Form Requests.
* **API & UI Generation:** The schema contains metadata to dynamically control API resources and render frontend forms.

***

## 2. Core Concepts

### 2.1. The Schema (`.yml` file)

The schema is the heart of the service. It's a YAML file (e.g., `user_schema.yml`) that defines everything about a data model.

| Property                 | Description                                                                                                               |
| :----------------------- | :------------------------------------------------------------------------------------------------------------------------ |
| `primary_key`            | **Required.** The unique field (e.g., `_id`) used to identify a document. The import process uses this to group rows.     |
| `fields`                 | **Required.** An object containing the definition for each field in the document.                                         |
| `fields.<key>.type`      | The data type (`string`, `integer`, `float`, `boolean`, `datetime`, `array`, `object`). Used for casting.                 |
| `fields.<key>.header`    | (Optional) A human-readable label to use as the column header in the spreadsheet.                                         |
| `fields.<key>.default`   | (Optional) Value used on import when the column is missing or the cell is empty.                                          |
| `fields.<key>.validator` | (Optional) A Laravel-compatible validation string. Use `{{id}}` as a placeholder for the record's ID in `unique` rules.   |
| `fields.<key>.edit`      | (Optional) Defines behavior in an edit form: `true` (editable), `false` (hidden), `'RO'` (Read-Only), `'VO'` (View-Only). |
| `fields.<key>.create`    | (Optional) Defines behavior in a create form: `true` (editable) or `false` (hidden).                                      |
| `fields.<key>.view`      | (Optional) `true` or `false`. Determines if the field is shown on detail pages or tables.                                 |
| `fields.<key>.api`       | (Optional) `true` or `false`. Determines if the field is exposed in your API resources.                                   |
| `fields.<key>.object`    | For fields of `type: array` or `type: object`, this nested key contains the schema for the sub-fields.                    |
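
The `validator` strings and the `{{id}}` placeholder can be turned into a rules array for a Form Request. The service code shown in this document does not include that step, so the following is only a minimal sketch: `buildValidationRules` is a hypothetical helper, and the `field.*.sub` wildcard for arrays and the `NULL` fallback for new records are assumptions based on Laravel's usual validation conventions.

```php
<?php
// Hypothetical helper (not part of the service): turns schema field
// definitions into a Laravel-style rules array keyed by attribute path.
function buildValidationRules(array $fields, ?string $recordId = null, string $prefix = ''): array
{
    $rules = [];
    foreach ($fields as $key => $config) {
        $path = $prefix . $key;
        if (isset($config['validator'])) {
            // Swap the {{id}} placeholder for the record's ID.
            // Assumption: 'NULL' is used on create so `unique` ignores nothing.
            $rules[$path] = str_replace('{{id}}', $recordId ?? 'NULL', $config['validator']);
        }
        if (isset($config['object']) && is_array($config['object'])) {
            // Arrays of objects use "field.*.sub"; plain objects use "field.sub".
            $childPrefix = ($config['type'] ?? '') === 'array' ? "{$path}.*." : "{$path}.";
            $rules += buildValidationRules($config['object'], $recordId, $childPrefix);
        }
    }
    return $rules;
}

// Trimmed-down field definitions in the shape the YAML parser would return.
$fields = [
    'email' => ['type' => 'string', 'validator' => 'required|email|unique:users,email,{{id}},_id'],
    'addresses' => [
        'type' => 'array',
        'validator' => 'nullable|array',
        'object' => ['city' => ['type' => 'string', 'validator' => 'required|string|max:100']],
    ],
];

$rules = buildValidationRules($fields, 'abc123');
```

In a Form Request, the returned array could be handed back from `rules()`, with the route's record ID passed as the second argument on update.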

### 2.2. The Spreadsheet Format

To represent a nested MongoDB document in a flat spreadsheet, we use a "parent row" convention.

* **Headers:** Column headers are generated from the schema's `header` or field key. Nested object fields use dot notation (e.g., `settings.theme`).
* **Parent Row:** The first row for a document contains all the top-level data *and* the first item of the primary embedded array. The export drivers treat the first `type: array` field declared in the schema as the primary array.
* **Child Rows:** To add more items to the embedded array, add new rows directly below the parent row. **On these child rows, leave the parent columns blank.** This signals to the importer that they belong to the document above.
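
The convention above can be sketched in plain PHP. This is an illustration only, not the drivers' actual code: `groupRows`, the column names, and the sample rows are made up for the example, and header mapping and type casting are left out.

```php
<?php
// Minimal sketch of the "parent row" grouping on already-parsed rows.
function groupRows(array $rows, string $primaryKey, string $arrayField, array $itemColumns): array
{
    $documents = [];
    $current = null;
    foreach ($rows as $row) {
        if (($row[$primaryKey] ?? '') !== '') {
            // A filled primary key marks a parent row and starts a new document.
            if ($current !== null) {
                $documents[] = $current;
            }
            $current = array_diff_key($row, array_flip($itemColumns));
            $current[$arrayField] = [];
        }
        if ($current === null) {
            continue; // Child row with no parent above it: nothing to attach to.
        }
        // Parent and child rows alike may contribute one item to the embedded array.
        $item = array_filter(
            array_intersect_key($row, array_flip($itemColumns)),
            fn ($value) => $value !== null && $value !== ''
        );
        if ($item !== []) {
            $current[$arrayField][] = $item;
        }
    }
    if ($current !== null) {
        $documents[] = $current;
    }
    return $documents;
}

// Sample data: one user with two addresses, one user with none.
$rows = [
    ['_id' => 'u1', 'name' => 'Ana',  'street' => '1 Main St', 'city' => 'Jakarta'],
    ['_id' => '',   'name' => '',     'street' => '9 Side Rd', 'city' => 'Bandung'],
    ['_id' => 'u2', 'name' => 'Budi', 'street' => '',          'city' => ''],
];

$documents = groupRows($rows, '_id', 'addresses', ['street', 'city']);
```

Here the second row has a blank `_id`, so its address is appended to `u1`, while `u2` ends up with an empty `addresses` array.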

***

## 3. Directory Structure

```
app/
├── Console/
│   └── Commands/
│       └── Test/
│           └── EtlTestPipeline.php     # Artisan command for testing
├── Providers/
│   └── EtlServiceProvider.php      # Registers the service
└── Services/
    └── Etl/
        ├── Contracts/
        │   ├── ExportDriver.php
        │   └── ImportDriver.php
        ├── Drivers/
        │   ├── Exports/
        │   │   ├── OpenSpoutSchemaExportDriver.php
        │   │   └── StandardSchemaExportDriver.php
        │   └── Imports/
        │       ├── OpenSpoutSchemaImportDriver.php
        │       └── StandardSchemaImportDriver.php
        ├── Etl.php                     # The service Facade
        ├── EtlManager.php              # Main service manager
        ├── ExportBuilder.php           # Fluent API for exports
        └── SchemaService.php           # Loads and parses YAML schemas
config/
└── etl_schemas/
    └── user_schema.yml             # Example schema definition
```

***

## 4. How to Use

### 4.1. Importing Data

In a controller, use the `Etl` facade to select a driver and process the uploaded file.

```php theme={null}
use App\Models\User;
use Etl; // The alias from config/app.php
use Illuminate\Http\Request;

public function import(Request $request)
{
    // The OpenSpout XLSX reader cannot open legacy .xls files, so accept .xlsx only
    $request->validate(['file' => 'required|mimes:xlsx']);
    $filePath = $request->file('file')->getRealPath();

    // Use the high-performance OpenSpout driver
    $dataToInsert = Etl::driver('open_spout_schema_import')
                       ->process($filePath, 'user_schema');

    foreach ($dataToInsert as $doc) {
        User::updateOrCreate(['_id' => $doc['_id']], $doc);
    }
    return back()->with('success', 'Data imported successfully!');
}
```

### 4.2. Exporting Data (Fluent API)

Use the fluent `export()` method to build and dispatch export jobs.

**Simple Export:**

```php theme={null}
use App\Models\User;
use Etl;

// Writes 'all-users.xlsx' to the root of the `local` disk (typically under storage/app/);
// $files holds the names of the generated files
$files = Etl::export()
    ->from(User::class)
    ->usingSchema('user_schema')
    ->to('all-users.xlsx')
    ->dispatch();
```

**Paginated (Chunked) Export:**

```php theme={null}
// Creates multiple files (e.g., users-part-1.xlsx, users-part-2.xlsx)
$files = Etl::export()
    ->from(User::class)
    ->usingSchema('user_schema')
    ->to('users.xlsx')
    ->chunk(5000) // Create a new file for every 5000 records
    ->dispatch();
```
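
To make the naming scheme concrete, the sketch below reproduces how the part file names are derived from the base name. `chunkFilenames` is a hypothetical standalone function written for this illustration; in the service the names are built inside `ExportBuilder::dispatch()` as each chunk is written.

```php
<?php
// Sketch: the file names a chunked export would produce for a given record count.
// Assumes $chunkSize is greater than zero.
function chunkFilenames(string $baseFilename, int $totalRecords, int $chunkSize): array
{
    $name = pathinfo($baseFilename, PATHINFO_FILENAME);
    $extension = pathinfo($baseFilename, PATHINFO_EXTENSION);
    $parts = (int) ceil($totalRecords / $chunkSize);

    $files = [];
    for ($part = 1; $part <= $parts; $part++) {
        $files[] = "{$name}-part-{$part}.{$extension}";
    }
    return $files;
}

// 12,000 records in chunks of 5,000 need three files.
$files = chunkFilenames('users.xlsx', 12000, 5000);
// → ['users-part-1.xlsx', 'users-part-2.xlsx', 'users-part-3.xlsx']
```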

### 4.3. Testing the Pipeline

Use the provided Artisan command to test the full export/import cycle.

```bash theme={null}
# Test with the Maatwebsite driver (default)
php artisan etl:test-pipeline user_schema User

# Test with the OpenSpout driver
php artisan etl:test-pipeline user_schema User --driver=openspout

# Test with 100 records and keep the file for inspection
php artisan etl:test-pipeline user_schema User --count=100 --keep-file
```

***

## 5. Full Source Code

Here is the complete source code for every class in the service.

### 5.1. Configuration Schema

**File:** `config/etl_schemas/user_schema.yml`

```yaml theme={null}
primary_key: _id
fields:
  _id: { type: string, header: 'User ID', validator: 'required|string|unique:users,_id,{{id}},_id', edit: 'RO', create: true, view: true, api: true }
  name: { type: string, header: 'Full Name', validator: 'required|string|max:255', edit: true, create: true, view: true, api: true }
  email: { type: string, header: 'Email Address', validator: 'required|email|unique:users,email,{{id}},_id', edit: true, create: true, view: true, api: true }
  age: { type: integer, default: null, validator: 'nullable|integer|min:0|max:150', edit: true, create: true, view: true, api: true }
  registered_at: { type: datetime, default: null, validator: 'nullable|date', edit: 'VO', create: false, view: true, api: true }
  addresses:
    type: array
    validator: 'nullable|array|min:1'
    edit: true
    create: true
    view: true
    api: true
    object:
      street: { type: string, header: 'Address Street', validator: 'required_with:addresses|string|max:255' }
      city: { type: string, header: 'Address City', validator: 'required_with:addresses|string|max:100' }
      postal_code: { type: string, header: 'Address Postal Code', validator: 'required_with:addresses|string|max:20' }
  settings:
    type: object
    validator: 'nullable|array'
    edit: true
    create: true
    view: true
    api: true
    object:
      notifications_enabled: { type: boolean, header: 'Enable Notifications', default: true, validator: 'required|boolean' }
      theme: { type: string, header: 'UI Theme', default: 'light', validator: 'required|string|in:light,dark,system' }
  internal_notes: { type: string, validator: 'nullable|string', edit: false, create: false, view: false, api: false }
```

### 5.2. Contracts (Interfaces)

**File:** `app/Services/Etl/Contracts/ImportDriver.php`

```php theme={null}
<?php
namespace App\Services\Etl\Contracts;
use Illuminate\Support\Collection;
interface ImportDriver
{
    public function process(string $filePath, string $schemaName, array $options = []): Collection;
}
```

**File:** `app/Services/Etl/Contracts/ExportDriver.php`

```php theme={null}
<?php
namespace App\Services\Etl\Contracts;
use Illuminate\Support\Collection;
interface ExportDriver
{
    public function generate(Collection $data, string $schemaName, string $filename);
}
```

### 5.3. Core Services & Builders

**File:** `app/Services/Etl/SchemaService.php`

```php theme={null}
<?php
namespace App\Services\Etl;

use Illuminate\Support\Facades\Cache;
use Symfony\Component\Yaml\Yaml;

class SchemaService
{
    public function get(string $name): array
    {
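        // Parsed schemas are cached forever; clear the "etl_schema_{name}" cache key after editing a YAML file.
        return Cache::rememberForever("etl_schema_{$name}", function () use ($name) {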
            $path = config_path("etl_schemas/{$name}.yml");
            if (!file_exists($path)) {
                throw new \Exception("Schema file not found for '{$name}' at {$path}");
            }
            $schema = Yaml::parseFile($path);
            return $this->processSchema($schema);
        });
    }

    protected function processSchema(array $schema): array
    {
        $processed = [
            'primary_key' => $schema['primary_key'],
            'fields' => $schema['fields'],
            'flat_fields' => [],
            'array_fields' => [],
        ];
        foreach ($schema['fields'] as $key => $config) {
            if ($config['type'] === 'array') {
                $processed['array_fields'][$key] = $config;
            } elseif ($config['type'] === 'object') {
                foreach ($config['object'] as $subKey => $subConfig) {
                    $processed['flat_fields']["{$key}.{$subKey}"] = $subConfig;
                }
            } else {
                $processed['flat_fields'][$key] = $config;
            }
        }
        return $processed;
    }
}
```
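
To see what `processSchema()` hands to the drivers, here is a standalone copy of its splitting logic that runs without Laravel. `splitFields` is a name chosen for this sketch, and the input is a trimmed-down version of the example schema.

```php
<?php
// Standalone sketch of the flat/array split performed by processSchema().
function splitFields(array $fields): array
{
    $flat = [];
    $arrays = [];
    foreach ($fields as $key => $config) {
        if ($config['type'] === 'array') {
            // Arrays stay nested; they become parent/child rows in the sheet.
            $arrays[$key] = $config;
        } elseif ($config['type'] === 'object') {
            // Object sub-fields are flattened into dot-notation columns.
            foreach ($config['object'] as $subKey => $subConfig) {
                $flat["{$key}.{$subKey}"] = $subConfig;
            }
        } else {
            $flat[$key] = $config;
        }
    }
    return ['flat_fields' => $flat, 'array_fields' => $arrays];
}

$result = splitFields([
    'name' => ['type' => 'string'],
    'addresses' => ['type' => 'array', 'object' => ['city' => ['type' => 'string']]],
    'settings' => ['type' => 'object', 'object' => ['theme' => ['type' => 'string']]],
]);

// array_keys($result['flat_fields'])  → ['name', 'settings.theme']
// array_keys($result['array_fields']) → ['addresses']
```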

**File:** `app/Services/Etl/EtlManager.php`

```php theme={null}
<?php
namespace App\Services\Etl;

use App\Services\Etl\Drivers\Exports\OpenSpoutSchemaExportDriver;
use App\Services\Etl\Drivers\Exports\StandardSchemaExportDriver;
use App\Services\Etl\Drivers\Imports\OpenSpoutSchemaImportDriver;
use App\Services\Etl\Drivers\Imports\StandardSchemaImportDriver;
use Illuminate\Support\Manager;

class EtlManager extends Manager
{
    public function getDefaultDriver(){ return 'standard_schema_import'; }
    public function createStandardSchemaImportDriver(){ return new StandardSchemaImportDriver(app(SchemaService::class)); }
    public function createStandardSchemaExportDriver(){ return new StandardSchemaExportDriver(app(SchemaService::class)); }
    public function createOpenSpoutSchemaImportDriver(){ return new OpenSpoutSchemaImportDriver(app(SchemaService::class)); }
    public function createOpenSpoutSchemaExportDriver(){ return new OpenSpoutSchemaExportDriver(app(SchemaService::class)); }
    public function export(): ExportBuilder { return new ExportBuilder($this); }
}
```
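
The driver names used elsewhere in this document (for example `open_spout_schema_import`) map onto the `create...Driver` methods above because Laravel's `Manager` converts the name to StudlyCase before looking up the factory method. The sketch below approximates that conversion in plain PHP; `driverFactoryMethod` is a hypothetical helper, and the real lookup lives in `Illuminate\Support\Manager`.

```php
<?php
// Approximation of how a driver name resolves to a factory method name.
function driverFactoryMethod(string $driver): string
{
    // 'open_spout_schema_import' -> 'Open Spout Schema Import' -> 'OpenSpoutSchemaImport'
    $studly = str_replace(' ', '', ucwords(str_replace(['-', '_'], ' ', $driver)));
    return "create{$studly}Driver";
}

$method = driverFactoryMethod('open_spout_schema_import');
// → 'createOpenSpoutSchemaImportDriver'
```

Adding a new driver therefore comes down to adding a matching `create...Driver` method to `EtlManager`.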

**File:** `app/Services/Etl/ExportBuilder.php`

```php theme={null}
<?php
namespace App\Services\Etl;

use Illuminate\Database\Eloquent\Builder;
use Illuminate\Support\Facades\Storage;
use Illuminate\Support\Str;

class ExportBuilder
{
    protected string $modelClass;
    protected ?Builder $query = null;
    protected string $schemaName;
    protected string $driverName = 'standard_schema_export';
    protected string $disk = 'local';
    protected string $baseFilename;
    protected ?int $chunkSize = null;

    public function __construct(protected EtlManager $etlManager) {}
    public function from(string $modelClass): self { $this->modelClass = $modelClass; return $this; }
    public function query(Builder $query): self { $this->query = $query; return $this; }
    public function usingSchema(string $schemaName): self { $this->schemaName = $schemaName; return $this; }
    public function withDriver(string $driverName): self { $this->driverName = $driverName; return $this; }
    public function to(string $filename): self { $this->baseFilename = Str::endsWith($filename, '.xlsx') ? $filename : $filename . '.xlsx'; return $this; }
    public function onDisk(string $disk): self { $this->disk = $disk; return $this; }
    public function chunk(int $size): self { $this->chunkSize = $size; return $this; }

    public function dispatch(): array
    {
        $driver = $this->etlManager->driver($this->driverName);
        $baseQuery = $this->query ?? $this->modelClass::query();
        $generatedFiles = [];

        if ($this->chunkSize > 0) {
            $fileNumber = 1;
            $baseName = pathinfo($this->baseFilename, PATHINFO_FILENAME);
            $extension = pathinfo($this->baseFilename, PATHINFO_EXTENSION);

            $baseQuery->chunkById($this->chunkSize, function ($records) use (&$fileNumber, &$generatedFiles, $driver, $baseName, $extension) {
                if ($records->isEmpty()) return;
                $chunkFilename = "{$baseName}-part-{$fileNumber}.{$extension}";
                $fullPath = Storage::disk($this->disk)->path($chunkFilename);
                $driver->generate($records, $this->schemaName, $fullPath);
                $generatedFiles[] = $chunkFilename;
                $fileNumber++;
            });
        } else {
            $data = $baseQuery->get();
            $fullPath = Storage::disk($this->disk)->path($this->baseFilename);
            $driver->generate($data, $this->schemaName, $fullPath);
            $generatedFiles[] = $this->baseFilename;
        }
        return $generatedFiles;
    }
}
```

### 5.4. Drivers

**File:** `app/Services/Etl/Drivers/Imports/StandardSchemaImportDriver.php`

```php theme={null}
<?php
namespace App\Services\Etl\Drivers\Imports;

use App\Services\Etl\Contracts\ImportDriver;
use App\Services\Etl\SchemaService;
use Carbon\Carbon;
use Illuminate\Support\Collection;
use Illuminate\Support\Str;
use Maatwebsite\Excel\Concerns\ToCollection;
use Maatwebsite\Excel\Concerns\WithHeadingRow;
use Maatwebsite\Excel\Facades\Excel;

class StandardSchemaImportDriver implements ImportDriver, ToCollection, WithHeadingRow
{
    protected Collection $results;
    protected array $schema;

    public function __construct(protected SchemaService $schemaService) { $this->results = new Collection(); }

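    public function process(string $filePath, string $schemaName, array $options = []): Collection
    {
        $this->schema = $this->schemaService->get($schemaName);
        // The manager caches driver instances, so clear rows left over from a previous import.
        $this->results = new Collection();
        Excel::import($this, $filePath);
        return $this->results;
    }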

    // Header lookups below use Str::slug($header, '_') because WithHeadingRow formats
    // headings that way under its default "slug" formatter. Str::snake() would not match
    // for headers such as 'User ID' (it yields 'user_i_d' instead of 'user_id').
    public function collection(Collection $rows)
    {
        $currentDocument = null;
        $primaryKey = $this->schema['primary_key'];
        $primaryKeyHeader = Str::slug($this->schema['fields'][$primaryKey]['header'] ?? $primaryKey, '_');

        foreach ($rows as $row) {
            $rowArray = $row->toArray();
            // A filled primary key marks a parent row: close the previous document and start a new one.
            if (!empty($rowArray[$primaryKeyHeader])) {
                if ($currentDocument !== null) $this->results->push($currentDocument);
                $currentDocument = $this->buildDocumentFromRow($rowArray);
            }
            // Parent and child rows can both carry one item for each array field.
            if ($currentDocument !== null) {
                $this->addArrayItemsToDocument($currentDocument, $rowArray);
            }
        }
        if ($currentDocument !== null) $this->results->push($currentDocument);
    }

    private function buildDocumentFromRow(array $row): array
    {
        $doc = [];
        foreach ($this->schema['flat_fields'] as $key => $config) {
            $header = Str::slug($config['header'] ?? $key, '_');
            $value = $row[$header] ?? $config['default'] ?? null;
            data_set($doc, $key, $this->castValue($value, $config['type']));
        }
        foreach ($this->schema['array_fields'] as $key => $config) { $doc[$key] = []; }
        return $doc;
    }

    private function addArrayItemsToDocument(array &$document, array $row)
    {
        foreach ($this->schema['array_fields'] as $key => $config) {
            if (isset($config['object']['type'])) {
                // Array of scalars: a single column holds the item value.
                $header = Str::slug($config['header'] ?? $key, '_');
                if (isset($row[$header]) && $row[$header] !== '') {
                     $document[$key][] = $this->castValue($row[$header], $config['object']['type']);
                }
            } else {
                // Array of objects: one column per sub-field.
                $item = []; $hasData = false;
                foreach ($config['object'] as $subKey => $subConfig) {
                    $header = Str::slug($subConfig['header'] ?? "{$key}.{$subKey}", '_');
                    if (isset($row[$header]) && $row[$header] !== '') {
                        $hasData = true;
                        $item[$subKey] = $this->castValue($row[$header], $subConfig['type']);
                    }
                }
                if ($hasData) $document[$key][] = $item;
            }
        }
    }

    private function castValue($value, string $type)
    {
        if (is_null($value)) return null;
        return match ($type) {
            'integer' => (int) $value, 'float' => (float) $value,
            'boolean' => filter_var($value, FILTER_VALIDATE_BOOLEAN),
            'datetime'=> Carbon::parse($value)->toDateTime(),
            default => (string) $value,
        };
    }
}
```

**File:** `app/Services/Etl/Drivers/Imports/OpenSpoutSchemaImportDriver.php`

```php theme={null}
<?php
namespace App\Services\Etl\Drivers\Imports;

use App\Services\Etl\Contracts\ImportDriver;
use App\Services\Etl\SchemaService;
use Carbon\Carbon;
use Illuminate\Support\Collection;
use OpenSpout\Common\Entity\Row;
use OpenSpout\Reader\XLSX\Reader;

class OpenSpoutSchemaImportDriver implements ImportDriver
{
    protected array $schema;
    public function __construct(protected SchemaService $schemaService) {}

    public function process(string $filePath, string $schemaName, array $options = []): Collection
    {
        $this->schema = $this->schemaService->get($schemaName);
        $reader = new Reader();
        $reader->open($filePath);
        $results = new Collection();
        $currentDocument = null;
        $headers = [];
        $primaryKey = $this->schema['primary_key'];
        $sheet = $reader->getSheetIterator()->current();

        foreach ($sheet->getRowIterator() as $rowIndex => $row) {
            $rowData = array_map(fn($cell) => $cell->getValue(), $row->getCells());
            if ($rowIndex === 1) {
                $headers = array_map('trim', $rowData); continue;
            }
            if (empty(array_filter($rowData))) continue;

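            // Pad short rows and trim long ones so array_combine() always receives matching lengths.
            $cells = array_slice(array_pad($rowData, count($headers), null), 0, count($headers));
            $assocRow = array_combine($headers, $cells);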
            $primaryKeyHeader = $this->schema['fields'][$primaryKey]['header'] ?? $primaryKey;

            if (!empty($assocRow[$primaryKeyHeader])) {
                if ($currentDocument !== null) $results->push($currentDocument);
                $currentDocument = $this->buildDocumentFromRow($assocRow);
            }
            if ($currentDocument !== null) {
                $this->addArrayItemsToDocument($currentDocument, $assocRow);
            }
        }
        if ($currentDocument !== null) $results->push($currentDocument);
        $reader->close();
        return $results;
    }

    private function buildDocumentFromRow(array $assocRow): array
    {
        $doc = [];
        foreach ($this->schema['flat_fields'] as $key => $config) {
            $header = $config['header'] ?? $key;
            $value = $assocRow[$header] ?? $config['default'] ?? null;
            data_set($doc, $key, $this->castValue($value, $config['type']));
        }
        foreach ($this->schema['array_fields'] as $key => $config) { $doc[$key] = []; }
        return $doc;
    }

    private function addArrayItemsToDocument(array &$document, array $assocRow)
    {
        foreach ($this->schema['array_fields'] as $key => $config) {
            if (isset($config['object']['type'])) {
                $header = $config['header'] ?? $key;
                if (isset($assocRow[$header]) && $assocRow[$header] !== '' && !is_null($assocRow[$header])) {
                    $document[$key][] = $this->castValue($assocRow[$header], $config['object']['type']);
                }
            } else {
                $item = []; $hasData = false;
                foreach ($config['object'] as $subKey => $subConfig) {
                    $header = $subConfig['header'] ?? "{$key}.{$subKey}";
                    if (isset($assocRow[$header]) && $assocRow[$header] !== '' && !is_null($assocRow[$header])) {
                        $hasData = true;
                        $item[$subKey] = $this->castValue($assocRow[$header], $subConfig['type']);
                    }
                }
                if ($hasData) $document[$key][] = $item;
            }
        }
    }

    private function castValue($value, string $type)
    {
        if (is_null($value)) return null;
        return match ($type) {
            'integer' => (int) $value, 'float' => (float) $value,
            'boolean' => filter_var($value, FILTER_VALIDATE_BOOLEAN),
            'datetime' => $value instanceof \DateTime ? $value : Carbon::parse($value)->toDateTime(),
            default => (string) $value,
        };
    }
}
```
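
Both import drivers funnel every cell through `castValue()`. The standalone sketch below mirrors that `match` so the casting behaviour can be tried in isolation. `castCell` is a name chosen for this example, and it uses PHP's built-in `DateTimeImmutable` in place of Carbon so that it runs without dependencies.

```php
<?php
// Standalone sketch of the schema-type casting applied to spreadsheet cells.
function castCell(mixed $value, string $type): mixed
{
    if ($value === null) {
        return null; // Missing cells stay null regardless of type.
    }
    return match ($type) {
        'integer' => (int) $value,
        'float' => (float) $value,
        'boolean' => filter_var($value, FILTER_VALIDATE_BOOLEAN),
        // Assumption for this sketch: DateTimeImmutable stands in for Carbon::parse().
        'datetime' => $value instanceof DateTimeInterface ? $value : new DateTimeImmutable((string) $value),
        default => (string) $value,
    };
}

$age = castCell('42', 'integer');        // 42
$enabled = castCell('yes', 'boolean');   // true
$disabled = castCell('off', 'boolean');  // false
$blankAge = castCell('', 'integer');     // 0, not null
$registered = castCell('2024-01-15 10:30:00', 'datetime');
```

Two behaviours are worth keeping in mind: an empty string cast to `integer` becomes `0` rather than `null`, and any string that `filter_var` does not recognise as a boolean becomes `false`. If a reader yields `''` for blank cells, normalising those to `null` before casting may be preferable.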

**File:** `app/Services/Etl/Drivers/Exports/StandardSchemaExportDriver.php`

```php theme={null}
<?php
namespace App\Services\Etl\Drivers\Exports;

use App\Services\Etl\Contracts\ExportDriver;
use App\Services\Etl\SchemaService;
use Illuminate\Support\Collection;
use Maatwebsite\Excel\Concerns\Exportable;
use Maatwebsite\Excel\Concerns\FromCollection;
use Maatwebsite\Excel\Concerns\WithHeadings;
use Maatwebsite\Excel\Concerns\WithMapping;
use Maatwebsite\Excel\Facades\Excel;

class StandardSchemaExportDriver implements ExportDriver, FromCollection, WithHeadings, WithMapping
{
    use Exportable;
    protected Collection $data;
    protected array $schema;
    protected array $flattenedData = [];
    protected ?string $primaryArrayKey = null;

    public function __construct(protected SchemaService $schemaService) {}

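    public function generate(Collection $data, string $schemaName, string $filename)
    {
        $this->data = $data;
        $this->schema = $this->schemaService->get($schemaName);
        // Reset per-call state: the manager reuses this instance, e.g. across export chunks.
        $this->flattenedData = [];
        $this->primaryArrayKey = empty($this->schema['array_fields'])
            ? null
            : array_key_first($this->schema['array_fields']);
        $this->flattenData();

        // Absolute paths are written to disk, as ExportBuilder and the test command expect.
        // Bare file names are streamed to the browser, matching the OpenSpout driver.
        if (\Illuminate\Support\Str::startsWith($filename, ['/', '\\']) || str_contains($filename, ':')) {
            file_put_contents($filename, Excel::raw($this, \Maatwebsite\Excel\Excel::XLSX));
            return $filename;
        }
        return Excel::download($this, $filename);
    }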

    protected function flattenData()
    {
        foreach ($this->data as $document) {
            $arrayItems = $this->primaryArrayKey ? data_get($document, $this->primaryArrayKey, []) : [];
            if (empty($arrayItems)) {
                $this->flattenedData[] = ['document' => $document, 'item' => null, 'is_first' => true];
            } else {
                foreach ($arrayItems as $index => $item) {
                    $this->flattenedData[] = ['document' => $document, 'item' => $item, 'is_first' => ($index === 0)];
                }
            }
        }
    }

    public function collection() { return collect($this->flattenedData); }

    public function headings(): array
    {
        $headings = [];
        foreach ($this->schema['flat_fields'] as $key => $config) {
            $headings[] = $config['header'] ?? $key;
        }
        if ($this->primaryArrayKey) {
            $arrayConfig = $this->schema['array_fields'][$this->primaryArrayKey];
            if (isset($arrayConfig['object']['type'])) {
                $headings[] = $arrayConfig['header'] ?? $this->primaryArrayKey;
            } else {
                 foreach ($arrayConfig['object'] as $subKey => $subConfig) {
                     $headings[] = $subConfig['header'] ?? "{$this->primaryArrayKey}.{$subKey}";
                 }
            }
        }
        return $headings;
    }

    public function map($row): array
    {
        $mappedRow = [];
        foreach ($this->schema['flat_fields'] as $key => $config) {
            $value = $row['is_first'] ? data_get($row['document'], $key) : '';
            if ($value instanceof \DateTime) $value = $value->format('Y-m-d H:i:s');
            $mappedRow[] = $value;
        }
        if ($this->primaryArrayKey) {
            $item = $row['item'];
            $arrayConfig = $this->schema['array_fields'][$this->primaryArrayKey];
            if (is_object($item)) $item = (array) $item;
            if (isset($arrayConfig['object']['type'])) {
                $mappedRow[] = $item ?? '';
            } else {
                foreach ($arrayConfig['object'] as $subKey => $subConfig) {
                    $mappedRow[] = $item ? ($item[$subKey] ?? null) : '';
                }
            }
        }
        return $mappedRow;
    }
}
```

**File:** `app/Services/Etl/Drivers/Exports/OpenSpoutSchemaExportDriver.php`

```php theme={null}
<?php
namespace App\Services\Etl\Drivers\Exports;

use App\Services\Etl\Contracts\ExportDriver;
use App\Services\Etl\SchemaService;
use Illuminate\Support\Collection;
use Illuminate\Support\Str;
use OpenSpout\Common\Entity\Row;
use OpenSpout\Writer\XLSX\Writer;

class OpenSpoutSchemaExportDriver implements ExportDriver
{
    protected ?string $primaryArrayKey;
    public function __construct(protected SchemaService $schemaService) {}

    public function generate(Collection $data, string $schemaName, string $filename)
    {
        $schema = $this->schemaService->get($schemaName);
        $this->primaryArrayKey = empty($schema['array_fields']) ? null : array_key_first($schema['array_fields']);

        $writer = new Writer();
        if (Str::startsWith($filename, ['/', '\\']) || Str::of($filename)->contains(':')) {
            $writer->openToFile($filename);
        } else {
            $writer->openToBrowser($filename);
        }

        $writer->addRow(Row::fromValues($this->getHeadings($schema)));

        foreach ($data as $document) {
            $arrayItems = $this->primaryArrayKey ? data_get($document, $this->primaryArrayKey, []) : [];
            if (empty($arrayItems)) {
                $writer->addRow(Row::fromValues($this->mapRow($document, null, true, $schema)));
            } else {
                foreach ($arrayItems as $index => $item) {
                    $writer->addRow(Row::fromValues($this->mapRow($document, $item, $index === 0, $schema)));
                }
            }
        }
        $writer->close();
    }

    private function getHeadings(array $schema): array
    {
        $headings = [];
        foreach ($schema['flat_fields'] as $key => $config) {
            $headings[] = $config['header'] ?? $key;
        }
        if ($this->primaryArrayKey) {
            $arrayConfig = $schema['array_fields'][$this->primaryArrayKey];
            if (isset($arrayConfig['object']['type'])) {
                $headings[] = $arrayConfig['header'] ?? $this->primaryArrayKey;
            } else {
                foreach ($arrayConfig['object'] as $subKey => $subConfig) {
                    $headings[] = $subConfig['header'] ?? "{$this->primaryArrayKey}.{$subKey}";
                }
            }
        }
        return $headings;
    }

    private function mapRow($document, $item, bool $isFirst, array $schema): array
    {
        $mappedRow = [];
        foreach ($schema['flat_fields'] as $key => $config) {
            $value = $isFirst ? data_get($document, $key) : '';
            if ($value instanceof \DateTime) $value = $value->format('Y-m-d H:i:s');
            $mappedRow[] = $value;
        }
        if ($this->primaryArrayKey) {
            $arrayConfig = $schema['array_fields'][$this->primaryArrayKey];
            if (is_object($item)) $item = (array) $item;
            if (isset($arrayConfig['object']['type'])) {
                $mappedRow[] = $item ?? '';
            } else {
                foreach ($arrayConfig['object'] as $subKey => $subConfig) {
                    $mappedRow[] = $item ? ($item[$subKey] ?? null) : '';
                }
            }
        }
        return $mappedRow;
    }
}
```
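
Both export drivers apply the same flattening rule: one spreadsheet row per item in the primary array, with the parent columns filled only on the first row. The sketch below isolates that rule in plain PHP. `documentToRows` and the sample document are illustrative and not part of the service.

```php
<?php
// Sketch: flatten one nested document into parent/child spreadsheet rows.
function documentToRows(array $document, array $flatKeys, string $arrayField, array $itemKeys): array
{
    $items = array_values($document[$arrayField] ?? []);
    if ($items === []) {
        $items = [null]; // No array items: still emit a single parent row.
    }
    $rows = [];
    foreach ($items as $index => $item) {
        $row = [];
        foreach ($flatKeys as $key) {
            // Parent columns are filled on the first row and left blank on child rows.
            $row[] = $index === 0 ? ($document[$key] ?? null) : '';
        }
        foreach ($itemKeys as $key) {
            $row[] = $item === null ? '' : ($item[$key] ?? null);
        }
        $rows[] = $row;
    }
    return $rows;
}

$rows = documentToRows(
    ['_id' => 'u1', 'name' => 'Ana', 'addresses' => [
        ['street' => '1 Main St', 'city' => 'Jakarta'],
        ['street' => '9 Side Rd', 'city' => 'Bandung'],
    ]],
    ['_id', 'name'],
    'addresses',
    ['street', 'city']
);
// → [['u1', 'Ana', '1 Main St', 'Jakarta'], ['', '', '9 Side Rd', 'Bandung']]
```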

### 5.5. Service Integration & Testing

**File:** `app/Services/Etl/Etl.php`

```php theme={null}
<?php
namespace App\Services\Etl;
use Illuminate\Support\Facades\Facade;

/**
 * @method static \App\Services\Etl\Contracts\ImportDriver|\App\Services\Etl\Contracts\ExportDriver driver(string|null $driver = null)
 * @method static \App\Services\Etl\ExportBuilder export()
 * @see \App\Services\Etl\EtlManager
 */
class Etl extends Facade
{
    protected static function getFacadeAccessor()
    {
        return 'etl';
    }
}
```

**File:** `app/Providers/EtlServiceProvider.php`

```php theme={null}
<?php
namespace App\Providers;

use App\Services\Etl\EtlManager;
use App\Services\Etl\SchemaService;
use Illuminate\Support\ServiceProvider;

class EtlServiceProvider extends ServiceProvider
{
    public function register(): void
    {
        $this->app->singleton('etl', function ($app) {
            return new EtlManager($app);
        });
        $this->app->singleton(SchemaService::class, function ($app) {
            return new SchemaService();
        });
    }
}
```

**File:** `config/app.php` (Add this to the `aliases` array)

```php theme={null}
'aliases' => Facade::defaultAliases()->merge([
    // ... other aliases
    'Etl' => App\Services\Etl\Etl::class,
])->toArray(),
```

**File:** `app/Console/Commands/Test/EtlTestPipeline.php`

```php theme={null}
<?php
namespace App\Console\Commands\Test;

use Etl;
use Illuminate\Console\Command;
use Illuminate\Database\Eloquent\Model;
use Illuminate\Support\Collection;
use Illuminate\Support\Facades\File;
use Illuminate\Support\Str;
use MongoDB\BSON\UTCDateTime;

class EtlTestPipeline extends Command
{
    protected $signature = 'etl:test-pipeline {schema} {model} {--count=5} {--driver=standard} {--keep-file}';
    protected $description = 'Tests the full ETL export/import pipeline for a given schema and model.';

    public function handle()
    {
        $schemaName = $this->argument('schema');
        $modelClass = $this->qualifyModel($this->argument('model'));
        $count = (int) $this->option('count');
        $driverType = $this->option('driver');

        $this->info("Starting ETL pipeline test for '{$schemaName}' schema using '{$driverType}' driver.");
        if (!class_exists($modelClass)) {
            $this->error("Model class '{$modelClass}' does not exist.");
            return Command::FAILURE;
        }
        $model = new $modelClass();
        $originalData = $model->take($count)->get();
        if ($originalData->isEmpty()) {
            $this->warn("No data found in '{$model->getTable()}'. Test cannot proceed.");
            return Command::SUCCESS;
        }
        $tempFilePath = storage_path('app/' . Str::slug($schemaName) . '_test_' . time() . '.xlsx');

        try {
            $this->comment("Exporting data to: {$tempFilePath}");
            $exportDriverName = ($driverType === 'openspout') ? 'open_spout_schema_export' : 'standard_schema_export';
            Etl::driver($exportDriverName)->generate($originalData, $schemaName, $tempFilePath);
            if (!File::exists($tempFilePath)) { $this->error("Export failed."); return Command::FAILURE; }
            $this->info("Export successful.");

            $this->comment("Importing data from temporary file...");
            $importDriverName = ($driverType === 'openspout') ? 'open_spout_schema_import' : 'standard_schema_import';
            $importedData = Etl::driver($importDriverName)->process($tempFilePath, $schemaName);
            $this->info("Import successful.");

            $this->comment("Verifying data integrity...");
            $verified = $this->verifyData($originalData, $importedData);
        } catch (\Throwable $e) {
            // Catch \Throwable so TypeError/ValueError from the drivers are reported too.
            $this->error("An exception occurred: " . $e->getMessage());
            $this->line($e->getTraceAsString());
            return Command::FAILURE;
        } finally {
            if (!$this->option('keep-file') && File::exists($tempFilePath)) {
                File::delete($tempFilePath);
                $this->comment("Temporary file deleted.");
            }
        }
        // Return a non-zero exit code when verification found mismatches.
        return $verified ? Command::SUCCESS : Command::FAILURE;
    }

    /**
     * Compares original and imported records pairwise.
     * Returns true only when the counts match and every record is identical.
     */
    private function verifyData(Collection $originalData, Collection $importedData): bool
    {
        if ($originalData->count() !== $importedData->count()) {
            $this->error(sprintf("Count mismatch. Original: %d, Imported: %d.", $originalData->count(), $importedData->count()));
            return false;
        }
        $errors = 0;
        $bar = $this->output->createProgressBar($originalData->count());
        $bar->start();
        foreach ($originalData as $index => $originalModel) {
            $originalArray = $this->normalizeArray($originalModel->toArray());
            $importedArray = $this->normalizeArray($importedData->get($index));
            if (json_encode($originalArray) !== json_encode($importedArray)) {
                $this->newLine(2);
                $this->warn("Mismatch for record #{$index} (ID: {$originalModel->getKey()})");
                $this->table(['Key', 'Original (JSON)', 'Imported (JSON)'], $this->getDiff($originalArray, $importedArray));
                $errors++;
            }
            $bar->advance();
        }
        $bar->finish();
        $this->newLine(2);
        if ($errors === 0) {
            $this->info("✅ Verification successful! All {$originalData->count()} records match.");
            return true;
        }
        $this->error("❌ Verification failed! Found {$errors} mismatched records.");
        return false;
    }

    private function qualifyModel(string $model): string { return Str::contains($model, '\\') ? $model : 'App\\Models\\' . Str::studly($model); }

    private function normalizeArray(array $array): array
    {
        ksort($array);
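        foreach ($array as $key => &$value) {
            // Unwrap BSON dates (and anything else exposing toDateTime(), e.g. Carbon) to a native date object.
            if ($value instanceof UTCDateTime || (is_object($value) && method_exists($value, 'toDateTime'))) $value = $value->toDateTime();
            // The import drivers cast 'datetime' fields to plain \DateTime objects, so format those as well.
            if ($value instanceof \DateTimeInterface) $value = $value->format('Y-m-d H:i:s');
            if (is_array($value)) $value = $this->normalizeArray($value);
        }
        unset($value); // Break the reference left behind by the by-reference loop.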
        return $array;
    }

    private function getDiff(array $original, array $imported): array
    {
        $keys = array_unique(array_merge(array_keys($original), array_keys($imported)));
        sort($keys);
        $diff = [];
        foreach ($keys as $key) {
            $originalValue = json_encode(data_get($original, $key));
            $importedValue = json_encode(data_get($imported, $key));
            if ($originalValue !== $importedValue) {
                $diff[] = ["<fg=red>{$key}</>", $originalValue, $importedValue];
            }
        }
        return $diff;
    }
}
```

### **Addendum A: CSV Import Functionality**

This section details the necessary additions to support importing data from `.csv` files.

#### **1. Conceptual Changes**

To handle CSV files, a new import driver, `OpenSpoutCsvSchemaImportDriver`, has been created. This driver uses the same schema-driven logic as the XLSX driver but is specifically configured to read and parse the CSV format using the high-performance OpenSpout library.

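The CSV file must follow the same "parent-child row" convention as the spreadsheet templates. A child row leaves the parent columns empty, so in raw CSV it begins with consecutive commas. The driver starts a new document whenever the primary-key column is non-empty, and it attaches the array-field values of every row (the parent row included) to the current document.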

**Example `users.csv` format:**

```csv theme={null}
User ID,Full Name,Email Address,Address Street,"Address City"
"USR-001","John Doe","john.doe@example.com","123 Main St","Anytown"
,,,"456 Oak Ave","Anytown"
```
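
The folding rule can be sketched in plain PHP without the driver or a schema. This is an illustrative sketch only: the function name `foldRows` and the output keys (`user_id`, `addresses`, `street`, `city`) are hypothetical and hard-coded here, whereas the real driver reads the mapping from the schema.

```php
<?php
// Hypothetical sketch: folds "parent row" CSV lines into nested documents.
// Output keys are illustrative and not taken from a real schema file.

function foldRows(string $csv): array
{
    $lines = array_values(array_filter(explode("\n", trim($csv)), fn ($l) => trim($l) !== ''));
    // Pass every str_getcsv() argument explicitly so the parsing rules are unambiguous.
    $rows = array_map(fn ($line) => str_getcsv($line, ',', '"', ''), $lines);
    $headers = array_shift($rows);

    $documents = [];
    $current = null;

    foreach ($rows as $row) {
        // Pad short rows and trim long ones so the header count always matches.
        $cells = array_combine($headers, array_slice(array_pad($row, count($headers), ''), 0, count($headers)));

        // A non-empty primary key starts a new parent document.
        if ($cells['User ID'] !== '') {
            if ($current !== null) $documents[] = $current;
            $current = [
                'user_id'   => $cells['User ID'],
                'full_name' => $cells['Full Name'],
                'email'     => $cells['Email Address'],
                'addresses' => [],
            ];
        }

        // Every row, parent or child, may contribute one nested address.
        if ($current !== null && ($cells['Address Street'] !== '' || $cells['Address City'] !== '')) {
            $current['addresses'][] = [
                'street' => $cells['Address Street'],
                'city'   => $cells['Address City'],
            ];
        }
    }

    if ($current !== null) $documents[] = $current;
    return $documents;
}

$csv = <<<CSV
User ID,Full Name,Email Address,Address Street,"Address City"
"USR-001","John Doe","john.doe@example.com","123 Main St","Anytown"
,,,"456 Oak Ave","Anytown"
CSV;

$documents = foldRows($csv);
echo json_encode($documents, JSON_PRETTY_PRINT), PHP_EOL;
```

For the sample file, the two CSV rows collapse into a single user document whose `addresses` array holds both streets, because the second row has an empty `User ID` and is therefore treated as a child of `USR-001`.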

#### **2. Directory Structure Updates**

A new driver has been added to the `Imports` directory:

```
app/Services/Etl/Drivers/
└── Imports/
    ├── OpenSpoutCsvSchemaImportDriver.php   <-- NEW FILE
    ├── OpenSpoutSchemaImportDriver.php
    └── StandardSchemaImportDriver.php
```

#### **3. New Driver Source Code**

**File:** `app/Services/Etl/Drivers/Imports/OpenSpoutCsvSchemaImportDriver.php`

```php theme={null}
<?php
namespace App\Services\Etl\Drivers\Imports;

use App\Services\Etl\Contracts\ImportDriver;
use App\Services\Etl\SchemaService;
use Carbon\Carbon;
use Illuminate\Support\Collection;
use OpenSpout\Reader\CSV\Reader; // Using the CSV Reader

class OpenSpoutCsvSchemaImportDriver implements ImportDriver
{
    protected array $schema;

    public function __construct(protected SchemaService $schemaService) {}

    public function process(string $filePath, string $schemaName, array $options = []): Collection
    {
        $this->schema = $this->schemaService->get($schemaName);

        $reader = new Reader(); // Use the CSV Reader
        $reader->open($filePath);

        $results = new Collection();
        $currentDocument = null;
        $headers = [];
        $primaryKey = $this->schema['primary_key'];
        $sheet = $reader->getSheetIterator()->current();

        foreach ($sheet->getRowIterator() as $rowIndex => $row) {
            $rowData = array_map(fn($cell) => $cell->getValue(), $row->getCells());

            if ($rowIndex === 1) {
                $headers = array_map('trim', $rowData);
                continue;
            }
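            // Skip rows where every cell is blank; a literal "0" still counts as data.
            if (empty(array_filter($rowData, fn($v) => $v !== null && $v !== ''))) continue;

            // Pad short rows and trim overlong ones: array_combine() throws when the lengths differ.
            $cells = array_slice(array_pad($rowData, count($headers), null), 0, count($headers));
            $assocRow = array_combine($headers, $cells);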
            $primaryKeyHeader = $this->schema['fields'][$primaryKey]['header'] ?? $primaryKey;

            if (!empty($assocRow[$primaryKeyHeader])) {
                if ($currentDocument !== null) $results->push($currentDocument);
                $currentDocument = $this->buildDocumentFromRow($assocRow);
            }

            if ($currentDocument !== null) {
                $this->addArrayItemsToDocument($currentDocument, $assocRow);
            }
        }

        if ($currentDocument !== null) $results->push($currentDocument);
        $reader->close();
        return $results;
    }

    private function buildDocumentFromRow(array $assocRow): array
    {
        $doc = [];
        foreach ($this->schema['flat_fields'] as $key => $config) {
            $header = $config['header'] ?? $key;
            $value = $assocRow[$header] ?? $config['default'] ?? null;
            data_set($doc, $key, $this->castValue($value, $config['type']));
        }
        foreach ($this->schema['array_fields'] as $key => $config) { $doc[$key] = []; }
        return $doc;
    }

    private function addArrayItemsToDocument(array &$document, array $assocRow)
    {
        foreach ($this->schema['array_fields'] as $key => $config) {
            if (isset($config['object']['type'])) {
                $header = $config['header'] ?? $key;
                if (isset($assocRow[$header]) && $assocRow[$header] !== '' && !is_null($assocRow[$header])) {
                    $document[$key][] = $this->castValue($assocRow[$header], $config['object']['type']);
                }
            } else {
                $item = []; $hasData = false;
                foreach ($config['object'] as $subKey => $subConfig) {
                    $header = $subConfig['header'] ?? "{$key}.{$subKey}";
                    if (isset($assocRow[$header]) && $assocRow[$header] !== '' && !is_null($assocRow[$header])) {
                        $hasData = true;
                        $item[$subKey] = $this->castValue($assocRow[$header], $subConfig['type']);
                    }
                }
                if ($hasData) $document[$key][] = $item;
            }
        }
    }

    private function castValue($value, string $type)
    {
        // Treat empty cells as missing so they are not cast to 0, false, or the current time.
        if (is_null($value) || $value === '') return null;
        return match ($type) {
            'integer' => (int) $value,
            'float' => (float) $value,
            'boolean' => filter_var($value, FILTER_VALIDATE_BOOLEAN),
            'datetime' => $value instanceof \DateTimeInterface ? $value : Carbon::parse($value)->toDateTime(),
            default => (string) $value,
        };
    }
}
```
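
Because cell text has to be cast back to schema types, it helps to know what the `boolean` branch of `castValue()` accepts. The sketch below only exercises PHP's built-in `filter_var()` with `FILTER_VALIDATE_BOOLEAN`; the sample cell values are made up, and the stricter `FILTER_NULL_ON_FAILURE` variant is shown as an option, not as something the driver currently does.

```php
<?php
// Illustrates PHP's FILTER_VALIDATE_BOOLEAN rules for typical spreadsheet cell text.
$samples = ['1', 'true', 'YES', 'on', '0', 'false', 'no', 'off', 'maybe'];

$parsed = [];
foreach ($samples as $cell) {
    $parsed[$cell] = filter_var($cell, FILTER_VALIDATE_BOOLEAN);
}

// With FILTER_NULL_ON_FAILURE, unrecognized text yields null instead of false,
// which lets a caller tell an explicit "no" apart from a typo.
$strict = filter_var('maybe', FILTER_VALIDATE_BOOLEAN, FILTER_NULL_ON_FAILURE);

var_dump($parsed['YES'], $parsed['off'], $parsed['maybe'], $strict);
```

Without the extra flag, any unrecognized text silently becomes `false`, so a schema that needs to reject bad boolean input would have to validate the cell before casting.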

#### **4. Service Manager Update**

The `EtlManager` must be updated to register the new driver.

**File:** `app/Services/Etl/EtlManager.php`

```php theme={null}
<?php
namespace App\Services\Etl;

use App\Services\Etl\Drivers\Exports\OpenSpoutSchemaExportDriver;
use App\Services\Etl\Drivers\Exports\StandardSchemaExportDriver;
use App\Services\Etl\Drivers\Imports\OpenSpoutCsvSchemaImportDriver; // <-- ADD THIS
use App\Services\Etl\Drivers\Imports\OpenSpoutSchemaImportDriver;
use App\Services\Etl\Drivers\Imports\StandardSchemaImportDriver;
use Illuminate\Support\Manager;

class EtlManager extends Manager
{
    // ... other methods ...
    public function createOpenSpoutSchemaExportDriver(){ /* ... */ }

    // ADD THIS METHOD
    public function createOpenSpoutCsvSchemaImportDriver(){
        return new OpenSpoutCsvSchemaImportDriver(app(SchemaService::class));
    }

    public function export(): ExportBuilder { /* ... */ }
}
```
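
The method name matters because Laravel's `Manager` base class derives the factory method from the driver key by studly-casing it and wrapping it in `create…Driver`. The sketch below mimics that lookup in plain PHP; `studly()` and `factoryMethodFor()` are simplified, hypothetical stand-ins and not the framework's own code.

```php
<?php
// Sketch of how a Manager-style class maps a driver key to its factory method.
// studly() is a simplified stand-in for Illuminate\Support\Str::studly().

function studly(string $value): string
{
    $words = explode(' ', str_replace(['-', '_'], ' ', $value));
    return implode('', array_map('ucfirst', $words));
}

function factoryMethodFor(string $driverKey): string
{
    return 'create' . studly($driverKey) . 'Driver';
}

echo factoryMethodFor('open_spout_csv_schema_import'), PHP_EOL;
// prints "createOpenSpoutCsvSchemaImportDriver"
```

A typo in either the key or the method name therefore surfaces only at runtime, when the manager fails to find a matching factory method for the requested driver.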

#### **5. Usage Example**

To use the new driver, resolve it by its key, `open_spout_csv_schema_import`, and pass the uploaded file's path to `process()`.

```php theme={null}
use Etl;
use Illuminate\Http\Request;

public function importCsv(Request $request)
{
    $request->validate(['file' => 'required|mimes:csv,txt']);
    $filePath = $request->file('file')->getRealPath();

    $dataToInsert = Etl::driver('open_spout_csv_schema_import')
                       ->process($filePath, 'user_schema');

    // ... database logic ...
}
```
