import I18n from 'common/i18n';
import { socket } from 'common/types/dsmapi';
import { Agent } from 'common/types/gateway';
import { Revision } from 'common/types/revision';
import { NestedSource, Source } from 'common/types/source';
import { TaskSet } from 'common/types/taskSet';
import * as Links from 'datasetManagementUI/links/links';
import { parseDate } from 'datasetManagementUI/lib/parseDate';
import { Action, Dispatch, GetState, Params, RevisionId, TaskSetId } from 'datasetManagementUI/lib/types';
import {
  SourceId,
  Transform,
  WithTransform,
  InputSchema,
  OutputColumn,
  OutputSchema,
  NestedInputSchema
} from 'common/types/dsmapiSchemas';
import {
  onAgentCreated,
  onAgentsChannelJoined,
  onAgentsIndexed,
  onAgentUpdated
} from 'datasetManagementUI/reduxStuff/actions/agents';
import * as ApplyRevision from 'datasetManagementUI/reduxStuff/actions/applyRevision';
import { batchActions } from 'datasetManagementUI/reduxStuff/actions/batching';
import * as SourceActions from 'datasetManagementUI/reduxStuff/actions/createSource';
import { showFlashMessage } from 'datasetManagementUI/reduxStuff/actions/flashMessage';
import { editInputSchema } from 'datasetManagementUI/reduxStuff/actions/inputSchemas';
import { removeNotificationAfterTimeout } from 'datasetManagementUI/reduxStuff/actions/notifications';
import { editOutputSchema } from 'datasetManagementUI/reduxStuff/actions/outputSchemas';
import { editRevision } from 'datasetManagementUI/reduxStuff/actions/revisions';
import { getScanResult } from 'datasetManagementUI/reduxStuff/actions/scanResult';
import { redirectToOutputSchema } from 'datasetManagementUI/reduxStuff/actions/showOutputSchema';
import { addTaskSet, editTaskSet } from 'datasetManagementUI/reduxStuff/actions/taskSets';
import { editTransform } from 'datasetManagementUI/reduxStuff/actions/transforms';
import { getView } from 'datasetManagementUI/reduxStuff/actions/views';
import * as Selectors from 'datasetManagementUI/selectors';
import _ from 'lodash';
import { browserHistory } from 'react-router';
import { onMetadataChannelJoined } from './metadataTemplate';
import { normalizeInsertInputSchemaEvent } from 'datasetManagementUI/lib/jsonDecoders';
import { convertLegacyLabelsTemplateToLabelOptionsTemplate } from 'common/dsmapi/metadataTemplate';
import { MetadataTemplateWithDeprecatedLegacyLabels } from 'common/types/metadataTemplate';

// Wait time, in milliseconds, handed to _.throttle when batching transform
// progress actions (see subscribeToTransforms).
const PROGRESS_THROTTLE_TIME = 1000;

// Looks up the channel for `topic` among the socket's current channels and
// returns it if present. Otherwise a new channel is created and joined, and
// `ok` is registered for the join's 'ok' reply. Note that `ok` is never
// attached when an existing channel is reused.
function getOrJoinChannel(topic: string, ok = _.noop) {
  const alreadyJoined = _.find(socket().channels, (candidate) => candidate.topic === topic);

  if (!alreadyJoined) {
    const freshChannel = socket().channel(topic);
    freshChannel.join().receive('ok', ok);
    return freshChannel;
  }

  return alreadyJoined;
}

// Returns a thunk that subscribes to everything we track for an input schema:
// its row errors, its total row count, and, for the FIRST output schema in
// `is.output_schemas`, the output schema itself plus the transforms of its
// columns.
//
// An input schema without any output schemas is tolerated: the input-schema
// level subscriptions are still set up and the output-schema level ones are
// skipped, rather than throwing while dereferencing an undefined output schema.
export function subscribeToAllTheThings(is: InputSchema) {
  return (dispatch: Dispatch) => {
    dispatch(subscribeToRowErrors(is));
    dispatch(subscribeToTotalRows(is));

    const [os] = is.output_schemas;
    if (os) {
      dispatch(subscribeToOutputSchema(os));
      dispatch(subscribeToTransformsInOutputSchema(os));
    }
  };
}

