Skip to content

Latest commit

 

History

History
809 lines (631 loc) · 25.2 KB

File metadata and controls

809 lines (631 loc) · 25.2 KB

connectrpc User Guide

This guide is the long-form companion to the crate README. It covers installation, code generation, server and client usage, streaming, tower middleware, TLS, error handling, and compression. If you just want to try the library, start with the README quick start and the examples/ directory.

Contents

Installation

connectrpc ships as three crates:

Crate Purpose
connectrpc Tower-based runtime: server dispatcher, client transports, codec, compression
protoc-gen-connect-rust (binary, in connectrpc-codegen) protoc plugin that generates service stubs
connectrpc-build build.rs integration that runs the codegen at build time

Add the runtime to your Cargo.toml:

[dependencies]
connectrpc = "0.3"

The runtime depends on buffa for protobuf message types. Generated code requires a small set of direct dependencies; see Generated Code Dependencies in the README for the exact list.

MSRV

The MSRV is Rust 1.88, declared on the workspace and verified in CI. The crate uses Rust 2024 edition.

Feature flags

The runtime is feature-gated so you only pay for what you use:

Feature Default What it adds
gzip yes Gzip compression via flate2
zstd yes Zstandard compression via zstd
streaming yes Streaming compression via async-compression
client no HTTP client transports (cleartext)
client-tls no TLS for client transports
server no Built-in hyper server (Server)
server-tls no TLS for the built-in server
tls no Convenience alias for both server-tls + client-tls
axum no Axum integration (Router::into_axum_service, Router::into_axum_router)

Common combinations:

# Just the server, behind axum
connectrpc = { version = "0.3", features = ["axum"] }

# Server + client, both with TLS
connectrpc = { version = "0.3", features = ["axum", "client", "tls"] }

# Built-in server (no axum)
connectrpc = { version = "0.3", features = ["server"] }

# Minimal (wasm-friendly: no networking, no native compression)
connectrpc = { version = "0.3", default-features = false }

Quick start

Define a service:

// proto/greet.proto
syntax = "proto3";
package greet.v1;

service GreetService {
  rpc Greet(GreetRequest) returns (GreetResponse);
}

message GreetRequest { string name = 1; }
message GreetResponse { string greeting = 1; }

Generate code with connectrpc-build in build.rs:

[build-dependencies]
connectrpc-build = "0.3"
// build.rs
fn main() {
    connectrpc_build::Config::new()
        .files(&["proto/greet.proto"])
        .includes(&["proto/"])
        .include_file("_connectrpc.rs")
        .compile()
        .unwrap();
}

Implement the service:

// src/main.rs
use std::sync::Arc;
use buffa::view::OwnedView;
use connectrpc::{ConnectError, Context, Router};

pub mod proto {
    include!(concat!(env!("OUT_DIR"), "/_connectrpc.rs"));
}
use proto::greet::v1::*;

struct MyGreet;

