-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
142 lines (110 loc) · 4.41 KB
/
main.py
File metadata and controls
142 lines (110 loc) · 4.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# SPDX-FileCopyrightText: 2023-2024 DeepLime <contact@deeplime.io>
# SPDX-License-Identifier: MIT
#####################################
# !! DO NOT EDIT THIS FILE !! #
#####################################
import argparse
import json
import logging
import os
from importlib import import_module
from typing import Dict, List
from onecode import (
ConfigOption,
Env,
Logger,
Mode,
Project,
check_modules,
get_imported_modules,
register_ext_module
)
def main(
    data: Dict = None,
    flow_name: str = None,
    logger: logging.Handler = None
):
    """
    Starts the OneCode project with the given options.

    Args:
        data: Initialize the Project with data before execution (typically
            `Mode.LOAD_THEN_EXECUTE`).
        flow_name: Execute only the flow whose registered `file` entry equals this value
            (the module name under `flows/`, without the `.py` extension).
            If None provided, all flows will run.
        logger: Add a logging handler to the OneCode Logger.

    Returns:
        The output manifest path of the flow when exactly one flow ran, otherwise the
        list of output manifest paths of every flow that ran (empty if none ran).

    Raises:
        FileNotFoundError: if the OneCode project configuration file is not found.

    """
    cur_dir = os.path.dirname(__file__)
    config_file = os.path.join(cur_dir, Env.ONECODE_CONFIG_FILE)
    if not os.path.exists(config_file):
        raise FileNotFoundError(
            """
Missing OneCode config files, it looks like someone tampered with the wrong file...
You may recreate the files according to the documentation specs.
"""
        )

    # register elements from OneCode inline extensions if any
    globals()['onecode_ext'] = register_ext_module()

    Project().data = data
    Logger().reset()
    Logger().add_handler(logger)

    # check all packages are present in current Python env
    if Project().get_config(ConfigOption.CHECK_MODULES):
        Logger.info("Checking required dependencies are in Python environment...")
        Logger.info("(Use Environment variable ONECODE_FLAG_CHECK_MODULES=0 to turn it off)")
        modules = check_modules(
            modules=get_imported_modules(cur_dir),
            requirements_file=os.path.join(cur_dir, 'requirements.txt')
        )

        # only entries carrying a "msg" are reported to the user
        warn_mods = [m.get("msg") for m in modules.values() if m.get("msg") is not None]
        if warn_mods:
            Logger.warning(
                "The following libraries may be missing from your Python environment or"
                " mismatch the requirements."
            )
            for msg in warn_mods:
                Logger.warning(msg)
            Logger.warning(
                "Run 'onecode-require requirements.txt' then 'pip install -r requirements.txt'"
                " to try to fix it."
            )

    # start workflow
    # JSON is UTF-8 by specification: don't depend on the platform's locale encoding.
    with open(config_file, encoding='utf-8') as f:
        workflows = json.load(f)

    all_manifests = []
    flow_matched = False
    for wfl in workflows:
        flow_file = wfl['file']
        if flow_name is not None and flow_name != flow_file:
            continue

        flow_matched = True
        if not os.path.exists(os.path.join(cur_dir, 'flows', f'{flow_file}.py')):
            print(f"Registered flow {wfl['label']} ({flow_file}.py) doesn't exist => skipping")
            continue

        # set the current flow before querying its manifest path
        # (presumably the manifest location depends on the current flow - verify in OneCode)
        Project().current_flow = flow_file

        # clear any previous MANIFEST.txt output
        manifest = Project().get_output_manifest()
        if os.path.exists(manifest):
            os.remove(manifest)

        flow = import_module(f"flows.{flow_file}")
        flow.run()
        all_manifests.append(manifest)

    # a requested flow that is not registered used to be silently ignored
    if flow_name is not None and not flow_matched:
        Logger.warning(f"No registered flow matches '{flow_name}' => nothing to run")

    return all_manifests[0] if len(all_manifests) == 1 else all_manifests
def _main(raw_args: List[str] = None):
    """
    Command-line entry point: parse arguments, configure the Project and run it.

    Args:
        raw_args: Command-line arguments to parse, without the program name.
            If None, `argparse` reads them from `sys.argv[1:]`.

    Raises:
        FileNotFoundError: if the given input parameters file does not exist.

    """
    parser = argparse.ArgumentParser(description='Use optional JSON parameters file')
    parser.add_argument('--flow', default=None, help='Specify the flow to run')
    parser.add_argument('--flush', action="store_true", help='Flush the logs immediately')
    parser.add_argument('file', nargs='?', help='Path to the input JSON file')
    args = parser.parse_args(raw_args)

    data = None
    if args.file is not None:
        if not os.path.exists(args.file):
            raise FileNotFoundError(f'Input parameters file {args.file} does not exist')

        print(f'Using provided parameter file: {args.file}')
        # JSON is UTF-8 by specification: don't depend on the platform's locale encoding.
        with open(args.file, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # parameters were provided: load them into the Project before executing
        Project().mode = Mode.LOAD_THEN_EXECUTE
    else:
        Project().mode = Mode.EXECUTE

    if args.flush:
        Project().set_config(ConfigOption.FLUSH_STDOUT, True)

    main(data, args.flow)
# Script entry point: only runs when this file is executed directly, not when imported.
if __name__ == "__main__":
    _main()