// Returns a thunk that subscribes to the input schema's row errors and total
// rows, to EVERY output schema hanging off of it, and to the transforms used
// by the columns of those output schemas (each transform id only once).
export function subscribeToOutputSchemaThings(is: InputSchema) {
  return (dispatch: Dispatch) => {
    dispatch(subscribeToRowErrors(is));
    dispatch(subscribeToTotalRows(is));

    for (const outputSchema of is.output_schemas) {
      dispatch(subscribeToOutputSchema(outputSchema));
    }

    // Gather the transform of every column of every output schema.
    const allTransforms = _.flatMap(is.output_schemas, (outputSchema) => {
      // @ts-ignore-error the OutputColumn may gain a transform later.
      return outputSchema.output_columns.map((column: OutputColumn & WithTransform) => column.transform);
    });

    // The same transform can back columns in several output schemas.
    const uniqueTransforms = _.uniqBy(allTransforms, (transform) => transform.id);
    dispatch(subscribeToTransforms(uniqueTransforms));
  };
}

// Returns a thunk that listens on the `revision:<id>` channel for revision
// updates and for newly inserted task sets.
export function subscribeToRevision(id: RevisionId) {
  return (dispatch: Dispatch, getState: GetState) => {
    const revisionChannel = getOrJoinChannel(`revision:${id}`);

    // The socket message does not always carry the fourfour, so capture the
    // one currently in the store (or '' if there is none) as a fallback.
    const fallbackFourfour = _.get(getState(), `entities.revisions.${id}.fourfour`, '');

    const handleRevisionUpdate = (revision: Revision) => {
      // created_at, when present, is run through parseDate before storing.
      const normalized: Revision = revision.created_at
        ? { ...revision, created_at: parseDate(revision.created_at) }
        : revision;

      dispatch(editRevision(id, normalized));

      if (normalized.closed_at) {
        dispatch(getView(normalized.fourfour || fallbackFourfour));
      }
    };

    const handleTaskSetInserted = (taskSet: TaskSet) => {
      dispatch(addTaskSet(taskSet));

      // Only task sets that are still in progress are worth following.
      if (ApplyRevision.taskSetInProgress(taskSet)) {
        dispatch(subscribeToTaskSet(taskSet.id));
      }
    };

    revisionChannel.on('update', handleRevisionUpdate);
    revisionChannel.on('insert_task_set', handleTaskSetInserted);
  };
}

// Returns a thunk that joins the 'metadata_templates' channel. On a successful
// join, the templates in the reply are converted from the legacy-labels shape
// and handed, together with the channel and the revision, to
// onMetadataChannelJoined. A failed join is only logged.
export function joinMetadataInterpreter(revision: Revision) {
  return (dispatch: Dispatch) => {
    const templatesChannel = socket().channel('metadata_templates');

    templatesChannel
      .join()
      .receive('ok', ({ templates }) => {
        dispatch(
          onMetadataChannelJoined(
            templatesChannel,
            revision,
            templates.map((legacyTemplate: MetadataTemplateWithDeprecatedLegacyLabels) =>
              convertLegacyLabelsTemplateToLabelOptionsTemplate(legacyTemplate)
            )
          )
        );
      })
      .receive('error', (error) => {
        console.error('unable to join metadata templates channel', error);
      });
  };
}

// Returns a thunk that listens on the `task_set:<id>` channel and applies every
// 'update' payload to the task set via editTaskSet.
export function subscribeToTaskSet(id: TaskSetId) {
  return (dispatch: Dispatch) => {
    const applyTaskSetChange = (change: Partial<TaskSet>) => {
      dispatch(editTaskSet(id, change));
    };

    getOrJoinChannel(`task_set:${id}`).on('update', applyTaskSetChange);
  };
}

