Читайте неблокирующее из нескольких fifo параллельно

1087
Ole Tange

Я иногда сижу с кучей выходных данных от программ, которые работают параллельно. Я хотел бы объединить эти пятерки. Наивное решение:

cat fifo* > output 

Но для этого необходимо завершить первый fifo перед чтением первого байта из второго fifo, и это заблокирует параллельно работающие программы.

Другой способ это:

(cat fifo1 & cat fifo2 & ... ) > output 

Но это может смешать вывод, получая в результате половину строк.

При чтении из нескольких пятерок должны быть некоторые правила для объединения файлов. Обычно мне достаточно этого построчно, поэтому я ищу что-то, что делает:

parallel_non_blocking_cat fifo* > output 

который будет читать из всех fifos параллельно и объединять вывод с полной строкой за раз.

Я вижу, что написать эту программу несложно. Все, что вам нужно сделать, это:

  1. открыть все фифы
  2. сделать блокировку выбора на всех из них
  3. читать неблокирование из fifo, в котором есть данные, в буфер для fifo
  4. если буфер содержит полную строку (или запись), распечатайте строку
  5. если все фифы закрыты / eof: выход
  6. перейти к 2

Так что мой вопрос не так : это можно сделать?

Мой вопрос: это уже сделано, и я могу просто установить инструмент, который делает это?

6

3 ответа на вопрос

1
Ole Tange

Это решение будет работать только в том случае, если число fifo меньше, чем количество заданий, которые GNU параллельно может выполнять параллельно (что ограничено дескрипторами файлов и числом процессов):

parallel -j0 --line-buffer cat ::: fifo* 

Кажется, что он может двигаться до 500 МБ / с:

window1$ mkfifo  window1$ parallel -j0 --line-buffer cat ::: | pv >/dev/null  window2$ parallel -j0 'cat bigfile > ' ::: * 

И это не смешивает половинки:

window1$ mkfifo  window1$ parallel -j0 --line-buffer cat ::: &  window2$ parallel -j0 'traceroute {}.1.1.1 > {}' ::: * 

Он читает задания параллельно (он не читает одно задание полностью, прежде чем перейти к следующему):

window1$ mkfifo  window1$ parallel -j0 --line-buffer cat ::: * > >(tr -s ABCabc)  window2$ long_lines_with_pause() { perl -e 'print STDOUT "a"x30000_000," "'  perl -e 'print STDOUT "b"x30000_000," "'  perl -e 'print STDOUT "c"x30000_000," "'  echo "$1"  sleep 2  perl -e 'print STDOUT "A"x30000_000," "'  perl -e 'print STDOUT "B"x30000_000," "'  perl -e 'print STDOUT "C"x30000_000," "'  echo "$1"  } window2$ export -f long_lines_with_pause window2$ parallel -j0 'long_lines_with_pause {} > {}' ::: * 

Здесь много «abc» (первая половина задания) будет напечатано перед «ABC» (вторая половина задания).

1
user928506

Так,

tail -q -n+1 -f --pid=stop-tail-when-this-is-gone fifo1 fifo2 fifo3 

почти работает (как уже упоминалось в первоначальных комментариях об этой более ранней версии моего ответа, хотя вам может понадобиться «для f in fifo *; cat </ dev / null> $ f & done» заранее, чтобы гарантировать, что все FIFO открыты для записи потому что coreutils tail открывает их O_RDONLY без O_NONBLOCK).

К сожалению, есть ошибка, которая tailзаключается в осторожности с окончаниями строк / записей только с входными данными из каналов на stdin, но не с входными данными из именованных каналов / FIFO в аргументах. Когда-нибудь кто-нибудь может починить coreutils хвост.

Тем временем, чтобы получить настоящую очередь для нескольких потребителей / одного производителя с учетом окончаний строк, вы можете использовать простую программу на C из 100 строк, которую я называю tailpipes.c:

#include <stdio.h> #include <stdlib.h> #include <string.h> //TODO: Find&document build environments lacking memrchr #include <unistd.h> #include <fcntl.h> #include <time.h> #include <errno.h> #include <signal.h> #include <sys/types.h> #include <sys/stat.h> #define errstr strerror(errno)  char const * const Use = "%s: %s\n\nUsage:\n\n" " %s [-p PID] [-o OPEN_MODE(RW)] [-d DLM(\\n)] [-s SEC(.01)] PATH1 PATH2..\n\n" "Read delimited records (lines by default) from all input paths, writing only\n" "complete records to stdout and changing to a stop-at-EOF mode upon receiving\n" "SIGHUP (unlike \"tail -fqn+1\" which just dies) OR when we first notice that\n" "PID does not exist (if PID is given). Since by default fifos are opened RW,\n" "signal/PID termination is needed to not loop forever, but said FIFOs may be\n" "closed & reopened by other processes as often as is convenient. For one-shot\n" "writing style, ending input reads at the first EOF, use \"-oRO\". Also, DLM\n" "adjusts the record delimiter byte from the default newline, and SEC adjusts\n" "max select sleep time. Any improperly terminated final records are sent to\n" "stderr at the end of execution (with a label and bracketing).\n";  int writer_done; void sig(int signum) { writer_done = 1; }  int main(int N, char *V[]) { signed char ch; char *buf[N-1], delim = '\n', *V0 = V[0], *eol; int len[N-1], fds[N-1], nBf[N-1], i, fdMx = 0, nS = 0, nF = 0, oFlags = O_RDWR; pid_t pid = 0; ssize_t nR, nW; struct timespec tmOut = { 0, 10000000 }; //10 ms select time out fd_set fdRdMaster, fdRd; //If we get signaled before here, this program dies and data may be lost. //If possible use -p PID option w/pre-extant PID of appropriate lifetime. signal(SIGHUP, sig); //Install sig() for SIGHUP memset((void *)fds, 0, sizeof fds); memset((void *)len, 0, sizeof len); FD_ZERO(&fdRdMaster); fdRd = fdRdMaster; while ((ch = getopt(N, V, "d:p:s:o:")) != -1) switch (ch) { //For \0 do '' as a sep CLI arg double tO; case 'd': delim = optarg ? *optarg : '\n'; break; case 'p': pid = optarg ? atoi(optarg) : 0; break; case 's': tO = optarg ? atof(optarg) : .01; tmOut.tv_sec = (long)tO; tmOut.tv_nsec = 1e9 * (tO - tmOut.tv_sec); break; case 'o': oFlags = (optarg && strcasecmp(optarg, "ro") == 0) ? O_RDONLY | O_NONBLOCK : O_RDWR; break; default: return fprintf(stderr, Use, V0, "bad option", V0), 1; } V += optind; N -= optind; //Shift off option args if (N < 1) return fprintf(stderr, Use, V0, "too few arguments", V0), 2; setvbuf(stdout, NULL, _IONBF, 65536); //Full pipe on Linux for (i = 0; i < N; i++) //Check for any available V[] if ((fds[i] = open(V[i], oFlags)) != -1) { struct stat st; fstat(fds[i], &st); if (!S_ISFIFO(st.st_mode)) return fprintf(stderr,"%s: %s not a named pipe\n", V0, V[i]), 3; nF++; FD_SET(fds[i], &fdRdMaster); //Add fd to master copy for pselect buf[i] = malloc(nBf[i] = 4096); if (fds[i] > fdMx) fdMx = fds[i]; } else if (errno == EINTR) { //We may get signaled to finish up.. i--; continue; //..before we even this far. } else return fprintf(stderr, "%s: open(%s): %s\n", V0, V[i], errstr), 3; fdMx++; fdRd = fdRdMaster; while (nF && (nS = pselect(fdMx, &fdRd, NULL, NULL, &tmOut, NULL)) != -99) { if (pid && kill(pid, 0) != 0 && errno != EPERM) //Given pid didn't exist writer_done = 1; if (nS == 0 && writer_done) //No input & no writers break; else if (nS == -1) { //Some select error: if (errno != EINTR && errno == EAGAIN) //..fatal or retry return fprintf(stderr, "%s: select: %s\n", V0, errstr), 4; continue; } for (i = 0; nS > 0 && i < N; i++) { //For all fds.. if (fds[i] < 0 || !FD_ISSET(fds[i], &fdRd)) //with readable data continue; if ((nR = read(fds[i], buf[i]+len[i], nBf[i] - len[i])) < 0) { if (errno != EAGAIN && errno != EINTR) fprintf(stderr, "%s: read: %s\n", V0, errstr); continue; } else if (oFlags == (O_RDONLY | O_NONBLOCK) && nR == 0) { FD_CLR(fds[i], &fdRdMaster); nF--; free(buf[i]); } len[i] += nR; //Update Re: read data if ((eol = memrchr(buf[i], delim, len[i]))) { nW = eol - buf[i] + 1; //Only to last delim if (fwrite(buf[i], nW, 1, stdout) == 1) { memmove(buf[i], buf[i] + nW, len[i] - nW); len[i] -= nW; //Residual buffer shift } else return fprintf(stderr, "%s: %d bytes->stdout failed: %s\n", V0, len[i], errstr), 5; } else if (len[i] == nBf[i]) { //NoDelim&FullBuf=>GROW void *tmp; if (nBf[i] >= 1 << 30) return fprintf(stderr, "%s: record > 1 GiB\n", V0), 6; nBf[i] *= 2; if (!(tmp = realloc(buf[i], nBf[i]))) return fprintf(stderr,"%s: out of memory\n", V0), 7; buf[i] = tmp; } } fdRd = fdRdMaster; } for (i = 0; i < N; i++) //Ensure any residual data is.. if (len[i] > 0) { //..labeled,bracketed,=>stderr. fprintf(stderr, "%s: %s: final unterminated record: {", V0, V[i]); fwrite(buf[i], len[i], 1, stderr); fputs("}\n", stderr); } return 0; } 

