import { Observable } from 'rxjs';
import { share } from 'rxjs/operators';
import { Socket, io } from 'socket.io-client';
import { IEventPayload, ResourceId, SocketIoConfig } from './socket-io.model';

export class SocketIo {
  private subscribersCounter: Record<string, number> = {};
  private eventObservables$: Record<string, Observable<any>> = {};

  private socket: Socket;

  constructor(private config: SocketIoConfig) {}

  connect(featurePath?: string) {
    let url: string;
    if (featurePath) {
      url = `${this.config.url}/${featurePath}`;
    } else {
      url = this.config.url;
    }

    // creates and connects
    this.socket = io(url, this.config.options);
  }

  disconnect() {
    this.socket.disconnect();
  }

  send(event: string, ...args: unknown[]) {
    this.socket.emit(event, ...args);
  }

  join(resource: string, ids: ResourceId[]) {
    this.socket.emit('handshake', { resource, ids });
  }

  leave(resource: string, ids: ResourceId[]) {
    this.socket.emit('leave', { resource, ids });
  }

  unsubscribe() {
    this.socket.emit('leave-all');
  }

  acknowledge(ids: string[]) {
    this.socket.emit('acknowledgement', ids);
  }

  on(event: string, cb: (payload: IEventPayload<any>) => void) {
    this.socket.on(event, (payload: IEventPayload<any>) => {
      cb(payload);

      // ack packet
      if (payload?.ID) {
        this.acknowledge([payload.ID]);
      }
    });
  }

  off(event: string, listener?: (...args: any[]) => void) {
    if (!listener) {
      // Remove all listeners for that event
      return this.socket.off(event);
    }

    // Removes the specified listener from the listener array for the event named
    return this.socket.off(event, listener);
  }

  fromEvent$(event: string): Observable<any> {
    if (!this.subscribersCounter[event]) {
      this.subscribersCounter[event] = 0;
    }
    this.subscribersCounter[event]++;

    // Create a new observable if it doesn't exist
    if (!this.eventObservables$[event]) {
      this.eventObservables$[event] = new Observable<any>((obs) => {
        const listener = (payload: any) => {
          obs.next(payload.message);

          // ack packet
          if (payload?.ID) {
            this.acknowledge([payload.ID]);
          }
        };

        this.socket.on(event, listener);

        return () => {
          this.subscribersCounter[event]--;
          if (this.subscribersCounter[event] === 0) {
            this.socket.off(event, listener);
            delete this.eventObservables$[event];
          }
        };
      }).pipe(share());
    }

    // Reuse the existing observable
    return this.eventObservables$[event];
  }
}
