A robust, low-latency IoT gateway bridging embedded devices with cloud infrastructure, featuring MQTT protocol handling, device management, and real-time data processing using Rust's embedded ecosystem.
This IoT gateway system demonstrates Rust's unique advantages for embedded and edge computing scenarios. Built to handle thousands of connected devices, it provides protocol translation, data aggregation, edge analytics, and secure communication channels. The system operates on resource-constrained hardware while maintaining enterprise-grade reliability, showcasing how Rust's memory safety and performance characteristics make it ideal for IoT infrastructure.
Aim:
Create a production-ready IoT gateway that bridges embedded devices
with cloud services while providing edge computing capabilities.
Objectives:
Core components demonstrating Rust's embedded and async capabilities.
use rumqttc::{AsyncClient, Event, Incoming, MqttOptions, QoS};
use tokio::sync::mpsc;
use std::time::Duration;
use anyhow::Result;
pub struct MqttHandler {
client: AsyncClient,
device_registry: Arc>,
}
impl MqttHandler {
pub async fn new(broker_url: &str) -> Result {
let mut mqttoptions = MqttOptions::new("rust-iot-gateway", broker_url, 1883);
mqttoptions
.set_keep_alive(Duration::from_secs(30))
.set_clean_session(false)
.set_max_packet_size(10_000, 10_000);
let (client, mut eventloop) = AsyncClient::new(mqttoptions, 100);
// Spawn event loop handler
tokio::spawn(async move {
loop {
match eventloop.poll().await {
Ok(Event::Incoming(Incoming::Publish(p))) => {
Self::handle_message(p).await;
}
Ok(Event::Incoming(Incoming::ConnAck(_))) => {
log::info!("Connected to MQTT broker");
}
Err(e) => {
log::error!("MQTT error: {:?}", e);
tokio::time::sleep(Duration::from_secs(5)).await;
}
_ => {}
}
}
});
Ok(Self {
client,
device_registry: Arc::new(Mutex::new(DeviceRegistry::new())),
})
}
pub async fn subscribe_device_topics(&self) -> Result<()> {
let topics = vec![
("devices/+/telemetry", QoS::AtLeastOnce),
("devices/+/status", QoS::AtMostOnce),
("devices/+/commands", QoS::ExactlyOnce),
];
for (topic, qos) in topics {
self.client.subscribe(topic, qos).await?;
log::info!("Subscribed to {}", topic);
}
Ok(())
}
async fn handle_message(publish: rumqttc::Publish) {
let topic_parts: Vec<&str> = publish.topic.split('/').collect();
if topic_parts.len() >= 3 {
let device_id = topic_parts[1];
let message_type = topic_parts[2];
match message_type {
"telemetry" => Self::process_telemetry(device_id, &publish.payload).await,
"status" => Self::process_status(device_id, &publish.payload).await,
_ => log::warn!("Unknown message type: {}", message_type),
}
}
}
}
use std::collections::HashMap;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Device {
pub id: String,
pub device_type: DeviceType,
pub firmware_version: String,
pub last_seen: DateTime,
pub status: DeviceStatus,
pub metadata: HashMap,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DeviceType {
Sensor { capabilities: Vec },
Actuator { commands: Vec },
Gateway,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DeviceStatus {
Online,
Offline,
Error(String),
Maintenance,
}
pub struct DeviceRegistry {
devices: HashMap,
subscriptions: HashMap>,
}
impl DeviceRegistry {
pub fn register_device(&mut self, device: Device) -> Result<(), RegistryError> {
if self.devices.contains_key(&device.id) {
return Err(RegistryError::DeviceExists(device.id.clone()));
}
log::info!("Registering device: {}", device.id);
self.devices.insert(device.id.clone(), device);
Ok(())
}
pub async fn update_device_status(
&mut self,
device_id: &str,
status: DeviceStatus,
) -> Result<(), RegistryError> {
match self.devices.get_mut(device_id) {
Some(device) => {
device.status = status;
device.last_seen = Utc::now();
Ok(())
}
None => Err(RegistryError::DeviceNotFound(device_id.to_string())),
}
}
pub fn get_active_devices(&self) -> Vec<&Device> {
self.devices
.values()
.filter(|d| matches!(d.status, DeviceStatus::Online))
.collect()
}
}
use std::collections::VecDeque;
use statistical::{mean, standard_deviation};
pub struct EdgeAnalytics {
window_size: usize,
data_windows: HashMap>,
anomaly_threshold: f64,
}
impl EdgeAnalytics {
pub fn new(window_size: usize, anomaly_threshold: f64) -> Self {
Self {
window_size,
data_windows: HashMap::new(),
anomaly_threshold,
}
}
pub fn process_sensor_data(
&mut self,
sensor_id: &str,
value: f64,
) -> AnalyticsResult {
let window = self.data_windows
.entry(sensor_id.to_string())
.or_insert_with(|| VecDeque::with_capacity(self.window_size));
// Add new value to window
if window.len() >= self.window_size {
window.pop_front();
}
window.push_back(value);
// Calculate statistics
let values: Vec = window.iter().copied().collect();
let avg = mean(&values);
let std_dev = standard_deviation(&values, Some(avg));
// Detect anomalies
let z_score = (value - avg).abs() / std_dev;
let is_anomaly = z_score > self.anomaly_threshold;
AnalyticsResult {
sensor_id: sensor_id.to_string(),
current_value: value,
moving_average: avg,
std_deviation: std_dev,
is_anomaly,
confidence: 1.0 - (z_score / 10.0).min(1.0),
}
}
pub async fn run_aggregation_pipeline(
&self,
data_points: Vec,
) -> AggregatedData {
// Parallel processing using Rayon
use rayon::prelude::*;
let aggregated = data_points
.par_iter()
.map(|dp| self.aggregate_single(dp))
.reduce(AggregatedData::default, |a, b| a.merge(b));
aggregated
}
}
use async_trait::async_trait;
#[async_trait]
pub trait ProtocolAdapter: Send + Sync {
async fn translate(&self, message: Message) -> Result;
fn supported_protocols(&self) -> Vec;
}
pub struct ProtocolBridge {
adapters: HashMap>,
}
impl ProtocolBridge {
pub async fn route_message(
&self,
source: Protocol,
target: Protocol,
message: Vec,
) -> Result> {
let source_adapter = self.adapters
.get(&source)
.ok_or(BridgeError::UnsupportedProtocol(source))?;
let target_adapter = self.adapters
.get(&target)
.ok_or(BridgeError::UnsupportedProtocol(target))?;
// Parse with source protocol
let parsed = source_adapter.parse(message).await?;
// Translate to target protocol
let translated = target_adapter.encode(parsed).await?;
Ok(translated)
}
}
// CoAP to MQTT adapter example
pub struct CoapToMqttAdapter;
#[async_trait]
impl ProtocolAdapter for CoapToMqttAdapter {
async fn translate(&self, message: Message) -> Result {
// Extract CoAP payload and options
let coap_msg = coap::decode(&message.payload)?;
// Map CoAP method to MQTT topic
let topic = match coap_msg.method {
Method::Get => format!("coap/{}/get", coap_msg.path),
Method::Post => format!("coap/{}/post", coap_msg.path),
_ => return Err(BridgeError::UnsupportedMethod),
};
Ok(TranslatedMessage {
topic,
payload: coap_msg.payload,
qos: QoS::AtLeastOnce,
})
}
}
Optimized for resource-constrained edge devices:
# .cargo/config.toml
[target.armv7-unknown-linux-gnueabihf]
linker = "arm-linux-gnueabihf-gcc"
[target.aarch64-unknown-linux-gnu]
linker = "aarch64-linux-gnu-gcc"
[profile.release]
opt-level = "z" # Optimize for size
lto = true # Link-time optimization
codegen-units = 1 # Single codegen unit for better optimization
strip = true # Strip symbols
panic = "abort" # Smaller panic handler
This project represents my passion for building robust IoT infrastructure that bridges embedded devices with cloud services. It showcases my ability to create production-grade gateways that handle thousands of connected devices efficiently. If you have questions or wish to collaborate on a project, please reach out via the Contact section.
Let's build something great together.
Best regards,
Damilare Lekan Adekeye