Установить вырезать и вставить & cc -Owhatever tailpipes.c -o somewhere-in-$PATH/tailpipes. Протестировано на Linux и FreeBSD. Я получаю около 2500e6 байт / сек, но память может быть быстрее, чем блок 500e6 байт / сек.

Алгоритм примерно такой, как предлагается, но более общий. O_NONBLOCK требуется только с O_RDONLY и с некоторыми опциями для простоты использования, такими как открытие FIFO O_RDWR по умолчанию, чтобы авторы могли закрывать и открывать много раз и использовать -p PID-отслеживание для протокола без гонки. Вы можете передать -oRO, чтобы использовать EOF, если хотите. tailpipesтакже обрабатывает неполные строки при завершении программы, отправляя их с метками и заключенными в квадратные скобки в stderr в случае простой постобработки, которую можно выполнить, чтобы сделать записи целыми или если их журналы будут полезны для отладки.

Пример использования. GNU xargsможет быть частью для одного потребителя, с несколькими производителями / разветвителями параллельного конвейера map-Reduce-Ish с tailpipesфункционированием в качестве разветвителя для записи границ записи, и все без дискового пространства, используемого для временных файлов:

export MYTEMP=$(mktemp -d /tmp/MYPROG.XXXXX) FIFOs=`n=0; while [ $n -lt 8 ]; do echo $MYTEMP/$n; n=$((n+1)); done` mkfifo $FIFOs sleep 2147483647 & p=$! #Cannot know xargs pid is good for long ( find . -print0 | xargs -0 -P8 --process-slot-var=MYSLOT MYPROGRAM kill $p ) & #Inform tailpipes writers are done tailpipes -p$p $FIFOs | CONSUMING-PIPELINE rm -rf $MYTEMP wait #Wait for xargs subshell to finish 

В приведенном выше описании важно, чтобы A) nпереходил 0к соответствующей верхней границе, потому что это схема, xargsдля которой используется MYSLOT, и B) MYPROGRAMнаправлял свои выходные данные во вновь назначенный $MYSLOTфайл с ключом $MYTEMP/$MYSLOT, например, exec > $MYTEMP/$MYSLOTесли MYPROGRAMэто сценарий оболочки. Оболочка оболочки / программы может быть исключена во многих случаях, если принять xargsгипотетическое --process-slot-outрешение для установки своих дочерних stdouts.

Я проверял это. Это не работает в моей системе CygWin: если я только отправляю данные в `fifo3`, ничего не происходит. Что должно произойти, это то, что данные на `fifo3` должны быть напечатаны. Вы, кажется, предполагаете, что продюсеры будут производить все время. Это не безопасное предположение. Ole Tange 6 лет назад 0
Ваш хвост, вероятно, блокируется на открытии для записи fifo1 и fifo2. Это нормально для некоторых производителей, чтобы они никогда ничего не производили, но какой-то процесс должен открывать каждый для записи, как показано в моем объяснении цикла 'cat' в более подробном примере. user928506 6 лет назад 0
Теперь я получаю неблокирующие, но смешивающие строки, что аналогично моему примеру с `cat fifo1 &`. Можете ли вы адаптировать свой пример, чтобы показать, что вы можете работать с такими программами, как `traceroute`? Ole Tange 6 лет назад 0
Лично я предпочитаю воспроизводимые примеры ошибок, но мне удалось получить смешанный вывод: #! / Bin / sh sleep 1000 & tail --pid = $! -qfn + 1 0 1> out & ((echo -na; sleep 1; echo b; sleep 1; echo -nc)> 0 & (echo 1; echo -n 2)> 1 & wait pkill -P $$ sleep ) или убить всех и т.д. Я не могу заставить форматирование работать в поле для комментариев, я получил строку "c1" с этим, и цифры и буквы не должны быть перепутаны. В свете подобных ошибок я обновил свой ответ, чтобы обеспечить более надежную программу на Си для этого использования. user928506 6 лет назад 0
«Ха. Я не могу заставить форматирование работать в поле для комментариев». И вы теперь ответили, почему я не включил MCVE в комментарий :) Ole Tange 6 лет назад 0
0
Ole Tange

Более элегантный ответ, который не буферизует бесполезную копию на диске:

#!/usr/bin/perl   use threads; use threads::shared; use Thread::Queue;  my $done :shared;  my $DataQueue = Thread::Queue->new();  my @producers; for (@ARGV) { push @producers, threads->create('producer', $_); }  while($done <= $#ARGV) { # This blocks until $DataQueue->pending > 0  print $DataQueue->dequeue(); }  for (@producers) { $_->join(); }   sub producer { open(my $fh, "<", shift) || die; while(<$fh>) { $DataQueue->enqueue($_); } # Closing $fh blocks  # close $fh;  $done++; # Guard against race condition  $DataQueue->enqueue(""); } 

Похожие вопросы