Data Format

Arcon uses Protobuf as its main data format, for the following reasons:

  1. It is a universal data format, so data can be moved outside of the Rust ecosystem.
  2. Supports schema evolution.
  3. Good space utilisation on disk.
  4. Decent serialisation/deserialisation cost.

Serialised size of two different Rust structs, a small one and a large one (Reference):

Framework        Small      Large
Protobuf         14 bytes   106 bytes
Serde::Bincode   20 bytes   228 bytes
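One likely contributor to the smaller Protobuf sizes is that Protobuf writes integer fields as variable-length "varints", whereas a fixed-width encoding (such as bincode 1.x's default configuration) always spends 8 bytes on a u64. The sketch below illustrates the varint scheme from the Protobuf encoding rules in plain Rust. It is not code from Arcon or prost, the function names are our own, and it does not attempt to reproduce the exact figures in the table, since the benchmarked structs are not shown here.

```rust
/// Appends `value` to `buf` as a Protobuf base-128 varint: each byte carries
/// 7 bits of payload, and the high bit is set on every byte except the last.
pub fn encode_varint(mut value: u64, buf: &mut Vec<u8>) {
    while value >= 0x80 {
        // Low 7 bits of the value, with the "more bytes follow" bit set.
        buf.push((value as u8 & 0x7F) | 0x80);
        value >>= 7;
    }
    // Final byte: high bit clear.
    buf.push(value as u8);
}

/// Number of bytes a `u64` occupies when written as a varint.
pub fn varint_len(value: u64) -> usize {
    let mut buf = Vec::new();
    encode_varint(value, &mut buf);
    buf.len()
}

/// Size of any `u64` under a fixed-width encoding.
pub const FIXED_U64_LEN: usize = std::mem::size_of::<u64>();
```

For example, an id of 300 takes 2 bytes as a varint instead of 8, while very large values (up to u64::MAX) can take up to 10 bytes, so the saving depends on the data.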

Arcon uses the prost crate, which allows data types to be defined either directly in Rust or through .proto files.

The rest of this section covers ArconType and the two ways of declaring it: directly in Rust, or generated from .proto files.

ArconType

Data that is passed through the runtime must implement ArconType, which is configured through the following attributes (keys is optional, and unsafe_ser_id is only relevant when the unsafe_flight feature is enabled):

  • version: an integer representing the version of the type.
  • reliable_ser_id: an integer identifying the serialiser used for reliable in-flight serialisation.
  • keys: the fields used to compute the key hash.
    • If not specified, all hashable fields are used.
    • The Arcon derive macro estimates the number of bytes of the selected fields and picks a suitable hasher.
  • unsafe_ser_id: an integer identifying the serialiser used for unsafe in-flight serialisation when the unsafe_flight feature is enabled.
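To make the keys attribute concrete, the following sketch hand-writes what keys = "id" conceptually asks for on the Event type declared in the next sub-section: only the id field is fed to the hasher, so data never influences the key. It uses std's DefaultHasher purely for illustration; the hasher that the Arcon derive macro actually selects is not shown, and key_hash is a hypothetical name.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

#[derive(Clone, Copy, Debug)]
pub struct Event {
    pub id: u64,
    pub data: f32,
}

/// Hypothetical stand-in for the key hash requested by `keys = "id"`.
/// Note that `f32` does not implement `Hash`, so `id` is also the only
/// hashable field; omitting `keys` would select the same field here.
pub fn key_hash(event: &Event) -> u64 {
    let mut hasher = DefaultHasher::new();
    // Only the key field contributes to the hash.
    event.id.hash(&mut hasher);
    hasher.finish()
}
```

Two events with the same id therefore map to the same key even when their data differs.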

Declaring ArconType directly in Rust

First, make sure that prost is also added as a dependency.

[dependencies]
arcon = "LATEST_VERSION"
prost = "ARCON_PROST_VERSION"

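ArconTypes can then be declared directly by combining the Arcon derive macro with prost's Message derive. The example below also stores the type in operator state and watches that state per epoch; SnapshotComponent is imported from an examples crate: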

use arcon::prelude::*;
use examples::SnapshotComponent;
use std::sync::Arc;

#[cfg_attr(feature = "arcon_serde", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "unsafe_flight", derive(abomonation_derive::Abomonation))]
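// Arcon implements ArconType; prost::Message provides the Protobuf encoding.
#[derive(Arcon, Arrow, prost::Message, Copy, Clone)]
// Serialiser IDs, type version, and the field used to compute the key hash.
#[arcon(unsafe_ser_id = 12, reliable_ser_id = 13, version = 1, keys = "id")]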
pub struct Event {
    #[prost(uint64)]
    pub id: u64,
    #[prost(float)]
    pub data: f32,
}

#[derive(ArconState)]
pub struct MyState<B: Backend> {
    #[table = "events"]
    events: EagerValue<Event, B>,
}

impl<B: Backend> StateConstructor for MyState<B> {
    type BackendType = B;

    fn new(backend: Arc<Self::BackendType>) -> Self {
        Self {
            events: EagerValue::new("_events", backend),
        }
    }
}


fn main() {
    let conf = ArconConf {
        epoch_interval: 2500,
        ctrl_system_host: Some("127.0.0.1:2000".to_string()),
        ..Default::default()
    };

    let mut pipeline = Pipeline::with_conf(conf)
        .collection(
            (0..1000000)
                .map(|x| Event { id: x, data: 1.5 })
                .collect::<Vec<Event>>(),
            |conf| {
                conf.set_timestamp_extractor(|x: &Event| x.id);
            },
        )
        .operator(OperatorBuilder {
            constructor: Arc::new(|backend| {
                Map::stateful(MyState::new(backend), |event, state| {
                    state.events().put(event)?;
                    Ok(event)
                })
            }),
            conf: Default::default(),
        })
        .build();

    pipeline.watch(|epoch: u64, _: MyState<Sled>| {
        println!("Got state object for epoch {}", epoch);
    });

    let kompact_system = KompactConfig::default().build().expect("KompactSystem");
    // Create Kompact component to receive snapshots
    let snapshot_comp = kompact_system.create(SnapshotComponent::new);

    kompact_system
        .start_notify(&snapshot_comp)
        .wait_timeout(std::time::Duration::from_millis(200))
        .expect("Failed to start component");

    pipeline.watch_with::<MyState<Sled>>(snapshot_comp);

    pipeline.start();
    pipeline.await_termination();
}
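Because Event derives prost::Message, its serialised form follows the Protobuf wire format. The sketch below hand-encodes the two fields to show what that format looks like. It assumes the fields receive tags 1 and 2 (prost infers sequential tags when none are given); encode_event is a hypothetical helper written against the Protobuf encoding rules, not part of Arcon or prost, which produce these bytes through their own Message encoding methods.

```rust
/// Appends a base-128 varint (7 payload bits per byte, high bit = "more bytes follow").
fn encode_varint(mut value: u64, buf: &mut Vec<u8>) {
    while value >= 0x80 {
        buf.push((value as u8 & 0x7F) | 0x80);
        value >>= 7;
    }
    buf.push(value as u8);
}

/// Protobuf wire types used by the two Event fields.
const WIRE_VARINT: u64 = 0; // uint64
const WIRE_FIXED32: u64 = 5; // float

/// A field key is `(field_number << 3) | wire_type`, itself written as a varint.
fn encode_key(field_number: u64, wire_type: u64, buf: &mut Vec<u8>) {
    encode_varint((field_number << 3) | wire_type, buf);
}

/// Hypothetical hand-written equivalent of encoding `Event { id, data }`.
/// In proto3, fields holding their default value (0 / 0.0) are omitted.
pub fn encode_event(id: u64, data: f32) -> Vec<u8> {
    let mut buf = Vec::new();
    if id != 0 {
        encode_key(1, WIRE_VARINT, &mut buf);
        encode_varint(id, &mut buf);
    }
    if data != 0.0 {
        encode_key(2, WIRE_FIXED32, &mut buf);
        // float is a fixed 4-byte little-endian value on the wire.
        buf.extend_from_slice(&data.to_le_bytes());
    }
    buf
}
```

In the pipeline above, ids stay below 1,000,000 and therefore fit in at most 3 varint bytes, so each Event encodes to at most 9 bytes (two 1-byte keys, the id, and the 4-byte float).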

Generating ArconType from .proto files

Below is the Event type from above declared as a Protobuf message (reduced to the id field). The ArconType attributes are given through regular // comments placed directly above the message.

syntax = "proto3";

package event;

// reliable_ser_id = 101
// unsafe_ser_id = 102
// version = 1
message Event {
  uint64 id = 1;
}
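The comment convention can be illustrated with a small parser that collects name = value pairs from the // comments directly preceding a message. This is a hypothetical sketch of the idea only: it is not how arcon_build is implemented, and parse_arcon_attributes is an invented name.

```rust
use std::collections::HashMap;

/// Hypothetical sketch: collects `// name = <integer>` comment lines that
/// directly precede `message <msg_name>` in a .proto source.
pub fn parse_arcon_attributes(proto_src: &str, msg_name: &str) -> HashMap<String, u64> {
    let mut pending: HashMap<String, u64> = HashMap::new();
    for line in proto_src.lines() {
        let line = line.trim();
        if let Some(comment) = line.strip_prefix("//") {
            // Keep only comments of the form `name = integer`.
            if let Some((name, value)) = comment.split_once('=') {
                if let Ok(v) = value.trim().parse::<u64>() {
                    pending.insert(name.trim().to_string(), v);
                }
            }
        } else if let Some(rest) = line.strip_prefix("message ") {
            let name = rest.trim_end_matches('{').trim();
            if name == msg_name {
                return pending;
            }
            // The comments belonged to a different message: start over.
            pending.clear();
        } else if !line.is_empty() {
            // Any other statement breaks the comment block.
            pending.clear();
        }
    }
    HashMap::new()
}
```

Applied to the event.proto file above with msg_name set to "Event", this returns reliable_ser_id = 101, unsafe_ser_id = 102 and version = 1.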

The arcon_build crate is then used to generate Arcon-compatible Rust types from the Protobuf messages.

[build-dependencies]
arcon_build = "ARCON_VERSION"

Add a build.rs file to the crate root, adjusting the paths if necessary:

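fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Compile src/event.proto, resolving imports relative to src/.
    // Errors are propagated so that the build fails with a readable message.
    arcon_build::compile_protos(&["src/event.proto"], &["src/"])?;
    Ok(())
}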

The build script generates the Rust code into Cargo's OUT_DIR, in a file named after the Protobuf package (event.rs here). The generated types can then be included into an Arcon application as follows:

use arcon::prelude::*;
use std::sync::Arc;

// bring Event into scope
include!(concat!(env!("OUT_DIR"), "/event.rs"));

fn main() {
    let mut pipeline = Pipeline::default()
        .collection(
            (0..10000000)
                .map(|x| Event { id: x })
                .collect::<Vec<Event>>(),
            |conf| {
                conf.set_timestamp_extractor(|x: &Event| x.id);
            },
        )
        .operator(OperatorBuilder {
            constructor: Arc::new(|_| Filter::new(|event: &Event| event.id > 50)),
            conf: Default::default(),
        })
        .build();

    pipeline.start();
    pipeline.await_termination();
}
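Setting aside parallelism, epochs and fault tolerance, the Filter operator above keeps exactly the events for which its predicate returns true. The single-threaded iterator sketch below uses no Arcon types; surviving_events is a hypothetical helper that mirrors the collection source and the predicate, to show which events reach the end of this pipeline.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Event {
    pub id: u64,
}

/// Hypothetical single-threaded stand-in for the pipeline above: build the
/// collection source and apply the same predicate as the Filter operator.
pub fn surviving_events(n: u64) -> impl Iterator<Item = Event> {
    (0..n)
        .map(|x| Event { id: x })
        .filter(|event: &Event| event.id > 50)
}
```

With ids 0 to 9,999,999, the 51 events with ids 0 to 50 are dropped, leaving 9,999,949 events.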