const CITIES = ['Atlanta', 'Berlin', 'Dublin', 'Boston', 'Denver', 'Tokyo', 'Singapore'];
const REGEX_SSE_LINE = /^([^:]+?)\s*:\s*(.*?)\s*$/;

async function handler(event) {
  const clientReq = event.request;
  const backendResponse = await fetch(clientReq, { backend: "origin_0" });

  const filteredStream = streamFilter(backendResponse.body, {
    delimiter: '\n\n',
    parser: parseSSE,
    validator: isValidEvent
  });

  return new Response(filteredStream, {
    headers: {
      ...backendResponse.headers,
      'cache-control': 'private, no-store'
    }
  });
}

const parseSSE = (str) => {
  return str
    .split('\n')
    .map(line => line.match(REGEX_SSE_LINE))
    .filter(matchResult => matchResult !== null)
    .reduce((out, [, k, v]) => ({
      ...out,
      [k]: (k === 'data' && v) ? JSON.parse(v) : v
    }), [])
  ;
};

const isValidEvent = (sseEvent) => {
  if (!sseEvent.data) return true;
  const city = sseEvent.data.destination;
  const shouldEmit = CITIES.includes(city);
  console.log('City: '+city+', include: '+ shouldEmit);
  return shouldEmit;
};

addEventListener("fetch", event => event.respondWith(handler(event)));







// *********** HELPER FUNCTIONS *************

/**
 * Streamfilter
 * Takes a stream and returns a new stream.
 * 
 * - inputStream: The stream to filter
 * - options.delimiter: Separator between events in the stream
 * - options.parser: Function to parse a single event
 * - options.validator: Function to determine whether an event should be included in the output stream
 */
const streamFilter = (inputStream, options) => {
  let buffer = ''; 
  const decoder = new TextDecoder();
  const encoder = new TextEncoder();
  const inputReader = inputStream.getReader();
  const outputStream = new ReadableStream({
    start() {
      buffer = '';
    },

    // When the output stream is read...
    pull(controller) {

      // Read from the input stream...
      return inputReader.read().then(({value: chunk, done: readerDone}) => {
        const chunkStr = decoder.decode(chunk);
        let events = (buffer + (chunkStr || '')).split(options.delimiter);

        // If we're not done the last event will be incomplete
        if (!readerDone) {
          buffer = events.pop();
        }

        // Re-emit the events from the input stream into
        // the output stream if they meet our criteria
        events
          .filter(str => options.validator(options.parser(str)))
          .forEach(str => controller.enqueue(encoder.encode(str+options.delimiter)))
        ;

        // Flush the queue, and close the stream if we're done
        controller.enqueue(encoder.encode(''));
        if (readerDone) {
          controller.close();
        }
      });
    }
  });
  return outputStream;
}