impl GreetService for MyGreet {
    async fn greet(
        &self,
        ctx: Context,
        req: OwnedView<GreetRequestView<'static>>,
    ) -> Result<(GreetResponse, Context), ConnectError> {
        Ok((
            GreetResponse {
                greeting: format!("Hello, {}!", req.name),
                ..Default::default()
            },
            ctx,
        ))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let service = Arc::new(MyGreet);
    let router = service.register(Router::new());
    let app = router.into_axum_router();

    let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await?;
    axum::serve(listener, app).await?;
    Ok(())
}

That's the full server. Make a request with curl to confirm it works:

curl -X POST http://localhost:8080/greet.v1.GreetService/Greet \
  -H 'content-type: application/json' \
  -d '{"name": "World"}'

For runnable end-to-end examples, see the examples/ directory.

Code generation

Two workflows are supported. Both produce the same runtime API.

connectrpc-build (build-time, simplest)

Used in build.rs. Compiles .proto files at build time, regenerates on change, no extra binaries needed.

// build.rs
fn main() {
    connectrpc_build::Config::new()
        .files(&["proto/greet.proto", "proto/billing.proto"])
        .includes(&["proto/"])
        .include_file("_connectrpc.rs")
        .compile()
        .unwrap();
}

Output is unified: message types and service stubs in one file per proto, assembled via a single include!. Best for simple projects.

buf generate (checked-in code, production-grade)

Recommended when you want generated code committed to the repo, multi-output structure (e.g. separate proto modules from service modules), or when generating across language boundaries from one schema. Requires three plugins: protoc-gen-buffa for message types, protoc-gen-connect-rust for service stubs, and protoc-gen-buffa-packaging for assembling mod.rs trees.

See the README's Code generation section for plugin installation, buf.gen.yaml configuration, and the buffa_module shorthand for cross-tree references.

Implementing servers

A service is a Rust trait generated from your .proto file. The trait name matches the proto service name (GreetService becomes trait GreetService), and each RPC becomes an async method.

Handler signatures

Unary handlers take a Context plus an OwnedView<RequestView<'static>>, and return (Response, Context) or a ConnectError:

impl GreetService for MyGreet {
    async fn greet(
        &self,
        ctx: Context,
        req: OwnedView<GreetRequestView<'static>>,
    ) -> Result<(GreetResponse, Context), ConnectError> {
        // req derefs to the view: zero-copy field access.
        // String fields are &str borrowed from the request buffer.
        Ok((
            GreetResponse {
                greeting: format!("Hello, {}!", req.name),
                ..Default::default()
            },
            ctx,
        ))
    }
}

The OwnedView shape lets handlers read string fields without allocating - req.name is a &str directly into the request bytes. Call .to_owned_message() to get the prost-style owned struct when you need it.

The Context parameter

Context carries per-call metadata in both directions:

Field Direction Purpose
headers request Caller-supplied headers (read with ctx.header(name) or ctx.headers.get(...))
response_headers response Headers to send back, set in handler before returning
trailers response Trailers to send back, set via ctx.set_trailer(name, value)
deadline request Absolute Instant if the caller set a timeout
extensions request http::Extensions carried from the underlying http::Request
compress_response response Override the server's compression policy for this RPC

The handler takes ownership of Context and returns it with the response. This is intentional: the returned future is 'static, so the context cannot be borrowed across the await. Set response headers or trailers before returning:

async fn greet(
    &self,
    mut ctx: Context,
    req: OwnedView<GreetRequestView<'static>>,
) -> Result<(GreetResponse, Context), ConnectError> {
    ctx.response_headers
        .insert("x-greet-version", "v2".parse().unwrap());
    ctx.set_trailer(
        http::header::HeaderName::from_static("x-server-id"),
        "node-7".parse().unwrap(),
    );
    Ok((/* response */, ctx))
}

Context::extensions is the passthrough channel for tower-layer state: a custom auth layer can stamp a UserId into the request's http::Extensions, and the dispatcher forwards that map verbatim into Context::extensions for the handler to read with ctx.extensions.get::<UserId>(). See Tower middleware for the full pattern.

Returning errors

Handlers return ConnectError for failures. Each error carries an ErrorCode (the canonical Connect/gRPC status), a message, optional structured details, and optional metadata (headers + trailers):

use connectrpc::{ConnectError, ErrorCode};

return Err(ConnectError::new(
    ErrorCode::NotFound,
    format!("user {name:?} not found"),
));

The dispatcher maps ErrorCode to the appropriate HTTP status and serializes the error in the protocol the caller is using (Connect JSON, Connect binary, gRPC trailers, or gRPC-Web). Handlers don't need to know which protocol the caller chose.

Registering services on a Router

Generated services have a register method (via the register extension trait) that wires every RPC into a connectrpc::Router:

let service = Arc::new(MyGreet);
let router = service.register(Router::new());

To compose multiple services on one server, chain register calls:

let router = Router::new();
let router = Arc::new(MyGreet).register(router);
let router = Arc::new(MyBilling).register(router);

The router is what you mount on axum (router.into_axum_router()) or pass to the built-in Server.

Streaming RPCs

ConnectRPC supports all four RPC types. Define them in your .proto file with the standard stream keyword:

service NumberService {
  rpc Square(SquareRequest) returns (SquareResponse);                 // unary
  rpc Range(RangeRequest) returns (stream RangeResponse);             // server stream
  rpc Sum(stream SumRequest) returns (SumResponse);                   // client stream
  rpc RunningSum(stream RunningSumRequest) returns (stream RunningSumResponse);  // bidi
}

The runnable demo for each type lives in examples/streaming-tour/. The handler signatures are summarized below.

The streaming-handler trait signatures use Pin<Box<dyn Stream<...> + Send>> for both inbound and outbound streams. That's verbose, so the snippets here use two local type aliases (the same ones used in examples/streaming-tour/src/server.rs):

type ResponseStream<T> = Pin<Box<dyn Stream<Item = Result<T, ConnectError>> + Send>>;
type RequestStream<V> = Pin<Box<dyn Stream<Item = Result<OwnedView<V>, ConnectError>> + Send>>;

Server streaming

The handler returns a stream of responses. Use any futures::Stream you like, then box-pin it:

async fn range(
    &self,
    ctx: Context,
    req: OwnedView<RangeRequestView<'static>>,
) -> Result<(ResponseStream<RangeResponse>, Context), ConnectError> {
    let stream = futures::stream::iter(/* ... */);
    Ok((Box::pin(stream), ctx))
}

Client streaming

The handler receives a stream of request views and returns a single response:

async fn sum(
    &self,
    ctx: Context,
    mut requests: RequestStream<SumRequestView<'static>>,
) -> Result<(SumResponse, Context), ConnectError> {
    let mut total: i64 = 0;
    while let Some(req) = requests.next().await {
        total += req?.value.unwrap_or(0) as i64;
    }
    Ok((SumResponse { total: Some(total), ..Default::default() }, ctx))
}

Bidirectional streaming

Takes a request stream and returns a response stream. Both sides can emit messages independently:

async fn running_sum(
    &self,
    ctx: Context,
    requests: RequestStream<RunningSumRequestView<'static>>,
) -> Result<(ResponseStream<RunningSumResponse>, Context), ConnectError> {
    // Map the request stream to a response stream however you like.
    let response_stream = futures::stream::unfold(/* ... */);
    Ok((Box::pin(response_stream), ctx))
}

For bidirectional streams that need true full-duplex behavior (server emits messages independently of client send rate), use a tokio::sync::mpsc channel: spawn a task that reads from requests and writes to the channel sender, return a ReceiverStream as the response. See tests/streaming/src/lib.rs for an example.

Calling streaming RPCs from a client

Generated clients expose a method for each RPC. Server streaming returns a stream you call .message().await? on; bidi returns a handle with .send(req).await? and .message().await? plus .close_send():

// Server streaming
let mut stream = client.range(req).await?;
while let Some(msg) = stream.message().await? {
    // ...
}

// Client streaming - takes a Vec
let resp = client.sum(vec![req1, req2, req3]).await?;

// Bidi
let mut bidi = client.running_sum().await?;
bidi.send(req).await?;
let reply = bidi.message().await?;
bidi.close_send();

Both streaming-tour/src/client.rs and the eliza example show these patterns end-to-end.

Tower middleware

The connect router is a tower::Service, so any tower layer composes on top. The full reference is in examples/middleware/, which uses an axum::middleware::from_fn for bearer-token auth and chains it with tower-http's TraceLayer and TimeoutLayer.

Composing layers

Use tower::ServiceBuilder for clear top-to-bottom ordering, mounted on axum::Router::layer() so axum handles the body conversion from ConnectRpcBody to axum::body::Body:

use std::sync::Arc;
use std::time::Duration;
use tower::ServiceBuilder;
use tower_http::{trace::TraceLayer, timeout::TimeoutLayer};

let connect_router = service.register(Router::new());
let tokens = Arc::new(token_table());
let app = axum::Router::new()
    .fallback_service(connect_router.into_axum_service())
    .layer(
        ServiceBuilder::new()
            .layer(TraceLayer::new_for_http())                  // outermost
            .layer(axum::middleware::from_fn_with_state(tokens, auth_middleware))
            .layer(TimeoutLayer::with_status_code(              // innermost
                http::StatusCode::REQUEST_TIMEOUT,
                Duration::from_secs(5),
            )),
    );

ServiceBuilder applies layers top-to-bottom: the first .layer() sees requests first (and responses last). A request flows trace -> auth -> timeout -> dispatcher -> handler.

For auth and similar interceptors, axum::middleware::from_fn (or from_fn_with_state for stateful cases) is usually the lightest path because it lets you write the middleware as a plain async function. A hand-rolled tower::Layer + tower::Service pair is also fine when you need finer control - both produce a Layer that ServiceBuilder accepts.

Passing data from a layer to a handler

The dispatch path moves the request's http::Extensions into Context::extensions verbatim. So a middleware that inserts a value via req.extensions_mut().insert(value) makes that value available to the handler via ctx.extensions.get::<T>(). This is the canonical way to pass per-request state from middleware (auth identity, trace IDs, remote addr, TLS peer info) into the handler.

The middleware example does exactly this with a UserId:

// In the auth middleware:
req.extensions_mut().insert(UserId(user.into()));
next.run(req).await

// In the handler:
let user = ctx.extensions.get::<UserId>().unwrap();

Short-circuit responses

A layer can short-circuit by returning a response without invoking the inner service. The middleware example does this for unauthorized requests, returning a 401 with a Connect-protocol JSON error body so clients see the failure on the same code path they use for handler errors.

Hosting

With axum (recommended)

Router::into_axum_service() returns a tower service you mount via axum::Router::fallback_service, and into_axum_router() returns a ready-to-merge axum router. This is the common path because it lets you compose connect RPC routes with regular HTTP routes (health checks, static files, OAuth callbacks):

let app = axum::Router::new()
    .route("/health", axum::routing::get(|| async { "OK" }))
    .fallback_service(connect_router.into_axum_service())
    .layer(/* tower layers */);

let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;
axum::serve(listener, app).await?;

Standalone server

Enable the server feature for a built-in hyper-based server. This is the no-frills path when you don't need axum's routing or per-route configuration:

use connectrpc::Server;

let connect_router = service.register(Router::new());
Server::new(connect_router)
    .serve("127.0.0.1:8080".parse()?)
    .await?;

The standalone Server handles HTTP/1.1, HTTP/2 with prior knowledge, and graceful shutdown. It's a single dispatcher with no per-route configuration, so add things like health endpoints either as RPC methods or by switching to the axum path.

TLS

Enable the server-tls feature (or the tls umbrella feature for both server and client TLS).

For the standalone Server:

use std::sync::Arc;

let server_config: Arc<rustls::ServerConfig> = /* load PEMs, build config */;

Server::new(connect_router)
    .with_tls(server_config)
    .serve("0.0.0.0:8443".parse()?)
    .await?;

For the axum path, wrap the listener with tokio_rustls::TlsAcceptor yourself (this is what the eliza example does).

The eliza example (examples/eliza/README.md) walks through generating self-signed certificates with openssl, configuring mTLS via --client-ca, and the rustls strict-PKI requirement that your CA cert must be distinct from the server leaf cert.

Clients

Enable the client feature for HTTP client support with connection pooling.

HttpClient

HttpClient is the standard transport built on hyper. Construct one of two variants: cleartext (http:// only) or TLS-enabled (https:// only):

use connectrpc::client::HttpClient;

// Cleartext
let http = HttpClient::plaintext();

// TLS - requires client-tls or tls feature
let tls_config: Arc<rustls::ClientConfig> = /* trust store + ALPN */;
let http = HttpClient::with_tls(tls_config);

A plaintext() client refuses https:// URIs and a with_tls() client refuses http:// URIs - this catches misconfiguration loudly rather than silently downgrading.

ClientConfig

ClientConfig carries the base URI and per-call defaults that apply to every RPC made with the client:

use std::time::Duration;
use connectrpc::client::ClientConfig;

let config = ClientConfig::new("http://localhost:8080".parse()?)
    .default_timeout(Duration::from_secs(30))
    .default_header("authorization", "Bearer demo-token")
    .default_header("x-trace-id", "trace-12345");

These defaults automatically apply to every call from that client. Use them for cross-cutting concerns like auth or tracing IDs.

CallOptions

For per-call overrides, use the _with_options method variants and pass CallOptions:

use connectrpc::client::CallOptions;

let resp = client.greet_with_options(
    GreetRequest { name: "World".into(), ..Default::default() },
    CallOptions::default()
        .with_timeout(Duration::from_secs(5))
        .with_max_message_size(1024 * 1024),
).await?;

Per-call options replace config defaults for the fields they set (timeout here); other defaults (the auth header) still apply.

Reading the response

Unary responses give you several access patterns:

let resp = client.greet(req).await?;

// Pattern 1: borrow the view via .view(). Zero-copy. Use this when
// you also need headers/trailers - OwnedView derefs to the view, so
// field access (.greeting -> &str) works directly.
println!("{}", resp.view().greeting);
let _ = resp.headers();
let _ = resp.trailers();

// Pattern 2: consume via .into_view() to get the OwnedView. Still
// zero-copy via Deref, but discards headers/trailers.
let msg = client.greet(req).await?.into_view();
let greeting: &str = msg.greeting;

// Pattern 3: .into_owned() for the prost-style owned struct.
// Allocates and copies all string/bytes fields.
let owned: GreetResponse = client.greet(req).await?.into_owned();

Custom transports

Generated clients are generic over ClientTransport, which is auto- implemented for any tower::Service that handles http::Request<ClientBody> and returns http::Response<B>. So you can plug in any tower stack as the transport:

use tower::ServiceBuilder;
use tower_http::timeout::TimeoutLayer;
use connectrpc::client::{Http2Connection, ServiceTransport};

let conn = Http2Connection::connect_plaintext(uri).await?.shared(1024);
let stacked = ServiceBuilder::new()
    .layer(TimeoutLayer::new(Duration::from_secs(30)))
    .service(conn);

let client = GreetServiceClient::new(
    ServiceTransport::new(stacked),
    config,
);

This is also how the wasm example (examples/wasm-client/) plugs in a browser fetch-based transport.

Errors and status codes

ConnectError is the error type for both server-returned and client-observed errors:

pub struct ConnectError {
    pub code: ErrorCode,
    pub message: Option<String>,
    pub details: Vec<ErrorDetail>,
    pub headers: http::HeaderMap,
    pub trailers: http::HeaderMap,
    // ...
}

ErrorCode is the canonical Connect/gRPC status set: Canceled, Unknown, InvalidArgument, DeadlineExceeded, NotFound, AlreadyExists, PermissionDenied, ResourceExhausted, FailedPrecondition, Aborted, OutOfRange, Unimplemented, Internal, Unavailable, DataLoss, Unauthenticated.

Construct one with the message:

return Err(ConnectError::new(
    ErrorCode::PermissionDenied,
    format!("user {user} cannot read {name}"),
));

The dispatcher maps each code to the appropriate HTTP status (e.g. NotFound -> 404, Unauthenticated -> 401, PermissionDenied -> 403) and the appropriate protocol-specific representation. Clients parse it back into the same ConnectError shape regardless of which protocol they're speaking.

For more structured errors, attach ErrorDetail entries (which carry typed protobuf messages) before returning. These flow through to clients in the standard Connect error-detail wire format.

Compression

The runtime ships with gzip, zstd, and identity by default. Servers advertise supported algorithms in the accept-encoding response and honor the client's connect-content-encoding request header (or grpc-encoding for the gRPC protocols).

Per-RPC compression control

A handler can override the server's compression policy for a single response by setting Context::set_compression:

async fn greet(
    &self,
    mut ctx: Context,
    req: OwnedView<GreetRequestView<'static>>,
) -> Result<(GreetResponse, Context), ConnectError> {
    if response_is_huge() {
        ctx.set_compression(true);  // force compress this response
    }
    Ok((/* ... */, ctx))
}

Custom compression algorithms

CompressionRegistry is pluggable. Implement CompressionProvider for your algorithm and register it on the dispatcher:

use connectrpc::{CompressionProvider, CompressionRegistry, ConnectError};
use bytes::Bytes;

struct MyCompression;

impl CompressionProvider for MyCompression {
    fn name(&self) -> &'static str { "my-algo" }

    fn compress(&self, data: &[u8]) -> Result<Bytes, ConnectError> {
        // ...
    }

    fn decompressor<'a>(
        &self,
        data: &'a [u8],
    ) -> Result<Box<dyn std::io::Read + 'a>, ConnectError> {
        // Return a reader that yields decompressed bytes. The framework
        // controls how much is read, so decompression is bounded by
        // ConnectRpcService::max_message_size.
        // ...
    }
}

let registry = CompressionRegistry::default().register(MyCompression);
let service = ConnectRpcService::new(router).with_compression(registry);

Examples directory tour

Example What it covers
streaming-tour/ All four RPC types (unary, server stream, client stream, bidi) on a trivial NumberService. Smallest demo of handler signatures and client invocation patterns.
middleware/ Server-side tower middleware composition: an axum::middleware::from_fn bearer-token auth, identity passthrough via Context::extensions, response trailers via Context::set_trailer. Client demos ClientConfig::default_header and CallOptions::with_timeout.
eliza/ Production-shaped streaming app: a port of the connectrpc/examples-go ELIZA demo. Server-streaming Introduce + bidi-streaming Converse, TLS, mTLS, CORS, IPv6, both server and client binaries, interoperates with the hosted Go reference at demo.connectrpc.com.
multiservice/ Multiple proto packages compiled together with buf generate, multiple services on one server, well-known type usage.
wasm-client/ Browser fetch transport: same generated client used from wasm32-unknown-unknown with a custom ClientTransport backed by web-sys::fetch.
bazel/ Bazel build integration via custom rules.

Each example has its own README with run instructions.