1 /**
2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module proc.process;
7 
8 import core.sys.posix.signal : SIGKILL;
9 import core.thread : Thread;
10 import core.time : dur, Duration;
11 import logger = std.experimental.logger;
12 import std.algorithm : filter, count, joiner, map;
13 import std.array : appender, empty, array;
14 import std.exception : collectException;
15 import std.stdio : File, fileno, writeln;
16 import std.typecons : Flag, Yes;
17 static import std.process;
18 static import std.stdio;
19 
20 import my.gc.refc;
21 import my.from_;
22 import my.path;
23 import my.named_type;
24 
25 public import proc.channel;
26 public import proc.pid;
27 public import proc.tty;
28 
29 version (unittest) {
30     import std.file : remove;
31 }
32 
33 /** Manage a process by reference counting so that it is terminated when the it
34  * stops being used such as the instance going out of scope.
35  */
36 auto rcKill(T)(T p, int signal = SIGKILL) {
37     return refCounted(ScopeKill!T(p, signal));
38 }
39 
40 // backward compatibility.
41 alias scopeKill = rcKill;
42 
43 struct ScopeKill(T) {
44     T process;
45     alias process this;
46 
47     private int signal = SIGKILL;
48     private bool hasProcess;
49 
50     this(T process, int signal) @safe {
51         this.process = process;
52         this.signal = signal;
53         this.hasProcess = true;
54     }
55 
56     ~this() @safe {
57         if (hasProcess)
58             process.dispose();
59     }
60 }
61 
62 /// Async process wrapper for a std.process SpawnProcess
63 struct SpawnProcess {
64     import core.sys.posix.signal : SIGKILL;
65     import std.algorithm : among;
66 
67     private {
68         enum State {
69             running,
70             terminated,
71             exitCode
72         }
73 
74         std.process.Pid process;
75         RawPid pid;
76         int status_;
77         State st;
78     }
79 
80     this(std.process.Pid process) @safe {
81         this.process = process;
82         this.pid = process.osHandle.RawPid;
83     }
84 
85     ~this() @safe {
86     }
87 
88     /// Returns: The raw OS handle for the process ID.
89     RawPid osHandle() nothrow @safe {
90         return pid;
91     }
92 
93     /// Kill and cleanup the process.
94     void dispose() @safe {
95         final switch (st) {
96         case State.running:
97             this.kill;
98             this.wait;
99             break;
100         case State.terminated:
101             this.wait;
102             break;
103         case State.exitCode:
104             break;
105         }
106 
107         st = State.exitCode;
108     }
109 
110     /** Send `signal` to the process.
111      *
112      * Param:
113      *  signal = a signal from `core.sys.posix.signal`
114      */
115     void kill(int signal = SIGKILL) nothrow @trusted {
116         final switch (st) {
117         case State.running:
118             break;
119         case State.terminated:
120             goto case;
121         case State.exitCode:
122             return;
123         }
124 
125         try {
126             std.process.kill(process, signal);
127         } catch (Exception e) {
128         }
129 
130         st = State.terminated;
131     }
132 
133     /// Blocking wait for the process to terminated.
134     /// Returns: the exit status.
135     int wait() @safe {
136         final switch (st) {
137         case State.running:
138             status_ = std.process.wait(process);
139             break;
140         case State.terminated:
141             status_ = std.process.wait(process);
142             break;
143         case State.exitCode:
144             break;
145         }
146 
147         st = State.exitCode;
148 
149         return status_;
150     }
151 
152     /// Non-blocking wait for the process termination.
153     /// Returns: `true` if the process has terminated.
154     bool tryWait() @safe {
155         final switch (st) {
156         case State.running:
157             auto s = std.process.tryWait(process);
158             if (s.terminated) {
159                 st = State.exitCode;
160                 status_ = s.status;
161             }
162             break;
163         case State.terminated:
164             status_ = std.process.wait(process);
165             st = State.exitCode;
166             break;
167         case State.exitCode:
168             break;
169         }
170 
171         return st.among(State.terminated, State.exitCode) != 0;
172     }
173 
174     /// Returns: The exit status of the process.
175     int status() @safe {
176         if (st != State.exitCode) {
177             throw new Exception(
178                     "Process has not terminated and wait/tryWait been called to collect the exit status");
179         }
180         return status_;
181     }
182 
183     /// Returns: If the process has terminated.
184     bool terminated() @safe {
185         return st.among(State.terminated, State.exitCode) != 0;
186     }
187 }
188 
189 /// Async process that do not block on read from stdin/stderr.
190 struct PipeProcess {
191     import std.algorithm : among;
192     import core.sys.posix.signal : SIGKILL;
193 
194     private {
195         enum State {
196             running,
197             terminated,
198             exitCode
199         }
200 
201         std.process.ProcessPipes process;
202         std.process.Pid pid;
203 
204         FileReadChannel stderr_;
205         FileReadChannel stdout_;
206         FileWriteChannel stdin_;
207         int status_;
208         State st;
209     }
210 
211     this(std.process.Pid pid, File stdin, File stdout, File stderr) @safe {
212         this.pid = pid;
213 
214         this.stdin_ = FileWriteChannel(stdin);
215         this.stdout_ = FileReadChannel(stdout);
216         this.stderr_ = FileReadChannel(stderr);
217     }
218 
219     this(std.process.ProcessPipes process, std.process.Redirect r) @safe {
220         this.process = process;
221         this.pid = process.pid;
222 
223         if (r & std.process.Redirect.stdin) {
224             stdin_ = FileWriteChannel(this.process.stdin);
225         }
226         if (r & std.process.Redirect.stdout) {
227             stdout_ = FileReadChannel(this.process.stdout);
228         }
229         if (r & std.process.Redirect.stderr) {
230             this.stderr_ = FileReadChannel(this.process.stderr);
231         }
232     }
233 
234     /// Returns: The raw OS handle for the process ID.
235     RawPid osHandle() nothrow @safe {
236         return pid.osHandle.RawPid;
237     }
238 
239     /// Access to stdout.
240     ref FileWriteChannel stdin() return scope nothrow @safe {
241         return stdin_;
242     }
243 
244     /// Access to stdout.
245     ref FileReadChannel stdout() return scope nothrow @safe {
246         return stdout_;
247     }
248 
249     /// Access stderr.
250     ref FileReadChannel stderr() return scope nothrow @safe {
251         return stderr_;
252     }
253 
254     /// Kill and cleanup the process.
255     void dispose() @safe {
256         final switch (st) {
257         case State.running:
258             this.kill;
259             this.wait;
260             .destroy(process);
261             break;
262         case State.terminated:
263             this.wait;
264             .destroy(process);
265             break;
266         case State.exitCode:
267             break;
268         }
269 
270         st = State.exitCode;
271     }
272 
273     /** Send `signal` to the process.
274      *
275      * Param:
276      *  signal = a signal from `core.sys.posix.signal`
277      */
278     void kill(int signal = SIGKILL) nothrow @trusted {
279         final switch (st) {
280         case State.running:
281             break;
282         case State.terminated:
283             return;
284         case State.exitCode:
285             return;
286         }
287 
288         try {
289             std.process.kill(pid, signal);
290         } catch (Exception e) {
291         }
292 
293         st = State.terminated;
294     }
295 
296     /// Blocking wait for the process to terminated.
297     /// Returns: the exit status.
298     int wait() @safe {
299         final switch (st) {
300         case State.running:
301             status_ = std.process.wait(pid);
302             break;
303         case State.terminated:
304             status_ = std.process.wait(pid);
305             break;
306         case State.exitCode:
307             break;
308         }
309 
310         st = State.exitCode;
311 
312         return status_;
313     }
314 
315     /// Non-blocking wait for the process termination.
316     /// Returns: `true` if the process has terminated.
317     bool tryWait() @safe {
318         final switch (st) {
319         case State.running:
320             auto s = std.process.tryWait(pid);
321             if (s.terminated) {
322                 st = State.exitCode;
323                 status_ = s.status;
324             }
325             break;
326         case State.terminated:
327             status_ = std.process.wait(pid);
328             st = State.exitCode;
329             break;
330         case State.exitCode:
331             break;
332         }
333 
334         return st.among(State.terminated, State.exitCode) != 0;
335     }
336 
337     /// Returns: The exit status of the process.
338     int status() @safe {
339         if (st != State.exitCode) {
340             throw new Exception(
341                     "Process has not terminated and wait/tryWait been called to collect the exit status");
342         }
343         return status_;
344     }
345 
346     /// Returns: If the process has terminated.
347     bool terminated() @safe {
348         return st.among(State.terminated, State.exitCode) != 0;
349     }
350 }
351 
352 SpawnProcess spawnProcess(scope const(char[])[] args, File stdin = std.stdio.stdin,
353         File stdout = std.stdio.stdout, File stderr = std.stdio.stderr,
354         const string[string] env = null, std.process.Config config = std.process.Config.none,
355         scope const char[] workDir = null) {
356     return SpawnProcess(std.process.spawnProcess(args, stdin, stdout, stderr,
357             env, config, workDir));
358 }
359 
360 SpawnProcess spawnProcess(scope const(char[])[] args, const string[string] env,
361         std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) {
362     return SpawnProcess(std.process.spawnProcess(args, std.stdio.stdin,
363             std.stdio.stdout, std.stdio.stderr, env, config, workDir));
364 }
365 
366 SpawnProcess spawnProcess(scope const(char)[] program,
367         File stdin = std.stdio.stdin, File stdout = std.stdio.stdout,
368         File stderr = std.stdio.stderr, const string[string] env = null,
369         std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) {
370     return SpawnProcess(std.process.spawnProcess((&program)[0 .. 1], stdin,
371             stdout, stderr, env, config, workDir));
372 }
373 
374 SpawnProcess spawnShell(scope const(char)[] command, File stdin = std.stdio.stdin,
375         File stdout = std.stdio.stdout, File stderr = std.stdio.stderr,
376         scope const string[string] env = null, std.process.Config config = std.process.Config.none,
377         scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) {
378     return SpawnProcess(std.process.spawnShell(command, stdin, stdout, stderr,
379             env, config, workDir, shellPath));
380 }
381 
382 /// ditto
383 SpawnProcess spawnShell(scope const(char)[] command, scope const string[string] env,
384         std.process.Config config = std.process.Config.none,
385         scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) {
386     return SpawnProcess(std.process.spawnShell(command, env, config, workDir, shellPath));
387 }
388 
389 PipeProcess pipeProcess(scope const(char[])[] args,
390         std.process.Redirect redirect = std.process.Redirect.all,
391         const string[string] env = null, std.process.Config config = std.process.Config.none,
392         scope const(char)[] workDir = null) @safe {
393     return PipeProcess(std.process.pipeProcess(args, redirect, env, config, workDir), redirect);
394 }
395 
396 PipeProcess pipeShell(scope const(char)[] command,
397         std.process.Redirect redirect = std.process.Redirect.all,
398         const string[string] env = null, std.process.Config config = std.process.Config.none,
399         scope const(char)[] workDir = null, string shellPath = std.process.nativeShell) @safe {
400     return PipeProcess(std.process.pipeShell(command, redirect, env, config,
401             workDir, shellPath), redirect);
402 }
403 
404 /** Moves the process to a separate process group and on exit kill it and all
405  * its children.
406  */
407 @safe struct Sandbox(ProcessT) {
408     import core.sys.posix.signal : SIGKILL;
409 
410     private {
411         ProcessT p;
412         RawPid pid;
413     }
414 
415     this(ProcessT p) @safe {
416         import core.sys.posix.unistd : setpgid;
417 
418         this.p = p;
419         this.pid = p.osHandle;
420         setpgid(pid, 0);
421     }
422 
423     RawPid osHandle() nothrow @safe {
424         return pid;
425     }
426 
427     static if (__traits(hasMember, ProcessT, "stdin")) {
428         ref FileWriteChannel stdin() nothrow @safe {
429             return p.stdin;
430         }
431     }
432 
433     static if (__traits(hasMember, ProcessT, "stdout")) {
434         ref FileReadChannel stdout() nothrow @safe {
435             return p.stdout;
436         }
437     }
438 
439     static if (__traits(hasMember, ProcessT, "stderr")) {
440         ref FileReadChannel stderr() nothrow @safe {
441             return p.stderr;
442         }
443     }
444 
445     void dispose() @safe {
446         // this also reaps the children thus cleaning up zombies
447         this.kill;
448         p.dispose;
449     }
450 
451     /** Send `signal` to the process.
452      *
453      * Param:
454      *  signal = a signal from `core.sys.posix.signal`
455      */
456     void kill(int signal = SIGKILL) nothrow @safe {
457         // must first retrieve the submap because after the process is killed
458         // its children may have changed.
459         auto pmap = makePidMap.getSubMap(pid);
460 
461         p.kill(signal);
462 
463         // only kill and reap the children
464         pmap.remove(pid);
465         proc.pid.kill(pmap, Yes.onlyCurrentUser, signal).reap;
466     }
467 
468     int wait() @safe {
469         return p.wait;
470     }
471 
472     bool tryWait() @safe {
473         return p.tryWait;
474     }
475 
476     int status() @safe {
477         return p.status;
478     }
479 
480     bool terminated() @safe {
481         return p.terminated;
482     }
483 }
484 
485 auto sandbox(T)(T p) @safe {
486     return Sandbox!T(p);
487 }
488 
489 @("shall terminate a group of processes")
490 unittest {
491     import std.datetime.stopwatch : StopWatch, AutoStart;
492 
493     immutable scriptName = makeScript(`#!/bin/bash
494 sleep 10m &
495 sleep 10m &
496 sleep 10m
497 `);
498     scope (exit)
499         remove(scriptName);
500 
501     auto p = pipeProcess([scriptName]).sandbox.rcKill;
502     waitUntilChildren(p.osHandle, 3);
503     const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
504     p.kill;
505     Thread.sleep(500.dur!"msecs"); // wait for the OS to kill the children
506     const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
507 
508     assert(p.wait == -9);
509     assert(p.terminated);
510     assert(preChildren == 3);
511     assert(postChildren == 0);
512 }
513 
514 /** dispose the process after the timeout.
515  */
516 @safe struct Timeout(ProcessT) {
517     import core.sys.posix.signal : SIGKILL;
518     import core.thread;
519     import std.algorithm : among;
520     import std.datetime : Clock, Duration;
521 
522     private {
523         enum Msg {
524             none,
525             stop,
526             status,
527         }
528 
529         enum Reply {
530             none,
531             running,
532             normalDeath,
533             killedByTimeout,
534         }
535 
536         static struct Payload {
537             ProcessT p;
538             RawPid pid;
539             Background background;
540             Reply backgroundReply;
541         }
542 
543         RefCounted!Payload rc;
544     }
545 
546     this(ProcessT p, Duration timeout) @trusted {
547         import std.algorithm : move;
548 
549         auto pid = p.osHandle;
550         rc = refCounted(Payload(move(p), pid));
551         rc.background = new Background(&rc.p, timeout);
552         rc.background.isDaemon = true;
553         rc.background.start;
554     }
555 
556     ~this() @trusted {
557         rc.release;
558     }
559 
560     private static class Background : Thread {
561         import core.sync.condition : Condition;
562         import core.sync.mutex : Mutex;
563 
564         Duration timeout;
565         ProcessT* p;
566         Mutex mtx;
567         Msg[] msg;
568         Reply reply_;
569         RawPid pid;
570         int signal = SIGKILL;
571 
572         this(ProcessT* p, Duration timeout) {
573             this.p = p;
574             this.timeout = timeout;
575             this.mtx = new Mutex();
576             this.pid = p.osHandle;
577 
578             super(&run);
579         }
580 
581         void run() {
582             checkProcess(this.pid, this.timeout, this);
583         }
584 
585         void put(Msg msg) @trusted nothrow {
586             this.mtx.lock_nothrow();
587             scope (exit)
588                 this.mtx.unlock_nothrow();
589             this.msg ~= msg;
590         }
591 
592         Msg popMsg() @trusted nothrow {
593             this.mtx.lock_nothrow();
594             scope (exit)
595                 this.mtx.unlock_nothrow();
596             if (msg.empty)
597                 return Msg.none;
598             auto rval = msg[$ - 1];
599             msg = msg[0 .. $ - 1];
600             return rval;
601         }
602 
603         void setReply(Reply reply_) @trusted nothrow {
604             this.mtx.lock_nothrow();
605             scope (exit)
606                 this.mtx.unlock_nothrow();
607             this.reply_ = reply_;
608         }
609 
610         Reply reply() @trusted nothrow {
611             this.mtx.lock_nothrow();
612             scope (exit)
613                 this.mtx.unlock_nothrow();
614             return reply_;
615         }
616 
617         void setSignal(int signal) @trusted nothrow {
618             this.mtx.lock_nothrow();
619             scope (exit)
620                 this.mtx.unlock_nothrow();
621             this.signal = signal;
622         }
623 
624         void kill() @trusted nothrow {
625             this.mtx.lock_nothrow();
626             scope (exit)
627                 this.mtx.unlock_nothrow();
628             p.kill(signal);
629         }
630     }
631 
632     private static void checkProcess(RawPid p, Duration timeout, Background bg) nothrow {
633         import std.algorithm : max, min;
634         import std.variant : Variant;
635         static import core.sys.posix.signal;
636 
637         const stopAt = Clock.currTime + timeout;
638         // the purpose is to poll the process often "enough" that if it
639         // terminates early `Process` detects it fast enough. 1000 is chosen
640         // because it "feels good". the purpose
641         auto sleepInterval = min(50, max(1, timeout.total!"msecs" / 1000)).dur!"msecs";
642 
643         bool forceStop;
644         bool running = true;
645         while (running && Clock.currTime < stopAt) {
646             const msg = bg.popMsg;
647 
648             final switch (msg) {
649             case Msg.none:
650                 () @trusted { Thread.sleep(sleepInterval); }();
651                 break;
652             case Msg.stop:
653                 forceStop = true;
654                 running = false;
655                 break;
656             case Msg.status:
657                 bg.setReply(Reply.running);
658                 break;
659             }
660 
661             () @trusted {
662                 if (core.sys.posix.signal.kill(p, 0) == -1) {
663                     running = false;
664                 }
665             }();
666         }
667 
668         // may be children alive thus must ensure that the whole process tree
669         // is killed if this is a sandbox with a timeout.
670         bg.kill();
671 
672         if (!forceStop && Clock.currTime >= stopAt) {
673             bg.setReply(Reply.killedByTimeout);
674         } else {
675             bg.setReply(Reply.normalDeath);
676         }
677     }
678 
679     RawPid osHandle() nothrow @trusted {
680         return rc.pid;
681     }
682 
683     static if (__traits(hasMember, ProcessT, "stdin")) {
684         ref FileWriteChannel stdin() nothrow @safe {
685             return rc.p.stdin;
686         }
687     }
688 
689     static if (__traits(hasMember, ProcessT, "stdout")) {
690         ref FileReadChannel stdout() nothrow @safe {
691             return rc.p.stdout;
692         }
693     }
694 
695     static if (__traits(hasMember, ProcessT, "stderr")) {
696         ref FileReadChannel stderr() nothrow @trusted {
697             return rc.p.stderr;
698         }
699     }
700 
701     void dispose() @trusted {
702         if (rc.backgroundReply.among(Reply.none, Reply.running)) {
703             rc.background.put(Msg.stop);
704             rc.background.join;
705             rc.backgroundReply = rc.background.reply;
706         }
707         rc.p.dispose;
708     }
709 
710     /** Send `signal` to the process.
711      *
712      * Param:
713      *  signal = a signal from `core.sys.posix.signal`
714      */
715     void kill(int signal = SIGKILL) nothrow @trusted {
716         rc.background.setSignal(signal);
717         rc.background.kill();
718     }
719 
720     int wait() @trusted {
721         while (!this.tryWait) {
722             Thread.sleep(20.dur!"msecs");
723         }
724         return rc.p.wait;
725     }
726 
727     bool tryWait() @trusted {
728         return rc.p.tryWait;
729     }
730 
731     int status() @trusted {
732         return rc.p.status;
733     }
734 
735     bool terminated() @trusted {
736         return rc.p.terminated;
737     }
738 
739     bool timeoutTriggered() @trusted {
740         if (rc.backgroundReply.among(Reply.none, Reply.running)) {
741             rc.background.put(Msg.status);
742             rc.backgroundReply = rc.background.reply;
743         }
744         return rc.backgroundReply == Reply.killedByTimeout;
745     }
746 }
747 
748 auto timeout(T)(T p, Duration timeout_) @trusted {
749     return Timeout!T(p, timeout_);
750 }
751 
752 /// Returns when the process has pending data.
753 void waitForPendingData(ProcessT)(Process p) {
754     while (!p.pipe.hasPendingData || !p.stderr.hasPendingData) {
755         Thread.sleep(20.dur!"msecs");
756     }
757 }
758 
759 @("shall kill the process after the timeout")
760 unittest {
761     import std.datetime.stopwatch : StopWatch, AutoStart;
762 
763     auto p = pipeProcess(["sleep", "1m"]).timeout(100.dur!"msecs").rcKill;
764     auto sw = StopWatch(AutoStart.yes);
765     p.wait;
766     sw.stop;
767 
768     assert(sw.peek >= 100.dur!"msecs");
769     assert(sw.peek <= 500.dur!"msecs");
770     assert(p.wait == -9);
771     assert(p.terminated);
772     assert(p.status == -9);
773     assert(p.timeoutTriggered);
774 }
775 
776 struct DrainElement {
777     enum Type {
778         stdout,
779         stderr,
780     }
781 
782     Type type;
783     const(ubyte)[] data;
784 
785     /// Returns: iterates the data as an input range.
786     auto byUTF8() @safe pure nothrow const @nogc {
787         static import std.utf;
788 
789         return std.utf.byUTF!(const(char))(cast(const(char)[]) data);
790     }
791 
792     bool empty() @safe pure nothrow const @nogc {
793         return data.length == 0;
794     }
795 }
796 
797 /** A range that drains a process stdout/stderr until it terminates.
798  *
799  * There may be `DrainElement` that are empty.
800  */
801 struct DrainRange(ProcessT) {
802     private {
803         enum State {
804             start,
805             draining,
806             lastStdout,
807             lastStderr,
808             lastElement,
809             empty,
810         }
811 
812         ProcessT p;
813         DrainElement front_;
814         State st;
815         ubyte[] buf;
816     }
817 
818     this(ProcessT p) {
819         this.p = p;
820         this.buf = new ubyte[4096];
821     }
822 
823     DrainElement front() @safe pure nothrow const @nogc {
824         assert(!empty, "Can't get front of an empty range");
825         return front_;
826     }
827 
828     void popFront() @safe {
829         assert(!empty, "Can't pop front of an empty range");
830 
831         static bool isAnyPipeOpen(ref ProcessT p) {
832             return p.stdout.isOpen || p.stderr.isOpen;
833         }
834 
835         DrainElement readData(ref ProcessT p) @safe {
836             if (p.stderr.hasPendingData) {
837                 return DrainElement(DrainElement.Type.stderr, p.stderr.read(buf));
838             } else if (p.stdout.hasPendingData) {
839                 return DrainElement(DrainElement.Type.stdout, p.stdout.read(buf));
840             }
841             return DrainElement.init;
842         }
843 
844         DrainElement waitUntilData() @safe {
845             import std.datetime : Clock;
846 
847             // may livelock if the process never terminates and never writes to
848             // the terminal. timeout ensure that it sooner or later is break
849             // the loop. This is important if the drain is part of a timeout
850             // wrapping.
851 
852             const timeout = 100.dur!"msecs";
853             const stopAt = Clock.currTime + timeout;
854             const sleepFor = timeout / 20;
855             const useSleep = Clock.currTime + sleepFor;
856             bool running = true;
857             while (running) {
858                 const now = Clock.currTime;
859 
860                 running = (now < stopAt) && isAnyPipeOpen(p);
861 
862                 auto bufRead = readData(p);
863 
864                 if (!bufRead.empty) {
865                     return DrainElement(bufRead.type, bufRead.data.dup);
866                 } else if (running && now > useSleep && bufRead.empty) {
867                     import core.thread : Thread;
868 
869                     () @trusted { Thread.sleep(sleepFor); }();
870                 }
871             }
872 
873             return DrainElement.init;
874         }
875 
876         front_ = DrainElement.init;
877 
878         final switch (st) {
879         case State.start:
880             st = State.draining;
881             front_ = waitUntilData;
882             break;
883         case State.draining:
884             if (p.terminated) {
885                 st = State.lastStdout;
886             } else if (isAnyPipeOpen(p)) {
887                 front_ = waitUntilData();
888             } else {
889                 st = State.lastStdout;
890             }
891             break;
892         case State.lastStdout:
893             if (p.stdout.hasPendingData) {
894                 front_ = DrainElement(DrainElement.Type.stdout, p.stdout.read(buf).dup);
895             } else {
896                 st = State.lastStderr;
897             }
898             break;
899         case State.lastStderr:
900             if (p.stderr.hasPendingData) {
901                 front_ = DrainElement(DrainElement.Type.stderr, p.stderr.read(buf).dup);
902             } else {
903                 st = State.lastElement;
904             }
905             break;
906         case State.lastElement:
907             st = State.empty;
908             break;
909         case State.empty:
910             break;
911         }
912     }
913 
914     bool empty() @safe pure nothrow const @nogc {
915         return st == State.empty;
916     }
917 }
918 
919 /// Drain a process pipe until empty.
920 auto drain(T)(T p) {
921     return DrainRange!T(p);
922 }
923 
924 /// Read the data from a ReadChannel by line.
925 struct DrainByLineCopyRange(ProcessT) {
926     private {
927         enum State {
928             start,
929             draining,
930             lastLine,
931             lastBuf,
932             empty,
933         }
934 
935         ProcessT process;
936         DrainRange!ProcessT range;
937         State st;
938         const(ubyte)[] buf;
939         const(char)[] line;
940     }
941 
942     this(ProcessT p) {
943         process = p;
944         range = p.drain;
945     }
946 
947     string front() @trusted pure nothrow const @nogc {
948         import std.exception : assumeUnique;
949 
950         assert(!empty, "Can't get front of an empty range");
951         return line.assumeUnique;
952     }
953 
954     void popFront() @safe {
955         assert(!empty, "Can't pop front of an empty range");
956         import std.algorithm : countUntil;
957         import std.array : array;
958         static import std.utf;
959 
960         const(ubyte)[] updateBuf(size_t idx) {
961             const(ubyte)[] tmp;
962             if (buf.empty) {
963                 // do nothing
964             } else if (idx == -1) {
965                 tmp = buf;
966                 buf = null;
967             } else {
968                 idx = () {
969                     if (idx < buf.length) {
970                         return idx + 1;
971                     }
972                     return idx;
973                 }();
974                 tmp = buf[0 .. idx];
975                 buf = buf[idx .. $];
976             }
977 
978             if (!tmp.empty && tmp[$ - 1] == '\n') {
979                 tmp = tmp[0 .. $ - 1];
980             }
981             return tmp;
982         }
983 
984         void drainLine() {
985             void fillBuf() {
986                 if (!range.empty) {
987                     range.popFront;
988                 }
989                 if (!range.empty) {
990                     buf ~= range.front.data;
991                 }
992             }
993 
994             size_t idx;
995             () {
996                 int cnt;
997                 do {
998                     fillBuf();
999                     idx = buf.countUntil('\n');
1000                     // 2 is a magic number which mean that it at most wait 2x timeout for data
1001                 }
1002                 while (!range.empty && idx == -1 && cnt++ < 2);
1003             }();
1004 
1005             if (idx != -1) {
1006                 auto tmp = updateBuf(idx);
1007                 line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array;
1008             }
1009         }
1010 
1011         bool lastLine() {
1012             size_t idx = buf.countUntil('\n');
1013             if (idx == -1)
1014                 return true;
1015 
1016             auto tmp = updateBuf(idx);
1017             line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array;
1018             return false;
1019         }
1020 
1021         line = null;
1022         final switch (st) {
1023         case State.start:
1024             drainLine;
1025             st = State.draining;
1026             break;
1027         case State.draining:
1028             drainLine;
1029             if (range.empty)
1030                 st = State.lastLine;
1031             break;
1032         case State.lastLine:
1033             if (lastLine)
1034                 st = State.lastBuf;
1035             break;
1036         case State.lastBuf:
1037             line = std.utf.byUTF!(const(char))(cast(const(char)[]) buf).array;
1038             st = State.empty;
1039             break;
1040         case State.empty:
1041             break;
1042         }
1043     }
1044 
1045     bool empty() @safe pure nothrow const @nogc {
1046         return st == State.empty;
1047     }
1048 }
1049 
1050 @("shall drain the process output by line")
1051 unittest {
1052     import std.algorithm : filter, joiner, map;
1053     import std.array : array;
1054 
1055     auto p = pipeProcess(["dd", "if=/dev/zero", "bs=10", "count=3"]).rcKill;
1056     auto res = p.process.drainByLineCopy.filter!"!a.empty".array;
1057 
1058     assert(res.length == 3);
1059     assert(res.joiner.count >= 30);
1060     assert(p.wait == 0);
1061     assert(p.terminated);
1062 }
1063 
1064 auto drainByLineCopy(T)(T p) {
1065     return DrainByLineCopyRange!T(p);
1066 }
1067 
1068 /// Drain the process output until it is done executing.
1069 auto drainToNull(T)(T p) {
1070     foreach (l; p.drain()) {
1071     }
1072     return p;
1073 }
1074 
1075 /// Drain the output from the process into an output range.
1076 auto drain(ProcessT, T)(ProcessT p, ref T range) {
1077     foreach (l; p.drain()) {
1078         range.put(l);
1079     }
1080     return p;
1081 }
1082 
1083 @("shall drain the output of a process while it is running with a separation of stdout and stderr")
1084 unittest {
1085     auto p = pipeProcess(["dd", "if=/dev/urandom", "bs=10", "count=3"]).rcKill;
1086     auto res = p.drain.array;
1087 
1088     // this is just a sanity check. It has to be kind a high because there is
1089     // some wiggleroom allowed
1090     assert(res.count > 1 && res.count <= 50);
1091 
1092     assert(res.filter!(a => a.type == DrainElement.Type.stdout)
1093             .map!(a => a.data)
1094             .joiner
1095             .count == 30);
1096     assert(p.wait == 0);
1097     assert(p.terminated);
1098 }
1099 
1100 @("shall kill the process tree when the timeout is reached")
1101 unittest {
1102     immutable script = makeScript(`#!/bin/bash
1103 sleep 10m
1104 `);
1105     scope (exit)
1106         remove(script);
1107 
1108     auto p = pipeProcess([script]).sandbox.timeout(1.dur!"seconds").rcKill;
1109     waitUntilChildren(p.osHandle, 1);
1110     const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
1111     const res = p.process.drain.array;
1112     const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
1113 
1114     assert(p.wait == -9);
1115     assert(p.terminated);
1116     assert(preChildren == 1);
1117     assert(postChildren == 0);
1118 }
1119 
1120 string makeScript(string script, string file = __FILE__, uint line = __LINE__) {
1121     import core.sys.posix.sys.stat;
1122     import std.file : getAttributes, setAttributes, thisExePath;
1123     import std.stdio : File;
1124     import std.path : baseName;
1125     import std.conv : to;
1126 
1127     immutable fname = thisExePath ~ "_" ~ file.baseName ~ line.to!string ~ ".sh";
1128 
1129     File(fname, "w").writeln(script);
1130     setAttributes(fname, getAttributes(fname) | S_IXUSR | S_IXGRP | S_IXOTH);
1131     return fname;
1132 }
1133 
1134 /// Wait for p to have num children or fail after 10s.
1135 void waitUntilChildren(RawPid p, int num) {
1136     import std.datetime : Clock;
1137 
1138     const failAt = Clock.currTime + 10.dur!"seconds";
1139     do {
1140         Thread.sleep(50.dur!"msecs");
1141         if (Clock.currTime > failAt)
1142             break;
1143     }
1144     while (makePidMap.getSubMap(p).remove(p).length < num);
1145 }
1146 
1147 alias Address = NamedType!(ulong, Tag!"Address", ulong.init, TagStringable);
1148 struct AddressMap {
1149     Address begin;
1150     Address end;
1151     NamedType!(string, Tag!"Permission", string.init, TagStringable) perm;
1152 }
1153 
1154 /** An assoc array mapping the path of each shared library loaded by
1155  * the process to the address it is loaded at in the process address space.
1156  */
1157 AddressMap[][AbsolutePath] libs(RawPid pid) nothrow {
1158     import std.stdio : File;
1159     import std.format : formattedRead, format;
1160     import std.algorithm : countUntil;
1161     import std.typecons : tuple;
1162     import std.string : strip;
1163 
1164     typeof(return) rval;
1165 
1166     try {
1167         foreach (l; File(format!"/proc/%s/maps"(pid)).byLine
1168                 .map!(a => tuple(a, a.countUntil('/')))
1169                 .filter!(a => a[1] != -1)) {
1170             try {
1171                 auto p = AbsolutePath(l[0][l[1] .. $].idup);
1172                 auto m = l[0][0 .. l[1]].strip;
1173                 ulong begin;
1174                 ulong end;
1175                 char[] perm;
1176                 if (formattedRead!"%x-%x %s "(m, begin, end, perm) != 3) {
1177                     continue;
1178                 }
1179 
1180                 auto amap = AddressMap(Address(begin), Address(end),
1181                         typeof(AddressMap.init.perm)(perm.idup));
1182                 if (auto v = p in rval) {
1183                     *v ~= amap;
1184                 } else {
1185                     rval[p] = [amap];
1186                 }
1187             } catch (Exception e) {
1188             }
1189         }
1190     } catch (Exception e) {
1191         logger.trace(e.msg).collectException;
1192     }
1193 
1194     return rval;
1195 }
1196 
1197 @("shall read the libraries the process is using")
1198 unittest {
1199     immutable scriptName = makeScript(`#!/bin/bash
1200 sleep 10m &
1201 sleep 10m &
1202 sleep 10m
1203 `);
1204     scope (exit)
1205         remove(scriptName);
1206 
1207     auto p = pipeProcess([scriptName]).sandbox.rcKill;
1208     waitUntilChildren(p.osHandle, 3);
1209     auto res = libs(p.osHandle);
1210     p.kill;
1211 
1212     assert(res.length != 0);
1213     assert(AbsolutePath("/bin/bash") in res);
1214 }
1215 
1216 /** Parses the output from a run of 'ldd' on a binary.
1217  *
1218  * Params:
1219  *  input = an input range of lines which is the output from ldd
1220  *
1221  * Example
1222  * ---
1223  * writeln(parseLddOutput(`
1224  *     linux-vdso.so.1 =>  (0x00007fffbf5fe000)
1225  *     libtinfo.so.5 => /lib/x86_64-linux-gnu/libtinfo.so.5 (0x00007fe28117f000)
1226  *     libdl.so.2 => /lib/x86_64-linux-gnu/libdl.so.2 (0x00007fe280f7b000)
1227  *     libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fe280bb4000)
1228  *     /lib64/ld-linux-x86-64.so.2 (0x00007fe2813dd000)
1229  * `))
1230  * ---
1231  *
1232  * Returns: a dictionary of {path: address} for each library required by the
1233  * specified binary.
1234  */
1235 Address[AbsolutePath] parseLddOutput(T)(T input) if (std_.range.isInputRange!T) {
1236     import std.regex : regex, matchFirst;
1237     import std.format : formattedRead;
1238 
1239     typeof(return) rval;
1240     const reLinux = regex(`\s(?P<lib>\S?/\S+)\s+\((?P<addr>0x.+)\)`);
1241 
1242     foreach (l; input) {
1243         auto m = matchFirst(l, reLinux);
1244         if (m.empty || m.length < 3) {
1245             continue;
1246         }
1247 
1248         try {
1249             ulong addr;
1250             formattedRead!"0x%x"(m["addr"], addr);
1251             rval[AbsolutePath(m["lib"])] = Address(addr);
1252         } catch (Exception e) {
1253         }
1254     }
1255 
1256     return rval;
1257 }
1258 
1259 @("shall parse the output of ldd")
1260 unittest {
1261     auto res = parseLddOutput([
1262             "linux-vdso.so.1 =>  (0x00007fffbf5fe000)",
1263             "libtinfo.so.5 => /lib/x86_64-linux-gnu/libtinfo.so.5 (0x00007fe28117f000)",
1264             "libdl.so.2 => /lib/x86_64-linux-gnu/libdl.so.2 (0x00007fe280f7b000)",
1265             "libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fe280bb4000)",
1266             "/lib64/ld-linux-x86-64.so.2 (0x00007fe2813dd000)"
1267             ]);
1268     assert(res.length == 3);
1269     assert(res[AbsolutePath("/lib/x86_64-linux-gnu/libtinfo.so.5")].get == 140610805166080);
1270 }