GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/fetch.h
Date: 2024-04-28 02:33:07
Exec Total Coverage
Lines: 22 29 75.9%
Branches: 3 14 21.4%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_FETCH_H_
6 #define CVMFS_FETCH_H_
7
8 #include <pthread.h>
9
10 #include <map>
11 #include <string>
12 #include <vector>
13
14 #include "cache.h"
15 #include "crypto/hash.h"
16 #include "gtest/gtest_prod.h"
17 #include "network/download.h"
18 #include "network/sink.h"
19
20 class BackoffThrottle;
21
22 namespace perf {
23 class Statistics;
24 }
25
26 namespace cvmfs {
27
28 /**
29 * TransacionSink uses an open transaction in a cache manager as a sink. It
30 * allows the download manager to write data without knowing about the cache
31 * manager.
32 */
33 class TransactionSink : public Sink {
34 public:
35 41 TransactionSink(CacheManager *cache_mgr, void *open_txn)
36 41 : Sink(false),
37 41 cache_mgr_(cache_mgr),
38 41 open_txn_(open_txn) { }
39 82 virtual ~TransactionSink() { }
40
41 /**
42 * Appends data to the sink
43 *
44 * @returns on success: number of bytes written (can be less than requested)
45 * on failure: -errno.
46 */
47 63 virtual int64_t Write(const void *buf, uint64_t sz) {
48 63 return cache_mgr_->Write(buf, sz, open_txn_);
49 }
50
51 /**
52 * Truncate all written data and start over at position zero.
53 *
54 * @returns Success = 0
55 * Failure = -errno
56 */
57 6 virtual int Reset() {
58 6 return cache_mgr_->Reset(open_txn_);
59 }
60
61 /**
62 * Purges all resources leaving the sink in an invalid state.
63 * More aggressive version of Reset().
64 * For some sinks it might do the same as Reset().
65 *
66 * @returns Success = 0
67 * Failure = -errno
68 */
69 5 virtual int Purge() {
70 5 return Reset();
71 }
72 /**
73 * @returns true if the object is correctly initialized.
74 */
75 41 virtual bool IsValid() {
76
2/4
✓ Branch 0 taken 41 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 41 times.
✗ Branch 3 not taken.
41 return cache_mgr_ != NULL && open_txn_ != NULL;
77 }
78
79 41 virtual int Flush() { return 0; }
80 virtual bool Reserve(size_t /*size*/) { return true; }
81 155 virtual bool RequiresReserve() { return false; }
82
83 /**
84 * Return a string representation describing the type of sink and its status
85 */
86 virtual std::string Describe() {
87 std::string result = "Transaction sink that is ";
88 result += IsValid() ? "valid" : "invalid";
89 return result;
90 }
91
92 private:
93 CacheManager *cache_mgr_;
94 void *open_txn_;
95 };
96
97
98 /**
99 * The Fetcher uses a cache manager and a download manager in order to provide a
100 * (virtual) file descriptor to a requested object, which is valid in the
101 * context of the cache manager.
102 * If the object is not in the cache, it is downloaded and stored in the cache.
103 *
104 * Concurrent download requests for the same id are collapsed.
105 */
106 class Fetcher : SingleCopy {
107 FRIEND_TEST(T_Fetcher, GetTls);
108 FRIEND_TEST(T_Fetcher, SignalWaitingThreads);
109 friend void *TestGetTls(void *data);
110 friend void *TestFetchCollapse(void *data);
111 friend void *TestFetchCollapse2(void *data);
112 friend void TLSDestructor(void *data);
113
114 public:
115 Fetcher(CacheManager *cache_mgr,
116 download::DownloadManager *download_mgr,
117 BackoffThrottle *backoff_throttle,
118 perf::StatisticsTemplate statistics);
119 ~Fetcher();
120
121 int Fetch(const CacheManager::LabeledObject &object,
122 const std::string &alt_url = "");
123
124 void ReplaceCacheManager(CacheManager *new_cache_mgr) {
125 cache_mgr_ = new_cache_mgr;
126 }
127 94 CacheManager *cache_mgr() { return cache_mgr_; }
128 20 download::DownloadManager *download_mgr() { return download_mgr_; }
129
130 private:
131 /**
132 * Multiple threads might want to download the same object at the same time.
133 * If that happens, only the first thread performs the download. The other
134 * threads wait on a pipe for a notification from the first thread.
135 */
136 struct ThreadLocalStorage {
137
1/2
✓ Branch 2 taken 31 times.
✗ Branch 3 not taken.
31 ThreadLocalStorage() {
138 31 pipe_wait[0] = -1;
139 31 pipe_wait[1] = -1;
140 31 fetcher = NULL;
141 31 }
142
143 /**
144 * Used during cleanup to find tls_blocks_.
145 */
146 Fetcher *fetcher;
147 /**
148 * Wait on the reading end if another thread is already downloading the same
149 * object.
150 */
151 int pipe_wait[2];
152 /**
153 * Writer ends of all the pipes of threads that want to download the same
154 * object.
155 */
156 std::vector<int> other_pipes_waiting;
157 /**
158 * It is sufficient to construct the JobInfo object once per thread, not
159 * on every call to Fetch().
160 */
161 download::JobInfo download_job;
162 };
163
164 /**
165 * Maps currently downloaded chunks to the other_pipes_waiting member of the
166 * thread local storage of the downloading thread. This way, a thread can
167 * enqueue itself to such an other_pipes_waiting list and gets informed when
168 * the download is completed.
169 */
170 typedef std::map< shash::Any, std::vector<int> * > ThreadQueues;
171
172 ThreadLocalStorage *GetTls();
173 void CleanupTls(ThreadLocalStorage *tls);
174 void SignalWaitingThreads(const int fd, const shash::Any &id,
175 ThreadLocalStorage *tls);
176 int OpenSelect(const CacheManager::LabeledObject &object);
177
178 /**
179 * Key to the thread's ThreadLocalStorage memory
180 */
181 pthread_key_t thread_local_storage_;
182
183 ThreadQueues queues_download_;
184 pthread_mutex_t *lock_queues_download_;
185
186 /**
187 * All the threads register their thread local storage here, so that it can
188 * be cleaned up properly in the destructor of Fetcher.
189 */
190 std::vector<ThreadLocalStorage *> tls_blocks_;
191 pthread_mutex_t *lock_tls_blocks_;
192
193 CacheManager *cache_mgr_;
194 download::DownloadManager *download_mgr_;
195 BackoffThrottle *backoff_throttle_;
196 perf::Counter *n_downloads;
197 perf::Counter *n_invocations;
198 };
199
200 } // namespace cvmfs
201
202 #endif // CVMFS_FETCH_H_
203