Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 10 additions & 44 deletions src/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@ module App where
import Universum hiding (force)

import System.Environment (setEnv, lookupEnv, getEnvironment)
import System.Process (CreateProcess (..), StdStream (CreatePipe, UseHandle), proc, waitForProcess, createPipe, readCreateProcess, withCreateProcess)
import System.Process (CreateProcess (..), StdStream (CreatePipe, UseHandle), proc, waitForProcess, createPipe, readCreateProcess, withCreateProcess)
import System.IO
( openBinaryFile, hSetBuffering, BufferMode(..), hFlush )
import qualified System.FilePath as FilePath
import System.FilePath ((</>))
import System.Directory ( createDirectoryIfMissing, doesFileExist, getCurrentDirectory, createDirectory )
import qualified Data.ByteString.Char8 as B8
import Control.Concurrent.Async (async, wait, cancel)
import Control.Exception.Base (handle, throwIO)
import System.IO.Error (isEOFError, IOError)
import Control.Exception.Base (handle)
import System.Posix.ByteString (stdOutput, fdToHandle, handleToFd)
import System.Posix (Fd, dup, stdError)
import System.Posix.Files (getFdStatus, deviceID, fileID)
Expand All @@ -33,8 +32,6 @@ import qualified Data.Text.IO as Text
import GHC.IO.Exception (ExitCode(..))
import Crypto.Hash qualified as H
import Data.Containers.ListUtils (nubOrdOn)
import GHC.IO.Handle (hDuplicate)
import System.Timeout (timeout)
import Prelude (read)
import Types
import Utils
Expand Down Expand Up @@ -133,7 +130,7 @@ main = do
-- Recursive: AppState is used before process is started (mostly for logging)
rec

appState <- AppState settings jobName buildId isToplevel <$> newIORef Nothing <*> newIORef Nothing <*> newIORef False <*> pure toplevelStderr <*> pure subprocessStderr <*> pure logFile <*> newIORef []
appState <- AppState settings jobName buildId isToplevel <$> newIORef Nothing <*> newIORef Nothing <*> newIORef False <*> pure toplevelStderr <*> pure logFile <*> newIORef []
<*> newIORef Nothing

when (isToplevel && appState.settings.enableCommitStatus) do
Expand All @@ -158,8 +155,6 @@ main = do
] <> parentEnv
}

(subprocessStderrRead, subprocessStderr) <- createPipe

logDebug appState $ "Running command: " <> show (args.cmd : args.args)
logDebug appState $ " buildId: " <> show buildId
logDebug appState $ " cwd: " <> show cwd
Expand All @@ -169,7 +164,6 @@ main = do

stdoutHandler <- async $ outputStreamHandler appState toplevelStdout "stdout" stdoutPipe
stderrHandler <- async $ outputStreamHandler appState toplevelStderr "stderr" stderrPipe
subprocessStderrHandler <- async $ outputStreamHandler appState toplevelStderr "stderr" subprocessStderrRead

exitCode <- waitForProcess processHandle

Expand All @@ -181,6 +175,11 @@ main = do
then discardQuietBuffer appState -- Success: discard buffered output
else flushQuietBuffer appState toplevelStderr -- Failure: show buffered output

-- Wait for output stream handlers to finish before logging status messages,
-- to ensure all subprocess output is flushed first.
timeoutStream appState "stdout" $ wait stdoutHandler
timeoutStream appState "stderr" $ wait stderrHandler

logDebug appState $ "Command " <> show (args.cmd : args.args) <> " exited with code " <> show exitCode
logDebugParent m_parentRequestPipe $ "Subtask " <> toText jobName <> " finished with " <> show exitCode

Expand Down Expand Up @@ -219,15 +218,6 @@ main = do
branch <- getCurrentBranch appState
RemoteCache.setLatestBuildHash appState s (toText appState.jobName) branch h.hash

timeoutStream appState "stdout" $ wait stdoutHandler

-- We duplicate `subprocessStderr` before actually passing it to a
-- subprocess, so the original handle doesn't get closed by
-- `createProcess`. We must close it manually.
hClose appState.subprocessStderr
timeoutStream appState "stderr" $ wait stderrHandler
timeoutStream appState "stderr" $ wait subprocessStderrHandler

cancel cmdHandler

