1 |
|
|
/** |
2 |
|
|
* This file is part of the CernVM File System. |
3 |
|
|
*/ |
4 |
|
|
|
5 |
|
|
#ifndef CVMFS_UPLOAD_FACILITY_H_ |
6 |
|
|
#define CVMFS_UPLOAD_FACILITY_H_ |
7 |
|
|
|
#include <fcntl.h>
#include <stdint.h>

#include <cassert>
#include <string>

#include "atomic.h"
#include "ingestion/task.h"
#include "ingestion/tube.h"
#include "repository_tag.h"
#include "statistics.h"
#include "upload_spooler_definition.h"
#include "util/posix.h"
#include "util_concurrency.h"
21 |
|
|
|
22 |
|
|
namespace upload { |
23 |
|
|
|
24 |
|
|
struct UploadCounters { |
25 |
|
|
perf::Counter *n_duplicated_files; |
26 |
|
|
perf::Counter *sz_uploaded_bytes; |
27 |
|
|
|
28 |
|
|
explicit UploadCounters(perf::StatisticsTemplate statistics) { |
29 |
|
|
n_duplicated_files = statistics.RegisterTemplated("n_duplicated_files", |
30 |
|
|
"Number of duplicated files added"); |
31 |
|
|
sz_uploaded_bytes = statistics.RegisterTemplated("sz_uploaded_bytes", |
32 |
|
|
"Number of uploaded bytes"); |
33 |
|
|
} |
34 |
|
|
}; // UploadCounters |
35 |
|
|
|
/**
 * Outcome of one uploader job.  AbstractUploader::Respond() passes an object
 * of this type to the job's callback.
 */
struct UploaderResults {
  /** Kind of job that produced the result. */
  enum Type { kFileUpload, kBufferUpload, kChunkCommit, kRemove };

  /**
   * Result of a plain file upload (type is always kFileUpload).
   *
   * @param return_code  return code reported for the upload
   * @param local_path   path of the local file the result refers to
   */
  UploaderResults(const int return_code, const std::string &local_path)
    : type(kFileUpload)
    , return_code(return_code)
    , local_path(local_path)
  { }

  /**
   * Result of a job of arbitrary type; local_path is left empty.
   *
   * @param t            kind of job
   * @param return_code  return code reported for the job
   */
  explicit UploaderResults(Type t, const int return_code)
    : type(t)
    , return_code(return_code)
    , local_path()
  { }

  /** Default result: type kRemove, return code 0, empty local_path. */
  UploaderResults()
    : type(kRemove)
    , return_code(0)
    , local_path()
  { }

