import { call, put, select, take, takeEvery } from "redux-saga/effects";
import { eventChannel } from "redux-saga";
import { logger } from "utils/logger";
import * as notificationSocket from "services/sockets/notification-socket";

import predictionActionTypes from "modules/prediction/actionTypes";
import { actionTypes as webNotificationActionsTypes } from "modules/web-notifications/state/actions";
import { stationQueryActionTypes } from "modules/station/query";
import { actionTypes as userSetActionTypes } from "../user/state/actions";

// -------------------------
// -------- ACTIONS --------
// -------------------------

/**
 * Action type constants exposed by this module.
 *
 * NOTE(review): the string values use a generic "resource.fetch.*" prefix rather
 * than a notification-specific one — confirm they cannot collide with action
 * types declared by other modules.
 */
export const actionTypes = {
  // Handled by notificationListenWatcher and by the reducer in this file.
  NOTIFICATIONS_START_LISTEN: "resource.fetch.start",
  // Handled by notificationStopListenWatcher.
  NOTIFICATIONS_STOP_LISTEN: "resource.fetch.stop.listen",
  // Not referenced anywhere else in the visible part of this module.
  RESOURCE_FETCH_SUCCESS: "resource.fetch.success"
};

/**
 * Creates the action that asks the notification saga to start listening.
 *
 * @param {*} accountId - Identifier forwarded untouched in the payload.
 * @returns {{type: string, payload: {accountId: *}}} The start-listen action.
 */
export function notificationListenStartAction(accountId) {
  const payload = { accountId };
  const type = actionTypes.NOTIFICATIONS_START_LISTEN;

  return { type, payload };
}

/**
 * Creates the action that asks the notification saga to stop listening.
 *
 * @param {*} accountId - Identifier forwarded untouched in the payload.
 * @returns {{type: string, payload: {accountId: *}}} The stop-listen action.
 */
export function notificationStopListenAction(accountId) {
  const payload = { accountId };
  const type = actionTypes.NOTIFICATIONS_STOP_LISTEN;

  return { type, payload };
}

// -------------------------
// -------- REDUCER --------
// -------------------------

// Module-level holder shared by the sagas below (it is not part of the Redux state).
// notificationListenSaga fills it with the open event channel and a subscriber
// count while it is listening, and resets it to an empty object when it stops.
let notificationSagaChannel = {};

// Initial state returned by the reducer when no state has been provided yet.
const defaultState = {
  // Stations canonical view
  // (the reducer looks entries up by station id)
  stations: {},
  // Starts as null; no code in this file assigns it.
  error: null
};

/**
 * Reducer for this module's slice of state.
 *
 * On NOTIFICATIONS_START_LISTEN it flags the station named by
 * `payload.stationId` as `listening: true`, but only when that station is
 * already stored in `state.stations`. Every other case returns the state
 * object untouched.
 *
 * Note: actions built by notificationListenStartAction carry `accountId`, not
 * `stationId`, so they leave the state unchanged here.
 *
 * @param {{stations: Object, error: *}} [state=defaultState] - Current state.
 * @param {{type: string, payload?: Object}} action - Dispatched action.
 * @returns {{stations: Object, error: *}} Next state (same reference when nothing changed).
 */
export default function reducer(state = defaultState, action) {
  switch (action.type) {
    case actionTypes.NOTIFICATIONS_START_LISTEN: {
      // The payload is optional: an action without one must not crash the store.
      const { stationId } = action.payload || {};

      // Own-property check: the `in` operator would also match inherited keys
      // such as "toString" and create a bogus station entry.
      const isKnownStation =
        stationId != null && Object.prototype.hasOwnProperty.call(state.stations, stationId);
      if (!isKnownStation) {
        return state;
      }

      return {
        ...state,
        stations: {
          ...state.stations,
          [stationId]: { ...state.stations[stationId], listening: true }
        }
      };
    }

    default: {
      return state;
    }
  }
}

// ---------------------------
// ---------- SAGAS ----------
// ---------------------------

/**
 * Builds a lookup of records keyed by their stringified `_id`.
 * A Map is used so that ids can never match inherited object keys.
 *
 * @param {Array<Object>|null|undefined} items - Records exposing an `_id`.
 * @returns {Map<string, Object>} Records indexed by id (last duplicate wins).
 */
function indexById(items) {
  return new Map((items || []).map(item => [String(item._id), item]));
}

/**
 * Returns a copy of a station received from the socket in which string ids in
 * `products` / `lines` are replaced by the matching account records (or `{}`
 * when unknown) and `lineList` holds the stringified line ids.
 * The incoming station object is left untouched.
 *
 * @param {Object} station - Station carried by the "station.update" event.
 * @param {Array<Object>|null|undefined} products - Products selected from the store.
 * @param {Array<Object>|null|undefined} lines - Lines selected from the store.
 * @returns {Object} New station object ready to be stored.
 */
function hydrateStationUpdate(station, products, lines) {
  const productsById = indexById(products);
  const linesById = indexById(lines);
  const rawProducts = station.products || [];
  const rawLines = station.lines || [];

  return {
    ...station,
    products: rawProducts.map(product =>
      typeof product === "string" ? productsById.get(product) || {} : product
    ),
    // Computed from the raw lines, i.e. before ids are swapped for records.
    lineList: rawLines.map(line =>
      typeof line === "object" ? line?._id?.toString() : line.toString()
    ),
    lines: rawLines.map(line => (typeof line === "string" ? linesById.get(line) || {} : line))
  };
}