-- Check if there's an existing status for this task/commit
Expand Down Expand Up @@ -323,7 +313,7 @@ newBuildId = toText . iso8601Show <$> getCurrentTime
runPostUnpackCmd :: AppState -> String -> IO ()
runPostUnpackCmd appState cmd = do
logDebug appState $ "Running post-unpack cmd " <> show cmd
bracket (hDuplicate appState.subprocessStderr) hClose \stderr_ ->
withStderrPipe appState \stderr_ ->
withCreateProcess (
(Process.shell cmd)
{ std_out = UseHandle stderr_ -- TODO: really, we should have subprocessStdout also
Expand Down Expand Up @@ -380,26 +370,6 @@ toplevelStreams = do
hSetBuffering hErr LineBuffering
pure (hOut, hErr)

timeoutStream :: AppState -> Text -> IO () -> IO ()
timeoutStream appState streamName action = do
result <- timeout (appState.settings.outputStreamTimeout * 1000000) action
when (isNothing result) do
logWarn appState $ "taskrunner: Task did not close " <> streamName <> " " <> show appState.settings.outputStreamTimeout <> " seconds after exiting."
logWarn appState "Perhaps the file descriptor was leaked to a background process?"
logWarn appState "Build will continue despite this error, but some output may be lost."

-- Note: this used to be a fatal error (exited with non-zero status),
-- but builds failed too often due to this, and we don't want to be forced to debug it every time,
-- or to retry the build.
--
-- Later we might want to add some monitoring so that we can see if this happens often.

outputStreamHandler :: AppState -> Handle -> ByteString -> Handle -> IO ()
outputStreamHandler appState toplevelOutput streamName stream = do
handle ignoreEOF $ forever do
line <- B8.hGetLine stream
outputLine appState toplevelOutput streamName line

commandHandler :: AppState -> Handle -> Handle -> IO ()
commandHandler appState requestPipe responsePipe =
handle ignoreEOF $ forever do
Expand Down Expand Up @@ -583,15 +553,11 @@ hashFilename appState = appState.settings.stateDirectory </> "hash" </> (appStat

hashFileInputs :: AppState -> [FilePath] -> IO Text
hashFileInputs appState inputs =
bracket (hDuplicate appState.subprocessStderr) hClose \stderr_ ->
withStderrPipe appState \stderr_ ->
Text.strip . Text.pack <$> readCreateProcess
(proc "bash" (["-c", $(embedStringFile "src/hash-files.sh"), "hash-files"] <> inputs))
{ std_err = UseHandle stderr_ }
""

ignoreEOF :: IOError -> IO ()
ignoreEOF e | isEOFError e = pure ()
| otherwise = throwIO e

hexSha1 :: Text -> Text
hexSha1 str = show (H.hash (encodeUtf8 str :: ByteString) :: H.Digest H.SHA1)
34 changes: 18 additions & 16 deletions src/RemoteCache.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import Network.URI (parseURI, URI (..), URIAuth(..))
import System.Directory (makeAbsolute, canonicalizePath)
import System.FilePath (makeRelative)
import qualified System.FilePath as FP
import Utils (bail, logDebug, logFileName, logInfo)
import Utils (bail, logDebug, logFileName, logInfo, withStderrPipe)
import qualified Amazonka as AWS
import Control.Exception.Lens (handling)
import System.Exit (ExitCode(..))
Expand All @@ -37,15 +37,15 @@ import qualified Data.Text.Lazy.Builder as TLB
import Amazonka.S3.PutObject (newPutObject, PutObject(..))


packTar :: MonadResource m => AppState -> FilePath -> [FilePath] -> ConduitT () BS.ByteString m ()
packTar appState workdir files = do
packTar :: MonadResource m => AppState -> Handle -> FilePath -> [FilePath] -> ConduitT () BS.ByteString m ()
packTar appState stderrHandle workdir files = do
let cmd = "tar"
let args = if null files then ["-c", "--files-from=/dev/null"] else ["-c"] <> files
liftIO $ logDebug appState $ "Running subprocess: " <> show (cmd:args) <> " in cwd " <> show workdir
bracketP ( createProcess_ "createProcess_"
(proc cmd args)
{ std_out = CreatePipe
, std_err = UseHandle appState.subprocessStderr
, std_err = UseHandle stderrHandle
, cwd = Just workdir
}
) cleanupProcess \case
Expand All @@ -57,15 +57,15 @@ packTar appState workdir files = do
_ ->
error "unable to obtain stdout pipe"

unpackTar :: MonadResource m => AppState -> FilePath -> ConduitT BS.ByteString Void m ()
unpackTar appState workdir = do
unpackTar :: MonadResource m => AppState -> Handle -> FilePath -> ConduitT BS.ByteString Void m ()
unpackTar appState stderrHandle workdir = do
let cmd = "tar"
let args = ["-x", "--zstd"]
liftIO $ logDebug appState $ "Running subprocess: " <> show (cmd:args) <> " in cwd " <> show workdir
bracketP ( createProcess_ "createProcess_"
(proc cmd args)
{ std_in = CreatePipe
, std_err = UseHandle appState.subprocessStderr
, std_err = UseHandle stderrHandle
, cwd = Just workdir
}
) cleanupProcess \case
Expand Down Expand Up @@ -145,11 +145,12 @@ saveCache appState settings relativeCacheRoot files archiveName = do

logDebug appState $ "Uploading to s3://" <> bucket <> "/" <> objectKey

runConduitRes do
withStderrPipe appState \stderrHandle ->
runConduitRes do
let multipartUpload = (newCreateMultipartUpload (BucketName bucket) (ObjectKey objectKey) :: CreateMultipartUpload)
{ storageClass = Just StorageClass_REDUCED_REDUNDANCY }
result <-
packTar appState cacheRoot filesRelativeToCacheRoot
packTar appState stderrHandle cacheRoot filesRelativeToCacheRoot
.| Zstd.compress 3
.| streamUpload env Nothing multipartUpload
case result of
Expand Down Expand Up @@ -180,13 +181,14 @@ restoreCache appState settings cacheRoot archiveName logMode = do
logDebug appState $ "Remote cache archive not found s3://" <> bucket <> "/" <> objectKey
pure False

handling _NoSuchKey onNoSuchKey $ runConduitRes do
response <- AWS.send env $ newGetObject (BucketName bucket) (ObjectKey objectKey)
when (logMode == Log) do
liftIO $ logInfo appState $ "Found remote cache " <> archiveName <> ", restoring"
response.body.body
.| unpackTar appState cacheRoot
pure True
handling _NoSuchKey onNoSuchKey $ withStderrPipe appState \stderrHandle ->
runConduitRes do
response <- AWS.send env $ newGetObject (BucketName bucket) (ObjectKey objectKey)
when (logMode == Log) do
liftIO $ logInfo appState $ "Found remote cache " <> archiveName <> ", restoring"
response.body.body
.| unpackTar appState stderrHandle cacheRoot
pure True

getLatestBuildHash
:: AppState
Expand Down
1 change: 0 additions & 1 deletion src/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ data AppState = AppState
, snapshotArgsRef :: IORef (Maybe SnapshotCliArgs)
, skipped :: IORef Bool
, toplevelStderr :: Handle
, subprocessStderr :: Handle
, logOutput :: Handle
, quietBuffer :: IORef [ByteString]

Expand Down
52 changes: 41 additions & 11 deletions src/Utils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@ import Universum
import qualified Data.ByteString.Char8 as B8
import Data.Time (getCurrentTime, formatTime, defaultTimeLocale)
import Types
import Control.Exception (throwIO)
import Control.Exception (throwIO, handle)
import System.IO.Error (isEOFError, IOError)
import Text.Printf (printf)
import Prelude (until)
import Data.List ((!!))
import System.Process (CreateProcess(..), StdStream (..), readCreateProcess)
import System.Process (CreateProcess(..), StdStream (..), readCreateProcess, createPipe)
import Data.Conduit.Process (proc)
import qualified Data.Text as Text
import GHC.IO.Handle (hDuplicate, hIsClosed)
import GHC.IO.Handle (hIsClosed)
import System.FilePath ((</>))
import Control.Concurrent.Async (async, wait)
import System.Timeout (timeout)

outputLine :: AppState -> Handle -> ByteString -> ByteString -> IO ()
outputLine appState toplevelOutput streamName line = do
Expand Down Expand Up @@ -87,10 +90,21 @@ bytesfmt formatter bs = printf (formatter <> " %s")
bytesSuffixes = ["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"]
bytesSuffix = bytesSuffixes !! i

-- | Create a per-subprocess stderr pipe that prefixes output with the job name.
--
-- The write end of a fresh pipe is handed to @action@; a background thread
-- drains the read end line-by-line into 'toplevelStderr' via
-- 'outputStreamHandler'. After @action@ finishes (normally or by exception),
-- the write end is closed — delivering EOF to the drain thread — and the drain
-- thread is awaited with 'timeoutStream', so all subprocess stderr is flushed
-- before this function returns.
--
-- NOTE(review): if the wait times out (e.g. the fd was leaked to a background
-- process), @handler@ keeps running and @readEnd@ stays open until finalized by
-- the GC — confirm this is acceptable, or cancel the async after the timeout.
withStderrPipe :: AppState -> (Handle -> IO a) -> IO a
withStderrPipe appState action = do
  (readEnd, writeEnd) <- createPipe
  -- Drain concurrently so the subprocess never blocks on a full pipe buffer.
  handler <- async $ outputStreamHandler appState appState.toplevelStderr "stderr" readEnd
  action writeEnd `finally` do
    hClose writeEnd
    timeoutStream appState "stderr" $ wait handler

isDirtyAtPaths :: AppState -> [FilePath] -> IO Bool
isDirtyAtPaths _ [] = pure False
isDirtyAtPaths appState paths =
bracket (hDuplicate appState.subprocessStderr) hClose \stderr_ -> do
withStderrPipe appState \stderr_ -> do
output <-
readCreateProcess
(proc "git" (["status", "--porcelain", "--untracked-files=no", "--"] ++ paths))
Expand All @@ -101,7 +115,7 @@ isDirtyAtPaths appState paths =

getCurrentBranch :: AppState -> IO Text
getCurrentBranch appState =
bracket (hDuplicate appState.subprocessStderr) hClose \stderr_ ->
withStderrPipe appState \stderr_ ->
Text.strip . Text.pack <$> readCreateProcess
(proc "git" ["symbolic-ref", "--short", "HEAD"]) { std_err = UseHandle stderr_ }
""
Expand All @@ -112,18 +126,16 @@ getMainBranchCommit appState =
Nothing ->
pure Nothing
Just branch ->
bracket (hDuplicate appState.subprocessStderr) hClose \stderr_ ->
withStderrPipe appState \stderr_ ->
Just . Text.strip . Text.pack <$> readCreateProcess
(proc "git" ["merge-base", "HEAD", "origin/" <> toString branch]) { std_err = UseHandle stderr_ }
""

getCurrentCommit :: AppState -> IO Text
getCurrentCommit _appState =
-- TODO: fix: we can't use subprocessStderr here because it's used after closing output collector
-- Using normal stderr for now
-- bracket (hDuplicate appState.subprocessStderr) hClose \stderr_ ->
getCurrentCommit appState =
withStderrPipe appState \stderr_ ->
Text.strip . Text.pack <$> readCreateProcess
(proc "git" ["rev-parse", "HEAD"])
(proc "git" ["rev-parse", "HEAD"]) { std_err = UseHandle stderr_ }
""

logFileName :: Settings -> BuildId -> JobName -> FilePath
Expand All @@ -141,3 +153,21 @@ flushQuietBuffer appState toplevelOutput = do
-- | Discard buffered output (used when task succeeds in quiet mode)
discardQuietBuffer :: AppState -> IO ()
discardQuietBuffer appState = writeIORef appState.quietBuffer []

-- | Copy lines from @stream@ to @toplevelOutput@, tagging each with
-- @streamName@, until end-of-file is reached on @stream@.
outputStreamHandler :: AppState -> Handle -> ByteString -> Handle -> IO ()
outputStreamHandler appState toplevelOutput streamName stream =
  handle ignoreEOF pump
  where
    -- Loop until B8.hGetLine hits EOF, which ignoreEOF swallows.
    pump = do
      chunk <- B8.hGetLine stream
      outputLine appState toplevelOutput streamName chunk
      pump

-- | Run @action@ (normally a 'wait' on an output-stream handler), but give up
-- with a warning after the configured number of seconds rather than blocking
-- the build forever. Timing out is deliberately non-fatal.
timeoutStream :: AppState -> Text -> IO () -> IO ()
timeoutStream appState streamName action = do
  outcome <- timeout (appState.settings.outputStreamTimeout * 1000000) action
  case outcome of
    Just _ -> pure ()
    Nothing -> do
      logWarn appState $ "taskrunner: Task did not close " <> streamName <> " " <> show appState.settings.outputStreamTimeout <> " seconds after exiting."
      logWarn appState "Perhaps the file descriptor was leaked to a background process?"
      logWarn appState "Build will continue despite this error, but some output may be lost."

-- | Swallow end-of-file errors — the normal way a drained pipe terminates —
-- and rethrow every other IO error.
ignoreEOF :: IOError -> IO ()
ignoreEOF err =
  if isEOFError err
    then pure ()
    else throwIO err
6 changes: 6 additions & 0 deletions test/t/post-unpack-cmd.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- output:
[mytask] info | Inputs changed, running task
[mytask] stdout | Main task output
[mytask] info | success
[mytask] stderr | post-unpack-stdout
[mytask] stderr | post-unpack-stderr
14 changes: 14 additions & 0 deletions test/t/post-unpack-cmd.txt
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This checks if post-unpack-cmd can use stdout and stderr

Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# no toplevel

echo "input data" > input.txt
git init -q
git add input.txt
git commit -qm "initial"

taskrunner -n mytask bash -e -c '
snapshot input.txt --outputs --cmd "
echo post-unpack-stdout
echo post-unpack-stderr >&2
"
echo "Main task output"
'
Loading