/**
Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
*/
module proc;

import core.thread : Thread;
import core.time : dur, Duration;
import logger = std.experimental.logger;
import std.algorithm : filter, count, joiner, map;
import std.array : appender, empty, array;
import std.exception : collectException;
import std.stdio : File, fileno, writeln;
import std.typecons : Flag, Yes;
static import std.process;
static import std.stdio;

public import proc.channel;
public import proc.pid;

version (unittest) {
    import unit_threaded.assertions;
    import std.file : remove;
}

/// Wrap `p` in a `ScopeKill` so that `p.dispose` is called when the wrapper
/// goes out of scope.
auto scopeKill(T)(T p) {
    return ScopeKill!T(p);
}

/// Scope guard around a process. All members of the wrapped process are
/// reachable through `alias this`; the destructor disposes the process.
struct ScopeKill(T) {
    T process;
    alias process this;

    ~this() {
        process.dispose();
    }
}

/// Async process wrapper for a std.process SpawnProcess
struct SpawnProcess {
    import std.algorithm : among;

    private {
        // running    -> no kill signal sent and no exit status collected yet.
        // terminated -> `kill` has been called but the exit status is not collected.
        // exitCode   -> the exit status is stored in `status_`.
        enum State {
            running,
            terminated,
            exitCode
        }

        std.process.Pid process;
        RawPid pid;
        int status_;
        State st;
    }

    /// Wrap an already spawned process.
    this(std.process.Pid process) @safe {
        this.process = process;
        this.pid = RawPid(process.osHandle);
    }

    /// Returns: The raw OS handle for the process ID.
    RawPid osHandle() nothrow @safe {
        return pid;
    }

    /// Kill and cleanup the process.
    ///
    /// A running process is killed and then waited for, a killed process is
    /// only waited for. Nothing is done if the exit status is already known.
    void dispose() @safe {
        if (st == State.running) {
            // moves the state to `terminated`
            this.kill;
        }
        if (st != State.exitCode) {
            this.wait;
        }

        st = State.exitCode;
    }

    /// Kill the process.
    ///
    /// Sends SIGKILL if the process is in the running state. Errors from the
    /// signalling are ignored (best effort).
    void kill() nothrow @trusted {
        import core.sys.posix.signal : SIGKILL;

        if (st != State.running) {
            return;
        }

        try {
            std.process.kill(process, SIGKILL);
        } catch (Exception e) {
        }

        st = State.terminated;
    }

    /// Blocking wait for the process to terminate.
    /// Returns: the exit status.
    int wait() @safe {
        if (st != State.exitCode) {
            status_ = std.process.wait(process);
            st = State.exitCode;
        }

        return status_;
    }

    /// Non-blocking wait for the process termination.
    ///
    /// Note that if `kill` has been called this does a blocking wait to
    /// collect the exit status.
    ///
    /// Returns: `true` if the process has terminated.
    bool tryWait() @safe {
        if (st == State.running) {
            const res = std.process.tryWait(process);
            if (res.terminated) {
                status_ = res.status;
                st = State.exitCode;
            }
        } else if (st == State.terminated) {
            status_ = std.process.wait(process);
            st = State.exitCode;
        }

        return st != State.running;
    }

    /// Returns: The exit status of the process.
    /// Throws: Exception if the exit status has not been collected yet.
    int status() @safe {
        if (st == State.exitCode) {
            return status_;
        }

        throw new Exception(
                "Process has not terminated and wait/tryWait been called to collect the exit status");
    }

    /// Returns: If the process has terminated.
    bool terminated() @safe {
        return st.among(State.terminated, State.exitCode) != 0;
    }
}

/// Async process that do not block on read from stdin/stderr.
struct PipeProcess {
    import std.algorithm : among;

    private {
        // running    -> no kill signal sent and no exit status collected yet.
        // terminated -> `kill` has been called but the exit status is not collected.
        // exitCode   -> the exit status is stored in `status_`.
        enum State {
            running,
            terminated,
            exitCode
        }

        std.process.ProcessPipes process;
        Pipe pipe_;
        FileReadChannel stderr_;
        int status_;
        State st;
        RawPid pid;
    }

    /// Wrap an already spawned process and its redirected stdin/stdout/stderr.
    this(std.process.ProcessPipes process) @safe {
        this.process = process;
        this.pipe_ = Pipe(this.process.stdout, this.process.stdin);
        this.stderr_ = FileReadChannel(this.process.stderr);
        this.pid = RawPid(process.pid.osHandle);
    }

