Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions docs/catalog-import-export.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Catalog import / export

Export and import the **catalog registry** (namespace and table metadata pointers in etcd). Table data stays in object storage (S3/MinIO/GCS); only registry keys are moved.

**Requires:** etcd-backed `ice-rest-catalog`. Configure the CLI with `.ice.yaml` (`uri`, optional `bearerToken`).

## Export

```bash
ice catalog-export -o catalog-snapshot.json
```

| Option | Description |
|--------|-------------|
| `-o`, `--output` | Output file (`-` or omit for stdout) |
| `--namespace` | Export one namespace and its tables only (e.g. `flowers`) |

Example (single namespace to stdout):

```bash
ice catalog-export --namespace flowers
```

Snapshot JSON fields: `version`, `catalog_name`, `exported_at`, `namespaces[]`, `tables[]` (each entry has `key` and `value`).

## Import

```bash
ice catalog-import -i catalog-snapshot.json
```

| Option | Description |
|--------|-------------|
| `-i`, `--input` | Input file (`-` or omit for stdin) |
| `--dry-run` | Preview only; no writes |
| `--overwrite` | Replace existing keys (default: skip existing) |

Preview changes:

```bash
ice catalog-import -i catalog-snapshot.json --dry-run
```

Import result JSON: `created`, `skipped`, `overwritten`, `catalog_name`, `exported_at`.

Pipe export → import:

```bash
ice catalog-export | ice catalog-import -i -
```

## REST API

The same operations are available over HTTP on the main catalog server (`addr` in `.ice-rest-catalog.yaml`). They are not exposed on the optional `adminAddr` listener.

| Method | Path | Description |
|--------|------|-------------|
| `GET` | `/admin/v1/catalog-export` | Export; optional query `namespace` |
| `POST` | `/admin/v1/catalog-import` | Import; body = snapshot JSON; query `dry-run`, `overwrite` |

Example:

```bash
curl -s http://localhost:8181/admin/v1/catalog-export | jq .
curl -s -X POST "http://localhost:8181/admin/v1/catalog-import?dry-run=true" \
-H "Content-Type: application/json" \
-d @catalog-snapshot.json | jq .
```

## Typical uses

- Move catalog metadata between environments (dev → staging)
- Take a registry backup that is lighter than a full etcd snapshot
- Clone a namespace into another catalog instance

