1+ import {
2+ authBegin ,
3+ authCodeResponse ,
4+ configureContext ,
5+ dataSourceFns ,
6+ defaultApiLimits ,
7+ importStages ,
8+ initialPagingContext , reportImportProblem ,
9+ testConfig as testConfigImpl
10+ } from './handlerConfig.js' ;
11+
12+ export const HandlerFunctionEnum = Object . freeze ( {
13+ testConfig : 'testConfig' ,
14+ importObjects : 'importObjects' ,
15+ readDataSource : 'readDataSource' ,
16+ oAuth2 : 'oAuth2'
17+ } ) ;
18+
19+ // ============================================================================
20+ //
21+ // testConfig
22+ //
23+ export async function testConfig ( event , api ) {
24+ const { pluginConfig } = event ;
25+ const { log, report, patchConfig, runtimeContext } = api ;
26+
27+ const context = {
28+ pluginConfig,
29+
30+ log, report, patchConfig, runtimeContext
31+ } ;
32+
33+ await configureContext ( context , HandlerFunctionEnum . testConfig ) ;
34+
35+ return testConfigImpl ( context ) ;
36+ }
37+
38+ // ============================================================================
39+ //
40+ // importObjects
41+ //
42+ export async function importObjects ( event , api ) {
43+ const { pluginConfig, pagingContext } = event ;
44+ const { log, report, patchConfig, runtimeContext } = api ;
45+
46+ const context = {
47+ vertices : [ ] , edges : [ ] ,
48+
49+ pluginConfig, pagingContext,
50+
51+ log, report, patchConfig, runtimeContext,
52+
53+ apiLimits : Object . assign ( { } , defaultApiLimits , pluginConfig . testSettings ?. apiLimits ?? { } )
54+ } ;
55+ const pageAPI = ( context ) => {
56+ return {
57+ get : ( key ) => context . pagingContext [ key ] ,
58+ set : ( key , value ) => { context . pagingContext [ key ] = value ; } ,
59+ clear : ( ) => { context . pagingContext = { } ; }
60+ } ;
61+ } ;
62+ context . pageAPI = pageAPI ( context ) ;
63+ context . reportImportProblem = reportImportProblem ( context ) ;
64+
65+ await configureContext ( context , HandlerFunctionEnum . importObjects ) ;
66+
67+ if ( Array . isArray ( importStages ) && importStages . length > 0 ) {
68+
69+ if ( ! context . pageAPI . get ( 'squaredUp_isInit' ) ) {
70+ // Set initial paging context values
71+ context . pageAPI . set ( 'squaredUp_stage' , 0 ) ;
72+ for ( const [ key , value ] of Object . entries ( initialPagingContext ) ) {
73+ context . pageAPI . set ( key , value ) ;
74+ }
75+ context . pageAPI . set ( 'squaredUp_isInit' , true ) ;
76+ }
77+
78+ // Run through the appropriate stages until we've been running for 10 minutes or we've created results larger than 2MB.
79+ const maxElapsedTimeMSecs = pluginConfig . testSettings ?. maxElapsedTimeMSecs ?? 10 * 60 * 1000 ;
80+ const maxPayloadSize = pluginConfig . testSettings ?. maxPayloadSize ?? 2 * 1024 * 1024 ;
81+ let stage = context . pageAPI . get ( 'squaredUp_stage' ) ;
82+ context . log . debug ( 'importObjects starts: ' +
83+ `stage=${ stage } , ` +
84+ `apiLimits=${ JSON . stringify ( context . apiLimits ) } , ` +
85+ `maxElapsedTimeMSecs=${ maxElapsedTimeMSecs } , ` +
86+ `maxPayloadSize=${ maxPayloadSize } ` ) ;
87+ const start = Date . now ( ) ;
88+ let elapsed ;
89+ let payloadSize ;
90+ let rateLimited = false ;
91+ do {
92+ context . pageAPI . set ( 'rateLimitDelay' , undefined ) ;
93+ if ( await importStages [ stage ] ( context ) ) {
94+ // Stage reported it has finished... step to the next one
95+ stage ++ ;
96+ context . pageAPI . set ( 'squaredUp_stage' , stage ) ;
97+
98+ if ( stage >= importStages . length ) {
99+ // No more stages, so set pagingContext to an empty object to
100+ // indicate import is complete
101+ context . pageAPI . clear ( ) ;
102+ break ;
103+ }
104+ }
105+ elapsed = Date . now ( ) - start ;
106+ const pagingContextSize = JSON . stringify ( context . pagingContext ) . length ;
107+ payloadSize = JSON . stringify ( { vertices : context . vertices , edges : context . edges , pagingContext : context . pagingContext } ) . length ;
108+ const rateLimitDelay = context . pageAPI . get ( 'rateLimitDelay' ) ?? 0 ;
109+ if ( rateLimitDelay ) {
110+ // Stage reported it was rate limited, so wait synchronously before continuing if we have time, otherwise
111+ // end this page of import and return the results so far.
112+ if ( elapsed + rateLimitDelay < maxElapsedTimeMSecs && payloadSize < maxPayloadSize ) {
113+ context . log . debug ( `importObjects rate limited: elapsed = ${ elapsed } , synchronously delaying ${ rateLimitDelay } msecs` ) ;
114+ await new Promise ( ( resolve ) => setTimeout ( resolve , rateLimitDelay ) ) ;
115+ elapsed = Date . now ( ) - start ;
116+ } else {
117+ context . log . debug ( `importObjects rate limited: elapsed = ${ elapsed } , ending page early` ) ;
118+ rateLimited = true ;
119+ }
120+ }
121+ context . log . debug ( `importObjects looping: elapsed = ${ elapsed } , payloadSize=${ payloadSize } , pagingContextSize=${ pagingContextSize } ` ) ;
122+ } while ( ! rateLimited && elapsed < maxElapsedTimeMSecs && payloadSize < maxPayloadSize ) ;
123+ context . log . debug ( 'importObjects loop ends' ) ;
124+ }
125+
126+ // Return the results
127+ const result = {
128+ vertices : context . vertices ,
129+ edges : context . edges ,
130+ pagingContext : context . pagingContext
131+ } ;
132+ return result ;
133+
134+ }
135+
136+ // ============================================================================
137+ //
138+ // readDataSource
139+ //
140+ export async function readDataSource ( event , api ) {
141+ const { pluginConfig, dataSource, dataSourceConfig, targetNodes, timeframe } = event ;
142+ const { log, report, patchConfig, runtimeContext } = api ;
143+
144+ const context = {
145+ pluginConfig, dataSource, dataSourceConfig, targetNodes, timeframe,
146+ log, report, patchConfig, runtimeContext
147+ } ;
148+
149+ await configureContext ( context , HandlerFunctionEnum . readDataSource ) ;
150+
151+ const dataSourceFn = dataSourceFns [ dataSource . name ] ;
152+ if ( ! dataSourceFn ) {
153+ throw new Error ( `No data source function was found for data source ${ dataSource . name } ` ) ;
154+ }
155+
156+ return dataSourceFn ( context ) ;
157+ }
158+
159+ // ============================================================================
160+ //
161+ // oAuth2
162+ //
163+ export async function oAuth2 ( { pluginConfig, dataSourceConfig, oAuth2Config } , { log, report, patchConfig } ) {
164+ const context = {
165+ pluginConfig,
166+ dataSourceConfig,
167+ oAuth2Config,
168+ log,
169+ report,
170+ patchConfig
171+ } ;
172+
173+ await configureContext ( context , HandlerFunctionEnum . oAuth2 ) ;
174+
175+ switch ( dataSourceConfig . oAuth2Stage ) {
176+ case 'oAuth2Begin' :
177+ return authBegin ( context ) ;
178+
179+ case 'oAuth2CodeResponse' :
180+ return authCodeResponse ( context ) ;
181+
182+ default :
183+ throw new Error ( `Invalid oAuth2Stage: "${ dataSourceConfig . oAuth2Stage } "` ) ;
184+ }
185+ }
0 commit comments