11"""The composition function's main CLI."""
22
3- import warnings
4- warnings .filterwarnings ('ignore' , module = '^google[.]protobuf[.]runtime_version$' , lineno = 98 )
5-
63import argparse
4+ import asyncio
75import os
6+ import pathlib
87import shlex
8+ import signal
99import sys
10+ import traceback
11+
12+ import crossplane .function .logging
13+ import crossplane .function .proto .v1 .run_function_pb2_grpc as grpcv1
14+ import grpc
1015import pip ._internal .cli .main
11- from crossplane .function import logging , runtime
1216
1317from . import function
1418
1519
16- def main ():
20+ async def main ():
1721 parser = argparse .ArgumentParser ('Forta Crossplane Function' )
1822 parser .add_argument (
1923 '--debug' , '-d' ,
@@ -27,20 +31,38 @@ def main():
2731 )
2832 parser .add_argument (
2933 '--tls-certs-dir' ,
34+ default = os .getenv ('TLS_SERVER_CERTS_DIR' ),
3035 help = 'Serve using mTLS certificates.' ,
3136 )
3237 parser .add_argument (
3338 '--insecure' ,
3439 action = 'store_true' ,
3540 help = 'Run without mTLS credentials. If you supply this flag --tls-certs-dir will be ignored.' ,
3641 )
42+ parser .add_argument (
43+ '--packages' ,
44+ action = 'store_true' ,
45+ help = 'Discover python packages from function-pythonic ConfigMaps and Secrets.'
46+ )
47+ parser .add_argument (
48+ '--packages-namespace' ,
49+ action = 'append' ,
50+ default = [],
51+ help = 'Namespaces to discover function-pythonic ConfigMaps and Secrets in, default is cluster wide.' ,
52+ )
53+ parser .add_argument (
54+ '--packages-dir' ,
55+ default = './pythonic-packages' ,
56+ help = 'Directory to store discovered function-pythonic ConfigMaps and Secrets to, defaults "<cwd>/pythonic-packages"'
57+ )
3758 parser .add_argument (
3859 '--pip-install' ,
3960 help = 'Pip install command to install additional Python packages.'
4061 )
4162 parser .add_argument (
4263 '--python-path' ,
4364 action = 'append' ,
65+ default = [],
4466 help = 'Filing system directories to add to the python path' ,
4567 )
4668 parser .add_argument (
@@ -49,33 +71,81 @@ def main():
4971 help = 'Allow oversized protobuf messages'
5072 )
5173 args = parser .parse_args ()
52- if not args .tls_certs_dir :
53- args .tls_certs_dir = os .getenv ('TLS_SERVER_CERTS_DIR' )
74+
75+ if args .debug :
76+ crossplane .function .logging .configure (crossplane .function .logging .Level .DEBUG )
77+ else :
78+ crossplane .function .logging .configure (crossplane .function .logging .Level .INFO )
5479
5580 if args .pip_install :
5681 pip ._internal .cli .main .main (['install' , * shlex .split (args .pip_install )])
5782
58- if args .python_path :
59- for path in reversed (args .python_path ):
60- sys .path .insert (0 , path )
83+ # enables read only volumes or mismatched uid volumes
84+ sys .dont_write_bytecode = True
85+ for path in reversed (args .python_path ):
86+ sys .path .insert (0 , str (pathlib .Path (path ).resolve ()))
6187
6288 if args .allow_oversize_protos :
6389 from google .protobuf .internal import api_implementation
6490 if api_implementation ._c_module :
6591 api_implementation ._c_module .SetAllowOversizeProtos (True )
6692
67- logging .configure (logging .Level .DEBUG if args .debug else logging .Level .INFO )
68- runtime .serve (
69- function .FunctionRunner (args .debug ),
70- args .address ,
71- creds = runtime .load_credentials (args .tls_certs_dir ),
72- insecure = args .insecure ,
73- )
93+ grpc .aio .init_grpc_aio ()
94+ grpc_runner = function .FunctionRunner (args .debug )
95+ grpc_server = grpc .aio .server ()
96+ grpcv1 .add_FunctionRunnerServiceServicer_to_server (grpc_runner , grpc_server )
97+ if args .tls_certs_dir :
98+ certs = pathlib .Path (args .tls_certs_dir )
99+ grpc_server .add_secure_port (
100+ args .address ,
101+ grpc .ssl_server_credentials (
102+ private_key_certificate_chain_pairs = [(
103+ (certs / 'tls.key' ).read_bytes (),
104+ (certs / 'tls.crt' ).read_bytes (),
105+ )],
106+ root_certificates = (certs / 'ca.crt' ).read_bytes (),
107+ require_client_auth = True ,
108+ ),
109+ )
110+ else :
111+ if not args .insecure :
112+ raise ValueError ('Either --tls-certs-dir or --insecure must be specified' )
113+ grpc_server .add_insecure_port (args .address )
114+ await grpc_server .start ()
115+
116+ if args .packages :
117+ import kopf ._core .actions .loggers
118+ import kopf ._core .reactor .running
119+ from . import packages
120+ packages_dir = pathlib .Path (args .packages_dir ).expanduser ().resolve ()
121+ sys .path .insert (0 , str (packages_dir ))
122+ packages .setup (packages_dir , grpc_runner )
123+ kopf ._core .actions .loggers .configure ()
124+ @kopf .on .startup ()
125+ async def startup (settings , ** _ ):
126+ settings .scanning .disabled = True
127+ @kopf .on .cleanup ()
128+ async def cleanup (logger = None , ** _ ):
129+ await grpc_server .stop (5 )
130+ async with asyncio .TaskGroup () as tasks :
131+ tasks .create_task (grpc_server .wait_for_termination ())
132+ tasks .create_task (kopf ._core .reactor .running .operator (
133+ standalone = True ,
134+ clusterwide = not args .packages_namespace ,
135+ namespaces = args .packages_namespace ,
136+ ))
137+ else :
138+ def stop ():
139+ asyncio .ensure_future (grpc_server .stop (5 ))
140+ loop = asyncio .get_event_loop ()
141+ loop .add_signal_handler (signal .SIGINT , stop )
142+ loop .add_signal_handler (signal .SIGTERM , stop )
143+ await grpc_server .wait_for_termination ()
74144
75145
76146if __name__ == "__main__" :
77147 try :
78- main ()
79- except Exception as e :
80- print (f"Exception running main: { e } " , file = sys . stderr )
148+ asyncio . run ( main () )
149+ except :
150+ print (traceback . format_exc () )
81151 sys .exit (1 )
0 commit comments