@@ -30,14 +30,94 @@ namespace curvefs {
30
30
namespace client {
31
31
namespace filesystem {
32
32
33
- DeferSync::DeferSync (DeferSyncOption option)
34
- : option_(option),
33
+ using ::curve::common::LockGuard;
34
+ using ::curve::common::ReadLockGuard;
35
+ using ::curve::common::WriteLockGuard;
36
+ using ::curvefs::client::filesystem::AttrCtime;
37
+
38
+ #define RETURN_FALSE_IF_CTO_ON () \
39
+ do { \
40
+ if (cto_) { \
41
+ return false ; \
42
+ } \
43
+ } while (0 )
44
+
45
+ DeferInodes::DeferInodes (bool cto)
46
+ : cto_(cto),
47
+ rwlock_ (),
48
+ inodes_() {}
49
+
50
+ bool DeferInodes::Add (const std::shared_ptr<InodeWrapper>& inode) {
51
+ RETURN_FALSE_IF_CTO_ON ();
52
+ WriteLockGuard lk (rwlock_);
53
+ Ino ino = inode->GetInodeId ();
54
+ auto ret = inodes_.emplace (ino, inode);
55
+ auto iter = ret.first ;
56
+ bool yes = ret.second ;
57
+ if (!yes) { // already exists
58
+ iter->second = inode;
59
+ }
60
+ return true ;
61
+ }
62
+
63
+ bool DeferInodes::Get (Ino ino, std::shared_ptr<InodeWrapper>* inode) {
64
+ RETURN_FALSE_IF_CTO_ON ();
65
+ ReadLockGuard lk (rwlock_);
66
+ auto iter = inodes_.find (ino);
67
+ if (iter == inodes_.end ()) {
68
+ return false ;
69
+ }
70
+ *inode = iter->second ;
71
+ return true ;
72
+ }
73
+
74
+ bool DeferInodes::Remove (const std::shared_ptr<InodeWrapper>& inode) {
75
+ RETURN_FALSE_IF_CTO_ON ();
76
+ WriteLockGuard lk (rwlock_);
77
+ InodeAttr attr;
78
+ inode->GetInodeAttrLocked (&attr);
79
+ auto iter = inodes_.find (attr.inodeid ());
80
+ if (iter == inodes_.end ()) {
81
+ return false ;
82
+ }
83
+
84
+ InodeAttr defered;
85
+ iter->second ->GetInodeAttrLocked (&defered);
86
+ if (AttrCtime (attr) < AttrCtime (defered)) {
87
+ // it means the old defered inode already replaced by the lastest one,
88
+ // so we can't remove it before it synced yet.
89
+ return false ;
90
+ }
91
+ inodes_.erase (iter);
92
+ return true ;
93
+ }
94
+
95
+ size_t DeferInodes::Size () {
96
+ ReadLockGuard lk (rwlock_);
97
+ return inodes_.size ();
98
+ }
99
+
100
+ SyncInodeClosure::SyncInodeClosure (const std::shared_ptr<DeferInodes>& inodes,
101
+ const std::shared_ptr<InodeWrapper>& inode)
102
+ : inodes_(inodes), inode_(inode) {}
103
+
104
+ void SyncInodeClosure::Run () {
105
+ std::unique_ptr<SyncInodeClosure> self_guard (this );
106
+ MetaStatusCode rc = GetStatusCode ();
107
+ if (rc == MetaStatusCode::OK || rc == MetaStatusCode::NOT_FOUND) {
108
+ inodes_->Remove (inode_);
109
+ }
110
+ }
111
+
112
+ DeferSync::DeferSync (bool cto, DeferSyncOption option)
113
+ : cto_(cto),
114
+ option_(option),
35
115
mutex_(),
36
116
running_(false ),
37
117
thread_(),
38
118
sleeper_(),
39
- inodes_() {
40
- }
119
+ pending_(),
120
+ inodes_(std::make_shared<DeferInodes>(cto)) { }
41
121
42
122
void DeferSync::Start () {
43
123
if (!running_.exchange (true )) {
@@ -55,20 +135,32 @@ void DeferSync::Stop() {
55
135
}
56
136
}
57
137
138
+ SyncInodeClosure* DeferSync::NewSyncInodeClosure (
139
+ const std::shared_ptr<InodeWrapper>& inode) {
140
+ // NOTE: we only store the defer inodes in nocto scenario,
141
+ // which means we don't need to remove the inode from defer inodes
142
+ // even if the inode already synced done in cto scenario.
143
+ if (cto_) {
144
+ return nullptr ;
145
+ }
146
+ return new SyncInodeClosure (inodes_, inode);
147
+ }
148
+
58
149
void DeferSync::SyncTask () {
59
- std::vector<std::shared_ptr<InodeWrapper>> inodes ;
150
+ std::vector<std::shared_ptr<InodeWrapper>> syncing ;
60
151
for ( ;; ) {
61
152
bool running = sleeper_.wait_for (std::chrono::seconds (option_.delay ));
62
153
63
154
{
64
155
LockGuard lk (mutex_);
65
- inodes .swap (inodes_ );
156
+ syncing .swap (pending_ );
66
157
}
67
- for (const auto & inode : inodes) {
158
+ for (const auto & inode : syncing) {
159
+ auto closure = NewSyncInodeClosure (inode);
68
160
UniqueLock lk (inode->GetUniqueLock ());
69
- inode->Async (nullptr , true );
161
+ inode->Async (closure , true );
70
162
}
71
- inodes .clear ();
163
+ syncing .clear ();
72
164
73
165
if (!running) {
74
166
break ;
@@ -78,18 +170,12 @@ void DeferSync::SyncTask() {
78
170
79
171
void DeferSync::Push (const std::shared_ptr<InodeWrapper>& inode) {
80
172
LockGuard lk (mutex_);
81
- inodes_.emplace_back (inode);
173
+ pending_.emplace_back (inode);
174
+ inodes_->Add (inode);
82
175
}
83
176
84
- bool DeferSync::IsDefered (Ino ino, InodeAttr* attr) {
85
- LockGuard lk (mutex_);
86
- for (const auto & inode : inodes_) {
87
- if (inode->GetInodeId () == ino) {
88
- inode->GetInodeAttr (attr);
89
- return true ;
90
- }
91
- }
92
- return false ;
177
+ bool DeferSync::IsDefered (Ino ino, std::shared_ptr<InodeWrapper>* inode) {
178
+ return inodes_->Get (ino, inode);
93
179
}
94
180
95
181
} // namespace filesystem
0 commit comments