11"""The composition function's main CLI."""
22
3- import warnings
4- warnings .filterwarnings ('ignore' , module = '^google[.]protobuf[.]runtime_version$' , lineno = 98 )
5-
63import argparse
4+ import asyncio
75import os
6+ import pathlib
87import shlex
8+ import signal
99import sys
10+ import traceback
11+
12+ import crossplane .function .logging
13+ import crossplane .function .proto .v1 .run_function_pb2_grpc as grpcv1
14+ import grpc
1015import pip ._internal .cli .main
11- from crossplane .function import logging , runtime
1216
1317from . import function
1418
1519
16- def main ():
20+ async def main ():
1721 parser = argparse .ArgumentParser ('Forta Crossplane Function' )
1822 parser .add_argument (
1923 '--debug' , '-d' ,
@@ -27,20 +31,33 @@ def main():
2731 )
2832 parser .add_argument (
2933 '--tls-certs-dir' ,
34+ default = os .getenv ('TLS_SERVER_CERTS_DIR' ),
3035 help = 'Serve using mTLS certificates.' ,
3136 )
3237 parser .add_argument (
3338 '--insecure' ,
3439 action = 'store_true' ,
3540 help = 'Run without mTLS credentials. If you supply this flag --tls-certs-dir will be ignored.' ,
3641 )
42+ parser .add_argument (
43+ '--packages' ,
44+ action = 'store_true' ,
45+ help = 'Discover python packages from function-pythonic ConfigMaps and Secrets.'
46+ )
47+ parser .add_argument (
48+ '--packages-namespace' ,
49+ action = 'append' ,
50+ default = [],
51+ help = 'Namespaces to discover function-pythonic ConfigMaps and Secrets in, default is cluster wide.' ,
52+ )
3753 parser .add_argument (
3854 '--pip-install' ,
3955 help = 'Pip install command to install additional Python packages.'
4056 )
4157 parser .add_argument (
4258 '--python-path' ,
4359 action = 'append' ,
60+ default = [],
4461 help = 'Filing system directories to add to the python path' ,
4562 )
4663 parser .add_argument (
@@ -49,33 +66,80 @@ def main():
4966 help = 'Allow oversized protobuf messages'
5067 )
5168 args = parser .parse_args ()
52- if not args .tls_certs_dir :
53- args .tls_certs_dir = os .getenv ('TLS_SERVER_CERTS_DIR' )
69+
70+ if args .debug :
71+ crossplane .function .logging .configure (crossplane .function .logging .Level .DEBUG )
72+ else :
73+ crossplane .function .logging .configure (crossplane .function .logging .Level .INFO )
5474
5575 if args .pip_install :
5676 pip ._internal .cli .main .main (['install' , * shlex .split (args .pip_install )])
5777
58- if args .python_path :
59- for path in reversed (args .python_path ):
60- sys .path .insert (0 , path )
78+ # enables read only volumes or mismatched uid volumes
79+ sys .dont_write_bytecode = True
80+ for path in reversed (args .python_path ):
81+ sys .path .insert (0 , path )
6182
6283 if args .allow_oversize_protos :
6384 from google .protobuf .internal import api_implementation
6485 if api_implementation ._c_module :
6586 api_implementation ._c_module .SetAllowOversizeProtos (True )
6687
67- logging .configure (logging .Level .DEBUG if args .debug else logging .Level .INFO )
68- runtime .serve (
69- function .FunctionRunner (args .debug ),
70- args .address ,
71- creds = runtime .load_credentials (args .tls_certs_dir ),
72- insecure = args .insecure ,
73- )
88+ grpc .aio .init_grpc_aio ()
89+ grpc_runner = function .FunctionRunner (args .debug )
90+ grpc_server = grpc .aio .server ()
91+ grpcv1 .add_FunctionRunnerServiceServicer_to_server (grpc_runner , grpc_server )
92+ if args .tls_certs_dir :
93+ certs = pathlib .Path (args .tls_certs_dir )
94+ grpc_server .add_secure_port (
95+ args .address ,
96+ grpc .ssl_server_credentials (
97+ private_key_certificate_chain_pairs = [(
98+ (certs / 'tls.key' ).read_bytes (),
99+ (certs / 'tls.crt' ).read_bytes (),
100+ )],
101+ root_certificates = (certs / 'ca.crt' ).read_bytes (),
102+ require_client_auth = True ,
103+ ),
104+ )
105+ else :
106+ if not args .insecure :
107+ raise ValueError ('Either --tls-certs-dir or --insecure must be specified' )
108+ grpc_server .add_insecure_port (args .address )
109+ await grpc_server .start ()
110+
111+ if args .packages :
112+ import kopf ._core .actions .loggers
113+ import kopf ._core .reactor .running
114+ from . import packages
115+ sys .path .insert (0 , str (packages .PACKAGES_DIR ))
116+ packages .register_grpc_runner (grpc_runner )
117+ kopf ._core .actions .loggers .configure ()
118+ @kopf .on .startup ()
119+ async def startup (settings , ** _ ):
120+ settings .scanning .disabled = True
121+ @kopf .on .cleanup ()
122+ async def cleanup (logger = None , ** _ ):
123+ await grpc_server .stop (5 )
124+ async with asyncio .TaskGroup () as tasks :
125+ tasks .create_task (grpc_server .wait_for_termination ())
126+ tasks .create_task (kopf ._core .reactor .running .operator (
127+ standalone = True ,
128+ clusterwide = not args .packages_namespace ,
129+ namespaces = args .packages_namespace ,
130+ ))
131+ else :
132+ def stop ():
133+ asyncio .ensure_future (grpc_server .stop (5 ))
134+ loop = asyncio .get_event_loop ()
135+ loop .add_signal_handler (signal .SIGINT , stop )
136+ loop .add_signal_handler (signal .SIGTERM , stop )
137+ await grpc_server .wait_for_termination ()
74138
75139
76140if __name__ == "__main__" :
77141 try :
78- main ()
79- except Exception as e :
80- print (f"Exception running main: { e } " , file = sys . stderr )
142+ asyncio . run ( main () )
143+ except :
144+ print (traceback . format_exc () )
81145 sys .exit (1 )
0 commit comments