# `Mojentic.Agents.AsyncAggregatorAgent`
[🔗](https://github.com/svetzal/mojentic-ex/blob/v1.5.0/lib/mojentic/agents/async_aggregator_agent.ex#L1)

GenServer-based agent that aggregates events by correlation ID.

The aggregator waits for multiple event types before processing them together.
This is useful for coordinating parallel async operations that need to be
combined (e.g., waiting for fact-checking and answer generation before
producing a final answer).

## Features

- **Event Accumulation** - Collects events by correlation_id
- **Type Tracking** - Waits for specific event types via `event_types_needed`
- **Timeout Support** - Configurable timeout for `wait_for_events/3`
- **Custom Processing** - Override `process_events/2` callback

## State Structure

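    %{
      events: %{correlation_id => [events]},
      results: %{correlation_id => [result_events]},
      waiters: %{correlation_id => [GenServer.from()]},
      event_types_needed: [EventType1, EventType2, ...],
      process_events_fn: (events, state -> {:ok, result_events, state})
    }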
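To make the accumulation concrete, here is a minimal sketch of how events might be filed under their correlation ID. It uses a trimmed-down state with only the `events`, `waiters`, and `event_types_needed` keys. The structs `SketchFactCheckEvent` and `SketchAnswerEvent` and the module `StateSketch` are hypothetical names invented for this illustration, not part of Mojentic.

```elixir
# Hypothetical event structs, defined only for this sketch.
defmodule SketchFactCheckEvent do
  defstruct [:correlation_id, :facts]
end

defmodule SketchAnswerEvent do
  defstruct [:correlation_id, :answer]
end

defmodule StateSketch do
  # Builds an empty state for the given event types (assumed initial shape).
  def new(event_types_needed) do
    %{events: %{}, waiters: %{}, event_types_needed: event_types_needed}
  end

  # Appends an event to the list stored under its correlation_id.
  def put_event(state, %{correlation_id: id} = event) do
    events = Map.update(state.events, id, [event], &(&1 ++ [event]))
    %{state | events: events}
  end
end

state =
  [SketchFactCheckEvent, SketchAnswerEvent]
  |> StateSketch.new()
  |> StateSketch.put_event(%SketchFactCheckEvent{correlation_id: "abc", facts: ["f1"]})
  |> StateSketch.put_event(%SketchAnswerEvent{correlation_id: "abc", answer: "42"})

IO.inspect(state.events, label: "events by correlation_id")
```

Both events end up in a single list under the key `"abc"`, so events with a different correlation ID never mix with them.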

## Usage

Start the aggregator as a linked process:

    {:ok, pid} = AsyncAggregatorAgent.start_link(
      event_types_needed: [FactCheckEvent, AnswerEvent],
      process_events_fn: &MyModule.process_events/2
    )

Or wrap it in your own module:

    defmodule FinalAnswerAgent do
      use Mojentic.Agents.AsyncAggregatorAgent

      def start_link(opts) do
        AsyncAggregatorAgent.start_link(
          event_types_needed: [FactCheckEvent, AnswerEvent],
          process_events_fn: &__MODULE__.process_events/2,
          name: __MODULE__
        )
      end

      def process_events(events, state) do
        fact_event = Enum.find(events, &match?(%FactCheckEvent{}, &1))
        answer_event = Enum.find(events, &match?(%AnswerEvent{}, &1))

        final_event = %FinalAnswerEvent{
          source: __MODULE__,
          correlation_id: fact_event.correlation_id,
          answer: answer_event.answer,
          facts: fact_event.facts
        }

        {:ok, [final_event], state}
      end
    end

## Examples

    # Start the aggregator
    {:ok, pid} = AsyncAggregatorAgent.start_link(
      event_types_needed: [EventA, EventB],
      process_events_fn: &process/2
    )

    # Dispatch events (via dispatcher or directly)
    AsyncAggregatorAgent.receive_event(pid, event_a)
    AsyncAggregatorAgent.receive_event(pid, event_b)

    # Wait for all needed events
    {:ok, result_events} = AsyncAggregatorAgent.wait_for_events(
      pid,
      correlation_id,
      timeout: 5000
    )

# `state`

```elixir
@type state() :: %{
  events: %{required(String.t()) => [Mojentic.Event.t()]},
  results: %{required(String.t()) => [Mojentic.Event.t()]},
  waiters: %{required(String.t()) => [GenServer.from()]},
  event_types_needed: [module()],
  process_events_fn: ([Mojentic.Event.t()], state() ->
                        {:ok, [Mojentic.Event.t()], state()})
}
```

# `__using__`
*macro* 

Enables using this module in your own aggregator implementations.

## Example

    defmodule MyAggregator do
      use Mojentic.Agents.AsyncAggregatorAgent

      def start_link do
        AsyncAggregatorAgent.start_link(
          event_types_needed: [EventA, EventB],
          process_events_fn: &__MODULE__.process_events/2,
          name: __MODULE__
        )
      end

      def process_events(events, state) do
        # Custom processing logic goes here; build result_event from events
        {:ok, [result_event], state}
      end
    end

# `child_spec`

Returns a specification to start this module under a supervisor.

See `Supervisor`.
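As a rough illustration of what a child specification enables, the sketch below supervises a stand-in GenServer. `StandInAggregator` and the registered name `:sketch_aggregator` are hypothetical and exist only so the snippet runs without Mojentic. Swapping in `Mojentic.Agents.AsyncAggregatorAgent` assumes its `child_spec/1` forwards the keyword list to `start_link/1`, which is the default behaviour of `use GenServer`.

```elixir
defmodule StandInAggregator do
  # `use GenServer` generates child_spec/1, which points at start_link/1.
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: Keyword.get(opts, :name, __MODULE__))
  end

  @impl true
  def init(opts) do
    {:ok, %{event_types_needed: Keyword.fetch!(opts, :event_types_needed)}}
  end
end

# A {module, options} tuple is expanded through module.child_spec(options).
children = [
  {StandInAggregator, event_types_needed: [:event_a, :event_b], name: :sketch_aggregator}
]

{:ok, supervisor} = Supervisor.start_link(children, strategy: :one_for_one)
IO.inspect(Supervisor.which_children(supervisor))
```

With this arrangement the supervisor restarts the process if it crashes, so registering it under a name lets callers find it again after a restart.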

# `receive_event`

Receives an event and processes it according to async agent behaviour.

This is the main entry point called by the dispatcher. The agent will:
1. Store the event under its correlation_id
2. Check if all needed event types have arrived
3. If complete, call `process_events_fn` and notify waiters
4. Return the resulting events
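The steps above can be sketched as a pure function over the state. This is an illustration under stated assumptions, not the library's implementation: `ReceiveSketch`, `StepEventA`, and `StepEventB` are hypothetical names, waiter notification is left out, and two behaviours are guesses that the documentation does not confirm (an incomplete set returns `{:ok, []}`, and collected events are cleared after processing).

```elixir
# Hypothetical event structs, defined only for this sketch.
defmodule StepEventA do
  defstruct [:correlation_id]
end

defmodule StepEventB do
  defstruct [:correlation_id]
end

defmodule ReceiveSketch do
  # True when every needed struct module appears among the collected events.
  def complete?(events, event_types_needed) do
    seen = MapSet.new(events, & &1.__struct__)
    Enum.all?(event_types_needed, &MapSet.member?(seen, &1))
  end

  # Returns {reply, new_state}.
  def handle_event(state, %{correlation_id: id} = event) do
    # Step 1: store the event under its correlation_id.
    collected = Map.get(state.events, id, []) ++ [event]
    state = %{state | events: Map.put(state.events, id, collected)}

    # Step 2: check whether all needed event types have arrived.
    if complete?(collected, state.event_types_needed) do
      # Step 3: hand the collected events to the processing function.
      {:ok, results, state} = state.process_events_fn.(collected, state)
      # Assumption: the collected events are cleared once processed.
      {{:ok, results}, %{state | events: Map.delete(state.events, id)}}
    else
      # Assumption: an incomplete set yields an empty list of new events.
      {{:ok, []}, state}
    end
  end
end

state = %{
  events: %{},
  event_types_needed: [StepEventA, StepEventB],
  process_events_fn: fn events, state -> {:ok, [{:combined, length(events)}], state} end
}

{first_reply, state} = ReceiveSketch.handle_event(state, %StepEventA{correlation_id: "c1"})
{second_reply, _state} = ReceiveSketch.handle_event(state, %StepEventB{correlation_id: "c1"})
IO.inspect({first_reply, second_reply})
```

Matching on `__struct__` is one way to test for "event type"; it treats each struct module listed in `event_types_needed` as a distinct type.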

## Parameters

- `pid` - The aggregator process
- `event` - The event to process

## Returns

- `{:ok, [Event.t()]}` - Successfully processed, returns new events
- `{:error, reason}` - Processing failed

## Examples

    {:ok, events} = AsyncAggregatorAgent.receive_event(pid, event)

# `start_link`

Starts the aggregator agent as a linked process.

## Options

- `:event_types_needed` - List of event type modules to wait for (required)
- `:process_events_fn` - Function to call when all events collected (required)
- `:name` - Process registration name (optional)

## Examples

    {:ok, pid} = AsyncAggregatorAgent.start_link(
      event_types_needed: [EventA, EventB],
      process_events_fn: &MyModule.process/2
    )

# `wait_for_events`

Waits for all needed events for a specific correlation_id.

This function blocks the caller until all required event types have been
received for the given correlation_id, or until the timeout is reached.

## Parameters

- `pid` - The aggregator process
- `correlation_id` - The correlation ID to wait for
- `opts` - Keyword list with:
  - `:timeout` - Maximum wait time in milliseconds (default: 5000)

## Returns

- `{:ok, [Event.t()]}` - All events received and processed
- `{:error, :timeout}` - Timeout reached before all events arrived
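One common way to get this blocking behaviour from a GenServer is a deferred reply: `handle_call/3` stores the caller's `from` and returns `{:noreply, state}`, and a later message triggers `GenServer.reply/2`. The sketch below shows that pattern in isolation. `WaiterSketch` is a hypothetical module, it counts items instead of checking event types, and converting the call timeout into `{:error, :timeout}` by catching the exit is an assumption about how such a return value could be produced.

```elixir
defmodule WaiterSketch do
  use GenServer

  def start_link(needed), do: GenServer.start_link(__MODULE__, needed)

  def put(pid, id, item), do: GenServer.cast(pid, {:put, id, item})

  # Blocks until `needed` items are stored under `id`, or the timeout elapses.
  def wait(pid, id, opts \\ []) do
    timeout = Keyword.get(opts, :timeout, 5000)

    try do
      GenServer.call(pid, {:wait, id}, timeout)
    catch
      # GenServer.call/3 exits with {:timeout, _} when no reply arrives in time.
      :exit, {:timeout, _} -> {:error, :timeout}
    end
  end

  @impl true
  def init(needed), do: {:ok, %{needed: needed, items: %{}, waiters: %{}}}

  @impl true
  def handle_call({:wait, id}, from, state) do
    items = Map.get(state.items, id, [])

    if length(items) >= state.needed do
      {:reply, {:ok, items}, state}
    else
      # Defer the reply: remember the caller and answer once the set is complete.
      waiters = Map.update(state.waiters, id, [from], &[from | &1])
      {:noreply, %{state | waiters: waiters}}
    end
  end

  @impl true
  def handle_cast({:put, id, item}, state) do
    items = Map.get(state.items, id, []) ++ [item]
    state = %{state | items: Map.put(state.items, id, items)}

    if length(items) >= state.needed do
      # Wake every caller that was parked on this id.
      {waiting, waiters} = Map.pop(state.waiters, id, [])
      Enum.each(waiting, &GenServer.reply(&1, {:ok, items}))
      {:noreply, %{state | waiters: waiters}}
    else
      {:noreply, state}
    end
  end
end

{:ok, pid} = WaiterSketch.start_link(2)
timed_out = WaiterSketch.wait(pid, "c1", timeout: 50)
WaiterSketch.put(pid, "c1", :a)
WaiterSketch.put(pid, "c1", :b)
completed = WaiterSketch.wait(pid, "c1", timeout: 1000)
IO.inspect({timed_out, completed})
```

A caller that times out stays in `waiters` until the set completes, so a production version would likely also prune stale entries.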

## Examples

    {:ok, events} = AsyncAggregatorAgent.wait_for_events(
      pid,
      "correlation-123",
      timeout: 10_000
    )