    /// Returns: The raw OS handle for the process ID.
    RawPid osHandle() nothrow @safe {
        return this.pid;
    }

    /// Access to stdin and stdout.
    ref Pipe pipe() return scope nothrow @safe {
        return pipe_;
    }

    /// Access stderr.
    ref FileReadChannel stderr() return scope nothrow @safe {
        return stderr_;
    }

    /// Kill and cleanup the process.
    ///
    /// A running process is killed, waited for and the process pipes are
    /// destroyed. A killed process is waited for and the pipes destroyed.
    /// Nothing is done if the exit status is already known.
    void dispose() @safe {
        if (st == State.exitCode) {
            return;
        }

        if (st == State.running) {
            // moves the state to `terminated`
            this.kill;
        }
        this.wait;
        .destroy(process);

        st = State.exitCode;
    }

    /// Kill the process.
    ///
    /// Sends SIGKILL if the process is in the running state. Errors from the
    /// signalling are ignored (best effort).
    void kill() nothrow @trusted {
        import core.sys.posix.signal : SIGKILL;

        if (st != State.running) {
            return;
        }

        try {
            std.process.kill(process.pid, SIGKILL);
        } catch (Exception e) {
        }

        st = State.terminated;
    }

    /// Blocking wait for the process to terminate.
    /// Returns: the exit status.
    int wait() @safe {
        if (st != State.exitCode) {
            status_ = std.process.wait(process.pid);
            st = State.exitCode;
        }

        return status_;
    }

    /// Non-blocking wait for the process termination.
    ///
    /// Note that if `kill` has been called this does a blocking wait to
    /// collect the exit status.
    ///
    /// Returns: `true` if the process has terminated.
    bool tryWait() @safe {
        if (st == State.running) {
            const res = std.process.tryWait(process.pid);
            if (res.terminated) {
                status_ = res.status;
                st = State.exitCode;
            }
        } else if (st == State.terminated) {
            status_ = std.process.wait(process.pid);
            st = State.exitCode;
        }

        return st != State.running;
    }

    /// Returns: The exit status of the process.
    /// Throws: Exception if the exit status has not been collected yet.
    int status() @safe {
        if (st == State.exitCode) {
            return status_;
        }

        throw new Exception(
                "Process has not terminated and wait/tryWait been called to collect the exit status");
    }

    /// Returns: If the process has terminated.
    bool terminated() @safe {
        return st.among(State.terminated, State.exitCode) != 0;
    }
}

/// Spawn a process via `std.process.spawnProcess` and wrap it in a `SpawnProcess`.
SpawnProcess spawnProcess(scope const(char[])[] args, File stdin = std.stdio.stdin,
        File stdout = std.stdio.stdout, File stderr = std.stdio.stderr,
        const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const char[] workDir = null) {
    auto spawned = std.process.spawnProcess(args, stdin, stdout, stderr, env, config, workDir);
    return SpawnProcess(spawned);
}

/// ditto
SpawnProcess spawnProcess(scope const(char[])[] args, const string[string] env,
        std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) {
    auto spawned = std.process.spawnProcess(args, std.stdio.stdin,
            std.stdio.stdout, std.stdio.stderr, env, config, workDir);
    return SpawnProcess(spawned);
}

/// ditto
SpawnProcess spawnProcess(scope const(char)[] program,
        File stdin = std.stdio.stdin, File stdout = std.stdio.stdout,
        File stderr = std.stdio.stderr, const string[string] env = null,
        std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) {
    // view the single program name as a one element argument list
    auto spawned = std.process.spawnProcess((&program)[0 .. 1], stdin,
            stdout, stderr, env, config, workDir);
    return SpawnProcess(spawned);
}

/// Spawn a shell command via `std.process.spawnShell` and wrap it in a `SpawnProcess`.
SpawnProcess spawnShell(scope const(char)[] command, File stdin = std.stdio.stdin,
        File stdout = std.stdio.stdout, File stderr = std.stdio.stderr,
        scope const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) {
    auto spawned = std.process.spawnShell(command, stdin, stdout, stderr,
            env, config, workDir, shellPath);
    return SpawnProcess(spawned);
}

