@@ -61,6 +61,7 @@ rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server)
61
61
: replica_base(server),
62
62
_db (server->_db),
63
63
_rd_opts(server->_data_cf_rd_opts),
64
+ _data_cf(server->_data_cf),
64
65
_meta_cf(server->_meta_cf),
65
66
_pegasus_data_version(server->_pegasus_data_version),
66
67
METRIC_VAR_INIT_replica(read_expired_values),
@@ -78,7 +79,8 @@ int rocksdb_wrapper::get(std::string_view raw_key, /*out*/ db_get_context *ctx)
78
79
{
79
80
FAIL_POINT_INJECT_F (" db_get" , [](std::string_view) -> int { return FAIL_DB_GET; });
80
81
81
- rocksdb::Status s = _db->Get (_rd_opts, utils::to_rocksdb_slice (raw_key), &(ctx->raw_value ));
82
+ rocksdb::Status s =
83
+ _db->Get (_rd_opts, _data_cf, utils::to_rocksdb_slice (raw_key), &ctx->raw_value );
82
84
if (dsn_likely (s.ok ())) {
83
85
// success
84
86
ctx->found = true ;
@@ -94,7 +96,8 @@ int rocksdb_wrapper::get(std::string_view raw_key, /*out*/ db_get_context *ctx)
94
96
return rocksdb::Status::kOk ;
95
97
}
96
98
97
- dsn::blob hash_key, sort_key;
99
+ dsn::blob hash_key;
100
+ dsn::blob sort_key;
98
101
pegasus_restore_key (dsn::blob (raw_key.data (), 0 , raw_key.size ()), hash_key, sort_key);
99
102
LOG_ERROR_ROCKSDB (" Get" ,
100
103
s.ToString (),
@@ -152,9 +155,10 @@ int rocksdb_wrapper::write_batch_put_ctx(const db_write_context &ctx,
152
155
rocksdb::SliceParts skey_parts (&skey, 1 );
153
156
rocksdb::SliceParts svalue = _value_generator->generate_value (
154
157
_pegasus_data_version, value, db_expire_ts (expire_sec), new_timetag);
155
- rocksdb::Status s = _write_batch->Put (skey_parts, svalue);
158
+ rocksdb::Status s = _write_batch->Put (_data_cf, skey_parts, svalue);
156
159
if (dsn_unlikely (!s.ok ())) {
157
- ::dsn::blob hash_key, sort_key;
160
+ dsn::blob hash_key;
161
+ dsn::blob sort_key;
158
162
pegasus_restore_key (::dsn::blob (raw_key.data (), 0 , raw_key.size ()), hash_key, sort_key);
159
163
LOG_ERROR_ROCKSDB (" WriteBatchPut" ,
160
164
s.ToString (),
@@ -199,9 +203,10 @@ int rocksdb_wrapper::write_batch_delete(int64_t decree, std::string_view raw_key
199
203
FAIL_POINT_INJECT_F (" db_write_batch_delete" ,
200
204
[](std::string_view) -> int { return FAIL_DB_WRITE_BATCH_DELETE; });
201
205
202
- rocksdb::Status s = _write_batch->Delete (utils::to_rocksdb_slice (raw_key));
206
+ rocksdb::Status s = _write_batch->Delete (_data_cf, utils::to_rocksdb_slice (raw_key));
203
207
if (dsn_unlikely (!s.ok ())) {
204
- dsn::blob hash_key, sort_key;
208
+ dsn::blob hash_key;
209
+ dsn::blob sort_key;
205
210
pegasus_restore_key (dsn::blob (raw_key.data (), 0 , raw_key.size ()), hash_key, sort_key);
206
211
LOG_ERROR_ROCKSDB (" write_batch_delete" ,
207
212
s.ToString (),
@@ -223,7 +228,7 @@ int rocksdb_wrapper::ingest_files(int64_t decree,
223
228
ifo.move_files = true ;
224
229
ifo.ingest_behind = ingest_behind;
225
230
ifo.write_global_seqno = FLAGS_rocksdb_write_global_seqno;
226
- rocksdb::Status s = _db->IngestExternalFile (sst_file_list, ifo);
231
+ rocksdb::Status s = _db->IngestExternalFile (_data_cf, sst_file_list, ifo);
227
232
if (dsn_unlikely (!s.ok ())) {
228
233
LOG_ERROR_ROCKSDB (" IngestExternalFile" ,
229
234
s.ToString (),
0 commit comments