import { createApiRef } from '@backstage/core-plugin-api';
import { Subscriptions } from '../../domain/Subscriptions';
import { NakadiApiGateway } from '../../gateway/NakadiApiGateway/NakadiApiGateway';

/**
 * Contract for looking up subscriptions of an application and aggregated
 * statistics of a single subscription.
 */
export interface SubscriptionsManager {
  /**
   * Resolves the subscriptions associated with an application.
   *
   * @param applicationName - Name of the application to look up.
   * @returns A promise of the matching subscriptions.
   */
  getSubscriptions(applicationName: string): Promise<Subscriptions[]>;
  /**
   * Resolves statistics for one subscription, one entry per event type.
   *
   * @param subscriptionId - Identifier of the subscription to inspect.
   * @returns A promise of the per-event-type statistics.
   */
  getSubscriptionStats(subscriptionId: string): Promise<
    {
      // Event type this entry describes.
      event_type: string;
      // Optional; SubscriptionsManagerImpl in this file reports the sum over
      // all partitions, or leaves it undefined when any partition lacks it.
      unconsumed_events?: number;
      // Optional; SubscriptionsManagerImpl in this file reports the maximum
      // over all partitions, or leaves it undefined when any partition lacks it.
      consumer_lag_seconds?: number;
      // SubscriptionsManagerImpl in this file sets this to the length of the
      // partition list of the event type.
      number_of_partitions: number;
    }[]
  >;
}

/**
 * API reference for the {@link SubscriptionsManager} API, created through
 * Backstage's `createApiRef` with the id
 * `plugin.nakadi.nakadi-subscriptions-api`.
 */
export const subscriptionApiRef = createApiRef<SubscriptionsManager>({
  id: 'plugin.nakadi.nakadi-subscriptions-api',
});

/**
 * {@link SubscriptionsManager} implementation that obtains all of its data
 * through a {@link NakadiApiGateway}.
 */
export class SubscriptionsManagerImpl implements SubscriptionsManager {
  private readonly nakadiApiGateway;

  /**
   * @param options - Dependencies of the manager; `nakadiApiGateway` is the
   *   gateway every lookup is delegated to.
   */
  constructor(options: { nakadiApiGateway: NakadiApiGateway }) {
    this.nakadiApiGateway = options.nakadiApiGateway;
  }

  /**
   * Collects the subscriptions related to an application.
   *
   * Three gateway lookups run in parallel: subscriptions read by
   * `stups_<applicationName>`, subscriptions owned by
   * `stups_<applicationName>`, and subscriptions owned by `applicationName`.
   * The results are concatenated in that order and de-duplicated by `id`,
   * keeping the first occurrence of each id.
   *
   * @param applicationName - Application name without the `stups_` prefix.
   * @returns The de-duplicated subscriptions. The promise rejects if any of
   *   the gateway lookups rejects.
   */
  async getSubscriptions(applicationName: string) {
    const stupsName = `stups_${applicationName}`;
    const [byReader, byStupsOwningApp, byOwningApp] = await Promise.all([
      this.nakadiApiGateway.getSubscriptionsByReader(stupsName),
      this.nakadiApiGateway.getSubscriptionsByOwningApp(stupsName),
      this.nakadiApiGateway.getSubscriptionsByOwningApp(applicationName),
    ]);

    // Single pass with a set of already emitted ids: linear instead of the
    // quadratic filter + findIndex scan, with the same first-wins ordering.
    const seenIds = new Set<unknown>();
    return [...byReader, ...byStupsOwningApp, ...byOwningApp].filter(sub => {
      if (seenIds.has(sub.id)) {
        return false;
      }
      seenIds.add(sub.id);
      return true;
    });
  }

  /**
   * Aggregates the per-partition statistics of a subscription into one entry
   * per event type.
   *
   * For each event type, `unconsumed_events` is the sum over its partitions
   * and `consumer_lag_seconds` is the maximum over its partitions. Either
   * value is reported as `undefined` when at least one partition does not
   * carry it. An event type without partitions yields `0` for both.
   *
   * @param subscriptionId - Identifier of the subscription to inspect.
   * @returns One aggregated entry per event type returned by the gateway.
   */
  async getSubscriptionStats(subscriptionId: string) {
    const allSubscriptionStats =
      await this.nakadiApiGateway.getSubscriptionStats(subscriptionId);

    return allSubscriptionStats.map(stat => {
      let everyUnconsumedDefined = true;
      let everyLagDefined = true;
      let unconsumedTotal = 0;
      let maxLagSeconds = 0;

      for (const partition of stat.partitions) {
        if (partition.unconsumed_events === undefined) {
          everyUnconsumedDefined = false;
        }
        if (partition.consumer_lag_seconds === undefined) {
          everyLagDefined = false;
        }
        // Missing values count as 0 here; the flags above decide whether the
        // aggregate is reported at all.
        unconsumedTotal += Number(partition.unconsumed_events || 0);
        maxLagSeconds = Math.max(
          maxLagSeconds,
          Number(partition.consumer_lag_seconds || 0),
        );
      }

      return {
        event_type: stat.event_type,
        unconsumed_events: everyUnconsumedDefined
          ? unconsumedTotal
          : undefined,
        consumer_lag_seconds: everyLagDefined ? maxLagSeconds : undefined,
        number_of_partitions: stat.partitions.length,
      };
    });
  }
}
