1 /**
2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module proc;
7 
8 import core.thread : Thread;
9 import core.time : dur, Duration;
10 import logger = std.experimental.logger;
11 import std.algorithm : filter, count, joiner, map;
12 import std.array : appender, empty, array;
13 import std.exception : collectException;
14 import std.stdio : File, fileno, writeln;
15 import std.typecons : Flag, Yes;
16 static import std.process;
17 static import std.stdio;
18 
19 public import proc.channel;
20 public import proc.pid;
21 
22 version (unittest) {
23     import unit_threaded.assertions;
24     import std.file : remove;
25 }
26 
27 /// Automatically terminate the process when it goes out of scope.
28 auto scopeKill(T)(T p) {
29     return ScopeKill!T(p);
30 }
31 
32 struct ScopeKill(T) {
33     T process;
34     alias process this;
35 
36     ~this() {
37         process.dispose();
38     }
39 }
40 
41 /// Async process wrapper for a std.process SpawnProcess
42 struct SpawnProcess {
43     import std.algorithm : among;
44 
45     private {
46         enum State {
47             running,
48             terminated,
49             exitCode
50         }
51 
52         std.process.Pid process;
53         RawPid pid;
54         int status_;
55         State st;
56     }
57 
58     this(std.process.Pid process) @safe {
59         this.process = process;
60         this.pid = process.osHandle.RawPid;
61     }
62 
63     /// Returns: The raw OS handle for the process ID.
64     RawPid osHandle() nothrow @safe {
65         return pid;
66     }
67 
68     /// Kill and cleanup the process.
69     void dispose() @safe {
70         final switch (st) {
71         case State.running:
72             this.kill;
73             this.wait;
74             break;
75         case State.terminated:
76             this.wait;
77             break;
78         case State.exitCode:
79             break;
80         }
81 
82         st = State.exitCode;
83     }
84 
85     /// Kill the process.
86     void kill() nothrow @trusted {
87         import core.sys.posix.signal : SIGKILL;
88 
89         final switch (st) {
90         case State.running:
91             break;
92         case State.terminated:
93             goto case;
94         case State.exitCode:
95             return;
96         }
97 
98         try {
99             std.process.kill(process, SIGKILL);
100         } catch (Exception e) {
101         }
102 
103         st = State.terminated;
104     }
105 
106     /// Blocking wait for the process to terminated.
107     /// Returns: the exit status.
108     int wait() @safe {
109         final switch (st) {
110         case State.running:
111             status_ = std.process.wait(process);
112             break;
113         case State.terminated:
114             status_ = std.process.wait(process);
115             break;
116         case State.exitCode:
117             break;
118         }
119 
120         st = State.exitCode;
121 
122         return status_;
123     }
124 
125     /// Non-blocking wait for the process termination.
126     /// Returns: `true` if the process has terminated.
127     bool tryWait() @safe {
128         final switch (st) {
129         case State.running:
130             auto s = std.process.tryWait(process);
131             if (s.terminated) {
132                 st = State.exitCode;
133                 status_ = s.status;
134             }
135             break;
136         case State.terminated:
137             status_ = std.process.wait(process);
138             st = State.exitCode;
139             break;
140         case State.exitCode:
141             break;
142         }
143 
144         return st.among(State.terminated, State.exitCode) != 0;
145     }
146 
147     /// Returns: The exit status of the process.
148     int status() @safe {
149         if (st != State.exitCode) {
150             throw new Exception(
151                     "Process has not terminated and wait/tryWait been called to collect the exit status");
152         }
153         return status_;
154     }
155 
156     /// Returns: If the process has terminated.
157     bool terminated() @safe {
158         return st.among(State.terminated, State.exitCode) != 0;
159     }
160 }
161 
162 /// Async process that do not block on read from stdin/stderr.
163 struct PipeProcess {
164     import std.algorithm : among;
165 
166     private {
167         enum State {
168             running,
169             terminated,
170             exitCode
171         }
172 
173         std.process.ProcessPipes process;
174         Pipe pipe_;
175         FileReadChannel stderr_;
176         int status_;
177         State st;
178         RawPid pid;
179     }
180 
181     this(std.process.ProcessPipes process) @safe {
182         this.process = process;
183         this.pipe_ = Pipe(this.process.stdout, this.process.stdin);
184         this.stderr_ = FileReadChannel(this.process.stderr);
185         this.pid = process.pid.osHandle.RawPid;
186     }
187 
188     /// Returns: The raw OS handle for the process ID.
189     RawPid osHandle() nothrow @safe {
190         return this.pid;
191     }
192 
193     /// Access to stdin and stdout.
194     ref Pipe pipe() return scope nothrow @safe {
195         return pipe_;
196     }
197 
198     /// Access stderr.
199     ref FileReadChannel stderr() return scope nothrow @safe {
200         return stderr_;
201     }
202 
203     /// Kill and cleanup the process.
204     void dispose() @safe {
205         final switch (st) {
206         case State.running:
207             this.kill;
208             this.wait;
209             .destroy(process);
210             break;
211         case State.terminated:
212             this.wait;
213             .destroy(process);
214             break;
215         case State.exitCode:
216             break;
217         }
218 
219         st = State.exitCode;
220     }
221 
222     /// Kill the process.
223     void kill() nothrow @trusted {
224         import core.sys.posix.signal : SIGKILL;
225 
226         final switch (st) {
227         case State.running:
228             break;
229         case State.terminated:
230             return;
231         case State.exitCode:
232             return;
233         }
234 
235         try {
236             std.process.kill(process.pid, SIGKILL);
237         } catch (Exception e) {
238         }
239 
240         st = State.terminated;
241     }
242 
243     /// Blocking wait for the process to terminated.
244     /// Returns: the exit status.
245     int wait() @safe {
246         final switch (st) {
247         case State.running:
248             status_ = std.process.wait(process.pid);
249             break;
250         case State.terminated:
251             status_ = std.process.wait(process.pid);
252             break;
253         case State.exitCode:
254             break;
255         }
256 
257         st = State.exitCode;
258 
259         return status_;
260     }
261 
262     /// Non-blocking wait for the process termination.
263     /// Returns: `true` if the process has terminated.
264     bool tryWait() @safe {
265         final switch (st) {
266         case State.running:
267             auto s = std.process.tryWait(process.pid);
268             if (s.terminated) {
269                 st = State.exitCode;
270                 status_ = s.status;
271             }
272             break;
273         case State.terminated:
274             status_ = std.process.wait(process.pid);
275             st = State.exitCode;
276             break;
277         case State.exitCode:
278             break;
279         }
280 
281         return st.among(State.terminated, State.exitCode) != 0;
282     }
283 
284     /// Returns: The exit status of the process.
285     int status() @safe {
286         if (st != State.exitCode) {
287             throw new Exception(
288                     "Process has not terminated and wait/tryWait been called to collect the exit status");
289         }
290         return status_;
291     }
292 
293     /// Returns: If the process has terminated.
294     bool terminated() @safe {
295         return st.among(State.terminated, State.exitCode) != 0;
296     }
297 }
298 
299 SpawnProcess spawnProcess(scope const(char[])[] args, File stdin = std.stdio.stdin,
300         File stdout = std.stdio.stdout, File stderr = std.stdio.stderr,
301         const string[string] env = null, std.process.Config config = std.process.Config.none,
302         scope const char[] workDir = null) {
303     return SpawnProcess(std.process.spawnProcess(args, stdin, stdout, stderr,
304             env, config, workDir));
305 }
306 
307 SpawnProcess spawnProcess(scope const(char[])[] args, const string[string] env,
308         std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) {
309     return SpawnProcess(std.process.spawnProcess(args, std.stdio.stdin,
310             std.stdio.stdout, std.stdio.stderr, env, config, workDir));
311 }
312 
313 SpawnProcess spawnProcess(scope const(char)[] program,
314         File stdin = std.stdio.stdin, File stdout = std.stdio.stdout,
315         File stderr = std.stdio.stderr, const string[string] env = null,
316         std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) {
317     return SpawnProcess(std.process.spawnProcess((&program)[0 .. 1], stdin,
318             stdout, stderr, env, config, workDir));
319 }
320 
321 SpawnProcess spawnShell(scope const(char)[] command, File stdin = std.stdio.stdin,
322         File stdout = std.stdio.stdout, File stderr = std.stdio.stderr,
323         scope const string[string] env = null, std.process.Config config = std.process.Config.none,
324         scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) {
325     return SpawnProcess(std.process.spawnShell(command, stdin, stdout, stderr,
326             env, config, workDir, shellPath));
327 }
328 
329 /// ditto
330 SpawnProcess spawnShell(scope const(char)[] command, scope const string[string] env,
331         std.process.Config config = std.process.Config.none,
332         scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) {
333     return SpawnProcess(std.process.spawnShell(command, env, config, workDir, shellPath));
334 }
335 
336 PipeProcess pipeProcess(scope const(char[])[] args,
337         std.process.Redirect redirect = std.process.Redirect.all,
338         const string[string] env = null, std.process.Config config = std.process.Config.none,
339         scope const(char)[] workDir = null) @safe {
340     return PipeProcess(std.process.pipeProcess(args, redirect, env, config, workDir));
341 }
342 
343 PipeProcess pipeShell(scope const(char)[] command,
344         std.process.Redirect redirect = std.process.Redirect.all,
345         const string[string] env = null, std.process.Config config = std.process.Config.none,
346         scope const(char)[] workDir = null, string shellPath = std.process.nativeShell) @safe {
347     return PipeProcess(std.process.pipeShell(command, redirect, env, config, workDir, shellPath));
348 }
349 
350 /** Moves the process to a separate process group and on exit kill it and all
351  * its children.
352  */
353 struct Sandbox(ProcessT) {
354     private {
355         ProcessT p;
356         RawPid pid;
357     }
358 
359     this(ProcessT p) @safe {
360         import core.sys.posix.unistd : setpgid;
361 
362         this.p = p;
363         this.pid = p.osHandle;
364         setpgid(pid, 0);
365     }
366 
367     RawPid osHandle() nothrow @safe {
368         return pid;
369     }
370 
371     static if (__traits(hasMember, ProcessT, "pipe")) {
372         ref Pipe pipe() nothrow @safe {
373             return p.pipe;
374         }
375     }
376 
377     static if (__traits(hasMember, ProcessT, "stderr")) {
378         ref FileReadChannel stderr() nothrow @safe {
379             return p.stderr;
380         }
381     }
382 
383     void dispose() @safe {
384         // this also reaps the children thus cleaning up zombies
385         this.kill;
386         p.dispose;
387     }
388 
389     void kill() nothrow @safe {
390         // must first retrieve the submap because after the process is killed
391         // its children may have changed.
392         auto pmap = makePidMap.getSubMap(pid);
393 
394         p.kill;
395 
396         // only kill and reap the children
397         pmap.remove(pid);
398         proc.pid.kill(pmap, Yes.onlyCurrentUser).reap;
399     }
400 
401     int wait() @safe {
402         return p.wait;
403     }
404 
405     bool tryWait() @safe {
406         return p.tryWait;
407     }
408 
409     int status() @safe {
410         return p.status;
411     }
412 
413     bool terminated() @safe {
414         return p.terminated;
415     }
416 }
417 
418 auto sandbox(T)(T p) @safe {
419     return Sandbox!T(p);
420 }
421 
422 @("shall terminate a group of processes")
423 unittest {
424     import std.datetime.stopwatch : StopWatch, AutoStart;
425 
426     immutable scriptName = makeScript(`#!/bin/bash
427 sleep 10m &
428 sleep 10m &
429 sleep 10m
430 `);
431     scope (exit)
432         remove(scriptName);
433 
434     auto p = pipeProcess([scriptName]).sandbox.scopeKill;
435     waitUntilChildren(p.osHandle, 3);
436     const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
437     p.kill;
438     Thread.sleep(500.dur!"msecs"); // wait for the OS to kill the children
439     const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
440 
441     p.wait.shouldEqual(-9);
442     p.terminated.shouldBeTrue;
443     preChildren.shouldEqual(3);
444     postChildren.shouldEqual(0);
445 }
446 
447 /** dispose the process after the timeout.
448  */
449 struct Timeout(ProcessT) {
450     import std.algorithm : among;
451     import std.datetime : Clock, Duration;
452     import core.thread;
453     import std.typecons : RefCounted, refCounted;
454 
455     private {
456         enum Msg {
457             none,
458             stop,
459             status,
460         }
461 
462         enum Reply {
463             none,
464             running,
465             normalDeath,
466             killedByTimeout,
467         }
468 
469         static struct Payload {
470             ProcessT p;
471             RawPid pid;
472             Background background;
473             Reply backgroundReply;
474         }
475 
476         RefCounted!Payload rc;
477     }
478 
479     this(ProcessT p, Duration timeout) @trusted {
480         import std.algorithm : move;
481 
482         auto pid = p.osHandle;
483         rc = refCounted(Payload(move(p), pid));
484         rc.background = new Background(&rc.p, timeout);
485         rc.background.isDaemon = true;
486         rc.background.start;
487     }
488 
489     private static class Background : Thread {
490         import core.sync.condition : Condition;
491         import core.sync.mutex : Mutex;
492 
493         Duration timeout;
494         ProcessT* p;
495         Mutex mtx;
496         Msg[] msg;
497         Reply reply_;
498         RawPid pid;
499 
500         this(ProcessT* p, Duration timeout) {
501             this.p = p;
502             this.timeout = timeout;
503             this.mtx = new Mutex();
504             this.pid = p.osHandle;
505 
506             super(&run);
507         }
508 
509         void run() {
510             checkProcess(this.pid, this.timeout, this);
511         }
512 
513         void put(Msg msg) @trusted nothrow {
514             this.mtx.lock_nothrow();
515             scope (exit)
516                 this.mtx.unlock_nothrow();
517             this.msg ~= msg;
518         }
519 
520         Msg popMsg() @trusted nothrow {
521             this.mtx.lock_nothrow();
522             scope (exit)
523                 this.mtx.unlock_nothrow();
524             if (msg.empty)
525                 return Msg.none;
526             auto rval = msg[$ - 1];
527             msg = msg[0 .. $ - 1];
528             return rval;
529         }
530 
531         void setReply(Reply reply_) @trusted nothrow {
532             this.mtx.lock_nothrow();
533             scope (exit)
534                 this.mtx.unlock_nothrow();
535             this.reply_ = reply_;
536         }
537 
538         Reply reply() @trusted nothrow {
539             this.mtx.lock_nothrow();
540             scope (exit)
541                 this.mtx.unlock_nothrow();
542             return reply_;
543         }
544 
545         void kill() @trusted nothrow {
546             this.mtx.lock_nothrow();
547             scope (exit)
548                 this.mtx.unlock_nothrow();
549             p.kill;
550         }
551     }
552 
553     private static void checkProcess(RawPid p, Duration timeout, Background bg) nothrow {
554         import core.sys.posix.signal : SIGKILL;
555         import std.algorithm : max, min;
556         import std.variant : Variant;
557         static import core.sys.posix.signal;
558 
559         const stopAt = Clock.currTime + timeout;
560         // the purpose is to poll the process often "enough" that if it
561         // terminates early `Process` detects it fast enough. 1000 is chosen
562         // because it "feels good". the purpose
563         auto sleepInterval = min(500, max(20, timeout.total!"msecs" / 1000)).dur!"msecs";
564 
565         bool forceStop;
566         bool running = true;
567         while (running && Clock.currTime < stopAt) {
568             const msg = bg.popMsg;
569 
570             final switch (msg) {
571             case Msg.none:
572                 Thread.sleep(sleepInterval);
573                 break;
574             case Msg.stop:
575                 forceStop = true;
576                 running = false;
577                 break;
578             case Msg.status:
579                 bg.setReply(Reply.running);
580                 break;
581             }
582 
583             if (core.sys.posix.signal.kill(p, 0) == -1) {
584                 running = false;
585             }
586         }
587 
588         // may be children alive thus must ensure that the whole process tree
589         // is killed if this is a sandbox with a timeout.
590         bg.kill;
591 
592         if (!forceStop && Clock.currTime >= stopAt) {
593             bg.setReply(Reply.killedByTimeout);
594         } else {
595             bg.setReply(Reply.normalDeath);
596         }
597     }
598 
599     RawPid osHandle() nothrow @trusted {
600         return rc.pid;
601     }
602 
603     ref Pipe pipe() nothrow @trusted {
604         return rc.p.pipe;
605     }
606 
607     ref FileReadChannel stderr() nothrow @trusted {
608         return rc.p.stderr;
609     }
610 
611     void dispose() @trusted {
612         if (rc.backgroundReply.among(Reply.none, Reply.running)) {
613             rc.background.put(Msg.stop);
614             rc.background.join;
615             rc.backgroundReply = rc.background.reply;
616         }
617         rc.p.dispose;
618     }
619 
620     void kill() nothrow @trusted {
621         rc.background.kill;
622     }
623 
624     int wait() @trusted {
625         while (!this.tryWait) {
626             Thread.sleep(20.dur!"msecs");
627         }
628         return rc.p.wait;
629     }
630 
631     bool tryWait() @trusted {
632         return rc.p.tryWait;
633     }
634 
635     int status() @trusted {
636         return rc.p.status;
637     }
638 
639     bool terminated() @trusted {
640         return rc.p.terminated;
641     }
642 
643     bool timeoutTriggered() @trusted {
644         if (rc.backgroundReply.among(Reply.none, Reply.running)) {
645             rc.background.put(Msg.status);
646             rc.backgroundReply = rc.background.reply;
647         }
648         return rc.backgroundReply == Reply.killedByTimeout;
649     }
650 }
651 
652 auto timeout(T)(T p, Duration timeout_) @trusted {
653     return Timeout!T(p, timeout_);
654 }
655 
656 /// Returns when the process has pending data.
657 void waitForPendingData(ProcessT)(Process p) {
658     while (!p.pipe.hasPendingData || !p.stderr.hasPendingData) {
659         Thread.sleep(20.dur!"msecs");
660     }
661 }
662 
663 @("shall kill the process after the timeout")
664 unittest {
665     import std.datetime.stopwatch : StopWatch, AutoStart;
666 
667     auto p = pipeProcess(["sleep", "1m"]).timeout(100.dur!"msecs").scopeKill;
668     auto sw = StopWatch(AutoStart.yes);
669     p.wait;
670     sw.stop;
671 
672     sw.peek.shouldBeGreaterThan(100.dur!"msecs");
673     sw.peek.shouldBeSmallerThan(500.dur!"msecs");
674     p.wait.shouldEqual(-9);
675     p.terminated.shouldBeTrue;
676     p.status.shouldEqual(-9);
677     p.timeoutTriggered.shouldBeTrue;
678 }
679 
680 struct DrainElement {
681     enum Type {
682         stdout,
683         stderr,
684     }
685 
686     Type type;
687     const(ubyte)[] data;
688 
689     /// Returns: iterates the data as an input range.
690     auto byUTF8() @safe pure nothrow const @nogc {
691         static import std.utf;
692 
693         return std.utf.byUTF!(const(char))(cast(const(char)[]) data);
694     }
695 
696     bool empty() @safe pure nothrow const @nogc {
697         return data.length == 0;
698     }
699 }
700 
701 /** A range that drains a process stdout/stderr until it terminates.
702  *
703  * There may be `DrainElement` that are empty.
704  */
705 struct DrainRange(ProcessT) {
706     enum State {
707         start,
708         draining,
709         lastStdout,
710         lastStderr,
711         lastElement,
712         empty,
713     }
714 
715     private {
716         Duration timeout;
717         ProcessT p;
718         DrainElement front_;
719         State st;
720         ubyte[] buf;
721         ubyte[] bufRead;
722     }
723 
724     this(ProcessT p, Duration timeout) {
725         this.p = p;
726         this.buf = new ubyte[4096];
727         this.timeout = timeout;
728     }
729 
730     DrainElement front() @safe pure nothrow const @nogc {
731         assert(!empty, "Can't get front of an empty range");
732         return front_;
733     }
734 
735     void popFront() @safe {
736         assert(!empty, "Can't pop front of an empty range");
737 
738         bool isAnyPipeOpen() {
739             return (p.pipe.hasData || p.stderr.hasData) && !p.terminated;
740         }
741 
742         void readData() @safe {
743             if (p.stderr.hasData && p.stderr.hasPendingData) {
744                 front_ = DrainElement(DrainElement.Type.stderr);
745                 bufRead = p.stderr.read(buf);
746             } else if (p.pipe.hasData && p.pipe.hasPendingData) {
747                 front_ = DrainElement(DrainElement.Type.stdout);
748                 bufRead = p.pipe.read(buf);
749             }
750         }
751 
752         void waitUntilData() @safe {
753             // may livelock if the process never terminates and never writes to
754             // the terminal. waitTime ensure that it sooner or later is
755             // interrupted. It lets e.g the timeout handling to kill the
756             // process.
757             const s = 20.dur!"msecs";
758             Duration waitTime;
759             while (waitTime < timeout) {
760                 import core.thread : Thread;
761                 import core.time : dur;
762 
763                 readData();
764                 if (front_.data.empty) {
765                     () @trusted { Thread.sleep(s); }();
766                     waitTime += s;
767                 }
768 
769                 if (!(bufRead.empty && isAnyPipeOpen)) {
770                     front_.data = bufRead.dup;
771                     break;
772                 }
773             }
774         }
775 
776         front_ = DrainElement.init;
777         bufRead = null;
778 
779         final switch (st) {
780         case State.start:
781             st = State.draining;
782             waitUntilData;
783             break;
784         case State.draining:
785             if (isAnyPipeOpen) {
786                 waitUntilData();
787             } else {
788                 st = State.lastStdout;
789             }
790             break;
791         case State.lastStdout:
792             if (p.pipe.hasData && p.pipe.hasPendingData) {
793                 front_ = DrainElement(DrainElement.Type.stdout);
794                 bufRead = p.pipe.read(buf);
795             }
796 
797             front_.data = bufRead.dup;
798             if (!p.pipe.hasData || p.terminated) {
799                 st = State.lastStderr;
800             }
801             break;
802         case State.lastStderr:
803             if (p.stderr.hasData && p.stderr.hasPendingData) {
804                 front_ = DrainElement(DrainElement.Type.stderr);
805                 bufRead = p.stderr.read(buf);
806             }
807 
808             front_.data = bufRead.dup;
809             if (!p.stderr.hasData || p.terminated) {
810                 st = State.lastElement;
811             }
812             break;
813         case State.lastElement:
814             st = State.empty;
815             break;
816         case State.empty:
817             break;
818         }
819     }
820 
821     bool empty() @safe pure nothrow const @nogc {
822         return st == State.empty;
823     }
824 }
825 
826 /// Drain a process pipe until empty.
827 auto drain(T)(T p, Duration timeout) {
828     return DrainRange!T(p, timeout);
829 }
830 
831 /// Read the data from a ReadChannel by line.
832 struct DrainByLineCopyRange(ProcessT) {
833     private {
834         ProcessT process;
835         DrainRange!ProcessT range;
836         const(ubyte)[] buf;
837         const(char)[] line;
838     }
839 
840     this(ProcessT p, Duration timeout) @safe {
841         process = p;
842         range = p.drain(timeout);
843     }
844 
845     string front() @trusted pure nothrow const @nogc {
846         import std.exception : assumeUnique;
847 
848         assert(!empty, "Can't get front of an empty range");
849         return line.assumeUnique;
850     }
851 
852     void popFront() @safe {
853         assert(!empty, "Can't pop front of an empty range");
854         import std.algorithm : countUntil;
855         import std.array : array;
856         static import std.utf;
857 
858         void fillBuf() {
859             if (!range.empty) {
860                 range.popFront;
861             }
862             if (!range.empty) {
863                 buf ~= range.front.data;
864             }
865         }
866 
867         size_t idx;
868         do {
869             fillBuf();
870             idx = buf.countUntil('\n');
871         }
872         while (!range.empty && idx == -1);
873 
874         const(ubyte)[] tmp;
875         if (buf.empty) {
876             // do nothing
877         } else if (idx == -1) {
878             tmp = buf;
879             buf = null;
880         } else {
881             idx = () {
882                 if (idx < buf.length) {
883                     return idx + 1;
884                 }
885                 return idx;
886             }();
887             tmp = buf[0 .. idx];
888             buf = buf[idx .. $];
889         }
890 
891         if (!tmp.empty && tmp[$ - 1] == '\n') {
892             tmp = tmp[0 .. $ - 1];
893         }
894 
895         line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array;
896     }
897 
898     bool empty() @safe pure nothrow const @nogc {
899         return range.empty && buf.empty && line.empty;
900     }
901 }
902 
903 @("shall drain the process output by line")
904 unittest {
905     import std.algorithm : filter, joiner, map;
906     import std.array : array;
907 
908     auto p = pipeProcess(["dd", "if=/dev/zero", "bs=10", "count=3"]).scopeKill;
909     auto res = p.process.drainByLineCopy(1.dur!"minutes").filter!"!a.empty".array;
910 
911     res.length.shouldEqual(4);
912     res.joiner.count.shouldBeGreaterThan(30);
913     p.wait.shouldEqual(0);
914     p.terminated.shouldBeTrue;
915 }
916 
917 auto drainByLineCopy(T)(T p, Duration timeout) @safe {
918     return DrainByLineCopyRange!T(p, timeout);
919 }
920 
921 /// Drain the process output until it is done executing.
922 auto drainToNull(T)(T p, Duration timeout) @safe {
923     foreach (l; p.drain(timeout)) {
924     }
925     return p;
926 }
927 
928 /// Drain the output from the process into an output range.
929 auto drain(ProcessT, T)(ProcessT p, ref T range, Duration timeout) {
930     foreach (l; p.drain(timeout)) {
931         range.put(l);
932     }
933     return p;
934 }
935 
936 @("shall drain the output of a process while it is running with a separation of stdout and stderr")
937 unittest {
938     auto p = pipeProcess(["dd", "if=/dev/urandom", "bs=10", "count=3"]).scopeKill;
939     auto res = p.process.drain(1.dur!"minutes").array;
940 
941     // this is just a sanity check. It has to be kind a high because there is
942     // some wiggleroom allowed
943     res.count.shouldBeSmallerThan(50);
944 
945     res.filter!(a => a.type == DrainElement.Type.stdout)
946         .map!(a => a.data)
947         .joiner
948         .count
949         .shouldEqual(30);
950     res.filter!(a => a.type == DrainElement.Type.stderr).count.shouldBeGreaterThan(0);
951     p.wait.shouldEqual(0);
952     p.terminated.shouldBeTrue;
953 }
954 
955 @("shall kill the process tree when the timeout is reached")
956 unittest {
957     immutable script = makeScript(`#!/bin/bash
958 sleep 10m
959 `);
960     scope (exit)
961         remove(script);
962 
963     auto p = pipeProcess([script]).sandbox.timeout(1.dur!"seconds").scopeKill;
964     waitUntilChildren(p.osHandle, 1);
965     const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
966     const res = p.process.drain(1.dur!"minutes").array;
967     const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
968 
969     p.wait.shouldEqual(-9);
970     p.terminated.shouldBeTrue;
971     preChildren.shouldEqual(1);
972     postChildren.shouldEqual(0);
973 }
974 
975 string makeScript(string script, string file = __FILE__, uint line = __LINE__) {
976     import core.sys.posix.sys.stat;
977     import std.file : getAttributes, setAttributes, thisExePath;
978     import std.stdio : File;
979     import std.path : baseName;
980     import std.conv : to;
981 
982     immutable fname = thisExePath ~ "_" ~ file.baseName ~ line.to!string ~ ".sh";
983 
984     File(fname, "w").writeln(script);
985     setAttributes(fname, getAttributes(fname) | S_IXUSR | S_IXGRP | S_IXOTH);
986     return fname;
987 }
988 
989 /// Wait for p to have num children or fail after 10s.
990 void waitUntilChildren(RawPid p, int num) {
991     import std.datetime : Clock;
992 
993     const failAt = Clock.currTime + 10.dur!"seconds";
994     do {
995         Thread.sleep(50.dur!"msecs");
996         if (Clock.currTime > failAt)
997             break;
998     }
999     while (makePidMap.getSubMap(p).remove(p).length < num);
1000 }