Directory: | cvmfs/ |
---|---|
File: | cvmfs/fetch.cc |
Date: | 2025-06-22 02:36:02 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 150 | 152 | 98.7% |
Branches: | 101 | 173 | 58.4% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | |||
6 | #include "fetch.h" | ||
7 | |||
8 | #include <unistd.h> | ||
9 | |||
10 | #include "backoff.h" | ||
11 | #include "cache.h" | ||
12 | #include "clientctx.h" | ||
13 | #include "interrupt.h" | ||
14 | #include "network/download.h" | ||
15 | #include "quota.h" | ||
16 | #include "statistics.h" | ||
17 | #include "util/concurrency.h" | ||
18 | #include "util/logging.h" | ||
19 | #include "util/posix.h" | ||
20 | |||
21 | using namespace std; // NOLINT | ||
22 | |||
23 | namespace cvmfs { | ||
24 | |||
25 | 2 | void TLSDestructor(void *data) { | |
26 | 2 | Fetcher::ThreadLocalStorage *tls = static_cast<Fetcher::ThreadLocalStorage *>( | |
27 | data); | ||
28 | 2 | std::vector<Fetcher::ThreadLocalStorage *> *tls_blocks = &tls->fetcher | |
29 | ->tls_blocks_; | ||
30 | |||
31 | { | ||
32 | 2 | const MutexLockGuard m(tls->fetcher->lock_tls_blocks_); | |
33 | 2 | for (vector<Fetcher::ThreadLocalStorage *>::iterator | |
34 | 2 | i = tls_blocks->begin(), | |
35 | 2 | iEnd = tls_blocks->end(); | |
36 |
1/2✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
|
4 | i != iEnd; |
37 | 2 | ++i) { | |
38 |
2/2✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
|
4 | if ((*i) == tls) { |
39 |
1/2✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
|
2 | tls_blocks->erase(i); |
40 | 2 | break; | |
41 | } | ||
42 | } | ||
43 | 2 | } | |
44 | 2 | tls->fetcher->CleanupTls(tls); | |
45 | 2 | } | |
46 | |||
47 | |||
48 | /** | ||
49 | * Called when a thread exists, releases a ThreadLocalStorage object and | ||
50 | * removes the pointer to it from tls_blocks_. | ||
51 | */ | ||
52 | 887 | void Fetcher::CleanupTls(ThreadLocalStorage *tls) { | |
53 | 887 | ClosePipe(tls->pipe_wait); | |
54 |
1/2✓ Branch 0 taken 887 times.
✗ Branch 1 not taken.
|
887 | delete tls; |
55 | 887 | } | |
56 | |||
57 | |||
58 | /** | ||
59 | * Initialized thread-local storage if called the first time in a new thread. | ||
60 | */ | ||
61 | 1241 | Fetcher::ThreadLocalStorage *Fetcher::GetTls() { | |
62 | ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>( | ||
63 | 1241 | pthread_getspecific(thread_local_storage_)); | |
64 |
2/2✓ Branch 0 taken 354 times.
✓ Branch 1 taken 887 times.
|
1241 | if (tls != NULL) |
65 | 354 | return tls; | |
66 | |||
67 |
2/4✓ Branch 1 taken 887 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 887 times.
✗ Branch 5 not taken.
|
887 | tls = new ThreadLocalStorage(); |
68 | 887 | tls->fetcher = this; | |
69 |
1/2✓ Branch 1 taken 887 times.
✗ Branch 2 not taken.
|
887 | MakePipe(tls->pipe_wait); |
70 | 887 | tls->download_job.SetCompressed(true); | |
71 | 887 | tls->download_job.SetProbeHosts(true); | |
72 | 887 | const int retval = pthread_setspecific(thread_local_storage_, tls); | |
73 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 887 times.
|
887 | assert(retval == 0); |
74 | |||
75 | 887 | const MutexLockGuard m(lock_tls_blocks_); | |
76 |
1/2✓ Branch 1 taken 887 times.
✗ Branch 2 not taken.
|
887 | tls_blocks_.push_back(tls); |
77 | |||
78 | 887 | return tls; | |
79 | 887 | } | |
80 | |||
81 | |||
82 | 2013 | int Fetcher::Fetch(const CacheManager::LabeledObject &object, | |
83 | const std::string &alt_url) { | ||
84 | int fd_return; // Read-only file descriptor that is returned | ||
85 | int retval; | ||
86 | |||
87 | 2013 | perf::Inc(n_invocations); | |
88 | |||
89 | // Try to open from local cache | ||
90 |
3/4✓ Branch 1 taken 2013 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 782 times.
✓ Branch 4 taken 1231 times.
|
2013 | if ((fd_return = OpenSelect(object)) >= 0) { |
91 |
1/2✓ Branch 2 taken 782 times.
✗ Branch 3 not taken.
|
782 | LogCvmfs(kLogCache, kLogDebug, "hit: %s", object.label.path.c_str()); |
92 | 782 | return fd_return; | |
93 | } | ||
94 | |||
95 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1231 times.
|
1231 | if (object.id.IsNull()) { |
96 | // This has been seen when trying to load the root catalog signed by an | ||
97 | // invalid certificate on an empty cache | ||
98 | // TODO(jblomer): check if still necessary after the catalog reload refactor | ||
99 | ✗ | LogCvmfs(kLogCache, kLogDebug, "cancel attempt to download null hash"); | |
100 | ✗ | return -EIO; | |
101 | } | ||
102 | |||
103 |
1/2✓ Branch 1 taken 1231 times.
✗ Branch 2 not taken.
|
1231 | ThreadLocalStorage *tls = GetTls(); |
104 | |||
105 | // Synchronization point: either act as a master thread for this object or | ||
106 | // enqueue to the list of waiting threads. | ||
107 | 1231 | pthread_mutex_lock(lock_queues_download_); | |
108 | const ThreadQueues::iterator iDownloadQueue = | ||
109 |
1/2✓ Branch 1 taken 1231 times.
✗ Branch 2 not taken.
|
1231 | queues_download_.find(object.id); |
110 |
2/2✓ Branch 2 taken 1 times.
✓ Branch 3 taken 1230 times.
|
1231 | if (iDownloadQueue != queues_download_.end()) { |
111 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | LogCvmfs(kLogCache, kLogDebug, "waiting for download of %s", |
112 | object.label.path.c_str()); | ||
113 | |||
114 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | iDownloadQueue->second->push_back(tls->pipe_wait[1]); |
115 | 1 | pthread_mutex_unlock(lock_queues_download_); | |
116 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | ReadPipe(tls->pipe_wait[0], &fd_return, sizeof(int)); |
117 | |||
118 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | LogCvmfs(kLogCache, kLogDebug, "received from another thread fd %d for %s", |
119 | fd_return, object.label.path.c_str()); | ||
120 | 1 | return fd_return; | |
121 | } else { | ||
122 | // Seems we are the first one, check again in the cache (race condition) | ||
123 |
1/2✓ Branch 1 taken 1230 times.
✗ Branch 2 not taken.
|
1230 | fd_return = OpenSelect(object); |
124 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1229 times.
|
1230 | if (fd_return >= 0) { |
125 | 1 | pthread_mutex_unlock(lock_queues_download_); | |
126 | 1 | return fd_return; | |
127 | } | ||
128 | |||
129 | // Create a new queue for this chunk | ||
130 |
1/2✓ Branch 1 taken 1229 times.
✗ Branch 2 not taken.
|
1229 | queues_download_[object.id] = &tls->other_pipes_waiting; |
131 | 1229 | pthread_mutex_unlock(lock_queues_download_); | |
132 | } | ||
133 | |||
134 | 1229 | perf::Inc(n_downloads); | |
135 | |||
136 | // Involve the download manager | ||
137 |
1/2✓ Branch 2 taken 1229 times.
✗ Branch 3 not taken.
|
1229 | LogCvmfs(kLogCache, kLogDebug, "downloading %s", object.label.path.c_str()); |
138 | 1229 | std::string url; | |
139 |
2/2✓ Branch 1 taken 3 times.
✓ Branch 2 taken 1226 times.
|
1229 | if (object.label.IsExternal()) { |
140 |
2/4✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 3 times.
✗ Branch 5 not taken.
|
3 | url = !alt_url.empty() ? alt_url : object.label.path; |
141 | } else { | ||
142 |
8/14✓ Branch 1 taken 1 times.
✓ Branch 2 taken 1225 times.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1225 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 1225 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 1226 times.
✗ Branch 14 not taken.
✓ Branch 18 taken 1225 times.
✓ Branch 19 taken 1 times.
✗ Branch 22 not taken.
✗ Branch 23 not taken.
|
1226 | url = "/" + (alt_url.size() ? alt_url : "data/" + object.id.MakePath()); |
143 | } | ||
144 |
1/2✓ Branch 1 taken 1229 times.
✗ Branch 2 not taken.
|
1229 | void *txn = alloca(cache_mgr_->SizeOfTxn()); |
145 |
1/2✓ Branch 1 taken 1229 times.
✗ Branch 2 not taken.
|
1229 | retval = cache_mgr_->StartTxn(object.id, object.label.size, txn); |
146 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1228 times.
|
1229 | if (retval < 0) { |
147 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | LogCvmfs(kLogCache, kLogDebug, "could not start transaction on %s", |
148 | object.label.path.c_str()); | ||
149 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | SignalWaitingThreads(retval, object.id, tls); |
150 | 1 | return retval; | |
151 | } | ||
152 |
1/2✓ Branch 1 taken 1228 times.
✗ Branch 2 not taken.
|
1228 | cache_mgr_->CtrlTxn(object.label, 0, txn); |
153 | |||
154 |
1/2✓ Branch 3 taken 1228 times.
✗ Branch 4 not taken.
|
1228 | LogCvmfs(kLogCache, kLogDebug, "miss: %s %s", object.label.path.c_str(), |
155 | url.c_str()); | ||
156 | 1228 | TransactionSink sink(cache_mgr_, txn); | |
157 | 1228 | tls->download_job.SetUrl(&url); | |
158 | 1228 | tls->download_job.SetSink(&sink); | |
159 | 1228 | tls->download_job.SetExpectedHash(&object.id); | |
160 | 1228 | tls->download_job.SetExtraInfo(&object.label.path); | |
161 |
1/2✓ Branch 1 taken 1228 times.
✗ Branch 2 not taken.
|
1228 | ClientCtx *ctx = ClientCtx::GetInstance(); |
162 |
3/4✓ Branch 1 taken 1228 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 194 times.
✓ Branch 4 taken 1034 times.
|
1228 | if (ctx->IsSet()) { |
163 |
1/2✓ Branch 5 taken 194 times.
✗ Branch 6 not taken.
|
194 | ctx->Get(tls->download_job.GetUidPtr(), |
164 | tls->download_job.GetGidPtr(), | ||
165 | tls->download_job.GetPidPtr(), | ||
166 | tls->download_job.GetInterruptCuePtr()); | ||
167 | } | ||
168 | 1228 | tls->download_job.SetCompressed(object.label.zip_algorithm | |
169 | == zlib::kZlibDefault); | ||
170 | 1228 | tls->download_job.SetRangeOffset(object.label.range_offset); | |
171 | 1228 | tls->download_job.SetRangeSize(static_cast<int64_t>(object.label.size)); | |
172 |
1/2✓ Branch 1 taken 1228 times.
✗ Branch 2 not taken.
|
1228 | download_mgr_->Fetch(&tls->download_job); |
173 | |||
174 |
2/2✓ Branch 1 taken 1176 times.
✓ Branch 2 taken 52 times.
|
1228 | if (tls->download_job.error_code() == download::kFailOk) { |
175 |
1/2✓ Branch 2 taken 1176 times.
✗ Branch 3 not taken.
|
1176 | LogCvmfs(kLogCache, kLogDebug, "finished downloading of %s", url.c_str()); |
176 | |||
177 |
1/2✓ Branch 1 taken 1176 times.
✗ Branch 2 not taken.
|
1176 | fd_return = cache_mgr_->OpenFromTxn(txn); |
178 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1175 times.
|
1176 | if (fd_return < 0) { |
179 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | cache_mgr_->AbortTxn(txn); |
180 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | SignalWaitingThreads(fd_return, object.id, tls); |
181 | 1 | return fd_return; | |
182 | } | ||
183 | |||
184 |
1/2✓ Branch 1 taken 1175 times.
✗ Branch 2 not taken.
|
1175 | retval = cache_mgr_->CommitTxn(txn); |
185 |
2/2✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1173 times.
|
1175 | if (retval < 0) { |
186 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | cache_mgr_->Close(fd_return); |
187 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | SignalWaitingThreads(retval, object.id, tls); |
188 | 2 | return retval; | |
189 | } | ||
190 |
1/2✓ Branch 1 taken 1173 times.
✗ Branch 2 not taken.
|
1173 | SignalWaitingThreads(fd_return, object.id, tls); |
191 | 1173 | return fd_return; | |
192 | } | ||
193 | |||
194 | // Download failed | ||
195 |
1/5✗ Branch 4 not taken.
✓ Branch 5 taken 52 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
|
104 | LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr, |
196 | "failed to fetch %s (hash: %s, error %d [%s])", | ||
197 |
1/2✓ Branch 1 taken 52 times.
✗ Branch 2 not taken.
|
104 | object.label.path.c_str(), object.id.ToString().c_str(), |
198 | 52 | tls->download_job.error_code(), | |
199 | download::Code2Ascii(tls->download_job.error_code())); | ||
200 |
1/2✓ Branch 1 taken 52 times.
✗ Branch 2 not taken.
|
52 | cache_mgr_->AbortTxn(txn); |
201 |
1/2✓ Branch 1 taken 52 times.
✗ Branch 2 not taken.
|
52 | backoff_throttle_->Throttle(); |
202 |
1/2✓ Branch 1 taken 52 times.
✗ Branch 2 not taken.
|
52 | SignalWaitingThreads(-EIO, object.id, tls); |
203 | 52 | return -EIO; | |
204 | 1229 | } | |
205 | |||
206 | |||
207 | 3038 | Fetcher::Fetcher(CacheManager *cache_mgr, | |
208 | download::DownloadManager *download_mgr, | ||
209 | BackoffThrottle *backoff_throttle, | ||
210 | 3038 | perf::StatisticsTemplate statistics) | |
211 | 3038 | : lock_queues_download_(NULL) | |
212 | 3038 | , lock_tls_blocks_(NULL) | |
213 | 3038 | , cache_mgr_(cache_mgr) | |
214 | 3038 | , download_mgr_(download_mgr) | |
215 | 3038 | , backoff_throttle_(backoff_throttle) { | |
216 | int retval; | ||
217 | 3038 | retval = pthread_key_create(&thread_local_storage_, TLSDestructor); | |
218 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3038 times.
|
3038 | assert(retval == 0); |
219 | 3038 | lock_queues_download_ = reinterpret_cast<pthread_mutex_t *>( | |
220 | 3038 | smalloc(sizeof(pthread_mutex_t))); | |
221 | 3038 | retval = pthread_mutex_init(lock_queues_download_, NULL); | |
222 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3038 times.
|
3038 | assert(retval == 0); |
223 | 3038 | lock_tls_blocks_ = reinterpret_cast<pthread_mutex_t *>( | |
224 | 3038 | smalloc(sizeof(pthread_mutex_t))); | |
225 | 3038 | retval = pthread_mutex_init(lock_tls_blocks_, NULL); | |
226 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3038 times.
|
3038 | assert(retval == 0); |
227 |
3/6✓ Branch 2 taken 3038 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 3038 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 3038 times.
✗ Branch 10 not taken.
|
3038 | n_downloads = statistics.RegisterTemplated( |
228 | "n_downloads", | ||
229 | "overall number of downloaded files (incl. catalogs, chunks)"); | ||
230 |
3/6✓ Branch 2 taken 3038 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 3038 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 3038 times.
✗ Branch 10 not taken.
|
3038 | n_invocations = statistics.RegisterTemplated( |
231 | "n_invocations", | ||
232 | "overall number of object requests (incl. catalogs, chunks)"); | ||
233 | 3038 | } | |
234 | |||
235 | |||
236 | 3038 | Fetcher::~Fetcher() { | |
237 | int retval; | ||
238 | |||
239 | { | ||
240 | 3038 | const MutexLockGuard m(lock_tls_blocks_); | |
241 |
2/2✓ Branch 1 taken 885 times.
✓ Branch 2 taken 3038 times.
|
3923 | for (unsigned i = 0; i < tls_blocks_.size(); ++i) |
242 | 885 | CleanupTls(tls_blocks_[i]); | |
243 | 3038 | } | |
244 | |||
245 | 3038 | retval = pthread_mutex_destroy(lock_tls_blocks_); | |
246 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3038 times.
|
3038 | assert(retval == 0); |
247 | 3038 | free(lock_tls_blocks_); | |
248 | |||
249 | 3038 | retval = pthread_mutex_destroy(lock_queues_download_); | |
250 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3038 times.
|
3038 | assert(retval == 0); |
251 | 3038 | free(lock_queues_download_); | |
252 | |||
253 | 3038 | retval = pthread_key_delete(thread_local_storage_); | |
254 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3038 times.
|
3038 | assert(retval == 0); |
255 | 3038 | } | |
256 | |||
257 | |||
258 | /** | ||
259 | * Depending on the object type, uses either Open() or OpenPinned() from the | ||
260 | * cache manager | ||
261 | */ | ||
262 | 3243 | int Fetcher::OpenSelect(const CacheManager::LabeledObject &object) { | |
263 |
5/6✓ Branch 1 taken 268 times.
✓ Branch 2 taken 2975 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 268 times.
✓ Branch 6 taken 2975 times.
✓ Branch 7 taken 268 times.
|
3243 | if (object.label.IsCatalog() || object.label.IsPinned()) { |
264 | 2975 | return cache_mgr_->OpenPinned(object); | |
265 | } else { | ||
266 | 268 | return cache_mgr_->Open(object); | |
267 | } | ||
268 | } | ||
269 | |||
270 | |||
271 | 1232 | void Fetcher::SignalWaitingThreads(const int fd, | |
272 | const shash::Any &id, | ||
273 | ThreadLocalStorage *tls) { | ||
274 | 1232 | const MutexLockGuard m(lock_queues_download_); | |
275 |
2/2✓ Branch 1 taken 4 times.
✓ Branch 2 taken 1232 times.
|
1236 | for (unsigned i = 0, s = tls->other_pipes_waiting.size(); i < s; ++i) { |
276 |
3/4✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 times.
✓ Branch 3 taken 3 times.
✗ Branch 4 not taken.
|
4 | int fd_dup = (fd >= 0) ? cache_mgr_->Dup(fd) : fd; |
277 |
1/2✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
|
4 | WritePipe(tls->other_pipes_waiting[i], &fd_dup, sizeof(int)); |
278 | } | ||
279 | 1232 | tls->other_pipes_waiting.clear(); | |
280 |
1/2✓ Branch 1 taken 1232 times.
✗ Branch 2 not taken.
|
1232 | queues_download_.erase(id); |
281 | 1232 | } | |
282 | |||
283 | } // namespace cvmfs | ||
284 |