// Returns a thunk that opens the `agents:<domain_id>` channel for the
// revision's domain, wires up the agent events, and then joins the channel.
export function subscribeToAgents(revision: Revision) {
  return (dispatch: Dispatch) => {
    const agentsChannel = socket().channel(`agents:${revision.domain_id}`);

    // Handlers are registered before joining the channel.
    agentsChannel.on('insert', (created: Agent) => {
      dispatch(onAgentCreated(created));
    });

    agentsChannel.on('update', (updated: Partial<Agent>) => {
      dispatch(onAgentUpdated(updated));
    });

    agentsChannel.on('agents', (message: { resource: Agent[] }) => {
      dispatch(onAgentsIndexed(message.resource));
    });

    agentsChannel
      .join()
      .receive('ok', (response) => {
        // Backwards compatibility: the join reply itself may carry the agent
        // list. This can go away once dsmapi with async join is out in the wild.
        const agentsFromJoinReply = response && response.resource;
        if (agentsFromJoinReply) {
          dispatch(onAgentsIndexed(agentsFromJoinReply));
        }

        dispatch(onAgentsChannelJoined(agentsChannel));
      })
      .receive('error', (error) => {
        console.error('unable to join agents channel', error);
      });
  };
}

// Returns a thunk that listens on the `row_errors:<input schema id>` channel
// and stores each reported error count as the input schema's num_row_errors.
export function subscribeToRowErrors(is: InputSchema) {
  return (dispatch: Dispatch) => {
    const rowErrorsChannel = getOrJoinChannel(`row_errors:${is.id}`);

    rowErrorsChannel.on('errors', (payload: { errors: number }) => {
      const { errors: rowErrorCount } = payload;
      return dispatch(editInputSchema(is.id, { num_row_errors: rowErrorCount }));
    });
  };
}

// Returns a thunk that listens on the `input_schema:<id>` channel and records
// the total row count reported by 'update' events.
export function subscribeToTotalRows(is: InputSchema) {
  return (dispatch: Dispatch) => {
    const inputSchemaChannel = getOrJoinChannel(`input_schema:${is.id}`);

    inputSchemaChannel.on('update', (update: { total_rows: number }) => {
      const totalRows = update.total_rows;

      // Some updates arrive with total_rows set to null. Once we have received
      // a row count for the schema we do not want to wipe it out, so updates
      // without a truthy count are ignored.
      if (!totalRows) {
        return;
      }

      dispatch(editInputSchema(is.id, { total_rows: totalRows }));
    });
  };
}

// Subscribes to the transform behind every column of the given output schema.
//
// Used from the app load path, the upload path, and the manageColMetadata and
// showOutputSchema actions (addColumn, dropColumn, ... - anything that creates
// a new output schema). The transform channel tells us how far DSMAPI has
// gotten with processing a column of data: the 'update' message says whether
// processing is done, while 'max_ptr' gives the finer-grained picture that
// drives the progress bar.
export function subscribeToTransformsInOutputSchema(os: OutputSchema) {
  // @ts-ignore-error These are OutputColumns that will eventually have Transforms.
  return subscribeToTransforms(os.output_columns.map((column: OutputColumn & WithTransform) => column.transform));
}

// Returns a thunk that follows the progress of every unfinished transform in
// `transforms`. Incoming channel messages are turned into editTransform
// actions, queued, and dispatched together as one batch through a throttled
// flush.
function subscribeToTransforms(transforms: Transform[]) {
  return (dispatch: Dispatch) => {
    // How long to keep a channel open after its transform finished or failed.
    const leaveDelayMs = 5000;

    let queued: Action[] = [];

    // Trailing-edge only: the queue is dispatched at the end of the throttle
    // window, never immediately on the first call.
    const flushQueue = _.throttle(
      () => {
        dispatch(batchActions(queued));
        queued = [];
      },
      PROGRESS_THROTTLE_TIME,
      { trailing: true, leading: false }
    );

    const enqueue = (action: Action) => {
      queued.push(action);
      flushQueue();
    };

    transforms.forEach((transform) => {
      // A transform that has already finished has no progress left to report,
      // so there is no point in opening a channel for it.
      if (transform.finished_at) {
        return;
      }

      const transformChannel = getOrJoinChannel(`transform:${transform.id}`);

      transformChannel.on('update', (change: Partial<Transform>) => {
        enqueue(editTransform(transform.id, change));

        const isDone = change.finished_at || change.failed_at;
        if (isDone) {
          // Hang around briefly to catch any straggling messages before
          // cleaning up the channel.
          setTimeout(() => {
            transformChannel.leave();
          }, leaveDelayMs);
        }
      });

      transformChannel.on('max_ptr', (progress: { end_row_offset: number }) => {
        enqueue(editTransform(transform.id, { contiguous_rows_processed: progress.end_row_offset }));
      });

      transformChannel.on('errors', (report: { count: number }) => {
        enqueue(editTransform(transform.id, { error_count: report.count }));
      });
    });
  };
}

