Skip to content

Commit fa6cc01

Browse files
authored
Merge pull request #22 from stackql-labs/claude/returning-callback-support-DbI9G
Add RETURNING * capture and callback polling support
2 parents 8da923b + 058389f commit fa6cc01

9 files changed

Lines changed: 1361 additions & 107 deletions

File tree

src/commands/base.rs

Lines changed: 203 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ use crate::core::config::{get_full_context, render_globals, render_string_value}
1717
use crate::core::env::load_env_vars;
1818
use crate::core::templating::{self, ParsedQuery};
1919
use crate::core::utils::{
20-
catch_error_and_exit, check_exports_as_statecheck_proxy, export_vars, perform_retries,
21-
pull_providers, run_ext_script, run_stackql_command, run_stackql_query, show_query,
20+
catch_error_and_exit, check_exports_as_statecheck_proxy, check_short_circuit, export_vars,
21+
flatten_returning_row, has_returning_clause, perform_retries, pull_providers,
22+
run_callback_poll, run_ext_script, run_stackql_command, run_stackql_dml_returning,
23+
run_stackql_query, show_query,
2224
};
2325
use crate::resource::manifest::{Manifest, Resource};
2426
use crate::resource::validation::validate_manifest;
@@ -291,6 +293,9 @@ impl CommandRunner {
291293
}
292294

