// GradeHistoryImportServiceImpl.java

package edu.ucsb.cs156.courses.services;

import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
import edu.ucsb.cs156.courses.entities.GradeHistory;
import edu.ucsb.cs156.courses.services.jobs.JobContext;
import edu.ucsb.cs156.courses.utilities.CourseUtilities;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpMethod;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

/**
 * Streams a grade-distribution CSV from a URL and upserts its contents into the {@code
 * historygrade} table.
 *
 * <p>Each CSV data row is expanded into one {@link GradeHistory} per grade column that holds a
 * positive count, plus a derived {@code NP} record, and the resulting records are written in
 * batches through {@link JdbcTemplate}.
 */
@Slf4j
@Service
public class GradeHistoryImportServiceImpl implements GradeHistoryImportService {

  /** Thrown when the CSV stream yields no header row at all. */
  public static class NullHeaderException extends RuntimeException {
    public NullHeaderException(String message) {
      super(message);
    }
  }

  @Autowired private JdbcTemplate jdbcTemplate;

  @Autowired private RestTemplate restTemplate;

  /** Batch size handed to {@code JdbcTemplate.batchUpdate} when a buffer is flushed. */
  private static final int BATCH_SIZE = 1000;

  /** Header names that every row is read from unconditionally. */
  private static final String[] REQUIRED_COLUMNS = {"year", "quarter", "course", "instructor"};

  /** Header names of the per-grade count columns; 'p' and 'm' suffixes stand for '+' and '-'. */
  private static final String[] GRADE_COLUMNS = {
    "Ap", "A", "Am", "Bp", "B", "Bm", "Cp", "C", "Cm", "Dp", "D", "Dm", "F", "P", "S"
  };

  /**
   * Downloads the CSV at {@code url} and upserts every grade record it contains.
   *
   * @param url location of the CSV file, fetched with an HTTP GET
   * @param ctx job context that receives progress messages
   * @param batchSize number of buffered records that triggers a write to the database
   * @throws NullHeaderException if the CSV contains no header row
   * @throws RuntimeException wrapping any other failure while reading or storing the CSV
   *     (including a header that lacks a required column or a malformed count)
   * @throws Exception declared for compatibility with the service interface
   */
  @Override
  public void importGradesFromUrl(String url, JobContext ctx, int batchSize) throws Exception {
    restTemplate.execute(
        url,
        HttpMethod.GET,
        null,
        response -> {
          // Decode explicitly as UTF-8 so the result does not depend on the platform charset.
          try (BufferedReader reader =
                  new BufferedReader(
                      new InputStreamReader(response.getBody(), StandardCharsets.UTF_8));
              CSVReader csvReader = new CSVReaderBuilder(reader).build()) {

            String[] header = csvReader.readNext();
            if (header == null) throw new NullHeaderException("CSV header is missing");

            Map<String, Integer> col = mapHeaders(header);
            // Fail fast with a readable message instead of an NPE on the first data row.
            requireColumns(col);

            List<GradeHistory> buffer = new ArrayList<>();
            int recordsProcessed = 0;
            String[] nextLine;

            while ((nextLine = csvReader.readNext()) != null) {
              if (isBlankRow(nextLine)) {
                continue;
              }
              buffer.addAll(mapLineToGrades(nextLine, col));

              if (buffer.size() >= batchSize) {
                flushBuffer(buffer);
                recordsProcessed += buffer.size();
                ctx.log("Processed " + recordsProcessed + " grade history records so far.");
                buffer.clear();
              }
            }

            // Write the remainder first so "Done!" is only reported after it has been stored.
            flushBuffer(buffer);
            recordsProcessed += buffer.size();
            ctx.log("Processed " + recordsProcessed + " grade history records. Done!");

          } catch (NullHeaderException nhe) {
            log.error("Error processing CSV from URL: {}", url, nhe);
            throw nhe;
          } catch (Exception e) {
            log.error("Error processing CSV from URL: {}", url, e);
            throw new RuntimeException("CSV processing failed", e);
          }
          return null;
        });
  }

  /**
   * Builds a lookup from trimmed header name to column index.
   *
   * <p>A leading byte-order mark (U+FEFF) on the first header cell is removed so that it does not
   * become part of the first column's name.
   */
  private Map<String, Integer> mapHeaders(String[] header) {
    Map<String, Integer> map = new HashMap<>();
    for (int i = 0; i < header.length; i++) {
      String name = header[i];
      if (i == 0 && name.startsWith("\uFEFF")) {
        name = name.substring(1);
      }
      map.put(name.trim(), i);
    }
    return map;
  }

  /**
   * Verifies that every column in {@link #REQUIRED_COLUMNS} is present in the header.
   *
   * @throws IllegalArgumentException naming the missing columns
   */
  private static void requireColumns(Map<String, Integer> col) {
    List<String> missing = new ArrayList<>();
    for (String name : REQUIRED_COLUMNS) {
      if (!col.containsKey(name)) {
        missing.add(name);
      }
    }
    if (!missing.isEmpty()) {
      throw new IllegalArgumentException("CSV header is missing required column(s): " + missing);
    }
  }