/// ditto
SpawnProcess spawnShell(scope const(char)[] command, scope const string[string] env,
        std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) {
    auto spawned = std.process.spawnShell(command, env, config, workDir, shellPath);
    return SpawnProcess(spawned);
}

/// Spawn a process via `std.process.pipeProcess` and wrap it in a `PipeProcess`.
PipeProcess pipeProcess(scope const(char[])[] args,
        std.process.Redirect redirect = std.process.Redirect.all,
        const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null) @safe {
    auto pipes = std.process.pipeProcess(args, redirect, env, config, workDir);
    return PipeProcess(pipes);
}

/// Spawn a shell command via `std.process.pipeShell` and wrap it in a `PipeProcess`.
PipeProcess pipeShell(scope const(char)[] command,
        std.process.Redirect redirect = std.process.Redirect.all,
        const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null, string shellPath = std.process.nativeShell) @safe {
    auto pipes = std.process.pipeShell(command, redirect, env, config, workDir, shellPath);
    return PipeProcess(pipes);
}

/** Moves the process to a separate process group and on exit kill it and all
 * its children.
352 */ 353 struct Sandbox(ProcessT) { 354 private { 355 ProcessT p; 356 RawPid pid; 357 } 358 359 this(ProcessT p) @safe { 360 import core.sys.posix.unistd : setpgid; 361 362 this.p = p; 363 this.pid = p.osHandle; 364 setpgid(pid, 0); 365 } 366 367 RawPid osHandle() nothrow @safe { 368 return pid; 369 } 370 371 static if (__traits(hasMember, ProcessT, "pipe")) { 372 ref Pipe pipe() nothrow @safe { 373 return p.pipe; 374 } 375 } 376 377 static if (__traits(hasMember, ProcessT, "stderr")) { 378 ref FileReadChannel stderr() nothrow @safe { 379 return p.stderr; 380 } 381 } 382 383 void dispose() @safe { 384 // this also reaps the children thus cleaning up zombies 385 this.kill; 386 p.dispose; 387 } 388 389 void kill() nothrow @safe { 390 // must first retrieve the submap because after the process is killed 391 // its children may have changed. 392 auto pmap = makePidMap.getSubMap(pid); 393 394 p.kill; 395 396 // only kill and reap the children 397 pmap.remove(pid); 398 proc.pid.kill(pmap, Yes.onlyCurrentUser).reap; 399 } 400 401 int wait() @safe { 402 return p.wait; 403 } 404 405 bool tryWait() @safe { 406 return p.tryWait; 407 } 408 409 int status() @safe { 410 return p.status; 411 } 412 413 bool terminated() @safe { 414 return p.terminated; 415 } 416 } 417 418 auto sandbox(T)(T p) @safe { 419 return Sandbox!T(p); 420 } 421 422 @("shall terminate a group of processes") 423 unittest { 424 import std.datetime.stopwatch : StopWatch, AutoStart; 425 426 immutable scriptName = makeScript(`#!/bin/bash 427 sleep 10m & 428 sleep 10m & 429 sleep 10m 430 `); 431 scope (exit) 432 remove(scriptName); 433 434 auto p = pipeProcess([scriptName]).sandbox.scopeKill; 435 waitUntilChildren(p.osHandle, 3); 436 const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 437 p.kill; 438 Thread.sleep(500.dur!"msecs"); // wait for the OS to kill the children 439 const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 440 441 p.wait.shouldEqual(-9); 
442 p.terminated.shouldBeTrue; 443 preChildren.shouldEqual(3); 444 postChildren.shouldEqual(0); 445 } 446 447 /** dispose the process after the timeout. 448 */ 449 struct Timeout(ProcessT) { 450 import std.algorithm : among; 451 import std.datetime : Clock, Duration; 452 import core.thread; 453 import std.typecons : RefCounted, refCounted; 454 455 private { 456 enum Msg { 457 none, 458 stop, 459 status, 460 } 461 462 enum Reply { 463 none, 464 running, 465 normalDeath, 466 killedByTimeout, 467 } 468 469 static struct Payload { 470 ProcessT p; 471 RawPid pid; 472 Background background; 473 Reply backgroundReply; 474 } 475 476 RefCounted!Payload rc; 477 } 478 479 this(ProcessT p, Duration timeout) @trusted { 480 import std.algorithm : move; 481 482 auto pid = p.osHandle; 483 rc = refCounted(Payload(move(p), pid)); 484 rc.background = new Background(&rc.p, timeout); 485 rc.background.isDaemon = true; 486 rc.background.start; 487 } 488 489 private static class Background : Thread { 490 import core.sync.condition : Condition; 491 import core.sync.mutex : Mutex; 492 493 Duration timeout; 494 ProcessT* p; 495 Mutex mtx; 496 Msg[] msg; 497 Reply reply_; 498 RawPid pid; 499 500 this(ProcessT* p, Duration timeout) { 501 this.p = p; 502 this.timeout = timeout; 503 this.mtx = new Mutex(); 504 this.pid = p.osHandle; 505 506 super(&run); 507 } 508 509 void run() { 510 checkProcess(this.pid, this.timeout, this); 511 } 512 513 void put(Msg msg) @trusted nothrow { 514 this.mtx.lock_nothrow(); 515 scope (exit) 516 this.mtx.unlock_nothrow(); 517 this.msg ~= msg; 518 } 519 520 Msg popMsg() @trusted nothrow { 521 this.mtx.lock_nothrow(); 522 scope (exit) 523 this.mtx.unlock_nothrow(); 524 if (msg.empty) 525 return Msg.none; 526 auto rval = msg[$ - 1]; 527 msg = msg[0 .. 
$ - 1]; 528 return rval; 529 } 530 531 void setReply(Reply reply_) @trusted nothrow { 532 this.mtx.lock_nothrow(); 533 scope (exit) 534 this.mtx.unlock_nothrow(); 535 this.reply_ = reply_; 536 } 537 538 Reply reply() @trusted nothrow { 539 this.mtx.lock_nothrow(); 540 scope (exit) 541 this.mtx.unlock_nothrow(); 542 return reply_; 543 } 544 545 void kill() @trusted nothrow { 546 this.mtx.lock_nothrow(); 547 scope (exit) 548 this.mtx.unlock_nothrow(); 549 p.kill; 550 } 551 } 552 553 private static void checkProcess(RawPid p, Duration timeout, Background bg) nothrow { 554 import core.sys.posix.signal : SIGKILL; 555 import std.algorithm : max, min; 556 import std.variant : Variant; 557 static import core.sys.posix.signal; 558 559 const stopAt = Clock.currTime + timeout; 560 // the purpose is to poll the process often "enough" that if it 561 // terminates early `Process` detects it fast enough. 1000 is chosen 562 // because it "feels good". the purpose 563 auto sleepInterval = min(500, max(20, timeout.total!"msecs" / 1000)).dur!"msecs"; 564 565 bool forceStop; 566 bool running = true; 567 while (running && Clock.currTime < stopAt) { 568 const msg = bg.popMsg; 569 570 final switch (msg) { 571 case Msg.none: 572 Thread.sleep(sleepInterval); 573 break; 574 case Msg.stop: 575 forceStop = true; 576 running = false; 577 break; 578 case Msg.status: 579 bg.setReply(Reply.running); 580 break; 581 } 582 583 if (core.sys.posix.signal.kill(p, 0) == -1) { 584 running = false; 585 } 586 } 587 588 // may be children alive thus must ensure that the whole process tree 589 // is killed if this is a sandbox with a timeout. 
590 bg.kill; 591 592 if (!forceStop && Clock.currTime >= stopAt) { 593 bg.setReply(Reply.killedByTimeout); 594 } else { 595 bg.setReply(Reply.normalDeath); 596 } 597 } 598 599 RawPid osHandle() nothrow @trusted { 600 return rc.pid; 601 } 602 603 ref Pipe pipe() nothrow @trusted { 604 return rc.p.pipe; 605 } 606 607 ref FileReadChannel stderr() nothrow @trusted { 608 return rc.p.stderr; 609 } 610 611 void dispose() @trusted { 612 if (rc.backgroundReply.among(Reply.none, Reply.running)) { 613 rc.background.put(Msg.stop); 614 rc.background.join; 615 rc.backgroundReply = rc.background.reply; 616 } 617 rc.p.dispose; 618 } 619 620 void kill() nothrow @trusted { 621 rc.background.kill; 622 } 623 624 int wait() @trusted { 625 while (!this.tryWait) { 626 Thread.sleep(20.dur!"msecs"); 627 } 628 return rc.p.wait; 629 } 630 631 bool tryWait() @trusted { 632 return rc.p.tryWait; 633 } 634 635 int status() @trusted { 636 return rc.p.status; 637 } 638 639 bool terminated() @trusted { 640 return rc.p.terminated; 641 } 642 643 bool timeoutTriggered() @trusted { 644 if (rc.backgroundReply.among(Reply.none, Reply.running)) { 645 rc.background.put(Msg.status); 646 rc.backgroundReply = rc.background.reply; 647 } 648 return rc.backgroundReply == Reply.killedByTimeout; 649 } 650 } 651 652 auto timeout(T)(T p, Duration timeout_) @trusted { 653 return Timeout!T(p, timeout_); 654 } 655 656 /// Returns when the process has pending data. 
/// Polls every 20 ms and returns as soon as either stdout (`p.pipe`) or
/// `p.stderr` reports pending data.
void waitForPendingData(ProcessT)(ProcessT p) {
    // keep waiting only while *neither* channel has something to read;
    // requiring both would never return for a process that writes to only one.
    while (!p.pipe.hasPendingData && !p.stderr.hasPendingData) {
        Thread.sleep(20.dur!"msecs");
    }
}

@("shall kill the process after the timeout")
unittest {
    import std.datetime.stopwatch : StopWatch, AutoStart;

    auto p = pipeProcess(["sleep", "1m"]).timeout(100.dur!"msecs").scopeKill;
    auto sw = StopWatch(AutoStart.yes);
    p.wait;
    sw.stop;

    sw.peek.shouldBeGreaterThan(100.dur!"msecs");
    sw.peek.shouldBeSmallerThan(500.dur!"msecs");
    p.wait.shouldEqual(-9);
    p.terminated.shouldBeTrue;
    p.status.shouldEqual(-9);
    p.timeoutTriggered.shouldBeTrue;
}

/// A chunk of data read from a process together with the channel it came from.
struct DrainElement {
    enum Type {
        stdout,
        stderr,
    }

    Type type;
    const(ubyte)[] data;

    /// Returns: iterates the data as an input range.
    auto byUTF8() @safe pure nothrow const @nogc {
        static import std.utf;

        return std.utf.byUTF!(const(char))(cast(const(char)[]) data);
    }

    /// Returns: `true` if the element carries no data.
    bool empty() @safe pure nothrow const @nogc {
        return data.length == 0;
    }
}

/** A range that drains a process stdout/stderr until it terminates.
 *
 * There may be `DrainElement` that are empty.
 */
struct DrainRange(ProcessT) {
    // start       -> nothing has been read yet.
    // draining    -> read from stderr/stdout as long as a channel is open.
    // lastStdout  -> pick up what is left on stdout.
    // lastStderr  -> pick up what is left on stderr.
    // lastElement -> one more (empty) element is delivered before the end.
    // empty       -> the range is exhausted.
    enum State {
        start,
        draining,
        lastStdout,
        lastStderr,
        lastElement,
        empty,
    }

    private {
        Duration timeout;
        ProcessT p;
        DrainElement front_;
        State st;
        // scratch buffer handed to the channels when reading.
        ubyte[] buf;
        // what the last read returned.
        ubyte[] bufRead;
    }

    /** Params:
     *  p = the process to drain
     *  timeout = upper limit for how long one `popFront` sleeps while waiting for data
     */
    this(ProcessT p, Duration timeout) {
        this.p = p;
        this.buf = new ubyte[4096];
        this.timeout = timeout;
    }

    /// Returns: the current element. It is empty until the first `popFront`.
    DrainElement front() @safe pure nothrow const @nogc {
        assert(!empty, "Can't get front of an empty range");
        return front_;
    }

    /// Advance the state machine and, depending on the state, read the next
    /// chunk of data into `front`.
    void popFront() @safe {
        assert(!empty, "Can't pop front of an empty range");

        bool isAnyPipeOpen() {
            return (p.pipe.hasData || p.stderr.hasData) && !p.terminated;
        }

        // stderr is checked before stdout.
        void readData() @safe {
            if (p.stderr.hasData && p.stderr.hasPendingData) {
                front_ = DrainElement(DrainElement.Type.stderr);
                bufRead = p.stderr.read(buf);
            } else if (p.pipe.hasData && p.pipe.hasPendingData) {
                front_ = DrainElement(DrainElement.Type.stdout);
                bufRead = p.pipe.read(buf);
            }
        }

        void waitUntilData() @safe {
            // may livelock if the process never terminates and never writes to
            // the terminal. waitTime ensure that it sooner or later is
            // interrupted. It lets e.g the timeout handling to kill the
            // process.
            const s = 20.dur!"msecs";
            Duration waitTime;
            while (waitTime < timeout) {
                import core.thread : Thread;
                import core.time : dur;

                readData();
                // only sleep when nothing was read. `front_.data` is not
                // assigned until below thus it can't be used for this check,
                // it would delay every successful read by the sleep time.
                if (bufRead.empty) {
                    () @trusted { Thread.sleep(s); }();
                    waitTime += s;
                }

                // done when data has been read or when there is nothing more
                // to wait for.
                if (!(bufRead.empty && isAnyPipeOpen)) {
                    front_.data = bufRead.dup;
                    break;
                }
            }
        }

        front_ = DrainElement.init;
        bufRead = null;

        final switch (st) {
        case State.start:
            st = State.draining;
            waitUntilData;
            break;
        case State.draining:
            if (isAnyPipeOpen) {
                waitUntilData();
            } else {
                st = State.lastStdout;
            }
            break;
        case State.lastStdout:
            if (p.pipe.hasData && p.pipe.hasPendingData) {
                front_ = DrainElement(DrainElement.Type.stdout);
                bufRead = p.pipe.read(buf);
            }

            front_.data = bufRead.dup;
            if (!p.pipe.hasData || p.terminated) {
                st = State.lastStderr;
            }
            break;
        case State.lastStderr:
            if (p.stderr.hasData && p.stderr.hasPendingData) {
                front_ = DrainElement(DrainElement.Type.stderr);
                bufRead = p.stderr.read(buf);
            }

            front_.data = bufRead.dup;
            if (!p.stderr.hasData || p.terminated) {
                st = State.lastElement;
            }
            break;
        case State.lastElement:
            st = State.empty;
            break;
        case State.empty:
            break;
        }
    }

    /// Returns: `true` when the range is exhausted.
    bool empty() @safe pure nothrow const @nogc {
        return st == State.empty;
    }
}

/// Drain a process pipe until empty.
auto drain(T)(T p, Duration timeout) {
    return DrainRange!T(p, timeout);
}

/// Read the data from a ReadChannel by line.
struct DrainByLineCopyRange(ProcessT) {
    private {
        ProcessT process;
        DrainRange!ProcessT range;
        // data that has been read but not yet delivered as a line.
        const(ubyte)[] buf;
        // the line that `front` returns, without the trailing newline.
        const(char)[] line;
    }

    /** Params:
     *  p = the process to drain
     *  timeout = passed on to the underlying `drain` range
     */
    this(ProcessT p, Duration timeout) @safe {
        process = p;
        range = p.drain(timeout);
    }

    /// Returns: the current line. It is empty until the first `popFront`.
    string front() @trusted pure nothrow const @nogc {
        import std.exception : assumeUnique;

        assert(!empty, "Can't get front of an empty range");
        return line.assumeUnique;
    }

    /// Read from the process until a newline is buffered, or the process
    /// output is exhausted, and make the next line available via `front`.
    void popFront() @safe {
        assert(!empty, "Can't pop front of an empty range");
        import std.algorithm : countUntil;
        import std.array : array;
        static import std.utf;

        size_t newlineAt;
        do {
            // advance the underlying range and buffer what it produced
            if (!range.empty) {
                range.popFront;
            }
            if (!range.empty) {
                buf ~= range.front.data;
            }
            newlineAt = buf.countUntil('\n');
        }
        while (!range.empty && newlineAt == -1);

        const(ubyte)[] chunk;
        if (!buf.empty) {
            if (newlineAt == -1) {
                // no newline, deliver everything that is left
                chunk = buf;
                buf = null;
            } else {
                // the newline is part of the chunk that is cut from the buffer
                const cut = newlineAt < buf.length ? newlineAt + 1 : newlineAt;
                chunk = buf[0 .. cut];
                buf = buf[cut .. $];
            }
        }

        // the newline is not part of the delivered line
        if (!chunk.empty && chunk[$ - 1] == '\n') {
            chunk = chunk[0 .. $ - 1];
        }

        line = std.utf.byUTF!(const(char))(cast(const(char)[]) chunk).array;
    }

    /// Returns: `true` when the process output is exhausted and there is
    /// neither buffered data nor a current line.
    bool empty() @safe pure nothrow const @nogc {
        return range.empty && buf.empty && line.empty;
    }
}

@("shall drain the process output by line")
unittest {
    import std.algorithm : filter, joiner, map;
    import std.array : array;

    auto p = pipeProcess(["dd", "if=/dev/zero", "bs=10", "count=3"]).scopeKill;
    auto res = p.process.drainByLineCopy(1.dur!"minutes").filter!"!a.empty".array;

    res.length.shouldEqual(4);
    res.joiner.count.shouldBeGreaterThan(30);
    p.wait.shouldEqual(0);
    p.terminated.shouldBeTrue;
}

/// Drain the output of the process line by line. Each line is a fresh copy.
auto drainByLineCopy(T)(T p, Duration timeout) @safe {
    return DrainByLineCopyRange!T(p, timeout);
}

/// Drain the process output until it is done executing.
/// Returns: the process that was passed in.
auto drainToNull(T)(T p, Duration timeout) @safe {
    // the elements are read and thrown away
    for (auto src = p.drain(timeout); !src.empty; src.popFront) {
        auto discarded = src.front;
    }
    return p;
}

/// Drain the output from the process into an output range.
/// Returns: the process that was passed in.
auto drain(ProcessT, T)(ProcessT p, ref T range, Duration timeout) {
    for (auto src = p.drain(timeout); !src.empty; src.popFront) {
        auto elem = src.front;
        range.put(elem);
    }
    return p;
}

@("shall drain the output of a process while it is running with a separation of stdout and stderr")
unittest {
    auto p = pipeProcess(["dd", "if=/dev/urandom", "bs=10", "count=3"]).scopeKill;
    auto res = p.process.drain(1.dur!"minutes").array;

    // this is just a sanity check. It has to be kind of high because some
    // wiggle room is allowed
    res.count.shouldBeSmallerThan(50);

    res.filter!(a => a.type == DrainElement.Type.stdout)
        .map!(a => a.data)
        .joiner
        .count
        .shouldEqual(30);
    res.filter!(a => a.type == DrainElement.Type.stderr).count.shouldBeGreaterThan(0);
    p.wait.shouldEqual(0);
    p.terminated.shouldBeTrue;
}

@("shall kill the process tree when the timeout is reached")
unittest {
    immutable script = makeScript(`#!/bin/bash
sleep 10m
`);
    scope (exit)
        remove(script);

    auto p = pipeProcess([script]).sandbox.timeout(1.dur!"seconds").scopeKill;
    waitUntilChildren(p.osHandle, 1);
    const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
    const res = p.process.drain(1.dur!"minutes").array;
    const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;

    p.wait.shouldEqual(-9);
    p.terminated.shouldBeTrue;
    preChildren.shouldEqual(1);
    postChildren.shouldEqual(0);
}

/** Write `script` to an executable file next to the running executable.
 *
 * The file name is derived from the path of the executable, the base name of
 * `file` and `line`.
 *
 * Returns: the path to the script.
 */
string makeScript(string script, string file = __FILE__, uint line = __LINE__) {
    import core.sys.posix.sys.stat;
    import std.file : getAttributes, setAttributes, thisExePath;
    import std.stdio : File;
    import std.path : baseName;
    import std.conv : to;

    immutable suffix = file.baseName ~ line.to!string ~ ".sh";
    immutable fname = thisExePath ~ "_" ~ suffix;

    {
        // the file is closed when leaving the scope, before it is made executable
        auto fout = File(fname, "w");
        fout.writeln(script);
    }

    const executable = S_IXUSR | S_IXGRP | S_IXOTH;
    setAttributes(fname, getAttributes(fname) | executable);
    return fname;
}

/// Wait for p to have num children or give up after 10s.
void waitUntilChildren(RawPid p, int num) {
    import std.datetime : Clock;

    const deadline = Clock.currTime + 10.dur!"seconds";
    while (true) {
        Thread.sleep(50.dur!"msecs");
        if (Clock.currTime > deadline)
            return;
        // `p` itself is not counted
        if (!(makePidMap.getSubMap(p).remove(p).length < num))
            return;
    }
}