Объедините Apache Camel CSV с шаблоном разделителя

У меня есть большой CSV-файл, который я хочу обработать с ограничением скорости. Шаблон разделителя предоставляет именно то, что я ищу, за исключением того, что я не могу понять, как как объединить его с компонентом CSV.

Из документации по разделителю вы можете обрабатывать CSV, например:

from("file:inbox")
  .split().tokenize("\n", 1000).streaming()
     .to("activemq:queue:order");

Но в идеале я бы хотел использовать CSV-компонент Apache Camel, чтобы обрабатывать месиво, что-то вроде:

from("file:inbox")
    .unmarshal().csv().split()
    .streaming().parallelProcessing()
    .throttle(requestsPerSecond)
    .bean(new ValidateProcess(), "validate")
    .marshal().csv().to("file:outbox");

Я знаю, что приведенный выше код полностью неверен, но, надеюсь, он передает то, чего я пытаюсь достичь. Возможно ли это вообще?


person Martinffx    schedule 23.09.2016    source источник
comment
Какой аспект формата данных csv () вы хотите использовать? Например, вы хотите использовать его способность демаршалировать каждую строку как карту или вы предпочитаете списки? Маршалинг предполагает использование карты или списка ‹Map› в качестве входных данных.   -  person Darius X.    schedule 23.09.2016
comment
Я хочу демаршалировать каждую строку в List<List<String>>   -  person Martinffx    schedule 25.09.2016


Ответы (1)


Поэтому по какой-то причине я не мог понять это раньше, я думаю, у меня была проблема с моим путем к классам, который не улавливал зависимость от org.apache.camel:camel-csv. Как только я разобрался, все было хорошо.

Вот что у меня получилось:

final CsvDataFormat csv = new CsvDataFormat(";");
csv.setLazyLoad(true);
csv.setSkipFirstLine(true);

from(in).unmarshal(csv).split(body()).streaming().parallelProcessing()
                    .bean(validator, "validateNumber")
                    .filter(header(ValidateProcess.Valid).isEqualTo(true))
                    .throttle(tps).bean(validator, "validate")
                    .marshal().csv()
                    .to(out).log("done.").end();

В основном я хотел обработать поток CSV, содержащий числа, против API, скорость которого ограничена 50 TPS, и вывести результат в файл csv.

person Martinffx    schedule 25.09.2016
comment
Разве это не втягивает весь файл (в виде списка ‹List ‹String›› и применяется только потоковая передача + параллельная обработка к этому полному списку списков? Или это ваше намерение? Кроме того, прежде чем вы начнете использовать CSV, вы преобразование в карту, так как это то, что ожидает csv? - person Darius X.; 26.09.2016