293295
/// Create a resource.
296+
///
297+
/// Returns `(created, returning_row)` where `returning_row` is `Some` when
298+
/// the create query included `RETURNING *` and the provider returned data.
294299
#[allow(clippy::too_many_arguments)]
295300
pub fn create_resource(
296301
&mut self,
@@ -301,30 +306,53 @@ impl CommandRunner {
301306
dry_run: bool,
302307
show_queries: bool,
303308
ignore_errors: bool,
304-
) -> bool {
309+
) -> (bool, Option<HashMap<String, String>>) {
305310
if dry_run {
306-
info!(
307-
"dry run create for [{}]:\n\n/* insert (create) query */\n{}\n",
308-
resource.name, create_query
309-
);
310-
return false;
311+
if has_returning_clause(create_query) {
312+
info!(
313+
"dry run create for [{}]:\n\n/* insert (create) query with RETURNING */\n{}\n\
314+
[dry run: RETURNING * capture skipped]\n",
315+
resource.name, create_query
316+
);
317+
} else {
318+
info!(
319+
"dry run create for [{}]:\n\n/* insert (create) query */\n{}\n",
320+
resource.name, create_query
321+
);
322+
}
323+
return (false, None);
311324
}
312325

313326
info!("[{}] does not exist, creating...", resource.name);
314327
show_query(show_queries, create_query);
315328

316-
let msg = run_stackql_command(
317-
create_query,
318-
&mut self.client,
319-
ignore_errors,
320-
retries,
321-
retry_delay,
322-
);
323-
debug!("Create response: {}", msg);
324-
true
329+
if has_returning_clause(create_query) {
330+
let (msg, returning_row) = run_stackql_dml_returning(
331+
create_query,
332+
&mut self.client,
333+
ignore_errors,
334+
retries,
335+
retry_delay,
336+
);
337+
debug!("Create response: {}", msg);
338+
(true, returning_row)
339+
} else {
340+
let msg = run_stackql_command(
341+
create_query,
342+
&mut self.client,
343+
ignore_errors,
344+
retries,
345+
retry_delay,
346+
);
347+
debug!("Create response: {}", msg);
348+
(true, None)
349+
}
325350
}
326351

327352
/// Update a resource.
353+
///
354+
/// Returns `(updated, returning_row)` where `returning_row` is `Some` when
355+
/// the update query included `RETURNING *` and the provider returned data.
328356
#[allow(clippy::too_many_arguments)]
329357
pub fn update_resource(
330358
&mut self,
@@ -335,41 +363,64 @@ impl CommandRunner {
335363
dry_run: bool,
336364
show_queries: bool,
337365
ignore_errors: bool,
338-
) -> bool {
366+
) -> (bool, Option<HashMap<String, String>>) {
339367
match update_query {
340368
Some(query) => {
341369
if dry_run {
342-
info!(
343-
"dry run update for [{}]:\n\n/* update query */\n{}\n",
344-
resource.name, query
345-
);
346-
return false;
370+
if has_returning_clause(query) {
371+
info!(
372+
"dry run update for [{}]:\n\n/* update query with RETURNING */\n{}\n\
373+
[dry run: RETURNING * capture skipped]\n",
374+
resource.name, query
375+
);
376+
} else {
377+
info!(
378+
"dry run update for [{}]:\n\n/* update query */\n{}\n",
379+
resource.name, query
380+
);
381+
}
382+
return (false, None);
347383
}
348384

349385
info!("updating [{}]...", resource.name);
350386
show_query(show_queries, query);
351387

352-
let msg = run_stackql_command(
353-
query,
354-
&mut self.client,
355-
ignore_errors,
356-
retries,
357-
retry_delay,
358-
);
359-
debug!("Update response: {}", msg);
360-
true
388+
if has_returning_clause(query) {
389+
let (msg, returning_row) = run_stackql_dml_returning(
390+
query,
391+
&mut self.client,
392+
ignore_errors,
393+
retries,
394+
retry_delay,
395+
);
396+
debug!("Update response: {}", msg);
397+
(true, returning_row)
398+
} else {
399+
let msg = run_stackql_command(
400+
query,
401+
&mut self.client,
402+
ignore_errors,
403+
retries,
404+
retry_delay,
405+
);
406+
debug!("Update response: {}", msg);
407+
(true, None)
408+
}
361409
}
362410
None => {
363411
info!(
364412
"Update query not configured for [{}], skipping update...",
365413
resource.name
366414
);
367-
false
415+
(false, None)
368416
}
369417
}
370418
}
371419

372420
/// Delete a resource.
421+
///
422+
/// Returns `Some(first_row)` when the delete query included `RETURNING *`
423+
/// and the provider returned data; `None` otherwise.
373424
#[allow(clippy::too_many_arguments)]
374425
pub fn delete_resource(
375426
&mut self,
@@ -380,26 +431,136 @@ impl CommandRunner {
380431
dry_run: bool,
381432
show_queries: bool,
382433
ignore_errors: bool,
434+
) -> Option<HashMap<String, String>> {
435+
if dry_run {
436+
if has_returning_clause(delete_query) {
437+
info!(
438+
"dry run delete for [{}]:\n\n{}\n[dry run: RETURNING * capture skipped]\n",
439+
resource.name, delete_query
440+
);
441+
} else {
442+
info!(
443+
"dry run delete for [{}]:\n\n{}\n",
444+
resource.name, delete_query
445+
);
446+
}
447+
return None;
448+
}
449+
450+
info!("deleting [{}]...", resource.name);
451+
show_query(show_queries, delete_query);
452+
453+
if has_returning_clause(delete_query) {
454+
let (msg, returning_row) = run_stackql_dml_returning(
455+
delete_query,
456+
&mut self.client,
457+
ignore_errors,
458+
retries,
459+
retry_delay,
460+
);
461+
debug!("Delete response: {}", msg);
462+
returning_row
463+
} else {
464+
let msg = run_stackql_command(
465+
delete_query,
466+
&mut self.client,
467+
ignore_errors,
468+
retries,
469+
retry_delay,
470+
);
471+
debug!("Delete response: {}", msg);
472+
None
473+
}
474+
}
475+
476+
// -----------------------------------------------------------------------
477+
// RETURNING * capture and callback support
478+
// -----------------------------------------------------------------------
479+
480+
/// Store a RETURNING * row for `resource_name` in the global context.
481+
///
482+
/// Flat keys (`callback.{field}`, `{resource_name}.callback.{field}`) are
483+
/// inserted so they are accessible from subsequent template renders:
484+
/// - The unscoped `callback.*` form is available to the resource's own
485+
/// `.iql` templates (and is overwritten by the next DML that has
486+
/// RETURNING *).
487+
/// - The scoped `{resource_name}.callback.*` form is available to any
488+
/// downstream resource (written once, never overwritten).
489+
pub fn store_callback_data(
490+
&mut self,
491+
resource_name: &str,
492+
returning_row: &HashMap<String, String>,
493+
) {
494+
info!(
495+
"storing RETURNING * result for [{}] in callback context",
496+
resource_name
497+
);
498+
flatten_returning_row(returning_row, resource_name, &mut self.global_context);
499+
}
500+
501+
/// Execute a callback block associated with a DML operation.
502+
///
503+
/// 1. If `short_circuit_field` is set and the field in the current context
504+
/// equals `short_circuit_value`, skip polling.
505+
/// 2. Otherwise poll the callback query up to `retries` times.
506+
/// 3. On exhaustion, call `catch_error_and_exit`.
507+
///
508+
/// `operation` is used only for log messages (e.g. `"create"`).
509+
#[allow(clippy::too_many_arguments)]
510+
pub fn run_callback(
511+
&mut self,
512+
resource: &Resource,
513+
callback_query: &str,
514+
retries: u32,
515+
retry_delay: u32,
516+
short_circuit_field: Option<&str>,
517+
short_circuit_value: Option<&str>,
518+
operation: &str,
519+
dry_run: bool,
520+
show_queries: bool,
383521
) {
384522
if dry_run {
385523
info!(
386-
"dry run delete for [{}]:\n\n{}\n",
387-
resource.name, delete_query
524+
"dry run callback ({}) for [{}]:\n\n/* callback query */\n{}\n\
525+
[dry run: callback polling skipped]\n",
526+
operation, resource.name, callback_query
388527
);
389528
return;
390529
}
391530

392-
info!("deleting [{}]...", resource.name);
393-
show_query(show_queries, delete_query);
531+
// Short-circuit check.
532+
if let (Some(field), Some(expected)) = (short_circuit_field, short_circuit_value) {
533+
if check_short_circuit(&self.global_context, field, expected) {
534+
info!(
535+
"[{}] {} callback short-circuited (field '{}' = '{}')",
536+
resource.name, operation, field, expected
537+
);
538+
return;
539+
}
540+
}
394541

395-
let msg = run_stackql_command(
396-
delete_query,
397-
&mut self.client,
398-
ignore_errors,
542+
info!("running {} callback for [{}]...", operation, resource.name);
543+
show_query(show_queries, callback_query);
544+
545+
let succeeded = run_callback_poll(
546+
&resource.name,
547+
callback_query,
399548
retries,
400549
retry_delay,
550+
&mut self.client,
551+
);
552+
553+
if !succeeded {
554+
catch_error_and_exit(&format!(
555+
"callback timeout for [{}] {} operation after {} retries",
556+
resource.name, operation, retries
557+
));
558+
}
559+
560+
info!(
561+
"[{}] {} callback completed successfully",
562+
resource.name, operation
401563
);
402-
debug!("Delete response: {}", msg);
403564
}
404565

405566
/// Run a command-type query.

0 commit comments

Comments
 (0)