  /**
   * Returns true for a row made of a single blank field.
   *
   * <p>NOTE(review): this is presumably how the CSV reader reports an empty line; such rows carry
   * no data and are skipped rather than failing the import.
   */
  private static boolean isBlankRow(String[] line) {
    return line.length == 1 && line[0].trim().isEmpty();
  }

  /**
   * Expands one CSV row into grade records.
   *
   * <p>One record is produced for each grade column with a count greater than zero, followed by an
   * {@code NP} record when {@link #calculateNP} is positive. Grade columns absent from the header
   * are ignored.
   *
   * @throws IllegalArgumentException if the row has no cell for a required column
   * @throws NumberFormatException if a count cell is neither blank nor an integer
   */
  private List<GradeHistory> mapLineToGrades(String[] line, Map<String, Integer> col) {
    List<GradeHistory> list = new ArrayList<>();

    String year = requiredField(line, col, "year");
    String quarter = requiredField(line, col, "quarter");
    String yyyyq = year + CourseUtilities.quarterToDigit(quarter);
    String course = requiredField(line, col, "course");
    String instructor = requiredField(line, col, "instructor");

    for (String grade : GRADE_COLUMNS) {
      if (col.containsKey(grade)) {
        // Column names spell '+' as 'p' and '-' as 'm' (e.g. "Ap" -> "A+", "Bm" -> "B-").
        String convertedGrade = grade.replace("p", "+").replace("m", "-");
        int count = parseCount(line, col, grade);
        if (count > 0) {
          list.add(
              GradeHistory.builder()
                  .yyyyq(yyyyq)
                  .course(course)
                  .instructor(instructor)
                  .grade(convertedGrade)
                  .count(count)
                  .build());
        }
      }
    }

    int countNP = calculateNP(line, col);
    if (countNP > 0) {
      list.add(
          GradeHistory.builder()
              .yyyyq(yyyyq)
              .course(course)
              .instructor(instructor)
              .grade("NP")
              .count(countNP)
              .build());
    }
    return list;
  }

  /**
   * Computes the {@code NP} count as {@code nPNPStudents - P}.
   *
   * <p>Either column counts as 0 when it is absent or blank. The result may be zero or negative;
   * the caller only records positive values.
   */
  private int calculateNP(String[] line, Map<String, Integer> col) {
    int pCount = parseCount(line, col, "P");
    int nPnpCount = parseCount(line, col, "nPNPStudents");

    return nPnpCount - pCount;
  }

  /**
   * Reads the raw text of a required column from a row.
   *
   * @throws IllegalArgumentException if the column is not in the header or the row is too short
   */
  private static String requiredField(String[] line, Map<String, Integer> col, String column) {
    Integer index = col.get(column);
    if (index == null || index >= line.length) {
      throw new IllegalArgumentException(
          "CSV row has no value for required column '" + column + "': " + String.join(",", line));
    }
    return line[index];
  }

  /**
   * Reads an integer count from an optional column.
   *
   * @return the parsed value, or 0 when the column is not in the header, the row is too short to
   *     contain it, or the cell is blank
   * @throws NumberFormatException if the cell holds non-blank text that is not an integer
   */
  private static int parseCount(String[] line, Map<String, Integer> col, String column) {
    Integer index = col.get(column);
    if (index == null || index >= line.length) {
      return 0;
    }
    String raw = line[index].trim();
    return raw.isEmpty() ? 0 : Integer.parseInt(raw);
  }

  /**
   * Upserts the buffered records into {@code historygrade}.
   *
   * <p>A row matching on (yyyyq, course, instructor, grade) has its count overwritten; otherwise a
   * new row is inserted. The buffer itself is not modified.
   */
  private void flushBuffer(List<GradeHistory> buffer) {
    // Note: 'count' is excluded from the ON clause because it is the value we want
    // to update
    String sql =
        """
            MERGE INTO "historygrade" AS t
            USING (VALUES (?, ?, ?, ?, ?)) AS s(yyyyq, course, instructor, grade, count)
            ON (t."yyyyq" = s.yyyyq AND t."course" = s.course AND t."instructor" = s.instructor AND t."grade" = s.grade)
            WHEN MATCHED THEN
                UPDATE SET "count" = s.count
            WHEN NOT MATCHED THEN
                INSERT ("yyyyq", "course", "instructor", "grade", "count")
                VALUES (s.yyyyq, s.course, s.instructor, s.grade, s.count);
        """;

    jdbcTemplate.batchUpdate(
        sql,
        buffer,
        BATCH_SIZE,
        (ps, entity) -> {
          ps.setString(1, entity.getYyyyq());
          ps.setString(2, entity.getCourse());
          ps.setString(3, entity.getInstructor());
          ps.setString(4, entity.getGrade());
          ps.setInt(5, entity.getCount());
        });
  }
}