/**
 * Translates one socket update into the matching store action.
 * Unknown update types are ignored. It is delegated to with `yield*`, so its
 * effects are yielded by the calling saga.
 *
 * @param {{type: string, resource: Object}} update - Event emitted by the socket.
 */
function* dispatchNotificationUpdate(update) {
  const { type, resource } = update;

  switch (type) {
    case "station.update": {
      const products = yield select(state => state?.account?.state?.products);
      const lines = yield select(state => state?.account?.state?.lines);
      const station = hydrateStationUpdate(resource.data, products, lines);

      yield put({
        type: stationQueryActionTypes.ADD_STATION_TO_STATE,
        payload: { station }
      });
      break;
    }
    case "user.update.is_read_notification_center":
      yield put({
        type: userSetActionTypes.USER_SET_STATE_IS_READ_NOTIFICATION_CENTER_REQUEST,
        payload: resource.data
      });
      break;
    case "web-notification.create":
      yield put({
        type: webNotificationActionsTypes.ADD_WEB_NOTIFICATION,
        payload: resource.data
      });
      break;
    case "web-notification.update":
      yield put({
        type: webNotificationActionsTypes.UPDATE_WEB_NOTIFICATION,
        payload: resource.data
      });
      break;
    case "web-notification.delete":
      yield put({
        type: webNotificationActionsTypes.HIDE_WEB_NOTIFICATION,
        payload: resource.data
      });
      break;
    case "block.update":
      yield put({
        type: stationQueryActionTypes.ADD_BLOCK_TO_STATE,
        payload: resource.data
      });
      break;
    case "prediction.update":
      yield put({
        type: predictionActionTypes.ADD_PREDICTION_TO_STATE,
        payload: { predictionResult: resource.data }
      });
      break;
    case "station.delete":
      yield put({
        type: stationQueryActionTypes.DELETE_STATION_FROM_STATE,
        payload: { stationId: resource.id }
      });
      break;
    case "block.delete":
      yield put({
        type: stationQueryActionTypes.DELETE_BLOCK_FROM_STATE,
        payload: resource.data
      });
      break;
    default:
      break;
  }
}

/**
 * Joins the account's socket room, opens a single event channel on the
 * notification socket and forwards every update to the store until the saga
 * is terminated (for example when it is cancelled or the channel is closed).
 *
 * Only one listener is allowed at a time: later start actions just bump the
 * subscriber count and return. On exit the room is left, the channel is
 * closed and the shared state is reset so a new listener can start.
 *
 * @param {{payload?: {accountId?: *}}} action - Start-listen action; nothing
 *   happens when it carries no accountId.
 */
export function* notificationListenSaga(action) {
  // Tolerate actions without a payload instead of throwing outside the try.
  const { accountId } = action.payload || {};

  try {
    if (!accountId) {
      return;
    }

    // This condition is here to validate that there's only 1 channel open at a time.
    // Otherwise it will trigger actions more than once.
    if (notificationSagaChannel.subscribers > 0) {
      notificationSagaChannel.subscribers += 1;
      return;
    }

    // Reserve the slot before the first yield: a start action handled while the
    // join call is still pending must hit the guard above, not open a second channel.
    notificationSagaChannel = { channel: null, subscribers: 1 };

    try {
      yield call(notificationSocket.joinAccountEvents, accountId);

      const channel = eventChannel(emitter =>
        notificationSocket.onUpdateNotification(event => emitter(event))
      );
      // Keep the subscriber count accumulated while the join was pending.
      notificationSagaChannel = { ...notificationSagaChannel, channel };

      while (true) {
        try {
          const update = yield take(channel);
          if (update) {
            yield* dispatchNotificationUpdate(update);
          }
        } catch (error) {
          // One bad event must not stop the listener.
          logger.error("Error in station directory saga loop", error);
        }
      }
    } catch (error) {
      logger.error("Error in station directory", error);
    } finally {
      logger.info(`Leave websockets room ${accountId}`);
      try {
        yield call(notificationSocket.leaveAccountEvents, accountId);
      } finally {
        // Always release the channel and the slot, even when leaving the room
        // fails; otherwise every later start action would be ignored.
        if (notificationSagaChannel?.channel) {
          notificationSagaChannel.channel.close();
        }
        notificationSagaChannel = {};
      }
    }
  } catch (error) {
    logger.error("Major Error in station directory", accountId, error);
  }
}

/**
 * Leaves the account's socket room and shuts down the shared event channel.
 *
 * Closing the channel ends the listener saga parked on it, whose cleanup
 * resets the shared state. Without this, the state would keep pointing at the
 * old channel and every later start action would be ignored instead of
 * rejoining the room.
 *
 * @param {{payload: {accountId: *}}} action - Stop-listen action.
 */
export function* notificationStopListenSaga(action) {
  const { accountId } = action.payload;

  try {
    yield call(notificationSocket.leaveAccountEvents, accountId);
  } finally {
    // Stop the listener even if leaving the room failed or this saga was cancelled.
    const { channel } = notificationSagaChannel;
    if (channel) {
      channel.close();
    }
  }
}

/**
 * Watcher: runs notificationListenSaga for every NOTIFICATIONS_START_LISTEN action.
 */
export function* notificationListenWatcher() {
  const { NOTIFICATIONS_START_LISTEN: startPattern } = actionTypes;

  yield takeEvery(startPattern, notificationListenSaga);
}

/**
 * Watcher: runs notificationStopListenSaga for every NOTIFICATIONS_STOP_LISTEN action.
 */
export function* notificationStopListenWatcher() {
  const { NOTIFICATIONS_STOP_LISTEN: stopPattern } = actionTypes;

  yield takeEvery(stopPattern, notificationStopListenSaga);
}
