diff --git a/src/App.hs b/src/App.hs index d810d17..dbf3969 100644 --- a/src/App.hs +++ b/src/App.hs @@ -8,7 +8,7 @@ module App where import Universum hiding (force) import System.Environment (setEnv, lookupEnv, getEnvironment) -import System.Process (CreateProcess (..), StdStream (CreatePipe, UseHandle), proc, waitForProcess, createPipe, readCreateProcess, withCreateProcess) +import System.Process (CreateProcess (..), StdStream (CreatePipe, UseHandle), proc, waitForProcess, createPipe, readCreateProcess, withCreateProcess) import System.IO ( openBinaryFile, hSetBuffering, BufferMode(..), hFlush ) import qualified System.FilePath as FilePath @@ -16,8 +16,7 @@ import System.FilePath ((</>)) import System.Directory ( createDirectoryIfMissing, doesFileExist, getCurrentDirectory, createDirectory ) import qualified Data.ByteString.Char8 as B8 import Control.Concurrent.Async (async, wait, cancel) -import Control.Exception.Base (handle, throwIO) -import System.IO.Error (isEOFError, IOError) +import Control.Exception.Base (handle) import System.Posix.ByteString (stdOutput, fdToHandle, handleToFd) import System.Posix (Fd, dup, stdError) import System.Posix.Files (getFdStatus, deviceID, fileID) @@ -33,8 +32,6 @@ import qualified Data.Text.IO as Text import GHC.IO.Exception (ExitCode(..)) import Crypto.Hash qualified as H import Data.Containers.ListUtils (nubOrdOn) -import GHC.IO.Handle (hDuplicate) -import System.Timeout (timeout) import Prelude (read) import Types import Utils @@ -133,7 +130,7 @@ main = do -- Recursive: AppState is used before process is started (mostly for logging) rec - appState <- AppState settings jobName buildId isToplevel <$> newIORef Nothing <*> newIORef Nothing <*> newIORef False <*> pure toplevelStderr <*> pure subprocessStderr <*> pure logFile <*> newIORef [] + appState <- AppState settings jobName buildId isToplevel <$> newIORef Nothing <*> newIORef Nothing <*> newIORef False <*> pure toplevelStderr <*> pure logFile <*> newIORef [] <*> newIORef 
Nothing when (isToplevel && appState.settings.enableCommitStatus) do @@ -158,8 +155,6 @@ main = do ] <> parentEnv } - (subprocessStderrRead, subprocessStderr) <- createPipe - logDebug appState $ "Running command: " <> show (args.cmd : args.args) logDebug appState $ " buildId: " <> show buildId logDebug appState $ " cwd: " <> show cwd @@ -169,7 +164,6 @@ main = do stdoutHandler <- async $ outputStreamHandler appState toplevelStdout "stdout" stdoutPipe stderrHandler <- async $ outputStreamHandler appState toplevelStderr "stderr" stderrPipe - subprocessStderrHandler <- async $ outputStreamHandler appState toplevelStderr "stderr" subprocessStderrRead exitCode <- waitForProcess processHandle @@ -181,6 +175,11 @@ main = do then discardQuietBuffer appState -- Success: discard buffered output else flushQuietBuffer appState toplevelStderr -- Failure: show buffered output + -- Wait for output stream handlers to finish before logging status messages, + -- to ensure all subprocess output is flushed first. + timeoutStream appState "stdout" $ wait stdoutHandler + timeoutStream appState "stderr" $ wait stderrHandler + logDebug appState $ "Command " <> show (args.cmd : args.args) <> " exited with code " <> show exitCode logDebugParent m_parentRequestPipe $ "Subtask " <> toText jobName <> " finished with " <> show exitCode @@ -219,15 +218,6 @@ main = do branch <- getCurrentBranch appState RemoteCache.setLatestBuildHash appState s (toText appState.jobName) branch h.hash - timeoutStream appState "stdout" $ wait stdoutHandler - - -- We duplicate `subprocessStderr` before actually passing it to a - -- subprocess, so the original handle doesn't get closed by - -- `createProcess`. We must close it manually. - hClose appState.subprocessStderr - timeoutStream appState "stderr" $ wait stderrHandler - timeoutStream appState "stderr" $ wait subprocessStderrHandler - cancel cmdHandler -- Check if there's an existing status for this task/commit @@ -323,7 +313,7 @@ newBuildId = toText . 
iso8601Show <$> getCurrentTime runPostUnpackCmd :: AppState -> String -> IO () runPostUnpackCmd appState cmd = do logDebug appState $ "Running post-unpack cmd " <> show cmd - bracket (hDuplicate appState.subprocessStderr) hClose \stderr_ -> + withStderrPipe appState \stderr_ -> withCreateProcess ( (Process.shell cmd) { std_out = UseHandle stderr_ -- TODO: really, we should have subprocesStdout also @@ -380,26 +370,6 @@ toplevelStreams = do hSetBuffering hErr LineBuffering pure (hOut, hErr) -timeoutStream :: AppState -> Text -> IO () -> IO () -timeoutStream appState streamName action = do - result <- timeout (appState.settings.outputStreamTimeout * 1000000) action - when (isNothing result) do - logWarn appState $ "taskrunner: Task did not close " <> streamName <> " " <> show appState.settings.outputStreamTimeout <> " seconds after exiting." - logWarn appState "Perhaps the file descriptor was leaked to a background process?" - logWarn appState "Build will continue despite this error, but some output may be lost." - - -- Note: this used to be a fatal error (exited with non-zero status), - -- but builds failed too often due to this, and we don't want to be forced to debug it every time, - -- or to retry the build. - -- - -- Later we might want to add some monitoring so that we can see if this happens often. 
- -outputStreamHandler :: AppState -> Handle -> ByteString -> Handle -> IO () -outputStreamHandler appState toplevelOutput streamName stream = do - handle ignoreEOF $ forever do - line <- B8.hGetLine stream - outputLine appState toplevelOutput streamName line - commandHandler :: AppState -> Handle -> Handle -> IO () commandHandler appState requestPipe responsePipe = handle ignoreEOF $ forever do @@ -583,15 +553,11 @@ hashFilename appState = appState.settings.stateDirectory </> "hash" </> (appStat hashFileInputs :: AppState -> [FilePath] -> IO Text hashFileInputs appState inputs = - bracket (hDuplicate appState.subprocessStderr) hClose \stderr_ -> + withStderrPipe appState \stderr_ -> Text.strip . Text.pack <$> readCreateProcess (proc "bash" (["-c", $(embedStringFile "src/hash-files.sh"), "hash-files"] <> inputs)) { std_err = UseHandle stderr_ } "" -ignoreEOF :: IOError -> IO () -ignoreEOF e | isEOFError e = pure () - | otherwise = throwIO e - hexSha1 :: Text -> Text hexSha1 str = show (H.hash (encodeUtf8 str :: ByteString) :: H.Digest H.SHA1) diff --git a/src/RemoteCache.hs b/src/RemoteCache.hs index 481d9c7..48fa670 100644 --- a/src/RemoteCache.hs +++ b/src/RemoteCache.hs @@ -26,7 +26,7 @@ import Network.URI (parseURI, URI (..), URIAuth(..)) import System.Directory (makeAbsolute, canonicalizePath) import System.FilePath (makeRelative) import qualified System.FilePath as FP -import Utils (bail, logDebug, logFileName, logInfo) +import Utils (bail, logDebug, logFileName, logInfo, withStderrPipe) import qualified Amazonka as AWS import Control.Exception.Lens (handling) import System.Exit (ExitCode(..)) @@ -37,15 +37,15 @@ import qualified Data.Text.Lazy.Builder as TLB import Amazonka.S3.PutObject (newPutObject, PutObject(..)) -packTar :: MonadResource m => AppState -> FilePath -> [FilePath] -> ConduitT () BS.ByteString m () -packTar appState workdir files = do +packTar :: MonadResource m => AppState -> Handle -> FilePath -> [FilePath] -> ConduitT () BS.ByteString m () 
+packTar appState stderrHandle workdir files = do let cmd = "tar" let args = if null files then ["-c", "--files-from=/dev/null"] else ["-c"] <> files liftIO $ logDebug appState $ "Running subprocess: " <> show (cmd:args) <> " in cwd " <> show workdir bracketP ( createProcess_ "createProcess_" (proc cmd args) { std_out = CreatePipe - , std_err = UseHandle appState.subprocessStderr + , std_err = UseHandle stderrHandle , cwd = Just workdir } ) cleanupProcess \case @@ -57,15 +57,15 @@ packTar appState workdir files = do _ -> error "unable to obtain stdout pipe" -unpackTar :: MonadResource m => AppState -> FilePath -> ConduitT BS.ByteString Void m () -unpackTar appState workdir = do +unpackTar :: MonadResource m => AppState -> Handle -> FilePath -> ConduitT BS.ByteString Void m () +unpackTar appState stderrHandle workdir = do let cmd = "tar" let args = ["-x", "--zstd"] liftIO $ logDebug appState $ "Running subprocess: " <> show (cmd:args) <> " in cwd " <> show workdir bracketP ( createProcess_ "createProcess_" (proc cmd args) { std_in = CreatePipe - , std_err = UseHandle appState.subprocessStderr + , std_err = UseHandle stderrHandle , cwd = Just workdir } ) cleanupProcess \case @@ -145,11 +145,12 @@ saveCache appState settings relativeCacheRoot files archiveName = do logDebug appState $ "Uploading to s3://" <> bucket <> "/" <> objectKey - runConduitRes do + withStderrPipe appState \stderrHandle -> + runConduitRes do let multipartUpload = (newCreateMultipartUpload (BucketName bucket) (ObjectKey objectKey) :: CreateMultipartUpload) { storageClass = Just StorageClass_REDUCED_REDUNDANCY } result <- - packTar appState cacheRoot filesRelativeToCacheRoot + packTar appState stderrHandle cacheRoot filesRelativeToCacheRoot .| Zstd.compress 3 .| streamUpload env Nothing multipartUpload case result of @@ -180,13 +181,14 @@ restoreCache appState settings cacheRoot archiveName logMode = do logDebug appState $ "Remote cache archive not found s3://" <> bucket <> "/" <> objectKey pure 
False - handling _NoSuchKey onNoSuchKey $ runConduitRes do - response <- AWS.send env $ newGetObject (BucketName bucket) (ObjectKey objectKey) - when (logMode == Log) do - liftIO $ logInfo appState $ "Found remote cache " <> archiveName <> ", restoring" - response.body.body - .| unpackTar appState cacheRoot - pure True + handling _NoSuchKey onNoSuchKey $ withStderrPipe appState \stderrHandle -> + runConduitRes do + response <- AWS.send env $ newGetObject (BucketName bucket) (ObjectKey objectKey) + when (logMode == Log) do + liftIO $ logInfo appState $ "Found remote cache " <> archiveName <> ", restoring" + response.body.body + .| unpackTar appState stderrHandle cacheRoot + pure True getLatestBuildHash :: AppState diff --git a/src/Types.hs b/src/Types.hs index 8b6a70d..64eb6c1 100644 --- a/src/Types.hs +++ b/src/Types.hs @@ -50,7 +50,6 @@ data AppState = AppState , snapshotArgsRef :: IORef (Maybe SnapshotCliArgs) , skipped :: IORef Bool , toplevelStderr :: Handle - , subprocessStderr :: Handle , logOutput :: Handle , quietBuffer :: IORef [ByteString] diff --git a/src/Utils.hs b/src/Utils.hs index 565879d..30a67eb 100644 --- a/src/Utils.hs +++ b/src/Utils.hs @@ -5,15 +5,18 @@ import Universum import qualified Data.ByteString.Char8 as B8 import Data.Time (getCurrentTime, formatTime, defaultTimeLocale) import Types -import Control.Exception (throwIO) +import Control.Exception (throwIO, handle) +import System.IO.Error (isEOFError, IOError) import Text.Printf (printf) import Prelude (until) import Data.List ((!!)) -import System.Process (CreateProcess(..), StdStream (..), readCreateProcess) +import System.Process (CreateProcess(..), StdStream (..), readCreateProcess, createPipe) import Data.Conduit.Process (proc) import qualified Data.Text as Text -import GHC.IO.Handle (hDuplicate, hIsClosed) +import GHC.IO.Handle (hIsClosed) import System.FilePath ((</>)) +import Control.Concurrent.Async (async, wait) +import System.Timeout (timeout) outputLine :: AppState -> Handle -> 
ByteString -> ByteString -> IO () outputLine appState toplevelOutput streamName line = do @@ -87,10 +90,21 @@ bytesfmt formatter bs = printf (formatter <> " %s") bytesSuffixes = ["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"] bytesSuffix = bytesSuffixes !! i +-- | Create a per-subprocess stderr pipe that prefixes output with the job name. +-- The pipe is fully drained before returning. +withStderrPipe :: AppState -> (Handle -> IO a) -> IO a +withStderrPipe appState action = do + (readEnd, writeEnd) <- createPipe + handler <- async $ outputStreamHandler appState appState.toplevelStderr "stderr" readEnd + result <- action writeEnd `finally` do + hClose writeEnd + timeoutStream appState "stderr" $ wait handler + pure result + isDirtyAtPaths :: AppState -> [FilePath] -> IO Bool isDirtyAtPaths _ [] = pure False isDirtyAtPaths appState paths = - bracket (hDuplicate appState.subprocessStderr) hClose \stderr_ -> do + withStderrPipe appState \stderr_ -> do output <- readCreateProcess (proc "git" (["status", "--porcelain", "--untracked-files=no", "--"] ++ paths)) @@ -101,7 +115,7 @@ isDirtyAtPaths appState paths = getCurrentBranch :: AppState -> IO Text getCurrentBranch appState = - bracket (hDuplicate appState.subprocessStderr) hClose \stderr_ -> + withStderrPipe appState \stderr_ -> Text.strip . Text.pack <$> readCreateProcess (proc "git" ["symbolic-ref", "--short", "HEAD"]) { std_err = UseHandle stderr_ } "" @@ -112,18 +126,16 @@ getMainBranchCommit appState = Nothing -> pure Nothing Just branch -> - bracket (hDuplicate appState.subprocessStderr) hClose \stderr_ -> + withStderrPipe appState \stderr_ -> Just . Text.strip . 
Text.pack <$> readCreateProcess (proc "git" ["merge-base", "HEAD", "origin/" <> toString branch]) { std_err = UseHandle stderr_ } "" getCurrentCommit :: AppState -> IO Text -getCurrentCommit _appState = - -- TODO: fix: we can't use subprocessStderr here because it's used after closing output collector - -- Using normal stderr for now --- bracket (hDuplicate appState.subprocessStderr) hClose \stderr_ -> +getCurrentCommit appState = + withStderrPipe appState \stderr_ -> Text.strip . Text.pack <$> readCreateProcess - (proc "git" ["rev-parse", "HEAD"]) + (proc "git" ["rev-parse", "HEAD"]) { std_err = UseHandle stderr_ } "" logFileName :: Settings -> BuildId -> JobName -> FilePath @@ -141,3 +153,21 @@ flushQuietBuffer appState toplevelOutput = do -- | Discard buffered output (used when task succeeds in quiet mode) discardQuietBuffer :: AppState -> IO () discardQuietBuffer appState = writeIORef appState.quietBuffer [] + +outputStreamHandler :: AppState -> Handle -> ByteString -> Handle -> IO () +outputStreamHandler appState toplevelOutput streamName stream = do + handle ignoreEOF $ forever do + line <- B8.hGetLine stream + outputLine appState toplevelOutput streamName line + +timeoutStream :: AppState -> Text -> IO () -> IO () +timeoutStream appState streamName action = do + result <- timeout (appState.settings.outputStreamTimeout * 1000000) action + when (isNothing result) do + logWarn appState $ "taskrunner: Task did not close " <> streamName <> " " <> show appState.settings.outputStreamTimeout <> " seconds after exiting." + logWarn appState "Perhaps the file descriptor was leaked to a background process?" + logWarn appState "Build will continue despite this error, but some output may be lost." 
+ +ignoreEOF :: IOError -> IO () +ignoreEOF e | isEOFError e = pure () + | otherwise = throwIO e diff --git a/test/t/post-unpack-cmd.out b/test/t/post-unpack-cmd.out new file mode 100644 index 0000000..8ece074 --- /dev/null +++ b/test/t/post-unpack-cmd.out @@ -0,0 +1,6 @@ +-- output: +[mytask] info | Inputs changed, running task +[mytask] stdout | Main task output +[mytask] info | success +[mytask] stderr | post-unpack-stdout +[mytask] stderr | post-unpack-stderr diff --git a/test/t/post-unpack-cmd.txt b/test/t/post-unpack-cmd.txt new file mode 100644 index 0000000..74c9f55 --- /dev/null +++ b/test/t/post-unpack-cmd.txt @@ -0,0 +1,14 @@ +# no toplevel + +echo "input data" > input.txt +git init -q +git add input.txt +git commit -qm "initial" + +taskrunner -n mytask bash -e -c ' + snapshot input.txt --outputs --cmd " + echo post-unpack-stdout + echo post-unpack-stderr >&2 + " + echo "Main task output" +'