GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/upload_s3.h
Date: 2024-04-28 02:33:07
Exec Total Coverage
Lines: 9 10 90.0%
Branches: 3 8 37.5%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_UPLOAD_S3_H_
6 #define CVMFS_UPLOAD_S3_H_
7
8 #include <pthread.h>
9
10 #include <string>
11 #include <utility>
12 #include <vector>
13
14 #include "network/s3fanout.h"
15 #include "upload_facility.h"
16 #include "util/atomic.h"
17 #include "util/file_backed_buffer.h"
18 #include "util/pointer.h"
19 #include "util/single_copy.h"
20
21 namespace upload {
22
23 struct S3StreamHandle : public UploadStreamHandle {
24 101 S3StreamHandle(
25 const CallbackTN *commit_callback,
26 uint64_t in_memory_threshold,
27 const std::string &tmp_dir = "/tmp/")
28
1/2
✓ Branch 2 taken 101 times.
✗ Branch 3 not taken.
101 : UploadStreamHandle(commit_callback)
29 {
30
2/4
✓ Branch 1 taken 101 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 101 times.
✗ Branch 5 not taken.
101 buffer = FileBackedBuffer::Create(in_memory_threshold, tmp_dir);
31 101 }
32
33 // Ownership is later transferred to the S3 fanout
34 UniquePtr<FileBackedBuffer> buffer;
35 };
36
37 /**
38 * The S3Uploader implements the AbstractUploader interface to push files
39 * into an S3 CVMFS repository backend.
40 * For a detailed description of the class's interface, please have a look at
41 * the AbstractUploader base class.
42 */
43 class S3Uploader : public AbstractUploader {
44 public:
45 explicit S3Uploader(const SpoolerDefinition &spooler_definition);
46 virtual ~S3Uploader();
47 static bool WillHandle(const SpoolerDefinition &spooler_definition);
48
49 virtual std::string name() const { return "S3"; }
50
51 virtual bool Create();
52
53 /**
54 * DoUpload() is not done concurrently in the current implementation of the
55 * S3Uploader, since it is a simple move or copy of a file without any
56 * CPU-intensive operation.
57 * This method calls NotifyListeners and invokes a callback for all
58 * registered listeners (see the Observable template for details).
59 */
60 virtual void DoUpload(const std::string &remote_path,
61 IngestionSource *source,
62 const CallbackTN *callback);
63
64 virtual UploadStreamHandle *InitStreamedUpload(
65 const CallbackTN *callback = NULL);
66 virtual void StreamedUpload(UploadStreamHandle *handle, UploadBuffer buffer,
67 const CallbackTN *callback);
68 virtual void FinalizeStreamedUpload(UploadStreamHandle *handle,
69 const shash::Any &content_hash);
70
71 virtual void DoRemoveAsync(const std::string &file_to_delete);
72 virtual bool Peek(const std::string &path);
73 virtual bool Mkdir(const std::string &path);
74 virtual bool PlaceBootstrappingShortcut(const shash::Any &object);
75
76 virtual unsigned int GetNumberOfErrors() const;
77 int64_t DoGetObjectSize(const std::string &file_name);
78
79 // Only for testing
80 3 s3fanout::S3FanoutManager *GetS3FanoutManager() {
81 3 return s3fanout_mgr_.weak_ref();
82 }
83
84 private:
85 static const unsigned kDefaultPort = 80;
86 static const unsigned kHttpsPort = 443;
87 static const unsigned kDefaultNumParallelUploads = 16;
88 static const unsigned kDefaultNumRetries = 3;
89 static const unsigned kDefaultTimeoutSec = 60;
90 static const unsigned kDefaultBackoffInitMs = 100;
91 static const unsigned kDefaultBackoffMaxMs = 2000;
92 static const unsigned kInMemoryObjectThreshold = 500*1024; // 500KiB
93
94 // Used to make the async HTTP requests synchronous in Peek(), Create(),
95 // and Upload() of single bits
96 struct RequestCtrl : SingleCopy {
97 512 RequestCtrl() : return_code(-1), callback_forward(NULL) {
98 512 pipe_wait[0] = pipe_wait[1] = -1;
99 512 }
100
101 void WaitFor();
102
103 int return_code;
104 const CallbackTN *callback_forward;
105 std::string original_path;
106 int pipe_wait[2];
107 };
108
109 void OnReqComplete(const upload::UploaderResults &results, RequestCtrl *ctrl);
110
111 static void *MainCollectResults(void *data);
112
113 bool ParseSpoolerDefinition(const SpoolerDefinition &spooler_definition);
114 void UploadJobInfo(s3fanout::JobInfo *info);
115
116 s3fanout::JobInfo *CreateJobInfo(const std::string &path) const;
117
118 UniquePtr<s3fanout::S3FanoutManager> s3fanout_mgr_;
119 std::string repository_alias_;
120 std::string host_name_port_;
121 std::string host_name_;
122 std::string region_;
123 std::string flavor_;
124 std::string bucket_;
125 bool dns_buckets_;
126 int num_parallel_uploads_;
127 unsigned num_retries_;
128 unsigned timeout_sec_;
129 std::string access_key_;
130 std::string secret_key_;
131 s3fanout::AuthzMethods authz_method_;
132 bool peek_before_put_;
133 bool use_https_;
134 std::string proxy_;
135
136 const std::string temporary_path_;
137 mutable atomic_int32 io_errors_;
138 pthread_t thread_collect_results_;
139
140 std::string x_amz_acl_;
141 }; // S3Uploader
142
143 } // namespace upload
144
145 #endif // CVMFS_UPLOAD_S3_H_
146