import { createApiRef } from '@backstage/core-plugin-api';
import { DLQEvent, DLQStats } from '../../domain/DLQ';
import { Subscriptions } from '../../domain/Subscriptions';
import { DLQApiGateway } from '../../gateway/DLQApiGateway/DLQApiGateway';
import { NakadiApiGateway } from '../../gateway/NakadiApiGateway/NakadiApiGateway';

export interface DLQManager {
  getDLQStats(
    subscriptions: Subscriptions[],
    application: string,
  ): Promise<DLQStats[]>;
  getDLQEvents(subscription_id: string): Promise<DLQEvent[]>;
  deleteDLQEvent(subscription_id: string, event: DLQEvent): Promise<void>;
  redriveDLQEvents(subscription_id: string, events: DLQEvent[]): Promise<void>;
}

export const DLQApiRef = createApiRef<DLQManager>({
  id: 'plugin.nakadi.nakadi-dlq-api',
});

export class DLQManagerImpl implements DLQManager {
  private readonly dlqApiGateway: DLQApiGateway;
  private readonly nakadiApiGateway: NakadiApiGateway;

  constructor(options: {
    dlqApiGateway: DLQApiGateway;
    nakadiApiGateway: NakadiApiGateway;
  }) {
    this.dlqApiGateway = options.dlqApiGateway;
    this.nakadiApiGateway = options.nakadiApiGateway;
  }

  async getDlqStats(
    subscription: Subscriptions,
    application: string,
  ): Promise<DLQStats[]> {
    const stups_application = `stups_${application}`;

    const is_consuming =
      subscription.authorization?.readers?.some(
        reader =>
          reader.value === stups_application && reader.data_type === 'service',
      ) ?? false;

    const is_owned = subscription.owning_application === stups_application;

    const [dlqStatsRes, subscriptionStatsRes] = await Promise.all([
      this.dlqApiGateway.getDlqStatus(subscription.id),
      this.nakadiApiGateway.getSubscriptionStats(subscription.id),
    ]);

    const subscriptionStats = subscriptionStatsRes.map(stat => {
      const [unconsumed_events_defined, consumer_lag_seconds_defined] =
        stat.partitions.reduce(
          (state, item) => {
            const [ued, clsd] = state;
            return [
              ued && item.unconsumed_events !== undefined,
              clsd && item.consumer_lag_seconds !== undefined,
            ];
          },
          [true, true],
        );

      const [unconsumed_events, consumer_lag_seconds] = stat.partitions.reduce(
        (state, item) => {
          const [uncsEvents, conLag] = state;
          return [
            uncsEvents + Number(item.unconsumed_events || 0),
            Math.max(conLag, Number(item.consumer_lag_seconds || 0)),
          ];
        },
        [0, 0],
      );

      return {
        subscription_id: subscription.id,
        events_count: dlqStatsRes.count,
        consumer_lag_seconds: consumer_lag_seconds_defined
          ? consumer_lag_seconds
          : undefined,
        event_type: stat.event_type,
        unconsumed_events: unconsumed_events_defined
          ? unconsumed_events
          : undefined,
        owned: is_owned,
        consuming: is_consuming,
      };
    });

    return subscriptionStats;
  }

  async getDLQStats(
    subscriptions: Subscriptions[],
    application: string,
  ): Promise<DLQStats[]> {
    const allStats = [];
    for (const subscription of subscriptions) {
      allStats.push(this.getDlqStats(subscription, application));
    }

    return Promise.all(allStats).then(a =>
      a.reduce((accumulator, value) => accumulator.concat(value), []),
    );
  }

  async getDLQEvents(subscription_id: string): Promise<DLQEvent[]> {
    return await this.dlqApiGateway.getDlqEvents(subscription_id);
  }

  async deleteDLQEvent(
    subscription_id: string,
    event: DLQEvent,
  ): Promise<void> {
    await this.dlqApiGateway.deleteDlqEvents(subscription_id, [event]);
  }

  async redriveDLQEvents(
    subscription_id: string,
    events: DLQEvent[],
  ): Promise<void> {
    await this.dlqApiGateway.redriveDLQEvents(subscription_id, events);
  }
}