// Returns a thunk that listens on the `output_schema:<id>` channel. Every
// 'update' is merged over the output schema this subscription was created with
// and stored; once the merged schema has a finished_at, all of its columns'
// transforms that are not yet marked finished are marked finished as well.
export function subscribeToOutputSchema(os: OutputSchema) {
  return (dispatch: Dispatch, getState: GetState) => {
    const outputSchemaChannel = getOrJoinChannel(`output_schema:${os.id}`);

    outputSchemaChannel.on('update', (newOS: Partial<OutputSchema>) => {
      const merged = { ...os, ...newOS };

      dispatch(editOutputSchema(os.id, merged));

      if (!merged.finished_at) {
        return;
      }

      // Cascading the completion down to the transforms from here is a bit
      // odd, but there is no obviously better place for it.
      const inputSchema = Selectors.inputSchemaFromOutputSchema(getState().entities, os.id);
      const unfinishedColumns = Selectors.columnsForOutputSchema(getState().entities, merged.id).filter(
        (column) => !column.transform.finished_at
      );

      const finishTransformActions = unfinishedColumns.map((column) => {
        // Also send contiguous_rows_processed: we sometimes see finished_at and
        // leave the transform channel before the last end_row_offset message
        // arrives, so take the input schema's total when it is larger.
        let rowsProcessed = column.transform.contiguous_rows_processed || 0;
        if (inputSchema && inputSchema.total_rows > rowsProcessed) {
          rowsProcessed = inputSchema.total_rows;
        }

        return editTransform(column.transform.id, {
          contiguous_rows_processed: rowsProcessed,
          finished_at: merged.finished_at
        });
      });

      dispatch(batchActions(finishTransformActions));
    });
  };
}