For full etcd cluster backup/restore, see [etcd backup & restore (3-node)](etcd-backup-restore-upgrade-3-node.md).
1 change: 1 addition & 0 deletions ice-rest-catalog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ If `enabled` is true but the catalog backend is not etcd, the lock is ignored (w
- [GCS Setup](../docs/ice-rest-catalog-gcs.md) -- configuring ice-rest-catalog with Google Cloud Storage
- [etcd Backend Schema](../docs/etcd-backend-schema.md) -- etcd key/value schema (`n/`, `t/` prefixes) and mapping to SQLite
- [SQLite Backend Schema](../docs/sqlite-backend-schema.md) -- SQLite tables (`iceberg_tables`, `iceberg_namespace_properties`)
- [Catalog Import/Export](../docs/catalog-import-export.md) -- export and import catalog registry (namespaces and table metadata pointers) via CLI or REST API
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.altinity.ice.rest.catalog.internal.maintenance.SnapshotCleanup;
import com.altinity.ice.rest.catalog.internal.metrics.CatalogMetrics;
import com.altinity.ice.rest.catalog.internal.metrics.PrometheusMetricsReporter;
import com.altinity.ice.rest.catalog.internal.rest.CatalogAdminServlet;
import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogAdapter;
import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogAuthorizationHandler;
import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogHandler;
Expand Down Expand Up @@ -237,7 +238,7 @@ static Server createServer(
Config config,
Map<String, String> icebergConfig,
PrometheusMetricsReporter metricsReporter) {
var s = createBaseServer(catalog, config, icebergConfig, true, metricsReporter);
var s = createBaseServer(catalog, config, icebergConfig, true, true, metricsReporter);
ServerConnector connector = new ServerConnector(s);
connector.setHost(host);
connector.setPort(port);
Expand All @@ -252,7 +253,7 @@ private static Server createAdminServer(
Config config,
Map<String, String> icebergConfig,
PrometheusMetricsReporter metricsReporter) {
var s = createBaseServer(catalog, config, icebergConfig, false, metricsReporter);
var s = createBaseServer(catalog, config, icebergConfig, false, false, metricsReporter);
ServerConnector connector = new ServerConnector(s);
connector.setHost(host);
connector.setPort(port);
Expand All @@ -265,6 +266,7 @@ private static Server createBaseServer(
Config config,
Map<String, String> icebergConfig,
boolean requireAuth,
boolean registerAdminServlet,
PrometheusMetricsReporter metricsReporter) {
var mux = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
mux.insertHandler(new GzipHandler());
Expand Down Expand Up @@ -337,6 +339,11 @@ private static Server createBaseServer(
var h = new ServletHolder(new RESTCatalogServlet(restCatalogAdapter));
mux.addServlet(h, "/*");

if (registerAdminServlet) {
var adminServlet = new ServletHolder(new CatalogAdminServlet(catalog, config.name()));
mux.addServlet(adminServlet, "/admin/*");
}

var s = new Server();
overrideJettyDefaults(s);
s.setHandler(mux);
Expand Down Expand Up @@ -496,7 +503,7 @@ public Integer call() throws Exception {
icebergConfig,
metricsReporter);
adminServer.start();
logger.warn("Serving admin endpoint at http://{}/v1/{config,*}", adminHostAndPort);
logger.warn("Serving admin endpoint at http://{}/v1/{{config,*}}", adminHostAndPort);
}

HostAndPort hostAndPort = HostAndPort.fromString(config.addr());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package com.altinity.ice.rest.catalog.internal.cmd;

import com.altinity.ice.internal.strings.Strings;
import com.altinity.ice.rest.catalog.internal.etcd.EtcdCatalog;
import com.altinity.ice.rest.catalog.internal.rest.RESTObjectMapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.exceptions.RuntimeIOException;

/**
 * Catalog registry export/import for etcd-backed catalogs.
 *
 * <p>Both operations work on raw registry key/value pairs read from or written to an {@link
 * EtcdCatalog}. Any other {@link Catalog} implementation is rejected with an {@link
 * IllegalArgumentException}.
 */
public final class CatalogAdminService {

  private CatalogAdminService() {}

  /**
   * Builds a snapshot of the catalog's namespace and table registry entries.
   *
   * @param catalog catalog to read from; must be an {@link EtcdCatalog}
   * @param catalogName name recorded in the snapshot
   * @param namespaceFilter when non-blank, restricts the export to this namespace path, namespaces
   *     nested under it, and the tables listed for it; {@code null} or blank exports everything
   * @return snapshot stamped with the current time
   * @throws IllegalArgumentException if {@code catalog} is not etcd-backed
   * @throws RuntimeIOException if a stored value cannot be parsed as a JSON string map
   */
  public static CatalogSnapshot export(
      Catalog catalog, String catalogName, String namespaceFilter) {
    EtcdCatalog etcdCatalog = requireEtcdCatalog(catalog);

    // EtcdCatalog.listAllTableKvs treats a blank path as "no filter". Normalize here so the
    // namespace list and the table list always agree on what the filter means.
    String filter = (namespaceFilter == null || namespaceFilter.isBlank()) ? null : namespaceFilter;

    List<CatalogSnapshot.NamespaceEntry> namespaces = new ArrayList<>();
    for (EtcdCatalog.CatalogKv kv : etcdCatalog.listAllNamespaceKvs()) {
      if (matchesNamespaceFilter(kv.key(), filter, etcdCatalog)) {
        namespaces.add(new CatalogSnapshot.NamespaceEntry(kv.key(), parseJsonMap(kv.value())));
      }
    }

    List<CatalogSnapshot.TableEntry> tables = new ArrayList<>();
    for (EtcdCatalog.CatalogKv kv : etcdCatalog.listAllTableKvs(filter)) {
      tables.add(new CatalogSnapshot.TableEntry(kv.key(), parseJsonMap(kv.value())));
    }

    return new CatalogSnapshot(
        CatalogSnapshot.CURRENT_VERSION, catalogName, Instant.now().toString(), namespaces, tables);
  }

  /**
   * Writes the entries of a snapshot into the catalog registry, namespaces first, then tables.
   *
   * <p>The whole snapshot is validated before the first write, so a malformed entry cannot leave
   * the registry half-imported or cause a literal JSON {@code null} to be stored as a value.
   *
   * @param catalog catalog to write to; must be an {@link EtcdCatalog}
   * @param snapshot snapshot to import; its version must equal {@link
   *     CatalogSnapshot#CURRENT_VERSION}
   * @param dryRun when true, nothing is written and the counts describe what would happen
   * @param overwrite when true, existing keys are replaced; otherwise they are skipped
   * @return per-outcome counts together with the snapshot's catalog name and export timestamp
   * @throws IllegalArgumentException if {@code catalog} is not etcd-backed, the snapshot is null
   *     or has an unsupported version, or any entry is null, has a null/blank key, or a null value
   */
  public static CatalogImportResult importSnapshot(
      Catalog catalog, CatalogSnapshot snapshot, boolean dryRun, boolean overwrite) {
    EtcdCatalog etcdCatalog = requireEtcdCatalog(catalog);

    if (snapshot == null) {
      throw new IllegalArgumentException("Snapshot is required");
    }
    if (snapshot.version() != CatalogSnapshot.CURRENT_VERSION) {
      throw new IllegalArgumentException(
          "Unsupported snapshot version: "
              + snapshot.version()
              + " (expected "
              + CatalogSnapshot.CURRENT_VERSION
              + ")");
    }

    List<CatalogSnapshot.NamespaceEntry> namespaces =
        snapshot.namespaces() != null ? snapshot.namespaces() : List.of();
    List<CatalogSnapshot.TableEntry> tables =
        snapshot.tables() != null ? snapshot.tables() : List.of();

    // Validate everything up front: failing on entry N after entries 0..N-1 were already
    // written would leave a partially applied import behind.
    int index = 0;
    for (CatalogSnapshot.NamespaceEntry entry : namespaces) {
      if (entry == null) {
        throw malformed("namespaces", index, "entry is null");
      }
      requireKeyAndValue("namespaces", index, entry.key(), entry.value());
      index++;
    }
    index = 0;
    for (CatalogSnapshot.TableEntry entry : tables) {
      if (entry == null) {
        throw malformed("tables", index, "entry is null");
      }
      requireKeyAndValue("tables", index, entry.key(), entry.value());
      index++;
    }

    ImportTally tally = new ImportTally();
    for (CatalogSnapshot.NamespaceEntry entry : namespaces) {
      tally.record(
          etcdCatalog.putCatalogKv(entry.key(), marshal(entry.value()), overwrite, dryRun));
    }
    for (CatalogSnapshot.TableEntry entry : tables) {
      tally.record(
          etcdCatalog.putCatalogKv(entry.key(), marshal(entry.value()), overwrite, dryRun));
    }

    return new CatalogImportResult(
        tally.created,
        tally.skipped,
        tally.overwritten,
        snapshot.catalogName(),
        snapshot.exportedAt());
  }

  /** Running totals of {@link EtcdCatalog.PutCatalogKvResult} outcomes for one import. */
  private static final class ImportTally {
    private int created;
    private int skipped;
    private int overwritten;

    void record(EtcdCatalog.PutCatalogKvResult result) {
      switch (result) {
        case CREATED -> created++;
        case SKIPPED -> skipped++;
        case OVERWRITTEN -> overwritten++;
      }
    }
  }

  /** Rejects an entry whose key is null/blank or whose value is null. */
  private static void requireKeyAndValue(
      String section, int index, String key, Map<String, String> value) {
    if (key == null || key.isBlank()) {
      throw malformed(section, index, "key is missing");
    }
    if (value == null) {
      // marshal(null) would produce the string "null", which must never reach the registry.
      throw malformed(section, index, "value is missing (key: " + key + ")");
    }
  }

  /** Builds the exception used for every malformed-entry failure. */
  private static IllegalArgumentException malformed(String section, int index, String problem) {
    return new IllegalArgumentException(
        "Invalid snapshot entry " + section + "[" + index + "]: " + problem);
  }

  /** Returns {@code catalog} as an {@link EtcdCatalog} or throws if it is any other type. */
  private static EtcdCatalog requireEtcdCatalog(Catalog catalog) {
    if (!(catalog instanceof EtcdCatalog etcdCatalog)) {
      throw new IllegalArgumentException(
          "Catalog export/import requires an etcd-backed catalog (uri: etcd:... in config)");
    }
    return etcdCatalog;
  }

  /**
   * Tells whether a namespace key belongs to the filtered namespace or to one nested under it.
   * A null or empty filter matches every key.
   */
  private static boolean matchesNamespaceFilter(
      String namespaceKey, String namespaceFilter, EtcdCatalog catalog) {
    if (Strings.isNullOrEmpty(namespaceFilter)) {
      return true;
    }
    String prefix = catalogPrefixForFilter(catalog) + "n/";
    if (!namespaceKey.startsWith(prefix)) {
      return false;
    }
    String path = namespaceKey.substring(prefix.length());
    // The "/" suffix keeps a filter of "a" from matching an unrelated sibling such as "ab".
    return path.equals(namespaceFilter) || path.startsWith(namespaceFilter + "/");
  }

  /** Key prefix used when filtering: empty for the catalog named "default", else "name/". */
  private static String catalogPrefixForFilter(EtcdCatalog catalog) {
    if ("default".equals(catalog.name())) {
      return "";
    }
    return catalog.name() + "/";
  }

  /** Parses a stored JSON value into a string map, wrapping parse failures as unchecked. */
  private static Map<String, String> parseJsonMap(String json) {
    try {
      return RESTObjectMapper.mapper().readValue(json, new TypeReference<>() {});
    } catch (JsonProcessingException e) {
      throw new RuntimeIOException(e);
    }
  }

  /** Serializes a string map to the JSON text stored as the registry value. */
  private static String marshal(Map<String, String> value) {
    try {
      return RESTObjectMapper.mapper().writeValueAsString(value);
    } catch (JsonProcessingException e) {
      throw new RuntimeIOException(e);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package com.altinity.ice.rest.catalog.internal.cmd;

import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * Summary returned by catalog import.
 *
 * <p>Each imported namespace or table entry contributes to exactly one of the three counters. For
 * a dry run the counters describe what would have happened; nothing is written.
 *
 * @param created number of entries whose key did not exist yet
 * @param skipped number of entries whose key already existed and was left untouched
 * @param overwritten number of entries whose key already existed and was replaced
 * @param catalogName catalog name copied from the imported snapshot; serialized as {@code
 *     catalog_name}
 * @param exportedAt export timestamp copied from the imported snapshot; serialized as {@code
 *     exported_at}
 */
public record CatalogImportResult(
int created,
int skipped,
int overwritten,
@JsonProperty("catalog_name") String catalogName,
@JsonProperty("exported_at") String exportedAt) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package com.altinity.ice.rest.catalog.internal.cmd;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Map;

/**
 * Portable JSON snapshot of etcd catalog registry keys ({@code n/} and {@code t/}).
 *
 * @param version snapshot format version; import rejects any value other than {@link
 *     #CURRENT_VERSION}
 * @param catalogName name of the catalog the snapshot was taken from; serialized as {@code
 *     catalog_name}
 * @param exportedAt time the snapshot was produced, as text; serialized as {@code exported_at}
 * @param namespaces namespace registry entries; may be {@code null} in an imported document
 * @param tables table registry entries; may be {@code null} in an imported document
 */
public record CatalogSnapshot(
int version,
@JsonProperty("catalog_name") String catalogName,
@JsonProperty("exported_at") String exportedAt,
List<NamespaceEntry> namespaces,
List<TableEntry> tables) {

/** The only snapshot format version currently produced by export and accepted by import. */
public static final int CURRENT_VERSION = 1;

/** One namespace registry entry: the etcd key and its value parsed as a string map. */
public record NamespaceEntry(String key, Map<String, String> value) {}

/** One table registry entry: the etcd key and its value parsed as a string map. */
public record TableEntry(String key, Map<String, String> value) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -87,6 +88,64 @@ public Client etcdClient() {
return client;
}

/**
 * UTF-8 etcd key and JSON value for catalog export/import.
 *
 * @param key etcd key decoded as UTF-8 text
 * @param value etcd value decoded as UTF-8 text
 */
public record CatalogKv(String key, String value) {}

/**
 * Lists every key/value pair stored under this catalog's {@code n/} (namespace) prefix.
 *
 * @return the matching entries, ordered by key
 */
public List<CatalogKv> listAllNamespaceKvs() {
  String scanPrefix = namespacePrefix();
  return prefixScan(scanPrefix);
}

/**
 * Lists key/value pairs stored under this catalog's {@code t/} (table) prefix.
 *
 * @param namespacePath namespace path such as {@code flowers} or {@code parent/child}; when
 *     {@code null} or blank, every table entry is returned, otherwise only keys that start with
 *     the table prefix followed by this path and a {@code /}
 * @return the matching entries, ordered by key
 */
public List<CatalogKv> listAllTableKvs(String namespacePath) {
  boolean unfiltered = namespacePath == null || namespacePath.isBlank();
  String scanPrefix = unfiltered ? tablePrefix() : tablePrefix() + namespacePath + "/";
  return prefixScan(scanPrefix);
}

/**
 * Fetches every etcd entry whose key starts with {@code prefix}, decodes keys and values as
 * UTF-8, and returns them sorted by key.
 */
private List<CatalogKv> prefixScan(String prefix) {
  GetOption byPrefix = GetOption.builder().isPrefix(true).build();
  GetResponse response = unwrap(kv.get(byteSeq(prefix), byPrefix));
  return response.getKvs().stream()
      .map(
          found -> {
            String decodedKey = found.getKey().toString(StandardCharsets.UTF_8);
            String decodedValue = found.getValue().toString(StandardCharsets.UTF_8);
            return new CatalogKv(decodedKey, decodedValue);
          })
      .sorted(Comparator.comparing(CatalogKv::key))
      .toList();
}

/** Outcome of a single {@code putCatalogKv} call. */
public enum PutCatalogKvResult {
// Key did not exist (written, or would be written in a dry run).
CREATED,
// Key existed and overwrite was not requested; nothing written.
SKIPPED,
// Key existed and overwrite was requested (replaced, or would be replaced in a dry run).
OVERWRITTEN
}

/**
 * Writes a single catalog registry key.
 *
 * <p>The key's existence is looked up first with a count-only get. If it exists and {@code
 * overwrite} is false, the call returns {@link PutCatalogKvResult#SKIPPED} without writing.
 * Otherwise the result is {@link PutCatalogKvResult#OVERWRITTEN} or {@link
 * PutCatalogKvResult#CREATED} depending on whether the key existed, and the value is written
 * unless {@code dryRun} is true.
 *
 * <p>The existence check and the put are two separate etcd requests, not a transaction.
 *
 * @param key etcd key to write
 * @param jsonValue value to store under {@code key}
 * @param overwrite whether an existing key may be replaced
 * @param dryRun when true, report the outcome without performing the write
 * @return what happened (or, for a dry run, what would have happened) to the key
 */
public PutCatalogKvResult putCatalogKv(
    String key, String jsonValue, boolean overwrite, boolean dryRun) {
  ByteSequence etcdKey = byteSeq(key);
  GetOption countOnly = GetOption.builder().withCountOnly(true).build();
  GetResponse lookup = unwrap(kv.get(etcdKey, countOnly));
  boolean alreadyPresent = lookup.getCount() > 0;

  if (alreadyPresent && !overwrite) {
    return PutCatalogKvResult.SKIPPED;
  }

  PutCatalogKvResult outcome =
      alreadyPresent ? PutCatalogKvResult.OVERWRITTEN : PutCatalogKvResult.CREATED;
  if (!dryRun) {
    unwrapCommit(kv.put(etcdKey, byteSeq(jsonValue)));
  }
  return outcome;
}

// Used by EtcdCatalogTest to test concurrent modifications.
protected Txn kvtx() {
return kv.txn();
Expand Down
Loading
Loading