Category Archives: Rust

Cancellation tokens in Rust

When using tokio::spawn we might wish to pass a cancellation token into the spawned task, to allow us to cancel long-running work.

We can create a cancellation token like this

let token = CancellationToken::new();

From this we could take one or more child tokens like this

let child = token.child_token();

Using child tokens allows us to cancel all of them at once from the parent, or we can cancel each one individually
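To illustrate the fan-out idea, here's a sketch using plain threads and an AtomicBool (an analogy only, not tokio-util's actual API): one shared parent flag stops every worker that watches it, much as cancelling a parent token cancels its children.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

// Thread-based analogue of a parent token shared by child tokens:
// setting the parent flag "cancels" every worker watching it.
fn cancel_all_workers(count: usize) -> usize {
    let parent = Arc::new(AtomicBool::new(false));

    let workers: Vec<_> = (0..count)
        .map(|_| {
            let child = Arc::clone(&parent); // each worker observes the shared flag
            thread::spawn(move || {
                while !child.load(Ordering::Relaxed) {
                    thread::sleep(Duration::from_millis(5));
                }
            })
        })
        .collect();

    parent.store(true, Ordering::Relaxed); // "cancel" propagates to all workers
    workers
        .into_iter()
        .map(|worker| worker.join())
        .filter(|result| result.is_ok())
        .count()
}
```

The tokio-util version works the same way conceptually, but cancellation is observed asynchronously via child.cancelled() rather than by polling a flag.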

Now we'll spawn our task. In this case we'll create two concurrent branches with tokio::select!; the first one to complete supplies the returned value. We'll also store the JoinHandle, just to let us force the application to wait for completion so we get some meaningful output to the console

let handle = tokio::spawn(async move {
  tokio::select! {
    _ = child.cancelled() => {
      println!("Child task cancelled");
    }
    _ = tokio::time::sleep(Duration::from_secs(30)) => {
      println!("Child task timed out");
    }
  }
});

Here's the full code, starting with the Cargo.toml dependencies

[dependencies]
tokio-util = "0.7.17"
tokio = { version = "1.48.0", features = ["rt", "rt-multi-thread", "macros", "time"] }
anyhow = "1.0.100"

Now the main.rs code

use std::io;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let token = CancellationToken::new();
    let child = token.child_token();

    let handle = tokio::spawn(async move {
        tokio::select! {
            _ = child.cancelled() => {
                println!("Child task cancelled");
            }
            _ = tokio::time::sleep(Duration::from_secs(30)) => {
                println!("Child task timed out");
            }
        }
    });

    io::stdin().read_line(&mut String::new())?;
    token.cancel();

    handle.await.expect("Task panicked");
    println!("Task Completed");

    Ok(())
}

Rust and Sqlite

Add the dependencies

[dependencies]
rusqlite = { version = "0.37.0", features = ["bundled"] }

The bundled feature will automatically compile and link an up-to-date SQLite. Without it I got errors such as “LINK : fatal error LNK1181: cannot open input file ‘sqlite3.lib'”. Obviously, if you already have everything installed for SQLite then you might prefer the non-bundled dependency, so just replace the above with

[dependencies]
rusqlite = "0.37.0"

Create a DB

Now let’s create a database as a file and insert an initial row of data

use rusqlite::Connection;

fn main() {
    let connection = Connection::open("./data.db3").unwrap();
    connection.execute("CREATE TABLE app (id INTEGER PRIMARY KEY, name TEXT NOT NULL)", ()).unwrap();
    connection.execute("INSERT INTO app (id, name) VALUES (?, ?)", (1, "Hello")).unwrap();
}

We could also do this in memory using the following

let connection = Connection::open_in_memory().unwrap();

Reading from our DB

We’ll create a simple structure representing the DB created above

#[derive(Debug)]
struct App {
    id: i32,
    name: String,
}

Now to read into this we use the following

let mut statement = connection.prepare("SELECT * FROM app").unwrap();
let app_iter = statement.query_map([], |row| {
  Ok(App {
    id: row.get(0)?,
    name: row.get(1)?,
  })
}).unwrap();

for app in app_iter {
  println!("{:?}", app.unwrap());
}

Depending on how you iterate the rows (for example via statement.query rather than query_map) you may also need the following use clause

use rusqlite::fallible_iterator::FallibleIterator;

Creating kubectl plugins

To create a kubectl plugin whereby, for example, we could run a new tool like this

kubectl log index echo 3 -n dev

Here the plugin would find pods whose names partially match echo and, from those that match, pick the pod at index 3 (0-based).
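The selection logic itself is simple enough to sketch with plain strings (hypothetical pod names; the real plugin does this against the Kubernetes API, as shown later): filter names by substring, then pick the 0-based index among the matches.

```rust
// Filter names by substring match, then pick the 0-based index among the matches.
fn select_pod<'a>(names: &[&'a str], partial: &str, index: usize) -> Option<&'a str> {
    names
        .iter()
        .filter(|name| name.contains(partial))
        .nth(index)
        .copied()
}
```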

To create a plugin you use the naming convention

kubectl-<your-plugin-name>

You need to build your plugin, then ensure the binary is copied somewhere on your PATH.

Once built and copied, you can use the following to check if kubectl can find the plugin

kubectl plugin list

Sample Plugin

I’ve created the plugin using Rust.

Note: This is just a quick implementation and not fully tested, but gives an idea of how to create such a plugin.

Set your Cargo.toml dependencies as follows

k8s-openapi = { version = "0.26.0", features = ["v1_32"] }
kube = { version = "2.0.1", features = ["runtime", "derive"] }
tokio = { version = "1", features = ["full"] }
clap = { version = "4", features = ["derive"] }
anyhow = "1.0"

Next we want to create the command line arguments using the following

#[derive(Parser, Debug)]
#[command(name = "kubectl-log-index")]
#[command(author, version, about)]
pub struct Args {
    /// Partial name of the pod to match
    #[arg()]
    pub pod_part: String,
    /// Index of the pod (0-based)
    pub index: usize,
    /// Follow the log stream
    #[arg(short = 'f', long)]
    pub follow: bool,
    /// Kubernetes namespace (optional)
    #[arg(short, long)]
    pub namespace: Option<String>,
}

We’re supplying some short-form parameters, such as -f which can be used instead of --follow, and likewise -n in place of --namespace.
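Clap derives all of this for us, but a minimal sketch of what the short/long flag mapping amounts to (an illustration only, not clap's implementation) looks like this:

```rust
// Recognise -f/--follow as a boolean flag and -n/--namespace as a valued option.
fn parse_flags(args: &[&str]) -> (bool, Option<String>) {
    let mut follow = false;
    let mut namespace = None;
    let mut iter = args.iter();
    while let Some(arg) = iter.next() {
        match *arg {
            "-f" | "--follow" => follow = true,
            "-n" | "--namespace" => namespace = iter.next().map(|s| s.to_string()),
            _ => {} // positional arguments (pod_part, index) handled elsewhere
        }
    }
    (follow, namespace)
}
```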

Our main.rs looks like this

mod args;

use clap::Parser;
use anyhow::Result;
use kube::{Api, Client};
use k8s_openapi::api::core::v1::Pod;
use std::process::Command;
use kube::api::ListParams;
use kube::runtime::reflector::Lookup;
use crate::args::Args;

/// kubectl plugin to get logs by container index
#[tokio::main]
async fn main() -> Result<()> {
    let args = Args::parse();

    let namespace: &str = args.namespace
        .as_deref()
        .unwrap_or("default");

    let client = Client::try_default().await?;
    let pods: Api<Pod> = Api::namespaced(client, namespace);

    let pod_list = find_matching_pods(pods, &args.pod_part).await.expect("Failed to find matching pods");
    
    let pod = pod_list
        .get(args.index)
        .cloned()
        .ok_or_else(|| anyhow::anyhow!("Pod not found"))?;

    let pod_name = &pod.name().ok_or_else(|| anyhow::anyhow!("Pod name not found"))?;

    let mut cmd = Command::new("kubectl");

    cmd.args(["logs", pod_name]);

    if namespace != "default" {
        cmd.args(["-n", namespace]);
    }

    if args.follow {
        cmd.arg("-f");
    }

    cmd
        .status()?;

    Ok(())
}

pub async fn find_matching_pods(
    pods: Api<Pod>,
    partial: &str,
) -> Result<Vec<Pod>, Box<dyn std::error::Error>> {
    let pod_list = pods.list(&ListParams::default()).await?;

    let matches: Vec<Pod> = pod_list.items
        .into_iter()
        .filter(|pod| {
            pod.metadata.name
                .as_ref()
                .map(|name| name.contains(partial))
                .unwrap_or(false)
        })
        .collect();

    Ok(matches)
}

A simple Rust application using the Kube client

Rust has a kubernetes client crate which allows us to write code against Kubernetes via the client (i.e. instead of calling out to kubectl itself).

Create yourself a binary Rust application with the following dependencies

[dependencies]
k8s-openapi = { version = "0.26.0", default-features = false, features = ["v1_32"] }
kube = { version = "2.0.1", features = ["runtime", "client"] }
tokio = { version = "1.30", features = ["full"] }
anyhow = "1.0"

Note: Check that the k8s-openapi features match your installed Kubernetes version; run kubectl version to check your server version and use that.

This is very much a starter post, so I’m going to just change main.rs to simply instantiate a client and get all pods running across all namespaces

use kube::{Api, Client};
use k8s_openapi::api::core::v1::Pod;
use kube::runtime::reflector::Lookup;

#[tokio::main]
async fn main() -> anyhow::Result<()> {

  let client = Client::try_default().await?;
  let pods: Api<Pod> = Api::all(client);

  let pod_list = pods.list(&Default::default()).await?;

  for p in pod_list {
    println!("Pod name: {:?}", p.name().expect("Pod name missing"));
  }

  Ok(())
}

If you want to use the default namespace then change the line

let pods: Api<Pod> = Api::all(client);

to

let pods: Api<Pod> = Api::default_namespaced(client);

or if you want to get pods from a specific namespace use

let pods: Api<Pod> = Api::namespaced(client, "mynamespace");

Note: Of course you could use “default” for the default namespace, or “” for all namespaces, in place of “mynamespace”.

Code

Code is available on GitHub.

Trying out SurrealDB with Rust

SurrealDB is a multi-model database, which essentially means it allows storage of relational, document, graph, time-series, vector and full-text search as well as geospatial models (as taken from the SurrealDB Overview).

SurrealDB allows queries through an SQL like query language as well as GraphQL, HTTP and RPC.

There are SDKs for Rust (which I’m going to use here) along with JavaScript, Java, Go, Python, .NET and PHP.

Whilst you can install on Windows, Linux and Mac I prefer using Docker, so let’s run up an instance of SurrealDB

docker run --rm -p 8000:8000 surrealdb/surrealdb:latest start --log trace --user root --pass root memory

To persist data instead, create yourself a folder (i.e. mkdir mydata) or use an existing path, then mount it into the container and point SurrealDB at a file-backed store (here I’m assuming the rocksdb: engine prefix)

docker run --rm -p 8000:8000 -v $(pwd)/mydata:/mydata surrealdb/surrealdb:latest start --log trace --user root --pass root rocksdb:/mydata/mydatabase.db

If you’d like to run a web based UI for SurrealDB, you can run Surrealist

docker run -d -p 8080:8080 surrealdb/surrealist:latest

Then use this to connect to your running instance, default user is admin, default password is admin (obviously change this in a real world usage).

Once connected via Surrealist we can create a namespace and database, here’s a simple example of such a query run via Surrealist

USE NS myns DB mydb;

Yes, we literally just use the namespace and database for the first time to create both. Now let’s add some data, creating a “table” using

CREATE person CONTENT {
  first_name: "Scooby",
  last_name: "Doo",
  age: 42,
  email: "scooby.doo@example.com"
};

We can query for the list of “person” rows using

SELECT * FROM person;

As you can see, it’s very SQL like syntax with some differences.

We didn’t create an id or suchlike field, but if you select the rows from the person table you’ll notice something like this

[
  {
    age: 42,
    email: 'scooby.doo@example.com',
    first_name: 'Scooby',
    id: person:77xrs2c05oe9bmtgjbhq,
    last_name: 'Doo'
  }
]

We could have supplied an id ourselves like this

CREATE person CONTENT {
  first_name: "Fred",
  last_name: "Jones",
  age: 19,
  id: person:fredjones,
  email: "fred.jones@example.com"
};

We can update a row using

UPDATE person:77xrs2c05oe9bmtgjbhq SET first_name = "Scrappy", age = 23;

There are obviously more commands/queries we could use, but let’s move on to using the DB from Rust.

We’ll start by adding a few dependencies to Cargo.toml

[dependencies]
tokio = { version = "1.47.1", features = ["full"] }
surrealdb = "2.3.8"
serde = { version = "1.0.219", features = ["derive"] }

Next update main.rs to look like this

use surrealdb::{Surreal};
use surrealdb::engine::remote::ws::Ws;
use std::error::Error;
use surrealdb::opt::auth::Root;

use surrealdb::sql::Thing;
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Person {
    id: Thing,
    first_name: String,
    last_name: String,
    email: String,
    age: u32,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let db = Surreal::new::<Ws>("127.0.0.1:8000").await?;
    db.signin(Root { username: "root", password: "root" }).await?;
    db.use_ns("myns").use_db("mydb").await?;

    let result: Vec<Person> = db.query("SELECT * FROM person").await?.take(0)?;

    println!("{:?}", result);
    Ok(())
}

We’re using the default username and password. Of course you should change the password for this user and create your own user, but for now, let’s just get things up and running.

Notice that we connect to SurrealDB via the web socket.

You may have also noticed that in our Person struct we have an id Thing. This is essentially a record pointer, which has the table name and record id.
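You can think of a Thing as the pair you'd get by splitting the textual form on the colon (an illustration only; the surrealdb crate gives you structured access to the table and id rather than string parsing):

```rust
// "person:fredjones" -> ("person", "fredjones"): table name and record id.
fn split_record_id(raw: &str) -> Option<(&str, &str)> {
    raw.split_once(':')
}
```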

Logging with Rust

Rust supports a logging facade, which we can include using the following dependency in Cargo.toml

[dependencies]
log = "0.4.28"

Now in our main.rs we can use the various levels of logging like this

use log::{info, warn, error, trace, debug, LevelFilter};

fn main() {
    debug!("Debug log message");
    trace!("Trace log message");
    info!("Info log message");
    warn!("Warning log message");
    error!("Error log message");
}

If you run this, nothing will be output because we need to add a logging provider.
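That's the nature of a facade: the logging macros forward to whichever provider has been installed, and with no provider the messages go nowhere. A std-only sketch of the idea (not the log crate's actual types):

```rust
// The front end forwards to an installed provider; a "no-op" provider drops messages.
trait Provider {
    fn log(&self, message: &str) -> bool; // returns whether anything was emitted
}

struct Noop;
impl Provider for Noop {
    fn log(&self, _message: &str) -> bool {
        false // no provider installed: the message is dropped
    }
}

struct Stdout;
impl Provider for Stdout {
    fn log(&self, message: &str) -> bool {
        println!("{message}");
        true
    }
}

fn emit(provider: &dyn Provider, message: &str) -> bool {
    provider.log(message)
}
```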

One simple provider is env_logger which will log to standard out. To include, add the following to the Cargo.toml dependencies

env_logger = "0.11.8"

We’ll need to add the use clause

use env_logger::{Builder, Env};

and then we need to initialise the env_logger, we can use the following at the start of the main function

env_logger::init();

By default this will only output ERROR messages; we can change the log level using an environment variable like this

RUST_LOG=trace

Alternatively, we can set the default within code by replacing the environment variable and the env_logger::init(); line with

let env = Env::default().filter_or("RUST_LOG", "trace");
Builder::from_env(env).init();

or we can set in code instead using

Builder::new()
   .filter_level(LevelFilter::Trace)
   .init();
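Conceptually the filter just compares each message's level against the configured maximum, with levels ordered from Error (least verbose) to Trace (most verbose). A sketch of that comparison (not the log crate's types):

```rust
// Declaration order gives Error < Warn < Info < Debug < Trace via PartialOrd.
#[derive(Debug, PartialEq, PartialOrd)]
enum Level {
    Error,
    Warn,
    Info,
    Debug,
    Trace,
}

// A message is emitted when its level is at or below the configured maximum.
fn enabled(message: Level, max: Level) -> bool {
    message <= max
}
```

This is why, with the default maximum of Error, only error! messages appear, while a maximum of Trace lets everything through.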

Rust and reqwest

I’ve covered a few Rust topics lately on this blog, hopefully around technologies that are most likely to be used in many real-world applications. This post is about one of the missing pieces: how do we call our web APIs/services?

In C# we have the HttpClient which is the usual type for such use cases. With Rust there are various options, but as the title suggests, we’re going to concentrate on reqwest.

All we really need to do is supply a couple of crates in Cargo.toml. As you’ll have guessed, one is reqwest; the other is tokio, because I want to use async/await. So create yourself a project then update Cargo.toml to add these dependencies

reqwest = "0.12.23"
tokio = { version = "1.47.1", features = ["rt", "rt-multi-thread", "macros"] }

Next up, open src/main.rs (or create one) and let’s add a simple GET call

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
  let result = reqwest::get("https://httpbin.org/get")
    .await?
    .text()
    .await?;

  println!("{}", result);
  Ok(())
}

This is a “shortcut” for issuing a GET request.

The following is the longer form and is what we’d use for other HTTP methods. Here I’m showing how we can generate a RequestBuilder (the type returned from client.get), then send it and retrieve the response

let client = reqwest::Client::new();
let result = client.get("https://httpbin.org/get");
let result = result.send().await?.text().await?;

Other HTTP methods, such as POST, DELETE etc. can be created from the RequestBuilder, for example

let post = client.post("https://httpbin.org/post")
  .body("hello world")
  .header("Content-Type", "text/plain");

let result = post.send().await?.text().await?;
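The chaining above works because each builder method consumes the builder and returns it; a minimal sketch of that pattern (hypothetical types, not reqwest's API):

```rust
// Each method takes self by value and returns it, so calls chain.
struct RequestSketch {
    method: String,
    url: String,
    body: Option<String>,
}

impl RequestSketch {
    fn new(method: &str, url: &str) -> Self {
        Self { method: method.into(), url: url.into(), body: None }
    }

    fn body(mut self, body: &str) -> Self {
        self.body = Some(body.into());
        self
    }

    // A real send() would consume the builder; here we just describe the request.
    fn describe(self) -> String {
        format!(
            "{} {} ({} byte body)",
            self.method,
            self.url,
            self.body.as_deref().map_or(0, str::len)
        )
    }
}
```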

JSON instead of plain text

Often we’ll want to deserialize to types, i.e. via JSON, so update Cargo.toml to look like this

[dependencies]
reqwest = { version = "0.12.23", features = ["json"] }
tokio = { version = "1.47.1", features = ["rt", "rt-multi-thread", "macros"] }
serde = { version = "1.0.219", features = ["derive"] }

Now change main.rs to add this code to the start of the file

use serde::Deserialize;

#[derive(Deserialize)]
struct ApiResponse {
    message: String,
}

ApiResponse will represent our object and we use the following

let client = reqwest::Client::new();
let result = client.get("https://your_api");
let result = result
        .send()
        .await?
        .json::<ApiResponse>()
        .await?;

println!("{}", result.message);

Rust Rocket (and openapi/swagger)

Rocket provides code that allows us to build web servers and web based applications such as web APIs.

We’ll start by just creating a simple endpoint, and then we’ll look at adding Open API and swagger support.

Starting simple

Create yourself a Rust package, for example with cargo

cargo new myapi --bin

add the following dependency to your Cargo.toml

rocket = "0.5.1"

Now create a Rocket.toml so we can configure Rocket’s server; mine looks like this

[default]
port = 8080
address = "127.0.0.1"

We need a main.rs (so you can delete the lib.rs if you wish for now) and here’s a very basic starting point

#[macro_use] extern crate rocket;

#[get("/")]
fn index() -> &'static str {
    "Hello, world!"
}

#[launch]
fn rocket() -> _ {
    rocket::build().mount("/", routes![index])
}

Now, run the application using

cargo run

As you can see this is a minimal API style, i.e. we create a function supplying it with an HTTP method and we add it to the routes list.

Adding the usual echo endpoint

Now let’s add my version of “Hello World” for APIs, a simple echo endpoint.

#[get("/echo?<text>")]
fn echo(text: &str) -> String {
    format!("Echo: {}", text)
}
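Because Rocket handlers are plain functions under the attribute, the echo logic can be exercised directly without running a server (a unit-test style sketch; the function name here is mine, not part of the post's code):

```rust
// The handler body, minus the #[get] routing attribute.
fn echo_text(text: &str) -> String {
    format!("Echo: {}", text)
}
```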

Now add this to the routes i.e.

#[launch]
fn rocket() -> _ {
    rocket::build().mount("/", routes![index, echo])
}

Add Open API and Swagger

Now we have a couple of simple endpoints, let’s add Open API and Swagger and change the echo endpoint to use Json. I’m purposefully going to keep the index as non-Open API just to demonstrate running both Open API and non-Open API endpoints.

We’re going to need a few additions to our Cargo.toml. Unfortunately it’s easy to end up with multiple versions of these dependencies, so the ones shown here will work together without warnings/errors

[dependencies]
rocket = { version = "0.5.1", features = ["json"] }
serde = "1.0.219"
rocket_okapi = { version = "0.9.0", features = ["swagger"] }
schemars = "0.8.22"

Notice we’re adding features to the rocket crate and we’ve got some crates for Swagger and Open API. The schemars crate 0.8.22 was being used by other crates, hence I locked this down to the same version.

We’ll extend our echo endpoint to return Json, but before we do, I’ll list the use clauses that appear after #[macro_use] extern crate rocket;

use rocket::serde::{Serialize, json::Json};
use rocket_okapi::{openapi, openapi_get_routes};
use rocket_okapi::swagger_ui::{make_swagger_ui, SwaggerUIConfig};
use schemars::JsonSchema;

We’ll create a response object for the echo endpoint and update the echo endpoint to both return this and add the openapi attribute to allow this endpoint to have an open api spec generated for it

#[derive(Serialize, JsonSchema)]
struct EchoResponse {
    message: String,
}

#[openapi]
#[get("/echo?<text>")]
fn echo(text: &str) -> Json<EchoResponse> {
    Json(EchoResponse {
        message: format!("Echo: {}", text),
    })
}

Next up we need to change the rocket function, so let’s just see the latest version

#[launch]
fn rocket() -> _ {
    rocket::build()
        .mount("/", routes![index])
        .mount("/", openapi_get_routes![echo])
        .mount(
            "/swagger",
            make_swagger_ui(&SwaggerUIConfig {
                url: "/openapi.json".to_owned(),
                ..Default::default()
            })
        )
}

Now, I purposefully left the index route without an Open API attribute just to demonstrate that if you have such endpoints you still need to use the routes! macro. If you add index to openapi_get_routes! without the Open API attribute, you’ll get some slightly ambiguous errors, such as “a function with a similar name exists”.

Now run your application and go to http://localhost:8080/swagger/index.html to interact with your endpoints via the Swagger UI. You can also access the openapi.json file at http://localhost:8080/openapi.json.

Code

Available on GitHub.

Rust and gRPC (with Protocol Buffers)

Back in 2018 I published a couple of posts around Using Protocol Buffers and Using gRPC with Protocol Buffers.

For this post we’re going to look at using gRPC and Protocol Buffers from Rust.

Getting Started

Before we begin to do anything in Rust we’ll need protoc on our machine, so checkout https://github.com/protocolbuffers/protobuf/releases for a release.

Note: On Windows we can just use winget install protobuf, then run protoc --version to check it was installed.

Also ensure protoc.exe is on your PATH or set up via your development tools. In my case I’m using JetBrains RustRover, and I added the environment variable PROTOC to the project configuration with a value of C:\Users\{your-username}\AppData\Local\Microsoft\WinGet\Links\protoc.exe, as I installed on Windows via winget.

Next, let’s create the bare bones project.

  • Create yourself a folder for your project then…
  • Run the following (replace rust_grpc with your project name)
    cargo new rust_grpc
    
  • cd into the folder just created
  • Update the Cargo.toml to include the following dependencies and build-dependencies
    [dependencies]
    tonic = "0.14.2"
    tokio = { version = "1", features = ["full"] }
    prost = "0.14.1"
    tonic-prost = "0.14.2"
    
    [build-dependencies]
    tonic-prost-build = "0.14.2"
    

Obviously change the dependency versions to suit.

The build-dependencies will generate the source code from our .proto file.

Creating the proto file(s)

Let’s create a simple proto file.

  • Create a folder, we’ll use a standard name, so ours is called proto off of the root folder
  • Create a file named hello.proto and copy the code below into it (this is a sort of “Hello World” of proto files)
    syntax = "proto3";
    
    package hello;
    
    service Greeter {
      rpc SayHello (HelloRequest) returns (HelloReply);
    }
    
    message HelloRequest {
      string name = 1;
    }
    
    message HelloReply {
      string message = 1;
    }
    
  • To generate the code from the .proto, create a build.rs file with the following code
    use tonic_prost_build::configure;
    
    fn main() -> Result<(), Box<dyn std::error::Error>> {
        configure()
            .out_dir("src/generated")
            .compile_protos(&["proto/hello.proto"], &["proto"])
            .unwrap();
        Ok(())
    }
    

I had problems getting build.rs to generate source for the proto file, so you might need to create the folder src/generated before building. Also note the proto folder is off the project root, i.e. alongside the src folder as mentioned previously, so ensure that’s correct.

To generate the source files for the project we can run the build from a tool such as RustRover or use cargo build from your project folder.

I’m not going to include the whole file that’s generated but you should see bits like the following

#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct HelloRequest {
    #[prost(string, tag = "1")]
    pub name: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct HelloReply {
    #[prost(string, tag = "1")]
    pub message: ::prost::alloc::string::String,
}
/// Generated client implementations.
pub mod greeter_client {
    #![allow(
        unused_variables,
        dead_code,
        missing_docs,
        clippy::wildcard_imports,
        clippy::let_unit_value,
    )]

As you can see we have representations of the request and reply from the .proto file.

I also added a mod.rs file to the src/generated folder which looks like this

pub mod hello;

This will make our generated source available to the main.rs file for importing.
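The pub mod hello; declaration is just Rust's file-based module wiring; inline, the equivalent shape looks like this (with a hypothetical function body standing in for the generated code):

```rust
// mod.rs declaring `pub mod hello;` is equivalent to nesting modules like this.
mod generated {
    pub mod hello {
        pub fn greet(name: &str) -> String {
            format!("Hello, {}!", name)
        }
    }
}

use generated::hello::greet;
```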

This example exists on the tonic GitHub repo at https://github.com/hyperium/tonic/tree/master/examples/src/helloworld. I hadn’t realised that when I started, but I would suggest you check out their examples.

I’m going to place everything in the main.rs file for simplicity, but of course the code should be split into client, server and main code in anything other than such a simple example. Let’s look at each section of code separately…

We have a GreeterServer generated from our proto code, but we need to create the equivalent of an “endpoint” or “service”, so we’ll create a service with the following code

#[derive(Default)]
pub struct GreeterService {}

#[tonic::async_trait]
impl Greeter for GreeterService {
    async fn say_hello(
        &self,
        request: Request<HelloRequest>,
    ) -> Result<Response<HelloReply>, Status> {
        let name = request.into_inner().name;
        let reply = HelloReply {
            message: format!("Hello, {}!", name),
        };
        Ok(Response::new(reply))
    }
}

This essentially responds to a HelloRequest returning a HelloReply – as mentioned, think of this as your service endpoint.

We’re going to need to create a server, which will look like this

async fn grpc_server() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:50051".parse()?;
    let greeter = GreeterService::default();

    println!("Server listening on {}", addr);

    Server::builder()
        .add_service(GreeterServer::new(greeter))
        .serve(addr)
        .await?;

    Ok(())
}

Notice that we are indeed creating a server, listening on a port. We supply the service to the Server::builder via add_service and that’s pretty much it.

Next we’re going to need a client to send some requests, so here’s an example

async fn grpc_client() -> Result<(), Box<dyn std::error::Error>> {

    let mut client = GreeterClient::connect("http://[::1]:50051").await?;

    let request = Request::new(HelloRequest {
        name: "PutridParrot".into(),
    });
    let response = client.say_hello(request).await?;

    println!("Response is {:?}", response.into_inner().message);
    Ok(())
}

Of course the client connects to the server, creates a request and sends it to the server via the say_hello function. This is a call via the generated client code, not to be confused with the GreeterService function of the same name; the request goes over the wire to the server and is handled by the GreeterService’s say_hello function.

We await the response and println! it.

Now let’s just create a simple main/entry point to run the server, then run the client and get a response (again, this is kept simple for ease of using the one file, main.rs, and of course should be separated in real-world use).

Note: I’ll also include all the use code as well in this sample

mod generated;

use tokio::spawn;
use tokio::time::{sleep, Duration};
use tonic::{Request, Response, Status};
use tonic::transport::Server;
use crate::generated::hello::greeter_client::GreeterClient;
use crate::generated::hello::greeter_server::{Greeter, GreeterServer};
use crate::generated::hello::{HelloReply, HelloRequest};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    spawn(async {
        grpc_server().await.unwrap();
    });

    sleep(Duration::from_millis(500)).await;

    grpc_client().await?;
    
    Ok(())
}

Use cargo run or run via RustRover or your preferred development tools and you should see

Server listening on [::1]:50051
Response is "Hello, PutridParrot!"

Code

Code is available on GitHub. Don’t forget to install protoc and ensure the path is set if you wish to run the code.