// Returns a thunk that subscribes to the `source:<sourceId>` channel (reusing
// an existing channel for that topic if there is one) and handles two events:
//   * 'insert_input_schema' - handled by onNewInputSchema below.
//   * 'update'              - applies the changes to the source in the store
//                             and triggers the follow-up work described inline.
// `params` is passed through to the link and redirect helpers.
export function subscribeToSource(sourceId: SourceId, params: Params) {
  return (dispatch: Dispatch, getState: GetState) => {
    // Reads the source from the store at call time. The non-null assertion
    // assumes the source is already present in state.
    const getSource = () => getState().entities.sources[sourceId]!;

    // Captured at subscription time so we can later tell whether the user has
    // navigated somewhere else in the meantime.
    const initialPathname = browserHistory.getCurrentLocation().pathname;
    const channel = getOrJoinChannel(`source:${sourceId}`);

    // Registers a newly discovered input schema: stores it, subscribes to its
    // channels, and redirects to its output schema if the user has not moved.
    const onNewInputSchema = (is: NestedInputSchema) => {
      // This is a nasty wart that happened in the very early stages of building
      // dsmapi. Instead of keeping everything consistent and having an endpoint
      // for creating a source, and then endpoints for side effects, the URL source
      // endpoint did things differently.
      // It's important to note that:
      // * When uploading a source, you create the upload source
      //   which creates the resource (and allows you to subscribe to updates that you
      //   care about) *AND THEN* start sending bytes.
      // * Agents are similar, you create the agent
      //   source *AND THEN* ask the agent to send bytes to the source.
      // * View sources too, you create the view source *AND THEN* ask it to be loaded.
      // All of these happen in separate API calls, allowing you to create the resource,
      // subscribe to it, and then initiate side effects that create events on
      // that subscription. Great!
      // But URL sources work differently, because I made a mistake. When you create a
      // URL source, dsmapi immediately starts pulling it in over the internet. Given that
      // you can't subscribe until after you get the source's ID back, it's very easy to
      // miss events, because the events are likely to happen before the source creation API
      // call returns.
      // So instead of relying on insert_input_schema, we have to handle any schemas that
      // we don't know about yet in the generic update event for a source.
      let os: OutputSchema | undefined;

      // A source can have more than one output schema even at this point
      // (e.g. changing the schema on a view source) so don't just take the first
      // one and redirect to it. Take the latest one (the one with the highest
      // id). Otherwise changing a type e.g. will redirect you to the wrong
      // output schema preview page.
      if (is.output_schemas.length === 1) {
        os = is.output_schemas[0];
      } else {
        os = _.maxBy(is.output_schemas, 'id');
      }

      const payload = normalizeInsertInputSchemaEvent(is, getSource);

      dispatch(SourceActions.createSourceSuccess(payload));

      dispatch(subscribeToAllTheThings(is));

      const currentPathname = browserHistory.getCurrentLocation().pathname;

      // if initial pathname is the same as current, they have not navigated anywhere else so
      // it's safe to redirect them to the output schema
      if (os && initialPathname === currentPathname) {
        dispatch(redirectToOutputSchema(params, os.id));
      }
    };

    channel.on('insert_input_schema', onNewInputSchema);

    channel.on('update', (changes: Partial<NestedSource>) => {
      // NOTE: `source` is declared further down in this handler. That is fine
      // because this closure is only ever invoked after that declaration.
      const redirectToBlobPreview = () => browserHistory.push(Links.showBlobPreview(params, source.id));
      // Snapshot of the source BEFORE the changes are applied to the store.
      const originalSource = getSource();

      if (changes.finished_at) {
        // Instead of updating things in the same way we update anything else,
        // this polls for stuff
        // One day it'd be nice to have the lambda write to a queue so dsmapi
        // can actually update the state of the source
        dispatch(getScanResult(sourceId));
      }

      // See the note in onNewInputSchema for *why* we can't rely on the
      // insert_input_schema event for this functionality.
      // Any schema in the update that is not in the store yet is treated as new.
      const existingSchemas = Selectors.inputSchemas(getState().entities, sourceId).map((is) => is.id);
      (changes.schemas || [])
        .filter((schema) => !_.includes(existingSchemas, schema.id))
        .forEach(onNewInputSchema);

      // The schemas were handled above, so they are left out of the source update.
      dispatch(SourceActions.sourceUpdate(sourceId, _.omit(changes, 'schemas')));

      // This isn't a great place to do this - figure out a nicer way
      // TODO: aaurhgghiguh
      if (!originalSource.finished_at && changes.finished_at) {
        dispatch(removeNotificationAfterTimeout(sourceId));
      }

      // Snapshot of the source AFTER sourceUpdate has been dispatched.
      const source = getSource();
      // The source just finished and has no input schemas in the store, so
      // send the user to the blob preview.
      if (
        originalSource.finished_at === null &&
        changes.finished_at != null &&
        Selectors.inputSchemas(getState().entities, sourceId).length === 0
      ) {
        redirectToBlobPreview();
      }

      // Only react to failure details the first time they show up.
      if (!originalSource.failure_details && changes.failure_details) {
        // we can only change the parseOption if the parseSetting is empty (allows for either)
        // this is the case where we try to parse something and fall back to a blob
        // representation, like in the case of a zip file that is not a shapefile
        if (
          changes.failure_details.key === 'unparsable_file' &&
          Selectors.getShouldParseFile(getState().entities) === null
        ) {
          dispatch(SourceActions.dontParseSource(params, source));
          redirectToBlobPreview();
        } else if (changes.failure_details.key === 'too_many_columns') {
          dispatch(
            showFlashMessage({
              kind: 'error',
              id: 'create_upload_source_too_many_columns',
              message: I18n.t('too_many_columns', { scope: 'dataset_management_ui.show_uploads' })
            })
          );
        }
      }
    });
  };
}
