Data Format

Arcon's main data format is Protobuf. It was chosen for the following reasons:

  1. Universal data format that enables the data to be moved outside of the Rust ecosystem.
  2. Supports schema evolution.
  3. Good space utilisation on disk.
  4. Decent serialisation/deserialisation cost.
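
To make the schema evolution point concrete, the sketch below shows how a Protobuf-style reader can skip fields it does not know. It is a minimal illustration written against the standard library only, not the decoder that prost generates. EventV1, read_varint and decode_event_v1 are hypothetical names, and only varint-encoded fields are handled.

```rust
/// Reads one base-128 varint from `buf` starting at `*pos`, advancing `*pos`.
fn read_varint(buf: &[u8], pos: &mut usize) -> Option<u64> {
    let mut value = 0u64;
    let mut shift = 0u32;
    while *pos < buf.len() && shift < 64 {
        let byte = buf[*pos];
        *pos += 1;
        value |= u64::from(byte & 0x7F) << shift;
        if byte & 0x80 == 0 {
            return Some(value);
        }
        shift += 7;
    }
    None // truncated or over-long input
}

/// Hypothetical "version 1" view of an event: it only knows field number 1.
#[derive(Debug, PartialEq)]
struct EventV1 {
    id: u64,
}

/// Decodes `EventV1`, ignoring any field numbers it does not know about.
fn decode_event_v1(buf: &[u8]) -> Option<EventV1> {
    let mut pos = 0;
    let mut id = 0; // proto3 default when the field is absent
    while pos < buf.len() {
        // Each field starts with a key: (field_number << 3) | wire_type.
        let key = read_varint(buf, &mut pos)?;
        let (field, wire_type) = (key >> 3, key & 0x7);
        if wire_type != 0 {
            return None; // only varint fields are handled in this sketch
        }
        let value = read_varint(buf, &mut pos)?;
        if field == 1 {
            id = value;
        }
        // Any other field number is read and then ignored.
    }
    Some(EventV1 { id })
}
```

A message written with a newer schema that adds field 2, for example the bytes 08 96 01 10 07, still decodes to EventV1 { id: 150 } because the unknown field is skipped.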

Serialised size of two different Rust structs (Reference):

Format            Struct 1    Struct 2
Protobuf          14 bytes    106 bytes
Serde::Bincode    20 bytes    228 bytes
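
One reason Protobuf tends to be compact is its variable-length integer encoding. The sketch below compares the wire size of a single u64 field encoded as a Protobuf varint with a fixed-width 8-byte encoding, which is used here as an assumed stand-in for the Bincode side. The function names are illustrative, and the sketch does not reproduce the exact figures in the comparison above.

```rust
/// Encodes `value` as a Protobuf base-128 varint (7 payload bits per byte).
fn encode_varint(mut value: u64) -> Vec<u8> {
    let mut out = Vec::new();
    while value >= 0x80 {
        // Low 7 bits plus a continuation bit.
        out.push((value as u8 & 0x7F) | 0x80);
        value >>= 7;
    }
    out.push(value as u8);
    out
}

/// Wire size of a `u64` field: one key byte (field numbers 1 to 15) plus the varint.
fn protobuf_u64_field_len(value: u64) -> usize {
    1 + encode_varint(value).len()
}

/// Size of the same value in a fixed-width little-endian encoding.
fn fixed_u64_len(value: u64) -> usize {
    value.to_le_bytes().len()
}
```

A small value such as 1 takes 2 bytes as a Protobuf field and 8 bytes in the fixed-width form. The trade-off is that values near u64::MAX need 10 varint bytes.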

Arcon uses the prost crate to define its data types directly in Rust or through .proto files.

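The section is divided into the following sub-sections:

  • ArconType
  • Declaring ArconType directly in Rust
  • Generating ArconType from .proto files

ArconType
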
Data that passes through the runtime must implement ArconType. The implementation is configured through the following attributes, of which version and reliable_ser_id are always required:

  • version: An integer representing the version.
  • reliable_ser_id: An integer representing the ID used for reliable in-flight serialisation.
  • keys: The fields that are hashed to produce the key.
    • If not specified, all hashable fields are used.
    • The Arcon derive macro estimates the number of bytes of the selected fields and picks a suitable hasher.
  • unsafe_ser_id: An integer representing the ID used for unsafe in-flight serialisation. It applies when the unsafe_flight feature is enabled.
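
The effect of keys can be sketched with the standard library alone. In the example below, TypeMeta is a hypothetical trait standing in for what the derive macro provides, and DefaultHasher is an assumed substitute for whichever hasher Arcon picks. The attribute values mirror the Event example used in this section.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for the metadata that the attributes describe.
trait TypeMeta {
    const VERSION: u32;
    const RELIABLE_SER_ID: u64;
    /// Hash of the fields selected by `keys`.
    fn key_hash(&self) -> u64;
}

struct Event {
    id: u64,
    data: f32,
}

impl TypeMeta for Event {
    const VERSION: u32 = 1;
    const RELIABLE_SER_ID: u64 = 13;

    fn key_hash(&self) -> u64 {
        let mut hasher = DefaultHasher::new();
        // keys = "id": only `id` feeds the hash, `data` is ignored.
        self.id.hash(&mut hasher);
        hasher.finish()
    }
}
```

Two events with the same id hash to the same key regardless of data. Note that f32 does not implement Hash, so data could not serve as a key field in this sketch.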

Declaring ArconType directly in Rust

First make sure that you have also added prost as a dependency.


Then you can directly declare ArconTypes by using the Arcon derive macro together with the Prost macro:

use arcon::prelude::*;
use examples::SnapshotComponent;
use std::sync::Arc;

#[cfg_attr(feature = "arcon_serde", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "unsafe_flight", derive(abomonation_derive::Abomonation))]
#[derive(Arcon, Arrow, prost::Message, Copy, Clone)]
#[arcon(unsafe_ser_id = 12, reliable_ser_id = 13, version = 1, keys = "id")]
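pub struct Event {
    // prost needs a field attribute on every field of a derived Message.
    #[prost(uint64, tag = "1")]
    pub id: u64,
    #[prost(float, tag = "2")]
    pub data: f32,
}
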
// Derive assumed: the #[table] attribute needs a derive macro to register it.
#[derive(ArconState)]
pub struct MyState<B: Backend> {
    #[table = "events"]
    events: EagerValue<Event, B>,
}

impl<B: Backend> StateConstructor for MyState<B> {
    type BackendType = B;

    fn new(backend: Arc<Self::BackendType>) -> Self {
        Self {
            events: EagerValue::new("_events", backend),
        }
    }
}

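fn main() {
    let conf = ArconConf {
        epoch_interval: 2500,
        ctrl_system_host: Some("".to_string()),
        ..Default::default()
    };

    let mut pipeline = Pipeline::with_conf(conf)
        .collection(
            // The input range is missing from the source; 0..100 is a placeholder.
            (0..100)
                .map(|x| Event { id: x, data: 1.5 })
                .collect::<Vec<Event>>(),
            |conf| {
                conf.set_timestamp_extractor(|x: &Event| x.id);
            },
        )
        .operator(OperatorBuilder {
            constructor: Arc::new(|backend| {
                Map::stateful(MyState::new(backend), |event, state| {
                    // The closure body is missing from the source; this
                    // placeholder forwards the event unchanged.
                    Ok(event)
                })
            }),
            conf: Default::default(),
        })
        .build();

    // Assumed call name: register a callback that receives each epoch's state.
    pipeline.watch(|epoch: u64, _: MyState<Sled>| {
        println!("Got state object for epoch {}", epoch);
    });

    let kompact_system = KompactConfig::default().build().expect("KompactSystem");
    // Create Kompact component to receive snapshots
    let snapshot_comp = kompact_system.create(SnapshotComponent::new);

    // Start the component; the 200 ms timeout is a placeholder.
    kompact_system
        .start_notify(&snapshot_comp)
        .wait_timeout(std::time::Duration::from_millis(200))
        .expect("Failed to start component");
}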



Generating ArconType from .proto files

Below is an Event type like the one defined above, this time declared as a Protobuf message. Note that the attributes are supplied through regular // comments placed above the message.

syntax = "proto3";

package event;

// reliable_ser_id = 101
// unsafe_ser_id = 102
// version = 1
message Event {
  uint64 id = 1;
}
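
How attributes might be read from such comments can be sketched as follows. This is not arcon_build's implementation; attributes_for is a hypothetical helper that collects `// key = value` lines sitting directly above a named message, using only the standard library.

```rust
use std::collections::HashMap;

/// Collects `// key = value` comment lines that directly precede `message <name>`.
fn attributes_for(proto: &str, message: &str) -> HashMap<String, String> {
    let mut attrs = HashMap::new();
    for line in proto.lines() {
        let line = line.trim();
        if let Some(comment) = line.strip_prefix("//") {
            // Keep only comments of the form `key = value`.
            if let Some((key, value)) = comment.split_once('=') {
                attrs.insert(key.trim().to_string(), value.trim().to_string());
            }
        } else if line.starts_with("message ") {
            let name = line["message ".len()..].trim_end_matches('{').trim();
            if name == message {
                return attrs;
            }
            attrs.clear(); // the comments belonged to another message
        } else if !line.is_empty() {
            attrs.clear(); // any other statement resets the pending comments
        }
    }
    HashMap::new() // message not found
}
```

Applied to the definition above, the helper returns the three pairs reliable_ser_id = 101, unsafe_ser_id = 102 and version = 1 for the message Event.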

Next, use the arcon_build crate to generate Arcon-compatible Rust types from the Protobuf messages. Add it as a dependency:

arcon_build = "ARCON_VERSION"

Add a build script (build.rs by Cargo's convention) and change the paths if necessary.

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Compile the listed .proto files, resolving imports relative to src/.
    arcon_build::compile_protos(&["src/event.proto"], &["src/"]).unwrap();
    Ok(())
}

The build script then generates the .rs files, and you can include the generated types in an Arcon application as follows:

use arcon::prelude::*;

// bring Event into scope
include!(concat!(env!("OUT_DIR"), "/"));

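fn main() {
    let mut pipeline = Pipeline::default()
        .collection(
            // The input range is missing from the source; 0..100 is a placeholder.
            (0..100)
                .map(|x| Event { id: x })
                .collect::<Vec<Event>>(),
            |conf| {
                conf.set_timestamp_extractor(|x: &Event| x.id);
            },
        )
        .operator(OperatorBuilder {
            constructor: Arc::new(|_| Filter::new(|event: &Event| event.id > 50)),
            conf: Default::default(),
        })
        .build();
}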
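
For intuition, the map and filter steps of such a pipeline can be mirrored with plain Rust iterators. The sketch assumes an input range of 0..100 (the actual input is not shown here), defines its own Event struct in place of the generated one, and uses a hypothetical run function. It involves no Arcon runtime.

```rust
/// Stand-in for the generated `Event` message (a single `uint64 id` field).
#[derive(Debug, Clone, PartialEq)]
struct Event {
    id: u64,
}

/// Applies the same map and filter steps to an in-memory range of ids.
fn run(ids: std::ops::Range<u64>) -> Vec<Event> {
    ids.map(|x| Event { id: x })
        // Keep only events whose id is greater than 50.
        .filter(|event| event.id > 50)
        .collect()
}
```

With the assumed range, run(0..100) keeps the events with ids 51 through 99.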