持久化数据到硬盘上并采用RocksDB作为存储引擎

电子说

1.3w人已加入

描述

在上一篇文章中,我们使用内存做数据的存储。在这一篇文章中,我们持久化数据到硬盘上,采用RocksDB作为存储引擎。同时也增加了一个service层,将命令解析和存储逻辑提取到这一层中。  

RocksDB 先在Cargo.toml文件中加入rocksdb依赖:

编解码

然后,在src/storage目录下创建rocksdb_storage.rs文件。代码如下:

 

 1#[derive(Debug)]
 2pub struct RocksDbStorage(DB);
 3
 4impl RocksDbStorage {
 5    pub fn new(path: impl AsRef) -> Self {
 6        Self(DB::open_default(path).unwrap())
 7    }
 8}
 9
10impl Storage for RocksDbStorage {
11    fn get(&self, key: &str) -> Result, Box> {
12        let v = self.0.get(key)?.unwrap();
13        Ok(Some(v.into()))
14    }
15
16    fn set(&self, key: &str, value: bytes::Bytes) -> Result, Box> {
17        self.0.put(key, value.clone()).unwrap();
18        Ok(Some(value))
19    }
20}
  Service层 创建src/service目录,然后创建mod.rs文件及cmd_service.rs文件。在mod.rs文件中加入如下代码:
1pub mod cmd_service;
2
3pub trait CmdService {
4    // 解析命令,返回Response
5    fn execute(self, store: &impl Storage) -> CmdResponse;
6}
在cmd_service.rs文件中为命令实现CmdService trait,代码如下:
 1use crate::{CmdResponse, CmdService, Get, Set};
 2
 3// 为 GET 实现 execute
 4impl CmdService for Get {
 5    fn execute(self, store: &impl crate::Storage) -> CmdResponse {
 6        // 从存储中获取数据,返回CmdResponse
 7        match store.get(&self.key) {
 8            Ok(Some(value)) => value.into(),
 9            Ok(None) => "Not found".into(),
10            Err(e) => e.into(),
11        }
12    }
13}
14
15// 为 SET 实现 execute
16impl CmdService for Set {
17    // 存储数据
18    fn execute(self, store: &impl crate::Storage) -> CmdResponse {
19        match store.set(&self.key, self.value) {
20            Ok(Some(value)) => value.into(),
21            Ok(None) => "Set fail".into(),
22            Err(e) => e.into(),
23        }
24    }
25}
在src/pb/mod.rs中实现从Bytes、&str、Box转换为CmdResponse:
 1impl From for CmdResponse {
 2    fn from(v: Bytes) -> Self {
 3        Self {
 4            status: 200u32,
 5            message: "success".to_string(),
 6            value: v,
 7        }
 8    }
 9}
10
11impl From<&str> for CmdResponse {
12    fn from(s: &str) -> Self {
13        Self {
14            status: 400u32,
15            message: s.to_string(),
16            ..Default::default()
17        }
18    }
19}
20
21impl From> for CmdResponse {
22    fn from(e: Box) -> Self {
23        Self {
24            status: 500u32,
25            message: e.to_string(),
26            ..Default::default()
27        }
28    }
29}
    然后在src/service/mod.rs中加入service代码:
 1// 设置默认存储为RocksDB
 2pub struct Service {
 3    store_svc: Arc>,
 4}
 5
 6// 在多线程中进行clone
 7pub struct StoreService {
 8    store: Store,
 9}
10
11impl Service {
12    pub fn new(store: Store) -> Self {
13        Self {
14            store_svc: Arc::new(StoreService { store }),
15        }
16    }
17
18    // 执行命令
19    pub async fn execute(&self, cmd_req: CmdRequest) -> CmdResponse {
20        println!("=== Execute Command Before ===");
21        let cmd_res = process_cmd(cmd_req, &self.store_svc.store).await;
22        println!("=== Execute Command After ===");
23        cmd_res
24    }
25}
26
27// 实现Clone trait
28impl Clone for Service {
29    fn clone(&self) -> Self {
30        Self {
31            store_svc: self.store_svc.clone(),
32        }
33    }
34}
35
36// 处理请求命令,返回Response
37async fn process_cmd(cmd_req: CmdRequest, store: &impl Storage) -> CmdResponse {
38    match cmd_req.req_data {
39        // 处理 GET 命令
40        Some(ReqData::Get(cmd_get)) => cmd_get.execute(store),
41        // 处理 SET 命令
42        Some(ReqData::Set(cmd_set)) => cmd_set.execute(store),
43        _ => "Invalid command".into(),
44    }
45}

配置 我们修改配置,在conf/server.conf中加入RocksDB路径
......[rocksdb_path]path = '/tmp/kvserver'
    在src/config.rs中加入如下代码:
 1// Server端配置
 2#[derive(Debug, Serialize, Deserialize)]
 3pub struct ServerConfig {
 4    ......
 5    pub rocksdb_path: RocksdbPath,
 6}
 7
 8......
 9
10// RocksDB存储目录
11#[derive(Debug, Serialize, Deserialize)]
12pub struct RocksdbPath {
13    pub path: String,
14}
修改kv_server 在kv_server.rs中使用service执行命令,删除process_cmd函数:
 1#[tokio::main]
 2async fn main() -> Result<(), Box> {
 3    ......
 4
 5    // 初始化Service及存储
 6    let service = Service::new(RocksDbStorage::new(rocksdb_path));
 7
 8    loop {
 9         ......
10        let svc = service.clone();
11
12        tokio::spawn(async move {
13            // 使用Frame的LengthDelimitedCodec进行编解码操作
14            let mut stream = Framed::new(stream, LengthDelimitedCodec::new());
15            while let Some(Ok(mut buf)) = stream.next().await {
16                ......
17
18                // 执行请求命令
19                let cmd_res = svc.execute(cmd_req).await;
20
21                ......
22            }
23            info!("Client {:?} disconnected", addr);
24        });
25    }
26}
 

 

审核编辑:刘清

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分