1 /** 2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved. 3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0) 4 Author: Joakim Brännström (joakim.brannstrom@gmx.com) 5 */ 6 module proc.process; 7 8 import core.sys.posix.signal : SIGKILL; 9 import core.thread : Thread; 10 import core.time : dur, Duration; 11 import logger = std.experimental.logger; 12 import std.algorithm : filter, count, joiner, map; 13 import std.array : appender, empty, array; 14 import std.exception : collectException; 15 import std.stdio : File, fileno, writeln; 16 import std.typecons : Flag, Yes; 17 static import std.process; 18 static import std.stdio; 19 20 import my.gc.refc; 21 import my.from_; 22 import my.path; 23 import my.named_type; 24 25 public import proc.channel; 26 public import proc.pid; 27 public import proc.tty; 28 29 version (unittest) { 30 import std.file : remove; 31 } 32 33 /** Manage a process by reference counting so that it is terminated when the it 34 * stops being used such as the instance going out of scope. 35 */ 36 auto rcKill(T)(T p, int signal = SIGKILL) { 37 return refCounted(ScopeKill!T(p, signal)); 38 } 39 40 // backward compatibility. 41 alias scopeKill = rcKill; 42 43 struct ScopeKill(T) { 44 T process; 45 alias process this; 46 47 private int signal = SIGKILL; 48 private bool hasProcess; 49 50 this(T process, int signal) @safe { 51 this.process = process; 52 this.signal = signal; 53 this.hasProcess = true; 54 } 55 56 ~this() @safe { 57 if (hasProcess) 58 process.dispose(); 59 } 60 } 61 62 /// Async process wrapper for a std.process SpawnProcess 63 struct SpawnProcess { 64 import core.sys.posix.signal : SIGKILL; 65 import std.algorithm : among; 66 67 private { 68 enum State { 69 running, 70 terminated, 71 exitCode 72 } 73 74 std.process.Pid process; 75 RawPid pid; 76 int status_; 77 State st; 78 } 79 80 this(std.process.Pid process) @safe { 81 this.process = process; 82 this.pid = process.osHandle.RawPid; 83 } 84 85 ~this() @safe { 86 } 87 88 /// Returns: The raw OS handle for the process ID. 89 RawPid osHandle() nothrow @safe { 90 return pid; 91 } 92 93 /// Kill and cleanup the process. 94 void dispose() @safe { 95 final switch (st) { 96 case State.running: 97 this.kill; 98 this.wait; 99 break; 100 case State.terminated: 101 this.wait; 102 break; 103 case State.exitCode: 104 break; 105 } 106 107 st = State.exitCode; 108 } 109 110 /** Send `signal` to the process. 111 * 112 * Param: 113 * signal = a signal from `core.sys.posix.signal` 114 */ 115 void kill(int signal = SIGKILL) nothrow @trusted { 116 final switch (st) { 117 case State.running: 118 break; 119 case State.terminated: 120 goto case; 121 case State.exitCode: 122 return; 123 } 124 125 try { 126 std.process.kill(process, signal); 127 } catch (Exception e) { 128 } 129 130 st = State.terminated; 131 } 132 133 /// Blocking wait for the process to terminated. 134 /// Returns: the exit status. 135 int wait() @safe { 136 final switch (st) { 137 case State.running: 138 status_ = std.process.wait(process); 139 break; 140 case State.terminated: 141 status_ = std.process.wait(process); 142 break; 143 case State.exitCode: 144 break; 145 } 146 147 st = State.exitCode; 148 149 return status_; 150 } 151 152 /// Non-blocking wait for the process termination. 153 /// Returns: `true` if the process has terminated. 154 bool tryWait() @safe { 155 final switch (st) { 156 case State.running: 157 auto s = std.process.tryWait(process); 158 if (s.terminated) { 159 st = State.exitCode; 160 status_ = s.status; 161 } 162 break; 163 case State.terminated: 164 status_ = std.process.wait(process); 165 st = State.exitCode; 166 break; 167 case State.exitCode: 168 break; 169 } 170 171 return st.among(State.terminated, State.exitCode) != 0; 172 } 173 174 /// Returns: The exit status of the process. 175 int status() @safe { 176 if (st != State.exitCode) { 177 throw new Exception( 178 "Process has not terminated and wait/tryWait been called to collect the exit status"); 179 } 180 return status_; 181 } 182 183 /// Returns: If the process has terminated. 184 bool terminated() @safe { 185 return st.among(State.terminated, State.exitCode) != 0; 186 } 187 } 188 189 /// Async process that do not block on read from stdin/stderr. 190 struct PipeProcess { 191 import std.algorithm : among; 192 import core.sys.posix.signal : SIGKILL; 193 194 private { 195 enum State { 196 running, 197 terminated, 198 exitCode 199 } 200 201 std.process.ProcessPipes process; 202 std.process.Pid pid; 203 204 FileReadChannel stderr_; 205 FileReadChannel stdout_; 206 FileWriteChannel stdin_; 207 int status_; 208 State st; 209 } 210 211 this(std.process.Pid pid, File stdin, File stdout, File stderr) @safe { 212 this.pid = pid; 213 214 this.stdin_ = FileWriteChannel(stdin); 215 this.stdout_ = FileReadChannel(stdout); 216 this.stderr_ = FileReadChannel(stderr); 217 } 218 219 this(std.process.ProcessPipes process, std.process.Redirect r) @safe { 220 this.process = process; 221 this.pid = process.pid; 222 223 if (r & std.process.Redirect.stdin) { 224 stdin_ = FileWriteChannel(this.process.stdin); 225 } 226 if (r & std.process.Redirect.stdout) { 227 stdout_ = FileReadChannel(this.process.stdout); 228 } 229 if (r & std.process.Redirect.stderr) { 230 this.stderr_ = FileReadChannel(this.process.stderr); 231 } 232 } 233 234 /// Returns: The raw OS handle for the process ID. 235 RawPid osHandle() nothrow @safe { 236 return pid.osHandle.RawPid; 237 } 238 239 /// Access to stdout. 240 ref FileWriteChannel stdin() return scope nothrow @safe { 241 return stdin_; 242 } 243 244 /// Access to stdout. 245 ref FileReadChannel stdout() return scope nothrow @safe { 246 return stdout_; 247 } 248 249 /// Access stderr. 250 ref FileReadChannel stderr() return scope nothrow @safe { 251 return stderr_; 252 } 253 254 /// Kill and cleanup the process. 255 void dispose() @safe { 256 final switch (st) { 257 case State.running: 258 this.kill; 259 this.wait; 260 .destroy(process); 261 break; 262 case State.terminated: 263 this.wait; 264 .destroy(process); 265 break; 266 case State.exitCode: 267 break; 268 } 269 270 st = State.exitCode; 271 } 272 273 /** Send `signal` to the process. 274 * 275 * Param: 276 * signal = a signal from `core.sys.posix.signal` 277 */ 278 void kill(int signal = SIGKILL) nothrow @trusted { 279 final switch (st) { 280 case State.running: 281 break; 282 case State.terminated: 283 return; 284 case State.exitCode: 285 return; 286 } 287 288 try { 289 std.process.kill(pid, signal); 290 } catch (Exception e) { 291 } 292 293 st = State.terminated; 294 } 295 296 /// Blocking wait for the process to terminated. 297 /// Returns: the exit status. 298 int wait() @safe { 299 final switch (st) { 300 case State.running: 301 status_ = std.process.wait(pid); 302 break; 303 case State.terminated: 304 status_ = std.process.wait(pid); 305 break; 306 case State.exitCode: 307 break; 308 } 309 310 st = State.exitCode; 311 312 return status_; 313 } 314 315 /// Non-blocking wait for the process termination. 316 /// Returns: `true` if the process has terminated. 317 bool tryWait() @safe { 318 final switch (st) { 319 case State.running: 320 auto s = std.process.tryWait(pid); 321 if (s.terminated) { 322 st = State.exitCode; 323 status_ = s.status; 324 } 325 break; 326 case State.terminated: 327 status_ = std.process.wait(pid); 328 st = State.exitCode; 329 break; 330 case State.exitCode: 331 break; 332 } 333 334 return st.among(State.terminated, State.exitCode) != 0; 335 } 336 337 /// Returns: The exit status of the process. 338 int status() @safe { 339 if (st != State.exitCode) { 340 throw new Exception( 341 "Process has not terminated and wait/tryWait been called to collect the exit status"); 342 } 343 return status_; 344 } 345 346 /// Returns: If the process has terminated. 347 bool terminated() @safe { 348 return st.among(State.terminated, State.exitCode) != 0; 349 } 350 } 351 352 SpawnProcess spawnProcess(scope const(char[])[] args, File stdin = std.stdio.stdin, 353 File stdout = std.stdio.stdout, File stderr = std.stdio.stderr, 354 const string[string] env = null, std.process.Config config = std.process.Config.none, 355 scope const char[] workDir = null) { 356 return SpawnProcess(std.process.spawnProcess(args, stdin, stdout, stderr, 357 env, config, workDir)); 358 } 359 360 SpawnProcess spawnProcess(scope const(char[])[] args, const string[string] env, 361 std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) { 362 return SpawnProcess(std.process.spawnProcess(args, std.stdio.stdin, 363 std.stdio.stdout, std.stdio.stderr, env, config, workDir)); 364 } 365 366 SpawnProcess spawnProcess(scope const(char)[] program, 367 File stdin = std.stdio.stdin, File stdout = std.stdio.stdout, 368 File stderr = std.stdio.stderr, const string[string] env = null, 369 std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) { 370 return SpawnProcess(std.process.spawnProcess((&program)[0 .. 1], stdin, 371 stdout, stderr, env, config, workDir)); 372 } 373 374 SpawnProcess spawnShell(scope const(char)[] command, File stdin = std.stdio.stdin, 375 File stdout = std.stdio.stdout, File stderr = std.stdio.stderr, 376 scope const string[string] env = null, std.process.Config config = std.process.Config.none, 377 scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) { 378 return SpawnProcess(std.process.spawnShell(command, stdin, stdout, stderr, 379 env, config, workDir, shellPath)); 380 } 381 382 /// ditto 383 SpawnProcess spawnShell(scope const(char)[] command, scope const string[string] env, 384 std.process.Config config = std.process.Config.none, 385 scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) { 386 return SpawnProcess(std.process.spawnShell(command, env, config, workDir, shellPath)); 387 } 388 389 PipeProcess pipeProcess(scope const(char[])[] args, 390 std.process.Redirect redirect = std.process.Redirect.all, 391 const string[string] env = null, std.process.Config config = std.process.Config.none, 392 scope const(char)[] workDir = null) @safe { 393 return PipeProcess(std.process.pipeProcess(args, redirect, env, config, workDir), redirect); 394 } 395 396 PipeProcess pipeShell(scope const(char)[] command, 397 std.process.Redirect redirect = std.process.Redirect.all, 398 const string[string] env = null, std.process.Config config = std.process.Config.none, 399 scope const(char)[] workDir = null, string shellPath = std.process.nativeShell) @safe { 400 return PipeProcess(std.process.pipeShell(command, redirect, env, config, 401 workDir, shellPath), redirect); 402 } 403 404 /** Moves the process to a separate process group and on exit kill it and all 405 * its children. 406 */ 407 @safe struct Sandbox(ProcessT) { 408 import core.sys.posix.signal : SIGKILL; 409 410 private { 411 ProcessT p; 412 RawPid pid; 413 } 414 415 this(ProcessT p) @safe { 416 import core.sys.posix.unistd : setpgid; 417 418 this.p = p; 419 this.pid = p.osHandle; 420 setpgid(pid, 0); 421 } 422 423 RawPid osHandle() nothrow @safe { 424 return pid; 425 } 426 427 static if (__traits(hasMember, ProcessT, "stdin")) { 428 ref FileWriteChannel stdin() nothrow @safe { 429 return p.stdin; 430 } 431 } 432 433 static if (__traits(hasMember, ProcessT, "stdout")) { 434 ref FileReadChannel stdout() nothrow @safe { 435 return p.stdout; 436 } 437 } 438 439 static if (__traits(hasMember, ProcessT, "stderr")) { 440 ref FileReadChannel stderr() nothrow @safe { 441 return p.stderr; 442 } 443 } 444 445 void dispose() @safe { 446 // this also reaps the children thus cleaning up zombies 447 this.kill; 448 p.dispose; 449 } 450 451 /** Send `signal` to the process. 452 * 453 * Param: 454 * signal = a signal from `core.sys.posix.signal` 455 */ 456 void kill(int signal = SIGKILL) nothrow @safe { 457 // must first retrieve the submap because after the process is killed 458 // its children may have changed. 459 auto pmap = makePidMap.getSubMap(pid); 460 461 p.kill(signal); 462 463 // only kill and reap the children 464 pmap.remove(pid); 465 proc.pid.kill(pmap, Yes.onlyCurrentUser, signal).reap; 466 } 467 468 int wait() @safe { 469 return p.wait; 470 } 471 472 bool tryWait() @safe { 473 return p.tryWait; 474 } 475 476 int status() @safe { 477 return p.status; 478 } 479 480 bool terminated() @safe { 481 return p.terminated; 482 } 483 } 484 485 auto sandbox(T)(T p) @safe { 486 return Sandbox!T(p); 487 } 488 489 @("shall terminate a group of processes") 490 unittest { 491 import std.datetime.stopwatch : StopWatch, AutoStart; 492 493 immutable scriptName = makeScript(`#!/bin/bash 494 sleep 10m & 495 sleep 10m & 496 sleep 10m 497 `); 498 scope (exit) 499 remove(scriptName); 500 501 auto p = pipeProcess([scriptName]).sandbox.rcKill; 502 waitUntilChildren(p.osHandle, 3); 503 const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 504 p.kill; 505 Thread.sleep(500.dur!"msecs"); // wait for the OS to kill the children 506 const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 507 508 assert(p.wait == -9); 509 assert(p.terminated); 510 assert(preChildren == 3); 511 assert(postChildren == 0); 512 } 513 514 /** dispose the process after the timeout. 515 */ 516 @safe struct Timeout(ProcessT) { 517 import core.sys.posix.signal : SIGKILL; 518 import core.thread; 519 import std.algorithm : among; 520 import std.datetime : Clock, Duration; 521 522 private { 523 enum Msg { 524 none, 525 stop, 526 status, 527 } 528 529 enum Reply { 530 none, 531 running, 532 normalDeath, 533 killedByTimeout, 534 } 535 536 static struct Payload { 537 ProcessT p; 538 RawPid pid; 539 Background background; 540 Reply backgroundReply; 541 } 542 543 RefCounted!Payload rc; 544 } 545 546 this(ProcessT p, Duration timeout) @trusted { 547 import std.algorithm : move; 548 549 auto pid = p.osHandle; 550 rc = refCounted(Payload(move(p), pid)); 551 rc.background = new Background(&rc.p, timeout); 552 rc.background.isDaemon = true; 553 rc.background.start; 554 } 555 556 ~this() @trusted { 557 rc.release; 558 } 559 560 private static class Background : Thread { 561 import core.sync.condition : Condition; 562 import core.sync.mutex : Mutex; 563 564 Duration timeout; 565 ProcessT* p; 566 Mutex mtx; 567 Msg[] msg; 568 Reply reply_; 569 RawPid pid; 570 int signal = SIGKILL; 571 572 this(ProcessT* p, Duration timeout) { 573 this.p = p; 574 this.timeout = timeout; 575 this.mtx = new Mutex(); 576 this.pid = p.osHandle; 577 578 super(&run); 579 } 580 581 void run() { 582 checkProcess(this.pid, this.timeout, this); 583 } 584 585 void put(Msg msg) @trusted nothrow { 586 this.mtx.lock_nothrow(); 587 scope (exit) 588 this.mtx.unlock_nothrow(); 589 this.msg ~= msg; 590 } 591 592 Msg popMsg() @trusted nothrow { 593 this.mtx.lock_nothrow(); 594 scope (exit) 595 this.mtx.unlock_nothrow(); 596 if (msg.empty) 597 return Msg.none; 598 auto rval = msg[$ - 1]; 599 msg = msg[0 .. $ - 1]; 600 return rval; 601 } 602 603 void setReply(Reply reply_) @trusted nothrow { 604 this.mtx.lock_nothrow(); 605 scope (exit) 606 this.mtx.unlock_nothrow(); 607 this.reply_ = reply_; 608 } 609 610 Reply reply() @trusted nothrow { 611 this.mtx.lock_nothrow(); 612 scope (exit) 613 this.mtx.unlock_nothrow(); 614 return reply_; 615 } 616 617 void setSignal(int signal) @trusted nothrow { 618 this.mtx.lock_nothrow(); 619 scope (exit) 620 this.mtx.unlock_nothrow(); 621 this.signal = signal; 622 } 623 624 void kill() @trusted nothrow { 625 this.mtx.lock_nothrow(); 626 scope (exit) 627 this.mtx.unlock_nothrow(); 628 p.kill(signal); 629 } 630 } 631 632 private static void checkProcess(RawPid p, Duration timeout, Background bg) nothrow { 633 import std.algorithm : max, min; 634 import std.variant : Variant; 635 static import core.sys.posix.signal; 636 637 const stopAt = Clock.currTime + timeout; 638 // the purpose is to poll the process often "enough" that if it 639 // terminates early `Process` detects it fast enough. 1000 is chosen 640 // because it "feels good". the purpose 641 auto sleepInterval = min(50, max(1, timeout.total!"msecs" / 1000)).dur!"msecs"; 642 643 bool forceStop; 644 bool running = true; 645 while (running && Clock.currTime < stopAt) { 646 const msg = bg.popMsg; 647 648 final switch (msg) { 649 case Msg.none: 650 () @trusted { Thread.sleep(sleepInterval); }(); 651 break; 652 case Msg.stop: 653 forceStop = true; 654 running = false; 655 break; 656 case Msg.status: 657 bg.setReply(Reply.running); 658 break; 659 } 660 661 () @trusted { 662 if (core.sys.posix.signal.kill(p, 0) == -1) { 663 running = false; 664 } 665 }(); 666 } 667 668 // may be children alive thus must ensure that the whole process tree 669 // is killed if this is a sandbox with a timeout. 670 bg.kill(); 671 672 if (!forceStop && Clock.currTime >= stopAt) { 673 bg.setReply(Reply.killedByTimeout); 674 } else { 675 bg.setReply(Reply.normalDeath); 676 } 677 } 678 679 RawPid osHandle() nothrow @trusted { 680 return rc.pid; 681 } 682 683 static if (__traits(hasMember, ProcessT, "stdin")) { 684 ref FileWriteChannel stdin() nothrow @safe { 685 return rc.p.stdin; 686 } 687 } 688 689 static if (__traits(hasMember, ProcessT, "stdout")) { 690 ref FileReadChannel stdout() nothrow @safe { 691 return rc.p.stdout; 692 } 693 } 694 695 static if (__traits(hasMember, ProcessT, "stderr")) { 696 ref FileReadChannel stderr() nothrow @trusted { 697 return rc.p.stderr; 698 } 699 } 700 701 void dispose() @trusted { 702 if (rc.backgroundReply.among(Reply.none, Reply.running)) { 703 rc.background.put(Msg.stop); 704 rc.background.join; 705 rc.backgroundReply = rc.background.reply; 706 } 707 rc.p.dispose; 708 } 709 710 /** Send `signal` to the process. 711 * 712 * Param: 713 * signal = a signal from `core.sys.posix.signal` 714 */ 715 void kill(int signal = SIGKILL) nothrow @trusted { 716 rc.background.setSignal(signal); 717 rc.background.kill(); 718 } 719 720 int wait() @trusted { 721 while (!this.tryWait) { 722 Thread.sleep(20.dur!"msecs"); 723 } 724 return rc.p.wait; 725 } 726 727 bool tryWait() @trusted { 728 return rc.p.tryWait; 729 } 730 731 int status() @trusted { 732 return rc.p.status; 733 } 734 735 bool terminated() @trusted { 736 return rc.p.terminated; 737 } 738 739 bool timeoutTriggered() @trusted { 740 if (rc.backgroundReply.among(Reply.none, Reply.running)) { 741 rc.background.put(Msg.status); 742 rc.backgroundReply = rc.background.reply; 743 } 744 return rc.backgroundReply == Reply.killedByTimeout; 745 } 746 } 747 748 auto timeout(T)(T p, Duration timeout_) @trusted { 749 return Timeout!T(p, timeout_); 750 } 751 752 /// Returns when the process has pending data. 753 void waitForPendingData(ProcessT)(Process p) { 754 while (!p.pipe.hasPendingData || !p.stderr.hasPendingData) { 755 Thread.sleep(20.dur!"msecs"); 756 } 757 } 758 759 @("shall kill the process after the timeout") 760 unittest { 761 import std.datetime.stopwatch : StopWatch, AutoStart; 762 763 auto p = pipeProcess(["sleep", "1m"]).timeout(100.dur!"msecs").rcKill; 764 auto sw = StopWatch(AutoStart.yes); 765 p.wait; 766 sw.stop; 767 768 assert(sw.peek >= 100.dur!"msecs"); 769 assert(sw.peek <= 500.dur!"msecs"); 770 assert(p.wait == -9); 771 assert(p.terminated); 772 assert(p.status == -9); 773 assert(p.timeoutTriggered); 774 } 775 776 struct DrainElement { 777 enum Type { 778 stdout, 779 stderr, 780 } 781 782 Type type; 783 const(ubyte)[] data; 784 785 /// Returns: iterates the data as an input range. 786 auto byUTF8() @safe pure nothrow const @nogc { 787 static import std.utf; 788 789 return std.utf.byUTF!(const(char))(cast(const(char)[]) data); 790 } 791 792 bool empty() @safe pure nothrow const @nogc { 793 return data.length == 0; 794 } 795 } 796 797 /** A range that drains a process stdout/stderr until it terminates. 798 * 799 * There may be `DrainElement` that are empty. 800 */ 801 struct DrainRange(ProcessT) { 802 private { 803 enum State { 804 start, 805 draining, 806 lastStdout, 807 lastStderr, 808 lastElement, 809 empty, 810 } 811 812 ProcessT p; 813 DrainElement front_; 814 State st; 815 ubyte[] buf; 816 } 817 818 this(ProcessT p) { 819 this.p = p; 820 this.buf = new ubyte[4096]; 821 } 822 823 DrainElement front() @safe pure nothrow const @nogc { 824 assert(!empty, "Can't get front of an empty range"); 825 return front_; 826 } 827 828 void popFront() @safe { 829 assert(!empty, "Can't pop front of an empty range"); 830 831 static bool isAnyPipeOpen(ref ProcessT p) { 832 return p.stdout.isOpen || p.stderr.isOpen; 833 } 834 835 DrainElement readData(ref ProcessT p) @safe { 836 if (p.stderr.hasPendingData) { 837 return DrainElement(DrainElement.Type.stderr, p.stderr.read(buf)); 838 } else if (p.stdout.hasPendingData) { 839 return DrainElement(DrainElement.Type.stdout, p.stdout.read(buf)); 840 } 841 return DrainElement.init; 842 } 843 844 DrainElement waitUntilData() @safe { 845 import std.datetime : Clock; 846 847 // may livelock if the process never terminates and never writes to 848 // the terminal. timeout ensure that it sooner or later is break 849 // the loop. This is important if the drain is part of a timeout 850 // wrapping. 851 852 const timeout = 100.dur!"msecs"; 853 const stopAt = Clock.currTime + timeout; 854 const sleepFor = timeout / 20; 855 const useSleep = Clock.currTime + sleepFor; 856 bool running = true; 857 while (running) { 858 const now = Clock.currTime; 859 860 running = (now < stopAt) && isAnyPipeOpen(p); 861 862 auto bufRead = readData(p); 863 864 if (!bufRead.empty) { 865 return DrainElement(bufRead.type, bufRead.data.dup); 866 } else if (running && now > useSleep && bufRead.empty) { 867 import core.thread : Thread; 868 869 () @trusted { Thread.sleep(sleepFor); }(); 870 } 871 } 872 873 return DrainElement.init; 874 } 875 876 front_ = DrainElement.init; 877 878 final switch (st) { 879 case State.start: 880 st = State.draining; 881 front_ = waitUntilData; 882 break; 883 case State.draining: 884 if (p.terminated) { 885 st = State.lastStdout; 886 } else if (isAnyPipeOpen(p)) { 887 front_ = waitUntilData(); 888 } else { 889 st = State.lastStdout; 890 } 891 break; 892 case State.lastStdout: 893 if (p.stdout.hasPendingData) { 894 front_ = DrainElement(DrainElement.Type.stdout, p.stdout.read(buf).dup); 895 } else { 896 st = State.lastStderr; 897 } 898 break; 899 case State.lastStderr: 900 if (p.stderr.hasPendingData) { 901 front_ = DrainElement(DrainElement.Type.stderr, p.stderr.read(buf).dup); 902 } else { 903 st = State.lastElement; 904 } 905 break; 906 case State.lastElement: 907 st = State.empty; 908 break; 909 case State.empty: 910 break; 911 } 912 } 913 914 bool empty() @safe pure nothrow const @nogc { 915 return st == State.empty; 916 } 917 } 918 919 /// Drain a process pipe until empty. 920 auto drain(T)(T p) { 921 return DrainRange!T(p); 922 } 923 924 /// Read the data from a ReadChannel by line. 925 struct DrainByLineCopyRange(ProcessT) { 926 private { 927 enum State { 928 start, 929 draining, 930 lastLine, 931 lastBuf, 932 empty, 933 } 934 935 ProcessT process; 936 DrainRange!ProcessT range; 937 State st; 938 const(ubyte)[] buf; 939 const(char)[] line; 940 } 941 942 this(ProcessT p) { 943 process = p; 944 range = p.drain; 945 } 946 947 string front() @trusted pure nothrow const @nogc { 948 import std.exception : assumeUnique; 949 950 assert(!empty, "Can't get front of an empty range"); 951 return line.assumeUnique; 952 } 953 954 void popFront() @safe { 955 assert(!empty, "Can't pop front of an empty range"); 956 import std.algorithm : countUntil; 957 import std.array : array; 958 static import std.utf; 959 960 const(ubyte)[] updateBuf(size_t idx) { 961 const(ubyte)[] tmp; 962 if (buf.empty) { 963 // do nothing 964 } else if (idx == -1) { 965 tmp = buf; 966 buf = null; 967 } else { 968 idx = () { 969 if (idx < buf.length) { 970 return idx + 1; 971 } 972 return idx; 973 }(); 974 tmp = buf[0 .. idx]; 975 buf = buf[idx .. $]; 976 } 977 978 if (!tmp.empty && tmp[$ - 1] == '\n') { 979 tmp = tmp[0 .. $ - 1]; 980 } 981 return tmp; 982 } 983 984 void drainLine() { 985 void fillBuf() { 986 if (!range.empty) { 987 range.popFront; 988 } 989 if (!range.empty) { 990 buf ~= range.front.data; 991 } 992 } 993 994 size_t idx; 995 () { 996 int cnt; 997 do { 998 fillBuf(); 999 idx = buf.countUntil('\n'); 1000 // 2 is a magic number which mean that it at most wait 2x timeout for data 1001 } 1002 while (!range.empty && idx == -1 && cnt++ < 2); 1003 }(); 1004 1005 if (idx != -1) { 1006 auto tmp = updateBuf(idx); 1007 line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array; 1008 } 1009 } 1010 1011 bool lastLine() { 1012 size_t idx = buf.countUntil('\n'); 1013 if (idx == -1) 1014 return true; 1015 1016 auto tmp = updateBuf(idx); 1017 line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array; 1018 return false; 1019 } 1020 1021 line = null; 1022 final switch (st) { 1023 case State.start: 1024 drainLine; 1025 st = State.draining; 1026 break; 1027 case State.draining: 1028 drainLine; 1029 if (range.empty) 1030 st = State.lastLine; 1031 break; 1032 case State.lastLine: 1033 if (lastLine) 1034 st = State.lastBuf; 1035 break; 1036 case State.lastBuf: 1037 line = std.utf.byUTF!(const(char))(cast(const(char)[]) buf).array; 1038 st = State.empty; 1039 break; 1040 case State.empty: 1041 break; 1042 } 1043 } 1044 1045 bool empty() @safe pure nothrow const @nogc { 1046 return st == State.empty; 1047 } 1048 } 1049 1050 @("shall drain the process output by line") 1051 unittest { 1052 import std.algorithm : filter, joiner, map; 1053 import std.array : array; 1054 1055 auto p = pipeProcess(["dd", "if=/dev/zero", "bs=10", "count=3"]).rcKill; 1056 auto res = p.process.drainByLineCopy.filter!"!a.empty".array; 1057 1058 assert(res.length == 3); 1059 assert(res.joiner.count >= 30); 1060 assert(p.wait == 0); 1061 assert(p.terminated); 1062 } 1063 1064 auto drainByLineCopy(T)(T p) { 1065 return DrainByLineCopyRange!T(p); 1066 } 1067 1068 /// Drain the process output until it is done executing. 1069 auto drainToNull(T)(T p) { 1070 foreach (l; p.drain()) { 1071 } 1072 return p; 1073 } 1074 1075 /// Drain the output from the process into an output range. 1076 auto drain(ProcessT, T)(ProcessT p, ref T range) { 1077 foreach (l; p.drain()) { 1078 range.put(l); 1079 } 1080 return p; 1081 } 1082 1083 @("shall drain the output of a process while it is running with a separation of stdout and stderr") 1084 unittest { 1085 auto p = pipeProcess(["dd", "if=/dev/urandom", "bs=10", "count=3"]).rcKill; 1086 auto res = p.drain.array; 1087 1088 // this is just a sanity check. It has to be kind a high because there is 1089 // some wiggleroom allowed 1090 assert(res.count > 1 && res.count <= 50); 1091 1092 assert(res.filter!(a => a.type == DrainElement.Type.stdout) 1093 .map!(a => a.data) 1094 .joiner 1095 .count == 30); 1096 assert(p.wait == 0); 1097 assert(p.terminated); 1098 } 1099 1100 @("shall kill the process tree when the timeout is reached") 1101 unittest { 1102 immutable script = makeScript(`#!/bin/bash 1103 sleep 10m 1104 `); 1105 scope (exit) 1106 remove(script); 1107 1108 auto p = pipeProcess([script]).sandbox.timeout(1.dur!"seconds").rcKill; 1109 waitUntilChildren(p.osHandle, 1); 1110 const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 1111 const res = p.process.drain.array; 1112 const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 1113 1114 assert(p.wait == -9); 1115 assert(p.terminated); 1116 assert(preChildren == 1); 1117 assert(postChildren == 0); 1118 } 1119 1120 string makeScript(string script, string file = __FILE__, uint line = __LINE__) { 1121 import core.sys.posix.sys.stat; 1122 import std.file : getAttributes, setAttributes, thisExePath; 1123 import std.stdio : File; 1124 import std.path : baseName; 1125 import std.conv : to; 1126 1127 immutable fname = thisExePath ~ "_" ~ file.baseName ~ line.to!string ~ ".sh"; 1128 1129 File(fname, "w").writeln(script); 1130 setAttributes(fname, getAttributes(fname) | S_IXUSR | S_IXGRP | S_IXOTH); 1131 return fname; 1132 } 1133 1134 /// Wait for p to have num children or fail after 10s. 1135 void waitUntilChildren(RawPid p, int num) { 1136 import std.datetime : Clock; 1137 1138 const failAt = Clock.currTime + 10.dur!"seconds"; 1139 do { 1140 Thread.sleep(50.dur!"msecs"); 1141 if (Clock.currTime > failAt) 1142 break; 1143 } 1144 while (makePidMap.getSubMap(p).remove(p).length < num); 1145 } 1146 1147 alias Address = NamedType!(ulong, Tag!"Address", ulong.init, TagStringable); 1148 struct AddressMap { 1149 Address begin; 1150 Address end; 1151 NamedType!(string, Tag!"Permission", string.init, TagStringable) perm; 1152 } 1153 1154 /** An assoc array mapping the path of each shared library loaded by 1155 * the process to the address it is loaded at in the process address space. 1156 */ 1157 AddressMap[][AbsolutePath] libs(RawPid pid) nothrow { 1158 import std.stdio : File; 1159 import std.format : formattedRead, format; 1160 import std.algorithm : countUntil; 1161 import std.typecons : tuple; 1162 import std.string : strip; 1163 1164 typeof(return) rval; 1165 1166 try { 1167 foreach (l; File(format!"/proc/%s/maps"(pid)).byLine 1168 .map!(a => tuple(a, a.countUntil('/'))) 1169 .filter!(a => a[1] != -1)) { 1170 try { 1171 auto p = AbsolutePath(l[0][l[1] .. $].idup); 1172 auto m = l[0][0 .. l[1]].strip; 1173 ulong begin; 1174 ulong end; 1175 char[] perm; 1176 if (formattedRead!"%x-%x %s "(m, begin, end, perm) != 3) { 1177 continue; 1178 } 1179 1180 auto amap = AddressMap(Address(begin), Address(end), 1181 typeof(AddressMap.init.perm)(perm.idup)); 1182 if (auto v = p in rval) { 1183 *v ~= amap; 1184 } else { 1185 rval[p] = [amap]; 1186 } 1187 } catch (Exception e) { 1188 } 1189 } 1190 } catch (Exception e) { 1191 logger.trace(e.msg).collectException; 1192 } 1193 1194 return rval; 1195 } 1196 1197 @("shall read the libraries the process is using") 1198 unittest { 1199 immutable scriptName = makeScript(`#!/bin/bash 1200 sleep 10m & 1201 sleep 10m & 1202 sleep 10m 1203 `); 1204 scope (exit) 1205 remove(scriptName); 1206 1207 auto p = pipeProcess([scriptName]).sandbox.rcKill; 1208 waitUntilChildren(p.osHandle, 3); 1209 auto res = libs(p.osHandle); 1210 p.kill; 1211 1212 assert(res.length != 0); 1213 assert(AbsolutePath("/bin/bash") in res); 1214 } 1215 1216 /** Parses the output from a run of 'ldd' on a binary. 1217 * 1218 * Params: 1219 * input = an input range of lines which is the output from ldd 1220 * 1221 * Example 1222 * --- 1223 * writeln(parseLddOutput(` 1224 * linux-vdso.so.1 => (0x00007fffbf5fe000) 1225 * libtinfo.so.5 => /lib/x86_64-linux-gnu/libtinfo.so.5 (0x00007fe28117f000) 1226 * libdl.so.2 => /lib/x86_64-linux-gnu/libdl.so.2 (0x00007fe280f7b000) 1227 * libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fe280bb4000) 1228 * /lib64/ld-linux-x86-64.so.2 (0x00007fe2813dd000) 1229 * `)) 1230 * --- 1231 * 1232 * Returns: a dictionary of {path: address} for each library required by the 1233 * specified binary. 1234 */ 1235 Address[AbsolutePath] parseLddOutput(T)(T input) if (std_.range.isInputRange!T) { 1236 import std.regex : regex, matchFirst; 1237 import std.format : formattedRead; 1238 1239 typeof(return) rval; 1240 const reLinux = regex(`\s(?P<lib>\S?/\S+)\s+\((?P<addr>0x.+)\)`); 1241 1242 foreach (l; input) { 1243 auto m = matchFirst(l, reLinux); 1244 if (m.empty || m.length < 3) { 1245 continue; 1246 } 1247 1248 try { 1249 ulong addr; 1250 formattedRead!"0x%x"(m["addr"], addr); 1251 rval[AbsolutePath(m["lib"])] = Address(addr); 1252 } catch (Exception e) { 1253 } 1254 } 1255 1256 return rval; 1257 } 1258 1259 @("shall parse the output of ldd") 1260 unittest { 1261 auto res = parseLddOutput([ 1262 "linux-vdso.so.1 => (0x00007fffbf5fe000)", 1263 "libtinfo.so.5 => /lib/x86_64-linux-gnu/libtinfo.so.5 (0x00007fe28117f000)", 1264 "libdl.so.2 => /lib/x86_64-linux-gnu/libdl.so.2 (0x00007fe280f7b000)", 1265 "libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fe280bb4000)", 1266 "/lib64/ld-linux-x86-64.so.2 (0x00007fe2813dd000)" 1267 ]); 1268 assert(res.length == 3); 1269 assert(res[AbsolutePath("/lib/x86_64-linux-gnu/libtinfo.so.5")].get == 140610805166080); 1270 }