  const Type        type;
  const int         return_code;
  const std::string local_path;
};
58 |
|
|
|
59 |
|
|
struct UploadStreamHandle; |
60 |
|
|
|
61 |
|
|
/** |
62 |
|
|
* Abstract base class for all backend upload facilities |
63 |
|
|
* This class defines an interface and constructs the concrete Uploaders, |
64 |
|
|
* futhermore it handles callbacks to the outside world to notify users of done |
65 |
|
|
* upload jobs. |
66 |
|
|
* |
67 |
|
|
* Note: Users could be both the Spooler (when calling Spooler::Upload()) and |
68 |
|
|
* the IngestionPipeline (when calling Spooler::Process()). We therefore |
69 |
|
|
* cannot use the Observable template here, since this would forward |
70 |
|
|
* finished upload jobs to ALL listeners instead of only the owner of the |
71 |
|
|
* specific job. |
72 |
|
|
*/ |
73 |
|
|
class AbstractUploader |
74 |
|
|
: public PolymorphicConstruction<AbstractUploader, SpoolerDefinition> |
75 |
|
|
, public Callbackable<UploaderResults> |
76 |
|
|
{ |
77 |
|
|
friend class TaskUpload; |
78 |
|
|
|
79 |
|
|
public: |
80 |
|
|
/** |
81 |
|
|
* A read-only memory block that is supposed to be written out. |
82 |
|
|
*/ |
83 |
|
|
struct UploadBuffer { |
84 |
|
3180 |
UploadBuffer() : size(0), data(NULL) { } |
85 |
|
585016 |
UploadBuffer(uint64_t s, void *d) : size(s), data(d) { } |
86 |
|
|
uint64_t size; |
87 |
|
|
void *data; |
88 |
|
|
}; |
89 |
|
|
|
90 |
|
|
struct JobStatus { |
91 |
|
|
enum State { kOk, kTerminate, kNoJobs }; |
92 |
|
|
}; |
93 |
|
|
|
94 |
|
|
struct UploadJob { |
95 |
|
|
enum Type { Upload, Commit, Terminate }; |
96 |
|
|
|
97 |
|
|
UploadJob(UploadStreamHandle *handle, UploadBuffer buffer, |
98 |
|
|
const CallbackTN *callback = NULL); |
99 |
|
|
UploadJob(UploadStreamHandle *handle, const shash::Any &content_hash); |
100 |
|
|
|
101 |
|
341 |
UploadJob() |
102 |
|
|
: type(Terminate) |
103 |
|
|
, stream_handle(NULL) |
104 |
|
|
, tag_(0) |
105 |
|
|
, buffer() |
106 |
|
341 |
, callback(NULL) {} |
107 |
|
|
|
108 |
|
341 |
static UploadJob *CreateQuitBeacon() { return new UploadJob(); } |
109 |
|
588181 |
bool IsQuitBeacon() { return type == Terminate; } |
110 |
|
|
|
111 |
|
|
Type type; |
112 |
|
|
UploadStreamHandle *stream_handle; |
113 |
|
|
/** |
114 |
|
|
* Ensure that upload jobs belonging to the same file end up in the same |
115 |
|
|
* upload task queue. |
116 |
|
|
*/ |
117 |
|
|
int64_t tag_; |
118 |
|
9008 |
int64_t tag() { return tag_; } |
119 |
|
|
|
120 |
|
|
// type==Upload specific fields |
121 |
|
|
UploadBuffer buffer; |
122 |
|
|
const CallbackTN *callback; |
123 |
|
|
|
124 |
|
|
// type==Commit specific fields |
125 |
|
|
shash::Any content_hash; |
126 |
|
|
}; |
127 |
|
|
|
128 |
✗✓✗✗ ✗✓ |
231 |
virtual ~AbstractUploader() { assert(!tasks_upload_.is_active()); } |
129 |
|
|
|
130 |
|
|
/** |
131 |
|
|
* A string identifying the uploader type |
132 |
|
|
*/ |
133 |
|
|
virtual std::string name() const = 0; |
134 |
|
|
|
135 |
|
|
/** |
136 |
|
|
* Concrete uploaders might want to use a customized setting for multi-stream |
137 |
|
|
* writing, for instance one per disk. Note that the S3 backend uses one task |
138 |
|
|
* but this one task uses internally mutliple HTTP streams through curl async |
139 |
|
|
* I/O. |
140 |
|
|
*/ |
141 |
|
572 |
virtual unsigned GetNumTasks() const { return num_upload_tasks_; } |
142 |
|
|
|
143 |
|
|
/** |
144 |
|
|
* This is called right after the constructor of AbstractUploader or/and its |
145 |
|
|
* derived class has been executed. You can override that to do additional |
146 |
|
|
* initialization that cannot be done in the constructor itself. |
147 |
|
|
* |
148 |
|
|
* @return true on successful initialization |
149 |
|
|
*/ |
150 |
|
|
virtual bool Initialize(); |
151 |
|
|
|
152 |
|
|
/** |
153 |
|
|
* Called during Spooler::WaitForUpload(), to ensure that the upload has |
154 |
|
|
* finished. If commit == true, then a Commit request is also sent, to apply |
155 |
|
|
* all the the changes accumulated during the session. "catalog_path" |
156 |
|
|
* represents the path of the root catalog with the changes. |
157 |
|
|
* By default it is a noop and returns true; |
158 |
|
|
*/ |
159 |
|
|
virtual bool FinalizeSession(bool commit, const std::string &old_root_hash, |
160 |
|
|
const std::string &new_root_hash, |
161 |
|
|
const RepositoryTag &tag); |
162 |
|
|
|
163 |
|
|
/** |
164 |
|
|
* This must be called right before the destruction of the AbstractUploader! |
165 |
|
|
* You are _not_ supposed to overwrite this method in your concrete Uploader. |
166 |
|
|
*/ |
167 |
|
|
void TearDown(); |
168 |
|
|
|
169 |
|
|
/** |
170 |
|
|
* Uploads the file at the path local_path into the backend storage under the |
171 |
|
|
* path remote_path. When the upload has finished it calls callback. |
172 |
|
|
* Note: This method might be implemented in a synchronous way. |
173 |
|
|
* |
174 |
|
|
* @param local_path path to the file to be uploaded |
175 |
|
|
* @param remote_path desired path for the file in the backend storage |
176 |
|
|
* @param callback (optional) gets notified when the upload was finished |
177 |
|
|
*/ |
178 |
|
2580 |
void Upload( |
179 |
|
|
const std::string &local_path, |
180 |
|
|
const std::string &remote_path, |
181 |
|
|
const CallbackTN *callback = NULL) |
182 |
|
|
{ |
183 |
|
2580 |
++jobs_in_flight_; |
184 |
|
2580 |
FileUpload(local_path, remote_path, callback); |
185 |
|
2580 |
} |
186 |
|
|
|
187 |
|
|
/** |
188 |
|
|
* This method is called before the first data block of a streamed upload is |
189 |
|
|
* scheduled (see above implementation of UploadStreamHandle for details). |
190 |
|
|
* |
191 |
|
|
* @param callback (optional) this callback will be invoked once this parti- |
192 |
|
|
* cular streamed upload is committed. |
193 |
|
|
* @return a pointer to the initialized UploadStreamHandle |
194 |
|
|
*/ |
195 |
|
|
virtual UploadStreamHandle *InitStreamedUpload( |
196 |
|
|
const CallbackTN *callback = NULL) = 0; |
197 |
|
|
|
198 |
|
|
/** |
199 |
|
|
* This method schedules a buffer to be uploaded in the context of the |
200 |
|
|
* given UploadStreamHandle. The actual upload will happen asynchronously by |
201 |
|
|
* a concrete implementation of AbstractUploader |
202 |
|
|
* (see AbstractUploader::StreamedUpload()). |
203 |
|
|
* As soon has the scheduled upload job is complete (either successful or not) |
204 |
|
|
* the optionally passed callback is supposed to be invoked using |
205 |
|
|
* AbstractUploader::Respond(). |
206 |
|
|
* |
207 |
|
|
* @param handle Pointer to a previously acquired UploadStreamHandle |
208 |
|
|
* @param buffer contains the data block to be uploaded |
209 |
|
|
* @param callback (optional) callback object to be invoked once the given |
210 |
|
|
* upload is finished (see AbstractUploader::Respond()) |
211 |
|
|
*/ |
212 |
|
585016 |
void ScheduleUpload( |
213 |
|
|
UploadStreamHandle *handle, |
214 |
|
|
UploadBuffer buffer, |
215 |
|
|
const CallbackTN *callback = NULL) |
216 |
|
|
{ |
217 |
|
585016 |
++jobs_in_flight_; |
218 |
|
585016 |
tubes_upload_.Dispatch(new UploadJob(handle, buffer, callback)); |
219 |
|
585016 |
} |
220 |
|
|
|
221 |
|
|
/** |
222 |
|
|
* This method schedules a commit job as soon as all data blocks of a streamed |
223 |
|
|
* upload are (successfully) uploaded. Derived classes must override |
224 |
|
|
* AbstractUploader::FinalizeStreamedUpload() for this to happen. |
225 |
|
|
* |
226 |
|
|
* @param handle Pointer to a previously acquired UploadStreamHandle |
227 |
|
|
* @param content_hash the content hash of the full uploaded data Chunk |
228 |
|
|
*/ |
229 |
|
2824 |
void ScheduleCommit( |
230 |
|
|
UploadStreamHandle *handle, |
231 |
|
|
const shash::Any &content_hash) |
232 |
|
|
{ |
233 |
|
2824 |
++jobs_in_flight_; |
234 |
|
2824 |
tubes_upload_.Dispatch(new UploadJob(handle, content_hash)); |
235 |
|
2824 |
} |
236 |
|
|
|
237 |
|
|
/** |
238 |
|
|
* Removes a file from the backend storage. |
239 |
|
|
* |
240 |
|
|
* Note: If the file doesn't exist before calling this won't be an error. |
241 |
|
|
* |
242 |
|
|
* @param file_to_delete path to the file to be removed |
243 |
|
|
*/ |
244 |
|
220 |
void RemoveAsync(const std::string &file_to_delete) { |
245 |
|
220 |
++jobs_in_flight_; |
246 |
|
220 |
DoRemoveAsync(file_to_delete); |
247 |
|
220 |
} |
248 |
|
|
|
249 |
|
|
/** |
250 |
|
|
* Overloaded method used to remove a object based on its content hash. |
251 |
|
|
* |
252 |
|
|
* @param hash_to_delete the content hash of a file to be deleted |
253 |
|
|
*/ |
254 |
|
214 |
void RemoveAsync(const shash::Any &hash_to_delete) { |
255 |
|
214 |
RemoveAsync("data/" + hash_to_delete.MakePath()); |
256 |
|
214 |
} |
257 |
|
|
|
258 |
|
|
/** |
259 |
|
|
* Get object size based on its content hash |
260 |
|
|
* |
261 |
|
|
* @param hash the content hash of a file |
262 |
|
|
*/ |
263 |
|
|
int64_t GetObjectSize(const shash::Any &hash) { |
264 |
|
|
return DoGetObjectSize("data/" + hash.MakePath()); |
265 |
|
|
} |
266 |
|
|
|
267 |
|
|
/** |
268 |
|
|
* Checks if a file is already present in the backend storage. This might be a |
269 |
|
|
* synchronous operation. |
270 |
|
|
* |
271 |
|
|
* @param path the path of the file to be checked |
272 |
|
|
* @return true if the file was found in the backend storage |
273 |
|
|
*/ |
274 |
|
|
virtual bool Peek(const std::string &path) const = 0; |
275 |
|
|
|
276 |
|
|
/** |
277 |
|
|
* Creates a top-level shortcut to the given data object. This is particularly |
278 |
|
|
* useful for bootstrapping repositories whose data-directory is secured by |
279 |
|
|
* a VOMS certificate. |
280 |
|
|
* |
281 |
|
|
* @param object content hash of the object to be exposed on the top-level |
282 |
|
|
* @return true on success |
283 |
|
|
*/ |
284 |
|
|
virtual bool PlaceBootstrappingShortcut(const shash::Any &object) const = 0; |
285 |
|
|
|
286 |
|
|
/** |
287 |
|
|
* Waits until the current upload queue is empty. |
288 |
|
|
* |
289 |
|
|
* Note: This does NOT necessarily mean, that all files are actuall uploaded. |
290 |
|
|
* If new jobs are concurrently scheduled the behavior of this method is |
291 |
|
|
* not defined (it returns also on intermediately empty queues) |
292 |
|
|
*/ |
293 |
|
|
virtual void WaitForUpload() const; |
294 |
|
|
|
295 |
|
|
virtual unsigned int GetNumberOfErrors() const = 0; |
296 |
|
|
static void RegisterPlugins(); |
297 |
|
|
void InitCounters(perf::StatisticsTemplate *statistics); |
298 |
|
|
|
299 |
|
|
protected: |
300 |
|
|
typedef Callbackable<UploaderResults>::CallbackTN *CallbackPtr; |
301 |
|
|
|
302 |
|
|
explicit AbstractUploader(const SpoolerDefinition &spooler_definition); |
303 |
|
|
|
304 |
|
|
/** |
305 |
|
|
* Implementation of plain file upload |
306 |
|
|
* Public interface: AbstractUploader::Upload() |
307 |
|
|
* |
308 |
|
|
* @param local_path file to be uploaded |
309 |
|
|
* @param remote_path destination to be written in the backend |
310 |
|
|
* @param callback callback to be called on completion |
311 |
|
|
*/ |
312 |
|
|
virtual void FileUpload(const std::string &local_path, |
313 |
|
|
const std::string &remote_path, |
314 |
|
|
const CallbackTN *callback = NULL) = 0; |
315 |
|
|
|
316 |
|
|
/** |
317 |
|
|
* Implementation of a streamed upload step. See public interface for details. |
318 |
|
|
* Public interface: AbstractUploader::ScheduleUpload() |
319 |
|
|
* |
320 |
|
|
* @param handle decendant of UploadStreamHandle specifying the stream |
321 |
|
|
* @param buffer the CharBuffer to be uploaded to the stream |
322 |
|
|
* @param callback callback to be called on completion |
323 |
|
|
*/ |
324 |
|
|
virtual void StreamedUpload(UploadStreamHandle *handle, |
325 |
|
|
UploadBuffer buffer, |
326 |
|
|
const CallbackTN *callback) = 0; |
327 |
|
|
|
328 |
|
|
/** |
329 |
|
|
* Implemetation of streamed upload commit |
330 |
|
|
* Public interface: AbstractUploader::ScheduleUpload() |
331 |
|
|
* |
332 |
|
|
* @param handle decendant of UploadStreamHandle specifying the stream |
333 |
|
|
* @param content_hash the computed content hash of the streamed object |
334 |
|
|
*/ |
335 |
|
|
virtual void FinalizeStreamedUpload(UploadStreamHandle *handle, |
336 |
|
|
const shash::Any &content_hash) = 0; |
337 |
|
|
|
338 |
|
|
|
339 |
|
|
virtual void DoRemoveAsync(const std::string &file_to_delete) = 0; |
340 |
|
|
|
341 |
|
|
virtual int64_t DoGetObjectSize(const std::string &file_name) = 0; |
342 |
|
|
|
343 |
|
|
/** |
344 |
|
|
* This notifies the callback that is associated to a finishing job. Please |
345 |
|
|
* do not call the handed callback yourself in concrete Uploaders! |
346 |
|
|
* |
347 |
|
|
* Note: Since the job is finished after we respond to it, the callback object |
348 |
|
|
* gets automatically destroyed by this call! |
349 |
|
|
* Therefore you must not call Respond() twice or use the callback later |
350 |
|
|
* by any means! |
351 |
|
|
*/ |
352 |
|
590640 |
void Respond(const CallbackTN *callback, |
353 |
|
|
const UploaderResults &result) const |
354 |
|
|
{ |
355 |
✓✓ |
590640 |
if (callback != NULL) { |
356 |
|
590420 |
(*callback)(result); |
357 |
✓✗ |
590420 |
delete callback; |
358 |
|
|
} |
359 |
|
|
|
360 |
|
590640 |
--jobs_in_flight_; |
361 |
|
590640 |
} |
362 |
|
|
|
363 |
|
|
/** |
364 |
|
|
* Creates a temporary file in the backend storage's temporary location |
365 |
|
|
* For the LocalUploader this usually is the 'txn' directory of the backend |
366 |
|
|
* storage. Otherwise it is some scratch area. |
367 |
|
|
* |
368 |
|
|
* @param path pointer to a string that will contain the created file path |
369 |
|
|
* @return a file descriptor to the opened file |
370 |
|
|
*/ |
371 |
|
|
int CreateAndOpenTemporaryChunkFile(std::string *path) const; |
372 |
|
|
|
373 |
|
|
const SpoolerDefinition &spooler_definition() const { |
374 |
|
|
return spooler_definition_; |
375 |
|
|
} |
376 |
|
|
|
377 |
|
|
void CountUploadedBytes(int64_t bytes_written) const; |
378 |
|
|
|
379 |
|
|
void CountDuplicates() const; |
380 |
|
|
|
381 |
|
|
private: |
382 |
|
|
const SpoolerDefinition spooler_definition_; |
383 |
|
|
|
384 |
|
|
/** |
385 |
|
|
* Number of threads used for I/O write calls. Effectively this paramater |
386 |
|
|
* sets the I/O depth. Defaults to 1. |
387 |
|
|
*/ |
388 |
|
|
unsigned num_upload_tasks_; |
389 |
|
|
mutable SynchronizingCounter<int32_t> jobs_in_flight_; |
390 |
|
|
TubeGroup<UploadJob> tubes_upload_; |
391 |
|
|
TubeConsumerGroup<UploadJob> tasks_upload_; |
392 |
|
|
mutable UniquePtr<UploadCounters> counters_; |
393 |
|
|
}; // class AbstractUploader |
394 |
|
|
|
395 |
|
|
|
396 |
|
|
/** |
397 |
|
|
* The actual writing is multi-threaded. |
398 |
|
|
*/ |
399 |
✗✓ |
682 |
class TaskUpload : public TubeConsumer<AbstractUploader::UploadJob> { |
400 |
|
|
public: |
401 |
|
341 |
explicit TaskUpload( |
402 |
|
|
AbstractUploader *uploader, |
403 |
|
|
Tube<AbstractUploader::UploadJob> *tube) |
404 |
|
|
: TubeConsumer<AbstractUploader::UploadJob>(tube) |
405 |
|
341 |
, uploader_(uploader) |
406 |
|
341 |
{ } |
407 |
|
|
|
408 |
|
|
protected: |
409 |
|
|
virtual void Process(AbstractUploader::UploadJob *upload_job); |
410 |
|
|
|
411 |
|
|
private: |
412 |
|
|
AbstractUploader *uploader_; |
413 |
|
|
}; |
414 |
|
|
|
415 |
|
|
|
416 |
|
|
/** |
417 |
|
|
* Each implementation of AbstractUploader must provide its own derivate of the |
418 |
|
|
* UploadStreamHandle that is supposed to contain state information for the |
419 |
|
|
* streamed upload of one specific chunk. |
420 |
|
|
* Each UploadStreamHandle contains a callback object that is invoked as soon as |
421 |
|
|
* the streamed upload is committed. |
422 |
|
|
*/ |
423 |
|
|
struct UploadStreamHandle { |
424 |
|
|
typedef AbstractUploader::CallbackTN CallbackTN; |
425 |
|
|
static atomic_int64 g_upload_stream_tag; |
426 |
|
|
|
427 |
|
2824 |
explicit UploadStreamHandle(const CallbackTN *commit_callback) |
428 |
|
|
: commit_callback(commit_callback) |
429 |
|
2824 |
, tag(atomic_xadd64(&g_upload_stream_tag, 1)) {} |
430 |
✗✓ |
2824 |
virtual ~UploadStreamHandle() {} |
431 |
|
|
|
432 |
|
|
const CallbackTN *commit_callback; |
433 |
|
|
|
434 |
|
|
int64_t tag; |
435 |
|
|
}; |
436 |
|
|
|
437 |
|
|
} // namespace upload |
438 |
|
|
|
439 |
|
|
#endif // CVMFS_UPLOAD_FACILITY_H_ |